2. In-Memory Projections
Reading the entire event log whenever you want to figure out the state of your system is usually not practical. That’s why you may want to build projections to serve as read models and speed up information retrieval. Projections are essentially built by continuously mutating a state while reading a sequence of events.
Projections can be easily built on top of Hindsight’s subscription mechanism. In this tutorial, we will do just that, using STM to store the state of our system in-memory.
2.1. Prerequisites
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RequiredTypeArguments #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# OPTIONS_GHC -Wno-orphans #-}
module Main where
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM
import Control.Monad (void)
import Data.Aeson (FromJSON, ToJSON)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Text (Text)
import Data.UUID.V4 qualified as UUID
import GHC.Generics (Generic)
import Hindsight
import Hindsight.Store.Memory (MemoryStore, newMemoryStore)
2.2. Define Domain Events
Let’s model user lifecycle events:
type UserRegistered = "user_registered"
type UserDeactivated = "user_deactivated"
data UserInfo = UserInfo
{ userId :: Text
, userName :: Text
} deriving (Show, Eq, Generic, FromJSON, ToJSON)
data DeactivationInfo = DeactivationInfo
{ userId :: Text
} deriving (Show, Eq, Generic, FromJSON, ToJSON)
-- Event versioning
type instance MaxVersion UserRegistered = 0
type instance Versions UserRegistered = '[UserInfo]
instance Event UserRegistered
instance MigrateVersion 0 UserRegistered
type instance MaxVersion UserDeactivated = 0
type instance Versions UserDeactivated = '[DeactivationInfo]
instance Event UserDeactivated
instance MigrateVersion 0 UserDeactivated
-- Event helpers
registerUser :: Text -> Text -> SomeLatestEvent
registerUser uid name =
mkEvent UserRegistered (UserInfo uid name)
deactivateUser :: Text -> SomeLatestEvent
deactivateUser uid =
mkEvent UserDeactivated (DeactivationInfo uid)
2.3. Building a Simple Projection
Let’s count active users using a TVar (transactional variable):
-- Our read model: a simple counter
type UserCountModel = TVar Int
-- Create an empty model
newUserCountModel :: IO UserCountModel
newUserCountModel = newTVarIO 0
-- Query the current count
queryUserCount :: UserCountModel -> IO Int
queryUserCount = readTVarIO
-- Update handlers for each event type
handleRegistration :: UserCountModel -> EventHandler UserRegistered IO MemoryStore
handleRegistration countModel envelope = do
let user = envelope.payload
atomically $ modifyTVar' countModel (+1)
putStrLn $ " → Registered: " <> show user.userName <> " (count +1)"
return Continue
handleDeactivation :: UserCountModel -> EventHandler UserDeactivated IO MemoryStore
handleDeactivation countModel envelope = do
let user = envelope.payload
atomically $ modifyTVar' countModel (subtract 1)
putStrLn $ " → Deactivated: " <> show user.userId <> " (count -1)"
return Continue
2.4. Demo: Counting Active Users
demoUserCount :: IO ()
demoUserCount = do
putStrLn "=== User Count Projection ==="
store <- newMemoryStore
streamId <- StreamId <$> UUID.nextRandom
-- Create our read model
countModel <- newUserCountModel
-- Subscribe to events and update the model
handle <- subscribe store
( match UserRegistered (handleRegistration countModel) :?
match UserDeactivated (handleDeactivation countModel) :?
MatchEnd )
(EventSelector AllStreams FromBeginning)
-- Insert some events
let events = [ registerUser "U001" "Alice"
, registerUser "U002" "Bob"
, registerUser "U003" "Carol"
, deactivateUser "U002" -- Bob leaves
]
void $ insertEvents store Nothing $
multiEvent streamId Any events
-- Wait for projection to update
threadDelay 100000
-- Query the result
activeUsers <- queryUserCount countModel
putStrLn $ "\n✓ Active users: " <> show activeUsers
handle.cancel
threadDelay 10000
Note that there is no new Hindsight concept here: everything is based on
the subscription mechanism you already read about in the first tutorial.
However, when we first presented subscriptions, we glimpsed over a
rather subtle point which is more apparent here. Subscription handlers
are defined using a custom match
syntax instead of standard
pattern-matching:
match UserRegistered (handleRegistration countModel) :?
match UserDeactivated (handleDeactivation countModel) :?
MatchEnd
The reason for this is that events in Hindsight form an extensible sum type. In this terms, we are not defining user events like this:
data UserEvents = UserRegistered | UserDeactivated
Defining event payloads as normal sum types is perfectly possible if you so want (and maybe a practical choice in many situations). However, there is a key benefit in having an open universe of events: loose-coupling. Each service can define its own events, without preventing another service to aggregate information from multiple services in the same subscription.
2.5. Building a Richer Projection
Let’s build a projection that tracks actual user details:
-- Our read model: a map of active users
type UserDirectoryModel = TVar (Map Text Text) -- userId -> userName
newUserDirectory :: IO UserDirectoryModel
newUserDirectory = newTVarIO Map.empty
-- Query operations
getAllUsers :: UserDirectoryModel -> IO [(Text, Text)]
getAllUsers model = Map.toList <$> readTVarIO model
lookupUser :: UserDirectoryModel -> Text -> IO (Maybe Text)
lookupUser model uid = Map.lookup uid <$> readTVarIO model
-- Update handlers
handleRegistrationDir :: UserDirectoryModel -> EventHandler UserRegistered IO MemoryStore
handleRegistrationDir dirModel envelope = do
let user = envelope.payload
atomically $ modifyTVar' dirModel (Map.insert user.userId user.userName)
return Continue
handleDeactivationDir :: UserDirectoryModel -> EventHandler UserDeactivated IO MemoryStore
handleDeactivationDir dirModel envelope = do
let user = envelope.payload
atomically $ modifyTVar' dirModel (Map.delete user.userId)
return Continue
2.6. Demo: User Directory
demoUserDirectory :: IO ()
demoUserDirectory = do
putStrLn "\n=== User Directory Projection ==="
store <- newMemoryStore
streamId <- StreamId <$> UUID.nextRandom
-- Create the directory model
dirModel <- newUserDirectory
-- Subscribe
handle <- subscribe store
( match UserRegistered (handleRegistrationDir dirModel)
:? match UserDeactivated (handleDeactivationDir dirModel)
:? MatchEnd
)
(EventSelector AllStreams FromBeginning)
-- Insert events
let events = [ registerUser "U001" "Alice"
, registerUser "U002" "Bob"
, deactivateUser "U001" -- Alice leaves
]
void $ insertEvents store Nothing $
multiEvent streamId Any events
threadDelay 100000
-- Query the directory
allUsers <- getAllUsers dirModel
putStrLn $ "\n✓ Active users: " <> show allUsers
-- Look up a specific user
maybeBob <- lookupUser dirModel "U002"
putStrLn $ "✓ User U002: " <> show maybeBob
handle.cancel
threadDelay 10000
2.7. Running the Examples
main :: IO ()
main = do
putStrLn "=== Hindsight Tutorial 02: In-Memory Projections ==="
putStrLn ""
demoUserCount
demoUserDirectory
putStrLn ""
putStrLn "Tutorial complete!"
2.8. Summary
Key concepts:
Projections continuously transform events into queryable state
STM (
TVar
,atomically
) provides thread-safe in-memory storageHandlers return
Continue
to keep processing orStop
to halt the subscription
2.9. Next Steps
In the next tutorial, we’ll explore PostgreSQL projections - building read models that persist to a database and survive application restarts.