hindsight-postgresql-projections
Copyright: (c) 2024
License: BSD3
Maintainer: maintainer@example.com
Stability: experimental
Safe Haskell: None
Language: GHC2021

Hindsight.Projection

Description

This module provides the projection system for transforming event streams into queryable read models using PostgreSQL.

Overview

Projections subscribe to events from ANY backend (Memory, Filesystem, or PostgreSQL) and execute handlers within PostgreSQL transactions. State is persisted in the projections table, enabling:

  • Automatic resumption after restarts
  • Progress tracking via cursors
  • LISTEN/NOTIFY for efficient waiting

Backend-Agnostic Design

The key insight: projection execution and state are ALWAYS in PostgreSQL, but events can come from any storage backend:

-- Events from MemoryStore, projections in PostgreSQL
runProjection projId pool Nothing memoryStore handlers

-- Events from PostgreSQL, projections in PostgreSQL
runProjection projId pool Nothing sqlStore handlers

This enables testing with fast in-memory events while validating real SQL projection logic.

Asynchronous Projections (This Module)

Projections run in a separate thread and process events asynchronously:

  • Eventually consistent (small delay between insert and projection update)
  • Work with ANY event store backend
  • Failures don't block event insertion

For synchronous projections (PostgreSQL-only), see Hindsight.Store.PostgreSQL.
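
The asynchronous behaviour described above can be illustrated without any Hindsight types. The following is a minimal sketch using only base: DemoEvent, projectionWorker, and runDemo are hypothetical names, an unbounded Chan stands in for the event subscription, and an MVar stands in for the read model. It is not how the library is implemented; it only shows that the writer never waits on the worker.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, modifyMVar_, newEmptyMVar, newMVar, putMVar, readMVar, takeMVar)
import Control.Monad (forM_, replicateM_)

-- | Hypothetical stand-in for an event; not a Hindsight type.
newtype DemoEvent = DemoEvent Int

-- | Worker thread: apply each queued event to the read model,
-- then signal completion once the expected number has been handled.
projectionWorker :: Chan DemoEvent -> MVar Int -> Int -> MVar () -> IO ()
projectionWorker queue readModel expected done = do
  replicateM_ expected $ do
    DemoEvent n <- readChan queue
    modifyMVar_ readModel (pure . (+ n))
  putMVar done ()

-- | Insert events without waiting for the worker, then wait for it to catch up.
runDemo :: [Int] -> IO Int
runDemo payloads = do
  queue <- newChan
  readModel <- newMVar 0
  done <- newEmptyMVar
  _ <- forkIO (projectionWorker queue readModel (length payloads) done)
  -- writeChan on an unbounded Chan does not block on the consumer
  forM_ payloads (writeChan queue . DemoEvent)
  takeMVar done
  readMVar readModel
```

Between the last writeChan and takeMVar the read model may lag behind the inserted events, which is the eventual consistency the list above refers to.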

Example: User Directory Projection

{-# LANGUAGE DataKinds, OverloadedRecordDot, QuasiQuotes #-}

-- Assumes the hasql-th and hasql-transaction packages for the SQL helpers
import Hasql.TH (resultlessStatement)
import qualified Hasql.Transaction as Transaction
import Hindsight.Projection
import Hindsight.Projection.Matching (ProjectionHandlers(..))
import Hindsight.Store (match)

-- Define handler: insert one row per "user_registered" event
userProjection :: ProjectionHandlers '["user_registered"] backend
userProjection =
  match "user_registered" handleUser :-> ProjectionEnd
  where
    handleUser envelope = do
      let user = envelope.payload
      Transaction.statement (user.userId, user.email)
        [resultlessStatement|
          INSERT INTO users (id, email) VALUES ($1 :: uuid, $2 :: text)
        |]

-- Run projection
main :: IO ()
main = do
  pool <- createPool postgresSettings
  store <- newMemoryStore  -- Or any backend
  runProjection (ProjectionId "users") pool Nothing store userProjection

Waiting for Projection Progress

Use waitForEvent to synchronize with projection progress:

result <- insertEvents store Nothing batch
case result of
  SuccessfulInsertion cursor -> do
    -- Wait for projection to catch up
    bracket
      (Connection.acquire settings)
      Connection.release
      (waitForEvent projId cursor.finalCursor)

This uses PostgreSQL LISTEN/NOTIFY for efficient waiting without polling.

Projection State Management

Projection state is tracked in the projections table with:

  • id - Unique projection identifier
  • last_updated - Timestamp of last event processed
  • head_position - Cursor position of last processed event (JSON)
  • is_active - Whether projection is currently running

The loadProjectionState function reads this state, and handlers automatically update it after each successful event.
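
To make the role of head_position concrete, here is a small pure sketch of how a stored cursor could drive resumption after a restart. It is an illustration under stated assumptions, not the library's code: HeadPosition, pendingEvents, and resume are hypothetical names, and the assumption is that events at or before the stored cursor are skipped.

```haskell
import Data.List (foldl')

-- | Hypothetical in-memory mirror of the persisted cursor;
-- Nothing means the projection has never run.
type HeadPosition c = Maybe c

-- | Keep only events strictly after the stored head position.
pendingEvents :: Ord c => HeadPosition c -> [(c, e)] -> [(c, e)]
pendingEvents Nothing events = events
pendingEvents (Just headPos) events = filter ((> headPos) . fst) events

-- | Process pending events in order, advancing the head position after each one.
resume :: Ord c => (s -> e -> s) -> (HeadPosition c, s) -> [(c, e)] -> (HeadPosition c, s)
resume step (headPos, st) events =
  foldl' advance (headPos, st) (pendingEvents headPos events)
  where
    advance (_, acc) (cursor, event) = (Just cursor, step acc event)
```

Calling resume with a head position of Just 2 over events at cursors 1 to 4 applies only the events at cursors 3 and 4 and leaves the head position at Just 4.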

Synopsis

Projection types

data ProjectionState backend

State of a running projection tracked in PostgreSQL.

Constructors

ProjectionState 

Fields

data ProjectionStateError

Constructors

ProjectionStateError

Fields

  • errorMessage :: Text

Instances

  • FromJSON ProjectionStateError
  • ToJSON ProjectionStateError
  • Exception ProjectionStateError
  • Generic ProjectionStateError
  • Show ProjectionStateError
  • Eq ProjectionStateError

All instances are defined in Hindsight.Projection.

type Rep ProjectionStateError = D1 ('MetaData "ProjectionStateError" "Hindsight.Projection" "hindsight-postgresql-projections-0.1.0.0-inplace" 'False) (C1 ('MetaCons "ProjectionStateError" 'PrefixI 'True) (S1 ('MetaSel ('Just "errorMessage") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedStrict) (Rec0 Text)))

Projection results and errors

data ProjectionResult

Result of projection execution

Constructors

  • ProjectionSuccess
  • ProjectionSkipped - Handler didn't match the event
  • ProjectionError ProjectionError

data ProjectionError

Types of projection errors

Instances

  • Show ProjectionError
  • Eq ProjectionError - Manual instance. SomeException doesn't have an Eq instance, so values are compared by their string representation.

Both instances are defined in Hindsight.Projection.
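
The string-based comparison can be sketched as follows. The constructors of ProjectionError are not listed here, so DemoError and its constructors are hypothetical; only the technique (comparing show output because SomeException has no Eq instance) is taken from the note above.

```haskell
import Control.Exception (ErrorCall (..), SomeException, toException)

-- | Hypothetical error type; the real ProjectionError constructors are not shown here.
data DemoError
  = DemoHandlerFailed SomeException
  | DemoOther String
  deriving (Show)

-- | SomeException has no Eq instance, so compare its rendered text instead.
instance Eq DemoError where
  DemoHandlerFailed a == DemoHandlerFailed b = show a == show b
  DemoOther a == DemoOther b = a == b
  _ == _ = False

-- | Two wrapped exceptions with the same message compare equal.
sameMessage :: Bool
sameMessage =
  DemoHandlerFailed (toException (ErrorCall "boom"))
    == DemoHandlerFailed (toException (ErrorCall "boom"))

-- | Different messages compare unequal.
differentMessage :: Bool
differentMessage =
  DemoHandlerFailed (toException (ErrorCall "boom"))
    == DemoHandlerFailed (toException (ErrorCall "bang"))
```

One consequence of this design is that two exceptions of different types that happen to render identically also compare equal, which is usually acceptable for test assertions but worth keeping in mind elsewhere.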

Running projections

runProjection

Arguments

:: forall backend m (ts :: [Symbol]). (EventStore backend, MonadFail m, FromJSON (Cursor backend), ToJSON (Cursor backend), StoreConstraints backend m, MonadUnliftIO m) 
=> ProjectionId

Unique identifier for this projection

-> Pool

PostgreSQL connection pool for state management

-> Maybe (TVar (Maybe (ProjectionState backend)))

Optional TVar for exposing state to other threads

-> BackendHandle backend

Event store backend to subscribe to

-> ProjectionHandlers ts backend

Handlers for processing events

-> m ()

Returns when subscription ends

Run a projection continuously, processing events and maintaining state in PostgreSQL.

The projection subscribes to events from the provided backend and executes handlers within PostgreSQL transactions. State is persisted after each successfully processed event.
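
The optional TVar argument lets another thread observe projection state. The sketch below simulates that pattern with stand-ins only: DemoState, publish, demoLoop, and observe are hypothetical names, the loop merely pretends to process cursors, and TVar is imported from GHC.Conc so that the example needs nothing beyond base (application code would more commonly use the stm package).

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_)
import GHC.Conc (TVar, atomically, newTVarIO, readTVarIO, writeTVar)

-- | Hypothetical stand-in for ProjectionState; it only carries a cursor.
newtype DemoState = DemoState { demoCursor :: Int }
  deriving (Eq, Show)

-- | Publish the latest state into the TVar, if the caller supplied one.
publish :: Maybe (TVar (Maybe DemoState)) -> DemoState -> IO ()
publish Nothing _ = pure ()
publish (Just var) st = atomically (writeTVar var (Just st))

-- | Simulated projection loop: "process" each cursor, then publish it.
demoLoop :: Maybe (TVar (Maybe DemoState)) -> [Int] -> IO ()
demoLoop var cursors = forM_ cursors (publish var . DemoState)

-- | Run the loop on another thread and read the final state from this one.
observe :: [Int] -> IO (Maybe DemoState)
observe cursors = do
  var <- newTVarIO Nothing
  done <- newEmptyMVar
  _ <- forkIO (demoLoop (Just var) cursors >> putMVar done ())
  takeMVar done
  readTVarIO var
```

Passing Nothing, as the earlier examples in this module do, corresponds to the first clause of publish: the loop runs the same way but exposes nothing.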

loadProjectionState

Arguments

:: (MonadUnliftIO m, MonadFail m, FromJSON (Cursor backend)) 
=> ProjectionId

Projection identifier

-> Pool

PostgreSQL connection pool

-> m (Maybe (ProjectionState backend))

Current state, or Nothing if never run

Load the current state of a projection from PostgreSQL.

Returns Nothing if the projection has never been run. Throws ProjectionStateError on a database or JSON parsing error.
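
A caller therefore has three outcomes to handle: never run, a loaded state, and a thrown error. The sketch below shows one way to collapse them with try. Everything in it is a stand-in: DemoStateError mirrors the documented error type, demoLoad plays the role of loadProjectionState over an in-memory lookup table, and describeState is a hypothetical helper.

```haskell
import Control.Exception (Exception, throwIO, try)

-- | Local stand-in mirroring the documented error type.
newtype DemoStateError = DemoStateError String
  deriving (Show)

instance Exception DemoStateError

-- | Hypothetical loader: a lookup table plays the role of the projections
-- table, and a Left row simulates an unreadable stored cursor.
demoLoad :: [(String, Either String Int)] -> String -> IO (Maybe Int)
demoLoad table projId = case lookup projId table of
  Nothing -> pure Nothing
  Just (Right cursor) -> pure (Just cursor)
  Just (Left problem) -> throwIO (DemoStateError problem)

-- | Collapse the three outcomes into a human-readable status line.
describeState :: [(String, Either String Int)] -> String -> IO String
describeState table projId = do
  outcome <- try (demoLoad table projId)
  pure $ case outcome of
    Left (DemoStateError msg) -> "state unreadable: " <> msg
    Right Nothing -> "never run"
    Right (Just cursor) -> "at cursor " <> show cursor
```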

Waiting for events

waitForEvent

Arguments

:: (Ord (Cursor backend), MonadUnliftIO m, FromJSON (Cursor backend)) 
=> ProjectionId

Projection to monitor

-> Cursor backend

Target cursor position to wait for

-> Connection

PostgreSQL connection for LISTEN/NOTIFY

-> m ()

Returns when target cursor reached or throws on error

Wait for a projection to process up to (or past) a specific cursor position.

This function uses PostgreSQL LISTEN/NOTIFY to efficiently wait for projection progress without polling. It returns once the projection has processed the target cursor or throws an error if the projection state cannot be read.
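
The "up to or past" semantics can be shown with an in-process analogue. This sketch does not use LISTEN/NOTIFY or any Hindsight function: a TVar stands in for the persisted cursor, STM retry stands in for sleeping until a notification arrives, and waitForCursor, advance, and demoWait are hypothetical names. TVar is imported from GHC.Conc to stay within base.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (forM_)
import GHC.Conc (TVar, atomically, newTVarIO, readTVar, retry, writeTVar)

-- | Block until the published cursor is at or past the target.
-- A Nothing cursor means no event has been processed yet.
waitForCursor :: Ord c => TVar (Maybe c) -> c -> IO ()
waitForCursor var target = atomically $ do
  current <- readTVar var
  case current of
    Just c | c >= target -> pure ()
    _ -> retry -- suspends until the TVar is written again; no busy polling

-- | Simulated projection that advances its cursor with a small delay per event.
advance :: TVar (Maybe Int) -> [Int] -> IO ()
advance var cursors = forM_ cursors $ \c -> do
  threadDelay 1000
  atomically (writeTVar var (Just c))

-- | Start the simulated projection, wait for cursor 3, and report what was seen.
demoWait :: IO (Maybe Int)
demoWait = do
  var <- newTVarIO Nothing
  _ <- forkIO (advance var [1 .. 5])
  waitForCursor var 3
  atomically (readTVar var)
```

The value returned by demoWait is at least Just 3 but may already be higher, since the projection keeps running while the waiter wakes up. The Ord constraint on the cursor in waitForEvent serves the same purpose as the >= comparison here.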