{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Hindsight.Store.Filesystem (
    
    FilesystemStore,
    FilesystemStoreHandle,
    FilesystemCursor (..),
    
    FilesystemStoreConfig (..),
    mkDefaultConfig,
    getStoreConfig,
    
    newFilesystemStore,
    cleanupFilesystemStore,
    
    StoreException (..),
    
    EventLogEntry (..),
    StorePaths (..),
    getPaths,
)
where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, cancel)
import Control.Concurrent.STM (
    TChan,
    TVar,
    atomically,
    dupTChan,
    modifyTVar',
    newBroadcastTChanIO,
    newTVarIO,
    readTChan,
    readTVar,
    writeTChan,
    writeTVar,
 )
import Control.Exception (Exception, SomeException, bracket, displayException, throwIO, try)
import Control.Monad (forM_, forever, void, when)
import Control.Monad.IO.Class (liftIO)
import Data.Aeson (FromJSON, ToJSON (..), decode, encode)
import Data.ByteString qualified as BS 
import Data.ByteString.Char8 qualified as BS8
import Data.ByteString.Lazy qualified as BL
import Data.ByteString.Lazy.Char8 qualified as BL8
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (listToMaybe)
import Data.Text (pack)
import Data.Time (UTCTime, getCurrentTime)
import Data.UUID (UUID)
import Data.UUID.V4 qualified as UUID
import UnliftIO (MonadUnliftIO)
import GHC.Generics (Generic)
import Hindsight.Store (
    BackendHandle,
    Cursor,
    ErrorInfo (..),
    EventId (EventId),
    EventMatcher,
    EventSelector (..),
    EventStore (..),
    EventStoreError (BackendError),
    InsertionResult (FailedInsertion, SuccessfulInsertion),
    InsertionSuccess (..),
    StreamWrite (events),
    SubscriptionHandle (..),
    Transaction (..),
 )
import Hindsight.Store.Memory.Internal (
    StoreCursor (..),
    StoreState (..),
    StoredEvent (seqNo),
    checkAllVersions,
    insertAllEvents,
    subscribeToEvents,
    updateState,
 )
