{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wno-orphans #-}
module Test.Hindsight.Store.TestRunner (
EventStoreTestRunner (..),
runTest,
runMultiInstanceTest,
repeatTest,
genericEventStoreTests,
multiInstanceTests,
collectEventsUntilTombstone,
handleTombstone,
extractUserInfo,
)
where
import Control.Concurrent (MVar, newEmptyMVar, putMVar, takeMVar, threadDelay)
import Control.Monad (forM, forM_, replicateM, replicateM_)
import Data.IORef
import Data.Map.Strict qualified as Map
import Data.Maybe (mapMaybe)
import Data.Text (Text)
import Data.Typeable (cast)
import Data.UUID.V4 qualified as UUID
import Hindsight.Events (SomeLatestEvent)
import Hindsight.Store
import Test.Hindsight.Examples (UserCreated, UserInformation2 (..))
import Test.Hindsight.Store.Common
import Test.Tasty
import Test.Tasty.HUnit
import UnliftIO.Async (async, concurrently, wait)
import UnliftIO.Exception (fromException, throwIO, tryAny)
{- | Backend-specific hooks used by the generic test suites in this module.

Each field is a rank-N callback: the supplied action may return any type
@a@, and the hook itself always yields @IO ()@.
NOTE(review): presumably each hook also provisions and tears down the
backend around the action — confirm against the concrete runners.
-}
data EventStoreTestRunner backend = EventStoreTestRunner
    { withStore :: forall a. (BackendHandle backend -> IO a) -> IO ()
    -- ^ Run an action against a single backend handle.
    , withStores :: forall a. Int -> ([BackendHandle backend] -> IO a) -> IO ()
    -- ^ Run an action against a list of backend handles; the 'Int' is the
    -- number of instances requested by the caller.
    }
{- | The backend-agnostic test suite.

Every entry wraps one of the @test*@ actions with 'runTest', so each test
obtains its 'BackendHandle' through the supplied runner. The result is four
groups: basic subscription behaviour, stream versions, write consistency and
per-stream cursors.
-}
genericEventStoreTests ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend), Ord (Cursor backend)) =>
    EventStoreTestRunner backend ->
    [TestTree]
genericEventStoreTests runner =
    [ testGroup
        "Basic Tests"
        [ testCase "Basic Event Reception" $ runTest runner testBasicEventReception
        , testCase "Correlation ID Preservation" $ runTest runner testCorrelationIdPreservation
        , testCase "Single Stream Selection" $ runTest runner testSingleStreamSelection
        , testCase "Start From Position" $ runTest runner testStartFromPosition
        , -- The async subscription test is repeated 20 times via 'repeatTest'.
          repeatTest 20 "Async Subscription Reception" $ runTest runner testAsyncSubscription
        , testCase "Subscription Honors Stop Result" $ runTest runner testSubscriptionStopBehavior
        , testCase "Handler Exception Enrichment" $ runTest runner testHandlerExceptionEnrichment
        ]
    , testGroup
        "Stream Version Tests"
        [ testCase "Stream Versions Start At 1" $ runTest runner testStreamVersionsStartAt1
        , testCase "Stream Versions Are Contiguous" $ runTest runner testStreamVersionsContiguous
        , testCase "Stream Versions Exposed In Subscription" $ runTest runner testStreamVersionExposedInSubscription
        , testCase "Multiple Streams Have Independent Versions" $ runTest runner testIndependentStreamVersions
        ]
    , testGroup
        "Consistency Tests"
        [ testCase "No Stream Condition" $ runTest runner testNoStreamCondition
        , testCase "Stream Exists Condition" $ runTest runner testStreamExistsCondition
        , testCase "Exact Version Condition" $ runTest runner testExactVersionCondition
        , testCase "Exact Stream Version Condition" $ runTest runner testExactStreamVersionCondition
        , testCase "Concurrent Writes" $ runTest runner testConcurrentWrites
        , testCase "Batch Atomicity" $ runTest runner testBatchAtomicity
        , testCase "Multi-Stream Consistency" $ runTest runner testMultiStreamConsistency
        , testCase "Version Expectation Race Condition" $ runTest runner testVersionExpectationRaceCondition
        , testCase "Any Expectation Concurrency" $ runTest runner testAnyExpectationConcurrency
        , testCase "Mixed Version Expectations" $ runTest runner testMixedVersionExpectations
        , testCase "Cascading Version Dependencies" $ runTest runner testCascadingVersionDependencies
        , testCase "Multi-Stream Head Consistency" $ runTest runner testMultiStreamHeadConsistency
        , testCase "Empty Batch Insertion" $ runTest runner testEmptyBatchInsertion
        , testCase "Mixed Empty and Non-Empty Streams" $ runTest runner testMixedEmptyStreams
        ]
    , testGroup
        "Per-Stream Cursor Tests"
        [ testCase "Per-Stream Cursor Extraction" $ runTest runner testPerStreamCursorExtraction
        , testCase "Cursor Independence" $ runTest runner testCursorIndependence
        , testCase "Stale Cursor Per Stream" $ runTest runner testStaleCursorPerStream
        , testCase "Cursor Completeness" $ runTest runner testCursorCompleteness
        , testCase "Empty Stream Cursor Handling" $ runTest runner testEmptyStreamCursorHandling
        ]
    ]
{- | Tests that exercise several backend handles at once.

The same 'testMultiInstanceSubscription' action is run with 2, 5 and 10
handles, each obtained through 'runMultiInstanceTest'.
-}
multiInstanceTests ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    EventStoreTestRunner backend ->
    [TestTree]
multiInstanceTests runner =
    [ testGroup
        "Multi-Instance Tests"
        [ testCase "Multi-Instance Subscriptions (2 instances)" $ runMultiInstanceTest runner 2 testMultiInstanceSubscription
        , testCase "Multi-Instance Subscriptions (5 instances)" $ runMultiInstanceTest runner 5 testMultiInstanceSubscription
        , testCase "Multi-Instance Subscriptions (10 instances)" $ runMultiInstanceTest runner 10 testMultiInstanceSubscription
        ]
    ]
{- | Build a group containing @n@ copies of the same test case.

The group is labelled @name <> " x" <> show n@; every inner case carries the
plain @name@. A non-positive @n@ yields an empty group, because 'replicate'
returns @[]@ in that case.
-}
repeatTest :: Int -> TestName -> Assertion -> TestTree
repeatTest n name assertion =
    testGroup (name <> " x" <> show n) $
        replicate n $
            testCase name assertion
