{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

{- |
Module      : Hindsight.Store.PostgreSQL.Events.Subscription
Description : Unified subscription system for PostgreSQL event store
Copyright   : (c) 2024
License     : BSD3
Maintainer  : maintainer@example.com
Stability   : experimental

This module provides a decentralized, pull-based subscription system where each subscriber
is a self-contained agent.

= Design

The architecture consists of two main components:

1.  __The Notifier__: A single, lightweight process that listens for a generic
    "new event" notification from PostgreSQL and broadcasts a simple "tick" to all
    active subscribers.

2.  __The Subscriber Worker__: Each subscription runs in its own thread. The worker
    pulls data from the database in a unified loop, naturally handling both
    catch-up and real-time processing. It is responsible for its own state
    management and data fetching.

This design eliminates the complexity and bottlenecks of a centralized manager, provides
inherent backpressure, and leverages the database for efficient filtering.
-}
module Hindsight.Store.PostgreSQL.Events.Subscription (
    startNotifier,
    shutdownNotifier,
    subscribe,
    RetryPolicy (..),
    RetryConfig (..),
) where

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, cancel, wait)
import Control.Concurrent.STM (
    TChan,
    atomically,
    dupTChan,
    newBroadcastTChanIO,
    readTChan,
    writeTChan,
 )
import Control.Exception (AsyncException, Exception, SomeException, finally, try)
import Control.Monad (forever)
import Data.Aeson (Value)
import Data.Aeson.Types qualified as Aeson
import Data.Functor.Contravariant (contramap)
import Data.Int (Int32, Int64)
import Data.Map.Strict qualified as Map
import Data.Maybe (listToMaybe)
import Data.Text (Text, isInfixOf, pack)
import Data.Text qualified as T
import Data.Text.Encoding (decodeUtf8)
import Data.Time (UTCTime)
import Data.UUID (UUID)
import GHC.Generics (Generic)
import Hasql.Connection qualified as Connection
import Hasql.Connection.Setting qualified as ConnectionSetting
import Hasql.Connection.Setting.Connection qualified as ConnectionSettingConnection
import Hasql.Decoders qualified as D
import Hasql.Encoders qualified as E
import Hasql.Notifications qualified as Notifications
import Hasql.Pool (UsageError (..))
import Hasql.Pool qualified as Pool
import Hasql.Session qualified as Session
import Hasql.Statement (Statement (..))
import Hindsight.Events
import Hindsight.Store hiding (subscribe)
import Hindsight.Store qualified as Store
import Hindsight.Store.PostgreSQL.Core.Types
import UnliftIO (
    MonadIO,
    MonadUnliftIO (..),
    askRunInIO,
    liftIO,
    newIORef,
    readIORef,
    writeIORef,
 )
import UnliftIO.Exception (catch, throwIO)

-- | Exception thrown when a subscription fails and should crash
data SubscriptionFailure = SubscriptionFailure
    { SubscriptionFailure -> Text
failureReason :: Text
    , SubscriptionFailure -> UsageError
underlyingError :: UsageError
    }
    deriving (Int -> SubscriptionFailure -> ShowS
[SubscriptionFailure] -> ShowS
SubscriptionFailure -> String
(Int -> SubscriptionFailure -> ShowS)
-> (SubscriptionFailure -> String)
-> ([SubscriptionFailure] -> ShowS)
-> Show SubscriptionFailure
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SubscriptionFailure -> ShowS
showsPrec :: Int -> SubscriptionFailure -> ShowS
$cshow :: SubscriptionFailure -> String
show :: SubscriptionFailure -> String
$cshowList :: [SubscriptionFailure] -> ShowS
showList :: [SubscriptionFailure] -> ShowS
Show, (forall x. SubscriptionFailure -> Rep SubscriptionFailure x)
-> (forall x. Rep SubscriptionFailure x -> SubscriptionFailure)
-> Generic SubscriptionFailure
forall x. Rep SubscriptionFailure x -> SubscriptionFailure
forall x. SubscriptionFailure -> Rep SubscriptionFailure x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. SubscriptionFailure -> Rep SubscriptionFailure x
from :: forall x. SubscriptionFailure -> Rep SubscriptionFailure x
$cto :: forall x. Rep SubscriptionFailure x -> SubscriptionFailure
to :: forall x. Rep SubscriptionFailure x -> SubscriptionFailure
Generic)

instance Exception SubscriptionFailure

