{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}

module Hindsight.Store.Memory.Internal (
    -- * Core Types
    StoreState (..),
    StoredEvent (..),

    -- * Cursor that can be rebuilt from sequence numbers
    StoreCursor (..),

    -- * Version Control
    checkVersionConstraint,
    getCurrentVersion,
    getCurrentStreamVersion,

    -- * Event Processing
    processEvents,
    processEventThroughMatchers,

    -- * State Management
    updateState,

    -- * Common Operations
    makeStoredEvents,
    checkAllVersions,
    insertAllEvents,
    subscribeToEvents,
)
where

import Control.Concurrent.Async (async, cancel, wait)
import Control.Concurrent.STM (
    TVar,
    atomically,
    newTVar,
    readTVar,
    retry,
    writeTVar,
 )
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO (..))
import Data.Aeson (FromJSON (..), ToJSON (..), Value (..))
import Data.Foldable (toList)
import Data.List (sortOn, zip4)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Proxy (Proxy)
import Data.Text (Text)
import Data.Text qualified as T
import Data.Time (UTCTime)
import GHC.Generics (Generic)
import Hindsight.Events
import Hindsight.Store
import Hindsight.Store.Parsing (parseStoredEventToEnvelope)
import UnliftIO (MonadUnliftIO, withRunInIO)
import UnliftIO.Exception (SomeException, catch, throwIO)

{- | An event as it is kept by the in-memory store.

The payload is held as an untyped JSON 'Value'; the event name and payload
version are stored next to it.
-}
data StoredEvent = StoredEvent
    { seqNo :: Integer
    -- ^ Position of the event in the store-wide total order
    , eventId :: EventId
    -- ^ Unique identifier of this event
    , streamId :: StreamId
    -- ^ Stream the event was written to
    , correlationId :: Maybe CorrelationId
    -- ^ Correlation ID used for tracking, when one was supplied
    , createdAt :: UTCTime
    -- ^ Timestamp at which the event was created
    , eventName :: Text
    -- ^ Name of the event type
    , eventVersion :: Integer
    -- ^ Version number of the event payload
    , payload :: Value
    -- ^ Event body encoded as JSON
    , streamVersion :: StreamVersion
    -- ^ Position of the event within its own stream (1, 2, 3...)
    }
    deriving (Show, Eq, Generic, FromJSON, ToJSON)

{- | Everything the in-memory store keeps track of.

The @backend@ parameter selects the 'Cursor' type stored in 'streamVersions'.
-}
data StoreState backend = StoreState
    { nextSequence :: Integer
    -- ^ Global sequence number that the next stored event will receive
    , events :: Map Integer StoredEvent
    -- ^ Every stored event, keyed by its global sequence number
    , streamEvents :: Map StreamId [Integer]
    -- ^ For each stream, the sequence numbers of the events it contains
    , streamVersions :: Map StreamId (Cursor backend)
    -- ^ For each stream, the global cursor of its most recent event
    , streamLocalVersions :: Map StreamId StreamVersion
    -- ^ For each stream, its most recent local version
    , streamNotifications :: Map StreamId (TVar Integer)
    -- ^ Per-stream notification variables used by subscriptions
    , globalNotification :: TVar Integer
    -- ^ Notification variable shared by all-stream subscriptions
    }

-- | Two states can be compared whenever the backend's cursors can.
deriving instance (Eq (Cursor backend)) => Eq (StoreState backend)

{- | Capability for converting between global sequence numbers and a
backend's 'Cursor' values.

The @backend@ type variable only occurs underneath 'Cursor' in both methods,
so a call site may need an explicit type application on @backend@ to select
the instance.
-}
class StoreCursor backend where
    -- | Create a cursor from a global sequence number
    makeCursor :: Integer -> Cursor backend

    -- | Extract the global sequence number that a cursor refers to.
    -- NOTE(review): presumably the inverse of 'makeCursor' — confirm against the instances.
    makeSequenceNo :: Cursor backend -> Integer

{- | Validate the version expectation of a write against one stream.

Yields @Right ()@ when the expectation holds. Otherwise yields a
'VersionMismatch' carrying the stream, the expectation that failed and the
cursor reported as the actual position.
-}
checkVersionConstraint ::
    (Eq (Cursor backend)) =>
    StoreState backend ->
    StreamId ->
    ExpectedVersion backend ->
    Either (VersionMismatch backend) ()