-- | Run a single-store test action through the runner's 'withStore' hook.
runTest :: EventStoreTestRunner backend -> (BackendHandle backend -> IO ()) -> IO ()
runTest runner action = withStore runner action
{- | Run a multi-store test action through the runner's 'withStores' hook.

@n@ is forwarded unchanged as the requested number of backend handles.
-}
runMultiInstanceTest :: EventStoreTestRunner backend -> Int -> ([BackendHandle backend] -> IO ()) -> IO ()
runMultiInstanceTest runner n action = withStores runner n action
{- | Subscription handler that records every 'UserCreated' envelope it sees.

Each envelope is prepended atomically to the list held in the 'IORef', so
the list ends up newest-first and callers reverse it to get arrival order.
The handler always returns 'Continue'; stopping is left to a separate
handler such as 'handleTombstone'.
-}
collectEventsUntilTombstone :: IORef [EventEnvelope UserCreated backend] -> EventHandler UserCreated IO backend
collectEventsUntilTombstone ref event = do
    atomicModifyIORef' ref (\events -> (event : events, ()))
    pure Continue
{- | Subscription handler for the 'Tombstone' marker event.

Fills the completion 'MVar' so the waiting test thread can proceed, then
returns 'Stop'. The envelope itself is ignored. 'putMVar' blocks while the
'MVar' is already full, so each completion variable is meant to be
signalled once per 'takeMVar'.
-}
handleTombstone :: MVar () -> EventHandler Tombstone IO backend
handleTombstone completionVar _ = do
    putMVar completionVar ()
    pure Stop
{- | Extract the 'UserInformation2' payload from a 'UserCreated' envelope.

Uses 'cast' on the envelope's @payload@ field, so the result is 'Nothing'
if the payload is not a 'UserInformation2'.
-}
extractUserInfo :: EventEnvelope UserCreated backend -> Maybe UserInformation2
extractUserInfo envelope = cast envelope.payload
{- | Insert three user events followed by a tombstone into a fresh stream,
subscribe to all streams from the beginning, and check that exactly the
three user events arrive, in insertion order.
-}
testBasicEventReception :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testBasicEventReception store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedEvents <- newIORef []
    completionVar <- newEmptyMVar
    -- The trailing tombstone marks the end of the batch for the subscriber.
    let testEvents = map makeUserEvent [1 .. 3] ++ [makeTombstone]
    result <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))
    case result of
        FailedInsertion err -> assertFailure $ "Failed to insert events: " ++ show err
        SuccessfulInsertion _ -> do
            handle <-
                subscribe
                    store
                    ( match UserCreated (collectEventsUntilTombstone receivedEvents)
                        :? match Tombstone (handleTombstone completionVar)
                        :? MatchEnd
                    )
                    EventSelector{streamId = AllStreams, startupPosition = FromBeginning}
            -- Block until the tombstone handler signals completion.
            takeMVar completionVar
            handle.cancel
            -- The collector prepends, so reverse to recover arrival order.
            events <- reverse <$> readIORef receivedEvents
            length events @?= 3
            let userInfos = mapMaybe extractUserInfo events
            length userInfos @?= 3
            let userNames :: [Text]
                userNames = map userName userInfos
            userNames @?= ["user1", "user2", "user3"]
{- | Write user events to two streams, then subscribe to the first stream
only and check that just that stream's three user events are delivered.

Stream 1 receives users 1-3 followed by a tombstone; stream 2 receives users
4-6. The results of the three insertions are deliberately discarded.
-}
testSingleStreamSelection :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testSingleStreamSelection store = do
    stream1 <- StreamId <$> UUID.nextRandom
    stream2 <- StreamId <$> UUID.nextRandom
    receivedEvents <- newIORef []
    completionVar <- newEmptyMVar
    _ <- insertEvents store Nothing (multiEvent stream1 Any (map makeUserEvent [1 .. 3]))
    _ <- insertEvents store Nothing (multiEvent stream2 Any (map makeUserEvent [4 .. 6]))
    -- The tombstone goes to stream1 so the single-stream subscription can finish.
    _ <- insertEvents store Nothing (Transaction (Map.fromList [(stream1, StreamWrite Any [makeTombstone])]))
    handle <-
        subscribe
            store
            ( match UserCreated (collectEventsUntilTombstone receivedEvents)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = SingleStream stream1, startupPosition = FromBeginning}
    -- Block until the tombstone handler signals completion.
    takeMVar completionVar
    handle.cancel
    -- The collector prepends, so reverse to recover arrival order.
    events <- reverse <$> readIORef receivedEvents
    let userInfos = mapMaybe extractUserInfo events
    length userInfos @?= 3
    let userNames = map userName userInfos
    userNames @?= ["user1", "user2", "user3"]
-- | Subscribing with 'FromLastProcessed' must only deliver events written
-- after the supplied cursor.
--
-- Five user events are written to one stream in two batches (3 + 2), followed
-- by a tombstone. The subscription starts from the cursor returned by the
-- first batch, so only @user4@ and @user5@ are expected.
testStartFromPosition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testStartFromPosition store = do
    streamId <- StreamId <$> UUID.nextRandom
    let testEvents = map makeUserEvent [1 .. 5]
        (firstBatch, secondBatch) = splitAt 3 testEvents
        writeTo evts = insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any evts)]))

    result <- writeTo firstBatch
    case result of
        FailedInsertion err -> assertFailure $ "Failed to insert first batch: " ++ show err
        SuccessfulInsertion (InsertionSuccess{finalCursor = cursor}) -> do
            -- The remaining writes must succeed as well: without the tombstone
            -- the subscription never signals completion and 'takeMVar' below
            -- would block forever.
            writeTo secondBatch >>= expectInserted "second batch"
            writeTo [makeTombstone] >>= expectInserted "tombstone"

            receivedEvents <- newIORef []
            completionVar <- newEmptyMVar

            handle <-
                subscribe
                    store
                    ( match UserCreated (collectEventsUntilTombstone receivedEvents)
                        :? match Tombstone (handleTombstone completionVar)
                        :? MatchEnd
                    )
                    EventSelector{streamId = AllStreams, startupPosition = FromLastProcessed cursor}

            -- Block until the tombstone handler fires, then stop the subscription.
            takeMVar completionVar
            handle.cancel

            -- The received list is reversed before comparing against the
            -- expected insertion order.
            events <- reverse <$> readIORef receivedEvents
            let userInfos = mapMaybe extractUserInfo events
            length userInfos @?= 2
            let userNames :: [Text]
                userNames = map userName userInfos
            userNames @?= ["user4", "user5"]
  where
    -- Fail the test with a labelled message unless the insertion succeeded.
    expectInserted :: String -> InsertionResult backend -> IO ()
    expectInserted label = \case
        FailedInsertion err -> assertFailure $ "Failed to insert " ++ label ++ ": " ++ show err
        SuccessfulInsertion _ -> pure ()
