{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module Test.Hindsight.Store.StressTests where
import Control.Concurrent (newEmptyMVar, putMVar, readMVar, takeMVar, threadDelay)
import Control.Exception (SomeException, try)
import Control.Monad (foldM, forM, forM_, replicateM)
import Data.Map.Strict qualified as Map
import Data.Time (diffUTCTime, getCurrentTime)
import Data.UUID.V4 qualified as UUID
import Hindsight.Store
import System.Random (randomRIO)
import System.Timeout (timeout)
import Test.Hindsight.Store.Common (makeUserEvent)
import Test.Hindsight.Store.TestRunner (EventStoreTestRunner (..), runTest)
import Test.Tasty
import Test.Tasty.HUnit
import UnliftIO.Async (async, mapConcurrently, wait)
-- | Stress-test suite for an event store backend.
--
-- Produces three tasty groups ("Pathological Tests", "High-Contention Tests"
-- and "Connection Resilience Tests"). Every scenario receives its
-- 'BackendHandle' through 'runTest' using the supplied runner.
stressTests ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    EventStoreTestRunner backend ->
    [TestTree]
stressTests runner =
    [ testGroup
        "Pathological Tests"
        [ storeCase "Massive Version Conflicts (100 writers)" testMassiveVersionConflicts
        , storeCase "Massive Version Advancement (1000 iterations)" testMassiveVersionAdvancement
        , storeCase "Version Skew Scenario" testVersionSkewScenario
        ]
    , testGroup
        "High-Contention Tests"
        [ storeCase "High Contention Version Checks (50 writers)" testHighContentionVersionChecks
        , storeCase "Version Expectation Performance" testVersionExpectationPerformance
        , storeCase "Cascading Version Failures" testCascadingVersionFailures
        , storeCase "Multi-Stream Version Atomicity (10 streams)" testMultiStreamVersionAtomicity
        , storeCase "Rapid Version Advancement (100 iterations)" testRapidVersionAdvancement
        ]
    , testGroup
        "Connection Resilience Tests"
        [ storeCase "Version Checks with Connection Failures" testVersionCheckWithConnectionFailures
        , storeCase "Version Check Deadlock Scenarios" testVersionCheckDeadlock
        ]
    ]
  where
    -- Wrap a store-level scenario as a named tasty test case that is
    -- executed through the runner passed to 'stressTests'.
    storeCase :: TestName -> (BackendHandle backend -> Assertion) -> TestTree
    storeCase label scenario = testCase label $ runTest runner scenario
-- | 100 concurrent writers race on one stream with the same expected cursor.
--
-- The stream is created with a single event; each writer then sleeps for a
-- random 0-1000 microseconds and attempts an 'ExactVersion' write against the
-- cursor returned by that first insertion. Exactly one write must succeed and
-- the other 99 must fail with a 'ConsistencyError'.
testMassiveVersionConflicts ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testMassiveVersionConflicts store = do
    streamId <- StreamId <$> UUID.nextRandom

    -- Create the stream and remember the cursor all writers will compete on.
    seedResult <-
        insertEvents store Nothing $
            Transaction (Map.singleton streamId (StreamWrite NoStream [makeUserEvent 0]))
    cursor <- case seedResult of
        SuccessfulInsertion (InsertionSuccess{finalCursor = c}) -> pure c
        _ -> assertFailure "Failed to initialize stream"

    -- Every writer expects the same (soon to be stale) cursor.
    let contend i = do
            -- Random jitter (in microseconds) so the writers do not start in lockstep.
            jitter <- randomRIO (0, 1000)
            threadDelay jitter
            insertEvents store Nothing $
                Transaction (Map.singleton streamId (StreamWrite (ExactVersion cursor) [makeUserEvent i]))
    results <- mapConcurrently contend [1 .. 100]

    let successCount = length [() | SuccessfulInsertion _ <- results]
        failures = [r | r@(FailedInsertion _) <- results]
    assertEqual "Exactly one writer should succeed" 1 successCount
    assertEqual "All other writers should fail" 99 (length failures)

    -- Each losing writer must have been rejected by the version check itself.
    forM_ failures $ \case
        FailedInsertion (ConsistencyError _) -> pure ()
        _ -> assertFailure "Expected ConsistencyError"
-- | Advance one stream 1000 times and verify that only the newest cursor is accepted.
--
-- Each iteration writes with 'ExactVersion' against the cursor returned by
-- the previous write. The cursor produced at iteration 500 is kept aside.
-- Afterwards the initial cursor and the iteration-500 cursor must both be
-- rejected with a 'ConsistencyError', while the latest cursor must still be
-- accepted.
testMassiveVersionAdvancement ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testMassiveVersionAdvancement store = do
    streamId <- StreamId <$> UUID.nextRandom

    -- Create the stream.
    seedResult <-
        insertEvents store Nothing $
            Transaction (Map.singleton streamId (StreamWrite NoStream [makeUserEvent 0]))
    initialCursor <- case seedResult of
        SuccessfulInsertion (InsertionSuccess{finalCursor = c}) -> pure c
        _ -> assertFailure "Failed to initialize stream"

    -- Chain 1000 writes, threading the newest cursor through the fold and
    -- saving the one produced at iteration 500.
    -- (The result is named latestCursor so it does not shadow the
    -- finalCursor record field.)
    (latestCursor, midCursor) <-
        foldM
            ( \(cursor, savedMid) i -> do
                result <-
                    insertEvents store Nothing $
                        Transaction (Map.singleton streamId (StreamWrite (ExactVersion cursor) [makeUserEvent i]))
                case result of
                    SuccessfulInsertion (InsertionSuccess{finalCursor = newCursor}) ->
                        pure (newCursor, if i == 500 then Just newCursor else savedMid)
                    _ -> assertFailure $ "Failed at iteration " ++ show i
            )
            (initialCursor, Nothing)
            [1 .. 1000]

    -- The very first cursor is 1000 versions behind and must be rejected.
    oldResult <-
        insertEvents store Nothing $
            Transaction (Map.singleton streamId (StreamWrite (ExactVersion initialCursor) [makeUserEvent 9999]))
    case oldResult of
        FailedInsertion (ConsistencyError _) -> pure ()
        _ -> assertFailure "Should reject very old cursor"

    -- The cursor from the middle of the run must be rejected as well.
    case midCursor of
        Nothing -> assertFailure "Failed to capture mid cursor"
        Just actualMidCursor -> do
            midResult <-
                insertEvents store Nothing $
                    Transaction (Map.singleton streamId (StreamWrite (ExactVersion actualMidCursor) [makeUserEvent 10000]))
            case midResult of
                FailedInsertion (ConsistencyError _) -> pure ()
                _ -> assertFailure "Should reject reused cursor"

    -- The rejected attempts above must not have advanced the stream.
    latestResult <-
        insertEvents store Nothing $
            Transaction (Map.singleton streamId (StreamWrite (ExactVersion latestCursor) [makeUserEvent 10001]))
    case latestResult of
        SuccessfulInsertion _ -> pure ()
        _ -> assertFailure "Final cursor should still be valid"
-- | A writer holding a cursor several versions behind must be rejected.
--
-- The stream is created and then advanced 10 times with 'ExactVersion'
-- writes. The cursor returned by the second advancing write is kept as the
-- stale cursor. Writing with the stale cursor must fail with a
-- 'ConsistencyError'; writing with the newest cursor must succeed.
testVersionSkewScenario ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testVersionSkewScenario store = do
    streamId <- StreamId <$> UUID.nextRandom

    -- Create the stream.
    seedResult <-
        insertEvents store Nothing $
            Transaction (Map.singleton streamId (StreamWrite NoStream [makeUserEvent 0]))
    cursor1 <- case seedResult of
        SuccessfulInsertion (InsertionSuccess{finalCursor = c}) -> pure c
        _ -> assertFailure "Failed to initialize stream"

    -- Advance the stream 10 times. Only the newest cursor and the one from
    -- iteration 2 are needed afterwards, so carry just those through the
    -- fold instead of building a list and indexing into it with partial
    -- functions.
    (currentCursor, savedStale) <-
        foldM
            ( \(cursor, stale) i -> do
                result <-
                    insertEvents store Nothing $
                        Transaction (Map.singleton streamId (StreamWrite (ExactVersion cursor) [makeUserEvent i]))
                case result of
                    SuccessfulInsertion (InsertionSuccess{finalCursor = newCursor}) ->
                        pure (newCursor, if i == 2 then Just newCursor else stale)
                    _ -> assertFailure $ "Failed to advance at " ++ show i
            )
            (cursor1, Nothing)
            [1 .. 10]
    staleCursor <- maybe (assertFailure "Failed to capture stale cursor") pure savedStale

    -- The stale cursor is 8 versions behind the head of the stream.
    staleResult <-
        insertEvents store Nothing $
            Transaction (Map.singleton streamId (StreamWrite (ExactVersion staleCursor) [makeUserEvent 999]))
    case staleResult of
        FailedInsertion (ConsistencyError _) -> pure ()
        _ -> assertFailure "Should reject stale cursor"

    -- The newest cursor must still be usable after the rejected attempt.
    currentResult <-
        insertEvents store Nothing $
            Transaction (Map.singleton streamId (StreamWrite (ExactVersion currentCursor) [makeUserEvent 1000]))
    case currentResult of
        SuccessfulInsertion _ -> pure ()
        _ -> assertFailure "Current cursor should work"
-- | 50 writers append to one stream at the same moment using the 'Any' expectation.
--
-- All writers block on a shared 'MVar' and are released together. Because
-- 'Any' places no constraint on the stream version, all 50 insertions are
-- expected to succeed.
testHighContentionVersionChecks ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testHighContentionVersionChecks store = do
    streamId <- StreamId <$> UUID.nextRandom

    -- Create the stream. The outcome is checked so that a broken seed write
    -- is reported here rather than being masked by the 'Any' writes below.
    seedResult <-
        insertEvents store Nothing $
            Transaction (Map.singleton streamId (StreamWrite NoStream [makeUserEvent 0]))
    case seedResult of
        SuccessfulInsertion _ -> pure ()
        _ -> assertFailure "Failed to initialize stream"

    -- Start gate: every writer waits on readMVar until the gate is filled.
    start <- newEmptyMVar
    writers <- forM [1 .. 50] $ \i -> async $ do
        readMVar start
        insertEvents store Nothing $
            Transaction (Map.singleton streamId (StreamWrite Any [makeUserEvent i]))

    -- Release all writers at once.
    putMVar start ()

    outcomes <- mapM wait writers
    let successes = [r | r@(SuccessfulInsertion _) <- outcomes]
    length successes @?= 50
-- | Run 100 concurrent writes with mixed version expectations under a time budget.
--
-- Between 10 and 20 streams (chosen at random) are created in a single
-- transaction. 100 writes are then issued concurrently, cycling over the
-- streams, with the expectation picked from the write index modulo 4:
-- 1 uses 'StreamExists', 2 uses 'NoStream', 0 and 3 use 'Any'. The batch
-- must finish in under 5 seconds and must contain at least one success and
-- at least one failure.
testVersionExpectationPerformance ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testVersionExpectationPerformance store = do
    numStreams <- randomRIO (10, 20)
    streams <- replicateM numStreams (StreamId <$> UUID.nextRandom)

    -- Create every stream up front. The outcome is checked because the
    -- final failure assertion relies on the streams already existing.
    seedResult <-
        insertEvents store Nothing $
            Transaction (Map.fromList [(s, StreamWrite NoStream [makeUserEvent 0]) | s <- streams])
    case seedResult of
        SuccessfulInsertion _ -> pure ()
        _ -> assertFailure "Failed to initialize streams"

    startTime <- getCurrentTime

    results <-
        mapConcurrently
            ( \(i, stream) -> do
                let expectation = case i `mod` 4 of
                        1 -> StreamExists
                        -- NoStream targets a stream created above.
                        2 -> NoStream
                        _ -> Any
                insertEvents store Nothing $
                    Transaction (Map.singleton stream (StreamWrite expectation [makeUserEvent i]))
            )
            (zip [1 .. 100] (cycle streams))
    endTime <- getCurrentTime
    let duration = diffUTCTime endTime startTime

    -- The literal 5 is a NominalDiffTime, i.e. five seconds.
    assertBool ("Performance test took too long: " ++ show duration) (duration < 5)

    let successes = length [r | r@(SuccessfulInsertion _) <- results]
        failures = length [r | r@(FailedInsertion _) <- results]
    assertBool "Should have some successes" (successes > 0)
    assertBool "Should have some failures due to NoStream on existing streams" (failures > 0)
-- | Reusing a cursor after the stream has moved on must fail.
--
-- Three streams (A, B, C) are created with one event each. Stream A is then
-- advanced once using the cursor from its creation; a second write to
-- stream A that reuses that same cursor must be rejected with a
-- 'ConsistencyError'. Streams B and C are only created and are not written
-- to again by this test.
testCascadingVersionFailures ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testCascadingVersionFailures store = do
    -- Bind each stream id separately instead of matching a fixed-length
    -- list pattern against the result of replicateM.
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom
    streamC <- StreamId <$> UUID.nextRandom

    -- Create stream A and keep the cursor that will later become outdated.
    resultA <-
        insertEvents store Nothing $
            Transaction (Map.singleton streamA (StreamWrite NoStream [makeUserEvent 1]))
    cursorA <- case resultA of
        SuccessfulInsertion (InsertionSuccess{finalCursor = c}) -> pure c
        _ -> assertFailure "Failed to initialize stream A"

    -- Create streams B and C; their results are not inspected.
    _ <-
        insertEvents store Nothing $
            Transaction (Map.singleton streamB (StreamWrite NoStream [makeUserEvent 2]))
    _ <-
        insertEvents store Nothing $
            Transaction (Map.singleton streamC (StreamWrite NoStream [makeUserEvent 3]))

    -- First use of cursorA: valid, and moves stream A forward.
    updateResult <-
        insertEvents store Nothing $
            Transaction (Map.singleton streamA (StreamWrite (ExactVersion cursorA) [makeUserEvent 11]))
    case updateResult of
        SuccessfulInsertion _ -> pure ()
        _ -> assertFailure "Failed to update stream A"

    -- Second use of cursorA: the stream has advanced, so this must fail.
    reuseResult <-
        insertEvents store Nothing $
            Transaction (Map.singleton streamA (StreamWrite (ExactVersion cursorA) [makeUserEvent 111]))
    case reuseResult of
        FailedInsertion (ConsistencyError _) -> pure ()
        _ -> assertFailure "Should fail with outdated cursor"
-- | Check that version expectations are enforced across a whole multi-stream
-- transaction rather than per stream.
--
-- Ten fresh stream ids are generated and the first five are initialised with
-- one event each. A batch expecting 'StreamExists' on all ten must be rejected
-- with a 'ConsistencyError'. A second batch expecting 'StreamExists' on the
-- initialised half and 'NoStream' on the other half must then succeed
-- (presumably this can only hold if the rejected batch did not create any of
-- the uninitialised streams).
testMultiStreamVersionAtomicity ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testMultiStreamVersionAtomicity store = do
    streams <- replicateM 10 (StreamId <$> UUID.nextRandom)
    let (initialized, uninitialized) = splitAt 5 streams

        -- One write per stream: the initialised half always expects
        -- 'StreamExists'; the expectation for the other half is the parameter.
        batch :: ExpectedVersion backend -> Transaction [] backend
        batch uninitializedExpectation =
            Transaction . Map.fromList $
                [(s, StreamWrite StreamExists [makeUserEvent 100]) | s <- initialized]
                    ++ [(s, StreamWrite uninitializedExpectation [makeUserEvent 200]) | s <- uninitialized]

    -- Fail loudly if the setup write is rejected; otherwise the assertions
    -- below would report a misleading message.
    setupResult <-
        insertEvents store Nothing $
            Transaction (Map.fromList [(s, StreamWrite NoStream [makeUserEvent i]) | (i, s) <- zip [1 ..] initialized])
    case setupResult of
        SuccessfulInsertion _ -> pure ()
        _ -> assertFailure "Failed to initialize streams"

    -- Wrong expectation on the uninitialised half: the whole batch must fail.
    mixedResult <- insertEvents store Nothing (batch StreamExists)
    case mixedResult of
        FailedInsertion (ConsistencyError _) -> do
            -- Correct expectations on every stream: the batch must go through.
            correctedResult <- insertEvents store Nothing (batch NoStream)
            case correctedResult of
                SuccessfulInsertion _ -> pure ()
                _ -> assertFailure "Batch with correct expectations should succeed"
        _ -> assertFailure "Mixed batch should fail atomically"
-- | Advance a single stream through many consecutive exact-version writes.
--
-- The stream is created with a 'NoStream' write, then 100 further events are
-- appended one at a time, each expecting the cursor returned by the previous
-- insertion. Finally a write that still expects the very first cursor must be
-- rejected with a 'ConsistencyError'.
testRapidVersionAdvancement ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testRapidVersionAdvancement store = do
    streamId <- fmap StreamId UUID.nextRandom

    let -- Append a single user event to the stream under the given expectation.
        appendAt :: ExpectedVersion backend -> Int -> IO (InsertionResult backend)
        appendAt expected n =
            insertEvents store Nothing . Transaction $
                Map.singleton streamId (StreamWrite expected [makeUserEvent n])

        -- One fold step: write at the current cursor and hand back the new one.
        advance :: Cursor backend -> Int -> IO (Cursor backend)
        advance current n =
            appendAt (ExactVersion current) n >>= \case
                SuccessfulInsertion (InsertionSuccess{finalCursor = next}) -> pure next
                _ -> assertFailure ("Failed at iteration " ++ show n)

    initialCursor <-
        appendAt NoStream 0 >>= \case
            SuccessfulInsertion (InsertionSuccess{finalCursor = start}) -> pure start
            _ -> assertFailure "Failed to initialize stream"

    _ <- foldM advance initialCursor [1 .. 100]

    -- The first cursor is now 100 writes stale and must no longer be accepted.
    appendAt (ExactVersion initialCursor) 999 >>= \case
        FailedInsertion (ConsistencyError _) -> pure ()
        _ -> assertFailure "Should not accept very old cursor"
-- | Issue 50 sequential writes with the 'Any' expectation, tolerating
-- exceptions, and require that more than 10 of them succeed.
--
-- Each write is preceded by a random pause of 0 to 100 microseconds. Any
-- exception thrown by an insertion is caught and counted as a non-success.
-- Nothing in this function injects failures itself; it only tolerates them.
testVersionCheckWithConnectionFailures ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testVersionCheckWithConnectionFailures store = do
    streamId <- fmap StreamId UUID.nextRandom

    -- Seed the stream; the outcome of this write is intentionally not inspected.
    _ <-
        insertEvents store Nothing . Transaction $
            Map.singleton streamId (StreamWrite NoStream [makeUserEvent 0])

    let -- A single jittered write; an exception collapses to 'Nothing'.
        attempt :: Int -> IO (Maybe (InsertionResult backend))
        attempt n = do
            pause <- randomRIO (0, 100)
            threadDelay pause
            outcome <-
                try @SomeException . insertEvents store Nothing . Transaction $
                    Map.singleton streamId (StreamWrite Any [makeUserEvent n])
            pure (either (const Nothing) Just outcome)

        stored :: Maybe (InsertionResult backend) -> Bool
        stored = \case
            Just (SuccessfulInsertion _) -> True
            _ -> False

    outcomes <- traverse attempt [1 .. 50]
    assertBool "Should have some successful inserts" (length (filter stored outcomes) > 10)
-- | Run two concurrent two-stream transactions over the same pair of streams
-- and require that both finish within 10 seconds and at least one succeeds.
--
-- Both streams are first created in a single transaction. Two asynchronous
-- writers are then started, each blocked on its own 'MVar' until released, and
-- each writing to both streams with a 'StreamExists' expectation. Waiting for
-- both results is wrapped in a 10-second 'timeout'; expiry is reported as a
-- possible deadlock.
testVersionCheckDeadlock ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    BackendHandle backend ->
    IO ()
testVersionCheckDeadlock store = do
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom

    -- Fail loudly if the setup write is rejected; with 'StreamExists'
    -- expectations below, a missing stream would otherwise show up as a
    -- misleading "At least one transaction should succeed" failure.
    setupResult <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite NoStream [makeUserEvent 1])
                    , (streamB, StreamWrite NoStream [makeUserEvent 1])
                    ]
                )
    case setupResult of
        SuccessfulInsertion _ -> pure ()
        _ -> assertFailure "Failed to initialize streams A and B"

    -- Each writer waits on its own gate so both can be released back to back.
    syncVar1 <- newEmptyMVar
    syncVar2 <- newEmptyMVar
    result1 <- async $ do
        takeMVar syncVar1
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite StreamExists [makeUserEvent 2])
                    , (streamB, StreamWrite StreamExists [makeUserEvent 2])
                    ]
                )
    -- NOTE(review): the pairs are listed B-then-A here, but 'Map.fromList'
    -- builds a key-ordered map, so the list order does not survive into the
    -- 'Transaction'. If the intent was to make the two writers touch the
    -- streams in opposite order, this does not achieve it - confirm.
    result2 <- async $ do
        takeMVar syncVar2
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamB, StreamWrite StreamExists [makeUserEvent 3])
                    , (streamA, StreamWrite StreamExists [makeUserEvent 3])
                    ]
                )

    -- Release both writers.
    putMVar syncVar1 ()
    putMVar syncVar2 ()

    -- 'timeout' takes microseconds. The writers are not cancelled on expiry;
    -- the test only reports the failure.
    timeoutResult <- timeout 10_000_000 $ do
        r1 <- wait result1
        r2 <- wait result2
        pure (r1, r2)
    case timeoutResult of
        Nothing -> assertFailure "Transactions took too long - possible deadlock"
        Just (r1, r2) -> do
            let anySuccess = any isSuccess [r1, r2]
            assertBool "At least one transaction should succeed" anySuccess
  where
    -- True only for a successful insertion; every other outcome counts as failure.
    isSuccess :: InsertionResult backend -> Bool
    isSuccess (SuccessfulInsertion _) = True
    isSuccess _ = False