{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wno-orphans #-}

module Test.Hindsight.Store.TestRunner (
    -- * Test Infrastructure
    EventStoreTestRunner (..),
    runTest,
    runMultiInstanceTest,
    repeatTest,

    -- * Generic Test Suites
    genericEventStoreTests,
    multiInstanceTests,

    -- * Helper Functions
    collectEventsUntilTombstone,
    handleTombstone,
    extractUserInfo,
)
where

import Control.Concurrent (MVar, newEmptyMVar, putMVar, takeMVar, threadDelay)
import Control.Monad (forM, forM_, replicateM, replicateM_)
import Data.IORef
import Data.Map.Strict qualified as Map
import Data.Maybe (mapMaybe)
import Data.Text (Text)
import Data.Typeable (cast)
import Data.UUID.V4 qualified as UUID
import Hindsight.Events (SomeLatestEvent)
import Hindsight.Store
import Test.Hindsight.Examples (UserCreated, UserInformation2 (..))
import Test.Hindsight.Store.Common
import Test.Tasty
import Test.Tasty.HUnit
import UnliftIO.Async (async, concurrently, wait)
import UnliftIO.Exception (fromException, throwIO, tryAny)

-- | Test runner for event store tests.
--
-- A runner bundles the two ways a backend exposes its handles to the generic
-- test suites in this module. Both fields are rank-2: the supplied action may
-- return any type, and the field itself always yields @IO ()@.
data EventStoreTestRunner backend = EventStoreTestRunner
    { withStore :: forall a. (BackendHandle backend -> IO a) -> IO ()
    -- ^ Run an action against a single backend handle.
    , withStores :: forall a. Int -> ([BackendHandle backend] -> IO a) -> IO ()
    {- ^ For multi-instance tests: provides N handles to the same backend storage.
    Simulates multiple processes accessing the same backend.
    -}
    }

-- | Common event store test cases, grouped into basic, stream-version,
-- consistency and per-stream-cursor tests.
--
-- Every entry is run through 'runTest' with the supplied runner, so each test
-- receives its backend handle from the runner's 'withStore' field.
-- \"Async Subscription Reception\" is wrapped in 'repeatTest' and therefore
-- appears 20 times.
genericEventStoreTests ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend), Ord (Cursor backend)) =>
    EventStoreTestRunner backend ->
    [TestTree]
genericEventStoreTests runner =
    [ testGroup
        "Basic Tests"
        [ testCase "Basic Event Reception" $ runTest runner testBasicEventReception
        , testCase "Correlation ID Preservation" $ runTest runner testCorrelationIdPreservation
        , testCase "Single Stream Selection" $ runTest runner testSingleStreamSelection
        , testCase "Start From Position" $ runTest runner testStartFromPosition
        , repeatTest 20 "Async Subscription Reception" $ runTest runner testAsyncSubscription
        , testCase "Subscription Honors Stop Result" $ runTest runner testSubscriptionStopBehavior
        , testCase "Handler Exception Enrichment" $ runTest runner testHandlerExceptionEnrichment
        ]
    , testGroup
        "Stream Version Tests"
        [ testCase "Stream Versions Start At 1" $ runTest runner testStreamVersionsStartAt1
        , testCase "Stream Versions Are Contiguous" $ runTest runner testStreamVersionsContiguous
        , testCase "Stream Versions Exposed In Subscription" $ runTest runner testStreamVersionExposedInSubscription
        , testCase "Multiple Streams Have Independent Versions" $ runTest runner testIndependentStreamVersions
        ]
    , testGroup
        "Consistency Tests"
        [ testCase "No Stream Condition" $ runTest runner testNoStreamCondition
        , testCase "Stream Exists Condition" $ runTest runner testStreamExistsCondition
        , testCase "Exact Version Condition" $ runTest runner testExactVersionCondition
        , testCase "Exact Stream Version Condition" $ runTest runner testExactStreamVersionCondition
        , testCase "Concurrent Writes" $ runTest runner testConcurrentWrites
        , testCase "Batch Atomicity" $ runTest runner testBatchAtomicity
        , testCase "Multi-Stream Consistency" $ runTest runner testMultiStreamConsistency
        , testCase "Version Expectation Race Condition" $ runTest runner testVersionExpectationRaceCondition
        , testCase "Any Expectation Concurrency" $ runTest runner testAnyExpectationConcurrency
        , testCase "Mixed Version Expectations" $ runTest runner testMixedVersionExpectations
        , testCase "Cascading Version Dependencies" $ runTest runner testCascadingVersionDependencies
        , testCase "Multi-Stream Head Consistency" $ runTest runner testMultiStreamHeadConsistency
        , testCase "Empty Batch Insertion" $ runTest runner testEmptyBatchInsertion
        , testCase "Mixed Empty and Non-Empty Streams" $ runTest runner testMixedEmptyStreams
        ]
    , testGroup
        "Per-Stream Cursor Tests"
        [ testCase "Per-Stream Cursor Extraction" $ runTest runner testPerStreamCursorExtraction
        , testCase "Cursor Independence" $ runTest runner testCursorIndependence
        , testCase "Stale Cursor Per Stream" $ runTest runner testStaleCursorPerStream
        , testCase "Cursor Completeness" $ runTest runner testCursorCompleteness
        , testCase "Empty Stream Cursor Handling" $ runTest runner testEmptyStreamCursorHandling
        ]
    ]

-- | Multi-instance test cases (for backends that support cross-process subscriptions).
--
-- Runs 'testMultiInstanceSubscription' through 'runMultiInstanceTest' with
-- 2, 5 and 10 handles requested from the runner.
multiInstanceTests ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    EventStoreTestRunner backend ->
    [TestTree]
multiInstanceTests runner =
    [ testGroup
        "Multi-Instance Tests"
        [ testCase "Multi-Instance Subscriptions (2 instances)" $ runMultiInstanceTest runner 2 testMultiInstanceSubscription
        , testCase "Multi-Instance Subscriptions (5 instances)" $ runMultiInstanceTest runner 5 testMultiInstanceSubscription
        , testCase "Multi-Instance Subscriptions (10 instances)" $ runMultiInstanceTest runner 10 testMultiInstanceSubscription
        ]
    ]

-- | Build a test group that contains @n@ copies of the same test case.
--
-- The group is named @name <> " x" <> show n@; every copy inside it is a
-- 'testCase' carrying the unmodified @name@ and the same @assertion@.
repeatTest :: Int -> TestName -> Assertion -> TestTree
repeatTest n name assertion =
    testGroup (name <> " x" <> show n) $
        replicate n $
            testCase name assertion

-- | Run a test with the test runner.
--
-- Hands @action@ to the runner's 'withStore' field, which supplies the
-- backend handle.
runTest :: EventStoreTestRunner backend -> (BackendHandle backend -> IO ()) -> IO ()
runTest runner action = withStore runner action

-- | Run a multi-instance test with the test runner.
--
-- Hands the handle count @n@ and @action@ to the runner's 'withStores' field,
-- which supplies the list of backend handles.
runMultiInstanceTest :: EventStoreTestRunner backend -> Int -> ([BackendHandle backend] -> IO ()) -> IO ()
runMultiInstanceTest runner n action = withStores runner n action

-- Helper functions

-- | Subscription handler that records every 'UserCreated' envelope it receives.
--
-- Each envelope is prepended to the list held in the 'IORef', so the list ends
-- up in reverse arrival order; callers reverse it when reading. The handler
-- always returns 'Continue'.
collectEventsUntilTombstone :: IORef [EventEnvelope UserCreated backend] -> EventHandler UserCreated IO backend
collectEventsUntilTombstone ref event = do
    atomicModifyIORef' ref (\events -> (event : events, ()))
    pure Continue

-- | Subscription handler for 'Tombstone' events.
--
-- Ignores the envelope, fills @completionVar@ so a waiting 'takeMVar' can
-- proceed, and returns 'Stop'. Because it uses 'putMVar', it blocks if the
-- 'MVar' is already full.
handleTombstone :: MVar () -> EventHandler Tombstone IO backend
handleTombstone completionVar _ = do
    putMVar completionVar ()
    pure Stop

-- | Extract the payload of a 'UserCreated' envelope as 'UserInformation2'.
--
-- Uses 'cast', so the result is 'Nothing' when the payload's runtime type is
-- not 'UserInformation2'.
extractUserInfo :: EventEnvelope UserCreated backend -> Maybe UserInformation2
extractUserInfo envelope = cast envelope.payload