-- | Events inserted with a correlation id must carry that same id when they
-- are delivered to a subscriber.
--
-- Three user events plus a tombstone are written in a single transaction
-- tagged with a fresh 'CorrelationId'; every collected @UserCreated@ envelope
-- is then checked for that id.
testCorrelationIdPreservation :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testCorrelationIdPreservation store = do
    streamId <- StreamId <$> UUID.nextRandom
    corrId <- CorrelationId <$> UUID.nextRandom
    receivedEvents <- newIORef []
    completionVar <- newEmptyMVar

    let testEvents = map makeUserEvent [1 .. 3] ++ [makeTombstone]
    result <- insertEvents store (Just corrId) (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

    case result of
        FailedInsertion err -> assertFailure $ "Failed to insert events: " ++ show err
        SuccessfulInsertion _ -> do
            handle <-
                subscribe
                    store
                    ( match UserCreated (collectEventsUntilTombstone receivedEvents)
                        :? match Tombstone (handleTombstone completionVar)
                        :? MatchEnd
                    )
                    EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

            -- Block until the tombstone handler fires, then stop the subscription.
            takeMVar completionVar
            handle.cancel

            events <- readIORef receivedEvents
            -- Guard against a vacuous pass: the per-event check below succeeds
            -- trivially when nothing was collected.
            length events @?= 3
            forM_ events $ \evt ->
                evt.correlationId @?= Just corrId
-- | A subscription created /before/ any events exist must still receive
-- events that are inserted afterwards.
--
-- The subscription is opened first, then three user events and a tombstone
-- are written; the test waits for the tombstone and checks the collected
-- user names.
testAsyncSubscription :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testAsyncSubscription store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedEvents <- newIORef []
    completionVar <- newEmptyMVar

    -- Subscribe first; the events are written only once the handle exists.
    handle <-
        subscribe
            store
            ( match UserCreated (collectEventsUntilTombstone receivedEvents)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

    let testEvents = map makeUserEvent [1 .. 3] ++ [makeTombstone]
    result <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

    case result of
        FailedInsertion err -> do
            -- Stop the already-running subscription before reporting the failure.
            handle.cancel
            assertFailure $ "Failed to insert events: " ++ show err
        SuccessfulInsertion _ -> do
            takeMVar completionVar
            handle.cancel

            -- The received list is reversed before comparing against the
            -- expected insertion order.
            events <- reverse <$> readIORef receivedEvents
            length events @?= 3
            let userInfos = mapMaybe extractUserInfo events
            length userInfos @?= 3
            let userNames :: [Text]
                userNames = map userName userInfos
            userNames @?= ["user1", "user2", "user3"]
-- | A handler returning 'Stop' must end event delivery for the subscription.
--
-- The stream contains @Inc, Inc, Stop, Inc, Inc@. The increment handler bumps
-- a counter, the stop handler signals completion and returns 'Stop'. The
-- counter is expected to be exactly 2, i.e. the two increments after the stop
-- event are not processed.
testSubscriptionStopBehavior :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testSubscriptionStopBehavior store = do
    streamId <- StreamId <$> UUID.nextRandom
    counter <- newIORef (0 :: Int)
    completionVar <- newEmptyMVar

    let handleInc :: EventHandler CounterInc IO backend
        handleInc _ = do
            atomicModifyIORef' counter (\n -> (n + 1, ()))
            pure Continue

        handleStop :: EventHandler CounterStop IO backend
        handleStop _ = do
            putMVar completionVar ()
            pure Stop

    handle <-
        subscribe
            store
            ( match CounterInc handleInc
                :? match CounterStop handleStop
                :? MatchEnd
            )
            EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

    let testEvents =
            [ makeCounterInc
            , makeCounterInc
            , makeCounterStop
            , makeCounterInc
            , makeCounterInc
            ]

    result <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

    case result of
        FailedInsertion err -> do
            -- Stop the already-running subscription before reporting the failure.
            handle.cancel
            assertFailure $ "Failed to insert events: " ++ show err
        SuccessfulInsertion _ -> do
            takeMVar completionVar
            -- Wait 100 ms before cancelling; presumably a grace period so that
            -- any events wrongly delivered after 'Stop' would show up in the
            -- counter (TODO confirm intent).
            threadDelay 100000
            handle.cancel

            finalCount <- readIORef counter
            finalCount @?= 2
-- | An exception thrown inside a handler must surface from the subscription
-- as a 'HandlerException' that describes the failing event.
--
-- The stream contains @Inc, Inc, Fail, Inc, Inc@. The fail handler throws a
-- 'userError'. The test waits on the subscription and expects:
--
-- * exactly two increments to have been processed,
-- * 'handle.wait' to fail with a 'HandlerException',
-- * that exception to name the @counter_fail@ event and to wrap the original
--   error.
testHandlerExceptionEnrichment :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testHandlerExceptionEnrichment store = do
    streamId <- StreamId <$> UUID.nextRandom
    counter <- newIORef (0 :: Int)

    let handleInc :: EventHandler CounterInc IO backend
        handleInc _ = do
            atomicModifyIORef' counter (\n -> (n + 1, ()))
            pure Continue

        handleFail :: EventHandler CounterFail IO backend
        handleFail _envelope =
            throwIO $ userError "Test exception from CounterFail handler"

    let testEvents =
            [ makeCounterInc
            , makeCounterInc
            , makeCounterFail
            , makeCounterInc
            , makeCounterInc
            ]

    result <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

    case result of
        FailedInsertion err ->
            assertFailure $ "Failed to insert events: " ++ show err
        SuccessfulInsertion _ -> do
            handle <-
                subscribe
                    store
                    ( match CounterInc handleInc
                        :? match CounterFail handleFail
                        :? MatchEnd
                    )
                    EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

            -- Capture the subscription's outcome instead of letting the
            -- exception escape, so it can be inspected below.
            waitResult <- tryAny handle.wait

            finalCount <- readIORef counter
            finalCount @?= 2

            case waitResult of
                Left exc -> case fromException exc of
                    -- Bind only the two fields that are asserted on.
                    Just HandlerException{failedEventName = eventName, originalException = cause} -> do
                        eventName @?= "counter_fail"
                        show cause @?= "user error (Test exception from CounterFail handler)"
                    Nothing -> assertFailure $ "Expected HandlerException, got: " ++ show exc
                Right () -> assertFailure "Expected subscription to fail with HandlerException"
-- | The 'NoStream' expectation must accept the first write to a fresh stream
-- and reject a second write to the same stream with a 'ConsistencyError'.
testNoStreamCondition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testNoStreamCondition store = do
    streamId <- StreamId <$> UUID.nextRandom

    -- First write: the stream does not exist yet, so 'NoStream' should hold.
    result1 <-
        insertEvents store Nothing $
            singleEvent streamId NoStream (makeUserEvent 1)

    case result1 of
        FailedInsertion err -> assertFailure $ "First write failed: " ++ show err
        SuccessfulInsertion _ -> do
            -- Second write with the same expectation: expected to be rejected.
            result2 <-
                insertEvents store Nothing $
                    singleEvent streamId NoStream (makeUserEvent 2)

            case result2 of
                FailedInsertion (ConsistencyError _) -> pure ()
                FailedInsertion err -> assertFailure $ "Unexpected error: " ++ show err
                SuccessfulInsertion _ -> assertFailure "Second write should have failed"
-- | An 'appendAfterAny' write must be rejected with a 'ConsistencyError' while
-- the stream does not exist, and accepted once the stream has been created.
testStreamExistsCondition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testStreamExistsCondition store = do
    streamId <- StreamId <$> UUID.nextRandom

    -- Append to a stream that has never been written: expected to be rejected.
    result1 <-
        insertEvents store Nothing $
            appendAfterAny streamId (makeUserEvent 1)

    case result1 of
        FailedInsertion (ConsistencyError _) -> do
            -- Create the stream. The outcome is checked explicitly so that a
            -- failure here is not misreported as a failure of the append below.
            created <-
                insertEvents store Nothing $
                    singleEvent streamId NoStream (makeUserEvent 1)
            case created of
                FailedInsertion err -> assertFailure $ "Stream creation failed: " ++ show err
                SuccessfulInsertion _ -> pure ()

            -- Now that the stream exists the same kind of append should succeed.
            result2 <-
                insertEvents store Nothing $
                    appendAfterAny streamId (makeUserEvent 2)

            case result2 of
                FailedInsertion err -> assertFailure $ "Second write failed: " ++ show err
                SuccessfulInsertion _ -> pure ()
        FailedInsertion err -> assertFailure $ "Unexpected error: " ++ show err
        SuccessfulInsertion _ -> assertFailure "First write should have failed"
-- | The 'ExactVersion' expectation must reject a write that names a stale
-- cursor and accept a write that names the stream's latest cursor.
--
-- Sequence:
--
-- 1. An initial write creates the stream and yields @initCursor@.
-- 2. A 'StreamExists' write advances the stream and yields @cursor1@.
-- 3. A write expecting @initCursor@ (now stale) must fail with a
--    'ConsistencyError'.
-- 4. A write expecting @cursor1@ must succeed.
testExactVersionCondition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testExactVersionCondition store = do
    streamId <- StreamId <$> UUID.nextRandom

    -- Create the stream. A failure is reported through 'assertFailure' with
    -- the store's error rather than through a failed pattern match in the
    -- do-block.
    seeded <-
        insertEvents store Nothing $
            singleEvent streamId NoStream (makeUserEvent 42)
    initCursor <- case seeded of
        FailedInsertion err -> assertFailure $ "Initial write failed: " ++ show err
        SuccessfulInsertion (InsertionSuccess{finalCursor = c}) -> pure c

    result1 <-
        insertEvents store Nothing $
            singleEvent streamId StreamExists (makeUserEvent 1)

    case result1 of
        FailedInsertion err -> assertFailure $ "First write failed: " ++ show err
        SuccessfulInsertion (InsertionSuccess{finalCursor = cursor1}) -> do
            -- The stream has moved past initCursor, so this must be rejected.
            result2 <-
                insertEvents store Nothing $
                    singleEvent streamId (ExactVersion initCursor) (makeUserEvent 2)

            case result2 of
                FailedInsertion (ConsistencyError _) -> do
                    -- cursor1 is the cursor of the latest successful write.
                    result3 <-
                        insertEvents store Nothing $
                            singleEvent streamId (ExactVersion cursor1) (makeUserEvent 3)

                    case result3 of
                        FailedInsertion err -> assertFailure $ "Third write failed: " ++ show err
                        SuccessfulInsertion _ -> pure ()
                FailedInsertion err -> assertFailure $ "Unexpected error: " ++ show err
                SuccessfulInsertion _ -> assertFailure "Second write should have failed"
-- | Race two appends that both expect the stream to sit at the same cursor.
--
-- A seed event is written to a fresh stream under 'NoStream'. The cursor
-- returned by that write is then used as the 'ExactVersion' expectation of
-- two inserts executed with 'concurrently'. The test passes only when one of
-- them succeeds and the other is rejected with a 'ConsistencyError'; every
-- other combination fails with "Expected exactly one write to succeed".
testConcurrentWrites ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    BackendHandle backend ->
    IO ()
testConcurrentWrites store = do
    streamId <- fmap StreamId UUID.nextRandom
    -- Single-event append to the stream under test with a chosen expectation.
    let appendWith :: ExpectedVersion backend -> Int -> IO (InsertionResult backend)
        appendWith expectation payload =
            insertEvents store Nothing (singleEvent streamId expectation (makeUserEvent payload))
    appendWith NoStream 1 >>= \case
        FailedInsertion err ->
            assertFailure ("Initial write failed: " ++ show err)
        SuccessfulInsertion InsertionSuccess{finalCursor = seedCursor} -> do
            -- Both contenders deliberately share the same expected cursor.
            let contender = appendWith (ExactVersion seedCursor)
            raceOutcome <- concurrently (contender 2) (contender 3)
            case raceOutcome of
                (SuccessfulInsertion _, FailedInsertion (ConsistencyError _)) -> pure ()
                (FailedInsertion (ConsistencyError _), SuccessfulInsertion _) -> pure ()
                _ -> assertFailure "Expected exactly one write to succeed"
-- | A multi-stream batch containing one unmet expectation must be rejected.
--
-- @streamId1@ is seeded with a single event while @streamId2@ is never
-- written. A batch then appends to both streams under 'StreamExists'; the
-- expectation on @streamId2@ cannot hold, so the insert is expected to come
-- back as a 'ConsistencyError'.
--
-- The seed write is asserted to have succeeded. Without that check a failed
-- seed would leave @streamId1@ missing as well, and the batch would be
-- rejected for the wrong reason while the test still passed.
--
-- NOTE(review): only the returned 'InsertionResult' is inspected; the streams
-- are not read back to confirm that nothing from the batch was persisted.
testBatchAtomicity ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testBatchAtomicity store = do
    streamId1 <- StreamId <$> UUID.nextRandom
    streamId2 <- StreamId <$> UUID.nextRandom
    seedResult <-
        insertEvents store Nothing $
            singleEvent streamId1 NoStream (makeUserEvent 1)
    -- No Show constraint is available in this signature, so the error value
    -- itself cannot be rendered in the failure message.
    case seedResult of
        FailedInsertion _ -> assertFailure "Initial write failed"
        SuccessfulInsertion _ -> pure ()
    result <-
        insertEvents store Nothing $
            fromWrites
                [ (streamId1, StreamWrite StreamExists [makeUserEvent 2])
                , (streamId2, StreamWrite StreamExists [makeUserEvent 3])
                ]
    case result of
        FailedInsertion (ConsistencyError _) -> pure ()
        _ -> assertFailure "Batch write should have failed completely"
-- | A batch whose per-stream expectations are only partly satisfiable must
-- be rejected.
--
-- Three fresh streams are created in one transaction (each under 'NoStream',
-- carrying user events 1, 2 and 3 respectively). A second transaction then
-- targets the same three streams with different expectations:
--
-- * @streamId1@: 'ExactVersion' of the cursor returned by the first insert
-- * @streamId2@: 'NoStream', although the stream was just created
-- * @streamId3@: 'StreamExists'
--
-- The second insert is expected to fail with a 'ConsistencyError'.
--
-- The stream ids are bound one at a time rather than by matching a
-- three-element list pattern against the result of 'replicateM', so no
-- failable pattern bind is involved.
testMultiStreamConsistency ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    BackendHandle backend ->
    IO ()
testMultiStreamConsistency store = do
    streamId1 <- freshStreamId
    streamId2 <- freshStreamId
    streamId3 <- freshStreamId
    let initWrites :: Transaction [] backend
        initWrites =
            Transaction $
                Map.fromList
                    [ (streamId1, StreamWrite NoStream [makeUserEvent 1])
                    , (streamId2, StreamWrite NoStream [makeUserEvent 2])
                    , (streamId3, StreamWrite NoStream [makeUserEvent 3])
                    ]
    result1 <- insertEvents store Nothing initWrites
    case result1 of
        FailedInsertion err ->
            assertFailure $ "Initial writes failed: " ++ show err
        SuccessfulInsertion (InsertionSuccess{finalCursor = cursor}) -> do
            let batch :: Transaction [] backend
                batch =
                    Transaction
                        ( Map.fromList
                            [ (streamId1, StreamWrite (ExactVersion cursor) [makeUserEvent 4])
                            , (streamId2, StreamWrite NoStream [makeUserEvent 5])
                            , (streamId3, StreamWrite StreamExists [makeUserEvent 6])
                            ]
                        )
            result2 <- insertEvents store Nothing batch
            case result2 of
                FailedInsertion (ConsistencyError _) -> pure ()
                _ -> assertFailure "Mixed version batch should fail"
  where
    -- A stream id wrapping a newly generated random UUID.
    freshStreamId :: IO StreamId
    freshStreamId = StreamId <$> UUID.nextRandom
-- | Exercise the 'ExactStreamVersion' expectation on a single stream.
--
-- Three single-event appends are issued in sequence:
--
-- 1. under 'NoStream' — must succeed;
-- 2. under @ExactStreamVersion (StreamVersion 1)@ — must succeed;
-- 3. under the same @ExactStreamVersion (StreamVersion 1)@ again — must be
--    rejected with a 'ConsistencyError'.
--
-- Each step is checked before the next one runs; 'assertFailure' throws, so
-- a failed check ends the test at that point.
testExactStreamVersionCondition ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    BackendHandle backend ->
    IO ()
testExactStreamVersionCondition store = do
    streamId <- fmap StreamId UUID.nextRandom
    let appendWith :: ExpectedVersion backend -> Int -> IO (InsertionResult backend)
        appendWith expectation payload =
            insertEvents store Nothing (singleEvent streamId expectation (makeUserEvent payload))
        atVersionOne :: ExpectedVersion backend
        atVersionOne = ExactStreamVersion (StreamVersion 1)

    appendWith NoStream 1 >>= \case
        FailedInsertion err ->
            assertFailure ("First write failed: " ++ show err)
        SuccessfulInsertion _ -> pure ()

    appendWith atVersionOne 2 >>= \case
        FailedInsertion err ->
            assertFailure ("Second write with correct stream version failed: " ++ show err)
        SuccessfulInsertion _ -> pure ()

    -- Same expectation as the previous, successful append.
    appendWith atVersionOne 3 >>= \case
        FailedInsertion (ConsistencyError _) -> pure ()
        FailedInsertion err ->
            assertFailure ("Unexpected error: " ++ show err)
        SuccessfulInsertion _ ->
            assertFailure "Third write should have failed with wrong stream version"
-- | Ten writers compete for one stream using the same 'ExactVersion' cursor.
--
-- After a seed event is written under 'NoStream', ten 'async' tasks each try
-- to append one event while expecting the cursor returned by the seed write.
-- Once all tasks have been awaited the test asserts that exactly one insert
-- succeeded, exactly nine failed, and that every failure is a
-- 'ConsistencyError'.
testVersionExpectationRaceCondition ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    BackendHandle backend ->
    IO ()
testVersionExpectationRaceCondition store = do
    streamId <- fmap StreamId UUID.nextRandom
    let appendWith :: ExpectedVersion backend -> Int -> IO (InsertionResult backend)
        appendWith expectation payload =
            insertEvents store Nothing (singleEvent streamId expectation (makeUserEvent payload))
    appendWith NoStream 0 >>= \case
        FailedInsertion err ->
            assertFailure ("Initial write failed: " ++ show err)
        SuccessfulInsertion InsertionSuccess{finalCursor = seedCursor} -> do
            writers <- mapM (async . appendWith (ExactVersion seedCursor)) [1 .. 10]
            outcomes <- mapM wait writers
            let winners = filter isSuccess outcomes
                losers = filter isFailure outcomes
            length winners @?= 1
            length losers @?= 9
            mapM_ expectConflict losers
  where
    isSuccess :: InsertionResult backend -> Bool
    isSuccess (SuccessfulInsertion _) = True
    isSuccess _ = False

    isFailure :: InsertionResult backend -> Bool
    isFailure (FailedInsertion _) = True
    isFailure _ = False

    -- Accept only a consistency conflict; anything else fails the test.
    expectConflict :: InsertionResult backend -> IO ()
    expectConflict (FailedInsertion (ConsistencyError _)) = pure ()
    expectConflict _ = assertFailure "Expected ConsistencyError for version conflict"
-- | Twenty concurrent appends under the 'Any' expectation must all succeed.
--
-- A seed event is written first (its result is not inspected). Twenty
-- 'async' writers are then started; each blocks on a shared 'MVar' before
-- issuing its insert. The main thread fills the 'MVar' twenty times, letting
-- one blocked writer through per 'putMVar', then waits for every writer and
-- asserts that all twenty results are 'SuccessfulInsertion'.
testAnyExpectationConcurrency ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testAnyExpectationConcurrency store = do
    streamId <- fmap StreamId UUID.nextRandom
    let appendWith :: ExpectedVersion backend -> Int -> IO (InsertionResult backend)
        appendWith expectation payload =
            insertEvents store Nothing (singleEvent streamId expectation (makeUserEvent payload))
    _ <- appendWith NoStream 0
    gate <- newEmptyMVar :: IO (MVar ())
    -- Every writer parks on the gate until the main thread hands it a token.
    writers <- mapM (\payload -> async (takeMVar gate >> appendWith Any payload)) [1 .. 20]
    replicateM_ 20 (putMVar gate ())
    outcomes <- mapM wait writers
    length (filter isSuccess outcomes) @?= 20
  where
    isSuccess :: InsertionResult backend -> Bool
    isSuccess (SuccessfulInsertion _) = True
    isSuccess _ = False
-- | A batch mixing satisfiable and unsatisfiable expectations must be
-- rejected.
--
-- Five fresh stream ids are generated. Streams @s1@ and @s3@ are seeded with
-- one event each (under 'NoStream'); @s2@, @s4@ and @s5@ are left unwritten.
-- A second transaction then writes to all five streams:
--
-- * @s1@: 'StreamExists'
-- * @s2@: 'NoStream'
-- * @s3@: @ExactStreamVersion (StreamVersion 2)@
-- * @s4@: 'Any'
-- * @s5@: 'StreamExists' (never seeded)
--
-- The transaction is expected to fail with a 'ConsistencyError'.
--
-- The seed transaction is asserted to have succeeded: the expectations on
-- @s1@ and @s3@ are only meaningful if the seed was persisted, and without
-- the check a failed seed would still yield a 'ConsistencyError' and let the
-- test pass for the wrong reason.
testMixedVersionExpectations ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testMixedVersionExpectations store = do
    streams <- replicateM 5 (StreamId <$> UUID.nextRandom)
    case streams of
        [s1, s2, s3, s4, s5] -> do
            seedResult <-
                insertEvents store Nothing $
                    Transaction
                        ( Map.fromList
                            [ (s1, StreamWrite NoStream [makeUserEvent 1])
                            , (s3, StreamWrite NoStream [makeUserEvent 3])
                            ]
                        )
            -- No Show constraint is available in this signature, so the
            -- error value cannot be included in the message.
            case seedResult of
                FailedInsertion _ -> assertFailure "Initial write failed"
                SuccessfulInsertion _ -> pure ()
            result <-
                insertEvents store Nothing $
                    Transaction
                        ( Map.fromList
                            [ (s1, StreamWrite StreamExists [makeUserEvent 11])
                            , (s2, StreamWrite NoStream [makeUserEvent 12])
                            , (s3, StreamWrite (ExactStreamVersion (StreamVersion 2)) [makeUserEvent 13])
                            , (s4, StreamWrite Any [makeUserEvent 14])
                            , (s5, StreamWrite StreamExists [makeUserEvent 15])
                            ]
                        )
            case result of
                FailedInsertion (ConsistencyError _) -> pure ()
                _ -> assertFailure "Batch with mixed expectations should fail if any expectation fails"
        _ -> assertFailure "Expected exactly 5 streams"
-- | A cursor captured before an update must not be accepted afterwards.
--
-- Five fresh streams are each seeded with one event (payloads 0 through 4),
-- and the cursor returned by every seed write is recorded alongside its
-- stream. The third stream is then appended to twice, both times expecting
-- the cursor recorded for it:
--
-- 1. the first append must succeed;
-- 2. the second append reuses that same cursor and must be rejected with a
--    'ConsistencyError'.
testCascadingVersionDependencies ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (EventStoreError backend)) =>
    BackendHandle backend ->
    IO ()
testCascadingVersionDependencies store = do
    streams <- replicateM 5 (fmap StreamId UUID.nextRandom)
    seeded <- mapM (uncurry seedStream) (zip [0 ..] streams)
    case seeded of
        [_, _, (s3, c3), _, _] -> do
            let appendAtRecordedCursor :: Int -> IO (InsertionResult backend)
                appendAtRecordedCursor payload =
                    insertEvents store Nothing (singleEvent s3 (ExactVersion c3) (makeUserEvent payload))
            appendAtRecordedCursor 33 >>= \case
                FailedInsertion err ->
                    assertFailure ("Failed to update middle stream: " ++ show err)
                SuccessfulInsertion _ -> pure ()
            -- Same cursor as the append above, which has already succeeded.
            appendAtRecordedCursor 333 >>= \case
                FailedInsertion (ConsistencyError _) -> pure ()
                _ -> assertFailure "Should not be able to use old cursor after update"
        _ -> assertFailure "Expected exactly 5 cursors"
  where
    -- Create one stream with a single event and return it with the cursor
    -- reported by the insert; fails the test if the insert is rejected.
    seedStream :: Int -> StreamId -> IO (StreamId, Cursor backend)
    seedStream payload stream =
        insertEvents store Nothing (singleEvent stream NoStream (makeUserEvent payload)) >>= \case
            SuccessfulInsertion InsertionSuccess{finalCursor = cursor} -> pure (stream, cursor)
            FailedInsertion err ->
                assertFailure ("Failed to create dependency chain: " ++ show err)
testMultiStreamHeadConsistency :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testMultiStreamHeadConsistency :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> IO ()
testMultiStreamHeadConsistency BackendHandle backend
store = do
streamA <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
streamB <- StreamId <$> UUID.nextRandom
streamC <- StreamId <$> UUID.nextRandom
result <-
insertEvents store Nothing $
Transaction
( Map.fromList
[ (streamA, StreamWrite NoStream [makeUserEvent 1, makeUserEvent 2])
, (streamB, StreamWrite NoStream [makeUserEvent 10, makeUserEvent 20, makeUserEvent 30])
, (streamC, StreamWrite NoStream [makeUserEvent 100])
]
)
case result of
FailedInsertion EventStoreError backend
err -> TestName -> IO ()
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> IO ()) -> TestName -> IO ()
forall a b. (a -> b) -> a -> b
$ TestName
"Failed to insert multi-stream batch: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion{} -> do
resultA <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamA ExpectedVersion backend
forall backend. ExpectedVersion backend
StreamExists (Int -> SomeLatestEvent
makeUserEvent Int
3)
case resultA of
FailedInsertion EventStoreError backend
err -> TestName -> IO ()
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> IO ()) -> TestName -> IO ()
forall a b. (a -> b) -> a -> b
$ TestName
"Failed to append to stream A (stream heads may be corrupted): " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> do
resultB <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamB ExpectedVersion backend
forall backend. ExpectedVersion backend
StreamExists (Int -> SomeLatestEvent
makeUserEvent Int
40)
case resultB of
FailedInsertion EventStoreError backend
err -> TestName -> IO ()
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> IO ()) -> TestName -> IO ()
forall a b. (a -> b) -> a -> b
$ TestName
"Failed to append to stream B (stream heads may be corrupted): " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> do
resultC <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamC ExpectedVersion backend
forall backend. ExpectedVersion backend
StreamExists (Int -> SomeLatestEvent
makeUserEvent Int
200)
case resultC of
FailedInsertion EventStoreError backend
err -> TestName -> IO ()
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> IO ()) -> TestName -> IO ()
forall a b. (a -> b) -> a -> b
$ TestName
"Failed to append to stream C (stream heads may be corrupted): " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
-- | Submit a transaction that contains no streams at all.
--
-- Both outcomes of the empty insert are accepted:
--
-- * a 'FailedInsertion' ends the test successfully;
-- * a 'SuccessfulInsertion' is followed by one write to a freshly generated
--   stream, and that follow-up write must succeed.
--
-- Fails only when the follow-up write after an accepted empty batch is rejected.
testEmptyBatchInsertion :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testEmptyBatchInsertion store = do
    -- An empty map cannot determine the event container, so pin it explicitly.
    let emptyBatch = Transaction Map.empty :: Transaction [] backend
    emptyOutcome <- insertEvents store Nothing emptyBatch
    case emptyOutcome of
        -- Rejecting an empty batch is tolerated; nothing further to check.
        FailedInsertion _ -> pure ()
        SuccessfulInsertion _ -> do
            freshStream <- StreamId <$> UUID.nextRandom
            followUp <-
                insertEvents store Nothing $
                    appendToOrCreateStream freshStream (makeUserEvent 1)
            case followUp of
                FailedInsertion err ->
                    assertFailure ("Failed follow-up insert after empty batch: " ++ show err)
                SuccessfulInsertion _ -> pure ()