import System.Directory (canonicalizePath, createDirectoryIfMissing, doesFileExist, removeFile)
import System.FSNotify (Event (..), eventPath, watchDir, withManager)
import System.FileLock qualified as FL
import System.FilePath (takeDirectory, (</>))
import System.Timeout (timeout)
data FilesystemStoreConfig = FilesystemStoreConfig
    { FilesystemStoreConfig -> FilePath
storePath :: FilePath
    
    , FilesystemStoreConfig -> Int
syncInterval :: Int
    
    , FilesystemStoreConfig -> Int
lockTimeout :: Int
    
    }
    deriving (Int -> FilesystemStoreConfig -> ShowS
[FilesystemStoreConfig] -> ShowS
FilesystemStoreConfig -> FilePath
(Int -> FilesystemStoreConfig -> ShowS)
-> (FilesystemStoreConfig -> FilePath)
-> ([FilesystemStoreConfig] -> ShowS)
-> Show FilesystemStoreConfig
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> FilesystemStoreConfig -> ShowS
showsPrec :: Int -> FilesystemStoreConfig -> ShowS
$cshow :: FilesystemStoreConfig -> FilePath
show :: FilesystemStoreConfig -> FilePath
$cshowList :: [FilesystemStoreConfig] -> ShowS
showList :: [FilesystemStoreConfig] -> ShowS
Show, FilesystemStoreConfig -> FilesystemStoreConfig -> Bool
(FilesystemStoreConfig -> FilesystemStoreConfig -> Bool)
-> (FilesystemStoreConfig -> FilesystemStoreConfig -> Bool)
-> Eq FilesystemStoreConfig
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: FilesystemStoreConfig -> FilesystemStoreConfig -> Bool
== :: FilesystemStoreConfig -> FilesystemStoreConfig -> Bool
$c/= :: FilesystemStoreConfig -> FilesystemStoreConfig -> Bool
/= :: FilesystemStoreConfig -> FilesystemStoreConfig -> Bool
Eq, (forall x. FilesystemStoreConfig -> Rep FilesystemStoreConfig x)
-> (forall x. Rep FilesystemStoreConfig x -> FilesystemStoreConfig)
-> Generic FilesystemStoreConfig
forall x. Rep FilesystemStoreConfig x -> FilesystemStoreConfig
forall x. FilesystemStoreConfig -> Rep FilesystemStoreConfig x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. FilesystemStoreConfig -> Rep FilesystemStoreConfig x
from :: forall x. FilesystemStoreConfig -> Rep FilesystemStoreConfig x
$cto :: forall x. Rep FilesystemStoreConfig x -> FilesystemStoreConfig
to :: forall x. Rep FilesystemStoreConfig x -> FilesystemStoreConfig
Generic)
    deriving anyclass (Maybe FilesystemStoreConfig
Value -> Parser [FilesystemStoreConfig]
Value -> Parser FilesystemStoreConfig
(Value -> Parser FilesystemStoreConfig)
-> (Value -> Parser [FilesystemStoreConfig])
-> Maybe FilesystemStoreConfig
-> FromJSON FilesystemStoreConfig
forall a.
(Value -> Parser a)
-> (Value -> Parser [a]) -> Maybe a -> FromJSON a
$cparseJSON :: Value -> Parser FilesystemStoreConfig
parseJSON :: Value -> Parser FilesystemStoreConfig
$cparseJSONList :: Value -> Parser [FilesystemStoreConfig]
parseJSONList :: Value -> Parser [FilesystemStoreConfig]
$comittedField :: Maybe FilesystemStoreConfig
omittedField :: Maybe FilesystemStoreConfig
FromJSON, [FilesystemStoreConfig] -> Value
[FilesystemStoreConfig] -> Encoding
FilesystemStoreConfig -> Bool
FilesystemStoreConfig -> Value
FilesystemStoreConfig -> Encoding
(FilesystemStoreConfig -> Value)
-> (FilesystemStoreConfig -> Encoding)
-> ([FilesystemStoreConfig] -> Value)
-> ([FilesystemStoreConfig] -> Encoding)
-> (FilesystemStoreConfig -> Bool)
-> ToJSON FilesystemStoreConfig
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: FilesystemStoreConfig -> Value
toJSON :: FilesystemStoreConfig -> Value
$ctoEncoding :: FilesystemStoreConfig -> Encoding
toEncoding :: FilesystemStoreConfig -> Encoding
$ctoJSONList :: [FilesystemStoreConfig] -> Value
toJSONList :: [FilesystemStoreConfig] -> Value
$ctoEncodingList :: [FilesystemStoreConfig] -> Encoding
toEncodingList :: [FilesystemStoreConfig] -> Encoding
$comitField :: FilesystemStoreConfig -> Bool
omitField :: FilesystemStoreConfig -> Bool
ToJSON)
data EventLogEntry = EventLogEntry
    { EventLogEntry -> UUID
transactionId :: UUID
    
    , EventLogEntry -> [StoredEvent]
events :: [StoredEvent]
    
    , EventLogEntry -> UTCTime
timestamp :: UTCTime
    
    }
    deriving stock (Int -> EventLogEntry -> ShowS
[EventLogEntry] -> ShowS
EventLogEntry -> FilePath
(Int -> EventLogEntry -> ShowS)
-> (EventLogEntry -> FilePath)
-> ([EventLogEntry] -> ShowS)
-> Show EventLogEntry
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> EventLogEntry -> ShowS
showsPrec :: Int -> EventLogEntry -> ShowS
$cshow :: EventLogEntry -> FilePath
show :: EventLogEntry -> FilePath
$cshowList :: [EventLogEntry] -> ShowS
showList :: [EventLogEntry] -> ShowS
Show, EventLogEntry -> EventLogEntry -> Bool
(EventLogEntry -> EventLogEntry -> Bool)
-> (EventLogEntry -> EventLogEntry -> Bool) -> Eq EventLogEntry
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: EventLogEntry -> EventLogEntry -> Bool
== :: EventLogEntry -> EventLogEntry -> Bool
$c/= :: EventLogEntry -> EventLogEntry -> Bool
/= :: EventLogEntry -> EventLogEntry -> Bool
Eq, (forall x. EventLogEntry -> Rep EventLogEntry x)
-> (forall x. Rep EventLogEntry x -> EventLogEntry)
-> Generic EventLogEntry
forall x. Rep EventLogEntry x -> EventLogEntry
forall x. EventLogEntry -> Rep EventLogEntry x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. EventLogEntry -> Rep EventLogEntry x
from :: forall x. EventLogEntry -> Rep EventLogEntry x
$cto :: forall x. Rep EventLogEntry x -> EventLogEntry
to :: forall x. Rep EventLogEntry x -> EventLogEntry
Generic)
    deriving anyclass (Maybe EventLogEntry
Value -> Parser [EventLogEntry]
Value -> Parser EventLogEntry
(Value -> Parser EventLogEntry)
-> (Value -> Parser [EventLogEntry])
-> Maybe EventLogEntry
-> FromJSON EventLogEntry
forall a.
(Value -> Parser a)
-> (Value -> Parser [a]) -> Maybe a -> FromJSON a
$cparseJSON :: Value -> Parser EventLogEntry
parseJSON :: Value -> Parser EventLogEntry
$cparseJSONList :: Value -> Parser [EventLogEntry]
parseJSONList :: Value -> Parser [EventLogEntry]
$comittedField :: Maybe EventLogEntry
omittedField :: Maybe EventLogEntry
FromJSON, [EventLogEntry] -> Value
[EventLogEntry] -> Encoding
EventLogEntry -> Bool
EventLogEntry -> Value
EventLogEntry -> Encoding
(EventLogEntry -> Value)
-> (EventLogEntry -> Encoding)
-> ([EventLogEntry] -> Value)
-> ([EventLogEntry] -> Encoding)
-> (EventLogEntry -> Bool)
-> ToJSON EventLogEntry
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: EventLogEntry -> Value
toJSON :: EventLogEntry -> Value
$ctoEncoding :: EventLogEntry -> Encoding
toEncoding :: EventLogEntry -> Encoding
$ctoJSONList :: [EventLogEntry] -> Value
toJSONList :: [EventLogEntry] -> Value
$ctoEncodingList :: [EventLogEntry] -> Encoding
toEncodingList :: [EventLogEntry] -> Encoding
$comitField :: EventLogEntry -> Bool
omitField :: EventLogEntry -> Bool
ToJSON)
newtype FilesystemCursor = FilesystemCursor
    { FilesystemCursor -> Integer
getSequenceNo :: Integer
    
    }
    deriving (Int -> FilesystemCursor -> ShowS
[FilesystemCursor] -> ShowS
FilesystemCursor -> FilePath
(Int -> FilesystemCursor -> ShowS)
-> (FilesystemCursor -> FilePath)
-> ([FilesystemCursor] -> ShowS)
-> Show FilesystemCursor
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> FilesystemCursor -> ShowS
showsPrec :: Int -> FilesystemCursor -> ShowS
$cshow :: FilesystemCursor -> FilePath
show :: FilesystemCursor -> FilePath
$cshowList :: [FilesystemCursor] -> ShowS
showList :: [FilesystemCursor] -> ShowS
Show, FilesystemCursor -> FilesystemCursor -> Bool
(FilesystemCursor -> FilesystemCursor -> Bool)
-> (FilesystemCursor -> FilesystemCursor -> Bool)
-> Eq FilesystemCursor
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: FilesystemCursor -> FilesystemCursor -> Bool
== :: FilesystemCursor -> FilesystemCursor -> Bool
$c/= :: FilesystemCursor -> FilesystemCursor -> Bool
/= :: FilesystemCursor -> FilesystemCursor -> Bool
Eq, Eq FilesystemCursor
Eq FilesystemCursor =>
(FilesystemCursor -> FilesystemCursor -> Ordering)
-> (FilesystemCursor -> FilesystemCursor -> Bool)
-> (FilesystemCursor -> FilesystemCursor -> Bool)
-> (FilesystemCursor -> FilesystemCursor -> Bool)
-> (FilesystemCursor -> FilesystemCursor -> Bool)
-> (FilesystemCursor -> FilesystemCursor -> FilesystemCursor)
-> (FilesystemCursor -> FilesystemCursor -> FilesystemCursor)
-> Ord FilesystemCursor
FilesystemCursor -> FilesystemCursor -> Bool
FilesystemCursor -> FilesystemCursor -> Ordering
FilesystemCursor -> FilesystemCursor -> FilesystemCursor
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: FilesystemCursor -> FilesystemCursor -> Ordering
compare :: FilesystemCursor -> FilesystemCursor -> Ordering
$c< :: FilesystemCursor -> FilesystemCursor -> Bool
< :: FilesystemCursor -> FilesystemCursor -> Bool
$c<= :: FilesystemCursor -> FilesystemCursor -> Bool
<= :: FilesystemCursor -> FilesystemCursor -> Bool
$c> :: FilesystemCursor -> FilesystemCursor -> Bool
> :: FilesystemCursor -> FilesystemCursor -> Bool
$c>= :: FilesystemCursor -> FilesystemCursor -> Bool
>= :: FilesystemCursor -> FilesystemCursor -> Bool
$cmax :: FilesystemCursor -> FilesystemCursor -> FilesystemCursor
max :: FilesystemCursor -> FilesystemCursor -> FilesystemCursor
$cmin :: FilesystemCursor -> FilesystemCursor -> FilesystemCursor
min :: FilesystemCursor -> FilesystemCursor -> FilesystemCursor
Ord, (forall x. FilesystemCursor -> Rep FilesystemCursor x)
-> (forall x. Rep FilesystemCursor x -> FilesystemCursor)
-> Generic FilesystemCursor
forall x. Rep FilesystemCursor x -> FilesystemCursor
forall x. FilesystemCursor -> Rep FilesystemCursor x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. FilesystemCursor -> Rep FilesystemCursor x
from :: forall x. FilesystemCursor -> Rep FilesystemCursor x
$cto :: forall x. Rep FilesystemCursor x -> FilesystemCursor
to :: forall x. Rep FilesystemCursor x -> FilesystemCursor
Generic)
    deriving anyclass (Maybe FilesystemCursor
Value -> Parser [FilesystemCursor]
Value -> Parser FilesystemCursor
(Value -> Parser FilesystemCursor)
-> (Value -> Parser [FilesystemCursor])
-> Maybe FilesystemCursor
-> FromJSON FilesystemCursor
forall a.
(Value -> Parser a)
-> (Value -> Parser [a]) -> Maybe a -> FromJSON a
$cparseJSON :: Value -> Parser FilesystemCursor
parseJSON :: Value -> Parser FilesystemCursor
$cparseJSONList :: Value -> Parser [FilesystemCursor]
parseJSONList :: Value -> Parser [FilesystemCursor]
$comittedField :: Maybe FilesystemCursor
omittedField :: Maybe FilesystemCursor
FromJSON, [FilesystemCursor] -> Value
[FilesystemCursor] -> Encoding
FilesystemCursor -> Bool
FilesystemCursor -> Value
FilesystemCursor -> Encoding
(FilesystemCursor -> Value)
-> (FilesystemCursor -> Encoding)
-> ([FilesystemCursor] -> Value)
-> ([FilesystemCursor] -> Encoding)
-> (FilesystemCursor -> Bool)
-> ToJSON FilesystemCursor
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: FilesystemCursor -> Value
toJSON :: FilesystemCursor -> Value
$ctoEncoding :: FilesystemCursor -> Encoding
toEncoding :: FilesystemCursor -> Encoding
$ctoJSONList :: [FilesystemCursor] -> Value
toJSONList :: [FilesystemCursor] -> Value
$ctoEncodingList :: [FilesystemCursor] -> Encoding
toEncodingList :: [FilesystemCursor] -> Encoding
$comitField :: FilesystemCursor -> Bool
omitField :: FilesystemCursor -> Bool
ToJSON)
data Notifier = Notifier
    { Notifier -> Async ()
notifierThread :: Async ()
    
    , Notifier -> Async ()
reloadThread :: Async ()
    
    }
