{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

{- | Multi-instance event ordering tests

Tests cross-instance subscription ordering guarantees:
- Multi-instance subscriptions (cross-process notifications)
- Event ordering consistency across multiple instances (AllStreams mode)
- Event ordering consistency for single stream across multiple instances
- Hash-chain verification to detect ordering violations
-}
module Test.Hindsight.Store.MultiInstanceEventOrderingTests (multiInstanceEventOrderingTests) where

import Control.Concurrent (MVar, newEmptyMVar, putMVar, takeMVar, threadDelay)
import Control.Monad (forM, forM_, replicateM, when)
import Crypto.Hash (Digest, SHA256, hash)
import Data.Bits (shiftR)
import Data.ByteArray qualified as BA
import Data.ByteString (ByteString)
import Data.ByteString qualified as BS
import Data.ByteString.Lazy qualified as BSL
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Int (Int64)
import Data.Map.Strict qualified as Map
import Data.Maybe (mapMaybe)
import Data.Text (Text)
import Data.UUID (toByteString)
import Data.UUID.V4 qualified as UUID
import Data.Word (Word8)
import Hindsight.Store
import System.Random (randomRIO)
import System.Timeout (timeout)
import Test.Hindsight.Examples (Tombstone, UserCreated, UserInformation2 (..), makeTombstone, makeUserEvent)
import Test.Hindsight.Store.Common (collectEvents, extractPayload, handleTombstone)
import Test.Hindsight.Store.TestRunner (EventStoreTestRunner (..))
import Test.Tasty
import Test.Tasty.HUnit
import UnliftIO.Async (async, forConcurrently, forConcurrently_)

-- | Multi-instance ordering test suite for event store backends
multiInstanceEventOrderingTests ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend), Ord (Cursor backend)) =>
    EventStoreTestRunner backend ->
    [TestTree]
multiInstanceEventOrderingTests :: forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend), Ord (Cursor backend)) =>
EventStoreTestRunner backend -> [TestTree]
multiInstanceEventOrderingTests EventStoreTestRunner backend
runner =
    [ String -> [TestTree] -> TestTree
testGroup
        String
"Multi-Instance Tests"
        [ String -> Assertion -> TestTree
testCase String
"Multi-Instance Subscriptions (2 instances)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
2 [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend)) =>
[BackendHandle backend] -> Assertion
testMultiInstanceSubscription
        , String -> Assertion -> TestTree
testCase String
"Multi-Instance Subscriptions (5 instances)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
5 [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend)) =>
[BackendHandle backend] -> Assertion
testMultiInstanceSubscription
        , String -> Assertion -> TestTree
testCase String
"Multi-Instance Subscriptions (10 instances)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
10 [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend)) =>
[BackendHandle backend] -> Assertion
testMultiInstanceSubscription
        ]
    , String -> [TestTree] -> TestTree
testGroup
        String
