{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}

{- |
Module      : Hindsight.Projection
Description : Backend-agnostic projection system for building read models
Copyright   : (c) 2024
License     : BSD3
Maintainer  : maintainer@example.com
Stability   : experimental

This module provides the projection system for transforming event streams into
queryable read models using PostgreSQL.

= Overview

Projections subscribe to events from ANY backend (Memory, Filesystem, or PostgreSQL)
and execute handlers within PostgreSQL transactions. State is persisted in the
@projections@ table, enabling:

* Automatic resumption after restarts
* Progress tracking via cursors
* LISTEN\/NOTIFY for efficient waiting

= Backend-Agnostic Design

The key insight: projection execution and state are ALWAYS in PostgreSQL, but
events can come from any storage backend:

@
-- Events from MemoryStore, projections in PostgreSQL
runProjection projId pool Nothing memoryStore handlers

-- Events from PostgreSQL, projections in PostgreSQL
runProjection projId pool Nothing sqlStore handlers
@

This enables testing with fast in-memory events while validating real SQL projection logic.

= Asynchronous Projections (This Module)

Projections run in a separate thread and process events asynchronously:

* Eventually consistent (small delay between insert and projection update)
* Work with ANY event store backend
* Failures don't block event insertion

For synchronous projections (PostgreSQL-only), see "Hindsight.Store.PostgreSQL".

= Example: User Directory Projection

@
import Hindsight.Projection
import Hindsight.Projection.Matching (ProjectionHandlers(..))
import Hindsight.Store (match)

-- Define handler
userProjection :: ProjectionHandlers '[\"user_registered\"] backend
userProjection =
  match \"user_registered\" handleUser :-> ProjectionEnd
  where
    handleUser envelope = do
      let user = envelope.payload
      Transaction.statement (user.userId, user.email)
        [resultlessStatement|
          INSERT INTO users (id, email) VALUES (\$1 :: uuid, \$2 :: text)
        |]

-- Run projection
main = do
  pool <- createPool postgresSettings
  store <- newMemoryStore  -- Or any backend
  runProjection (ProjectionId \"users\") pool Nothing store userProjection
@

= Waiting for Projection Progress

Use 'waitForEvent' to synchronize with projection progress:

@
result <- insertEvents store Nothing batch
case result of
  SuccessfulInsertion cursor -> do
    -- Wait for projection to catch up
    bracket
      (Connection.acquire settings)
      Connection.release
      (waitForEvent projId cursor.finalCursor)
@

This uses PostgreSQL LISTEN\/NOTIFY for efficient waiting without polling.

= Projection State Management

Projection state is tracked in the @projections@ table with:

* @id@ - Unique projection identifier
* @last_updated@ - Timestamp of last event processed
* @head_position@ - Cursor position of last processed event (JSON)
* @is_active@ - Whether projection is currently running

The 'loadProjectionState' function reads this state, and handlers automatically
update it after each successful event.
-}
module Hindsight.Projection (
    -- * Projection types
    ProjectionId (..),
    ProjectionState (..),
    ProjectionStateError (..),

    -- * Projection results and errors
    ProjectionResult (..),
    ProjectionError (..),

    -- * Running projections
    runProjection,
    loadProjectionState,

    -- * Waiting for events
    waitForEvent,
)
where

import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (Exception, SomeException, bracket, bracket_, throwIO)
import Control.Monad (when)
import Control.Monad.IO.Class (liftIO)
import Data.Aeson (FromJSON, ToJSON, (.:))
import Data.Aeson qualified as Aeson
import Data.ByteString.Char8 qualified as ByteString
import Data.Function ((&))
import Data.Profunctor (dimap)
import Data.Text (Text, pack)
import Data.Text.Encoding qualified as Data.Text
import Data.Time (UTCTime, getCurrentTime)
import GHC.Generics (Generic)
import Hasql.Connection (Connection)
import Hasql.Notifications qualified as Notifications
import Hasql.Pool (Pool)
import Hasql.Pool qualified as Pool
import Hasql.Session (Session)
import Hasql.Session qualified as Session
import Hasql.Statement (Statement)
import Hasql.TH (maybeStatement)
import Hasql.Transaction qualified as Transaction
import Hasql.Transaction.Sessions qualified as Session
import Hindsight.Projection.Matching (ProjectionHandler, ProjectionHandlers (..))
import Hindsight.Projection.State qualified as ProjectionState
import Hindsight.Store (
    BackendHandle,
    Cursor,
    EventEnvelope (position),
    EventHandler,
    EventMatcher (..),
    EventSelector (EventSelector, startupPosition, streamId),
    EventStore (StoreConstraints, subscribe),
    StartupPosition (FromBeginning, FromLastProcessed),
    StreamSelector (AllStreams),
    SubscriptionResult (Continue, Stop),
 )
import UnliftIO (MonadUnliftIO (..))
import UnliftIO.STM (TVar, atomically, writeTVar)

--------------------------------------------------------------------------------
-- Projection types
--------------------------------------------------------------------------------

newtype ProjectionId = ProjectionId
    {ProjectionId -> Text
unProjectionId :: Text}
    deriving (Int -> ProjectionId -> ShowS
[ProjectionId] -> ShowS
ProjectionId -> String
(Int -> ProjectionId -> ShowS)
-> (ProjectionId -> String)
-> ([ProjectionId] -> ShowS)
-> Show ProjectionId
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ProjectionId -> ShowS
showsPrec :: Int -> ProjectionId -> ShowS
$cshow :: ProjectionId -> String
show :: ProjectionId -> String
$cshowList :: [ProjectionId] -> ShowS
showList :: [ProjectionId] -> ShowS
Show, ProjectionId -> ProjectionId -> Bool
(ProjectionId -> ProjectionId -> Bool)
-> (ProjectionId -> ProjectionId -> Bool) -> Eq ProjectionId
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ProjectionId -> ProjectionId -> Bool
== :: ProjectionId -> ProjectionId -> Bool
$c/= :: ProjectionId -> ProjectionId -> Bool
/= :: ProjectionId -> ProjectionId -> Bool
Eq, Eq ProjectionId
Eq ProjectionId =>
(ProjectionId -> ProjectionId -> Ordering)
-> (ProjectionId -> ProjectionId -> Bool)
-> (ProjectionId -> ProjectionId -> Bool)
-> (ProjectionId -> ProjectionId -> Bool)
-> (ProjectionId -> ProjectionId -> Bool)
-> (ProjectionId -> ProjectionId -> ProjectionId)
-> (ProjectionId -> ProjectionId -> ProjectionId)
-> Ord ProjectionId
ProjectionId -> ProjectionId -> Bool
ProjectionId -> ProjectionId -> Ordering
ProjectionId -> ProjectionId -> ProjectionId
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: ProjectionId -> ProjectionId -> Ordering
compare :: ProjectionId -> ProjectionId -> Ordering
$c< :: ProjectionId -> ProjectionId -> Bool
< :: ProjectionId -> ProjectionId -> Bool
$c<= :: ProjectionId -> ProjectionId -> Bool
<= :: ProjectionId -> ProjectionId -> Bool
$c> :: ProjectionId -> ProjectionId -> Bool
> :: ProjectionId -> ProjectionId -> Bool
$c>= :: ProjectionId -> ProjectionId -> Bool
>= :: ProjectionId -> ProjectionId -> Bool
$cmax :: ProjectionId -> ProjectionId -> ProjectionId
max :: ProjectionId -> ProjectionId -> ProjectionId
$cmin :: ProjectionId -> ProjectionId -> ProjectionId
min :: ProjectionId -> ProjectionId -> ProjectionId
Ord)

--------------------------------------------------------------------------------
-- Projection results and errors
--------------------------------------------------------------------------------

-- | Result of projection execution
data ProjectionResult
    = ProjectionSuccess
    | -- | Handler didn't match the event
      ProjectionSkipped
    | ProjectionError ProjectionError
    deriving (Int -> ProjectionResult -> ShowS
[ProjectionResult] -> ShowS
ProjectionResult -> String
(Int -> ProjectionResult -> ShowS)
-> (ProjectionResult -> String)
-> ([ProjectionResult] -> ShowS)
-> Show ProjectionResult
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ProjectionResult -> ShowS
showsPrec :: Int -> ProjectionResult -> ShowS
$cshow :: ProjectionResult -> String
show :: ProjectionResult -> String
$cshowList :: [ProjectionResult] -> ShowS
showList :: [ProjectionResult] -> ShowS
Show, ProjectionResult -> ProjectionResult -> Bool
(ProjectionResult -> ProjectionResult -> Bool)
-> (ProjectionResult -> ProjectionResult -> Bool)
-> Eq ProjectionResult
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ProjectionResult -> ProjectionResult -> Bool
== :: ProjectionResult -> ProjectionResult -> Bool
$c/= :: ProjectionResult -> ProjectionResult -> Bool
/= :: ProjectionResult -> ProjectionResult -> Bool
Eq)

-- | Types of projection errors
data ProjectionError
    = ParseError Text
    | HandlerError SomeException
    | BackendError Text
    deriving (Int -> ProjectionError -> ShowS
[ProjectionError] -> ShowS
ProjectionError -> String
(Int -> ProjectionError -> ShowS)
-> (ProjectionError -> String)
-> ([ProjectionError] -> ShowS)
-> Show ProjectionError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ProjectionError -> ShowS
showsPrec :: Int -> ProjectionError -> ShowS
$cshow :: ProjectionError -> String
show :: ProjectionError -> String
$cshowList :: [ProjectionError] -> ShowS
showList :: [ProjectionError] -> ShowS
Show)

{- | Manual Eq instance for ProjectionError

SomeException doesn't have an Eq instance, so we compare based on the string representation
-}
instance Eq ProjectionError where
    (ParseError Text
t1) == :: ProjectionError -> ProjectionError -> Bool
== (ParseError Text
t2) = Text
t1 Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== Text
t2
    (HandlerError SomeException
e1) == (HandlerError SomeException
e2) = SomeException -> String
forall a. Show a => a -> String
show SomeException
e1 String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== SomeException -> String
forall a. Show a => a -> String
show SomeException
e2
    (BackendError Text
t1) == (BackendError Text
t2) = Text
t1 Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== Text
t2
    ProjectionError
_ == ProjectionError
_ = Bool
False

-- | State of a running projection tracked in PostgreSQL.
data ProjectionState backend = ProjectionState
    { forall backend. ProjectionState backend -> ProjectionId
projectionId :: ProjectionId
    -- ^ Unique identifier for this projection
    , forall backend. ProjectionState backend -> Cursor backend
lastProcessed :: Cursor backend
    -- ^ Last event cursor successfully processed
    , forall backend. ProjectionState backend -> UTCTime
lastUpdated :: UTCTime
    -- ^ Timestamp of last state update
    }

data ProjectionStateError = ProjectionStateError
    {ProjectionStateError -> Text
errorMessage :: Text}
    deriving (Int -> ProjectionStateError -> ShowS
[ProjectionStateError] -> ShowS
ProjectionStateError -> String
(Int -> ProjectionStateError -> ShowS)
-> (ProjectionStateError -> String)
-> ([ProjectionStateError] -> ShowS)
-> Show ProjectionStateError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ProjectionStateError -> ShowS
showsPrec :: Int -> ProjectionStateError -> ShowS
$cshow :: ProjectionStateError -> String
show :: ProjectionStateError -> String
$cshowList :: [ProjectionStateError] -> ShowS
showList :: [ProjectionStateError] -> ShowS
Show, ProjectionStateError -> ProjectionStateError -> Bool
(ProjectionStateError -> ProjectionStateError -> Bool)
-> (ProjectionStateError -> ProjectionStateError -> Bool)
-> Eq ProjectionStateError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ProjectionStateError -> ProjectionStateError -> Bool
== :: ProjectionStateError -> ProjectionStateError -> Bool
$c/= :: ProjectionStateError -> ProjectionStateError -> Bool
/= :: ProjectionStateError -> ProjectionStateError -> Bool
Eq, (forall x. ProjectionStateError -> Rep ProjectionStateError x)
-> (forall x. Rep ProjectionStateError x -> ProjectionStateError)
-> Generic ProjectionStateError
forall x. Rep ProjectionStateError x -> ProjectionStateError
forall x. ProjectionStateError -> Rep ProjectionStateError x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. ProjectionStateError -> Rep ProjectionStateError x
from :: forall x. ProjectionStateError -> Rep ProjectionStateError x
$cto :: forall x. Rep ProjectionStateError x -> ProjectionStateError
to :: forall x. Rep ProjectionStateError x -> ProjectionStateError
Generic, Maybe ProjectionStateError
Value -> Parser [ProjectionStateError]
Value -> Parser ProjectionStateError
(Value -> Parser ProjectionStateError)
-> (Value -> Parser [ProjectionStateError])
-> Maybe ProjectionStateError
-> FromJSON ProjectionStateError
forall a.
(Value -> Parser a)
-> (Value -> Parser [a]) -> Maybe a -> FromJSON a
$cparseJSON :: Value -> Parser ProjectionStateError
parseJSON :: Value -> Parser ProjectionStateError
$cparseJSONList :: Value -> Parser [ProjectionStateError]
parseJSONList :: Value -> Parser [ProjectionStateError]
$comittedField :: Maybe ProjectionStateError
omittedField :: Maybe ProjectionStateError
FromJSON, [ProjectionStateError] -> Value
[ProjectionStateError] -> Encoding
ProjectionStateError -> Bool
ProjectionStateError -> Value
ProjectionStateError -> Encoding
(ProjectionStateError -> Value)
-> (ProjectionStateError -> Encoding)
-> ([ProjectionStateError] -> Value)
-> ([ProjectionStateError] -> Encoding)
-> (ProjectionStateError -> Bool)
-> ToJSON ProjectionStateError
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: ProjectionStateError -> Value
toJSON :: ProjectionStateError -> Value
$ctoEncoding :: ProjectionStateError -> Encoding
toEncoding :: ProjectionStateError -> Encoding
$ctoJSONList :: [ProjectionStateError] -> Value
toJSONList :: [ProjectionStateError] -> Value
$ctoEncodingList :: [ProjectionStateError] -> Encoding
toEncodingList :: [ProjectionStateError] -> Encoding
$comitField :: ProjectionStateError -> Bool
omitField :: ProjectionStateError -> Bool
ToJSON)

instance Exception ProjectionStateError

--------------------------------------------------------------------------------
-- Main projection runner
--------------------------------------------------------------------------------

{- | Run a projection continuously, processing events and maintaining state in PostgreSQL.

The projection subscribes to events from the provided backend and executes handlers
within PostgreSQL transactions. State is persisted after each successful event processing.
-}
runProjection ::
    forall backend m ts.
    ( EventStore backend
    , MonadFail m
    , FromJSON (Cursor backend)
    , ToJSON (Cursor backend)
    , StoreConstraints backend m
    , MonadUnliftIO m
    ) =>
    -- | Unique identifier for this projection
    ProjectionId ->
    -- | PostgreSQL connection pool for state management
    Pool ->
    -- | Optional TVar for exposing state to other threads
    Maybe (TVar (Maybe (ProjectionState backend))) ->
    -- | Event store backend to subscribe to
    BackendHandle backend ->
    -- | Handlers for processing events
    ProjectionHandlers ts backend ->
    -- | Returns when subscription ends
    m ()
runProjection :: forall backend (m :: * -> *) (ts :: [Symbol]).
(EventStore backend, MonadFail m, FromJSON (Cursor backend),
 ToJSON (Cursor backend), StoreConstraints backend m,
 MonadUnliftIO m) =>
ProjectionId
-> Pool
-> Maybe (TVar (Maybe (ProjectionState backend)))
-> BackendHandle backend
-> ProjectionHandlers ts backend
-> m ()
runProjection ProjectionId
projId Pool
pool Maybe (TVar (Maybe (ProjectionState backend)))
mbTVar BackendHandle backend
store ProjectionHandlers ts backend
handlers = do
    -- Load projection state
    mbLastState <- ProjectionId -> Pool -> m (Maybe (ProjectionState backend))
forall (m :: * -> *) backend.
(MonadUnliftIO m, MonadFail m, FromJSON (Cursor backend)) =>
ProjectionId -> Pool -> m (Maybe (ProjectionState backend))
loadProjectionState ProjectionId
projId Pool
pool

    -- Update TVar if provided
    case (mbTVar, mbLastState) of
        (Just TVar (Maybe (ProjectionState backend))
tvar, Just ProjectionState backend
state) ->
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe (ProjectionState backend))
-> Maybe (ProjectionState backend) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe (ProjectionState backend))
tvar (ProjectionState backend -> Maybe (ProjectionState backend)
forall a. a -> Maybe a
Just ProjectionState backend
state)
        (Maybe (TVar (Maybe (ProjectionState backend))),
 Maybe (ProjectionState backend))
