{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}

{- |
Module      : Hindsight.Store.Filesystem
Description : File-based persistent event store with multi-process support
Copyright   : (c) 2024
License     : BSD3
Maintainer  : maintainer@example.com
Stability   : experimental

= Overview

File-based event store persisting events as JSON on disk. Provides durability without
requiring a database. Suitable for single-node deployments with moderate event volumes.

Events stored in append-only @events.log@ file. Multi-process support via file locking
and fsnotify change detection.

= Quick Start

@
import Hindsight.Store.Filesystem

main :: IO ()
main = do
  -- Create store with default config
  let config = mkDefaultConfig "./events"
  store <- newFilesystemStore config

  -- Insert events (see Hindsight.Store for details)
  streamId <- StreamId \<$\> UUID.nextRandom
  let event = mkEvent MyEvent myData
  result <- insertEvents store Nothing $ singleEvent streamId NoStream event

  -- Subscribe to events
  handle <- subscribe store matcher (EventSelector AllStreams FromBeginning)
  -- ... process events ...

  -- Cleanup when done
  cleanupFilesystemStore store
@

= Configuration

'FilesystemStoreConfig' has three parameters:

* @storePath@ - Directory for event log and lock file
* @syncInterval@ - Disk sync frequency (number of writes between syncs; 1 = sync every write)
* @lockTimeout@ - Max time to wait for file lock (microseconds)

Use 'mkDefaultConfig' for sensible defaults or construct manually for custom settings.

= Use Cases

__When to use Filesystem store:__

* Single-node applications requiring durability
* Development/staging environments
* Embedded systems or edge deployments
* Apps that can't run PostgreSQL (resource constraints, deployment complexity)
* Multi-process applications on same host

__When NOT to use Filesystem store:__

* Distributed multi-node systems (use PostgreSQL)
* Very high event throughput (PostgreSQL performs better)
* Large event volumes (startup replay becomes slow)

= Trade-offs

__Advantages:__

* Events survive process restarts (durable)
* No database dependency
* Multi-process support on same host
* Simple deployment (just a directory)
* ACID guarantees via file locking

__Limitations:__

* Startup time grows with event count (linear log replay)
* All indices must fit in memory
* Single-node only (no distributed support)
* Performance limited by disk I/O
* Not suitable for very large datasets

= Implementation

Persistence layer over Memory store infrastructure. Events written to disk then loaded
into in-memory STM structures. File locking serializes writes. fsnotify detects changes
from other processes for incremental reloading.

Storage format: Append-only JSON log (@events.log@), one transaction per line.
Stream indices rebuilt on startup by replaying log.
-}
module Hindsight.Store.Filesystem (
    -- * Store Types
    FilesystemStore,
    FilesystemStoreHandle,
    FilesystemCursor (..),

    -- * Configuration
    FilesystemStoreConfig (..),
    mkDefaultConfig,
    getStoreConfig,

    -- * Store Operations
    newFilesystemStore,
    cleanupFilesystemStore,

    -- * Exceptions
    StoreException (..),

    -- * Testing Support
    EventLogEntry (..),
    StorePaths (..),
    getPaths,
)
where

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, cancel)
import Control.Concurrent.STM (
    TChan,
    TVar,
    atomically,
    dupTChan,
    modifyTVar',
    newBroadcastTChanIO,
    newTVarIO,
    readTChan,
    readTVar,
    writeTChan,
    writeTVar,
 )
import Control.Exception (Exception, SomeException, bracket, displayException, throwIO, try)
import Control.Monad (forM_, forever, void, when)
import Control.Monad.IO.Class (liftIO)
import Data.Aeson (FromJSON, ToJSON (..), decode, encode)
import Data.ByteString qualified as BS -- Strict ByteString for file operations
import Data.ByteString.Char8 qualified as BS8
import Data.ByteString.Lazy qualified as BL
import Data.ByteString.Lazy.Char8 qualified as BL8
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (listToMaybe)
import Data.Text (pack)
import Data.Time (UTCTime, getCurrentTime)
import Data.UUID (UUID)
import Data.UUID.V4 qualified as UUID
import UnliftIO (MonadUnliftIO)

-- (StoredEvent (..), StoreCursor (..), StoreState (..), updateState)