data FilesystemStore
type instance Cursor FilesystemStore = FilesystemCursor
type instance BackendHandle FilesystemStore = FilesystemStoreHandle
data StoreException
    = LockTimeout FilePath
    | CorruptEventLog FilePath String
    deriving (Int -> StoreException -> ShowS
[StoreException] -> ShowS
StoreException -> FilePath
(Int -> StoreException -> ShowS)
-> (StoreException -> FilePath)
-> ([StoreException] -> ShowS)
-> Show StoreException
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> StoreException -> ShowS
showsPrec :: Int -> StoreException -> ShowS
$cshow :: StoreException -> FilePath
show :: StoreException -> FilePath
$cshowList :: [StoreException] -> ShowS
showList :: [StoreException] -> ShowS
Show)
instance Exception StoreException
data FilesystemStoreHandle = FilesystemStoreHandle
    { FilesystemStoreHandle -> FilesystemStoreConfig
config :: FilesystemStoreConfig
    
    , FilesystemStoreHandle -> TVar (StoreState FilesystemStore)
stateVar :: TVar (StoreState FilesystemStore)
    
    , FilesystemStoreHandle -> Notifier
notifier :: Notifier
    
    }
data StorePaths = StorePaths
    { StorePaths -> FilePath
eventLogPath :: FilePath
    
    , StorePaths -> FilePath
storeLockPath :: FilePath
    
    }
getPaths ::
    
    FilePath ->
    
    StorePaths
getPaths :: FilePath -> StorePaths
getPaths FilePath
base =
    StorePaths
        { eventLogPath :: FilePath
eventLogPath = FilePath
base FilePath -> ShowS
</> FilePath
"events.log"
        , storeLockPath :: FilePath
storeLockPath = FilePath
base FilePath -> ShowS
</> FilePath
"store.lock"
        }
mkDefaultConfig ::
    
    FilePath ->
    
    FilesystemStoreConfig
mkDefaultConfig :: FilePath -> FilesystemStoreConfig
mkDefaultConfig FilePath
path =
    FilesystemStoreConfig
        { storePath :: FilePath
storePath = FilePath
path
        , syncInterval :: Int
syncInterval = Int
1 
        , lockTimeout :: Int
lockTimeout = Int
5000000 
        }
getStoreConfig ::
    
    FilesystemStoreHandle ->
    
    FilesystemStoreConfig
getStoreConfig :: FilesystemStoreHandle -> FilesystemStoreConfig
getStoreConfig = (.config)
ensureDirectories :: FilePath -> IO ()
ensureDirectories :: FilePath -> IO ()
ensureDirectories FilePath
path = do
    let dir :: FilePath
dir = ShowS
takeDirectory FilePath
path
    Bool -> FilePath -> IO ()
createDirectoryIfMissing Bool
True FilePath
dir
withStoreLockDirect :: FilesystemStoreConfig -> IO a -> IO a
withStoreLockDirect :: forall a. FilesystemStoreConfig -> IO a -> IO a
withStoreLockDirect FilesystemStoreConfig
config IO a
action = do
    let paths :: StorePaths
paths = FilePath -> StorePaths
getPaths FilesystemStoreConfig
config.storePath
    
    result <-
        Int -> IO a -> IO (Maybe a)
forall a. Int -> IO a -> IO (Maybe a)
timeout FilesystemStoreConfig
config.lockTimeout (IO a -> IO (Maybe a)) -> IO a -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$
            IO FileLock -> (FileLock -> IO ()) -> (FileLock -> IO a) -> IO a
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket
                (FilePath -> SharedExclusive -> IO FileLock
FL.lockFile StorePaths
paths.storeLockPath SharedExclusive
FL.Exclusive)
                FileLock -> IO ()
FL.unlockFile
                (IO a -> FileLock -> IO a
forall a b. a -> b -> a
const IO a
action)
    case result of
        Maybe a
Nothing -> StoreException -> IO a
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (StoreException -> IO a) -> StoreException -> IO a
forall a b. (a -> b) -> a -> b
$ FilePath -> StoreException
LockTimeout StorePaths
paths.storeLockPath
        Just a
value -> a -> IO a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
value
withStoreLock :: FilesystemStoreHandle -> IO a -> IO a
withStoreLock :: forall a. FilesystemStoreHandle -> IO a -> IO a
withStoreLock FilesystemStoreHandle
handle = FilesystemStoreConfig -> IO a -> IO a
forall a. FilesystemStoreConfig -> IO a -> IO a
withStoreLockDirect FilesystemStoreHandle
handle.config
newFilesystemStore ::
    
    FilesystemStoreConfig ->
    
    IO FilesystemStoreHandle
newFilesystemStore :: FilesystemStoreConfig -> IO FilesystemStoreHandle
newFilesystemStore FilesystemStoreConfig
config = do
    let paths :: StorePaths
paths = FilePath -> StorePaths
getPaths FilesystemStoreConfig
config.storePath
    
    FilePath -> IO ()
ensureDirectories FilesystemStoreConfig
config.storePath
    
    exists <- FilePath -> IO Bool
