{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE Strict #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}

{- | Backend-agnostic comprehensive subscription ordering tests

Tests that all subscriptions (regardless of when they start) see events
in the same total order. Uses hash chain verification to detect any
ordering inconsistencies across concurrent subscriptions.
-}
module Test.Hindsight.Store.OrderingTests (orderingTests) where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Async (async, forConcurrently, forConcurrently_, wait)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (TVar, atomically, modifyTVar, newTVarIO, readTVar, writeTVar)
import Control.Monad (forM, forM_, replicateM, void, when)
import Control.Monad.IO.Class (liftIO)
import Data.Aeson (FromJSON, ToJSON)
import Data.IORef (IORef, atomicModifyIORef', modifyIORef', newIORef, readIORef)
import Data.Int (Int64)
import Data.Map.Strict qualified as Map
import Data.Proxy (Proxy (..))
import Data.Set qualified as Set
import Data.Text (Text, isPrefixOf, pack, unpack)
import Data.Text qualified as Text
import Data.Time (UTCTime, diffUTCTime, getCurrentTime)
import Data.UUID.V4 qualified as UUID
import GHC.Generics (Generic)
import Hindsight.Events
import Hindsight.Store
import System.IO (hFlush, stdout)
import System.Random (randomRIO)
import System.Timeout (timeout)
import Test.Hindsight.Store.TestRunner (EventStoreTestRunner (..))
import Test.Tasty
import Test.Tasty.HUnit

{- | Type-level name of the main test event.

With @DataKinds@ the string literal is a type-level symbol; the instances
below attach a payload type and version information to it.
-}
type ComprehensiveTestEvent = "comprehensive_test_event"

-- | Payload carried by every 'ComprehensiveTestEvent'.
data ComprehensiveTestPayload = ComprehensiveTestPayload
    { compValue :: Int
    , text :: Text
    , bytes :: String -- Changed from ByteString for JSON compatibility
    , timestamp :: UTCTime
    }
    deriving (Show, Eq, Generic, FromJSON, ToJSON)

-- A single payload version (index 0) is registered for this event.
type instance MaxVersion ComprehensiveTestEvent = 0
type instance Versions ComprehensiveTestEvent = '[ComprehensiveTestPayload]
instance Hindsight.Events.Event ComprehensiveTestEvent

instance MigrateVersion 0 ComprehensiveTestEvent

{- | Type-level name of the completion sentinel event, used to signal the
end of a test run.
-}
type CompletionEvent = "test_completion_event"

-- | Payload carried by the 'CompletionEvent' sentinel.
data CompletionPayload = CompletionPayload
    { completionId :: Text
    , completionTotalTransactions :: Int
    , totalExpectedEvents :: Int
    , completedAt :: UTCTime
    }
    deriving (Show, Eq, Generic, FromJSON, ToJSON)

-- A single payload version (index 0) is registered for this event.
type instance MaxVersion CompletionEvent = 0
type instance Versions CompletionEvent = '[CompletionPayload]
instance Hindsight.Events.Event CompletionEvent

instance MigrateVersion 0 CompletionEvent

{- | Per-subscription tracking data with optimized hash storage.

Generic over the backend's @position@ type. No 'Show' or 'Eq' instance is
derived because the record holds an 'MVar'.
-}
data SubscriptionState position = SubscriptionState
    { subscriptionId :: Text
    , startedAt :: UTCTime
    , currentHash :: !Int64
    -- ^ Hash chain (optimized: Int64)
    , subEventCount :: Int
    , firstEventTime :: Maybe UTCTime
    , lastEventTime :: Maybe UTCTime
    , completionReceived :: Bool
    , completionReceivedAt :: Maybe UTCTime
    , expectedTotalEvents :: Maybe Int
    , completionMVar :: MVar ()
    -- ^ Signal for completion
    , streamIdHashes :: !(Map.Map StreamId Int)
    -- ^ Memoized UUID hashes
    , textHashes :: !(Map.Map Text Int)
    -- ^ Memoized text hashes
    , positionsSeen :: ![position]
    -- ^ Track positions for debugging
    }

-- | Transaction plan for test execution.
data TransactionPlan = TransactionPlan
    { planId :: Int
    , planEventCount :: Int
    -- ^ Number of events to insert
    , targetStreams :: [StreamId]
    -- ^ Streams to insert into
    , basePayload :: ComprehensiveTestPayload
    -- ^ Base payload to vary
    , delayBeforeStart :: Int
    -- ^ Microseconds delay before starting
    , expectedSlowness :: Int
    -- ^ Expected processing time factor
    }
    deriving (Show)

{- | Final result reported by one subscription.

Named @TestSubscriptionResult@ (rather than @SubscriptionResult@) to avoid a
name conflict. Generic over the backend's @position@ type.
-}
data TestSubscriptionResult position = TestSubscriptionResult
    { resultSubscriptionId :: Text
    , resultStartedAt :: UTCTime
    , resultFinalHash :: !Int64
    -- ^ Optimized: Int64
    , resultEventCount :: Int
    , resultFirstEventTime :: Maybe UTCTime
    , resultLastEventTime :: Maybe UTCTime
    , resultCompletedAt :: UTCTime
    , resultCompletionReceived :: Bool
    , resultCompletionReceivedAt :: Maybe UTCTime
    , resultExpectedTotalEvents :: Maybe Int
    , resultProcessingDurationMs :: Int64
    , resultPositionsSeen :: ![position]
    -- ^ For debugging
    }

{- Manual Show instance for debugging.

The position list can be very long, so only its length is printed; the
other fields shown are the subscription id, event count, final hash and
the completion flag.
-}
instance (Show position) => Show (TestSubscriptionResult position) where
    show r =
        concat
            [ "TestSubscriptionResult{"
            , "subId="
            , unpack r.resultSubscriptionId
            , ", events="
            , show r.resultEventCount
            , ", hash="
            , show r.resultFinalHash
            , ", completed="
            , show r.resultCompletionReceived
            , ", positions="
            , show (length r.resultPositionsSeen)
            , " items"
            , "}"
            ]

{- | Configuration for the comprehensive consistency test.

The concrete values used by the suite are defined by 'defaultTestConfig'
and the presets derived from it; no default numbers are repeated here so
that these comments cannot drift from the code.
-}
data TestConfiguration = TestConfiguration
    { configNumTransactions :: Int
    -- ^ Number of concurrent transactions
    , configMinEventsPerTx :: Int
    -- ^ Minimum events per transaction
    , configMaxEventsPerTx :: Int
    -- ^ Maximum events per transaction
    , configMinStreamsPerTx :: Int
    -- ^ Minimum streams per transaction
    , configMaxStreamsPerTx :: Int
    -- ^ Maximum streams per transaction
    , configTxExecutionWindowMs :: Int
    -- ^ Total window for all transaction execution, in milliseconds
    , configNumEarlySubscriptions :: Int
    -- ^ Early subscriptions count
    , configNumConcurrentSubscriptions :: Int
    -- ^ Concurrent subscriptions count
    , configNumValidationSubscriptions :: Int
    -- ^ Validation subscriptions count
    , configSubExecutionWindowMs :: Int
    -- ^ Total window for subscription starts, in milliseconds
    , configProcessingTimeMs :: Int
    -- ^ Wait time for event processing, in milliseconds
    , configMaxSlownessFactor :: Int
    -- ^ Max artificial slowness multiplier
    }
    deriving (Show, Eq)

{- | Default test configuration for standard testing.

Every transaction writes exactly 10 events (minimum and maximum are equal)
across 1 to 5 streams. The other presets in this module are record updates
of this value.
-}
defaultTestConfig :: TestConfiguration
defaultTestConfig =
    TestConfiguration
        { configNumTransactions = 10
        , configMinEventsPerTx = 10
        , configMaxEventsPerTx = 10
        , configMinStreamsPerTx = 1
        , configMaxStreamsPerTx = 5
        , configTxExecutionWindowMs = 20000
        , configNumEarlySubscriptions = 2
        , configNumConcurrentSubscriptions = 10
        , configNumValidationSubscriptions = 2
        , configSubExecutionWindowMs = 7000
        , configProcessingTimeMs = 3000
        , configMaxSlownessFactor = 10
        }

{- | Light test configuration for quick testing.

Starts from 'defaultTestConfig' and lowers the transaction count, the
subscription counts and the timing windows.
-}
lightTestConfig :: TestConfiguration
lightTestConfig =
    defaultTestConfig
        { configNumTransactions = 5
        , configMaxEventsPerTx = 10
        , configTxExecutionWindowMs = 10000
        , configNumEarlySubscriptions = 1
        , configNumConcurrentSubscriptions = 3
        , configNumValidationSubscriptions = 1
        , configSubExecutionWindowMs = 3000
        , configProcessingTimeMs = 1000
        }

{- | Stress test configuration for heavy load testing.

Starts from 'defaultTestConfig'; fields not listed here keep their default
values.
-}
stressTestConfig :: TestConfiguration
stressTestConfig =
    defaultTestConfig
        { configNumTransactions = 100 -- 10x more transactions
        , configMaxEventsPerTx = 10 -- Keep moderate event count
        , configTxExecutionWindowMs = 6000 -- 6 seconds for all transactions
        , configNumEarlySubscriptions = 1000 -- Massive subscription load
        , configNumConcurrentSubscriptions = 1000
        , configNumValidationSubscriptions = 1000
        , configSubExecutionWindowMs = 12000 -- 12 seconds for subscription starts
        , configProcessingTimeMs = 20000 -- Longer processing time
        , configMaxSlownessFactor = 20 -- More variation in slowness
        }

{- | Debug test configuration for race condition analysis.

Starts from 'defaultTestConfig' and compresses the execution windows so
that many events arrive in short bursts while few subscriptions run.
-}
debugTestConfig :: TestConfiguration
debugTestConfig =
    defaultTestConfig
        { configNumTransactions = 100 -- Moderate number of transactions
        , configMinEventsPerTx = 10 -- HIGH events per tx to trigger full batches
        , configMaxEventsPerTx = 15 -- Range to create burst conditions
        , configTxExecutionWindowMs = 1000 -- FAST execution to create bursts
        , configNumEarlySubscriptions = 1 -- Minimal subscriptions
        , configNumConcurrentSubscriptions = 3
        , configNumValidationSubscriptions = 1
        , configSubExecutionWindowMs = 500 -- Fast subscription starts
        , configProcessingTimeMs = 100 -- Very short processing time
        }

{- | Progress tracking for coordinated reporting.

A snapshot of this record is rendered by 'formatProgress'.
-}
data ProgressState = ProgressState
    { transactionsCompleted :: Int
    -- ^ Transactions reported as successful
    , transactionsFailed :: Int
    -- ^ Transactions reported as failed
    , totalTransactions :: Int
    , subscriptionsStarted :: Int
    , subscriptionsCompleted :: Int
    , totalSubscriptions :: Int
    , testStartTime :: UTCTime
    , lastUpdateTime :: UTCTime
    -- ^ Refreshed by 'reportPhaseChangeIO'; elapsed time in the progress
    -- line is measured from 'testStartTime' to this value
    , currentPhase :: Text
    , detailMessage :: Text
    -- ^ Optional extra text; omitted from the progress line when empty
    }
    deriving (Show)

-- | Progress manager with thread-safe updates (all mutable state is in 'TVar's).
data ProgressManager = ProgressManager
    { progressState :: TVar ProgressState
    -- ^ Current progress snapshot
    , shouldStop :: TVar Bool
    -- ^ Set to 'True' by 'stopProgressReporting' to end the reporting loop
    , updateIntervalMs :: Int
    -- ^ Pause between progress lines, in milliseconds
    }

{- | Create a new progress manager.

All counters start at zero, the phase is @"Initializing"@, and both
timestamps are set to the supplied start time. The expected subscription
total is the sum of the early, concurrent and validation subscription
counts from the configuration. The refresh interval is chosen from the
configured transaction count.
-}
newProgressManager :: TestConfiguration -> UTCTime -> IO ProgressManager
newProgressManager config startTime = do
    stateVar <- newTVarIO initialProgress
    stopVar <- newTVarIO False
    pure $ ProgressManager stateVar stopVar refreshIntervalMs
  where
    totalSubs :: Int
    totalSubs =
        config.configNumEarlySubscriptions
            + config.configNumConcurrentSubscriptions
            + config.configNumValidationSubscriptions

    initialProgress :: ProgressState
    initialProgress =
        ProgressState
            { transactionsCompleted = 0
            , transactionsFailed = 0
            , totalTransactions = config.configNumTransactions
            , subscriptionsStarted = 0
            , subscriptionsCompleted = 0
            , totalSubscriptions = totalSubs
            , testStartTime = startTime
            , lastUpdateTime = startTime
            , currentPhase = "Initializing"
            , detailMessage = ""
            }

    refreshIntervalMs :: Int
    refreshIntervalMs
        | config.configNumTransactions <= 50 = 500 -- 500ms for light tests
        | config.configNumTransactions <= 500 = 1000 -- 1s for normal tests
        | otherwise = 2000 -- 2s for stress tests

{- | Update transaction progress.

Increments the completed counter when the flag is 'True' and the failed
counter otherwise. The new state is forced before it is written back, so
no chain of pending updates accumulates inside the 'TVar'.
-}
reportTransactionCompleted :: ProgressManager -> Bool -> IO ()
reportTransactionCompleted pm success = atomically $ do
    current <- readTVar pm.progressState
    writeTVar pm.progressState $! bump current
  where
    bump :: ProgressState -> ProgressState
    bump s
        | success = s{transactionsCompleted = s.transactionsCompleted + 1}
        | otherwise = s{transactionsFailed = s.transactionsFailed + 1}

{- | Update subscription progress: record that one more subscription has
started.

The new state is forced before it is written back, so no chain of pending
updates accumulates inside the 'TVar'.
-}
reportSubscriptionStarted :: ProgressManager -> IO ()
reportSubscriptionStarted pm = atomically $ do
    current <- readTVar pm.progressState
    writeTVar pm.progressState
        $! current{subscriptionsStarted = current.subscriptionsStarted + 1}

{- | Record that one more subscription has completed.

The new state is forced before it is written back, so no chain of pending
updates accumulates inside the 'TVar'.
-}
reportSubscriptionCompleted :: ProgressManager -> IO ()
reportSubscriptionCompleted pm = atomically $ do
    current <- readTVar pm.progressState
    writeTVar pm.progressState
        $! current{subscriptionsCompleted = current.subscriptionsCompleted + 1}

{- | Switch the reported phase and detail message.

Also stamps 'lastUpdateTime' with the current wall-clock time, which is
read before the transaction starts.
-}
reportPhaseChangeIO :: ProgressManager -> Text -> Text -> IO ()
reportPhaseChangeIO pm phase detail = do
    now <- getCurrentTime
    atomically $ do
        current <- readTVar pm.progressState
        writeTVar pm.progressState
            $! current
                { currentPhase = phase
                , detailMessage = detail
                , lastUpdateTime = now
                }

{- | Format progress display.

Renders a single line of the form

> [<elapsed>s] <phase> | Tx: <done>/<total> (<pct>%) | Subs: <done>/<total> (<pct>%) [<n> started] | Overall: <pct>% | ETA: <n>s | <detail>

* Elapsed time is 'lastUpdateTime' minus 'testStartTime', rounded to seconds.
* A transaction counts as done whether it succeeded or failed; a
  @[<n> failed]@ marker is appended when there are failures.
* Overall progress weights transactions at 70% and subscriptions at 30%.
* The ETA section appears only while some transactions are still
  outstanding, at least one has finished, and elapsed time is positive, so
  that a rate can actually be estimated.
* The detail section is omitted when the detail message is empty.
-}
formatProgress :: ProgressState -> String
formatProgress s =
    concat
        [ "["
        , show elapsedSecs
        , "s] "
        , unpack s.currentPhase
        , " | Tx: "
        , txStatus
        , " | Subs: "
        , subStatus
        , " | Overall: "
        , show overallPercent
        , "%"
        , etaStr
        , detailSuffix
        ]
  where
    elapsed = diffUTCTime s.lastUpdateTime s.testStartTime

    elapsedSecs :: Int
    elapsedSecs = round elapsed

    elapsedAsDouble :: Double
    elapsedAsDouble = realToFrac elapsed

    -- Integer percentage of part over whole; 0 when the whole is not positive.
    percentOf :: Int -> Int -> Int
    percentOf part whole
        | whole > 0 = (part * 100) `div` whole
        | otherwise = 0

    -- Finished transactions, successful or not.
    txProgress :: Int
    txProgress = s.transactionsCompleted + s.transactionsFailed

    txPercent :: Int
    txPercent = percentOf txProgress s.totalTransactions

    subPercent :: Int
    subPercent = percentOf s.subscriptionsCompleted s.totalSubscriptions

    overallPercent :: Int
    overallPercent = (txPercent * 70 + subPercent * 30) `div` 100

    etaStr :: String
    etaStr
        -- Compare finished (not just successful) transactions to the total:
        -- once every transaction has finished there is nothing left to estimate.
        | txProgress > 0 && txProgress < s.totalTransactions && elapsedAsDouble > 0 =
            let rate = fromIntegral txProgress / elapsedAsDouble :: Double
                remaining = fromIntegral (s.totalTransactions - txProgress) :: Double
                etaSecs = round (remaining / rate) :: Int
             in " | ETA: " ++ show etaSecs ++ "s"
        | otherwise = ""

    failedSuffix :: String
    failedSuffix
        | s.transactionsFailed > 0 = " [" ++ show s.transactionsFailed ++ " failed]"
        | otherwise = ""

    txStatus :: String
    txStatus =
        concat
            [ show txProgress
            , "/"
            , show s.totalTransactions
            , " ("
            , show txPercent
            , "%)"
            , failedSuffix
            ]

    subStatus :: String
    subStatus =
        concat
            [ show s.subscriptionsCompleted
            , "/"
            , show s.totalSubscriptions
            , " ("
            , show subPercent
            , "%)"
            , " ["
            , show s.subscriptionsStarted
            , " started]"
            ]

    detailSuffix :: String
    detailSuffix
        | Text.null s.detailMessage = ""
        | otherwise = " | " ++ unpack s.detailMessage

{- | Start progress reporting thread.

Forks a background thread that repeatedly prints the formatted progress
line to stdout, overwriting the previous one with a carriage return, and
then sleeps for 'updateIntervalMs' milliseconds. When the stop flag is
set, the thread prints a final line on a fresh row followed by a
"stopped" notice, and exits.

The stop flag and the progress snapshot are read in one transaction so
that each line reflects a consistent view of both.
-}
startProgressReporting :: ProgressManager -> IO ()
startProgressReporting pm = void (forkIO reportLoop)
  where
    reportLoop :: IO ()
    reportLoop = do
        (stopRequested, snapshot) <-
            atomically $
                (,) <$> readTVar pm.shouldStop <*> readTVar pm.progressState
        if stopRequested
            then do
                putStrLn $ "\n" ++ formatProgress snapshot
                putStrLn "Progress reporting stopped."
            else do
                putStr $ "\r" ++ formatProgress snapshot
                -- No newline was written, so flush explicitly to make the
                -- line visible on a line-buffered stdout.
                hFlush stdout
                -- threadDelay takes microseconds.
                threadDelay (pm.updateIntervalMs * 1000)
                reportLoop

{- | Stop progress reporting.

Sets the manager's stop flag; the reporting thread notices it the next time it
polls the flag.
-}
stopProgressReporting :: ProgressManager -> IO ()
stopProgressReporting pm = atomically (writeTVar stopFlag True)
  where
    stopFlag = pm.shouldStop

{- | Advance a rolling hash chain by one event, with memoization.

The new hash mixes the previous hash with hashes of the event position
(rendered through 'Show', so any backend position type works), the stream id,
the payload's @compValue@ and the payload's @text@. Stream-id and text hashes
are cached in the two maps, which are returned (possibly extended) together
with the new hash value.
-}
updateHashChainGeneric ::
    forall position.
    (Show position) =>
    Int64 ->
    position -> -- Generic position type
    StreamId ->
    ComprehensiveTestPayload ->
    Map.Map StreamId Int ->
    Map.Map Text Int ->
    (Int64, Map.Map StreamId Int, Map.Map Text Int)
updateHashChainGeneric prevHash position streamId payload streamHashes textHashes =
    (mixed `mod` hashModulus, streamCache, textCache)
  where
    -- Large prime for good distribution
    hashModulus :: Int64
    hashModulus = 982451653

    -- Cached (or freshly computed and cached) component hashes.
    (streamHash, streamCache) = memoised show streamId streamHashes
    (textHash, textCache) = memoised unpack payload.text textHashes

    -- The position is hashed generically via its Show rendering; it is not cached.
    positionHash :: Int
    positionHash = fastTextHash (show position)

    -- Polynomial-style combination carried out in Int64 arithmetic.
    mixed :: Int64
    mixed =
        prevHash * 31
            + weighted 37 positionHash
            + weighted 43 (abs streamHash)
            + weighted 47 payload.compValue
            + weighted 53 (abs textHash)

    weighted :: Int64 -> Int -> Int64
    weighted factor n = fromIntegral n * factor

    -- Look a key up in its cache; on a miss, hash its rendering and remember it.
    -- The renderer is passed as a function so it only runs on a cache miss.
    memoised :: (Ord k) => (k -> String) -> k -> Map.Map k Int -> (Int, Map.Map k Int)
    memoised render key cache =
        case Map.lookup key cache of
            Just known -> (known, cache)
            Nothing ->
                let fresh = fastTextHash (render key)
                 in (fresh, Map.insert key fresh cache)

    -- Fast hash function for strings: base-31 fold over the character codes.
    fastTextHash :: String -> Int
    fastTextHash = foldl' step 0
      where
        step acc c = acc * 31 + fromEnum c

{- | Wait for a subscription to signal completion, with a timeout.

Reads the subscription state once and takes its completion 'MVar', waiting at
most 60 seconds. If nothing arrives in that time the test is failed through
'assertFailure' with a message naming the subscription.
-}
waitForCompletion :: forall position. IORef (SubscriptionState position) -> IO ()
waitForCompletion stateRef = do
    sub <- readIORef stateRef
    -- 60 seconds, expressed in microseconds as 'timeout' requires.
    timeout 60_000_000 (takeMVar sub.completionMVar) >>= \case
        Just () -> pure ()
        Nothing ->
            assertFailure
                ("Subscription " <> unpack sub.subscriptionId <> " timed out after 60 seconds")

{- | Generate random transaction plans based on configuration.

Produces one plan per transaction, numbered from 1 to
@configNumTransactions@. Every plan draws its event count, its number of
(freshly generated) target streams, its base payload contents, its start delay
and its slowness factor at random within the bounds given by the
configuration. All plans share the same payload timestamp, taken once up front.
-}
generateTransactionPlans :: TestConfiguration -> IO [TransactionPlan]
generateTransactionPlans config = do
    sharedTimestamp <- getCurrentTime
    traverse (randomPlan sharedTimestamp) [1 .. config.configNumTransactions]
  where
    randomPlan :: UTCTime -> Int -> IO TransactionPlan
    randomPlan stamp planNo = do
        nEvents <- randomRIO (config.configMinEventsPerTx, config.configMaxEventsPerTx)
        nStreams <- randomRIO (config.configMinStreamsPerTx, config.configMaxStreamsPerTx)
        streams <- replicateM nStreams (fmap StreamId UUID.nextRandom)

        seedValue <- randomRIO (1, 10000)
        suffix <- randomRIO (1000 :: Int, 9999)
        digits <- randomRIO (100000 :: Int, 999999)

        -- The execution window is configured in milliseconds; the delay is
        -- later handed to threadDelay, hence the conversion to microseconds.
        startDelay <- randomRIO (0, config.configTxExecutionWindowMs * 1000)
        slowness <- randomRIO (1, config.configMaxSlownessFactor)

        pure
            TransactionPlan
                { planId = planNo
                , planEventCount = nEvents
                , targetStreams = streams
                , basePayload =
                    ComprehensiveTestPayload
                        { compValue = seedValue
                        , text = "transaction-" <> pack (show planNo) <> "-" <> pack (show suffix)
                        , bytes = show digits
                        , timestamp = stamp
                        }
                , delayBeforeStart = startDelay
                , expectedSlowness = slowness
                }

{- | Create and start a hash-tracking subscription that reads from the beginning.

Generic over backend type. This is
'startHashTrackingSubscriptionWithPosition' with the start position fixed to
'FromBeginning'.
-}
startHashTrackingSubscription ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    BackendHandle backend ->
    Text ->
    UTCTime ->
    Maybe ProgressManager ->
    IO (SubscriptionHandle backend, IORef (SubscriptionState (Cursor backend)))
startHashTrackingSubscription backendHandle subscriptionName launchedAt progressSink =
    startHashTrackingSubscriptionWithPosition
        backendHandle
        subscriptionName
        launchedAt
        progressSink
        FromBeginning

{- | Create and start a hash-tracking subscription at a specific start position.

The subscription listens on all streams and handles two event types:

* every 'ComprehensiveTestEvent' advances the subscription's hash chain,
  bumps its event counter, updates the first/last event timestamps and records
  the event position;
* the 'CompletionEvent' records the completion details, fills the completion
  'MVar', notifies the progress manager (if any) and stops the subscription.

Returns the subscription handle together with the 'IORef' holding the
subscription's evolving state. The progress manager, when supplied, is told
that the subscription started once 'subscribe' has returned.
-}
startHashTrackingSubscriptionWithPosition ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    BackendHandle backend ->
    Text ->
    UTCTime ->
    Maybe ProgressManager ->
    StartupPosition backend ->
    IO (SubscriptionHandle backend, IORef (SubscriptionState (Cursor backend)))
startHashTrackingSubscriptionWithPosition store subId startTime mProgressManager startPos = do
    doneSignal <- newEmptyMVar
    stateRef <-
        newIORef
            SubscriptionState
                { subscriptionId = subId
                , startedAt = startTime
                , currentHash = 0
                , subEventCount = 0
                , firstEventTime = Nothing
                , lastEventTime = Nothing
                , completionReceived = False
                , completionReceivedAt = Nothing
                , expectedTotalEvents = Nothing
                , completionMVar = doneSignal
                , streamIdHashes = Map.empty
                , textHashes = Map.empty
                , positionsSeen = []
                }

    let -- Regular test event: fold it into the hash chain and keep going.
        onTestEvent envelope = do
            receivedAt <- getCurrentTime
            atomicModifyIORef' stateRef $ \st ->
                let (nextHash, streamCache, textCache) =
                        updateHashChainGeneric
                            st.currentHash
                            envelope.position
                            envelope.streamId
                            envelope.payload
                            st.streamIdHashes
                            st.textHashes
                 in ( st
                        { currentHash = nextHash
                        , streamIdHashes = streamCache
                        , textHashes = textCache
                        , subEventCount = st.subEventCount + 1
                        , -- Only the very first event sets this timestamp.
                          firstEventTime = maybe (Just receivedAt) Just st.firstEventTime
                        , lastEventTime = Just receivedAt
                        , -- NOTE(review): appending at the end costs O(n) per
                          -- event; kept because readers of positionsSeen are
                          -- outside this block and may rely on this order.
                          positionsSeen = st.positionsSeen ++ [envelope.position]
                        }
                    , ()
                    )
            pure Continue

        -- Completion marker: record it, wake up the waiter and stop.
        onCompletion envelope = do
            finishedAt <- getCurrentTime
            atomicModifyIORef' stateRef $ \st ->
                ( st
                    { completionReceived = True
                    , completionReceivedAt = Just finishedAt
                    , expectedTotalEvents = Just envelope.payload.totalExpectedEvents
                    , lastEventTime = Just finishedAt
                    }
                , ()
                )
            putMVar doneSignal ()
            forM_ mProgressManager reportSubscriptionCompleted
            pure Stop

        matcher =
            (Proxy @ComprehensiveTestEvent, onTestEvent)
                :? (Proxy @CompletionEvent, onCompletion)
                :? MatchEnd

    -- Use generic subscribe API
    subscriptionHandle <- subscribe store matcher (EventSelector AllStreams startPos)

    forM_ mProgressManager reportSubscriptionStarted

    pure (subscriptionHandle, stateRef)

{- | Execute a transaction plan against the store.

After sleeping for the plan's @delayBeforeStart@ (microseconds, as passed to
'threadDelay'), builds @planEventCount@ events from the plan's base payload,
distributes them round-robin over the plan's target streams and inserts them
in one transaction, writing every stream with the 'NoStream' expected version.
Plans whose @expectedSlowness@ exceeds 5 sleep an additional
@expectedSlowness * 50ms@ before inserting.

A failed insertion is retried immediately, up to 10 times; only the error of
the last attempt is reported. The final outcome is forwarded to the progress
manager when one is supplied.

Returns @Right ()@ on success, and @Left message@ when every attempt failed or
when the plan has no target streams to write to.
-}
executeTransactionPlan ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    BackendHandle backend ->
    TransactionPlan ->
    Maybe ProgressManager ->
    IO (Either String ())
