{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wno-orphans #-}
module Hindsight.Store.PostgreSQL.Projections.Sync (
SyncProjectionRegistry,
emptySyncProjectionRegistry,
registerSyncProjection,
executeSyncProjectionForEvent,
catchUpSyncProjections,
CatchUpError (..),
matchEventHandlers,
executeHandlerChain,
handleProjectionError,
)
where
import Control.Exception (Exception)
import Control.Monad (forM, forM_)
import Data.Aeson qualified as Aeson
import Data.Int (Int32, Int64)
import Data.Map qualified as Map
import Data.Proxy (Proxy (..))
import Data.Text (Text, pack)
import Data.Time (UTCTime)
import Data.Typeable (Typeable)
import Data.UUID (UUID)
import Data.Vector qualified as Vector
import Hasql.Pool (Pool)
import Hasql.Pool qualified as Pool
import Hasql.Statement (Statement)
import Hasql.TH
import Hasql.Transaction qualified as HasqlTransaction
import Hasql.Transaction.Sessions qualified as TransactionSession
import Hindsight.Events (Event)
import Hindsight.Projection (ProjectionError (..), ProjectionId (..), ProjectionResult (..))
import Hindsight.Projection.Matching (ProjectionHandler, ProjectionHandlers (..), SomeProjectionHandler (..), extractMatchingHandlers, handlersForEventName)
import Hindsight.Store (CorrelationId (..), EventEnvelope (..), EventId (..), StreamId (..), StreamVersion (..))
import Hindsight.Store.Parsing (parseStoredEventToEnvelope)
import Hindsight.Store.PostgreSQL.Core.Types (SQLCursor (..), SQLStore, SomeProjectionHandlers (..), SyncProjectionRegistry (..))
import Hindsight.Store.PostgreSQL.Projections.State (
SyncProjectionState (..),
getActiveProjections,
registerSyncProjectionInDb,
updateSyncProjectionState,
)
-- | Errors that the synchronous-projection catch-up process can report.
data CatchUpError
    = -- | A failure attributed to the projection with the given 'ProjectionId',
      -- together with a textual description.
      ProjectionExecutionError ProjectionId Text
    | -- | A database-level failure, carried as its rendered description.
      DatabaseError Text
    | -- | No active projections were available.
      NoActiveProjections
    deriving (Show, Eq, Typeable)

-- | Lets a 'CatchUpError' be thrown and caught as an exception.
instance Exception CatchUpError
-- | One row of the @events@ table as read back during catch-up, before the
-- payload has been decoded into a typed event envelope.
data StoredEvent = StoredEvent
    { transactionNo :: Int64
    -- ^ Value of the @transaction_no@ column; first component of the event's cursor.
    , seqNo :: Int32
    -- ^ Value of the @seq_no@ column; second component of the event's cursor.
    , streamId :: StreamId
    -- ^ Stream the event belongs to (@stream_id@).
    , eventId :: EventId
    -- ^ Identifier of the event (@event_id@).
    , createdAt :: UTCTime
    -- ^ Value of the @created_at@ column.
    , correlationId :: Maybe UUID
    -- ^ Optional correlation identifier (@correlation_id@, nullable).
    , eventName :: Text
    -- ^ Event name (@event_name@), used to select matching handlers.
    , eventVersion :: Int32
    -- ^ Value of the @event_version@ column, passed to the payload parser.
    , payload :: Aeson.Value
    -- ^ Raw JSON payload (@payload@).
    , streamVersion :: Int64
    -- ^ Value of the @stream_version@ column.
    }
-- | Select, from a handler collection, the handlers registered for the given
-- event type.
--
-- This is a direct alias for 'extractMatchingHandlers'.
matchEventHandlers ::
    forall event ts backend.
    (Event event) =>
    ProjectionHandlers ts backend ->
    Proxy event ->
    [ProjectionHandler event backend]
matchEventHandlers = extractMatchingHandlers
-- | Run every handler against the same envelope, in list order, inside the
-- current transaction.
--
-- Returns one 'ProjectionResult' per handler. Each handler that runs to
-- completion is reported as 'ProjectionSuccess'; this function never
-- produces any other result itself.
executeHandlerChain ::
    forall event backend.
    (Event event) =>
    [ProjectionHandler event backend] ->
    EventEnvelope event backend ->
    HasqlTransaction.Transaction [ProjectionResult]
executeHandlerChain handlers envelope =
    -- 'traverse' sequences the handlers left to right, matching the order of
    -- the original explicit recursion.
    traverse (\handler -> ProjectionSuccess <$ handler envelope) handlers
-- | React to a projection failure: condemn the current transaction via
-- 'HasqlTransaction.condemn' and hand the error back wrapped as a
-- 'ProjectionResult'.
handleProjectionError :: ProjectionError -> HasqlTransaction.Transaction ProjectionResult
handleProjectionError err = do
    HasqlTransaction.condemn
    pure (ProjectionError err)
-- | A registry with no projections registered.
emptySyncProjectionRegistry :: SyncProjectionRegistry
emptySyncProjectionRegistry = SyncProjectionRegistry Map.empty
-- | Add a projection's handlers to the registry under the given id.
--
-- The handlers are stored with their event-type list hidden behind
-- 'SomeProjectionHandlers'. Because the registry is a 'Map.insert' into a
-- map keyed by 'ProjectionId', registering an id that is already present
-- replaces the earlier entry.
registerSyncProjection ::
    ProjectionId ->
    ProjectionHandlers ts SQLStore ->
    SyncProjectionRegistry ->
    SyncProjectionRegistry
registerSyncProjection projId handlers (SyncProjectionRegistry reg) =
    SyncProjectionRegistry (Map.insert projId (SomeProjectionHandlers handlers) reg)
-- | Feed a single event envelope to every projection in the registry, inside
-- the current transaction.
--
-- Projections are visited in the order of 'Map.elems', i.e. ascending
-- 'ProjectionId'. Handlers that do not match the event type are ignored.
executeSyncProjectionForEvent ::
    forall event.
    (Event event) =>
    SyncProjectionRegistry ->
    Proxy event ->
    EventEnvelope event SQLStore ->
    HasqlTransaction.Transaction ()
executeSyncProjectionForEvent (SyncProjectionRegistry reg) eventProxy envelope =
    forM_ (Map.elems reg) $ \(SomeProjectionHandlers handlers) ->
        processHandlersWithCommonLogic handlers eventProxy envelope
-- | Run the handlers of one projection that match the event type, then pass
-- every reported 'ProjectionError' to 'handleProjectionError'.
--
-- NOTE(review): 'executeHandlerChain' only ever reports 'ProjectionSuccess',
-- so the error pass below does nothing at present; it is kept so that
-- error-reporting handlers can be wired in without touching this function.
processHandlersWithCommonLogic ::
    forall event ts.
    (Event event) =>
    ProjectionHandlers ts SQLStore ->
    Proxy event ->
    EventEnvelope event SQLStore ->
    HasqlTransaction.Transaction ()
processHandlersWithCommonLogic handlers eventProxy envelope = do
    results <- executeHandlerChain (matchEventHandlers handlers eventProxy) envelope
    -- The list-comprehension pattern silently drops non-error results.
    sequence_ [handleProjectionError err | ProjectionError err <- results]
-- | Bring all registered synchronous projections up to date.
--
-- With an empty registry this returns @Right ()@ without touching the pool.
-- Otherwise the whole catch-up runs as one @ReadCommitted@ / @Write@
-- transaction on a connection taken from the pool.
--
-- Returns the result of the catch-up transaction; a pool or session failure
-- is reported as 'DatabaseError' carrying the @show@-rendered usage error.
catchUpSyncProjections ::
    Pool ->
    SyncProjectionRegistry ->
    IO (Either CatchUpError ())
catchUpSyncProjections pool registry@(SyncProjectionRegistry regMap)
    | Map.null regMap = pure (Right ())
    | otherwise = do
        outcome <-
            Pool.use pool $
                TransactionSession.transaction
                    TransactionSession.ReadCommitted
                    TransactionSession.Write
                    (catchUpTransaction registry)
        pure $ case outcome of
            Left usageErr -> Left (DatabaseError (pack (show usageErr)))
            Right res -> res
-- | Transaction body of the catch-up process.
--
-- 1. Records every registered projection id in the database
--    ('registerSyncProjectionInDb').
-- 2. Reads the active projection states ('getActiveProjections').
-- 3. If none are reported, catches up every registered projection from the
--    cursor @SQLCursor 0 0@; otherwise catches up each active projection
--    from its stored last-processed cursor.
--
-- All projections are attempted; the first error (in processing order), if
-- any, is returned.
catchUpTransaction :: SyncProjectionRegistry -> HasqlTransaction.Transaction (Either CatchUpError ())
catchUpTransaction registry@(SyncProjectionRegistry regMap) = do
    forM_ registeredIds registerSyncProjectionInDb
    activeProjections <- getActiveProjections
    results <-
        if null activeProjections
            then forM registeredIds $ \projId ->
                catchUpProjection registry projId (SQLCursor 0 0)
            else forM activeProjections $ \projState ->
                catchUpProjection
                    registry
                    projState.projectionId
                    ( SQLCursor
                        projState.lastProcessedTransactionNo
                        projState.lastProcessedSeqNo
                    )
    -- In the 'Either' monad 'sequence_' yields the first 'Left', or
    -- @Right ()@ when every projection succeeded.
    pure (sequence_ results)
  where
    registeredIds = Map.keys regMap
-- | Replay all events after the given cursor through one projection.
--
-- A projection id that is not present in the registry is ignored and
-- reported as success. For a known projection, events are fetched in
-- batches with 'getUnprocessedEvents'; after each event is processed the
-- projection's stored cursor is advanced to that event.
--
-- 'getUnprocessedEvents' returns a bounded batch, so fetching once would
-- leave a projection that is far behind only partially caught up. Fetching
-- is therefore repeated from the last processed event until a batch comes
-- back empty.
catchUpProjection ::
    SyncProjectionRegistry ->
    ProjectionId ->
    SQLCursor ->
    HasqlTransaction.Transaction (Either CatchUpError ())
catchUpProjection (SyncProjectionRegistry regMap) projId cursor =
    case Map.lookup projId regMap of
        Nothing -> pure (Right ())
        Just (SomeProjectionHandlers handlers) -> Right <$> drainFrom handlers cursor
  where
    cursorOf :: StoredEvent -> SQLCursor
    cursorOf storedEvent = SQLCursor storedEvent.transactionNo storedEvent.seqNo

    -- Process one batch, then continue after its last event. Every batch
    -- lies strictly after the cursor it was fetched from, so the cursor
    -- always advances and the loop ends on the first empty batch.
    drainFrom :: ProjectionHandlers ts SQLStore -> SQLCursor -> HasqlTransaction.Transaction ()
    drainFrom handlers from = do
        events <- getUnprocessedEvents from
        forM_ events $ \storedEvent -> do
            processStoredEvent handlers storedEvent
            updateSyncProjectionState projId (cursorOf storedEvent)
        case reverse events of
            [] -> pure ()
            (lastEvent : _) -> drainFrom handlers (cursorOf lastEvent)
-- | Dispatch one stored event to the handlers of a projection.
--
-- Handlers are selected by the stored event name. For each of them the raw
-- row is decoded into a typed envelope with 'parseStoredEventToEnvelope';
-- if decoding succeeds the handler runs, otherwise the event is skipped for
-- that handler without reporting an error.
processStoredEvent ::
    forall ts.
    ProjectionHandlers ts SQLStore ->
    StoredEvent ->
    HasqlTransaction.Transaction ()
processStoredEvent handlers storedEvent =
    forM_ matchingHandlers $ \(SomeProjectionHandler eventProxy handler) ->
        case parseStoredEventToEnvelope
            eventProxy
            storedEvent.eventId
            storedEvent.streamId
            position
            (StreamVersion storedEvent.streamVersion)
            (CorrelationId <$> storedEvent.correlationId)
            storedEvent.createdAt
            storedEvent.payload
            (fromIntegral storedEvent.eventVersion) of
            Just envelope -> handler envelope
            -- Payload could not be decoded for this event type: skip it.
            Nothing -> pure ()
  where
    matchingHandlers = handlersForEventName storedEvent.eventName handlers
    position = SQLCursor storedEvent.transactionNo storedEvent.seqNo
-- | Fetch the next batch of events located strictly after the given cursor.
--
-- Rows are ordered by @(transaction_no, seq_no)@ and the batch is capped by
-- the @LIMIT@ in the statement (1000 rows), so callers that need everything
-- after the cursor must call again from the last returned event.
getUnprocessedEvents ::
    SQLCursor ->
    HasqlTransaction.Transaction [StoredEvent]
getUnprocessedEvents (SQLCursor lastTxNo lastSeqNo) =
    map toStoredEvent . Vector.toList
        <$> HasqlTransaction.statement (lastTxNo, lastSeqNo) getEventsStmt
  where
    getEventsStmt :: Statement (Int64, Int32) (Vector.Vector (Int64, Int32, UUID, UUID, UTCTime, Maybe UUID, Text, Int32, Aeson.Value, Int64))
    getEventsStmt =
        [vectorStatement|
            SELECT
                e.transaction_no :: int8,
                e.seq_no :: int4,
                e.stream_id :: uuid,
                e.event_id :: uuid,
                e.created_at :: timestamptz,
                e.correlation_id :: uuid?,
                e.event_name :: text,
                e.event_version :: int4,
                e.payload :: jsonb,
                e.stream_version :: int8
            FROM events e
            WHERE (e.transaction_no > $1 :: int8)
               OR (e.transaction_no = $1 :: int8 AND e.seq_no > $2 :: int4)
            ORDER BY e.transaction_no, e.seq_no
            LIMIT 1000
        |]

    -- Convert a result row into a 'StoredEvent', wrapping the raw UUIDs in
    -- their newtypes. The binders are named differently from the record
    -- fields so that they do not shadow the selectors.
    toStoredEvent ::
        (Int64, Int32, UUID, UUID, UTCTime, Maybe UUID, Text, Int32, Aeson.Value, Int64) ->
        StoredEvent
    toStoredEvent (txNo, seqNumber, streamUuid, eventUuid, created, corrId, name, version, body, streamVer) =
        StoredEvent
            { transactionNo = txNo
            , seqNo = seqNumber
            , streamId = StreamId streamUuid
            , eventId = EventId eventUuid
            , createdAt = created
            , correlationId = corrId
            , eventName = name
            , eventVersion = version
            , payload = body
            , streamVersion = streamVer
            }