doesFileExist StorePaths
paths.eventLogPath
    when (not exists) $ BS.writeFile paths.eventLogPath ""
    
    globalVar <- newTVarIO (-1)
    stateVar <-
        newTVarIO $
            StoreState
                { nextSequence = 0
                , events = Map.empty
                , streamEvents = Map.empty
                , streamVersions = Map.empty
                , streamLocalVersions = Map.empty
                , streamNotifications = Map.empty
                , globalNotification = globalVar
                }
    
    notifier <- startNotifier paths.eventLogPath stateVar config
    let handle = FilesystemStoreHandle{TVar (StoreState FilesystemStore)
Notifier
FilesystemStoreConfig
config :: FilesystemStoreConfig
stateVar :: TVar (StoreState FilesystemStore)
notifier :: Notifier
config :: FilesystemStoreConfig
stateVar :: TVar (StoreState FilesystemStore)
notifier :: Notifier
..}
    
    
    reloadResult <- try @StoreException $ do
        logExists <- doesFileExist paths.eventLogPath
        when logExists $ do
            entries <- readLogEntries handle
            let completedEvents = Map UUID [StoredEvent] -> [[StoredEvent]]
forall k a. Map k a -> [a]
Map.elems (Map UUID [StoredEvent] -> [[StoredEvent]])
-> Map UUID [StoredEvent] -> [[StoredEvent]]
forall a b. (a -> b) -> a -> b
$ [EventLogEntry] -> Map UUID [StoredEvent]
processLogEntries [EventLogEntry]
entries
                maxSeqNo = [Integer] -> Integer
forall a. Ord a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
maximum ([Integer] -> Integer) -> [Integer] -> Integer
forall a b. (a -> b) -> a -> b
$ Integer
0 Integer -> [Integer] -> [Integer]
forall a. a -> [a] -> [a]
: [StoredEvent
e.seqNo | [StoredEvent]
es <- [[StoredEvent]]
completedEvents, StoredEvent
e <- [StoredEvent]
es]
            when (not $ null completedEvents) $ atomically $ do
                
                modifyTVar' stateVar $ \StoreState FilesystemStore
state ->
                    ([StoredEvent]
 -> StoreState FilesystemStore -> StoreState FilesystemStore)
-> StoreState FilesystemStore
-> [[StoredEvent]]
-> StoreState FilesystemStore
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr
                        ((StoreState FilesystemStore
 -> [StoredEvent] -> StoreState FilesystemStore)
-> [StoredEvent]
-> StoreState FilesystemStore
-> StoreState FilesystemStore
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((StoredEvent
 -> StoreState FilesystemStore -> StoreState FilesystemStore)
-> StoreState FilesystemStore
-> [StoredEvent]
-> StoreState FilesystemStore
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr StoredEvent
-> StoreState FilesystemStore -> StoreState FilesystemStore
forall backend.
StoreCursor backend =>
StoredEvent -> StoreState backend -> StoreState backend
updateState))
                        StoreState FilesystemStore
state{nextSequence = maxSeqNo + 1}
                        [[StoredEvent]]
completedEvents
                
                state <- readTVar stateVar
                
                writeTVar state.globalNotification maxSeqNo
    case reloadResult of
        Right () -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        Left (LockTimeout FilePath
_) -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure () 
        Left (CorruptEventLog FilePath
path FilePath
reason) -> StoreException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (StoreException -> IO ()) -> StoreException -> IO ()
forall a b. (a -> b) -> a -> b
$ FilePath -> FilePath -> StoreException
CorruptEventLog FilePath
path FilePath
reason 
    pure handle
decodeLogEntriesStrict :: FilePath -> BL.ByteString -> IO [EventLogEntry]
decodeLogEntriesStrict :: FilePath -> ByteString -> IO [EventLogEntry]
decodeLogEntriesStrict FilePath
path ByteString
contents = do
    let lines' :: [ByteString]
lines' = Char -> ByteString -> [ByteString]
BL8.split Char
'\n' ByteString
contents
        nonEmptyLines :: [ByteString]