executeTransactionPlan store plan mProgressManager = do
    threadDelay plan.delayBeforeStart
    case plan.targetStreams of
        -- 'cycle' is partial: without this guard a plan with no target
        -- streams would raise an exception instead of a reportable failure.
        [] -> giveUp "plan has no target streams"
        streams -> do
            let eventBatches = Map.map (StreamWrite NoStream) (groupByStream streams)

            when (plan.expectedSlowness > 5) $
                threadDelay (plan.expectedSlowness * 50_000)

            let attempt :: Int -> IO (Either String ())
                attempt retriesLeft =
                    insertEvents store Nothing (Transaction eventBatches) >>= \case
                        SuccessfulInsertion _ -> do
                            reportOutcome True
                            pure (Right ())
                        FailedInsertion err
                            | retriesLeft > 0 -> attempt (retriesLeft - 1)
                            | otherwise -> giveUp (show err)

            attempt maxRetries
  where
    -- Number of retries after the first failed attempt.
    maxRetries :: Int
    maxRetries = 10

    -- The i-th event of the plan: base payload with a shifted value and a
    -- per-event text suffix.
    makeEvent :: Int -> SomeLatestEvent
    makeEvent i =
        let base = plan.basePayload
            payload =
                base
                    { compValue = base.compValue + i
                    , text = base.text <> "-" <> pack (show i)
                    }
         in SomeLatestEvent (Proxy @ComprehensiveTestEvent) payload

    events :: [SomeLatestEvent]
    events = map makeEvent [1 .. plan.planEventCount]

    -- Assign events to streams round-robin and collect them per stream.
    -- 'Map.fromListWith' passes the newer value first, so a plain (++) would
    -- reverse each stream's events; 'flip' keeps them in generation order.
    -- Must only be called with a non-empty stream list (see the guard above).
    groupByStream :: [StreamId] -> Map.Map StreamId [SomeLatestEvent]
    groupByStream streams =
        Map.fromListWith
            (flip (++))
            [(sid, [ev]) | (sid, ev) <- zip (cycle streams) events]

    -- Tell the progress manager (if any) how the transaction ended.
    reportOutcome :: Bool -> IO ()
    reportOutcome succeeded =
        forM_ mProgressManager $ \pm -> reportTransactionCompleted pm succeeded

    -- Report the failure and build the error result for this plan.
    giveUp :: String -> IO (Either String ())
    giveUp reason = do
        reportOutcome False
        pure (Left ("Transaction " ++ show plan.planId ++ " failed: " ++ reason))