-- Basic Test implementations --

-- | Insert three user events followed by a tombstone into one fresh stream,
-- subscribe to all streams from the beginning, and check that exactly the
-- three user events are received in order (@user1@, @user2@, @user3@).
testBasicEventReception :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testBasicEventReception store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedEvents <- newIORef []
    completionVar <- newEmptyMVar

    let testEvents = map makeUserEvent [1 .. 3] ++ [makeTombstone]
    result <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

    case result of
        FailedInsertion err -> assertFailure $ "Failed to insert events: " ++ show err
        SuccessfulInsertion _ -> do
            handle <-
                subscribe
                    store
                    ( match UserCreated (collectEventsUntilTombstone receivedEvents)
                        :? match Tombstone (handleTombstone completionVar)
                        :? MatchEnd
                    )
                    EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

            -- Block until the tombstone handler has signalled completion.
            takeMVar completionVar
            handle.cancel -- Cancel subscription after completion
            -- The collector prepends, so reverse to get arrival order.
            events <- reverse <$> readIORef receivedEvents
            length events @?= 3

            let userInfos = mapMaybe extractUserInfo events
            length userInfos @?= 3
            let userNames :: [Text]
                userNames = map userName userInfos
            userNames @?= ["user1", "user2", "user3"]

-- | Write user events 1-3 to one stream and 4-6 to another, then a tombstone
-- to the first stream. Subscribe to the first stream only and check that just
-- @user1@, @user2@ and @user3@ are received, in that order.
testSingleStreamSelection :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testSingleStreamSelection store = do
    stream1 <- StreamId <$> UUID.nextRandom
    stream2 <- StreamId <$> UUID.nextRandom
    receivedEvents <- newIORef []
    completionVar <- newEmptyMVar

    _ <- insertEvents store Nothing (multiEvent stream1 Any (map makeUserEvent [1 .. 3]))
    _ <- insertEvents store Nothing (multiEvent stream2 Any (map makeUserEvent [4 .. 6]))
    _ <- insertEvents store Nothing (Transaction (Map.fromList [(stream1, StreamWrite Any [makeTombstone])]))

    handle <-
        subscribe
            store
            ( match UserCreated (collectEventsUntilTombstone receivedEvents)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = SingleStream stream1, startupPosition = FromBeginning}

    -- Block until the tombstone handler has signalled completion.
    takeMVar completionVar
    handle.cancel -- Cancel subscription after completion
    -- The collector prepends, so reverse to get arrival order.
    events <- reverse <$> readIORef receivedEvents
    let userInfos = mapMaybe extractUserInfo events
    length userInfos @?= 3
    let userNames = map userName userInfos
    userNames @?= ["user1", "user2", "user3"]

{- | Test that a subscription started with 'FromLastProcessed' skips earlier events.

Test sequence:
  1. Insert users 1-3 and keep the @finalCursor@ of that insertion
  2. Insert users 4-5, then a tombstone, into the same stream
  3. Subscribe to all streams starting from the saved cursor
  4. Expected: exactly the names user4, user5, in that order
-}
testStartFromPosition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testStartFromPosition store = do
    streamId <- StreamId <$> UUID.nextRandom
    let testEvents = map makeUserEvent [1 .. 5]

    result <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any (take 3 testEvents))]))
    case result of
        FailedInsertion err -> assertFailure $ "Failed to insert first batch: " ++ show err
        SuccessfulInsertion (InsertionSuccess{finalCursor = cursor}) -> do
            -- Second batch and tombstone are written after the cursor was captured.
            _ <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any (drop 3 testEvents))]))
            _ <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any [makeTombstone])]))

            receivedEvents <- newIORef []
            completionVar <- newEmptyMVar

            handle <-
                subscribe
                    store
                    ( match UserCreated (collectEventsUntilTombstone receivedEvents)
                        :? match Tombstone (handleTombstone completionVar)
                        :? MatchEnd
                    )
                    EventSelector{streamId = AllStreams, startupPosition = FromLastProcessed cursor}

            takeMVar completionVar
            handle.cancel -- Cancel subscription after completion
            events <- reverse <$> readIORef receivedEvents
            let userInfos = mapMaybe extractUserInfo events
            length userInfos @?= 2
            let userNames :: [Text]
                userNames = map userName userInfos
            userNames @?= ["user4", "user5"]