-- | Write one transaction in which stream B carries an empty event list while
-- streams A and C carry events, then probe each stream's existence.
--
-- Steps, each of which aborts the test on an unexpected result:
--
-- 1. The mixed transaction itself must be accepted.
-- 2. A 'NoStream' write to stream B must succeed (B received no events).
-- 3. 'StreamExists' writes to A and C must both succeed.
testMixedEmptyStreams :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testMixedEmptyStreams store = do
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom
    streamC <- StreamId <$> UUID.nextRandom

    batchOutcome <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite NoStream [makeUserEvent 1, makeUserEvent 2])
                    , (streamB, StreamWrite NoStream [])
                    , (streamC, StreamWrite NoStream [makeUserEvent 100])
                    ]
                )
    requireSuccess "Failed to insert mixed empty/non-empty batch: " batchOutcome

    -- Stream B got an empty write, so creating it with NoStream must still work.
    createB <-
        insertEvents store Nothing $
            singleEvent streamB NoStream (makeUserEvent 10)
    requireSuccess "Stream B should not exist yet, but got error: " createB

    -- Both appends are issued before either result is inspected.
    appendA <-
        insertEvents store Nothing $
            singleEvent streamA StreamExists (makeUserEvent 3)
    appendC <-
        insertEvents store Nothing $
            singleEvent streamC StreamExists (makeUserEvent 200)
    case (appendA, appendC) of
        (SuccessfulInsertion _, SuccessfulInsertion _) -> pure ()
        _ -> assertFailure "Streams A and C should exist after mixed batch"
  where
    -- Abort the test (assertFailure throws) with @context ++ show err@ on failure.
    requireSuccess :: String -> InsertionResult backend -> IO ()
    requireSuccess context = \case
        FailedInsertion err -> assertFailure (context ++ show err)
        SuccessfulInsertion _ -> pure ()
