{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}

{- | Basic subscription and event handling tests

Tests fundamental event store operations:
- Event insertion and subscription
- Stream selection (all streams vs. single stream)
- Position-based subscription startup
- Correlation ID preservation
- Async subscription behavior
- Subscription stop semantics
- Exception handling and enrichment
-}
module Test.Hindsight.Store.BasicTests (basicTests) where

import Control.Concurrent (MVar, newEmptyMVar, putMVar, takeMVar, threadDelay)
import Control.Monad (mapM_)
import Data.Aeson (FromJSON, ToJSON)
import Data.IORef
import Data.Map.Strict qualified as Map
import Data.Proxy (Proxy (..))
import Data.Text (Text)
import Data.UUID.V4 qualified as UUID
import GHC.Generics (Generic)
import Hindsight.Events
import Hindsight.Store
import Test.Hindsight.Examples (Tombstone, UserCreated, UserInformation2 (..), makeTombstone, makeUserEvent)
import Test.Hindsight.Store.Common (collectEvents, extractPayload, handleTombstone)
import Test.Hindsight.Store.TestRunner (EventStoreTestRunner (..))
import Test.Tasty
import Test.Tasty.HUnit
import UnliftIO.Exception (fromException, throwIO, tryAny)

-- * Counter Events for Stop/Fail Testing

-- | Counter increment event for testing subscription stop behavior
type CounterInc = "counter_inc"

-- | Payload of the 'CounterInc' event: a single nullary constructor that
-- carries no data.
data CounterIncPayload = CounterIncPayload
    deriving stock (Show, Eq, Generic)
    deriving anyclass (FromJSON, ToJSON)

-- 'CounterInc' declares version 0 as its maximum version, with
-- 'CounterIncPayload' as the only entry in its version list.
type instance MaxVersion CounterInc = 0
type instance Versions CounterInc = '[CounterIncPayload]
instance Event CounterInc
instance MigrateVersion 0 CounterInc

-- | Counter stop event for testing subscription stop behavior
type CounterStop = "counter_stop"

-- | Payload of the 'CounterStop' event: a single nullary constructor that
-- carries no data.
data CounterStopPayload = CounterStopPayload
    deriving stock (Show, Eq, Generic)
    deriving anyclass (FromJSON, ToJSON)

-- 'CounterStop' declares version 0 as its maximum version, with
-- 'CounterStopPayload' as the only entry in its version list.
type instance MaxVersion CounterStop = 0
type instance Versions CounterStop = '[CounterStopPayload]
instance Event CounterStop
instance MigrateVersion 0 CounterStop

-- | Counter fail event for testing exception handling
type CounterFail = "counter_fail"

-- | Payload of the 'CounterFail' event: a single nullary constructor that
-- carries no data.
data CounterFailPayload = CounterFailPayload
    deriving stock (Show, Eq, Generic)
    deriving anyclass (FromJSON, ToJSON)

-- 'CounterFail' declares version 0 as its maximum version, with
-- 'CounterFailPayload' as the only entry in its version list.
type instance MaxVersion CounterFail = 0
type instance Versions CounterFail = '[CounterFailPayload]
instance Event CounterFail
instance MigrateVersion 0 CounterFail

-- | Helper to create a counter increment event.
--
-- Wraps 'CounterIncPayload' in 'SomeLatestEvent', tagged with the
-- 'CounterInc' event name via a 'Proxy'.
makeCounterInc :: SomeLatestEvent
makeCounterInc =
    SomeLatestEvent
        (Proxy @CounterInc)
        CounterIncPayload

-- | Helper to create a counter stop event.
--
-- Wraps 'CounterStopPayload' in 'SomeLatestEvent', tagged with the
-- 'CounterStop' event name via a 'Proxy'.
makeCounterStop :: SomeLatestEvent
makeCounterStop =
    SomeLatestEvent
        (Proxy @CounterStop)
        CounterStopPayload

-- | Helper to create a counter fail event.
--
-- Wraps 'CounterFailPayload' in 'SomeLatestEvent', tagged with the
-- 'CounterFail' event name via a 'Proxy'.
makeCounterFail :: SomeLatestEvent
makeCounterFail =
    SomeLatestEvent
        (Proxy @CounterFail)
        CounterFailPayload

-- * Test Utilities