{- | Test that the correlation id supplied at insertion is visible on delivered events.

Test sequence:
  1. Insert users 1-3 plus a tombstone with @Just corrId@
  2. Subscribe to all streams from the beginning and wait for the tombstone
  3. Expected: three collected events, each with @correlationId == Just corrId@
-}
testCorrelationIdPreservation :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testCorrelationIdPreservation store = do
    streamId <- StreamId <$> UUID.nextRandom
    corrId <- CorrelationId <$> UUID.nextRandom
    receivedEvents <- newIORef []
    completionVar <- newEmptyMVar

    let testEvents = map makeUserEvent [1 .. 3] ++ [makeTombstone]
    result <- insertEvents store (Just corrId) (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

    case result of
        FailedInsertion err -> assertFailure $ "Failed to insert events: " ++ show err
        SuccessfulInsertion _ -> do
            handle <-
                subscribe
                    store
                    ( match UserCreated (collectEventsUntilTombstone receivedEvents)
                        :? match Tombstone (handleTombstone completionVar)
                        :? MatchEnd
                    )
                    EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

            takeMVar completionVar
            handle.cancel -- Cancel subscription after completion
            events <- readIORef receivedEvents
            -- Guard against a vacuous pass: the per-event check below succeeds
            -- trivially on an empty list, so pin the expected count first.
            length events @?= 3
            mapM_ (\evt -> evt.correlationId @?= Just corrId) events

{- | Test that a subscription started before any insertion receives later events.

Test sequence:
  1. Subscribe to all streams from the beginning
  2. Insert users 1-3 plus a tombstone
  3. Expected: exactly the names user1, user2, user3, in that order
-}
testAsyncSubscription :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testAsyncSubscription store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedEvents <- newIORef []
    completionVar <- newEmptyMVar

    handle <-
        subscribe
            store
            ( match UserCreated (collectEventsUntilTombstone receivedEvents)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

    let testEvents = map makeUserEvent [1 .. 3] ++ [makeTombstone]
    result <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

    case result of
        FailedInsertion err -> do
            -- Cancel the already-running subscription before failing the test.
            handle.cancel
            assertFailure $ "Failed to insert events: " ++ show err
        SuccessfulInsertion _ -> do
            takeMVar completionVar
            handle.cancel
            events <- reverse <$> readIORef receivedEvents
            length events @?= 3
            let userInfos = mapMaybe extractUserInfo events
            length userInfos @?= 3
            let userNames :: [Text]
                userNames = map userName userInfos
            userNames @?= ["user1", "user2", "user3"]

{- | Test that subscriptions honor the Stop result from handlers

This is a critical property: when a handler returns Stop, the subscription
must not process any subsequent events. This test verifies that all backends
correctly implement this behavior.

Test sequence:
  1. Insert: [Inc, Inc, Stop, Inc, Inc]
  2. Handler: increments counter on Inc, returns Stop on Stop event
  3. Expected: counter = 2 (stopped before processing the last two Incs)
-}
testSubscriptionStopBehavior :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testSubscriptionStopBehavior store = do
    streamId <- StreamId <$> UUID.nextRandom
    counter <- newIORef (0 :: Int)
    completionVar <- newEmptyMVar

    -- Handler that increments counter and returns Continue
    let handleInc :: EventHandler CounterInc IO backend
        handleInc _ = do
            atomicModifyIORef' counter (\n -> (n + 1, ()))
            pure Continue

    -- Handler that returns Stop (signaling subscription should end)
    let handleStop :: EventHandler CounterStop IO backend
        handleStop _ = do
            putMVar completionVar ()
            pure Stop

    -- Start subscription before inserting events
    handle <-
        subscribe
            store
            ( match CounterInc handleInc
                :? match CounterStop handleStop
                :? MatchEnd
            )
            EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

    -- Insert test sequence: 2 increments, then Stop, then 2 more increments
    let testEvents =
            [ makeCounterInc -- counter = 1
            , makeCounterInc -- counter = 2
            , makeCounterStop -- STOP HERE - should not process further
            , makeCounterInc -- should NOT be processed
            , makeCounterInc -- should NOT be processed
            ]

    result <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

    case result of
        FailedInsertion err -> do
            handle.cancel
            assertFailure $ "Failed to insert events: " ++ show err
        SuccessfulInsertion _ -> do
            -- Wait for the Stop handler to signal completion
            takeMVar completionVar

            -- Give a small grace period to catch any erroneous event processing
            -- If the backend is broken, it might process events after Stop
            threadDelay 100000 -- 100ms
            handle.cancel

            -- Verify counter stopped at 2 (before the Stop event)
            finalCount <- readIORef counter
            finalCount @?= 2

{- | Test that handler exceptions enrich failures with event context

When a handler throws an exception, the subscription should die (fail-fast).
The exception should be enriched with full event metadata for debugging.

Test sequence:
  1. Insert: [Inc, Inc, Fail, Inc, Inc]
  2. Handler: increments counter on Inc, throws exception on Fail
  3. Expected: counter = 2, subscription dies with HandlerException containing event metadata
-}
testHandlerExceptionEnrichment :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testHandlerExceptionEnrichment store = do
    streamId <- StreamId <$> UUID.nextRandom
    counter <- newIORef (0 :: Int)

    -- Handler that increments counter and returns Continue
    let handleInc :: EventHandler CounterInc IO backend
        handleInc _ = do
            atomicModifyIORef' counter (\n -> (n + 1, ()))
            pure Continue

    -- Handler that throws a test exception
    let handleFail :: EventHandler CounterFail IO backend
        handleFail _envelope =
            throwIO $ userError "Test exception from CounterFail handler"

    -- Insert test sequence BEFORE starting subscription (to ensure events are ready)
    let testEvents =
            [ makeCounterInc -- counter = 1
            , makeCounterInc -- counter = 2
            , makeCounterFail -- EXCEPTION HERE - subscription should die
            , makeCounterInc -- should NOT be processed
            , makeCounterInc -- should NOT be processed
            ]

    result <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

    case result of
        FailedInsertion err ->
            assertFailure $ "Failed to insert events: " ++ show err
        SuccessfulInsertion _ -> do
            -- Start subscription AFTER inserting events
            handle <-
                subscribe
                    store
                    ( match CounterInc handleInc
                        :? match CounterFail handleFail
                        :? MatchEnd
                    )
                    EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

            -- Wait for subscription to complete or fail
            waitResult <- tryAny handle.wait

            -- Verify counter stopped at 2 (subscription died on Fail event)
            finalCount <- readIORef counter
            finalCount @?= 2

            -- Verify the exception is a HandlerException with proper metadata
            case waitResult of
                Left exc -> case fromException exc of
                    -- Bind only the two fields that are asserted on, instead of
                    -- pulling every HandlerException field into scope.
                    Just HandlerException{failedEventName = eventName, originalException = cause} -> do
                        -- Verify exception enrichment
                        eventName @?= "counter_fail"
                        show cause @?= "user error (Test exception from CounterFail handler)"
                    Nothing -> assertFailure $ "Expected HandlerException, got: " ++ show exc
                Right () -> assertFailure "Expected subscription to fail with HandlerException"

-- Consistency Test implementations --

{- | Test the 'NoStream' expected-version condition.

Test sequence:
  1. Write one event to a fresh stream id with 'NoStream' - must succeed
  2. Write a second event to the same stream with 'NoStream'
  3. Expected: the second write fails with 'ConsistencyError'
-}
testNoStreamCondition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testNoStreamCondition store = do
    streamId <- StreamId <$> UUID.nextRandom

    result1 <-
        insertEvents store Nothing $
            singleEvent streamId NoStream (makeUserEvent 1)

    case result1 of
        FailedInsertion err -> assertFailure $ "First write failed: " ++ show err
        SuccessfulInsertion _ -> do
            result2 <-
                insertEvents store Nothing $
                    singleEvent streamId NoStream (makeUserEvent 2)

            case result2 of
                -- The only acceptable outcome: the stream now exists.
                FailedInsertion (ConsistencyError _) -> pure ()
                FailedInsertion err -> assertFailure $ "Unexpected error: " ++ show err
                SuccessfulInsertion _ -> assertFailure "Second write should have failed"

{- | Test the stream-must-exist condition exercised through @appendAfterAny@.

Test sequence:
  1. @appendAfterAny@ on a fresh stream id - must fail with 'ConsistencyError'
  2. Create the stream with a 'NoStream' write
  3. @appendAfterAny@ on the same stream again
  4. Expected: the second append succeeds

NOTE(review): @appendAfterAny@ presumably writes with a stream-exists expected
version (the test name and step 1 suggest so) - confirm against its definition.
-}
testStreamExistsCondition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testStreamExistsCondition store = do
    streamId <- StreamId <$> UUID.nextRandom

    result1 <-
        insertEvents store Nothing $
            appendAfterAny streamId (makeUserEvent 1)

    case result1 of
        FailedInsertion (ConsistencyError _) -> do
            -- Create the stream; the result of this write is not inspected.
            _ <-
                insertEvents store Nothing $
                    singleEvent streamId NoStream (makeUserEvent 1)

            result2 <-
                insertEvents store Nothing $
                    appendAfterAny streamId (makeUserEvent 2)

            case result2 of
                FailedInsertion err -> assertFailure $ "Second write failed: " ++ show err
                SuccessfulInsertion _ -> pure ()
        FailedInsertion err -> assertFailure $ "Unexpected error: " ++ show err
        SuccessfulInsertion _ -> assertFailure "First write should have failed"

-- | Checks the 'ExactVersion' expectation on a stream that holds two events.
--
-- The stream is seeded with one event (yielding @initCursor@) and then extended
-- with a second event under 'StreamExists' (yielding @cursor1@). A write that
-- expects the older @initCursor@ must be rejected with a 'ConsistencyError';
-- a write that expects @cursor1@ must be accepted. Any other outcome fails
-- the test with a message naming the offending write.
testExactVersionCondition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testExactVersionCondition store = do
    streamId <- StreamId <$> UUID.nextRandom

    -- Seed the stream. An explicit case is used instead of a failable pattern
    -- bind so that a failing seed write reports the store's error rather than
    -- a generic pattern-match failure.
    seeded <-
        insertEvents store Nothing $
            singleEvent streamId NoStream (makeUserEvent 42)
    initCursor <- case seeded of
        FailedInsertion err -> assertFailure $ "Initial write failed: " ++ show err
        SuccessfulInsertion (InsertionSuccess{finalCursor = c}) -> pure c

    result1 <-
        insertEvents store Nothing $
            singleEvent streamId StreamExists (makeUserEvent 1)

    case result1 of
        FailedInsertion err -> assertFailure $ "First write failed: " ++ show err
        SuccessfulInsertion (InsertionSuccess{finalCursor = cursor1}) -> do
            -- Write with wrong version should fail: initCursor predates result1
            result2 <-
                insertEvents store Nothing $
                    singleEvent streamId (ExactVersion initCursor) (makeUserEvent 2)

            case result2 of
                FailedInsertion (ConsistencyError _) -> do
                    -- Write with correct version should succeed
                    result3 <-
                        insertEvents store Nothing $
                            singleEvent streamId (ExactVersion cursor1) (makeUserEvent 3)

                    case result3 of
                        FailedInsertion err -> assertFailure $ "Third write failed: " ++ show err
                        SuccessfulInsertion _ -> pure ()
                -- A failure of any other kind is not the rejection under test.
                FailedInsertion err -> assertFailure $ "Unexpected error: " ++ show err
                SuccessfulInsertion _ -> assertFailure "Second write should have failed"

-- | Races two writers that both expect the same 'ExactVersion' on one stream.
--
-- A seed event is written first; both writers then use the cursor returned by
-- that seed write. The test passes only when one insertion succeeds and the
-- other is rejected with a 'ConsistencyError'; every other combination fails.
testConcurrentWrites :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testConcurrentWrites store = do
    streamId <- StreamId <$> UUID.nextRandom

    -- Seed the stream so there is a cursor for both writers to expect.
    seedOutcome <- insertEvents store Nothing (singleEvent streamId NoStream (makeUserEvent 1))

    case seedOutcome of
        SuccessfulInsertion (InsertionSuccess{finalCursor = seedCursor}) -> do
            -- Both writers share the same expectation and differ only in payload.
            let raceWrite :: Int -> IO (InsertionResult backend)
                raceWrite payload =
                    insertEvents store Nothing $
                        singleEvent streamId (ExactVersion seedCursor) (makeUserEvent payload)

            outcomes <- concurrently (raceWrite 2) (raceWrite 3)

            -- Accept either winner, as long as the loser reports a consistency error.
            case outcomes of
                (FailedInsertion (ConsistencyError _), SuccessfulInsertion _) -> pure ()
                (SuccessfulInsertion _, FailedInsertion (ConsistencyError _)) -> pure ()
                _ -> assertFailure "Expected exactly one write to succeed"
        FailedInsertion err -> assertFailure ("Initial write failed: " <> show err)

-- | Checks that a two-stream transaction is rejected when one of its version
-- expectations does not hold.
--
-- @streamId1@ is created up front while @streamId2@ is never written. The
-- batch then requires 'StreamExists' on both streams, and the insertion is
-- expected to fail with a 'ConsistencyError'.
--
-- NOTE(review): only the reported insertion result is inspected; the test does
-- not read @streamId1@ back to confirm that nothing from the batch was stored.
testBatchAtomicity :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testBatchAtomicity store = do
    streamId1 <- StreamId <$> UUID.nextRandom
    streamId2 <- StreamId <$> UUID.nextRandom

    -- Initialize first stream. The outcome is checked so that the later
    -- ConsistencyError cannot be explained by streamId1 being absent.
    -- This signature carries no Show constraint, so the error is not rendered.
    seeded <-
        insertEvents store Nothing $
            singleEvent streamId1 NoStream (makeUserEvent 1)
    case seeded of
        SuccessfulInsertion _ -> pure ()
        _ -> assertFailure "Initial write failed"

    -- Try batch write with one valid and one invalid condition
    result <-
        insertEvents store Nothing $
            fromWrites
                [ (streamId1, StreamWrite StreamExists [makeUserEvent 2])
                , (streamId2, StreamWrite StreamExists [makeUserEvent 3])
                ]

    -- Entire batch should fail
    case result of
        FailedInsertion (ConsistencyError _) -> pure ()
        _ -> assertFailure "Batch write should have failed completely"

-- | Checks that a three-stream batch with one violated expectation is rejected.
--
-- Three fresh streams each receive one event in a single seeding transaction.
-- A second transaction then targets all three with different expectations:
-- 'ExactVersion' of the seeding cursor, 'NoStream', and 'StreamExists'. The
-- insertion is expected to fail with a 'ConsistencyError'.
testMultiStreamConsistency :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testMultiStreamConsistency store = do
    let freshStream = StreamId <$> UUID.nextRandom
    streamId1 <- freshStream
    streamId2 <- freshStream
    streamId3 <- freshStream

    -- One event per stream, carrying payloads 1, 2 and 3 respectively.
    let seedWrites =
            Transaction . Map.fromList $
                [ (sid, StreamWrite NoStream [makeUserEvent n])
                | (sid, n) <- zip [streamId1, streamId2, streamId3] [1 ..]
                ]

    insertEvents store Nothing seedWrites >>= \case
        FailedInsertion err -> assertFailure ("Initial writes failed: " <> show err)
        SuccessfulInsertion (InsertionSuccess{finalCursor = cursor}) -> do
            -- Mix of expectations across the three streams.
            let mixedBatch =
                    Transaction $
                        Map.fromList
                            [ (streamId1, StreamWrite (ExactVersion cursor) [makeUserEvent 4])
                            , -- streamId2 was seeded above, so NoStream should fail
                              (streamId2, StreamWrite NoStream [makeUserEvent 5])
                            , (streamId3, StreamWrite StreamExists [makeUserEvent 6])
                            ]

            insertEvents store Nothing mixedBatch >>= \case
                FailedInsertion (ConsistencyError _) -> pure ()
                _ -> assertFailure "Mixed version batch should fail"

-- Stream version condition tests --

-- | Checks the 'ExactStreamVersion' expectation across three sequential writes.
--
-- The first write creates the stream under 'NoStream'. The second write expects
-- @StreamVersion 1@ and must succeed. The third write repeats the same
-- expectation after the stream has grown and must be rejected with a
-- 'ConsistencyError'.
testExactStreamVersionCondition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testExactStreamVersionCondition store = do
    streamId <- StreamId <$> UUID.nextRandom

    let versionOne :: ExpectedVersion backend
        versionOne = ExactStreamVersion (StreamVersion 1)

        -- Single-event write to the test stream under the given expectation.
        write :: ExpectedVersion backend -> Int -> IO (InsertionResult backend)
        write expectation payload =
            insertEvents store Nothing $
                singleEvent streamId expectation (makeUserEvent payload)

    -- First event: creates the stream.
    write NoStream 1 >>= \case
        FailedInsertion err -> assertFailure ("First write failed: " <> show err)
        SuccessfulInsertion _ ->
            -- Second event: the expectation matches, so this must go through.
            write versionOne 2 >>= \case
                FailedInsertion err ->
                    assertFailure ("Second write with correct stream version failed: " <> show err)
                SuccessfulInsertion _ ->
                    -- Third event: same expectation as before, now out of date.
                    write versionOne 3 >>= \case
                        FailedInsertion (ConsistencyError _) -> pure ()
                        FailedInsertion err -> assertFailure ("Unexpected error: " <> show err)
                        SuccessfulInsertion _ ->
                            assertFailure "Third write should have failed with wrong stream version"

-- | Races ten writers that all expect the same 'ExactVersion' on one stream.
--
-- After a seed write, ten asynchronous writers each try to append one event
-- while expecting the seed cursor. The test requires exactly one successful
-- insertion, nine failed insertions, and that every failure is a
-- 'ConsistencyError'.
testVersionExpectationRaceCondition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testVersionExpectationRaceCondition store = do
    streamId <- StreamId <$> UUID.nextRandom

    -- Seed the stream to obtain the cursor all writers will expect.
    seedOutcome <- insertEvents store Nothing (singleEvent streamId NoStream (makeUserEvent 0))

    case seedOutcome of
        SuccessfulInsertion (InsertionSuccess{finalCursor = seedCursor}) -> do
            let contend :: Int -> IO (InsertionResult backend)
                contend n =
                    insertEvents store Nothing $
                        singleEvent streamId (ExactVersion seedCursor) (makeUserEvent n)

            -- Launch all ten writers, then collect their results in launch order.
            writers <- traverse (async . contend) [1 .. 10]
            outcomes <- traverse wait writers

            let isSuccess = \case
                    SuccessfulInsertion _ -> True
                    _ -> False
                isFailure = \case
                    FailedInsertion _ -> True
                    _ -> False
                successes = filter isSuccess outcomes
                failures = filter isFailure outcomes

            length successes @?= 1 -- a single winner
            length failures @?= 9 -- everyone else is rejected

            -- Each rejection must be a version conflict, not some other error.
            let requireConflict = \case
                    FailedInsertion (ConsistencyError _) -> pure ()
                    _ -> assertFailure "Expected ConsistencyError for version conflict"
            mapM_ requireConflict failures
        FailedInsertion err -> assertFailure ("Initial write failed: " <> show err)

-- | Runs twenty concurrent writers that use the 'Any' expectation on one stream.
--
-- Each writer blocks on a shared 'MVar' until released, then appends a single
-- event. The test requires all twenty insertions to report success.
testAnyExpectationConcurrency :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testAnyExpectationConcurrency store = do
    streamId <- StreamId <$> UUID.nextRandom

    -- Seed the stream; the result of this write is discarded.
    _ <- insertEvents store Nothing (singleEvent streamId NoStream (makeUserEvent 0))

    gate <- newEmptyMVar
    let writerCount = 20 :: Int

        -- Block on the gate, then append one event with no version requirement.
        gatedWrite :: Int -> IO (InsertionResult backend)
        gatedWrite n = do
            takeMVar gate
            insertEvents store Nothing $
                singleEvent streamId Any (makeUserEvent n)

    writers <- traverse (async . gatedWrite) [1 .. writerCount]

    -- Release the writers: each putMVar satisfies one pending takeMVar.
    replicateM_ writerCount (putMVar gate ())

    outcomes <- traverse wait writers
    let succeeded = \case
            SuccessfulInsertion _ -> True
            _ -> False
    length (filter succeeded outcomes) @?= 20 -- All should succeed with Any expectation

{- | Submit one multi-stream transaction whose per-stream expectations are a
mix of satisfiable and unsatisfiable ones, and require that the whole
transaction is rejected with a 'ConsistencyError'.

Streams s1 and s3 are created up front; s2, s4 and s5 are left untouched.
The setup insert is asserted to succeed so that the later rejection cannot be
caused by a broken precondition.
-}
testMixedVersionExpectations :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testMixedVersionExpectations store = do
    -- Create 5 stream identifiers
    streams <- replicateM 5 (StreamId <$> UUID.nextRandom)

    case streams of
        [s1, s2, s3, s4, s5] -> do
            -- Initialize s1 and s3 with one event each
            setup <-
                insertEvents store Nothing $
                    Transaction
                        ( Map.fromList
                            [ (s1, StreamWrite NoStream [makeUserEvent 1])
                            , (s3, StreamWrite NoStream [makeUserEvent 3])
                            ]
                        )

            -- Without this check a failed setup would still make the batch
            -- below fail, and the test would pass without exercising the
            -- scenario described by the per-stream comments.
            case setup of
                SuccessfulInsertion _ -> pure ()
                FailedInsertion _ -> assertFailure "Failed to initialize streams for mixed-expectation batch"

            -- Try batch with mixed expectations
            result <-
                insertEvents store Nothing $
                    Transaction
                        ( Map.fromList
                            [ (s1, StreamWrite StreamExists [makeUserEvent 11]) -- Should succeed
                            , (s2, StreamWrite NoStream [makeUserEvent 12]) -- Should succeed
                            , (s3, StreamWrite (ExactStreamVersion (StreamVersion 2)) [makeUserEvent 13]) -- Should fail (wrong version)
                            , (s4, StreamWrite Any [makeUserEvent 14]) -- Would succeed if batch succeeds
                            , (s5, StreamWrite StreamExists [makeUserEvent 15]) -- Would fail (stream doesn't exist)
                            ]
                        )

            -- Entire batch should fail due to any failure
            case result of
                FailedInsertion (ConsistencyError _) -> pure ()
                _ -> assertFailure "Batch with mixed expectations should fail if any expectation fails"
        _ -> assertFailure "Expected exactly 5 streams"

{- | Create five streams one after another, then write twice to the third one
using the cursor returned when it was created. The first write must be
accepted; the second, reusing the same cursor, must be rejected with a
'ConsistencyError'.
-}
testCascadingVersionDependencies :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (EventStoreError backend)) => BackendHandle backend -> IO ()
testCascadingVersionDependencies store = do
    streamIds <- replicateM 5 (fmap StreamId UUID.nextRandom)

    -- Create each stream in turn, pairing it with the final cursor of its insert.
    created <- forM (zip [0 ..] streamIds) $ \(n, sid) ->
        insertEvents store Nothing (singleEvent sid NoStream (makeUserEvent n)) >>= \case
            FailedInsertion err -> assertFailure ("Failed to create dependency chain: " ++ show err)
            SuccessfulInsertion (InsertionSuccess{finalCursor = cursor}) -> pure (sid, cursor)

    case created of
        [_, _, (middle, creationCursor), _, _] -> do
            -- Both writes target the middle stream and expect the creation-time cursor.
            let writeAtCreationCursor n =
                    insertEvents store Nothing (singleEvent middle (ExactVersion creationCursor) (makeUserEvent n))

            -- First write: expected to be accepted. assertFailure throws, so
            -- nothing below runs when this step fails.
            firstWrite <- writeAtCreationCursor 33
            case firstWrite of
                FailedInsertion err -> assertFailure ("Failed to update middle stream: " ++ show err)
                SuccessfulInsertion _ -> pure ()

            -- Second write with the same, now outdated, cursor: must be a consistency error.
            secondWrite <- writeAtCreationCursor 333
            case secondWrite of
                FailedInsertion (ConsistencyError _) -> pure ()
                _ -> assertFailure "Should not be able to use old cursor after update"
        _ -> assertFailure "Expected exactly 5 cursors"

{- | Check that a single transaction writing to several new streams leaves each
of them individually appendable. Head tracking is validated indirectly: after
the multi-stream insert, a 'StreamExists' append is issued against every
stream in turn and each one must succeed.
-}
testMultiStreamHeadConsistency :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testMultiStreamHeadConsistency store = do
    streamA <- fmap StreamId UUID.nextRandom
    streamB <- fmap StreamId UUID.nextRandom
    streamC <- fmap StreamId UUID.nextRandom

    -- One transaction, three new streams: A gets 2 events, B gets 3, C gets 1.
    let seedWrites =
            Map.fromList
                [ (streamA, StreamWrite NoStream (map makeUserEvent [1, 2]))
                , (streamB, StreamWrite NoStream (map makeUserEvent [10, 20, 30]))
                , (streamC, StreamWrite NoStream (map makeUserEvent [100]))
                ]
    seeded <- insertEvents store Nothing (Transaction seedWrites)
    case seeded of
        FailedInsertion err -> assertFailure $ "Failed to insert multi-stream batch: " ++ show err
        SuccessfulInsertion{} -> pure ()

    -- assertFailure throws, so a later stream is only attempted once the
    -- earlier ones were appended to successfully.
    appendToExisting "Failed to append to stream A (stream heads may be corrupted): " streamA 3
    appendToExisting "Failed to append to stream B (stream heads may be corrupted): " streamB 40
    appendToExisting "Failed to append to stream C (stream heads may be corrupted): " streamC 200
  where
    -- Append one event under the StreamExists expectation; on rejection, fail
    -- the test with the given message prefix followed by the shown error.
    appendToExisting failurePrefix sid n =
        insertEvents store Nothing (singleEvent sid StreamExists (makeUserEvent n)) >>= \case
            FailedInsertion err -> assertFailure $ failurePrefix ++ show err
            SuccessfulInsertion _ -> pure ()

{- | Submit a transaction that contains no stream writes at all. Rejection is
accepted as-is; if the store accepts the empty transaction, a follow-up
insert into a fresh stream must still succeed.
-}
testEmptyBatchInsertion :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testEmptyBatchInsertion store = do
    let noWrites = Map.empty :: Map.Map StreamId (StreamWrite [] SomeLatestEvent backend)
    insertEvents store Nothing (Transaction noWrites) >>= \case
        -- A backend may refuse an empty batch outright.
        FailedInsertion _ -> pure ()
        -- The cursor's structure cannot be inspected generically across
        -- backends, so the check is limited to the store still accepting a
        -- normal write afterwards.
        SuccessfulInsertion _ -> do
            freshStream <- fmap StreamId UUID.nextRandom
            followUp <- insertEvents store Nothing (appendToOrCreateStream freshStream (makeUserEvent 1))
            case followUp of
                SuccessfulInsertion _ -> pure ()
                FailedInsertion err -> assertFailure ("Failed follow-up insert after empty batch: " ++ show err)

{- | Insert a multi-stream transaction in which one stream carries an empty
event list, then check the aftermath: the stream that received no events must
still accept a 'NoStream' write, while the two streams that did receive events
must accept 'StreamExists' appends.
-}
testMixedEmptyStreams :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testMixedEmptyStreams store = do
    streamA <- fmap StreamId UUID.nextRandom
    streamB <- fmap StreamId UUID.nextRandom
    streamC <- fmap StreamId UUID.nextRandom

    -- A: 2 events, B: 0 events, C: 1 event, all in one transaction.
    let mixedWrites =
            Map.fromList
                [ (streamA, StreamWrite NoStream (map makeUserEvent [1, 2]))
                , (streamB, StreamWrite NoStream [])
                , (streamC, StreamWrite NoStream (map makeUserEvent [100]))
                ]
    mixed <- insertEvents store Nothing (Transaction mixedWrites)
    case mixed of
        FailedInsertion err -> assertFailure $ "Failed to insert mixed empty/non-empty batch: " ++ show err
        SuccessfulInsertion _ -> pure ()

    -- B received no events above, so creating it with NoStream must still work.
    createdB <- insertEvents store Nothing (singleEvent streamB NoStream (makeUserEvent 10))
    case createdB of
        FailedInsertion err -> assertFailure $ "Stream B should not exist yet, but got error: " ++ show err
        SuccessfulInsertion _ -> pure ()

    -- A and C did receive events; both appends are issued before either
    -- outcome is inspected.
    appendedA <- insertEvents store Nothing (singleEvent streamA StreamExists (makeUserEvent 3))
    appendedC <- insertEvents store Nothing (singleEvent streamC StreamExists (makeUserEvent 200))
    case (appendedA, appendedC) of
        (SuccessfulInsertion _, SuccessfulInsertion _) -> pure ()
        _ -> assertFailure "Streams A and C should exist after mixed batch"

-- Per-Stream Cursor Test implementations --

{- | Test that per-stream cursors can be extracted from multi-stream transactions
and used for optimistic locking on specific streams.

This is the core use case from Tutorial 08: when inserting events to multiple
streams in a single transaction, we need to get back individual cursors for
each stream to use in subsequent optimistic locking operations.
-}
testPerStreamCursorExtraction :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testPerStreamCursorExtraction :: forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend)) =>
BackendHandle backend -> IO ()
testPerStreamCursorExtraction BackendHandle backend
store = do
    streamA <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom
    streamC <- StreamId <$> UUID.nextRandom

    -- Multi-stream transaction
    result1 <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite NoStream [makeUserEvent 1])
                    , (streamB, StreamWrite NoStream [makeUserEvent 10])
                    , (streamC, StreamWrite NoStream [makeUserEvent 100])
                    ]
                )

    case result1 of
        FailedInsertion EventStoreError backend
