{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module Test.Hindsight.Store.StreamVersionTests (streamVersionTests) where
import Control.Concurrent (MVar, newEmptyMVar, putMVar, takeMVar)
import Data.IORef
import Data.Map.Strict qualified as Map
import Data.UUID.V4 qualified as UUID
import Hindsight.Store
import Test.Hindsight.Examples (Tombstone, UserCreated, makeTombstone, makeUserEvent)
import Test.Hindsight.Store.Common (handleTombstone)
import Test.Hindsight.Store.TestRunner (EventStoreTestRunner (..))
import Test.Tasty
import Test.Tasty.HUnit
{- | Backend-agnostic test cases covering stream version behaviour.

Each case is run through the runner's 'withStore' field, which supplies the
'BackendHandle' that the individual test function receives as its argument.
-}
streamVersionTests ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    EventStoreTestRunner backend ->
    [TestTree]
streamVersionTests runner =
    [ testCase "Stream Versions Start At 1" $
        withStore runner testStreamVersionsStartAt1
    , testCase "Stream Versions Are Contiguous" $
        withStore runner testStreamVersionsContiguous
    , testCase "Stream Versions Exposed In Subscription" $
        withStore runner testStreamVersionExposedInSubscription
    , testCase "Multiple Streams Have Independent Versions" $
        withStore runner testIndependentStreamVersions
    ]
{- | The first event written to a new stream is observed with 'StreamVersion' 1.

Writes one user event followed by a tombstone to a freshly generated stream,
subscribes to that single stream from the beginning, and asserts that the
versions collected by the 'UserCreated' handler are exactly @[StreamVersion 1]@.
-}
testStreamVersionsStartAt1 :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testStreamVersionsStartAt1 store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedVersions <- newIORef []
    completionVar <- newEmptyMVar

    -- Prepend the envelope's stream version to the accumulator; the list is
    -- therefore in reverse arrival order until it is reversed below.
    let collectVersions ref event = do
            atomicModifyIORef' ref (\versions -> (event.streamVersion : versions, ()))
            pure Continue

    _ <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite NoStream [makeUserEvent 1, makeTombstone])]))

    handle <-
        subscribe
            store
            ( match UserCreated (collectVersions receivedVersions)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = SingleStream streamId, startupPosition = FromBeginning}

    -- Block until completionVar is filled.
    -- NOTE(review): presumably handleTombstone fills it when the tombstone is
    -- delivered; confirm in Test.Hindsight.Store.Common.
    takeMVar completionVar
    handle.cancel

    versions <- reverse <$> readIORef receivedVersions
    versions @?= [StreamVersion 1]
{- | Stream versions increase by one per event across separate transactions.

Five user events are written to one stream in three transactions (2 + 1 + 2),
with a tombstone appended in the last one. The 'UserCreated' handler must
observe versions 1 through 5 in order.
-}
testStreamVersionsContiguous :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testStreamVersionsContiguous store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedVersions <- newIORef []
    completionVar <- newEmptyMVar

    -- Prepend the envelope's stream version to the accumulator; the list is
    -- therefore in reverse arrival order until it is reversed below.
    let collectVersions ref event = do
            atomicModifyIORef' ref (\versions -> (event.streamVersion : versions, ()))
            pure Continue

    -- The first write uses NoStream; the follow-up writes use StreamExists.
    _ <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite NoStream [makeUserEvent 1, makeUserEvent 2])]))
    _ <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite StreamExists [makeUserEvent 3])]))
    _ <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite StreamExists [makeUserEvent 4, makeUserEvent 5, makeTombstone])]))

    handle <-
        subscribe
            store
            ( match UserCreated (collectVersions receivedVersions)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = SingleStream streamId, startupPosition = FromBeginning}

    -- Block until completionVar is filled.
    -- NOTE(review): presumably handleTombstone fills it when the tombstone is
    -- delivered; confirm in Test.Hindsight.Store.Common.
    takeMVar completionVar
    handle.cancel

    versions <- reverse <$> readIORef receivedVersions
    versions @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3, StreamVersion 4, StreamVersion 5]
{- | The envelope handed to a subscription handler carries a stream version.

Writes one user event plus a tombstone, records the @streamVersion@ field of
the envelope seen by the 'UserCreated' handler, and asserts that it is
@Just (StreamVersion 1)@.
-}
testStreamVersionExposedInSubscription :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testStreamVersionExposedInSubscription store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedVersionRef <- newIORef Nothing
    completionVar <- newEmptyMVar

    -- Overwrite the ref with the version of the envelope just received; any
    -- previously stored value is discarded.
    let captureVersion ref event = do
            atomicModifyIORef' ref (\_ -> (Just event.streamVersion, ()))
            pure Continue

    _ <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite NoStream [makeUserEvent 1, makeTombstone])]))

    handle <-
        subscribe
            store
            ( match UserCreated (captureVersion receivedVersionRef)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = SingleStream streamId, startupPosition = FromBeginning}

    -- Block until completionVar is filled.
    -- NOTE(review): presumably handleTombstone fills it when the tombstone is
    -- delivered; confirm in Test.Hindsight.Store.Common.
    takeMVar completionVar
    handle.cancel

    mbVersion <- readIORef receivedVersionRef
    mbVersion @?= Just (StreamVersion 1)
{- | Each stream keeps its own version counter.

Three streams receive three user events each, spread over multi-stream
transactions, followed by one tombstone per stream. A subscription over
'AllStreams' groups the observed versions by stream, and every stream must
report versions 1, 2, 3 regardless of how its writes were interleaved with
the other streams.
-}
testIndependentStreamVersions :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testIndependentStreamVersions store = do
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom
    streamC <- StreamId <$> UUID.nextRandom

    versionsA <- newIORef []
    versionsB <- newIORef []
    versionsC <- newIORef []
    completionVar <- newEmptyMVar

    -- Prepend the envelope's stream version to the given accumulator; each
    -- list is in reverse arrival order until it is reversed below.
    let collectVersionsFor ref event = do
            atomicModifyIORef' ref (\versions -> (event.streamVersion : versions, ()))
            pure Continue

    -- First transaction creates all three streams: A gets 2 events, B gets 1,
    -- C gets 3.
    _ <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite NoStream [makeUserEvent 1, makeUserEvent 2])
                    , (streamB, StreamWrite NoStream [makeUserEvent 10])
                    , (streamC, StreamWrite NoStream [makeUserEvent 100, makeUserEvent 101, makeUserEvent 102])
                    ]
                )

    -- Second transaction tops A and B up to three user events each.
    _ <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite StreamExists [makeUserEvent 3])
                    , (streamB, StreamWrite StreamExists [makeUserEvent 11, makeUserEvent 12])
                    ]
                )

    -- Final transaction appends one tombstone to every stream.
    _ <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite StreamExists [makeTombstone])
                    , (streamB, StreamWrite StreamExists [makeTombstone])
                    , (streamC, StreamWrite StreamExists [makeTombstone])
                    ]
                )

    remainingTombstones <- newIORef (3 :: Int)

    -- Count down one per tombstone belonging to this test's streams and signal
    -- completion on the third. Tombstones from any other stream are skipped,
    -- mirroring the 'otherwise' branch of the UserCreated handler below, so
    -- that unrelated events seen through AllStreams cannot end the test early.
    let handleTombstones :: EventHandler Tombstone IO backend
        handleTombstones envelope
            | envelope.streamId `notElem` [streamA, streamB, streamC] = pure Continue
            | otherwise = do
                remaining <- atomicModifyIORef' remainingTombstones (\n -> (n - 1, n - 1))
                if remaining == 0
                    then putMVar completionVar () >> pure Stop
                    else pure Continue

    handle <-
        subscribe
            store
            ( match
                UserCreated
                ( \event ->
                    -- Route the version to the accumulator of the stream the
                    -- event belongs to; ignore events from any other stream.
                    case event.streamId of
                        sid
                            | sid == streamA -> collectVersionsFor versionsA event
                            | sid == streamB -> collectVersionsFor versionsB event
                            | sid == streamC -> collectVersionsFor versionsC event
                            | otherwise -> pure Continue
                )
                :? match Tombstone handleTombstones
                :? MatchEnd
            )
            EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

    -- Block until the third relevant tombstone has been handled.
    takeMVar completionVar
    handle.cancel

    vA <- reverse <$> readIORef versionsA
    vB <- reverse <$> readIORef versionsB
    vC <- reverse <$> readIORef versionsC

    vA @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3]
    vB @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3]
    vC @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3]