checkVersionConstraint state streamId verExpectation =
    case verExpectation of
        -- No constraint: always accepted.
        Any -> Right ()
        -- The stream must not have a recorded cursor yet.
        NoStream -> ensure (not streamKnown) currentCursor
        -- The stream must already have a cursor; on failure there is none to report.
        StreamExists -> ensure streamKnown Nothing
        -- The stream's latest global cursor must equal the expected one.
        ExactVersion expected ->
            ensure (Just expected == currentCursor) currentCursor
        -- The stream's local version must match; the mismatch still reports
        -- the global cursor.
        ExactStreamVersion expectedStreamVersion ->
            ensure
                (Just expectedStreamVersion == getCurrentStreamVersion state streamId)
                currentCursor
  where
    currentCursor = getCurrentVersion state streamId
    streamKnown = Map.member streamId state.streamVersions

    -- Succeed when the condition holds, otherwise fail with the given cursor.
    ensure holds reported
        | holds = Right ()
        | otherwise = Left (VersionMismatch streamId verExpectation reported)

-- | Latest global cursor recorded for a stream, or 'Nothing' when the stream has none
getCurrentVersion :: StoreState backend -> StreamId -> Maybe (Cursor backend)
getCurrentVersion state streamId = state.streamVersions Map.!? streamId

-- | Latest local version recorded for a stream, or 'Nothing' when the stream has none
getCurrentStreamVersion :: StoreState backend -> StreamId -> Maybe StreamVersion
getCurrentStreamVersion state streamId = state.streamLocalVersions Map.!? streamId

{- | Feed a list of stored events, in order, through a matcher chain.

With an empty chain ('MatchEnd') the events are ignored and the result is
'Continue'. Otherwise each event is handed to 'processEventThroughMatchers'
together with the head matcher and the remaining chain; processing ends at
the first 'Stop', and is 'Continue' if no event produced one.
-}
processEvents ::
    forall ts m backend.
    (MonadUnliftIO m, StoreCursor backend, Show (Cursor backend)) =>
    EventMatcher ts backend m ->
    [StoredEvent] ->
    m SubscriptionResult
processEvents MatchEnd _ = pure Continue
processEvents (matcher :? rest) events = foldr step (pure Continue) events
  where
    -- Run one event; only continue with the later events when it says so.
    step :: StoredEvent -> m SubscriptionResult -> m SubscriptionResult
    step event next =
        processEventThroughMatchers matcher rest event >>= \case
            Stop -> pure Stop
            Continue -> next

{- | Offer one stored event to a matcher, falling back to the rest of the chain.

The handler runs only when the stored event's name equals the matcher's
event name and the stored event parses into an envelope. In every other case
the event is passed on to the remaining matchers via 'processEvents'.

An exception caught from the handler is rethrown as a 'HandlerException'
that records the original exception together with the event's position and
metadata.
-}
processEventThroughMatchers ::
    forall m backend event ts.
    (MonadUnliftIO m, Event event, StoreCursor backend, Show (Cursor backend)) =>
    (Proxy event, EventHandler event m backend) ->
    EventMatcher ts backend m ->
    StoredEvent ->
    m SubscriptionResult
processEventThroughMatchers (proxy, handler) rest event
    | event.eventName /= getEventName proxy = tryRemaining
    | otherwise = maybe tryRemaining runHandler decoded
  where
    -- Cursor corresponding to the event's global sequence number.
    position :: Cursor backend
    position = makeCursor @backend event.seqNo

    -- Hand the event to the matchers further down the chain.
    tryRemaining :: m SubscriptionResult
    tryRemaining = processEvents rest [event]

    -- Typed envelope for this matcher's event, if the stored data parses.
    decoded =
        parseStoredEventToEnvelope
            proxy
            event.eventId
            event.streamId
            position
            event.streamVersion
            event.correlationId
            event.createdAt
            event.payload
            event.eventVersion

    -- Invoke the handler, enriching any caught exception with event context.
    runHandler envelope =
        handler envelope `catch` \(e :: SomeException) ->
            throwIO $
                HandlerException
                    { originalException = e
                    , failedEventPosition = T.pack (show position)
                    , failedEventId = event.eventId
                    , failedEventName = event.eventName
                    , failedEventStreamId = event.streamId
                    , failedEventStreamVersion = event.streamVersion
                    , failedEventCorrelationId = event.correlationId
                    , failedEventCreatedAt = event.createdAt
                    }

