| Copyright | (c) 2024 |
|---|---|
| License | BSD3 |
| Maintainer | maintainer@example.com |
| Stability | experimental |
| Safe Haskell | None |
| Language | GHC2021 |
Hindsight.Projection
Description
This module provides the projection system for transforming event streams into queryable read models using PostgreSQL.
Overview
Projections subscribe to events from ANY backend (Memory, Filesystem, or PostgreSQL)
and execute handlers within PostgreSQL transactions. State is persisted in the
projections table, enabling:
- Automatic resumption after restarts
- Progress tracking via cursors
- LISTEN/NOTIFY for efficient waiting
Backend-Agnostic Design
The key insight: projection execution and state are ALWAYS in PostgreSQL, but events can come from any storage backend:
-- Events from MemoryStore, projections in PostgreSQL
runProjection projId pool Nothing memoryStore handlers
-- Events from PostgreSQL, projections in PostgreSQL
runProjection projId pool Nothing sqlStore handlers
This enables testing with fast in-memory events while validating real SQL projection logic.
Asynchronous Projections (This Module)
Projections run in a separate thread and process events asynchronously:
- Eventually consistent (small delay between insert and projection update)
- Work with ANY event store backend
- Failures don't block event insertion
For synchronous projections (PostgreSQL-only), see Hindsight.Store.PostgreSQL.
Example: User Directory Projection
import Hindsight.Projection
import Hindsight.Projection.Matching (ProjectionHandlers(..))
import Hindsight.Store (match)

-- Define handler
userProjection :: ProjectionHandlers '["user_registered"] backend
userProjection =
  match "user_registered" handleUser :-> ProjectionEnd
  where
    handleUser envelope = do
      let user = envelope.payload
      Transaction.statement (user.userId, user.email)
        [resultlessStatement|
          INSERT INTO users (id, email) VALUES ($1 :: uuid, $2 :: text)
        |]

-- Run projection
main = do
  pool <- createPool postgresSettings
  store <- newMemoryStore  -- Or any backend
  runProjection (ProjectionId "users") pool Nothing store userProjection
Waiting for Projection Progress
Use waitForEvent to synchronize with projection progress:
result <- insertEvents store Nothing batch
case result of
  SuccessfulInsertion cursor -> do
    -- Wait for projection to catch up
    bracket
      (Connection.acquire settings)
      Connection.release
      (waitForEvent projId cursor.finalCursor)
This uses PostgreSQL LISTEN/NOTIFY for efficient waiting without polling.
Projection State Management
Projection state is tracked in the projections table with:
- id: Unique projection identifier
- last_updated: Timestamp of last event processed
- head_position: Cursor position of last processed event (JSON)
- is_active: Whether projection is currently running
The loadProjectionState function reads this state, and handlers automatically
update it after each successful event.
Synopsis
- newtype ProjectionId = ProjectionId {}
- data ProjectionState backend = ProjectionState { projectionId :: ProjectionId, lastProcessed :: Cursor backend, lastUpdated :: UTCTime }
- data ProjectionStateError = ProjectionStateError { errorMessage :: Text }
- data ProjectionResult
- data ProjectionError
- runProjection :: forall backend m (ts :: [Symbol]). (EventStore backend, MonadFail m, FromJSON (Cursor backend), ToJSON (Cursor backend), StoreConstraints backend m, MonadUnliftIO m) => ProjectionId -> Pool -> Maybe (TVar (Maybe (ProjectionState backend))) -> BackendHandle backend -> ProjectionHandlers ts backend -> m ()
- loadProjectionState :: (MonadUnliftIO m, MonadFail m, FromJSON (Cursor backend)) => ProjectionId -> Pool -> m (Maybe (ProjectionState backend))
- waitForEvent :: (Ord (Cursor backend), MonadUnliftIO m, FromJSON (Cursor backend)) => ProjectionId -> Cursor backend -> Connection -> m ()
Projection types
newtype ProjectionId
Constructors
| ProjectionId | |
Fields | |
Instances
| Show ProjectionId | Defined in Hindsight.Projection. Methods: showsPrec :: Int -> ProjectionId -> ShowS; show :: ProjectionId -> String; showList :: [ProjectionId] -> ShowS |
| Eq ProjectionId | Defined in Hindsight.Projection |
| Ord ProjectionId | Defined in Hindsight.Projection. Methods: compare :: ProjectionId -> ProjectionId -> Ordering; (<), (<=), (>), (>=) :: ProjectionId -> ProjectionId -> Bool; max, min :: ProjectionId -> ProjectionId -> ProjectionId |
data ProjectionState backend
State of a running projection tracked in PostgreSQL.
Constructors
| ProjectionState | |
Fields
- projectionId :: ProjectionId
- lastProcessed :: Cursor backend
- lastUpdated :: UTCTime
data ProjectionStateError
Constructors
| ProjectionStateError | |
Fields
- errorMessage :: Text
Instances
Projection results and errors
data ProjectionResult
Result of projection execution
Constructors
| ProjectionSuccess | |
| ProjectionSkipped | Handler didn't match the event |
| ProjectionError ProjectionError |
Instances
| Show ProjectionResult | Defined in Hindsight.Projection. Methods: showsPrec :: Int -> ProjectionResult -> ShowS; show :: ProjectionResult -> String; showList :: [ProjectionResult] -> ShowS |
| Eq ProjectionResult | Defined in Hindsight.Projection. Methods: (==), (/=) :: ProjectionResult -> ProjectionResult -> Bool |
data ProjectionError
Types of projection errors
Constructors
| ParseError Text | |
| HandlerError SomeException | |
| BackendError Text |
Instances
| Show ProjectionError | Defined in Hindsight.Projection. Methods: showsPrec :: Int -> ProjectionError -> ShowS; show :: ProjectionError -> String; showList :: [ProjectionError] -> ShowS |
| Eq ProjectionError | Manual Eq instance for ProjectionError. SomeException doesn't have an Eq instance, so values are compared by their string representation. Defined in Hindsight.Projection. Methods: (==), (/=) :: ProjectionError -> ProjectionError -> Bool |
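The manual Eq instance noted above can be sketched as follows. This is a minimal, self-contained illustration rather than the module's source: the type name SketchError and the values boom1 and boom2 are hypothetical, String stands in for Text so that only base is needed, and it assumes the comparison goes through show on the wrapped exception, as the instance note suggests.

```haskell
import Control.Exception (ErrorCall (..), SomeException, toException)

-- Hypothetical mirror of ProjectionError (String replaces Text to stay within base).
data SketchError
  = ParseError String
  | HandlerError SomeException
  | BackendError String
  deriving (Show)

-- SomeException has no Eq instance, so wrapped exceptions are compared
-- through their rendered string form.
instance Eq SketchError where
  ParseError a == ParseError b = a == b
  HandlerError a == HandlerError b = show a == show b
  BackendError a == BackendError b = a == b
  _ == _ = False

-- Two example values wrapping separately built exceptions with the same message.
boom1, boom2 :: SketchError
boom1 = HandlerError (toException (ErrorCall "boom"))
boom2 = HandlerError (toException (ErrorCall "boom"))
```

One consequence of this approach is that two distinct exceptions that render to the same string compare equal, which is usually acceptable when the instance exists mainly to support test assertions.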
Running projections
runProjection
Arguments
| :: forall backend m (ts :: [Symbol]). (EventStore backend, MonadFail m, FromJSON (Cursor backend), ToJSON (Cursor backend), StoreConstraints backend m, MonadUnliftIO m) | |
| => ProjectionId | Unique identifier for this projection |
| -> Pool | PostgreSQL connection pool for state management |
| -> Maybe (TVar (Maybe (ProjectionState backend))) | Optional TVar for exposing state to other threads |
| -> BackendHandle backend | Event store backend to subscribe to |
| -> ProjectionHandlers ts backend | Handlers for processing events |
| -> m () | Returns when subscription ends |
Run a projection continuously, processing events and maintaining state in PostgreSQL.
The projection subscribes to events from the provided backend and executes handlers within PostgreSQL transactions. State is persisted after each successful event processing.
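The resumption behaviour described here can be illustrated with a small pure model. It is a sketch under stated assumptions, not the library's implementation: Position, Event, ModelState, step, and runModel are hypothetical names, an Int stands in for Cursor backend, and a list of payloads stands in for the read model. Whether the real code skips already-seen events or subscribes from the stored cursor is not stated in this page; the model simply shows why persisting the cursor alongside each successful handler run makes restarts safe.

```haskell
import Data.List (foldl')

-- Hypothetical, simplified stand-ins for the library's types.
type Position = Int              -- stands in for Cursor backend
type Event = (Position, String)  -- position plus payload

data ModelState = ModelState
  { lastProcessed :: Maybe Position  -- Nothing: projection has never run
  , applied :: [String]              -- stands in for the read model
  }
  deriving (Eq, Show)

-- Process one event: skip anything at or before the stored cursor,
-- otherwise apply the handler and advance the cursor together.
step :: ModelState -> Event -> ModelState
step st (pos, payload)
  | Just pos <= lastProcessed st = st
  | otherwise = ModelState (Just pos) (applied st ++ [payload])

-- Run over a stream of events starting from a previously persisted state.
runModel :: ModelState -> [Event] -> ModelState
runModel = foldl' step
```

Starting from ModelState Nothing [] models a first run; starting from a saved state models a restart, in which replayed events at or before the saved position leave the read model unchanged.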
loadProjectionState
Arguments
| :: (MonadUnliftIO m, MonadFail m, FromJSON (Cursor backend)) | |
| => ProjectionId | Projection identifier |
| -> Pool | PostgreSQL connection pool |
| -> m (Maybe (ProjectionState backend)) | Current state, or Nothing if never run |
Load the current state of a projection from PostgreSQL.
Returns Nothing if the projection has never been run, or throws ProjectionStateError if there's a database or JSON parsing error.
Waiting for events
waitForEvent
Arguments
| :: (Ord (Cursor backend), MonadUnliftIO m, FromJSON (Cursor backend)) | |
| => ProjectionId | Projection to monitor |
| -> Cursor backend | Target cursor position to wait for |
| -> Connection | PostgreSQL connection for LISTEN/NOTIFY |
| -> m () | Returns when target cursor reached or throws on error |
Wait for a projection to process up to (or past) a specific cursor position.
This function uses PostgreSQL LISTEN/NOTIFY to efficiently wait for projection progress without polling. It returns once the projection has processed the target cursor or throws an error if the projection state cannot be read.
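The Ord (Cursor backend) constraint in the signature suggests that completion is decided by comparing the projection's stored cursor with the target. The pure sketch below models only that decision, assuming each notification reports the projection's latest position; caughtUp and notificationsUntil are hypothetical helpers, not part of the module, and no actual LISTEN/NOTIFY traffic is involved.

```haskell
-- Has the projection processed up to (or past) the target cursor?
-- Nothing models a projection that has not recorded any progress yet.
caughtUp :: Ord cursor => cursor -> Maybe cursor -> Bool
caughtUp target = maybe False (>= target)

-- Given the positions reported by successive notifications, count how many
-- must be observed before the target is reached; Nothing if it never is.
notificationsUntil :: Ord cursor => cursor -> [cursor] -> Maybe Int
notificationsUntil target positions =
  case break (>= target) positions of
    (before, _ : _) -> Just (length before + 1)
    _ -> Nothing
```

Because the comparison is "at or past" rather than "equal to", a waiter still completes when the projection has already moved beyond the target by the time the wait begins.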