-- | Run the same assertion @n@ times as separate test cases.
--
-- The resulting group is named @name <> " x" <> show n@ and contains @n@
-- identical 'testCase' entries, each named @name@. A non-positive @n@
-- yields an empty group, since 'replicate' returns @[]@ in that case.
repeatTest :: Int -> TestName -> Assertion -> TestTree
repeatTest n name assertion =
    testGroup (name <> " x" <> show n) $
        replicate n $
            testCase name assertion

-- | Basic test suite for event store backends.
--
-- Every test receives a backend handle through the runner's 'withStore'
-- field. The async subscription test is repeated 20 times via 'repeatTest';
-- all other tests run once.
basicTests ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) =>
    EventStoreTestRunner backend ->
    [TestTree]
basicTests runner =
    [ testCase "Basic Event Reception" $ withStore runner testBasicEventReception
    , testCase "Correlation ID Preservation" $ withStore runner testCorrelationIdPreservation
    , testCase "Single Stream Selection" $ withStore runner testSingleStreamSelection
    , testCase "Start From Position" $ withStore runner testStartFromPosition
    , repeatTest 20 "Async Subscription Reception" $ withStore runner testAsyncSubscription
    , testCase "Subscription Honors Stop Result" $ withStore runner testSubscriptionStopBehavior
    , testCase "Handler Exception Enrichment" $ withStore runner testHandlerExceptionEnrichment
    ]

-- * Test Implementations

-- | Insert three user events plus a tombstone into one stream, then subscribe
-- to all streams from the beginning and check what was received.
--
-- The test blocks on @completionVar@ (presumably filled by 'handleTombstone'
-- when the tombstone is delivered), cancels the subscription, and asserts
-- that exactly three 'UserCreated' events arrived with user names
-- @user1@..@user3@ in order.
testBasicEventReception :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testBasicEventReception store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedEvents <- newIORef []
    completionVar <- newEmptyMVar

    let testEvents = map makeUserEvent [1 .. 3] ++ [makeTombstone]
    result <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

    case result of
        FailedInsertion err -> assertFailure $ "Failed to insert events: " ++ show err
        SuccessfulInsertion _ -> do
            handle <-
                subscribe
                    store
                    ( match UserCreated (collectEvents receivedEvents)
                        :? match Tombstone (handleTombstone completionVar)
                        :? MatchEnd
                    )
                    EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

            takeMVar completionVar
            handle.cancel -- Cancel subscription after completion
            -- The collected list is reversed before checking order
            -- (presumably 'collectEvents' prepends each new event).
            events <- reverse <$> readIORef receivedEvents
            length events @?= 3

            let userInfos = map extractPayload events
            length userInfos @?= 3
            let userNames :: [Text]
                userNames = map (.userName) userInfos
            userNames @?= ["user1", "user2", "user3"]

-- | Check that a 'SingleStream' subscription only sees events of that stream.
--
-- Users 1-3 are written to @stream1@, users 4-6 to @stream2@, and a tombstone
-- to @stream1@. The subscription targets @stream1@ from the beginning; after
-- @completionVar@ is filled the test asserts that only @user1@..@user3@
-- were collected, in order.
testSingleStreamSelection :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testSingleStreamSelection store = do
    stream1 <- StreamId <$> UUID.nextRandom
    stream2 <- StreamId <$> UUID.nextRandom
    receivedEvents <- newIORef []
    completionVar <- newEmptyMVar

    -- Insertion results are deliberately ignored here; a failed insert would
    -- surface as a wrong event list (or a hang on the tombstone) below.
    _ <- insertEvents store Nothing (multiEvent stream1 Any (map makeUserEvent [1 .. 3]))
    _ <- insertEvents store Nothing (multiEvent stream2 Any (map makeUserEvent [4 .. 6]))
    _ <- insertEvents store Nothing (Transaction (Map.fromList [(stream1, StreamWrite Any [makeTombstone])]))

    handle <-
        subscribe
            store
            ( match UserCreated (collectEvents receivedEvents)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = SingleStream stream1, startupPosition = FromBeginning}

    takeMVar completionVar
    handle.cancel -- Cancel subscription after completion
    events <- reverse <$> readIORef receivedEvents
    let userInfos = map extractPayload events
    length userInfos @?= 3
    let userNames = map (.userName) userInfos
    userNames @?= ["user1", "user2", "user3"]

