Class PgOutboxBackend
- All Implemented Interfaces:
OutboxBackend
An OutboxBackend that holds the verbatim PG storage logic
extracted from OutboxPublisher and OutboxPollerWorker during
Wave A.2 Phase 0a (design doc .planning/32-wave-a2-redis-outbox-design.md,
refinement #11) with zero behaviour change.
This implementation is a thin adapter over the two collaborators that already own the PG semantics:
- OutboxRecordRepository: JPA persistence, the status-guarded JPQL transition queries, and the FIFO PENDING-batch fetch.
- OutboxPublishCompletion: the REQUIRES_NEW transactional wrapper around markSent / markFailureAttempt (required by the AFTER_COMMIT hot-relay path, where the business transaction is already committed; see that class's Javadoc).
The transition methods delegate to OutboxPublishCompletion (NOT
directly to the repository) so the REQUIRES_NEW transaction proxy
still intercepts each call exactly as before the extraction — preserving the
relay's post-commit transaction boundary. They propagate the JPQL rowcount
returned by OutboxPublishCompletion (which in turn returns the
@Modifying query's rowcount), so the SPI's documented 1 =
applied / 0 = already-terminal no-op contract is honoured truthfully
— the Phase 0b Redis backend returns the same 0/1 for racing
terminal transitions, keeping the contract uniform across backends. The
orchestration layer does not currently branch on the rowcount, so this is
behaviour-identical to the pre-extraction path.
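The delegation described above can be pictured with a minimal sketch. It is not the actual class: CompletionSketch is a hypothetical stand-in for OutboxPublishCompletion (which in the real code is presumably a Spring bean behind a transaction proxy), and PgBackendDelegationSketch is an invented name. The only point shown is that each transition goes through the collaborator and its rowcount is returned unchanged.

```java
import java.time.Instant;
import java.util.UUID;

// Hypothetical stand-in for OutboxPublishCompletion. In the real code the
// implementation is assumed to run each call in a REQUIRES_NEW transaction.
interface CompletionSketch {
    int markSent(UUID aggregateId, UUID eventId, Instant sentAt);

    int markFailureAttempt(UUID aggregateId, UUID eventId, String lastError, int maxRetries);
}

// Illustrative adapter: pure delegation, rowcount passed through untouched.
final class PgBackendDelegationSketch {
    private final CompletionSketch completion;

    PgBackendDelegationSketch(CompletionSketch completion) {
        this.completion = completion;
    }

    int markSent(UUID aggregateId, UUID eventId, Instant sentAt) {
        // Going through the collaborator (not the repository) keeps any
        // proxy wrapped around it in the call path.
        return completion.markSent(aggregateId, eventId, sentAt);
    }

    int markFailureAttempt(UUID aggregateId, UUID eventId, String lastError, int maxRetries) {
        return completion.markFailureAttempt(aggregateId, eventId, lastError, maxRetries);
    }
}
```

A caller that ignores the returned value behaves exactly as it would against a void method, which is consistent with the note that the orchestration layer does not currently branch on the rowcount.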
requiresActiveTransaction() returns true: the PG persist
MUST share the caller's business transaction (ADR-0014 D-7) so the row and
the business state commit atomically. OutboxPublisher keeps its
active-transaction guard active for this backend — behaviour identical to the
pre-extraction code.
-
Constructor Summary
Constructors
- PgOutboxBackend(OutboxRecordRepository repository, OutboxPublishCompletion completion): Constructs the PG backend.
-
Method Summary
Methods (modifier and type, method, description):
- findPendingBatch(int batchSize): Fetches up to batchSize OutboxRecord.Status.PENDING rows in FIFO (createdAt ASC) order for the cold poller's sweep.
- int markFailureAttempt(UUID aggregateId, UUID eventId, String lastError, int maxRetries): Records a publish-failure attempt: increments the row's retries counter, stores the (pre-truncated) error message, AND atomically transitions the row to OutboxRecord.Status.FAILED when the post-increment retries count reaches maxRetries.
- int markSent(UUID aggregateId, UUID eventId, Instant sentAt): Transitions a row to OutboxRecord.Status.SENT, clearing any prior lastError.
- void persistPending(OutboxRecord record): Persists a freshly-minted outbox row in OutboxRecord.Status.PENDING.
- boolean requiresActiveTransaction(): Whether OutboxBackend.persistPending(OutboxRecord) requires an active Spring-managed transaction to be in progress on the calling thread.
-
Constructor Details
-
PgOutboxBackend
Constructs the PG backend.
- Parameters:
repository - JPA repository for persistence + FIFO batch fetch; the status-guarded transition queries are invoked indirectly via completion
completion - the REQUIRES_NEW transactional helper wrapping the markSent / markFailureAttempt JPQL transitions (preserves the AFTER_COMMIT relay's post-commit transaction boundary)
-
-
Method Details
-
persistPending
Persists a freshly-minted outbox row in OutboxRecord.Status.PENDING.
For the PG backend this is the JPA repository.save(record) that shares the caller's active business transaction (the row commits atomically with the business state). The orchestration layer has already populated the row's fields (aggregateId, eventId, status, topic, payload, schemaVersion, retries=0, createdAt) and defensively cloned the payload bytes; the backend stores the record as-is and does not mutate it.
Delegates to repository.save(record): the INSERT shares the caller's active business transaction (verbatim from OutboxPublisher's former persistAndRegisterRelay).
- Specified by:
persistPending in interface OutboxBackend
- Parameters:
record - the PENDING row to persist; never null
-
findPendingBatch
Fetches up to batchSize OutboxRecord.Status.PENDING rows in FIFO (createdAt ASC) order for the cold poller's sweep.
For the PG backend this delegates to OutboxRecordRepository.findByStatusOrderByCreatedAtAsc(OutboxRecord.Status, org.springframework.data.domain.Pageable) with a single bounded page; the (status, created_at) compound index keeps the scan O(batchSize), not O(table size).
Delegates to OutboxRecordRepository.findByStatusOrderByCreatedAtAsc(OutboxRecord.Status, org.springframework.data.domain.Pageable) with a single bounded page, verbatim from OutboxPollerWorker's former batch fetch.
- Specified by:
findPendingBatch in interface OutboxBackend
- Parameters:
batchSize - maximum number of rows to return (the poller passes im2be.outbox.poller.batch-size)
- Returns:
a list of PENDING rows in FIFO creation order; empty when none match, never null
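The result contract of the batch fetch (PENDING rows only, oldest first, at most batchSize, never null) can be illustrated with an in-memory sketch. Everything here is hypothetical: PendingBatchSketch and its Row type are invented for illustration and do not reflect the real OutboxRecord entity, and the stream sort below is not how the database achieves the O(batchSize) scan, which relies on the compound index.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// In-memory illustration of the FIFO PENDING-batch contract; names are hypothetical.
final class PendingBatchSketch {
    enum Status { PENDING, SENT }

    static final class Row {
        final String id;
        final Status status;
        final Instant createdAt;

        Row(String id, Status status, Instant createdAt) {
            this.id = id;
            this.status = status;
            this.createdAt = createdAt;
        }
    }

    // Returns at most batchSize PENDING rows ordered by createdAt ascending.
    // The result is an empty list (never null) when nothing matches.
    static List<Row> findPendingBatch(List<Row> table, int batchSize) {
        return table.stream()
                .filter(row -> row.status == Status.PENDING)
                .sorted(Comparator.comparing((Row row) -> row.createdAt))
                .limit(batchSize)
                .collect(Collectors.toList());
    }
}
```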
-
markSent
Transitions a row to OutboxRecord.Status.SENT, clearing any prior lastError. Idempotent: when the row is already terminal (SENT, FAILED, or ARCHIVED) the status-guarded write affects zero rows and the method returns 0 without throwing; this is the expected outcome when a delayed hot-relay callback fires after the cold poller already transitioned the row (or vice versa).
Mirrors OutboxPublishCompletion.markSent(UUID, UUID, Instant).
Delegates to OutboxPublishCompletion.markSent(UUID, UUID, Instant) so the REQUIRES_NEW transaction proxy intercepts the call exactly as before the extraction, and propagates that helper's rowcount (the @Modifying query result): 1 when the PENDING→SENT transition applied, 0 when the row was already terminal, which is exactly the SPI contract.
- Specified by:
markSent in interface OutboxBackend
- Parameters:
aggregateId - compound-PK component
eventId - compound-PK component
sentAt - wall-clock instant at which the publish succeeded
- Returns:
number of rows updated (1 on applied transition; 0 when the row was already terminal)
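The idempotent 1 / 0 behaviour can be sketched with an in-memory stand-in for the status-guarded write. This is an illustration under stated assumptions, not the JPQL query itself: StatusGuardSketch, its Row type, and the string key are all hypothetical, and a synchronized method stands in for the atomicity the database provides.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// In-memory illustration of a status-guarded PENDING -> SENT transition.
final class StatusGuardSketch {
    enum Status { PENDING, SENT, FAILED, ARCHIVED }

    static final class Row {
        Status status = Status.PENDING;
        String lastError;
        Instant sentAt;
    }

    private final Map<String, Row> rows = new HashMap<>();

    private static String key(UUID aggregateId, UUID eventId) {
        return aggregateId + ":" + eventId;
    }

    synchronized Row insertPending(UUID aggregateId, UUID eventId) {
        Row row = new Row();
        rows.put(key(aggregateId, eventId), row);
        return row;
    }

    // Returns 1 when the transition applied, 0 when the row is missing or
    // no longer PENDING. The no-op path never throws.
    synchronized int markSent(UUID aggregateId, UUID eventId, Instant sentAt) {
        Row row = rows.get(key(aggregateId, eventId));
        if (row == null || row.status != Status.PENDING) {
            return 0;
        }
        row.status = Status.SENT;
        row.sentAt = sentAt;
        row.lastError = null; // clear any prior error
        return 1;
    }
}
```

If a hot-relay callback and the cold poller both call markSent for the same row, whichever arrives second simply observes 0.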
-
markFailureAttempt
Records a publish-failure attempt: increments the row's retries counter, stores the (pre-truncated) error message, AND atomically transitions the row to OutboxRecord.Status.FAILED when the post-increment retries count reaches maxRetries. The budget is evaluated against the store's CURRENT retries value at write time, so the orchestration layer never decides the budget against a stale in-memory snapshot (ADR-0014 D-9 / R5 fix).
Status-guarded like OutboxBackend.markSent(UUID, UUID, Instant): a late failure callback racing a terminal transition is a rowcount = 0 no-op.
Mirrors OutboxPublishCompletion.markFailureAttempt(UUID, UUID, String, int).
Delegates to OutboxPublishCompletion.markFailureAttempt(UUID, UUID, String, int) so the REQUIRES_NEW transaction proxy intercepts the call and the atomic retry-budget JPQL evaluation runs exactly as before, and propagates that helper's rowcount (1 = increment applied, 0 = already-terminal no-op) per the SPI contract.
- Specified by:
markFailureAttempt in interface OutboxBackend
- Parameters:
aggregateId - compound-PK component
eventId - compound-PK component
lastError - pre-truncated error message (caller respects OutboxRecord.LAST_ERROR_MAX_LENGTH)
maxRetries - retry budget from im2be.outbox.poller.max-retries
- Returns:
number of rows updated (1 on applied increment; 0 when the row was already terminal)
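The retry-budget rule can be sketched for a single row as follows. This is an assumption-laden illustration, not the real JPQL: RetryBudgetSketch is a hypothetical class, and the synchronized method stands in for the single atomic UPDATE, so the budget check always sees the current retries value rather than a caller-side snapshot.

```java
// In-memory illustration of the atomic increment-and-maybe-fail rule for one row.
final class RetryBudgetSketch {
    enum Status { PENDING, SENT, FAILED }

    private Status status = Status.PENDING;
    private int retries = 0;
    private String lastError;

    // Returns 1 when the increment applied, 0 when the row was already terminal.
    synchronized int markFailureAttempt(String error, int maxRetries) {
        if (status != Status.PENDING) {
            return 0; // status guard: late callback after a terminal transition
        }
        retries++;
        lastError = error;
        // Budget is compared against the post-increment value held by the store.
        if (retries >= maxRetries) {
            status = Status.FAILED;
        }
        return 1;
    }

    synchronized Status status() {
        return status;
    }

    synchronized int retries() {
        return retries;
    }

    synchronized String lastError() {
        return lastError;
    }
}
```

With maxRetries = 2, the first failure leaves the row PENDING with retries = 1, the second moves it to FAILED, and any later call returns 0 without touching the counter.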
-
requiresActiveTransaction
public boolean requiresActiveTransaction()
Whether OutboxBackend.persistPending(OutboxRecord) requires an active Spring-managed transaction to be in progress on the calling thread.
The PG backend returns true: its persist shares the caller's business transaction so the outbox row and the business state commit atomically (ADR-0014 D-7); OutboxPublisher enforces this with an isActualTransactionActive() guard. A non-relational backend that achieves atomicity without a Spring transaction returns false, relaxing that guard.
Always true: the PG persist shares the caller's business transaction (ADR-0014 D-7).
- Specified by:
requiresActiveTransaction in interface OutboxBackend
- Returns:
true when the publisher MUST run inside an active transaction (PG default), false when atomicity is achieved outside Spring transaction management
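How a publisher might consult this flag can be sketched without Spring on the classpath. The names below (BackendTxPolicy, PublisherGuardSketch, checkBeforePersist) are hypothetical, and the BooleanSupplier is an assumed seam; in a Spring application it would presumably be backed by TransactionSynchronizationManager.isActualTransactionActive(). The real OutboxPublisher may structure its guard differently.

```java
import java.util.function.BooleanSupplier;

// Hypothetical single-method view of the backend, enough for the guard.
interface BackendTxPolicy {
    boolean requiresActiveTransaction();
}

// Illustrative publisher-side guard; not the actual OutboxPublisher code.
final class PublisherGuardSketch {
    private final BackendTxPolicy backend;
    private final BooleanSupplier transactionActive;

    PublisherGuardSketch(BackendTxPolicy backend, BooleanSupplier transactionActive) {
        this.backend = backend;
        this.transactionActive = transactionActive;
    }

    // Throws only when the backend demands a transaction and none is active.
    void checkBeforePersist() {
        if (backend.requiresActiveTransaction() && !transactionActive.getAsBoolean()) {
            throw new IllegalStateException("outbox persist requires an active transaction");
        }
    }
}
```

A backend returning false skips the check entirely, which matches the description of the guard being relaxed for backends that achieve atomicity by other means.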
-