err -> TestName -> IO ()
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> IO ()) -> TestName -> IO ()
forall a b. (a -> b) -> a -> b
$ TestName
"Initial multi-stream write failed: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
        SuccessfulInsertion (InsertionSuccess{Map StreamId (Cursor backend)
streamCursors :: Map StreamId (Cursor backend)
streamCursors :: forall backend.
InsertionSuccess backend -> Map StreamId (Cursor backend)
streamCursors}) -> do
            -- Extract cursor for stream A
            case StreamId -> Map StreamId (Cursor backend) -> Maybe (Cursor backend)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup StreamId
streamA Map StreamId (Cursor backend)
streamCursors of
                Maybe (Cursor backend)
Nothing -> TestName -> IO ()
forall a. HasCallStack => TestName -> IO a
assertFailure TestName
"Stream A cursor missing from streamCursors"
                Just Cursor backend
cursorA -> do
                    -- Use cursor A to append to stream A with optimistic locking
                    result2 <-
                        BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
                            StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamA (Cursor backend -> ExpectedVersion backend
forall backend. Cursor backend -> ExpectedVersion backend
ExactVersion Cursor backend
cursorA) (Int -> SomeLatestEvent
makeUserEvent Int
2)

                    case result2 of
                        FailedInsertion EventStoreError backend