_ -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

    -- Start subscription

    _ <-
        subscribe
            store
            (makeEventMatcher projId pool mbTVar handlers)
            EventSelector
                { streamId = AllStreams
                , startupPosition = maybe FromBeginning FromLastProcessed (fmap (.lastProcessed) mbLastState)
                }

    pure ()

{- | Load the current state of a projection from PostgreSQL.

Returns Nothing if the projection has never been run, or throws ProjectionStateError
if there's a database or JSON parsing error.
-}
loadProjectionState ::
    (MonadUnliftIO m, MonadFail m, FromJSON (Cursor backend)) =>
    -- | Projection identifier
    ProjectionId ->
    -- | PostgreSQL connection pool
    Pool ->
    -- | Current state, or Nothing if never run
    m (Maybe (ProjectionState backend))
loadProjectionState :: forall (m :: * -> *) backend.
(MonadUnliftIO m, MonadFail m, FromJSON (Cursor backend)) =>
ProjectionId -> Pool -> m (Maybe (ProjectionState backend))
loadProjectionState ProjectionId
projId Pool
pool = do
    result <- IO
  (Either
     UsageError
     (Maybe (Either ProjectionStateError (ProjectionState backend))))
-> m (Either
        UsageError
        (Maybe (Either ProjectionStateError (ProjectionState backend))))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO
   (Either
      UsageError
      (Maybe (Either ProjectionStateError (ProjectionState backend))))
 -> m (Either
         UsageError
         (Maybe (Either ProjectionStateError (ProjectionState backend)))))
