8. Cross-Stream Consistency
Event sourcing with multiple streams creates challenges for invariants that span aggregates (e.g., email uniqueness, capacity limits). Two patterns solve this: SQL constraints for simple invariants, and multi-stream transactions for coordinated updates.
8.1. Prerequisites
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RequiredTypeArguments #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wno-orphans #-}
module Main where
import Control.Monad (void)
import Data.Aeson (FromJSON, ToJSON)
import Data.Map.Strict qualified as Map
import Data.Text (Text)
import Data.Text qualified as Text
import Data.Text.Encoding qualified as Text
import Data.UUID (UUID)
import Database.Postgres.Temp qualified as Temp
import GHC.Generics (Generic)
import Hasql.Connection.Setting qualified as ConnectionSetting
import Hasql.Connection.Setting.Connection qualified as ConnectionSettingConnection
import Hasql.Pool qualified as Pool
import Hasql.Pool.Config qualified as Config
import Hasql.Session qualified as Session
import Hasql.TH (resultlessStatement)
import Hasql.Transaction qualified as Transaction
import Hindsight
import Hindsight.Projection (ProjectionId(..))
import Hindsight.Projection.Matching (ProjectionHandlers (..))
import Hindsight.Store.PostgreSQL
( SQLStore,
createSQLSchema,
emptySyncProjectionRegistry,
newSQLStoreWithProjections,
registerSyncProjection,
shutdownSQLStore,
)
import Data.Int (Int32)
import Data.Proxy (Proxy (..))
8.2. Pattern 1: SQL Constraints for Uniqueness
Email uniqueness is a cross-stream invariant: no two users can share an email. Use synchronous projections with SQL UNIQUE constraints.
8.3. Define Events
type UserRegistered = "user_registered"
data UserInfo = UserInfo
{ userId :: UUID,
email :: Text,
name :: Text
}
deriving (Show, Eq, Generic, FromJSON, ToJSON)
type instance MaxVersion UserRegistered = 0
type instance Versions UserRegistered = '[UserInfo]
instance Event UserRegistered
instance MigrateVersion 0 UserRegistered
-- Helper
registerUser :: UUID -> Text -> Text -> SomeLatestEvent
registerUser uid email name =
mkEvent UserRegistered (UserInfo uid email name)
8.4. Course Enrollment Example
We’ll also use a course enrollment scenario to show multi-stream coordination:
type CourseCreated = "course_created"
type StudentEnrolled = "student_enrolled"
data CourseInfo = CourseInfo
{ courseId :: UUID,
courseName :: Text,
maxCapacity :: Int32
}
deriving (Show, Eq, Generic, FromJSON, ToJSON)
data EnrollmentInfo = EnrollmentInfo
{ courseId :: UUID,
studentId :: UUID
}
deriving (Show, Eq, Generic, FromJSON, ToJSON)
type instance MaxVersion CourseCreated = 0
type instance Versions CourseCreated = '[CourseInfo]
instance Event CourseCreated
instance MigrateVersion 0 CourseCreated
type instance MaxVersion StudentEnrolled = 0
type instance Versions StudentEnrolled = '[EnrollmentInfo]
instance Event StudentEnrolled
instance MigrateVersion 0 StudentEnrolled
-- Helpers
createCourse :: UUID -> Text -> Int32 -> SomeLatestEvent
createCourse cid name cap =
mkEvent CourseCreated (CourseInfo cid name cap)
enrollStudent :: UUID -> UUID -> SomeLatestEvent
enrollStudent cid sid =
mkEvent StudentEnrolled (EnrollmentInfo cid sid)
8.5. Synchronous Projection with UNIQUE Constraint
The projection runs in the same transaction as event insertion. SQL constraints reject events that violate business rules:
-- Projection schema with UNIQUE constraint
createUserProjectionSchema :: Session.Session ()
createUserProjectionSchema = Session.sql $
Text.encodeUtf8 $ Text.unlines
[ "CREATE TABLE IF NOT EXISTS users (",
" user_id UUID PRIMARY KEY,",
" email TEXT NOT NULL UNIQUE,", -- ← Enforces uniqueness!
" name TEXT NOT NULL",
");"
]
-- Projection handlers
userProjection :: ProjectionHandlers '[UserRegistered] SQLStore
userProjection =
(Proxy @UserRegistered, userRegisteredHandler)
:-> ProjectionEnd
where
userRegisteredHandler envelope =
Transaction.statement
( envelope.payload.userId,
envelope.payload.email,
envelope.payload.name
)
[resultlessStatement|
INSERT INTO users (user_id, email, name)
VALUES ($1 :: uuid, $2 :: text, $3 :: text)
|]
If the email already exists, the UNIQUE constraint causes a transaction rollback, and the event is rejected.
8.6. Demonstration
demoEmailUniqueness :: IO ()
demoEmailUniqueness = do
putStrLn "\n=== Demo: Email Uniqueness via SQL Constraints ==="
-- Create temporary PostgreSQL database
result <- Temp.startConfig Temp.defaultConfig
case result of
Left err -> putStrLn $ "Failed to start temp DB: " <> show err
Right db -> do
let connStr = Temp.toConnectionString db
connectionSettings = [ConnectionSetting.connection $ ConnectionSettingConnection.string (Text.decodeUtf8 connStr)]
-- Create connection pool
pool <-
Pool.acquire $
Config.settings
[ Config.size 10,
Config.staticConnectionSettings connectionSettings
]
-- Create event store schema
void $ Pool.use pool (createSQLSchema >> createUserProjectionSchema)
-- Register synchronous projection
let registry = registerSyncProjection
(ProjectionId "user_projection")
userProjection
emptySyncProjectionRegistry
-- Create store with synchronous projections
store <- newSQLStoreWithProjections connStr registry
-- Try to register first user
let aliceId = read "00000000-0000-0000-0000-000000000001"
let aliceStream = StreamId aliceId
result1 <- insertEvents store Nothing $
Transaction $ Map.singleton aliceStream
(StreamWrite NoStream [registerUser aliceId "alice@example.com" "Alice"])
case result1 of
SuccessfulInsertion _ ->
putStrLn " ✓ Alice registered successfully"
FailedInsertion err ->
putStrLn $ " ✗ Alice registration failed: " <> show err
-- Try to register another user with SAME email
let bobId = read "00000000-0000-0000-0000-000000000002"
let bobStream = StreamId bobId
result2 <- insertEvents store Nothing $
Transaction $ Map.singleton bobStream
(StreamWrite NoStream [registerUser bobId "alice@example.com" "Bob"])
case result2 of
SuccessfulInsertion _ ->
putStrLn " ✗ Bob registered (SHOULD HAVE FAILED!)"
FailedInsertion (BackendError _) ->
putStrLn " ✓ Bob registration rejected (email conflict) ← Expected!"
FailedInsertion err ->
putStrLn $ " ? Bob registration failed: " <> show err
-- Try Bob with different email - should succeed
result3 <- insertEvents store Nothing $
Transaction $ Map.singleton bobStream
(StreamWrite NoStream [registerUser bobId "bob@example.com" "Bob"])
case result3 of
SuccessfulInsertion _ ->
putStrLn " ✓ Bob registered with different email"
FailedInsertion err ->
putStrLn $ " ✗ Bob registration failed: " <> show err
-- Cleanup
shutdownSQLStore store
void $ Temp.stop db
8.7. Pattern 2: Multi-Stream Transactions
For complex invariants, combine SQL CHECK constraints with multi-stream atomic inserts. Example: course enrollment with capacity limits. ————————————————
-- Projection schemas
createCourseProjectionSchema :: Session.Session ()
createCourseProjectionSchema = Session.sql $
Text.encodeUtf8 $ Text.unlines
[ "CREATE TABLE IF NOT EXISTS courses (",
" course_id UUID PRIMARY KEY,",
" course_name TEXT NOT NULL,",
" max_capacity INT NOT NULL,",
" enrollment_count INT DEFAULT 0,",
" CHECK (enrollment_count <= max_capacity)",
");",
"",
"CREATE TABLE IF NOT EXISTS enrollments (",
" course_id UUID NOT NULL,",
" student_id UUID NOT NULL,",
" PRIMARY KEY (course_id, student_id)",
");"
]
-- Projection handlers
courseProjection :: ProjectionHandlers '[CourseCreated, StudentEnrolled] SQLStore
courseProjection =
(Proxy @CourseCreated, courseCreatedHandler)
:-> (Proxy @StudentEnrolled, studentEnrolledHandler)
:-> ProjectionEnd
where
courseCreatedHandler envelope =
Transaction.statement
( envelope.payload.courseId,
envelope.payload.courseName,
envelope.payload.maxCapacity
)
[resultlessStatement|
INSERT INTO courses (course_id, course_name, max_capacity)
VALUES ($1 :: uuid, $2 :: text, $3 :: int)
|]
studentEnrolledHandler envelope = do
Transaction.statement
( envelope.payload.courseId,
envelope.payload.studentId
)
[resultlessStatement|
INSERT INTO enrollments (course_id, student_id)
VALUES ($1 :: uuid, $2 :: uuid)
|]
Transaction.statement
envelope.payload.courseId
[resultlessStatement|
UPDATE courses
SET enrollment_count = enrollment_count + 1
WHERE course_id = $1 :: uuid
|]
8.8. Demo: Multi-Stream Coordination
demoMultiStreamEnrollment :: IO ()
demoMultiStreamEnrollment = do
putStrLn "\n=== Demo: Multi-Stream Course Enrollment ==="
result <- Temp.startConfig Temp.defaultConfig
case result of
Left err -> putStrLn $ "Failed to start temp DB: " <> show err
Right db -> do
let connStr = Temp.toConnectionString db
connectionSettings = [ConnectionSetting.connection $ ConnectionSettingConnection.string (Text.decodeUtf8 connStr)]
pool <-
Pool.acquire $
Config.settings
[ Config.size 10,
Config.staticConnectionSettings connectionSettings
]
void $ Pool.use pool (createSQLSchema >> createCourseProjectionSchema)
let registry =
registerSyncProjection
(ProjectionId "course_projection")
courseProjection
emptySyncProjectionRegistry
store <- newSQLStoreWithProjections connStr registry
-- Create course with capacity = 2
let courseId = read "00000000-0000-0000-0000-000000000010"
let courseStream = StreamId courseId
result1 <- insertEvents store Nothing $
Transaction $ Map.singleton courseStream
(StreamWrite NoStream [createCourse courseId "Haskell 101" 2])
case result1 of
SuccessfulInsertion (InsertionSuccess{streamCursors = cursors1}) -> do
let courseCursor = cursors1 Map.! courseStream
putStrLn " ✓ Course created (capacity: 2)"
-- Enroll first student
let student1Id = read "00000000-0000-0000-0000-000000000101"
let student1Stream = StreamId student1Id
result2 <- insertEvents store Nothing $
Transaction $ Map.fromList
[ (courseStream, StreamWrite (ExactVersion courseCursor) [enrollStudent courseId student1Id]),
(student1Stream, StreamWrite NoStream [registerUser student1Id "s1@example.com" "Student 1"])
]
case result2 of
SuccessfulInsertion (InsertionSuccess{streamCursors = cursors2}) -> do
let courseCursor2 = cursors2 Map.! courseStream
putStrLn " ✓ Student 1 enrolled (1/2 capacity)"
-- Enroll second student
let student2Id = read "00000000-0000-0000-0000-000000000102"
let student2Stream = StreamId student2Id
result3 <- insertEvents store Nothing $
Transaction $ Map.fromList
[ (courseStream, StreamWrite (ExactVersion courseCursor2) [enrollStudent courseId student2Id]),
(student2Stream, StreamWrite NoStream [registerUser student2Id "s2@example.com" "Student 2"])
]
case result3 of
SuccessfulInsertion (InsertionSuccess{streamCursors = cursors3}) -> do
let courseCursor3 = cursors3 Map.! courseStream
putStrLn " ✓ Student 2 enrolled (2/2 capacity - FULL)"
-- Try to enroll third student
-- The CHECK constraint will reject this atomically, preventing race conditions
let student3Id = read "00000000-0000-0000-0000-000000000103"
let student3Stream = StreamId student3Id
result4 <- insertEvents store Nothing $
Transaction $ Map.fromList
[ (courseStream, StreamWrite (ExactVersion courseCursor3) [enrollStudent courseId student3Id]),
(student3Stream, StreamWrite NoStream [registerUser student3Id "s3@example.com" "Student 3"])
]
case result4 of
SuccessfulInsertion _ ->
putStrLn " ✗ Student 3 enrolled (SHOULD HAVE BEEN REJECTED)"
FailedInsertion err ->
putStrLn $ " ✓ Student 3 rejected by CHECK constraint: " <> show err
FailedInsertion err ->
putStrLn $ " ✗ Student 2 enrollment failed: " <> show err
FailedInsertion err ->
putStrLn $ " ✗ Student 1 enrollment failed: " <> show err
FailedInsertion err ->
putStrLn $ " ✗ Course creation failed: " <> show err
shutdownSQLStore store
void $ Temp.stop db
8.9. Running the Examples
main :: IO ()
main = do
putStrLn "=== Hindsight Tutorial 08: Cross-Stream Consistency ==="
demoEmailUniqueness
demoMultiStreamEnrollment
putStrLn "\n✓ Tutorial complete!"
8.10. Summary
Key concepts:
SQL constraints enforce simple invariants (UNIQUE, CHECK) within sync projection transactions
Multi-stream transactions coordinate events across multiple streams with version expectations
Combine both for complex invariants that require multi-stream coordination