Class OutboxPublisher
Invocation contract:
- Caller is inside an active Spring-managed @Transactional method.
- Caller invokes publish(String, Function, int, UUID) (the preferred deferred-serialisation overload that keeps the payload's embedded event_id in lockstep with the row's) or the deprecated publish(String, byte[], int, UUID) byte[] overload. The method persists an OutboxRecord row in status OutboxRecord.Status.PENDING via the repository; this INSERT shares the business transaction, so the row and the business state commit atomically.
- The method registers a Spring TransactionSynchronization.afterCommit() hook that attempts the Kafka send (the "hot relay"). Success transitions the row to OutboxRecord.Status.SENT; failure leaves it PENDING for the cold OutboxPollerWorker to retry.
Per ADR-0014 D-9 the relay is NOT transactional EOS — the poller-owned
SENT transition is the source of truth. At-least-once delivery is the
library guarantee; effective exactly-once is achieved via consumer-side
dedup in processed-kafka-events (PR-PLATFORM-2).
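Because delivery is at-least-once, a consumer can see the same event_id more than once. The following is a minimal sketch of consumer-side dedup, not the processed-kafka-events implementation: the class name DedupingConsumer and its in-memory set are assumptions made for illustration, standing in for a durable store.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Toy idempotent consumer (hypothetical): skips any event_id it has already processed. */
class DedupingConsumer {
    // Assumption: an in-memory set stands in for a durable processed-events store.
    private final Set<UUID> processed = new HashSet<>();
    private int applied = 0;

    /** Returns true if the event was applied, false if it was a duplicate delivery. */
    boolean handle(UUID eventId) {
        if (!processed.add(eventId)) {
            return false; // redelivery: already processed, skip the side effect
        }
        applied++; // the business side effect would run here
        return true;
    }

    int appliedCount() {
        return applied;
    }

    public static void main(String[] args) {
        DedupingConsumer consumer = new DedupingConsumer();
        UUID id = UUID.randomUUID();
        System.out.println(consumer.handle(id)); // first delivery
        System.out.println(consumer.handle(id)); // redelivery of the same event
        System.out.println(consumer.appliedCount());
    }
}
```

In a real consumer the dedup marker would typically be written in the same database transaction as the side effect, so that a crash between the two cannot leave them inconsistent.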
The hot path is wrapped in a Resilience4j CircuitBreaker
(im2be-outbox-relay) so a degraded broker doesn't slow down the
business transaction's commit-time critical section (the AFTER_COMMIT
callback runs on the request thread). When the breaker is OPEN, the hot
publish short-circuits — the row stays PENDING and is picked up by the
next poller tick.
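The interplay between the hot relay, the breaker, and the cold poller can be illustrated with a toy state model. This is a sketch under stated assumptions, not the library's code: OutboxRelayModel, its two boolean flags, and its method names are hypothetical, and no Resilience4j, database, or Kafka calls are involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

/** Toy model (hypothetical) of the hot relay and the cold poller acting on outbox rows. */
class OutboxRelayModel {
    enum Status { PENDING, SENT }

    private final Map<UUID, Status> rows = new LinkedHashMap<>();
    boolean breakerOpen = false;  // assumption: stand-in for the circuit breaker state
    boolean brokerHealthy = true; // assumption: stand-in for Kafka availability

    /** Inserts a PENDING row; the real INSERT shares the business transaction. */
    UUID insertPending() {
        UUID eventId = UUID.randomUUID();
        rows.put(eventId, Status.PENDING);
        return eventId;
    }

    /** Hot path, run after commit: short-circuits when the breaker is open. */
    void hotRelay(UUID eventId) {
        if (breakerOpen) {
            return; // no send attempted; the row stays PENDING
        }
        trySend(eventId);
    }

    /** Cold path: retries every row that is still PENDING. */
    void pollerTick() {
        for (UUID id : new ArrayList<>(rows.keySet())) {
            if (rows.get(id) == Status.PENDING) {
                trySend(id);
            }
        }
    }

    Status status(UUID eventId) {
        return rows.get(eventId);
    }

    // A successful send marks the row SENT; a failed send leaves it PENDING.
    private void trySend(UUID eventId) {
        if (brokerHealthy) {
            rows.put(eventId, Status.SENT);
        }
    }
}
```

The point of the model is that neither a failed send nor an open breaker loses the event: the row remains PENDING until some later attempt succeeds.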
An OTel span im2be.outbox.publish.hot is recorded per
hot-publish attempt with attributes topic, event.id,
aggregate.id, outcome.
-
Field Summary
Fields -
Constructor Summary
Constructors
OutboxPublisher(OutboxBackend backend, org.springframework.kafka.core.KafkaTemplate<byte[], byte[]> kafkaTemplate, io.github.resilience4j.circuitbreaker.CircuitBreaker circuitBreaker, io.opentelemetry.api.OpenTelemetry openTelemetry, OutboxMetricsBinder metrics, int maxRetries)
    Constructs an OutboxPublisher with all collaborators autowired by OutboxAutoConfiguration.
Method Summary
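Methods
UUID publish(String topic, byte[] payload, int schemaVersion, UUID aggregateId)
    Deprecated. Since v1.1.0: use publish(String, Function, int, UUID) instead.
UUID publish(String topic, Function<UUID, byte[]> payloadFn, int schemaVersion, UUID aggregateId)
    Atomically inserts an outbox row in OutboxRecord.Status.PENDING and registers an AFTER_COMMIT hook that attempts the Kafka send fire-and-forget, accepting a deferred-serialisation Function<UUID, byte[]> so the payload's embedded event_id field stays in lockstep with the outbox row's event_id (Memora #3555 / #3556).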
-
Field Details
-
CIRCUIT_BREAKER_NAME
Resilience4j CircuitBreaker instance name. Consumers may customise threshold + wait-duration via the im2be.outbox.circuit-breaker.* properties; the breaker itself is registered in OutboxAutoConfiguration.
-
-
Constructor Details
-
OutboxPublisher
public OutboxPublisher(OutboxBackend backend, org.springframework.kafka.core.KafkaTemplate<byte[], byte[]> kafkaTemplate, io.github.resilience4j.circuitbreaker.CircuitBreaker circuitBreaker, io.opentelemetry.api.OpenTelemetry openTelemetry, OutboxMetricsBinder metrics, int maxRetries)
Constructs an OutboxPublisher with all collaborators autowired by OutboxAutoConfiguration.
Wave A.2 Phase 0a: the former repository + completion collaborators are now collapsed behind the OutboxBackend storage SPI (design doc refinement #11). For the default PgOutboxBackend the persist + transition semantics are identical to the pre-extraction code (zero behaviour change); a future Redis backend (Phase 0b) plugs in here unchanged.
Parameters:
backend - storage SPI; persists the PENDING row and drives the markSent / markFailureAttempt transitions (the default PG impl wraps the JPA repository + the REQUIRES_NEW completion helper)
kafkaTemplate - Spring Kafka template (byte-key, byte-value) for sending Avro-encoded payloads
circuitBreaker - Resilience4j breaker around the hot-publish call
openTelemetry - OpenTelemetry instance for tracer + metrics (autoconfig falls back to OpenTelemetry.noop() when no real SDK is present)
metrics - metrics binder for latency/failure counters
maxRetries - retry budget from im2be.outbox.poller.max-retries; passed through to OutboxBackend.markFailureAttempt(UUID, UUID, String, int) so the hot-relay path uses the same atomic budget evaluation as the cold poller (R5 fix: eliminates the stale-retries snapshot race)
-
-
Method Details
-
publish
@Deprecated(since="1.1.0", forRemoval=false)
public UUID publish(String topic, byte[] payload, int schemaVersion, UUID aggregateId)
Deprecated. Since v1.1.0: use publish(String, Function, int, UUID) instead. The byte[] overload generates the outbox row's eventId internally, leaving the caller no opportunity to embed the same id in the payload, so operators cannot correlate Kafka messages back to outbox rows by event_id (Memora #3555). The new overload accepts a deferred-serialisation Function<UUID, byte[]> that receives the outbox-issued eventId, keeping the payload's embedded event_id in lockstep with the row's. Retained (forRemoval=false) for callers serialising opaque already-encoded bytes where the payload schema has NO event_id field.
Atomically inserts an outbox row in OutboxRecord.Status.PENDING and registers an AFTER_COMMIT hook that attempts the Kafka send fire-and-forget.
Per ADR-0014 D-9 the relay is NOT transactional EOS: when the AFTER_COMMIT send fails or the circuit-breaker is OPEN, the row is left PENDING; the poller (cold path) owns recovery.
The supplied payload is defensively cloned on entry; the library owns the cloned bytes from this point forward (they back both the persisted payload_bytes column and the AFTER_COMMIT closure). The caller MAY mutate or reuse its original array after this method returns without affecting what is persisted or published.
Parameters:
topic - Kafka topic name (not null)
payload - Avro-encoded event bytes including the Apicurio global-id prefix (not null, not empty)
schemaVersion - Avro schema_version field; mirrored on the row for operator triage
aggregateId - aggregate identifier; partition-key hint passed to Kafka (not null)
Returns:
the generated eventId (UUIDv7); callers may persist or log this for correlation
Throws:
IllegalStateException - when invoked outside an active transaction. The outbox INSERT MUST share the business transaction boundary; otherwise the row commits independently and breaks atomicity
IllegalArgumentException - when payload is zero-length. A zero-byte event is never a valid outbox payload (symmetric with the Function overload's guard)
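The defensive-clone guarantee can be illustrated in isolation. The sketch below is not the library's code: DefensiveCloneDemo is a hypothetical holder that copies the array on entry and rejects an empty payload, mirroring the behaviour described above.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical holder showing copy-on-entry semantics for a byte[] payload. */
class DefensiveCloneDemo {
    private final byte[] stored;

    DefensiveCloneDemo(byte[] payload) {
        if (payload.length == 0) {
            throw new IllegalArgumentException("payload must not be empty");
        }
        this.stored = payload.clone(); // copy on entry; the caller keeps its own array
    }

    /** Returns a copy so that callers cannot mutate the held bytes either. */
    byte[] stored() {
        return stored.clone();
    }

    public static void main(String[] args) {
        byte[] original = "abc".getBytes(StandardCharsets.UTF_8);
        DefensiveCloneDemo holder = new DefensiveCloneDemo(original);
        original[0] = 'x'; // the caller reuses its buffer after the call
        System.out.println(new String(holder.stored(), StandardCharsets.UTF_8));
    }
}
```

Copying once on entry costs one allocation per call but removes any aliasing between the caller's buffer and the bytes used later by the commit-time hook.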
-
publish
public UUID publish(String topic, Function<UUID, byte[]> payloadFn, int schemaVersion, UUID aggregateId)
Atomically inserts an outbox row in OutboxRecord.Status.PENDING and registers an AFTER_COMMIT hook that attempts the Kafka send fire-and-forget, accepting a deferred-serialisation Function<UUID, byte[]> so the payload's embedded event_id field stays in lockstep with the outbox row's event_id (Memora #3555 / #3556).
Per ADR-0014 D-9 the relay is NOT transactional EOS: when the AFTER_COMMIT send fails or the circuit-breaker is OPEN, the row is left PENDING; the poller (cold path) owns recovery.
Invocation timing. payloadFn is invoked synchronously on the calling thread, BEFORE the outbox row is persisted and BEFORE the AFTER_COMMIT hook is registered.
Invocation count. payloadFn is invoked exactly once per publish call. The library does NOT retry serialisation, does NOT cache the result, and does NOT invoke payloadFn from the AFTER_COMMIT hook or the poller; the bytes it returns are captured once and reused for both the persisted column and the hot relay.
Lockstep invariant. payloadFn MUST embed the supplied eventId in the returned payload's event_id field. Failing to do so defeats the purpose of this overload and reproduces the Memora #3555 bug (Kafka ↔ outbox correlation by event_id is broken).
Thread-safety. payloadFn is invoked on the calling thread only; it is never invoked on a Kafka client thread nor from the AFTER_COMMIT hook.
Purity. payloadFn SHOULD complete promptly, because it executes inside the business transaction's critical section; avoid I/O, blocking calls, and long computation. It MAY read from the current persistence context, but MUST NOT issue writes via a separate persistence context or a REQUIRES_NEW boundary.
Exception semantics. Any exception thrown by payloadFn propagates unmodified to the caller; the outbox row is NOT persisted, the AFTER_COMMIT hook is NOT registered, and the enclosing @Transactional rolls back per the caller's rollbackFor (Spring's default: RuntimeException + Error). The library does NOT catch or rewrap the exception.
Return value + defensive clone. Returns the outbox-issued eventId (the same UUID passed to payloadFn). The byte[] returned by payloadFn is defensively cloned, so the caller MAY mutate or reuse it after this method returns without affecting what is persisted or published.
Parameters:
topic - Kafka topic name (not null)
payloadFn - deferred serialiser receiving the outbox-issued eventId and returning the Avro/JSON-encoded event bytes (not null; must return a non-null, non-empty array)
schemaVersion - Avro schema_version field; mirrored on the row for operator triage
aggregateId - aggregate identifier; partition-key hint passed to Kafka (not null)
Returns:
the outbox-issued eventId (UUIDv7); identical to the value passed to payloadFn
Throws:
IllegalStateException - when invoked outside an active transaction. The outbox INSERT MUST share the business transaction boundary; otherwise the row commits independently and breaks atomicity
NullPointerException - when payloadFn returns null
IllegalArgumentException - when payloadFn returns a zero-length array
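The payloadFn contract can be sketched without Spring, a database, or Kafka. Everything named below is an assumption for illustration: FakeOutbox is a hypothetical stand-in that only mirrors the documented contract (single invocation, null and empty guards, defensive clone, exception propagation), orderCreated is a made-up serialiser, and JSON is used in place of Avro for readability.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;

class LockstepSketch {
    /** Hypothetical stand-in for the Function overload's documented contract. */
    static final class FakeOutbox {
        UUID lastEventId;
        byte[] lastPayload;

        UUID publish(String topic, Function<UUID, byte[]> payloadFn,
                     int schemaVersion, UUID aggregateId) {
            UUID eventId = UUID.randomUUID(); // assumption: the real library issues a UUIDv7
            // payloadFn runs exactly once, on the calling thread; its exceptions propagate.
            byte[] bytes = Objects.requireNonNull(payloadFn.apply(eventId), "payloadFn returned null");
            if (bytes.length == 0) {
                throw new IllegalArgumentException("payloadFn returned an empty array");
            }
            lastEventId = eventId;
            lastPayload = bytes.clone(); // defensive clone of the serialised bytes
            return eventId;
        }
    }

    /** Serialiser that embeds the outbox-issued id in the payload's event_id field. */
    static Function<UUID, byte[]> orderCreated(String orderId) {
        return eventId -> ("{\"event_id\":\"" + eventId + "\",\"order_id\":\"" + orderId + "\"}")
                .getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        FakeOutbox outbox = new FakeOutbox();
        UUID eventId = outbox.publish("orders", orderCreated("o-1"), 1, UUID.randomUUID());
        String json = new String(outbox.lastPayload, StandardCharsets.UTF_8);
        System.out.println(json.contains(eventId.toString())); // payload and row share the id
    }
}
```

Building the payload inside the function, rather than before the call, is what makes the lockstep invariant possible: the id does not exist until the outbox issues it.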
-