"Multi-Instance Event Ordering Tests (AllStreams)"
        [ String -> Assertion -> TestTree
testCase String
"Event Ordering AllStreams (2 instances, 5 events each)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$
            EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
2 (Int -> Int -> [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_AllStreams Int
5 Int
2)
        , String -> Assertion -> TestTree
testCase String
"Event Ordering AllStreams (3 instances, 10 events each)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$
            EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
3 (Int -> Int -> [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_AllStreams Int
10 Int
3)
        , String -> Assertion -> TestTree
testCase String
"Event Ordering AllStreams (5 instances, 20 events each)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$
            EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
5 (Int -> Int -> [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_AllStreams Int
20 Int
5)
        ]
    , String -> [TestTree] -> TestTree
testGroup
        String
"Multi-Instance Event Ordering Tests (SingleStream)"
        [ String -> Assertion -> TestTree
testCase String
"Event Ordering SingleStream (2 instances, 5 events each)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$
            EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
2 (Int -> Int -> [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_SingleStream Int
5 Int
2)
        , String -> Assertion -> TestTree
testCase String
"Event Ordering SingleStream (3 instances, 10 events each)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$
            EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
3 (Int -> Int -> [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_SingleStream Int
10 Int
3)
        , String -> Assertion -> TestTree
testCase String
"Event Ordering SingleStream (5 instances, 20 events each)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$
            EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
5 (Int -> Int -> [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_SingleStream Int
20 Int
5)
        ]
    ]

-- * Test Implementations

{- | Test that multiple "processes" (separate handles) can subscribe to events
written by another "process". This validates cross-process notification works.
-}
testMultiInstanceSubscription ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    [BackendHandle backend] ->
    IO ()
testMultiInstanceSubscription :: forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend)) =>
[BackendHandle backend] -> Assertion
testMultiInstanceSubscription [BackendHandle backend]
stores = do
    case [BackendHandle backend]
stores of
        [] -> String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure String
"Expected at least 1 store handle"
        (BackendHandle backend
writerStore : [BackendHandle backend]
subscriberStores) -> do
            streamId <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom

            -- Create completion vars for each subscriber
            completionVars <- replicateM (length subscriberStores) newEmptyMVar
            receivedEventsRefs <- replicateM (length subscriberStores) (newIORef [])

            -- Start subscriptions on all subscriber stores (simulating separate processes)
            subscriptionHandles <- forM (zip3 subscriberStores completionVars receivedEventsRefs) $
                \(BackendHandle backend
store, MVar ()
completionVar, IORef [EventEnvelope UserCreated backend]
eventsRef) -> do
                    BackendHandle backend
-> EventMatcher '[UserCreated, Tombstone] backend IO
-> EventSelector backend
-> IO (SubscriptionHandle backend)
forall backend (m :: * -> *) (ts :: [Symbol]).
(EventStore backend, StoreConstraints backend m) =>
BackendHandle backend
-> EventMatcher ts backend m
-> EventSelector backend
-> m (SubscriptionHandle backend)
forall (m :: * -> *) (ts :: [Symbol]).
StoreConstraints backend m =>
BackendHandle backend
-> EventMatcher ts backend m
-> EventSelector backend
-> m (SubscriptionHandle backend)
subscribe
                        BackendHandle backend
store
                        ( EventHandler UserCreated IO backend
-> (Proxy UserCreated, EventHandler UserCreated IO backend)
forall (event :: Symbol) -> forall a. a -> (Proxy event, a)
forall a. a -> (Proxy UserCreated, a)
forall {k}. forall (event :: k) -> forall a. a -> (Proxy event, a)
match UserCreated (IORef [EventEnvelope UserCreated backend]
-> EventHandler UserCreated IO backend
forall (event :: Symbol) backend.
IORef [EventEnvelope event backend]
-> EventHandler event IO backend
collectEvents IORef [EventEnvelope UserCreated backend]
eventsRef)
                            (Proxy UserCreated, EventHandler UserCreated IO backend)
-> EventMatcher '[Tombstone] backend IO
-> EventMatcher '[UserCreated, Tombstone] backend IO
forall (event :: Symbol) (m :: * -> *) backend (ts1 :: [Symbol]).
Event event =>
(Proxy event, EventHandler event m backend)
-> EventMatcher ts1 backend m
-> EventMatcher (event : ts1) backend m
:? EventHandler Tombstone IO backend
-> (Proxy Tombstone, EventHandler Tombstone IO backend)
forall (event :: Symbol) -> forall a. a -> (Proxy event, a)
forall a. a -> (Proxy Tombstone, a)
forall {k}. forall (event :: k) -> forall a. a -> (Proxy event, a)
match Tombstone (MVar () -> EventHandler Tombstone IO backend
forall (event :: Symbol) backend.
MVar () -> EventHandler event IO backend
handleTombstone MVar ()
completionVar)
                            (Proxy Tombstone, EventHandler Tombstone IO backend)
-> EventMatcher '[] backend IO
-> EventMatcher '[Tombstone] backend IO
forall (event :: Symbol) (m :: * -> *) backend (ts1 :: [Symbol]).
Event event =>
(Proxy event, EventHandler event m backend)
-> EventMatcher ts1 backend m
-> EventMatcher (event : ts1) backend m
:? EventMatcher '[] backend IO
forall backend (m :: * -> *). EventMatcher '[] backend m
MatchEnd
                        )
                        EventSelector{streamId :: StreamSelector
streamId = StreamSelector
AllStreams, startupPosition :: StartupPosition backend
startupPosition = StartupPosition backend
forall backend. StartupPosition backend
FromBeginning}

            -- Write events from the writer store (simulating different process)
            let testEvents = (Int -> SomeLatestEvent) -> [Int] -> [SomeLatestEvent]
forall a b. (a -> b) -> [a] -> [b]
map Int -> SomeLatestEvent
makeUserEvent [Int
1 .. Int
5] [SomeLatestEvent] -> [SomeLatestEvent] -> [SomeLatestEvent]
forall a. [a] -> [a] -> [a]
++ [SomeLatestEvent
makeTombstone]
            result <-
                insertEvents
                    writerStore
                    Nothing
                    (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

            case result of
                FailedInsertion EventStoreError backend
err -> do
                    -- Cancel all subscriptions on failure
                    (SubscriptionHandle backend -> Assertion)
-> [SubscriptionHandle backend] -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (.cancel) [SubscriptionHandle backend]
subscriptionHandles
                    String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$ String
"Failed to insert events: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> String
forall a. Show a => a -> String
show EventStoreError backend
err
                SuccessfulInsertion InsertionSuccess backend
_ -> do
                    -- Wait for all subscribers to complete
                    [MVar ()] -> (MVar () -> Assertion) -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [MVar ()]
completionVars MVar () -> Assertion
forall a. MVar a -> IO a
takeMVar

                    -- Cancel all subscriptions
                    (SubscriptionHandle backend -> Assertion)
-> [SubscriptionHandle backend] -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (.cancel) [SubscriptionHandle backend]
subscriptionHandles

                    -- Verify all subscribers received the same events
                    allReceivedEvents <- (IORef [EventEnvelope UserCreated backend]
 -> IO [EventEnvelope UserCreated backend])
-> [IORef [EventEnvelope UserCreated backend]]
-> IO [[EventEnvelope UserCreated backend]]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (([EventEnvelope UserCreated backend]
 -> [EventEnvelope UserCreated backend])
-> IO [EventEnvelope UserCreated backend]
-> IO [EventEnvelope UserCreated backend]
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap [EventEnvelope UserCreated backend]
-> [EventEnvelope UserCreated backend]
forall a. [a] -> [a]
reverse (IO [EventEnvelope UserCreated backend]
 -> IO [EventEnvelope UserCreated backend])
-> (IORef [EventEnvelope UserCreated backend]
    -> IO [EventEnvelope UserCreated backend])
-> IORef [EventEnvelope UserCreated backend]
-> IO [EventEnvelope UserCreated backend]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IORef [EventEnvelope UserCreated backend]
-> IO [EventEnvelope UserCreated backend]
forall a. IORef a -> IO a
readIORef) [IORef [EventEnvelope UserCreated backend]]
receivedEventsRefs
                    forM_ allReceivedEvents $ \[EventEnvelope UserCreated backend]
events -> do
                        [EventEnvelope UserCreated backend] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [EventEnvelope UserCreated backend]
events Int -> Int -> Assertion
forall a. (Eq a, Show a, HasCallStack) => a -> a -> Assertion
@?= Int
5
                        let userInfos :: [UserInformation2]
userInfos = (EventEnvelope UserCreated backend -> UserInformation2)
-> [EventEnvelope UserCreated backend] -> [UserInformation2]
forall a b. (a -> b) -> [a] -> [b]
map EventEnvelope UserCreated backend -> CurrentPayloadType UserCreated
EventEnvelope UserCreated backend -> UserInformation2
forall (event :: Symbol) backend.
EventEnvelope event backend -> CurrentPayloadType event
extractPayload [EventEnvelope UserCreated backend]
events
                        [UserInformation2] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [UserInformation2]
userInfos Int -> Int -> Assertion
forall a. (Eq a, Show a, HasCallStack) => a -> a -> Assertion
@?= Int
5
                        let userNames :: [Text]
                            userNames :: [Text]
userNames = (UserInformation2 -> Text) -> [UserInformation2] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (.userName) [UserInformation2]
userInfos
                        [Text]
userNames [Text] -> [Text] -> Assertion
forall a. (Eq a, Show a, HasCallStack) => a -> a -> Assertion
@?= [Text
"user1", Text
"user2", Text
"user3", Text
"user4", Text
"user5"]

                    -- All subscribers should have received identical events
                    case allReceivedEvents of
                        [] -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                        ([EventEnvelope UserCreated backend]
firstEvents : [[EventEnvelope UserCreated backend]]
restEvents) ->
                            [[EventEnvelope UserCreated backend]]
-> ([EventEnvelope UserCreated backend] -> Assertion) -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [[EventEnvelope UserCreated backend]]
restEvents (([EventEnvelope UserCreated backend] -> Assertion) -> Assertion)
-> ([EventEnvelope UserCreated backend] -> Assertion) -> Assertion
forall a b. (a -> b) -> a -> b
$ \[EventEnvelope UserCreated backend]
events ->
                                [EventEnvelope UserCreated backend] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [EventEnvelope UserCreated backend]
events Int -> Int -> Assertion
forall a. (Eq a, Show a, HasCallStack) => a -> a -> Assertion
@?= [EventEnvelope UserCreated backend] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [EventEnvelope UserCreated backend]
firstEvents

{- | Test that multiple instances observe events in the same total order (AllStreams mode).

This test validates a critical property for event sourcing: all subscribers,
regardless of when they start or which instance they're on, MUST see events
in the same global order.

Test design:
  - N instances (separate store handles to same backend)
  - Each instance writes M events total to its own stream (all CONCURRENT with jitter)
  - 6N total subscriptions testing all combinations of timing and startup mode:

    **FromBeginning (3N subscriptions):**
    1. FB-initial: Started BEFORE any writes
    2. FB-during: Started DURING writes (mid-stream)
    3. FB-after: Started AFTER all writes complete

    **FromPosition (3N subscriptions):**
    4. FLP-initial: Started right AFTER checkpoint acquisition
    5. FLP-during: Started DURING writes (mid-stream)
    6. FLP-after: Started AFTER all writes complete

  - Checkpoint: Captured after first M/2 concurrent writes (maximum cursor)
  - Each subscription computes a blockchain-style hash chain:
      hash_n = SHA256(hash_{n-1} || eventId || streamId || streamVersion)

  - Verification:
    * All FromBeginning groups (initial, during, after) → same hash H1
    * All FromPosition groups (initial, during, after) → same hash H2
    * H1 ≠ H2 (sanity check: they saw different event sets)

Why this is comprehensive:
  - Tests BOTH startup modes (FromBeginning and FromPosition)
  - Tests subscriptions started at ALL phases (before/during/after writes)
  - All writes fully concurrent with jitter (realistic stress test)
  - Hash chaining makes ordering violations detectable
  - Validates cursor-based resumption works correctly under load
-}
testMultiInstanceEventOrdering_AllStreams ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend), Ord (Cursor backend)) =>
    -- | Number of events each instance writes
    Int ->
    -- | Number of instances
    Int ->
    [BackendHandle backend] ->
    IO ()
testMultiInstanceEventOrdering_AllStreams :: forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_AllStreams Int
numEventsPerInstance Int
numInstances [BackendHandle backend]
stores = do
    -- Validate inputs
    Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ([BackendHandle backend] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [BackendHandle backend]
stores Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
numInstances) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
        String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
            String
"Expected " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
numInstances String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" store handles, got " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show ([BackendHandle backend] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [BackendHandle backend]
stores)

    -- Split events into two phases (M/2 each)
    let halfEvents :: Int
halfEvents = Int
numEventsPerInstance Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
2

    -- Each instance gets a unique stream
    streamIds <- Int -> IO StreamId -> IO [StreamId]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
numInstances (UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom)

    -- Shared tombstone stream (signals end of test)
    tombstoneStream <- StreamId <$> UUID.nextRandom

    -- 6 subscription groups: 6N total subscriptions
    hashRefs <- replicateM (6 * numInstances) (newIORef (initialHash :: Digest SHA256))
    completionVars <- replicateM (6 * numInstances) newEmptyMVar

    -- Phase 1: Write M/2 events CONCURRENTLY with jitter, capturing cursors
    allCursors <- forConcurrently (zip stores streamIds) $ \(BackendHandle backend
store, StreamId
streamId) -> do
        jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
50_000)
        threadDelay jitter
        forM [1 .. halfEvents] $ \Int
eventNum -> do
            eventJitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
25_000)
            threadDelay eventJitter
            let event = Int -> SomeLatestEvent