{- | Record a single stored event in the store state.

The event is indexed by its sequence number, its sequence number is put at
the front of its stream's list, and the stream's global cursor and local
version are replaced by the event's own. 'nextSequence' and the notification
variables are left as they are.
-}
updateState :: forall backend. (StoreCursor backend) => StoredEvent -> StoreState backend -> StoreState backend
updateState event state =
    state
        { events = Map.insert sn event state.events
        , streamEvents = Map.insertWith (++) sid [sn] state.streamEvents
        , streamVersions = Map.insert sid (makeCursor @backend sn) state.streamVersions
        , streamLocalVersions = Map.insert sid event.streamVersion state.streamLocalVersions
        }
  where
    sn = event.seqNo
    sid = event.streamId

{- | Turn the events of one stream write into 'StoredEvent's.

Global sequence numbers are assigned consecutively starting at the state's
'nextSequence'. Stream versions continue from the stream's latest local
version, starting at 1 for a stream that has none. Every event receives the
given correlation ID and timestamp, and the event IDs are consumed in order.

Returns the stored events together with their sequence numbers, in the same
order. The inputs are zipped together, so when fewer event IDs than events
are supplied the surplus events are not included in the result.
-}
makeStoredEvents ::
    forall t backend.
    (Foldable t) =>
    StoreState backend ->
    Maybe CorrelationId ->
    UTCTime ->
    [EventId] ->
    StreamId ->
    StreamWrite t SomeLatestEvent backend ->
    ([StoredEvent], [Integer])
makeStoredEvents state mbCorrId now eventIds streamId batch =
    (stored, map (.seqNo) stored)
  where
    pending = toList batch.events
    count = length pending

    firstSeq = state.nextSequence
    seqNos = [firstSeq .. firstSeq + fromIntegral count - 1]

    -- A stream without a recorded local version counts as being at version 0.
    lastVersion = Map.findWithDefault (StreamVersion 0) streamId state.streamLocalVersions
    versions = [lastVersion + 1 .. lastVersion + fromIntegral count]

    stored = map build (zip4 seqNos eventIds pending versions)

    -- Assemble one stored event from its sequence number, ID, payload and stream version.
    build :: (Integer, EventId, SomeLatestEvent, StreamVersion) -> StoredEvent
    build (sn, eid, SomeLatestEvent proxy body, ver) =
        StoredEvent
            { seqNo = sn
            , eventId = eid
            , streamId = streamId
            , correlationId = mbCorrId
            , createdAt = now
            , eventName = getEventName proxy
            , eventVersion = getMaxVersion proxy
            , payload = toJSON body
            , streamVersion = ver
            }

{- | Validate the expected version of every stream write in a batch.

The streams are checked in ascending key order and checking stops at the
first failure, so the resulting 'ConsistencyError' carries exactly one
'VersionMismatch'. When every expectation holds the result is @Right ()@.
-}
checkAllVersions ::
    forall t backend.
    (Eq (Cursor backend)) =>
    StoreState backend ->
    Map StreamId (StreamWrite t SomeLatestEvent backend) ->
    Either (EventStoreError backend) ()
checkAllVersions state batches =
    either (Left . asConsistencyError) Right $
        mapM_ checkOne (Map.toList batches)
  where
    -- Check the expectation attached to a single stream's write.
    checkOne ::
        (StreamId, StreamWrite t SomeLatestEvent backend) ->
        Either (VersionMismatch backend) ()
    checkOne (streamId, batch) =
        checkVersionConstraint state streamId batch.expectedVersion

    -- Wrap the first mismatch in the store's error type.
    asConsistencyError mismatch =
        ConsistencyError (ConsistencyErrorInfo [mismatch])

{- | Insert all events into state
Returns: (new state, final global cursor, per-stream cursors)
-}
insertAllEvents ::
    forall backend t.
    (StoreCursor backend, Foldable t) =>
    StoreState backend ->
    Maybe CorrelationId ->
    UTCTime ->
    [EventId] ->
    Map StreamId (StreamWrite t SomeLatestEvent backend) ->
    (StoreState backend, Cursor backend, Map StreamId (Cursor backend))
insertAllEvents :: forall backend (t :: * -> *).
(StoreCursor backend, Foldable t) =>
StoreState backend
-> Maybe CorrelationId
-> UTCTime
-> [EventId]
-> Map StreamId (StreamWrite t SomeLatestEvent backend)
-> (StoreState backend, Cursor backend,
    Map StreamId (Cursor backend))
insertAllEvents StoreState backend
state Maybe CorrelationId
mbCorrId UTCTime
now [EventId]
eventIds Map StreamId (StreamWrite t SomeLatestEvent backend)
batches =
    let
        -- Calculate batch sizes and starting sequence numbers for each batch
        batchSizes :: [Int]
