{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}

{- |
Module      : Hindsight.Store.PostgreSQL
Description : PostgreSQL-backed event store with ACID guarantees
Copyright   : (c) 2024
License     : BSD3
Maintainer  : maintainer@example.com
Stability   : experimental

= Overview

PostgreSQL-backed event store providing full ACID guarantees, scalability, and
production-ready durability. Supports both synchronous and asynchronous projections.

Recommended for production systems, distributed deployments, and applications
requiring strong consistency guarantees.

= Quick Start

@
import Hindsight.Store.PostgreSQL

main :: IO ()
main = do
  -- Create store
  let connStr = "postgresql://localhost/events"
  store <- newSQLStore connStr

  -- Initialize schema (run once)
  Pool.use (getPool store) createSQLSchema

  -- Insert events (see Hindsight.Store for details)
  streamId <- StreamId \<$\> UUID.nextRandom
  let event = mkEvent MyEvent myData
  result <- insertEvents store Nothing $ singleEvent streamId NoStream event

  -- Subscribe to events
  handle <- subscribe store matcher (EventSelector AllStreams FromBeginning)
  -- ... process events ...

  -- Cleanup when done
  shutdownSQLStore store
@

= Configuration

Connection via PostgreSQL connection string. The connection pool size is currently fixed at 300 connections.

For custom configuration, use 'newSQLStoreWithProjections' to register synchronous projections
that execute within event insertion transactions.

= Use Cases

__When to use PostgreSQL store:__

* Production systems requiring durability and ACID guarantees
* Distributed multi-node deployments
* High event throughput scenarios
* Large event volumes (millions+ events)
* Applications needing advanced SQL querying
* Systems requiring synchronous projections (strong consistency)

__When NOT to use PostgreSQL store:__

* Simple testing (use Memory store)
* Single-node apps without database (use Filesystem store)
* Environments where PostgreSQL can't be deployed
* Prototypes and development (unless testing PostgreSQL-specific features)

= Trade-offs

__Advantages:__

* Full ACID guarantees (PostgreSQL transactions)
* Scales to millions of events
* Distributed multi-node support
* Advanced querying via SQL
* Synchronous projections (consistency within single transaction)
* LISTEN/NOTIFY for efficient subscriptions
* Battle-tested PostgreSQL reliability

__Limitations:__

* Requires PostgreSQL database
* More complex deployment than Memory/Filesystem
* Higher resource requirements
* Network latency for remote databases

= Sync vs Async Projections

__Synchronous Projections:__ Execute within event insertion transaction. Changes to events
and projection state are atomic. Use 'registerSyncProjection' and 'newSQLStoreWithProjections'.
Guarantees strong consistency but adds latency to writes.

__Asynchronous Projections:__ Run separately using 'runProjection' from hindsight-postgresql-projections.
Process events after they're committed. Better write performance but eventual consistency.

= Implementation

Events stored in @events@ table with compound key @(transaction_no, seq_no)@ for total ordering.
Stream metadata in @stream_heads@ table. Projection state in @projections@ table.
LISTEN/NOTIFY used for efficient subscription updates.

See "Hindsight.Store.PostgreSQL.Core.Schema" for complete schema DDL.
-}
module Hindsight.Store.PostgreSQL (
    newSQLStore,
    newSQLStoreWithProjections,
    shutdownSQLStore,
    SQLStoreHandle,
    getPool,
    getConnectionString,
    SQLStore,
    SQLCursor (..),
    SyncProjectionRegistry,
    emptySyncProjectionRegistry,
    registerSyncProjection,
    insertEventsWithSyncProjections,
    createSQLSchema,
    module Hindsight.Store,
)
where