makeUserEvent Int
eventNum
            result <- insertEvents store Nothing (singleEvent streamId Any event)
            case result of
                FailedInsertion EventStoreError backend
err -> String -> IO (Cursor backend)
forall a. HasCallStack => String -> IO a
assertFailure (String -> IO (Cursor backend)) -> String -> IO (Cursor backend)
forall a b. (a -> b) -> a -> b
$ String
"Phase 1 write failed: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> String
forall a. Show a => a -> String
show EventStoreError backend
err
                SuccessfulInsertion InsertionSuccess backend
success -> Cursor backend -> IO (Cursor backend)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure InsertionSuccess backend
success.finalCursor

    -- Capture checkpoint cursor (maximum across all Phase 1 writes)
    let checkpointCursor = case [[Cursor backend]] -> [Cursor backend]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [[Cursor backend]]
allCursors of
            [] -> String -> Cursor backend
forall a. HasCallStack => String -> a
error String
"No cursors captured from Phase 1"
            [Cursor backend]
cs -> [Cursor backend] -> Cursor backend
forall a. Ord a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
maximum [Cursor backend]
cs

    -- Phase 2: Start FB-initial and FLP-initial subscriptions
    let fbInitialRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances [IORef (Digest SHA256)]
hashRefs
        fbInitialVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances [MVar ()]