import GHC.Generics (Generic)
import Hindsight.Store (
    BackendHandle,
    Cursor,
    ErrorInfo (..),
    EventId (EventId),
    EventMatcher,
    EventSelector (..),
    EventStore (..),
    EventStoreError (BackendError),
    InsertionResult (FailedInsertion, SuccessfulInsertion),
    InsertionSuccess (..),
    StreamWrite (events),
    SubscriptionHandle (..),
    Transaction (..),
 )
import Hindsight.Store.Memory.Internal (
    StoreCursor (..),
    StoreState (..),
    StoredEvent (seqNo),
    checkAllVersions,
    insertAllEvents,
    subscribeToEvents,
    updateState,
 )
import System.Directory (canonicalizePath, createDirectoryIfMissing, doesFileExist, removeFile)
import System.FSNotify (Event (..), eventPath, watchDir, withManager)
import System.FileLock qualified as FL
import System.FilePath (takeDirectory, (</>))

-- No System.IO imports needed - using lazy ByteString operations instead
import System.Timeout (timeout)

{- | Settings for a filesystem-backed event store.

Use 'mkDefaultConfig' for the defaults, or fill in the record by hand.
The type can be serialised to and from JSON through its 'Generic' instance.
-}
data FilesystemStoreConfig = FilesystemStoreConfig
    { storePath :: FilePath
    -- ^ Base directory that holds the store's files
    , syncInterval :: Int
    -- ^ Disk sync frequency, expressed as a number of writes
    , lockTimeout :: Int
    -- ^ Upper bound on lock acquisition, in microseconds
    }
    deriving stock (Show, Eq, Generic)
    deriving anyclass (FromJSON, ToJSON)

{- | A single record of the event log, describing one transaction.

Each record carries the transaction's identifier, the events it wrote and
the time at which it was written. Records are (de)serialised as JSON through
the 'Generic' instance.
-}
data EventLogEntry = EventLogEntry
    { transactionId :: UUID
    -- ^ Identifier that is unique to this transaction
    , events :: [StoredEvent]
    -- ^ The events this transaction wrote
    , timestamp :: UTCTime
    -- ^ Time at which the transaction was written
    }
    deriving stock (Show, Eq, Generic)
    deriving anyclass (FromJSON, ToJSON)

{- | Position marker used by the filesystem store.

Cursors compare by their sequence number, so a larger cursor refers to a
later position in the global event order.
-}
newtype FilesystemCursor = FilesystemCursor
    { getSequenceNo :: Integer
    -- ^ Global sequence number used to order events
    }
    deriving stock (Show, Eq, Ord, Generic)
    deriving anyclass (FromJSON, ToJSON)

{- | Background machinery for cross-process change notification.

Bundles the two threads started by 'startNotifier': one watches the event log
file and broadcasts a signal whenever it changes, the other reacts to those
signals by reloading events into the in-memory state.
-}
data Notifier = Notifier
    { notifierThread :: Async ()
    -- ^ fsnotify-based watcher for the event log file
    , reloadThread :: Async ()
    -- ^ The store's single reload thread; refreshes in-memory state on change
    }

{- | Store type marker.

This type has no constructors; it exists only at the type level, where it
selects the filesystem backend's 'Cursor' and 'BackendHandle' instances.
-}
data FilesystemStore

-- Type family instances

-- Positions in the filesystem store are represented by 'FilesystemCursor'.
type instance Cursor FilesystemStore = FilesystemCursor

-- Operations on the filesystem store go through a 'FilesystemStoreHandle'.
type instance BackendHandle FilesystemStore = FilesystemStoreHandle

-- | Exceptions raised by the filesystem store itself.
data StoreException
    = -- | The store lock could not be obtained in time; carries the lock file path
      LockTimeout FilePath
    | -- | A log line failed to parse; carries the log path and a description
      CorruptEventLog FilePath String
    deriving stock (Show)

instance Exception StoreException

{- | Everything needed to operate on an open filesystem store.

Obtained from 'newFilesystemStore'.
-}
data FilesystemStoreHandle = FilesystemStoreHandle
    { config :: FilesystemStoreConfig
    -- ^ Configuration the store was opened with
    , stateVar :: TVar (StoreState FilesystemStore)
    -- ^ Shared in-memory state of the event store
    , notifier :: Notifier
    -- ^ The file watcher and reload threads
    }

-- | Locations of the files a store keeps on disk (see 'getPaths').
data StorePaths = StorePaths
    { eventLogPath :: FilePath
    -- ^ The append-only event log
    , storeLockPath :: FilePath
    -- ^ The lock file that coordinates writers
    }