-> IO
     (Either
        UsageError
        (Maybe (Either ProjectionStateError (ProjectionState backend))))
-> m (Either
        UsageError
        (Maybe (Either ProjectionStateError (ProjectionState backend))))
forall a b. (a -> b) -> a -> b
$ Pool
-> Session
     (Maybe (Either ProjectionStateError (ProjectionState backend)))
-> IO
     (Either
        UsageError
        (Maybe (Either ProjectionStateError (ProjectionState backend))))
forall a. Pool -> Session a -> IO (Either UsageError a)
Pool.use Pool
pool (Session
   (Maybe (Either ProjectionStateError (ProjectionState backend)))
 -> IO
      (Either
         UsageError
         (Maybe (Either ProjectionStateError (ProjectionState backend)))))
-> Session
     (Maybe (Either ProjectionStateError (ProjectionState backend)))
-> IO
     (Either
        UsageError
        (Maybe (Either ProjectionStateError (ProjectionState backend))))
forall a b. (a -> b) -> a -> b
$ ProjectionId
-> Session
     (Maybe (Either ProjectionStateError (ProjectionState backend)))
forall backend.
FromJSON (Cursor backend) =>
ProjectionId
-> Session
     (Maybe (Either ProjectionStateError (ProjectionState backend)))
getProjectionState ProjectionId
projId
    case result of
        Left UsageError