-- | Check that a subscription started with 'FromPosition' skips earlier events.
--
-- The first three of five user events are inserted and the resulting
-- @finalCursor@ is captured. The remaining two events and a tombstone are
-- inserted afterwards. A subscription starting from the captured cursor is
-- then expected to deliver only @user4@ and @user5@.
testStartFromPosition :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testStartFromPosition store = do
    streamId <- StreamId <$> UUID.nextRandom
    let testEvents = map makeUserEvent [1 .. 5]

    result <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any (take 3 testEvents))]))
    case result of
        FailedInsertion err -> assertFailure $ "Failed to insert first batch: " ++ show err
        SuccessfulInsertion (InsertionSuccess{finalCursor = cursor}) -> do
            -- Second batch and tombstone land after the captured cursor.
            _ <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any (drop 3 testEvents))]))
            _ <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any [makeTombstone])]))

            receivedEvents <- newIORef []
            completionVar <- newEmptyMVar

            handle <-
                subscribe
                    store
                    ( match UserCreated (collectEvents receivedEvents)
                        :? match Tombstone (handleTombstone completionVar)
                        :? MatchEnd
                    )
                    EventSelector{streamId = AllStreams, startupPosition = FromPosition cursor}

            takeMVar completionVar
            handle.cancel -- Cancel subscription after completion
            events <- reverse <$> readIORef receivedEvents
            let userInfos = map extractPayload events
            length userInfos @?= 2
            let userNames :: [Text]
                userNames = map (.userName) userInfos
            userNames @?= ["user4", "user5"]

