{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Hindsight.Store.PostgreSQL (
newSQLStore,
newSQLStoreWithProjections,
shutdownSQLStore,
SQLStoreHandle,
getPool,
getConnectionString,
SQLStore,
SQLCursor (..),
SyncProjectionRegistry,
emptySyncProjectionRegistry,
registerSyncProjection,
insertEventsWithSyncProjections,
createSQLSchema,
module Hindsight.Store,
)
where
import Control.Exception (SomeException (..), throwIO)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.ByteString (ByteString)
import Data.Text (Text, pack)
import Data.Text qualified
import Data.Text.Encoding (decodeUtf8)
import Hasql.Connection.Setting qualified as ConnectionSetting
import Hasql.Connection.Setting.Connection qualified as ConnectionSettingConnection
import Hasql.Pool (Pool)
import Hasql.Pool qualified as Pool
import Hasql.Pool.Config qualified as Config
import Hasql.Session (Session)
import Hasql.Transaction.Sessions qualified as Session
import Hindsight.Projection (ProjectionId (..))
import Hindsight.Store
import Hindsight.Store.PostgreSQL.Core.Schema (createSchema)
import Hindsight.Store.PostgreSQL.Core.Types (SQLCursor (..), SQLStore, SQLStoreHandle (..), SyncProjectionRegistry)
import Hindsight.Store.PostgreSQL.Events.Concurrency (checkVersions)
import Hindsight.Store.PostgreSQL.Events.Insertion qualified as Insertion
import Hindsight.Store.PostgreSQL.Events.Subscription qualified as Subscription
import Hindsight.Store.PostgreSQL.Projections.Sync (CatchUpError (..), catchUpSyncProjections, emptySyncProjectionRegistry, registerSyncProjection)
import UnliftIO (MonadUnliftIO)
createSQLSchema :: Session ()
createSQLSchema :: Session ()
createSQLSchema = Session ()
createSchema
getPool :: SQLStoreHandle -> Pool
getPool :: SQLStoreHandle -> Pool
getPool = (.pool)
getConnectionString :: SQLStoreHandle -> ByteString
getConnectionString :: SQLStoreHandle -> ByteString
getConnectionString = (.connectionString)
newSQLStore :: ByteString -> IO SQLStoreHandle
newSQLStore :: ByteString -> IO SQLStoreHandle
newSQLStore ByteString
connectionString = ByteString -> SyncProjectionRegistry -> IO SQLStoreHandle
newSQLStoreWithProjections ByteString
connectionString SyncProjectionRegistry
emptySyncProjectionRegistry
newSQLStoreWithProjections :: ByteString -> SyncProjectionRegistry -> IO SQLStoreHandle
newSQLStoreWithProjections :: ByteString -> SyncProjectionRegistry -> IO SQLStoreHandle
newSQLStoreWithProjections ByteString
connectionString SyncProjectionRegistry
syncProjRegistry = do
let connectionSettings :: [Setting]
connectionSettings = [Connection -> Setting
ConnectionSetting.connection (Connection -> Setting) -> Connection -> Setting
forall a b. (a -> b) -> a -> b
$ Text -> Connection
ConnectionSettingConnection.string (ByteString -> Text
decodeUtf8 ByteString
connectionString)]
hasqlConfig :: Config
hasqlConfig = [Setting] -> Config
Config.settings [Int -> Setting
Config.size Int
300, [Setting] -> Setting
Config.staticConnectionSettings [Setting]
connectionSettings]
pool <- Config -> IO Pool
Pool.acquire Config
hasqlConfig
catchUpResult <- catchUpSyncProjections pool syncProjRegistry
case catchUpResult of
Left CatchUpError
err -> do
let errorMsg :: Text
errorMsg = case CatchUpError
err of
ProjectionExecutionError ProjectionId
projId Text
msg ->
String -> Text
pack String
"Sync projection catch-up failed for " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> ProjectionId -> Text
unProjectionId ProjectionId
projId Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
pack String
": " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
msg
DatabaseError Text
msg ->
String -> Text
pack String
"Database error during sync projection catch-up: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
msg
CatchUpError
NoActiveProjections ->
String -> Text
pack String
"No active sync projections found in database"
IOError -> IO SQLStoreHandle
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (IOError -> IO SQLStoreHandle) -> IOError -> IO SQLStoreHandle
forall a b. (a -> b) -> a -> b
$ String -> IOError
userError (String -> IOError) -> String -> IOError
forall a b. (a -> b) -> a -> b
$ Text -> String
Data.Text.unpack Text
errorMsg
Right () -> do
notifier <- ByteString -> IO Notifier
forall (m :: * -> *). MonadIO m => ByteString -> m Notifier
Subscription.startNotifier ByteString
connectionString
pure $ SQLStoreHandle pool connectionString syncProjRegistry notifier
instance EventStore SQLStore where
type StoreConstraints SQLStore m = MonadUnliftIO m
insertEvents ::
(Traversable t, MonadUnliftIO m) =>
BackendHandle SQLStore ->
Maybe CorrelationId ->
Transaction t SQLStore ->
m (InsertionResult SQLStore)
insertEvents :: forall (t :: * -> *) (m :: * -> *).
(Traversable t, MonadUnliftIO m) =>
BackendHandle SQLStore
-> Maybe CorrelationId
-> Transaction t SQLStore
-> m (InsertionResult SQLStore)
insertEvents BackendHandle SQLStore
handle Maybe CorrelationId
corrId Transaction t SQLStore
transaction =
SQLStoreHandle
-> SyncProjectionRegistry
-> Maybe CorrelationId
-> Transaction t SQLStore
-> m (InsertionResult SQLStore)
forall (t :: * -> *) (m :: * -> *).
(Traversable t, MonadUnliftIO m) =>
SQLStoreHandle
-> SyncProjectionRegistry
-> Maybe CorrelationId
-> Transaction t SQLStore
-> m (InsertionResult SQLStore)
insertEventsWithSyncProjections BackendHandle SQLStore
SQLStoreHandle
handle BackendHandle SQLStore
SQLStoreHandle
handle.syncProjectionRegistry Maybe CorrelationId
corrId Transaction t SQLStore
transaction
subscribe :: forall (m :: * -> *) (ts :: [Symbol]).
StoreConstraints SQLStore m =>
BackendHandle SQLStore
-> EventMatcher ts SQLStore m
-> EventSelector SQLStore
-> m (SubscriptionHandle SQLStore)
subscribe BackendHandle SQLStore
handle EventMatcher ts SQLStore m
matcher EventSelector SQLStore
selector = SQLStoreHandle
-> EventMatcher ts SQLStore m
-> EventSelector SQLStore
-> m (SubscriptionHandle SQLStore)
forall (m :: * -> *) (ts :: [Symbol]).
MonadUnliftIO m =>
SQLStoreHandle
-> EventMatcher ts SQLStore m
-> EventSelector SQLStore
-> m (SubscriptionHandle SQLStore)
Subscription.subscribe BackendHandle SQLStore
SQLStoreHandle
handle EventMatcher ts SQLStore m
matcher EventSelector SQLStore
selector
insertEventsWithSyncProjections ::
(Traversable t, MonadUnliftIO m) =>
SQLStoreHandle ->
SyncProjectionRegistry ->
Maybe CorrelationId ->
Transaction t SQLStore ->
m (InsertionResult SQLStore)
insertEventsWithSyncProjections :: forall (t :: * -> *) (m :: * -> *).
(Traversable t, MonadUnliftIO m) =>
SQLStoreHandle
-> SyncProjectionRegistry
-> Maybe CorrelationId
-> Transaction t SQLStore
-> m (InsertionResult SQLStore)
insertEventsWithSyncProjections SQLStoreHandle
handle SyncProjectionRegistry
syncRegistry Maybe CorrelationId
corrId (Transaction Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
batches) = IO (InsertionResult SQLStore) -> m (InsertionResult SQLStore)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (InsertionResult SQLStore) -> m (InsertionResult SQLStore))
-> IO (InsertionResult SQLStore) -> m (InsertionResult SQLStore)
forall a b. (a -> b) -> a -> b
$ do
tx <- SyncProjectionRegistry
-> Maybe CorrelationId
-> Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
-> IO (Transaction InsertedEvents)
forall (t :: * -> *).
Traversable t =>
SyncProjectionRegistry
-> Maybe CorrelationId
-> Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
-> IO (Transaction InsertedEvents)
Insertion.insertEventsWithSyncProjections SyncProjectionRegistry
syncRegistry Maybe CorrelationId
corrId Map StreamId (StreamWrite t SomeLatestEvent SQLStore)
batches
result <- Pool.use handle.pool $ Session.transaction Session.Serializable Session.Write $ do
mbError <- checkVersions batches
case mbError of
Just ConsistencyErrorInfo SQLStore
err -> Either (EventStoreError SQLStore) InsertedEvents
-> Transaction (Either (EventStoreError SQLStore) InsertedEvents)
forall a. a -> Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either (EventStoreError SQLStore) InsertedEvents
-> Transaction (Either (EventStoreError SQLStore) InsertedEvents))
-> Either (EventStoreError SQLStore) InsertedEvents
-> Transaction (Either (EventStoreError SQLStore) InsertedEvents)
forall a b. (a -> b) -> a -> b
$ EventStoreError SQLStore
-> Either (EventStoreError SQLStore) InsertedEvents
forall a b. a -> Either a b
Left (EventStoreError SQLStore
-> Either (EventStoreError SQLStore) InsertedEvents)
-> EventStoreError SQLStore
-> Either (EventStoreError SQLStore) InsertedEvents
forall a b. (a -> b) -> a -> b
$ ConsistencyErrorInfo SQLStore -> EventStoreError SQLStore
forall backend.
ConsistencyErrorInfo backend -> EventStoreError backend
ConsistencyError ConsistencyErrorInfo SQLStore
err
Maybe (ConsistencyErrorInfo SQLStore)
Nothing -> do
InsertedEvents -> Either (EventStoreError SQLStore) InsertedEvents
forall a b. b -> Either a b
Right (InsertedEvents
-> Either (EventStoreError SQLStore) InsertedEvents)
-> Transaction InsertedEvents
-> Transaction (Either (EventStoreError SQLStore) InsertedEvents)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Transaction InsertedEvents
tx
case result of
Left UsageError
dbError ->
InsertionResult SQLStore -> IO (InsertionResult SQLStore)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InsertionResult SQLStore -> IO (InsertionResult SQLStore))
-> InsertionResult SQLStore -> IO (InsertionResult SQLStore)
forall a b. (a -> b) -> a -> b
$
EventStoreError SQLStore -> InsertionResult SQLStore
forall backend. EventStoreError backend -> InsertionResult backend
FailedInsertion (EventStoreError SQLStore -> InsertionResult SQLStore)
-> EventStoreError SQLStore -> InsertionResult SQLStore
forall a b. (a -> b) -> a -> b
$
ErrorInfo -> EventStoreError SQLStore
forall backend. ErrorInfo -> EventStoreError backend
BackendError (ErrorInfo -> EventStoreError SQLStore)
-> ErrorInfo -> EventStoreError SQLStore
forall a b. (a -> b) -> a -> b
$
ErrorInfo
{ errorMessage :: Text
errorMessage = String -> Text
pack String
"Database error: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> UsageError -> Text
forall a. Show a => a -> Text
tshow UsageError
dbError
, exception :: Maybe SomeException
exception = SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just (SomeException -> Maybe SomeException)
-> SomeException -> Maybe SomeException
forall a b. (a -> b) -> a -> b
$ UsageError -> SomeException
forall e. (Exception e, HasExceptionContext) => e -> SomeException
SomeException UsageError
dbError
}
Right (Left EventStoreError SQLStore
err) ->
InsertionResult SQLStore -> IO (InsertionResult SQLStore)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InsertionResult SQLStore -> IO (InsertionResult SQLStore))
-> InsertionResult SQLStore -> IO (InsertionResult SQLStore)
forall a b. (a -> b) -> a -> b
$ EventStoreError SQLStore -> InsertionResult SQLStore
forall backend. EventStoreError backend -> InsertionResult backend
FailedInsertion EventStoreError SQLStore
err
Right (Right InsertedEvents
insertedEvents) ->
InsertionResult SQLStore -> IO (InsertionResult SQLStore)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InsertionResult SQLStore -> IO (InsertionResult SQLStore))
-> InsertionResult SQLStore -> IO (InsertionResult SQLStore)
forall a b. (a -> b) -> a -> b
$
InsertionSuccess SQLStore -> InsertionResult SQLStore
forall backend. InsertionSuccess backend -> InsertionResult backend
SuccessfulInsertion (InsertionSuccess SQLStore -> InsertionResult SQLStore)
-> InsertionSuccess SQLStore -> InsertionResult SQLStore
forall a b. (a -> b) -> a -> b
$
InsertionSuccess
{ finalCursor :: Cursor SQLStore
finalCursor = InsertedEvents -> SQLCursor
Insertion.finalCursor InsertedEvents
insertedEvents
, streamCursors :: Map StreamId (Cursor SQLStore)
streamCursors = InsertedEvents -> Map StreamId SQLCursor
Insertion.streamCursors InsertedEvents
insertedEvents
}
shutdownSQLStore :: SQLStoreHandle -> IO ()
shutdownSQLStore :: SQLStoreHandle -> IO ()
shutdownSQLStore SQLStoreHandle
handle = do
Notifier -> IO ()
Subscription.shutdownNotifier SQLStoreHandle
handle.notifier
tshow :: (Show a) => a -> Text
tshow :: forall a. Show a => a -> Text
tshow = String -> Text
pack (String -> Text) -> (a -> String) -> a -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> String
forall a. Show a => a -> String
show