Class InMemoryOutboxRecordRepository
OutboxRecordRepository that replaces the JPA-backed
repository in test scope. Backed by a ConcurrentHashMap keyed on
OutboxRecord.PK and a TestOutboxRecordCaptor that records
every save(...) for downstream assertions.
The repository surface mirrors the production JPA repository's contract
for the methods the outbox-publisher hot relay + cold poller call:
save(OutboxRecord)— persists and captures.findById(OutboxRecord.PK)— straight lookup.markSent(UUID, UUID, Instant)— status-guardedPENDING → SENTtransition mirroring the JPQL UPDATE semantics; returns1on applied transition or0on no-op (when the row is already terminal).markFailureAttempt(UUID, UUID, String, int)— status-guarded retries increment with the sameCASE WHENbudget-exhaustion semantics as the production JPQL: whenretries + 1 >= maxRetries, the row transitions toOutboxRecord.Status.FAILEDin the same operation.findByStatusOrderByCreatedAtAsc(OutboxRecord.Status, Pageable)— FIFO scan with pagination.countByStatus(OutboxRecord.Status)— total row count for a given status.findOldestCreatedAtByStatus(OutboxRecord.Status)— minimumcreatedAtfor a given status (drives theim2be_outbox_backlog_age_secondsgauge).deleteAll()/saveAndFlush(OutboxRecord)— convenience helpers exercised by the integration test path.
Methods inherited from JpaRepository
but NOT used by the outbox library throw
UnsupportedOperationException via the JDK dynamic-proxy fall-through
in create(TestOutboxRecordCaptor). Downstream tests that need a
specific inherited method should extend this class via its protected
constructor and override the fall-through.
Thread-safety: the backing ConcurrentMap guarantees
thread-safe puts and gets; status transitions
(markSent, markFailureAttempt) use
ConcurrentMap#computeIfPresent so the read-modify-write sequence
on a single PK is atomic with respect to the map's presence check +
apply (no torn updates between concurrent computeIfPresent
callers for the same key). This is NOT, however, equivalent to the JPA
path's row-level
SELECT ... FOR UPDATE: the stored OutboxRecord is a
mutable reference shared with any prior findById caller, so a
concurrent reader can observe a row mid-transition (e.g.
status=SENT but sentAt=null) — production JPA
row-level locking would block the reader until the writer's transaction
commits. Test code MUST treat any OutboxRecord returned by
findById or the captor's snapshots as a moment-in-time view, not
a frozen snapshot (R1 finding — previous Javadoc overstated the
row-locking equivalence).
-
Constructor Summary
ConstructorsModifierConstructorDescriptionprotectedCreates an in-memory repository wiring captures tocaptor. -
Method Summary
Modifier and TypeMethodDescriptionlongcount()Returns the total number of stored rows (any status).longcountByStatus(OutboxRecord.Status status) Returns the total row count for a given status.static OutboxRecordRepositorycreate(TestOutboxRecordCaptor captor) Factory: returns a dynamic proxy that exposes the in-memory repository as anOutboxRecordRepository.voidClears the in-memory backing store.Returns the row for the compound PK, or empty when absent.org.springframework.data.domain.Page<OutboxRecord>findByStatusOrderByCreatedAtAsc(OutboxRecord.Status status, org.springframework.data.domain.Pageable pageable) Returns rows instatus, sorted bycreatedAtascending, paginated perpageable.Returns the minimumcreatedAtfor rows instatus, or empty when no rows match.voidflush()JPA flush no-op for in-memory backing.intmarkFailureAttempt(UUID aggregateId, UUID eventId, String lastError, int maxRetries) Atomic status-guarded retry-bump on publish failure.intAtomic status-guardedPENDING → SENTtransition mirroring the production JPQL UPDATE.save(OutboxRecord record) Persists a row and captures it for assertions.saveAndFlush(OutboxRecord record) Variant ofsave(OutboxRecord)that mirrors the JPAsaveAndFlushsemantics (in-memory has no buffer, so flush is a no-op).
-
Constructor Details
-
InMemoryOutboxRecordRepository
Creates an in-memory repository wiring captures tocaptor.- Parameters:
captor- the test captor; non-null
-
-
Method Details
-
create
Factory: returns a dynamic proxy that exposes the in-memory repository as anOutboxRecordRepository. The proxy intercepts every interface method, dispatches the supported methods to this class, and throwsUnsupportedOperationExceptionfor the rest.- Parameters:
captor- the test captor; non-null- Returns:
- a proxy implementing
OutboxRecordRepository
-
save
Persists a row and captures it for assertions. MirrorsCrudRepository.save(Object).- Parameters:
record- the row to persist; non-null with non-null compound PK- Returns:
- the persisted row (same instance)
-
saveAndFlush
Variant ofsave(OutboxRecord)that mirrors the JPAsaveAndFlushsemantics (in-memory has no buffer, so flush is a no-op).- Parameters:
record- the row to persist; non-null- Returns:
- the persisted row (same instance)
-
findById
Returns the row for the compound PK, or empty when absent.- Parameters:
pk- the compound primary key; non-null- Returns:
- the row, or empty
-
findByStatusOrderByCreatedAtAsc
public org.springframework.data.domain.Page<OutboxRecord> findByStatusOrderByCreatedAtAsc(OutboxRecord.Status status, org.springframework.data.domain.Pageable pageable) Returns rows instatus, sorted bycreatedAtascending, paginated perpageable.- Parameters:
status- target status; non-nullpageable- bounded page; non-null- Returns:
- matching rows in FIFO creation order
-
countByStatus
Returns the total row count for a given status.- Parameters:
status- target status; non-null- Returns:
- total matching rows
-
findOldestCreatedAtByStatus
Returns the minimumcreatedAtfor rows instatus, or empty when no rows match.- Parameters:
status- target status; non-null- Returns:
- oldest
createdAtor empty
-
markSent
Atomic status-guardedPENDING → SENTtransition mirroring the production JPQL UPDATE. TheConcurrentMap#computeIfPresentcall serialises concurrent transitions on the same row, so a late second-writer that races with another success path observes the row already inOutboxRecord.Status.SENTand returns rowcount 0.- Parameters:
aggregateId- compound-PK component; non-nulleventId- compound-PK component; non-nullsentAt- wall-clock instant; non-null- Returns:
1on applied transition;0on no-op (row absent or already terminal)
-
markFailureAttempt
Atomic status-guarded retry-bump on publish failure. Mirrors the production JPQL UPDATE'sCASE WHENbudget-exhaustion semantics: when the post-increment retries count reachesmaxRetries, the row transitions toOutboxRecord.Status.FAILEDin the same operation.- Parameters:
aggregateId- compound-PK component; non-nulleventId- compound-PK component; non-nulllastError- error message; truncated byOutboxRecord.setLastError(String)when overOutboxRecord.LAST_ERROR_MAX_LENGTHcharsmaxRetries- retry budget; row transitions to FAILED whenretries + 1 >= maxRetries- Returns:
1on applied increment;0on no-op (row absent or already terminal)
-
deleteAll
public void deleteAll()Clears the in-memory backing store. Used between tests that share a Spring context.Does NOT touch the
TestOutboxRecordCaptor. The captor is a separate test-observation surface — tests that want to reset captures across a shared context must callTestOutboxRecordCaptor.clear()explicitly (R1 finding — previous Javadoc claimed "every captured row + the backing store" but the implementation only clearsrows). -
count
public long count()Returns the total number of stored rows (any status).- Returns:
- row count
-
flush
public void flush()JPA flush no-op for in-memory backing. Present so the proxy can routeJpaRepository#flush()without throwing.
-