{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module Test.Hindsight.Store.MultiInstanceEventOrderingTests (multiInstanceEventOrderingTests) where
import Control.Concurrent (MVar, newEmptyMVar, putMVar, takeMVar, threadDelay)
import Control.Monad (forM, forM_, replicateM, when)
import Crypto.Hash (Digest, SHA256, hash)
import Data.Bits (shiftR)
import Data.ByteArray qualified as BA
import Data.ByteString (ByteString)
import Data.ByteString qualified as BS
import Data.ByteString.Lazy qualified as BSL
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Int (Int64)
import Data.Map.Strict qualified as Map
import Data.Maybe (mapMaybe)
import Data.Text (Text)
import Data.UUID (toByteString)
import Data.UUID.V4 qualified as UUID
import Data.Word (Word8)
import Hindsight.Store
import System.Random (randomRIO)
import System.Timeout (timeout)
import Test.Hindsight.Examples (Tombstone, UserCreated, UserInformation2 (..), makeTombstone, makeUserEvent)
import Test.Hindsight.Store.Common (collectEvents, extractPayload, handleTombstone)
import Test.Hindsight.Store.TestRunner (EventStoreTestRunner (..))
import Test.Tasty
import Test.Tasty.HUnit
import UnliftIO.Async (async, forConcurrently, forConcurrently_)
multiInstanceEventOrderingTests ::
forall backend.
(EventStore backend, StoreConstraints backend IO, Show (Cursor backend), Ord (Cursor backend)) =>
EventStoreTestRunner backend ->
[TestTree]
multiInstanceEventOrderingTests :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend), Ord (Cursor backend)) =>
EventStoreTestRunner backend -> [TestTree]
multiInstanceEventOrderingTests EventStoreTestRunner backend
runner =
[ String -> [TestTree] -> TestTree
testGroup
String
"Multi-Instance Tests"
[ String -> Assertion -> TestTree
testCase String
"Multi-Instance Subscriptions (2 instances)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
2 [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
[BackendHandle backend] -> Assertion
testMultiInstanceSubscription
, String -> Assertion -> TestTree
testCase String
"Multi-Instance Subscriptions (5 instances)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
5 [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
[BackendHandle backend] -> Assertion
testMultiInstanceSubscription
, String -> Assertion -> TestTree
testCase String
"Multi-Instance Subscriptions (10 instances)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$ EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
10 [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
[BackendHandle backend] -> Assertion
testMultiInstanceSubscription
]
, String -> [TestTree] -> TestTree
testGroup
String
"Multi-Instance Event Ordering Tests (AllStreams)"
[ String -> Assertion -> TestTree
testCase String
"Event Ordering AllStreams (2 instances, 5 events each)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
2 (Int -> Int -> [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_AllStreams Int
5 Int
2)
, String -> Assertion -> TestTree
testCase String
"Event Ordering AllStreams (3 instances, 10 events each)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
3 (Int -> Int -> [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_AllStreams Int
10 Int
3)
, String -> Assertion -> TestTree
testCase String
"Event Ordering AllStreams (5 instances, 20 events each)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
5 (Int -> Int -> [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_AllStreams Int
20 Int
5)
]
, String -> [TestTree] -> TestTree
testGroup
String
"Multi-Instance Event Ordering Tests (SingleStream)"
[ String -> Assertion -> TestTree
testCase String
"Event Ordering SingleStream (2 instances, 5 events each)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
2 (Int -> Int -> [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_SingleStream Int
5 Int
2)
, String -> Assertion -> TestTree
testCase String
"Event Ordering SingleStream (3 instances, 10 events each)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
3 (Int -> Int -> [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_SingleStream Int
10 Int
3)
, String -> Assertion -> TestTree
testCase String
"Event Ordering SingleStream (5 instances, 20 events each)" (Assertion -> TestTree) -> Assertion -> TestTree
forall a b. (a -> b) -> a -> b
$
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
forall backend.
EventStoreTestRunner backend
-> forall a. Int -> ([BackendHandle backend] -> IO a) -> Assertion
withStores EventStoreTestRunner backend
runner Int
5 (Int -> Int -> [BackendHandle backend] -> Assertion
forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_SingleStream Int
20 Int
5)
]
]
testMultiInstanceSubscription ::
forall backend.
(EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
[BackendHandle backend] ->
IO ()
testMultiInstanceSubscription :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend)) =>
[BackendHandle backend] -> Assertion
testMultiInstanceSubscription [BackendHandle backend]
stores = do
case [BackendHandle backend]
stores of
[] -> String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure String
"Expected at least 1 store handle"
(BackendHandle backend
writerStore : [BackendHandle backend]
subscriberStores) -> do
streamId <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
completionVars <- replicateM (length subscriberStores) newEmptyMVar
receivedEventsRefs <- replicateM (length subscriberStores) (newIORef [])
subscriptionHandles <- forM (zip3 subscriberStores completionVars receivedEventsRefs) $
\(BackendHandle backend
store, MVar ()
completionVar, IORef [EventEnvelope UserCreated backend]
eventsRef) -> do
BackendHandle backend
-> EventMatcher '[UserCreated, Tombstone] backend IO
-> EventSelector backend
-> IO (SubscriptionHandle backend)
forall backend (m :: * -> *) (ts :: [Symbol]).
(EventStore backend, StoreConstraints backend m) =>
BackendHandle backend
-> EventMatcher ts backend m
-> EventSelector backend
-> m (SubscriptionHandle backend)
forall (m :: * -> *) (ts :: [Symbol]).
StoreConstraints backend m =>
BackendHandle backend
-> EventMatcher ts backend m
-> EventSelector backend
-> m (SubscriptionHandle backend)
subscribe
BackendHandle backend
store
( EventHandler UserCreated IO backend
-> (Proxy UserCreated, EventHandler UserCreated IO backend)
forall (event :: Symbol) -> forall a. a -> (Proxy event, a)
forall a. a -> (Proxy UserCreated, a)
forall {k}. forall (event :: k) -> forall a. a -> (Proxy event, a)
match UserCreated (IORef [EventEnvelope UserCreated backend]
-> EventHandler UserCreated IO backend
forall (event :: Symbol) backend.
IORef [EventEnvelope event backend]
-> EventHandler event IO backend
collectEvents IORef [EventEnvelope UserCreated backend]
eventsRef)
(Proxy UserCreated, EventHandler UserCreated IO backend)
-> EventMatcher '[Tombstone] backend IO
-> EventMatcher '[UserCreated, Tombstone] backend IO
forall (event :: Symbol) (m :: * -> *) backend (ts1 :: [Symbol]).
Event event =>
(Proxy event, EventHandler event m backend)
-> EventMatcher ts1 backend m
-> EventMatcher (event : ts1) backend m
:? EventHandler Tombstone IO backend
-> (Proxy Tombstone, EventHandler Tombstone IO backend)
forall (event :: Symbol) -> forall a. a -> (Proxy event, a)
forall a. a -> (Proxy Tombstone, a)
forall {k}. forall (event :: k) -> forall a. a -> (Proxy event, a)
match Tombstone (MVar () -> EventHandler Tombstone IO backend
forall (event :: Symbol) backend.
MVar () -> EventHandler event IO backend
handleTombstone MVar ()
completionVar)
(Proxy Tombstone, EventHandler Tombstone IO backend)
-> EventMatcher '[] backend IO
-> EventMatcher '[Tombstone] backend IO
forall (event :: Symbol) (m :: * -> *) backend (ts1 :: [Symbol]).
Event event =>
(Proxy event, EventHandler event m backend)
-> EventMatcher ts1 backend m
-> EventMatcher (event : ts1) backend m
:? EventMatcher '[] backend IO
forall backend (m :: * -> *). EventMatcher '[] backend m
MatchEnd
)
EventSelector{streamId :: StreamSelector
streamId = StreamSelector
AllStreams, startupPosition :: StartupPosition backend
startupPosition = StartupPosition backend
forall backend. StartupPosition backend
FromBeginning}
let testEvents = (Int -> SomeLatestEvent) -> [Int] -> [SomeLatestEvent]
forall a b. (a -> b) -> [a] -> [b]
map Int -> SomeLatestEvent
makeUserEvent [Int
1 .. Int
5] [SomeLatestEvent] -> [SomeLatestEvent] -> [SomeLatestEvent]
forall a. [a] -> [a] -> [a]
++ [SomeLatestEvent
makeTombstone]
result <-
insertEvents
writerStore
Nothing
(Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))
case result of
FailedInsertion EventStoreError backend
err -> do
(SubscriptionHandle backend -> Assertion)
-> [SubscriptionHandle backend] -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (.cancel) [SubscriptionHandle backend]
subscriptionHandles
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$ String
"Failed to insert events: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> String
forall a. Show a => a -> String
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> do
[MVar ()] -> (MVar () -> Assertion) -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [MVar ()]
completionVars MVar () -> Assertion
forall a. MVar a -> IO a
takeMVar
(SubscriptionHandle backend -> Assertion)
-> [SubscriptionHandle backend] -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (.cancel) [SubscriptionHandle backend]
subscriptionHandles
allReceivedEvents <- (IORef [EventEnvelope UserCreated backend]
-> IO [EventEnvelope UserCreated backend])
-> [IORef [EventEnvelope UserCreated backend]]
-> IO [[EventEnvelope UserCreated backend]]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (([EventEnvelope UserCreated backend]
-> [EventEnvelope UserCreated backend])
-> IO [EventEnvelope UserCreated backend]
-> IO [EventEnvelope UserCreated backend]
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap [EventEnvelope UserCreated backend]
-> [EventEnvelope UserCreated backend]
forall a. [a] -> [a]
reverse (IO [EventEnvelope UserCreated backend]
-> IO [EventEnvelope UserCreated backend])
-> (IORef [EventEnvelope UserCreated backend]
-> IO [EventEnvelope UserCreated backend])
-> IORef [EventEnvelope UserCreated backend]
-> IO [EventEnvelope UserCreated backend]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IORef [EventEnvelope UserCreated backend]
-> IO [EventEnvelope UserCreated backend]
forall a. IORef a -> IO a
readIORef) [IORef [EventEnvelope UserCreated backend]]
receivedEventsRefs
forM_ allReceivedEvents $ \[EventEnvelope UserCreated backend]
events -> do
[EventEnvelope UserCreated backend] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [EventEnvelope UserCreated backend]
events Int -> Int -> Assertion
forall a. (Eq a, Show a, HasCallStack) => a -> a -> Assertion
@?= Int
5
let userInfos :: [UserInformation2]
userInfos = (EventEnvelope UserCreated backend -> UserInformation2)
-> [EventEnvelope UserCreated backend] -> [UserInformation2]
forall a b. (a -> b) -> [a] -> [b]
map EventEnvelope UserCreated backend -> CurrentPayloadType UserCreated
EventEnvelope UserCreated backend -> UserInformation2
forall (event :: Symbol) backend.
EventEnvelope event backend -> CurrentPayloadType event
extractPayload [EventEnvelope UserCreated backend]
events
[UserInformation2] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [UserInformation2]
userInfos Int -> Int -> Assertion
forall a. (Eq a, Show a, HasCallStack) => a -> a -> Assertion
@?= Int
5
let userNames :: [Text]
userNames :: [Text]
userNames = (UserInformation2 -> Text) -> [UserInformation2] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (.userName) [UserInformation2]
userInfos
[Text]
userNames [Text] -> [Text] -> Assertion
forall a. (Eq a, Show a, HasCallStack) => a -> a -> Assertion
@?= [Text
"user1", Text
"user2", Text
"user3", Text
"user4", Text
"user5"]
case allReceivedEvents of
[] -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
([EventEnvelope UserCreated backend]
firstEvents : [[EventEnvelope UserCreated backend]]
restEvents) ->
[[EventEnvelope UserCreated backend]]
-> ([EventEnvelope UserCreated backend] -> Assertion) -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [[EventEnvelope UserCreated backend]]
restEvents (([EventEnvelope UserCreated backend] -> Assertion) -> Assertion)
-> ([EventEnvelope UserCreated backend] -> Assertion) -> Assertion
forall a b. (a -> b) -> a -> b
$ \[EventEnvelope UserCreated backend]
events ->
[EventEnvelope UserCreated backend] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [EventEnvelope UserCreated backend]
events Int -> Int -> Assertion
forall a. (Eq a, Show a, HasCallStack) => a -> a -> Assertion
@?= [EventEnvelope UserCreated backend] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [EventEnvelope UserCreated backend]
firstEvents
testMultiInstanceEventOrdering_AllStreams ::
forall backend.
(EventStore backend, StoreConstraints backend IO, Show (Cursor backend), Ord (Cursor backend)) =>
Int ->
Int ->
[BackendHandle backend] ->
IO ()
testMultiInstanceEventOrdering_AllStreams :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_AllStreams Int
numEventsPerInstance Int
numInstances [BackendHandle backend]
stores = do
Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ([BackendHandle backend] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [BackendHandle backend]
stores Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
numInstances) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
String
"Expected " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
numInstances String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" store handles, got " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show ([BackendHandle backend] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [BackendHandle backend]
stores)
let halfEvents :: Int
halfEvents = Int
numEventsPerInstance Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
2
streamIds <- Int -> IO StreamId -> IO [StreamId]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
numInstances (UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom)
tombstoneStream <- StreamId <$> UUID.nextRandom
hashRefs <- replicateM (6 * numInstances) (newIORef (initialHash :: Digest SHA256))
completionVars <- replicateM (6 * numInstances) newEmptyMVar
allCursors <- forConcurrently (zip stores streamIds) $ \(BackendHandle backend
store, StreamId
streamId) -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
50_000)
threadDelay jitter
forM [1 .. halfEvents] $ \Int
eventNum -> do
eventJitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
25_000)
threadDelay eventJitter
let event = Int -> SomeLatestEvent
makeUserEvent Int
eventNum
result <- insertEvents store Nothing (singleEvent streamId Any event)
case result of
FailedInsertion EventStoreError backend
err -> String -> IO (Cursor backend)
forall a. HasCallStack => String -> IO a
assertFailure (String -> IO (Cursor backend)) -> String -> IO (Cursor backend)
forall a b. (a -> b) -> a -> b
$ String
"Phase 1 write failed: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> String
forall a. Show a => a -> String
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
success -> Cursor backend -> IO (Cursor backend)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure InsertionSuccess backend
success.finalCursor
let checkpointCursor = case [[Cursor backend]] -> [Cursor backend]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [[Cursor backend]]
allCursors of
[] -> String -> Cursor backend
forall a. HasCallStack => String -> a
error String
"No cursors captured from Phase 1"
[Cursor backend]
cs -> [Cursor backend] -> Cursor backend
forall a. Ord a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
maximum [Cursor backend]
cs
let fbInitialRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances [IORef (Digest SHA256)]
hashRefs
fbInitialVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances [MVar ()]
completionVars
flpInitialRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop Int
numInstances [IORef (Digest SHA256)]
hashRefs)
flpInitialVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop Int
numInstances [MVar ()]
completionVars)
fbInitialHandles <- forConcurrently (zip3 stores fbInitialRefs fbInitialVars) $
\(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
threadDelay jitter
subscribe
store
( match UserCreated (hashEventHandler hashRef)
:? match Tombstone (handleTombstone doneVar)
:? MatchEnd
)
EventSelector{streamId = AllStreams, startupPosition = FromBeginning}
flpInitialHandles <- forConcurrently (zip3 stores flpInitialRefs flpInitialVars) $
\(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
threadDelay jitter
subscribe
store
( match UserCreated (hashEventHandler hashRef)
:? match Tombstone (handleTombstone doneVar)
:? MatchEnd
)
EventSelector{streamId = AllStreams, startupPosition = FromPosition checkpointCursor}
let fbDuringRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
2 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs)
fbDuringVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
2 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars)
flpDuringRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
3 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs)
flpDuringVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
3 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars)
duringHandlesVar <- newEmptyMVar
_ <- async $ do
threadDelay 1_000_000
fbDuringHandles <- forConcurrently (zip3 stores fbDuringRefs fbDuringVars) $
\(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
threadDelay jitter
subscribe
store
( match UserCreated (hashEventHandler hashRef)
:? match Tombstone (handleTombstone doneVar)
:? MatchEnd
)
EventSelector{streamId = AllStreams, startupPosition = FromBeginning}
flpDuringHandles <- forConcurrently (zip3 stores flpDuringRefs flpDuringVars) $
\(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
threadDelay jitter
subscribe
store
( match UserCreated (hashEventHandler hashRef)
:? match Tombstone (handleTombstone doneVar)
:? MatchEnd
)
EventSelector{streamId = AllStreams, startupPosition = FromPosition checkpointCursor}
putMVar duringHandlesVar (fbDuringHandles <> flpDuringHandles)
forConcurrently_ (zip stores streamIds) $ \(BackendHandle backend
store, StreamId
streamId) -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
50_000)
threadDelay jitter
forM_ [(halfEvents + 1) .. numEventsPerInstance] $ \Int
eventNum -> do
eventJitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
25_000)
threadDelay eventJitter
let event = Int -> SomeLatestEvent
makeUserEvent Int
eventNum
result <- insertEvents store Nothing (singleEvent streamId Any event)
case result of
FailedInsertion EventStoreError backend
err -> String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$ String
"Phase 3 write failed: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> String
forall a. Show a => a -> String
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
duringHandles <- takeMVar duringHandlesVar
let fbAfterRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
4 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs)
fbAfterVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
4 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars)
flpAfterRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
5 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs
flpAfterVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
5 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars
fbAfterHandles <- forConcurrently (zip3 stores fbAfterRefs fbAfterVars) $
\(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
threadDelay jitter
subscribe
store
( match UserCreated (hashEventHandler hashRef)
:? match Tombstone (handleTombstone doneVar)
:? MatchEnd
)
EventSelector{streamId = AllStreams, startupPosition = FromBeginning}
flpAfterHandles <- forConcurrently (zip3 stores flpAfterRefs flpAfterVars) $
\(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
threadDelay jitter
subscribe
store
( match UserCreated (hashEventHandler hashRef)
:? match Tombstone (handleTombstone doneVar)
:? MatchEnd
)
EventSelector{streamId = AllStreams, startupPosition = FromPosition checkpointCursor}
result <- insertEvents (head stores) Nothing (singleEvent tombstoneStream Any makeTombstone)
let allHandles = [SubscriptionHandle backend]
fbInitialHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
flpInitialHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
duringHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
fbAfterHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
flpAfterHandles
case result of
FailedInsertion EventStoreError backend
err -> do
(SubscriptionHandle backend -> Assertion)
-> [SubscriptionHandle backend] -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (.cancel) [SubscriptionHandle backend]
allHandles
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$ String
"Failed to insert tombstone: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> String
forall a. Show a => a -> String
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> do
timeoutResult <- Int -> Assertion -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
30_000_000 (Assertion -> IO (Maybe ())) -> Assertion -> IO (Maybe ())
forall a b. (a -> b) -> a -> b
$ [MVar ()] -> (MVar () -> Assertion) -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [MVar ()]
completionVars MVar () -> Assertion
forall a. MVar a -> IO a
takeMVar
mapM_ (.cancel) allHandles
case timeoutResult of
Maybe ()
Nothing -> String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure String
"Test timed out waiting for subscriptions"
Just ()
_ -> do
fbInitialHashes <- (IORef (Digest SHA256) -> IO (Digest SHA256))
-> [IORef (Digest SHA256)] -> IO [Digest SHA256]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM IORef (Digest SHA256) -> IO (Digest SHA256)
forall a. IORef a -> IO a
readIORef (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances [IORef (Digest SHA256)]
hashRefs)
flpInitialHashes <- mapM readIORef (take numInstances (drop numInstances hashRefs))
fbDuringHashes <- mapM readIORef (take numInstances (drop (2 * numInstances) hashRefs))
flpDuringHashes <- mapM readIORef (take numInstances (drop (3 * numInstances) hashRefs))
fbAfterHashes <- mapM readIORef (take numInstances (drop (4 * numInstances) hashRefs))
flpAfterHashes <- mapM readIORef (drop (5 * numInstances) hashRefs)
verifyGroupConsistency "FB-initial" fbInitialHashes
verifyGroupConsistency "FLP-initial" flpInitialHashes
verifyGroupConsistency "FB-during" fbDuringHashes
verifyGroupConsistency "FLP-during" flpDuringHashes
verifyGroupConsistency "FB-after" fbAfterHashes
verifyGroupConsistency "FLP-after" flpAfterHashes
case (fbInitialHashes, fbDuringHashes, fbAfterHashes) of
(Digest SHA256
h1 : [Digest SHA256]
_, Digest SHA256
h2 : [Digest SHA256]
_, Digest SHA256
h3 : [Digest SHA256]
_) -> do
Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h2) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
String
"FB-initial and FB-during have different hashes:\n"
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" FB-initial: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n FB-during: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h2
Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h3) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
String
"FB-initial and FB-after have different hashes:\n"
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" FB-initial: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n FB-after: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h3
([Digest SHA256], [Digest SHA256], [Digest SHA256])
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
case (flpInitialHashes, flpDuringHashes, flpAfterHashes) of
(Digest SHA256
h1 : [Digest SHA256]
_, Digest SHA256
h2 : [Digest SHA256]
_, Digest SHA256
h3 : [Digest SHA256]
_) -> do
Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h2) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
String
"FLP-initial and FLP-during have different hashes:\n"
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" FLP-initial: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n FLP-during: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h2
Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h3) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
String
"FLP-initial and FLP-after have different hashes:\n"
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" FLP-initial: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n FLP-after: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h3
([Digest SHA256], [Digest SHA256], [Digest SHA256])
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
case (fbInitialHashes, flpInitialHashes) of
(Digest SHA256
h1 : [Digest SHA256]
_, Digest SHA256
h2 : [Digest SHA256]
_) ->
Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
== Digest SHA256
h2) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure
String
"SANITY CHECK FAILED: FB and FLP groups have the same hash!\n\
\This indicates FromPosition is not working correctly."
([Digest SHA256], [Digest SHA256])
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
testMultiInstanceEventOrdering_SingleStream ::
forall backend.
(EventStore backend, StoreConstraints backend IO, Show (Cursor backend), Ord (Cursor backend)) =>
Int ->
Int ->
[BackendHandle backend] ->
IO ()
testMultiInstanceEventOrdering_SingleStream :: forall backend.
(EventStore backend, StoreConstraints backend IO,
Show (Cursor backend), Ord (Cursor backend)) =>
Int -> Int -> [BackendHandle backend] -> Assertion
testMultiInstanceEventOrdering_SingleStream Int
numEventsPerInstance Int
numInstances [BackendHandle backend]
stores = do
Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ([BackendHandle backend] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [BackendHandle backend]
stores Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
numInstances) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
String
"Expected " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
numInstances String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" store handles, got " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show ([BackendHandle backend] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [BackendHandle backend]
stores)
let halfEvents :: Int
halfEvents = Int
numEventsPerInstance Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
2
sharedStream <- UUID -> StreamId
StreamId (UUID -> StreamId) -> IO UUID -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUID.nextRandom
hashRefs <- replicateM (6 * numInstances) (newIORef (initialHash :: Digest SHA256))
completionVars <- replicateM (6 * numInstances) newEmptyMVar
allCursors <- forConcurrently stores $ \BackendHandle backend
store -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
50_000)
threadDelay jitter
forM [1 .. halfEvents] $ \Int
eventNum -> do
eventJitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
25_000)
threadDelay eventJitter
let event = Int -> SomeLatestEvent
makeUserEvent Int
eventNum
result <- insertEvents store Nothing (singleEvent sharedStream Any event)
case result of
FailedInsertion EventStoreError backend
err -> String -> IO (Cursor backend)
forall a. HasCallStack => String -> IO a
assertFailure (String -> IO (Cursor backend)) -> String -> IO (Cursor backend)
forall a b. (a -> b) -> a -> b
$ String
"Phase 1 write failed: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> String
forall a. Show a => a -> String
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
success -> Cursor backend -> IO (Cursor backend)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure InsertionSuccess backend
success.finalCursor
let checkpointCursor = case [[Cursor backend]] -> [Cursor backend]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [[Cursor backend]]
allCursors of
[] -> String -> Cursor backend
forall a. HasCallStack => String -> a
error String
"No cursors captured from Phase 1"
[Cursor backend]
cs -> [Cursor backend] -> Cursor backend
forall a. Ord a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
maximum [Cursor backend]
cs
let fbInitialRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances [IORef (Digest SHA256)]
hashRefs
fbInitialVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances [MVar ()]
completionVars
flpInitialRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop Int
numInstances [IORef (Digest SHA256)]
hashRefs)
flpInitialVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop Int
numInstances [MVar ()]
completionVars)
fbInitialHandles <- forConcurrently (zip3 stores fbInitialRefs fbInitialVars) $
\(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
threadDelay jitter
subscribe
store
( match UserCreated (hashEventHandler hashRef)
:? match Tombstone (handleTombstone doneVar)
:? MatchEnd
)
EventSelector{streamId = SingleStream sharedStream, startupPosition = FromBeginning}
flpInitialHandles <- forConcurrently (zip3 stores flpInitialRefs flpInitialVars) $
\(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
threadDelay jitter
subscribe
store
( match UserCreated (hashEventHandler hashRef)
:? match Tombstone (handleTombstone doneVar)
:? MatchEnd
)
EventSelector{streamId = SingleStream sharedStream, startupPosition = FromPosition checkpointCursor}
let fbDuringRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
2 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs)
fbDuringVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
2 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars)
flpDuringRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
3 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs)
flpDuringVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
3 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars)
duringHandlesVar <- newEmptyMVar
_ <- async $ do
threadDelay 1_000_000
fbDuringHandles <- forConcurrently (zip3 stores fbDuringRefs fbDuringVars) $
\(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
threadDelay jitter
subscribe
store
( match UserCreated (hashEventHandler hashRef)
:? match Tombstone (handleTombstone doneVar)
:? MatchEnd
)
EventSelector{streamId = SingleStream sharedStream, startupPosition = FromBeginning}
flpDuringHandles <- forConcurrently (zip3 stores flpDuringRefs flpDuringVars) $
\(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
threadDelay jitter
subscribe
store
( match UserCreated (hashEventHandler hashRef)
:? match Tombstone (handleTombstone doneVar)
:? MatchEnd
)
EventSelector{streamId = SingleStream sharedStream, startupPosition = FromPosition checkpointCursor}
putMVar duringHandlesVar (fbDuringHandles <> flpDuringHandles)
forConcurrently_ stores $ \BackendHandle backend
store -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
50_000)
threadDelay jitter
forM_ [(halfEvents + 1) .. numEventsPerInstance] $ \Int
eventNum -> do
eventJitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
25_000)
threadDelay eventJitter
let event = Int -> SomeLatestEvent
makeUserEvent Int
eventNum
result <- insertEvents store Nothing (singleEvent sharedStream Any event)
case result of
FailedInsertion EventStoreError backend
err -> String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$ String
"Phase 3 write failed: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> String
forall a. Show a => a -> String
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
duringHandles <- takeMVar duringHandlesVar
let fbAfterRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
4 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs)
fbAfterVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
take Int
numInstances (Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
4 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars)
flpAfterRefs = Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
drop (Int
5 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [IORef (Digest SHA256)]
hashRefs
flpAfterVars = Int -> [MVar ()] -> [MVar ()]
forall a. Int -> [a] -> [a]
drop (Int
5 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numInstances) [MVar ()]
completionVars
fbAfterHandles <- forConcurrently (zip3 stores fbAfterRefs fbAfterVars) $
\(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
threadDelay jitter
subscribe
store
( match UserCreated (hashEventHandler hashRef)
:? match Tombstone (handleTombstone doneVar)
:? MatchEnd
)
EventSelector{streamId = SingleStream sharedStream, startupPosition = FromBeginning}
flpAfterHandles <- forConcurrently (zip3 stores flpAfterRefs flpAfterVars) $
\(BackendHandle backend
store, IORef (Digest SHA256)
hashRef, MVar ()
doneVar) -> do
jitter <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, Int
200_000)
threadDelay jitter
subscribe
store
( match UserCreated (hashEventHandler hashRef)
:? match Tombstone (handleTombstone doneVar)
:? MatchEnd
)
EventSelector{streamId = SingleStream sharedStream, startupPosition = FromPosition checkpointCursor}
result <- insertEvents (head stores) Nothing (singleEvent sharedStream Any makeTombstone)
let allHandles = [SubscriptionHandle backend]
fbInitialHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
flpInitialHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
duringHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
fbAfterHandles [SubscriptionHandle backend]
-> [SubscriptionHandle backend] -> [SubscriptionHandle backend]
forall a. Semigroup a => a -> a -> a
<> [SubscriptionHandle backend]
flpAfterHandles
case result of
FailedInsertion EventStoreError backend
err -> do
(SubscriptionHandle backend -> Assertion)
-> [SubscriptionHandle backend] -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (.cancel) [SubscriptionHandle backend]
allHandles
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$ String
"Failed to insert tombstone: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ EventStoreError backend -> String
forall a. Show a => a -> String
show EventStoreError backend
err
SuccessfulInsertion InsertionSuccess backend
_ -> do
timeoutResult <- Int -> Assertion -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
30_000_000 (Assertion -> IO (Maybe ())) -> Assertion -> IO (Maybe ())
forall a b. (a -> b) -> a -> b
$ [MVar ()] -> (MVar () -> Assertion) -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [MVar ()]
completionVars MVar () -> Assertion
forall a. MVar a -> IO a
takeMVar
mapM_ (.cancel) allHandles
case timeoutResult of
Maybe ()
Nothing -> String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure String
"Test timed out waiting for subscriptions"
Just ()
_ -> do
fbInitialHashes <- (IORef (Digest SHA256) -> IO (Digest SHA256))
-> [IORef (Digest SHA256)] -> IO [Digest SHA256]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM IORef (Digest SHA256) -> IO (Digest SHA256)
forall a. IORef a -> IO a
readIORef (Int -> [IORef (Digest SHA256)] -> [IORef (Digest SHA256)]
forall a. Int -> [a] -> [a]
take Int
numInstances [IORef (Digest SHA256)]
hashRefs)
flpInitialHashes <- mapM readIORef (take numInstances (drop numInstances hashRefs))
fbDuringHashes <- mapM readIORef (take numInstances (drop (2 * numInstances) hashRefs))
flpDuringHashes <- mapM readIORef (take numInstances (drop (3 * numInstances) hashRefs))
fbAfterHashes <- mapM readIORef (take numInstances (drop (4 * numInstances) hashRefs))
flpAfterHashes <- mapM readIORef (drop (5 * numInstances) hashRefs)
verifyGroupConsistency "FB-initial" fbInitialHashes
verifyGroupConsistency "FLP-initial" flpInitialHashes
verifyGroupConsistency "FB-during" fbDuringHashes
verifyGroupConsistency "FLP-during" flpDuringHashes
verifyGroupConsistency "FB-after" fbAfterHashes
verifyGroupConsistency "FLP-after" flpAfterHashes
case (fbInitialHashes, fbDuringHashes, fbAfterHashes) of
(Digest SHA256
h1 : [Digest SHA256]
_, Digest SHA256
h2 : [Digest SHA256]
_, Digest SHA256
h3 : [Digest SHA256]
_) -> do
Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h2) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
String
"FB-initial and FB-during have different hashes:\n"
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" FB-initial: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n FB-during: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h2
Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h3) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
String
"FB-initial and FB-after have different hashes:\n"
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" FB-initial: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n FB-after: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h3
([Digest SHA256], [Digest SHA256], [Digest SHA256])
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
case (flpInitialHashes, flpDuringHashes, flpAfterHashes) of
(Digest SHA256
h1 : [Digest SHA256]
_, Digest SHA256
h2 : [Digest SHA256]
_, Digest SHA256
h3 : [Digest SHA256]
_) -> do
Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h2) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
String
"FLP-initial and FLP-during have different hashes:\n"
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" FLP-initial: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n FLP-during: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h2
Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
h3) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
String
"FLP-initial and FLP-after have different hashes:\n"
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" FLP-initial: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h1
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n FLP-after: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
h3
([Digest SHA256], [Digest SHA256], [Digest SHA256])
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
case (fbInitialHashes, flpInitialHashes) of
(Digest SHA256
h1 : [Digest SHA256]
_, Digest SHA256
h2 : [Digest SHA256]
_) ->
Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
h1 Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
== Digest SHA256
h2) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure
String
"SANITY CHECK FAILED: FB and FLP groups have the same hash!\n\
\This indicates FromPosition is not working correctly."
([Digest SHA256], [Digest SHA256])
_ -> () -> Assertion
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
initialHash :: Digest SHA256
initialHash :: Digest SHA256
initialHash = ByteString -> Digest SHA256
forall ba a.
(ByteArrayAccess ba, HashAlgorithm a) =>
ba -> Digest a
hash (ByteString
BS.empty :: ByteString)
hashEventHandler :: forall backend. IORef (Digest SHA256) -> EventHandler UserCreated IO backend
hashEventHandler :: forall backend.
IORef (Digest SHA256) -> EventHandler UserCreated IO backend
hashEventHandler IORef (Digest SHA256)
hashRef EventEnvelope UserCreated backend
envelope = do
let eventIdBytes :: ByteString
eventIdBytes = LazyByteString -> ByteString
BSL.toStrict (LazyByteString -> ByteString) -> LazyByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ UUID -> LazyByteString
toByteString (EventEnvelope UserCreated backend
envelope.eventId.toUUID)
streamIdBytes :: ByteString
streamIdBytes = LazyByteString -> ByteString
BSL.toStrict (LazyByteString -> ByteString) -> LazyByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ UUID -> LazyByteString
toByteString (EventEnvelope UserCreated backend
envelope.streamId.toUUID)
streamVersionBytes :: ByteString
streamVersionBytes = StreamVersion -> ByteString
encodeStreamVersion EventEnvelope UserCreated backend
envelope.streamVersion
prevHash <- IORef (Digest SHA256) -> IO (Digest SHA256)
forall a. IORef a -> IO a
readIORef IORef (Digest SHA256)
hashRef
let prevHashBytes = Digest SHA256 -> ByteString
hashToBytes Digest SHA256
prevHash
combined = [ByteString] -> ByteString
BS.concat [ByteString
prevHashBytes, ByteString
eventIdBytes, ByteString
streamIdBytes, ByteString
streamVersionBytes]
newHash = ByteString -> Digest SHA256
forall ba a.
(ByteArrayAccess ba, HashAlgorithm a) =>
ba -> Digest a
hash ByteString
combined
writeIORef hashRef newHash
pure Continue
hashToBytes :: Digest SHA256 -> ByteString
hashToBytes :: Digest SHA256 -> ByteString
hashToBytes = Digest SHA256 -> ByteString
forall bin bout.
(ByteArrayAccess bin, ByteArray bout) =>
bin -> bout
BA.convert
encodeStreamVersion :: StreamVersion -> ByteString
encodeStreamVersion :: StreamVersion -> ByteString
encodeStreamVersion (StreamVersion Int64
n) = [Word8] -> ByteString
BS.pack ([Word8] -> ByteString) -> [Word8] -> ByteString
forall a b. (a -> b) -> a -> b
$ Int64 -> [Word8]
encodeInt64 Int64
n
encodeInt64 :: Int64 -> [Word8]
encodeInt64 :: Int64 -> [Word8]
encodeInt64 Int64
n =
[ Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
n Int64 -> Int -> Int64
forall a. Bits a => a -> Int -> a
`shiftR` Int
56)
, Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
n Int64 -> Int -> Int64
forall a. Bits a => a -> Int -> a
`shiftR` Int
48)
, Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
n Int64 -> Int -> Int64
forall a. Bits a => a -> Int -> a
`shiftR` Int
40)
, Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
n Int64 -> Int -> Int64
forall a. Bits a => a -> Int -> a
`shiftR` Int
32)
, Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
n Int64 -> Int -> Int64
forall a. Bits a => a -> Int -> a
`shiftR` Int
24)
, Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
n Int64 -> Int -> Int64
forall a. Bits a => a -> Int -> a
`shiftR` Int
16)
, Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
n Int64 -> Int -> Int64
forall a. Bits a => a -> Int -> a
`shiftR` Int
8)
, Int64 -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64
n
]
verifyGroupConsistency :: String -> [Digest SHA256] -> IO ()
verifyGroupConsistency :: String -> [Digest SHA256] -> Assertion
verifyGroupConsistency String
groupName [Digest SHA256]
hashes =
case [Digest SHA256]
hashes of
[] -> String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$ String
groupName String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
": No subscriptions completed"
(Digest SHA256
expectedHash : [Digest SHA256]
rest) ->
[(Int, Digest SHA256)]
-> ((Int, Digest SHA256) -> Assertion) -> Assertion
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([Int] -> [Digest SHA256] -> [(Int, Digest SHA256)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
1 :: Int ..] [Digest SHA256]
rest) (((Int, Digest SHA256) -> Assertion) -> Assertion)
-> ((Int, Digest SHA256) -> Assertion) -> Assertion
forall a b. (a -> b) -> a -> b
$ \(Int
idx, Digest SHA256
actualHash) ->
Bool -> Assertion -> Assertion
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Digest SHA256
actualHash Digest SHA256 -> Digest SHA256 -> Bool
forall a. Eq a => a -> a -> Bool
/= Digest SHA256
expectedHash) (Assertion -> Assertion) -> Assertion -> Assertion
forall a b. (a -> b) -> a -> b
$
String -> Assertion
forall a. HasCallStack => String -> IO a
assertFailure (String -> Assertion) -> String -> Assertion
forall a b. (a -> b) -> a -> b
$
String
groupName
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" subscription "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
idx
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" saw different event order.\n"
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"Expected hash: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
expectedHash
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\nActual hash: "
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Digest SHA256 -> String
forall a. Show a => a -> String
show Digest SHA256
actualHash