{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module Hindsight.Store.PostgreSQL.Events.Subscription (
startNotifier,
shutdownNotifier,
subscribe,
RetryPolicy (..),
RetryConfig (..),
) where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, cancel, wait)
import Control.Concurrent.STM (
TChan,
atomically,
dupTChan,
newBroadcastTChanIO,
readTChan,
writeTChan,
)
import Control.Exception (AsyncException, Exception, SomeException, finally, try)
import Control.Monad (forever)
import Data.Aeson (Value)
import Data.Aeson.Types qualified as Aeson
import Data.Functor.Contravariant (contramap)
import Data.Int (Int32, Int64)
import Data.Map.Strict qualified as Map
import Data.Maybe (listToMaybe)
import Data.Text (Text, isInfixOf, pack)
import Data.Text qualified as T
import Data.Text.Encoding (decodeUtf8)
import Data.Time (UTCTime)
import Data.UUID (UUID)
import GHC.Generics (Generic)
import Hasql.Connection qualified as Connection
import Hasql.Connection.Setting qualified as ConnectionSetting
import Hasql.Connection.Setting.Connection qualified as ConnectionSettingConnection
import Hasql.Decoders qualified as D
import Hasql.Encoders qualified as E
import Hasql.Notifications qualified as Notifications
import Hasql.Pool (UsageError (..))
import Hasql.Pool qualified as Pool
import Hasql.Session qualified as Session
import Hasql.Statement (Statement (..))
import Hindsight.Events
import Hindsight.Store hiding (subscribe)
import Hindsight.Store qualified as Store
import Hindsight.Store.PostgreSQL.Core.Types
import UnliftIO (
MonadIO,
MonadUnliftIO (..),
askRunInIO,
liftIO,
newIORef,
readIORef,
writeIORef,
)
import UnliftIO.Exception (catch, throwIO)
data SubscriptionFailure = SubscriptionFailure
{ SubscriptionFailure -> Text
failureReason :: Text
, SubscriptionFailure -> UsageError
underlyingError :: UsageError
}
deriving (Int -> SubscriptionFailure -> ShowS
[SubscriptionFailure] -> ShowS
SubscriptionFailure -> String
(Int -> SubscriptionFailure -> ShowS)
-> (SubscriptionFailure -> String)
-> ([SubscriptionFailure] -> ShowS)
-> Show SubscriptionFailure
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SubscriptionFailure -> ShowS
showsPrec :: Int -> SubscriptionFailure -> ShowS
$cshow :: SubscriptionFailure -> String
show :: SubscriptionFailure -> String
$cshowList :: [SubscriptionFailure] -> ShowS
showList :: [SubscriptionFailure] -> ShowS
Show, (forall x. SubscriptionFailure -> Rep SubscriptionFailure x)
-> (forall x. Rep SubscriptionFailure x -> SubscriptionFailure)
-> Generic SubscriptionFailure
forall x. Rep SubscriptionFailure x -> SubscriptionFailure
forall x. SubscriptionFailure -> Rep SubscriptionFailure x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. SubscriptionFailure -> Rep SubscriptionFailure x
from :: forall x. SubscriptionFailure -> Rep SubscriptionFailure x
$cto :: forall x. Rep SubscriptionFailure x -> SubscriptionFailure
to :: forall x. Rep SubscriptionFailure x -> SubscriptionFailure
Generic)
instance Exception SubscriptionFailure
data RetryPolicy = RetryPolicy
{ RetryPolicy -> Int
maxRetries :: Int
, RetryPolicy -> Int
baseDelayMs :: Int
, RetryPolicy -> Int
maxDelayMs :: Int
, RetryPolicy -> Double
backoffMultiplier :: Double
, RetryPolicy -> Double
jitterPercent :: Double
}
deriving (Int -> RetryPolicy -> ShowS
[RetryPolicy] -> ShowS
RetryPolicy -> String
(Int -> RetryPolicy -> ShowS)
-> (RetryPolicy -> String)
-> ([RetryPolicy] -> ShowS)
-> Show RetryPolicy
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> RetryPolicy -> ShowS
showsPrec :: Int -> RetryPolicy -> ShowS
$cshow :: RetryPolicy -> String
show :: RetryPolicy -> String
$cshowList :: [RetryPolicy] -> ShowS
showList :: [RetryPolicy] -> ShowS
Show, RetryPolicy -> RetryPolicy -> Bool
(RetryPolicy -> RetryPolicy -> Bool)
-> (RetryPolicy -> RetryPolicy -> Bool) -> Eq RetryPolicy
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: RetryPolicy -> RetryPolicy -> Bool
== :: RetryPolicy -> RetryPolicy -> Bool
$c/= :: RetryPolicy -> RetryPolicy -> Bool
/= :: RetryPolicy -> RetryPolicy -> Bool
Eq, (forall x. RetryPolicy -> Rep RetryPolicy x)
-> (forall x. Rep RetryPolicy x -> RetryPolicy)
-> Generic RetryPolicy
forall x. Rep RetryPolicy x -> RetryPolicy
forall x. RetryPolicy -> Rep RetryPolicy x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. RetryPolicy -> Rep RetryPolicy x
from :: forall x. RetryPolicy -> Rep RetryPolicy x
$cto :: forall x. Rep RetryPolicy x -> RetryPolicy
to :: forall x. Rep RetryPolicy x -> RetryPolicy
Generic)
data RetryConfig = RetryConfig
{ RetryConfig -> Maybe RetryPolicy
connectionRetryPolicy :: Maybe RetryPolicy
, RetryConfig -> Maybe RetryPolicy
sessionRetryPolicy :: Maybe RetryPolicy
, RetryConfig -> Maybe RetryPolicy
timeoutRetryPolicy :: Maybe RetryPolicy
}
deriving (Int -> RetryConfig -> ShowS
[RetryConfig] -> ShowS
RetryConfig -> String
(Int -> RetryConfig -> ShowS)
-> (RetryConfig -> String)
-> ([RetryConfig] -> ShowS)
-> Show RetryConfig
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> RetryConfig -> ShowS
showsPrec :: Int -> RetryConfig -> ShowS
$cshow :: RetryConfig -> String
show :: RetryConfig -> String
$cshowList :: [RetryConfig] -> ShowS
showList :: [RetryConfig] -> ShowS
Show, RetryConfig -> RetryConfig -> Bool
(RetryConfig -> RetryConfig -> Bool)
-> (RetryConfig -> RetryConfig -> Bool) -> Eq RetryConfig
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: RetryConfig -> RetryConfig -> Bool
== :: RetryConfig -> RetryConfig -> Bool
$c/= :: RetryConfig -> RetryConfig -> Bool
/= :: RetryConfig -> RetryConfig -> Bool
Eq, (forall x. RetryConfig -> Rep RetryConfig x)
-> (forall x. Rep RetryConfig x -> RetryConfig)
-> Generic RetryConfig
forall x. Rep RetryConfig x -> RetryConfig
forall x. RetryConfig -> Rep RetryConfig x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. RetryConfig -> Rep RetryConfig x
from :: forall x. RetryConfig -> Rep RetryConfig x
$cto :: forall x. Rep RetryConfig x -> RetryConfig
to :: forall x. Rep RetryConfig x -> RetryConfig
Generic)
shouldCrashImmediately :: UsageError -> Bool
shouldCrashImmediately :: UsageError -> Bool
shouldCrashImmediately UsageError
err = case UsageError
err of
ConnectionUsageError ConnectionError
connErr | Text
"too many clients already" Text -> Text -> Bool
`isInfixOf` (String -> Text
pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ ConnectionError -> String
forall a. Show a => a -> String
show ConnectionError
connErr) -> Bool
True
UsageError
_ -> Bool
False
createSubscriptionFailure :: UsageError -> SubscriptionFailure
createSubscriptionFailure :: UsageError -> SubscriptionFailure
createSubscriptionFailure UsageError
err = Text -> UsageError -> SubscriptionFailure
SubscriptionFailure Text
reason UsageError
err
where
reason :: Text
reason = case UsageError
err of
ConnectionUsageError ConnectionError
connErr
| Text
"too many clients already" Text -> Text -> Bool
`isInfixOf` (String -> Text
pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ ConnectionError -> String
forall a. Show a => a -> String
show ConnectionError
connErr) ->
Text
"Connection pool exhausted - too many concurrent subscriptions"
ConnectionUsageError ConnectionError
_ ->
Text
"Database connection failed"
SessionUsageError SessionError
_ ->
Text
"Database query or session error"
UsageError
AcquisitionTimeoutUsageError ->
Text
"Failed to acquire database connection within timeout"
startNotifier :: (MonadIO m) => ByteString -> m Notifier
startNotifier :: forall (m :: * -> *). MonadIO m => ByteString -> m Notifier
startNotifier ByteString
connectionString = IO Notifier -> m Notifier
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Notifier -> m Notifier) -> IO Notifier -> m Notifier
forall a b. (a -> b) -> a -> b
$ do
chan <- IO (TChan ())
forall a. IO (TChan a)
newBroadcastTChanIO
thread <- async $ notifierLoop connectionString chan
pure $ Notifier chan thread
shutdownNotifier :: Notifier -> IO ()
shutdownNotifier :: Notifier -> IO ()
shutdownNotifier Notifier
notifier = do
Async () -> IO ()
forall a. Async a -> IO ()
cancel (Notifier -> Async ()
notifierThread Notifier
notifier)
_ <- forall e a. Exception e => IO a -> IO (Either e a)
try @SomeException (IO () -> IO (Either SomeException ()))
-> IO () -> IO (Either SomeException ())
forall a b. (a -> b) -> a -> b
$ Async () -> IO ()
forall a. Async a -> IO a
wait (Notifier -> Async ()
notifierThread Notifier
notifier)
pure ()
notifierLoop :: ByteString -> TChan () -> IO ()
notifierLoop :: ByteString -> TChan () -> IO ()
notifierLoop ByteString
connectionString TChan ()
chan =
( IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
eConn <- [Setting] -> IO (Either ConnectionError Connection)
Connection.acquire [Connection -> Setting
ConnectionSetting.connection (Connection -> Setting) -> Connection -> Setting
forall a b. (a -> b) -> a -> b
$ Text -> Connection
ConnectionSettingConnection.string (ByteString -> Text
decodeUtf8 ByteString
connectionString)]
case eConn of
Left ConnectionError
_err -> do
Int -> IO ()
threadDelay Int
1000000
Right Connection
conn -> do
let cleanup :: IO ()
cleanup = Connection -> IO ()
Connection.release Connection
conn
listen :: IO ()
listen = Connection -> PgIdentifier -> IO ()
Notifications.listen Connection
conn (Text -> PgIdentifier
Notifications.toPgIdentifier Text
"event_store_transaction")
handler :: ByteString -> ByteString -> IO ()
handler = \ByteString
_ ByteString
_ -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TChan () -> () -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan ()
chan ()
waitForNotification :: IO ()
waitForNotification = (ByteString -> ByteString -> IO ()) -> Connection -> IO ()
Notifications.waitForNotifications ByteString -> ByteString -> IO ()
handler Connection
conn
(IO ()
listen IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
waitForNotification) IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`finally` IO ()
cleanup
)
IO () -> (AsyncException -> IO ()) -> IO ()
forall (m :: * -> *) e a.
(MonadUnliftIO m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \(AsyncException
_e :: AsyncException) -> do
() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
subscribe ::
forall m ts.
(MonadUnliftIO m) =>
SQLStoreHandle ->
EventMatcher ts SQLStore m ->
EventSelector SQLStore ->
m (Store.SubscriptionHandle SQLStore)
subscribe :: forall (m :: * -> *) (ts :: [Symbol]).
MonadUnliftIO m =>
SQLStoreHandle
-> EventMatcher ts SQLStore m
-> EventSelector SQLStore
-> m (SubscriptionHandle SQLStore)
subscribe SQLStoreHandle
handle EventMatcher ts SQLStore m
matcher EventSelector SQLStore
selector = do
runInIO <- m (m () -> IO ())
forall (m :: * -> *) a. MonadUnliftIO m => m (m a -> IO a)
askRunInIO
liftIO $ do
tickChannel <- atomically $ dupTChan (notifierChannel (notifier handle))
let initialCursor = case EventSelector SQLStore
selector.startupPosition of
StartupPosition SQLStore
FromBeginning -> Int64 -> Int32 -> SQLCursor
SQLCursor (-Int64
1) (-Int32
1)
FromLastProcessed Cursor SQLStore
cursor -> Cursor SQLStore
SQLCursor
cursor
workerThread <- async $ runInIO $ workerLoop (pool handle) tickChannel initialCursor matcher selector
pure $
Store.SubscriptionHandle
{ cancel = cancel workerThread
, wait = wait workerThread
}
workerLoop ::
(MonadUnliftIO m) =>
Pool ->
TChan () ->
SQLCursor ->
EventMatcher ts SQLStore m ->
EventSelector SQLStore ->
m ()
workerLoop :: forall (m :: * -> *) (ts :: [Symbol]).
MonadUnliftIO m =>
Pool
-> TChan ()
-> SQLCursor
-> EventMatcher ts SQLStore m
-> EventSelector SQLStore
-> m ()
workerLoop Pool
pool TChan ()
tickChannel SQLCursor
initialCursor EventMatcher ts SQLStore m
matcher EventSelector SQLStore
selector = do
cursorRef <- SQLCursor -> m (IORef SQLCursor)
forall (m :: * -> *) a. MonadIO m => a -> m (IORef a)
newIORef SQLCursor
initialCursor
let batchSize = Int
1000
let loop = do
cursor <- IORef SQLCursor -> m SQLCursor
forall (m :: * -> *) a. MonadIO m => IORef a -> m a
readIORef IORef SQLCursor
cursorRef
batch <- fetchEventBatch pool cursor batchSize selector
if null batch
then do
liftIO $ atomically $ readTChan tickChannel
loop
else do
(newCursor, shouldContinue) <- processEventBatch matcher batch
writeIORef cursorRef newCursor
if shouldContinue
then loop
else pure ()
loop
data EventData = EventData
{ EventData -> Int64
transactionNo :: Int64
, EventData -> Int32
seqNo :: Int32
, EventData -> UUID
eventId :: UUID
, EventData -> UUID
streamId :: UUID
, EventData -> Maybe UUID
correlationId :: Maybe UUID
, EventData -> UTCTime
createdAt :: UTCTime
, EventData -> Text
eventName :: Text
, EventData -> Int32
eventVersion :: Int32
, EventData -> Value
payload :: Value
, EventData -> Int64
streamVersion :: Int64
}
deriving (Int -> EventData -> ShowS
[EventData] -> ShowS
EventData -> String
(Int -> EventData -> ShowS)
-> (EventData -> String)
-> ([EventData] -> ShowS)
-> Show EventData
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> EventData -> ShowS
showsPrec :: Int -> EventData -> ShowS
$cshow :: EventData -> String
show :: EventData -> String
$cshowList :: [EventData] -> ShowS
showList :: [EventData] -> ShowS
Show)
fetchEventBatch ::
(MonadIO m) =>
Pool ->
SQLCursor ->
Int ->
EventSelector SQLStore ->
m [EventData]
fetchEventBatch :: forall (m :: * -> *).
MonadIO m =>
Pool -> SQLCursor -> Int -> EventSelector SQLStore -> m [EventData]
fetchEventBatch Pool
pool SQLCursor
cursor Int
limit EventSelector SQLStore
selector = IO [EventData] -> m [EventData]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [EventData] -> m [EventData])
-> IO [EventData] -> m [EventData]
forall a b. (a -> b) -> a -> b
$ do
let (ByteString
sql, Params (SQLCursor, Int, EventSelector SQLStore)
params) = case EventSelector SQLStore
selector.streamId of
StreamSelector
AllStreams -> (ByteString
allStreamsSql, Params (SQLCursor, Int, EventSelector SQLStore)
allStreamsEncoder)
SingleStream StreamId
_ -> (ByteString
singleStreamSql, Params (SQLCursor, Int, EventSelector SQLStore)
singleStreamEncoder)
statement :: Statement (SQLCursor, Int, EventSelector SQLStore) [EventData]
statement = ByteString
-> Params (SQLCursor, Int, EventSelector SQLStore)
-> Result [EventData]
-> Bool
-> Statement (SQLCursor, Int, EventSelector SQLStore) [EventData]
forall params result.
ByteString
-> Params params
-> Result result
-> Bool
-> Statement params result
Statement ByteString
sql Params (SQLCursor, Int, EventSelector SQLStore)
params Result [EventData]
decoder Bool
True
runSession :: Session [EventData]
runSession = (SQLCursor, Int, EventSelector SQLStore)
-> Statement (SQLCursor, Int, EventSelector SQLStore) [EventData]
-> Session [EventData]
forall params result.
params -> Statement params result -> Session result
Session.statement (SQLCursor
cursor, Int
limit, EventSelector SQLStore
selector) Statement (SQLCursor, Int, EventSelector SQLStore) [EventData]
statement
Pool -> Session [EventData] -> IO (Either UsageError [EventData])
forall a. Pool -> Session a -> IO (Either UsageError a)
Pool.use Pool
pool Session [EventData]
runSession IO (Either UsageError [EventData])
-> (Either UsageError [EventData] -> IO [EventData])
-> IO [EventData]
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Right [EventData]
events -> [EventData] -> IO [EventData]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [EventData]
events
Left UsageError
err -> do
String -> IO ()
putStrLn (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Failed to fetch event batch: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> UsageError -> String
forall a. Show a => a -> String
show UsageError
err
if UsageError -> Bool
shouldCrashImmediately UsageError
err
then SubscriptionFailure -> IO [EventData]
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO (UsageError -> SubscriptionFailure
createSubscriptionFailure UsageError
err)
else [EventData] -> IO [EventData]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
where
decoder :: Result [EventData]
decoder =
Row EventData -> Result [EventData]
forall a. Row a -> Result [a]
D.rowList (Row EventData -> Result [EventData])
-> Row EventData -> Result [EventData]
forall a b. (a -> b) -> a -> b
$
Int64
-> Int32
-> UUID
-> UUID
-> Maybe UUID
-> UTCTime
-> Text
-> Int32
-> Value
-> Int64
-> EventData
EventData
(Int64
-> Int32
-> UUID
-> UUID
-> Maybe UUID
-> UTCTime
-> Text
-> Int32
-> Value
-> Int64
-> EventData)
-> Row Int64
-> Row
(Int32
-> UUID
-> UUID
-> Maybe UUID
-> UTCTime
-> Text
-> Int32
-> Value
-> Int64
-> EventData)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NullableOrNot Value Int64 -> Row Int64
forall a. NullableOrNot Value a -> Row a
D.column (Value Int64 -> NullableOrNot Value Int64
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value Int64
D.int8)
Row
(Int32
-> UUID
-> UUID
-> Maybe UUID
-> UTCTime
-> Text
-> Int32
-> Value
-> Int64
-> EventData)
-> Row Int32
-> Row
(UUID
-> UUID
-> Maybe UUID
-> UTCTime
-> Text
-> Int32
-> Value
-> Int64
-> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value Int32 -> Row Int32
forall a. NullableOrNot Value a -> Row a
D.column (Value Int32 -> NullableOrNot Value Int32
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value Int32
D.int4)
Row
(UUID
-> UUID
-> Maybe UUID
-> UTCTime
-> Text
-> Int32
-> Value
-> Int64
-> EventData)
-> Row UUID
-> Row
(UUID
-> Maybe UUID
-> UTCTime
-> Text
-> Int32
-> Value
-> Int64
-> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value UUID -> Row UUID
forall a. NullableOrNot Value a -> Row a
D.column (Value UUID -> NullableOrNot Value UUID
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value UUID
D.uuid)
Row
(UUID
-> Maybe UUID
-> UTCTime
-> Text
-> Int32
-> Value
-> Int64
-> EventData)
-> Row UUID
-> Row
(Maybe UUID
-> UTCTime -> Text -> Int32 -> Value -> Int64 -> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value UUID -> Row UUID
forall a. NullableOrNot Value a -> Row a
D.column (Value UUID -> NullableOrNot Value UUID
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value UUID
D.uuid)
Row
(Maybe UUID
-> UTCTime -> Text -> Int32 -> Value -> Int64 -> EventData)
-> Row (Maybe UUID)
-> Row (UTCTime -> Text -> Int32 -> Value -> Int64 -> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value (Maybe UUID) -> Row (Maybe UUID)
forall a. NullableOrNot Value a -> Row a
D.column (Value UUID -> NullableOrNot Value (Maybe UUID)
forall (decoder :: * -> *) a.
decoder a -> NullableOrNot decoder (Maybe a)
D.nullable Value UUID
D.uuid)
Row (UTCTime -> Text -> Int32 -> Value -> Int64 -> EventData)
-> Row UTCTime
-> Row (Text -> Int32 -> Value -> Int64 -> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value UTCTime -> Row UTCTime
forall a. NullableOrNot Value a -> Row a
D.column (Value UTCTime -> NullableOrNot Value UTCTime
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value UTCTime
D.timestamptz)
Row (Text -> Int32 -> Value -> Int64 -> EventData)
-> Row Text -> Row (Int32 -> Value -> Int64 -> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value Text -> Row Text
forall a. NullableOrNot Value a -> Row a
D.column (Value Text -> NullableOrNot Value Text
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value Text
D.text)
Row (Int32 -> Value -> Int64 -> EventData)
-> Row Int32 -> Row (Value -> Int64 -> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value Int32 -> Row Int32
forall a. NullableOrNot Value a -> Row a
D.column (Value Int32 -> NullableOrNot Value Int32
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value Int32
D.int4)
Row (Value -> Int64 -> EventData)
-> Row Value -> Row (Int64 -> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value Value -> Row Value
forall a. NullableOrNot Value a -> Row a
D.column (Value Value -> NullableOrNot Value Value
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value Value
D.jsonb)
Row (Int64 -> EventData) -> Row Int64 -> Row EventData
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value Int64 -> Row Int64
forall a. NullableOrNot Value a -> Row a
D.column (Value Int64 -> NullableOrNot Value Int64
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value Int64
D.int8)
baseSql :: ByteString
baseSql :: ByteString
baseSql =
ByteString
"SELECT transaction_no, seq_no, event_id, stream_id, correlation_id, created_at, event_name, event_version, payload, stream_version "
ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
"FROM events "
ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
"WHERE (transaction_no, seq_no) > ($1, $2) "
ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
"AND transaction_no <= get_safe_transaction_number_mvcc() "
allStreamsSql :: ByteString
allStreamsSql :: ByteString
allStreamsSql = ByteString
baseSql ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
"ORDER BY transaction_no, seq_no LIMIT $3"
singleStreamSql :: ByteString
singleStreamSql :: ByteString
singleStreamSql = ByteString
baseSql ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
"AND stream_id = $4 ORDER BY transaction_no, seq_no LIMIT $3"
allStreamsEncoder :: E.Params (SQLCursor, Int, EventSelector SQLStore)
allStreamsEncoder :: Params (SQLCursor, Int, EventSelector SQLStore)
allStreamsEncoder =
((SQLCursor, Int, EventSelector SQLStore) -> Int64)
-> Params Int64 -> Params (SQLCursor, Int, EventSelector SQLStore)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(SQLCursor
c, Int
_, EventSelector SQLStore
_) -> SQLCursor
c.transactionNo) (NullableOrNot Value Int64 -> Params Int64
forall a. NullableOrNot Value a -> Params a
E.param (Value Int64 -> NullableOrNot Value Int64
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int64
E.int8))
Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
forall a. Semigroup a => a -> a -> a
<> ((SQLCursor, Int, EventSelector SQLStore) -> Int32)
-> Params Int32 -> Params (SQLCursor, Int, EventSelector SQLStore)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(SQLCursor
c, Int
_, EventSelector SQLStore
_) -> SQLCursor
c.sequenceNo) (NullableOrNot Value Int32 -> Params Int32
forall a. NullableOrNot Value a -> Params a
E.param (Value Int32 -> NullableOrNot Value Int32
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int32
E.int4))
Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
forall a. Semigroup a => a -> a -> a
<> ((SQLCursor, Int, EventSelector SQLStore) -> Int32)
-> Params Int32 -> Params (SQLCursor, Int, EventSelector SQLStore)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(SQLCursor
_, Int
l, EventSelector SQLStore
_) -> Int -> Int32
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
l) (NullableOrNot Value Int32 -> Params Int32
forall a. NullableOrNot Value a -> Params a
E.param (Value Int32 -> NullableOrNot Value Int32
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int32
E.int4))
singleStreamEncoder :: E.Params (SQLCursor, Int, EventSelector SQLStore)
singleStreamEncoder :: Params (SQLCursor, Int, EventSelector SQLStore)
singleStreamEncoder =
((SQLCursor, Int, EventSelector SQLStore) -> Int64)
-> Params Int64 -> Params (SQLCursor, Int, EventSelector SQLStore)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(SQLCursor
c, Int
_, EventSelector SQLStore
_) -> SQLCursor
c.transactionNo) (NullableOrNot Value Int64 -> Params Int64
forall a. NullableOrNot Value a -> Params a
E.param (Value Int64 -> NullableOrNot Value Int64
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int64
E.int8))
Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
forall a. Semigroup a => a -> a -> a
<> ((SQLCursor, Int, EventSelector SQLStore) -> Int32)
-> Params Int32 -> Params (SQLCursor, Int, EventSelector SQLStore)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(SQLCursor
c, Int
_, EventSelector SQLStore
_) -> SQLCursor
c.sequenceNo) (NullableOrNot Value Int32 -> Params Int32
forall a. NullableOrNot Value a -> Params a
E.param (Value Int32 -> NullableOrNot Value Int32
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int32
E.int4))
Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
forall a. Semigroup a => a -> a -> a
<> ((SQLCursor, Int, EventSelector SQLStore) -> Int32)
-> Params Int32 -> Params (SQLCursor, Int, EventSelector SQLStore)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(SQLCursor
_, Int
l, EventSelector SQLStore
_) -> Int -> Int32
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
l) (NullableOrNot Value Int32 -> Params Int32
forall a. NullableOrNot Value a -> Params a
E.param (Value Int32 -> NullableOrNot Value Int32
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int32
E.int4))
Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
forall a. Semigroup a => a -> a -> a
<> ((SQLCursor, Int, EventSelector SQLStore) -> UUID)
-> Params UUID -> Params (SQLCursor, Int, EventSelector SQLStore)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(SQLCursor
_, Int
_, EventSelector SQLStore
s) -> case EventSelector SQLStore
s.streamId of SingleStream (StreamId UUID
sid) -> UUID
sid; StreamSelector
_ -> String -> UUID
forall a. HasCallStack => String -> a
error String
"impossible") (NullableOrNot Value UUID -> Params UUID
forall a. NullableOrNot Value a -> Params a
E.param (Value UUID -> NullableOrNot Value UUID
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value UUID
E.uuid))
processEventBatch ::
forall m ts.
(MonadUnliftIO m) =>
EventMatcher ts SQLStore m ->
[EventData] ->
m (SQLCursor, Bool)
processEventBatch :: forall (m :: * -> *) (ts :: [Symbol]).
MonadUnliftIO m =>
EventMatcher ts SQLStore m -> [EventData] -> m (SQLCursor, Bool)
processEventBatch EventMatcher ts SQLStore m
matcher [EventData]
batch = do
stopRef <- Bool -> m (IORef Bool)
forall (m :: * -> *) a. MonadIO m => a -> m (IORef a)
newIORef Bool
False
lastCursorRef <- newIORef Nothing
let tryMatchers :: forall ts'. EventMatcher ts' SQLStore m -> EventData -> m ()
tryMatchers EventMatcher ts' SQLStore m
MatchEnd EventData
eventData = do
let cursor :: SQLCursor
cursor = Int64 -> Int32 -> SQLCursor
SQLCursor EventData
eventData.transactionNo EventData
eventData.seqNo
IORef (Maybe SQLCursor) -> Maybe SQLCursor -> m ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef IORef (Maybe SQLCursor)
lastCursorRef (SQLCursor -> Maybe SQLCursor
forall a. a -> Maybe a
Just SQLCursor
cursor)
tryMatchers ((Proxy event
proxy, EventHandler event m SQLStore
handler) :? EventMatcher ts1 SQLStore m
rest) EventData
eventData = do
let cursor :: SQLCursor
cursor = Int64 -> Int32 -> SQLCursor
SQLCursor EventData
eventData.transactionNo EventData
eventData.seqNo
if EventData
eventData.eventName Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== Proxy event -> Text
forall (event :: Symbol). KnownSymbol event => Proxy event -> Text
getEventName Proxy event
proxy
then do
case Int
-> Map
Int
(Value -> Parser (FinalVersionType (FromList (Versions event))))
-> Maybe
(Value -> Parser (FinalVersionType (FromList (Versions event))))
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup (Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral EventData
eventData.eventVersion) (Proxy event -> Map Int (Value -> Parser (CurrentPayloadType event))
forall (event :: Symbol).
Event event =>
Proxy event -> Map Int (Value -> Parser (CurrentPayloadType event))
parseMapFromProxy Proxy event
proxy) of
Just Value -> Parser (FinalVersionType (FromList (Versions event)))
parser ->
case (Value -> Parser (FinalVersionType (FromList (Versions event))))
-> Value
-> Either String (FinalVersionType (FromList (Versions event)))
forall a b. (a -> Parser b) -> a -> Either String b
Aeson.parseEither Value -> Parser (FinalVersionType (FromList (Versions event)))
parser EventData
eventData.payload of
Right FinalVersionType (FromList (Versions event))
parsedPayload -> do
let envelope :: EventEnvelope event SQLStore
envelope =
EventWithMetadata
{ position :: Cursor SQLStore
position = Cursor SQLStore
SQLCursor
cursor
, eventId :: EventId
eventId = UUID -> EventId
EventId EventData
eventData.eventId
, streamId :: StreamId
streamId = UUID -> StreamId
StreamId EventData
eventData.streamId
, streamVersion :: StreamVersion
streamVersion = Int64 -> StreamVersion
StreamVersion EventData
eventData.streamVersion
, correlationId :: Maybe CorrelationId
correlationId = UUID -> CorrelationId
CorrelationId (UUID -> CorrelationId) -> Maybe UUID -> Maybe CorrelationId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> EventData
eventData.correlationId
, createdAt :: UTCTime
createdAt = EventData
eventData.createdAt
, payload :: CurrentPayloadType event
payload = FinalVersionType (FromList (Versions event))
CurrentPayloadType event
parsedPayload
}
result <-
(EventHandler event m SQLStore
handler EventEnvelope event SQLStore
envelope) m SubscriptionResult
-> (SomeException -> m SubscriptionResult) -> m SubscriptionResult
forall (m :: * -> *) e a.
(MonadUnliftIO m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \(SomeException
e :: SomeException) ->
HandlerException -> m SubscriptionResult
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO (HandlerException -> m SubscriptionResult)
-> HandlerException -> m SubscriptionResult
forall a b. (a -> b) -> a -> b
$
Store.HandlerException
{ originalException :: SomeException
Store.originalException = SomeException
e
, failedEventPosition :: Text
Store.failedEventPosition = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ SQLCursor -> String
forall a. Show a => a -> String
show SQLCursor
cursor
, failedEventId :: EventId
Store.failedEventId = UUID -> EventId
EventId EventData
eventData.eventId
, failedEventName :: Text
Store.failedEventName = EventData
eventData.eventName
, failedEventStreamId :: StreamId
Store.failedEventStreamId = UUID -> StreamId
StreamId EventData
eventData.streamId
, failedEventStreamVersion :: StreamVersion
Store.failedEventStreamVersion = Int64 -> StreamVersion
StreamVersion EventData
eventData.streamVersion
, failedEventCorrelationId :: Maybe CorrelationId
Store.failedEventCorrelationId = UUID -> CorrelationId
CorrelationId (UUID -> CorrelationId) -> Maybe UUID -> Maybe CorrelationId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> EventData
eventData.correlationId
, failedEventCreatedAt :: UTCTime
Store.failedEventCreatedAt = EventData
eventData.createdAt
}
writeIORef lastCursorRef (Just cursor)
case result of
SubscriptionResult
Store.Stop -> IORef Bool -> Bool -> m ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef IORef Bool
stopRef Bool
True
SubscriptionResult
Store.Continue -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Left String
err -> do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
putStrLn (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Failed to parse event payload: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> ShowS
forall a. Show a => a -> String
show String
err
IORef (Maybe SQLCursor) -> Maybe SQLCursor -> m ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef IORef (Maybe SQLCursor)
lastCursorRef (SQLCursor -> Maybe SQLCursor
forall a. a -> Maybe a
Just SQLCursor
cursor)
Maybe
(Value -> Parser (FinalVersionType (FromList (Versions event))))
Nothing -> do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
putStrLn (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Unknown event version for " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Text -> String
forall a. Show a => a -> String
show (EventData
eventData.eventName)
IORef (Maybe SQLCursor) -> Maybe SQLCursor -> m ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef IORef (Maybe SQLCursor)
lastCursorRef (SQLCursor -> Maybe SQLCursor
forall a. a -> Maybe a
Just SQLCursor
cursor)
else do
EventMatcher ts1 SQLStore m -> EventData -> m ()
forall (ts' :: [Symbol]).
EventMatcher ts' SQLStore m -> EventData -> m ()
tryMatchers EventMatcher ts1 SQLStore m
rest EventData
eventData
let processAllEvents [] = () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
processAllEvents (EventData
event : [EventData]
remaining) = do
shouldStop <- IORef Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IORef a -> m a
readIORef IORef Bool
stopRef
if shouldStop
then pure ()
else do
tryMatchers matcher event
processAllEvents remaining
processAllEvents batch
stopped <- readIORef stopRef
lastCursor <- readIORef lastCursorRef
let finalCursor = case Maybe SQLCursor
lastCursor of
Just SQLCursor
cursor -> SQLCursor
cursor
Maybe SQLCursor
Nothing -> case [EventData] -> Maybe EventData
forall a. [a] -> Maybe a
listToMaybe ([EventData] -> [EventData]
forall a. [a] -> [a]
reverse [EventData]
batch) of
Just EventData
lastEvent -> Int64 -> Int32 -> SQLCursor
SQLCursor EventData
lastEvent.transactionNo EventData
lastEvent.seqNo
Maybe EventData
Nothing -> String -> SQLCursor
forall a. HasCallStack => String -> a
error String
"processEventBatch called with an empty batch"
pure (finalCursor, not stopped)