{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

{- | Stream version tests

Tests stream version functionality:
- Stream versions start at 1
- Stream versions are contiguous (no gaps)
- Stream versions are exposed in subscription envelopes
- Multiple streams have independent version sequences
-}
module Test.Hindsight.Store.StreamVersionTests (streamVersionTests) where

import Control.Concurrent (MVar, newEmptyMVar, putMVar, takeMVar)
import Data.IORef
import Data.Map.Strict qualified as Map
import Data.UUID.V4 qualified as UUID
import Hindsight.Store
import Test.Hindsight.Examples (Tombstone, UserCreated, makeTombstone, makeUserEvent)
import Test.Hindsight.Store.Common (handleTombstone)
import Test.Hindsight.Store.TestRunner (EventStoreTestRunner (..))
import Test.Tasty
import Test.Tasty.HUnit

{- | Stream version test suite for event store backends.

Builds one HUnit test case per stream-version property. Each case is run
through the runner's 'withStore' field, which hands a 'BackendHandle' to the
test body.
-}
streamVersionTests ::
    forall backend.
    (EventStore backend, StoreConstraints backend IO) =>
    EventStoreTestRunner backend ->
    [TestTree]
streamVersionTests runner =
    [ testCase "Stream Versions Start At 1" $ withStore runner testStreamVersionsStartAt1
    , testCase "Stream Versions Are Contiguous" $ withStore runner testStreamVersionsContiguous
    , testCase "Stream Versions Exposed In Subscription" $ withStore runner testStreamVersionExposedInSubscription
    , testCase "Multiple Streams Have Independent Versions" $ withStore runner testIndependentStreamVersions
    ]

-- * Test Implementations

{- | Test that stream versions start at 1.

Writes a single user event followed by a tombstone to a freshly generated
stream, subscribes to that stream from the beginning, and asserts that the
only version collected from the @UserCreated@ handler is @StreamVersion 1@.
-}
testStreamVersionsStartAt1 :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testStreamVersionsStartAt1 store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedVersions <- newIORef []
    completionVar <- newEmptyMVar

    -- Prepends each envelope's stream version to the ref (newest first) and
    -- keeps the subscription running.
    let collectVersions ref event = do
            atomicModifyIORef' ref (\versions -> (event.streamVersion : versions, ()))
            pure Continue

    -- Insert first event
    _ <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite NoStream [makeUserEvent 1, makeTombstone])]))

    handle <-
        subscribe
            store
            ( match UserCreated (collectVersions receivedVersions)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = SingleStream streamId, startupPosition = FromBeginning}

    -- Blocks until completionVar is filled (presumably by handleTombstone
    -- once the tombstone is delivered -- confirm in Test.Hindsight.Store.Common).
    takeMVar completionVar
    handle.cancel

    -- Versions were accumulated newest-first; restore delivery order.
    versions <- reverse <$> readIORef receivedVersions
    versions @?= [StreamVersion 1]

{- | Test that stream versions are contiguous (no gaps).

Writes five user events to one stream across three separate transactions
(2 + 1 + 2, the last one also carrying a tombstone), then subscribes from the
beginning and asserts the collected versions are exactly 1 through 5.
-}
testStreamVersionsContiguous :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testStreamVersionsContiguous store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedVersions <- newIORef []
    completionVar <- newEmptyMVar

    -- Prepends each envelope's stream version to the ref (newest first) and
    -- keeps the subscription running.
    let collectVersions ref event = do
            atomicModifyIORef' ref (\versions -> (event.streamVersion : versions, ()))
            pure Continue

    -- Insert 5 events in multiple batches
    _ <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite NoStream [makeUserEvent 1, makeUserEvent 2])]))
    _ <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite StreamExists [makeUserEvent 3])]))
    _ <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite StreamExists [makeUserEvent 4, makeUserEvent 5, makeTombstone])]))

    handle <-
        subscribe
            store
            ( match UserCreated (collectVersions receivedVersions)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = SingleStream streamId, startupPosition = FromBeginning}

    -- Blocks until completionVar is filled (presumably by handleTombstone
    -- once the tombstone is delivered -- confirm in Test.Hindsight.Store.Common).
    takeMVar completionVar
    handle.cancel

    -- Versions were accumulated newest-first; restore delivery order.
    versions <- reverse <$> readIORef receivedVersions
    -- Should be contiguous: 1, 2, 3, 4, 5
    versions @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3, StreamVersion 4, StreamVersion 5]

