{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module Test.Hindsight.Store.ConsistencyTests (consistencyTests) where
import Control.Concurrent (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM, forM_, replicateM, replicateM_)
import Data.Map.Strict qualified as Map
import Data.UUID.V4 qualified as UUID
import Hindsight.Events (SomeLatestEvent)
import Hindsight.Store
import Test.Hindsight.Examples (makeUserEvent)
import Test.Hindsight.Store.TestRunner (EventStoreTestRunner (..))
import Test.Tasty
import Test.Tasty.HUnit
import UnliftIO.Async (async, concurrently, wait)
consistencyTests ::
forall backend.
(EventStore backend, StoreConstraints backend IO, Show (Cursor backend), Show (EventStoreError backend)) =>
EventStoreTestRunner backend ->
[TestTree]
consistencyTests :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend), Show (EventStoreError backend)) =>
EventStoreTestRunner backend -> [TestTree]
consistencyTests EventStoreTestRunner backend
runner =
[ TestName -> Assertion -> TestTree
testCase TestName
"No Stream Condition" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
withStore EventStoreTestRunner backend
runner BackendHandle backend -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testNoStreamCondition
, TestName -> Assertion -> TestTree
testCase TestName
"Stream Exists Condition" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
withStore EventStoreTestRunner backend
runner BackendHandle backend -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testStreamExistsCondition
, TestName -> Assertion -> TestTree
testCase TestName
"Exact Version Condition" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
withStore EventStoreTestRunner backend
runner BackendHandle backend -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testExactVersionCondition
, TestName -> Assertion -> TestTree
testCase TestName
"Exact Stream Version Condition" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
withStore EventStoreTestRunner backend
runner BackendHandle backend -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testExactStreamVersionCondition
, TestName -> Assertion -> TestTree
testCase TestName
"Concurrent Writes" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
withStore EventStoreTestRunner backend
runner BackendHandle backend -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testConcurrentWrites
, TestName -> Assertion -> TestTree
testCase TestName
"Batch Atomicity" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
withStore EventStoreTestRunner backend
runner BackendHandle backend -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO) =>
BackendHandle backend -> Assertion
testBatchAtomicity
, TestName -> Assertion -> TestTree
testCase TestName
"Multi-Stream Consistency" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
withStore EventStoreTestRunner backend
runner BackendHandle backend -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testMultiStreamConsistency
, TestName -> Assertion -> TestTree
testCase TestName
"Version Expectation Race Condition" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
withStore EventStoreTestRunner backend
runner BackendHandle backend -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testVersionExpectationRaceCondition
, TestName -> Assertion -> TestTree
testCase TestName
"Any Expectation Concurrency" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
withStore EventStoreTestRunner backend
runner BackendHandle backend -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO) =>
BackendHandle backend -> Assertion
testAnyExpectationConcurrency
, TestName -> Assertion -> TestTree
testCase TestName
"Mixed Version Expectations" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
withStore EventStoreTestRunner backend
runner BackendHandle backend -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO) =>
BackendHandle backend -> Assertion
testMixedVersionExpectations
, TestName -> Assertion -> TestTree
testCase TestName
"Cascading Version Dependencies" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
withStore EventStoreTestRunner backend
runner BackendHandle backend -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (EventStoreError backend)) =>
BackendHandle backend -> Assertion
testCascadingVersionDependencies
, TestName -> Assertion -> TestTree
testCase TestName
"Multi-Stream Head Consistency" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
withStore EventStoreTestRunner backend
runner BackendHandle backend -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testMultiStreamHeadConsistency
, TestName -> Assertion -> TestTree
testCase TestName
"Empty Batch Insertion" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
withStore EventStoreTestRunner backend
runner BackendHandle backend -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testEmptyBatchInsertion
, TestName -> Assertion -> TestTree
testCase TestName
"Mixed Empty and Non-Empty Streams" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. (BackendHandle backend -> IO a) -> Assertion
withStore EventStoreTestRunner backend
runner BackendHandle backend -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testMixedEmptyStreams
]
testNoStreamCondition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testNoStreamCondition :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testNoStreamCondition BackendHandle backend
store = do
streamId <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
result1 <-
insertEvents store Nothing $
singleEvent streamId NoStream (makeUserEvent 1)
case result1 of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"First write failed: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> do
result2 <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamId ExpectedVersion backend
forall backend. ExpectedVersion backend
NoStream (Int -> SomeLatestEvent
makeUserEvent Int
2)
case result2 of
FailedInsertion (ConsistencyError ConsistencyErrorInfo backend
_) -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Unexpected error: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure TestName
"Second write should have failed"
testStreamExistsCondition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testStreamExistsCondition :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testStreamExistsCondition BackendHandle backend
store = do
streamId <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
result1 <-
insertEvents store Nothing $
appendAfterAny streamId (makeUserEvent 1)
case result1 of
FailedInsertion (ConsistencyError ConsistencyErrorInfo backend
_) -> do
_ <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamId ExpectedVersion backend
forall backend. ExpectedVersion backend
NoStream (Int -> SomeLatestEvent
makeUserEvent Int
1)
result2 <-
insertEvents store Nothing $
appendAfterAny streamId (makeUserEvent 2)
case result2 of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Second write failed: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Unexpected error: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure TestName
"First write should have failed"
testExactVersionCondition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testExactVersionCondition :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testExactVersionCondition BackendHandle backend
store = do
streamId <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
SuccessfulInsertion (InsertionSuccess{finalCursor = initCursor}) <-
insertEvents store Nothing $
singleEvent streamId NoStream (makeUserEvent 42)
result1 <-
insertEvents store Nothing $
singleEvent streamId StreamExists (makeUserEvent 1)
case result1 of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"First write failed: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion (InsertionSuccess{finalCursor :: forall backend. InsertionSuccess backend -> Cursor backend
finalCursor = Cursor backend
cursor1}) -> do
result2 <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamId (Cursor backend -> ExpectedVersion backend
forall backend. Cursor backend -> ExpectedVersion backend
ExactVersion Cursor backend
initCursor) (Int -> SomeLatestEvent
makeUserEvent Int
2)
case result2 of
FailedInsertion (ConsistencyError ConsistencyErrorInfo backend
_) -> do
result3 <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamId (Cursor backend -> ExpectedVersion backend
forall backend. Cursor backend -> ExpectedVersion backend
ExactVersion Cursor backend
cursor1) (Int -> SomeLatestEvent
makeUserEvent Int
3)
case result3 of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Third write failed: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Unexpected error: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure TestName
"Second write should have failed"
testExactStreamVersionCondition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testExactStreamVersionCondition :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testExactStreamVersionCondition BackendHandle backend
store = do
streamId <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
result1 <-
insertEvents store Nothing $
singleEvent streamId NoStream (makeUserEvent 1)
case result1 of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"First write failed: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> do
result2 <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamId (StreamVersion -> ExpectedVersion backend
forall backend. StreamVersion -> ExpectedVersion backend
ExactStreamVersion (Int64 -> StreamVersion
StreamVersion Int64
1)) (Int -> SomeLatestEvent
makeUserEvent Int
2)
case result2 of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Second write with correct stream version failed: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> do
result3 <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamId (StreamVersion -> ExpectedVersion backend
forall backend. StreamVersion -> ExpectedVersion backend
ExactStreamVersion (Int64 -> StreamVersion
StreamVersion Int64
1)) (Int -> SomeLatestEvent
makeUserEvent Int
3)
case result3 of
FailedInsertion (ConsistencyError ConsistencyErrorInfo backend
_) -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Unexpected error: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure TestName
"Third write should have failed with wrong stream version"
testConcurrentWrites :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testConcurrentWrites :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testConcurrentWrites BackendHandle backend
store = do
streamId <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
result1 <-
insertEvents store Nothing $
singleEvent streamId NoStream (makeUserEvent 1)
case result1 of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Initial write failed: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion (InsertionSuccess{finalCursor :: forall backend. InsertionSuccess backend -> Cursor backend
finalCursor = Cursor backend
cursor}) -> do
(result2, result3) <-
IO (InsertionResult backend)
-> IO (InsertionResult backend)
-> IO (InsertionResult backend, InsertionResult backend)
forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m (a, b)
concurrently
( BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamId (Cursor backend -> ExpectedVersion backend
forall backend. Cursor backend -> ExpectedVersion backend
ExactVersion Cursor backend
cursor) (Int -> SomeLatestEvent
makeUserEvent Int
2)
)
( BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamId (Cursor backend -> ExpectedVersion backend
forall backend. Cursor backend -> ExpectedVersion backend
ExactVersion Cursor backend
cursor) (Int -> SomeLatestEvent
makeUserEvent Int
3)
)
case (result2, result3) of
(SuccessfulInsertion InsertionSuccess backend
_, FailedInsertion (ConsistencyError ConsistencyErrorInfo backend
_)) -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
(FailedInsertion (ConsistencyError ConsistencyErrorInfo backend
_), SuccessfulInsertion InsertionSuccess backend
_) -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
(InsertionResult backend, InsertionResult backend)
_ -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure TestName
"Expected exactly one write to succeed"
testBatchAtomicity :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testBatchAtomicity :: forall backend.
(EventStore backend, StoreConstraints backend IO) =>
BackendHandle backend -> Assertion
testBatchAtomicity BackendHandle backend
store = do
streamId1 <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
streamId2 <- StreamId <$> UUID.nextRandom
_ <-
insertEvents store Nothing $
singleEvent streamId1 NoStream (makeUserEvent 1)
result <-
insertEvents store Nothing $
fromWrites
[ (streamId1, StreamWrite StreamExists [makeUserEvent 2])
, (streamId2, StreamWrite StreamExists [makeUserEvent 3])
]
case result of
FailedInsertion (ConsistencyError ConsistencyErrorInfo backend
_) -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
InsertionResult backend
_ -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure TestName
"Batch write should have failed completely"
testMultiStreamConsistency :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testMultiStreamConsistency :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testMultiStreamConsistency BackendHandle backend
store = do
streams@[streamId1, streamId2, streamId3] <- Int -> IO StreamId -> IO [StreamId]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
3 (UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom)
let initWrites =
Map StreamId (StreamWrite [] SomeLatestEvent backend)
-> Transaction [] backend
forall (t :: * -> *) backend.
Map StreamId (StreamWrite t SomeLatestEvent backend)
-> Transaction t backend
Transaction
( [(StreamId, StreamWrite [] SomeLatestEvent backend)]
-> Map StreamId (StreamWrite [] SomeLatestEvent backend)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([(StreamId, StreamWrite [] SomeLatestEvent backend)]
-> Map StreamId (StreamWrite [] SomeLatestEvent backend))
-> [(StreamId, StreamWrite [] SomeLatestEvent backend)]
-> Map StreamId (StreamWrite [] SomeLatestEvent backend)
forall a b. (a -> b) -> a -> b
$
[StreamId]
-> [StreamWrite [] SomeLatestEvent backend]
-> [(StreamId, StreamWrite [] SomeLatestEvent backend)]
forall a b. [a] -> [b] -> [(a, b)]
zip [StreamId]
streams ([StreamWrite [] SomeLatestEvent backend]
-> [(StreamId, StreamWrite [] SomeLatestEvent backend)])
-> [StreamWrite [] SomeLatestEvent backend]
-> [(StreamId, StreamWrite [] SomeLatestEvent backend)]
forall a b. (a -> b) -> a -> b
$
(Int -> StreamWrite [] SomeLatestEvent backend)
-> [Int] -> [StreamWrite [] SomeLatestEvent backend]
forall a b. (a -> b) -> [a] -> [b]
map
(\Int
i -> ExpectedVersion backend
-> [SomeLatestEvent] -> StreamWrite [] SomeLatestEvent backend
forall {k} (t :: k -> *) (e :: k) backend.
ExpectedVersion backend -> t e -> StreamWrite t e backend
StreamWrite ExpectedVersion backend
forall backend. ExpectedVersion backend
NoStream [Int -> SomeLatestEvent
makeUserEvent Int
i])
[Int
1 ..]
)
result1 <- insertEvents store Nothing initWrites
case result1 of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Initial writes failed: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion (InsertionSuccess{finalCursor :: forall backend. InsertionSuccess backend -> Cursor backend
finalCursor = Cursor backend
cursor}) -> do
let batch :: Transaction [] backend
batch =
Map StreamId (StreamWrite [] SomeLatestEvent backend)
-> Transaction [] backend
forall (t :: * -> *) backend.
Map StreamId (StreamWrite t SomeLatestEvent backend)
-> Transaction t backend
Transaction
( [(StreamId, StreamWrite [] SomeLatestEvent backend)]
-> Map StreamId (StreamWrite [] SomeLatestEvent backend)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList
[ (StreamId
streamId1, ExpectedVersion backend
-> [SomeLatestEvent] -> StreamWrite [] SomeLatestEvent backend
forall {k} (t :: k -> *) (e :: k) backend.
ExpectedVersion backend -> t e -> StreamWrite t e backend
StreamWrite (Cursor backend -> ExpectedVersion backend
forall backend. Cursor backend -> ExpectedVersion backend
ExactVersion Cursor backend
cursor) [Int -> SomeLatestEvent
makeUserEvent Int
4])
, (StreamId
streamId2, ExpectedVersion backend
-> [SomeLatestEvent] -> StreamWrite [] SomeLatestEvent backend
forall {k} (t :: k -> *) (e :: k) backend.
ExpectedVersion backend -> t e -> StreamWrite t e backend
StreamWrite ExpectedVersion backend
forall backend. ExpectedVersion backend
NoStream [Int -> SomeLatestEvent
makeUserEvent Int
5])
, (StreamId
streamId3, ExpectedVersion backend
-> [SomeLatestEvent] -> StreamWrite [] SomeLatestEvent backend
forall {k} (t :: k -> *) (e :: k) backend.
ExpectedVersion backend -> t e -> StreamWrite t e backend
StreamWrite ExpectedVersion backend
forall backend. ExpectedVersion backend
StreamExists [Int -> SomeLatestEvent
makeUserEvent Int
6])
]
)
result2 <- BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing Transaction [] backend
batch
case result2 of
FailedInsertion (ConsistencyError ConsistencyErrorInfo backend
_) -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
InsertionResult backend
_ -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure TestName
"Mixed version batch should fail"
testVersionExpectationRaceCondition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testVersionExpectationRaceCondition :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testVersionExpectationRaceCondition BackendHandle backend
store = do
streamId <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
result <-
insertEvents store Nothing $
singleEvent streamId NoStream (makeUserEvent 0)
case result of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Initial write failed: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion (InsertionSuccess{finalCursor :: forall backend. InsertionSuccess backend -> Cursor backend
finalCursor = Cursor backend
cursor}) -> do
results <- [Int]
-> (Int -> IO (Async (InsertionResult backend)))
-> IO [Async (InsertionResult backend)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Int
1 .. Int
10] ((Int -> IO (Async (InsertionResult backend)))
-> IO [Async (InsertionResult backend)])
-> (Int -> IO (Async (InsertionResult backend)))
-> IO [Async (InsertionResult backend)]
forall a b. (a -> b) -> a -> b
$ \Int
i -> IO (InsertionResult backend)
-> IO (Async (InsertionResult backend))
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (IO (InsertionResult backend)
-> IO (Async (InsertionResult backend)))
-> IO (InsertionResult backend)
-> IO (Async (InsertionResult backend))
forall a b. (a -> b) -> a -> b
$ do
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamId (Cursor backend -> ExpectedVersion backend
forall backend. Cursor backend -> ExpectedVersion backend
ExactVersion Cursor backend
cursor) (Int -> SomeLatestEvent
makeUserEvent Int
i)
outcomes <- mapM wait results
let successes = [InsertionResult backend
r | r :: InsertionResult backend
r@(SuccessfulInsertion InsertionSuccess backend
_) <- [InsertionResult backend]
outcomes]
failures = [InsertionResult backend
r | r :: InsertionResult backend
r@(FailedInsertion EventStoreError backend
_) <- [InsertionResult backend]
outcomes]
length successes @?= 1
length failures @?= 9
forM_ failures $ \case
FailedInsertion (ConsistencyError ConsistencyErrorInfo backend
_) -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
InsertionResult backend
_ -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure TestName
"Expected ConsistencyError for version conflict"
testAnyExpectationConcurrency :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testAnyExpectationConcurrency :: forall backend.
(EventStore backend, StoreConstraints backend IO) =>
BackendHandle backend -> Assertion
testAnyExpectationConcurrency BackendHandle backend
store = do
streamId <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
_ <-
insertEvents store Nothing $
singleEvent streamId NoStream (makeUserEvent 0)
start <- newEmptyMVar
results <- forM [1 .. 20] $ \Int
i -> IO (InsertionResult backend)
-> IO (Async (InsertionResult backend))
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (IO (InsertionResult backend)
-> IO (Async (InsertionResult backend)))
-> IO (InsertionResult backend)
-> IO (Async (InsertionResult backend))
forall a b. (a -> b) -> a -> b
$ do
MVar () -> Assertion
forall a. MVar a -> IO a
takeMVar MVar ()
start
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamId ExpectedVersion backend
forall backend. ExpectedVersion backend
Any (Int -> SomeLatestEvent
makeUserEvent Int
i)
replicateM_ 20 (putMVar start ())
outcomes <- mapM wait results
let successes = [InsertionResult backend
r | r :: InsertionResult backend
r@(SuccessfulInsertion InsertionSuccess backend
_) <- [InsertionResult backend]
outcomes]
length successes @?= 20
testMixedVersionExpectations :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testMixedVersionExpectations :: forall backend.
(EventStore backend, StoreConstraints backend IO) =>
BackendHandle backend -> Assertion
testMixedVersionExpectations BackendHandle backend
store = do
streams <- Int -> IO StreamId -> IO [StreamId]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
5 (UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom)
case streams of
[StreamId
s1, StreamId
s2, StreamId
s3, StreamId
s4, StreamId
s5] -> do
_ <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
Map StreamId (StreamWrite [] SomeLatestEvent backend)
-> Transaction [] backend
forall (t :: * -> *) backend.
Map StreamId (StreamWrite t SomeLatestEvent backend)
-> Transaction t backend
Transaction
( [(StreamId, StreamWrite [] SomeLatestEvent backend)]
-> Map StreamId (StreamWrite [] SomeLatestEvent backend)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList
[ (StreamId
s1, ExpectedVersion backend
-> [SomeLatestEvent] -> StreamWrite [] SomeLatestEvent backend
forall {k} (t :: k -> *) (e :: k) backend.
ExpectedVersion backend -> t e -> StreamWrite t e backend
StreamWrite ExpectedVersion backend
forall backend. ExpectedVersion backend
NoStream [Int -> SomeLatestEvent
makeUserEvent Int
1])
, (StreamId
s3, ExpectedVersion backend
-> [SomeLatestEvent] -> StreamWrite [] SomeLatestEvent backend
forall {k} (t :: k -> *) (e :: k) backend.
ExpectedVersion backend -> t e -> StreamWrite t e backend
StreamWrite ExpectedVersion backend
forall backend. ExpectedVersion backend
NoStream [Int -> SomeLatestEvent
makeUserEvent Int
3])
]
)
result <-
insertEvents store Nothing $
Transaction
( Map.fromList
[ (s1, StreamWrite StreamExists [makeUserEvent 11])
, (s2, StreamWrite NoStream [makeUserEvent 12])
, (s3, StreamWrite (ExactStreamVersion (StreamVersion 2)) [makeUserEvent 13])
, (s4, StreamWrite Any [makeUserEvent 14])
, (s5, StreamWrite StreamExists [makeUserEvent 15])
]
)
case result of
FailedInsertion (ConsistencyError ConsistencyErrorInfo backend
_) -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
InsertionResult backend
_ -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure TestName
"Batch with mixed expectations should fail if any expectation fails"
[StreamId]
_ -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure TestName
"Expected exactly 5 streams"
testCascadingVersionDependencies :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (EventStoreError backend)) => BackendHandle backend -> IO ()
testCascadingVersionDependencies :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (EventStoreError backend)) =>
BackendHandle backend -> Assertion
testCascadingVersionDependencies BackendHandle backend
store = do
streams <- Int -> IO StreamId -> IO [StreamId]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
5 (UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom)
cursors <- forM (zip [0 ..] streams) $ \(Int
i, StreamId
stream) -> do
result <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
stream ExpectedVersion backend
forall backend. ExpectedVersion backend
NoStream (Int -> SomeLatestEvent
makeUserEvent Int
i)
case result of
SuccessfulInsertion (InsertionSuccess{finalCursor :: forall backend. InsertionSuccess backend -> Cursor backend
finalCursor = Cursor backend
cursor}) -> (StreamId, Cursor backend) -> IO (StreamId, Cursor backend)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (StreamId
stream, Cursor backend
cursor)
FailedInsertion EventStoreError backend
err -> TestName -> IO (StreamId, Cursor backend)
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> IO (StreamId, Cursor backend))
-> TestName -> IO (StreamId, Cursor backend)
forall a b. (a -> b) -> a -> b
$ TestName
"Failed to create dependency chain: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
case cursors of
[(StreamId
_, Cursor backend
_), (StreamId
_, Cursor backend
_), (StreamId
s3, Cursor backend
c3), (StreamId
_, Cursor backend
_), (StreamId
_, Cursor backend
_)] -> do
result1 <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
s3 (Cursor backend -> ExpectedVersion backend
forall backend. Cursor backend -> ExpectedVersion backend
ExactVersion Cursor backend
c3) (Int -> SomeLatestEvent
makeUserEvent Int
33)
case result1 of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Failed to update middle stream: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> do
result2 <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
s3 (Cursor backend -> ExpectedVersion backend
forall backend. Cursor backend -> ExpectedVersion backend
ExactVersion Cursor backend
c3) (Int -> SomeLatestEvent
makeUserEvent Int
333)
case result2 of
FailedInsertion (ConsistencyError ConsistencyErrorInfo backend
_) -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
InsertionResult backend
_ -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure TestName
"Should not be able to use old cursor after update"
[(StreamId, Cursor backend)]
_ -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure TestName
"Expected exactly 5 cursors"
testMultiStreamHeadConsistency :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testMultiStreamHeadConsistency :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testMultiStreamHeadConsistency BackendHandle backend
store = do
streamA <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
streamB <- StreamId <$> UUID.nextRandom
streamC <- StreamId <$> UUID.nextRandom
result <-
insertEvents store Nothing $
Transaction
( Map.fromList
[ (streamA, StreamWrite NoStream [makeUserEvent 1, makeUserEvent 2])
, (streamB, StreamWrite NoStream [makeUserEvent 10, makeUserEvent 20, makeUserEvent 30])
, (streamC, StreamWrite NoStream [makeUserEvent 100])
]
)
case result of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Failed to insert multi-stream batch: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion{} -> do
resultA <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamA ExpectedVersion backend
forall backend. ExpectedVersion backend
StreamExists (Int -> SomeLatestEvent
makeUserEvent Int
3)
case resultA of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Failed to append to stream A (stream heads may be corrupted): " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> do
resultB <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamB ExpectedVersion backend
forall backend. ExpectedVersion backend
StreamExists (Int -> SomeLatestEvent
makeUserEvent Int
40)
case resultB of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Failed to append to stream B (stream heads may be corrupted): " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> do
resultC <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamC ExpectedVersion backend
forall backend. ExpectedVersion backend
StreamExists (Int -> SomeLatestEvent
makeUserEvent Int
200)
case resultC of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Failed to append to stream C (stream heads may be corrupted): " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
testEmptyBatchInsertion :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testEmptyBatchInsertion :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testEmptyBatchInsertion BackendHandle backend
store = do
result <- BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Map StreamId (StreamWrite [] SomeLatestEvent backend)
-> Transaction [] backend
forall (t :: * -> *) backend.
Map StreamId (StreamWrite t SomeLatestEvent backend)
-> Transaction t backend
Transaction (Map StreamId (StreamWrite [] SomeLatestEvent backend)
forall k a. Map k a
Map.empty :: Map.Map StreamId (StreamWrite [] SomeLatestEvent backend)))
case result of
FailedInsertion EventStoreError backend
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
SuccessfulInsertion InsertionSuccess backend
_ -> do
streamId <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
result2 <-
insertEvents store Nothing $
appendToOrCreateStream streamId (makeUserEvent 1)
case result2 of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Failed follow-up insert after empty batch: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
testMixedEmptyStreams :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testMixedEmptyStreams :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
BackendHandle backend -> Assertion
testMixedEmptyStreams BackendHandle backend
store = do
streamA <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
streamB <- StreamId <$> UUID.nextRandom
streamC <- StreamId <$> UUID.nextRandom
result <-
insertEvents store Nothing $
Transaction
( Map.fromList
[ (streamA, StreamWrite NoStream [makeUserEvent 1, makeUserEvent 2])
, (streamB, StreamWrite NoStream [])
, (streamC, StreamWrite NoStream [makeUserEvent 100])
]
)
case result of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Failed to insert mixed empty/non-empty batch: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> do
resultB <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamB ExpectedVersion backend
forall backend. ExpectedVersion backend
NoStream (Int -> SomeLatestEvent
makeUserEvent Int
10)
case resultB of
FailedInsertion EventStoreError backend
err -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> Assertion) -> TestName -> Assertion
forall a b. (a -> b) -> a -> b
$ TestName
"Stream B should not exist yet, but got error: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> do
resultA <-
BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamA ExpectedVersion backend
forall backend. ExpectedVersion backend
StreamExists (Int -> SomeLatestEvent
makeUserEvent Int
3)
resultC <-
insertEvents store Nothing $
singleEvent streamC StreamExists (makeUserEvent 200)
case (resultA, resultC) of
(SuccessfulInsertion InsertionSuccess backend
_, SuccessfulInsertion InsertionSuccess backend
_) -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
(InsertionResult backend, InsertionResult backend)
_ -> TestName -> Assertion
forall a. HasCallStack => TestName -> IO a
assertFailure TestName
"Streams A and C should exist after mixed batch"