batchSizes = (StreamWrite t SomeLatestEvent backend -> Int)
-> [StreamWrite t SomeLatestEvent backend] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map (t SomeLatestEvent -> Int
forall a. t a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length (t SomeLatestEvent -> Int)
-> (StreamWrite t SomeLatestEvent backend -> t SomeLatestEvent)
-> StreamWrite t SomeLatestEvent backend
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (.events)) ([StreamWrite t SomeLatestEvent backend] -> [Int])
-> [StreamWrite t SomeLatestEvent backend] -> [Int]
forall a b. (a -> b) -> a -> b
$ Map StreamId (StreamWrite t SomeLatestEvent backend)
-> [StreamWrite t SomeLatestEvent backend]
forall k a. Map k a -> [a]
Map.elems Map StreamId (StreamWrite t SomeLatestEvent backend)
batches
        batchStartSeqs :: [Integer]
batchStartSeqs = (Integer -> Integer -> Integer)
-> Integer -> [Integer] -> [Integer]
forall b a. (b -> a -> b) -> b -> [a] -> [b]
scanl Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
(+) StoreState backend
state.nextSequence ([Integer] -> [Integer]) -> [Integer] -> [Integer]
forall a b. (a -> b) -> a -> b
$ (Int -> Integer) -> [Int] -> [Integer]
forall a b. (a -> b) -> [a] -> [b]
map Int -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral ([Int] -> [Integer]) -> [Int] -> [Integer]
forall a b. (a -> b) -> a -> b
$ [Int] -> [Int]
forall a. HasCallStack => [a] -> [a]
init [Int]
batchSizes

        -- Generate events with proper sequence numbers for each batch
        ([StoredEvent]
allEvents, [Integer]
seqNos) =
            [(StoredEvent, Integer)] -> ([StoredEvent], [Integer])
forall a b. [(a, b)] -> ([a], [b])
unzip ([(StoredEvent, Integer)] -> ([StoredEvent], [Integer]))
-> [(StoredEvent, Integer)] -> ([StoredEvent], [Integer])
forall a b. (a -> b) -> a -> b
$
                [[(StoredEvent, Integer)]] -> [(StoredEvent, Integer)]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat
                    [ (([StoredEvent] -> [Integer] -> [(StoredEvent, Integer)])
-> ([StoredEvent], [Integer]) -> [(StoredEvent, Integer)]
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry [StoredEvent] -> [Integer] -> [(StoredEvent, Integer)]
forall a b. [a] -> [b] -> [(a, b)]
zip) (([StoredEvent], [Integer]) -> [(StoredEvent, Integer)])
-> ([StoredEvent], [Integer]) -> [(StoredEvent, Integer)]
forall a b. (a -> b) -> a -> b
$ StoreState backend
-> Maybe CorrelationId
-> UTCTime
-> [EventId]
-> StreamId
-> StreamWrite t SomeLatestEvent backend
-> ([StoredEvent], [Integer])
forall (t :: * -> *) backend.
Foldable t =>
StoreState backend
-> Maybe CorrelationId
-> UTCTime
-> [EventId]
-> StreamId
-> StreamWrite t SomeLatestEvent backend
-> ([StoredEvent], [Integer])
makeStoredEvents (StoreState backend
state{nextSequence = startSeq} :: StoreState backend) Maybe CorrelationId
mbCorrId UTCTime
now [EventId]
eventIdsForBatch StreamId
streamId StreamWrite t SomeLatestEvent backend
batch
                    | (StreamId
streamId, StreamWrite t SomeLatestEvent backend
batch, [EventId]
eventIdsForBatch, Integer
startSeq) <-
                        [StreamId]
-> [StreamWrite t SomeLatestEvent backend]
-> [[EventId]]
-> [Integer]
-> [(StreamId, StreamWrite t SomeLatestEvent backend, [EventId],
     Integer)]
forall a b c d. [a] -> [b] -> [c] -> [d] -> [(a, b, c, d)]
zip4
                            (Map StreamId (StreamWrite t SomeLatestEvent backend) -> [StreamId]
forall k a. Map k a -> [k]
Map.keys Map StreamId (StreamWrite t SomeLatestEvent backend)
batches)
                            (Map StreamId (StreamWrite t SomeLatestEvent backend)
-> [StreamWrite t SomeLatestEvent backend]
forall k a. Map k a -> [a]
Map.elems Map StreamId (StreamWrite t SomeLatestEvent backend)
batches)
                            ([Int] -> [EventId] -> [[EventId]]
forall a. [Int] -> [a] -> [[a]]
chunksOf [Int]
batchSizes [EventId]
eventIds)
                            [Integer]
batchStartSeqs
                    ]

        -- Update state with new events and metadata
        newStreamVersions :: Map StreamId (Cursor backend)