err -> TestName -> IO ()
forall a. HasCallStack => TestName -> IO a
assertFailure (TestName -> IO ()) -> TestName -> IO ()
forall a b. (a -> b) -> a -> b
$ TestName
"Append with stream A cursor failed: " TestName -> TestName -> TestName
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> TestName
forall a. Show a => a -> TestName
show EventStoreError backend
err
                        SuccessfulInsertion InsertionSuccess backend
_ -> do
                            -- Try to use the same cursor again - should fail (stale)
                            result3 <-
                                BackendHandle backend
-> Maybe CorrelationId
-> Transaction [] backend
-> IO (InsertionResult backend)
forall backend (t :: * -> *) (m :: * -> *).
(EventStore backend, Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints backend m) =>
BackendHandle backend
-> Maybe CorrelationId
-> Transaction t backend
-> m (InsertionResult backend)
insertEvents BackendHandle backend
store Maybe CorrelationId
forall a. Maybe a
Nothing (Transaction [] backend -> IO (InsertionResult backend))
-> Transaction [] backend -> IO (InsertionResult backend)
forall a b. (a -> b) -> a -> b
$
                                    StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
forall backend.
StreamId
-> ExpectedVersion backend
-> SomeLatestEvent
-> Transaction [] backend
singleEvent StreamId
streamA (Cursor backend -> ExpectedVersion backend
forall backend. Cursor backend -> ExpectedVersion backend
ExactVersion Cursor backend
cursorA) (Int -> SomeLatestEvent
makeUserEvent Int
3)

                            case result3 of
                                FailedInsertion (ConsistencyError ConsistencyErrorInfo backend
_) -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure () -- Expected!
                                InsertionResult backend
