3. PostgreSQL Projections

PostgreSQL projections provide durable, database-native read models with ACID guarantees. Unlike in-memory projections, these survive application restarts and leverage SQL’s full power.

Hindsight’s PostgreSQL projections are backend-agnostic. For example, you can perfectly use PostgreSQL read models while using the filesystem event store.

The SQL projection mechanism is based on Hasql.

3.1. Prerequisites

{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RequiredTypeArguments #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# OPTIONS_GHC -Wno-orphans #-}

module Main where

import Control.Concurrent (forkIO, killThread)
import Control.Exception (bracket)
import Data.Aeson (FromJSON, ToJSON)
import Data.Functor.Contravariant ((>$<))
import Data.Map.Strict qualified as Map
import Data.Text (Text)
import Data.Text.Encoding (decodeUtf8)
import Data.UUID.V4 qualified as UUID
import Database.Postgres.Temp qualified as Temp
import GHC.Generics (Generic)
import Hasql.Connection qualified as Connection
import Hasql.Connection.Setting qualified as ConnectionSetting
import Hasql.Connection.Setting.Connection qualified as ConnectionSettingConnection
import Hasql.Decoders qualified as D
import Hasql.Encoders qualified as E
import Hasql.Pool qualified as Pool
import Hasql.Pool.Config qualified as Config
import Hasql.Session qualified as Session
import Hasql.Statement (Statement(..))
import Hasql.Transaction qualified as Transaction
import Hindsight
import Hindsight.Projection (runProjection, waitForEvent, ProjectionId(..))
import Hindsight.Projection.Matching (ProjectionHandlers(..))
import Hindsight.Store.Memory (newMemoryStore)
import Hindsight.Store.PostgreSQL.Core.Schema qualified as Schema
import GHC.RTS.Flags (MiscFlags(disableDelayedOsMemoryReturn))

3.2. Define Events

As usual, let us start by defining our events:

type UserRegistered = "user_registered"

data UserInfo = UserInfo
  { userId :: Text
  , userName :: Text
  } deriving (Show, Eq, Generic, FromJSON, ToJSON)

-- Event versioning
type instance MaxVersion UserRegistered = 0
type instance Versions UserRegistered = '[UserInfo]
instance Event UserRegistered
instance MigrateVersion 0 UserRegistered

-- Event helper
registerUser :: Text -> Text -> SomeLatestEvent
registerUser uid name =
  mkEvent UserRegistered (UserInfo uid name)

3.3. Create a Projection Handler

Similar to subscription handlers, projection handlers take an event envelope (payload + metadata) as their first argument. However, they must return a Hasql transaction object to be run by the projection engine.

Projection handlers are not tied to a particular backend and can be backend agnostic (as exemplified here).

-- Projection handler logic - updates a PostgreSQL table
-- This same logic works whether events come from Memory, Filesystem, or PostgreSQL
handleUserRegistration :: EventEnvelope UserRegistered backend -> Transaction.Transaction ()
handleUserRegistration eventData = do
  let payload = eventData.payload :: UserInfo
  -- Use parameterized Hasql query
  -- Encoder: (userId, userName) -> SQL parameters
  -- Decoder: () (no result expected)
  Transaction.statement (payload.userId, payload.userName) insertUserStatement
  where
    insertUserStatement :: Statement (Text, Text) ()
    insertUserStatement = Statement sql encoder decoder True
      where
        sql = "INSERT INTO users (user_id, user_name) VALUES ($1, $2)"
        encoder = (fst >$< E.param (E.nonNullable E.text))
               <> (snd >$< E.param (E.nonNullable E.text))
        decoder = D.noResult

3.4. Complete Demo

This demo creates a store, inserts events, and creates a projection that persists a read model in a SQL database.

The key function is runProjection, which subscribes to events from any backend and runs handlers as PostgreSQL transactions. We run it in a background thread using forkIO, then use waitForEvent to block until the projection catches up to a specific event cursor.

demoPostgreSQLProjection :: IO ()
demoPostgreSQLProjection = do
  putStrLn "=== PostgreSQL Projection Demo ==="

  -- Create a temporary PostgreSQL database for projections
  let dbConfig = Temp.defaultConfig
        <> mempty
          { Temp.postgresConfigFile =
              [ ("log_min_messages", "FATAL")
              , ("client_min_messages", "ERROR")
              ]
          }

  result <- Temp.withConfig dbConfig $ \db -> do
    let connStr = Temp.toConnectionString db
        connectionSettings = [ConnectionSetting.connection $ ConnectionSettingConnection.string (decodeUtf8 connStr)]
    putStrLn "✓ Created temporary PostgreSQL database"

    -- Initialize PostgreSQL schema for projections
    pool <- Pool.acquire $ Config.settings [Config.size 1, Config.staticConnectionSettings connectionSettings]
    _ <- Pool.use pool Schema.createSchema

    -- Create our projection table
    _ <- Pool.use pool $ Session.sql
      "CREATE TABLE users (user_id TEXT PRIMARY KEY, user_name TEXT)"

    putStrLn "✓ Initialized projection schema"

    -- Create a MemoryStore for events (not PostgreSQL!)
    store <- newMemoryStore
    putStrLn "✓ Created MemoryStore for events"

    -- Insert events into MemoryStore
    streamId <- StreamId <$> UUID.nextRandom
    let events = [ registerUser "U001" "Alice"
                 , registerUser "U002" "Bob"
                 ]

    insertionResult <- insertEvents store Nothing $
      Transaction $ Map.singleton streamId (StreamWrite Any events)

    case insertionResult of
      FailedInsertion err -> do
        putStrLn $ "✗ Insert failed: " <> show err

      SuccessfulInsertion (InsertionSuccess{finalCursor = cursor}) -> do
        putStrLn "✓ Inserted events into MemoryStore"

        -- Start projection in background thread
        -- Events come from MemoryStore, but projection runs in PostgreSQL!
        let projId = ProjectionId "user-directory"
        projectionThread <- forkIO $
          runProjection
            projId
            pool
            Nothing
            store
            -- Define the handler list inline with concrete backend
            (match UserRegistered handleUserRegistration :-> ProjectionEnd)

        putStrLn "✓ Started projection (subscribing to MemoryStore)"

        -- Wait for projection to catch up
        bracket
          (do Right conn <- Connection.acquire connectionSettings; pure conn)
          Connection.release
          $ \conn -> do
            waitForEvent  projId cursor conn
            putStrLn "✓ Projection caught up"

        killThread projectionThread

        -- Query the projection table in PostgreSQL
        userCount <- Pool.use pool $ Session.statement () $
          Statement
            "SELECT COUNT(*) FROM users"
            mempty
            (D.singleRow (D.column (D.nonNullable D.int8)))
            True

        case userCount of
          Left err -> putStrLn $ "✗ Query failed: " <> show err
          Right count -> putStrLn $ "✓ Users in projection: " <> show count

        -- Query actual user data
        users <- Pool.use pool $ Session.statement () $
          Statement
            "SELECT user_id, user_name FROM users ORDER BY user_id"
            mempty
            (D.rowList ((,) <$> D.column (D.nonNullable D.text)
                            <*> D.column (D.nonNullable D.text)))
            True

        case users of
          Left err -> putStrLn $ "✗ Query failed: " <> show err
          Right rows -> do
            putStrLn "\n📋 Users in directory:"
            mapM_ (\(uid, name) -> putStrLn $ "  " <> show uid <> ": " <> show name) rows

    -- Cleanup
    Pool.release pool

  case result of
    Left err -> putStrLn $ "\n✗ Database error: " <> show err
    Right () -> putStrLn "\n✓ Demo complete (database cleaned up)"

3.5. Running the Example

main :: IO ()
main = do
  putStrLn "=== Hindsight Tutorial 03: PostgreSQL Projections ==="
  putStrLn ""

  demoPostgreSQLProjection

  putStrLn ""
  putStrLn "Tutorial complete!"

3.6. Summary

Key concepts:

  • Backend-agnostic projections: events can come from any store (Memory, Filesystem, PostgreSQL), while projections always run in PostgreSQL

  • Hasql transactions provide ACID guarantees for projection state updates

  • Durable projections survive application restarts, unlike in-memory models

3.7. Next Steps

In the next tutorial, we’ll explore event versioning - how to evolve your event schemas over time while maintaining backward compatibility.