{- | Derive the store's file locations from its base directory.

The event log is @events.log@ and the lock file is @store.lock@, both placed
directly inside the base directory. No filesystem access is performed.
-}
getPaths ::
    -- | Base directory for the store
    FilePath ->
    -- | Computed file paths
    StorePaths
getPaths base =
    StorePaths
        { eventLogPath = inStore "events.log"
        , storeLockPath = inStore "store.lock"
        }
  where
    -- Place a file name inside the store's base directory.
    inStore :: FilePath -> FilePath
    inStore name = base </> name

{- | Build a configuration with default settings for the given directory.

The defaults are: sync on every write, and a 5-second lock timeout.
This is a pure function; nothing is created on disk.
-}
mkDefaultConfig ::
    -- | Base directory for store files
    FilePath ->
    -- | Configuration with defaults
    FilesystemStoreConfig
mkDefaultConfig path =
    FilesystemStoreConfig
        { storePath = path
        , syncInterval = syncEveryWrite
        , lockTimeout = fiveSeconds
        }
  where
    -- Sync every write by default
    syncEveryWrite :: Int
    syncEveryWrite = 1

    -- 5 seconds in microseconds (allows ~25 instances @ 200ms each)
    fiveSeconds :: Int
    fiveSeconds = 5000000

{- | Return the configuration a store handle was opened with.

Handy when the store path is needed for cleanup, or when opening further
instances against the same directory.
-}
getStoreConfig ::
    -- | Store handle
    FilesystemStoreHandle ->
    -- | Store configuration
    FilesystemStoreConfig
getStoreConfig handle = handle.config

{- | Create the directory that contains the given path.

The directory is obtained with 'takeDirectory', so for a path without a
trailing separator it is the /parent/ of the last path component. Missing
ancestors are created as well; an already existing directory is left alone.
-}
ensureDirectories :: FilePath -> IO ()
ensureDirectories = createDirectoryIfMissing True . takeDirectory

{- | Run an action while holding the store's exclusive file lock.

This variant takes the configuration directly, so it can be used by code that
has no 'FilesystemStoreHandle' (for example the central reload thread).

Only the /wait for the lock/ is bounded by @lockTimeout@ (microseconds). Once
the lock has been obtained the action runs to completion without a deadline,
so a slow action is never interrupted half-way through and is never
misreported as a lock timeout. The lock is released when the action returns
or throws.

Throws 'LockTimeout' (carrying the lock file path) when the lock could not be
obtained within @lockTimeout@.
-}
withStoreLockDirect :: FilesystemStoreConfig -> IO a -> IO a
withStoreLockDirect config action =
    bracket acquireLock FL.unlockFile (const action)
  where
    lockPath :: FilePath
    lockPath = (getPaths config.storePath).storeLockPath

    -- Runs as the acquire step of 'bracket', so no asynchronous exception can
    -- slip in between obtaining the lock and registering its release.
    -- NOTE(review): this relies on 'FL.lockFile' blocking interruptibly so
    -- that 'timeout' can still fire while exceptions are masked - confirm
    -- against the filelock version in use.
    acquireLock :: IO FL.FileLock
    acquireLock =
        timeout config.lockTimeout (FL.lockFile lockPath FL.Exclusive)
            >>= maybe (throwIO (LockTimeout lockPath)) pure
{- | Run an action while holding the global store lock.

Convenience wrapper around 'withStoreLockDirect' that takes the configuration
from the given handle.
-}
withStoreLock :: FilesystemStoreHandle -> IO a -> IO a
withStoreLock = withStoreLockDirect . getStoreConfig

{- | Initialize or open a filesystem store.

Creates the store directory and the event log file when they do not exist
yet, starts the file watcher used for cross-process notifications, and loads
any events already present in the log into memory.

The same function serves fresh stores and existing ones, so it is suitable
for process restarts and for several instances sharing one directory.

If the store lock cannot be obtained for the initial load ('LockTimeout'),
the load is skipped and left to the notifier. A 'CorruptEventLog' is fatal:
the notifier threads are stopped and the exception is re-thrown.
-}
newFilesystemStore ::
    -- | Store configuration
    FilesystemStoreConfig ->
    -- | Initialized store handle
    IO FilesystemStoreHandle
