{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module Hindsight.Store.PostgreSQL.Projections.State (
    -- * State Management
    updateSyncProjectionState,
    registerSyncProjectionInDb,
    getActiveProjections,

    -- * Types
    SyncProjectionState (..),
)
where

import Control.Category ((>>>))
import Data.Aeson qualified as Aeson
import Data.Functor ((<&>))
import Data.Int (Int32, Int64)
import Data.Text (Text)
import Data.Time (UTCTime)
import Data.Vector qualified as Vector
import Hasql.Statement (Statement)
import Hasql.TH
import Hasql.Transaction qualified as HasqlTransaction
import Hindsight.Projection (ProjectionId (..))
import Hindsight.Projection.State qualified as ProjectionState
import Hindsight.Store.PostgreSQL.Core.Types (SQLCursor (..))

{- | State of a sync projection as stored in the database.

The cursor position is exposed as two separate components (transaction
number and sequence number) rather than as a single cursor value.
-}
data SyncProjectionState = SyncProjectionState
    { projectionId :: ProjectionId
    -- ^ Identifier of the projection this state belongs to.
    , lastProcessedTransactionNo :: Int64
    -- ^ Transaction-number component of the stored cursor.
    -- 'getActiveProjections' reports @-1@ when no cursor could be read.
    , lastProcessedSeqNo :: Int32
    -- ^ Sequence-number component of the stored cursor.
    -- 'getActiveProjections' reports @-1@ when no cursor could be read.
    , lastUpdated :: UTCTime
    -- ^ Value of the row's @last_updated@ column.
    , isActive :: Bool
    -- ^ Value of the row's @is_active@ column.
    , errorMessage :: Maybe Text
    -- ^ Value of the row's nullable @error_message@ column.
    , errorTimestamp :: Maybe UTCTime
    -- ^ Value of the row's nullable @error_timestamp@ column.
    }
    deriving (Show, Eq)

{- | Update sync projection state after successful processing.

Encodes the given cursor as JSON and runs the shared
'ProjectionState.upsertProjectionCursor' statement from
"Hindsight.Projection.State" with the projection's textual id. That shared
statement is the one responsible for cursor updates and error clearing.
-}
updateSyncProjectionState :: ProjectionId -> SQLCursor -> HasqlTransaction.Transaction ()
updateSyncProjectionState (ProjectionId projId) cursor =
    HasqlTransaction.statement
        (projId, Aeson.toJSON cursor)
        ProjectionState.upsertProjectionCursor

{- | Register a sync projection in the database if it doesn't exist.

Runs the shared 'ProjectionState.registerProjection' statement from
"Hindsight.Projection.State" with the projection's textual id.
-}
registerSyncProjectionInDb :: ProjectionId -> HasqlTransaction.Transaction ()
registerSyncProjectionInDb (ProjectionId projId) =
    HasqlTransaction.statement
        projId
        ProjectionState.registerProjection

{- | Get all active sync projections from the database.

Selects every row of the @projections@ table whose @is_active@ flag is true,
ordered by @id@, and converts each row into a 'SyncProjectionState'.

A row whose @head_position@ is @NULL@, or whose JSON cannot be decoded as an
'SQLCursor', is reported with both cursor components set to @-1@.
-}
getActiveProjections :: HasqlTransaction.Transaction [SyncProjectionState]
getActiveProjections =
    HasqlTransaction.statement () getActiveStmt
  where
    getActiveStmt :: Statement () [SyncProjectionState]
    getActiveStmt =
        [vectorStatement|
        SELECT 
          id :: text,
          head_position :: jsonb?,
          last_updated :: timestamptz,
          is_active :: bool,
          error_message :: text?,
          error_timestamp :: timestamptz?
        FROM projections
        WHERE is_active = true
        ORDER BY id
      |]
            <&> (Vector.toList >>> map toSyncProjectionState)

    -- Sentinel position used when a projection has no usable cursor.
    neverProcessed :: (Int64, Int32)
    neverProcessed = (-1, -1)

    -- Extract (transaction number, sequence number) from the stored cursor JSON.
    cursorPosition :: Maybe Aeson.Value -> (Int64, Int32)
    cursorPosition mbCursorJson =
        case Aeson.fromJSON <$> mbCursorJson of
            -- Never processed: no cursor has been stored yet.
            Nothing -> neverProcessed
            Just (Aeson.Success (SQLCursor txNo seqNo)) -> (txNo, seqNo)
            -- Failed to parse, treat as never processed.
            Just (Aeson.Error _) -> neverProcessed

    -- Convert one result row into a 'SyncProjectionState'.
    toSyncProjectionState ::
        (Text, Maybe Aeson.Value, UTCTime, Bool, Maybe Text, Maybe UTCTime) ->
        SyncProjectionState
    toSyncProjectionState (projId, mbCursorJson, updated, active, errMsg, errTs) =
        let (txNo, seqNo) = cursorPosition mbCursorJson
         in SyncProjectionState
                { projectionId = ProjectionId projId
                , lastProcessedTransactionNo = txNo
                , lastProcessedSeqNo = seqNo
                , lastUpdated = updated
                , isActive = active
                , errorMessage = errMsg
                , errorTimestamp = errTs
                }