err -> do
            -- Database error - propagate it with context
            IO (Maybe (ProjectionState backend))
-> m (Maybe (ProjectionState backend))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (ProjectionState backend))
 -> m (Maybe (ProjectionState backend)))
-> IO (Maybe (ProjectionState backend))
-> m (Maybe (ProjectionState backend))
forall a b. (a -> b) -> a -> b
$
                ProjectionStateError -> IO (Maybe (ProjectionState backend))
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (ProjectionStateError -> IO (Maybe (ProjectionState backend)))
-> ProjectionStateError -> IO (Maybe (ProjectionState backend))
forall a b. (a -> b) -> a -> b
$
                    Text -> ProjectionStateError
ProjectionStateError (Text -> ProjectionStateError) -> Text -> ProjectionStateError
forall a b. (a -> b) -> a -> b
$
                        Text
"Failed to query projection state for " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (String -> Text
pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ ProjectionId -> String
forall a. Show a => a -> String
show ProjectionId
projId) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
": " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (String -> Text
pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ UsageError -> String
forall a. Show a => a -> String
show UsageError
err)
        Right Maybe (Either ProjectionStateError (ProjectionState backend))
mbStateResult -> case Maybe (Either ProjectionStateError (ProjectionState backend))
mbStateResult of
            Maybe (Either ProjectionStateError (ProjectionState backend))
