{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
module Hindsight.Store.Memory.Internal (
StoreState (..),
StoredEvent (..),
StoreCursor (..),
checkVersionConstraint,
getCurrentVersion,
getCurrentStreamVersion,
processEvents,
processEventThroughMatchers,
updateState,
makeStoredEvents,
checkAllVersions,
insertAllEvents,
subscribeToEvents,
)
where
import Control.Concurrent.Async (async, cancel, wait)
import Control.Concurrent.STM (
TVar,
atomically,
newTVar,
readTVar,
retry,
writeTVar,
)
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO (..))
import Data.Aeson (FromJSON (..), ToJSON (..), Value (..))
import Data.Foldable (toList)
import Data.List (sortOn, zip4)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Proxy (Proxy)
import Data.Text (Text)
import Data.Text qualified as T
import Data.Time (UTCTime)
import GHC.Generics (Generic)
import Hindsight.Events
import Hindsight.Store
import Hindsight.Store.Parsing (parseStoredEventToEnvelope)
import UnliftIO (MonadUnliftIO, withRunInIO)
import UnliftIO.Exception (SomeException, catch, throwIO)
data StoredEvent = StoredEvent
{ StoredEvent -> Integer
seqNo :: Integer
, StoredEvent -> EventId
eventId :: EventId
, StoredEvent -> StreamId
streamId :: StreamId
, StoredEvent -> Maybe CorrelationId
correlationId :: Maybe CorrelationId
, StoredEvent -> UTCTime
createdAt :: UTCTime
, StoredEvent -> Text
eventName :: Text
, StoredEvent -> Integer
eventVersion :: Integer
, StoredEvent -> Value
payload :: Value
, StoredEvent -> StreamVersion
streamVersion :: StreamVersion
}
deriving (Int -> StoredEvent -> ShowS
[StoredEvent] -> ShowS
StoredEvent -> String
(Int -> StoredEvent -> ShowS)
-> (StoredEvent -> String)
-> ([StoredEvent] -> ShowS)
-> Show StoredEvent
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> StoredEvent -> ShowS
showsPrec :: Int -> StoredEvent -> ShowS
$cshow :: StoredEvent -> String
show :: StoredEvent -> String
$cshowList :: [StoredEvent] -> ShowS
showList :: [StoredEvent] -> ShowS
Show, StoredEvent -> StoredEvent -> Bool
(StoredEvent -> StoredEvent -> Bool)
-> (StoredEvent -> StoredEvent -> Bool) -> Eq StoredEvent
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: StoredEvent -> StoredEvent -> Bool
== :: StoredEvent -> StoredEvent -> Bool
$c/= :: StoredEvent -> StoredEvent -> Bool
/= :: StoredEvent -> StoredEvent -> Bool
Eq, (forall x. StoredEvent -> Rep StoredEvent x)
-> (forall x. Rep StoredEvent x -> StoredEvent)
-> Generic StoredEvent
forall x. Rep StoredEvent x -> StoredEvent
forall x. StoredEvent -> Rep StoredEvent x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. StoredEvent -> Rep StoredEvent x
from :: forall x. StoredEvent -> Rep StoredEvent x
$cto :: forall x. Rep StoredEvent x -> StoredEvent
to :: forall x. Rep StoredEvent x -> StoredEvent
Generic, Maybe StoredEvent
Value -> Parser [StoredEvent]
Value -> Parser StoredEvent
(Value -> Parser StoredEvent)
-> (Value -> Parser [StoredEvent])
-> Maybe StoredEvent
-> FromJSON StoredEvent
forall a.
(Value -> Parser a)
-> (Value -> Parser [a]) -> Maybe a -> FromJSON a
$cparseJSON :: Value -> Parser StoredEvent
parseJSON :: Value -> Parser StoredEvent
$cparseJSONList :: Value -> Parser [StoredEvent]
parseJSONList :: Value -> Parser [StoredEvent]
$comittedField :: Maybe StoredEvent
omittedField :: Maybe StoredEvent
FromJSON, [StoredEvent] -> Value
[StoredEvent] -> Encoding
StoredEvent -> Bool
StoredEvent -> Value
StoredEvent -> Encoding
(StoredEvent -> Value)
-> (StoredEvent -> Encoding)
-> ([StoredEvent] -> Value)
-> ([StoredEvent] -> Encoding)
-> (StoredEvent -> Bool)
-> ToJSON StoredEvent
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: StoredEvent -> Value
toJSON :: StoredEvent -> Value
$ctoEncoding :: StoredEvent -> Encoding
toEncoding :: StoredEvent -> Encoding
$ctoJSONList :: [StoredEvent] -> Value
toJSONList :: [StoredEvent] -> Value
$ctoEncodingList :: [StoredEvent] -> Encoding
toEncodingList :: [StoredEvent] -> Encoding
$comitField :: StoredEvent -> Bool
omitField :: StoredEvent -> Bool
ToJSON)
data StoreState backend = StoreState
{ forall backend. StoreState backend -> Integer
nextSequence :: Integer
, forall backend. StoreState backend -> Map Integer StoredEvent
events :: Map Integer StoredEvent
, forall backend. StoreState backend -> Map StreamId [Integer]
streamEvents :: Map StreamId [Integer]
, forall backend. StoreState backend -> Map StreamId (Cursor backend)
streamVersions :: Map StreamId (Cursor backend)
, forall backend. StoreState backend -> Map StreamId StreamVersion
streamLocalVersions :: Map StreamId StreamVersion
, forall backend. StoreState backend -> Map StreamId (TVar Integer)
streamNotifications :: Map StreamId (TVar Integer)
, forall backend. StoreState backend -> TVar Integer
globalNotification :: TVar Integer
}
deriving instance (Eq (Cursor backend)) => Eq (StoreState backend)
class StoreCursor backend where
makeCursor :: Integer -> Cursor backend
makeSequenceNo :: Cursor backend -> Integer
checkVersionConstraint ::
(Eq (Cursor backend)) =>
StoreState backend ->
StreamId ->
ExpectedVersion backend ->
Either (VersionMismatch backend) ()
checkVersionConstraint :: forall backend.
Eq (Cursor backend) =>
StoreState backend
-> StreamId
-> ExpectedVersion backend
-> Either (VersionMismatch backend) ()
checkVersionConstraint StoreState backend
state StreamId
streamId ExpectedVersion backend
verExpectation = case ExpectedVersion backend
verExpectation of
ExpectedVersion backend
Any -> () -> Either (VersionMismatch backend) ()
forall a b. b -> Either a b
Right ()
ExpectedVersion backend
NoStream
| StreamId -> Map StreamId (Cursor backend) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
Map.member StreamId
streamId StoreState backend
state.streamVersions ->
VersionMismatch backend -> Either (VersionMismatch backend) ()
forall a b. a -> Either a b
Left (VersionMismatch backend -> Either (VersionMismatch backend) ())
-> VersionMismatch backend -> Either (VersionMismatch backend) ()
forall a b. (a -> b) -> a -> b
$ StreamId
-> ExpectedVersion backend
-> Maybe (Cursor backend)
-> VersionMismatch backend
forall backend.
StreamId
-> ExpectedVersion backend
-> Maybe (Cursor backend)
-> VersionMismatch backend
VersionMismatch StreamId
streamId ExpectedVersion backend
verExpectation (StoreState backend -> StreamId -> Maybe (Cursor backend)
forall backend.
StoreState backend -> StreamId -> Maybe (Cursor backend)
getCurrentVersion StoreState backend
state StreamId
streamId)
| Bool
otherwise -> () -> Either (VersionMismatch backend) ()
forall a b. b -> Either a b
Right ()
ExpectedVersion backend
StreamExists
| StreamId -> Map StreamId (Cursor backend) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
Map.member StreamId
streamId StoreState backend
state.streamVersions -> () -> Either (VersionMismatch backend) ()
forall a b. b -> Either a b
Right ()
| Bool
otherwise ->
VersionMismatch backend -> Either (VersionMismatch backend) ()
forall a b. a -> Either a b
Left (VersionMismatch backend -> Either (VersionMismatch backend) ())
-> VersionMismatch backend -> Either (VersionMismatch backend) ()
forall a b. (a -> b) -> a -> b
$ StreamId
-> ExpectedVersion backend
-> Maybe (Cursor backend)
-> VersionMismatch backend
forall backend.
StreamId
-> ExpectedVersion backend
-> Maybe (Cursor backend)
-> VersionMismatch backend
VersionMismatch StreamId
streamId ExpectedVersion backend
verExpectation Maybe (Cursor backend)
forall a. Maybe a
Nothing
ExactVersion Cursor backend
expected ->
let actual :: Maybe (Cursor backend)
actual = StoreState backend -> StreamId -> Maybe (Cursor backend)
forall backend.
StoreState backend -> StreamId -> Maybe (Cursor backend)
getCurrentVersion StoreState backend
state StreamId
streamId
in if Cursor backend -> Maybe (Cursor backend)
forall a. a -> Maybe a
Just Cursor backend
expected Maybe (Cursor backend) -> Maybe (Cursor backend) -> Bool
forall a. Eq a => a -> a -> Bool
== Maybe (Cursor backend)
actual
then () -> Either (VersionMismatch backend) ()
forall a b. b -> Either a b
Right ()
else VersionMismatch backend -> Either (VersionMismatch backend) ()
forall a b. a -> Either a b
Left (VersionMismatch backend -> Either (VersionMismatch backend) ())
-> VersionMismatch backend -> Either (VersionMismatch backend) ()
forall a b. (a -> b) -> a -> b
$ StreamId
-> ExpectedVersion backend
-> Maybe (Cursor backend)
-> VersionMismatch backend
forall backend.
StreamId
-> ExpectedVersion backend
-> Maybe (Cursor backend)
-> VersionMismatch backend
VersionMismatch StreamId
streamId ExpectedVersion backend
verExpectation Maybe (Cursor backend)
actual
ExactStreamVersion StreamVersion
expectedStreamVersion ->
let actualStreamVersion :: Maybe StreamVersion
actualStreamVersion = StoreState backend -> StreamId -> Maybe StreamVersion
forall backend.
StoreState backend -> StreamId -> Maybe StreamVersion
getCurrentStreamVersion StoreState backend
state StreamId
streamId
actualCursor :: Maybe (Cursor backend)
actualCursor = StoreState backend -> StreamId -> Maybe (Cursor backend)
forall backend.
StoreState backend -> StreamId -> Maybe (Cursor backend)
getCurrentVersion StoreState backend
state StreamId
streamId
in if StreamVersion -> Maybe StreamVersion
forall a. a -> Maybe a
Just StreamVersion
expectedStreamVersion Maybe StreamVersion -> Maybe StreamVersion -> Bool
forall a. Eq a => a -> a -> Bool
== Maybe StreamVersion
actualStreamVersion
then () -> Either (VersionMismatch backend) ()
forall a b. b -> Either a b
Right ()
else VersionMismatch backend -> Either (VersionMismatch backend) ()
forall a b. a -> Either a b
Left (VersionMismatch backend -> Either (VersionMismatch backend) ())
-> VersionMismatch backend -> Either (VersionMismatch backend) ()
forall a b. (a -> b) -> a -> b
$ StreamId
-> ExpectedVersion backend
-> Maybe (Cursor backend)
-> VersionMismatch backend
forall backend.
StreamId
-> ExpectedVersion backend
-> Maybe (Cursor backend)
-> VersionMismatch backend
VersionMismatch StreamId
streamId ExpectedVersion backend
verExpectation Maybe (Cursor backend)
actualCursor
getCurrentVersion :: StoreState backend -> StreamId -> Maybe (Cursor backend)
getCurrentVersion :: forall backend.
StoreState backend -> StreamId -> Maybe (Cursor backend)
getCurrentVersion StoreState backend
state StreamId
streamId =
StreamId -> Map StreamId (Cursor backend) -> Maybe (Cursor backend)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup StreamId
streamId StoreState backend
state.streamVersions
getCurrentStreamVersion :: StoreState backend -> StreamId -> Maybe StreamVersion
getCurrentStreamVersion :: forall backend.
StoreState backend -> StreamId -> Maybe StreamVersion
getCurrentStreamVersion StoreState backend
state StreamId
streamId =
StreamId -> Map StreamId StreamVersion -> Maybe StreamVersion
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup StreamId
streamId StoreState backend
state.streamLocalVersions
processEvents ::
forall ts m backend.
(MonadUnliftIO m, StoreCursor backend, Show (Cursor backend)) =>
EventMatcher ts backend m ->
[StoredEvent] ->
m SubscriptionResult
processEvents :: forall (ts :: [Symbol]) (m :: * -> *) backend.
(MonadUnliftIO m, StoreCursor backend, Show (Cursor backend)) =>
EventMatcher ts backend m -> [StoredEvent] -> m SubscriptionResult
processEvents EventMatcher ts backend m
MatchEnd [StoredEvent]
_ = SubscriptionResult -> m SubscriptionResult
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SubscriptionResult
Continue
processEvents ((Proxy event, EventHandler event m backend)
matcher :? EventMatcher ts1 backend m
rest) [StoredEvent]
events = do
let processAllEvents :: [StoredEvent] -> m SubscriptionResult
processAllEvents [] = SubscriptionResult -> m SubscriptionResult
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SubscriptionResult
Continue
processAllEvents (StoredEvent
event : [StoredEvent]
remaining) = do
result <- (Proxy event, EventHandler event m backend)
-> EventMatcher ts1 backend m
-> StoredEvent
-> m SubscriptionResult
forall (m :: * -> *) backend (event :: Symbol) (ts :: [Symbol]).
(MonadUnliftIO m, Event event, StoreCursor backend,
Show (Cursor backend)) =>
(Proxy event, EventHandler event m backend)
-> EventMatcher ts backend m -> StoredEvent -> m SubscriptionResult
processEventThroughMatchers (Proxy event, EventHandler event m backend)
matcher EventMatcher ts1 backend m
rest StoredEvent
event
case result of
SubscriptionResult
Stop -> SubscriptionResult -> m SubscriptionResult
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SubscriptionResult
Stop
SubscriptionResult
Continue -> [StoredEvent] -> m SubscriptionResult
processAllEvents [StoredEvent]
remaining
in [StoredEvent] -> m SubscriptionResult
processAllEvents [StoredEvent]
events
processEventThroughMatchers ::
forall m backend event ts.
(MonadUnliftIO m, Event event, StoreCursor backend, Show (Cursor backend)) =>
(Proxy event, EventHandler event m backend) ->
EventMatcher ts backend m ->
StoredEvent ->
m SubscriptionResult
processEventThroughMatchers :: forall (m :: * -> *) backend (event :: Symbol) (ts :: [Symbol]).
(MonadUnliftIO m, Event event, StoreCursor backend,
Show (Cursor backend)) =>
(Proxy event, EventHandler event m backend)
-> EventMatcher ts backend m -> StoredEvent -> m SubscriptionResult
processEventThroughMatchers (Proxy event
proxy, EventHandler event m backend
handler) EventMatcher ts backend m
rest StoredEvent
event = do
let targetEvent :: Text
targetEvent = Proxy event -> Text
forall (event :: Symbol). KnownSymbol event => Proxy event -> Text
getEventName Proxy event
proxy
if StoredEvent
event.eventName Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== Text
targetEvent
then case Proxy event
-> EventId
-> StreamId
-> Cursor backend
-> StreamVersion
-> Maybe CorrelationId
-> UTCTime
-> Value
-> Integer
-> Maybe (EventEnvelope event backend)
forall (event :: Symbol) backend.
Event event =>
Proxy event
-> EventId
-> StreamId
-> Cursor backend
-> StreamVersion
-> Maybe CorrelationId
-> UTCTime
-> Value
-> Integer
-> Maybe (EventEnvelope event backend)
parseStoredEventToEnvelope
Proxy event
proxy
StoredEvent
event.eventId
StoredEvent
event.streamId
(Integer -> Cursor backend
forall backend. StoreCursor backend => Integer -> Cursor backend
makeCursor StoredEvent
event.seqNo)
StoredEvent
event.streamVersion
StoredEvent
event.correlationId
StoredEvent
event.createdAt
StoredEvent
event.payload
StoredEvent
event.eventVersion of
Just EventEnvelope event backend
envelope ->
(EventHandler event m backend
handler EventEnvelope event backend
envelope) m SubscriptionResult
-> (SomeException -> m SubscriptionResult) -> m SubscriptionResult
forall (m :: * -> *) e a.
(MonadUnliftIO m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \(SomeException
e :: SomeException) ->
HandlerException -> m SubscriptionResult
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO (HandlerException -> m SubscriptionResult)
-> HandlerException -> m SubscriptionResult
forall a b. (a -> b) -> a -> b
$
HandlerException
{ originalException :: SomeException
originalException = SomeException
e
, failedEventPosition :: Text
failedEventPosition = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Cursor backend -> String
forall a. Show a => a -> String
show (forall backend. StoreCursor backend => Integer -> Cursor backend
makeCursor @backend StoredEvent
event.seqNo)
, failedEventId :: EventId
failedEventId = StoredEvent
event.eventId
, failedEventName :: Text
failedEventName = StoredEvent
event.eventName
, failedEventStreamId :: StreamId
failedEventStreamId = StoredEvent
event.streamId
, failedEventStreamVersion :: StreamVersion
failedEventStreamVersion = StoredEvent
event.streamVersion
, failedEventCorrelationId :: Maybe CorrelationId
failedEventCorrelationId = StoredEvent
event.correlationId
, failedEventCreatedAt :: UTCTime
failedEventCreatedAt = StoredEvent
event.createdAt
}
Maybe (EventEnvelope event backend)
Nothing -> EventMatcher ts backend m -> [StoredEvent] -> m SubscriptionResult
forall (ts :: [Symbol]) (m :: * -> *) backend.
(MonadUnliftIO m, StoreCursor backend, Show (Cursor backend)) =>
EventMatcher ts backend m -> [StoredEvent] -> m SubscriptionResult
processEvents EventMatcher ts backend m
rest [StoredEvent
event]
else EventMatcher ts backend m -> [StoredEvent] -> m SubscriptionResult
forall (ts :: [Symbol]) (m :: * -> *) backend.
(MonadUnliftIO m, StoreCursor backend, Show (Cursor backend)) =>
EventMatcher ts backend m -> [StoredEvent] -> m SubscriptionResult
processEvents EventMatcher ts backend m
rest [StoredEvent
event]
updateState :: forall backend. (StoreCursor backend) => StoredEvent -> StoreState backend -> StoreState backend
updateState :: forall backend.
StoreCursor backend =>
StoredEvent -> StoreState backend -> StoreState backend
updateState StoredEvent
event StoreState backend
state =
StoreState backend
state
{ events = Map.insert event.seqNo event state.events
, streamEvents = Map.alter (Just . maybe [event.seqNo] (event.seqNo :)) event.streamId state.streamEvents
, streamVersions = Map.insert event.streamId (makeCursor event.seqNo) state.streamVersions
, streamLocalVersions = Map.insert event.streamId event.streamVersion state.streamLocalVersions
, globalNotification = state.globalNotification
}
makeStoredEvents ::
forall t backend.
(Foldable t) =>
StoreState backend ->
Maybe CorrelationId ->
UTCTime ->
[EventId] ->
StreamId ->
StreamWrite t SomeLatestEvent backend ->
([StoredEvent], [Integer])
makeStoredEvents :: forall (t :: * -> *) backend.
Foldable t =>
StoreState backend
-> Maybe CorrelationId
-> UTCTime
-> [EventId]
-> StreamId
-> StreamWrite t SomeLatestEvent backend
-> ([StoredEvent], [Integer])
makeStoredEvents StoreState backend
state Maybe CorrelationId
mbCorrId UTCTime
now [EventId]
eventIds StreamId
streamId StreamWrite t SomeLatestEvent backend
batch =
let baseSeq :: Integer
baseSeq = StoreState backend
state.nextSequence
seqNos :: [Integer]
seqNos = [Integer
baseSeq .. Integer
baseSeq Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
+ Int -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral (t SomeLatestEvent -> Int
forall a. t a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length StreamWrite t SomeLatestEvent backend
batch.events) Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
- Integer
1]
currentStreamVersion :: StreamVersion
currentStreamVersion = StreamVersion
-> StreamId -> Map StreamId StreamVersion -> StreamVersion
forall k a. Ord k => a -> k -> Map k a -> a
Map.findWithDefault (Int64 -> StreamVersion
StreamVersion Int64
0) StreamId
streamId StoreState backend
state.streamLocalVersions
streamVersions :: [StreamVersion]
streamVersions = [StreamVersion
currentStreamVersion StreamVersion -> StreamVersion -> StreamVersion
forall a. Num a => a -> a -> a
+ StreamVersion
1 .. StreamVersion
currentStreamVersion StreamVersion -> StreamVersion -> StreamVersion
forall a. Num a => a -> a -> a
+ Int -> StreamVersion
forall a b. (Integral a, Num b) => a -> b
fromIntegral (t SomeLatestEvent -> Int
forall a. t a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length StreamWrite t SomeLatestEvent backend
batch.events)]
mkStoredEvent :: (Integer, (EventId, SomeLatestEvent), StreamVersion)
-> (StoredEvent, Integer)
mkStoredEvent (Integer
sn, (EventId
eid, SomeLatestEvent Proxy event
proxy CurrentPayloadType event
payload), StreamVersion
streamVer) =
let name :: Text
name = Proxy event -> Text
forall (event :: Symbol). KnownSymbol event => Proxy event -> Text
getEventName Proxy event
proxy
version :: Integer
version = Integer -> Integer
forall a. Num a => Integer -> a
fromInteger (Integer -> Integer) -> Integer -> Integer
forall a b. (a -> b) -> a -> b
$ Proxy event -> Integer
forall (event :: Symbol). Event event => Proxy event -> Integer
getMaxVersion Proxy event
proxy
in ( StoredEvent
{ seqNo :: Integer
seqNo = Integer
sn
, eventId :: EventId
eventId = EventId
eid
, streamId :: StreamId
streamId = StreamId
streamId
, correlationId :: Maybe CorrelationId
correlationId = Maybe CorrelationId
mbCorrId
, createdAt :: UTCTime
createdAt = UTCTime
now
, eventName :: Text
eventName = Text
name
, eventVersion :: Integer
eventVersion = Integer
version
, payload :: Value
payload = FinalVersionType (FromList (Versions event)) -> Value
forall a. ToJSON a => a -> Value
toJSON FinalVersionType (FromList (Versions event))
CurrentPayloadType event
payload
, streamVersion :: StreamVersion
streamVersion = StreamVersion
streamVer
}
, Integer
sn
)
in [(StoredEvent, Integer)] -> ([StoredEvent], [Integer])
forall a b. [(a, b)] -> ([a], [b])
unzip ([(StoredEvent, Integer)] -> ([StoredEvent], [Integer]))
-> [(StoredEvent, Integer)] -> ([StoredEvent], [Integer])
forall a b. (a -> b) -> a -> b
$ ((Integer, (EventId, SomeLatestEvent), StreamVersion)
-> (StoredEvent, Integer))
-> [(Integer, (EventId, SomeLatestEvent), StreamVersion)]
-> [(StoredEvent, Integer)]
forall a b. (a -> b) -> [a] -> [b]
map (Integer, (EventId, SomeLatestEvent), StreamVersion)
-> (StoredEvent, Integer)
mkStoredEvent ([(Integer, (EventId, SomeLatestEvent), StreamVersion)]
-> [(StoredEvent, Integer)])
-> [(Integer, (EventId, SomeLatestEvent), StreamVersion)]
-> [(StoredEvent, Integer)]
forall a b. (a -> b) -> a -> b
$ [Integer]
-> [(EventId, SomeLatestEvent)]
-> [StreamVersion]
-> [(Integer, (EventId, SomeLatestEvent), StreamVersion)]
forall a b c. [a] -> [b] -> [c] -> [(a, b, c)]
zip3 [Integer]
seqNos ([EventId] -> [SomeLatestEvent] -> [(EventId, SomeLatestEvent)]
forall a b. [a] -> [b] -> [(a, b)]
zip [EventId]
eventIds ([SomeLatestEvent] -> [(EventId, SomeLatestEvent)])
-> [SomeLatestEvent] -> [(EventId, SomeLatestEvent)]
forall a b. (a -> b) -> a -> b
$ t SomeLatestEvent -> [SomeLatestEvent]
forall a. t a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList StreamWrite t SomeLatestEvent backend
batch.events) [StreamVersion]
streamVersions
checkAllVersions ::
forall t backend.
(Eq (Cursor backend)) =>
StoreState backend ->
Map StreamId (StreamWrite t SomeLatestEvent backend) ->
Either (EventStoreError backend) ()
checkAllVersions :: forall (t :: * -> *) backend.
Eq (Cursor backend) =>
StoreState backend
-> Map StreamId (StreamWrite t SomeLatestEvent backend)
-> Either (EventStoreError backend) ()
checkAllVersions StoreState backend
state Map StreamId (StreamWrite t SomeLatestEvent backend)
batches = do
case [Either (VersionMismatch backend) ()]
-> Either (VersionMismatch backend) ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_
[ StoreState backend
-> StreamId
-> ExpectedVersion backend
-> Either (VersionMismatch backend) ()
forall backend.
Eq (Cursor backend) =>
StoreState backend
-> StreamId
-> ExpectedVersion backend
-> Either (VersionMismatch backend) ()
checkVersionConstraint StoreState backend
state StreamId
streamId StreamWrite t SomeLatestEvent backend
batch.expectedVersion
| (StreamId
streamId, StreamWrite t SomeLatestEvent backend
batch) <- Map StreamId (StreamWrite t SomeLatestEvent backend)
-> [(StreamId, StreamWrite t SomeLatestEvent backend)]
forall k a. Map k a -> [(k, a)]
Map.toList Map StreamId (StreamWrite t SomeLatestEvent backend)
batches
] of
Left VersionMismatch backend
mismatch -> EventStoreError backend -> Either (EventStoreError backend) ()
forall a b. a -> Either a b
Left (EventStoreError backend -> Either (EventStoreError backend) ())
-> EventStoreError backend -> Either (EventStoreError backend) ()
forall a b. (a -> b) -> a -> b
$ ConsistencyErrorInfo backend -> EventStoreError backend
forall backend.
ConsistencyErrorInfo backend -> EventStoreError backend
ConsistencyError (ConsistencyErrorInfo backend -> EventStoreError backend)
-> ConsistencyErrorInfo backend -> EventStoreError backend
forall a b. (a -> b) -> a -> b
$ [VersionMismatch backend] -> ConsistencyErrorInfo backend
forall backend.
[VersionMismatch backend] -> ConsistencyErrorInfo backend
ConsistencyErrorInfo [VersionMismatch backend
mismatch]
Right () -> () -> Either (EventStoreError backend) ()
forall a b. b -> Either a b
Right ()
insertAllEvents ::
forall backend t.
(StoreCursor backend, Foldable t) =>
StoreState backend ->
Maybe CorrelationId ->
UTCTime ->
[EventId] ->
Map StreamId (StreamWrite t SomeLatestEvent backend) ->
(StoreState backend, Cursor backend, Map StreamId (Cursor backend))
insertAllEvents :: forall backend (t :: * -> *).
(StoreCursor backend, Foldable t) =>
StoreState backend
-> Maybe CorrelationId
-> UTCTime
-> [EventId]
-> Map StreamId (StreamWrite t SomeLatestEvent backend)
-> (StoreState backend, Cursor backend,
Map StreamId (Cursor backend))
insertAllEvents StoreState backend
state Maybe CorrelationId
mbCorrId UTCTime
now [EventId]
eventIds Map StreamId (StreamWrite t SomeLatestEvent backend)
batches =
let
batchSizes :: [Int]
batchSizes = (StreamWrite t SomeLatestEvent backend -> Int)
-> [StreamWrite t SomeLatestEvent backend] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map (t SomeLatestEvent -> Int
forall a. t a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length (t SomeLatestEvent -> Int)
-> (StreamWrite t SomeLatestEvent backend -> t SomeLatestEvent)
-> StreamWrite t SomeLatestEvent backend
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (.events)) ([StreamWrite t SomeLatestEvent backend] -> [Int])
-> [StreamWrite t SomeLatestEvent backend] -> [Int]
forall a b. (a -> b) -> a -> b
$ Map StreamId (StreamWrite t SomeLatestEvent backend)
-> [StreamWrite t SomeLatestEvent backend]
forall k a. Map k a -> [a]
Map.elems Map StreamId (StreamWrite t SomeLatestEvent backend)
batches
batchStartSeqs :: [Integer]
batchStartSeqs = (Integer -> Integer -> Integer)
-> Integer -> [Integer] -> [Integer]
forall b a. (b -> a -> b) -> b -> [a] -> [b]
scanl Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
(+) StoreState backend
state.nextSequence ([Integer] -> [Integer]) -> [Integer] -> [Integer]
forall a b. (a -> b) -> a -> b
$ (Int -> Integer) -> [Int] -> [Integer]
forall a b. (a -> b) -> [a] -> [b]
map Int -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral ([Int] -> [Integer]) -> [Int] -> [Integer]
forall a b. (a -> b) -> a -> b
$ [Int] -> [Int]
forall a. HasCallStack => [a] -> [a]
init [Int]
batchSizes
([StoredEvent]
allEvents, [Integer]
seqNos) =
[(StoredEvent, Integer)] -> ([StoredEvent], [Integer])
forall a b. [(a, b)] -> ([a], [b])
unzip ([(StoredEvent, Integer)] -> ([StoredEvent], [Integer]))
-> [(StoredEvent, Integer)] -> ([StoredEvent], [Integer])
forall a b. (a -> b) -> a -> b
$
[[(StoredEvent, Integer)]] -> [(StoredEvent, Integer)]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat
[ (([StoredEvent] -> [Integer] -> [(StoredEvent, Integer)])
-> ([StoredEvent], [Integer]) -> [(StoredEvent, Integer)]
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry [StoredEvent] -> [Integer] -> [(StoredEvent, Integer)]
forall a b. [a] -> [b] -> [(a, b)]
zip) (([StoredEvent], [Integer]) -> [(StoredEvent, Integer)])
-> ([StoredEvent], [Integer]) -> [(StoredEvent, Integer)]
forall a b. (a -> b) -> a -> b
$ StoreState backend
-> Maybe CorrelationId
-> UTCTime
-> [EventId]
-> StreamId
-> StreamWrite t SomeLatestEvent backend
-> ([StoredEvent], [Integer])
forall (t :: * -> *) backend.
Foldable t =>
StoreState backend
-> Maybe CorrelationId
-> UTCTime
-> [EventId]
-> StreamId
-> StreamWrite t SomeLatestEvent backend
-> ([StoredEvent], [Integer])
makeStoredEvents (StoreState backend
state{nextSequence = startSeq} :: StoreState backend) Maybe CorrelationId
mbCorrId UTCTime
now [EventId]
eventIdsForBatch StreamId
streamId StreamWrite t SomeLatestEvent backend
batch
| (StreamId
streamId, StreamWrite t SomeLatestEvent backend
batch, [EventId]
eventIdsForBatch, Integer
startSeq) <-
[StreamId]
-> [StreamWrite t SomeLatestEvent backend]
-> [[EventId]]
-> [Integer]
-> [(StreamId, StreamWrite t SomeLatestEvent backend, [EventId],
Integer)]
forall a b c d. [a] -> [b] -> [c] -> [d] -> [(a, b, c, d)]
zip4
(Map StreamId (StreamWrite t SomeLatestEvent backend) -> [StreamId]
forall k a. Map k a -> [k]
Map.keys Map StreamId (StreamWrite t SomeLatestEvent backend)
batches)
(Map StreamId (StreamWrite t SomeLatestEvent backend)
-> [StreamWrite t SomeLatestEvent backend]
forall k a. Map k a -> [a]
Map.elems Map StreamId (StreamWrite t SomeLatestEvent backend)
batches)
([Int] -> [EventId] -> [[EventId]]
forall a. [Int] -> [a] -> [[a]]
chunksOf [Int]
batchSizes [EventId]
eventIds)
[Integer]
batchStartSeqs
]
newStreamVersions :: Map StreamId (Cursor backend)
newStreamVersions = (Map StreamId (Cursor backend)
-> StoredEvent -> Map StreamId (Cursor backend))
-> Map StreamId (Cursor backend)
-> [StoredEvent]
-> Map StreamId (Cursor backend)
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' Map StreamId (Cursor backend)
-> StoredEvent -> Map StreamId (Cursor backend)
forall {k} {backend} {r}.
(Ord k, StoreCursor backend, HasField "streamId" r k,
HasField "seqNo" r Integer) =>
Map k (Cursor backend) -> r -> Map k (Cursor backend)
updateStreamVersions StoreState backend
state.streamVersions [StoredEvent]
allEvents
finalState :: StoreState backend
finalState =
StoreState backend
state
{ nextSequence = state.nextSequence + fromIntegral (length allEvents)
, events = Map.union state.events (Map.fromList $ zip seqNos allEvents)
, streamEvents = foldr updateStreamEvents state.streamEvents allEvents
, streamVersions = newStreamVersions
, streamLocalVersions = foldl' updateStreamLocalVersions state.streamLocalVersions allEvents
}
finalCursor :: Cursor backend
finalCursor = case [Integer]
seqNos of
[] -> Integer -> Cursor backend
forall backend. StoreCursor backend => Integer -> Cursor backend
makeCursor (StoreState backend
state.nextSequence Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
- Integer
1)
[Integer]
_ -> Integer -> Cursor backend
forall backend. StoreCursor backend => Integer -> Cursor backend
makeCursor (Integer -> Cursor backend) -> Integer -> Cursor backend
forall a b. (a -> b) -> a -> b
$ [Integer] -> Integer
forall a. HasCallStack => [a] -> a
last [Integer]
seqNos
perStreamCursors :: Map StreamId (Cursor backend)
perStreamCursors = Map StreamId (Cursor backend)
-> Set StreamId -> Map StreamId (Cursor backend)
forall k a. Ord k => Map k a -> Set k -> Map k a
Map.restrictKeys Map StreamId (Cursor backend)
newStreamVersions (Map StreamId (StreamWrite t SomeLatestEvent backend)
-> Set StreamId
forall k a. Map k a -> Set k
Map.keysSet Map StreamId (StreamWrite t SomeLatestEvent backend)
batches)
in
(StoreState backend
finalState, Cursor backend
finalCursor, Map StreamId (Cursor backend)
perStreamCursors)
where
updateStreamEvents :: r -> Map k [a] -> Map k [a]
updateStreamEvents r
e =
(Maybe [a] -> Maybe [a]) -> k -> Map k [a] -> Map k [a]
forall k a.
Ord k =>
(Maybe a -> Maybe a) -> k -> Map k a -> Map k a
Map.alter
([a] -> Maybe [a]
forall a. a -> Maybe a
Just ([a] -> Maybe [a]) -> (Maybe [a] -> [a]) -> Maybe [a] -> Maybe [a]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [a] -> ([a] -> [a]) -> Maybe [a] -> [a]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [r
e.seqNo] (r
e.seqNo a -> [a] -> [a]
forall a. a -> [a] -> [a]
:))
r
e.streamId
updateStreamVersions :: Map k (Cursor backend) -> r -> Map k (Cursor backend)
updateStreamVersions Map k (Cursor backend)
acc r
e =
k
-> Cursor backend
-> Map k (Cursor backend)
-> Map k (Cursor backend)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert
r
e.streamId
(Integer -> Cursor backend
forall backend. StoreCursor backend => Integer -> Cursor backend
makeCursor r
e.seqNo)
Map k (Cursor backend)
acc
updateStreamLocalVersions :: Map k a -> r -> Map k a
updateStreamLocalVersions Map k a
acc r
e =
k -> a -> Map k a -> Map k a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert
r
e.streamId
r
e.streamVersion
Map k a
acc
chunksOf :: [Int] -> [a] -> [[a]]
chunksOf :: forall a. [Int] -> [a] -> [[a]]
chunksOf [] [a]
_ = []
chunksOf (Int
n : [Int]
ns) [a]
xs = Int -> [a] -> [a]
forall a. Int -> [a] -> [a]
take Int
n [a]
xs [a] -> [[a]] -> [[a]]
forall a. a -> [a] -> [a]
: [Int] -> [a] -> [[a]]
forall a. [Int] -> [a] -> [[a]]
chunksOf [Int]
ns (Int -> [a] -> [a]
forall a. Int -> [a] -> [a]
drop Int
n [a]
xs)
subscribeToEvents ::
forall m backend ts.
(MonadUnliftIO m, StoreCursor backend, Show (Cursor backend)) =>
TVar (StoreState backend) ->
EventMatcher ts backend m ->
EventSelector backend ->
m (SubscriptionHandle backend)
subscribeToEvents :: forall (m :: * -> *) backend (ts :: [Symbol]).
(MonadUnliftIO m, StoreCursor backend, Show (Cursor backend)) =>
TVar (StoreState backend)
-> EventMatcher ts backend m
-> EventSelector backend
-> m (SubscriptionHandle backend)
subscribeToEvents TVar (StoreState backend)
stateVar EventMatcher ts backend m
matcher EventSelector backend
selector = do
startSeq <- case EventSelector backend
selector.startupPosition of
StartupPosition backend
FromBeginning -> Integer -> m Integer
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (-Integer
1)
FromLastProcessed Cursor backend
cursor -> Integer -> m Integer
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Integer -> m Integer) -> Integer -> m Integer
forall a b. (a -> b) -> a -> b
$ Cursor backend -> Integer
forall backend. StoreCursor backend => Cursor backend -> Integer
makeSequenceNo Cursor backend
cursor
notifyVar <- liftIO $ atomically $ do
state <- readTVar stateVar
case selector.streamId of
StreamSelector
AllStreams -> do
TVar Integer -> STM (TVar Integer)
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure StoreState backend
state.globalNotification
SingleStream StreamId
sid -> do
case StreamId -> Map StreamId (TVar Integer) -> Maybe (TVar Integer)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup StreamId
sid StoreState backend
state.streamNotifications of
Just TVar Integer
var -> TVar Integer -> STM (TVar Integer)
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TVar Integer
var
Maybe (TVar Integer)
Nothing -> do
var <- Integer -> STM (TVar Integer)
forall a. a -> STM (TVar a)
newTVar Integer
startSeq
let newState =
StoreState backend
state{streamNotifications = Map.insert sid var state.streamNotifications}
writeTVar stateVar newState
pure var
let loop :: Integer -> m ()
loop Integer
position = do
eventResult <- IO [StoredEvent] -> m [StoredEvent]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [StoredEvent] -> m [StoredEvent])
-> IO [StoredEvent] -> m [StoredEvent]
forall a b. (a -> b) -> a -> b
$ STM [StoredEvent] -> IO [StoredEvent]
forall a. STM a -> IO a
atomically (STM [StoredEvent] -> IO [StoredEvent])
-> STM [StoredEvent] -> IO [StoredEvent]
forall a b. (a -> b) -> a -> b
$ do
relevantEvents <- case EventSelector backend
selector.streamId of
StreamSelector
AllStreams -> do
state <- TVar (StoreState backend) -> STM (StoreState backend)
forall a. TVar a -> STM a
readTVar TVar (StoreState backend)
stateVar
pure $ sortOn seqNo [ev | (seqNo, ev) <- Map.toList state.events, seqNo > position]
SingleStream StreamId
sid -> do
state <- TVar (StoreState backend) -> STM (StoreState backend)
forall a. TVar a -> STM a
readTVar TVar (StoreState backend)
stateVar
pure $
sortOn
seqNo
[ ev
| seqNos <- toList $ Map.lookup sid state.streamEvents
, seqNo <- seqNos
, seqNo > position
, ev <- toList $ Map.lookup seqNo state.events
]
pure relevantEvents
case eventResult of
[] -> do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
curr <- TVar Integer -> STM Integer
forall a. TVar a -> STM a
readTVar TVar Integer
notifyVar
when (curr <= position) $ do
retry
Integer -> m ()
loop Integer
position
[StoredEvent]
events -> do
let maxSeq :: Integer
maxSeq = [Integer] -> Integer
forall a. Ord a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
maximum ([Integer] -> Integer) -> [Integer] -> Integer
forall a b. (a -> b) -> a -> b
$ (StoredEvent -> Integer) -> [StoredEvent] -> [Integer]
forall a b. (a -> b) -> [a] -> [b]
map StoredEvent -> Integer
seqNo [StoredEvent]
events
result <- EventMatcher ts backend m -> [StoredEvent] -> m SubscriptionResult
forall (ts :: [Symbol]) (m :: * -> *) backend.
(MonadUnliftIO m, StoreCursor backend, Show (Cursor backend)) =>
EventMatcher ts backend m -> [StoredEvent] -> m SubscriptionResult
processEvents EventMatcher ts backend m
matcher [StoredEvent]
events
case result of
SubscriptionResult
Stop -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
SubscriptionResult
Continue ->
Integer -> m ()
loop Integer
maxSeq
withRunInIO $ \forall a. m a -> IO a
runInIO -> do
workerThread <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ m () -> IO ()
forall a. m a -> IO a
runInIO (m () -> IO ()) -> m () -> IO ()
forall a b. (a -> b) -> a -> b
$ Integer -> m ()
loop Integer
startSeq
pure $
SubscriptionHandle
{ cancel = cancel workerThread
, wait = wait workerThread
}