nonEmptyLines = (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (Bool -> Bool
not (Bool -> Bool) -> (ByteString -> Bool) -> ByteString -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Bool
BL.null) [ByteString]
lines'
    ((Integer, ByteString) -> IO EventLogEntry)
-> [(Integer, ByteString)] -> IO [EventLogEntry]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (Integer, ByteString) -> IO EventLogEntry
decodeOne ([Integer] -> [ByteString] -> [(Integer, ByteString)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Integer
1 ..] [ByteString]
nonEmptyLines)
  where
    decodeOne :: (Integer, BL.ByteString) -> IO EventLogEntry
    decodeOne :: (Integer, ByteString) -> IO EventLogEntry
decodeOne (Integer
lineNum, ByteString
line) =
        case ByteString -> Maybe EventLogEntry
forall a. FromJSON a => ByteString -> Maybe a
decode ByteString
line of
            Just EventLogEntry
entry -> EventLogEntry -> IO EventLogEntry
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure EventLogEntry
entry
            Maybe EventLogEntry
Nothing ->
                StoreException -> IO EventLogEntry
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (StoreException -> IO EventLogEntry)
-> StoreException -> IO EventLogEntry
forall a b. (a -> b) -> a -> b
$
                    FilePath -> FilePath -> StoreException
CorruptEventLog FilePath
path (FilePath -> StoreException) -> FilePath -> StoreException
forall a b. (a -> b) -> a -> b
$
                        FilePath
"Failed to parse JSON at line "
                            FilePath -> ShowS
forall a. [a] -> [a] -> [a]
++ Integer -> FilePath
forall a. Show a => a -> FilePath
show Integer
lineNum
                            FilePath -> ShowS
forall a. [a] -> [a] -> [a]
++ FilePath
": "
                            FilePath -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> ShowS
forall a. Int -> [a] -> [a]
take Int
100 (ByteString -> FilePath
BL8.unpack ByteString
line)
processLogEntries :: [EventLogEntry] -> Map UUID [StoredEvent]
processLogEntries :: [EventLogEntry] -> Map UUID [StoredEvent]
processLogEntries [EventLogEntry]
entries = [(UUID, [StoredEvent])] -> Map UUID [StoredEvent]
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [(EventLogEntry
e.transactionId, EventLogEntry
e.events) | EventLogEntry
e <- [EventLogEntry]
entries]
readLogEntries :: FilesystemStoreHandle -> IO [EventLogEntry]
readLogEntries :: FilesystemStoreHandle -> IO [EventLogEntry]
readLogEntries FilesystemStoreHandle
handle =
    FilesystemStoreHandle -> IO [EventLogEntry] -> IO [EventLogEntry]
forall a. FilesystemStoreHandle -> IO a -> IO a
withStoreLock FilesystemStoreHandle
handle (IO [EventLogEntry] -> IO [EventLogEntry])
-> IO [EventLogEntry] -> IO [EventLogEntry]
forall a b. (a -> b) -> a -> b
$ do
        let paths :: StorePaths
paths = FilePath -> StorePaths
getPaths FilesystemStoreHandle
handle.config.storePath
        contents <- FilePath -> IO ByteString
BL.readFile StorePaths
paths.eventLogPath
        decodeLogEntriesStrict paths.eventLogPath contents
startNotifier :: FilePath -> TVar (StoreState FilesystemStore) -> FilesystemStoreConfig -> IO Notifier
startNotifier :: FilePath
-> TVar (StoreState FilesystemStore)
-> FilesystemStoreConfig
-> IO Notifier
startNotifier FilePath
eventLogPath TVar (StoreState FilesystemStore)
stateVar FilesystemStoreConfig
config = do
    chan <- IO (TChan ())
forall a. IO (TChan a)
newBroadcastTChanIO
    
    notifyThread <- async $ notifierLoop eventLogPath chan
    
    
    
    
    reloadChan <- atomically $ dupTChan chan
    reloadThread <- async $ forever $ do
        atomically $ readTChan reloadChan
        reloadEventsFromDiskCentralWithRetry stateVar config 0
    pure $ Notifier notifyThread reloadThread
shutdownNotifier :: Notifier -> IO ()
shutdownNotifier :: Notifier -> IO ()
shutdownNotifier Notifier
notifier = do
    Async () -> IO ()
forall a. Async a -> IO ()
cancel Notifier
notifier.notifierThread 
    Async () -> IO ()
forall a. Async a -> IO ()
cancel Notifier
notifier.reloadThread 
notifierLoop :: FilePath -> TChan () -> IO ()
notifierLoop :: FilePath -> TChan () -> IO ()
notifierLoop FilePath
eventLogPath TChan ()
chan = do
    
    canonicalEventLogPath <- FilePath -> IO FilePath
canonicalizePath FilePath
eventLogPath
    withManager $ \WatchManager
mgr -> do
        let watchDir' :: FilePath
watchDir' = ShowS
takeDirectory FilePath
canonicalEventLogPath
            predicate :: Event -> Bool
predicate Event
event = Event -> FilePath
eventPath Event
event FilePath -> FilePath -> Bool
forall a. Eq a => a -> a -> Bool
== FilePath
canonicalEventLogPath
            handler :: Event -> IO ()
handler Event
_event = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TChan () -> () -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan ()
chan ()
        
        IO (IO ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (IO ()) -> IO ()) -> IO (IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ WatchManager
-> FilePath -> (Event -> Bool) -> (Event -> IO ()) -> IO (IO ())
watchDir WatchManager
mgr FilePath
watchDir' Event -> Bool
predicate Event -> IO ()
handler
        
        IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
forall a. Bounded a => a
maxBound
reloadEventsFromDiskCentralWithRetry :: TVar (StoreState FilesystemStore) -> FilesystemStoreConfig -> Int -> IO ()
reloadEventsFromDiskCentralWithRetry :: TVar (StoreState FilesystemStore)
-> FilesystemStoreConfig -> Int -> IO ()
reloadEventsFromDiskCentralWithRetry TVar (StoreState FilesystemStore)
stateVar FilesystemStoreConfig
config Int
retryCount = do
    result <- IO () -> IO (Either StoreException ())
forall e a. Exception e => IO a -> IO (Either e a)
try (IO () -> IO (Either StoreException ()))
-> IO () -> IO (Either StoreException ())
forall a b. (a -> b) -> a -> b
$ TVar (StoreState FilesystemStore) -> FilesystemStoreConfig -> IO ()
reloadEventsFromDiskCentral TVar (StoreState FilesystemStore)
stateVar FilesystemStoreConfig
config
    case result of
        Right () -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure () 
        Left (LockTimeout FilePath
path) -> do
            if Int
retryCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
5
                then do
                    
                    
                    FilePath -> IO ()
putStrLn (FilePath -> IO ()) -> FilePath -> IO ()
forall a b. (a -> b) -> a -> b
$ FilePath
"WARNING: Failed to reload events after 5 retries due to lock contention on " FilePath -> ShowS
forall a. [a] -> [a] -> [a]
++ FilePath
path
                    () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                else do
                    
                    let delayMicros :: Int
delayMicros = Int
10000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* (Int
2 Int -> Int -> Int
forall a b. (Num a, Integral b) => a -> b -> a
^ Int
retryCount)
                    Int -> IO ()
threadDelay Int
delayMicros
                    TVar (StoreState FilesystemStore)
-> FilesystemStoreConfig -> Int -> IO ()
reloadEventsFromDiskCentralWithRetry TVar (StoreState FilesystemStore)
stateVar FilesystemStoreConfig
config (Int
retryCount Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
        Left (CorruptEventLog FilePath
path FilePath
reason) -> do
            
            
            FilePath -> IO ()
putStrLn (FilePath -> IO ()) -> FilePath -> IO ()
forall a b. (a -> b) -> a -> b
$ FilePath
"FATAL: Event log corrupted at " FilePath -> ShowS
forall a. [a] -> [a] -> [a]
++ FilePath
path
            FilePath -> IO ()
putStrLn (FilePath -> IO ()) -> FilePath -> IO ()
forall a b. (a -> b) -> a -> b
$ FilePath
"Reason: " FilePath -> ShowS
forall a. [a] -> [a] -> [a]
++ FilePath
reason
            FilePath -> IO ()
putStrLn FilePath
"This indicates either a serious bug in the event store or external process tampering."
            FilePath -> IO ()
putStrLn FilePath
"The reload thread will now terminate. Manual intervention is required."
            StoreException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (StoreException -> IO ()) -> StoreException -> IO ()
forall a b. (a -> b) -> a -> b
$ FilePath -> FilePath -> StoreException
CorruptEventLog FilePath
path FilePath
reason
reloadEventsFromDiskCentral :: TVar (StoreState FilesystemStore) -> FilesystemStoreConfig -> IO ()
reloadEventsFromDiskCentral :: TVar (StoreState FilesystemStore) -> FilesystemStoreConfig -> IO ()
reloadEventsFromDiskCentral TVar (StoreState FilesystemStore)
stateVar FilesystemStoreConfig
config = do
    
    
    entries <- FilesystemStoreConfig -> IO [EventLogEntry] -> IO [EventLogEntry]
forall a. FilesystemStoreConfig -> IO a -> IO a
withStoreLockDirect FilesystemStoreConfig
config (IO [EventLogEntry] -> IO [EventLogEntry])
-> IO [EventLogEntry] -> IO [EventLogEntry]
forall a b. (a -> b) -> a -> b
$ do
        let paths :: StorePaths
paths = FilePath -> StorePaths
getPaths FilesystemStoreConfig
config.storePath
        contents <- FilePath -> IO ByteString
BS.readFile StorePaths
paths.eventLogPath
        
        decodeLogEntriesStrict paths.eventLogPath (BL.fromStrict contents)
    let completedEvents = Map UUID [StoredEvent] -> [[StoredEvent]]
forall k a. Map k a -> [a]
Map.elems (Map UUID [StoredEvent] -> [[StoredEvent]])
-> Map UUID [StoredEvent] -> [[StoredEvent]]
forall a b. (a -> b) -> a -> b
$ [EventLogEntry] -> Map UUID [StoredEvent]
processLogEntries [EventLogEntry]
entries
    atomically $ do
        state <- readTVar stateVar
        let currentMaxSeq = StoreState FilesystemStore
state.nextSequence Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
- Integer
1
            newEvents = [StoredEvent
e | [StoredEvent]
es <- [[StoredEvent]]
completedEvents, StoredEvent
e <- [StoredEvent]
es, StoredEvent
e.seqNo Integer -> Integer -> Bool
forall a. Ord a => a -> a -> Bool
> Integer
currentMaxSeq]
            newMaxSeq = [Integer] -> Integer
forall a. Ord a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
maximum ([Integer] -> Integer) -> [Integer] -> Integer
forall a b. (a -> b) -> a -> b
$ Integer
currentMaxSeq Integer -> [Integer] -> [Integer]
forall a. a -> [a] -> [a]
: [StoredEvent
e.seqNo | StoredEvent
e <- [StoredEvent]
newEvents]
        when (not $ null newEvents) $ do
            
            let newState = (StoredEvent
 -> StoreState FilesystemStore -> StoreState FilesystemStore)
-> StoreState FilesystemStore
-> [StoredEvent]
-> StoreState FilesystemStore
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr StoredEvent
-> StoreState FilesystemStore -> StoreState FilesystemStore
forall backend.
StoreCursor backend =>
StoredEvent -> StoreState backend -> StoreState backend
updateState StoreState FilesystemStore
state [StoredEvent]
newEvents
            writeTVar stateVar newState{nextSequence = newMaxSeq + 1}
            
            writeTVar newState.globalNotification newMaxSeq
subscribeFilesystem ::
    forall m ts.
    (MonadUnliftIO m) =>
    FilesystemStoreHandle ->
    EventMatcher ts FilesystemStore m ->
    EventSelector FilesystemStore ->
    m (SubscriptionHandle FilesystemStore)
subscribeFilesystem :: forall (m :: * -> *) (ts :: [Symbol]).
MonadUnliftIO m =>
FilesystemStoreHandle
-> EventMatcher ts FilesystemStore m
-> EventSelector FilesystemStore
-> m (SubscriptionHandle FilesystemStore)
subscribeFilesystem FilesystemStoreHandle
handle EventMatcher ts FilesystemStore m
matcher EventSelector FilesystemStore
selector =
    
    
    TVar (StoreState FilesystemStore)
-> EventMatcher ts FilesystemStore m
-> EventSelector FilesystemStore
-> m (SubscriptionHandle FilesystemStore)
forall (m :: * -> *) backend (ts :: [Symbol]).
(MonadUnliftIO m, StoreCursor backend, Show (Cursor backend)) =>
TVar (StoreState backend)
-> EventMatcher ts backend m
-> EventSelector backend
-> m (SubscriptionHandle backend)
subscribeToEvents FilesystemStoreHandle
handle.stateVar EventMatcher ts FilesystemStore m
matcher EventSelector FilesystemStore
selector
cleanupFilesystemStore ::
    
    FilesystemStoreHandle ->
    IO ()
cleanupFilesystemStore :: FilesystemStoreHandle -> IO ()
cleanupFilesystemStore FilesystemStoreHandle
handle = do
    let paths :: StorePaths
paths = FilePath -> StorePaths
getPaths FilesystemStoreHandle
handle.config.storePath
    
    Notifier -> IO ()
shutdownNotifier FilesystemStoreHandle
handle.notifier
    
    FilePath -> IO Bool
doesFileExist StorePaths
paths.storeLockPath IO Bool -> (Bool -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Bool -> IO () -> IO ()) -> IO () -> Bool -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (FilePath -> IO ()
removeFile StorePaths
paths.storeLockPath)
instance EventStore FilesystemStore where
    type StoreConstraints FilesystemStore m = (MonadUnliftIO m)
    insertEvents :: forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints FilesystemStore m) =>
BackendHandle FilesystemStore
-> Maybe CorrelationId
-> Transaction t FilesystemStore
-> m (InsertionResult FilesystemStore)
insertEvents BackendHandle FilesystemStore
handle Maybe CorrelationId
corrId (Transaction Map StreamId (StreamWrite t SomeLatestEvent FilesystemStore)
batches) = IO (InsertionResult FilesystemStore)
-> m (InsertionResult FilesystemStore)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (InsertionResult FilesystemStore)
 -> m (InsertionResult FilesystemStore))
-> IO (InsertionResult FilesystemStore)
-> m (InsertionResult FilesystemStore)
forall a b. (a -> b) -> a -> b
$ do
        txId <- IO UUID
UUID.nextRandom
        now <- getCurrentTime
        
        result <- try $ withStoreLock handle $ do
            
            
            
            
            diskMaxSeq <- do
                let paths = FilePath -> StorePaths
getPaths BackendHandle FilesystemStore
FilesystemStoreHandle
handle.config.storePath
                diskContents <- BS.readFile paths.eventLogPath
                let lastLine = [ByteString] -> Maybe ByteString
forall a. [a] -> Maybe a
listToMaybe ([ByteString] -> Maybe ByteString)
-> [ByteString] -> Maybe ByteString
forall a b. (a -> b) -> a -> b
$ [ByteString] -> [ByteString]
forall a. [a] -> [a]
reverse ([ByteString] -> [ByteString]) -> [ByteString] -> [ByteString]
forall a b. (a -> b) -> a -> b
$ (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (Bool -> Bool
not (Bool -> Bool) -> (ByteString -> Bool) -> ByteString -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Bool
BS.null) ([ByteString] -> [ByteString]) -> [ByteString] -> [ByteString]
forall a b. (a -> b) -> a -> b
$ Char -> ByteString -> [ByteString]
BS8.split Char
'\n' ByteString
diskContents
                    lastEntry :: Maybe EventLogEntry
                    lastEntry = Maybe ByteString
lastLine Maybe ByteString
-> (ByteString -> Maybe EventLogEntry) -> Maybe EventLogEntry
forall a b. Maybe a -> (a -> Maybe b) -> Maybe b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ByteString -> Maybe EventLogEntry
forall a. FromJSON a => ByteString -> Maybe a
decode (ByteString -> Maybe EventLogEntry)
-> (ByteString -> ByteString) -> ByteString -> Maybe EventLogEntry
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BL.fromStrict
                    lastSeqNos = [Integer]
-> (EventLogEntry -> [Integer]) -> Maybe EventLogEntry -> [Integer]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [] (\EventLogEntry
entry -> (StoredEvent -> Integer) -> [StoredEvent] -> [Integer]
forall a b. (a -> b) -> [a] -> [b]
map StoredEvent -> Integer
seqNo EventLogEntry
entry.events) Maybe EventLogEntry
lastEntry
                pure $ maximum $ (-1) : lastSeqNos
            
            versionCheckResult <- atomically $ do
                state <- readTVar handle.stateVar
                case checkAllVersions state batches of
                    Left EventStoreError FilesystemStore
err -> Either
  (EventStoreError FilesystemStore)
  (StoreState FilesystemStore, FilesystemCursor,
   Map StreamId FilesystemCursor, [StoredEvent])
-> STM
     (Either
        (EventStoreError FilesystemStore)
        (StoreState FilesystemStore, FilesystemCursor,
         Map StreamId FilesystemCursor, [StoredEvent]))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either
   (EventStoreError FilesystemStore)
   (StoreState FilesystemStore, FilesystemCursor,
    Map StreamId FilesystemCursor, [StoredEvent])
 -> STM
      (Either
         (EventStoreError FilesystemStore)
         (StoreState FilesystemStore, FilesystemCursor,
          Map StreamId FilesystemCursor, [StoredEvent])))
-> Either
     (EventStoreError FilesystemStore)
     (StoreState FilesystemStore, FilesystemCursor,
      Map StreamId FilesystemCursor, [StoredEvent])
-> STM
     (Either
        (EventStoreError FilesystemStore)
        (StoreState FilesystemStore, FilesystemCursor,
         Map StreamId FilesystemCursor, [StoredEvent]))
forall a b. (a -> b) -> a -> b
$ EventStoreError FilesystemStore
-> Either
     (EventStoreError FilesystemStore)
     (StoreState FilesystemStore, FilesystemCursor,
      Map StreamId FilesystemCursor, [StoredEvent])
forall a b. a -> Either a b
Left EventStoreError FilesystemStore
err
                    Right () -> do
                        
                        let actualNextSeq :: Integer
actualNextSeq = Integer -> Integer -> Integer
forall a. Ord a => a -> a -> a
max StoreState FilesystemStore
state.nextSequence (Integer
diskMaxSeq Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
+ Integer
1)
                            stateWithCorrectSeq :: StoreState FilesystemStore
stateWithCorrectSeq = StoreState FilesystemStore
state{nextSequence = actualNextSeq}
                            eventIds :: [EventId]
eventIds = Int -> EventId -> [EventId]
forall a. Int -> a -> [a]
replicate ([Int] -> Int
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum ([Int] -> Int) -> [Int] -> Int
forall a b. (a -> b) -> a -> b
$ (StreamWrite t SomeLatestEvent FilesystemStore -> Int)
-> [StreamWrite t SomeLatestEvent FilesystemStore] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map (t SomeLatestEvent -> Int
forall a. t a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length (t SomeLatestEvent -> Int)
-> (StreamWrite t SomeLatestEvent FilesystemStore
    -> t SomeLatestEvent)
-> StreamWrite t SomeLatestEvent FilesystemStore
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (.events)) ([StreamWrite t SomeLatestEvent FilesystemStore] -> [Int])
-> [StreamWrite t SomeLatestEvent FilesystemStore] -> [Int]
forall a b. (a -> b) -> a -> b
$ Map StreamId (StreamWrite t SomeLatestEvent FilesystemStore)
-> [StreamWrite t SomeLatestEvent FilesystemStore]
forall k a. Map k a -> [a]
Map.elems Map StreamId (StreamWrite t SomeLatestEvent FilesystemStore)
batches) (UUID -> EventId
EventId UUID
txId)
                            (StoreState FilesystemStore
newState, Cursor FilesystemStore
finalCursor, Map StreamId (Cursor FilesystemStore)
streamCursors) = StoreState FilesystemStore
-> Maybe CorrelationId
-> UTCTime
-> [EventId]
-> Map StreamId (StreamWrite t SomeLatestEvent FilesystemStore)
-> (StoreState FilesystemStore, Cursor FilesystemStore,
    Map StreamId (Cursor FilesystemStore))
forall backend (t :: * -> *).
(StoreCursor backend, Foldable t) =>
StoreState backend
-> Maybe CorrelationId
-> UTCTime
-> [EventId]
-> Map StreamId (StreamWrite t SomeLatestEvent backend)
-> (StoreState backend, Cursor backend,
    Map StreamId (Cursor backend))
insertAllEvents StoreState FilesystemStore
stateWithCorrectSeq Maybe CorrelationId
corrId UTCTime
now [EventId]
eventIds Map StreamId (StreamWrite t SomeLatestEvent FilesystemStore)
batches
                            newEvents :: [StoredEvent]
newEvents =
                                [ StoredEvent
event
                                | StoredEvent
event <- Map Integer StoredEvent -> [StoredEvent]
forall k a. Map k a -> [a]
Map.elems StoreState FilesystemStore
newState.events
                                , Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Integer -> Map Integer StoredEvent -> Bool
forall k a. Ord k => k -> Map k a -> Bool
Map.member StoredEvent
event.seqNo StoreState FilesystemStore
stateWithCorrectSeq.events
                                ]
                        Either
  (EventStoreError FilesystemStore)
  (StoreState FilesystemStore, FilesystemCursor,
   Map StreamId FilesystemCursor, [StoredEvent])
-> STM
     (Either
        (EventStoreError FilesystemStore)
        (StoreState FilesystemStore, FilesystemCursor,
         Map StreamId FilesystemCursor, [StoredEvent]))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either
   (EventStoreError FilesystemStore)
   (StoreState FilesystemStore, FilesystemCursor,
    Map StreamId FilesystemCursor, [StoredEvent])
 -> STM
      (Either
         (EventStoreError FilesystemStore)
         (StoreState FilesystemStore, FilesystemCursor,
          Map StreamId FilesystemCursor, [StoredEvent])))