completionVars
        flpInitialRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop Int
numInstances [IORef (Digest SHA256)]
hashRefs)
        flpInitialVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop Int
numInstances [MVar ()]
completionVars)

    fbInitialHandles <- forConcurrently (zip3 stores fbInitialRefs fbInitialVars) $
        \(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
            jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
            threadDelay jitter
            subscribe
                store
                ( match UserCreated (hashEventHandler hashRef)
                    :? match Tombstone (handleTombstone doneVar)
                    :? MatchEnd
                )
                EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

    flpInitialHandles <- forConcurrently (zip3 stores flpInitialRefs flpInitialVars) $
        \(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
            jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
            threadDelay jitter
            subscribe
                store
                ( match UserCreated (hashEventHandler hashRef)
                    :? match Tombstone (handleTombstone doneVar)
                    :? MatchEnd
                )
                EventSelector{streamId = AllStreams, startupPosition = FromPosition checkpointCursor}

    -- Phase 3: Write remaining M/2 events CONCURRENTLY, spawn "during" subs mid-write
    let fbDuringRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
2 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs)
        fbDuringVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
2 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars)
        flpDuringRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
3 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs)
        flpDuringVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
3 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars)

    duringHandlesVar <- newEmptyMVar

    _ <- async $ do
        threadDelay 1_000_000

        fbDuringHandles <- forConcurrently (zip3 stores fbDuringRefs fbDuringVars) $
            \(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
                jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
                threadDelay jitter
                subscribe
                    store
                    ( match UserCreated (hashEventHandler hashRef)
                        :? match Tombstone (handleTombstone doneVar)
                        :? MatchEnd
                    )
                    EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

        flpDuringHandles <- forConcurrently (zip3 stores flpDuringRefs flpDuringVars) $
            \(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
                jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
                threadDelay jitter
                subscribe
                    store
                    ( match UserCreated (hashEventHandler hashRef)
                        :? match Tombstone (handleTombstone doneVar)
                        :? MatchEnd
                    )
                    EventSelector{streamId = AllStreams, startupPosition = FromPosition checkpointCursor}

        putMVar duringHandlesVar (fbDuringHandles <> flpDuringHandles)

    -- Write remaining events concurrently
    forConcurrently_ (zip stores streamIds) $ \(BackendHandle backend
store, StreamId
streamId) -> do
        jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
50_000)
        threadDelay jitter
        forM_ [(halfEvents + 1) .. numEventsPerInstance] $ \Int
eventNum -> do
            eventJitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
25_000)
            threadDelay eventJitter
            let event = Int -> SomeLatestEvent
makeUserEvent Int
eventNum
            result <- insertEvents store Nothing (singleEvent streamId Any event)
            case result of
                FailedInsertion EventStoreError backend
err -> String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$ String
"Phase 3 write failed: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> String
forall a. Show a => a -> String
show EventStoreError backend
err
                SuccessfulInsertion InsertionSuccess backend
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

    duringHandles <- takeMVar duringHandlesVar

    -- Phase 4: Start "after" subscriptions
    let fbAfterRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
4 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs)
        fbAfterVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
4 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars)
        flpAfterRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