-- | Take stream A's cursor out of a multi-stream insertion result and use it
-- for optimistic concurrency control.
--
-- After creating streams A, B and C in one transaction, the test:
--
-- 1. looks up stream A's entry in the returned @streamCursors@ map;
-- 2. appends to A with @ExactVersion@ of that cursor, which must succeed;
-- 3. appends to A again with the same cursor, which must be rejected with a
--    'ConsistencyError'.
testPerStreamCursorExtraction :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testPerStreamCursorExtraction store = do
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom
    streamC <- StreamId <$> UUID.nextRandom

    initialOutcome <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite NoStream [makeUserEvent 1])
                    , (streamB, StreamWrite NoStream [makeUserEvent 10])
                    , (streamC, StreamWrite NoStream [makeUserEvent 100])
                    ]
                )
    -- assertFailure throws, so each failing branch ends the test right here.
    cursors <- case initialOutcome of
        FailedInsertion err ->
            assertFailure ("Initial multi-stream write failed: " ++ show err)
        SuccessfulInsertion InsertionSuccess{streamCursors = perStream} ->
            pure perStream
    cursorA <-
        maybe (assertFailure "Stream A cursor missing from streamCursors") pure $
            Map.lookup streamA cursors

    firstAppend <-
        insertEvents store Nothing $
            singleEvent streamA (ExactVersion cursorA) (makeUserEvent 2)
    case firstAppend of
        FailedInsertion err ->
            assertFailure ("Append with stream A cursor failed: " ++ show err)
        SuccessfulInsertion _ -> pure ()

    -- The first append went through, so the same cursor is being reused here.
    staleAppend <-
        insertEvents store Nothing $
            singleEvent streamA (ExactVersion cursorA) (makeUserEvent 3)
    case staleAppend of
        FailedInsertion (ConsistencyError _) -> pure ()
        _ -> assertFailure "Should not be able to reuse stale cursor"
