{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Hindsight.Store.Filesystem (
FilesystemStore,
FilesystemStoreHandle,
FilesystemCursor (..),
FilesystemStoreConfig (..),
mkDefaultConfig,
getStoreConfig,
newFilesystemStore,
cleanupFilesystemStore,
StoreException (..),
EventLogEntry (..),
StorePaths (..),
getPaths,
)
where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, cancel)
import Control.Concurrent.STM (
TChan,
TVar,
atomically,
dupTChan,
modifyTVar',
newBroadcastTChanIO,
newTVarIO,
readTChan,
readTVar,
writeTChan,
writeTVar,
)
import Control.Exception (Exception, SomeException, bracket, displayException, throwIO, try)
import Control.Monad (forM_, forever, void, when)
import Control.Monad.IO.Class (liftIO)
import Data.Aeson (FromJSON, ToJSON (..), decode, encode)
import Data.ByteString qualified as BS
import Data.ByteString.Char8 qualified as BS8
import Data.ByteString.Lazy qualified as BL
import Data.ByteString.Lazy.Char8 qualified as BL8
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (listToMaybe)
import Data.Text (pack)
import Data.Time (UTCTime, getCurrentTime)
import Data.UUID (UUID)
import Data.UUID.V4 qualified as UUID
import UnliftIO (MonadUnliftIO)
import GHC.Generics (Generic)
import Hindsight.Store (
BackendHandle,
Cursor,
ErrorInfo (..),
EventId (EventId),
EventMatcher,
EventSelector (..),
EventStore (..),
EventStoreError (BackendError),
InsertionResult (FailedInsertion, SuccessfulInsertion),
InsertionSuccess (..),
StreamWrite (events),
SubscriptionHandle (..),
Transaction (..),
)
import Hindsight.Store.Memory.Internal (
StoreCursor (..),
StoreState (..),
StoredEvent (seqNo),
checkAllVersions,
insertAllEvents,
subscribeToEvents,
updateState,
)
import System.Directory (canonicalizePath, createDirectoryIfMissing, doesFileExist, removeFile)
import System.FSNotify (Event (..), eventPath, watchDir, withManager)
import System.FileLock qualified as FL
import System.FilePath (takeDirectory, (</>))
import System.Timeout (timeout)
data FilesystemStoreConfig = FilesystemStoreConfig
{ FilesystemStoreConfig -> FilePath
storePath :: FilePath
, FilesystemStoreConfig -> Int
syncInterval :: Int
, FilesystemStoreConfig -> Int
lockTimeout :: Int
}
deriving (Int -> FilesystemStoreConfig -> ShowS
[FilesystemStoreConfig] -> ShowS
FilesystemStoreConfig -> FilePath
(Int -> FilesystemStoreConfig -> ShowS)
-> (FilesystemStoreConfig -> FilePath)
-> ([FilesystemStoreConfig] -> ShowS)
-> Show FilesystemStoreConfig
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> FilesystemStoreConfig -> ShowS
showsPrec :: Int -> FilesystemStoreConfig -> ShowS
$cshow :: FilesystemStoreConfig -> FilePath
show :: FilesystemStoreConfig -> FilePath
$cshowList :: [FilesystemStoreConfig] -> ShowS
showList :: [FilesystemStoreConfig] -> ShowS
Show, FilesystemStoreConfig -> FilesystemStoreConfig -> Bool
(FilesystemStoreConfig -> FilesystemStoreConfig -> Bool)
-> (FilesystemStoreConfig -> FilesystemStoreConfig -> Bool)
-> Eq FilesystemStoreConfig
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: FilesystemStoreConfig -> FilesystemStoreConfig -> Bool
== :: FilesystemStoreConfig -> FilesystemStoreConfig -> Bool
$c/= :: FilesystemStoreConfig -> FilesystemStoreConfig -> Bool
/= :: FilesystemStoreConfig -> FilesystemStoreConfig -> Bool
Eq, (forall x. FilesystemStoreConfig -> Rep FilesystemStoreConfig x)
-> (forall x. Rep FilesystemStoreConfig x -> FilesystemStoreConfig)
-> Generic FilesystemStoreConfig
forall x. Rep FilesystemStoreConfig x -> FilesystemStoreConfig
forall x. FilesystemStoreConfig -> Rep FilesystemStoreConfig x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. FilesystemStoreConfig -> Rep FilesystemStoreConfig x
from :: forall x. FilesystemStoreConfig -> Rep FilesystemStoreConfig x
$cto :: forall x. Rep FilesystemStoreConfig x -> FilesystemStoreConfig
to :: forall x. Rep FilesystemStoreConfig x -> FilesystemStoreConfig
Generic)
deriving anyclass (Maybe FilesystemStoreConfig
Value -> Parser [FilesystemStoreConfig]
Value -> Parser FilesystemStoreConfig
(Value -> Parser FilesystemStoreConfig)
-> (Value -> Parser [FilesystemStoreConfig])
-> Maybe FilesystemStoreConfig
-> FromJSON FilesystemStoreConfig
forall a.
(Value -> Parser a)
-> (Value -> Parser [a]) -> Maybe a -> FromJSON a
$cparseJSON :: Value -> Parser FilesystemStoreConfig
parseJSON :: Value -> Parser FilesystemStoreConfig
$cparseJSONList :: Value -> Parser [FilesystemStoreConfig]
parseJSONList :: Value -> Parser [FilesystemStoreConfig]
$comittedField :: Maybe FilesystemStoreConfig
omittedField :: Maybe FilesystemStoreConfig
FromJSON, [FilesystemStoreConfig] -> Value
[FilesystemStoreConfig] -> Encoding
FilesystemStoreConfig -> Bool
FilesystemStoreConfig -> Value
FilesystemStoreConfig -> Encoding
(FilesystemStoreConfig -> Value)
-> (FilesystemStoreConfig -> Encoding)
-> ([FilesystemStoreConfig] -> Value)
-> ([FilesystemStoreConfig] -> Encoding)
-> (FilesystemStoreConfig -> Bool)
-> ToJSON FilesystemStoreConfig
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: FilesystemStoreConfig -> Value
toJSON :: FilesystemStoreConfig -> Value
$ctoEncoding :: FilesystemStoreConfig -> Encoding
toEncoding :: FilesystemStoreConfig -> Encoding
$ctoJSONList :: [FilesystemStoreConfig] -> Value
toJSONList :: [FilesystemStoreConfig] -> Value
$ctoEncodingList :: [FilesystemStoreConfig] -> Encoding
toEncodingList :: [FilesystemStoreConfig] -> Encoding
$comitField :: FilesystemStoreConfig -> Bool
omitField :: FilesystemStoreConfig -> Bool
ToJSON)
data EventLogEntry = EventLogEntry
{ EventLogEntry -> UUID
transactionId :: UUID
, EventLogEntry -> [StoredEvent]
events :: [StoredEvent]
, EventLogEntry -> UTCTime
timestamp :: UTCTime
}
deriving stock (Int -> EventLogEntry -> ShowS
[EventLogEntry] -> ShowS
EventLogEntry -> FilePath
(Int -> EventLogEntry -> ShowS)
-> (EventLogEntry -> FilePath)
-> ([EventLogEntry] -> ShowS)
-> Show EventLogEntry
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> EventLogEntry -> ShowS
showsPrec :: Int -> EventLogEntry -> ShowS
$cshow :: EventLogEntry -> FilePath
show :: EventLogEntry -> FilePath
$cshowList :: [EventLogEntry] -> ShowS
showList :: [EventLogEntry] -> ShowS
Show, EventLogEntry -> EventLogEntry -> Bool
(EventLogEntry -> EventLogEntry -> Bool)
-> (EventLogEntry -> EventLogEntry -> Bool) -> Eq EventLogEntry
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: EventLogEntry -> EventLogEntry -> Bool
== :: EventLogEntry -> EventLogEntry -> Bool
$c/= :: EventLogEntry -> EventLogEntry -> Bool
/= :: EventLogEntry -> EventLogEntry -> Bool
Eq, (forall x. EventLogEntry -> Rep EventLogEntry x)
-> (forall x. Rep EventLogEntry x -> EventLogEntry)
-> Generic EventLogEntry
forall x. Rep EventLogEntry x -> EventLogEntry
forall x. EventLogEntry -> Rep EventLogEntry x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. EventLogEntry -> Rep EventLogEntry x
from :: forall x. EventLogEntry -> Rep EventLogEntry x
$cto :: forall x. Rep EventLogEntry x -> EventLogEntry
to :: forall x. Rep EventLogEntry x -> EventLogEntry
Generic)
deriving anyclass (Maybe EventLogEntry
Value -> Parser [EventLogEntry]
Value -> Parser EventLogEntry
(Value -> Parser EventLogEntry)
-> (Value -> Parser [EventLogEntry])
-> Maybe EventLogEntry
-> FromJSON EventLogEntry
forall a.
(Value -> Parser a)
-> (Value -> Parser [a]) -> Maybe a -> FromJSON a
$cparseJSON :: Value -> Parser EventLogEntry
parseJSON :: Value -> Parser EventLogEntry
$cparseJSONList :: Value -> Parser [EventLogEntry]
parseJSONList :: Value -> Parser [EventLogEntry]
$comittedField :: Maybe EventLogEntry
omittedField :: Maybe EventLogEntry
FromJSON, [EventLogEntry] -> Value
[EventLogEntry] -> Encoding
EventLogEntry -> Bool
EventLogEntry -> Value
EventLogEntry -> Encoding
(EventLogEntry -> Value)
-> (EventLogEntry -> Encoding)
-> ([EventLogEntry] -> Value)
-> ([EventLogEntry] -> Encoding)
-> (EventLogEntry -> Bool)
-> ToJSON EventLogEntry
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: EventLogEntry -> Value
toJSON :: EventLogEntry -> Value
$ctoEncoding :: EventLogEntry -> Encoding
toEncoding :: EventLogEntry -> Encoding
$ctoJSONList :: [EventLogEntry] -> Value
toJSONList :: [EventLogEntry] -> Value
$ctoEncodingList :: [EventLogEntry] -> Encoding
toEncodingList :: [EventLogEntry] -> Encoding
$comitField :: EventLogEntry -> Bool
omitField :: EventLogEntry -> Bool
ToJSON)
newtype FilesystemCursor = FilesystemCursor
{ FilesystemCursor -> Integer
getSequenceNo :: Integer
}
deriving (Int -> FilesystemCursor -> ShowS
[FilesystemCursor] -> ShowS
FilesystemCursor -> FilePath
(Int -> FilesystemCursor -> ShowS)
-> (FilesystemCursor -> FilePath)
-> ([FilesystemCursor] -> ShowS)
-> Show FilesystemCursor
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> FilesystemCursor -> ShowS
showsPrec :: Int -> FilesystemCursor -> ShowS
$cshow :: FilesystemCursor -> FilePath
show :: FilesystemCursor -> FilePath
$cshowList :: [FilesystemCursor] -> ShowS
showList :: [FilesystemCursor] -> ShowS
Show, FilesystemCursor -> FilesystemCursor -> Bool
(FilesystemCursor -> FilesystemCursor -> Bool)
-> (FilesystemCursor -> FilesystemCursor -> Bool)
-> Eq FilesystemCursor
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: FilesystemCursor -> FilesystemCursor -> Bool
== :: FilesystemCursor -> FilesystemCursor -> Bool
$c/= :: FilesystemCursor -> FilesystemCursor -> Bool
/= :: FilesystemCursor -> FilesystemCursor -> Bool
Eq, Eq FilesystemCursor
Eq FilesystemCursor =>
(FilesystemCursor -> FilesystemCursor -> Ordering)
-> (FilesystemCursor -> FilesystemCursor -> Bool)
-> (FilesystemCursor -> FilesystemCursor -> Bool)
-> (FilesystemCursor -> FilesystemCursor -> Bool)
-> (FilesystemCursor -> FilesystemCursor -> Bool)
-> (FilesystemCursor -> FilesystemCursor -> FilesystemCursor)
-> (FilesystemCursor -> FilesystemCursor -> FilesystemCursor)
-> Ord FilesystemCursor
FilesystemCursor -> FilesystemCursor -> Bool
FilesystemCursor -> FilesystemCursor -> Ordering
FilesystemCursor -> FilesystemCursor -> FilesystemCursor
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: FilesystemCursor -> FilesystemCursor -> Ordering
compare :: FilesystemCursor -> FilesystemCursor -> Ordering
$c< :: FilesystemCursor -> FilesystemCursor -> Bool
< :: FilesystemCursor -> FilesystemCursor -> Bool
$c<= :: FilesystemCursor -> FilesystemCursor -> Bool
<= :: FilesystemCursor -> FilesystemCursor -> Bool
$c> :: FilesystemCursor -> FilesystemCursor -> Bool
> :: FilesystemCursor -> FilesystemCursor -> Bool
$c>= :: FilesystemCursor -> FilesystemCursor -> Bool
>= :: FilesystemCursor -> FilesystemCursor -> Bool
$cmax :: FilesystemCursor -> FilesystemCursor -> FilesystemCursor
max :: FilesystemCursor -> FilesystemCursor -> FilesystemCursor
$cmin :: FilesystemCursor -> FilesystemCursor -> FilesystemCursor
min :: FilesystemCursor -> FilesystemCursor -> FilesystemCursor
Ord, (forall x. FilesystemCursor -> Rep FilesystemCursor x)
-> (forall x. Rep FilesystemCursor x -> FilesystemCursor)
-> Generic FilesystemCursor
forall x. Rep FilesystemCursor x -> FilesystemCursor
forall x. FilesystemCursor -> Rep FilesystemCursor x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. FilesystemCursor -> Rep FilesystemCursor x
from :: forall x. FilesystemCursor -> Rep FilesystemCursor x
$cto :: forall x. Rep FilesystemCursor x -> FilesystemCursor
to :: forall x. Rep FilesystemCursor x -> FilesystemCursor
Generic)
deriving anyclass (Maybe FilesystemCursor
Value -> Parser [FilesystemCursor]
Value -> Parser FilesystemCursor
(Value -> Parser FilesystemCursor)
-> (Value -> Parser [FilesystemCursor])
-> Maybe FilesystemCursor
-> FromJSON FilesystemCursor
forall a.
(Value -> Parser a)
-> (Value -> Parser [a]) -> Maybe a -> FromJSON a
$cparseJSON :: Value -> Parser FilesystemCursor
parseJSON :: Value -> Parser FilesystemCursor
$cparseJSONList :: Value -> Parser [FilesystemCursor]
parseJSONList :: Value -> Parser [FilesystemCursor]
$comittedField :: Maybe FilesystemCursor
omittedField :: Maybe FilesystemCursor
FromJSON, [FilesystemCursor] -> Value
[FilesystemCursor] -> Encoding
FilesystemCursor -> Bool
FilesystemCursor -> Value
FilesystemCursor -> Encoding
(FilesystemCursor -> Value)
-> (FilesystemCursor -> Encoding)
-> ([FilesystemCursor] -> Value)
-> ([FilesystemCursor] -> Encoding)
-> (FilesystemCursor -> Bool)
-> ToJSON FilesystemCursor
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: FilesystemCursor -> Value
toJSON :: FilesystemCursor -> Value
$ctoEncoding :: FilesystemCursor -> Encoding
toEncoding :: FilesystemCursor -> Encoding
$ctoJSONList :: [FilesystemCursor] -> Value
toJSONList :: [FilesystemCursor] -> Value
$ctoEncodingList :: [FilesystemCursor] -> Encoding
toEncodingList :: [FilesystemCursor] -> Encoding
$comitField :: FilesystemCursor -> Bool
omitField :: FilesystemCursor -> Bool
ToJSON)
data Notifier = Notifier
{ Notifier -> Async ()
notifierThread :: Async ()
, Notifier -> Async ()
reloadThread :: Async ()
}
data FilesystemStore
type instance Cursor FilesystemStore = FilesystemCursor
type instance BackendHandle FilesystemStore = FilesystemStoreHandle
data StoreException
= LockTimeout FilePath
| CorruptEventLog FilePath String
deriving (Int -> StoreException -> ShowS
[StoreException] -> ShowS
StoreException -> FilePath
(Int -> StoreException -> ShowS)
-> (StoreException -> FilePath)
-> ([StoreException] -> ShowS)
-> Show StoreException
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> StoreException -> ShowS
showsPrec :: Int -> StoreException -> ShowS
$cshow :: StoreException -> FilePath
show :: StoreException -> FilePath
$cshowList :: [StoreException] -> ShowS
showList :: [StoreException] -> ShowS
Show)
instance Exception StoreException
data FilesystemStoreHandle = FilesystemStoreHandle
{ FilesystemStoreHandle -> FilesystemStoreConfig
config :: FilesystemStoreConfig
, FilesystemStoreHandle -> TVar (StoreState FilesystemStore)
stateVar :: TVar (StoreState FilesystemStore)
, FilesystemStoreHandle -> Notifier
notifier :: Notifier
}
data StorePaths = StorePaths
{ StorePaths -> FilePath
eventLogPath :: FilePath
, StorePaths -> FilePath
storeLockPath :: FilePath
}
getPaths ::
FilePath ->
StorePaths
getPaths :: FilePath -> StorePaths
getPaths FilePath
base =
StorePaths
{ eventLogPath :: FilePath
eventLogPath = FilePath
base FilePath -> ShowS
</> FilePath
"events.log"
, storeLockPath :: FilePath
storeLockPath = FilePath
base FilePath -> ShowS
</> FilePath
"store.lock"
}
mkDefaultConfig ::
FilePath ->
FilesystemStoreConfig
mkDefaultConfig :: FilePath -> FilesystemStoreConfig
mkDefaultConfig FilePath
path =
FilesystemStoreConfig
{ storePath :: FilePath
storePath = FilePath
path
, syncInterval :: Int
syncInterval = Int
1
, lockTimeout :: Int
lockTimeout = Int
5000000
}
getStoreConfig ::
FilesystemStoreHandle ->
FilesystemStoreConfig
getStoreConfig :: FilesystemStoreHandle -> FilesystemStoreConfig
getStoreConfig = (.config)
ensureDirectories :: FilePath -> IO ()
ensureDirectories :: FilePath -> IO ()
ensureDirectories FilePath
path = do
let dir :: FilePath
dir = ShowS
takeDirectory FilePath
path
Bool -> FilePath -> IO ()
createDirectoryIfMissing Bool
True FilePath
dir
withStoreLockDirect :: FilesystemStoreConfig -> IO a -> IO a
withStoreLockDirect :: forall a. FilesystemStoreConfig -> IO a -> IO a
withStoreLockDirect FilesystemStoreConfig
config IO a
action = do
let paths :: StorePaths
paths = FilePath -> StorePaths
getPaths FilesystemStoreConfig
config.storePath
result <-
Int -> IO a -> IO (Maybe a)
forall a. Int -> IO a -> IO (Maybe a)
timeout FilesystemStoreConfig
config.lockTimeout (IO a -> IO (Maybe a)) -> IO a -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$
IO FileLock -> (FileLock -> IO ()) -> (FileLock -> IO a) -> IO a
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket
(FilePath -> SharedExclusive -> IO FileLock
FL.lockFile StorePaths
paths.storeLockPath SharedExclusive
FL.Exclusive)
FileLock -> IO ()
FL.unlockFile
(IO a -> FileLock -> IO a
forall a b. a -> b -> a
const IO a
action)
case result of
Maybe a
Nothing -> StoreException -> IO a
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (StoreException -> IO a) -> StoreException -> IO a
forall a b. (a -> b) -> a -> b
$ FilePath -> StoreException
LockTimeout StorePaths
paths.storeLockPath
Just a
value -> a -> IO a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
value
withStoreLock :: FilesystemStoreHandle -> IO a -> IO a
withStoreLock :: forall a. FilesystemStoreHandle -> IO a -> IO a
withStoreLock FilesystemStoreHandle
handle = FilesystemStoreConfig -> IO a -> IO a
forall a. FilesystemStoreConfig -> IO a -> IO a
withStoreLockDirect FilesystemStoreHandle
handle.config
newFilesystemStore ::
FilesystemStoreConfig ->
IO FilesystemStoreHandle
newFilesystemStore :: FilesystemStoreConfig -> IO FilesystemStoreHandle
newFilesystemStore FilesystemStoreConfig
config = do
let paths :: StorePaths
paths = FilePath -> StorePaths
getPaths FilesystemStoreConfig
config.storePath
FilePath -> IO ()
ensureDirectories FilesystemStoreConfig
config.storePath
exists <- FilePath -> IO Bool
doesFileExist StorePaths
paths.eventLogPath
when (not exists) $ BS.writeFile paths.eventLogPath ""
globalVar <- newTVarIO (-1)
stateVar <-
newTVarIO $
StoreState
{ nextSequence = 0
, events = Map.empty
, streamEvents = Map.empty
, streamVersions = Map.empty
, streamLocalVersions = Map.empty
, streamNotifications = Map.empty
, globalNotification = globalVar
}
notifier <- startNotifier paths.eventLogPath stateVar config
let handle = FilesystemStoreHandle{TVar (StoreState FilesystemStore)
Notifier
FilesystemStoreConfig
config :: FilesystemStoreConfig
stateVar :: TVar (StoreState FilesystemStore)
notifier :: Notifier
config :: FilesystemStoreConfig
stateVar :: TVar (StoreState FilesystemStore)
notifier :: Notifier
..}
reloadResult <- try @StoreException $ do
logExists <- doesFileExist paths.eventLogPath
when logExists $ do
entries <- readLogEntries handle
let completedEvents = Map UUID [StoredEvent] -> [[StoredEvent]]
forall k a. Map k a -> [a]
Map.elems (Map UUID [StoredEvent] -> [[StoredEvent]])
-> Map UUID [StoredEvent] -> [[StoredEvent]]
forall a b. (a -> b) -> a -> b
$ [EventLogEntry] -> Map UUID [StoredEvent]
processLogEntries [EventLogEntry]
entries
maxSeqNo = [Integer] -> Integer
forall a. Ord a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
maximum ([Integer] -> Integer) -> [Integer] -> Integer
forall a b. (a -> b) -> a -> b
$ Integer
0 Integer -> [Integer] -> [Integer]
forall a. a -> [a] -> [a]
: [StoredEvent
e.seqNo | [StoredEvent]
es <- [[StoredEvent]]
completedEvents, StoredEvent
e <- [StoredEvent]
es]
when (not $ null completedEvents) $ atomically $ do
modifyTVar' stateVar $ \StoreState FilesystemStore
state ->
([StoredEvent]
-> StoreState FilesystemStore -> StoreState FilesystemStore)
-> StoreState FilesystemStore
-> [[StoredEvent]]
-> StoreState FilesystemStore
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr
((StoreState FilesystemStore
-> [StoredEvent] -> StoreState FilesystemStore)
-> [StoredEvent]
-> StoreState FilesystemStore
-> StoreState FilesystemStore
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((StoredEvent
-> StoreState FilesystemStore -> StoreState FilesystemStore)
-> StoreState FilesystemStore
-> [StoredEvent]
-> StoreState FilesystemStore
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr StoredEvent
-> StoreState FilesystemStore -> StoreState FilesystemStore
forall backend.
StoreCursor backend =>
StoredEvent -> StoreState backend -> StoreState backend
updateState))
StoreState FilesystemStore
state{nextSequence = maxSeqNo + 1}
[[StoredEvent]]
completedEvents
state <- readTVar stateVar
writeTVar state.globalNotification maxSeqNo
case reloadResult of
Right () -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Left (LockTimeout FilePath
_) -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Left (CorruptEventLog FilePath
path FilePath
reason) -> StoreException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (StoreException -> IO ()) -> StoreException -> IO ()
forall a b. (a -> b) -> a -> b
$ FilePath -> FilePath -> StoreException
CorruptEventLog FilePath
path FilePath
reason
pure handle
decodeLogEntriesStrict :: FilePath -> BL.ByteString -> IO [EventLogEntry]
decodeLogEntriesStrict :: FilePath -> ByteString -> IO [EventLogEntry]
decodeLogEntriesStrict FilePath
path ByteString
contents = do
let lines' :: [ByteString]
lines' = Char -> ByteString -> [ByteString]
BL8.split Char
'\n' ByteString
contents
nonEmptyLines :: [ByteString]
nonEmptyLines = (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (Bool -> Bool
not (Bool -> Bool) -> (ByteString -> Bool) -> ByteString -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Bool
BL.null) [ByteString]
lines'
((Integer, ByteString) -> IO EventLogEntry)
-> [(Integer, ByteString)] -> IO [EventLogEntry]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (Integer, ByteString) -> IO EventLogEntry
decodeOne ([Integer] -> [ByteString] -> [(Integer, ByteString)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Integer
1 ..] [ByteString]
nonEmptyLines)
where
decodeOne :: (Integer, BL.ByteString) -> IO EventLogEntry
decodeOne :: (Integer, ByteString) -> IO EventLogEntry
decodeOne (Integer
lineNum, ByteString
line) =
case ByteString -> Maybe EventLogEntry
forall a. FromJSON a => ByteString -> Maybe a
decode ByteString
line of
Just EventLogEntry
entry -> EventLogEntry -> IO EventLogEntry
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure EventLogEntry
entry
Maybe EventLogEntry
Nothing ->
StoreException -> IO EventLogEntry
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (StoreException -> IO EventLogEntry)
-> StoreException -> IO EventLogEntry
forall a b. (a -> b) -> a -> b
$
FilePath -> FilePath -> StoreException
CorruptEventLog FilePath
path (FilePath -> StoreException) -> FilePath -> StoreException
forall a b. (a -> b) -> a -> b
$
FilePath
"Failed to parse JSON at line "
FilePath -> ShowS
forall a. [a] -> [a] -> [a]
++ Integer -> FilePath
forall a. Show a => a -> FilePath
show Integer
lineNum
FilePath -> ShowS
forall a. [a] -> [a] -> [a]
++ FilePath
": "
FilePath -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> ShowS
forall a. Int -> [a] -> [a]
take Int
100 (ByteString -> FilePath
BL8.unpack ByteString
line)
processLogEntries :: [EventLogEntry] -> Map UUID [StoredEvent]
processLogEntries :: [EventLogEntry] -> Map UUID [StoredEvent]
processLogEntries [EventLogEntry]
entries = [(UUID, [StoredEvent])] -> Map UUID [StoredEvent]
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [(EventLogEntry
e.transactionId, EventLogEntry
e.events) | EventLogEntry
e <- [EventLogEntry]
entries]
readLogEntries :: FilesystemStoreHandle -> IO [EventLogEntry]
readLogEntries :: FilesystemStoreHandle -> IO [EventLogEntry]
readLogEntries FilesystemStoreHandle
handle =
FilesystemStoreHandle -> IO [EventLogEntry] -> IO [EventLogEntry]
forall a. FilesystemStoreHandle -> IO a -> IO a
withStoreLock FilesystemStoreHandle
handle (IO [EventLogEntry] -> IO [EventLogEntry])
-> IO [EventLogEntry] -> IO [EventLogEntry]
forall a b. (a -> b) -> a -> b
$ do
let paths :: StorePaths
paths = FilePath -> StorePaths
getPaths FilesystemStoreHandle
handle.config.storePath
contents <- FilePath -> IO ByteString
BL.readFile StorePaths
paths.eventLogPath
decodeLogEntriesStrict paths.eventLogPath contents
startNotifier :: FilePath -> TVar (StoreState FilesystemStore) -> FilesystemStoreConfig -> IO Notifier
startNotifier :: FilePath
-> TVar (StoreState FilesystemStore)
-> FilesystemStoreConfig
-> IO Notifier
startNotifier FilePath
eventLogPath TVar (StoreState FilesystemStore)
stateVar FilesystemStoreConfig
config = do
chan <- IO (TChan ())
forall a. IO (TChan a)
newBroadcastTChanIO
notifyThread <- async $ notifierLoop eventLogPath chan
reloadChan <- atomically $ dupTChan chan
reloadThread <- async $ forever $ do
atomically $ readTChan reloadChan
reloadEventsFromDiskCentralWithRetry stateVar config 0
pure $ Notifier notifyThread reloadThread
shutdownNotifier :: Notifier -> IO ()
shutdownNotifier :: Notifier -> IO ()
shutdownNotifier Notifier
notifier = do
Async () -> IO ()
forall a. Async a -> IO ()
cancel Notifier
notifier.notifierThread
Async () -> IO ()
forall a. Async a -> IO ()
cancel Notifier
notifier.reloadThread
notifierLoop :: FilePath -> TChan () -> IO ()
notifierLoop :: FilePath -> TChan () -> IO ()
notifierLoop FilePath
eventLogPath TChan ()
chan = do
canonicalEventLogPath <- FilePath -> IO FilePath
canonicalizePath FilePath
eventLogPath
withManager $ \WatchManager
mgr -> do
let watchDir' :: FilePath
watchDir' = ShowS
takeDirectory FilePath
canonicalEventLogPath
predicate :: Event -> Bool
predicate Event
event = Event -> FilePath
eventPath Event
event FilePath -> FilePath -> Bool
forall a. Eq a => a -> a -> Bool
== FilePath
canonicalEventLogPath
handler :: Event -> IO ()
handler Event
_event = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TChan () -> () -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan ()
chan ()
IO (IO ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (IO ()) -> IO ()) -> IO (IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ WatchManager
-> FilePath -> (Event -> Bool) -> (Event -> IO ()) -> IO (IO ())
watchDir WatchManager
mgr FilePath
watchDir' Event -> Bool
predicate Event -> IO ()
handler
IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
forall a. Bounded a => a
maxBound
reloadEventsFromDiskCentralWithRetry :: TVar (StoreState FilesystemStore) -> FilesystemStoreConfig -> Int -> IO ()
reloadEventsFromDiskCentralWithRetry :: TVar (StoreState FilesystemStore)
-> FilesystemStoreConfig -> Int -> IO ()
reloadEventsFromDiskCentralWithRetry TVar (StoreState FilesystemStore)
stateVar FilesystemStoreConfig
config Int
retryCount = do
result <- IO () -> IO (Either StoreException ())
forall e a. Exception e => IO a -> IO (Either e a)
try (IO () -> IO (Either StoreException ()))
-> IO () -> IO (Either StoreException ())
forall a b. (a -> b) -> a -> b
$ TVar (StoreState FilesystemStore) -> FilesystemStoreConfig -> IO ()
reloadEventsFromDiskCentral TVar (StoreState FilesystemStore)
stateVar FilesystemStoreConfig
config
case result of
Right () -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Left (LockTimeout FilePath
path) -> do
if Int
retryCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
5
then do
FilePath -> IO ()
putStrLn (FilePath -> IO ()) -> FilePath -> IO ()
forall a b. (a -> b) -> a -> b
$ FilePath
"WARNING: Failed to reload events after 5 retries due to lock contention on " FilePath -> ShowS
forall a. [a] -> [a] -> [a]
++ FilePath
path
() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
else do
let delayMicros :: Int
delayMicros = Int
10000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* (Int
2 Int -> Int -> Int
forall a b. (Num a, Integral b) => a -> b -> a
^ Int
retryCount)
Int -> IO ()
threadDelay Int
delayMicros
TVar (StoreState FilesystemStore)
-> FilesystemStoreConfig -> Int -> IO ()
reloadEventsFromDiskCentralWithRetry TVar (StoreState FilesystemStore)
stateVar FilesystemStoreConfig
config (Int
retryCount Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
Left (CorruptEventLog FilePath
path FilePath
reason) -> do
FilePath -> IO ()
putStrLn (FilePath -> IO ()) -> FilePath -> IO ()
forall a b. (a -> b) -> a -> b
$ FilePath
"FATAL: Event log corrupted at " FilePath -> ShowS
forall a. [a] -> [a] -> [a]
++ FilePath
path
FilePath -> IO ()
putStrLn (FilePath -> IO ()) -> FilePath -> IO ()
forall a b. (a -> b) -> a -> b
$ FilePath
"Reason: " FilePath -> ShowS
forall a. [a] -> [a] -> [a]
++ FilePath
reason
FilePath -> IO ()
putStrLn FilePath
"This indicates either a serious bug in the event store or external process tampering."
FilePath -> IO ()
putStrLn FilePath
"The reload thread will now terminate. Manual intervention is required."
StoreException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (StoreException -> IO ()) -> StoreException -> IO ()
forall a b. (a -> b) -> a -> b
$ FilePath -> FilePath -> StoreException
CorruptEventLog FilePath
path FilePath
reason
reloadEventsFromDiskCentral :: TVar (StoreState FilesystemStore) -> FilesystemStoreConfig -> IO ()
reloadEventsFromDiskCentral :: TVar (StoreState FilesystemStore) -> FilesystemStoreConfig -> IO ()
reloadEventsFromDiskCentral TVar (StoreState FilesystemStore)
stateVar FilesystemStoreConfig
config = do
entries <- FilesystemStoreConfig -> IO [EventLogEntry] -> IO [EventLogEntry]
forall a. FilesystemStoreConfig -> IO a -> IO a
withStoreLockDirect FilesystemStoreConfig
config (IO [EventLogEntry] -> IO [EventLogEntry])
-> IO [EventLogEntry] -> IO [EventLogEntry]
forall a b. (a -> b) -> a -> b
$ do
let paths :: StorePaths
paths = FilePath -> StorePaths
getPaths FilesystemStoreConfig
config.storePath
contents <- FilePath -> IO ByteString
BS.readFile StorePaths
paths.eventLogPath
decodeLogEntriesStrict paths.eventLogPath (BL.fromStrict contents)
let completedEvents = Map UUID [StoredEvent] -> [[StoredEvent]]
forall k a. Map k a -> [a]
Map.elems (Map UUID [StoredEvent] -> [[StoredEvent]])
-> Map UUID [StoredEvent] -> [[StoredEvent]]
forall a b. (a -> b) -> a -> b
$ [EventLogEntry] -> Map UUID [StoredEvent]
processLogEntries [EventLogEntry]
entries
atomically $ do
state <- readTVar stateVar
let currentMaxSeq = StoreState FilesystemStore
state.nextSequence Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
- Integer
1
newEvents = [StoredEvent
e | [StoredEvent]
es <- [[StoredEvent]]
completedEvents, StoredEvent
e <- [StoredEvent]
es, StoredEvent
e.seqNo Integer -> Integer -> Bool
forall a. Ord a => a -> a -> Bool
> Integer
currentMaxSeq]
newMaxSeq = [Integer] -> Integer
forall a. Ord a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
maximum ([Integer] -> Integer) -> [Integer] -> Integer
forall a b. (a -> b) -> a -> b
$ Integer
currentMaxSeq Integer -> [Integer] -> [Integer]
forall a. a -> [a] -> [a]
: [StoredEvent
e.seqNo | StoredEvent
e <- [StoredEvent]
newEvents]
when (not $ null newEvents) $ do
let newState = (StoredEvent
-> StoreState FilesystemStore -> StoreState FilesystemStore)
-> StoreState FilesystemStore
-> [StoredEvent]
-> StoreState FilesystemStore
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr StoredEvent
-> StoreState FilesystemStore -> StoreState FilesystemStore
forall backend.
StoreCursor backend =>
StoredEvent -> StoreState backend -> StoreState backend
updateState StoreState FilesystemStore
state [StoredEvent]
newEvents
writeTVar stateVar newState{nextSequence = newMaxSeq + 1}
writeTVar newState.globalNotification newMaxSeq
subscribeFilesystem ::
forall m ts.
(MonadUnliftIO m) =>
FilesystemStoreHandle ->
EventMatcher ts FilesystemStore m ->
EventSelector FilesystemStore ->
m (SubscriptionHandle FilesystemStore)
subscribeFilesystem :: forall (m :: * -> *) (ts :: [Symbol]).
MonadUnliftIO m =>
FilesystemStoreHandle
-> EventMatcher ts FilesystemStore m
-> EventSelector FilesystemStore
-> m (SubscriptionHandle FilesystemStore)
subscribeFilesystem FilesystemStoreHandle
handle EventMatcher ts FilesystemStore m
matcher EventSelector FilesystemStore
selector =
TVar (StoreState FilesystemStore)
-> EventMatcher ts FilesystemStore m
-> EventSelector FilesystemStore
-> m (SubscriptionHandle FilesystemStore)
forall (m :: * -> *) backend (ts :: [Symbol]).
(MonadUnliftIO m, StoreCursor backend, Show (Cursor backend)) =>
TVar (StoreState backend)
-> EventMatcher ts backend m
-> EventSelector backend
-> m (SubscriptionHandle backend)
subscribeToEvents FilesystemStoreHandle
handle.stateVar EventMatcher ts FilesystemStore m
matcher EventSelector FilesystemStore
selector
cleanupFilesystemStore ::
FilesystemStoreHandle ->
IO ()
cleanupFilesystemStore :: FilesystemStoreHandle -> IO ()
cleanupFilesystemStore FilesystemStoreHandle
handle = do
let paths :: StorePaths
paths = FilePath -> StorePaths
getPaths FilesystemStoreHandle
handle.config.storePath
Notifier -> IO ()
shutdownNotifier FilesystemStoreHandle
handle.notifier
FilePath -> IO Bool
doesFileExist StorePaths
paths.storeLockPath IO Bool -> (Bool -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Bool -> IO () -> IO ()) -> IO () -> Bool -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (FilePath -> IO ()
removeFile StorePaths
paths.storeLockPath)
instance EventStore FilesystemStore where
type StoreConstraints FilesystemStore m = (MonadUnliftIO m)
insertEvents :: forall (t :: * -> *) (m :: * -> *).
(Traversable t, StoreConstraints FilesystemStore m) =>
BackendHandle FilesystemStore
-> Maybe CorrelationId
-> Transaction t FilesystemStore
-> m (InsertionResult FilesystemStore)
insertEvents BackendHandle FilesystemStore
handle Maybe CorrelationId
corrId (Transaction Map StreamId (StreamWrite t SomeLatestEvent FilesystemStore)
batches) = IO (InsertionResult FilesystemStore)
-> m (InsertionResult FilesystemStore)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (InsertionResult FilesystemStore)
-> m (InsertionResult FilesystemStore))
-> IO (InsertionResult FilesystemStore)
-> m (InsertionResult FilesystemStore)
forall a b. (a -> b) -> a -> b
$ do
txId <- IO UUID
UUID.nextRandom
now <- getCurrentTime
result <- try $ withStoreLock handle $ do
diskMaxSeq <- do
let paths = FilePath -> StorePaths
getPaths BackendHandle FilesystemStore
FilesystemStoreHandle
handle.config.storePath
diskContents <- BS.readFile paths.eventLogPath
let lastLine = [ByteString] -> Maybe ByteString
forall a. [a] -> Maybe a
listToMaybe ([ByteString] -> Maybe ByteString)
-> [ByteString] -> Maybe ByteString
forall a b. (a -> b) -> a -> b
$ [ByteString] -> [ByteString]
forall a. [a] -> [a]
reverse ([ByteString] -> [ByteString]) -> [ByteString] -> [ByteString]
forall a b. (a -> b) -> a -> b
$ (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (Bool -> Bool
not (Bool -> Bool) -> (ByteString -> Bool) -> ByteString -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Bool
BS.null) ([ByteString] -> [ByteString]) -> [ByteString] -> [ByteString]
forall a b. (a -> b) -> a -> b
$ Char -> ByteString -> [ByteString]
BS8.split Char
'\n' ByteString
diskContents
lastEntry :: Maybe EventLogEntry
lastEntry = Maybe ByteString
lastLine Maybe ByteString
-> (ByteString -> Maybe EventLogEntry) -> Maybe EventLogEntry
forall a b. Maybe a -> (a -> Maybe b) -> Maybe b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ByteString -> Maybe EventLogEntry
forall a. FromJSON a => ByteString -> Maybe a
decode (ByteString -> Maybe EventLogEntry)
-> (ByteString -> ByteString) -> ByteString -> Maybe EventLogEntry
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BL.fromStrict
lastSeqNos = [Integer]
-> (EventLogEntry -> [Integer]) -> Maybe EventLogEntry -> [Integer]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [] (\EventLogEntry
entry -> (StoredEvent -> Integer) -> [StoredEvent] -> [Integer]
forall a b. (a -> b) -> [a] -> [b]
map StoredEvent -> Integer
seqNo EventLogEntry
entry.events) Maybe EventLogEntry
lastEntry
pure $ maximum $ (-1) : lastSeqNos
versionCheckResult <- atomically $ do
state <- readTVar handle.stateVar
case checkAllVersions state batches of
Left EventStoreError FilesystemStore
err -> Either
(EventStoreError FilesystemStore)
(StoreState FilesystemStore, FilesystemCursor,
Map StreamId FilesystemCursor, [StoredEvent])
-> STM
(Either
(EventStoreError FilesystemStore)
(StoreState FilesystemStore, FilesystemCursor,
Map StreamId FilesystemCursor, [StoredEvent]))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either
(EventStoreError FilesystemStore)
(StoreState FilesystemStore, FilesystemCursor,
Map StreamId FilesystemCursor, [StoredEvent])
-> STM
(Either
(EventStoreError FilesystemStore)
(StoreState FilesystemStore, FilesystemCursor,
Map StreamId FilesystemCursor, [StoredEvent])))
-> Either
(EventStoreError FilesystemStore)
(StoreState FilesystemStore, FilesystemCursor,
Map StreamId FilesystemCursor, [StoredEvent])
-> STM
(Either
(EventStoreError FilesystemStore)
(StoreState FilesystemStore, FilesystemCursor,
Map StreamId FilesystemCursor, [StoredEvent]))
forall a b. (a -> b) -> a -> b
$ EventStoreError FilesystemStore
-> Either
(EventStoreError FilesystemStore)
(StoreState FilesystemStore, FilesystemCursor,
Map StreamId FilesystemCursor, [StoredEvent])
forall a b. a -> Either a b
Left EventStoreError FilesystemStore
err
Right () -> do
let actualNextSeq :: Integer
actualNextSeq = Integer -> Integer -> Integer
forall a. Ord a => a -> a -> a
max StoreState FilesystemStore
state.nextSequence (Integer
diskMaxSeq Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
+ Integer
1)
stateWithCorrectSeq :: StoreState FilesystemStore
stateWithCorrectSeq = StoreState FilesystemStore
state{nextSequence = actualNextSeq}
eventIds :: [EventId]
eventIds = Int -> EventId -> [EventId]
forall a. Int -> a -> [a]
replicate ([Int] -> Int
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum ([Int] -> Int) -> [Int] -> Int
forall a b. (a -> b) -> a -> b
$ (StreamWrite t SomeLatestEvent FilesystemStore -> Int)
-> [StreamWrite t SomeLatestEvent FilesystemStore] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map (t SomeLatestEvent -> Int
forall a. t a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length (t SomeLatestEvent -> Int)
-> (StreamWrite t SomeLatestEvent FilesystemStore
-> t SomeLatestEvent)
-> StreamWrite t SomeLatestEvent FilesystemStore
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (.events)) ([StreamWrite t SomeLatestEvent FilesystemStore] -> [Int])
-> [StreamWrite t SomeLatestEvent FilesystemStore] -> [Int]
forall a b. (a -> b) -> a -> b
$ Map StreamId (StreamWrite t SomeLatestEvent FilesystemStore)
-> [StreamWrite t SomeLatestEvent FilesystemStore]
forall k a. Map k a -> [a]
Map.elems Map StreamId (StreamWrite t SomeLatestEvent FilesystemStore)
batches) (UUID -> EventId
EventId UUID
txId)
(StoreState FilesystemStore
newState, Cursor FilesystemStore
finalCursor, Map StreamId (Cursor FilesystemStore)
streamCursors) = StoreState FilesystemStore
-> Maybe CorrelationId
-> UTCTime
-> [EventId]
-> Map StreamId (StreamWrite t SomeLatestEvent FilesystemStore)
-> (StoreState FilesystemStore, Cursor FilesystemStore,
Map StreamId (Cursor FilesystemStore))
forall backend (t :: * -> *).
(StoreCursor backend, Foldable t) =>
StoreState backend
-> Maybe CorrelationId
-> UTCTime
-> [EventId]
-> Map StreamId (StreamWrite t SomeLatestEvent backend)
-> (StoreState backend, Cursor backend,
Map StreamId (Cursor backend))
insertAllEvents StoreState FilesystemStore
stateWithCorrectSeq Maybe CorrelationId
corrId UTCTime
now [EventId]
eventIds Map StreamId (StreamWrite t SomeLatestEvent FilesystemStore)
batches
newEvents :: [StoredEvent]
newEvents =
[ StoredEvent
event
| StoredEvent
event <- Map Integer StoredEvent -> [StoredEvent]
forall k a. Map k a -> [a]
Map.elems StoreState FilesystemStore
newState.events
, Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Integer -> Map Integer StoredEvent -> Bool
forall k a. Ord k => k -> Map k a -> Bool
Map.member StoredEvent
event.seqNo StoreState FilesystemStore
stateWithCorrectSeq.events
]
Either
(EventStoreError FilesystemStore)
(StoreState FilesystemStore, FilesystemCursor,
Map StreamId FilesystemCursor, [StoredEvent])
-> STM
(Either
(EventStoreError FilesystemStore)
(StoreState FilesystemStore, FilesystemCursor,
Map StreamId FilesystemCursor, [StoredEvent]))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either
(EventStoreError FilesystemStore)
(StoreState FilesystemStore, FilesystemCursor,
Map StreamId FilesystemCursor, [StoredEvent])
-> STM
(Either
(EventStoreError FilesystemStore)
(StoreState FilesystemStore, FilesystemCursor,
Map StreamId FilesystemCursor, [StoredEvent])))
-> Either
(EventStoreError FilesystemStore)
(StoreState FilesystemStore, FilesystemCursor,
Map StreamId FilesystemCursor, [StoredEvent])
-> STM
(Either
(EventStoreError FilesystemStore)
(StoreState FilesystemStore, FilesystemCursor,
Map StreamId FilesystemCursor, [StoredEvent]))
forall a b. (a -> b) -> a -> b
$ (StoreState FilesystemStore, FilesystemCursor,
Map StreamId FilesystemCursor, [StoredEvent])
-> Either
(EventStoreError FilesystemStore)
(StoreState FilesystemStore, FilesystemCursor,
Map StreamId FilesystemCursor, [StoredEvent])
forall a b. b -> Either a b
Right (StoreState FilesystemStore
newState, Cursor FilesystemStore
FilesystemCursor
finalCursor, Map StreamId (Cursor FilesystemStore)
Map StreamId FilesystemCursor
streamCursors, [StoredEvent]
newEvents)
case versionCheckResult of
Left EventStoreError FilesystemStore
err -> Either
(EventStoreError FilesystemStore)
(FilesystemCursor, Map StreamId FilesystemCursor)
-> IO
(Either
(EventStoreError FilesystemStore)
(FilesystemCursor, Map StreamId FilesystemCursor))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either
(EventStoreError FilesystemStore)
(FilesystemCursor, Map StreamId FilesystemCursor)
-> IO
(Either
(EventStoreError FilesystemStore)
(FilesystemCursor, Map StreamId FilesystemCursor)))
-> Either
(EventStoreError FilesystemStore)
(FilesystemCursor, Map StreamId FilesystemCursor)
-> IO
(Either
(EventStoreError FilesystemStore)
(FilesystemCursor, Map StreamId FilesystemCursor))
forall a b. (a -> b) -> a -> b
$ EventStoreError FilesystemStore
-> Either
(EventStoreError FilesystemStore)
(FilesystemCursor, Map StreamId FilesystemCursor)
forall a b. a -> Either a b
Left EventStoreError FilesystemStore
err
Right (StoreState FilesystemStore
newState, FilesystemCursor
finalCursor, Map StreamId FilesystemCursor
streamCursors, [StoredEvent]
newEvents) -> do
let paths :: StorePaths
paths = FilePath -> StorePaths
getPaths BackendHandle FilesystemStore
FilesystemStoreHandle
handle.config.storePath
jsonLine :: ByteString
jsonLine = ByteString -> ByteString
BL.toStrict (EventLogEntry -> ByteString
forall a. ToJSON a => a -> ByteString
encode (EventLogEntry -> ByteString) -> EventLogEntry -> ByteString
forall a b. (a -> b) -> a -> b
$ UUID -> [StoredEvent] -> UTCTime -> EventLogEntry
EventLogEntry UUID
txId [StoredEvent]
newEvents UTCTime
now) ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
"\n"
FilePath -> ByteString -> IO ()
BS.appendFile StorePaths
paths.eventLogPath ByteString
jsonLine
IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
TVar (StoreState FilesystemStore)
-> StoreState FilesystemStore -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar BackendHandle FilesystemStore
FilesystemStoreHandle
handle.stateVar StoreState FilesystemStore
newState
[StreamId] -> (StreamId -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Map StreamId (StreamWrite t SomeLatestEvent FilesystemStore)
-> [StreamId]
forall k a. Map k a -> [k]
Map.keys Map StreamId (StreamWrite t SomeLatestEvent FilesystemStore)
batches) ((StreamId -> STM ()) -> STM ()) -> (StreamId -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \StreamId
streamId ->
Maybe (TVar Integer) -> (TVar Integer -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (StreamId -> Map StreamId (TVar Integer) -> Maybe (TVar Integer)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup StreamId
streamId StoreState FilesystemStore
newState.streamNotifications) ((TVar Integer -> STM ()) -> STM ())
-> (TVar Integer -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \TVar Integer
var ->
TVar Integer -> Integer -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Integer
var (FilesystemCursor -> Integer
getSequenceNo FilesystemCursor
finalCursor)
TVar Integer -> Integer -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar StoreState FilesystemStore
newState.globalNotification (FilesystemCursor -> Integer
getSequenceNo FilesystemCursor
finalCursor)
Either
(EventStoreError FilesystemStore)
(FilesystemCursor, Map StreamId FilesystemCursor)
-> IO
(Either
(EventStoreError FilesystemStore)
(FilesystemCursor, Map StreamId FilesystemCursor))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either
(EventStoreError FilesystemStore)
(FilesystemCursor, Map StreamId FilesystemCursor)
-> IO
(Either
(EventStoreError FilesystemStore)
(FilesystemCursor, Map StreamId FilesystemCursor)))
-> Either
(EventStoreError FilesystemStore)
(FilesystemCursor, Map StreamId FilesystemCursor)
-> IO
(Either
(EventStoreError FilesystemStore)
(FilesystemCursor, Map StreamId FilesystemCursor))
forall a b. (a -> b) -> a -> b
$ (FilesystemCursor, Map StreamId FilesystemCursor)
-> Either
(EventStoreError FilesystemStore)
(FilesystemCursor, Map StreamId FilesystemCursor)
forall a b. b -> Either a b
Right (FilesystemCursor
finalCursor, Map StreamId FilesystemCursor
streamCursors)
case result of
Left (SomeException
e :: SomeException) ->
InsertionResult FilesystemStore
-> IO (InsertionResult FilesystemStore)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InsertionResult FilesystemStore
-> IO (InsertionResult FilesystemStore))
-> InsertionResult FilesystemStore
-> IO (InsertionResult FilesystemStore)
forall a b. (a -> b) -> a -> b
$
EventStoreError FilesystemStore -> InsertionResult FilesystemStore
forall backend. EventStoreError backend -> InsertionResult backend
FailedInsertion (EventStoreError FilesystemStore
-> InsertionResult FilesystemStore)
-> EventStoreError FilesystemStore
-> InsertionResult FilesystemStore
forall a b. (a -> b) -> a -> b
$
ErrorInfo -> EventStoreError FilesystemStore
forall backend. ErrorInfo -> EventStoreError backend
BackendError (ErrorInfo -> EventStoreError FilesystemStore)
-> ErrorInfo -> EventStoreError FilesystemStore
forall a b. (a -> b) -> a -> b
$
ErrorInfo
{ errorMessage :: Text
errorMessage = FilePath -> Text
pack (FilePath -> Text) -> FilePath -> Text
forall a b. (a -> b) -> a -> b
$ FilePath
"Failed to persist events: " FilePath -> ShowS
forall a. Semigroup a => a -> a -> a
<> SomeException -> FilePath
forall e. Exception e => e -> FilePath
displayException SomeException
e
, exception :: Maybe SomeException
exception = SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
e
}
Right (Left EventStoreError FilesystemStore
err) ->
InsertionResult FilesystemStore
-> IO (InsertionResult FilesystemStore)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InsertionResult FilesystemStore
-> IO (InsertionResult FilesystemStore))
-> InsertionResult FilesystemStore
-> IO (InsertionResult FilesystemStore)
forall a b. (a -> b) -> a -> b
$ EventStoreError FilesystemStore -> InsertionResult FilesystemStore
forall backend. EventStoreError backend -> InsertionResult backend
FailedInsertion EventStoreError FilesystemStore
err
Right (Right (FilesystemCursor
finalCursor, Map StreamId FilesystemCursor
streamCursors)) ->
InsertionResult FilesystemStore
-> IO (InsertionResult FilesystemStore)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InsertionResult FilesystemStore
-> IO (InsertionResult FilesystemStore))
-> InsertionResult FilesystemStore
-> IO (InsertionResult FilesystemStore)
forall a b. (a -> b) -> a -> b
$
InsertionSuccess FilesystemStore -> InsertionResult FilesystemStore
forall backend. InsertionSuccess backend -> InsertionResult backend
SuccessfulInsertion (InsertionSuccess FilesystemStore
-> InsertionResult FilesystemStore)
-> InsertionSuccess FilesystemStore
-> InsertionResult FilesystemStore
forall a b. (a -> b) -> a -> b
$
InsertionSuccess
{ finalCursor :: Cursor FilesystemStore
finalCursor = Cursor FilesystemStore
FilesystemCursor
finalCursor
, streamCursors :: Map StreamId (Cursor FilesystemStore)
streamCursors = Map StreamId (Cursor FilesystemStore)
Map StreamId FilesystemCursor
streamCursors
}
subscribe :: forall (m :: * -> *) (ts :: [Symbol]).
StoreConstraints FilesystemStore m =>
BackendHandle FilesystemStore
-> EventMatcher ts FilesystemStore m
-> EventSelector FilesystemStore
-> m (SubscriptionHandle FilesystemStore)
subscribe = BackendHandle FilesystemStore
-> EventMatcher ts FilesystemStore m
-> EventSelector FilesystemStore
-> m (SubscriptionHandle FilesystemStore)
FilesystemStoreHandle
-> EventMatcher ts FilesystemStore m
-> EventSelector FilesystemStore
-> m (SubscriptionHandle FilesystemStore)
forall (m :: * -> *) (ts :: [Symbol]).
MonadUnliftIO m =>
FilesystemStoreHandle
-> EventMatcher ts FilesystemStore m
-> EventSelector FilesystemStore
-> m (SubscriptionHandle FilesystemStore)
subscribeFilesystem
instance StoreCursor FilesystemStore where
makeCursor :: Integer -> Cursor FilesystemStore
makeCursor = Integer -> Cursor FilesystemStore
Integer -> FilesystemCursor
FilesystemCursor
makeSequenceNo :: Cursor FilesystemStore -> Integer
makeSequenceNo = (.getSequenceNo)