-- | Retry policy for different error types
data RetryPolicy = RetryPolicy
    { RetryPolicy -> Int
maxRetries :: Int -- Maximum number of retry attempts
    , RetryPolicy -> Int
baseDelayMs :: Int -- Base delay in milliseconds
    , RetryPolicy -> Int
maxDelayMs :: Int -- Maximum delay cap in milliseconds
    , RetryPolicy -> Double
backoffMultiplier :: Double -- Exponential backoff multiplier (e.g., 2.0)
    , RetryPolicy -> Double
jitterPercent :: Double -- Jitter percentage (0.0 - 1.0)
    }
    deriving (Int -> RetryPolicy -> ShowS
[RetryPolicy] -> ShowS
RetryPolicy -> String
(Int -> RetryPolicy -> ShowS)
-> (RetryPolicy -> String)
-> ([RetryPolicy] -> ShowS)
-> Show RetryPolicy
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> RetryPolicy -> ShowS
showsPrec :: Int -> RetryPolicy -> ShowS
$cshow :: RetryPolicy -> String
show :: RetryPolicy -> String
$cshowList :: [RetryPolicy] -> ShowS
showList :: [RetryPolicy] -> ShowS
Show, RetryPolicy -> RetryPolicy -> Bool
(RetryPolicy -> RetryPolicy -> Bool)
-> (RetryPolicy -> RetryPolicy -> Bool) -> Eq RetryPolicy
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: RetryPolicy -> RetryPolicy -> Bool
== :: RetryPolicy -> RetryPolicy -> Bool
$c/= :: RetryPolicy -> RetryPolicy -> Bool
/= :: RetryPolicy -> RetryPolicy -> Bool
Eq, (forall x. RetryPolicy -> Rep RetryPolicy x)
-> (forall x. Rep RetryPolicy x -> RetryPolicy)
-> Generic RetryPolicy
forall x. Rep RetryPolicy x -> RetryPolicy
forall x. RetryPolicy -> Rep RetryPolicy x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. RetryPolicy -> Rep RetryPolicy x
from :: forall x. RetryPolicy -> Rep RetryPolicy x
$cto :: forall x. Rep RetryPolicy x -> RetryPolicy
to :: forall x. Rep RetryPolicy x -> RetryPolicy
Generic)

-- | Default retry policies for different error scenarios
data RetryConfig = RetryConfig
    { RetryConfig -> Maybe RetryPolicy
connectionRetryPolicy :: Maybe RetryPolicy -- For connection errors (transient)
    , RetryConfig -> Maybe RetryPolicy
sessionRetryPolicy :: Maybe RetryPolicy -- For session/query errors
    , RetryConfig -> Maybe RetryPolicy
timeoutRetryPolicy :: Maybe RetryPolicy -- For timeout errors
    }
    deriving (Int -> RetryConfig -> ShowS
[RetryConfig] -> ShowS
RetryConfig -> String
(Int -> RetryConfig -> ShowS)
-> (RetryConfig -> String)
-> ([RetryConfig] -> ShowS)
-> Show RetryConfig
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> RetryConfig -> ShowS
showsPrec :: Int -> RetryConfig -> ShowS
$cshow :: RetryConfig -> String
show :: RetryConfig -> String
$cshowList :: [RetryConfig] -> ShowS
showList :: [RetryConfig] -> ShowS
Show, RetryConfig -> RetryConfig -> Bool
(RetryConfig -> RetryConfig -> Bool)
-> (RetryConfig -> RetryConfig -> Bool) -> Eq RetryConfig
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: RetryConfig -> RetryConfig -> Bool
== :: RetryConfig -> RetryConfig -> Bool
$c/= :: RetryConfig -> RetryConfig -> Bool
/= :: RetryConfig -> RetryConfig -> Bool
Eq, (forall x. RetryConfig -> Rep RetryConfig x)
-> (forall x. Rep RetryConfig x -> RetryConfig)
-> Generic RetryConfig
forall x. Rep RetryConfig x -> RetryConfig
forall x. RetryConfig -> Rep RetryConfig x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. RetryConfig -> Rep RetryConfig x
from :: forall x. RetryConfig -> Rep RetryConfig x
$cto :: forall x. Rep RetryConfig x -> RetryConfig
to :: forall x. Rep RetryConfig x -> RetryConfig
Generic)

-- | Determines if a Pool usage error should cause immediate crash (no retry)
shouldCrashImmediately :: UsageError -> Bool
shouldCrashImmediately :: UsageError -> Bool
shouldCrashImmediately UsageError
err = case UsageError
err of
    -- Pool exhaustion - crash immediately, no point in retrying
    ConnectionUsageError ConnectionError
connErr | Text
"too many clients already" Text -> Text -> Bool
`isInfixOf` (String -> Text
pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ ConnectionError -> String
forall a. Show a => a -> String
show ConnectionError
connErr) -> Bool
True
    -- Other errors might be retriable depending on configuration
    UsageError
_ -> Bool
False

-- | Creates a SubscriptionFailure with appropriate reason
createSubscriptionFailure :: UsageError -> SubscriptionFailure
createSubscriptionFailure :: UsageError -> SubscriptionFailure
createSubscriptionFailure UsageError
err = Text -> UsageError -> SubscriptionFailure
SubscriptionFailure Text
reason UsageError
err
  where
    reason :: Text
reason = case UsageError
err of
        ConnectionUsageError ConnectionError
connErr
            | Text
"too many clients already" Text -> Text -> Bool
`isInfixOf` (String -> Text
pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ ConnectionError -> String
forall a. Show a => a -> String
show ConnectionError
connErr) ->
                Text
"Connection pool exhausted - too many concurrent subscriptions"
        ConnectionUsageError ConnectionError