newFilesystemStore config = do
    let paths = getPaths config.storePath

    -- 'ensureDirectories' creates the directory *containing* the path it is
    -- given, so hand it the log file path in order to create the store
    -- directory itself (passing the store directory would only create its
    -- parent).
    ensureDirectories paths.eventLogPath

    -- Create the event log if it doesn't exist (like 'touch'). Opening in
    -- append mode creates a missing file but never truncates an existing one,
    -- so a log that another process has just written cannot be wiped here.
    BS.appendFile paths.eventLogPath ""

    -- Initialize memory store components
    globalVar <- newTVarIO (-1)
    stateVar <-
        newTVarIO $
            StoreState
                { nextSequence = 0
                , events = Map.empty
                , streamEvents = Map.empty
                , streamVersions = Map.empty
                , streamLocalVersions = Map.empty
                , streamNotifications = Map.empty
                , globalNotification = globalVar
                }

    -- Start the file watcher notifier with central reload thread
    notifier <- startNotifier paths.eventLogPath stateVar config

    let handle =
            FilesystemStoreHandle
                { config = config
                , stateVar = stateVar
                , notifier = notifier
                }

    -- Reload existing events if the log file has content.
    -- If the lock is held by another process, skip the reload - the notifier
    -- will catch up.
    reloadResult <- try @StoreException $ do
        logExists <- doesFileExist paths.eventLogPath
        when logExists $ do
            entries <- readLogEntries handle
            let completedEvents = Map.elems $ processLogEntries entries
                -- The leading 0 keeps 'maximum' total when there are no events.
                maxSeqNo = maximum $ 0 : [e.seqNo | es <- completedEvents, e <- es]

            when (not $ null completedEvents) $ atomically $ do
                -- Update store state with completed events.
                -- NOTE(review): transactions are visited in transaction-id
                -- (map key) order, not in sequence order - confirm that
                -- 'updateState' does not depend on application order.
                modifyTVar' stateVar $ \state ->
                    foldr
                        (flip (foldr updateState))
                        state{nextSequence = maxSeqNo + 1}
                        completedEvents

                -- Publish the last loaded sequence number through the state's
                -- global notification variable.
                loaded <- readTVar stateVar
                writeTVar loaded.globalNotification maxSeqNo

    case reloadResult of
        Right () -> pure ()
        -- Skip reload, notifier will catch up later
        Left (LockTimeout _) -> pure ()
        -- Fatal error: stop the background threads so they do not outlive the
        -- failed initialisation, then propagate.
        Left err@(CorruptEventLog _ _) -> do
            shutdownNotifier notifier
            throwIO err
    pure handle

{- | Decode log entries strictly, throwing 'CorruptEventLog' on parse failures.

The contents are split on newlines and every non-empty line must decode to an
'EventLogEntry'. Only zero-length lines are skipped (normal for
newline-delimited JSON, e.g. after the final newline).

A non-empty line that does not parse is FATAL: 'CorruptEventLog' is thrown
with the given path, the 1-based line number within the file, and the first
100 characters of the offending line. Failing fast avoids silently losing
data.
-}
decodeLogEntriesStrict :: FilePath -> BL.ByteString -> IO [EventLogEntry]
decodeLogEntriesStrict path contents =
    mapM decodeOne nonEmptyLines
  where
    -- Number the lines *before* dropping the empty ones, so that the number
    -- reported in an error is the line's real position in the file.
    numberedLines :: [(Integer, BL.ByteString)]
    numberedLines = zip [1 ..] (BL8.split '\n' contents)

    nonEmptyLines :: [(Integer, BL.ByteString)]
    nonEmptyLines = filter (not . BL.null . snd) numberedLines

    decodeOne :: (Integer, BL.ByteString) -> IO EventLogEntry
    decodeOne (lineNum, line) =
        case decode line of
            Just entry -> pure entry
            Nothing ->
                throwIO $
                    CorruptEventLog path $
                        "Failed to parse JSON at line "
                            ++ show lineNum
                            ++ ": "
                            ++ take 100 (BL8.unpack line)

{- | Index log entries by transaction id.

Each entry contributes its events under its transaction id. Should the same
id occur more than once, the entry that appears last in the list wins.
-}
processLogEntries :: [EventLogEntry] -> Map UUID [StoredEvent]
processLogEntries entries = Map.fromList (map toPair entries)
  where
    toPair :: EventLogEntry -> (UUID, [StoredEvent])
    toPair entry = (entry.transactionId, entry.events)

