{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE Strict #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Test.Hindsight.Store.OrderingTests (orderingTests) where
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Async (async, forConcurrently, forConcurrently_, wait)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (TVar, atomically, modifyTVar, newTVarIO, readTVar, writeTVar)
import Control.Monad (forM, forM_, replicateM, void, when)
import Control.Monad.IO.Class (liftIO)
import Data.Aeson (FromJSON, ToJSON)
import Data.IORef (IORef, atomicModifyIORef', modifyIORef', newIORef, readIORef)
import Data.Int (Int64)
import Data.Map.Strict qualified as Map
import Data.Proxy (Proxy (..))
import Data.Set qualified as Set
import Data.Text (Text, isPrefixOf, pack, unpack)
import Data.Text qualified as Text
import Data.Time (UTCTime, diffUTCTime, getCurrentTime)
import Data.UUID.V4 qualified as UUID
import GHC.Generics (Generic)
import Hindsight.Events
import Hindsight.Store
import System.IO (hFlush, stdout)
import System.Random (randomRIO)
import System.Timeout (timeout)
import Test.Hindsight.Store.TestRunner (EventStoreTestRunner (..))
import Test.Tasty
import Test.Tasty.HUnit
-- | Type-level label of the event written by the ordering tests.
type ComprehensiveTestEvent = "comprehensive_test_event"

-- | Payload carried by every 'ComprehensiveTestEvent'.
data ComprehensiveTestPayload = ComprehensiveTestPayload
    { compValue :: Int
    -- ^ Integer value; folded into the hash chain by @updateHashChainGeneric@.
    , text :: Text
    -- ^ Text value; its hash is cached and folded into the hash chain by
    -- @updateHashChainGeneric@.
    , bytes :: String
    -- ^ String field carried with the payload.
    , timestamp :: UTCTime
    -- ^ Time value carried with the payload.
    }
    deriving (Show, Eq, Generic, FromJSON, ToJSON)

-- The event has exactly one payload version (index 0), so the migration
-- instance relies entirely on the class defaults.
type instance MaxVersion ComprehensiveTestEvent = 0
type instance Versions ComprehensiveTestEvent = '[ComprehensiveTestPayload]
instance Hindsight.Events.Event ComprehensiveTestEvent
instance MigrateVersion 0 ComprehensiveTestEvent
-- | Type-level label of the completion marker event.
type CompletionEvent = "test_completion_event"

-- | Payload carried by a 'CompletionEvent'.
--
-- NOTE(review): the producer and consumer of this payload are outside the
-- visible part of the module; the field notes below are inferred from the
-- names only and should be confirmed there.
data CompletionPayload = CompletionPayload
    { completionId :: Text
    -- ^ Identifier of the completion marker.
    , completionTotalTransactions :: Int
    -- ^ Presumably the number of transactions the run executed.
    , totalExpectedEvents :: Int
    -- ^ Presumably the number of events a subscriber should have seen.
    , completedAt :: UTCTime
    -- ^ Time value stored with the marker.
    }
    deriving (Show, Eq, Generic, FromJSON, ToJSON)

-- The event has exactly one payload version (index 0), so the migration
-- instance relies entirely on the class defaults.
type instance MaxVersion CompletionEvent = 0
type instance Versions CompletionEvent = '[CompletionPayload]
instance Hindsight.Events.Event CompletionEvent
instance MigrateVersion 0 CompletionEvent
-- | Mutable-by-replacement bookkeeping for one test subscription,
-- parameterised over the store's position type.
--
-- NOTE(review): the code that updates this record is outside the visible
-- part of the module; field notes that go beyond the types are hedged.
data SubscriptionState position = SubscriptionState
    { subscriptionId :: Text
    -- ^ Identifier of the subscription.
    , startedAt :: UTCTime
    -- ^ Start time of the subscription.
    , currentHash :: !Int64
    -- ^ Running hash value (presumably the one produced by
    -- @updateHashChainGeneric@ — confirm at the call site).
    , subEventCount :: Int
    -- ^ Event counter for this subscription.
    , firstEventTime :: Maybe UTCTime
    -- ^ 'Nothing' until set; presumably the arrival time of the first event.
    , lastEventTime :: Maybe UTCTime
    -- ^ 'Nothing' until set; presumably the arrival time of the latest event.
    , completionReceived :: Bool
    -- ^ Flag; presumably set once the completion event is observed.
    , completionReceivedAt :: Maybe UTCTime
    -- ^ Time associated with 'completionReceived', when known.
    , expectedTotalEvents :: Maybe Int
    -- ^ Expected event total, when known.
    , completionMVar :: MVar ()
    -- ^ Unit 'MVar'; presumably filled to signal completion to a waiter.
    , streamIdHashes :: !(Map.Map StreamId Int)
    -- ^ Per-stream hash cache (same shape as the cache argument of
    -- @updateHashChainGeneric@).
    , textHashes :: !(Map.Map Text Int)
    -- ^ Per-text hash cache (same shape as the cache argument of
    -- @updateHashChainGeneric@).
    , positionsSeen :: ![position]
    -- ^ Positions recorded by the subscription; ordering of the list is
    -- decided by the code that appends to it.
    }
-- | Description of one transaction the test intends to run.
--
-- NOTE(review): the planner and executor live outside the visible part of
-- the module; units of the two timing fields are not established here.
data TransactionPlan = TransactionPlan
    { planId :: Int
    -- ^ Identifier of the plan.
    , planEventCount :: Int
    -- ^ Number of events associated with the plan.
    , targetStreams :: [StreamId]
    -- ^ Streams the plan refers to.
    , basePayload :: ComprehensiveTestPayload
    -- ^ Payload template for the plan's events.
    , delayBeforeStart :: Int
    -- ^ Delay value applied before starting (unit: TODO confirm).
    , expectedSlowness :: Int
    -- ^ Slowness value for the plan (meaning and unit: TODO confirm).
    }
    deriving (Show)
-- | Immutable summary of a finished test subscription, parameterised over
-- the store's position type.  The fields mirror those of
-- 'SubscriptionState' with a @result@ prefix, plus completion metadata.
data TestSubscriptionResult position = TestSubscriptionResult
    { resultSubscriptionId :: Text
    -- ^ Identifier of the subscription.
    , resultStartedAt :: UTCTime
    -- ^ Start time of the subscription.
    , resultFinalHash :: !Int64
    -- ^ Final hash value.
    , resultEventCount :: Int
    -- ^ Number of events counted.
    , resultFirstEventTime :: Maybe UTCTime
    -- ^ First-event time, when one was recorded.
    , resultLastEventTime :: Maybe UTCTime
    -- ^ Last-event time, when one was recorded.
    , resultCompletedAt :: UTCTime
    -- ^ Time at which the result was finalised.
    , resultCompletionReceived :: Bool
    -- ^ Whether the completion flag was set.
    , resultCompletionReceivedAt :: Maybe UTCTime
    -- ^ Time associated with the completion flag, when known.
    , resultExpectedTotalEvents :: Maybe Int
    -- ^ Expected event total, when known.
    , resultProcessingDurationMs :: Int64
    -- ^ Processing duration (milliseconds, per the field name).
    , resultPositionsSeen :: ![position]
    -- ^ Positions recorded by the subscription.
    }
-- | Compact, single-line rendering used in test output.  Only the number of
-- recorded positions is printed, not the positions themselves, so the
-- output stays short for long runs.
instance (Show position) => Show (TestSubscriptionResult position) where
    show r =
        concat
            [ "TestSubscriptionResult{"
            , "subId="
            , unpack r.resultSubscriptionId
            , ", events="
            , show r.resultEventCount
            , ", hash="
            , show r.resultFinalHash
            , ", completed="
            , show r.resultCompletionReceived
            , ", positions="
            , show (length r.resultPositionsSeen)
            , " items"
            , "}"
            ]
-- | Tunable parameters of an ordering test run.
--
-- NOTE(review): the consumers of most fields are outside the visible part
-- of the module.  Fields suffixed @Ms@ are presumably milliseconds.
data TestConfiguration = TestConfiguration
    { configNumTransactions :: Int
    -- ^ Number of transactions; also selects the progress refresh interval
    -- in @newProgressManager@.
    , configMinEventsPerTx :: Int
    -- ^ Lower bound on events per transaction.
    , configMaxEventsPerTx :: Int
    -- ^ Upper bound on events per transaction.
    , configMinStreamsPerTx :: Int
    -- ^ Lower bound on streams per transaction.
    , configMaxStreamsPerTx :: Int
    -- ^ Upper bound on streams per transaction.
    , configTxExecutionWindowMs :: Int
    -- ^ Transaction execution window.
    , configNumEarlySubscriptions :: Int
    -- ^ Number of "early" subscriptions (counted into the progress total).
    , configNumConcurrentSubscriptions :: Int
    -- ^ Number of "concurrent" subscriptions (counted into the progress total).
    , configNumValidationSubscriptions :: Int
    -- ^ Number of "validation" subscriptions (counted into the progress total).
    , configSubExecutionWindowMs :: Int
    -- ^ Subscription execution window.
    , configProcessingTimeMs :: Int
    -- ^ Processing time allowance.
    , configMaxSlownessFactor :: Int
    -- ^ Upper bound on the slowness factor.
    }
    deriving (Show, Eq)
-- | Baseline configuration: 10 transactions of exactly 10 events each over
-- 1-5 streams, with 2 early, 10 concurrent and 2 validation subscriptions.
-- The other presets in this module are record updates of this value.
defaultTestConfig :: TestConfiguration
defaultTestConfig =
    TestConfiguration
        { configNumTransactions = 10
        , configMinEventsPerTx = 10
        , configMaxEventsPerTx = 10
        , configMinStreamsPerTx = 1
        , configMaxStreamsPerTx = 5
        , configTxExecutionWindowMs = 20000
        , configNumEarlySubscriptions = 2
        , configNumConcurrentSubscriptions = 10
        , configNumValidationSubscriptions = 2
        , configSubExecutionWindowMs = 7000
        , configProcessingTimeMs = 3000
        , configMaxSlownessFactor = 10
        }
-- | Smaller preset derived from 'defaultTestConfig': 5 transactions,
-- 1 early / 3 concurrent / 1 validation subscription, and shorter windows.
lightTestConfig :: TestConfiguration
lightTestConfig =
    defaultTestConfig
        { configNumTransactions = 5
        , configMaxEventsPerTx = 10
        , configTxExecutionWindowMs = 10000
        , configNumEarlySubscriptions = 1
        , configNumConcurrentSubscriptions = 3
        , configNumValidationSubscriptions = 1
        , configSubExecutionWindowMs = 3000
        , configProcessingTimeMs = 1000
        }
-- | Heavy preset derived from 'defaultTestConfig': 100 transactions and
-- 1000 subscriptions in each of the three categories, with a slowness
-- factor ceiling of 20.
stressTestConfig :: TestConfiguration
stressTestConfig =
    defaultTestConfig
        { configNumTransactions = 100
        , configMaxEventsPerTx = 10
        , configTxExecutionWindowMs = 6000
        , configNumEarlySubscriptions = 1000
        , configNumConcurrentSubscriptions = 1000
        , configNumValidationSubscriptions = 1000
        , configSubExecutionWindowMs = 12000
        , configProcessingTimeMs = 20000
        , configMaxSlownessFactor = 20
        }
-- | Debugging preset derived from 'defaultTestConfig': 100 transactions of
-- 10-15 events, few subscriptions, and very short windows.
debugTestConfig :: TestConfiguration
debugTestConfig =
    defaultTestConfig
        { configNumTransactions = 100
        , configMinEventsPerTx = 10
        , configMaxEventsPerTx = 15
        , configTxExecutionWindowMs = 1000
        , configNumEarlySubscriptions = 1
        , configNumConcurrentSubscriptions = 3
        , configNumValidationSubscriptions = 1
        , configSubExecutionWindowMs = 500
        , configProcessingTimeMs = 100
        }
-- | Snapshot of test progress, rendered by @formatProgress@.
data ProgressState = ProgressState
    { transactionsCompleted :: Int
    -- ^ Transactions reported as successful.
    , transactionsFailed :: Int
    -- ^ Transactions reported as failed.
    , totalTransactions :: Int
    -- ^ Denominator for the transaction percentage.
    , subscriptionsStarted :: Int
    -- ^ Subscriptions reported as started.
    , subscriptionsCompleted :: Int
    -- ^ Subscriptions reported as completed.
    , totalSubscriptions :: Int
    -- ^ Denominator for the subscription percentage.
    , testStartTime :: UTCTime
    -- ^ Reference point for the elapsed-time display.
    , lastUpdateTime :: UTCTime
    -- ^ Refreshed by @reportPhaseChangeIO@; the elapsed-time display is
    -- measured up to this instant.
    , currentPhase :: Text
    -- ^ Name of the current phase.
    , detailMessage :: Text
    -- ^ Optional free-form detail; omitted from the display when empty.
    }
    deriving (Show)
-- | Handle shared between the test body and the background progress
-- reporter.
data ProgressManager = ProgressManager
    { progressState :: TVar ProgressState
    -- ^ Current progress snapshot, updated by the @report*@ functions.
    , shouldStop :: TVar Bool
    -- ^ Set to 'True' by @stopProgressReporting@ to end the reporter loop.
    , updateIntervalMs :: Int
    -- ^ Pause between progress lines, in milliseconds (the reporter
    -- multiplies it by 1000 before calling 'threadDelay').
    }
-- | Create a 'ProgressManager' for a run described by @config@ that began at
-- @startTime@.
--
-- All counters start at zero, the phase is @"Initializing"@, and the
-- subscription total is the sum of the early, concurrent and validation
-- subscription counts.  The refresh interval scales with the number of
-- transactions: 500 ms up to 50, 1000 ms up to 500, 2000 ms beyond that.
newProgressManager :: TestConfiguration -> UTCTime -> IO ProgressManager
newProgressManager config startTime = do
    stateVar <- newTVarIO initialProgress
    -- Named stopVar (not shouldStop) so the record field is not shadowed.
    stopVar <- newTVarIO False
    pure
        ProgressManager
            { progressState = stateVar
            , shouldStop = stopVar
            , updateIntervalMs = refreshIntervalFor config.configNumTransactions
            }
  where
    subscriptionTotal :: Int
    subscriptionTotal =
        config.configNumEarlySubscriptions
            + config.configNumConcurrentSubscriptions
            + config.configNumValidationSubscriptions

    initialProgress :: ProgressState
    initialProgress =
        ProgressState
            { transactionsCompleted = 0
            , transactionsFailed = 0
            , totalTransactions = config.configNumTransactions
            , subscriptionsStarted = 0
            , subscriptionsCompleted = 0
            , totalSubscriptions = subscriptionTotal
            , testStartTime = startTime
            , lastUpdateTime = startTime
            , currentPhase = "Initializing"
            , detailMessage = ""
            }

    -- Larger runs refresh less often.
    refreshIntervalFor :: Int -> Int
    refreshIntervalFor txCount
        | txCount <= 50 = 500
        | txCount <= 500 = 1000
        | otherwise = 2000
-- | Record the outcome of one transaction: 'True' increments
-- 'transactionsCompleted', 'False' increments 'transactionsFailed'.
reportTransactionCompleted :: ProgressManager -> Bool -> IO ()
reportTransactionCompleted pm success = atomically $ do
    current <- readTVar pm.progressState
    -- Strict write: store the updated record itself rather than a thunk, so
    -- increments do not pile up between reads by the reporter.
    writeTVar pm.progressState $! bump current
  where
    bump :: ProgressState -> ProgressState
    bump s
        | success = s{transactionsCompleted = s.transactionsCompleted + 1}
        | otherwise = s{transactionsFailed = s.transactionsFailed + 1}
-- | Record that one more subscription has started.
reportSubscriptionStarted :: ProgressManager -> IO ()
reportSubscriptionStarted pm = atomically $ do
    current <- readTVar pm.progressState
    -- Strict write: store the updated record itself rather than a thunk.
    writeTVar pm.progressState $!
        current{subscriptionsStarted = current.subscriptionsStarted + 1}
-- | Record that one more subscription has completed.
reportSubscriptionCompleted :: ProgressManager -> IO ()
reportSubscriptionCompleted pm = atomically $ do
    current <- readTVar pm.progressState
    -- Strict write: store the updated record itself rather than a thunk.
    writeTVar pm.progressState $!
        current{subscriptionsCompleted = current.subscriptionsCompleted + 1}
-- | Switch the displayed phase to @phase@ with the accompanying @detail@
-- message, and stamp 'lastUpdateTime' with the current wall-clock time.
reportPhaseChangeIO :: ProgressManager -> Text -> Text -> IO ()
reportPhaseChangeIO pm phase detail = do
    -- The clock is read outside the transaction; STM cannot perform IO.
    now <- getCurrentTime
    atomically $ do
        current <- readTVar pm.progressState
        -- Strict write: store the updated record itself rather than a thunk.
        writeTVar pm.progressState $!
            current
                { currentPhase = phase
                , detailMessage = detail
                , lastUpdateTime = now
                }
-- | Render a 'ProgressState' as a single status line of the form
--
-- > [<elapsed>s] <phase> | Tx: a/b (p%) | Subs: c/d (q%) [n started] | Overall: r%
--
-- optionally followed by @ | ETA: <n>s@ and @ | <detail>@.
--
-- * Elapsed time is measured from 'testStartTime' to 'lastUpdateTime', not
--   to the current clock (this function is pure).
-- * Transaction progress counts completed and failed transactions alike; a
--   @[k failed]@ suffix is added when any failed.
-- * The overall percentage weights transactions 70% and subscriptions 30%.
-- * The ETA is shown only while some, but not all, transactions are
--   accounted for and a positive amount of time has elapsed.  Without the
--   elapsed-time check the rate would be infinite and the line would claim
--   "ETA: 0s"; without counting failures in the upper bound the line would
--   keep showing "ETA: 0s" after every transaction had finished.
formatProgress :: ProgressState -> String
formatProgress s =
    concat
        [ "["
        , show elapsedSecs
        , "s] "
        , unpack s.currentPhase
        , " | Tx: "
        , txStatus
        , " | Subs: "
        , subStatus
        , " | Overall: "
        , show overallPercent
        , "%"
        , etaStr
        , detailSuffix
        ]
  where
    elapsed = diffUTCTime s.lastUpdateTime s.testStartTime

    elapsedSecs :: Int
    elapsedSecs = round elapsed

    -- Finished transactions, regardless of outcome.
    txProgress :: Int
    txProgress = s.transactionsCompleted + s.transactionsFailed

    -- Integer percentage that treats a non-positive total as 0%.
    percentOf :: Int -> Int -> Int
    percentOf done total
        | total > 0 = (done * 100) `div` total
        | otherwise = 0

    txPercent :: Int
    txPercent = percentOf txProgress s.totalTransactions

    subPercent :: Int
    subPercent = percentOf s.subscriptionsCompleted s.totalSubscriptions

    overallPercent :: Int
    overallPercent = (txPercent * 70 + subPercent * 30) `div` 100

    etaStr :: String
    etaStr
        | txProgress > 0 && txProgress < s.totalTransactions && elapsed > 0 =
            let rate = fromIntegral txProgress / realToFrac elapsed :: Double
                remaining = fromIntegral (s.totalTransactions - txProgress) :: Double
                etaSecs = round (remaining / rate) :: Int
             in " | ETA: " ++ show etaSecs ++ "s"
        | otherwise = ""

    failedSuffix :: String
    failedSuffix
        | s.transactionsFailed > 0 = " [" ++ show s.transactionsFailed ++ " failed]"
        | otherwise = ""

    txStatus :: String
    txStatus =
        concat
            [ show txProgress
            , "/"
            , show s.totalTransactions
            , " ("
            , show txPercent
            , "%)"
            , failedSuffix
            ]

    subStatus :: String
    subStatus =
        concat
            [ show s.subscriptionsCompleted
            , "/"
            , show s.totalSubscriptions
            , " ("
            , show subPercent
            , "%)"
            , " ["
            , show s.subscriptionsStarted
            , " started]"
            ]

    detailSuffix :: String
    detailSuffix
        | Text.null s.detailMessage = ""
        | otherwise = " | " ++ unpack s.detailMessage
-- | Fork a background thread that rewrites a progress line on stdout every
-- 'updateIntervalMs' milliseconds until the manager's stop flag is set.
--
-- On each iteration the stop flag and the progress state are read in one
-- STM transaction, so the line printed always matches the flag observed.
-- When the flag is set the thread prints a final line on a fresh row,
-- followed by @"Progress reporting stopped."@, and exits.
startProgressReporting :: ProgressManager -> IO ()
startProgressReporting pm = void (forkIO reporterLoop)
  where
    reporterLoop :: IO ()
    reporterLoop = do
        (stopRequested, snapshot) <-
            atomically $
                (,) <$> readTVar pm.shouldStop <*> readTVar pm.progressState
        if stopRequested
            then do
                putStrLn $ "\n" ++ formatProgress snapshot
                putStrLn "Progress reporting stopped."
            else do
                -- "\r" returns to column 0 so the line is redrawn in place;
                -- flush because no newline is written.
                putStr $ "\r" ++ formatProgress snapshot
                hFlush stdout
                -- threadDelay takes microseconds.
                threadDelay (pm.updateIntervalMs * 1000)
                reporterLoop
-- | Ask the background reporter to stop by setting the manager's stop flag.
--
-- This only sets the flag; it does not wait for the reporter thread to
-- print its final line or exit.
stopProgressReporting :: ProgressManager -> IO ()
stopProgressReporting pm =
    atomically (writeTVar pm.shouldStop True)
-- | Fold one event into a running hash chain.
--
-- The new hash combines the previous hash with hashes of the event's
-- position (via 'show'), its stream id (via 'show'), the payload's
-- @compValue@ and the payload's @text@, each weighted by a different
-- multiplier, and is then reduced modulo 982451653.
--
-- Stream-id and text hashes are looked up in the supplied caches first; a
-- miss computes the hash and inserts it. The (possibly extended) caches are
-- returned alongside the new hash so the caller can thread them through.
updateHashChainGeneric ::
    forall position.
    (Show position) =>
    Int64 ->
    position ->
    StreamId ->
    ComprehensiveTestPayload ->
    Map.Map StreamId Int ->
    Map.Map Text Int ->
    (Int64, Map.Map StreamId Int, Map.Map Text Int)
updateHashChainGeneric prevHash position streamId payload streamHashes textHashes =
    (combined `mod` 982451653, streamHashes', textHashes')
  where
    (streamHashVal, streamHashes') = cachedHash show streamId streamHashes
    (textHashVal, textHashes') = cachedHash unpack payload.text textHashes

    -- The position hash is not cached: it is recomputed on every call.
    positionHash :: Int
    positionHash = fastTextHash (show position)

    -- Int64 arithmetic wraps on overflow; the final `mod` with a positive
    -- divisor still yields a non-negative result.
    combined :: Int64
    combined =
        prevHash * 31
            + fromIntegral positionHash * 37
            + fromIntegral (abs streamHashVal) * 43
            + fromIntegral payload.compValue * 47
            + fromIntegral (abs textHashVal) * 53

    -- Return the cached hash for @key@, or compute it from @render key@ and
    -- record it in the cache.
    cachedHash :: (Ord k) => (k -> String) -> k -> Map.Map k Int -> (Int, Map.Map k Int)
    cachedHash render key cache =
        case Map.lookup key cache of
            Just known -> (known, cache)
            Nothing ->
                let fresh = fastTextHash (render key)
                 in (fresh, Map.insert key fresh cache)

    -- Strict left fold: multiply the accumulator by 31 and add each
    -- character's code.
    fastTextHash :: String -> Int
    fastTextHash = foldl' (\acc c -> acc * 31 + fromEnum c) 0
-- | Block until the subscription behind @stateRef@ has signalled completion.
--
-- Reads the current state once, then takes from its @completionMVar@ with a
-- 60-second timeout. If the timeout elapses, the test fails with a message
-- naming the subscription id.
waitForCompletion :: forall position. IORef (SubscriptionState position) -> IO ()
waitForCompletion stateRef = do
    state <- readIORef stateRef
    -- 60_000_000 microseconds = 60 seconds.
    outcome <- timeout 60_000_000 (takeMVar state.completionMVar)
    case outcome of
        Just () -> pure ()
        Nothing ->
            assertFailure $
                "Subscription " <> unpack state.subscriptionId <> " timed out after 60 seconds"
-- | Build one randomised 'TransactionPlan' per transaction requested by the
-- configuration (@configNumTransactions@).
--
-- For every plan the event count, number of target streams, start delay and
-- slowness factor are drawn uniformly from the ranges given in the
-- configuration; target streams are fresh random UUIDs. All plans share the
-- same base timestamp, taken once before generation starts.
generateTransactionPlans :: TestConfiguration -> IO [TransactionPlan]
generateTransactionPlans config = do
    baseTime <- getCurrentTime
    forM [1 .. config.configNumTransactions] $ \ix -> do
        eventCount <- randomRIO (config.configMinEventsPerTx, config.configMaxEventsPerTx)
        streamCount <- randomRIO (config.configMinStreamsPerTx, config.configMaxStreamsPerTx)
        streams <- replicateM streamCount (StreamId <$> UUID.nextRandom)
        baseValue <- randomRIO (1, 10000)
        suffix <- pack . show <$> randomRIO (1000 :: Int, 9999)
        let label = "transaction-" <> pack (show ix) <> "-" <> suffix
        digits <- show <$> randomRIO (100000 :: Int, 999999)
        -- The configured window is in milliseconds; the stored delay is
        -- later passed to threadDelay, which takes microseconds.
        startDelay <- randomRIO (0, config.configTxExecutionWindowMs * 1000)
        slowness <- randomRIO (1, config.configMaxSlownessFactor)
        pure
            TransactionPlan
                { planId = ix
                , planEventCount = eventCount
                , targetStreams = streams
                , basePayload =
                    ComprehensiveTestPayload
                        { compValue = baseValue
                        , text = label
                        , bytes = digits
                        , timestamp = baseTime
                        }
                , delayBeforeStart = startDelay
                , expectedSlowness = slowness
                }
-- | Start a hash-tracking subscription that reads from the beginning.
--
-- Thin wrapper around 'startHashTrackingSubscriptionWithPosition' with the
-- start position fixed to 'FromBeginning'.
startHashTrackingSubscription ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    BackendHandle backend ->
    Text ->
    UTCTime ->
    Maybe ProgressManager ->
    IO (SubscriptionHandle backend, IORef (SubscriptionState (Cursor backend)))
startHashTrackingSubscription store subId startTime mProgressManager =
    startHashTrackingSubscriptionWithPosition
        store
        subId
        startTime
        mProgressManager
        FromBeginning
-- | Subscribe to all streams from @startPos@ and track a running hash of
-- every 'ComprehensiveTestEvent' seen.
--
-- Returns the subscription handle together with an 'IORef' holding the
-- subscription's evolving state:
--
-- * Each 'ComprehensiveTestEvent' advances the hash chain via
--   'updateHashChainGeneric', bumps the event counter, records the
--   first/last event times and appends the event's position.
-- * A 'CompletionEvent' records the completion time and the expected total
--   from its payload, fills the state's @completionMVar@, notifies the
--   progress manager (if any) and stops the subscription.
--
-- The progress manager, when supplied, is told the subscription has started
-- once 'subscribe' has returned.
startHashTrackingSubscriptionWithPosition ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    BackendHandle backend ->
    Text ->
    UTCTime ->
    Maybe ProgressManager ->
    StartupPosition backend ->
    IO (SubscriptionHandle backend, IORef (SubscriptionState (Cursor backend)))
startHashTrackingSubscriptionWithPosition store subId startTime mProgressManager startPos = do
    completionMVar <- newEmptyMVar
    stateRef <-
        newIORef
            SubscriptionState
                { subscriptionId = subId
                , startedAt = startTime
                , currentHash = 0
                , subEventCount = 0
                , firstEventTime = Nothing
                , lastEventTime = Nothing
                , completionReceived = False
                , completionReceivedAt = Nothing
                , expectedTotalEvents = Nothing
                , completionMVar = completionMVar
                , streamIdHashes = Map.empty
                , textHashes = Map.empty
                , positionsSeen = []
                }
    let handler =
            ( Proxy @ComprehensiveTestEvent
            , \envelope -> do
                now <- liftIO getCurrentTime
                liftIO $ atomicModifyIORef' stateRef $ \st ->
                    let seenAt = envelope.position
                        (chained, streamCache, textCache) =
                            updateHashChainGeneric
                                st.currentHash
                                seenAt
                                envelope.streamId
                                envelope.payload
                                st.streamIdHashes
                                st.textHashes
                        updated =
                            st
                                { currentHash = chained
                                , streamIdHashes = streamCache
                                , textHashes = textCache
                                , subEventCount = st.subEventCount + 1
                                , -- Only the first event sets this; later events keep it.
                                  firstEventTime = maybe (Just now) Just st.firstEventTime
                                , lastEventTime = Just now
                                , -- Appending keeps arrival order.
                                  -- NOTE(review): (++) is linear in the list
                                  -- length per event; consider a cheaper
                                  -- append if event counts grow large.
                                  positionsSeen = st.positionsSeen ++ [seenAt]
                                }
                     in (updated, ())
                pure Continue
            )
                :? ( Proxy @CompletionEvent
                   , \envelope -> do
                        now <- liftIO getCurrentTime
                        liftIO $ atomicModifyIORef' stateRef $ \st ->
                            let updated =
                                    st
                                        { completionReceived = True
                                        , completionReceivedAt = Just now
                                        , expectedTotalEvents = Just envelope.payload.totalExpectedEvents
                                        , lastEventTime = Just now
                                        }
                             in (updated, ())
                        -- Fills the MVar captured above, which is the same
                        -- one stored in the state's completionMVar field.
                        liftIO $ putMVar completionMVar ()
                        liftIO $ forM_ mProgressManager reportSubscriptionCompleted
                        pure Stop
                   )
                :? MatchEnd
    subscriptionHandle <- subscribe store handler (EventSelector AllStreams startPos)
    forM_ mProgressManager reportSubscriptionStarted
    pure (subscriptionHandle, stateRef)
-- | Execute one transaction plan against the store.
--
-- After sleeping for the plan's @delayBeforeStart@, the plan's events are
-- generated from its base payload (event @i@ adds @i@ to @compValue@ and
-- appends @"-i"@ to @text@), distributed round-robin over the plan's target
-- streams and inserted as a single transaction expecting 'NoStream' on every
-- stream. Plans whose @expectedSlowness@ exceeds 5 sleep an additional
-- @expectedSlowness * 50ms@ before inserting.
--
-- A failed insertion is retried up to 10 more times. The final outcome is
-- reported to the progress manager (if any) and returned as 'Right' on
-- success or 'Left' with a message on failure.
--
-- A plan with no target streams cannot be distributed (the round-robin uses
-- 'cycle', which is undefined on an empty list); such a plan is reported as
-- a failed transaction instead of raising an exception.
executeTransactionPlan ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    BackendHandle backend ->
    TransactionPlan ->
    Maybe ProgressManager ->
    IO (Either String ())
executeTransactionPlan store plan mProgressManager
    | null plan.targetStreams = do
        reportOutcome False
        pure . Left $ "Transaction " ++ show plan.planId ++ " failed: no target streams"
    | otherwise = do
        threadDelay plan.delayBeforeStart
        let template = plan.basePayload
            makeEvent i =
                let payload =
                        template
                            { compValue = template.compValue + i
                            , text = template.text <> "-" <> pack (show i)
                            }
                 in SomeLatestEvent (Proxy @ComprehensiveTestEvent) payload
            events = map makeEvent [1 .. plan.planEventCount]
            -- Round-robin assignment of events to the plan's streams.
            -- Map.fromListWith applies (++) as `new ++ old`, so each
            -- stream's list ends up in reverse generation order.
            groupedEvents =
                Map.fromListWith
                    (++)
                    [ (streamId, [event])
                    | (streamId, event) <- zip (cycle plan.targetStreams) events
                    ]
            eventBatches = Map.map (StreamWrite NoStream) groupedEvents
        when (plan.expectedSlowness > 5) $
            threadDelay (plan.expectedSlowness * 50_000)
        let attempt :: Int -> IO (Either String ())
            attempt retriesLeft =
                insertEvents store Nothing (Transaction eventBatches) >>= \case
                    SuccessfulInsertion _ -> do
                        reportOutcome True
                        pure (Right ())
                    FailedInsertion err
                        | retriesLeft > 0 -> attempt (retriesLeft - 1)
                        | otherwise -> do
                            reportOutcome False
                            pure . Left $
                                "Transaction " ++ show plan.planId ++ " failed: " ++ show err
        attempt 10
  where
    -- Tell the progress manager, when present, whether the transaction
    -- succeeded.
    reportOutcome :: Bool -> IO ()
    reportOutcome ok =
        forM_ mProgressManager $ \pm -> reportTransactionCompleted pm ok
-- | End-to-end consistency test driven by a 'TestConfiguration'.
--
-- Steps, in order:
--
-- 1. Generate transaction plans and start progress reporting.
-- 2. Concurrently run all transactions, start the "early" subscriptions,
--    and start the "concurrent" subscriptions at random delays within the
--    configured window.
-- 3. Once those have finished, start the "validation" subscriptions.
-- 4. Insert a single 'CompletionEvent' into a fresh stream and wait for
--    every subscription to observe it (see 'waitForCompletion').
-- 5. Cancel each subscription, snapshot its state into a
--    'TestSubscriptionResult' and hand the results to 'analyzeResults'.
--
-- Individual transaction failures are not fatal here; they are reported to
-- the progress manager by 'executeTransactionPlan'. A failure to insert the
-- completion event fails the test immediately with the store error.
testComprehensiveConsistencyWithConfig ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    EventStoreTestRunner backend ->
    TestConfiguration ->
    Assertion
testComprehensiveConsistencyWithConfig runner config = do
    putStrLn "\n=== Starting Comprehensive Consistency Test ==="
    putStrLn $
        "Config: "
            ++ show config.configNumTransactions
            ++ " transactions over "
            ++ show config.configTxExecutionWindowMs
            ++ "ms, "
            ++ show
                ( config.configNumEarlySubscriptions
                    + config.configNumConcurrentSubscriptions
                    + config.configNumValidationSubscriptions
                )
            ++ " subscriptions over "
            ++ show config.configSubExecutionWindowMs
            ++ "ms"
    testStartTime <- getCurrentTime
    progressManager <- newProgressManager config testStartTime
    startProgressReporting progressManager
    withStore runner $ \store -> do
        transactionPlans <- generateTransactionPlans config
        let totalEvents = sum (map (.planEventCount) transactionPlans)
        reportPhaseChangeIO progressManager "Planning" $
            "Generated "
                <> pack (show (length transactionPlans))
                <> " transaction plans, "
                <> pack (show totalEvents)
                <> " total events"
        subscriptionsRef <- newIORef []
        -- The early and concurrent subscription threads both register into
        -- subscriptionsRef while running in parallel, so the update must be
        -- atomic. Each batch is prepended in reverse, matching consing the
        -- subscriptions on one by one.
        let register subs =
                atomicModifyIORef' subscriptionsRef $ \known ->
                    (reverse subs ++ known, ())
        reportPhaseChangeIO progressManager "Starting" "Launching concurrent transactions and subscriptions"
        transactionResults <- async $ do
            reportPhaseChangeIO progressManager "Transactions" $
                "Executing "
                    <> pack (show (length transactionPlans))
                    <> " transactions over "
                    <> pack (show config.configTxExecutionWindowMs)
                    <> "ms window"
            -- The Either result is deliberately dropped: success/failure has
            -- already been reported to the progress manager.
            forConcurrently_ transactionPlans $ \plan ->
                void (executeTransactionPlan store plan (Just progressManager))
        earlySubscriptionsAsync <- async $ do
            reportPhaseChangeIO progressManager "Early Subs" $
                "Starting "
                    <> pack (show config.configNumEarlySubscriptions)
                    <> " early subscriptions"
            earlyStartTime <- getCurrentTime
            subs <- forConcurrently [1 .. config.configNumEarlySubscriptions] $ \i ->
                startHashTrackingSubscription
                    store
                    ("early-" <> pack (show i))
                    earlyStartTime
                    (Just progressManager)
            register subs
            pure subs
        concurrentSubscriptionsAsync <- async $ do
            reportPhaseChangeIO progressManager "Concurrent Subs" $
                "Starting "
                    <> pack (show config.configNumConcurrentSubscriptions)
                    <> " concurrent subscriptions over "
                    <> pack (show config.configSubExecutionWindowMs)
                    <> "ms window"
            subs <- forConcurrently [1 .. config.configNumConcurrentSubscriptions] $ \i -> do
                -- Window is configured in milliseconds; threadDelay takes
                -- microseconds.
                delay <- randomRIO (0, config.configSubExecutionWindowMs * 1000)
                threadDelay delay
                subStartTime <- getCurrentTime
                startHashTrackingSubscriptionWithPosition
                    store
                    ("concurrent-" <> pack (show i))
                    subStartTime
                    (Just progressManager)
                    FromBeginning
            register subs
            pure subs
        reportPhaseChangeIO progressManager "Waiting" "Waiting for concurrent activities to complete"
        wait transactionResults
        _ <- wait earlySubscriptionsAsync
        _ <- wait concurrentSubscriptionsAsync
        reportPhaseChangeIO progressManager "Completed" "All transactions and concurrent subscriptions completed"
        reportPhaseChangeIO progressManager "Validation Subs" $
            "Starting "
                <> pack (show config.configNumValidationSubscriptions)
                <> " validation subscriptions"
        validationTime <- getCurrentTime
        validationSubscriptions <- forM [1 .. config.configNumValidationSubscriptions] $ \i ->
            startHashTrackingSubscription
                store
                ("validation-" <> pack (show i))
                validationTime
                (Just progressManager)
        register validationSubscriptions
        reportPhaseChangeIO progressManager "Activating" "Waiting for subscriptions to activate"
        threadDelay 500_000
        reportPhaseChangeIO progressManager "Completion" "Sending completion event"
        completionTime <- getCurrentTime
        completionStream <- StreamId <$> UUID.nextRandom
        let completionPayload =
                CompletionPayload
                    { completionId = "test-completion-" <> pack (show (length transactionPlans))
                    , completionTotalTransactions = length transactionPlans
                    , totalExpectedEvents = totalEvents
                    , completedAt = completionTime
                    }
        completionResult <-
            insertEvents store Nothing $
                Transaction
                    ( Map.singleton completionStream $
                        StreamWrite
                            NoStream
                            [SomeLatestEvent (Proxy @CompletionEvent) completionPayload]
                    )
        -- Without the completion event no subscription can ever finish, so
        -- surface the store error now instead of waiting for every
        -- subscription to time out.
        case completionResult of
            SuccessfulInsertion _ -> pure ()
            FailedInsertion err -> do
                stopProgressReporting progressManager
                assertFailure ("Failed to insert completion event: " ++ show err)
        reportPhaseChangeIO progressManager "Waiting Complete" "Waiting for all subscriptions to receive completion event"
        allSubscriptions <- readIORef subscriptionsRef
        reportPhaseChangeIO progressManager "Waiting Complete" $
            "Waiting for "
                <> pack (show (length allSubscriptions))
                <> " subscriptions to complete"
        forConcurrently_ (map snd allSubscriptions) waitForCompletion
        reportPhaseChangeIO progressManager "Analyzing" "All subscriptions completed, collecting results"
        results <- forM allSubscriptions $ \(subHandle, stateRef) -> do
            state <- readIORef stateRef
            collectedAt <- getCurrentTime
            subHandle.cancel
            -- Milliseconds between subscription start and receipt of the
            -- completion event; 0 when no completion was recorded.
            let processingDurationMs =
                    maybe
                        0
                        ( \receivedAt ->
                            round (realToFrac (diffUTCTime receivedAt state.startedAt) * (1000 :: Double))
                        )
                        state.completionReceivedAt
            pure
                TestSubscriptionResult
                    { resultSubscriptionId = state.subscriptionId
                    , resultStartedAt = state.startedAt
                    , resultFinalHash = state.currentHash
                    , resultEventCount = state.subEventCount
                    , resultFirstEventTime = state.firstEventTime
                    , resultLastEventTime = state.lastEventTime
                    , resultCompletedAt = collectedAt
                    , resultCompletionReceived = state.completionReceived
                    , resultCompletionReceivedAt = state.completionReceivedAt
                    , resultExpectedTotalEvents = state.expectedTotalEvents
                    , resultProcessingDurationMs = processingDurationMs
                    , resultPositionsSeen = state.positionsSeen
                    }
        stopProgressReporting progressManager
        analyzeResults results
analyzeResults :: forall position. [TestSubscriptionResult position] -> Assertion
analyzeResults :: forall position. [TestSubscriptionResult position] -> IO ()
analyzeResults [TestSubscriptionResult position]
results = do
[Char] -> IO ()
putStrLn ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"\n=== Analysis of " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show ([TestSubscriptionResult position] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [TestSubscriptionResult position]
results) [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
" subscriptions ==="
let ([TestSubscriptionResult position]
earlyResults, [TestSubscriptionResult position]
concurrentResults, [TestSubscriptionResult position]
validationResults) =
(TestSubscriptionResult position
-> ([TestSubscriptionResult position],
[TestSubscriptionResult position],
[TestSubscriptionResult position])
-> ([TestSubscriptionResult position],
[TestSubscriptionResult position],
[TestSubscriptionResult position]))
-> ([TestSubscriptionResult position],
[TestSubscriptionResult position],
[TestSubscriptionResult position])
-> [TestSubscriptionResult position]
-> ([TestSubscriptionResult position],
[TestSubscriptionResult position],
[TestSubscriptionResult position])
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr
( \TestSubscriptionResult position
r ([TestSubscriptionResult position]
e, [TestSubscriptionResult position]
c, [TestSubscriptionResult position]
v) ->
if Text
"early-" Text -> Text -> Bool
`isPrefixOf` TestSubscriptionResult position
r.resultSubscriptionId
then (TestSubscriptionResult position
r TestSubscriptionResult position
-> [TestSubscriptionResult position]
-> [TestSubscriptionResult position]
forall a. a -> [a] -> [a]
: [TestSubscriptionResult position]
e, [TestSubscriptionResult position]
c, [TestSubscriptionResult position]
v)
else
if Text
"concurrent-" Text -> Text -> Bool
`isPrefixOf` TestSubscriptionResult position
r.resultSubscriptionId
then ([TestSubscriptionResult position]
e, TestSubscriptionResult position
r TestSubscriptionResult position
-> [TestSubscriptionResult position]
-> [TestSubscriptionResult position]
forall a. a -> [a] -> [a]
: [TestSubscriptionResult position]
c, [TestSubscriptionResult position]
v)
else ([TestSubscriptionResult position]
e, [TestSubscriptionResult position]
c, TestSubscriptionResult position
r TestSubscriptionResult position
-> [TestSubscriptionResult position]
-> [TestSubscriptionResult position]
forall a. a -> [a] -> [a]
: [TestSubscriptionResult position]
v)
)
([], [], [])
[TestSubscriptionResult position]
results
[Char] -> IO ()
putStrLn ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Early subscriptions: " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show ([TestSubscriptionResult position] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [TestSubscriptionResult position]
earlyResults)
[Char] -> IO ()
putStrLn ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Concurrent subscriptions: " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show ([TestSubscriptionResult position] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [TestSubscriptionResult position]
concurrentResults)
[Char] -> IO ()
putStrLn ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Validation subscriptions: " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show ([TestSubscriptionResult position] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [TestSubscriptionResult position]
validationResults)
let eventCounts :: [Int]
eventCounts = (TestSubscriptionResult position -> Int)
-> [TestSubscriptionResult position] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map (.resultEventCount) [TestSubscriptionResult position]
results
minEvents :: Int
minEvents = [Int] -> Int
forall a. Ord a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
minimum [Int]
eventCounts
maxEvents :: Int
maxEvents = [Int] -> Int
forall a. Ord a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
maximum [Int]
eventCounts
let avgEvents :: Double
avgEvents = Int -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral ([Int] -> Int
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum [Int]
eventCounts) Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ Int -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral ([Int] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int]
eventCounts) :: Double
[Char] -> IO ()
putStrLn ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$
[Char]
"Event counts - Min: "
[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
minEvents
[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
", Max: "
[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
maxEvents
[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
", Avg: "
[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show (Double -> Int
forall b. Integral b => Double -> b
forall a b. (RealFrac a, Integral b) => a -> b
round Double
avgEvents :: Int)
let completedSubs :: [TestSubscriptionResult position]
completedSubs = (TestSubscriptionResult position -> Bool)
-> [TestSubscriptionResult position]
-> [TestSubscriptionResult position]
forall a. (a -> Bool) -> [a] -> [a]
filter (.resultCompletionReceived) [TestSubscriptionResult position]
results
incompleteSubs :: [TestSubscriptionResult position]
incompleteSubs = (TestSubscriptionResult position -> Bool)
-> [TestSubscriptionResult position]
-> [TestSubscriptionResult position]
forall a. (a -> Bool) -> [a] -> [a]
filter (Bool -> Bool
not (Bool -> Bool)
-> (TestSubscriptionResult position -> Bool)
-> TestSubscriptionResult position
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (.resultCompletionReceived)) [TestSubscriptionResult position]
results
[Char] -> IO ()
putStrLn ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$
[Char]
"Completion status - Completed: "
[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show ([TestSubscriptionResult position] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [TestSubscriptionResult position]
completedSubs)
[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
", Incomplete: "
[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show ([TestSubscriptionResult position] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [TestSubscriptionResult position]
incompleteSubs)
let hashes :: [Int64]
hashes = (TestSubscriptionResult position -> Int64)
-> [TestSubscriptionResult position] -> [Int64]
forall a b. (a -> b) -> [a] -> [b]
map (.resultFinalHash) [TestSubscriptionResult position]
results
uniqueHashes :: Int
uniqueHashes = Set Int64 -> Int
forall a. Set a -> Int
Set.size (Set Int64 -> Int) -> Set Int64 -> Int
forall a b. (a -> b) -> a -> b
$ [Int64] -> Set Int64
forall a. Ord a => [a] -> Set a
Set.fromList [Int64]
hashes
[Char] -> IO ()
putStrLn ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Unique final hashes: " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
uniqueHashes
[Char] -> IO ()
putStrLn [Char]
"\n=== SUMMARY STATISTICS ==="
let processingDurations :: [Int64]
processingDurations = (TestSubscriptionResult position -> Int64)
-> [TestSubscriptionResult position] -> [Int64]
forall a b. (a -> b) -> [a] -> [b]
map (.resultProcessingDurationMs) [TestSubscriptionResult position]
results
minDuration :: Int64
minDuration = [Int64] -> Int64
forall a. Ord a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
minimum [Int64]
processingDurations
maxDuration :: Int64
maxDuration = [Int64] -> Int64
forall a. Ord a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
maximum [Int64]
processingDurations
avgDuration :: Double
avgDuration = Int64 -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral ([Int64] -> Int64
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum [Int64]
processingDurations) Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ Int -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral ([Int64] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int64]
processingDurations) :: Double
[Char] -> IO ()
putStrLn ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$
[Char]
"Processing durations - Min: "
[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int64 -> [Char]
forall a. Show a => a -> [Char]
show Int64
minDuration
[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"ms, Max: "
[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int64 -> [Char]
forall a. Show a => a -> [Char]
show Int64
maxDuration
[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"ms, Avg: "
[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show (Double -> Int
forall b. Integral b => Double -> b
forall a b. (RealFrac a, Integral b) => a -> b
round Double
avgDuration :: Int)
[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"ms"
if Int
uniqueHashes Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1
then do
[Char] -> IO ()
putStrLn [Char]
"\n✅ Hash consistency verified"
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
minEvents Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
maxEvents) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
[Char] -> IO ()
putStrLn ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$
[Char]
"⚠️ Event count mismatch: min=" [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
minEvents [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
", max=" [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
maxEvents
let validationHashes :: [Int64]
validationHashes = (TestSubscriptionResult position -> Int64)
-> [TestSubscriptionResult position] -> [Int64]
forall a b. (a -> b) -> [a] -> [b]
map (.resultFinalHash) [TestSubscriptionResult position]
validationResults
earlyHashes :: [Int64]
earlyHashes = (TestSubscriptionResult position -> Int64)
-> [TestSubscriptionResult position] -> [Int64]
forall a b. (a -> b) -> [a] -> [b]
map (.resultFinalHash) [TestSubscriptionResult position]
earlyResults
case ([Int64]
validationHashes, [Int64]
earlyHashes) of
(Int64
vh : [Int64]
_, Int64
eh : [Int64]
_) -> [Char] -> Int64 -> Int64 -> IO ()
forall a. (Eq a, Show a, HasCallStack) => [Char] -> a -> a -> IO ()
assertEqual [Char]
"Validation vs early mismatch" Int64
eh Int64
vh
([Int64], [Int64])
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
else do
[Char] -> IO ()
putStrLn [Char]
"\n❌ Hash mismatch detected"
let hashGroups :: Map Int64 Int
hashGroups = (Int -> Int -> Int) -> [(Int64, Int)] -> Map Int64 Int
forall k a. Ord k => (a -> a -> a) -> [(k, a)] -> Map k a
Map.fromListWith (Int -> Int -> Int
forall a. Num a => a -> a -> a
(+) :: Int -> Int -> Int) [(Int64
h, (Int
1 :: Int)) | Int64
h <- [Int64]
hashes]
[(Int64, Int)] -> ((Int64, Int) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Map Int64 Int -> [(Int64, Int)]
forall k a. Map k a -> [(k, a)]
Map.toList Map Int64 Int
hashGroups) (((Int64, Int) -> IO ()) -> IO ())
-> ((Int64, Int) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Int64
hash, Int
count) ->
[Char] -> IO ()
putStrLn ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
" Hash " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int64 -> [Char]
forall a. Show a => a -> [Char]
show Int64
hash [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
": " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
count [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
" subscriptions"
[Char] -> IO ()
forall a. HasCallStack => [Char] -> IO a
assertFailure ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
uniqueHashes [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
" different hashes across " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show ([TestSubscriptionResult position] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [TestSubscriptionResult position]
results) [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
" subscriptions"
-- | Build the display label for a consistency test case.
--
-- The label has the shape
-- @\<testType\> consistency test (\<transactions\> tx, \<subscriptions\> subs)@,
-- where the subscription figure is the sum of the early, concurrent and
-- validation subscription counts taken from the configuration.
generateTestLabel ::
    -- | Short name of the test flavour, used as the label prefix
    String ->
    -- | Configuration whose transaction and subscription counts are shown
    TestConfiguration ->
    String
generateTestLabel testType config =
    testType
        ++ " consistency test ("
        ++ show config.configNumTransactions
        ++ " tx, "
        ++ show totalSubs
        ++ " subs)"
  where
    -- Total number of subscriptions across all three categories.
    totalSubs :: Int
    totalSubs =
        config.configNumEarlySubscriptions
            + config.configNumConcurrentSubscriptions
            + config.configNumValidationSubscriptions
-- | Ordering / consistency test trees for an event store backend.
--
-- Produces two groups:
--
-- * @"Debug Tests"@ — a single case run with 'debugTestConfig'.
-- * @"Comprehensive Consistency Tests"@ — one case each for
--   'lightTestConfig' and 'defaultTestConfig'.
--
-- Every case runs 'testComprehensiveConsistencyWithConfig' against the
-- supplied runner and is labelled via 'generateTestLabel'.
orderingTests ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    -- | Runner providing the backend under test
    EventStoreTestRunner backend ->
    [TestTree]
orderingTests runner =
    [ testGroup
        "Debug Tests"
        [ consistencyCase "Debug" debugTestConfig
        ]
    , testGroup
        "Comprehensive Consistency Tests"
        [ consistencyCase "Light" lightTestConfig
        , consistencyCase "Standard" defaultTestConfig
        ]
    ]
  where
    -- One consistency test case: label derived from the configuration,
    -- body running the comprehensive check with that same configuration.
    consistencyCase :: String -> TestConfiguration -> TestTree
    consistencyCase label config =
        testCase
            (generateTestLabel label config)
            (testComprehensiveConsistencyWithConfig runner config)