-- | Check that the correlation ID passed to 'insertEvents' is visible on
-- every event delivered to a subscriber.
--
-- Three user events and a tombstone are inserted with @Just corrId@. After
-- @completionVar@ is filled, each collected 'UserCreated' envelope must have
-- @correlationId == Just corrId@.
testCorrelationIdPreservation :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testCorrelationIdPreservation store = do
    streamId <- StreamId <$> UUID.nextRandom
    corrId <- CorrelationId <$> UUID.nextRandom
    receivedEvents <- newIORef []
    completionVar <- newEmptyMVar

    let testEvents = map makeUserEvent [1 .. 3] ++ [makeTombstone]
    result <- insertEvents store (Just corrId) (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

    case result of
        FailedInsertion err -> assertFailure $ "Failed to insert events: " ++ show err
        SuccessfulInsertion _ -> do
            handle <-
                subscribe
                    store
                    ( match UserCreated (collectEvents receivedEvents)
                        :? match Tombstone (handleTombstone completionVar)
                        :? MatchEnd
                    )
                    EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

            takeMVar completionVar
            handle.cancel -- Cancel subscription after completion
            events <- readIORef receivedEvents
            -- Guard against a vacuous pass: 'mapM_' over an empty list would
            -- succeed without checking any correlation ID.
            length events @?= 3
            mapM_ (\evt -> evt.correlationId @?= Just corrId) events

{- | Test that a subscription started before any insertion receives events asynchronously

Test sequence:
  1. Subscribe to all streams from the beginning
  2. Insert three user events followed by a tombstone into a fresh stream
  3. Block on @completionVar@ (expected to be filled by 'handleTombstone' — confirm in "Test.Hindsight.Store.Common")
  4. Expected: exactly three @UserCreated@ events, named user1..user3 in insertion order
-}
testAsyncSubscription :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testAsyncSubscription store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedEvents <- newIORef []
    completionVar <- newEmptyMVar

    -- Start subscription before inserting events
    handle <-
        subscribe
            store
            ( match UserCreated (collectEvents receivedEvents)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

    let testEvents = map makeUserEvent [1 .. 3] ++ [makeTombstone]
    result <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

    case result of
        FailedInsertion err -> do
            -- Release the subscription before reporting the failure
            handle.cancel
            assertFailure $ "Failed to insert events: " ++ show err
        SuccessfulInsertion _ -> do
            takeMVar completionVar
            handle.cancel
            -- The collected list is reversed before comparing
            -- (presumably 'collectEvents' prepends, so this restores arrival order)
            events <- reverse <$> readIORef receivedEvents
            length events @?= 3
            let userNames :: [Text]
                userNames = map ((.userName) . extractPayload) events
            userNames @?= ["user1", "user2", "user3"]

{- | Test that subscriptions honor the Stop result from handlers

This is a critical property: when a handler returns Stop, the subscription
must not process any subsequent events. This test verifies that all backends
correctly implement this behavior.

Test sequence:
  1. Insert: [Inc, Inc, Stop, Inc, Inc]
  2. Handler: increments counter on Inc, returns Stop on Stop event
  3. Expected: counter = 2 (the two Incs after Stop are never processed)
-}
testSubscriptionStopBehavior :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testSubscriptionStopBehavior store = do
    streamId <- StreamId <$> UUID.nextRandom
    counter <- newIORef (0 :: Int)
    completionVar <- newEmptyMVar

    let -- Handler that increments counter and returns Continue
        handleInc :: EventHandler CounterInc IO backend
        handleInc _ = do
            atomicModifyIORef' counter (\n -> (n + 1, ()))
            pure Continue

        -- Handler that signals the test thread and returns Stop
        -- (signaling subscription should end)
        handleStop :: EventHandler CounterStop IO backend
        handleStop _ = do
            putMVar completionVar ()
            pure Stop

    -- Start subscription before inserting events
    handle <-
        subscribe
            store
            ( match CounterInc handleInc
                :? match CounterStop handleStop
                :? MatchEnd
            )
            EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

    -- Insert test sequence: 2 increments, then Stop, then 2 more increments
    let testEvents =
            [ makeCounterInc -- counter = 1
            , makeCounterInc -- counter = 2
            , makeCounterStop -- STOP HERE - should not process further
            , makeCounterInc -- should NOT be processed
            , makeCounterInc -- should NOT be processed
            ]

    result <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

    case result of
        FailedInsertion err -> do
            -- Release the subscription before reporting the failure
            handle.cancel
            assertFailure $ "Failed to insert events: " ++ show err
        SuccessfulInsertion _ -> do
            -- Wait for the Stop handler to signal completion
            takeMVar completionVar

            -- Give a small grace period to catch any erroneous event processing
            -- If the backend is broken, it might process events after Stop
            threadDelay 100000 -- 100ms
            handle.cancel

            -- Verify the counter stayed at 2: only the Incs preceding Stop ran
            finalCount <- readIORef counter
            finalCount @?= 2

{- | Test that handler exceptions enrich failures with event context

When a handler throws an exception, the subscription should die (fail-fast).
The exception should be enriched with full event metadata for debugging.

Test sequence:
  1. Insert: [Inc, Inc, Fail, Inc, Inc]
  2. Handler: increments counter on Inc, throws exception on Fail
  3. Expected: counter = 2, subscription dies with HandlerException containing event metadata
-}
testHandlerExceptionEnrichment :: forall backend. (EventStore backend, StoreConstraints backend IO, Show (Cursor backend)) => BackendHandle backend -> IO ()
testHandlerExceptionEnrichment store = do
    streamId <- StreamId <$> UUID.nextRandom
    counter <- newIORef (0 :: Int)

    let -- Handler that increments counter and returns Continue
        handleInc :: EventHandler CounterInc IO backend
        handleInc _ = do
            atomicModifyIORef' counter (\n -> (n + 1, ()))
            pure Continue

        -- Handler that throws a test exception
        handleFail :: EventHandler CounterFail IO backend
        handleFail _envelope =
            throwIO $ userError "Test exception from CounterFail handler"

    -- Insert test sequence BEFORE starting subscription (to ensure events are ready)
    let testEvents =
            [ makeCounterInc -- counter = 1
            , makeCounterInc -- counter = 2
            , makeCounterFail -- EXCEPTION HERE - subscription should die
            , makeCounterInc -- should NOT be processed
            , makeCounterInc -- should NOT be processed
            ]

    result <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite Any testEvents)]))

    case result of
        FailedInsertion err ->
            assertFailure $ "Failed to insert events: " ++ show err
        SuccessfulInsertion _ -> do
            -- Start subscription AFTER inserting events
            handle <-
                subscribe
                    store
                    ( match CounterInc handleInc
                        :? match CounterFail handleFail
                        :? MatchEnd
                    )
                    EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

            -- Wait for subscription to complete or fail
            waitResult <- tryAny handle.wait

            -- Verify counter stopped at 2 (subscription died on Fail event)
            finalCount <- readIORef counter
            finalCount @?= 2

            -- Verify the exception is a HandlerException with proper metadata
            case waitResult of
                Left exc -> case fromException exc of
                    -- Bind only the two fields asserted on; the remaining
                    -- metadata fields are not inspected by this test
                    Just HandlerException{failedEventName = eventName, originalException = cause} -> do
                        -- Verify exception enrichment
                        eventName @?= "counter_fail"
                        show cause @?= "user error (Test exception from CounterFail handler)"
                    Nothing -> assertFailure $ "Expected HandlerException, got: " ++ show exc
                Right () -> assertFailure "Expected subscription to fail with HandlerException"