5 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs
        flpAfterVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
5 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars

    fbAfterHandles <- forConcurrently (zip3 stores fbAfterRefs fbAfterVars) $
        \(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
            jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
            threadDelay jitter
            subscribe
                store
                ( match UserCreated (hashEventHandler hashRef)
                    :? match Tombstone (handleTombstone doneVar)
                    :? MatchEnd
                )
                EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

    flpAfterHandles <- forConcurrently (zip3 stores flpAfterRefs flpAfterVars) $
        \(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
            jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
            threadDelay jitter
            subscribe
                store
                ( match UserCreated (hashEventHandler hashRef)
                    :? match Tombstone (handleTombstone doneVar)
                    :? MatchEnd
                )
                EventSelector{streamId = AllStreams, startupPosition = FromPosition checkpointCursor}

    -- Phase 5: Write tombstone to signal end of test
    result <- insertEvents (head stores) Nothing (singleEvent tombstoneStream Any makeTombstone)

    let allHandles = [SubscriptionHandle backend]
fbInitialHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
flpInitialHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
duringHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
fbAfterHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
flpAfterHandles

    case result of
        FailedInsertion EventStoreError backend
err -> do
            (SubscriptionHandle backend -> Assertion)
-> [SubscriptionHandle backend] -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (.cancel) [SubscriptionHandle backend]
allHandles
            String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$ String
"Failed to insert tombstone: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> String
forall a. Show a => a -> String
show EventStoreError backend
err
        SuccessfulInsertion InsertionSuccess backend
_ -> do
            timeoutResult <- Int -> Assertion -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
30_000_000 (Assertion -> IO (Maybe ())) -> Assertion -> IO (Maybe ())
forall a b. (a -> b) -> a -> b
$ [MVar ()] -> (MVar () -> Assertion) -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [MVar ()]
completionVars MVar () -> Assertion
forall a. MVar a -> IO a
takeMVar
            mapM_ (.cancel) allHandles

            case timeoutResult of
                Maybe ()
Nothing -> String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure String
"Test timed out waiting for subscriptions"
                Just ()
_ -> do
                    fbInitialHashes <- (IORef (Digest SHA256) -> IO (Digest SHA256))
-> [IORef (Digest SHA256)] -> IO [Digest SHA256]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM IORef (Digest SHA256) -> IO (Digest SHA256)
forall a. IORef a -> IO a
readIORef (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances [IORef (Digest SHA256)]
hashRefs)
                    flpInitialHashes <- mapM readIORef (take numInstances (drop numInstances hashRefs))
                    fbDuringHashes <- mapM readIORef (take numInstances (drop (2 * numInstances) hashRefs))
                    flpDuringHashes <- mapM readIORef (take numInstances (drop (3 * numInstances) hashRefs))
                    fbAfterHashes <- mapM readIORef (take numInstances (drop (4 * numInstances) hashRefs))
                    flpAfterHashes <- mapM readIORef (drop (5 * numInstances) hashRefs)

                    verifyGroupConsistency "FB-initial" fbInitialHashes
                    verifyGroupConsistency "FLP-initial" flpInitialHashes
                    verifyGroupConsistency "FB-during" fbDuringHashes
                    verifyGroupConsistency "FLP-during" flpDuringHashes
                    verifyGroupConsistency "FB-after" fbAfterHashes
                    verifyGroupConsistency "FLP-after" flpAfterHashes

                    case (fbInitialHashes, fbDuringHashes, fbAfterHashes) of
                        (Digest SHA256
h1 : [Digest SHA256]
_, Digest SHA256
h2 : [Digest SHA256]
_, Digest SHA256
h3 : [Digest SHA256]
_) -> do
                            Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h2) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
                                String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
                                    String
"FB-initial and FB-during have different hashes:\n"
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"  FB-initial: "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n  FB-during:  "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h2
                            Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h3) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
                                String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
                                    String
"FB-initial and FB-after have different hashes:\n"
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"  FB-initial: "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n  FB-after:   "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h3
                        ([Digest SHA256], [Digest SHA256], [Digest SHA256])
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

                    case (flpInitialHashes, flpDuringHashes, flpAfterHashes) of
                        (Digest SHA256
h1 : [Digest SHA256]
_, Digest SHA256
h2 : [Digest SHA256]
_, Digest SHA256
h3 : [Digest SHA256]
_) -> do
                            Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h2) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
                                String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
                                    String
"FLP-initial and FLP-during have different hashes:\n"
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"  FLP-initial: "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n  FLP-during:  "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h2
                            Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h3) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
                                String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
                                    String
"FLP-initial and FLP-after have different hashes:\n"
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"  FLP-initial: "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n  FLP-after:   "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h3
                        ([Digest SHA256], [Digest SHA256], [Digest SHA256])
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

                    case (fbInitialHashes, flpInitialHashes) of
                        (Digest SHA256
h1 : [Digest SHA256]
_, Digest SHA256
h2 : [Digest SHA256]
_) ->
                            Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
== Digest SHA256
h2) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
                                String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure
                                    String
"SANITY CHECK FAILED: FB and FLP groups have the same hash!\n\
                                    \This indicates FromPosition is not working correctly."
                        ([Digest SHA256], [Digest SHA256])
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