_ -> TestName -> IO ()
forall a. HasCallStack => TestName -> IO a
assertFailure TestName
"Should not be able to reuse stale cursor"

{- | Test that cursors for different streams are independent.

Updating stream A must not invalidate stream B's cursor. This is critical
for concurrent multi-stream operations where different processes may be
updating different streams.

The test proceeds in three transactions:

1. Create streams A and B in one transaction and keep both returned cursors.
2. Append to stream A, using A's cursor as the expected version.
3. Append to stream B, using the cursor obtained in step 1. This must
   succeed, because stream B has not been written to since step 1.

Any failed insertion, or a missing cursor after step 1, fails the test.
-}
testCursorIndependence :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testCursorIndependence store = do
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom

    -- Tx1: Initialize both streams
    result1 <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite NoStream [makeUserEvent 1])
                    , (streamB, StreamWrite NoStream [makeUserEvent 10])
                    ]
                )

    case result1 of
        FailedInsertion err -> assertFailure $ "Initial write failed: " ++ show err
        SuccessfulInsertion (InsertionSuccess{streamCursors}) ->
            case (Map.lookup streamA streamCursors, Map.lookup streamB streamCursors) of
                (Just cursorA, Just cursorB) -> do
                    -- Tx2: Update stream A (should NOT affect cursorB)
                    result2 <-
                        insertEvents store Nothing $
                            singleEvent streamA (ExactVersion cursorA) (makeUserEvent 2)

                    case result2 of
                        FailedInsertion err -> assertFailure $ "Stream A update failed: " ++ show err
                        SuccessfulInsertion _ -> do
                            -- Tx3: Use cursorB to append to stream B
                            -- This should succeed because stream B hasn't been modified
                            result3 <-
                                insertEvents store Nothing $
                                    singleEvent streamB (ExactVersion cursorB) (makeUserEvent 11)

                            case result3 of
                                FailedInsertion err ->
                                    assertFailure $ "Stream B cursor should still be valid after stream A update: " ++ show err
                                SuccessfulInsertion _ -> pure () -- Success! Independence verified
                _ -> assertFailure "Missing cursors from initial transaction"

{- | Test that using a stale cursor for one stream in a multi-stream transaction
causes the entire transaction to fail atomically.

The test proceeds as follows:

1. Create streams A and B in one transaction and keep both returned cursors.
2. Append to stream A with its cursor, which makes that cursor stale.
3. Submit a transaction writing to both A (stale cursor) and B (still-valid
   cursor). It must be rejected with a 'ConsistencyError'.
4. Append to stream B with the cursor from step 1. This must succeed, which
   shows that the rejected transaction did not write to stream B.
-}
testStaleCursorPerStream :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testStaleCursorPerStream store = do
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom

    -- Initialize streams
    result1 <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite NoStream [makeUserEvent 1])
                    , (streamB, StreamWrite NoStream [makeUserEvent 10])
                    ]
                )

    case result1 of
        FailedInsertion err -> assertFailure $ "Initial write failed: " ++ show err
        SuccessfulInsertion (InsertionSuccess{streamCursors}) ->
            case (Map.lookup streamA streamCursors, Map.lookup streamB streamCursors) of
                (Just cursorA, Just cursorB) -> do
                    -- Update stream A separately (makes cursorA stale)
                    result2 <-
                        insertEvents store Nothing $
                            singleEvent streamA (ExactVersion cursorA) (makeUserEvent 2)

                    case result2 of
                        FailedInsertion err -> assertFailure $ "Stream A update failed: " ++ show err
                        SuccessfulInsertion _ -> do
                            -- Try multi-stream transaction with stale cursorA
                            -- Even though cursorB is still valid, the transaction should fail
                            result3 <-
                                insertEvents store Nothing $
                                    Transaction
                                        ( Map.fromList
                                            [ (streamA, StreamWrite (ExactVersion cursorA) [makeUserEvent 3]) -- stale!
                                            , (streamB, StreamWrite (ExactVersion cursorB) [makeUserEvent 11]) -- valid
                                            ]
                                        )

                            case result3 of
                                FailedInsertion (ConsistencyError _) -> do
                                    -- Expected rejection. Now confirm atomicity: if the
                                    -- rejected transaction had written event 11 to stream B,
                                    -- cursorB would no longer be the stream's current cursor
                                    -- and this append would be refused.
                                    result4 <-
                                        insertEvents store Nothing $
                                            singleEvent streamB (ExactVersion cursorB) (makeUserEvent 12)

                                    case result4 of
                                        FailedInsertion err ->
                                            assertFailure $ "Stream B was modified by the rejected transaction: " ++ show err
                                        SuccessfulInsertion _ -> pure ()
                                _ -> assertFailure "Transaction with stale cursor should fail atomically"
                _ -> assertFailure "Missing cursors from initial transaction"

