Class OutboxPollerWorker

java.lang.Object
com.aim2be.platform.outbox.OutboxPollerWorker

public class OutboxPollerWorker extends Object
Cold-path poller that owns the PENDING → SENT transition for outbox rows the hot AFTER_COMMIT relay did not finish (broker outage, process crash, circuit-breaker OPEN window, etc.).

Per ADR-0014 D-7, the poller is the source of truth for the SENT transition. Each tick:

  1. Acquires the outbox-poller ShedLock — only one replica processes a tick at a time.
  2. Fetches up to im2be.outbox.poller.batch-size PENDING rows in createdAt ASC order.
  3. For each row, performs a synchronous Kafka send. The cold path deliberately does NOT wrap the send in a circuit breaker: the breaker exists to protect the hot path's commit-time critical section, whereas the cold path already runs outside that critical section, so attempting every row on each tick is the intended behaviour.
  4. Success → markSent (clears lastError); failure → markFailureAttempt (bumps retries + persists truncated error message). Row stays PENDING on failure for the next tick.
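The per-row handling in steps 2 to 4 can be sketched in plain Java as follows. This is an illustrative model only, not the worker's actual code: Row, Sender, MAX_ERROR_LENGTH and tick are hypothetical names, an in-memory list stands in for the OutboxBackend, the ShedLock acquisition from step 1 is omitted, and the 32-character truncation limit is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

final class PollerTickSketch {

    /** Hypothetical stand-in for an outbox row; not the real entity. */
    static final class Row {
        final long id;
        String status = "PENDING";
        int retries = 0;
        String lastError = null;

        Row(long id) { this.id = id; }
    }

    /** Hypothetical synchronous sender; the real worker uses a KafkaTemplate. */
    interface Sender {
        void send(Row row) throws Exception;
    }

    /** Assumed truncation limit for persisted error messages. */
    static final int MAX_ERROR_LENGTH = 32;

    static String truncate(String message) {
        if (message == null) return "unknown";
        return message.length() <= MAX_ERROR_LENGTH
                ? message
                : message.substring(0, MAX_ERROR_LENGTH);
    }

    /**
     * One tick over rows already ordered by creation time.
     * Returns {successCount, failureCount}.
     */
    static int[] tick(List<Row> rowsInCreatedOrder, int batchSize, Sender sender) {
        int success = 0;
        int failure = 0;
        int taken = 0;
        for (Row row : rowsInCreatedOrder) {
            if (!"PENDING".equals(row.status)) continue; // only PENDING rows are fetched
            if (taken == batchSize) break;               // batch limit reached
            taken++;
            try {
                sender.send(row);                        // synchronous send, no circuit breaker
                row.status = "SENT";                     // markSent
                row.lastError = null;                    // success clears lastError
                success++;
            } catch (Exception e) {
                row.retries++;                           // markFailureAttempt
                row.lastError = truncate(e.getMessage());
                failure++;                               // row stays PENDING for the next tick
            }
        }
        return new int[] {success, failure};
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        for (long id = 1; id <= 3; id++) rows.add(new Row(id));
        int[] counts = tick(rows, 10, row -> {
            if (row.id == 2) throw new IllegalStateException("simulated broker outage");
        });
        System.out.println("success=" + counts[0] + " failure=" + counts[1]);
    }
}
```

A failure on one row does not stop the loop, so later rows in the same batch are still attempted, and the failed row is picked up again on a subsequent tick.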

Each invocation emits one OTel span, im2be.outbox.poller.tick, with the attributes batch.size, success.count, and failure.count.

The worker is enabled by default (im2be.outbox.poller.enabled=true). Disabling it is useful in dev-only contexts where the operator inspects PENDING rows manually.

  • Field Details

    • LOCK_NAME

      public static final String LOCK_NAME
      ShedLock name; matches the value scoped in SchedulerLock.
  • Constructor Details

    • OutboxPollerWorker

      public OutboxPollerWorker(OutboxBackend backend, org.springframework.kafka.core.KafkaTemplate<byte[],byte[]> kafkaTemplate, OutboxProperties properties, io.opentelemetry.api.OpenTelemetry openTelemetry, OutboxMetricsBinder metrics)
      Constructs the worker with all collaborators autowired by OutboxAutoConfiguration.

      Wave A.2 Phase 0a — the former repository + completion collaborators are now collapsed behind the OutboxBackend storage SPI (design doc refinement #11). For the default PgOutboxBackend the FIFO batch fetch + transition semantics are identical to the pre-extraction code (zero behaviour change).

      Parameters:
      backend - storage SPI — supplies the FIFO PENDING-row batch and drives the markSent / markFailureAttempt transitions (the default PG impl wraps the JPA repository + the REQUIRES_NEW completion helper)
      kafkaTemplate - Spring Kafka template (byte-key, byte-value)
      properties - type-safe configuration (poller batch size etc.)
      openTelemetry - OpenTelemetry instance for the span tracer
      metrics - metrics binder for latency/failure counters
  • Method Details

    • pollAndPublishPending

      @Scheduled(fixedDelayString="${im2be.outbox.poller.interval-ms:5000}") public void pollAndPublishPending()
      Scheduled tick. Distributed-locked via ShedLock so only one replica sweeps PENDING rows at a time.

      The cadence is governed by im2be.outbox.poller.interval-ms (default 5000 ms) and the batch size by im2be.outbox.poller.batch-size (default 500).

      The method is deliberately NOT wrapped in @Transactional: each row's send is independent, and the status-transition writes occur in their own REQUIRES_NEW transactions inside OutboxPublishCompletion.
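
For reference, the three configuration keys named on this page can be collected into one fragment. The keys and default values are the ones stated above; writing them in a Spring Boot application.properties file is an assumption about how the application is configured.

```properties
# Poller on/off switch (default: true)
im2be.outbox.poller.enabled=true
# Delay between ticks in milliseconds (default: 5000)
im2be.outbox.poller.interval-ms=5000
# Maximum number of PENDING rows fetched per tick (default: 500)
im2be.outbox.poller.batch-size=500
```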