{- | Test that multiple instances observe events in the same total order (SingleStream mode).

This test validates that single-stream subscriptions maintain ordering consistency
across multiple instances writing to the same stream concurrently.

Key differences from AllStreams test:
  - Write target: all instances → 1 shared stream (not N separate streams)
  - Subscription selector: SingleStream(sharedStream) (not AllStreams)
  - Tests concurrent writes to the same stream from multiple instances
-}
testMultiInstanceEventOrdering_SingleStream ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend), Ord (Cursor backend)) =>
    Int ->
    Int ->
    [BackendHandle backend] ->
    IO ()
testMultiInstanceEventOrdering_SingleStream :: forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_SingleStream Int
numEventsPerInstance Int
numInstances [BackendHandle backend]
stores = do
    Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ([BackendHandle backend] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [BackendHandle backend]
stores Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
numInstances) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
        String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
            String
"Expected " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
numInstances String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" store handles, got " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show ([BackendHandle backend] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [BackendHandle backend]
stores)

    let halfEvents :: Int
halfEvents = Int
numEventsPerInstance Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
2

    sharedStream <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom

    hashRefs <- replicateM (6 * numInstances) (newIORef (initialHash :: Digest SHA256))
    completionVars <- replicateM (6 * numInstances) newEmptyMVar

    allCursors <- forConcurrently stores $ \BackendHandle backend
store -> do
        jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
50_000)
        threadDelay jitter
        forM [1 .. halfEvents] $ \Int
eventNum -> do
            eventJitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
25_000)
            threadDelay eventJitter
            let event = Int -> SomeLatestEvent
makeUserEvent Int
eventNum
            result <- insertEvents store Nothing (singleEvent sharedStream Any event)
            case result of
                FailedInsertion EventStoreError backend
err -> String -> IO (Cursor backend)
forall a. HasCallStack => String -> IO a
assertFailure (String -> IO (Cursor backend)) -> String -> IO (Cursor backend)
forall a b. (a -> b) -> a -> b
$ String
"Phase 1 write failed: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> String
forall a. Show a => a -> String
show EventStoreError backend
err
                SuccessfulInsertion InsertionSuccess backend
success -> Cursor backend -> IO (Cursor backend)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure InsertionSuccess backend
success.finalCursor

    let checkpointCursor = case [[Cursor backend]] -> [Cursor backend]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [[Cursor backend]]
allCursors of
            [] -> String -> Cursor backend
forall a. HasCallStack => String -> a
error String
"No cursors captured from Phase 1"
            [Cursor backend]
cs -> [Cursor backend] -> Cursor backend
forall a. Ord a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
maximum [Cursor backend]
cs

    let fbInitialRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances [IORef (Digest SHA256)]
hashRefs
        fbInitialVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances [MVar ()]
completionVars
        flpInitialRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop Int
numInstances [IORef (Digest SHA256)]
hashRefs)
        flpInitialVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop Int
numInstances [MVar ()]
completionVars)

    fbInitialHandles <- forConcurrently (zip3 stores fbInitialRefs fbInitialVars) $
        \(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
            jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
            threadDelay jitter
            subscribe
                store
                ( match UserCreated (hashEventHandler hashRef)
                    :? match Tombstone (handleTombstone doneVar)
                    :? MatchEnd
                )
                EventSelector{streamId = SingleStream sharedStream, startupPosition = FromBeginning}

    flpInitialHandles <- forConcurrently (zip3 stores flpInitialRefs flpInitialVars) $
        \(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
            jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
            threadDelay jitter
            subscribe
                store
                ( match UserCreated (hashEventHandler hashRef)
                    :? match Tombstone (handleTombstone doneVar)
                    :? MatchEnd
                )
                EventSelector{streamId = SingleStream sharedStream, startupPosition = FromPosition checkpointCursor}

    let fbDuringRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
2 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs)
        fbDuringVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
2 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars)
        flpDuringRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
3 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs)
        flpDuringVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
3 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars)

    duringHandlesVar <- newEmptyMVar

    _ <- async $ do
        threadDelay 1_000_000

        fbDuringHandles <- forConcurrently (zip3 stores fbDuringRefs fbDuringVars) $
            \(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
                jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
                threadDelay jitter
                subscribe
                    store
                    ( match UserCreated (hashEventHandler hashRef)
                        :? match Tombstone (handleTombstone doneVar)
                        :? MatchEnd
                    )
                    EventSelector{streamId = SingleStream sharedStream, startupPosition = FromBeginning}

        flpDuringHandles <- forConcurrently (zip3 stores flpDuringRefs flpDuringVars) $
            \(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
                jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
                threadDelay jitter
                subscribe
                    store
                    ( match UserCreated (hashEventHandler hashRef)
                        :? match Tombstone (handleTombstone doneVar)
                        :? MatchEnd
                    )
                    EventSelector{streamId = SingleStream sharedStream, startupPosition = FromPosition checkpointCursor}

        putMVar duringHandlesVar (fbDuringHandles <> flpDuringHandles)

    forConcurrently_ stores $ \BackendHandle backend
store -> do
        jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
50_000)
        threadDelay jitter
        forM_ [(halfEvents + 1) .. numEventsPerInstance] $ \Int
eventNum -> do
            eventJitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
25_000)
            threadDelay eventJitter
            let event = Int -> SomeLatestEvent
makeUserEvent Int
eventNum
            result <- insertEvents store Nothing (singleEvent sharedStream Any event)
            case result of
                FailedInsertion EventStoreError backend