Nothing -> Maybe (ProjectionState backend)
-> m (Maybe (ProjectionState backend))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (ProjectionState backend)
forall a. Maybe a
Nothing -- Table exists, but no row for this projection
            Just (Left ProjectionStateError
err) -> IO (Maybe (ProjectionState backend))
-> m (Maybe (ProjectionState backend))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (ProjectionState backend))
 -> m (Maybe (ProjectionState backend)))
-> IO (Maybe (ProjectionState backend))
-> m (Maybe (ProjectionState backend))
forall a b. (a -> b) -> a -> b
$ ProjectionStateError -> IO (Maybe (ProjectionState backend))
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO ProjectionStateError
err -- JSON parsing error
            Just (Right ProjectionState backend
state) -> Maybe (ProjectionState backend)
-> m (Maybe (ProjectionState backend))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (ProjectionState backend)
 -> m (Maybe (ProjectionState backend)))
-> Maybe (ProjectionState backend)
-> m (Maybe (ProjectionState backend))
forall a b. (a -> b) -> a -> b
$ ProjectionState backend -> Maybe (ProjectionState backend)
forall a. a -> Maybe a
Just ProjectionState backend
state -- Successfully loaded state

--------------------------------------------------------------------------------
-- Event matcher
--------------------------------------------------------------------------------

makeEventMatcher ::
    forall ts backend m.
    (MonadUnliftIO m, ToJSON (Cursor backend)) =>
    ProjectionId ->
    Pool ->
    Maybe (TVar (Maybe (ProjectionState backend))) ->
    ProjectionHandlers ts backend ->
    EventMatcher ts backend m
