{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

{- |
Module      : Hindsight.Store.PostgreSQL.Events.Insertion
Description : Event insertion logic for PostgreSQL backend
Copyright   : (c) 2024
License     : BSD3
Maintainer  : maintainer@example.com
Stability   : internal

Handles the insertion of events into PostgreSQL tables, including
transaction management, sequence number assignment, stream head updates,
and optional synchronous projection execution.
-}
module Hindsight.Store.PostgreSQL.Events.Insertion (
    -- * Types
    InsertedEvents (..),

    -- * Main insertion functions
    insertEventsWithSyncProjections,

    -- * Stream version utilities
    calculateStreamVersions,
    nextStreamVersions,
)
where

import Control.Monad (foldM, forM_, replicateM)
import Data.Aeson (Value, toJSON)
import Data.Foldable qualified as Foldable
import Data.Functor.Contravariant ((>$<))
import Data.Int (Int32, Int64)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Text (Text)
import Data.Time (UTCTime)
import Data.Time.Clock (getCurrentTime)
import Data.UUID (UUID)
import Data.UUID.V4 qualified as UUID
import Hasql.Decoders qualified as D
import Hasql.Encoders qualified as E
import Hasql.Statement (Statement (..))
import Hasql.Transaction qualified as HasqlTransaction
import Hindsight.Events (SomeLatestEvent (..), getEventName, getMaxVersion)
import Hindsight.Store (CorrelationId (..), EventEnvelope (..), EventId (..), StreamId (..), StreamVersion (..), StreamWrite (..))
import Hindsight.Store.PostgreSQL.Core.Types (SQLCursor (..), SQLStore, SyncProjectionRegistry (..))
import Hindsight.Store.PostgreSQL.Projections.State (updateSyncProjectionState)
import Hindsight.Store.PostgreSQL.Projections.Sync (executeSyncProjectionForEvent)

-- * Types

{- | Result of event insertion.

Produced by 'insertEventsWithSyncProjections' once every batch has been
written inside the transaction.
-}
data InsertedEvents = InsertedEvents
    { finalCursor :: SQLCursor
    -- ^ Cursor of the last event written: the transaction number together
    -- with the zero-based sequence number of the final event
    , streamCursors :: Map StreamId SQLCursor
    -- ^ Per-stream final cursors
    , insertedStreamIds :: [StreamId]
    -- ^ Keys of the batch map that was inserted
    , eventIds :: [UUID]
    -- ^ Event identifiers generated for this insertion
    , createdAt :: UTCTime
    -- ^ Timestamp captured once and shared by all events of the insertion
    , streamVersions :: Map StreamId [StreamVersion]
    -- ^ Versions assigned to the new events of each stream
    }

-- * Database statements

{- | Get next transaction number from sequence.

Takes no parameters and decodes exactly one non-null @int8@ row, the value
returned by @nextval('transaction_seq')@.
-}
getTransactionNumber :: Statement () Int64
getTransactionNumber = Statement sql E.noParams decoder True
  where
    sql = "select nextval('transaction_seq')"
    -- Exactly one row with a single non-null bigint column is expected.
    decoder = D.singleRow (D.column (D.nonNullable D.int8))

{- | Insert transaction number into event_transactions table.

The single parameter is bound to @$1@ as a non-null @int8@; the statement
yields no result.
-}
insertTransactionNumber :: Statement Int64 ()
insertTransactionNumber = Statement sql encoder D.noResult True
  where
    sql = "INSERT INTO event_transactions (transaction_no) VALUES ($1)"
    encoder = E.param (E.nonNullable E.int8)

{- | Insert a single event into the events table.

The parameter tuple is bound positionally to @$1@..@$10@:

 1. transaction number (@int8@)
 2. sequence number (@int4@)
 3. event id (@uuid@, unwrapped from 'EventId')
 4. stream id (@uuid@, unwrapped from 'StreamId')
 5. correlation id (nullable @uuid@)
 6. creation time (@timestamptz@)
 7. event name (@text@)
 8. event version (@int4@)
 9. payload (@jsonb@)
10. stream version (@int8@, unwrapped from 'StreamVersion')
-}
insertEventStatement :: Statement (Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32, Value, StreamVersion) ()
insertEventStatement = Statement sql encoder D.noResult True
  where
    sql =
        "INSERT INTO events (\
        \    transaction_no, seq_no, event_id, stream_id,\
        \    correlation_id, created_at, event_name, event_version, payload, stream_version\
        \) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)"

    -- Each component projects one tuple slot and encodes it; the order of
    -- the '<>' chain determines the placeholder each value is bound to.
    encoder =
        ((\(a, _, _, _, _, _, _, _, _, _) -> a) >$< E.param (E.nonNullable E.int8))
            <> ((\(_, b, _, _, _, _, _, _, _, _) -> b) >$< E.param (E.nonNullable E.int4))
            <> ((\(_, _, EventId c, _, _, _, _, _, _, _) -> c) >$< E.param (E.nonNullable E.uuid))
            <> ((\(_, _, _, StreamId d, _, _, _, _, _, _) -> d) >$< E.param (E.nonNullable E.uuid))
            <> ((\(_, _, _, _, e, _, _, _, _, _) -> e) >$< E.param (E.nullable E.uuid))
            <> ((\(_, _, _, _, _, f, _, _, _, _) -> f) >$< E.param (E.nonNullable E.timestamptz))
            <> ((\(_, _, _, _, _, _, g, _, _, _) -> g) >$< E.param (E.nonNullable E.text))
            <> ((\(_, _, _, _, _, _, _, h, _, _) -> h) >$< E.param (E.nonNullable E.int4))
            <> ((\(_, _, _, _, _, _, _, _, i, _) -> i) >$< E.param (E.nonNullable E.jsonb))
            <> ((\(_, _, _, _, _, _, _, _, _, StreamVersion j) -> j) >$< E.param (E.nonNullable E.int8))

{- | Update or insert a stream head record.

The parameter tuple is bound positionally to @$1@..@$5@: stream id,
latest transaction number, latest sequence number, last event id and
stream version. When a row for the stream id already exists, the
@ON CONFLICT@ clause overwrites its four head columns with the new values.
-}
updateStreamHeadStatement :: Statement (StreamId, Int64, Int32, EventId, StreamVersion) ()
updateStreamHeadStatement = Statement sql encoder D.noResult True
  where
    sql =
        "INSERT INTO stream_heads (\
        \    stream_id, latest_transaction_no, latest_seq_no, last_event_id, stream_version\
        \) VALUES ($1, $2, $3, $4, $5)\
        \  ON CONFLICT (stream_id) DO UPDATE SET\
        \    latest_transaction_no = excluded.latest_transaction_no,\
        \    latest_seq_no = excluded.latest_seq_no,\
        \    last_event_id = excluded.last_event_id,\
        \    stream_version = excluded.stream_version"

    -- Newtype wrappers are unwrapped in the projections so the raw
    -- uuid / int8 values are what gets encoded.
    encoder =
        ((\(StreamId a, _, _, _, _) -> a) >$< E.param (E.nonNullable E.uuid))
            <> ((\(_, b, _, _, _) -> b) >$< E.param (E.nonNullable E.int8))
            <> ((\(_, _, c, _, _) -> c) >$< E.param (E.nonNullable E.int4))
            <> ((\(_, _, _, EventId d, _) -> d) >$< E.param (E.nonNullable E.uuid))
            <> ((\(_, _, _, _, StreamVersion e) -> e) >$< E.param (E.nonNullable E.int8))

{- | Get current stream version.

Looks up @stream_version@ in @stream_heads@ for the given stream UUID.
Returns 'Nothing' when no row matches, otherwise the non-null @int8@
column wrapped in 'StreamVersion'.
-}
getCurrentStreamVersionStmt :: Statement UUID (Maybe StreamVersion)
getCurrentStreamVersionStmt = Statement sql encoder decoder True
  where
    sql = "SELECT stream_version FROM stream_heads WHERE stream_id = $1"
    encoder = E.param (E.nonNullable E.uuid)
    -- Zero or one row; the column itself is never NULL.
    decoder = D.rowMaybe (D.column (D.nonNullable (fmap StreamVersion D.int8)))

-- * Helper functions

{- | Calculate stream versions for events based on current stream state.

Runs 'getCurrentStreamVersionStmt' once per stream in the batch map, in
ascending key order, and returns a map with the same keys. A stream with no
@stream_heads@ row is reported as @StreamVersion 0@.
-}
calculateStreamVersions ::
    Map StreamId (StreamWrite t SomeLatestEvent SQLStore) ->
    HasqlTransaction.Transaction (Map StreamId StreamVersion)
calculateStreamVersions eventBatches =
    -- Traversing the map directly keeps every key paired with its own
    -- lookup result; the batch payload is not needed here.
    Map.traverseWithKey (\streamId _batch -> getCurrentStreamVersion streamId) eventBatches
  where
    getCurrentStreamVersion :: StreamId -> HasqlTransaction.Transaction StreamVersion
    getCurrentStreamVersion (StreamId streamUUID) =
        -- A missing stream head means the stream does not exist yet: version 0.
        maybe (StreamVersion 0) id
            <$> HasqlTransaction.statement streamUUID getCurrentStreamVersionStmt

{- | Calculate next stream versions for each stream.

For a stream currently at version @v@ whose batch holds @n@ events, the
result is the enumeration @[v + 1 .. v + n]@, one version per event. Streams
absent from the current-version map are treated as being at
@StreamVersion 0@. The result has the same keys as the batch map.
-}
nextStreamVersions ::
    (Foldable t) =>
    Map StreamId (StreamWrite t SomeLatestEvent SQLStore) ->
    Map StreamId StreamVersion ->
    Map StreamId [StreamVersion]
nextStreamVersions eventBatches currentVersions =
    Map.mapWithKey calculateNextVersionsForStream eventBatches
  where
    calculateNextVersionsForStream streamId batch =
        let currentVersion = Map.findWithDefault (StreamVersion 0) streamId currentVersions
            eventCount = length batch.events
         in -- NOTE(review): relies on StreamVersion's Num/Enum instances; an
            -- empty batch presumably yields [] as it would for Int64 — confirm.
            [currentVersion + 1 .. currentVersion + fromIntegral eventCount]

-- * Main insertion functions

-- | Insert events and execute synchronous projections within the same transaction
insertEventsWithSyncProjections ::
    forall t.
    (Traversable t) =>
    SyncProjectionRegistry ->
    Maybe CorrelationId ->
    Map StreamId (StreamWrite t SomeLatestEvent SQLStore) ->
    IO (HasqlTransaction.Transaction InsertedEvents)
insertEventsWithSyncProjections :: forall (t :: * -> *).
Traversable t =>
SyncProjectionRegistry
-> Maybe CorrelationId
-> Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
-> IO (Transaction InsertedEvents)
insertEventsWithSyncProjections SyncProjectionRegistry
syncRegistry Maybe CorrelationId
correlationId Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
eventBatches = do
    -- Generate metadata once
    eventIds <- Int -> IO UUID -> IO [UUID]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
totalEventCount IO UUID
UUID.nextRandom
    createdAt <- getCurrentTime

    pure $ do
        -- Get transaction number first (single round-trip)
        -- MVCC handles transaction visibility automatically
        txNo <- HasqlTransaction.statement () getTransactionNumber

        -- Insert transaction number into event_transactions table
        HasqlTransaction.statement txNo insertTransactionNumber

        -- Calculate current stream versions and determine next versions
        currentVersions <- calculateStreamVersions eventBatches
        let streamVersionMap = Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
-> Map StreamId StreamVersion -> Map StreamId [StreamVersion]
forall (t :: * -> *).
Foldable t =>
Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
-> Map StreamId StreamVersion -> Map StreamId [StreamVersion]
nextStreamVersions Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
eventBatches Map StreamId StreamVersion
currentVersions

        -- Process all events in a single traversal, generating EventWithMetadata for each
        -- Sync projections receive identical metadata to what's persisted
        streamHeadMetadata <- processEventsWithUnifiedMetadata syncRegistry correlationId eventIds createdAt streamVersionMap txNo eventBatches

        -- Compute per-stream cursors from stream head metadata
        let perStreamCursors =
                (StreamId -> (UUID, Int32) -> SQLCursor)
-> Map StreamId (UUID, Int32) -> Map StreamId SQLCursor
forall k a b. (k -> a -> b) -> Map k a -> Map k b
Map.mapWithKey
                    (\StreamId
_ (UUID
_lastEventId, Int32
lastSeqNo) -> Int64 -> Int32 -> SQLCursor
SQLCursor Int64
txNo Int32
lastSeqNo)
                    Map StreamId (UUID, Int32)
streamHeadMetadata

        -- Return final cursor and updated streams
        pure
            InsertedEvents
                { finalCursor =
                    SQLCursor
                        { transactionNo = txNo
                        , sequenceNo = fromIntegral totalEventCount - 1
                        }
                , streamCursors = perStreamCursors
                , insertedStreamIds = Map.keys eventBatches
                , eventIds = eventIds
                , createdAt = createdAt
                , streamVersions = streamVersionMap
                }
  where
    totalEventCount :: Int
totalEventCount = [Int] -> Int
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum ([Int] -> Int) -> [Int] -> Int
forall a b. (a -> b) -> a -> b
$ (StreamWrite t SomeLatestEvent SQLStore -> Int)
-> [StreamWrite t SomeLatestEvent SQLStore] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map (t SomeLatestEvent -> Int
forall a. t a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length (t SomeLatestEvent -> Int)
-> (StreamWrite t SomeLatestEvent SQLStore -> t SomeLatestEvent)
-> StreamWrite t SomeLatestEvent SQLStore
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (.events)) ([StreamWrite t SomeLatestEvent SQLStore] -> [Int])
-> [StreamWrite t SomeLatestEvent SQLStore] -> [Int]
forall a b. (a -> b) -> a -> b
$ Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
-> [StreamWrite t SomeLatestEvent SQLStore]
forall k a. Map k a -> [a]
Map.elems Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
eventBatches

    -- Process all events in a unified manner: database insertion AND sync
    -- projections, all inside the current transaction.
    --
    -- Arguments, in order: the sync projection registry, the optional
    -- correlation id placed on every envelope, the pre-generated event ids
    -- (consumed front to back, one stream after another), the creation
    -- timestamp, the stream versions to assign per stream, the transaction
    -- number used for every cursor, and the per-stream batches.
    --
    -- Returns the stream head metadata mapping (streamId -> (lastEventId, lastSeqNo)).
    processEventsWithUnifiedMetadata ::
        SyncProjectionRegistry ->
        Maybe CorrelationId ->
        [UUID] ->
        UTCTime ->
        Map StreamId [StreamVersion] ->
        Int64 ->
        Map StreamId (StreamWrite t SomeLatestEvent SQLStore) ->
        HasqlTransaction.Transaction (Map StreamId (UUID, Int32))
    processEventsWithUnifiedMetadata registry corrId allEventIds createdAtTime streamVersionMap txNo batches = do
        -- Process each stream, distributing event IDs and tracking sequence numbers.
        -- The accumulator threads (next free sequence number, unconsumed event ids,
        -- per-stream last event metadata); streams are visited in Map.toList order.
        (finalSeqNoOffset, _, streamHeadMetadata) <-
            foldM processStream (0, allEventIds, Map.empty) (Map.toList batches)

        -- Update sync projection state once for all projections with the final cursor.
        -- finalSeqNoOffset is one past the last assigned sequence number, hence the -1.
        -- NOTE(review): if no event was processed, finalSeqNoOffset is 0 and the cursor
        -- carries sequence number -1 -- confirm callers never submit an all-empty write.
        let SyncProjectionRegistry projMap = registry
            finalCursor = SQLCursor txNo (finalSeqNoOffset - 1)
        forM_ (Map.keys projMap) $ \projId ->
            updateSyncProjectionState projId finalCursor

        -- Update stream heads with correct per-stream values
        updateStreamHeadsFromVersionMap streamVersionMap streamHeadMetadata txNo

        -- Return stream head metadata for per-stream cursor computation
        pure streamHeadMetadata
      where
        -- Fold step handling one stream's batch.
        --
        -- The accumulator is (next free sequence number, unconsumed event ids,
        -- per-stream last event metadata). The step takes as many ids from the
        -- front of the id list as the batch has events, processes each event,
        -- and returns the advanced accumulator.
        processStream (seqNoOffset, remainingIds, headMetadata) (streamId, batch) = do
            let events = Foldable.toList batch.events
                numEvents = length events
                (streamEventIds, leftoverIds) = splitAt numEvents remainingIds
                streamVersions = Map.findWithDefault [] streamId streamVersionMap
                -- First sequence number available to the next stream
                nextSeqNoOffset = seqNoOffset + fromIntegral numEvents
                -- Sequence number of this stream's last event
                lastSeqNo = nextSeqNoOffset - 1

            -- Process each event in this stream.
            -- NOTE(review): zip3 stops at the shortest list, so an event without a
            -- matching id or stream version is skipped silently -- presumably the
            -- caller always supplies lists of matching length; verify.
            forM_ (zip (zip3 events streamEventIds streamVersions) [0 :: Int32 ..]) $
                \((event, eventId, streamVer), idx) -> do
                    let cursor = SQLCursor txNo (seqNoOffset + idx)

                    -- Process with unified metadata (database insert + sync projections)
                    processEventWithUnifiedMetadata eventId streamId cursor corrId createdAtTime streamVer event

            -- Update sync projection state for all registered projections.
            -- NOTE(review): the enclosing function performs the same update once more
            -- after the fold; this per-stream update looks redundant -- confirm
            -- before removing it.
            let SyncProjectionRegistry projMap = registry
                finalCursor = SQLCursor txNo lastSeqNo
            forM_ (Map.keys projMap) $ \projId ->
                updateSyncProjectionState projId finalCursor

            -- Capture last event metadata for this stream (for stream head update).
            -- Matching on the reversed id list keeps this total: with no ids the
            -- metadata is left untouched, otherwise the head is the last id.
            let updatedMetadata = case reverse streamEventIds of
                    [] -> headMetadata -- No events, don't update
                    (lastEventId : _) ->
                        Map.insert streamId (lastEventId, lastSeqNo) headMetadata

            pure (nextSeqNoOffset, leftoverIds, updatedMetadata)

        -- Create envelope and execute both database insertion and sync projections.
        -- Ensures identical metadata for both database insertion and projection execution:
        -- one envelope is built from the arguments and handed to both steps.
        --
        -- Arguments, in order: raw event id, stream id, cursor (stored as the
        -- envelope position), optional correlation id, creation timestamp,
        -- stream version, and the event (proxy plus payload).
        processEventWithUnifiedMetadata :: UUID -> StreamId -> SQLCursor -> Maybe CorrelationId -> UTCTime -> StreamVersion -> SomeLatestEvent -> HasqlTransaction.Transaction ()
        processEventWithUnifiedMetadata eventId streamId cursor corrId' timestamp streamVer (SomeLatestEvent eventProxy payload) = do
            let envelope =
                    EventWithMetadata
                        { position = cursor
                        , eventId = EventId eventId
                        , streamId = streamId
                        , streamVersion = streamVer
                        , correlationId = corrId'
                        , createdAt = timestamp
                        , payload = payload
                        }

            -- Database insertion using the envelope
            insertSingleEventFromEnvelope envelope streamVer eventProxy payload

            -- Sync projection using the SAME envelope
            executeSyncProjectionForEvent registry eventProxy envelope

        -- Run insertEventStatement for a single event.
        --
        -- The statement parameters are, in order: transaction number and
        -- sequence number (both from the envelope position), event id, stream
        -- id, correlation id as an optional UUID, creation time, event name,
        -- event version, JSON payload, and stream version.
        insertSingleEventFromEnvelope envelope streamVer proxy payload = do
            let SQLCursor cursorTxNo cursorSeqNo = envelope.position
                name = getEventName proxy
                -- getMaxVersion yields an Integer; the statement takes an Int32
                ver = fromIntegral (getMaxVersion proxy)
            HasqlTransaction.statement
                ( cursorTxNo
                , cursorSeqNo
                , envelope.eventId
                , envelope.streamId
                , (.toUUID) <$> envelope.correlationId
                , envelope.createdAt
                , name
                , ver
                , toJSON payload
                , streamVer
                )
                insertEventStatement

        -- Run updateStreamHeadStatement for every stream that has both a
        -- non-empty version list and recorded head metadata.
        --
        -- The stream version written is the last element of the stream's version
        -- list; the event id and sequence number come from the head metadata map,
        -- and txNo' is the transaction number passed to the statement.
        updateStreamHeadsFromVersionMap versionMap headMetadata txNo' =
            forM_ (Map.toList versionMap) $ \(streamId, versions) ->
                -- reverse puts the last version at the front; the lookup yields
                -- the actual last event id and sequence number for THIS stream
                case (reverse versions, Map.lookup streamId headMetadata) of
                    (finalVersion : _, Just (lastEventId, lastSeqNo)) ->
                        HasqlTransaction.statement
                            (streamId, txNo', lastSeqNo, EventId lastEventId, finalVersion)
                            updateStreamHeadStatement
                    -- No versions, or no events recorded for this stream: skip
                    _ -> pure ()