{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module Hindsight.Store.PostgreSQL.Events.Insertion (
InsertedEvents (..),
insertEventsWithSyncProjections,
calculateStreamVersions,
nextStreamVersions,
)
where
import Control.Monad (foldM, forM_, replicateM)
import Data.Aeson (Value, toJSON)
import Data.Foldable qualified as Foldable
import Data.Functor.Contravariant ((>$<))
import Data.Int (Int32, Int64)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Text (Text)
import Data.Time (UTCTime)
import Data.Time.Clock (getCurrentTime)
import Data.UUID (UUID)
import Data.UUID.V4 qualified as UUID
import Hasql.Decoders qualified as D
import Hasql.Encoders qualified as E
import Hasql.Statement (Statement (..))
import Hasql.Transaction qualified as HasqlTransaction
import Hindsight.Events (SomeLatestEvent (..), getEventName, getMaxVersion)
import Hindsight.Store (CorrelationId (..), EventEnvelope (..), EventId (..), StreamId (..), StreamVersion (..), StreamWrite (..))
import Hindsight.Store.PostgreSQL.Core.Types (SQLCursor (..), SQLStore, SyncProjectionRegistry (..))
import Hindsight.Store.PostgreSQL.Projections.State (updateSyncProjectionState)
import Hindsight.Store.PostgreSQL.Projections.Sync (executeSyncProjectionForEvent)
data InsertedEvents = InsertedEvents
{ InsertedEvents -> SQLCursor
finalCursor :: SQLCursor
, InsertedEvents -> Map StreamId SQLCursor
streamCursors :: Map StreamId SQLCursor
, InsertedEvents -> [StreamId]
insertedStreamIds :: [StreamId]
, InsertedEvents -> [UUID]
eventIds :: [UUID]
, InsertedEvents -> UTCTime
createdAt :: UTCTime
, InsertedEvents -> Map StreamId [StreamVersion]
streamVersions :: Map StreamId [StreamVersion]
}
getTransactionNumber :: Statement () Int64
getTransactionNumber :: Statement () Int64
getTransactionNumber = ByteString
-> Params () -> Result Int64 -> Bool -> Statement () Int64
forall params result.
ByteString
-> Params params
-> Result result
-> Bool
-> Statement params result
Statement ByteString
sql Params ()
E.noParams Result Int64
decoder Bool
True
where
sql :: ByteString
sql = ByteString
"select nextval('transaction_seq')"
decoder :: Result Int64
decoder = Row Int64 -> Result Int64
forall a. Row a -> Result a
D.singleRow (Row Int64 -> Result Int64) -> Row Int64 -> Result Int64
forall a b. (a -> b) -> a -> b
$ NullableOrNot Value Int64 -> Row Int64
forall a. NullableOrNot Value a -> Row a
D.column (NullableOrNot Value Int64 -> Row Int64)
-> NullableOrNot Value Int64 -> Row Int64
forall a b. (a -> b) -> a -> b
$ Value Int64 -> NullableOrNot Value Int64
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable Value Int64
D.int8
insertTransactionNumber :: Statement Int64 ()
insertTransactionNumber :: Statement Int64 ()
insertTransactionNumber = ByteString
-> Params Int64 -> Result () -> Bool -> Statement Int64 ()
forall params result.
ByteString
-> Params params
-> Result result
-> Bool
-> Statement params result
Statement ByteString
sql Params Int64
encoder Result ()
D.noResult Bool
True
where
sql :: ByteString
sql = ByteString
"INSERT INTO event_transactions (transaction_no) VALUES ($1)"
encoder :: Params Int64
encoder = NullableOrNot Value Int64 -> Params Int64
forall a. NullableOrNot Value a -> Params a
E.param (Value Int64 -> NullableOrNot Value Int64
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int64
E.int8)
insertEventStatement :: Statement (Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32, Value, StreamVersion) ()
insertEventStatement :: Statement
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
()
insertEventStatement = ByteString
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Result ()
-> Bool
-> Statement
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
()
forall params result.
ByteString
-> Params params
-> Result result
-> Bool
-> Statement params result
Statement ByteString
sql Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
encoder Result ()
D.noResult Bool
True
where
sql :: ByteString
sql =
ByteString
"INSERT INTO events (\
\ transaction_no, seq_no, event_id, stream_id,\
\ correlation_id, created_at, event_name, event_version, payload, stream_version\
\) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)"
encoder :: Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
encoder =
((\(Int64
a, Int32
_, EventId
_, StreamId
_, Maybe UUID
_, UTCTime
_, Text
_, Int32
_, Value
_, StreamVersion
_) -> Int64
a) ((Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text,
Int32, Value, StreamVersion)
-> Int64)
-> Params Int64
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< NullableOrNot Value Int64 -> Params Int64
forall a. NullableOrNot Value a -> Params a
E.param (Value Int64 -> NullableOrNot Value Int64
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int64
E.int8))
Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall a. Semigroup a => a -> a -> a
<> ((\(Int64
_, Int32
b, EventId
_, StreamId
_, Maybe UUID
_, UTCTime
_, Text
_, Int32
_, Value
_, StreamVersion
_) -> Int32
b) ((Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text,
Int32, Value, StreamVersion)
-> Int32)
-> Params Int32
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< NullableOrNot Value Int32 -> Params Int32
forall a. NullableOrNot Value a -> Params a
E.param (Value Int32 -> NullableOrNot Value Int32
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int32
E.int4))
Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall a. Semigroup a => a -> a -> a
<> ((\(Int64
_, Int32
_, EventId UUID
c, StreamId
_, Maybe UUID
_, UTCTime
_, Text
_, Int32
_, Value
_, StreamVersion
_) -> UUID
c) ((Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text,
Int32, Value, StreamVersion)
-> UUID)
-> Params UUID
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< NullableOrNot Value UUID -> Params UUID
forall a. NullableOrNot Value a -> Params a
E.param (Value UUID -> NullableOrNot Value UUID
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value UUID
E.uuid))
Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall a. Semigroup a => a -> a -> a
<> ((\(Int64
_, Int32
_, EventId
_, StreamId UUID
d, Maybe UUID
_, UTCTime
_, Text
_, Int32
_, Value
_, StreamVersion
_) -> UUID
d) ((Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text,
Int32, Value, StreamVersion)
-> UUID)
-> Params UUID
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< NullableOrNot Value UUID -> Params UUID
forall a. NullableOrNot Value a -> Params a
E.param (Value UUID -> NullableOrNot Value UUID
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value UUID
E.uuid))
Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall a. Semigroup a => a -> a -> a
<> ((\(Int64
_, Int32
_, EventId
_, StreamId
_, Maybe UUID
e, UTCTime
_, Text
_, Int32
_, Value
_, StreamVersion
_) -> Maybe UUID
e) ((Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text,
Int32, Value, StreamVersion)
-> Maybe UUID)
-> Params (Maybe UUID)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< NullableOrNot Value (Maybe UUID) -> Params (Maybe UUID)
forall a. NullableOrNot Value a -> Params a
E.param (Value UUID -> NullableOrNot Value (Maybe UUID)
forall (encoder :: * -> *) a.
encoder a -> NullableOrNot encoder (Maybe a)
E.nullable Value UUID
E.uuid))
Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall a. Semigroup a => a -> a -> a
<> ((\(Int64
_, Int32
_, EventId
_, StreamId
_, Maybe UUID
_, UTCTime
f, Text
_, Int32
_, Value
_, StreamVersion
_) -> UTCTime
f) ((Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text,
Int32, Value, StreamVersion)
-> UTCTime)
-> Params UTCTime
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< NullableOrNot Value UTCTime -> Params UTCTime
forall a. NullableOrNot Value a -> Params a
E.param (Value UTCTime -> NullableOrNot Value UTCTime
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value UTCTime
E.timestamptz))
Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall a. Semigroup a => a -> a -> a
<> ((\(Int64
_, Int32
_, EventId
_, StreamId
_, Maybe UUID
_, UTCTime
_, Text
g, Int32
_, Value
_, StreamVersion
_) -> Text
g) ((Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text,
Int32, Value, StreamVersion)
-> Text)
-> Params Text
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< NullableOrNot Value Text -> Params Text
forall a. NullableOrNot Value a -> Params a
E.param (Value Text -> NullableOrNot Value Text
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Text
E.text))
Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall a. Semigroup a => a -> a -> a
<> ((\(Int64
_, Int32
_, EventId
_, StreamId
_, Maybe UUID
_, UTCTime
_, Text
_, Int32
h, Value
_, StreamVersion
_) -> Int32
h) ((Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text,
Int32, Value, StreamVersion)
-> Int32)
-> Params Int32
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< NullableOrNot Value Int32 -> Params Int32
forall a. NullableOrNot Value a -> Params a
E.param (Value Int32 -> NullableOrNot Value Int32
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int32
E.int4))
Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall a. Semigroup a => a -> a -> a
<> ((\(Int64
_, Int32
_, EventId
_, StreamId
_, Maybe UUID
_, UTCTime
_, Text
_, Int32
_, Value
i, StreamVersion
_) -> Value
i) ((Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text,
Int32, Value, StreamVersion)
-> Value)
-> Params Value
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< NullableOrNot Value Value -> Params Value
forall a. NullableOrNot Value a -> Params a
E.param (Value Value -> NullableOrNot Value Value
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Value
E.jsonb))
Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall a. Semigroup a => a -> a -> a
<> ((\(Int64
_, Int32
_, EventId
_, StreamId
_, Maybe UUID
_, UTCTime
_, Text
_, Int32
_, Value
_, StreamVersion Int64
j) -> Int64
j) ((Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text,
Int32, Value, StreamVersion)
-> Int64)
-> Params Int64
-> Params
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< NullableOrNot Value Int64 -> Params Int64
forall a. NullableOrNot Value a -> Params a
E.param (Value Int64 -> NullableOrNot Value Int64
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int64
E.int8))
updateStreamHeadStatement :: Statement (StreamId, Int64, Int32, EventId, StreamVersion) ()
updateStreamHeadStatement :: Statement (StreamId, Int64, Int32, EventId, StreamVersion) ()
updateStreamHeadStatement = ByteString
-> Params (StreamId, Int64, Int32, EventId, StreamVersion)
-> Result ()
-> Bool
-> Statement (StreamId, Int64, Int32, EventId, StreamVersion) ()
forall params result.
ByteString
-> Params params
-> Result result
-> Bool
-> Statement params result
Statement ByteString
sql Params (StreamId, Int64, Int32, EventId, StreamVersion)
encoder Result ()
D.noResult Bool
True
where
sql :: ByteString
sql =
ByteString
"INSERT INTO stream_heads (\
\ stream_id, latest_transaction_no, latest_seq_no, last_event_id, stream_version\
\) VALUES ($1, $2, $3, $4, $5)\
\ ON CONFLICT (stream_id) DO UPDATE SET\
\ latest_transaction_no = excluded.latest_transaction_no,\
\ latest_seq_no = excluded.latest_seq_no,\
\ last_event_id = excluded.last_event_id,\
\ stream_version = excluded.stream_version"
encoder :: Params (StreamId, Int64, Int32, EventId, StreamVersion)
encoder =
((\(StreamId UUID
a, Int64
_, Int32
_, EventId
_, StreamVersion
_) -> UUID
a) ((StreamId, Int64, Int32, EventId, StreamVersion) -> UUID)
-> Params UUID
-> Params (StreamId, Int64, Int32, EventId, StreamVersion)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< NullableOrNot Value UUID -> Params UUID
forall a. NullableOrNot Value a -> Params a
E.param (Value UUID -> NullableOrNot Value UUID
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value UUID
E.uuid))
Params (StreamId, Int64, Int32, EventId, StreamVersion)
-> Params (StreamId, Int64, Int32, EventId, StreamVersion)
-> Params (StreamId, Int64, Int32, EventId, StreamVersion)
forall a. Semigroup a => a -> a -> a
<> ((\(StreamId
_, Int64
b, Int32
_, EventId
_, StreamVersion
_) -> Int64
b) ((StreamId, Int64, Int32, EventId, StreamVersion) -> Int64)
-> Params Int64
-> Params (StreamId, Int64, Int32, EventId, StreamVersion)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< NullableOrNot Value Int64 -> Params Int64
forall a. NullableOrNot Value a -> Params a
E.param (Value Int64 -> NullableOrNot Value Int64
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int64
E.int8))
Params (StreamId, Int64, Int32, EventId, StreamVersion)
-> Params (StreamId, Int64, Int32, EventId, StreamVersion)
-> Params (StreamId, Int64, Int32, EventId, StreamVersion)
forall a. Semigroup a => a -> a -> a
<> ((\(StreamId
_, Int64
_, Int32
c, EventId
_, StreamVersion
_) -> Int32
c) ((StreamId, Int64, Int32, EventId, StreamVersion) -> Int32)
-> Params Int32
-> Params (StreamId, Int64, Int32, EventId, StreamVersion)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< NullableOrNot Value Int32 -> Params Int32
forall a. NullableOrNot Value a -> Params a
E.param (Value Int32 -> NullableOrNot Value Int32
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int32
E.int4))
Params (StreamId, Int64, Int32, EventId, StreamVersion)
-> Params (StreamId, Int64, Int32, EventId, StreamVersion)
-> Params (StreamId, Int64, Int32, EventId, StreamVersion)
forall a. Semigroup a => a -> a -> a
<> ((\(StreamId
_, Int64
_, Int32
_, EventId UUID
d, StreamVersion
_) -> UUID
d) ((StreamId, Int64, Int32, EventId, StreamVersion) -> UUID)
-> Params UUID
-> Params (StreamId, Int64, Int32, EventId, StreamVersion)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< NullableOrNot Value UUID -> Params UUID
forall a. NullableOrNot Value a -> Params a
E.param (Value UUID -> NullableOrNot Value UUID
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value UUID
E.uuid))
Params (StreamId, Int64, Int32, EventId, StreamVersion)
-> Params (StreamId, Int64, Int32, EventId, StreamVersion)
-> Params (StreamId, Int64, Int32, EventId, StreamVersion)
forall a. Semigroup a => a -> a -> a
<> ((\(StreamId
_, Int64
_, Int32
_, EventId
_, StreamVersion Int64
e) -> Int64
e) ((StreamId, Int64, Int32, EventId, StreamVersion) -> Int64)
-> Params Int64
-> Params (StreamId, Int64, Int32, EventId, StreamVersion)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< NullableOrNot Value Int64 -> Params Int64
forall a. NullableOrNot Value a -> Params a
E.param (Value Int64 -> NullableOrNot Value Int64
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value Int64
E.int8))
getCurrentStreamVersionStmt :: Statement UUID (Maybe StreamVersion)
getCurrentStreamVersionStmt :: Statement UUID (Maybe StreamVersion)
getCurrentStreamVersionStmt = ByteString
-> Params UUID
-> Result (Maybe StreamVersion)
-> Bool
-> Statement UUID (Maybe StreamVersion)
forall params result.
ByteString
-> Params params
-> Result result
-> Bool
-> Statement params result
Statement ByteString
sql Params UUID
encoder Result (Maybe StreamVersion)
decoder Bool
True
where
sql :: ByteString
sql = ByteString
"SELECT stream_version FROM stream_heads WHERE stream_id = $1"
encoder :: Params UUID
encoder = NullableOrNot Value UUID -> Params UUID
forall a. NullableOrNot Value a -> Params a
E.param (Value UUID -> NullableOrNot Value UUID
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
E.nonNullable Value UUID
E.uuid)
decoder :: Result (Maybe StreamVersion)
decoder = Row StreamVersion -> Result (Maybe StreamVersion)
forall a. Row a -> Result (Maybe a)
D.rowMaybe (Row StreamVersion -> Result (Maybe StreamVersion))
-> Row StreamVersion -> Result (Maybe StreamVersion)
forall a b. (a -> b) -> a -> b
$ NullableOrNot Value StreamVersion -> Row StreamVersion
forall a. NullableOrNot Value a -> Row a
D.column (NullableOrNot Value StreamVersion -> Row StreamVersion)
-> NullableOrNot Value StreamVersion -> Row StreamVersion
forall a b. (a -> b) -> a -> b
$ Value StreamVersion -> NullableOrNot Value StreamVersion
forall (decoder :: * -> *) a. decoder a -> NullableOrNot decoder a
D.nonNullable (Value StreamVersion -> NullableOrNot Value StreamVersion)
-> Value StreamVersion -> NullableOrNot Value StreamVersion
forall a b. (a -> b) -> a -> b
$ (Int64 -> StreamVersion) -> Value Int64 -> Value StreamVersion
forall a b. (a -> b) -> Value a -> Value b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Int64 -> StreamVersion
StreamVersion Value Int64
D.int8
calculateStreamVersions ::
Map StreamId (StreamWrite t SomeLatestEvent SQLStore) ->
HasqlTransaction.Transaction (Map StreamId StreamVersion)
calculateStreamVersions :: forall (t :: * -> *).
Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
-> Transaction (Map StreamId StreamVersion)
calculateStreamVersions Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
eventBatches = do
currentVersions <- (StreamId -> Transaction StreamVersion)
-> [StreamId] -> Transaction [StreamVersion]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM StreamId -> Transaction StreamVersion
getCurrentStreamVersion (Map StreamId (StreamWrite t SomeLatestEvent SQLStore) -> [StreamId]
forall k a. Map k a -> [k]
Map.keys Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
eventBatches)
pure $ Map.fromList $ zip (Map.keys eventBatches) currentVersions
where
getCurrentStreamVersion :: StreamId -> HasqlTransaction.Transaction StreamVersion
getCurrentStreamVersion :: StreamId -> Transaction StreamVersion
getCurrentStreamVersion (StreamId UUID
streamUUID) = do
result <- UUID
-> Statement UUID (Maybe StreamVersion)
-> Transaction (Maybe StreamVersion)
forall a b. a -> Statement a b -> Transaction b
HasqlTransaction.statement UUID
streamUUID Statement UUID (Maybe StreamVersion)
getCurrentStreamVersionStmt
pure $ maybe (StreamVersion 0) id result
nextStreamVersions ::
(Foldable t) =>
Map StreamId (StreamWrite t SomeLatestEvent SQLStore) ->
Map StreamId StreamVersion ->
Map StreamId [StreamVersion]
nextStreamVersions :: forall (t :: * -> *).
Foldable t =>
Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
-> Map StreamId StreamVersion -> Map StreamId [StreamVersion]
nextStreamVersions Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
eventBatches Map StreamId StreamVersion
currentVersions =
(StreamId
-> StreamWrite t SomeLatestEvent SQLStore -> [StreamVersion])
-> Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
-> Map StreamId [StreamVersion]
forall k a b. (k -> a -> b) -> Map k a -> Map k b
Map.mapWithKey StreamId
-> StreamWrite t SomeLatestEvent SQLStore -> [StreamVersion]
forall {t :: * -> *} {p} {a}.
(Foldable t, HasField "events" p (t a)) =>
StreamId -> p -> [StreamVersion]
calculateNextVersionsForStream Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
eventBatches
where
calculateNextVersionsForStream :: StreamId -> p -> [StreamVersion]
calculateNextVersionsForStream StreamId
streamId p
batch =
let currentVersion :: StreamVersion
currentVersion = StreamVersion
-> StreamId -> Map StreamId StreamVersion -> StreamVersion
forall k a. Ord k => a -> k -> Map k a -> a
Map.findWithDefault (Int64 -> StreamVersion
StreamVersion Int64
0) StreamId
streamId Map StreamId StreamVersion
currentVersions
eventCount :: Int
eventCount = [a] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length ([a] -> Int) -> [a] -> Int
forall a b. (a -> b) -> a -> b
$ t a -> [a]
forall a. t a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Foldable.toList p
batch.events
in [StreamVersion
currentVersion StreamVersion -> StreamVersion -> StreamVersion
forall a. Num a => a -> a -> a
+ StreamVersion
1 .. StreamVersion
currentVersion StreamVersion -> StreamVersion -> StreamVersion
forall a. Num a => a -> a -> a
+ Int -> StreamVersion
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
eventCount]
insertEventsWithSyncProjections ::
forall t.
(Traversable t) =>
SyncProjectionRegistry ->
Maybe CorrelationId ->
Map StreamId (StreamWrite t SomeLatestEvent SQLStore) ->
IO (HasqlTransaction.Transaction InsertedEvents)
insertEventsWithSyncProjections :: forall (t :: * -> *).
Traversable t =>
SyncProjectionRegistry
-> Maybe CorrelationId
-> Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
-> IO (Transaction InsertedEvents)
insertEventsWithSyncProjections SyncProjectionRegistry
syncRegistry Maybe CorrelationId
correlationId Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
eventBatches = do
eventIds <- Int -> IO UUID -> IO [UUID]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
totalEventCount IO UUID
UUID.nextRandom
createdAt <- getCurrentTime
pure $ do
txNo <- HasqlTransaction.statement () getTransactionNumber
HasqlTransaction.statement txNo insertTransactionNumber
currentVersions <- calculateStreamVersions eventBatches
let streamVersionMap = Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
-> Map StreamId StreamVersion -> Map StreamId [StreamVersion]
forall (t :: * -> *).
Foldable t =>
Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
-> Map StreamId StreamVersion -> Map StreamId [StreamVersion]
nextStreamVersions Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
eventBatches Map StreamId StreamVersion
currentVersions
streamHeadMetadata <- processEventsWithUnifiedMetadata syncRegistry correlationId eventIds createdAt streamVersionMap txNo eventBatches
let perStreamCursors =
(StreamId -> (UUID, Int32) -> SQLCursor)
-> Map StreamId (UUID, Int32) -> Map StreamId SQLCursor
forall k a b. (k -> a -> b) -> Map k a -> Map k b
Map.mapWithKey
(\StreamId
_ (UUID
_lastEventId, Int32
lastSeqNo) -> Int64 -> Int32 -> SQLCursor
SQLCursor Int64
txNo Int32
lastSeqNo)
Map StreamId (UUID, Int32)
streamHeadMetadata
pure
InsertedEvents
{ finalCursor =
SQLCursor
{ transactionNo = txNo
, sequenceNo = fromIntegral totalEventCount - 1
}
, streamCursors = perStreamCursors
, insertedStreamIds = Map.keys eventBatches
, eventIds = eventIds
, createdAt = createdAt
, streamVersions = streamVersionMap
}
where
totalEventCount :: Int
totalEventCount = [Int] -> Int
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum ([Int] -> Int) -> [Int] -> Int
forall a b. (a -> b) -> a -> b
$ (StreamWrite t SomeLatestEvent SQLStore -> Int)
-> [StreamWrite t SomeLatestEvent SQLStore] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map (t SomeLatestEvent -> Int
forall a. t a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length (t SomeLatestEvent -> Int)
-> (StreamWrite t SomeLatestEvent SQLStore -> t SomeLatestEvent)
-> StreamWrite t SomeLatestEvent SQLStore
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (.events)) ([StreamWrite t SomeLatestEvent SQLStore] -> [Int])
-> [StreamWrite t SomeLatestEvent SQLStore] -> [Int]
forall a b. (a -> b) -> a -> b
$ Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
-> [StreamWrite t SomeLatestEvent SQLStore]
forall k a. Map k a -> [a]
Map.elems Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
eventBatches
processEventsWithUnifiedMetadata ::
SyncProjectionRegistry ->
Maybe CorrelationId ->
[UUID] ->
UTCTime ->
Map StreamId [StreamVersion] ->
Int64 ->
Map StreamId (StreamWrite t SomeLatestEvent SQLStore) ->
HasqlTransaction.Transaction (Map StreamId (UUID, Int32))
processEventsWithUnifiedMetadata :: SyncProjectionRegistry
-> Maybe CorrelationId
-> [UUID]
-> UTCTime
-> Map StreamId [StreamVersion]
-> Int64
-> Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
-> Transaction (Map StreamId (UUID, Int32))
processEventsWithUnifiedMetadata SyncProjectionRegistry
registry Maybe CorrelationId
corrId [UUID]
allEventIds UTCTime
createdAtTime Map StreamId [StreamVersion]
streamVersionMap Int64
txNo Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
batches = do
let streamList :: [(StreamId, StreamWrite t SomeLatestEvent SQLStore)]
streamList = Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
-> [(StreamId, StreamWrite t SomeLatestEvent SQLStore)]
forall k a. Map k a -> [(k, a)]
Map.toList Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
batches
(finalSeqNoOffset, _, streamHeadMetadata) <- ((Int32, [UUID], Map StreamId (UUID, Int32))
-> (StreamId, StreamWrite t SomeLatestEvent SQLStore)
-> Transaction (Int32, [UUID], Map StreamId (UUID, Int32)))
-> (Int32, [UUID], Map StreamId (UUID, Int32))
-> [(StreamId, StreamWrite t SomeLatestEvent SQLStore)]
-> Transaction (Int32, [UUID], Map StreamId (UUID, Int32))
forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m b
foldM (Int32, [UUID], Map StreamId (UUID, Int32))
-> (StreamId, StreamWrite t SomeLatestEvent SQLStore)
-> Transaction (Int32, [UUID], Map StreamId (UUID, Int32))
forall {t :: * -> *} {b}.
(Foldable t, HasField "events" b (t SomeLatestEvent)) =>
(Int32, [UUID], Map StreamId (UUID, Int32))
-> (StreamId, b)
-> Transaction (Int32, [UUID], Map StreamId (UUID, Int32))
processStream (Int32
0, [UUID]
allEventIds, Map StreamId (UUID, Int32)
forall k a. Map k a
Map.empty) [(StreamId, StreamWrite t SomeLatestEvent SQLStore)]
streamList
let SyncProjectionRegistry projMap = registry
finalCursor = Int64 -> Int32 -> SQLCursor
SQLCursor Int64
txNo (Int32
finalSeqNoOffset Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
- Int32
1)
forM_ (Map.keys projMap) $ \ProjectionId
projId ->
ProjectionId -> SQLCursor -> Transaction ()
updateSyncProjectionState ProjectionId
projId SQLCursor
finalCursor
updateStreamHeadsFromVersionMap streamVersionMap streamHeadMetadata txNo
pure streamHeadMetadata
where
processStream :: (Int32, [UUID], Map StreamId (UUID, Int32))
-> (StreamId, b)
-> Transaction (Int32, [UUID], Map StreamId (UUID, Int32))
processStream (Int32
seqNoOffset, [UUID]
remainingIds, Map StreamId (UUID, Int32)
headMetadata) (StreamId
streamId, b
batch) = do
let events :: [SomeLatestEvent]
events = t SomeLatestEvent -> [SomeLatestEvent]
forall a. t a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Foldable.toList b
batch.events
numEvents :: Int
numEvents = [SomeLatestEvent] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [SomeLatestEvent]
events
([UUID]
streamEventIds, [UUID]
leftoverIds) = Int -> [UUID] -> ([UUID], [UUID])
forall a. Int -> [a] -> ([a], [a])
splitAt Int
numEvents [UUID]
remainingIds
streamVersions :: [StreamVersion]
streamVersions = [StreamVersion]
-> StreamId -> Map StreamId [StreamVersion] -> [StreamVersion]
forall k a. Ord k => a -> k -> Map k a -> a
Map.findWithDefault [] StreamId
streamId Map StreamId [StreamVersion]
streamVersionMap
[((SomeLatestEvent, UUID, StreamVersion), Int32)]
-> (((SomeLatestEvent, UUID, StreamVersion), Int32)
-> Transaction ())
-> Transaction ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([SomeLatestEvent]
-> [UUID]
-> [StreamVersion]
-> [(SomeLatestEvent, UUID, StreamVersion)]
forall a b c. [a] -> [b] -> [c] -> [(a, b, c)]
zip3 [SomeLatestEvent]
events [UUID]
streamEventIds [StreamVersion]
streamVersions [(SomeLatestEvent, UUID, StreamVersion)]
-> [Int32] -> [((SomeLatestEvent, UUID, StreamVersion), Int32)]
forall a b. [a] -> [b] -> [(a, b)]
`zip` [Int32
0 :: Int32 ..]) ((((SomeLatestEvent, UUID, StreamVersion), Int32)
-> Transaction ())
-> Transaction ())
-> (((SomeLatestEvent, UUID, StreamVersion), Int32)
-> Transaction ())
-> Transaction ()
forall a b. (a -> b) -> a -> b
$
\((SomeLatestEvent
event, UUID
eventId, StreamVersion
streamVer), Int32
idx) -> do
let seqNo :: Int32
seqNo = Int32
seqNoOffset Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
idx
cursor :: SQLCursor
cursor = Int64 -> Int32 -> SQLCursor
SQLCursor Int64
txNo Int32
seqNo
UUID
-> StreamId
-> SQLCursor
-> Maybe CorrelationId
-> UTCTime
-> StreamVersion
-> SomeLatestEvent
-> Transaction ()
processEventWithUnifiedMetadata UUID
eventId StreamId
streamId SQLCursor
cursor Maybe CorrelationId
corrId UTCTime
createdAtTime StreamVersion
streamVer SomeLatestEvent
event
let SyncProjectionRegistry Map ProjectionId (SomeProjectionHandlers SQLStore)
projMap = SyncProjectionRegistry
registry
finalSeqNo :: Int32
finalSeqNo = Int32
seqNoOffset Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int -> Int32
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
numEvents Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
- Int32
1
finalCursor :: SQLCursor
finalCursor = Int64 -> Int32 -> SQLCursor
SQLCursor Int64
txNo Int32
finalSeqNo
[ProjectionId]
-> (ProjectionId -> Transaction ()) -> Transaction ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Map ProjectionId (SomeProjectionHandlers SQLStore)
-> [ProjectionId]
forall k a. Map k a -> [k]
Map.keys Map ProjectionId (SomeProjectionHandlers SQLStore)
projMap) ((ProjectionId -> Transaction ()) -> Transaction ())
-> (ProjectionId -> Transaction ()) -> Transaction ()
forall a b. (a -> b) -> a -> b
$ \ProjectionId
projId ->
ProjectionId -> SQLCursor -> Transaction ()
updateSyncProjectionState ProjectionId
projId SQLCursor
finalCursor
let updatedMetadata :: Map StreamId (UUID, Int32)
updatedMetadata = case [UUID]
streamEventIds of
[] -> Map StreamId (UUID, Int32)
headMetadata
[UUID]
_ ->
let lastEventId :: UUID
lastEventId = [UUID] -> UUID
forall a. HasCallStack => [a] -> a
Prelude.last [UUID]
streamEventIds
lastSeqNo :: Int32
lastSeqNo = Int32
seqNoOffset Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int -> Int32
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
numEvents Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
- Int32
1
in StreamId
-> (UUID, Int32)
-> Map StreamId (UUID, Int32)
-> Map StreamId (UUID, Int32)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert StreamId
streamId (UUID
lastEventId, Int32
lastSeqNo) Map StreamId (UUID, Int32)
headMetadata
(Int32, [UUID], Map StreamId (UUID, Int32))
-> Transaction (Int32, [UUID], Map StreamId (UUID, Int32))
forall a. a -> Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int32
seqNoOffset Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int -> Int32
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
numEvents, [UUID]
leftoverIds, Map StreamId (UUID, Int32)
updatedMetadata)
processEventWithUnifiedMetadata :: UUID -> StreamId -> SQLCursor -> Maybe CorrelationId -> UTCTime -> StreamVersion -> SomeLatestEvent -> HasqlTransaction.Transaction ()
processEventWithUnifiedMetadata :: UUID
-> StreamId
-> SQLCursor
-> Maybe CorrelationId
-> UTCTime
-> StreamVersion
-> SomeLatestEvent
-> Transaction ()
processEventWithUnifiedMetadata UUID
eventId StreamId
streamId SQLCursor
cursor Maybe CorrelationId
corrId' UTCTime
timestamp StreamVersion
streamVer (SomeLatestEvent Proxy event
eventProxy CurrentPayloadType event
payload) = do
let envelope :: EventEnvelope event SQLStore
envelope =
EventWithMetadata
{ position :: Cursor SQLStore
position = Cursor SQLStore
SQLCursor
cursor
, eventId :: EventId
eventId = UUID -> EventId
EventId UUID
eventId
, streamId :: StreamId
streamId = StreamId
streamId
, streamVersion :: StreamVersion
streamVersion = StreamVersion
streamVer
, correlationId :: Maybe CorrelationId
correlationId = Maybe CorrelationId
corrId'
, createdAt :: UTCTime
createdAt = UTCTime
timestamp
, payload :: CurrentPayloadType event
payload = CurrentPayloadType event
payload
}
EventEnvelope event SQLStore
-> StreamVersion
-> Proxy event
-> FinalVersionType (FromList (Versions event))
-> Transaction ()
forall {event :: Symbol} {a} {a} {r}.
(AssertPeanoEqual
(ListLength (Versions event))
('PeanoSucc (ToPeanoNat (MaxVersion event)))
(((((('Text "Version count mismatch for event '" ':<>: 'Text event)
':<>: 'Text "'")
':$$: (('Text " MaxVersion declares "
':<>: 'ShowType (1 + FromPeanoNat (ToPeanoNat (MaxVersion event))))
':<>: 'Text " versions"))
':$$: (('Text " But Versions list has "
':<>: 'ShowType (FromPeanoNat (ListLength (Versions event))))
':<>: 'Text " elements"))
':$$: 'Text "")
':$$: 'Text
"Hint: Check that (MaxVersion event + 1) matches the length of your Versions list"),
Event event, ToJSON a,
ToJSON (FinalVersionType (FromList (Versions event))),
HasEvidenceList
'PeanoZero
('PeanoSucc (ToPeanoNat (MaxVersion event)))
event
ValidPayloadForVersion
(FromList (Versions event)),
HasField "toUUID" a UUID, HasField "correlationId" r (Maybe a),
HasField "createdAt" r UTCTime, HasField "eventId" r EventId,
HasField "position" r SQLCursor, HasField "streamId" r StreamId) =>
r -> StreamVersion -> Proxy event -> a -> Transaction ()
insertSingleEventFromEnvelope EventEnvelope event SQLStore
envelope StreamVersion
streamVer Proxy event
eventProxy FinalVersionType (FromList (Versions event))
CurrentPayloadType event
payload
SyncProjectionRegistry
-> Proxy event -> EventEnvelope event SQLStore -> Transaction ()
forall (event :: Symbol).
Event event =>
SyncProjectionRegistry
-> Proxy event -> EventEnvelope event SQLStore -> Transaction ()
executeSyncProjectionForEvent SyncProjectionRegistry
registry Proxy event
eventProxy EventEnvelope event SQLStore
envelope
insertSingleEventFromEnvelope :: r -> StreamVersion -> Proxy event -> a -> Transaction ()
insertSingleEventFromEnvelope r
envelope StreamVersion
streamVer Proxy event
proxy a
payload = do
let SQLCursor Int64
cursorTxNo Int32
cursorSeqNo = r
envelope.position
EventId UUID
eventId = r
envelope.eventId
name :: Text
name = Proxy event -> Text
forall (event :: Symbol). KnownSymbol event => Proxy event -> Text
getEventName Proxy event
proxy
ver :: Integer
ver = Proxy event -> Integer
forall (event :: Symbol). Event event => Proxy event -> Integer
getMaxVersion Proxy event
proxy
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
-> Statement
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
()
-> Transaction ()
forall a b. a -> Statement a b -> Transaction b
HasqlTransaction.statement
( Int64
cursorTxNo
, Int32
cursorSeqNo
, UUID -> EventId
EventId UUID
eventId
, r
envelope.streamId
, (.toUUID) (a -> UUID) -> Maybe a -> Maybe UUID
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> r
envelope.correlationId
, r
envelope.createdAt
, Text
name
, Integer -> Int32
forall a b. (Integral a, Num b) => a -> b
fromIntegral Integer
ver
, a -> Value
forall a. ToJSON a => a -> Value
toJSON a
payload
, StreamVersion
streamVer
)
Statement
(Int64, Int32, EventId, StreamId, Maybe UUID, UTCTime, Text, Int32,
Value, StreamVersion)
()
insertEventStatement
updateStreamHeadsFromVersionMap :: Map StreamId [StreamVersion]
-> Map StreamId (UUID, Int32) -> Int64 -> Transaction ()
updateStreamHeadsFromVersionMap Map StreamId [StreamVersion]
versionMap Map StreamId (UUID, Int32)
headMetadata Int64
txNo' = do
[(StreamId, [StreamVersion])]
-> ((StreamId, [StreamVersion]) -> Transaction ())
-> Transaction ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Map StreamId [StreamVersion] -> [(StreamId, [StreamVersion])]
forall k a. Map k a -> [(k, a)]
Map.toList Map StreamId [StreamVersion]
versionMap) (((StreamId, [StreamVersion]) -> Transaction ()) -> Transaction ())
-> ((StreamId, [StreamVersion]) -> Transaction ())
-> Transaction ()
forall a b. (a -> b) -> a -> b
$ \(StreamId
streamId, [StreamVersion]
versions) -> do
case [StreamVersion] -> [StreamVersion]
forall a. [a] -> [a]
reverse [StreamVersion]
versions of
[] -> () -> Transaction ()
forall a. a -> Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
(StreamVersion
finalVersion : [StreamVersion]
_) -> do
case StreamId -> Map StreamId (UUID, Int32) -> Maybe (UUID, Int32)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup StreamId
streamId Map StreamId (UUID, Int32)
headMetadata of
Maybe (UUID, Int32)
Nothing -> () -> Transaction ()
forall a. a -> Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just (UUID
lastEventId, Int32
lastSeqNo) ->
(StreamId, Int64, Int32, EventId, StreamVersion)
-> Statement (StreamId, Int64, Int32, EventId, StreamVersion) ()
-> Transaction ()
forall a b. a -> Statement a b -> Transaction b
HasqlTransaction.statement
(StreamId
streamId, Int64
txNo', Int32
lastSeqNo, UUID -> EventId
EventId UUID
lastEventId, StreamVersion
finalVersion)
Statement (StreamId, Int64, Int32, EventId, StreamVersion) ()
updateStreamHeadStatement