import Control.Exception (SomeException (..), onException, throwIO)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.ByteString (ByteString)
import Data.Text (Text, pack)
import Data.Text qualified
import Data.Text.Encoding (decodeUtf8)
import Hasql.Connection.Setting qualified as ConnectionSetting
import Hasql.Connection.Setting.Connection qualified as ConnectionSettingConnection
import Hasql.Pool (Pool)
import Hasql.Pool qualified as Pool
import Hasql.Pool.Config qualified as Config
import Hasql.Session (Session)
import Hasql.Transaction.Sessions qualified as Session
import Hindsight.Projection (ProjectionId (..))
import Hindsight.Store
import Hindsight.Store.PostgreSQL.Core.Schema (createSchema)
import Hindsight.Store.PostgreSQL.Core.Types (SQLCursor (..), SQLStore, SQLStoreHandle (..), SyncProjectionRegistry)
import Hindsight.Store.PostgreSQL.Events.Concurrency (checkVersions)
import Hindsight.Store.PostgreSQL.Events.Insertion qualified as Insertion
import Hindsight.Store.PostgreSQL.Events.Subscription qualified as Subscription
import Hindsight.Store.PostgreSQL.Projections.Sync (CatchUpError (..), catchUpSyncProjections, emptySyncProjectionRegistry, registerSyncProjection)
import UnliftIO (MonadUnliftIO)

{- | Session that creates the store's database schema.

This is 'createSchema' re-exported under a store-specific name to avoid
name clashes with other modules.
-}
createSQLSchema :: Session ()
createSQLSchema = createSchema

{- | Get the connection pool from a store handle.

Returns the handle's @pool@ field. Useful for running custom queries or
projections that need direct database access.
-}
getPool :: SQLStoreHandle -> Pool
getPool handle = handle.pool

{- | Get the connection string from a store handle.

Returns the handle's @connectionString@ field. Useful for creating additional
store instances with the same configuration.
-}
getConnectionString :: SQLStoreHandle -> ByteString
getConnectionString handle = handle.connectionString

{- | Create a PostgreSQL event store with default configuration.

Equivalent to calling 'newSQLStoreWithProjections' with
'emptySyncProjectionRegistry', i.e. no synchronous projections are
registered. The connection pool size is the one configured by
'newSQLStoreWithProjections' (300 connections).
-}
newSQLStore :: ByteString -> IO SQLStoreHandle
newSQLStore connectionString =
    newSQLStoreWithProjections connectionString emptySyncProjectionRegistry

{- | Create a PostgreSQL event store with custom sync projections.

Sync projections run within the same transaction as event insertion,
providing guaranteed consistency between events and their projections.

On startup, this function will:

1. Acquire a connection pool of 300 connections for the given connection string
2. Run 'catchUpSyncProjections' for the supplied registry
3. Throw an 'IOError' (built with 'userError') describing the failure if catch-up fails
4. Start the shared subscription notifier and build the store handle

If any step after acquiring the pool throws, the pool is released before the
exception propagates, so a failed start-up does not leak connections.

NOTE: the connection string is decoded with 'decodeUtf8', which throws on
input that is not valid UTF-8.
-}
newSQLStoreWithProjections :: ByteString -> SyncProjectionRegistry -> IO SQLStoreHandle
newSQLStoreWithProjections connectionString syncProjRegistry = do
    let connectionSettings =
            [ ConnectionSetting.connection $
                ConnectionSettingConnection.string (decodeUtf8 connectionString)
            ]
        hasqlConfig =
            Config.settings
                [ Config.size 300
                , Config.staticConnectionSettings connectionSettings
                ]
    pool <- Pool.acquire hasqlConfig

    -- Everything below runs with the pool already acquired; release it on any
    -- exception so that a failed start-up does not leave connections behind.
    flip onException (Pool.release pool) $ do
        -- Run catch-up for sync projections
        catchUpResult <- catchUpSyncProjections pool syncProjRegistry

        case catchUpResult of
            Left err ->
                throwIO $ userError $ Data.Text.unpack (describeCatchUpError err)
            Right () -> do
                -- Create centralized notifier for efficient resource use
                notifier <- Subscription.startNotifier connectionString

                -- Create store handle with notifier
                pure $ SQLStoreHandle pool connectionString syncProjRegistry notifier
  where
    -- Human-readable message for each catch-up failure mode.
    describeCatchUpError err = case err of
        ProjectionExecutionError projId msg ->
            pack "Sync projection catch-up failed for "
                <> unProjectionId projId
                <> pack ": "
                <> msg
        DatabaseError msg ->
            pack "Database error during sync projection catch-up: " <> msg
        NoActiveProjections ->
            pack "No active sync projections found in database"