_ ->
            Text
"Database connection failed"
        SessionUsageError SessionError
_ ->
            Text
"Database query or session error"
        UsageError
AcquisitionTimeoutUsageError ->
            Text
"Failed to acquire database connection within timeout"

{- | Starts the notifier thread.
This should be created once per application and shared.
-}
startNotifier :: (MonadIO m) => ByteString -> m Notifier
startNotifier :: forall (m :: * -> *). MonadIO m => ByteString -> m Notifier
startNotifier ByteString
connectionString = IO Notifier -> m Notifier
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Notifier -> m Notifier) -> IO Notifier -> m Notifier
forall a b. (a -> b) -> a -> b
$ do
    chan <- IO (TChan ())
forall a. IO (TChan a)
newBroadcastTChanIO
    thread <- async $ notifierLoop connectionString chan
    pure $ Notifier chan thread

{- | Stops the notifier thread and waits for it to terminate.

This ensures the notifier is fully stopped before returning,
preventing connection attempts after database shutdown.
-}
shutdownNotifier :: Notifier -> IO ()
shutdownNotifier :: Notifier -> IO ()
shutdownNotifier Notifier
notifier = do
    Async () -> IO ()
forall a. Async a -> IO ()
cancel (Notifier -> Async ()
notifierThread Notifier
notifier)
    -- Wait for the thread to actually terminate to avoid race conditions
    -- The wait will catch the async exception (ThreadKilled) that we sent via cancel
    _ <- forall e a. Exception e => IO a -> IO (Either e a)
try @SomeException (IO () -> IO (Either SomeException ()))
-> IO () -> IO (Either SomeException ())
forall a b. (a -> b) -> a -> b
$ Async () -> IO ()
forall a. Async a -> IO a
wait (Notifier -> Async ()
notifierThread Notifier
notifier)
    pure ()

{- | The main loop for the notifier.
Connects to Postgres, listens for notifications, and broadcasts a tick.
Implements reconnect logic on connection failure.

Handles async exceptions (like ThreadKilled from shutdown) gracefully
by exiting immediately without attempting reconnection.
-}
notifierLoop :: ByteString -> TChan () -> IO ()
notifierLoop :: ByteString -> TChan () -> IO ()
notifierLoop ByteString
connectionString TChan ()
chan =
    ( IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        eConn <- [Setting] -> IO (Either ConnectionError Connection)
Connection.acquire [Connection -> Setting
ConnectionSetting.connection (Connection -> Setting) -> Connection -> Setting
forall a b. (a -> b) -> a -> b
$ Text -> Connection
ConnectionSettingConnection.string (ByteString -> Text
decodeUtf8 ByteString
connectionString)]
        case eConn of
            Left ConnectionError
_err -> do
                -- On connection error, wait and retry
                Int -> IO ()
threadDelay Int
1000000
            Right Connection
conn -> do
                let cleanup :: IO ()
cleanup = Connection -> IO ()
Connection.release Connection
conn
                    listen :: IO ()
listen = Connection -> PgIdentifier -> IO ()
Notifications.listen Connection
conn (Text -> PgIdentifier
Notifications.toPgIdentifier Text
"event_store_transaction")
                    handler :: ByteString -> ByteString -> IO ()
handler = \ByteString
_ ByteString
_ -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TChan () -> () -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan ()
chan ()
                    waitForNotification :: IO ()
waitForNotification = (ByteString -> ByteString -> IO ()) -> Connection -> IO ()
Notifications.waitForNotifications ByteString -> ByteString -> IO ()
handler Connection
conn

                -- Run the listener, ensuring cleanup happens on any exception
                (IO ()
listen IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
waitForNotification) IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`finally` IO ()
cleanup
    )
        IO () -> (AsyncException -> IO ()) -> IO ()