{- | Test that all streams in a multi-stream transaction get cursors,
and that the cursors map is complete.

A single transaction writes to three fresh streams. The test then checks that:

* the returned @streamCursors@ map has exactly three entries, one per stream;
* every per-stream cursor is @<=@ the transaction's @finalCursor@;
* each per-stream cursor is accepted as an 'ExactVersion' for a follow-up
  append to its own stream.
-}
testCursorCompleteness :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend), Ord (Cursor backend)) => BackendHandle backend -> IO ()
testCursorCompleteness store = do
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom
    streamC <- StreamId <$> UUID.nextRandom

    -- Multi-stream transaction to 3 streams
    result <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite NoStream [makeUserEvent 1, makeUserEvent 2])
                    , (streamB, StreamWrite NoStream [makeUserEvent 10])
                    , (streamC, StreamWrite NoStream [makeUserEvent 100, makeUserEvent 101, makeUserEvent 102])
                    ]
                )

    case result of
        FailedInsertion err -> assertFailure $ "Multi-stream write failed: " ++ show err
        SuccessfulInsertion (InsertionSuccess{finalCursor, streamCursors}) -> do
            -- Verify all 3 streams have cursors
            Map.size streamCursors @?= 3

            -- Verify all stream IDs are present
            assertBool "Stream A missing from cursors" (Map.member streamA streamCursors)
            assertBool "Stream B missing from cursors" (Map.member streamB streamCursors)
            assertBool "Stream C missing from cursors" (Map.member streamC streamCursors)

            -- Verify ordering invariant: all stream cursors <= finalCursor
            forM_ (Map.toList streamCursors) $ \(sid, cursor) ->
                assertBool
                    ("Cursor for " ++ show sid ++ " violates ordering invariant")
                    (cursor <= finalCursor)

            -- Verify we can actually use each cursor
            case (Map.lookup streamA streamCursors, Map.lookup streamB streamCursors, Map.lookup streamC streamCursors) of
                (Just cursorA, Just cursorB, Just cursorC) -> do
                    -- Try using each cursor
                    resultA <- insertEvents store Nothing $ singleEvent streamA (ExactVersion cursorA) (makeUserEvent 3)
                    resultB <- insertEvents store Nothing $ singleEvent streamB (ExactVersion cursorB) (makeUserEvent 11)
                    resultC <- insertEvents store Nothing $ singleEvent streamC (ExactVersion cursorC) (makeUserEvent 103)

                    -- Report the first rejected cursor by stream, together with
                    -- the backend error, so a failure is diagnosable from the
                    -- test output alone.
                    forM_ [("A", resultA), ("B", resultB), ("C", resultC)] $ \(label, outcome) ->
                        case outcome of
                            FailedInsertion err ->
                                assertFailure $ "Cursor for stream " ++ label ++ " was not usable: " ++ show err
                            SuccessfulInsertion _ -> pure ()
                _ -> assertFailure "Failed to extract all cursors from map"

{- | Test that streams with EMPTY event lists still get proper cursor handling
in multi-stream transactions. This is a critical edge case: implementations
might fail to track cursors for streams that have no events written.

One transaction writes to three fresh streams, where stream B's event list is
empty. Streams A and C must receive cursors. For stream B, two behaviours are
accepted, as long as the backend is self-consistent:

* No cursor is returned for B. Then the empty write must have been a no-op,
  so a later write to B with 'NoStream' must succeed.
* A cursor is returned for B. Then that cursor must be accepted as an
  'ExactVersion' for a later write to B.
-}
testEmptyStreamCursorHandling :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testEmptyStreamCursorHandling store = do
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom
    streamC <- StreamId <$> UUID.nextRandom

    -- Multi-stream transaction with one EMPTY stream (streamB has no events)
    result <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite NoStream [makeUserEvent 1])
                    , (streamB, StreamWrite NoStream []) -- EMPTY! This is the edge case
                    , (streamC, StreamWrite NoStream [makeUserEvent 100])
                    ]
                )

    case result of
        FailedInsertion err -> assertFailure $ "Multi-stream with empty stream failed: " ++ show err
        SuccessfulInsertion (InsertionSuccess{streamCursors}) -> do
            -- Critical question: Does streamB get a cursor even though it has no events?
            -- Behavior expectation: streamB should NOT appear in streamCursors because
            -- it didn't actually write any events. Only streams with events get cursors.

            -- Verify streams A and C have cursors (they have events)
            assertBool "Stream A should have cursor (has events)" (Map.member streamA streamCursors)
            assertBool "Stream C should have cursor (has events)" (Map.member streamC streamCursors)

            -- Verify streamB does NOT have cursor (no events written)
            -- An empty stream write is effectively a no-op
            case Map.lookup streamB streamCursors of
                Nothing -> do
                    -- Expected: streamB shouldn't get a cursor for empty write
                    -- Verify streamB stream doesn't exist (empty write is a no-op)
                    resultB <-
                        insertEvents store Nothing $
                            singleEvent streamB NoStream (makeUserEvent 10)

                    case resultB of
                        FailedInsertion err -> assertFailure $ "Stream B should not exist after empty write: " ++ show err
                        SuccessfulInsertion _ -> pure () -- Good! StreamB was never created
                Just cursorB -> do
                    -- Alternative behavior: backend gave cursor for empty stream
                    -- In this case, verify the cursor is still usable and stream was created
                    resultB <-
                        insertEvents store Nothing $
                            singleEvent streamB (ExactVersion cursorB) (makeUserEvent 10)

                    case resultB of
                        FailedInsertion err ->
                            assertFailure $ "If empty stream gets cursor, it should be usable: " ++ show err
                        SuccessfulInsertion _ -> pure () -- Alternative behavior is OK if consistent

-- Stream Version Test implementations --

{- | Test that stream versions start at 1.

Writes one user event followed by a tombstone to a fresh stream, then
subscribes to that stream from the beginning. The @streamVersion@ of every
matched 'UserCreated' event is recorded, and the test blocks on
@completionVar@ until the 'Tombstone' handler releases it. The recorded
versions must be exactly @[StreamVersion 1]@.
-}
testStreamVersionsStartAt1 :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testStreamVersionsStartAt1 store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedVersions <- newIORef []
    completionVar <- newEmptyMVar

    -- Handler: prepend each event's stream version (newest first) and keep
    -- the subscription running.
    let collectVersions ref event = do
            atomicModifyIORef' ref (\versions -> (event.streamVersion : versions, ()))
            pure Continue

    -- Insert first event
    inserted <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite NoStream [makeUserEvent 1, makeTombstone])]))

    -- Fail fast: if nothing was written, no tombstone can ever be delivered
    -- and the takeMVar below would block forever.
    case inserted of
        FailedInsertion _ -> assertFailure "Initial insert failed; subscription would never receive the tombstone"
        SuccessfulInsertion _ -> pure ()

    handle <-
        subscribe
            store
            ( match UserCreated (collectVersions receivedVersions)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = SingleStream streamId, startupPosition = FromBeginning}

    takeMVar completionVar
    handle.cancel

    -- Versions were accumulated newest-first; restore delivery order.
    versions <- reverse <$> readIORef receivedVersions
    versions @?= [StreamVersion 1]

{- | Test that stream versions are contiguous (no gaps).

Writes five 'UserCreated' events to one stream across three separate
transactions (the last one also carrying a tombstone), replays the stream
from the beginning, and asserts that the observed versions are exactly
1 through 5 in order.
-}
testStreamVersionsContiguous :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testStreamVersionsContiguous store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedVersions <- newIORef []
    completionVar <- newEmptyMVar

    -- Versions are prepended as they arrive; the list is reversed before asserting.
    let collectVersions ref event = do
            atomicModifyIORef' ref (\versions -> (event.streamVersion : versions, ()))
            pure Continue

    -- Fail the test immediately when a write is rejected. Without this, a
    -- failed batch would mean the tombstone is never stored and the
    -- 'takeMVar' below would block indefinitely.
    let expectInserted label = \case
            FailedInsertion _ -> assertFailure ("Failed to insert " ++ label)
            SuccessfulInsertion _ -> pure ()

    -- Insert 5 events in multiple batches
    insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite NoStream [makeUserEvent 1, makeUserEvent 2])]))
        >>= expectInserted "first batch (events 1-2)"
    insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite StreamExists [makeUserEvent 3])]))
        >>= expectInserted "second batch (event 3)"
    insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite StreamExists [makeUserEvent 4, makeUserEvent 5, makeTombstone])]))
        >>= expectInserted "third batch (events 4-5 and tombstone)"

    handle <-
        subscribe
            store
            ( match UserCreated (collectVersions receivedVersions)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = SingleStream streamId, startupPosition = FromBeginning}

    -- Block until the tombstone handler (given completionVar) signals completion.
    takeMVar completionVar
    handle.cancel

    versions <- reverse <$> readIORef receivedVersions
    -- Should be contiguous: 1, 2, 3, 4, 5
    versions @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3, StreamVersion 4, StreamVersion 5]

