7. Synchronous Projections

Tutorial 03 introduced PostgreSQL projections that run asynchronously: they subscribe to events and update in a separate thread, so reads are only eventually consistent. Synchronous projections are a PostgreSQL-specific feature that runs projection handlers inside the event insert transaction, so the read model is updated atomically with the events and is consistent as soon as the insert commits.

7.1. Prerequisites

{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RequiredTypeArguments #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# OPTIONS_GHC -Wno-orphans #-}

module Main where

import Data.Aeson (FromJSON, ToJSON)
import Data.Functor.Contravariant ((>$<))
import Data.Int (Int32)
import Data.Map.Strict qualified as Map
import Data.Proxy (Proxy(..))
import Data.Text (Text)
import Data.Text.Encoding (decodeUtf8)
import Data.UUID.V4 qualified as UUID
import Database.Postgres.Temp qualified as Temp
import GHC.Generics (Generic)
import Hasql.Connection.Setting qualified as ConnectionSetting
import Hasql.Connection.Setting.Connection qualified as ConnectionSettingConnection
import Hasql.Decoders qualified as D
import Hasql.Encoders qualified as E
import Hasql.Pool qualified as Pool
import Hasql.Pool.Config qualified as Config
import Hasql.Session qualified as Session
import Hasql.Statement (Statement(..))
import Hasql.Transaction qualified as Transaction
import Hindsight
import Hindsight.Projection (ProjectionId(..))
import Hindsight.Projection.Matching (ProjectionHandlers(..))
import Hindsight.Store.PostgreSQL (SQLStore, getPool, emptySyncProjectionRegistry, registerSyncProjection, newSQLStoreWithProjections, shutdownSQLStore, createSQLSchema)

7.2. Define Events

type AccountCredited = "account_credited"

data CreditInfo = CreditInfo
  { accountId :: Text
  , amount :: Int32
  } deriving (Show, Eq, Generic, FromJSON, ToJSON)

-- Event versioning
type instance MaxVersion AccountCredited = 0
type instance Versions AccountCredited = '[CreditInfo]
instance Event AccountCredited
instance MigrateVersion 0 AccountCredited

7.3. Synchronous Projection Handler

Define a projection that updates account balances:

-- Projection runs within insert transaction
balanceProjection :: ProjectionHandlers '[AccountCredited] SQLStore
balanceProjection =
  (Proxy @AccountCredited, handleCredit) :-> ProjectionEnd
  where
    handleCredit eventData = do
      let credit = eventData.payload :: CreditInfo
      Transaction.statement (credit.accountId, credit.amount) upsertBalanceStatement
      where
        upsertBalanceStatement :: Statement (Text, Int32) ()
        upsertBalanceStatement = Statement sql encoder decoder True
          where
            sql = "INSERT INTO balances (account_id, balance) \
                  \VALUES ($1, $2) \
                  \ON CONFLICT (account_id) DO UPDATE SET \
                  \  balance = balances.balance + EXCLUDED.balance"
            encoder = (fst >$< E.param (E.nonNullable E.text))
                   <> (snd >$< E.param (E.nonNullable E.int4))
            decoder = D.noResult

7.4. Demonstration

We’ll demonstrate synchronous projections in three steps: create a projection registry, pass it to a special store constructor, then insert events and query immediately without any delay.

Start with database setup (boilerplate):

demoSyncProjection :: IO ()
demoSyncProjection = do
  putStrLn "=== Synchronous Projection Demo ==="

  let dbConfig = Temp.defaultConfig
        <> mempty { Temp.postgresConfigFile = [("log_min_messages", "FATAL")] }

  result <- Temp.withConfig dbConfig $ \db -> do
    let connStr = Temp.toConnectionString db

    pool <- Pool.acquire $ Config.settings [
        Config.size 1,
        Config.staticConnectionSettings [
          ConnectionSetting.connection $ ConnectionSettingConnection.string (decodeUtf8 connStr)
        ]
      ]
    _ <- Pool.use pool createSQLSchema
    _ <- Pool.use pool $ Session.sql
      "CREATE TABLE balances (account_id TEXT PRIMARY KEY, balance INTEGER)"
    Pool.release pool

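Now create a synchronous projection registry. Start with emptySyncProjectionRegistry and add projections with registerSyncProjection, which takes a ProjectionId, the handlers, and the registry to extend. Because the registry is the last argument, several registrations can be chained by passing the result of one call to the next. Here we register a single projection: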

let registry = registerSyncProjection
                 (ProjectionId "balances")
                 balanceProjection
                 emptySyncProjectionRegistry

Pass the registry to newSQLStoreWithProjections. Unlike the regular store constructor, it wires the registered projections to run within event insert transactions:

store <- newSQLStoreWithProjections connStr registry
putStrLn "✓ Created store with synchronous projection"

Insert events. The projection runs atomically in the same transaction:

streamId <- StreamId <$> UUID.nextRandom
let events = [ mkEvent AccountCredited (CreditInfo "A1" 100)
             , mkEvent AccountCredited (CreditInfo "A1" 50)
             , mkEvent AccountCredited (CreditInfo "A2" 200)
             ]

_ <- insertEvents store Nothing $
  Transaction $ Map.singleton streamId (StreamWrite Any events)

putStrLn "✓ Inserted events (projection ran immediately)"

Query immediately. No threadDelay is needed, because the projection already completed as part of the insert transaction:

  balance <- Pool.use (getPool store) $ Session.statement () $
    Statement
      "SELECT account_id, balance FROM balances ORDER BY account_id"
      mempty
      (D.rowList ((,) <$> D.column (D.nonNullable D.text)
                      <*> D.column (D.nonNullable D.int4)))
      True

  case balance of
    Left err -> putStrLn $ "✗ Query failed: " <> show err
    Right rows -> do
      putStrLn "\nBalances (immediately available):"
      mapM_ (\(acid, bal) -> putStrLn $ "  " <> show acid <> ": " <> show bal) rows

  shutdownSQLStore store
  Pool.release (getPool store)

case result of
  Left err -> putStrLn $ "\n✗ Database error: " <> show err
  Right () -> putStrLn "\n✓ Demo complete"
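As a cross-check on the demo output, the upsert from section 7.3 behaves like an insert-or-add on a map keyed by account. The sketch below is a pure model of that SQL, not Hindsight code: Credit, applyCredit, demoCredits, and expectedBalances are hypothetical names, and it assumes only the containers package. Folding the three demo credits in order gives 150 for A1 and 200 for A2, which is what the query should return.

```haskell
import Data.Int (Int32)
import Data.List (foldl')
import qualified Data.Map.Strict as Map

-- Pure stand-in for the tutorial's CreditInfo payload (hypothetical alias).
type Credit = (String, Int32)

-- Mirrors the upsert: store the amount for a new account,
-- otherwise add it to the balance already present.
applyCredit :: Map.Map String Int32 -> Credit -> Map.Map String Int32
applyCredit balances (account, amount) =
  Map.insertWith (+) account amount balances

-- The three credits inserted in the demo, in insertion order.
demoCredits :: [Credit]
demoCredits = [("A1", 100), ("A1", 50), ("A2", 200)]

-- Map.toList expectedBalances == [("A1",150),("A2",200)]
expectedBalances :: Map.Map String Int32
expectedBalances = foldl' applyCredit Map.empty demoCredits
```

Because addition is commutative, the order of the credits does not change the final balances in this model.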

7.5. Trade-offs

Synchronous projections:

  • ✅ Immediate consistency - no lag

  • ✅ Query immediately after insert

  • ❌ PostgreSQL-only

  • ❌ Projection failures block inserts

Asynchronous projections (Tutorial 03):

  • ✅ Backend-agnostic (works with Memory/Filesystem/PostgreSQL)

  • ✅ Projection failures don’t block inserts

  • ❌ Eventually consistent - small lag

  • ❌ Need to wait for projection updates
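Both sets of trade-offs follow from where the projection runs relative to the event write. The sketch below is a toy, pure model of that idea and not Hindsight's implementation: Store, project, insertSync, insertAsync, and catchUp are all hypothetical names, and Either stands in for a transaction that commits or rolls back as a unit.

```haskell
import qualified Data.Map.Strict as Map

type Balances = Map.Map String Int

-- Toy store: an event log plus a read model derived from it.
data Store = Store
  { eventLog :: [(String, Int)]
  , balances :: Balances
  } deriving (Eq, Show)

emptyStore :: Store
emptyStore = Store [] Map.empty

-- Hypothetical projection that rejects non-positive credits.
project :: (String, Int) -> Balances -> Either String Balances
project (account, amount) bs
  | amount <= 0 = Left ("invalid credit for " ++ account)
  | otherwise   = Right (Map.insertWith (+) account amount bs)

-- Synchronous: the log append and the projection succeed or fail together.
insertSync :: (String, Int) -> Store -> Either String Store
insertSync ev (Store evs bs) = do
  bs' <- project ev bs
  pure (Store (evs ++ [ev]) bs')

-- Asynchronous: the append always succeeds; the read model is
-- left untouched until catchUp runs later.
insertAsync :: (String, Int) -> Store -> Store
insertAsync ev (Store evs bs) = Store (evs ++ [ev]) bs

-- Replays the whole log into a fresh read model, skipping any
-- event the projection rejects.
catchUp :: Store -> Store
catchUp (Store evs _) = Store evs (foldl step Map.empty evs)
  where
    step bs ev = either (const bs) id (project ev bs)
```

In this model, insertSync ("A1", -5) emptyStore yields Left "invalid credit for A1" and nothing is stored, which is the sense in which a projection failure blocks the insert. insertAsync ("A1", 100) emptyStore records the event but leaves balances empty until catchUp runs, which is the lag a reader has to wait out.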

7.6. Running the Example

main :: IO ()
main = do
  putStrLn "=== Hindsight Tutorial 07: Synchronous Projections ==="
  putStrLn ""

  demoSyncProjection

  putStrLn "\nTutorial complete!"

7.7. Summary

Key concepts:

  • Synchronous projections run within the insert transaction, eliminating lag

  • Immediate consistency: query results are available as soon as the insert commits

  • PostgreSQL-only: Trade backend flexibility for consistency guarantees