-> Either
     (EventStoreError FilesystemStore)
     (StoreState FilesystemStore, FilesystemCursor,
      Map StreamId FilesystemCursor, [StoredEvent])
-> STM
     (Either
        (EventStoreError FilesystemStore)
        (StoreState FilesystemStore, FilesystemCursor,
         Map StreamId FilesystemCursor, [StoredEvent]))
forall a b. (a -> b) -> a -> b
$ (StoreState FilesystemStore, FilesystemCursor,
 Map StreamId FilesystemCursor, [StoredEvent])
-> Either
     (EventStoreError FilesystemStore)
     (StoreState FilesystemStore, FilesystemCursor,
      Map StreamId FilesystemCursor, [StoredEvent])
forall a b. b -> Either a b
Right (StoreState FilesystemStore
newState, Cursor FilesystemStore
FilesystemCursor
finalCursor, Map StreamId (Cursor FilesystemStore)
Map StreamId FilesystemCursor
streamCursors, [StoredEvent]
newEvents)
            case versionCheckResult of
                Left EventStoreError FilesystemStore
err -> Either
  (EventStoreError FilesystemStore)
  (FilesystemCursor, Map StreamId FilesystemCursor)
-> IO
     (Either
        (EventStoreError FilesystemStore)
        (FilesystemCursor, Map StreamId FilesystemCursor))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either
   (EventStoreError FilesystemStore)
   (FilesystemCursor, Map StreamId FilesystemCursor)
 -> IO
      (Either
         (EventStoreError FilesystemStore)
         (FilesystemCursor, Map StreamId FilesystemCursor)))
