{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
module Hindsight.Projection (
ProjectionId (..),
ProjectionState (..),
ProjectionStateError (..),
ProjectionResult (..),
ProjectionError (..),
runProjection,
loadProjectionState,
waitForEvent,
)
where
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (Exception, SomeException, bracket, bracket_, throwIO)
import Control.Monad (when)
import Control.Monad.IO.Class (liftIO)
import Data.Aeson (FromJSON, ToJSON, (.:))
import Data.Aeson qualified as Aeson
import Data.ByteString.Char8 qualified as ByteString
import Data.Function ((&))
import Data.Profunctor (dimap)
import Data.Text (Text, pack)
import Data.Text.Encoding qualified as Data.Text
import Data.Time (UTCTime, getCurrentTime)
import GHC.Generics (Generic)
import Hasql.Connection (Connection)
import Hasql.Notifications qualified as Notifications
import Hasql.Pool (Pool)
import Hasql.Pool qualified as Pool
import Hasql.Session (Session)
import Hasql.Session qualified as Session
import Hasql.Statement (Statement)
import Hasql.TH (maybeStatement)
import Hasql.Transaction qualified as Transaction
import Hasql.Transaction.Sessions qualified as Session
import Hindsight.Projection.Matching (ProjectionHandler, ProjectionHandlers (..))
import Hindsight.Projection.State qualified as ProjectionState
import Hindsight.Store (
BackendHandle,
Cursor,
EventEnvelope (position),
EventHandler,
EventMatcher (..),
EventSelector (EventSelector, startupPosition, streamId),
EventStore (StoreConstraints, subscribe),
StartupPosition (FromBeginning, FromLastProcessed),
StreamSelector (AllStreams),
SubscriptionResult (Continue, Stop),
)
import UnliftIO (MonadUnliftIO (..))
import UnliftIO.STM (TVar, atomically, writeTVar)
{- | Identifier of a projection.

Within this module the wrapped text is used as the lookup key of the
projection's row in the @projections@ table and as the name of the
notification channel that 'waitForEvent' listens on.
-}
newtype ProjectionId = ProjectionId
    { unProjectionId :: Text
    -- ^ The raw identifier text.
    }
    deriving (Eq, Ord, Show)
{- | Outcome of running a projection handler.

NOTE(review): this type is exported but not used elsewhere in this module;
the per-constructor notes below are read off the constructor names only and
should be confirmed against callers.
-}
data ProjectionResult
    = -- | Presumably: the handler ran to completion.
      ProjectionSuccess
    | -- | Presumably: the event was not relevant to the handler.
      ProjectionSkipped
    | -- | The handler failed; carries the 'ProjectionError' describing why.
      ProjectionError ProjectionError
    deriving (Eq, Show)
{- | Failure categories carried by the 'ProjectionError' constructor of
'ProjectionResult'.

'Eq' is not derived because 'SomeException' has no 'Eq' instance; a
hand-written instance is provided separately.
-}
data ProjectionError
    = -- | Failure described by a textual parse message.
      ParseError Text
    | -- | An exception captured as an opaque 'SomeException'.
      HandlerError SomeException
    | -- | Failure described by a textual backend message.
      BackendError Text
    deriving (Show)
{- | Structural equality, except for 'HandlerError': 'SomeException' has no
'Eq' instance, so two handler errors are considered equal when their 'show'
output is identical. Values built from different constructors are never equal.
-}
instance Eq ProjectionError where
    lhs == rhs =
        case (lhs, rhs) of
            (ParseError msgA, ParseError msgB) -> msgA == msgB
            (HandlerError excA, HandlerError excB) -> show excA == show excB
            (BackendError msgA, BackendError msgB) -> msgA == msgB
            _ -> False
{- | Progress marker of a projection for a given store @backend@.

This is the value persisted after every successfully handled event and the
value published to the optional observer 'TVar' of 'runProjection'.
-}
data ProjectionState backend = ProjectionState
    { projectionId :: ProjectionId
    -- ^ Which projection this state belongs to.
    , lastProcessed :: Cursor backend
    -- ^ Position of the most recently processed event; stored as JSON.
    , lastUpdated :: UTCTime
    -- ^ Timestamp attached to the state. When loaded it comes from the
    -- @last_updated@ column; it is not part of the parameters handed to the
    -- upsert statement.
    }
{- | Error raised when projection state cannot be queried or decoded.

It is thrown (via 'throwIO') by 'loadProjectionState' and 'waitForEvent',
hence the 'Exception' instance below. The JSON instances are derived
generically.
-}
data ProjectionStateError = ProjectionStateError
    { errorMessage :: Text
    -- ^ Human-readable description of what went wrong.
    }
    deriving (Show, Eq, Generic, FromJSON, ToJSON)

-- | Allows 'ProjectionStateError' to be thrown and caught as an exception.
instance Exception ProjectionStateError
{- | Start a projection against an event store.

Steps, in order:

1. Load the persisted 'ProjectionState' for @projId@ (throws
   'ProjectionStateError' if that fails, see 'loadProjectionState').
2. If an observer 'TVar' was supplied /and/ persisted state exists, publish
   that state to the 'TVar'.
3. Subscribe to all streams, resuming after the persisted cursor when there
   is one and from the beginning otherwise. The value returned by
   'subscribe' is discarded.
-}
runProjection ::
    forall backend m ts.
    ( EventStore backend
    , MonadFail m
    , FromJSON (Cursor backend)
    , ToJSON (Cursor backend)
    , StoreConstraints backend m
    , MonadUnliftIO m
    ) =>
    ProjectionId ->
    Pool ->
    Maybe (TVar (Maybe (ProjectionState backend))) ->
    BackendHandle backend ->
    ProjectionHandlers ts backend ->
    m ()
runProjection projId pool mbTVar store handlers = do
    resumeState <- loadProjectionState projId pool
    -- Seed the observer only when both the TVar and a persisted state exist.
    case (,) <$> mbTVar <*> resumeState of
        Just (tvar, persisted) -> atomically (writeTVar tvar (Just persisted))
        Nothing -> pure ()
    let startFrom =
            case resumeState of
                Just persisted -> FromLastProcessed persisted.lastProcessed
                Nothing -> FromBeginning
        selector =
            EventSelector
                { streamId = AllStreams
                , startupPosition = startFrom
                }
    () <$ subscribe store (makeEventMatcher projId pool mbTVar handlers) selector
{- | Fetch the persisted state of a projection.

Returns 'Nothing' when 'getProjectionState' yields no state for @projId@.

Throws 'ProjectionStateError' when the pool session fails (the message
includes the shown projection id and the shown usage error) or when the
stored cursor cannot be decoded.
-}
loadProjectionState ::
    (MonadUnliftIO m, MonadFail m, FromJSON (Cursor backend)) =>
    ProjectionId ->
    Pool ->
    m (Maybe (ProjectionState backend))
loadProjectionState projId pool = liftIO $ do
    outcome <- Pool.use pool (getProjectionState projId)
    case outcome of
        Left usageErr -> throwIO (ProjectionStateError (queryFailure usageErr))
        Right Nothing -> pure Nothing
        Right (Just (Left stateErr)) -> throwIO stateErr
        Right (Just (Right persisted)) -> pure (Just persisted)
  where
    -- Message for a failed pool session.
    queryFailure usageErr =
        "Failed to query projection state for "
            <> pack (show projId)
            <> ": "
            <> pack (show usageErr)
{- | Turn a list of projection handlers into an 'EventMatcher' for 'subscribe'.

Each projection handler is wrapped so that, for every incoming event:

* the handler and the cursor upsert ('updateProjectionStatement') run in
  one read-committed write transaction on a connection from @pool@;
* if that transaction fails, the error is discarded and the subscription is
  told to 'Stop';
* otherwise the new state is published to the optional 'TVar' and the
  subscription is told to 'Continue'.
-}
makeEventMatcher ::
    forall ts backend m.
    (MonadUnliftIO m, ToJSON (Cursor backend)) =>
    ProjectionId ->
    Pool ->
    Maybe (TVar (Maybe (ProjectionState backend))) ->
    ProjectionHandlers ts backend ->
    EventMatcher ts backend m
makeEventMatcher projId pool mbTVar = go
  where
    -- Walk the handler list, wrapping every handler while keeping the
    -- type-level list of event names intact.
    go :: ProjectionHandlers ts' backend -> EventMatcher ts' backend m
    go remaining =
        case remaining of
            ProjectionEnd -> MatchEnd
            (proxy, handler) :-> rest -> (proxy, handleEvent handler) :? go rest

    -- Run one handler and checkpoint the cursor atomically with it.
    handleEvent ::
        forall event.
        ProjectionHandler event backend ->
        EventHandler event m backend
    handleEvent projHandler envelope = do
        processedAt <- liftIO getCurrentTime
        let checkpoint =
                ProjectionState
                    { projectionId = projId
                    , lastProcessed = envelope.position
                    , lastUpdated = processedAt
                    }
            applyAndCheckpoint =
                projHandler envelope
                    >> Transaction.statement checkpoint updateProjectionStatement
        committed <-
            liftIO $
                Pool.use pool $
                    Session.transaction Session.ReadCommitted Session.Write applyAndCheckpoint
        case committed of
            -- The usage error is intentionally not surfaced; the subscription just stops.
            Left _usageErr -> pure Stop
            Right _ -> do
                mapM_ (\tvar -> atomically (writeTVar tvar (Just checkpoint))) mbTVar
                pure Continue
{- | Session that reads the stored state of one projection.

Result:

* 'Nothing' — no row for the id, or the row's @head_position@ is @NULL@;
* @'Just' ('Left' err)@ — the stored cursor JSON could not be decoded;
* @'Just' ('Right' st)@ — the decoded state.
-}
getProjectionState ::
    (FromJSON (Cursor backend)) =>
    ProjectionId ->
    Session (Maybe (Either ProjectionStateError (ProjectionState backend)))
getProjectionState (ProjectionId pid) =
    decodeRow
        <$> Session.statement
            pid
            [maybeStatement|
                select
                    id :: text,
                    last_updated :: timestamptz,
                    head_position :: jsonb?
                from projections
                where id = $1 :: text
            |]
  where
    -- Runs in the 'Maybe' monad: a missing row or a NULL cursor both
    -- short-circuit to 'Nothing'.
    decodeRow row = do
        (rowId, updatedAt, mbCursorJson) <- row
        cursorJson <- mbCursorJson
        pure $ case Aeson.fromJSON cursorJson of
            Aeson.Success cursor ->
                Right
                    ProjectionState
                        { projectionId = ProjectionId rowId
                        , lastProcessed = cursor
                        , lastUpdated = updatedAt
                        }
            Aeson.Error reason ->
                Left $
                    ProjectionStateError $
                        "Could not parse projection state cursor: " <> pack reason
{- | Statement that persists a 'ProjectionState'.

Adapts 'ProjectionState.upsertProjectionCursor', which takes the projection
id text and the cursor as a JSON value. The state's 'lastUpdated' field is
not passed to the statement.
-}
updateProjectionStatement ::
    (ToJSON (Cursor backend)) =>
    Statement (ProjectionState backend) ()
updateProjectionStatement =
    dimap
        ( \ProjectionState{projectionId = ProjectionId pid, lastProcessed = cursor} ->
            (pid, Aeson.toJSON cursor)
        )
        id
        ProjectionState.upsertProjectionCursor
{- | Block until the projection has processed events up to @targetCursor@.

The channel named after the projection id is @LISTEN@ed on /before/ the
stored state is inspected, and @UNLISTEN@ed when the function exits,
normally or by exception. If the stored cursor is already at or past
@targetCursor@ the function returns immediately; otherwise (including when
no state is stored yet) it waits for a notification on that channel whose
@head_position@ is at or past @targetCursor@.

Throws:

* 'ProjectionStateError' when the stored cursor or a notification payload
  on the projection's channel cannot be decoded;
* the session error when the state query itself fails.

A payload decode failure is detected on the listener thread and handed to
the caller through the completion 'MVar', so that it is raised in the
calling thread instead of silently killing the listener and leaving the
caller blocked. Any other exception raised inside the listener thread is
not caught here.
-}
waitForEvent ::
    forall backend m.
    ( Ord (Cursor backend)
    , MonadUnliftIO m
    , FromJSON (Cursor backend)
    ) =>
    ProjectionId ->
    Cursor backend ->
    Connection ->
    m ()
waitForEvent projectionId@(ProjectionId projId) targetCursor conn = do
    let pgId = Notifications.toPgIdentifier projId
    liftIO $
        bracket_
            (Notifications.listen conn pgId)
            (Notifications.unlisten conn pgId)
            ( do
                result <- Session.run (getProjectionState projectionId) conn
                case result of
                    Right (Just (Right currState))
                        | currState.lastProcessed >= targetCursor ->
                            pure ()
                    -- No state yet, or not far enough: wait for progress.
                    Right Nothing ->
                        waitForNotifications
                    Right (Just (Right _)) ->
                        waitForNotifications
                    Right (Just (Left err)) ->
                        throwIO err
                    Left err ->
                        throwIO err
            )
  where
    -- Fork a listener and block until it reports an outcome; the listener
    -- thread is always killed on the way out.
    waitForNotifications :: IO ()
    waitForNotifications = do
        -- 'Right ()' means the target was reached; 'Left' carries a
        -- failure found by the listener, to be rethrown in this thread.
        outcomeVar <- newEmptyMVar
        bracket
            ( forkIO $ flip Notifications.waitForNotifications conn $ \channel notifPayload ->
                -- Ignore notifications for other channels on this connection.
                when (Data.Text.decodeASCII channel == projId) $
                    case Aeson.decode @(NotificationPayload backend) $ ByteString.fromStrict notifPayload of
                        Nothing ->
                            putMVar outcomeVar $
                                Left $
                                    ProjectionStateError $
                                        "Could not decode notification payload: "
                                            <> Data.Text.pack (show $ ByteString.unpack notifPayload)
                        Just payload ->
                            when (payload.headPosition >= targetCursor) $
                                putMVar outcomeVar (Right ())
            )
            killThread
            (\_ -> takeMVar outcomeVar >>= either throwIO pure)
{- | Decoded body of a projection progress notification, as consumed by
'waitForEvent'.
-}
data NotificationPayload backend = NotificationPayload
    { headPosition :: Cursor backend
    -- ^ Cursor carried under the @head_position@ key.
    , lastUpdated :: UTCTime
    -- ^ Timestamp carried under the @last_updated@ key.
    }
    deriving (Generic)

-- The cursor type depends on @backend@, so these instances need an
-- explicit context and are derived standalone.
deriving instance (Show (Cursor backend)) => Show (NotificationPayload backend)

deriving instance (Eq (Cursor backend)) => Eq (NotificationPayload backend)
{- | Parses a JSON object with the keys @head_position@ (the cursor) and
@last_updated@ (the timestamp).
-}
instance (FromJSON (Cursor backend)) => FromJSON (NotificationPayload backend) where
    parseJSON =
        Aeson.withObject "NotificationPayload" $ \obj ->
            NotificationPayload
                <$> obj .: "head_position"
                <*> obj .: "last_updated"