{- | Test that stream versions are exposed in subscriptions.

Writes one user event plus a tombstone, subscribes from the beginning, and
asserts that the @streamVersion@ field read from the delivered @UserCreated@
envelope is @StreamVersion 1@.
-}
testStreamVersionExposedInSubscription :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testStreamVersionExposedInSubscription store = do
    streamId <- StreamId <$> UUID.nextRandom
    receivedVersionRef <- newIORef Nothing
    completionVar <- newEmptyMVar

    -- Overwrites the ref with the version of the most recently seen envelope
    -- and keeps the subscription running.
    let captureVersion ref event = do
            atomicModifyIORef' ref (\_ -> (Just event.streamVersion, ()))
            pure Continue

    _ <- insertEvents store Nothing (Transaction (Map.fromList [(streamId, StreamWrite NoStream [makeUserEvent 1, makeTombstone])]))

    handle <-
        subscribe
            store
            ( match UserCreated (captureVersion receivedVersionRef)
                :? match Tombstone (handleTombstone completionVar)
                :? MatchEnd
            )
            EventSelector{streamId = SingleStream streamId, startupPosition = FromBeginning}

    -- Blocks until completionVar is filled (presumably by handleTombstone
    -- once the tombstone is delivered -- confirm in Test.Hindsight.Store.Common).
    takeMVar completionVar
    handle.cancel

    mbVersion <- readIORef receivedVersionRef
    mbVersion @?= Just (StreamVersion 1)

{- | Test that multiple streams have independent version sequences.

Writes three user events to each of three fresh streams, spread unevenly over
two multi-stream transactions, then appends a tombstone to every stream.
A single subscription over all streams sorts the delivered versions into one
list per stream, and each list is asserted to be exactly 1, 2, 3.
-}
testIndependentStreamVersions :: forall backend. (EventStore backend, StoreConstraints backend IO) => BackendHandle backend -> IO ()
testIndependentStreamVersions store = do
    streamA <- StreamId <$> UUID.nextRandom
    streamB <- StreamId <$> UUID.nextRandom
    streamC <- StreamId <$> UUID.nextRandom

    versionsA <- newIORef []
    versionsB <- newIORef []
    versionsC <- newIORef []
    completionVar <- newEmptyMVar

    -- Prepends the envelope's stream version to the given ref (newest first)
    -- and keeps the subscription running.
    let collectVersionsFor ref event = do
            atomicModifyIORef' ref (\versions -> (event.streamVersion : versions, ()))
            pure Continue

    -- Insert events to all three streams in mixed order, in the SAME transaction
    _ <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite NoStream [makeUserEvent 1, makeUserEvent 2]) -- A: versions 1, 2
                    , (streamB, StreamWrite NoStream [makeUserEvent 10]) -- B: version 1
                    , (streamC, StreamWrite NoStream [makeUserEvent 100, makeUserEvent 101, makeUserEvent 102]) -- C: versions 1, 2, 3
                    ]
                )

    -- Insert more events to verify independent counting
    _ <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite StreamExists [makeUserEvent 3]) -- A: version 3
                    , (streamB, StreamWrite StreamExists [makeUserEvent 11, makeUserEvent 12]) -- B: versions 2, 3
                    ]
                )

    -- Add tombstones
    _ <-
        insertEvents store Nothing $
            Transaction
                ( Map.fromList
                    [ (streamA, StreamWrite StreamExists [makeTombstone])
                    , (streamB, StreamWrite StreamExists [makeTombstone])
                    , (streamC, StreamWrite StreamExists [makeTombstone])
                    ]
                )

    -- Subscribe to all streams
    remainingTombstones <- newIORef (3 :: Int)
    -- Decrements the counter on every tombstone; the third one signals
    -- completion and stops the subscription.
    -- NOTE(review): the envelope is ignored, so any Tombstone delivered on
    -- AllStreams is counted, not only those of streams A/B/C. This assumes
    -- the store contains no other tombstones -- confirm against the runner.
    let handleTombstones :: EventHandler Tombstone IO backend
        handleTombstones _ = do
            remaining <- atomicModifyIORef' remainingTombstones (\n -> (n - 1, n - 1))
            if remaining == 0
                then putMVar completionVar () >> pure Stop
                else pure Continue

    handle <-
        subscribe
            store
            ( match
                UserCreated
                ( \event -> do
                    -- Route the version to the list of the stream it belongs
                    -- to; events from any other stream are skipped.
                    case event.streamId of
                        sid
                            | sid == streamA -> collectVersionsFor versionsA event
                            | sid == streamB -> collectVersionsFor versionsB event
                            | sid == streamC -> collectVersionsFor versionsC event
                            | otherwise -> pure Continue
                )
                :? match Tombstone handleTombstones
                :? MatchEnd
            )
            EventSelector{streamId = AllStreams, startupPosition = FromBeginning}

    takeMVar completionVar
    handle.cancel

    -- Verify each stream has independent version sequences
    -- (lists were accumulated newest-first, hence the reverse).
    vA <- reverse <$> readIORef versionsA
    vB <- reverse <$> readIORef versionsB
    vC <- reverse <$> readIORef versionsC

    vA @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3]
    vB @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3]
    vC @?= [StreamVersion 1, StreamVersion 2, StreamVersion 3]