{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}

{- |
Module      : Hindsight.Store.Memory
Description : In-memory event store for testing and development
Copyright   : (c) 2024
License     : BSD3
Maintainer  : maintainer@example.com
Stability   : experimental

= Overview

In-memory event store using STM (Software Transactional Memory) for fast, concurrent access.
Ideal for testing, development, and scenarios where events don't need to survive process restarts.

⚠️  __Data is lost on process termination__ - not suitable for production use.

= Quick Start

@
import Data.UUID.V4 qualified as UUID
import Hindsight.Store.Memory (newMemoryStore)
import Hindsight

main :: IO ()
main = do
  -- Create store
  store <- newMemoryStore

  -- Insert events (see Hindsight.Store for details)
  streamId <- StreamId \<$\> UUID.nextRandom
  let event = mkEvent MyEvent myData
  result <- insertEvents store Nothing $ singleEvent streamId NoStream event

  -- Subscribe to events
  handle <- subscribe store matcher (EventSelector AllStreams FromBeginning)
  -- ... process events ...
@

= Use Cases

__When to use Memory store:__

* Unit and integration tests (fast, isolated)
* Development and prototyping
* Temporary event processing pipelines
* Scenarios where persistence isn't required

__When NOT to use Memory store:__

* Production systems requiring durability
* Multi-process applications (each process has separate state)
* Long-running services that can't afford data loss

= Trade-offs

__Advantages:__

* Fastest performance (no I/O)
* No external dependencies
* Simple setup (single function call)
* Thread-safe via STM

__Limitations:__

* Data lost on process termination or crash
* Memory usage grows with event count
* Single-process only (no sharing between instances)
* No built-in persistence or snapshots

= Implementation

Events and stream metadata are stored in memory using STM 'TVar's.
Subscriptions use the 'STM' retry mechanism for efficient event notification.
-}
module Hindsight.Store.Memory (
    -- * Core Types
    MemoryStore,
    MemoryStoreHandle,
    MemoryCursor (..),

    -- * Store Operations
    newMemoryStore,

    -- * Re-exports
    module Hindsight.Store,
)
where

import Control.Concurrent.STM
import Control.Monad (forM, forM_)
import Control.Monad.IO.Class (liftIO)
import Data.Aeson (FromJSON (..), ToJSON (..))
import Data.Map.Strict qualified as Map
import Data.Time (getCurrentTime)
import Data.UUID.V4 qualified as UUID
import GHC.Generics (Generic)
import Hindsight.Store
import Hindsight.Store.Memory.Internal
import UnliftIO (MonadUnliftIO)

-- | Cursor implementation for the memory store.
--
-- A cursor wraps a single 'Integer' sequence number, so the derived 'Eq' and
-- 'Ord' instances compare cursors by that number.
--
-- The JSON instances are derived with the @anyclass@ strategy, i.e. they use
-- the 'Generic'-based default implementations of 'FromJSON' and 'ToJSON'.
-- Switching to @newtype@ deriving would change the wire format, so the
-- strategy is kept as is.
newtype MemoryCursor = MemoryCursor
    { getSequenceNo :: Integer
    -- ^ Sequence number carried by this cursor.
    }
    deriving stock (Show, Eq, Ord, Generic)
    deriving anyclass (FromJSON, ToJSON)

-- | Cursors of the memory backend are 'MemoryCursor' values.
type instance Cursor MemoryStore = MemoryCursor

-- | The memory backend is accessed through a 'MemoryStoreHandle'.
type instance BackendHandle MemoryStore = MemoryStoreHandle

-- | Handle for the memory store.
--
-- Wraps the single 'TVar' that holds the store's 'StoreState'. The module
-- exports this type without its constructor, so handles are obtained via
-- 'newMemoryStore'.
newtype MemoryStoreHandle = MemoryStoreHandle
    { stateVar :: TVar (StoreState MemoryStore)
    -- ^ Mutable store state, read and replaced inside STM transactions.
    }

-- | Creates a new, empty memory store instance.
--
-- The returned handle owns a fresh 'TVar' whose initial state has
-- @nextSequence@ set to @0@ and every map empty. A separate 'TVar' is
-- allocated for the global notification and initialised to @-1@.
--
-- NOTE(review): @-1@ presumably means "no event written yet", since
-- @nextSequence@ starts at @0@ — confirm against the subscription code in
-- "Hindsight.Store.Memory.Internal".
newMemoryStore :: IO MemoryStoreHandle
newMemoryStore = do
    globalVar <- newTVarIO (-1)
    MemoryStoreHandle <$> newTVarIO (initialState globalVar)
  where
    -- Empty store state that shares the given global notification variable.
    initialState :: TVar Integer -> StoreState MemoryStore
    initialState globalVar =
        StoreState
            { nextSequence = 0
            , events = Map.empty
            , streamEvents = Map.empty
            , streamVersions = Map.empty
            , streamLocalVersions = Map.empty
            , streamNotifications = Map.empty
            , globalNotification = globalVar
            }

-- | Type-level tag for the in-memory backend. It has no constructors and is
-- used as the index of the backend type families and class instances.
data MemoryStore

-- | Memory store implementation.
--
-- Both operations only require 'MonadUnliftIO' from the calling monad.
instance EventStore MemoryStore where
    type StoreConstraints MemoryStore m = (MonadUnliftIO m)

    -- Inserts all writes of a transaction in a single STM transaction.
    --
    -- Returns 'FailedInsertion' (leaving the state untouched) when
    -- 'checkAllVersions' rejects the batch, otherwise 'SuccessfulInsertion'
    -- with the cursors computed by 'insertAllEvents'.
    insertEvents handle corrId transaction = liftIO $ do
        -- The timestamp and event ids need IO, so they are produced before
        -- the STM transaction starts.
        now <- getCurrentTime
        eventIds <- forM [1 .. totalEvents] $ \_ -> EventId <$> UUID.nextRandom

        atomically $ do
            state <- readTVar handle.stateVar

            -- Check version constraints
            case checkAllVersions state batches of
                Left mismatch -> pure $ FailedInsertion mismatch
                Right () -> do
                    -- Perform insertion
                    let (newState, finalCursor, streamCursors) =
                            insertAllEvents state corrId now eventIds batches
                        -- Value published to every notification variable.
                        notifiedSeqNo = getSequenceNo finalCursor
                    writeTVar handle.stateVar newState

                    -- Notify listeners: streams written by this transaction
                    -- that have a registered notification variable ...
                    forM_ (Map.keys batches) $ \streamId ->
                        forM_ (Map.lookup streamId state.streamNotifications) $ \var ->
                            writeTVar var notifiedSeqNo
                    -- ... and the global notification variable.
                    writeTVar state.globalNotification notifiedSeqNo

                    pure $
                        SuccessfulInsertion $
                            InsertionSuccess
                                { finalCursor = finalCursor
                                , streamCursors = streamCursors
                                }
      where
        -- Per-stream writes of the transaction.
        batches = transaction.transactionWrites
        -- Number of events across all streams; one event id is generated for each.
        totalEvents = sum (length . (.events) <$> batches)

    -- Delegates to 'subscribeToEvents' on the handle's state variable.
    subscribe handle matcher selector =
        subscribeToEvents handle.stateVar matcher selector

-- | Cursor conversions for the memory store: a cursor is a plain wrapper
-- around its sequence number, so both directions are wrap/unwrap.
instance StoreCursor MemoryStore where
    makeCursor = MemoryCursor
    makeSequenceNo = (.getSequenceNo)