forall (m :: * -> *) e a.
(MonadUnliftIO m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \(AsyncException
_e :: AsyncException) -> do
            -- On async exception (shutdown/cancel), exit cleanly without reconnecting
            () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

{- | The main subscription function.
It replaces the old, complex manager-based subscription.
-}
subscribe ::
    forall m ts.
    (MonadUnliftIO m) =>
    SQLStoreHandle ->
    EventMatcher ts SQLStore m ->
    EventSelector SQLStore ->
    m (Store.SubscriptionHandle SQLStore)
subscribe :: forall (m :: * -> *) (ts :: [Symbol]).
MonadUnliftIO m =>
SQLStoreHandle
-> EventMatcher ts SQLStore m
-> EventSelector SQLStore
-> m (SubscriptionHandle SQLStore)
subscribe SQLStoreHandle
handle EventMatcher ts SQLStore m
matcher EventSelector SQLStore
selector = do
    runInIO <- m (m () -> IO ())
forall (m :: * -> *) a. MonadUnliftIO m => m (m a -> IO a)
askRunInIO
    liftIO $ do
        -- Get a personal channel from the notifier's broadcast
        tickChannel <- atomically $ dupTChan (notifierChannel (notifier handle))

        let initialCursor = case EventSelector SQLStore
selector.startupPosition of
                StartupPosition SQLStore
FromBeginning -> Int64 -> Int32 -> SQLCursor
SQLCursor (-Int64
1) (-Int32
1)
                FromLastProcessed Cursor SQLStore
cursor -> Cursor SQLStore
SQLCursor
cursor

        -- Spawn an independent worker thread for this subscription
        workerThread <- async $ runInIO $ workerLoop (pool handle) tickChannel initialCursor matcher selector

        -- Return a handle that allows the user to cancel the subscription
        pure $
            Store.SubscriptionHandle
                { cancel = cancel workerThread
                , wait = wait workerThread
                }

-- | The main loop for an individual subscriber worker.
workerLoop ::
    (MonadUnliftIO m) =>
    Pool ->
    TChan () ->
    SQLCursor ->
    EventMatcher ts SQLStore m ->
    EventSelector SQLStore ->
    m ()
workerLoop :: forall (m :: * -> *) (ts :: [Symbol]).
MonadUnliftIO m =>
Pool
-> TChan ()
-> SQLCursor
-> EventMatcher ts SQLStore m
-> EventSelector SQLStore
-> m ()
workerLoop Pool
pool TChan ()
tickChannel SQLCursor
initialCursor EventMatcher ts SQLStore m
matcher EventSelector SQLStore
selector = do
    cursorRef <- SQLCursor -> m (IORef SQLCursor)
forall (m :: * -> *) a. MonadIO m => a -> m (IORef a)
newIORef SQLCursor
initialCursor
    let batchSize = Int
1000 -- A configurable batch size would be better
    let loop = do
            cursor <- IORef SQLCursor -> m SQLCursor
forall (m :: * -> *) a. MonadIO m => IORef a -> m a
readIORef IORef SQLCursor
cursorRef
            batch <- fetchEventBatch pool cursor batchSize selector

            if null batch
                then do
                    -- No events available, wait for notification
                    liftIO $ atomically $ readTChan tickChannel
                    loop -- Continue loop
                else do
                    -- Process the batch of events
                    (newCursor, shouldContinue) <- processEventBatch matcher batch
                    writeIORef cursorRef newCursor
                    -- Only continue if handler didn't return Stop
                    if shouldContinue
                        then loop
                        else pure () -- Exit loop on Stop
    loop

-- | The raw event data structure fetched from the database.
data EventData = EventData
    { EventData -> Int64
transactionNo :: Int64
    , EventData -> Int32
seqNo :: Int32
    , EventData -> UUID
eventId :: UUID
    , EventData -> UUID
streamId :: UUID
    , EventData -> Maybe UUID
correlationId :: Maybe UUID
    , EventData -> UTCTime
createdAt :: UTCTime
    , EventData -> Text
eventName :: Text
    , EventData -> Int32
eventVersion :: Int32
    , EventData -> Value
payload :: Value
    , EventData -> Int64
streamVersion :: Int64
    }
    deriving (Int -> EventData -> ShowS
[EventData] -> ShowS
EventData -> String
(Int -> EventData -> ShowS)
-> (EventData -> String)
-> ([EventData] -> ShowS)
-> Show EventData
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> EventData -> ShowS
showsPrec :: Int -> EventData -> ShowS
$cshow :: EventData -> String
show :: EventData -> String
$cshowList :: [EventData] -> ShowS
showList :: [EventData] -> ShowS
Show)

-- | Fetches a batch of events from the database using the given selector.
fetchEventBatch ::
    (MonadIO m) =>
    Pool ->
    SQLCursor ->
    Int ->
    EventSelector SQLStore ->
    m [EventData]
fetchEventBatch :: forall (m :: * -> *).
MonadIO m =>
Pool -> SQLCursor -> Int -> EventSelector SQLStore -> m [EventData]
fetchEventBatch Pool
pool SQLCursor
cursor Int
limit EventSelector SQLStore
selector = IO [EventData] -> m [EventData]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [EventData] -> m [EventData])
-> IO [EventData] -> m [EventData]
forall a b. (a -> b) -> a -> b
$ do
    let (ByteString
sql, Params (SQLCursor, Int, EventSelector SQLStore)
params) = case EventSelector SQLStore
selector.streamId of
            StreamSelector
AllStreams -> (ByteString
allStreamsSql, Params (SQLCursor, Int, EventSelector SQLStore)
allStreamsEncoder)
            SingleStream StreamId
_ -> (ByteString
singleStreamSql, Params (SQLCursor, Int, EventSelector SQLStore)
singleStreamEncoder)

        statement :: Statement (SQLCursor, Int, EventSelector SQLStore) [EventData]
statement = ByteString
-> Params (SQLCursor, Int, EventSelector SQLStore)
-> Result [EventData]
-> Bool
-> Statement (SQLCursor, Int, EventSelector SQLStore) [EventData]
forall params result.
ByteString
-> Params params
-> Result result
-> Bool
-> Statement params result
Statement ByteString
sql Params (SQLCursor, Int, EventSelector SQLStore)
params Result [EventData]
decoder Bool
True
        runSession :: Session [EventData]
