| Copyright | (c) 2024 |
|---|---|
| License | BSD3 |
| Maintainer | maintainer@example.com |
| Stability | experimental |
| Safe Haskell | None |
| Language | GHC2021 |
Hindsight.Store
Description
This module defines the core event store interface and types used across all storage backends.
Overview
An event store is a append-only log of events organized into streams. Each stream represents the lifecycle of a single aggregate or entity.
Basic Usage
import Hindsight.Store
import Hindsight.Store.Memory (newMemoryStore)
-- Create a store
store <- newMemoryStore
-- Insert events
let batch = StreamWrite
{ expectedVersion = NoStream
, events = [mkEvent "user_created" payload]
}
result <- insertEvents store Nothing (Map.singleton streamId batch)
-- Subscribe to events
handle <- subscribe store matcher selector
Key Concepts
- Streams: Ordered sequences of events for a single entity
- Cursors: Opaque positions in the global event log
- Subscriptions: Real-time notifications of new events
- Version Expectations: Optimistic concurrency control for writes
Synopsis
- newtype StreamId = StreamId {}
- newtype EventId = EventId {}
- newtype CorrelationId = CorrelationId {}
- newtype StreamVersion = StreamVersion Int64
- class EventStore backend where
- type StoreConstraints backend (m :: Type -> Type)
- insertEvents :: forall (t :: Type -> Type) m. (Traversable t, StoreConstraints backend m) => BackendHandle backend -> Maybe CorrelationId -> Transaction t backend -> m (InsertionResult backend)
- subscribe :: forall m (ts :: [Symbol]). StoreConstraints backend m => BackendHandle backend -> EventMatcher ts backend m -> EventSelector backend -> m (SubscriptionHandle backend)
- type family BackendHandle backend = (result :: Type) | result -> backend
- type family Cursor backend = (result :: Type) | result -> backend
- data StreamWrite (t :: k -> Type) (e :: k) backend = StreamWrite {
- expectedVersion :: ExpectedVersion backend
- events :: t e
- newtype Transaction (t :: Type -> Type) backend = Transaction {
- transactionWrites :: Map StreamId (StreamWrite t SomeLatestEvent backend)
- data InsertionResult backend
- = SuccessfulInsertion (InsertionSuccess backend)
- | FailedInsertion (EventStoreError backend)
- data InsertionSuccess backend = InsertionSuccess {
- finalCursor :: Cursor backend
- streamCursors :: Map StreamId (Cursor backend)
- singleEvent :: StreamId -> ExpectedVersion backend -> SomeLatestEvent -> Transaction [] backend
- multiEvent :: StreamId -> ExpectedVersion backend -> t SomeLatestEvent -> Transaction t backend
- appendAfterAny :: StreamId -> SomeLatestEvent -> Transaction [] backend
- appendToOrCreateStream :: StreamId -> SomeLatestEvent -> Transaction [] backend
- fromWrites :: forall (t :: Type -> Type) backend. [(StreamId, StreamWrite t SomeLatestEvent backend)] -> Transaction t backend
- data ExpectedVersion backend
- = NoStream
- | StreamExists
- | ExactVersion (Cursor backend)
- | ExactStreamVersion StreamVersion
- | Any
- data EventStoreError backend
- = ConsistencyError (ConsistencyErrorInfo backend)
- | BackendError ErrorInfo
- | OtherError ErrorInfo
- data ErrorInfo = ErrorInfo {}
- data ConsistencyErrorInfo backend = ConsistencyErrorInfo [VersionMismatch backend]
- data VersionMismatch backend = VersionMismatch {
- streamId :: StreamId
- expectedVersion :: ExpectedVersion backend
- actualVersion :: Maybe (Cursor backend)
- data HandlerException = HandlerException {}
- data EventSelector backend = EventSelector {
- streamId :: StreamSelector
- startupPosition :: StartupPosition backend
- data StreamSelector
- data StartupPosition backend
- = FromBeginning
- | FromLastProcessed (Cursor backend)
- data SubscriptionHandle (backend :: k) = SubscriptionHandle {}
- data SubscriptionResult
- data EventEnvelope (event :: Symbol) backend = EventWithMetadata {
- position :: Cursor backend
- eventId :: EventId
- streamId :: StreamId
- streamVersion :: StreamVersion
- correlationId :: Maybe CorrelationId
- createdAt :: UTCTime
- payload :: CurrentPayloadType event
- type EventHandler (event :: Symbol) (m :: Type -> Type) backend = EventEnvelope event backend -> m SubscriptionResult
- data EventMatcher (ts :: [Symbol]) backend (m :: Type -> Type) where
- (:?) :: forall (event :: Symbol) (m :: Type -> Type) backend (ts1 :: [Symbol]). Event event => (Proxy event, EventHandler event m backend) -> EventMatcher ts1 backend m -> EventMatcher (event ': ts1) backend m
- MatchEnd :: forall backend (m :: Type -> Type). EventMatcher ('[] :: [Symbol]) backend m
- match :: forall (event :: k) -> forall a. a -> (Proxy event, a)
Core Identifiers
Unique identifiers for streams, events, and correlations.
Unique identifier for an event stream.
Streams are logical groupings of related events, typically representing the lifecycle of a single aggregate or entity.
Unique identifier for an individual event.
newtype CorrelationId Source #
Correlation identifier for tracking related events across streams.
Used to trace causally-related events that span multiple streams or external system boundaries.
Constructors
| CorrelationId | |
Instances
newtype StreamVersion Source #
Local stream version - simple incrementing number per stream (1, 2, 3, ...)
Constructors
| StreamVersion Int64 |
Instances
Event Store Interface
The main type class implemented by all storage backends.
The EventStore class includes the StoreConstraints type family,
which defines additional constraints required by each backend.
class EventStore backend where Source #
Core interface for event store backends.
Provides methods for inserting events and subscribing to event streams.
Each backend defines its own constraints via StoreConstraints.
Associated Types
type StoreConstraints backend (m :: Type -> Type) Source #
Additional constraints required by this backend.
For example, PostgreSQL requires MonadUnliftIO for async operations.
Methods
insertEvents :: forall (t :: Type -> Type) m. (Traversable t, StoreConstraints backend m) => BackendHandle backend -> Maybe CorrelationId -> Transaction t backend -> m (InsertionResult backend) Source #
Insert event batches atomically into the store.
All events in the batch are inserted as a single transaction. If any version check fails, the entire operation is rolled back.
subscribe :: forall m (ts :: [Symbol]). StoreConstraints backend m => BackendHandle backend -> EventMatcher ts backend m -> EventSelector backend -> m (SubscriptionHandle backend) Source #
Subscribe to events matching the given criteria.
The subscription will call the appropriate handler for each matching
event until a handler returns Stop or the subscription is cancelled.
Backend Types
Backend-specific handle and cursor types.
Each backend defines its own Cursor and BackendHandle via type families.
type family BackendHandle backend = (result :: Type) | result -> backend Source #
Handle to interact with a specific storage backend.
Contains backend-specific configuration like connection pools, file handles, or in-memory storage references.
The type family is injective: knowing the handle type determines the backend, which helps with type inference.
type family Cursor backend = (result :: Type) | result -> backend Source #
Position marker within an event store, backend-specific.
For example, PostgreSQL uses a compound cursor of (transaction_no, seq_no), while a simple implementation might use a single sequence number.
The type family is injective: knowing the cursor type determines the backend which helps with type inference.
Event Operations
Types for inserting and querying events.
Insertion
data StreamWrite (t :: k -> Type) (e :: k) backend Source #
Write operation for inserting events into a single stream.
Groups events with their version expectation for atomic insertion.
The event type e can be SomeLatestEvent for raw events or enriched with metadata.
Constructors
| StreamWrite | |
Fields
| |
Instances
| (Show (ExpectedVersion backend), Show (t e)) => Show (StreamWrite t e backend) Source # | |
Defined in Hindsight.Store Methods showsPrec :: Int -> StreamWrite t e backend -> ShowS # show :: StreamWrite t e backend -> String # showList :: [StreamWrite t e backend] -> ShowS # | |
| (Eq (ExpectedVersion backend), Eq (t e)) => Eq (StreamWrite t e backend) Source # | |
Defined in Hindsight.Store Methods (==) :: StreamWrite t e backend -> StreamWrite t e backend -> Bool # (/=) :: StreamWrite t e backend -> StreamWrite t e backend -> Bool # | |
newtype Transaction (t :: Type -> Type) backend Source #
A multi-stream transactional write operation.
Represents a set of writes to be inserted atomically across multiple streams. All version checks must pass for the entire transaction to succeed.
Ordering Guarantees
- Events within a single stream are totally ordered - they appear in the global event log in the order specified.
- Events across different streams within the same transaction have __no ordering guarantee__ relative to each other. They are inserted atomically but may appear in any interleaving in the global log.
Composition
Transactions form a Monoid, allowing easy composition:
transaction1 <> transaction2 <> transaction3
When combining transactions:
- Events for the same stream are concatenated (events from tx1, then tx2)
- Version expectations are left-biased (tx1's expectation wins)
Example
-- Simple single-event write
let tx = singleEvent streamId NoStream event
result <- insertEvents store Nothing tx
-- Multi-stream with composition
let tx = singleEvent userStreamId NoStream userCreated
<> appendAfterAny auditLogId auditEntry
result <- insertEvents store Nothing tx
Constructors
| Transaction | |
Fields
| |
Instances
| Semigroup (t SomeLatestEvent) => Monoid (Transaction t backend) Source # | |
Defined in Hindsight.Store Methods mempty :: Transaction t backend # mappend :: Transaction t backend -> Transaction t backend -> Transaction t backend # mconcat :: [Transaction t backend] -> Transaction t backend # | |
| Semigroup (t SomeLatestEvent) => Semigroup (Transaction t backend) Source # | |
Defined in Hindsight.Store Methods (<>) :: Transaction t backend -> Transaction t backend -> Transaction t backend # sconcat :: NonEmpty (Transaction t backend) -> Transaction t backend # stimes :: Integral b => b -> Transaction t backend -> Transaction t backend # | |
| (Show (Cursor backend), Show (t SomeLatestEvent)) => Show (Transaction t backend) Source # | |
Defined in Hindsight.Store Methods showsPrec :: Int -> Transaction t backend -> ShowS # show :: Transaction t backend -> String # showList :: [Transaction t backend] -> ShowS # | |
| (Eq (Cursor backend), Eq (t SomeLatestEvent)) => Eq (Transaction t backend) Source # | |
Defined in Hindsight.Store Methods (==) :: Transaction t backend -> Transaction t backend -> Bool # (/=) :: Transaction t backend -> Transaction t backend -> Bool # | |
data InsertionResult backend Source #
Result of an event insertion operation.
Constructors
| SuccessfulInsertion (InsertionSuccess backend) | Success with cursor information |
| FailedInsertion (EventStoreError backend) | Failure with error details |
data InsertionSuccess backend Source #
Success data from an event insertion operation.
Contains the global cursor position of the last event inserted and the per-stream cursor positions.
Constructors
| InsertionSuccess | |
Fields
| |
Transaction Helpers
Helper functions for constructing transactions.
The singleEvent and multiEvent functions are the primary API,
requiring explicit version expectations. Convenience helpers like
appendAfterAny are provided for common patterns.
Arguments
| :: StreamId | Target stream identifier |
| -> ExpectedVersion backend | Version expectation for concurrency control |
| -> SomeLatestEvent | Event to insert |
| -> Transaction [] backend | Transaction with single event |
Create a transaction with a single event to a single stream.
This is the primary API - it forces explicit choice of version expectation.
-- Creating a new aggregate singleEvent accountId NoStream accountCreated -- Updating with optimistic locking singleEvent accountId (ExactStreamVersion v) balanceUpdated -- Append-only log singleEvent logId Any logEntry
Arguments
| :: StreamId | Target stream identifier |
| -> ExpectedVersion backend | Version expectation for concurrency control |
| -> t SomeLatestEvent | Collection of events to insert (typically a list) |
| -> Transaction t backend | Transaction with multiple events |
Create a transaction with multiple events to a single stream.
multiEvent streamId NoStream [event1, event2, event3]
Arguments
| :: StreamId | Target stream identifier (must exist) |
| -> SomeLatestEvent | Event to append |
| -> Transaction [] backend | Transaction with StreamExists expectation |
Append an event to an existing stream without version checking.
Uses StreamExists for the version expectation - the stream must already
exist, but any version is acceptable. Suitable for append-only scenarios
where multiple processes may be writing concurrently.
Use Cases
- Audit logs where the log stream is pre-created
- Event logs with concurrent writers
- Not suitable for aggregates with business invariants
-- Audit log entry (stream must exist) appendAfterAny auditLogId auditEntry -- Multiple concurrent writers OK tx1 = appendAfterAny logId event1 tx2 = appendAfterAny logId event2 -- No conflict
appendToOrCreateStream Source #
Arguments
| :: StreamId | Target stream identifier (created if missing) |
| -> SomeLatestEvent | Event to append |
| -> Transaction [] backend | Transaction with Any expectation (no version check) |
Append an event, creating the stream if it doesn't exist.
Uses Any for the version expectation - no version checking at all.
This is the most permissive option.
Warning: This provides no concurrency control. Suitable for testing or truly append-only scenarios, but dangerous for aggregates with invariants.
Use Cases
- Testing where you don't care about stream state
- Idempotent appends where duplicates are handled elsewhere
- Not suitable for production aggregates
-- Test code appendToOrCreateStream testStreamId testEvent -- For production, prefer explicit version expectations singleEvent streamId NoStream event -- Better: explicit create
Arguments
| :: forall (t :: Type -> Type) backend. [(StreamId, StreamWrite t SomeLatestEvent backend)] | List of stream writes |
| -> Transaction t backend | Combined transaction |
Create a transaction from a list of stream writes.
fromWrites [ (stream1, StreamWrite NoStream [e1]) , (stream2, StreamWrite NoStream [e2]) ]
Version Control
Optimistic concurrency control for preventing conflicts.
When inserting events, you can specify version expectations to ensure the stream hasn't changed since you read it. This implements optimistic locking without holding database locks during business logic execution.
See: https://en.wikipedia.org/wiki/Optimistic_concurrency_control
data ExpectedVersion backend Source #
Version expectation for optimistic concurrency control.
Used to prevent concurrent modifications by specifying what version a stream should be at before inserting new events.
This implements optimistic locking: instead of holding locks during a read-modify-write cycle, we check at write time that the stream hasn't changed since we read it.
See: https://en.wikipedia.org/wiki/Optimistic_concurrency_control
Common patterns:
NoStream- Creating a new aggregate (ensure stream doesn't exist)ExactStreamVersion- Normal update (check we have latest version)Any- Append-only scenarios (no conflict prevention)
Constructors
| NoStream | Stream must not exist (for creating new streams) |
| StreamExists | Stream must exist (any version) |
| ExactVersion (Cursor backend) | Stream must be at this exact global position |
| ExactStreamVersion StreamVersion | Stream must be at this exact local version |
| Any | No version check (always succeeds) |
Instances
| Show (Cursor backend) => Show (ExpectedVersion backend) Source # | |
Defined in Hindsight.Store Methods showsPrec :: Int -> ExpectedVersion backend -> ShowS # show :: ExpectedVersion backend -> String # showList :: [ExpectedVersion backend] -> ShowS # | |
| Eq (Cursor backend) => Eq (ExpectedVersion backend) Source # | |
Defined in Hindsight.Store Methods (==) :: ExpectedVersion backend -> ExpectedVersion backend -> Bool # (/=) :: ExpectedVersion backend -> ExpectedVersion backend -> Bool # | |
Errors
data EventStoreError backend Source #
Possible errors when interacting with the event store.
Constructors
| ConsistencyError (ConsistencyErrorInfo backend) | Version expectation failures |
| BackendError ErrorInfo | Storage backend errors |
| OtherError ErrorInfo | Other application errors |
Instances
| Show (Cursor backend) => Show (EventStoreError backend) Source # | |
Defined in Hindsight.Store Methods showsPrec :: Int -> EventStoreError backend -> ShowS # show :: EventStoreError backend -> String # showList :: [EventStoreError backend] -> ShowS # | |
General error information with optional exception details.
Constructors
| ErrorInfo | |
Fields
| |
data ConsistencyErrorInfo backend Source #
Consistency violation details containing all version mismatches.
Constructors
| ConsistencyErrorInfo [VersionMismatch backend] |
Instances
| Show (Cursor backend) => Show (ConsistencyErrorInfo backend) Source # | |
Defined in Hindsight.Store Methods showsPrec :: Int -> ConsistencyErrorInfo backend -> ShowS # show :: ConsistencyErrorInfo backend -> String # showList :: [ConsistencyErrorInfo backend] -> ShowS # | |
data VersionMismatch backend Source #
Details of a version expectation failure.
Constructors
| VersionMismatch | |
Fields
| |
Instances
| Show (Cursor backend) => Show (VersionMismatch backend) Source # | |
Defined in Hindsight.Store Methods showsPrec :: Int -> VersionMismatch backend -> ShowS # show :: VersionMismatch backend -> String # showList :: [VersionMismatch backend] -> ShowS # | |
data HandlerException Source #
Exception thrown when an event handler fails during subscription processing.
This exception wraps the original exception with rich event context to aid debugging. When a handler throws an exception, the subscription will die immediately (fail-fast), and this enriched exception will be available via the Async handle.
The handler exception represents a bug in event processing code. Higher-level code (such as projection managers) can implement retry logic if desired.
Constructors
| HandlerException | |
Fields
| |
Instances
| Exception HandlerException Source # | |
Defined in Hindsight.Store Methods toException :: HandlerException -> SomeException # fromException :: SomeException -> Maybe HandlerException # | |
| Show HandlerException Source # | |
Defined in Hindsight.Store Methods showsPrec :: Int -> HandlerException -> ShowS # show :: HandlerException -> String # showList :: [HandlerException] -> ShowS # | |
Event Subscriptions
Real-time streaming of events as they're inserted.
Subscription Configuration
data EventSelector backend Source #
Criteria for selecting events in a subscription.
Constructors
| EventSelector | |
Fields
| |
Instances
| Show (StartupPosition backend) => Show (EventSelector backend) Source # | |
Defined in Hindsight.Store Methods showsPrec :: Int -> EventSelector backend -> ShowS # show :: EventSelector backend -> String # showList :: [EventSelector backend] -> ShowS # | |
data StreamSelector Source #
Stream selection criteria for subscriptions.
Constructors
| AllStreams | Subscribe to all streams |
| SingleStream StreamId | Subscribe to a specific stream only |
Instances
| Show StreamSelector Source # | |
Defined in Hindsight.Store Methods showsPrec :: Int -> StreamSelector -> ShowS # show :: StreamSelector -> String # showList :: [StreamSelector] -> ShowS # | |
data StartupPosition backend Source #
Starting position for event subscriptions.
Constructors
| FromBeginning | Start from the first event |
| FromLastProcessed (Cursor backend) | Start after the specified position |
Instances
| Show (Cursor backend) => Show (StartupPosition backend) Source # | |
Defined in Hindsight.Store Methods showsPrec :: Int -> StartupPosition backend -> ShowS # show :: StartupPosition backend -> String # showList :: [StartupPosition backend] -> ShowS # | |
Subscription Control
data SubscriptionHandle (backend :: k) Source #
Handle for managing a subscription lifecycle.
Constructors
| SubscriptionHandle | |
data SubscriptionResult Source #
Control flow for event subscriptions.
Constructors
| Stop | Stop processing and cancel the subscription |
| Continue | Continue processing subsequent events |
Instances
| Show SubscriptionResult Source # | |
Defined in Hindsight.Store Methods showsPrec :: Int -> SubscriptionResult -> ShowS # show :: SubscriptionResult -> String # showList :: [SubscriptionResult] -> ShowS # | |
| Eq SubscriptionResult Source # | |
Defined in Hindsight.Store Methods (==) :: SubscriptionResult -> SubscriptionResult -> Bool # (/=) :: SubscriptionResult -> SubscriptionResult -> Bool # | |
Event Handling
Processing events in subscriptions.
Event Envelopes
Events with full metadata from the store.
data EventEnvelope (event :: Symbol) backend Source #
Event with full metadata as retrieved from the store.
Constructors
| EventWithMetadata | |
Fields
| |
Instances
| (Show (CurrentPayloadType event), Show (Cursor backend)) => Show (EventEnvelope event backend) Source # | |
Defined in Hindsight.Store Methods showsPrec :: Int -> EventEnvelope event backend -> ShowS # show :: EventEnvelope event backend -> String # showList :: [EventEnvelope event backend] -> ShowS # | |
Event Matchers
Type-safe pattern matching on event types.
Build matchers using match and the (:?) operator:
matcher = match "user_created" handleUserCreated
:? match "user_updated" handleUserUpdated
:? MatchEnd
type EventHandler (event :: Symbol) (m :: Type -> Type) backend = EventEnvelope event backend -> m SubscriptionResult Source #
Handler function for processing events in a subscription.
data EventMatcher (ts :: [Symbol]) backend (m :: Type -> Type) where Source #
Type-safe event matcher for handling different event types in subscriptions.
Constructed using the match helper with (:?) to build a chain of handlers:
matcher = match "user_created" handleUserCreated
:? match "user_updated" handleUserUpdated
:? MatchEnd
Constructors
| (:?) :: forall (event :: Symbol) (m :: Type -> Type) backend (ts1 :: [Symbol]). Event event => (Proxy event, EventHandler event m backend) -> EventMatcher ts1 backend m -> EventMatcher (event ': ts1) backend m infixr 5 | |
| MatchEnd :: forall backend (m :: Type -> Type). EventMatcher ('[] :: [Symbol]) backend m |
Arguments
| :: forall (event :: k) -> forall a. a | Handler function |
| -> (Proxy event, a) | Handler pair for use with |
Helper to construct event handler pairs for EventMatcher.
Uses RequiredTypeArguments for better error messages when the type is omitted.
matcher = match "user_created" handleUser :? MatchEnd