-- | Check that appending to one stream leaves another stream's cursor usable.
--
-- Streams A and B are created in a single transaction and their cursors are
-- read from the result's @streamCursors@ map. Stream A is then appended to
-- with its cursor; afterwards an append to stream B using B's original
-- cursor must still succeed.
testCursorIndependence :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testCursorIndependence store = do
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom

    initialOutcome <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite NoStream [makeUserEvent 1])
                    , (streamB, StreamWrite NoStream [makeUserEvent 10])
                    ]
                )
    -- assertFailure throws, so the failing branch ends the test right here.
    cursors <- case initialOutcome of
        FailedInsertion err -> assertFailure ("Initial write failed: " ++ show err)
        SuccessfulInsertion InsertionSuccess{streamCursors = perStream} -> pure perStream

    case (Map.lookup streamA cursors, Map.lookup streamB cursors) of
        (Just cursorA, Just cursorB) -> do
            updateA <-
                insertEvents store Nothing $
                    singleEvent streamA (ExactVersion cursorA) (makeUserEvent 2)
            requireSuccess "Stream A update failed: " updateA

            -- B has not been written since the initial transaction.
            updateB <-
                insertEvents store Nothing $
                    singleEvent streamB (ExactVersion cursorB) (makeUserEvent 11)
            requireSuccess "Stream B cursor should still be valid after stream A update: " updateB
        _ -> assertFailure "Missing cursors from initial transaction"
  where
    -- Abort the test (assertFailure throws) with @context ++ show err@ on failure.
    requireSuccess :: String -> InsertionResult backend -> IO ()
    requireSuccess context = \case
        FailedInsertion err -> assertFailure (context ++ show err)
        SuccessfulInsertion _ -> pure ()
-- | Check that one stale cursor rejects a whole multi-stream transaction.
--
-- Streams A and B are created together and their cursors captured. Stream A
-- is then advanced, which makes A's captured cursor stale. A transaction that
-- writes to A (stale cursor) and B (untouched cursor) must be rejected with a
-- 'ConsistencyError'.
--
-- To back the "atomically" wording of the failure message, the test finally
-- appends to B with B's original cursor: that append can only be expected to
-- succeed if the rejected transaction left stream B unwritten.
testStaleCursorPerStream :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testStaleCursorPerStream store = do
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom

    initialOutcome <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite NoStream [makeUserEvent 1])
                    , (streamB, StreamWrite NoStream [makeUserEvent 10])
                    ]
                )
    -- assertFailure throws, so the failing branch ends the test right here.
    cursors <- case initialOutcome of
        FailedInsertion err -> assertFailure ("Initial write failed: " ++ show err)
        SuccessfulInsertion InsertionSuccess{streamCursors = perStream} -> pure perStream

    case (Map.lookup streamA cursors, Map.lookup streamB cursors) of
        (Just cursorA, Just cursorB) -> do
            -- Advance stream A so that cursorA no longer names its head.
            updateA <-
                insertEvents store Nothing $
                    singleEvent streamA (ExactVersion cursorA) (makeUserEvent 2)
            requireSuccess "Stream A update failed: " updateA

            -- cursorA is stale, cursorB has not been used since it was issued.
            mixedOutcome <-
                insertEvents store Nothing $
                    Transaction
                        ( Map.fromList
                            [ (streamA, StreamWrite (ExactVersion cursorA) [makeUserEvent 3])
                            , (streamB, StreamWrite (ExactVersion cursorB) [makeUserEvent 11])
                            ]
                        )
            case mixedOutcome of
                FailedInsertion (ConsistencyError _) -> pure ()
                _ -> assertFailure "Transaction with stale cursor should fail atomically"

            -- Atomicity probe: had the rejected transaction persisted B's
            -- event, this ExactVersion write would be refused.
            probeB <-
                insertEvents store Nothing $
                    singleEvent streamB (ExactVersion cursorB) (makeUserEvent 11)
            requireSuccess
                "Stream B was modified by a transaction that should have failed atomically: "
                probeB
        _ -> assertFailure "Missing cursors from initial transaction"
  where
    -- Abort the test (assertFailure throws) with @context ++ show err@ on failure.
    requireSuccess :: String -> InsertionResult backend -> IO ()
    requireSuccess context = \case
        FailedInsertion err -> assertFailure (context ++ show err)
        SuccessfulInsertion _ -> pure ()