{- | Load every entry of the event log while holding the store lock.

Throws 'CorruptEventLog' as soon as an entry fails to parse, so corruption
surfaces immediately instead of being dropped silently. Lock acquisition may
throw 'LockTimeout' (see 'withStoreLock').
-}
readLogEntries :: FilesystemStoreHandle -> IO [EventLogEntry]
readLogEntries handle =
    withStoreLock handle (BL.readFile logPath >>= decodeLogEntriesStrict logPath)
  where
    logPath :: FilePath
    logPath = (getPaths handle.config.storePath).eventLogPath

{- | Start the notifier threads for a store.

Two threads are started and returned in a 'Notifier':

* a file watcher ('notifierLoop') that broadcasts a signal whenever the event
  log changes, and
* a central reload thread that waits for those signals and reloads events
  from disk into the given in-memory state.

Having one central reload thread (rather than one per subscription) reduces
lock contention.
-}
startNotifier :: FilePath -> TVar (StoreState FilesystemStore) -> FilesystemStoreConfig -> IO Notifier
startNotifier eventLogPath stateVar config = do
    chan <- newBroadcastTChanIO

    -- Subscribe the reload thread's channel BEFORE the watcher can broadcast:
    -- a duplicate only sees values written after it was created, so creating
    -- it later could drop an early change notification.
    reloadChan <- atomically $ dupTChan chan

    -- File watcher thread - broadcasts when file changes
    notifyThread <- async $ notifierLoop eventLogPath chan

    -- Central reload thread - responds to broadcasts by reloading events.
    -- CRITICAL: the reload must handle LockTimeout exceptions and retry,
    -- otherwise subscriptions will block forever waiting for updates that
    -- never come; that is delegated to the retrying reload helper.
    reloadThread <- async $ forever $ do
        atomically $ readTChan reloadChan
        reloadEventsFromDiskCentralWithRetry stateVar config 0

    pure $ Notifier notifyThread reloadThread

{- | Cancel both notifier threads.

The file watcher is cancelled first, then the reload thread.
-}
shutdownNotifier :: Notifier -> IO ()
shutdownNotifier notifier =
    mapM_
        cancel
        [ notifier.notifierThread -- file watcher
        , notifier.reloadThread -- reload thread
        ]

{- | Body of the file watcher thread.

Watches the directory containing the event log with fsnotify and writes a
unit value to the channel for every reported event whose path is the event
log. The function never returns normally; it keeps the watch manager alive
until the thread is cancelled.
-}
notifierLoop :: FilePath -> TChan () -> IO ()
notifierLoop eventLogPath chan = do
    -- Resolve symlinks first (e.g., /tmp -> /private/tmp on macOS) so the
    -- path comparison below sees the same spelling as the reported events.
    logFile <- canonicalizePath eventLogPath

    withManager $ \manager -> do
        -- The returned stop action is not needed: the watch lives as long as
        -- the manager does.
        _stopWatching <-
            watchDir
                manager
                (takeDirectory logFile)
                ((== logFile) . eventPath)
                (const signalChange)

        -- Keep thread alive
        forever (threadDelay maxBound)
  where
    -- Tell listeners that the event log changed.
    signalChange :: IO ()
    signalChange = atomically (writeTChan chan ())

{- | Retry wrapper for 'reloadEventsFromDiskCentral' with exponential backoff.

Handles two types of exceptions differently:

* @LockTimeout@: Retryable - uses exponential backoff (5 retries max)
* @CorruptEventLog@: FATAL - logs error and re-throws (no retry, no swallow)

Max retries for @LockTimeout@: 5, with exponential backoff: 10ms, 20ms, 40ms, 80ms, 160ms.

The third argument is the number of retries already performed; callers start
with @0@.

NOTE(review): only the two constructors above are matched. If @StoreException@
has further constructors, they would hit an incomplete pattern here - confirm
against the type's definition.
-}
reloadEventsFromDiskCentralWithRetry :: TVar (StoreState FilesystemStore) -> FilesystemStoreConfig -> Int -> IO ()
reloadEventsFromDiskCentralWithRetry stateVar config retryCount = do
    result <- try $ reloadEventsFromDiskCentral stateVar config
    case result of
        Right () -> pure () -- Success, done
        Left (LockTimeout path) -> do
            if retryCount >= 5
                then do
                    -- Max retries exceeded - log warning and give up on this reload
                    -- This is NOT fatal - the next file change will trigger another reload
                    putStrLn $ "WARNING: Failed to reload events after 5 retries due to lock contention on " ++ path
                else do
                    -- Exponential backoff: 10ms * 2^retryCount
                    let delayMicros = 10000 * (2 ^ retryCount)
                    threadDelay delayMicros
                    reloadEventsFromDiskCentralWithRetry stateVar config (retryCount + 1)
        Left (CorruptEventLog path reason) -> do
            -- FATAL ERROR: Event log corruption indicates either a serious bug or external tampering
            -- DO NOT retry, DO NOT swallow - this requires operator intervention
            putStrLn $ "FATAL: Event log corrupted at " ++ path
            putStrLn $ "Reason: " ++ reason
            putStrLn "This indicates either a serious bug in the event store or external process tampering."
            putStrLn "The reload thread will now terminate. Manual intervention is required."
            throwIO $ CorruptEventLog path reason

