Class OutboxPublisher

java.lang.Object
com.aim2be.platform.outbox.OutboxPublisher

public class OutboxPublisher extends Object
Transactional outbox publisher implementing ADR-0014 D-1 + D-9 (dual-mode relay).

Invocation contract:

  1. Caller is inside an active Spring-managed @Transactional method.
  2. Caller invokes publish(String, Function, int, UUID), the preferred deferred-serialisation overload that keeps the payload's embedded event_id in lockstep with the row's, or the deprecated publish(String, byte[], int, UUID) overload. The method persists an OutboxRecord row in status OutboxRecord.Status.PENDING via the OutboxBackend. This INSERT shares the business transaction, so the row and the business state commit atomically.
  3. The method registers a Spring TransactionSynchronization.afterCommit() hook that attempts the Kafka send (the "hot relay"). Success transitions the row to OutboxRecord.Status.SENT; failure leaves it PENDING for the cold OutboxPollerWorker to retry.
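
The three steps above can be modelled without Spring or Kafka. The following is a minimal stdlib-only sketch, not the library's implementation: OutboxContractSketch, its in-memory row map, the hook list, and the Predicate-based sender are all hypothetical stand-ins for the outbox table, the afterCommit() synchronization, and the Kafka send.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;

// Stdlib-only model of the three-step contract; every name here is hypothetical.
final class OutboxContractSketch {

    enum Status { PENDING, SENT }

    // Stand-in for the outbox table: eventId -> status.
    final Map<UUID, Status> rows = new LinkedHashMap<>();
    // Stand-in for the afterCommit() hooks registered on the current transaction.
    private final List<Runnable> afterCommit = new ArrayList<>();
    // Stand-in for the Kafka send: returns true when the broker accepts the bytes.
    private final Predicate<byte[]> sender;

    OutboxContractSketch(Predicate<byte[]> sender) {
        this.sender = sender;
    }

    // Steps 2 and 3: insert the PENDING row, then register the hot relay hook.
    UUID publish(byte[] payload) {
        UUID eventId = UUID.randomUUID(); // assumption: the real library issues a UUIDv7
        byte[] owned = payload.clone();
        rows.put(eventId, Status.PENDING);
        afterCommit.add(() -> {
            if (sender.test(owned)) {
                rows.put(eventId, Status.SENT);
            }
            // On failure the row is left PENDING for the cold poller.
        });
        return eventId;
    }

    // Simulates the commit of the business transaction, then runs the hooks.
    void commit() {
        List<Runnable> hooks = new ArrayList<>(afterCommit);
        afterCommit.clear();
        hooks.forEach(Runnable::run);
    }

    // Publishes one event, commits, and reports the resulting row status.
    static Status publishAndCommit(boolean brokerHealthy) {
        OutboxContractSketch tx = new OutboxContractSketch(bytes -> brokerHealthy);
        UUID id = tx.publish(new byte[] {1, 2, 3});
        tx.commit();
        return tx.rows.get(id);
    }
}
```

In this model publishAndCommit(true) leaves the row SENT and publishAndCommit(false) leaves it PENDING, which mirrors the hot-relay outcome described in step 3.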

Per ADR-0014 D-9 the relay is NOT transactional EOS — the poller-owned SENT transition is the source of truth. At-least-once delivery is the library guarantee; effective exactly-once is achieved via consumer-side dedup in processed-kafka-events (PR-PLATFORM-2).
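
Because delivery is at-least-once, a consumer can see the same event_id more than once. A minimal sketch of consumer-side dedup follows, assuming an in-memory set in place of the processed-kafka-events store; DedupConsumerSketch and its methods are hypothetical and do not reflect the PR-PLATFORM-2 implementation.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Hypothetical consumer-side dedup; a HashSet stands in for the processed-events store.
final class DedupConsumerSketch {

    private final Set<UUID> processed = new HashSet<>();
    private int sideEffects = 0;

    // Returns true when the event was applied, false when it was a duplicate delivery.
    boolean handle(UUID eventId) {
        if (!processed.add(eventId)) {
            return false; // already seen: skip the side effect
        }
        sideEffects++; // stand-in for the real business side effect
        return true;
    }

    int sideEffects() {
        return sideEffects;
    }

    // Delivers the same event twice, as an at-least-once relay may, and counts side effects.
    static int applyWithRedelivery() {
        DedupConsumerSketch consumer = new DedupConsumerSketch();
        UUID eventId = UUID.randomUUID();
        for (UUID delivered : List.of(eventId, eventId)) {
            consumer.handle(delivered);
        }
        return consumer.sideEffects();
    }
}
```

In a real consumer the dedup record and the business side effect would presumably need to commit in the same transaction; the sketch omits persistence entirely.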

The hot path is wrapped in a Resilience4j CircuitBreaker (im2be-outbox-relay) so a degraded broker doesn't slow down the business transaction's commit-time critical section (the AFTER_COMMIT callback runs on the request thread). When the breaker is OPEN, the hot publish short-circuits — the row stays PENDING and is picked up by the next poller tick.
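
The short-circuit behaviour can be illustrated with a hand-rolled breaker. This is a sketch only: the library uses Resilience4j, whose API and failure-rate window differ, and the consecutive-failure threshold, the Outcome enum, and HotRelayBreakerSketch itself are assumptions made for the example. Half-open recovery and the wait duration are omitted.

```java
import java.util.function.BooleanSupplier;

// Hand-rolled breaker for illustration only; not the Resilience4j API.
final class HotRelayBreakerSketch {

    enum Outcome { SENT, FAILED, SHORT_CIRCUITED }

    private final int failureThreshold;
    private int consecutiveFailures = 0;
    private boolean open = false;
    private int sendAttempts = 0;

    HotRelayBreakerSketch(int failureThreshold) {
        this.failureThreshold = failureThreshold;
    }

    // Attempts the hot send unless the breaker is OPEN.
    Outcome hotPublish(BooleanSupplier send) {
        if (open) {
            return Outcome.SHORT_CIRCUITED; // no broker call; the row stays PENDING
        }
        sendAttempts++;
        if (send.getAsBoolean()) {
            consecutiveFailures = 0;
            return Outcome.SENT;
        }
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            open = true; // trip the breaker for subsequent calls
        }
        return Outcome.FAILED;
    }

    // Number of times the send was actually invoked.
    int sendAttempts() {
        return sendAttempts;
    }
}
```

With a threshold of 2, two failed sends trip the breaker and a third call returns SHORT_CIRCUITED without invoking the send, so the request thread is not held up by a degraded broker.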

An OTel span im2be.outbox.publish.hot is recorded per hot-publish attempt with attributes topic, event.id, aggregate.id, outcome.

  • Field Details

    • CIRCUIT_BREAKER_NAME

      public static final String CIRCUIT_BREAKER_NAME
      Resilience4j CircuitBreaker instance name. Consumers may customise threshold + wait-duration via the im2be.outbox.circuit-breaker.* properties; the breaker itself is registered in OutboxAutoConfiguration.
  • Constructor Details

    • OutboxPublisher

      public OutboxPublisher(OutboxBackend backend, org.springframework.kafka.core.KafkaTemplate<byte[],byte[]> kafkaTemplate, io.github.resilience4j.circuitbreaker.CircuitBreaker circuitBreaker, io.opentelemetry.api.OpenTelemetry openTelemetry, OutboxMetricsBinder metrics, int maxRetries)
      Constructs an OutboxPublisher with all collaborators autowired by OutboxAutoConfiguration.

      Wave A.2 Phase 0a — the former repository + completion collaborators are now collapsed behind the OutboxBackend storage SPI (design doc refinement #11). For the default PgOutboxBackend the persist + transition semantics are identical to the pre-extraction code (zero behaviour change); a future Redis backend (Phase 0b) plugs in here unchanged.

      Parameters:
      backend - storage SPI — persists the PENDING row and drives the markSent / markFailureAttempt transitions (the default PG impl wraps the JPA repository + the REQUIRES_NEW completion helper)
      kafkaTemplate - Spring Kafka template (byte-key, byte-value) for sending Avro-encoded payloads
      circuitBreaker - Resilience4j breaker around the hot-publish call
      openTelemetry - OpenTelemetry instance for tracer + metrics (autoconfig falls back to OpenTelemetry.noop() when no real SDK is present)
      metrics - metrics binder for latency/failure counters
      maxRetries - retry budget from im2be.outbox.poller.max-retries — passed through to OutboxBackend.markFailureAttempt(UUID, UUID, String, int) so the hot-relay path uses the same atomic budget evaluation as the cold poller (R5 fix — eliminates the stale-retries snapshot race)
  • Method Details

    • publish

      @Deprecated(since="1.1.0", forRemoval=false) public UUID publish(String topic, byte[] payload, int schemaVersion, UUID aggregateId)
      Deprecated.
      Since v1.1.0: use publish(String, Function, int, UUID) instead. The byte[] overload generates the outbox row's eventId internally, leaving the caller no opportunity to embed the same id in the payload, so operators cannot correlate Kafka messages back to outbox rows by event_id (Memora #3555). The new overload accepts a deferred-serialisation Function<UUID, byte[]> that receives the outbox-issued eventId, keeping the payload's embedded event_id in lockstep with the row's. Retained (forRemoval=false) for callers serialising opaque already-encoded bytes where the payload schema has NO event_id field.
      Atomically inserts an outbox row in OutboxRecord.Status.PENDING and registers an AFTER_COMMIT hook that attempts a fire-and-forget Kafka send.

      Per ADR-0014 D-9 the relay is NOT transactional EOS — when the AFTER_COMMIT send fails or the circuit-breaker is OPEN, the row is left PENDING; the poller (cold path) owns recovery.

      The supplied payload is defensively cloned on entry; the library owns the cloned bytes from this point forward (they back both the persisted payload_bytes column and the AFTER_COMMIT closure). The caller MAY mutate or reuse its original array after this method returns without affecting what is persisted or published.

      Parameters:
      topic - Kafka topic name (not null)
      payload - Avro-encoded event bytes including the Apicurio global-id prefix (not null, not empty)
      schemaVersion - Avro schema_version field; mirrored on the row for operator triage
      aggregateId - aggregate identifier — partition-key hint passed to Kafka (not null)
      Returns:
      the generated eventId (UUIDv7); callers may persist or log this for correlation
      Throws:
      IllegalStateException - when invoked outside an active transaction — the outbox INSERT MUST share the business transaction boundary; otherwise the row commits independently and breaks atomicity
      IllegalArgumentException - when payload is zero-length — a zero-byte event is never a valid outbox payload (symmetric with the Function overload's guard)
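
      The defensive-clone and zero-length guarantees of the byte[] overload can be sketched as follows. DefensiveCloneSketch is a hypothetical stand-in that models only the copy-on-entry behaviour, not persistence or the Kafka send.

```java
import java.util.Arrays;

// Hypothetical model of the copy-on-entry behaviour; not the library's code.
final class DefensiveCloneSketch {

    private byte[] owned;

    // Rejects empty payloads, then takes a private copy of the caller's bytes.
    void publish(byte[] payload) {
        if (payload.length == 0) {
            throw new IllegalArgumentException("payload must not be empty");
        }
        owned = payload.clone();
    }

    // Returns a copy of what the sketch "persisted".
    byte[] persisted() {
        return owned.clone();
    }

    // Publishes a buffer, overwrites it, and checks the stored bytes are unchanged.
    static boolean survivesCallerMutation() {
        DefensiveCloneSketch sketch = new DefensiveCloneSketch();
        byte[] buffer = {1, 2, 3};
        sketch.publish(buffer);
        Arrays.fill(buffer, (byte) 0); // the caller reuses its buffer
        return Arrays.equals(sketch.persisted(), new byte[] {1, 2, 3});
    }
}
```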
    • publish

      public UUID publish(String topic, Function<UUID,byte[]> payloadFn, int schemaVersion, UUID aggregateId)
      Atomically inserts an outbox row in OutboxRecord.Status.PENDING and registers an AFTER_COMMIT hook that attempts a fire-and-forget Kafka send. This overload accepts a deferred-serialisation Function<UUID, byte[]> so that the payload's embedded event_id field stays in lockstep with the outbox row's event_id (Memora #3555 / #3556).

      Per ADR-0014 D-9 the relay is NOT transactional EOS — when the AFTER_COMMIT send fails or the circuit-breaker is OPEN, the row is left PENDING; the poller (cold path) owns recovery.

      Invocation timing. payloadFn is invoked synchronously on the calling thread, BEFORE the outbox row is persisted and BEFORE the AFTER_COMMIT hook is registered.

      Invocation count. payloadFn is invoked exactly once per publish call. The library does NOT retry serialisation, does NOT cache the result, and does NOT invoke payloadFn from the AFTER_COMMIT hook or the poller — the bytes it returns are captured once and reused for both the persisted column and the hot relay.

      Lockstep invariant. payloadFn MUST embed the supplied eventId in the returned payload's event_id field. Failing to do so defeats the purpose of this overload and reproduces the Memora #3555 bug (Kafka ↔ outbox correlation by event_id is broken).

      Thread-safety. payloadFn is invoked on the calling thread only; it is never invoked on a Kafka client thread nor from the AFTER_COMMIT hook.

      Purity. payloadFn SHOULD complete promptly — it executes inside the business transaction's critical section; avoid I/O, blocking calls, and long computation. It MAY read from the current persistence context, but MUST NOT issue writes via a separate persistence context or a REQUIRES_NEW boundary.

      Exception semantics. Any exception thrown by payloadFn propagates unmodified to the caller; the outbox row is NOT persisted, the AFTER_COMMIT hook is NOT registered, and the enclosing @Transactional rolls back per the caller's rollbackFor (Spring's default: RuntimeException + Error). The library does NOT catch or rewrap the exception.

      Return value + defensive clone. Returns the outbox-issued eventId (the same UUID passed to payloadFn). The byte[] returned by payloadFn is defensively cloned, so the caller MAY mutate or reuse it after this method returns without affecting what is persisted or published.

      Parameters:
      topic - Kafka topic name (not null)
      payloadFn - deferred serialiser receiving the outbox-issued eventId and returning the Avro/JSON-encoded event bytes (not null; must return a non-null, non-empty array)
      schemaVersion - Avro schema_version field; mirrored on the row for operator triage
      aggregateId - aggregate identifier — partition-key hint passed to Kafka (not null)
      Returns:
      the outbox-issued eventId (UUIDv7); identical to the value passed to payloadFn
      Throws:
      IllegalStateException - when invoked outside an active transaction — the outbox INSERT MUST share the business transaction boundary; otherwise the row commits independently and breaks atomicity
      NullPointerException - when payloadFn returns null
      IllegalArgumentException - when payloadFn returns a zero-length array
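
      A payloadFn that honours the lockstep invariant might look like the sketch below. FakePublisher is a hypothetical stdlib-only stand-in for this overload (it models the single invocation, the null and zero-length guards, and the defensive clone); the JSON shape, the order_id field, and the orderCreated helper are likewise assumptions for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.function.Function;

final class LockstepPayloadSketch {

    // Hypothetical stand-in for the Function overload: issues the id, invokes
    // payloadFn once, validates the result, and keeps a private copy of the bytes.
    static final class FakePublisher {
        byte[] storedPayload;

        UUID publish(Function<UUID, byte[]> payloadFn) {
            UUID eventId = UUID.randomUUID(); // assumption: the real library issues a UUIDv7
            byte[] bytes = payloadFn.apply(eventId);
            if (bytes == null) {
                throw new NullPointerException("payloadFn returned null");
            }
            if (bytes.length == 0) {
                throw new IllegalArgumentException("payloadFn returned an empty array");
            }
            storedPayload = bytes.clone();
            return eventId;
        }
    }

    // A payloadFn that embeds the supplied eventId in the payload's event_id field.
    static Function<UUID, byte[]> orderCreated(String orderId) {
        return eventId ->
                ("{\"event_id\":\"" + eventId + "\",\"order_id\":\"" + orderId + "\"}")
                        .getBytes(StandardCharsets.UTF_8);
    }

    // Returns true when the stored payload carries the same id that publish returned.
    static boolean idsMatch() {
        FakePublisher publisher = new FakePublisher();
        UUID eventId = publisher.publish(orderCreated("order-42"));
        String json = new String(publisher.storedPayload, StandardCharsets.UTF_8);
        return json.contains("\"event_id\":\"" + eventId + "\"");
    }
}
```

      Against the real class, the equivalent call would pass the same kind of lambda as the payloadFn argument of publish(String, Function, int, UUID) from inside a @Transactional method.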