-- | Check that a multi-stream transaction reports a cursor for every stream it
-- wrote to, and that each reported cursor can be used afterwards.
--
-- Steps performed:
--
-- 1. Write events to three fresh streams in a single transaction.
-- 2. Assert that @streamCursors@ holds exactly three entries, one per stream.
-- 3. Assert that every per-stream cursor is @<=@ the transaction's
--    @finalCursor@.
-- 4. Append one more event to each stream using 'ExactVersion' with the
--    cursor returned for that stream; all three appends must succeed.
testCursorCompleteness ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend), Ord (Cursor backend)) =>
    BackendHandle backend ->
    IO ()
testCursorCompleteness store = do
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom
    streamC <- StreamId <$> UUID.nextRandom
    result <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite NoStream [makeUserEvent 1, makeUserEvent 2])
                    , (streamB, StreamWrite NoStream [makeUserEvent 10])
                    , (streamC, StreamWrite NoStream [makeUserEvent 100, makeUserEvent 101, makeUserEvent 102])
                    ]
                )
    case result of
        FailedInsertion err -> assertFailure $ "Multi-stream write failed: " ++ show err
        SuccessfulInsertion (InsertionSuccess{finalCursor, streamCursors}) -> do
            -- One cursor per written stream, no more and no fewer.
            Map.size streamCursors @?= 3
            assertBool "Stream A missing from cursors" (Map.member streamA streamCursors)
            assertBool "Stream B missing from cursors" (Map.member streamB streamCursors)
            assertBool "Stream C missing from cursors" (Map.member streamC streamCursors)
            -- No per-stream cursor may lie beyond the transaction's final cursor.
            forM_ (Map.toList streamCursors) $ \(sid, cursor) ->
                assertBool
                    ("Cursor for " ++ show sid ++ " violates ordering invariant")
                    (cursor <= finalCursor)
            case (Map.lookup streamA streamCursors, Map.lookup streamB streamCursors, Map.lookup streamC streamCursors) of
                (Just cursorA, Just cursorB, Just cursorC) -> do
                    -- Each returned cursor must be accepted as the exact expected version.
                    resultA <- insertEvents store Nothing $ singleEvent streamA (ExactVersion cursorA) (makeUserEvent 3)
                    resultB <- insertEvents store Nothing $ singleEvent streamB (ExactVersion cursorB) (makeUserEvent 11)
                    resultC <- insertEvents store Nothing $ singleEvent streamC (ExactVersion cursorC) (makeUserEvent 103)
                    case (resultA, resultB, resultC) of
                        (SuccessfulInsertion _, SuccessfulInsertion _, SuccessfulInsertion _) -> pure ()
                        _ -> assertFailure "One or more stream cursors were not usable"
                _ -> assertFailure "Failed to extract all cursors from map"
-- | Check how a multi-stream transaction treats a stream that is given an
-- empty event list.
--
-- Streams A and C receive one event each, stream B receives none. The
-- transaction must succeed and must report cursors for A and C. Both outcomes
-- are accepted for B, as long as the store is consistent about it:
--
-- * no cursor for B: a follow-up write to B with 'NoStream' must succeed;
-- * a cursor for B: a follow-up write to B with 'ExactVersion' on that cursor
--   must succeed.
testEmptyStreamCursorHandling ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    BackendHandle backend ->
    IO ()
testEmptyStreamCursorHandling store = do
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom
    streamC <- StreamId <$> UUID.nextRandom
    result <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite NoStream [makeUserEvent 1])
                    , (streamB, StreamWrite NoStream [])
                    , (streamC, StreamWrite NoStream [makeUserEvent 100])
                    ]
                )
    case result of
        FailedInsertion err -> assertFailure $ "Multi-stream with empty stream failed: " ++ show err
        SuccessfulInsertion (InsertionSuccess{streamCursors}) -> do
            assertBool "Stream A should have cursor (has events)" (Map.member streamA streamCursors)
            assertBool "Stream C should have cursor (has events)" (Map.member streamC streamCursors)
            case Map.lookup streamB streamCursors of
                Nothing -> do
                    -- No cursor was reported, so B must still be creatable as a new stream.
                    resultB <-
                        insertEvents store Nothing $
                            singleEvent streamB NoStream (makeUserEvent 10)
                    case resultB of
                        FailedInsertion err -> assertFailure $ "Stream B should not exist after empty write: " ++ show err
                        SuccessfulInsertion _ -> pure ()
                Just cursorB -> do
                    -- A cursor was reported, so it must be valid as an exact expected version.
                    resultB <-
                        insertEvents store Nothing $
                            singleEvent streamB (ExactVersion cursorB) (makeUserEvent 10)
                    case resultB of
                        FailedInsertion err ->
                            assertFailure $ "If empty stream gets cursor, it should be usable: " ++ show err
                        SuccessfulInsertion _ -> pure ()
-- | Check that the first event of a fresh stream is delivered with
-- @StreamVersion 1@.
--
-- A user event followed by a tombstone is written to a new stream. A
-- subscription on that single stream, starting from the beginning, records the
-- @streamVersion@ of every 'UserCreated' envelope. The test then blocks on
-- @completionVar@, which is handed to 'handleTombstone' (defined elsewhere;
-- presumably it fills the 'MVar' when the tombstone arrives), cancels the
-- subscription and compares the recorded versions.
--
-- The seeding insert is checked explicitly: if it failed, no tombstone would
-- ever be delivered and the wait on @completionVar@ could not finish.
testStreamVersionsStartAt1 ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testStreamVersionsStartAt1 store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedVersions <- newIORef []
    completionVar <- newEmptyMVar
    let collectVersions ref event = do
            -- Versions are prepended; the list is reversed before the assertion.
            atomicModifyIORef' ref (\versions -> (event.streamVersion : versions, ()))
            pure Continue
    seeded <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite NoStream [makeUserEvent 1, makeTombstone])]))
    case seeded of
        FailedInsertion _ -> assertFailure "Failed to insert events for stream version test"
        SuccessfulInsertion _ -> pure ()
    handle <-
        subscribe
            store
            ( match UserCreated (collectVersions receivedVersions)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = SingleStream streamId, startupPosition = FromBeginning}
    takeMVar completionVar
    handle.cancel
    versions <- reverse <$> readIORef receivedVersions
    versions @?= [StreamVersion 1]
-- | Check that stream versions increase by one per event across several
-- transactions on the same stream.
--
-- Five user events are written in three separate transactions (2 + 1 + 2), the
-- last of which also carries a tombstone. A subscription on that single
-- stream, starting from the beginning, records the @streamVersion@ of every
-- 'UserCreated' envelope; the expected result is versions 1 through 5 in
-- order.
--
-- Every insert is checked: a failed write would otherwise surface only as a
-- blocked wait on @completionVar@ or as a confusing version mismatch.
testStreamVersionsContiguous ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testStreamVersionsContiguous store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedVersions <- newIORef []
    completionVar <- newEmptyMVar
    let collectVersions ref event = do
            -- Versions are prepended; the list is reversed before the assertion.
            atomicModifyIORef' ref (\versions -> (event.streamVersion : versions, ()))
            pure Continue
        -- Write one batch to the test stream and abort the test if the store rejects it.
        insertOrFail :: ExpectedVersion backend -> [SomeLatestEvent] -> IO ()
        insertOrFail expected events = do
            outcome <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite expected events)]))
            case outcome of
                FailedInsertion _ -> assertFailure "Failed to insert events for contiguous version test"
                SuccessfulInsertion _ -> pure ()
    insertOrFail NoStream [makeUserEvent 1, makeUserEvent 2]
    insertOrFail StreamExists [makeUserEvent 3]
    insertOrFail StreamExists [makeUserEvent 4, makeUserEvent 5, makeTombstone]
    handle <-
        subscribe
            store
            ( match UserCreated (collectVersions receivedVersions)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = SingleStream streamId, startupPosition = FromBeginning}
    takeMVar completionVar
    handle.cancel
    versions <- reverse <$> readIORef receivedVersions
    versions @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3, StreamVersion 4, StreamVersion 5]