{- | Reload new events from disk into the in-memory store state.

Called by the notifier's reload thread when file changes are detected.
Uses locking to coordinate with writes and avoid "resource busy" errors.
Uses STRICT ByteString to ensure file handle is closed immediately.

Only events whose sequence number is greater than the current in-memory
maximum (@nextSequence - 1@) are applied; if there are none, the state is left
untouched and no notification is written.

Throws @CorruptEventLog@ if parsing fails - this is FATAL and will terminate the reload thread.
-}
reloadEventsFromDiskCentral :: TVar (StoreState FilesystemStore) -> FilesystemStoreConfig -> IO ()
reloadEventsFromDiskCentral stateVar config = do
    -- Acquire lock to coordinate with writes and avoid concurrent file access issues
    -- Use strict ByteString to ensure file is closed immediately (no lazy handle leak)
    entries <- withStoreLockDirect config $ do
        let paths = getPaths config.storePath
        contents <- BS.readFile paths.eventLogPath
        -- Convert strict ByteString to lazy for decoding
        decodeLogEntriesStrict paths.eventLogPath (BL.fromStrict contents)

    -- One list of stored events per transaction
    let completedEvents = Map.elems $ processLogEntries entries

    -- The lock is already released here; the in-memory update is a single STM transaction
    atomically $ do
        state <- readTVar stateVar
        let currentMaxSeq = state.nextSequence - 1
            newEvents = [e | es <- completedEvents, e <- es, e.seqNo > currentMaxSeq]
            -- currentMaxSeq is prepended, so 'maximum' never sees an empty list
            newMaxSeq = maximum $ currentMaxSeq : [e.seqNo | e <- newEvents]

        when (not $ null newEvents) $ do
            -- Update state with new events
            -- NOTE(review): 'foldr' applies the last list element first, and the list
            -- is grouped by transaction id rather than sorted by sequence number;
            -- presumably 'updateState' is insensitive to application order - confirm.
            let newState = foldr updateState state newEvents
            writeTVar stateVar newState{nextSequence = newMaxSeq + 1}

            -- Update global notification
            writeTVar newState.globalNotification newMaxSeq

{- | Subscribe to events from the filesystem store.

The notifier's reload thread keeps the in-memory state updated, so a
subscription simply reads from the handle's shared state variable by
delegating to 'subscribeToEvents'.
-}
subscribeFilesystem ::
    forall m ts.
    (MonadUnliftIO m) =>
    FilesystemStoreHandle ->
    EventMatcher ts FilesystemStore m ->
    EventSelector FilesystemStore ->
    m (SubscriptionHandle FilesystemStore)
subscribeFilesystem handle matcher selector =
    -- Subscribe directly to in-memory state
    -- The notifier's reload thread updates stateVar when files change
    subscribeToEvents handle.stateVar matcher selector

{- | Cleanup store resources and shut down background threads.

Stops file watchers and removes the lock file. Call this before
application shutdown to ensure clean termination.
-}
cleanupFilesystemStore ::
    -- | Store handle to clean up
    FilesystemStoreHandle ->
    IO ()
cleanupFilesystemStore handle = do
    let paths = getPaths handle.config.storePath
    -- Shutdown the notifier
    shutdownNotifier handle.notifier
    -- Remove lock file if it exists
    doesFileExist paths.storeLockPath >>= flip when (removeFile paths.storeLockPath)

