hindsight-core
Copyright(c) 2024
LicenseBSD3
Maintainermaintainer@example.com
Stabilityexperimental
Safe HaskellNone
LanguageGHC2021

Hindsight.Store

Description

This module defines the core event store interface and types used across all storage backends.

Overview

An event store is a append-only log of events organized into streams. Each stream represents the lifecycle of a single aggregate or entity.

Basic Usage

import Hindsight.Store
import Hindsight.Store.Memory (newMemoryStore)

-- Create a store
store <- newMemoryStore

-- Insert events
let batch = StreamWrite
      { expectedVersion = NoStream
      , events = [mkEvent "user_created" payload]
      }
result <- insertEvents store Nothing (Map.singleton streamId batch)

-- Subscribe to events
handle <- subscribe store matcher selector

Key Concepts

  • Streams: Ordered sequences of events for a single entity
  • Cursors: Opaque positions in the global event log
  • Subscriptions: Real-time notifications of new events
  • Version Expectations: Optimistic concurrency control for writes
Synopsis

Core Identifiers

Unique identifiers for streams, events, and correlations.

newtype StreamId Source #

Unique identifier for an event stream.

Streams are logical groupings of related events, typically representing the lifecycle of a single aggregate or entity.

Constructors

StreamId 

Fields

newtype EventId Source #

Unique identifier for an individual event.

Constructors

EventId 

Fields

Instances

Instances details
FromJSON EventId Source # 
Instance details

Defined in Hindsight.Store

ToJSON EventId Source # 
Instance details

Defined in Hindsight.Store

Show EventId Source # 
Instance details

Defined in Hindsight.Store

Eq EventId Source # 
Instance details

Defined in Hindsight.Store

Methods

(==) :: EventId -> EventId -> Bool #

(/=) :: EventId -> EventId -> Bool #

Ord EventId Source # 
Instance details

Defined in Hindsight.Store

newtype CorrelationId Source #

Correlation identifier for tracking related events across streams.

Used to trace causally-related events that span multiple streams or external system boundaries.

Constructors

CorrelationId 

Fields

newtype StreamVersion Source #

Local stream version - simple incrementing number per stream (1, 2, 3, ...)

Constructors

StreamVersion Int64 

Instances

Instances details
FromJSON StreamVersion Source # 
Instance details

Defined in Hindsight.Store

ToJSON StreamVersion Source # 
Instance details

Defined in Hindsight.Store

Enum StreamVersion Source # 
Instance details

Defined in Hindsight.Store

Generic StreamVersion Source # 
Instance details

Defined in Hindsight.Store

Associated Types

type Rep StreamVersion 
Instance details

Defined in Hindsight.Store