{- | Test that stream versions are exposed in subscriptions.

Writes one 'UserCreated' event and a tombstone to a fresh stream, subscribes
from the beginning, and asserts that the envelope handed to the
'UserCreated' handler carries @StreamVersion 1@ in its @streamVersion@ field.
-}
testStreamVersionExposedInSubscription :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testStreamVersionExposedInSubscription store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedVersionRef <- newIORef Nothing
    completionVar <- newEmptyMVar

    -- Remember the version of the most recently delivered event (overwrites
    -- any previous value).
    let captureVersion ref event = do
            atomicModifyIORef' ref (\_ -> (Just event.streamVersion, ()))
            pure Continue

    -- The insertion result is checked explicitly: if the write failed, no
    -- tombstone would be delivered and the 'takeMVar' below would block
    -- indefinitely instead of reporting a test failure.
    insertResult <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite NoStream [makeUserEvent 1, makeTombstone])]))
    case insertResult of
        FailedInsertion _ -> assertFailure "Failed to insert initial event and tombstone"
        SuccessfulInsertion _ -> pure ()

    handle <-
        subscribe
            store
            ( match UserCreated (captureVersion receivedVersionRef)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = SingleStream streamId, startupPosition = FromBeginning}

    -- Block until the tombstone handler (given completionVar) signals completion.
    takeMVar completionVar
    handle.cancel

    mbVersion <- readIORef receivedVersionRef
    mbVersion @?= Just (StreamVersion 1)

{- | Test that multiple streams have independent version sequences.

Writes to three fresh streams (A, B, C) using multi-stream transactions so
that each stream ends up with three 'UserCreated' events, appends a
tombstone to each, then replays all streams through one subscription and
asserts that every stream independently observed versions 1, 2, 3.
-}
testIndependentStreamVersions :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testIndependentStreamVersions store = do
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom
    streamC <- StreamId <$> UUID.nextRandom

    versionsA <- newIORef []
    versionsB <- newIORef []
    versionsC <- newIORef []
    completionVar <- newEmptyMVar

    -- Versions are prepended as they arrive; each list is reversed before asserting.
    let collectVersionsFor ref event = do
            atomicModifyIORef' ref (\versions -> (event.streamVersion : versions, ()))
            pure Continue

    -- Fail the test immediately when a write is rejected. Without this, a
    -- failed transaction would leave tombstones missing and the 'takeMVar'
    -- below would block indefinitely.
    let expectInserted label = \case
            FailedInsertion _ -> assertFailure ("Failed to insert " ++ label)
            SuccessfulInsertion _ -> pure ()

    -- Insert events to all three streams in mixed order, in the SAME transaction
    expectInserted "initial events for streams A, B and C"
        =<< insertEvents
            store
            Nothing
            ( Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite NoStream [makeUserEvent 1, makeUserEvent 2]) -- A: versions 1, 2
                    , (streamB, StreamWrite NoStream [makeUserEvent 10]) -- B: version 1
                    , (streamC, StreamWrite NoStream [makeUserEvent 100, makeUserEvent 101, makeUserEvent 102]) -- C: versions 1, 2, 3
                    ]
                )
            )

    -- Insert more events to verify independent counting
    expectInserted "follow-up events for streams A and B"
        =<< insertEvents
            store
            Nothing
            ( Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite StreamExists [makeUserEvent 3]) -- A: version 3
                    , (streamB, StreamWrite StreamExists [makeUserEvent 11, makeUserEvent 12]) -- B: versions 2, 3
                    ]
                )
            )

    -- Add tombstones
    expectInserted "tombstones for streams A, B and C"
        =<< insertEvents
            store
            Nothing
            ( Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite StreamExists [makeTombstone])
                    , (streamB, StreamWrite StreamExists [makeTombstone])
                    , (streamC, StreamWrite StreamExists [makeTombstone])
                    ]
                )
            )

    -- Subscribe to all streams. The subscription stops once three tombstones
    -- have been counted down.
    remainingTombstones <- newIORef (3 :: Int)
    let handleTombstones _ = do
            remaining <- atomicModifyIORef' remainingTombstones (\n -> (n - 1, n - 1))
            if remaining == 0
                then putMVar completionVar () >> pure Stop
                else pure Continue

    handle <-
        subscribe
            store
            ( match
                UserCreated
                ( \event -> do
                    -- Route each event's version to the list of the stream it belongs to;
                    -- events from any other stream are ignored.
                    case event.streamId of
                        sid
                            | sid == streamA -> collectVersionsFor versionsA event
                            | sid == streamB -> collectVersionsFor versionsB event
                            | sid == streamC -> collectVersionsFor versionsC event
                            | otherwise -> pure Continue
                )
                :? match Tombstone handleTombstones
                :? MatchEnd
            )
            EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

    takeMVar completionVar
    handle.cancel

    -- Verify each stream has independent version sequences
    vA <- reverse <$> readIORef versionsA
    vB <- reverse <$> readIORef versionsB
    vC <- reverse <$> readIORef versionsC

    vA @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3]
    vB @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3]
    vC @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3]

-- Multi-Instance Test implementations --

{- | Test that multiple "processes" (separate handles) can subscribe to events
written by another "process". This validates cross-process notification works.

The first handle in the list acts as the writer; every remaining handle
gets its own subscription (all streams, from the beginning). The writer
inserts five 'UserCreated' events plus a tombstone, and each subscriber is
then expected to have collected exactly those five events, in order
(@user1@ .. @user5@). An empty handle list is a test failure.
-}
testMultiInstanceSubscription ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    [BackendHandle backend] ->
    IO ()
testMultiInstanceSubscription stores = do
    case stores of
        [] -> assertFailure "Expected at least 1 store handle"
        (writerStore : subscriberStores) -> do
            streamId <- StreamId <$> UUID.nextRandom

            -- Create completion vars for each subscriber
            completionVars <- replicateM (length subscriberStores) newEmptyMVar
            receivedEventsRefs <- replicateM (length subscriberStores) (newIORef [])

            -- Start subscriptions on all subscriber stores (simulating separate processes)
            subscriptionHandles <- forM (zip3 subscriberStores completionVars receivedEventsRefs) $
                \(store, completionVar, eventsRef) -> do
                    subscribe
                        store
                        ( match UserCreated (collectEventsUntilTombstone eventsRef)
                            :? match Tombstone (handleTombstone completionVar)
                            :? MatchEnd
                        )
                        EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

            -- Write events from the writer store (simulating different process)
            let testEvents = map makeUserEvent [1 .. 5] ++ [makeTombstone]
            result <-
                insertEvents
                    writerStore
                    Nothing
                    (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

            case result of
                FailedInsertion err -> do
                    -- Cancel all subscriptions on failure
                    mapM_ (.cancel) subscriptionHandles
                    assertFailure $ "Failed to insert events: " ++ show err
                SuccessfulInsertion _ -> do
                    -- Wait for all subscribers to complete
                    forM_ completionVars takeMVar

                    -- Cancel all subscriptions
                    mapM_ (.cancel) subscriptionHandles

                    -- Verify every subscriber received the expected events.
                    -- Events are read back reversed, i.e. the refs are assumed
                    -- to hold them newest-first (TODO confirm against
                    -- 'collectEventsUntilTombstone').
                    allReceivedEvents <- mapM (fmap reverse . readIORef) receivedEventsRefs
                    forM_ allReceivedEvents $ \events -> do
                        length events @?= 5
                        let userInfos = mapMaybe extractUserInfo events
                        length userInfos @?= 5
                        let userNames :: [Text]
                            userNames = map userName userInfos
                        userNames @?= ["user1", "user2", "user3", "user4", "user5"]

                    -- Cross-check: every other subscriber saw the same number
                    -- of events as the first one.
                    case allReceivedEvents of
                        [] -> pure ()
                        (firstEvents : restEvents) ->
                            forM_ restEvents $ \events ->
                                length events @?= length firstEvents