{- | 'EventStore' implementation backed by the append-only event log file.

'insertEvents' performs the whole read-check-write-publish sequence while
holding the store's file lock, and reports every exception raised inside that
section as a 'FailedInsertion' carrying a 'BackendError'. 'subscribe' delegates
to 'subscribeFilesystem'.
-}
instance EventStore FilesystemStore where
    type StoreConstraints FilesystemStore m = (MonadUnliftIO m)

    insertEvents handle corrId (Transaction batches) = liftIO $ do
        txId <- UUID.nextRandom
        now <- getCurrentTime

        -- All critical operations happen inside the file lock to prevent cross-instance races
        result <- try $ withStoreLock handle $ do
            -- Phase 1: Read ONLY max sequence number from disk (source of truth for cross-instance)
            -- We just need the last transaction entry, which contains the highest seqNos
            -- ASSUMPTION: Each log entry is exactly one line (Aeson's encode produces compact JSON
            -- without newlines; any newlines in event data are escaped as \n per JSON spec)
            diskMaxSeq <- do
                let paths = getPaths handle.config.storePath
                diskContents <- BS.readFile paths.eventLogPath
                let lastLine = listToMaybe $ reverse $ filter (not . BS.null) $ BS8.split '\n' diskContents
                    lastEntry :: Maybe EventLogEntry
                    lastEntry = lastLine >>= decode . BL.fromStrict
                    -- An empty log or an undecodable last line yields no sequence numbers
                    lastSeqNos = maybe [] (\entry -> map seqNo entry.events) lastEntry
                -- (-1) is the floor, so an empty log makes the first sequence number 0
                pure $ maximum $ (-1) : lastSeqNos

            -- Phase 2: Check versions and generate sequence numbers based on DISK max
            versionCheckResult <- atomically $ do
                state <- readTVar handle.stateVar
                case checkAllVersions state batches of
                    Left err -> pure $ Left err
                    Right () -> do
                        -- Use max(STM nextSeq, disk maxSeq + 1) to handle both fresh state and stale STM
                        let actualNextSeq = max state.nextSequence (diskMaxSeq + 1)
                            stateWithCorrectSeq = state{nextSequence = actualNextSeq}
                            -- One id per event in the transaction; all of them are built from txId
                            eventIds = replicate (sum $ map (length . (.events)) $ Map.elems batches) (EventId txId)
                            (newState, finalCursor, streamCursors) = insertAllEvents stateWithCorrectSeq corrId now eventIds batches
                            -- Events present after the insert but not before it
                            newEvents =
                                [ event
                                | event <- Map.elems newState.events
                                , not $ Map.member event.seqNo stateWithCorrectSeq.events
                                ]
                        pure $ Right (newState, finalCursor, streamCursors, newEvents)

            case versionCheckResult of
                Left err -> pure $ Left err
                Right (newState, finalCursor, streamCursors, newEvents) -> do
                    -- Phase 3: Write to disk (one JSON line per transaction)
                    let paths = getPaths handle.config.storePath
                        jsonLine = BL.toStrict (encode $ EventLogEntry txId newEvents now) <> "\n"
                    BS.appendFile paths.eventLogPath jsonLine

                    -- Phase 4: Update STM state (still inside lock!)
                    atomically $ do
                        writeTVar handle.stateVar newState

                        -- Update notifications
                        forM_ (Map.keys batches) $ \streamId ->
                            forM_ (Map.lookup streamId newState.streamNotifications) $ \var ->
                                writeTVar var (getSequenceNo finalCursor)

                        writeTVar newState.globalNotification (getSequenceNo finalCursor)

                    pure $ Right (finalCursor, streamCursors)

        case result of
            Left (e :: SomeException) ->
                -- Lock timeout, disk write, or STM update failed
                pure $
                    FailedInsertion $
                        BackendError $
                            ErrorInfo
                                { errorMessage = pack $ "Failed to persist events: " <> displayException e
                                , exception = Just e
                                }
            Right (Left err) ->
                -- Version check failed
                pure $ FailedInsertion err
            Right (Right (finalCursor, streamCursors)) ->
                -- Success: both disk and STM updated atomically under lock
                pure $
                    SuccessfulInsertion $
                        InsertionSuccess
                            { finalCursor = finalCursor
                            , streamCursors = streamCursors
                            }

    subscribe = subscribeFilesystem

-- | Cursors for the filesystem store wrap a sequence number in 'FilesystemCursor'.
instance StoreCursor FilesystemStore where
    -- Wrap a sequence number in the cursor constructor
    makeCursor = FilesystemCursor

    -- Project the sequence number back out of a cursor
    makeSequenceNo = (.getSequenceNo)