newStreamVersions = (Map StreamId (Cursor backend)
 -> StoredEvent -> Map StreamId (Cursor backend))
-> Map StreamId (Cursor backend)
-> [StoredEvent]
-> Map StreamId (Cursor backend)
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' Map StreamId (Cursor backend)
-> StoredEvent -> Map StreamId (Cursor backend)
forall {k} {backend} {r}.
(Ord k, StoreCursor backend, HasField "streamId" r k,
 HasField "seqNo" r Integer) =>
Map k (Cursor backend) -> r -> Map k (Cursor backend)
updateStreamVersions StoreState backend
state.streamVersions [StoredEvent]
allEvents
        finalState :: StoreState backend
finalState =
            StoreState backend
state
                { nextSequence = state.nextSequence + fromIntegral (length allEvents)
                , events = Map.union state.events (Map.fromList $ zip seqNos allEvents)
                , streamEvents = foldr updateStreamEvents state.streamEvents allEvents
                , streamVersions = newStreamVersions
                , streamLocalVersions = foldl' updateStreamLocalVersions state.streamLocalVersions allEvents
                }

        -- Return cursor pointing to last inserted event, or previous position if no events
        finalCursor :: Cursor backend
finalCursor = case [Integer]
seqNos of
            [] -> Integer -> Cursor backend
forall backend. StoreCursor backend => Integer -> Cursor backend
makeCursor (StoreState backend
state.nextSequence Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
- Integer
1) -- No events inserted
            [Integer]
_ -> Integer -> Cursor backend
forall backend. StoreCursor backend => Integer -> Cursor backend
makeCursor (Integer -> Cursor backend) -> Integer -> Cursor backend
forall a b. (a -> b) -> a -> b
$ [Integer] -> Integer
forall a. HasCallStack => [a] -> a
last [Integer]
seqNos -- Last inserted event

        -- Extract per-stream cursors for streams that were written to
        perStreamCursors :: Map StreamId (Cursor backend)
perStreamCursors = Map StreamId (Cursor backend)
-> Set StreamId -> Map StreamId (Cursor backend)
forall k a. Ord k => Map k a -> Set k -> Map k a
Map.restrictKeys Map StreamId (Cursor backend)
newStreamVersions (Map StreamId (StreamWrite t SomeLatestEvent backend)
-> Set StreamId
forall k a. Map k a -> Set k
Map.keysSet Map StreamId (StreamWrite t SomeLatestEvent backend)
batches)
     in
        (StoreState backend
finalState, Cursor backend
finalCursor, Map StreamId (Cursor backend)
perStreamCursors)
  where
    -- Helper function to update stream events mapping
    updateStreamEvents :: r -> Map k [a] -> Map k [a]
updateStreamEvents r
e =
        (Maybe [a] -> Maybe [a]) -> k -> Map k [a] -> Map k [a]
forall k a.
Ord k =>
(Maybe a -> Maybe a) -> k -> Map k a -> Map k a
Map.alter
            ([a] -> Maybe [a]
forall a. a -> Maybe a
Just ([a] -> Maybe [a]) -> (Maybe [a] -> [a]) -> Maybe [a] -> Maybe [a]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [a] -> ([a] -> [a]) -> Maybe [a] -> [a]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [r
e.seqNo] (r
e.seqNo a -> [a] -> [a]
forall a. a -> [a] -> [a]
:))
            r
e.streamId

    -- Helper function to update stream versions (left fold: acc first, event second)
    updateStreamVersions :: Map k (Cursor backend) -> r -> Map k (Cursor backend)
updateStreamVersions Map k (Cursor backend)
acc r
e =
        k
-> Cursor backend
-> Map k (Cursor backend)
-> Map k (Cursor backend)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert
            r
e.streamId
            (Integer -> Cursor backend
forall backend. StoreCursor backend => Integer -> Cursor backend
makeCursor r
e.seqNo)
            Map k (Cursor backend)
acc

    -- Helper function to update stream local versions (left fold: acc first, event second)
    updateStreamLocalVersions :: Map k a -> r -> Map k a
updateStreamLocalVersions Map k a
acc r
e =
        k -> a -> Map k a -> Map k a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert
            r
