{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module Hindsight.Store.PostgreSQL.Projections.State (
updateSyncProjectionState,
registerSyncProjectionInDb,
getActiveProjections,
SyncProjectionState (..),
)
where
import Control.Category ((>>>))
import Data.Aeson qualified as Aeson
import Data.Functor ((<&>))
import Data.Int (Int32, Int64)
import Data.Text (Text)
import Data.Time (UTCTime)
import Data.Vector qualified as Vector
import Hasql.Statement (Statement)
import Hasql.TH
import Hasql.Transaction qualified as HasqlTransaction
import Hindsight.Projection (ProjectionId (..))
import Hindsight.Projection.State qualified as ProjectionState
import Hindsight.Store.PostgreSQL.Core.Types (SQLCursor (..))
data SyncProjectionState = SyncProjectionState
{ SyncProjectionState -> ProjectionId
projectionId :: ProjectionId
, SyncProjectionState -> Int64
lastProcessedTransactionNo :: Int64
, SyncProjectionState -> Int32
lastProcessedSeqNo :: Int32
, SyncProjectionState -> UTCTime
lastUpdated :: UTCTime
, SyncProjectionState -> Bool
isActive :: Bool
, SyncProjectionState -> Maybe Text
errorMessage :: Maybe Text
, SyncProjectionState -> Maybe UTCTime
errorTimestamp :: Maybe UTCTime
}
deriving (Int -> SyncProjectionState -> ShowS
[SyncProjectionState] -> ShowS
SyncProjectionState -> String
(Int -> SyncProjectionState -> ShowS)
-> (SyncProjectionState -> String)
-> ([SyncProjectionState] -> ShowS)
-> Show SyncProjectionState
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SyncProjectionState -> ShowS
showsPrec :: Int -> SyncProjectionState -> ShowS
$cshow :: SyncProjectionState -> String
show :: SyncProjectionState -> String
$cshowList :: [SyncProjectionState] -> ShowS
showList :: [SyncProjectionState] -> ShowS
Show, SyncProjectionState -> SyncProjectionState -> Bool
(SyncProjectionState -> SyncProjectionState -> Bool)
-> (SyncProjectionState -> SyncProjectionState -> Bool)
-> Eq SyncProjectionState
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SyncProjectionState -> SyncProjectionState -> Bool
== :: SyncProjectionState -> SyncProjectionState -> Bool
$c/= :: SyncProjectionState -> SyncProjectionState -> Bool
/= :: SyncProjectionState -> SyncProjectionState -> Bool
Eq)
updateSyncProjectionState :: ProjectionId -> SQLCursor -> HasqlTransaction.Transaction ()
updateSyncProjectionState :: ProjectionId -> SQLCursor -> Transaction ()
updateSyncProjectionState (ProjectionId Text
projId) SQLCursor
cursor = do
(Text, Value) -> Statement (Text, Value) () -> Transaction ()
forall a b. a -> Statement a b -> Transaction b
HasqlTransaction.statement
(Text
projId, SQLCursor -> Value
forall a. ToJSON a => a -> Value
Aeson.toJSON SQLCursor
cursor)
Statement (Text, Value) ()
ProjectionState.upsertProjectionCursor
registerSyncProjectionInDb :: ProjectionId -> HasqlTransaction.Transaction ()
registerSyncProjectionInDb :: ProjectionId -> Transaction ()
registerSyncProjectionInDb (ProjectionId Text
projId) = do
Text -> Statement Text () -> Transaction ()
forall a b. a -> Statement a b -> Transaction b
HasqlTransaction.statement
Text
projId
Statement Text ()
ProjectionState.registerProjection
getActiveProjections :: HasqlTransaction.Transaction [SyncProjectionState]
getActiveProjections :: Transaction [SyncProjectionState]
getActiveProjections = do
()
-> Statement () [SyncProjectionState]
-> Transaction [SyncProjectionState]
forall a b. a -> Statement a b -> Transaction b
HasqlTransaction.statement () Statement () [SyncProjectionState]
getActiveStmt
where
getActiveStmt :: Statement () [SyncProjectionState]
getActiveStmt :: Statement () [SyncProjectionState]
getActiveStmt =
[vectorStatement|
SELECT
id :: text,
head_position :: jsonb?,
last_updated :: timestamptz,
is_active :: bool,
error_message :: text?,
error_timestamp :: timestamptz?
FROM projections
WHERE is_active = true
ORDER BY id
|]
Statement
()
(Vector
(Text, Maybe Value, UTCTime, Bool, Maybe Text, Maybe UTCTime))
-> (Vector
(Text, Maybe Value, UTCTime, Bool, Maybe Text, Maybe UTCTime)
-> [SyncProjectionState])
-> Statement () [SyncProjectionState]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> (Vector
(Text, Maybe Value, UTCTime, Bool, Maybe Text, Maybe UTCTime)
-> [(Text, Maybe Value, UTCTime, Bool, Maybe Text, Maybe UTCTime)]
forall a. Vector a -> [a]
Vector.toList (Vector
(Text, Maybe Value, UTCTime, Bool, Maybe Text, Maybe UTCTime)
-> [(Text, Maybe Value, UTCTime, Bool, Maybe Text, Maybe UTCTime)])
-> ([(Text, Maybe Value, UTCTime, Bool, Maybe Text, Maybe UTCTime)]
-> [SyncProjectionState])
-> Vector
(Text, Maybe Value, UTCTime, Bool, Maybe Text, Maybe UTCTime)
-> [SyncProjectionState]
forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> ((Text, Maybe Value, UTCTime, Bool, Maybe Text, Maybe UTCTime)
-> SyncProjectionState)
-> [(Text, Maybe Value, UTCTime, Bool, Maybe Text, Maybe UTCTime)]
-> [SyncProjectionState]
forall a b. (a -> b) -> [a] -> [b]
map (Text, Maybe Value, UTCTime, Bool, Maybe Text, Maybe UTCTime)
-> SyncProjectionState
toSyncProjectionState)
toSyncProjectionState :: (Text, Maybe Value, UTCTime, Bool, Maybe Text, Maybe UTCTime)
-> SyncProjectionState
toSyncProjectionState (Text
projId, Maybe Value
mbCursorJson, UTCTime
updated, Bool
active, Maybe Text
errMsg, Maybe UTCTime
errTs) =
let (Int64
txNo, Int32
seqNo) = case Maybe Value
mbCursorJson of
Maybe Value
Nothing -> (-Int64
1, -Int32
1)
Just Value
cursorJson -> case Value -> Result SQLCursor
forall a. FromJSON a => Value -> Result a
Aeson.fromJSON Value
cursorJson :: Aeson.Result SQLCursor of
Aeson.Success (SQLCursor Int64
t Int32
s) -> (Int64
t, Int32
s)
Aeson.Error String
_ -> (-Int64
1, -Int32
1)
in SyncProjectionState
{ projectionId :: ProjectionId
projectionId = Text -> ProjectionId
ProjectionId Text
projId
, lastProcessedTransactionNo :: Int64
lastProcessedTransactionNo = Int64
txNo
, lastProcessedSeqNo :: Int32
lastProcessedSeqNo = Int32
seqNo
, lastUpdated :: UTCTime
lastUpdated = UTCTime
updated
, isActive :: Bool
isActive = Bool
active
, errorMessage :: Maybe Text
errorMessage = Maybe Text
errMsg
, errorTimestamp :: Maybe UTCTime
errorTimestamp = Maybe UTCTime
errTs
}