type Rep StreamVersion = D1 ('MetaData "StreamVersion" "Hindsight.Store" "hindsight-core-0.1.0.0-inplace" 'True) (C1 ('MetaCons "StreamVersion" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)))
Num StreamVersion Source # 
Instance details

Defined in Hindsight.Store

Show StreamVersion Source # 
Instance details

Defined in Hindsight.Store

Eq StreamVersion Source # 
Instance details

Defined in Hindsight.Store

Ord StreamVersion Source # 
Instance details

Defined in Hindsight.Store

type Rep StreamVersion Source # 
Instance details

Defined in Hindsight.Store

type Rep StreamVersion = D1 ('MetaData "StreamVersion" "Hindsight.Store" "hindsight-core-0.1.0.0-inplace" 'True) (C1 ('MetaCons "StreamVersion" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)))

Event Store Interface

The main type class implemented by all storage backends.

The EventStore class includes the StoreConstraints type family, which defines additional constraints required by each backend.

class EventStore backend where Source #

Core interface for event store backends.

Provides methods for inserting events and subscribing to event streams. Each backend defines its own constraints via StoreConstraints.

Associated Types

type StoreConstraints backend (m :: Type -> Type) Source #

Additional constraints required by this backend.

For example, PostgreSQL requires MonadUnliftIO for async operations.

Methods

insertEvents :: forall (t :: Type -> Type) m. (Traversable t, StoreConstraints backend m) => BackendHandle backend -> Maybe CorrelationId -> Transaction t backend -> m (InsertionResult backend) Source #

Insert event batches atomically into the store.

All events in the batch are inserted as a single transaction. If any version check fails, the entire operation is rolled back.

subscribe :: forall m (ts :: [Symbol]). StoreConstraints backend m => BackendHandle backend -> EventMatcher ts backend m -> EventSelector backend -> m (SubscriptionHandle backend) Source #

Subscribe to events matching the given criteria.

The subscription will call the appropriate handler for each matching event until a handler returns Stop or the subscription is cancelled.

Backend Types

Backend-specific handle and cursor types.

Each backend defines its own Cursor and BackendHandle via type families.

type family BackendHandle backend = (result :: Type) | result -> backend Source #

Handle to interact with a specific storage backend.

Contains backend-specific configuration like connection pools, file handles, or in-memory storage references.

The type family is injective: knowing the handle type determines the backend, which helps with type inference.

type family Cursor backend = (result :: Type) | result -> backend Source #

Position marker within an event store, backend-specific.

For example, PostgreSQL uses a compound cursor of (transaction_no, seq_no), while a simple implementation might use a single sequence number.

The type family is injective: knowing the cursor type determines the backend which helps with type inference.

Event Operations

Types for inserting and querying events.

Insertion

data StreamWrite (t :: k -> Type) (e :: k) backend Source #

Write operation for inserting events into a single stream.

Groups events with their version expectation for atomic insertion. The event type e can be SomeLatestEvent for raw events or enriched with metadata.

Constructors

StreamWrite 

Fields

Instances

Instances details
(Show (ExpectedVersion backend), Show (t e)) => Show (StreamWrite t e backend) Source # 
Instance details

Defined in Hindsight.Store

Methods

showsPrec :: Int -> StreamWrite t e backend -> ShowS #

show :: StreamWrite t e backend -> String #

showList :: [StreamWrite t e backend] -> ShowS #

(Eq (ExpectedVersion backend), Eq (t e)) => Eq (StreamWrite t e backend) Source # 
Instance details

Defined in Hindsight.Store

Methods

(==) :: StreamWrite t e backend -> StreamWrite t e backend -> Bool #

(/=) :: StreamWrite t e backend -> StreamWrite t e backend -> Bool #

newtype Transaction (t :: Type -> Type) backend Source #

A multi-stream transactional write operation.

Represents a set of writes to be inserted atomically across multiple streams. All version checks must pass for the entire transaction to succeed.

Ordering Guarantees

  • Events within a single stream are totally ordered - they appear in the global event log in the order specified.
  • Events across different streams within the same transaction have __no ordering guarantee__ relative to each other. They are inserted atomically but may appear in any interleaving in the global log.

Composition

Transactions form a Monoid, allowing easy composition:

transaction1 <> transaction2 <> transaction3

When combining transactions:

  • Events for the same stream are concatenated (events from tx1, then tx2)
  • Version expectations are left-biased (tx1's expectation wins)

Example

-- Simple single-event write
let tx = singleEvent streamId NoStream event
result <- insertEvents store Nothing tx

-- Multi-stream with composition
let tx = singleEvent userStreamId NoStream userCreated
      <> appendAfterAny auditLogId auditEntry
result <- insertEvents store Nothing tx

Instances

Instances details
Semigroup (t SomeLatestEvent) => Monoid (Transaction t backend) Source # 
Instance details

Defined in Hindsight.Store

Methods

mempty :: Transaction t backend #

mappend :: Transaction t backend -> Transaction t backend -> Transaction t backend #

mconcat :: [Transaction t backend] -> Transaction t backend #

Semigroup (t SomeLatestEvent) => Semigroup (Transaction t backend) Source # 
Instance details

Defined in Hindsight.Store

Methods

(<>) :: Transaction t backend -> Transaction t backend -> Transaction t backend #

sconcat :: NonEmpty (Transaction t backend) -> Transaction t backend #

stimes :: Integral b => b -> Transaction t backend -> Transaction t backend #

(Show (Cursor backend), Show (t SomeLatestEvent)) => Show (Transaction t backend) Source # 
Instance details

Defined in Hindsight.Store

Methods

showsPrec :: Int -> Transaction t backend -> ShowS #

show :: Transaction t backend -> String #

showList :: [Transaction t backend] -> ShowS #

(Eq (Cursor backend), Eq (t SomeLatestEvent)) => Eq (Transaction t backend) Source # 
Instance details

Defined in Hindsight.Store

Methods

(==) :: Transaction t backend -> Transaction t backend -> Bool #

(/=) :: Transaction t backend -> Transaction t backend -> Bool #

data InsertionResult backend Source #

Result of an event insertion operation.

Constructors

SuccessfulInsertion (InsertionSuccess backend)

Success with cursor information

FailedInsertion (EventStoreError backend)

Failure with error details

data InsertionSuccess backend Source #

Success data from an event insertion operation.

Contains the global cursor position of the last event inserted and the per-stream cursor positions.

Constructors

InsertionSuccess 

Fields

Transaction Helpers

Helper functions for constructing transactions.

The singleEvent and multiEvent functions are the primary API, requiring explicit version expectations. Convenience helpers like appendAfterAny are provided for common patterns.

singleEvent Source #

Arguments

:: StreamId

Target stream identifier

-> ExpectedVersion backend

Version expectation for concurrency control

-> SomeLatestEvent

Event to insert

-> Transaction [] backend

Transaction with single event

Create a transaction with a single event to a single stream.

This is the primary API - it forces explicit choice of version expectation.

-- Creating a new aggregate
singleEvent accountId NoStream accountCreated

-- Updating with optimistic locking
singleEvent accountId (ExactStreamVersion v) balanceUpdated

-- Append-only log
singleEvent logId Any logEntry

multiEvent Source #

Arguments

:: StreamId

Target stream identifier

-> ExpectedVersion backend

Version expectation for concurrency control

-> t SomeLatestEvent

Collection of events to insert (typically a list)

-> Transaction t backend

Transaction with multiple events

Create a transaction with multiple events to a single stream.

multiEvent streamId NoStream [event1, event2, event3]

appendAfterAny Source #

Arguments

:: StreamId

Target stream identifier (must exist)

-> SomeLatestEvent

Event to append

-> Transaction [] backend

Transaction with StreamExists expectation

Append an event to an existing stream without version checking.

Uses StreamExists for the version expectation - the stream must already exist, but any version is acceptable. Suitable for append-only scenarios where multiple processes may be writing concurrently.

Use Cases

  • Audit logs where the log stream is pre-created
  • Event logs with concurrent writers
  • Not suitable for aggregates with business invariants
-- Audit log entry (stream must exist)
appendAfterAny auditLogId auditEntry

-- Multiple concurrent writers OK
tx1 = appendAfterAny logId event1
tx2 = appendAfterAny logId event2  -- No conflict

appendToOrCreateStream Source #

Arguments

:: StreamId

Target stream identifier (created if missing)

-> SomeLatestEvent

Event to append

-> Transaction [] backend

Transaction with Any expectation (no version check)

Append an event, creating the stream if it doesn't exist.

Uses Any for the version expectation - no version checking at all. This is the most permissive option.

Warning: This provides no concurrency control. Suitable for testing or truly append-only scenarios, but dangerous for aggregates with invariants.

Use Cases

  • Testing where you don't care about stream state
  • Idempotent appends where duplicates are handled elsewhere
  • Not suitable for production aggregates
-- Test code
appendToOrCreateStream testStreamId testEvent

-- For production, prefer explicit version expectations
singleEvent streamId NoStream event  -- Better: explicit create

fromWrites Source #

Arguments

:: forall (t :: Type -> Type) backend. [(StreamId, StreamWrite t SomeLatestEvent backend)]

List of stream writes

-> Transaction t backend

Combined transaction

Create a transaction from a list of stream writes.

fromWrites
  [ (stream1, StreamWrite NoStream [e1])
  , (stream2, StreamWrite NoStream [e2])
  ]

Version Control

Optimistic concurrency control for preventing conflicts.

When inserting events, you can specify version expectations to ensure the stream hasn't changed since you read it. This implements optimistic locking without holding database locks during business logic execution.

See: https://en.wikipedia.org/wiki/Optimistic_concurrency_control

data ExpectedVersion backend Source #

Version expectation for optimistic concurrency control.

Used to prevent concurrent modifications by specifying what version a stream should be at before inserting new events.

This implements optimistic locking: instead of holding locks during a read-modify-write cycle, we check at write time that the stream hasn't changed since we read it.

See: https://en.wikipedia.org/wiki/Optimistic_concurrency_control

Common patterns:

  • NoStream - Creating a new aggregate (ensure stream doesn't exist)
  • ExactStreamVersion - Normal update (check we have latest version)
  • Any - Append-only scenarios (no conflict prevention)

Constructors

NoStream

Stream must not exist (for creating new streams)

StreamExists

Stream must exist (any version)

ExactVersion (Cursor backend)

Stream must be at this exact global position

ExactStreamVersion StreamVersion

Stream must be at this exact local version

Any

No version check (always succeeds)

Instances

Instances details
Show (Cursor backend) => Show (ExpectedVersion backend) Source # 
Instance details

Defined in Hindsight.Store

Methods

showsPrec :: Int -> ExpectedVersion backend -> ShowS #

show :: ExpectedVersion backend -> String #

showList :: [ExpectedVersion backend] -> ShowS #

Eq (Cursor backend) => Eq (ExpectedVersion backend) Source # 
Instance details

Defined in Hindsight.Store

Methods

(==) :: ExpectedVersion backend -> ExpectedVersion backend -> Bool #

(/=) :: ExpectedVersion backend -> ExpectedVersion backend -> Bool #

Errors

data EventStoreError backend Source #

Possible errors when interacting with the event store.

Constructors

ConsistencyError (ConsistencyErrorInfo backend)

Version expectation failures

BackendError ErrorInfo

Storage backend errors

OtherError ErrorInfo

Other application errors

Instances

Instances details
Show (Cursor backend) => Show (EventStoreError backend) Source # 
Instance details

Defined in Hindsight.Store

Methods

showsPrec :: Int -> EventStoreError backend -> ShowS #

show :: EventStoreError backend -> String #

showList :: [EventStoreError backend] -> ShowS #

data ErrorInfo Source #

General error information with optional exception details.

Constructors

ErrorInfo 

Fields

Instances

Instances details
Show ErrorInfo Source # 
Instance details

Defined in Hindsight.Store

data ConsistencyErrorInfo backend Source #

Consistency violation details containing all version mismatches.

Constructors

ConsistencyErrorInfo [VersionMismatch backend] 

Instances

Instances details
Show (Cursor backend) => Show (ConsistencyErrorInfo backend) Source # 
Instance details

Defined in Hindsight.Store

data VersionMismatch backend Source #

Details of a version expectation failure.

Constructors

VersionMismatch 

Fields

Instances

Instances details
Show (Cursor backend) => Show (VersionMismatch backend) Source # 
Instance details

Defined in Hindsight.Store

Methods

showsPrec :: Int -> VersionMismatch backend -> ShowS #

show :: VersionMismatch backend -> String #

showList :: [VersionMismatch backend] -> ShowS #

data HandlerException Source #

Exception thrown when an event handler fails during subscription processing.

This exception wraps the original exception with rich event context to aid debugging. When a handler throws an exception, the subscription will die immediately (fail-fast), and this enriched exception will be available via the Async handle.

The handler exception represents a bug in event processing code. Higher-level code (such as projection managers) can implement retry logic if desired.

Constructors

HandlerException 

Fields

Event Subscriptions

Real-time streaming of events as they're inserted.

Subscription Configuration

data EventSelector backend Source #

Criteria for selecting events in a subscription.

Constructors

EventSelector 

Fields

Instances

Instances details
Show (StartupPosition backend) => Show (EventSelector backend) Source # 
Instance details

Defined in Hindsight.Store

Methods

showsPrec :: Int -> EventSelector backend -> ShowS #

show :: EventSelector backend -> String #

showList :: [EventSelector backend] -> ShowS #

data StreamSelector Source #

Stream selection criteria for subscriptions.

Constructors

AllStreams

Subscribe to all streams

SingleStream StreamId

Subscribe to a specific stream only

Instances

Instances details
Show StreamSelector Source # 
Instance details

Defined in Hindsight.Store

data StartupPosition backend Source #

Starting position for event subscriptions.

Constructors

FromBeginning

Start from the first event

FromLastProcessed (Cursor backend)

Start after the specified position

Instances

Instances details
Show (Cursor backend) => Show (StartupPosition backend) Source # 
Instance details

Defined in Hindsight.Store

Methods

showsPrec :: Int -> StartupPosition backend -> ShowS #

show :: StartupPosition backend -> String #

showList :: [StartupPosition backend] -> ShowS #

Subscription Control

data SubscriptionHandle (backend :: k) Source #

Handle for managing a subscription lifecycle.

Constructors

SubscriptionHandle 

Fields

  • cancel :: IO ()

    Cancel the subscription

  • wait :: IO ()

    Wait for the subscription to complete or fail. Re-throws any exception from the subscription thread. Useful for testing (to observe handler exceptions) and production (to monitor subscription health).

data SubscriptionResult Source #

Control flow for event subscriptions.

Constructors

Stop

Stop processing and cancel the subscription

Continue

Continue processing subsequent events

Event Handling

Processing events in subscriptions.

Event Envelopes

Events with full metadata from the store.

data EventEnvelope (event :: Symbol) backend Source #

Event with full metadata as retrieved from the store.

Constructors

EventWithMetadata 

Fields

Instances

Instances details
(Show (CurrentPayloadType event), Show (Cursor backend)) => Show (EventEnvelope event backend) Source # 
Instance details

Defined in Hindsight.Store

Methods

showsPrec :: Int -> EventEnvelope event backend -> ShowS #

show :: EventEnvelope event backend -> String #

showList :: [EventEnvelope event backend] -> ShowS #

Event Matchers

Type-safe pattern matching on event types.

Build matchers using match and the (:?) operator:

   matcher = match "user_created" handleUserCreated
      :? match "user_updated" handleUserUpdated
      :? MatchEnd
   

type EventHandler (event :: Symbol) (m :: Type -> Type) backend = EventEnvelope event backend -> m SubscriptionResult Source #

Handler function for processing events in a subscription.

data EventMatcher (ts :: [Symbol]) backend (m :: Type -> Type) where Source #

Type-safe event matcher for handling different event types in subscriptions.

Constructed using the match helper with (:?) to build a chain of handlers:

matcher = match "user_created" handleUserCreated
       :? match "user_updated" handleUserUpdated
       :? MatchEnd

Constructors

(:?) :: forall (event :: Symbol) (m :: Type -> Type) backend (ts1 :: [Symbol]). Event event => (Proxy event, EventHandler event m backend) -> EventMatcher ts1 backend m -> EventMatcher (event ': ts1) backend m infixr 5 
MatchEnd :: forall backend (m :: Type -> Type). EventMatcher ('[] :: [Symbol]) backend m 

match Source #

Arguments

:: forall (event :: k) -> forall a. a

Handler function

-> (Proxy event, a)

Handler pair for use with (:?)

Helper to construct event handler pairs for EventMatcher.

Uses RequiredTypeArguments for better error messages when the type is omitted.

matcher = match "user_created" handleUser :? MatchEnd