e.streamId
            r
e.streamVersion
            Map k a
acc

    -- Helper function to chunk list based on sizes
    chunksOf :: [Int] -> [a] -> [[a]]
    chunksOf :: forall a. [Int] -> [a] -> [[a]]
chunksOf [] [a]
_ = []
    chunksOf (Int
n : [Int]
ns) [a]
xs = Int -> [a] -> [a]
forall a. Int -> [a] -> [a]
take Int
n [a]
xs [a] -> [[a]] -> [[a]]
forall a. a -> [a] -> [a]
: [Int] -> [a] -> [[a]]
forall a. [Int] -> [a] -> [[a]]
chunksOf [Int]
ns (Int -> [a] -> [a]
forall a. Int -> [a] -> [a]
drop Int
n [a]
xs)

-- Add new function:

{- | Shared subscription driver for stores whose state lives in a 'TVar'.

The subscription starts after the sequence number implied by the selector's
startup position (@-1@ for 'FromBeginning', otherwise the sequence number of
the supplied cursor). A worker thread then repeatedly:

1. snapshots the state and collects every stored event with a sequence
   number greater than the current position (all streams, or only the
   selected stream), ordered by sequence number;
2. if nothing is pending, blocks until the notification variable holds a
   value greater than the current position, then polls again;
3. otherwise hands the batch to 'processEvents' and either stops ('Stop')
   or continues from the highest sequence number in the batch ('Continue').

For a single-stream subscription the per-stream notification variable is
looked up in the state and, when absent, created (initialised to the start
position) and registered in the state within the same transaction.

The returned handle cancels, or waits for, the worker thread.
-}
subscribeToEvents ::
    forall m backend ts.
    (MonadUnliftIO m, StoreCursor backend, Show (Cursor backend)) =>
    TVar (StoreState backend) -> -- Shared store state
    EventMatcher ts backend m -> -- Handlers applied to each batch
    EventSelector backend -> -- Which stream(s) to follow and where to start
    m (SubscriptionHandle backend)
subscribeToEvents stateVar matcher selector = do
    -- Pick the notification variable this subscription blocks on.
    notifyVar <- liftIO . atomically $ do
        state <- readTVar stateVar
        case selector.streamId of
            AllStreams -> pure state.globalNotification
            SingleStream sid -> case Map.lookup sid state.streamNotifications of
                Just existing -> pure existing
                Nothing -> do
                    -- First subscriber for this stream: register a fresh variable.
                    fresh <- newTVar startSeq
                    writeTVar
                        stateVar
                        state{streamNotifications = Map.insert sid fresh state.streamNotifications}
                    pure fresh

    let
        -- Block until the notification variable moves past the given position.
        -- NOTE(review): if the variable already holds a value above `position`
        -- while no matching event is stored, this returns immediately and the
        -- worker re-polls without blocking — confirm against the code that
        -- writes the notification variables.
        awaitNotification :: Integer -> m ()
        awaitNotification position =
            liftIO . atomically $ do
                latest <- readTVar notifyVar
                when (latest <= position) retry

        -- Worker loop; the argument is the last sequence number already handled.
        pump :: Integer -> m ()
        pump position = do
            batch <- liftIO . atomically $ eventsAfter position <$> readTVar stateVar
            if null batch
                then awaitNotification position >> pump position
                else do
                    outcome <- processEvents matcher batch
                    case outcome of
                        Stop -> pure ()
                        Continue -> pump (maximum (map seqNo batch))

    -- Run the loop on its own thread so the handle can cancel or await it.
    withRunInIO $ \runInIO -> do
        worker <- async (runInIO (pump startSeq))
        pure SubscriptionHandle{cancel = cancel worker, wait = wait worker}
  where
    -- Sequence number the subscription resumes after.
    startSeq :: Integer
    startSeq = case selector.startupPosition of
        FromBeginning -> (-1)
        FromLastProcessed cursor -> makeSequenceNo cursor

    -- Events of interest stored strictly after the given position,
    -- in ascending sequence order.
    eventsAfter :: Integer -> StoreState backend -> [StoredEvent]
    eventsAfter position state = sortOn seqNo $ case selector.streamId of
        AllStreams ->
            [ev | (n, ev) <- Map.toList state.events, n > position]
        SingleStream sid ->
            [ ev
            | ns <- toList (Map.lookup sid state.streamEvents)
            , n <- ns
            , n > position
            , ev <- toList (Map.lookup n state.events)
            ]