{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
module Hindsight.Store.Memory (
MemoryStore,
MemoryStoreHandle,
MemoryCursor (..),
newMemoryStore,
module Hindsight.Store,
)
where
import Control.Concurrent.STM
import Control.Monad (forM, forM_)
import Control.Monad.IO.Class (liftIO)
import Data.Aeson (FromJSON (..), ToJSON (..))
import Data.Map.Strict qualified as Map
import Data.Time (getCurrentTime)
import Data.UUID.V4 qualified as UUID
import GHC.Generics (Generic)
import Hindsight.Store
import Hindsight.Store.Memory.Internal
import UnliftIO (MonadUnliftIO)
newtype MemoryCursor = MemoryCursor
{ MemoryCursor -> Integer
getSequenceNo :: Integer
}
deriving stock (Int -> MemoryCursor -> ShowS
[MemoryCursor] -> ShowS
MemoryCursor -> String
(Int -> MemoryCursor -> ShowS)
-> (MemoryCursor -> String)
-> ([MemoryCursor] -> ShowS)
-> Show MemoryCursor
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> MemoryCursor -> ShowS
showsPrec :: Int -> MemoryCursor -> ShowS
$cshow :: MemoryCursor -> String
show :: MemoryCursor -> String
$cshowList :: [MemoryCursor] -> ShowS
showList :: [MemoryCursor] -> ShowS
Show, MemoryCursor -> MemoryCursor -> Bool
(MemoryCursor -> MemoryCursor -> Bool)
-> (MemoryCursor -> MemoryCursor -> Bool) -> Eq MemoryCursor
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: MemoryCursor -> MemoryCursor -> Bool
== :: MemoryCursor -> MemoryCursor -> Bool
$c/= :: MemoryCursor -> MemoryCursor -> Bool
/= :: MemoryCursor -> MemoryCursor -> Bool
Eq, Eq MemoryCursor
Eq MemoryCursor =>
(MemoryCursor -> MemoryCursor -> Ordering)
-> (MemoryCursor -> MemoryCursor -> Bool)
-> (MemoryCursor -> MemoryCursor -> Bool)
-> (MemoryCursor -> MemoryCursor -> Bool)
-> (MemoryCursor -> MemoryCursor -> Bool)
-> (MemoryCursor -> MemoryCursor -> MemoryCursor)
-> (MemoryCursor -> MemoryCursor -> MemoryCursor)
-> Ord MemoryCursor
MemoryCursor -> MemoryCursor -> Bool
MemoryCursor -> MemoryCursor -> Ordering
MemoryCursor -> MemoryCursor -> MemoryCursor
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: MemoryCursor -> MemoryCursor -> Ordering
compare :: MemoryCursor -> MemoryCursor -> Ordering
$c< :: MemoryCursor -> MemoryCursor -> Bool
< :: MemoryCursor -> MemoryCursor -> Bool
$c<= :: MemoryCursor -> MemoryCursor -> Bool
<= :: MemoryCursor -> MemoryCursor -> Bool
$c> :: MemoryCursor -> MemoryCursor -> Bool
> :: MemoryCursor -> MemoryCursor -> Bool
$c>= :: MemoryCursor -> MemoryCursor -> Bool
>= :: MemoryCursor -> MemoryCursor -> Bool
$cmax :: MemoryCursor -> MemoryCursor -> MemoryCursor
max :: MemoryCursor -> MemoryCursor -> MemoryCursor
$cmin :: MemoryCursor -> MemoryCursor -> MemoryCursor
min :: MemoryCursor -> MemoryCursor -> MemoryCursor
Ord, (forall x. MemoryCursor -> Rep MemoryCursor x)
-> (forall x. Rep MemoryCursor x -> MemoryCursor)
-> Generic MemoryCursor
forall x. Rep MemoryCursor x -> MemoryCursor
forall x. MemoryCursor -> Rep MemoryCursor x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. MemoryCursor -> Rep MemoryCursor x
from :: forall x. MemoryCursor -> Rep MemoryCursor x
$cto :: forall x. Rep MemoryCursor x -> MemoryCursor
to :: forall x. Rep MemoryCursor x -> MemoryCursor
Generic)
deriving anyclass (Maybe MemoryCursor
Value -> Parser [MemoryCursor]
Value -> Parser MemoryCursor
(Value -> Parser MemoryCursor)
-> (Value -> Parser [MemoryCursor])
-> Maybe MemoryCursor
-> FromJSON MemoryCursor
forall a.
(Value -> Parser a)
-> (Value -> Parser [a]) -> Maybe a -> FromJSON a
$cparseJSON :: Value -> Parser MemoryCursor
parseJSON :: Value -> Parser MemoryCursor
$cparseJSONList :: Value -> Parser [MemoryCursor]
parseJSONList :: Value -> Parser [MemoryCursor]
$comittedField :: Maybe MemoryCursor
omittedField :: Maybe MemoryCursor
FromJSON, [MemoryCursor] -> Value
[MemoryCursor] -> Encoding
MemoryCursor -> Bool
MemoryCursor -> Value
MemoryCursor -> Encoding
(MemoryCursor -> Value)
-> (MemoryCursor -> Encoding)
-> ([MemoryCursor] -> Value)
-> ([MemoryCursor] -> Encoding)
-> (MemoryCursor -> Bool)
-> ToJSON MemoryCursor
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: MemoryCursor -> Value
toJSON :: MemoryCursor -> Value
$ctoEncoding :: MemoryCursor -> Encoding
toEncoding :: MemoryCursor -> Encoding
$ctoJSONList :: [MemoryCursor] -> Value
toJSONList :: [MemoryCursor] -> Value
$ctoEncodingList :: [MemoryCursor] -> Encoding
toEncodingList :: [MemoryCursor] -> Encoding
$comitField :: MemoryCursor -> Bool
omitField :: MemoryCursor -> Bool
ToJSON)
type instance Cursor MemoryStore = MemoryCursor
type instance BackendHandle MemoryStore = MemoryStoreHandle
newtype MemoryStoreHandle = MemoryStoreHandle
{ MemoryStoreHandle -> TVar (StoreState MemoryStore)
stateVar :: TVar (StoreState MemoryStore)
}
newMemoryStore :: IO MemoryStoreHandle
newMemoryStore :: IO MemoryStoreHandle
newMemoryStore = do
globalVar <- Integer -> IO (TVar Integer)
forall a. a -> IO (TVar a)
newTVarIO (-Integer
1)
MemoryStoreHandle <$> newTVarIO (initialState globalVar)
where
initialState :: TVar Integer -> StoreState backend
initialState TVar Integer
globalVar =
StoreState
{ nextSequence :: Integer
nextSequence = Integer
0
, events :: Map Integer StoredEvent
events = Map Integer StoredEvent
forall k a. Map k a
Map.empty
, streamEvents :: Map StreamId [Integer]
streamEvents = Map StreamId [Integer]
forall k a. Map k a
Map.empty
, streamVersions :: Map StreamId (Cursor backend)
streamVersions = Map StreamId (Cursor backend)
forall k a. Map k a
Map.empty
, streamLocalVersions :: Map StreamId StreamVersion
streamLocalVersions = Map StreamId StreamVersion
forall k a. Map k a
Map.empty
, streamNotifications :: Map StreamId (TVar Integer)
streamNotifications = Map StreamId (TVar Integer)
forall k a. Map k a
Map.empty
, globalNotification :: TVar Integer
globalNotification = TVar Integer
globalVar
}
data MemoryStore
instance EventStore MemoryStore where
type StoreConstraints MemoryStore m = (MonadUnliftIO m)
insertEvents :: forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints MemoryStore m) =>
BackendHandle MemoryStore
-> Maybe CorrelationId
-> Transaction t MemoryStore
-> m (InsertionResult MemoryStore)
insertEvents BackendHandle MemoryStore
handle Maybe CorrelationId
corrId Transaction t MemoryStore
transaction = IO (InsertionResult MemoryStore) -> m (InsertionResult MemoryStore)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (InsertionResult MemoryStore)
-> m (InsertionResult MemoryStore))
-> IO (InsertionResult MemoryStore)
-> m (InsertionResult MemoryStore)
forall a b. (a -> b) -> a -> b
$ do
now <- IO UTCTime
getCurrentTime
eventIds <- forM [1 .. totalEvents] $ \Int
_ -> UUID -> EventId
EventId (UUID -> EventId) -> IO UUID -> IO EventId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
atomically $ do
state <- readTVar handle.stateVar
case checkAllVersions state batches of
Left EventStoreError MemoryStore
mismatch -> InsertionResult MemoryStore -> STM (InsertionResult MemoryStore)
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InsertionResult MemoryStore -> STM (InsertionResult MemoryStore))
-> InsertionResult MemoryStore -> STM (InsertionResult MemoryStore)
forall a b. (a -> b) -> a -> b
$ EventStoreError MemoryStore -> InsertionResult MemoryStore
forall backend. EventStoreError backend -> InsertionResult backend
FailedInsertion EventStoreError MemoryStore
mismatch
Right () -> do
let (StoreState MemoryStore
newState, Cursor MemoryStore
finalCursor, Map StreamId (Cursor MemoryStore)
streamCursors) = StoreState MemoryStore
-> Maybe CorrelationId
-> UTCTime
-> [EventId]
-> Map StreamId (StreamWrite t SomeLatestEvent MemoryStore)
-> (StoreState MemoryStore, Cursor MemoryStore,
Map StreamId (Cursor MemoryStore))
forall backend (t :: * -> *).
(StoreCursor backend, Foldable t) =>
StoreState backend
-> Maybe CorrelationId
-> UTCTime
-> [EventId]
-> Map StreamId (StreamWrite t SomeLatestEvent backend)
-> (StoreState backend, Cursor backend,
Map StreamId (Cursor backend))
insertAllEvents StoreState MemoryStore
state Maybe CorrelationId
corrId UTCTime
now [EventId]
eventIds Map StreamId (StreamWrite t SomeLatestEvent MemoryStore)
batches
TVar (StoreState MemoryStore) -> StoreState MemoryStore -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar BackendHandle MemoryStore
MemoryStoreHandle
handle.stateVar StoreState MemoryStore
newState
[StreamId] -> (StreamId -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Map StreamId (StreamWrite t SomeLatestEvent MemoryStore)
-> [StreamId]
forall k a. Map k a -> [k]
Map.keys Map StreamId (StreamWrite t SomeLatestEvent MemoryStore)
batches) ((StreamId -> STM ()) -> STM ()) -> (StreamId -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \StreamId
streamId -> do
Maybe (TVar Integer) -> (TVar Integer -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (StreamId -> Map StreamId (TVar Integer) -> Maybe (TVar Integer)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup StreamId
streamId StoreState MemoryStore
state.streamNotifications) ((TVar Integer -> STM ()) -> STM ())
-> (TVar Integer -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \TVar Integer
var ->
TVar Integer -> Integer -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Integer
var (MemoryCursor -> Integer
getSequenceNo Cursor MemoryStore
MemoryCursor
finalCursor)
TVar Integer -> Integer -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar StoreState MemoryStore
state.globalNotification (MemoryCursor -> Integer
getSequenceNo Cursor MemoryStore
MemoryCursor
finalCursor)
InsertionResult MemoryStore -> STM (InsertionResult MemoryStore)
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InsertionResult MemoryStore -> STM (InsertionResult MemoryStore))
-> InsertionResult MemoryStore -> STM (InsertionResult MemoryStore)
forall a b. (a -> b) -> a -> b
$
InsertionSuccess MemoryStore -> InsertionResult MemoryStore
forall backend. InsertionSuccess backend -> InsertionResult backend
SuccessfulInsertion (InsertionSuccess MemoryStore -> InsertionResult MemoryStore)
-> InsertionSuccess MemoryStore -> InsertionResult MemoryStore
forall a b. (a -> b) -> a -> b
$
InsertionSuccess
{ finalCursor :: Cursor MemoryStore
finalCursor = Cursor MemoryStore
finalCursor
, streamCursors :: Map StreamId (Cursor MemoryStore)
streamCursors = Map StreamId (Cursor MemoryStore)
streamCursors
}
where
batches :: Map StreamId (StreamWrite t SomeLatestEvent MemoryStore)
batches = Transaction t MemoryStore
transaction.transactionWrites
totalEvents :: Int
totalEvents = [Int] -> Int
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum ([Int] -> Int) -> [Int] -> Int
forall a b. (a -> b) -> a -> b
$ (StreamWrite t SomeLatestEvent MemoryStore -> Int)
-> [StreamWrite t SomeLatestEvent MemoryStore] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map (t SomeLatestEvent -> Int
forall a. t a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length (t SomeLatestEvent -> Int)
-> (StreamWrite t SomeLatestEvent MemoryStore -> t SomeLatestEvent)
-> StreamWrite t SomeLatestEvent MemoryStore
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (.events)) ([StreamWrite t SomeLatestEvent MemoryStore] -> [Int])
-> [StreamWrite t SomeLatestEvent MemoryStore] -> [Int]
forall a b. (a -> b) -> a -> b
$ Map StreamId (StreamWrite t SomeLatestEvent MemoryStore)
-> [StreamWrite t SomeLatestEvent MemoryStore]
forall k a. Map k a -> [a]
Map.elems Map StreamId (StreamWrite t SomeLatestEvent MemoryStore)
batches
subscribe :: forall (m :: * -> *) (ts :: [Symbol]).
StoreConstraints MemoryStore m =>
BackendHandle MemoryStore
-> EventMatcher ts MemoryStore m
-> EventSelector MemoryStore
-> m (SubscriptionHandle MemoryStore)
subscribe BackendHandle MemoryStore
handle EventMatcher ts MemoryStore m
matcher EventSelector MemoryStore
selector = TVar (StoreState MemoryStore)
-> EventMatcher ts MemoryStore m
-> EventSelector MemoryStore
-> m (SubscriptionHandle MemoryStore)
forall (m :: * -> *) backend (ts :: [Symbol]).
(MonadUnliftIO m, StoreCursor backend, Show (Cursor backend)) =>
TVar (StoreState backend)
-> EventMatcher ts backend m
-> EventSelector backend
-> m (SubscriptionHandle backend)
subscribeToEvents BackendHandle MemoryStore
MemoryStoreHandle
handle.stateVar EventMatcher ts MemoryStore m
matcher EventSelector MemoryStore
selector
instance StoreCursor MemoryStore where
makeCursor :: Integer -> Cursor MemoryStore
makeCursor = Integer -> Cursor MemoryStore
Integer -> MemoryCursor
MemoryCursor
makeSequenceNo :: Cursor MemoryStore -> Integer
makeSequenceNo = (.getSequenceNo)