{- | Comprehensive consistency test with configurable parameters
Generic over backend type
-}
testComprehensiveConsistencyWithConfig ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    EventStoreTestRunner backend ->
    TestConfiguration ->
    Assertion
testComprehensiveConsistencyWithConfig :: forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend)) =>
EventStoreTestRunner backend -> TestConfiguration -> IO ()
testComprehensiveConsistencyWithConfig EventStoreTestRunner backend
runner TestConfiguration
config = do
    [Char] -> IO ()
putStrLn [Char]
"\n=== Starting Comprehensive Consistency Test ==="
    [Char] -> IO ()
putStrLn ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$
        [Char]
"Config: "
            [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show TestConfiguration
config.configNumTransactions
            [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
" transactions over "
            [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show TestConfiguration
config.configTxExecutionWindowMs
            [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"ms, "
            [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show (TestConfiguration
config.configNumEarlySubscriptions Int -> Int -> Int
forall a. Num a => a -> a -> a
+ TestConfiguration
config.configNumConcurrentSubscriptions Int -> Int -> Int
forall a. Num a => a -> a -> a
+ TestConfiguration
config.configNumValidationSubscriptions)
            [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
" subscriptions over "
            [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show TestConfiguration
config.configSubExecutionWindowMs
            [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
"ms"

    testStartTime <- IO UTCTime
getCurrentTime
    progressManager <- newProgressManager config testStartTime
    startProgressReporting progressManager

    withStore runner $ \BackendHandle backend
store -> do
        transactionPlans <- TestConfiguration -> IO [TransactionPlan]
generateTransactionPlans TestConfiguration
config
        let totalEvents = [Int] -> Int
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum ([Int] -> Int) -> [Int] -> Int
forall a b. (a -> b) -> a -> b
$ (TransactionPlan -> Int) -> [TransactionPlan] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map (.planEventCount) [TransactionPlan]
transactionPlans
        reportPhaseChangeIO progressManager "Planning" $ "Generated " <> pack (show (length transactionPlans)) <> " transaction plans, " <> pack (show totalEvents) <> " total events"

        subscriptionsRef <- newIORef []

        reportPhaseChangeIO progressManager "Starting" "Launching concurrent transactions and subscriptions"

        transactionResults <- async $ do
            reportPhaseChangeIO progressManager "Transactions" $ "Executing " <> pack (show (length transactionPlans)) <> " transactions over " <> pack (show config.configTxExecutionWindowMs) <> "ms window"
            forConcurrently_ transactionPlans $ \TransactionPlan
plan -> do
                result <- BackendHandle backend
-> TransactionPlan
-> Maybe ProgressManager
-> IO (Either [Char] ())
forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend)) =>
BackendHandle backend
-> TransactionPlan
-> Maybe ProgressManager
-> IO (Either [Char] ())
executeTransactionPlan BackendHandle backend
store TransactionPlan
plan (ProgressManager -> Maybe ProgressManager
forall a. a -> Maybe a
Just ProgressManager
progressManager)
                case result of
                    Left [Char]
_err -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                    Right ()
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

        earlySubscriptionsAsync <- async $ do
            reportPhaseChangeIO progressManager "Early Subs" $ "Starting " <> pack (show config.configNumEarlySubscriptions) <> " early subscriptions"
            earlyStartTime <- getCurrentTime
            subs <- forConcurrently [1 .. config.configNumEarlySubscriptions] $ \Int
i -> do
                let subId :: Text
subId = Text
"early-" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> [Char] -> Text
pack (Int -> [Char]
forall a. Show a => a -> [Char]
show Int
i)
                (handle, stateRef) <- BackendHandle backend
-> Text
-> UTCTime
-> Maybe ProgressManager
-> IO
     (SubscriptionHandle backend,
      IORef (SubscriptionState (Cursor backend)))
forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend)) =>
BackendHandle backend
-> Text
-> UTCTime
-> Maybe ProgressManager
-> IO
     (SubscriptionHandle backend,
      IORef (SubscriptionState (Cursor backend)))
startHashTrackingSubscription BackendHandle backend
store Text
subId UTCTime
earlyStartTime (ProgressManager -> Maybe ProgressManager
forall a. a -> Maybe a
Just ProgressManager
progressManager)
                pure (handle, stateRef)
            mapM_ (modifyIORef' subscriptionsRef . (:)) subs
            pure subs

        concurrentSubscriptionsAsync <- async $ do
            reportPhaseChangeIO progressManager "Concurrent Subs" $ "Starting " <> pack (show config.configNumConcurrentSubscriptions) <> " concurrent subscriptions over " <> pack (show config.configSubExecutionWindowMs) <> "ms window"

            let startPos = StartupPosition backend
forall backend. StartupPosition backend
FromBeginning

            subs <- forConcurrently [1 .. config.configNumConcurrentSubscriptions] $ \Int
i -> do
                delay <- (Int, Int) -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
0, TestConfiguration
config.configSubExecutionWindowMs Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000)
                threadDelay delay

                subStartTime <- getCurrentTime
                let subId = Text
"concurrent-" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> [Char] -> Text
pack (Int -> [Char]
forall a. Show a => a -> [Char]
show Int
i)
                (handle, stateRef) <- startHashTrackingSubscriptionWithPosition store subId subStartTime (Just progressManager) startPos
                pure (handle, stateRef)

            mapM_ (modifyIORef' subscriptionsRef . (:)) subs
            pure subs

        reportPhaseChangeIO progressManager "Waiting" "Waiting for concurrent activities to complete"
        wait transactionResults
        _ <- wait earlySubscriptionsAsync
        _ <- wait concurrentSubscriptionsAsync
        reportPhaseChangeIO progressManager "Completed" "All transactions and concurrent subscriptions completed"

        reportPhaseChangeIO progressManager "Validation Subs" $ "Starting " <> pack (show config.configNumValidationSubscriptions) <> " validation subscriptions"
        validationTime <- getCurrentTime
        validationSubscriptions <- forM [1 .. config.configNumValidationSubscriptions] $ \Int
i -> do
            let subId :: Text
subId = Text
"validation-" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> [Char] -> Text
pack (Int -> [Char]
forall a. Show a => a -> [Char]
show Int
i)
            (handle, stateRef) <- BackendHandle backend
-> Text
-> UTCTime
-> Maybe ProgressManager
-> IO
     (SubscriptionHandle backend,
      IORef (SubscriptionState (Cursor backend)))
forall backend.
(EventStore backend, StoreConstraints backend IO,
 Show (Cursor backend)) =>
BackendHandle backend
-> Text
-> UTCTime
-> Maybe ProgressManager
-> IO
     (SubscriptionHandle backend,
      IORef (SubscriptionState (Cursor backend)))
startHashTrackingSubscription BackendHandle backend
store Text
subId UTCTime
validationTime (ProgressManager -> Maybe ProgressManager
forall a. a -> Maybe a
Just ProgressManager
progressManager)
            pure (handle, stateRef)

        mapM_ (modifyIORef' subscriptionsRef . (:)) validationSubscriptions

        reportPhaseChangeIO progressManager "Activating" "Waiting for subscriptions to activate"
        threadDelay 500_000

        reportPhaseChangeIO progressManager "Completion" "Sending completion event"
        completionTime <- getCurrentTime
        completionStream <- StreamId <$> UUID.nextRandom
        let completionPayload =
                CompletionPayload
                    { completionId :: Text
completionId = Text
"test-completion-" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> [Char] -> Text
pack (Int -> [Char]
forall a. Show a => a -> [Char]
show (Int -> [Char]) -> Int -> [Char]
forall a b. (a -> b) -> a -> b
$ [TransactionPlan] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [TransactionPlan]
transactionPlans)
                    , completionTotalTransactions :: Int
completionTotalTransactions = [TransactionPlan] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [TransactionPlan]
transactionPlans
                    , totalExpectedEvents :: Int
totalExpectedEvents = Int
totalEvents
                    , completedAt :: UTCTime
completedAt = UTCTime
completionTime
                    }

        void $
            insertEvents store Nothing $
                Transaction
                    ( Map.singleton completionStream $
                        StreamWrite
                            NoStream
                            [SomeLatestEvent (Proxy @CompletionEvent) completionPayload]
                    )

        reportPhaseChangeIO progressManager "Waiting Complete" "Waiting for all subscriptions to receive completion event"

        allSubscriptions <- readIORef subscriptionsRef
        reportPhaseChangeIO progressManager "Waiting Complete" $ "Waiting for " <> pack (show (length allSubscriptions)) <> " subscriptions to complete"

        let completionMVars = ((SubscriptionHandle backend,
  IORef (SubscriptionState (Cursor backend)))
 -> IORef (SubscriptionState (Cursor backend)))
-> [(SubscriptionHandle backend,
     IORef (SubscriptionState (Cursor backend)))]
-> [IORef (SubscriptionState (Cursor backend))]
forall a b. (a -> b) -> [a] -> [b]
map (\(SubscriptionHandle backend
_, IORef (SubscriptionState (Cursor backend))
stateRef) -> IORef (SubscriptionState (Cursor backend))
stateRef) [(SubscriptionHandle backend,
  IORef (SubscriptionState (Cursor backend)))]
allSubscriptions
        forConcurrently_ completionMVars $ \IORef (SubscriptionState (Cursor backend))
stateRef ->
            IORef (SubscriptionState (Cursor backend)) -> IO ()
forall position. IORef (SubscriptionState position) -> IO ()
waitForCompletion IORef (SubscriptionState (Cursor backend))
stateRef
        reportPhaseChangeIO progressManager "Analyzing" "All subscriptions completed, collecting results"

        results <- forM allSubscriptions $ \(SubscriptionHandle backend
handle, IORef (SubscriptionState (Cursor backend))
stateRef) -> do
            state <- IORef (SubscriptionState (Cursor backend))
-> IO (SubscriptionState (Cursor backend))
forall a. IORef a -> IO a
readIORef IORef (SubscriptionState (Cursor backend))
stateRef
            completedAt <- getCurrentTime
            handle.cancel -- Clean up (should already be stopped)
            let processingDurationMs = case SubscriptionState (Cursor backend)
state.completionReceivedAt of
                    Maybe UTCTime
Nothing -> Int64
0
                    Just UTCTime
eventCompletionTime ->
                        Double -> Int64
forall b. Integral b => Double -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int64) -> Double -> Int64
forall a b. (a -> b) -> a -> b
$ (Rational -> Double
forall a. Fractional a => Rational -> a
fromRational (Rational -> Double) -> Rational -> Double
forall a b. (a -> b) -> a -> b
$ NominalDiffTime -> Rational
forall a. Real a => a -> Rational
toRational (NominalDiffTime -> Rational) -> NominalDiffTime -> Rational
forall a b. (a -> b) -> a -> b
$ UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
eventCompletionTime SubscriptionState (Cursor backend)
state.startedAt) Double -> Double -> Double
forall a. Num a => a -> a -> a
* (Double
1000 :: Double)

            pure $
                TestSubscriptionResult
                    { resultSubscriptionId = state.subscriptionId
                    , resultStartedAt = state.startedAt
                    , resultFinalHash = state.currentHash
                    , resultEventCount = state.subEventCount
                    , resultFirstEventTime = state.firstEventTime
                    , resultLastEventTime = state.lastEventTime
                    , resultCompletedAt = completedAt
                    , resultCompletionReceived = state.completionReceived
                    , resultCompletionReceivedAt = state.completionReceivedAt
                    , resultExpectedTotalEvents = state.expectedTotalEvents
                    , resultProcessingDurationMs = processingDurationMs
                    , resultPositionsSeen = state.positionsSeen
                    }

        stopProgressReporting progressManager
        analyzeResults results

{- | Analyze subscription results for consistency

Prints a summary of the collected results to stdout and then asserts that
every subscription finished with the same final hash:

* results are grouped by subscription id prefix (@early-@, @concurrent-@,
  anything else is counted as a validation subscription);
* min / max / average event counts and processing durations are reported;
* the number of subscriptions that did / did not receive the completion
  event is reported (informational only, it does not fail the test);
* differing event counts under an identical hash only produce a warning.

The assertion fails when more than one distinct final hash is observed, and
also when the result list is empty: there is nothing to compare in that case,
and the statistics below ('minimum', 'maximum', averages) are undefined for
an empty list.
-}
analyzeResults :: forall position. [TestSubscriptionResult position] -> Assertion
analyzeResults [] =
    -- Fail with an explicit message instead of crashing in 'minimum' on [].
    assertFailure "No subscription results to analyze"
analyzeResults results = do
    putStrLn $ "\n=== Analysis of " ++ show (length results) ++ " subscriptions ==="

    -- Bucket the results by the prefix of their subscription id; foldr keeps
    -- the original relative order inside each bucket.
    let (earlyResults, concurrentResults, validationResults) =
            foldr
                ( \r (e, c, v) ->
                    if "early-" `isPrefixOf` r.resultSubscriptionId
                        then (r : e, c, v)
                        else
                            if "concurrent-" `isPrefixOf` r.resultSubscriptionId
                                then (e, r : c, v)
                                else (e, c, r : v)
                )
                ([], [], [])
                results

    putStrLn $ "Early subscriptions: " ++ show (length earlyResults)
    putStrLn $ "Concurrent subscriptions: " ++ show (length concurrentResults)
    putStrLn $ "Validation subscriptions: " ++ show (length validationResults)

    -- 'results' is non-empty here, so minimum / maximum / the averages are safe.
    let eventCounts = map (.resultEventCount) results
        minEvents = minimum eventCounts
        maxEvents = maximum eventCounts

    let avgEvents = fromIntegral (sum eventCounts) / fromIntegral (length eventCounts) :: Double
    putStrLn $
        "Event counts - Min: "
            ++ show minEvents
            ++ ", Max: "
            ++ show maxEvents
            ++ ", Avg: "
            ++ show (round avgEvents :: Int)

    let completedSubs = filter (.resultCompletionReceived) results
        incompleteSubs = filter (not . (.resultCompletionReceived)) results

    putStrLn $
        "Completion status - Completed: "
            ++ show (length completedSubs)
            ++ ", Incomplete: "
            ++ show (length incompleteSubs)

    let hashes = map (.resultFinalHash) results
        uniqueHashes = Set.size $ Set.fromList hashes

    putStrLn $ "Unique final hashes: " ++ show uniqueHashes

    putStrLn "\n=== SUMMARY STATISTICS ==="
    let processingDurations = map (.resultProcessingDurationMs) results
        minDuration = minimum processingDurations
        maxDuration = maximum processingDurations
        avgDuration = fromIntegral (sum processingDurations) / fromIntegral (length processingDurations) :: Double

    putStrLn $
        "Processing durations - Min: "
            ++ show minDuration
            ++ "ms, Max: "
            ++ show maxDuration
            ++ "ms, Avg: "
            ++ show (round avgDuration :: Int)
            ++ "ms"

    if uniqueHashes == 1
        then do
            putStrLn "\n✅ Hash consistency verified"

            -- Same hash but different event counts is only reported, not failed.
            when (minEvents /= maxEvents) $
                putStrLn $
                    "⚠️  Event count mismatch: min=" ++ show minEvents ++ ", max=" ++ show maxEvents

            let validationHashes = map (.resultFinalHash) validationResults
                earlyHashes = map (.resultFinalHash) earlyResults

            -- Sanity check only: both lists are drawn from 'hashes', which holds
            -- a single distinct value in this branch, so this cannot fail here.
            case (validationHashes, earlyHashes) of
                (vh : _, eh : _) -> assertEqual "Validation vs early mismatch" eh vh
                _ -> pure ()
        else do
            putStrLn "\n❌ Hash mismatch detected"
            -- Count how many subscriptions ended on each distinct final hash.
            let hashGroups = Map.fromListWith (+) [(h, 1 :: Int) | h <- hashes]
            forM_ (Map.toList hashGroups) $ \(hash, count) ->
                putStrLn $ "  Hash " ++ show hash ++ ": " ++ show count ++ " subscriptions"

            assertFailure $ show uniqueHashes ++ " different hashes across " ++ show (length results) ++ " subscriptions"

{- | Generate test label from configuration parameters

The label has the shape @\<testType\> consistency test (\<N\> tx, \<M\> subs)@,
where @N@ is the configured number of transactions and @M@ is the sum of the
configured early, concurrent and validation subscription counts.
-}
generateTestLabel :: String -> TestConfiguration -> String
generateTestLabel testType config =
    concat
        [ testType
        , " consistency test ("
        , show config.configNumTransactions
        , " tx, "
        , show subscriptionTotal
        , " subs)"
        ]
  where
    -- Every subscription the test will start, across all three phases.
    subscriptionTotal :: Int
    subscriptionTotal =
        sum
            [ config.configNumEarlySubscriptions
            , config.configNumConcurrentSubscriptions
            , config.configNumValidationSubscriptions
            ]

{- | Backend-agnostic ordering test suite

Builds two test groups for the backend behind the given runner: a
"Debug Tests" group running the debug configuration, and a
"Comprehensive Consistency Tests" group running the light and default
configurations. Each case is labelled via 'generateTestLabel'.
-}
orderingTests ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    EventStoreTestRunner backend ->
    [TestTree]
orderingTests runner =
    [ testGroup
        "Debug Tests"
        [ consistencyCase "Debug" debugTestConfig
        ]
    , testGroup
        "Comprehensive Consistency Tests"
        [ consistencyCase "Light" lightTestConfig
        , consistencyCase "Standard" defaultTestConfig
        -- Stress test disabled by default (can hang on Ubuntu CI)
        -- , consistencyCase "Stress" stressTestConfig
        ]
    ]
  where
    -- One consistency test case: the label and the test body share the same configuration.
    consistencyCase :: String -> TestConfiguration -> TestTree
    consistencyCase label testConfig =
        testCase
            (generateTestLabel label testConfig)
            (testComprehensiveConsistencyWithConfig runner testConfig)