{- | PostgreSQL 'EventStore' instance.

This instance automatically uses the sync projections configured in the handle:
'insertEvents' passes the handle's @syncProjectionRegistry@ to
'insertEventsWithSyncProjections', and 'subscribe' delegates to
'Subscription.subscribe'.
-}
instance EventStore SQLStore where
    type StoreConstraints SQLStore m = MonadUnliftIO m

    -- Insert events using the sync projection infrastructure.
    --
    -- The registry is taken from the handle itself, so the projections used
    -- are the ones stored in the handle when the store was created.
    insertEvents ::
        (Traversable t, MonadUnliftIO m) =>
        BackendHandle SQLStore ->
        Maybe CorrelationId ->
        Transaction t SQLStore ->
        m (InsertionResult SQLStore)
    insertEvents handle corrId transaction =
        insertEventsWithSyncProjections
            handle
            handle.syncProjectionRegistry
            corrId
            transaction

    -- Subscriptions are handled entirely by the Subscription module.
    subscribe handle matcher selector =
        Subscription.subscribe handle matcher selector

{- | Insert events with synchronous projections using a custom registry.

The version check ('checkVersions') and the insertion transaction built by
'Insertion.insertEventsWithSyncProjections' run inside a single serializable
write transaction on the handle's pool, so events and projection updates are
committed together or not at all.

Outcomes:

* A version conflict reported by 'checkVersions' yields
  'FailedInsertion' with a 'ConsistencyError'; the insertion is not attempted.
* A pool/session error yields 'FailedInsertion' with a 'BackendError' whose
  message is prefixed with @"Database error: "@ and whose @exception@ field
  carries the original error.
* Otherwise 'SuccessfulInsertion' with the final cursor and per-stream cursors.

This function allows you to override the sync projection registry from
the handle. In most cases, you should use the generic 'insertEvents'
function instead, which automatically uses the registry from the handle.
It is useful for testing or scenarios where you need a different registry
than the one configured in the handle.
-}
insertEventsWithSyncProjections ::
    (Traversable t, MonadUnliftIO m) =>
    -- | Store handle with connection pool
    SQLStoreHandle ->
    -- | Registry of sync projections to execute
    SyncProjectionRegistry ->
    -- | Optional correlation ID for event tracking
    Maybe CorrelationId ->
    -- | Transaction containing events to insert
    Transaction t SQLStore ->
    -- | Result indicating success or failure
    m (InsertionResult SQLStore)
insertEventsWithSyncProjections handle syncRegistry corrId (Transaction batches) = liftIO $ do
    -- Create transaction that will insert events and run sync projections
    tx <- Insertion.insertEventsWithSyncProjections syncRegistry corrId batches

    -- Execute transaction with proper error handling
    result <- Pool.use handle.pool $ Session.transaction Session.Serializable Session.Write $ do
        -- First check version constraints
        mbError <- checkVersions batches
        case mbError of
            Just err -> pure $ Left $ ConsistencyError err
            -- Then insert events (and run projections if applicable)
            Nothing -> Right <$> tx

    -- Convert result to InsertionResult
    case result of
        -- Handle database errors
        Left dbError ->
            pure $
                FailedInsertion $
                    BackendError $
                        ErrorInfo
                            { errorMessage = pack "Database error: " <> tshow dbError
                            , exception = Just $ SomeException dbError
                            }
        -- Handle business errors
        Right (Left err) ->
            pure $ FailedInsertion err
        -- Handle success
        Right (Right insertedEvents) ->
            pure $
                SuccessfulInsertion $
                    InsertionSuccess
                        { finalCursor = Insertion.finalCursor insertedEvents
                        , streamCursors = Insertion.streamCursors insertedEvents
                        }

{- | Shutdown the SQL store gracefully.

Shuts down the handle's notifier via 'Subscription.shutdownNotifier'. This
function should be called before releasing the connection pool.

Note that it does not release the connection pool itself; callers that want
to free the pool must do so separately (e.g. via 'getPool').
-}
shutdownSQLStore :: SQLStoreHandle -> IO ()
shutdownSQLStore handle =
    -- Shutdown the notifier
    Subscription.shutdownNotifier handle.notifier

-- | Convert showable values to 'Text' by packing the result of 'show'.
tshow :: (Show a) => a -> Text
tshow = pack . show