-- | Check that the envelope handed to a subscription handler exposes a
-- @streamVersion@ field.
--
-- A user event and a tombstone are written to a new stream. The 'UserCreated'
-- handler overwrites an 'IORef' with @Just@ the envelope's @streamVersion@
-- each time it runs; after the wait on @completionVar@ finishes, the stored
-- value must be @Just (StreamVersion 1)@.
--
-- The seeding insert is checked explicitly so that a rejected write fails the
-- test immediately instead of leaving it waiting for a tombstone that was
-- never stored.
testStreamVersionExposedInSubscription ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testStreamVersionExposedInSubscription store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedVersionRef <- newIORef Nothing
    completionVar <- newEmptyMVar
    let captureVersion ref event = do
            -- Keep only the most recently seen version.
            atomicModifyIORef' ref (\_ -> (Just event.streamVersion, ()))
            pure Continue
    seeded <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite NoStream [makeUserEvent 1, makeTombstone])]))
    case seeded of
        FailedInsertion _ -> assertFailure "Failed to insert events for subscription version test"
        SuccessfulInsertion _ -> pure ()
    handle <-
        subscribe
            store
            ( match UserCreated (captureVersion receivedVersionRef)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = SingleStream streamId, startupPosition = FromBeginning}
    takeMVar completionVar
    handle.cancel
    mbVersion <- readIORef receivedVersionRef
    mbVersion @?= Just (StreamVersion 1)
-- | Check that stream versions are counted per stream, not globally.
--
-- Three fresh streams each receive three user events, spread differently over
-- two multi-stream transactions, followed by one tombstone per stream. A
-- subscription over all streams, starting from the beginning, records the
-- @streamVersion@ of each 'UserCreated' envelope in a per-stream list. Every
-- stream is expected to report versions 1, 2 and 3.
--
-- Completion is signalled once three tombstones have been seen. Only
-- tombstones that belong to the three test streams are counted: the
-- subscription covers all streams, so an unrelated tombstone already present
-- in the store must not end the test early. This matches the 'UserCreated'
-- handler, which also ignores envelopes from other streams.
--
-- Every insert is checked so that a rejected write fails the test right away
-- instead of leaving it blocked on @completionVar@.
testIndependentStreamVersions ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testIndependentStreamVersions store = do
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom
    streamC <- StreamId <$> UUID.nextRandom
    versionsA <- newIORef []
    versionsB <- newIORef []
    versionsC <- newIORef []
    completionVar <- newEmptyMVar
    let collectVersionsFor ref event = do
            -- Versions are prepended; the lists are reversed before the assertions.
            atomicModifyIORef' ref (\versions -> (event.streamVersion : versions, ()))
            pure Continue
        -- Run one multi-stream transaction and abort the test if the store rejects it.
        insertOrFail writes = do
            outcome <- insertEvents store Nothing (Transaction (Map.fromList writes))
            case outcome of
                FailedInsertion _ -> assertFailure "Failed to insert events for independent stream version test"
                SuccessfulInsertion _ -> pure ()
    insertOrFail
        [ (streamA, StreamWrite NoStream [makeUserEvent 1, makeUserEvent 2])
        , (streamB, StreamWrite NoStream [makeUserEvent 10])
        , (streamC, StreamWrite NoStream [makeUserEvent 100, makeUserEvent 101, makeUserEvent 102])
        ]
    insertOrFail
        [ (streamA, StreamWrite StreamExists [makeUserEvent 3])
        , (streamB, StreamWrite StreamExists [makeUserEvent 11, makeUserEvent 12])
        ]
    insertOrFail
        [ (streamA, StreamWrite StreamExists [makeTombstone])
        , (streamB, StreamWrite StreamExists [makeTombstone])
        , (streamC, StreamWrite StreamExists [makeTombstone])
        ]
    remainingTombstones <- newIORef (3 :: Int)
    let handleTombstones :: EventHandler Tombstone IO backend
        handleTombstones event
            -- Tombstones of streams this test did not create are not counted.
            | event.streamId `notElem` [streamA, streamB, streamC] = pure Continue
            | otherwise = do
                remaining <- atomicModifyIORef' remainingTombstones (\n -> (n - 1, n - 1))
                if remaining == 0
                    then putMVar completionVar () >> pure Stop
                    else pure Continue
    handle <-
        subscribe
            store
            ( match
                UserCreated
                ( \event -> do
                    -- Route each envelope to the list of the stream it belongs to.
                    case event.streamId of
                        sid
                            | sid == streamA -> collectVersionsFor versionsA event
                            | sid == streamB -> collectVersionsFor versionsB event
                            | sid == streamC -> collectVersionsFor versionsC event
                            | otherwise -> pure Continue
                )
                :? match Tombstone handleTombstones
                :? MatchEnd
            )
            EventSelector{streamId = AllStreams, startupPosition = FromBeginning}
    takeMVar completionVar
    handle.cancel
    vA <- reverse <$> readIORef versionsA
    vB <- reverse <$> readIORef versionsB
    vC <- reverse <$> readIORef versionsC
    vA @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3]
    vB @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3]
    vC @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3]
-- | Check that subscriptions opened on several store handles all observe the
-- events written through another handle.
--
-- The first handle in the list is used as the writer; every remaining handle
-- gets its own subscription over all streams, starting from the beginning.
-- Five user events plus a tombstone are then written to a fresh stream via the
-- writer. For each subscriber the test waits on that subscriber's completion
-- 'MVar', cancels the subscriptions, and asserts that:
--
-- * exactly five 'UserCreated' envelopes were collected,
-- * all five yield user information via 'extractUserInfo',
-- * the user names are @user1@ .. @user5@ in order,
-- * every subscriber collected as many events as the first one.
--
-- An empty handle list fails the test. With a single handle there are no
-- subscribers, so the per-subscriber assertions run zero times.
testMultiInstanceSubscription ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    [BackendHandle backend] ->
    IO ()
testMultiInstanceSubscription stores = do
    case stores of
        [] -> assertFailure "Expected at least 1 store handle"
        (writerStore : subscriberStores) -> do
            streamId <- StreamId <$> UUID.nextRandom
            -- One completion signal and one event buffer per subscriber.
            completionVars <- replicateM (length subscriberStores) newEmptyMVar
            receivedEventsRefs <- replicateM (length subscriberStores) (newIORef [])
            subscriptionHandles <- forM (zip3 subscriberStores completionVars receivedEventsRefs) $
                \(store, completionVar, eventsRef) -> do
                    subscribe
                        store
                        ( match UserCreated (collectEventsUntilTombstone eventsRef)
                            :? match Tombstone (handleTombstone completionVar)
                            :? MatchEnd
                        )
                        EventSelector{streamId = AllStreams, startupPosition = FromBeginning}
            let testEvents = map makeUserEvent [1 .. 5] ++ [makeTombstone]
            result <-
                insertEvents
                    writerStore
                    Nothing
                    (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))
            case result of
                FailedInsertion err -> do
                    -- Release the subscriptions before reporting the failure.
                    mapM_ (.cancel) subscriptionHandles
                    assertFailure $ "Failed to insert events: " ++ show err
                SuccessfulInsertion _ -> do
                    forM_ completionVars takeMVar
                    mapM_ (.cancel) subscriptionHandles
                    -- Buffers are reversed here before the ordered comparison below.
                    allReceivedEvents <- mapM (fmap reverse . readIORef) receivedEventsRefs
                    forM_ allReceivedEvents $ \events -> do
                        length events @?= 5
                        let userInfos = mapMaybe extractUserInfo events
                        length userInfos @?= 5
                        let userNames :: [Text]
                            userNames = map userName userInfos
                        userNames @?= ["user1", "user2", "user3", "user4", "user5"]
                    case allReceivedEvents of
                        [] -> pure ()
                        (firstEvents : restEvents) ->
                            forM_ restEvents $ \events ->
                                length events @?= length firstEvents