makeEventMatcher :: forall (ts :: [Symbol]) backend (m :: * -> *).
(MonadUnliftIO m, ToJSON (Cursor backend)) =>
ProjectionId
-> Pool
-> Maybe (TVar (Maybe (ProjectionState backend)))
-> ProjectionHandlers ts backend
-> EventMatcher ts backend m
makeEventMatcher ProjectionId
projId Pool
pool Maybe (TVar (Maybe (ProjectionState backend)))
mbTVar = ProjectionHandlers ts backend -> EventMatcher ts backend m
forall (ts' :: [Symbol]).
ProjectionHandlers ts' backend -> EventMatcher ts' backend m
go
  where
    go :: ProjectionHandlers ts' backend -> EventMatcher ts' backend m
    go :: forall (ts' :: [Symbol]).
ProjectionHandlers ts' backend -> EventMatcher ts' backend m
go ProjectionHandlers ts' backend
ProjectionEnd = EventMatcher ts' backend m
EventMatcher '[] backend m
forall backend (m :: * -> *). EventMatcher '[] backend m
MatchEnd
    go ((Proxy event
proxy, ProjectionHandler event backend
handler) :-> ProjectionHandlers ts1 backend
rest) =
        (Proxy event
proxy, ProjectionHandler event backend -> EventHandler event m backend
forall (event :: Symbol).
ProjectionHandler event backend -> EventHandler event m backend
handleEvent ProjectionHandler event backend
handler) (Proxy event, EventHandler event m backend)
-> EventMatcher ts1 backend m
-> EventMatcher (event : ts1) backend m
forall (event :: Symbol) (m :: * -> *) backend (ts1 :: [Symbol]).
Event event =>
(Proxy event, EventHandler event m backend)
-> EventMatcher ts1 backend m
-> EventMatcher (event : ts1) backend m
:? ProjectionHandlers ts1 backend -> EventMatcher ts1 backend m
forall (ts' :: [Symbol]).
ProjectionHandlers ts' backend -> EventMatcher ts' backend m
go ProjectionHandlers ts1 backend
rest

    handleEvent ::
        forall event.
        ProjectionHandler event backend ->
        EventHandler event m backend
    handleEvent :: forall (event :: Symbol).
ProjectionHandler event backend -> EventHandler event m backend
handleEvent ProjectionHandler event backend
projHandler EventEnvelope event backend
envelope = do
        now <- IO UTCTime -> m UTCTime
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
        let state =
                ProjectionState
                    { projectionId :: ProjectionId
projectionId = ProjectionId
projId
                    , lastProcessed :: Cursor backend
lastProcessed = EventEnvelope event backend
envelope.position
                    , lastUpdated :: UTCTime
lastUpdated = UTCTime
now
                    }

        -- Run projection logic in transaction
        result <- liftIO $
            Pool.use pool $
                Session.transaction Session.ReadCommitted Session.Write $ do
                    projHandler envelope
                    Transaction.statement state (updateProjectionStatement)

        case result of
            Left UsageError
_err -> do
                SubscriptionResult -> m SubscriptionResult
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SubscriptionResult
Stop
            Right ()
_ -> do
                case Maybe (TVar (Maybe (ProjectionState backend)))
mbTVar of
                    Maybe (TVar (Maybe (ProjectionState backend)))
Nothing -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                    Just TVar (Maybe (ProjectionState backend))
tvar -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe (ProjectionState backend))
-> Maybe (ProjectionState backend) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe (ProjectionState backend))
tvar (ProjectionState backend -> Maybe (ProjectionState backend)
forall a. a -> Maybe a
Just ProjectionState backend
state)
                SubscriptionResult -> m SubscriptionResult
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SubscriptionResult
Continue

-- Rest of the module remains largely unchanged, including:
-- - Schema setup
-- - Database operations
-- - Notification handling
-- These parts don't need tracing modifications as they're either
-- infrastructure setup or already properly scoped operations

--------------------------------------------------------------------------------
-- Fetching/updating projection state
--------------------------------------------------------------------------------

getProjectionState ::
    (FromJSON (Cursor backend)) =>
    ProjectionId ->
    Session (Maybe (Either ProjectionStateError (ProjectionState backend)))
getProjectionState :: forall backend.
FromJSON (Cursor backend) =>
ProjectionId
-> Session
     (Maybe (Either ProjectionStateError (ProjectionState backend)))
getProjectionState (ProjectionId Text
pid) =
    Text
-> Statement Text (Maybe (Text, UTCTime, Maybe Value))
-> Session (Maybe (Text, UTCTime, Maybe Value))
forall params result.
params -> Statement params result -> Session result
Session.statement
        Text
pid
        [maybeStatement|
            select 
                id :: text,
                last_updated :: timestamptz,
                head_position :: jsonb?
            from projections
            where id = $1 :: text
        |]
        Session (Maybe (Text, UTCTime, Maybe Value))
-> (Session (Maybe (Text, UTCTime, Maybe Value))
    -> Session
         (Maybe (Either ProjectionStateError (ProjectionState backend))))
-> Session
     (Maybe (Either ProjectionStateError (ProjectionState backend)))
forall a b. a -> (a -> b) -> b
& (Maybe (Text, UTCTime, Maybe Value)
 -> Maybe (Either ProjectionStateError (ProjectionState backend)))
-> Session (Maybe (Text, UTCTime, Maybe Value))
-> Session
     (Maybe (Either ProjectionStateError (ProjectionState backend)))
forall a b. (a -> b) -> Session a -> Session b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe (Text, UTCTime, Maybe Value)
-> Maybe (Either ProjectionStateError (ProjectionState backend))
forall {backend}.
FromJSON (Cursor backend) =>
Maybe (Text, UTCTime, Maybe Value)
-> Maybe (Either ProjectionStateError (ProjectionState backend))
transform
  where
    transform :: Maybe (Text, UTCTime, Maybe Value)
-> Maybe (Either ProjectionStateError (ProjectionState backend))
transform Maybe (Text, UTCTime, Maybe Value)
res =
        case Maybe (Text, UTCTime, Maybe Value)
res of
            Maybe (Text, UTCTime, Maybe Value)
Nothing -> Maybe (Either ProjectionStateError (ProjectionState backend))
forall a. Maybe a
Nothing
            Just (Text
id', UTCTime
updated, Maybe Value
mbCursorJson) ->
                case Maybe Value
mbCursorJson of
                    Maybe Value
Nothing -> Maybe (Either ProjectionStateError (ProjectionState backend))
forall a. Maybe a
Nothing -- Never processed
                    Just Value
cursorJson ->
                        case Value -> Result (Cursor backend)
forall a. FromJSON a => Value -> Result a
Aeson.fromJSON Value
cursorJson of
                            Aeson.Success Cursor backend
cursor ->
                                Either ProjectionStateError (ProjectionState backend)
-> Maybe (Either ProjectionStateError (ProjectionState backend))
forall a. a -> Maybe a
Just (Either ProjectionStateError (ProjectionState backend)
 -> Maybe (Either ProjectionStateError (ProjectionState backend)))
-> Either ProjectionStateError (ProjectionState backend)
-> Maybe (Either ProjectionStateError (ProjectionState backend))
forall a b. (a -> b) -> a -> b
$
                                    ProjectionState backend
-> Either ProjectionStateError (ProjectionState backend)
forall a b. b -> Either a b
Right (ProjectionState backend
 -> Either ProjectionStateError (ProjectionState backend))
-> ProjectionState backend
-> Either ProjectionStateError (ProjectionState backend)
forall a b. (a -> b) -> a -> b
$
                                        ProjectionState
                                            { projectionId :: ProjectionId
projectionId = Text -> ProjectionId
ProjectionId Text
id'
                                            , lastProcessed :: Cursor backend
lastProcessed = Cursor backend
cursor
                                            , lastUpdated :: UTCTime
lastUpdated = UTCTime
updated
                                            }
                            Aeson.Error String
err ->
                                Either ProjectionStateError (ProjectionState backend)
-> Maybe (Either ProjectionStateError (ProjectionState backend))
forall a. a -> Maybe a
Just (Either ProjectionStateError (ProjectionState backend)
 -> Maybe (Either ProjectionStateError (ProjectionState backend)))
-> Either ProjectionStateError (ProjectionState backend)
-> Maybe (Either ProjectionStateError (ProjectionState backend))
forall a b. (a -> b) -> a -> b
$
                                    ProjectionStateError
-> Either ProjectionStateError (ProjectionState backend)
forall a b. a -> Either a b
Left (ProjectionStateError
 -> Either ProjectionStateError (ProjectionState backend))
-> ProjectionStateError
-> Either ProjectionStateError (ProjectionState backend)
forall a b. (a -> b) -> a -> b
$
                                        Text -> ProjectionStateError
ProjectionStateError (Text -> ProjectionStateError) -> Text -> ProjectionStateError
forall a b. (a -> b) -> a -> b
$
                                            Text
"Could not parse projection state cursor: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
pack String
err

updateProjectionStatement ::
    (ToJSON (Cursor backend)) =>
    Statement (ProjectionState backend) ()
updateProjectionStatement :: forall backend.
ToJSON (Cursor backend) =>
Statement (ProjectionState backend) ()
updateProjectionStatement =
    Statement (Text, Value) ()
ProjectionState.upsertProjectionCursor
        Statement (Text, Value) ()
-> (Statement (Text, Value) ()
    -> Statement (ProjectionState backend) ())
-> Statement (ProjectionState backend) ()
forall a b. a -> (a -> b) -> b
& (ProjectionState backend -> (Text, Value))
-> (() -> ())
-> Statement (Text, Value) ()
-> Statement (ProjectionState backend) ()
forall a b c d.
(a -> b) -> (c -> d) -> Statement b c -> Statement a d
forall (p :: * -> * -> *) a b c d.
Profunctor p =>
(a -> b) -> (c -> d) -> p b c -> p a d
dimap
            (\(ProjectionState (ProjectionId Text
pid) Cursor backend
cursor UTCTime
_ts) -> (Text
pid, Cursor backend -> Value
forall a. ToJSON a => a -> Value
Aeson.toJSON Cursor backend
cursor))
            () -> ()
forall a. a -> a
id

--------------------------------------------------------------------------------
-- Waiting on events
--------------------------------------------------------------------------------

{- | Wait for a projection to process up to (or past) a specific cursor position.

This function uses PostgreSQL LISTEN/NOTIFY to efficiently wait for projection
progress without polling. It returns once the projection has processed the target
cursor or throws an error if the projection state cannot be read.
-}
waitForEvent ::
    forall backend m.
    ( Ord (Cursor backend)
    , MonadUnliftIO m
    , FromJSON (Cursor backend)
    ) =>
    -- | Projection to monitor
    ProjectionId ->
    -- | Target cursor position to wait for
    Cursor backend ->
    -- | PostgreSQL connection for LISTEN/NOTIFY
    Connection ->
    -- | Returns when target cursor reached or throws on error
    m ()
waitForEvent :: forall backend (m :: * -> *).
(Ord (Cursor backend), MonadUnliftIO m,
 FromJSON (Cursor backend)) =>
ProjectionId -> Cursor backend -> Connection -> m ()
waitForEvent projectionId :: ProjectionId
projectionId@(ProjectionId Text
projId) Cursor backend
targetCursor Connection
conn = do
    let pgId :: PgIdentifier
pgId = Text -> PgIdentifier
Notifications.toPgIdentifier Text
projId
    IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$
        IO () -> IO () -> IO () -> IO ()
forall a b c. IO a -> IO b -> IO c -> IO c
bracket_
            (Connection -> PgIdentifier -> IO ()
Notifications.listen Connection
conn PgIdentifier
pgId)
            (Connection -> PgIdentifier -> IO ()
Notifications.unlisten Connection
conn PgIdentifier
pgId)
            ( do
                result <- Session
  (Maybe (Either ProjectionStateError (ProjectionState backend)))
-> Connection
-> IO
     (Either
        SessionError
        (Maybe (Either ProjectionStateError (ProjectionState backend))))
forall a. Session a -> Connection -> IO (Either SessionError a)
Session.run (ProjectionId
-> Session
     (Maybe (Either ProjectionStateError (ProjectionState backend)))
forall backend.
FromJSON (Cursor backend) =>
ProjectionId
-> Session
     (Maybe (Either ProjectionStateError (ProjectionState backend)))
getProjectionState ProjectionId
projectionId) Connection
conn
                case result of
                    Right (Just (Right ProjectionState backend
currState))
                        | ProjectionState backend
currState.lastProcessed Cursor backend -> Cursor backend -> Bool
forall a. Ord a => a -> a -> Bool
>= Cursor backend
targetCursor ->
                            () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                    Right Maybe (Either ProjectionStateError (ProjectionState backend))
Nothing ->
                        IO ()
waitForNotifications
                    Right (Just (Right ProjectionState backend
_)) ->
                        IO ()
waitForNotifications
                    Right (Just (Left ProjectionStateError
err)) ->
                        ProjectionStateError -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO ProjectionStateError
err
                    Left SessionError
err ->
                        SessionError -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO SessionError
err
            )
  where
    waitForNotifications :: IO ()
waitForNotifications = do
        completionVar <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
        bracket
            ( forkIO $ flip Notifications.waitForNotifications conn $ \ByteString
channel ByteString
notifPayload -> do
                Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteString -> Text
Data.Text.decodeASCII ByteString
channel Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== Text
projId) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                    case forall a. FromJSON a => ByteString -> Maybe a
Aeson.decode @(NotificationPayload backend) (ByteString -> Maybe (NotificationPayload backend))
-> ByteString -> Maybe (NotificationPayload backend)
forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
ByteString.fromStrict ByteString
notifPayload of
                        Maybe (NotificationPayload backend)
Nothing ->
                            ProjectionStateError -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (ProjectionStateError -> IO ()) -> ProjectionStateError -> IO ()
forall a b. (a -> b) -> a -> b
$
                                Text -> ProjectionStateError
ProjectionStateError (Text -> ProjectionStateError) -> Text -> ProjectionStateError
forall a b. (a -> b) -> a -> b
$
                                    Text
"Could not decode notification payload: "
                                        Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
Data.Text.pack (ShowS
forall a. Show a => a -> String
show ShowS -> ShowS
forall a b. (a -> b) -> a -> b
$ ByteString -> String
ByteString.unpack ByteString
notifPayload)
                        Just NotificationPayload backend
payload ->
                            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NotificationPayload backend
payload.headPosition Cursor backend -> Cursor backend -> Bool
forall a. Ord a => a -> a -> Bool
>= Cursor backend
targetCursor) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
                                MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
