Class RedisOutboxPollerWorker

java.lang.Object
com.aim2be.platform.outbox.redis.RedisOutboxPollerWorker

public class RedisOutboxPollerWorker extends Object
Cold-path poller for the Redis backend (refinements #4 + #5).

Single-writer lock: duplicate suppression, not a hard mutex (refinement #4). Each tick tries to acquire :lock with SET key <token> NX PX <ttl>, where <token> is a unique owner token. If another replica holds the lock, the tick is skipped. The lock CAN expire mid-tick, so the poller does two things to bound (never eliminate) duplicates:

  1. Bounds the batch send-time below the TTL. Before each row it checks whether the next send could exceed the lock's remaining lease (lockTtl - elapsed); when the elapsed time crosses lockTtl - lockRenewMargin it RENEWS the lock via a compare-and-renew Lua script (PEXPIRE guarded by the owner token).
  2. Aborts the batch when it has lost the lock. If a renew returns 0 (another replica took the lock after ours expired), the poller stops the batch immediately rather than racing the new holder.
Any duplicate that still slips through (lock expiry between the last renew and a send) is absorbed by consumer-side dedup (processed-kafka-events); the bound guarantees it is at most a few rows, never a storm.
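
The renew-or-abort loop described above can be sketched with an in-memory stand-in for the lock key and a manually advanced clock. This is a minimal illustration, not the worker's actual code: LeaseBoundedDrainSketch, FakeLock and drain are hypothetical names, and the sketch models only the margin-based renew, not the per-row check against the remaining lease.

```java
final class LeaseBoundedDrainSketch {

    /** In-memory stand-in for the Redis lock key; time is a manually advanced counter. */
    static final class FakeLock {
        String owner;    // token stored at the lock key, null when the key is absent
        long expiresAt;  // absolute expiry in ms
        long now;        // fake clock in ms

        // Mirrors SET key token NX PX ttl: succeeds only if the key is absent or expired.
        boolean tryAcquire(String token, long ttlMs) {
            expireIfDue();
            if (owner != null) return false;
            owner = token;
            expiresAt = now + ttlMs;
            return true;
        }

        // Mirrors a compare-and-renew script: extend the TTL only if the token still matches.
        boolean renew(String token, long ttlMs) {
            expireIfDue();
            if (!token.equals(owner)) return false;
            expiresAt = now + ttlMs;
            return true;
        }

        void expireIfDue() {
            if (owner != null && now >= expiresAt) owner = null;
        }
    }

    /** Sends rows until the batch is done or the lock is lost; returns how many rows were sent. */
    static int drain(FakeLock lock, String token, int rows, long sendMs, long ttlMs, long marginMs) {
        long leaseStart = lock.now;
        int sent = 0;
        for (int i = 0; i < rows; i++) {
            long elapsed = lock.now - leaseStart;
            if (elapsed >= ttlMs - marginMs) {
                // Renew failed: the lock expired or another replica owns it, so abort the batch.
                if (!lock.renew(token, ttlMs)) return sent;
                leaseStart = lock.now;
            }
            lock.now += sendMs; // simulate one Kafka send taking sendMs
            sent++;
        }
        return sent;
    }
}
```

With a 1000 ms TTL, a 300 ms margin and 200 ms sends, the loop renews every few rows and drains the whole batch. With a single 1200 ms send, the first renew attempt fails and the batch stops after one row, which is the bounded-duplicate window the text refers to.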

Release via compare-and-delete (refinement #4). At tick end the lock is released by a Lua script that DELs the key ONLY when the owner token still matches — a plain DEL could delete a lock a different replica re-acquired after ours expired.
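
A compare-and-delete release is commonly written as the short Lua script below. The script text is an assumed shape (the real script lives in RedisOutboxScripts and may differ), and CompareAndDeleteSketch with its map-backed release method is a hypothetical in-memory equivalent used only to show the behaviour.

```java
import java.util.Map;

final class CompareAndDeleteSketch {

    // Assumed shape of the release script: KEYS[1] is the lock key, ARGV[1] the owner token.
    static final String RELEASE_LUA =
        "if redis.call('GET', KEYS[1]) == ARGV[1] then\n" +
        "  return redis.call('DEL', KEYS[1])\n" +
        "else\n" +
        "  return 0\n" +
        "end";

    // In-memory equivalent of the script: delete only when the stored token is ours.
    static long release(Map<String, String> store, String lockKey, String token) {
        if (token.equals(store.get(lockKey))) {
            store.remove(lockKey);
            return 1;
        }
        return 0;
    }
}
```

If replica A's lease expired and replica B re-acquired the key, A's release sees B's token and returns 0, leaving B's lock in place. A plain DEL in the same situation would remove B's lock.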

Due fetch scored by nextAttemptAt (refinement #5). The batch comes from OutboxBackend.findPendingBatch(int), which fetches only DUE entries (ZRANGEBYSCORE :pending -inf now). A failed send re-scores the entry to now + backoff, so a broker outage does not hot-loop the oldest entries.
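
The effect of scoring by nextAttemptAt can be illustrated with an in-memory map standing in for the :pending sorted set. DueFetchSketch and its methods are hypothetical names, and applying the batch size as a limit on the range query is an assumption suggested by the int parameter of findPendingBatch.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class DueFetchSketch {

    // entry id -> nextAttemptAt in ms; stands in for the :pending sorted set
    private final Map<String, Long> pending = new HashMap<>();

    void schedule(String id, long nextAttemptAt) {
        pending.put(id, nextAttemptAt);
    }

    // Mirrors ZRANGEBYSCORE :pending -inf now with a limit: only entries whose score
    // is <= now, ordered by score and then by member, as a Redis sorted set does.
    List<String> findDue(long now, int batchSize) {
        return pending.entrySet().stream()
            .filter(e -> e.getValue() <= now)
            .sorted(Map.Entry.<String, Long>comparingByValue()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()))
            .limit(batchSize)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    // A failed send pushes the entry into the future, so the next tick does not retry it at once.
    void onSendFailure(String id, long now, long backoffMs) {
        pending.put(id, now + backoffMs);
    }
}
```

After a failure, the entry drops out of the due range until its backoff elapses, so later ticks during an outage work on other due entries instead of retrying the oldest one every time.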

Enabled when im2be.outbox.redis.poller.enabled=true, which is the default.

  • Constructor Details

    • RedisOutboxPollerWorker

      public RedisOutboxPollerWorker(OutboxBackend backend, org.springframework.kafka.core.KafkaTemplate<byte[],byte[]> kafkaTemplate, org.springframework.data.redis.core.StringRedisTemplate redis, RedisOutboxScripts scripts, RedisOutboxKeys keys, RedisOutboxProperties properties, RedisOutboxMetrics metrics)
      Parameters:
      backend - the Redis SPI backend (batch fetch + transitions)
      kafkaTemplate - byte-key/byte-value Kafka template
      redis - Redis template for the lock acquire (SET NX PX)
      scripts - Lua scripts (lock renew + release)
      keys - the hash-tagged key set (the lock key)
      properties - poller config (lock TTL, renew margin, batch size…)
      metrics - metrics binder
  • Method Details

    • pollAndPublishPending

      @Scheduled(fixedDelayString="${im2be.outbox.redis.poller.interval-ms:5000}") public void pollAndPublishPending()
      Scheduled tick. Acquires the single-writer lock; on success sweeps a bounded batch of DUE PENDING entries and relays each to Kafka, renewing the lock as it drains and aborting if it loses it; always releases the lock (compare-and-delete) at the end.

      Cadence governed by im2be.outbox.redis.poller.interval-ms (default 5000ms).
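
      The overall shape of a tick (acquire, drain, always release) can be sketched as follows. PollerTickSketch, its Lock interface and the tick method are hypothetical names for illustration; the real method is void and works against Redis and Kafka rather than these stand-ins.

```java
import java.util.UUID;

final class PollerTickSketch {

    /** Hypothetical lock abstraction; the real worker talks to Redis directly. */
    interface Lock {
        boolean tryAcquire(String token); // SET key token NX PX ttl
        void release(String token);       // compare-and-delete
    }

    /** Runs one tick; returns false when the tick was skipped because the lock was held. */
    static boolean tick(Lock lock, Runnable drainBatch) {
        String token = UUID.randomUUID().toString(); // unique owner token for this tick
        if (!lock.tryAcquire(token)) {
            return false; // another replica holds the lock: skip this tick
        }
        try {
            drainBatch.run(); // relay due entries, renewing or aborting as needed
        } finally {
            lock.release(token); // runs even if the drain throws
        }
        return true;
    }
}
```

      Putting the release in a finally block is one straightforward way to get the "always releases" behaviour: a failed drain still gives up the lock promptly instead of waiting for the TTL to expire.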