-> Either
     (EventStoreError FilesystemStore)
     (FilesystemCursor, Map StreamId FilesystemCursor)
-> IO
     (Either
        (EventStoreError FilesystemStore)
        (FilesystemCursor, Map StreamId FilesystemCursor))
forall a b. (a -> b) -> a -> b
$ EventStoreError FilesystemStore
-> Either
     (EventStoreError FilesystemStore)
     (FilesystemCursor, Map StreamId FilesystemCursor)
forall a b. a -> Either a b
Left EventStoreError FilesystemStore
err
                Right (StoreState FilesystemStore
newState, FilesystemCursor
finalCursor, Map StreamId FilesystemCursor
streamCursors, [StoredEvent]
newEvents) -> do
                    
                    let paths :: StorePaths
paths = FilePath -> StorePaths
getPaths BackendHandle FilesystemStore
FilesystemStoreHandle
handle.config.storePath
                        jsonLine :: ByteString
jsonLine = ByteString -> ByteString
BL.toStrict (EventLogEntry -> ByteString
forall a. ToJSON a => a -> ByteString
encode (EventLogEntry -> ByteString) -> EventLogEntry -> ByteString
forall a b. (a -> b) -> a -> b
$ UUID -> [StoredEvent] -> UTCTime -> EventLogEntry
EventLogEntry UUID
txId [StoredEvent]
newEvents UTCTime
now) ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
"\n"
                    FilePath -> ByteString -> IO ()