completionVar ()
            )
            killThread
            (\ThreadId
_ -> MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
completionVar)

-- | Payload sent via PostgreSQL NOTIFY when projection state updates.
data NotificationPayload backend = NotificationPayload
    { forall backend. NotificationPayload backend -> Cursor backend
headPosition :: Cursor backend
    -- ^ Latest cursor position processed by projection
    , forall backend. NotificationPayload backend -> UTCTime
lastUpdated :: UTCTime
    -- ^ Timestamp of the state update
    }
    deriving ((forall x.
 NotificationPayload backend -> Rep (NotificationPayload backend) x)
-> (forall x.
    Rep (NotificationPayload backend) x -> NotificationPayload backend)
-> Generic (NotificationPayload backend)
forall x.
Rep (NotificationPayload backend) x -> NotificationPayload backend
forall x.
NotificationPayload backend -> Rep (NotificationPayload backend) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall backend x.
Rep (NotificationPayload backend) x -> NotificationPayload backend
forall backend x.
NotificationPayload backend -> Rep (NotificationPayload backend) x
$cfrom :: forall backend x.
NotificationPayload backend -> Rep (NotificationPayload backend) x
from :: forall x.
NotificationPayload backend -> Rep (NotificationPayload backend) x
$cto :: forall backend x.
Rep (NotificationPayload backend) x -> NotificationPayload backend
to :: forall x.
Rep (NotificationPayload backend) x -> NotificationPayload backend
Generic)

deriving instance (Show (Cursor backend)) => Show (NotificationPayload backend)

deriving instance (Eq (Cursor backend)) => Eq (NotificationPayload backend)

instance (FromJSON (Cursor backend)) => FromJSON (NotificationPayload backend) where
    parseJSON :: Value -> Parser (NotificationPayload backend)
parseJSON = String
-> (Object -> Parser (NotificationPayload backend))
-> Value
-> Parser (NotificationPayload backend)
forall a. String -> (Object -> Parser a) -> Value -> Parser a
Aeson.withObject String
"NotificationPayload" ((Object -> Parser (NotificationPayload backend))
 -> Value -> Parser (NotificationPayload backend))
-> (Object -> Parser (NotificationPayload backend))
-> Value
-> Parser (NotificationPayload backend)
forall a b. (a -> b) -> a -> b
$ \Object
obj -> do
        headPosition <- Object
obj Object -> Key -> Parser (Cursor backend)
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"head_position"
        lastUpdated <- obj .: "last_updated"
        pure NotificationPayload{..}