err -> String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$ String
"Phase 3 write failed: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> String
forall a. Show a => a -> String
show EventStoreError backend
err
                SuccessfulInsertion InsertionSuccess backend
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

    duringHandles <- takeMVar duringHandlesVar

    let fbAfterRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
4 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs)
        fbAfterVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
4 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars)
        flpAfterRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
5 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs
        flpAfterVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
5 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars

    fbAfterHandles <- forConcurrently (zip3 stores fbAfterRefs fbAfterVars) $
        \(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
            jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
            threadDelay jitter
            subscribe
                store
                ( match UserCreated (hashEventHandler hashRef)
                    :? match Tombstone (handleTombstone doneVar)
                    :? MatchEnd
                )
                EventSelector{streamId = SingleStream sharedStream, startupPosition = FromBeginning}

    flpAfterHandles <- forConcurrently (zip3 stores flpAfterRefs flpAfterVars) $
        \(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
            jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
            threadDelay jitter
            subscribe
                store
                ( match UserCreated (hashEventHandler hashRef)
                    :? match Tombstone (handleTombstone doneVar)
                    :? MatchEnd
                )
                EventSelector{streamId = SingleStream sharedStream, startupPosition = FromPosition checkpointCursor}

    result <- insertEvents (head stores) Nothing (singleEvent sharedStream Any makeTombstone)

    let allHandles = [SubscriptionHandle backend]
fbInitialHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
flpInitialHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
duringHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
fbAfterHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
flpAfterHandles

    case result of
        FailedInsertion EventStoreError backend
err -> do
            (SubscriptionHandle backend -> Assertion)
-> [SubscriptionHandle backend] -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (.cancel) [SubscriptionHandle backend]
allHandles
            String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$ String
"Failed to insert tombstone: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> String
forall a. Show a => a -> String
show EventStoreError backend
err
        SuccessfulInsertion InsertionSuccess backend
_ -> do
            timeoutResult <- Int -> Assertion -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
30_000_000 (Assertion -> IO (Maybe ())) -> Assertion -> IO (Maybe ())
forall a b. (a -> b) -> a -> b
$ [MVar ()] -> (MVar () -> Assertion) -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [MVar ()]
completionVars MVar () -> Assertion
forall a. MVar a -> IO a
takeMVar
            mapM_ (.cancel) allHandles

            case timeoutResult of
                Maybe ()
Nothing -> String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure String
"Test timed out waiting for subscriptions"
                Just ()
_ -> do
                    fbInitialHashes <- (IORef (Digest SHA256) -> IO (Digest SHA256))
-> [IORef (Digest SHA256)] -> IO [Digest SHA256]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM IORef (Digest SHA256) -> IO (Digest SHA256)
forall a. IORef a -> IO a
readIORef (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances [IORef (Digest SHA256)]
hashRefs)
                    flpInitialHashes <- mapM readIORef (take numInstances (drop numInstances hashRefs))
                    fbDuringHashes <- mapM readIORef (take numInstances (drop (2 * numInstances) hashRefs))
                    flpDuringHashes <- mapM readIORef (take numInstances (drop (3 * numInstances) hashRefs))
                    fbAfterHashes <- mapM readIORef (take numInstances (drop (4 * numInstances) hashRefs))
                    flpAfterHashes <- mapM readIORef (drop (5 * numInstances) hashRefs)

                    verifyGroupConsistency "FB-initial" fbInitialHashes
                    verifyGroupConsistency "FLP-initial" flpInitialHashes
                    verifyGroupConsistency "FB-during" fbDuringHashes
                    verifyGroupConsistency "FLP-during" flpDuringHashes
                    verifyGroupConsistency "FB-after" fbAfterHashes
                    verifyGroupConsistency "FLP-after" flpAfterHashes

                    case (fbInitialHashes, fbDuringHashes, fbAfterHashes) of
                        (Digest SHA256
h1 : [Digest SHA256]
_, Digest SHA256
h2 : [Digest SHA256]
_, Digest SHA256
h3 : [Digest SHA256]
_) -> do
                            Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h2) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
                                String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
                                    String
"FB-initial and FB-during have different hashes:\n"
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"  FB-initial: "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n  FB-during:  "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h2
                            Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h3) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
                                String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
                                    String
"FB-initial and FB-after have different hashes:\n"
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"  FB-initial: "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n  FB-after:   "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h3
                        ([Digest SHA256], [Digest SHA256], [Digest SHA256])
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

                    case (flpInitialHashes, flpDuringHashes, flpAfterHashes) of
                        (Digest SHA256
h1 : [Digest SHA256]
_, Digest SHA256
h2 : [Digest SHA256]
_, Digest SHA256
h3 : [Digest SHA256]
_) -> do
                            Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h2) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
                                String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
                                    String
"FLP-initial and FLP-during have different hashes:\n"
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"  FLP-initial: "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n  FLP-during:  "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h2
                            Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h3) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
                                String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
                                    String
"FLP-initial and FLP-after have different hashes:\n"
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"  FLP-initial: "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n  FLP-after:   "
                                        String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h3
                        ([Digest SHA256], [Digest SHA256], [Digest SHA256])
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

                    case (fbInitialHashes, flpInitialHashes) of
                        (Digest SHA256
h1 : [Digest SHA256]
_, Digest SHA256
h2 : [Digest SHA256]
_) ->
                            Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