BS.appendFile StorePaths
paths.eventLogPath ByteString
jsonLine
                    
                    IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                        TVar (StoreState FilesystemStore)
-> StoreState FilesystemStore -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar BackendHandle FilesystemStore
FilesystemStoreHandle
handle.stateVar StoreState FilesystemStore
newState
                        
                        [StreamId] -> (StreamId -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Map StreamId (StreamWrite t SomeLatestEvent FilesystemStore)
-> [StreamId]
forall k a. Map k a -> [k]
Map.keys Map StreamId (StreamWrite t SomeLatestEvent FilesystemStore)
batches) ((StreamId -> STM ()) -> STM ()) -> (StreamId -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \StreamId
streamId ->
                            Maybe (TVar Integer) -> (TVar Integer -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (StreamId -> Map StreamId (TVar Integer) -> Maybe (TVar Integer)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup StreamId
streamId StoreState FilesystemStore
newState.streamNotifications) ((TVar Integer -> STM ()) -> STM ())
-> (TVar Integer -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \TVar Integer
var ->
                                TVar Integer -> Integer -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Integer
var (FilesystemCursor -> Integer
getSequenceNo FilesystemCursor
finalCursor)
                        TVar Integer -> Integer -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar StoreState FilesystemStore
newState.globalNotification (FilesystemCursor -> Integer
getSequenceNo FilesystemCursor
finalCursor)
                    Either
  (EventStoreError FilesystemStore)
  (FilesystemCursor, Map StreamId FilesystemCursor)
-> IO
     (Either
        (EventStoreError FilesystemStore)
        (FilesystemCursor, Map StreamId FilesystemCursor))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either
   (EventStoreError FilesystemStore)
   (FilesystemCursor, Map StreamId FilesystemCursor)
 -> IO
      (Either
         (EventStoreError FilesystemStore)
         (FilesystemCursor, Map StreamId FilesystemCursor)))
-> Either
     (EventStoreError FilesystemStore)
     (FilesystemCursor, Map StreamId FilesystemCursor)
-> IO
     (Either
        (EventStoreError FilesystemStore)
        (FilesystemCursor, Map StreamId FilesystemCursor))
forall a b. (a -> b) -> a -> b
$ (FilesystemCursor, Map StreamId FilesystemCursor)
-> Either
     (EventStoreError FilesystemStore)
     (FilesystemCursor, Map StreamId FilesystemCursor)
forall a b. b -> Either a b
Right (FilesystemCursor
finalCursor, Map StreamId FilesystemCursor
streamCursors)
        case result of
            Left (SomeException
e :: SomeException) ->
                
                InsertionResult FilesystemStore
-> IO (InsertionResult FilesystemStore)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InsertionResult FilesystemStore
 -> IO (InsertionResult FilesystemStore))
-> InsertionResult FilesystemStore
-> IO (InsertionResult FilesystemStore)
forall a b. (a -> b) -> a -> b
$
                    EventStoreError FilesystemStore -> InsertionResult FilesystemStore
forall backend. EventStoreError backend -> InsertionResult backend
FailedInsertion (EventStoreError FilesystemStore
 -> InsertionResult FilesystemStore)
-> EventStoreError FilesystemStore
-> InsertionResult FilesystemStore
forall a b. (a -> b) -> a -> b
$
                        ErrorInfo -> EventStoreError FilesystemStore
forall backend. ErrorInfo -> EventStoreError backend
BackendError (ErrorInfo -> EventStoreError FilesystemStore)
-> ErrorInfo -> EventStoreError FilesystemStore
forall a b. (a -> b) -> a -> b
$
                            ErrorInfo
                                { errorMessage :: Text
errorMessage = FilePath -> Text
pack (FilePath -> Text) -> FilePath -> Text
forall a b. (a -> b) -> a -> b
$ FilePath
"Failed to persist events: " FilePath -> ShowS
forall a. Semigroup a => a -> a -> a
<> SomeException -> FilePath
forall e. Exception e => e -> FilePath
displayException SomeException
e
                                , exception :: Maybe SomeException
exception = SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
e
                                }
            Right (Left EventStoreError FilesystemStore
err) ->
                
                InsertionResult FilesystemStore
-> IO (InsertionResult FilesystemStore)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InsertionResult FilesystemStore
 -> IO (InsertionResult FilesystemStore))
-> InsertionResult FilesystemStore
-> IO (InsertionResult FilesystemStore)
forall a b. (a -> b) -> a -> b
$ EventStoreError FilesystemStore -> InsertionResult FilesystemStore
forall backend. EventStoreError backend -> InsertionResult backend
FailedInsertion EventStoreError FilesystemStore
err
            Right (Right (FilesystemCursor
finalCursor, Map StreamId FilesystemCursor
streamCursors)) ->
                
                InsertionResult FilesystemStore
-> IO (InsertionResult FilesystemStore)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InsertionResult FilesystemStore
 -> IO (InsertionResult FilesystemStore))
-> InsertionResult FilesystemStore
-> IO (InsertionResult FilesystemStore)
forall a b. (a -> b) -> a -> b
$
                    InsertionSuccess FilesystemStore -> InsertionResult FilesystemStore
forall backend. InsertionSuccess backend -> InsertionResult backend
SuccessfulInsertion (InsertionSuccess FilesystemStore
 -> InsertionResult FilesystemStore)
-> InsertionSuccess FilesystemStore
-> InsertionResult FilesystemStore
forall a b. (a -> b) -> a -> b
$
                        InsertionSuccess
                            { finalCursor :: Cursor FilesystemStore
finalCursor = Cursor FilesystemStore
FilesystemCursor
finalCursor
                            , streamCursors :: Map StreamId (Cursor FilesystemStore)
streamCursors = Map StreamId (Cursor FilesystemStore)
Map StreamId FilesystemCursor
streamCursors
                            }
    subscribe :: forall (m :: * -> *) (ts :: [Symbol]).
StoreConstraints FilesystemStore m =>
BackendHandle FilesystemStore
-> EventMatcher ts FilesystemStore m
-> EventSelector FilesystemStore
-> m (SubscriptionHandle FilesystemStore)
subscribe = BackendHandle FilesystemStore
-> EventMatcher ts FilesystemStore m
-> EventSelector FilesystemStore
-> m (SubscriptionHandle FilesystemStore)
FilesystemStoreHandle
-> EventMatcher ts FilesystemStore m
-> EventSelector FilesystemStore
-> m (SubscriptionHandle FilesystemStore)
forall (m :: * -> *) (ts :: [Symbol]).
MonadUnliftIO m =>
FilesystemStoreHandle
-> EventMatcher ts FilesystemStore m
-> EventSelector FilesystemStore
-> m (SubscriptionHandle FilesystemStore)
subscribeFilesystem
instance StoreCursor FilesystemStore where
    makeCursor :: Integer -> Cursor FilesystemStore
makeCursor = Integer -> Cursor FilesystemStore
Integer -> FilesystemCursor
FilesystemCursor
    makeSequenceNo :: Cursor FilesystemStore -> Integer
makeSequenceNo = (.getSequenceNo)