{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wno-orphans #-}

{- |
Module      : Hindsight.Store.PostgreSQL.Projections.Sync
Description : PostgreSQL synchronous projection system
Copyright   : (c) 2024
License     : BSD3
Maintainer  : maintainer@example.com
Stability   : experimental

This module provides a complete synchronous projection system for PostgreSQL,
including registry management, event processing, catch-up functionality, and
transaction-based execution. It merges functionality from the original
SyncProjection, SyncProjectionCatchUp, and Projection.Sync modules.
-}
module Hindsight.Store.PostgreSQL.Projections.Sync (
    -- * Registry Management
    SyncProjectionRegistry,
    emptySyncProjectionRegistry,
    registerSyncProjection,

    -- * Event Processing
    executeSyncProjectionForEvent,

    -- * Catch-up Functionality
    catchUpSyncProjections,
    CatchUpError (..),

    -- * PostgreSQL-Specific Projection Functions
    matchEventHandlers,
    executeHandlerChain,
    handleProjectionError,
)
where

import Control.Exception (Exception)
import Control.Monad (forM, forM_)
import Data.Aeson qualified as Aeson
import Data.Int (Int32, Int64)
import Data.Map qualified as Map
import Data.Proxy (Proxy (..))
import Data.Text (Text, pack)
import Data.Time (UTCTime)
import Data.Typeable (Typeable)
import Data.UUID (UUID)
import Data.Vector qualified as Vector
import Hasql.Pool (Pool)
import Hasql.Pool qualified as Pool
import Hasql.Statement (Statement)
import Hasql.TH
import Hasql.Transaction qualified as HasqlTransaction
import Hasql.Transaction.Sessions qualified as TransactionSession
import Hindsight.Events (Event)
import Hindsight.Projection (ProjectionError (..), ProjectionId (..), ProjectionResult (..))
import Hindsight.Projection.Matching (ProjectionHandler, ProjectionHandlers (..), SomeProjectionHandler (..), extractMatchingHandlers, handlersForEventName)
import Hindsight.Store (CorrelationId (..), EventEnvelope (..), EventId (..), StreamId (..), StreamVersion (..))
import Hindsight.Store.Parsing (parseStoredEventToEnvelope)
import Hindsight.Store.PostgreSQL.Core.Types (SQLCursor (..), SQLStore, SomeProjectionHandlers (..), SyncProjectionRegistry (..))
import Hindsight.Store.PostgreSQL.Projections.State (
    SyncProjectionState (..),
    getActiveProjections,
    registerSyncProjectionInDb,
    updateSyncProjectionState,
 )

-- | Errors that can be reported by the catch-up entry points of this module.
data CatchUpError
    = -- | A failure attributed to a specific projection, with a textual
      -- description. NOTE(review): not constructed anywhere in this module.
      ProjectionExecutionError ProjectionId Text
    | -- | A database-level failure rendered as text. 'catchUpSyncProjections'
      -- uses this to wrap the @show@n pool usage error.
      DatabaseError Text
    | -- | NOTE(review): not constructed anywhere in this module; presumably
      -- reserved for callers that treat an empty registry as an error.
      NoActiveProjections
    deriving (Show, Eq, Typeable)

-- | Allows a 'CatchUpError' to be thrown and caught as an exception.
instance Exception CatchUpError

{- | One row of the @events@ table as read by 'getUnprocessedEvents'.

The pair ('transactionNo', 'seqNo') is the position used to build an
'SQLCursor' for the event.
-}
data StoredEvent = StoredEvent
    { transactionNo :: Int64
    -- ^ @transaction_no@ column
    , seqNo :: Int32
    -- ^ @seq_no@ column
    , streamId :: StreamId
    -- ^ @stream_id@ column, wrapped in 'StreamId'
    , eventId :: EventId
    -- ^ @event_id@ column, wrapped in 'EventId'
    , createdAt :: UTCTime
    -- ^ @created_at@ column
    , correlationId :: Maybe UUID
    -- ^ @correlation_id@ column (nullable)
    , eventName :: Text
    -- ^ @event_name@ column; used to select matching handlers
    , eventVersion :: Int32
    -- ^ @event_version@ column
    , payload :: Aeson.Value
    -- ^ @payload@ column (JSONB)
    , streamVersion :: Int64
    -- ^ @stream_version@ column
    }

-- =============================================================================
-- PostgreSQL-Specific Projection Functions
-- =============================================================================

{- | Select the handlers in a 'ProjectionHandlers' list that apply to @event@.

This is a thin alias for 'extractMatchingHandlers' from
"Hindsight.Projection.Matching", exported under a PostgreSQL-module name.
-}
matchEventHandlers ::
    forall event ts backend.
    (Event event) =>
    ProjectionHandlers ts backend ->
    Proxy event ->
    [ProjectionHandler event backend]
matchEventHandlers = extractMatchingHandlers

{- | Run every handler against the envelope inside the current transaction.

Handlers run in list order. Each handler that returns contributes one
'ProjectionSuccess' to the result, so the output list has the same length as
the input list. Nothing is caught here: an exception raised by a handler
propagates to the caller, which is expected to roll the transaction back.
-}
executeHandlerChain ::
    forall event backend.
    (Event event) =>
    [ProjectionHandler event backend] ->
    EventEnvelope event backend ->
    HasqlTransaction.Transaction [ProjectionResult]
executeHandlerChain handlers envelope =
    -- 'traverse' sequences the handler effects left to right, exactly like the
    -- explicit head/tail recursion it replaces.
    traverse runOne handlers
  where
    runOne handler = ProjectionSuccess <$ handler envelope

{- | Turn a projection error into a rolled-back transaction.

Calls 'HasqlTransaction.condemn' on the surrounding transaction and then
returns the error wrapped in the 'ProjectionError' result constructor, so the
caller still sees which error occurred.
-}
handleProjectionError :: ProjectionError -> HasqlTransaction.Transaction ProjectionResult
handleProjectionError err = do
    -- In sync projections an error must undo the writes made so far.
    HasqlTransaction.condemn
    pure (ProjectionError err)

-- =============================================================================
-- Registry Management
-- =============================================================================

-- | A registry with no projections registered (wraps an empty map).
emptySyncProjectionRegistry :: SyncProjectionRegistry
emptySyncProjectionRegistry = SyncProjectionRegistry Map.empty

{- | Add a synchronous projection to a registry.

The handlers are stored under the given 'ProjectionId' with their event-type
list hidden behind 'SomeProjectionHandlers'. Because the registry is a map
and this uses 'Map.insert', registering an id that is already present
replaces the previous handlers.
-}
registerSyncProjection ::
    ProjectionId ->
    ProjectionHandlers ts SQLStore ->
    SyncProjectionRegistry ->
    SyncProjectionRegistry
registerSyncProjection projId handlers (SyncProjectionRegistry reg) =
    SyncProjectionRegistry (Map.insert projId (SomeProjectionHandlers handlers) reg)

-- =============================================================================
-- Event Processing
-- =============================================================================

{- | Run every registered synchronous projection for a single event.

Each registry entry is visited in the order 'Map.elems' yields them, and its
handlers are passed to 'processHandlersWithCommonLogic' together with the
event proxy and envelope. Everything runs in the caller's transaction.
-}
executeSyncProjectionForEvent ::
    forall event.
    (Event event) =>
    SyncProjectionRegistry ->
    Proxy event ->
    EventEnvelope event SQLStore ->
    HasqlTransaction.Transaction ()
executeSyncProjectionForEvent (SyncProjectionRegistry reg) eventProxy envelope =
    forM_ (Map.elems reg) $ \(SomeProjectionHandlers handlers) ->
        processHandlersWithCommonLogic handlers eventProxy envelope

{- | Run one projection's handlers for a specific event.

Steps:

1. pick the handlers that apply to @event@ with 'matchEventHandlers';
2. run them in order with 'executeHandlerChain';
3. pass every 'ProjectionError' found in the results to
   'handleProjectionError', which condemns the transaction.
-}
processHandlersWithCommonLogic ::
    forall event ts.
    (Event event) =>
    ProjectionHandlers ts SQLStore ->
    Proxy event ->
    EventEnvelope event SQLStore ->
    HasqlTransaction.Transaction ()
processHandlersWithCommonLogic handlers eventProxy envelope = do
    -- Use the common handler matching logic
    let matchingHandlers = matchEventHandlers handlers eventProxy

    -- Execute all matching handlers using the common execution logic
    results <- executeHandlerChain matchingHandlers envelope

    -- Handle any reported errors (each one condemns the transaction)
    sequence_ [handleProjectionError err | ProjectionError err <- results]

-- =============================================================================
-- Catch-up Functionality
-- =============================================================================

{- | Catch up all registered sync projections.

An empty registry is a no-op that returns @Right ()@ without touching the
pool. Otherwise 'catchUpTransaction' is run through the pool as a single
read-committed, write-mode transaction.

Returns:

* @Left ('DatabaseError' msg)@ when the pool reports a usage error; @msg@ is
  the @show@n error;
* otherwise whatever 'catchUpTransaction' returned.
-}
catchUpSyncProjections ::
    Pool ->
    SyncProjectionRegistry ->
    IO (Either CatchUpError ())
catchUpSyncProjections pool registry@(SyncProjectionRegistry regMap)
    -- No projections to catch up
    | Map.null regMap = pure (Right ())
    | otherwise = do
        result <-
            Pool.use pool $
                TransactionSession.transaction
                    TransactionSession.ReadCommitted
                    TransactionSession.Write
                    (catchUpTransaction registry)

        case result of
            Left err -> pure $ Left $ DatabaseError $ pack $ show err
            Right res -> pure res

{- | Transaction body used by 'catchUpSyncProjections'.

1. Registers every projection id of the registry in the database via
   'registerSyncProjectionInDb'.
2. Reads the active projection states with 'getActiveProjections'.
3. If none are active, every registered projection is caught up from the
   cursor @SQLCursor 0 0@; otherwise each active projection is caught up from
   its own last-processed position.

All projections are attempted; the result is the first @Left@ produced by
'catchUpProjection', or @Right ()@ when there is none.
-}
catchUpTransaction :: SyncProjectionRegistry -> HasqlTransaction.Transaction (Either CatchUpError ())
catchUpTransaction registry@(SyncProjectionRegistry regMap) = do
    -- Register all projections in the database
    forM_ registeredIds registerSyncProjectionInDb

    -- Get active projections
    activeProjections <- getActiveProjections

    results <-
        if null activeProjections
            then -- Registered projections exist but none is active in the DB:
            -- process all of them from the beginning.
                forM registeredIds $ \projId ->
                    catchUpProjection registry projId (SQLCursor 0 0)
            else -- Resume each active projection from where it stopped.
                forM activeProjections $ \projState ->
                    catchUpProjection
                        registry
                        projState.projectionId
                        ( SQLCursor
                            projState.lastProcessedTransactionNo
                            projState.lastProcessedSeqNo
                        )

    -- In the Either applicative, 'sequence_' yields the first Left, if any.
    pure (sequence_ results)
  where
    registeredIds = Map.keys regMap

{- | Catch up a single projection, starting after the given cursor.

A projection id that is not in the registry is skipped and reported as
@Right ()@. For a registered projection, events are read in batches with
'getUnprocessedEvents' until a batch comes back empty. That query returns a
bounded number of rows per call, so reading only one batch would leave a
projection that is far behind still behind while reporting success.

For each event, in order: the event is passed to 'processStoredEvent', then
the projection state is advanced to that event's position with
'updateSyncProjectionState'.

This function itself always returns @Right ()@; failures surface as
exceptions or as a condemned transaction.
-}
catchUpProjection ::
    SyncProjectionRegistry ->
    ProjectionId ->
    SQLCursor ->
    HasqlTransaction.Transaction (Either CatchUpError ())
catchUpProjection (SyncProjectionRegistry regMap) projId cursor =
    -- Check if this projection is actually registered
    case Map.lookup projId regMap of
        Nothing -> pure $ Right () -- Skip unregistered projections
        Just (SomeProjectionHandlers handlers) -> do
            let
                -- Fetch and apply batches until nothing is left after the
                -- position reached so far. Every non-empty batch strictly
                -- advances the position, because the query only returns
                -- events after the cursor it is given.
                drain :: SQLCursor -> HasqlTransaction.Transaction ()
                drain from = do
                    batch <- getUnprocessedEvents from
                    case batch of
                        [] -> pure ()
                        _ -> applyBatch from batch >>= drain

                -- Apply one batch in order and return the position of the
                -- last event processed (or the input position if empty).
                applyBatch :: SQLCursor -> [StoredEvent] -> HasqlTransaction.Transaction SQLCursor
                applyBatch reached [] = pure reached
                applyBatch _ (storedEvent : remaining) = do
                    -- Process this event through the projection handlers
                    processStoredEvent handlers storedEvent

                    -- Update projection state after successful processing
                    let eventCursor = SQLCursor storedEvent.transactionNo storedEvent.seqNo
                    updateSyncProjectionState projId eventCursor
                    applyBatch eventCursor remaining

            drain cursor
            pure $ Right ()

{- | Run a stored event through the handlers of one projection.

Handlers are selected by the event's name using 'handlersForEventName'. For
each selected handler, the stored row is parsed into an 'EventEnvelope' at
that handler's event type with 'parseStoredEventToEnvelope'; on success the
handler is run on the envelope.

If parsing yields 'Nothing', that handler is skipped silently: no error is
reported and the transaction is not condemned.
-}
processStoredEvent ::
    forall ts.
    ProjectionHandlers ts SQLStore ->
    StoredEvent ->
    HasqlTransaction.Transaction ()
processStoredEvent handlers storedEvent = do
    -- Find matching handlers for the event
    let matchingHandlers = handlersForEventName storedEvent.eventName handlers

    -- Process each matching handler
    forM_ matchingHandlers $ \(SomeProjectionHandler eventProxy handler) ->
        -- Try to parse and process the event
        case parseStoredEventToEnvelope
            eventProxy
            storedEvent.eventId
            storedEvent.streamId
            (SQLCursor storedEvent.transactionNo storedEvent.seqNo)
            (StreamVersion storedEvent.streamVersion)
            (CorrelationId <$> storedEvent.correlationId)
            storedEvent.createdAt
            storedEvent.payload
            (fromIntegral storedEvent.eventVersion) of
            Just envelope -> handler envelope
            Nothing -> pure () -- Skip if parsing fails

{- | Read the next batch of events that come after the given cursor.

An event is "after" the cursor when its @transaction_no@ is greater, or when
@transaction_no@ is equal and @seq_no@ is greater. Rows are ordered by
(@transaction_no@, @seq_no@) and the statement carries @LIMIT 1000@, so a
single call returns at most 1000 events; callers that need everything must
call again with the position of the last event returned.
-}
getUnprocessedEvents ::
    SQLCursor ->
    HasqlTransaction.Transaction [StoredEvent]
getUnprocessedEvents (SQLCursor lastTxNo lastSeqNo) = do
    results <- HasqlTransaction.statement (lastTxNo, lastSeqNo) getEventsStmt
    pure $ map toStoredEvent $ Vector.toList results
  where
    getEventsStmt :: Statement (Int64, Int32) (Vector.Vector (Int64, Int32, UUID, UUID, UTCTime, Maybe UUID, Text, Int32, Aeson.Value, Int64))
    getEventsStmt =
        [vectorStatement|
        SELECT
          e.transaction_no :: int8,
          e.seq_no :: int4,
          e.stream_id :: uuid,
          e.event_id :: uuid,
          e.created_at :: timestamptz,
          e.correlation_id :: uuid?,
          e.event_name :: text,
          e.event_version :: int4,
          e.payload :: jsonb,
          e.stream_version :: int8
        FROM events e
        WHERE (e.transaction_no > $1 :: int8)
           OR (e.transaction_no = $1 :: int8 AND e.seq_no > $2 :: int4)
        ORDER BY e.transaction_no, e.seq_no
        LIMIT 1000
      |]

    -- Convert one result row (in SELECT column order) into a 'StoredEvent',
    -- wrapping the raw UUIDs in 'StreamId' / 'EventId'.
    toStoredEvent ::
        (Int64, Int32, UUID, UUID, UTCTime, Maybe UUID, Text, Int32, Aeson.Value, Int64) ->
        StoredEvent
    toStoredEvent (txNo, rowSeqNo, rowStreamId, rowEventId, rowCreatedAt, corrId, rowEventName, rowEventVersion, rowPayload, streamVer) =
        StoredEvent
            { transactionNo = txNo
            , seqNo = rowSeqNo
            , streamId = StreamId rowStreamId
            , eventId = EventId rowEventId
            , createdAt = rowCreatedAt
            , correlationId = corrId
            , eventName = rowEventName
            , eventVersion = rowEventVersion
            , payload = rowPayload
            , streamVersion = streamVer
            }