== Digest SHA256
h2) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
                                String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure
                                    String
"SANITY CHECK FAILED: FB and FLP groups have the same hash!\n\
                                    \This indicates FromPosition is not working correctly."
                        ([Digest SHA256], [Digest SHA256])
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

-- * Hash Chain Helpers

initialHash :: Digest SHA256
initialHash :: Digest SHA256
initialHash = ByteString -> Digest SHA256
forall ba a.
(ByteArrayAccess ba, HashAlgorithm a) =>
ba -> Digest a
hash (ByteString
BS.empty :: ByteString)

hashEventHandler :: forall backend. IORef (Digest SHA256) -> EventHandler UserCreated IO backend
hashEventHandler :: forall backend.
IORef (Digest SHA256) -> EventHandler UserCreated IO backend
hashEventHandler IORef (Digest SHA256)
hashRef EventEnvelope UserCreated backend
envelope = do
    let eventIdBytes :: ByteString
eventIdBytes = LazyByteString -> ByteString
BSL.toStrict (LazyByteString -> ByteString) -> LazyByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ UUID -> LazyByteString
toByteString (EventEnvelope UserCreated backend
envelope.eventId.toUUID)
        streamIdBytes :: ByteString
streamIdBytes = LazyByteString -> ByteString
BSL.toStrict (LazyByteString -> ByteString) -> LazyByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ UUID -> LazyByteString
toByteString (EventEnvelope UserCreated backend
envelope.streamId.toUUID)
        streamVersionBytes :: ByteString
streamVersionBytes = StreamVersion -> ByteString
encodeStreamVersion EventEnvelope UserCreated backend
envelope.streamVersion

    prevHash <- IORef (Digest SHA256) -> IO (Digest SHA256)
forall a. IORef a -> IO a
readIORef IORef (Digest SHA256)
hashRef
    let prevHashBytes = Digest SHA256 -> ByteString
hashToBytes Digest SHA256
prevHash
        combined = [ByteString] -> ByteString
BS.concat [ByteString
prevHashBytes, ByteString
eventIdBytes, ByteString
streamIdBytes, ByteString
streamVersionBytes]
        newHash = ByteString -> Digest SHA256
forall ba a.
(ByteArrayAccess ba, HashAlgorithm a) =>
ba -> Digest a
hash ByteString
combined

    writeIORef hashRef newHash
    pure Continue

hashToBytes :: Digest SHA256 -> ByteString
hashToBytes :: Digest SHA256 -> ByteString
hashToBytes = Digest SHA256 -> ByteString
forall bin bout.
(ByteArrayAccess bin, ByteArray bout) =>
bin -> bout
BA.convert

encodeStreamVersion :: StreamVersion -> ByteString
encodeStreamVersion :: StreamVersion -> ByteString
encodeStreamVersion (StreamVersion Int64
n) = [Word8] -> ByteString
BS.pack ([Word8] -> ByteString) -> [Word8] -> ByteString
forall a b. (a -> b) -> a -> b
$ Int64 -> [Word8]
encodeInt64 Int64
n

encodeInt64 :: Int64 -> [Word8]
encodeInt64 :: Int64 -> [Word8]
encodeInt64 Int64
n =
    [ Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
n Int64 -> Int -> Int64
forall a. Bits a => a -> Int -> a
`shiftR` Int
56)
    , Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
n Int64 -> Int -> Int64
forall a. Bits a => a -> Int -> a
`shiftR` Int
48)
    , Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
n Int64 -> Int -> Int64
forall a. Bits a => a -> Int -> a
`shiftR` Int
40)
    , Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
n Int64 -> Int -> Int64
forall a. Bits a => a -> Int -> a
`shiftR` Int
32)
    , Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
n Int64 -> Int -> Int64
forall a. Bits a => a -> Int -> a
`shiftR` Int
24)
    , Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
n Int64 -> Int -> Int64
forall a. Bits a => a -> Int -> a
`shiftR` Int
16)
    , Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
n Int64 -> Int -> Int64
forall a. Bits a => a -> Int -> a
`shiftR` Int
8)
    , Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64
n
    ]

verifyGroupConsistency :: String -> [Digest SHA256] -> IO ()
verifyGroupConsistency :: String -> [Digest SHA256] -> Assertion
verifyGroupConsistency String
groupName [Digest SHA256]
hashes =
    case [Digest SHA256]
hashes of
        [] -> String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$ String
groupName String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
": No subscriptions completed"
        (Digest SHA256
expectedHash : [Digest SHA256]
rest) ->
            [(Int, Digest SHA256)]
-> ((Int, Digest SHA256) -> Assertion) -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([Int] -> [Digest SHA256] -> [(Int, Digest SHA256)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
1 :: Int ..] [Digest SHA256]
rest) (((Int, Digest SHA256) -> Assertion) -> Assertion)
-> ((Int, Digest SHA256) -> Assertion) -> Assertion
forall a b. (a -> b) -> a -> b
$ \(Int
idx, Digest SHA256
actualHash) ->
                Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
actualHash Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
expectedHash) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
                    String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
                        String
groupName
                            String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" subscription "
                            String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
idx
                            String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" saw different event order.\n"
                            String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"Expected hash: "
                            String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
expectedHash
                            String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\nActual hash:   "
                            String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
actualHash