runSession = (SQLCursor, Int, EventSelector SQLStore)
-> Statement (SQLCursor, Int, EventSelector SQLStore) [EventData]
-> Session [EventData]
forall params result.
params -> Statement params result -> Session result
Session.statement (SQLCursor
cursor, Int
limit, EventSelector SQLStore
selector) Statement (SQLCursor, Int, EventSelector SQLStore) [EventData]
statement

    Pool -> Session [EventData] -> IO (Either UsageError [EventData])
forall a. Pool -> Session a -> IO (Either UsageError a)
Pool.use Pool
pool Session [EventData]
runSession IO (Either UsageError [EventData])
-> (Either UsageError [EventData] -> IO [EventData])
-> IO [EventData]
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Right [EventData]
events -> [EventData] -> IO [EventData]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [EventData]
events
        Left UsageError
err -> do
            String -> IO ()
putStrLn (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Failed to fetch event batch: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> UsageError -> String
forall a. Show a => a -> String
show UsageError
err
            if UsageError -> Bool
shouldCrashImmediately UsageError
err
                then SubscriptionFailure -> IO [EventData]
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO (UsageError -> SubscriptionFailure
createSubscriptionFailure UsageError
err)
                else [EventData] -> IO [EventData]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [] -- Could add retry logic here for recoverable errors
  where
    decoder :: Result [EventData]
decoder =
        Row EventData -> Result [EventData]
forall a. Row a -> Result [a]
D.rowList (Row EventData -> Result [EventData])
-> Row EventData -> Result [EventData]
forall a b. (a -> b) -> a -> b
$
            Int64
-> Int32
-> UUID
-> UUID
-> Maybe UUID
-> UTCTime
-> Text
-> Int32
-> Value
-> Int64
-> EventData
EventData
                (Int64
 -> Int32
 -> UUID
 -> UUID
 -> Maybe UUID
 -> UTCTime
 -> Text
 -> Int32
 -> Value
 -> Int64
 -> EventData)
-> Row Int64
-> Row
     (Int32
      -> UUID
      -> UUID
      -> Maybe UUID
      -> UTCTime
      -> Text
      -> Int32
      -> Value
      -> Int64
      -> EventData)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NullableOrNot Value Int64 -> Row Int64
forall a. NullableOrNot Value a -> Row a
D.column (Value Int64 -> NullableOrNot Value Int64
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value Int64
D.int8)
                Row
  (Int32
   -> UUID
   -> UUID
   -> Maybe UUID
   -> UTCTime
   -> Text
   -> Int32
   -> Value
   -> Int64
   -> EventData)
-> Row Int32
-> Row
     (UUID
      -> UUID
      -> Maybe UUID
      -> UTCTime
      -> Text
      -> Int32
      -> Value
      -> Int64
      -> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value Int32 -> Row Int32
forall a. NullableOrNot Value a -> Row a
D.column (Value Int32 -> NullableOrNot Value Int32
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value Int32
D.int4)
                Row
  (UUID
   -> UUID
   -> Maybe UUID
   -> UTCTime
   -> Text
   -> Int32
   -> Value
   -> Int64
   -> EventData)
-> Row UUID
-> Row
     (UUID
      -> Maybe UUID
      -> UTCTime
      -> Text
      -> Int32
      -> Value
      -> Int64
      -> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value UUID -> Row UUID
forall a. NullableOrNot Value a -> Row a
D.column (Value UUID -> NullableOrNot Value UUID
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value UUID
D.uuid)
                Row
  (UUID
   -> Maybe UUID
   -> UTCTime
   -> Text
   -> Int32
   -> Value
   -> Int64
   -> EventData)
-> Row UUID
-> Row
     (Maybe UUID
      -> UTCTime -> Text -> Int32 -> Value -> Int64 -> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value UUID -> Row UUID
forall a. NullableOrNot Value a -> Row a
D.column (Value UUID -> NullableOrNot Value UUID
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value UUID
D.uuid)
                Row
  (Maybe UUID
   -> UTCTime -> Text -> Int32 -> Value -> Int64 -> EventData)
-> Row (Maybe UUID)
-> Row (UTCTime -> Text -> Int32 -> Value -> Int64 -> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value (Maybe UUID) -> Row (Maybe UUID)
forall a. NullableOrNot Value a -> Row a
D.column (Value UUID -> NullableOrNot Value (Maybe UUID)
forall (decoder :: * -> *) a.
decoder a -> NullableOrNot decoder (Maybe a)
D.nullable Value UUID
D.uuid)
                Row (UTCTime -> Text -> Int32 -> Value -> Int64 -> EventData)
-> Row UTCTime
-> Row (Text -> Int32 -> Value -> Int64 -> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value UTCTime -> Row UTCTime
forall a. NullableOrNot Value a -> Row a
D.column (Value UTCTime -> NullableOrNot Value UTCTime
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value UTCTime
D.timestamptz)
                Row (Text -> Int32 -> Value -> Int64 -> EventData)
-> Row Text -> Row (Int32 -> Value -> Int64 -> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value Text -> Row Text
forall a. NullableOrNot Value a -> Row a
D.column (Value Text -> NullableOrNot Value Text
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value Text
D.text)
                Row (Int32 -> Value -> Int64 -> EventData)
-> Row Int32 -> Row (Value -> Int64 -> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value Int32 -> Row Int32
forall a. NullableOrNot Value a -> Row a
D.column (Value Int32 -> NullableOrNot Value Int32
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value Int32
D.int4)
                Row (Value -> Int64 -> EventData)
-> Row Value -> Row (Int64 -> EventData)
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value Value -> Row Value
forall a. NullableOrNot Value a -> Row a
D.column (Value Value -> NullableOrNot Value Value
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value Value
D.jsonb)
                Row (Int64 -> EventData) -> Row Int64 -> Row EventData
forall a b. Row (a -> b) -> Row a -> Row b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NullableOrNot Value Int64 -> Row Int64
forall a. NullableOrNot Value a -> Row a
D.column (Value Int64 -> NullableOrNot Value Int64
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value Int64
D.int8)

-- SQL statements and encoders

baseSql :: ByteString
baseSql :: ByteString
baseSql =
    ByteString
"SELECT transaction_no, seq_no, event_id, stream_id, correlation_id, created_at, event_name, event_version, payload, stream_version "
        ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
"FROM events "
        ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
"WHERE (transaction_no, seq_no) > ($1, $2) "
        ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
"AND transaction_no <= get_safe_transaction_number_mvcc() "

allStreamsSql :: ByteString
allStreamsSql :: ByteString
allStreamsSql = ByteString
baseSql ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
"ORDER BY transaction_no, seq_no LIMIT $3"

singleStreamSql :: ByteString
singleStreamSql :: ByteString
singleStreamSql = ByteString
baseSql ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
"AND stream_id = $4 ORDER BY transaction_no, seq_no LIMIT $3"

allStreamsEncoder :: E.Params (SQLCursor, Int, EventSelector SQLStore)
allStreamsEncoder :: Params (SQLCursor, Int, EventSelector SQLStore)
allStreamsEncoder =
    ((SQLCursor, Int, EventSelector SQLStore) -> Int64)
-> Params Int64 -> Params (SQLCursor, Int, EventSelector SQLStore)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(SQLCursor
c, Int
_, EventSelector SQLStore
_) -> SQLCursor
c.transactionNo) (NullableOrNot Value Int64 -> Params Int64
forall a. NullableOrNot Value a -> Params a
E.param (Value Int64 -> NullableOrNot Value Int64
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int64
E.int8))
        Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
forall a. Semigroup a => a -> a -> a
<> ((SQLCursor, Int, EventSelector SQLStore) -> Int32)
-> Params Int32 -> Params (SQLCursor, Int, EventSelector SQLStore)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(SQLCursor
c, Int
_, EventSelector SQLStore
_) -> SQLCursor
c.sequenceNo) (NullableOrNot Value Int32 -> Params Int32
forall a. NullableOrNot Value a -> Params a
E.param (Value Int32 -> NullableOrNot Value Int32
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int32
E.int4))
        Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
forall a. Semigroup a => a -> a -> a
<> ((SQLCursor, Int, EventSelector SQLStore) -> Int32)
-> Params Int32 -> Params (SQLCursor, Int, EventSelector SQLStore)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(SQLCursor
_, Int
l, EventSelector SQLStore
_) -> Int -> Int32
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
l) (NullableOrNot Value Int32 -> Params Int32
forall a. NullableOrNot Value a -> Params a
E.param (Value Int32 -> NullableOrNot Value Int32
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int32
E.int4))

singleStreamEncoder :: E.Params (SQLCursor, Int, EventSelector SQLStore)
singleStreamEncoder :: Params (SQLCursor, Int, EventSelector SQLStore)
singleStreamEncoder =
    ((SQLCursor, Int, EventSelector SQLStore) -> Int64)
-> Params Int64 -> Params (SQLCursor, Int, EventSelector SQLStore)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(SQLCursor
c, Int
_, EventSelector SQLStore
_) -> SQLCursor
c.transactionNo) (NullableOrNot Value Int64 -> Params Int64
forall a. NullableOrNot Value a -> Params a
E.param (Value Int64 -> NullableOrNot Value Int64
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int64
E.int8))
        Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
forall a. Semigroup a => a -> a -> a
<> ((SQLCursor, Int, EventSelector SQLStore) -> Int32)
-> Params Int32 -> Params (SQLCursor, Int, EventSelector SQLStore)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(SQLCursor
c, Int
_, EventSelector SQLStore
_) -> SQLCursor
c.sequenceNo) (NullableOrNot Value Int32 -> Params Int32
forall a. NullableOrNot Value a -> Params a
E.param (Value Int32 -> NullableOrNot Value Int32
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int32
E.int4))
        Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
forall a. Semigroup a => a -> a -> a
<> ((SQLCursor, Int, EventSelector SQLStore) -> Int32)
-> Params Int32 -> Params (SQLCursor, Int, EventSelector SQLStore)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(SQLCursor
_, Int
l, EventSelector SQLStore
_) -> Int -> Int32
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
l) (NullableOrNot Value Int32 -> Params Int32
forall a. NullableOrNot Value a -> Params a
E.param (Value Int32 -> NullableOrNot Value Int32
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int32
E.int4))
        Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
-> Params (SQLCursor, Int, EventSelector SQLStore)
forall a. Semigroup a => a -> a -> a
<> ((SQLCursor, Int, EventSelector SQLStore) -> UUID)
-> Params UUID -> Params (SQLCursor, Int, EventSelector SQLStore)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (\(SQLCursor
_, Int
_, EventSelector SQLStore
s) -> case EventSelector SQLStore
s.streamId of SingleStream (StreamId UUID
sid) -> UUID
sid; StreamSelector
_ -> String -> UUID
forall a. HasCallStack => String -> a
error String
"impossible") (NullableOrNot Value UUID -> Params UUID
forall a. NullableOrNot Value a -> Params a
E.param (Value UUID -> NullableOrNot Value UUID
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value UUID
E.uuid))

{- | Processes a batch of events, calling the appropriate handlers.
Returns (cursor, shouldContinue) where shouldContinue = False means handler returned Stop
IMPORTANT: Events are processed in the order they appear in the batch to respect causality
-}
processEventBatch ::
    forall m ts.
    (MonadUnliftIO m) =>
    EventMatcher ts SQLStore m ->
    [EventData] ->
    m (SQLCursor, Bool)
processEventBatch :: forall (m :: * -> *) (ts :: [Symbol]).
MonadUnliftIO m =>
EventMatcher ts SQLStore m -> [EventData] -> m (SQLCursor, Bool)
processEventBatch EventMatcher ts SQLStore m
matcher [EventData]
batch = do
    stopRef <- Bool -> m (IORef Bool)
forall (m :: * -> *) a. MonadIO m => a -> m (IORef a)
newIORef Bool
False
    lastCursorRef <- newIORef Nothing

    -- Try to match a single event against all matchers
    let tryMatchers :: forall ts'. EventMatcher ts' SQLStore m -> EventData -> m ()
        tryMatchers EventMatcher ts' SQLStore m
MatchEnd EventData
eventData = do
            -- No matcher matched this event, just update cursor
            let cursor :: SQLCursor
cursor = Int64 -> Int32 -> SQLCursor
SQLCursor EventData
eventData.transactionNo EventData
eventData.seqNo
            IORef (Maybe SQLCursor) -> Maybe SQLCursor -> m ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef IORef (Maybe SQLCursor)
lastCursorRef (SQLCursor -> Maybe SQLCursor
forall a. a -> Maybe a
Just SQLCursor
cursor)
        tryMatchers ((Proxy event
proxy, EventHandler event m SQLStore
handler) :? EventMatcher ts1 SQLStore m
rest) EventData
eventData = do
            let cursor :: SQLCursor
cursor = Int64 -> Int32 -> SQLCursor
SQLCursor EventData
eventData.transactionNo EventData
eventData.seqNo
            if EventData
eventData.eventName Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== Proxy event -> Text
forall (event :: Symbol). KnownSymbol event => Proxy event -> Text
getEventName Proxy event
proxy
                then do
                    -- This matcher matches the event
                    case Int
-> Map
     Int
     (Value -> Parser (FinalVersionType (FromList (Versions event))))
-> Maybe
     (Value -> Parser (FinalVersionType (FromList (Versions event))))
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup (Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral EventData
eventData.eventVersion) (Proxy event -> Map Int (Value -> Parser (CurrentPayloadType event))
forall (event :: Symbol).
Event event =>
Proxy event -> Map Int (Value -> Parser (CurrentPayloadType event))
parseMapFromProxy Proxy event
proxy) of
                        Just Value -> Parser (FinalVersionType (FromList (Versions event)))
parser ->
                            case (Value -> Parser (FinalVersionType (FromList (Versions event))))
-> Value
-> Either String (FinalVersionType (FromList (Versions event)))
forall a b. (a -> Parser b) -> a -> Either String b
Aeson.parseEither Value -> Parser (FinalVersionType (FromList (Versions event)))
parser EventData
eventData.payload of
                                Right FinalVersionType (FromList (Versions event))
parsedPayload -> do
                                    let envelope :: EventEnvelope event SQLStore
envelope =
                                            EventWithMetadata
                                                { position :: Cursor SQLStore
position = Cursor SQLStore
SQLCursor
cursor
                                                , eventId :: EventId
eventId = UUID -> EventId
EventId EventData
eventData.eventId
                                                , streamId :: StreamId
streamId = UUID -> StreamId
StreamId EventData
eventData.streamId
                                                , streamVersion :: StreamVersion
streamVersion = Int64 -> StreamVersion
StreamVersion EventData
eventData.streamVersion
                                                , correlationId :: Maybe CorrelationId
correlationId = UUID -> CorrelationId
CorrelationId (UUID -> CorrelationId) -> Maybe UUID -> Maybe CorrelationId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> EventData
eventData.correlationId
                                                , createdAt :: UTCTime
createdAt = EventData
eventData.createdAt
                                                , payload :: CurrentPayloadType event
payload = FinalVersionType (FromList (Versions event))
CurrentPayloadType event
parsedPayload
                                                }
                                    -- Catch exceptions and enrich with event context
                                    result <-
                                        (EventHandler event m SQLStore
handler EventEnvelope event SQLStore
envelope) m SubscriptionResult
-> (SomeException -> m SubscriptionResult) -> m SubscriptionResult
forall (m :: * -> *) e a.
(MonadUnliftIO m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \(SomeException
e :: SomeException) ->
                                            HandlerException -> m SubscriptionResult
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO (HandlerException -> m SubscriptionResult)
-> HandlerException -> m SubscriptionResult
forall a b. (a -> b) -> a -> b
$
                                                Store.HandlerException
                                                    { originalException :: SomeException
Store.originalException = SomeException
e
                                                    , failedEventPosition :: Text
Store.failedEventPosition = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ SQLCursor -> String
forall a. Show a => a -> String
show SQLCursor
cursor
                                                    , failedEventId :: EventId
Store.failedEventId = UUID -> EventId
EventId EventData
eventData.eventId
                                                    , failedEventName :: Text
Store.failedEventName = EventData
eventData.eventName
                                                    , failedEventStreamId :: StreamId
Store.failedEventStreamId = UUID -> StreamId
StreamId EventData
eventData.streamId
                                                    , failedEventStreamVersion :: StreamVersion
Store.failedEventStreamVersion = Int64 -> StreamVersion
StreamVersion EventData
eventData.streamVersion
                                                    , failedEventCorrelationId :: Maybe CorrelationId
Store.failedEventCorrelationId = UUID -> CorrelationId
CorrelationId (UUID -> CorrelationId) -> Maybe UUID -> Maybe CorrelationId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> EventData
eventData.correlationId
                                                    , failedEventCreatedAt :: UTCTime
Store.failedEventCreatedAt = EventData
eventData.createdAt
                                                    }
                                    -- Update last processed cursor
                                    writeIORef lastCursorRef (Just cursor)
                                    -- Check if handler wants to stop
                                    case result of
                                        SubscriptionResult
Store.Stop -> IORef Bool -> Bool -> m ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef IORef Bool
stopRef Bool
True
                                        SubscriptionResult
Store.Continue -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                                Left String
err -> do
                                    IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
putStrLn (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Failed to parse event payload: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> ShowS
forall a. Show a => a -> String
show String
err
                                    IORef (Maybe SQLCursor) -> Maybe SQLCursor -> m ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef IORef (Maybe SQLCursor)
lastCursorRef (SQLCursor -> Maybe SQLCursor
forall a. a -> Maybe a
Just SQLCursor
cursor)
                        Maybe
  (Value -> Parser (FinalVersionType (FromList (Versions event))))
Nothing -> do
                            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
putStrLn (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Unknown event version for " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Text -> String
forall a. Show a => a -> String
show (EventData
eventData.eventName)
                            IORef (Maybe SQLCursor) -> Maybe SQLCursor -> m ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef IORef (Maybe SQLCursor)
lastCursorRef (SQLCursor -> Maybe SQLCursor
forall a. a -> Maybe a
Just SQLCursor
cursor)
                else do
                    -- This matcher doesn't match, try next matcher
                    EventMatcher ts1 SQLStore m -> EventData -> m ()
forall (ts' :: [Symbol]).
EventMatcher ts' SQLStore m -> EventData -> m ()
tryMatchers EventMatcher ts1 SQLStore m
rest EventData
eventData

    -- Process all events in order, stopping if Stop is encountered
    let processAllEvents [] = () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        processAllEvents (EventData
event : [EventData]
remaining) = do
            shouldStop <- IORef Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IORef a -> m a
readIORef IORef Bool
stopRef
            if shouldStop
                then pure ()
                else do
                    tryMatchers matcher event
                    processAllEvents remaining

    processAllEvents batch

    -- Return the cursor and stop flag
    stopped <- readIORef stopRef
    lastCursor <- readIORef lastCursorRef

    -- Use last processed cursor, or fall back to last event in batch
    let finalCursor = case Maybe SQLCursor
lastCursor of
            Just SQLCursor
cursor -> SQLCursor
cursor
            Maybe SQLCursor
Nothing -> case [EventData] -> Maybe EventData
forall a. [a] -> Maybe a
listToMaybe ([EventData] -> [EventData]
forall a. [a] -> [a]
reverse [EventData]
batch) of
                Just EventData
lastEvent -> Int64 -> Int32 -> SQLCursor
SQLCursor EventData
lastEvent.transactionNo EventData
lastEvent.seqNo
                Maybe EventData
Nothing -> String -> SQLCursor
forall a. HasCallStack => String -> a
error String
"processEventBatch called with an empty batch"

    pure (finalCursor, not stopped)