Class PgOutboxBackend

java.lang.Object
com.aim2be.platform.outbox.PgOutboxBackend
All Implemented Interfaces:
OutboxBackend

public class PgOutboxBackend extends Object implements OutboxBackend
Default Postgres-backed OutboxBackend — the verbatim PG storage logic extracted from OutboxPublisher + OutboxPollerWorker during Wave A.2 Phase 0a (design doc .planning/32-wave-a2-redis-outbox-design.md refinement #11) with ZERO behaviour change.

This implementation is a thin adapter over the two collaborators that already own the PG semantics:

  • OutboxRecordRepository — JPA persistence + the status-guarded JPQL transition queries + the FIFO PENDING-batch fetch.
  • OutboxPublishCompletion — the REQUIRES_NEW transactional wrapper around markSent / markFailureAttempt (required by the AFTER_COMMIT hot-relay path, where the business transaction is already committed; see that class's Javadoc).

The transition methods delegate to OutboxPublishCompletion (NOT directly to the repository) so the REQUIRES_NEW transaction proxy still intercepts each call exactly as before the extraction — preserving the relay's post-commit transaction boundary. They propagate the JPQL rowcount returned by OutboxPublishCompletion (which in turn returns the @Modifying query's rowcount), so the SPI's documented 1 = applied / 0 = already-terminal no-op contract is honoured truthfully — the Phase 0b Redis backend returns the same 0/1 for racing terminal transitions, keeping the contract uniform across backends. The orchestration layer does not currently branch on the rowcount, so this is behaviour-identical to the pre-extraction path.

requiresActiveTransaction() returns true: the PG persist MUST share the caller's business transaction (ADR-0014 D-7) so the row and the business state commit atomically. OutboxPublisher keeps its active-transaction guard active for this backend — behaviour identical to the pre-extraction code.

  • Constructor Details

    • PgOutboxBackend

      public PgOutboxBackend(OutboxRecordRepository repository, OutboxPublishCompletion completion)
      Constructs the PG backend.
      Parameters:
      repository - JPA repository for persistence + FIFO batch fetch; the status-guarded transition queries are invoked indirectly via completion
      completion - the REQUIRES_NEW transactional helper wrapping the markSent / markFailureAttempt JPQL transitions (preserves the AFTER_COMMIT relay's post-commit transaction boundary)
  • Method Details

    • persistPending

      public void persistPending(OutboxRecord record)
      Persists a freshly-minted outbox row in OutboxRecord.Status.PENDING.

      For the PG backend this is the JPA repository.save(record) that shares the caller's active business transaction (the row commits atomically with the business state). The orchestration layer has already populated the row's fields (aggregateId, eventId, status, topic, payload, schemaVersion, retries=0, createdAt) and defensively cloned the payload bytes; the backend stores the record as-is and does not mutate it.

      Delegates to repository.save(record) — the INSERT shares the caller's active business transaction (verbatim from OutboxPublisher's former persistAndRegisterRelay).

      Specified by:
      persistPending in interface OutboxBackend
      Parameters:
      record - the PENDING row to persist; never null
    • findPendingBatch

      public List<OutboxRecord> findPendingBatch(int batchSize)
      Fetches up to batchSize OutboxRecord.Status.PENDING rows in FIFO (createdAt ASC) order for the cold poller's sweep.

      For the PG backend this delegates to OutboxRecordRepository.findByStatusOrderByCreatedAtAsc(OutboxRecord.Status, org.springframework.data.domain.Pageable) with a single bounded page; the (status, created_at) compound index keeps the scan O(batchSize), not O(table size).

      Delegates to OutboxRecordRepository.findByStatusOrderByCreatedAtAsc(OutboxRecord.Status, org.springframework.data.domain.Pageable) with a single bounded page — verbatim from OutboxPollerWorker's former batch fetch.

      Specified by:
      findPendingBatch in interface OutboxBackend
      Parameters:
      batchSize - maximum number of rows to return (the poller passes im2be.outbox.poller.batch-size)
      Returns:
      a list of PENDING rows in FIFO creation order; empty when none match, never null
    • markSent

      public int markSent(UUID aggregateId, UUID eventId, Instant sentAt)
      Transitions a row to OutboxRecord.Status.SENT, clearing any prior lastError. Idempotent: when the row is already terminal (SENT, FAILED, or ARCHIVED) the status-guarded write affects zero rows and the method returns 0 without throwing — the expected outcome when a delayed hot-relay callback fires after the cold poller already transitioned the row (or vice versa).

      Mirrors OutboxPublishCompletion.markSent(UUID, UUID, Instant).

      Delegates to OutboxPublishCompletion.markSent(UUID, UUID, Instant) so the REQUIRES_NEW transaction proxy intercepts the call exactly as before the extraction, and propagates that helper's rowcount (the @Modifying query result): 1 when the PENDING→SENT transition applied, 0 when the row was already terminal — exactly the SPI contract.

      Specified by:
      markSent in interface OutboxBackend
      Parameters:
      aggregateId - compound-PK component
      eventId - compound-PK component
      sentAt - wall-clock instant at which the publish succeeded
      Returns:
      number of rows updated (1 on applied transition; 0 when the row was already terminal)
    • markFailureAttempt

      public int markFailureAttempt(UUID aggregateId, UUID eventId, String lastError, int maxRetries)
      Records a publish-failure attempt: increments the row's retries counter, stores the (pre-truncated) error message, AND atomically transitions the row to OutboxRecord.Status.FAILED when the post-increment retries count reaches maxRetries. The budget is evaluated against the store's CURRENT retries value at write time, so the orchestration layer never decides the budget against a stale in-memory snapshot (ADR-0014 D-9 / R5 fix).

      Status-guarded like OutboxBackend.markSent(UUID, UUID, Instant): a late failure callback racing a terminal transition is a rowcount = 0 no-op.

      Mirrors OutboxPublishCompletion.markFailureAttempt(UUID, UUID, String, int).

      Delegates to OutboxPublishCompletion.markFailureAttempt(UUID, UUID, String, int) so the REQUIRES_NEW transaction proxy intercepts the call and the atomic retry-budget JPQL evaluation runs exactly as before, and propagates that helper's rowcount (1 = increment applied, 0 = already-terminal no-op) per the SPI contract.

      Specified by:
      markFailureAttempt in interface OutboxBackend
      Parameters:
      aggregateId - compound-PK component
      eventId - compound-PK component
      lastError - pre-truncated error message (caller respects OutboxRecord.LAST_ERROR_MAX_LENGTH)
      maxRetries - retry budget from im2be.outbox.poller.max-retries
      Returns:
      number of rows updated (1 on applied increment; 0 when the row was already terminal)
    • requiresActiveTransaction

      public boolean requiresActiveTransaction()
      Whether OutboxBackend.persistPending(OutboxRecord) requires an active Spring-managed transaction to be in progress on the calling thread.

      The PG backend returns true — its persist shares the caller's business transaction so the outbox row and the business state commit atomically (ADR-0014 D-7); OutboxPublisher enforces this with an isActualTransactionActive() guard. A non-relational backend that achieves atomicity without a Spring transaction returns false, relaxing that guard.

      Always true: the PG persist shares the caller's business transaction (ADR-0014 D-7).

      Specified by:
      requiresActiveTransaction in interface OutboxBackend
      Returns:
      true when the publisher MUST run inside an active transaction (PG default), false when atomicity is achieved outside Spring transaction management