Class RedisOutboxBackend

java.lang.Object
com.aim2be.platform.outbox.redis.RedisOutboxBackend
All Implemented Interfaces:
OutboxBackend

public class RedisOutboxBackend extends Object implements OutboxBackend
Redis Lua-EVAL implementation of the OutboxBackend SPI (Wave A.2 Phase 0b, design doc .planning/32-wave-a2-redis-outbox-design.md). It is the store that keeps identity-service's ticket mint and its outbox event atomic, which requires a Redis backend because the mint is itself a Redis operation (ADR-0014 D-3 / 0014-supporting/identity-redis-outbox-backend.md).

Transaction contract — requiresActiveTransaction() returns false. Atomicity is achieved by a single-slot Lua EVAL (refinement #8 — hash-tagged keys), NOT by a Spring transaction. This relaxes the publisher's persistPending active-transaction guard. CRUCIALLY, per the OutboxBackend.requiresActiveTransaction() @apiNote, the PG OutboxPublisher's AFTER_COMMIT hot-relay registration is UNCONDITIONAL and still throws outside a Spring transaction — so this backend does NOT route through that publisher. Phase 0b owns its OWN inline relay (RedisOutboxRelay) + cold poller (RedisOutboxPollerWorker); the SPI's persistPending / findPendingBatch / markSent / markFailureAttempt are still implemented faithfully so the contract is uniform across backends, but the orchestration around them is Redis-specific.
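A minimal sketch of why hash-tagged keys make a single-slot EVAL possible. Redis Cluster maps every key to one of 16384 slots using CRC16 (XMODEM variant) of the key, or only of the substring inside the first non-empty {...} section when one is present. The key names used in the checks ({outbox}:entries, {outbox}:pending) are hypothetical; the real names come from RedisOutboxKeys.

```java
import java.nio.charset.StandardCharsets;

/** Sketch of Redis Cluster's key-to-slot mapping; HashSlots is a hypothetical helper, not part of this backend. */
final class HashSlots {
    private HashSlots() {}

    /** CRC-16/XMODEM: polynomial 0x1021, initial value 0x0000, no bit reflection. */
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    /** The part that is actually hashed: the first non-empty {tag}, otherwise the whole key. */
    static String hashedPart(String key) {
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {          // "{}" (empty tag) falls through to the whole key
                return key.substring(open + 1, close);
            }
        }
        return key;
    }

    /** Slot in [0, 16383]. */
    static int slot(String key) {
        return crc16(hashedPart(key).getBytes(StandardCharsets.UTF_8)) & 0x3FFF;
    }
}
```

Any keys that share a tag land in the same slot, so one Lua script may touch all of them even once the deployment moves to cluster mode.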

Atomicity (refinement #1). Every mutation is a single Lua script: outbox-enqueue.lua (HSET entry + ZADD pending, fail-closed ordered), outbox-mark-sent.lua, outbox-mark-failure.lua. A runtime error mid-script does NOT roll back earlier writes (Redis is atomic, NOT transactional — verified against https://redis.io/docs/latest/develop/interact/programmability/eval-intro/), so the scripts pre-check key types and order writes so any partial state is recoverable by RedisOutboxRepairWorker (refinement #10).

Pending score (refinement #5). The :pending sorted-set is scored by nextAttemptAt, NOT createdAt, so a Kafka outage does not hot-loop the oldest entries. findPendingBatch(int) fetches ZRANGEBYSCORE … -inf <now> (only DUE entries).
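The effect of scoring by nextAttemptAt can be shown with a small in-memory stand-in for the :pending sorted set. This is a hypothetical model, not the backend's code: zadd mirrors ZADD (insert, or move an existing member to a new score) and due mirrors ZRANGEBYSCORE key -inf now LIMIT 0 n. Re-scoring a failed entry into the future removes it from the due window, which is what stops the oldest entries hot-looping during a Kafka outage.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical in-memory model of the :pending sorted set (not a Redis client). */
final class PendingIndex {
    private record Scored(double score, String member) {}

    // Redis orders equal scores lexicographically by member; this comparator mirrors that.
    private final TreeSet<Scored> byScore = new TreeSet<>(
            Comparator.comparingDouble(Scored::score).thenComparing(Scored::member));
    private final Map<String, Double> scores = new HashMap<>();

    /** ZADD semantics: insert the member, or move it to the new score if already present. */
    void zadd(String member, double score) {
        Double old = scores.put(member, score);
        if (old != null) {
            byScore.remove(new Scored(old, member));
        }
        byScore.add(new Scored(score, member));
    }

    /** ZRANGEBYSCORE key -inf now LIMIT 0 limit: due members, lowest score first. */
    List<String> due(long nowEpochMs, int limit) {
        List<String> out = new ArrayList<>();
        for (Scored s : byScore) {
            if (s.score() > nowEpochMs || out.size() == limit) {
                break;
            }
            out.add(s.member());
        }
        return out;
    }
}
```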

Parked FAILED (refinement #3). markFailureAttempt parks an entry to FAILED at the retry budget — retained in the Hash, removed from :pending, never auto-deleted; an operator re-arms it.

This backend bean is stateless + thread-safe; the ObjectMapper and StringRedisTemplate are thread-safe singletons.

  • Constructor Details

    • RedisOutboxBackend

      public RedisOutboxBackend(org.springframework.data.redis.core.StringRedisTemplate redis, RedisOutboxScripts scripts, RedisOutboxKeys keys, com.fasterxml.jackson.databind.ObjectMapper objectMapper, RedisOutboxBackoff backoff, RedisOutboxMetrics metrics)
      Constructs the backend.
      Parameters:
      redis - string-typed Redis template (keys + JSON values are strings, sorted-set scores numeric); never null
      scripts - the loaded Lua script holder; never null
      keys - the hash-tagged key set; never null
      objectMapper - JSON (de)serialiser for RedisOutboxEntry; never null
      backoff - exponential-backoff calculator for failure re-scoring (refinement #5); never null
      metrics - metrics binder (parked-FAILED + enqueue counters); never null
  • Method Details

    • persistPending

      public void persistPending(OutboxRecord record)

      Enqueues a fresh PENDING entry via outbox-enqueue.lua — HSET the Hash field + ZADD the :pending index in one fail-closed ordered script (refinement #1). A freshly-minted entry is immediately due, so its :pending score is now (refinement #5 — subsequent failure re-scores push it into the future). The script's HEXISTS guard makes a duplicate logical enqueue an idempotent no-op (refinement #12); the SPI returns void so a no-op is silently absorbed (it is logged at DEBUG + counted).

      Specified by:
      persistPending in interface OutboxBackend
      Throws:
      RedisOutboxStorageException - when JSON encoding or the EVAL fails
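The idempotency guard described above can be modelled in a few lines. This is a hypothetical in-memory sketch, not outbox-enqueue.lua itself: a synchronized method stands in for the atomicity of a single EVAL, two maps stand in for the Hash and the :pending zset, and the write order (HSET, then ZADD) follows the order the class Javadoc lists. Unlike the real SPI method, which returns void, the sketch returns whether the write applied so the no-op is visible.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the enqueue script's HEXISTS guard. */
final class EnqueueModel {
    final Map<String, String> entries = new HashMap<>(); // the Hash: eventId -> JSON body
    final Map<String, Long> pending = new HashMap<>();   // the :pending zset: eventId -> score

    /** Returns true when the entry was written, false when the guard made it a no-op. */
    synchronized boolean enqueue(String eventId, String json, long nowEpochMs) {
        if (entries.containsKey(eventId)) { // HEXISTS: a duplicate logical enqueue
            return false;                   // absorbed; the real backend logs at DEBUG and counts it
        }
        entries.put(eventId, json);         // HSET the Hash field first
        pending.put(eventId, nowEpochMs);   // then ZADD, scored "due now"
        return true;
    }
}
```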
    • findPendingBatch

      public List<OutboxRecord> findPendingBatch(int batchSize)

      Fetches up to batchSize DUE entries: ZRANGEBYSCORE :pending -inf now bounded by LIMIT 0 batchSize (refinement #5 — only entries whose nextAttemptAt has elapsed), then a single HMGET for their JSON bodies. Members whose Hash field is missing (an orphan zset member — refinement #10) are skipped here and reconciled by RedisOutboxRepairWorker. The returned OutboxRecord views are in ascending-score (oldest-due-first) order.

      Specified by:
      findPendingBatch in interface OutboxBackend
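Redis HMGET returns one slot per requested field, with nil for each field that does not exist, in request order. A sketch of how a due batch might be assembled from those two replies while skipping orphans; the helper name and the Batch record are hypothetical, and JSON decoding into OutboxRecord is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper pairing ZRANGEBYSCORE members with their HMGET results. */
final class BatchAssembly {
    private BatchAssembly() {}

    /** Bodies to hand on, plus orphan members left for the repair worker. */
    record Batch(List<String> bodies, List<String> orphans) {}

    static Batch assemble(List<String> dueMembers, List<String> hmgetValues) {
        if (dueMembers.size() != hmgetValues.size()) {
            throw new IllegalArgumentException("HMGET returns exactly one slot per requested field");
        }
        List<String> bodies = new ArrayList<>();
        List<String> orphans = new ArrayList<>();
        for (int i = 0; i < dueMembers.size(); i++) {
            String json = hmgetValues.get(i);
            if (json == null) {
                orphans.add(dueMembers.get(i)); // zset member without a Hash field: skip it
            } else {
                bodies.add(json);               // preserves oldest-due-first order
            }
        }
        return new Batch(List.copyOf(bodies), List.copyOf(orphans));
    }
}
```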
    • markSent

      public int markSent(UUID aggregateId, UUID eventId, Instant sentAt)

      Runs outbox-mark-sent.lua: status-guarded PENDING→SENT (set sentAt, clear lastError, ZREM from :pending). Returns the real applied row-count — 1 when the transition applied, 0 when the entry was missing or already terminal (a racing terminal transition is a no-op, matching the SPI contract).

      Specified by:
      markSent in interface OutboxBackend
      Parameters:
      aggregateId - unused for the Redis key (the event_id is the Hash field); accepted for SPI symmetry + logging correlation
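A hypothetical in-memory model of the status-guarded PENDING to SENT transition, showing where the 1 / 0 row-count comes from. The synchronized method stands in for the single EVAL; the Entry type and its fields are illustrative and do not reproduce RedisOutboxEntry.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of outbox-mark-sent.lua's status guard. */
final class MarkSentModel {
    enum Status { PENDING, SENT, FAILED }

    static final class Entry {
        Status status = Status.PENDING;
        Instant sentAt;
        String lastError;

        Entry(String lastError) {
            this.lastError = lastError;
        }
    }

    final Map<String, Entry> entries = new HashMap<>(); // the Hash
    final Map<String, Long> pending = new HashMap<>();  // the :pending zset

    /** 1 when PENDING -> SENT applied; 0 when the entry is missing or already terminal. */
    synchronized int markSent(String eventId, Instant sentAt) {
        Entry e = entries.get(eventId);
        if (e == null || e.status != Status.PENDING) {
            return 0; // a racing terminal transition is a no-op
        }
        e.status = Status.SENT;
        e.sentAt = sentAt;
        e.lastError = null;       // clear lastError
        pending.remove(eventId);  // ZREM from :pending
        return 1;
    }
}
```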
    • markFailureAttempt

      public int markFailureAttempt(UUID aggregateId, UUID eventId, String lastError, int maxRetries)

      Runs outbox-mark-failure.lua: status-guarded retries++ + stored error. On a retryable attempt the :pending score is re-set to now + backoff(retries) (refinement #5 — exponential, capped); at the budget the entry is PARKED to FAILED (refinement #3 — retained in the Hash, removed from :pending, never auto-deleted) and a parked-FAILED metric fires.

      The backoff exponent uses the entry's CURRENT retries from the store — the script reads it server-side, so the re-score is computed against the authoritative count, not a stale snapshot. The Java side computes the candidate nextAttemptAt from the maxRetries-bounded backoff and passes it in; the script applies it only on the retryable branch.

      Specified by:
      markFailureAttempt in interface OutboxBackend
      Returns:
      1 when the attempt was recorded; 0 when no-op (missing or already terminal)
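A hypothetical sketch of the failure branch under stated assumptions. For brevity it folds the Java-side backoff computation and the script-side update into one synchronized method. The backoff constants (1 s base, 5 min cap) are illustrative only, since RedisOutboxBackoff's real values are not documented here, and parking when retries reaches maxRetries is an assumed reading of "at the retry budget".

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of outbox-mark-failure.lua plus capped exponential backoff. */
final class FailureModel {
    enum Status { PENDING, SENT, FAILED }

    static final class Entry {
        Status status = Status.PENDING;
        int retries;
        String lastError;
    }

    // Assumed parameters, not RedisOutboxBackoff's actual configuration.
    static final Duration BASE = Duration.ofSeconds(1);
    static final Duration CAP = Duration.ofMinutes(5);

    /** BASE * 2^retries, capped at CAP; the shift is bounded so it cannot overflow. */
    static Duration backoff(int retries) {
        long factor = 1L << Math.min(Math.max(retries, 0), 30);
        Duration d = BASE.multipliedBy(factor);
        return d.compareTo(CAP) > 0 ? CAP : d;
    }

    final Map<String, Entry> entries = new HashMap<>();   // the Hash
    final Map<String, Instant> pending = new HashMap<>(); // :pending, scored by nextAttemptAt
    int parkedFailed;                                      // stands in for the parked-FAILED metric

    /** 1 when the attempt was recorded; 0 when the entry is missing or already terminal. */
    synchronized int markFailureAttempt(String eventId, String lastError, int maxRetries, Instant now) {
        Entry e = entries.get(eventId);
        if (e == null || e.status != Status.PENDING) {
            return 0;
        }
        e.retries++;                   // the authoritative count, read and bumped inside the atomic step
        e.lastError = lastError;
        if (e.retries >= maxRetries) { // assumed budget check
            e.status = Status.FAILED;  // parked: kept in the Hash, never auto-deleted
            pending.remove(eventId);   // removed from :pending until an operator re-arms it
            parkedFailed++;
        } else {
            pending.put(eventId, now.plus(backoff(e.retries))); // re-score into the future
        }
        return 1;
    }
}
```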
    • requiresActiveTransaction

      public boolean requiresActiveTransaction()

      Always false: atomicity is the single-slot Lua EVAL, not a Spring transaction. See the class Javadoc for why this backend cannot reuse the PG OutboxPublisher's AFTER_COMMIT relay and owns its own relay/poller instead.

      Specified by:
      requiresActiveTransaction in interface OutboxBackend
    • recordExpiryDue

      public void recordExpiryDue(UUID ticketId, Instant expiresAt)
      Mint-time primitive (refinement #2): records a ticket's expiration intent by ZADDing it into the :expiry-due sorted-set (score = expiresAt epoch-ms). The companion key shares the hash-tag so a future cluster keeps it co-located with the outbox (refinement #8).

      This is the durable expiration intent that expiry-decide.lua later consumes — Redis key TTL alone is NOT a business callback (keyspace notifications are timing-imprecise fire-and-forget Pub/Sub), so the due-time decision is driven off this explicit zset, not off key expiry.

      Parameters:
      ticketId - the ticket identifier (the zset member); never null
      expiresAt - the ticket's expiry instant (the zset score); never null
    • findDueExpiries

      public List<UUID> findDueExpiries(int limit)
      Fetches up to limit tickets whose expiry is due (score ≤ now) for the Phase-1 due-time worker to decide. This is a read-only primitive (ZRANGEBYSCORE -inf now LIMIT 0 limit); the actual Expired-vs-Revoked decision and enqueue are performed by decideExpiry(java.util.UUID, java.util.UUID, com.aim2be.platform.outbox.redis.RedisOutboxEntry, java.time.Instant, boolean).
      Parameters:
      limit - maximum due tickets to return
      Returns:
      the due ticket ids (oldest-due-first), empty when none; never null
    • decideExpiry

      public RedisOutboxBackend.ExpiryDecision decideExpiry(UUID ticketId, UUID expiredEventId, RedisOutboxEntry expiredEntry, Instant dueCutoff, boolean alreadyRevoked)
      Atomic due-time decision (refinement #2 primitive): runs expiry-decide.lua to enqueue a TicketExpired outbox entry for a due ticket, unless it was already revoked (suppress) or already expired-enqueued (idempotent collapse via the deterministic UUIDv3 expiredEventId).

      NB: the expiredEventId MUST be a deterministic UUIDv3(ticket_id|expires_at) so a double-fire (worker crash + replay) produces the SAME outbox field — the script collapses it. Computing that id + supplying the TicketExpired RedisOutboxEntry payload is the Phase-1 identity wiring; Phase 0b tests it with a synthetic id + payload.
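One way the Phase-1 wiring might derive a deterministic expiredEventId, sketched with the JDK's UUID.nameUUIDFromBytes, which yields a version-3 (MD5) name-based UUID. Two assumptions to flag: the "ticketId|expiresAtEpochMs" name encoding is a hypothetical choice, and nameUUIDFromBytes hashes only the given bytes, with no namespace UUID prepended as RFC 4122 describes. Any stable encoding works as long as every replay uses the same one.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.UUID;

/** Hypothetical helper; the real Phase-1 identity wiring may encode the name differently. */
final class ExpiredEventIds {
    private ExpiredEventIds() {}

    /** Same ticket + same expiry always yields the same version-3 UUID. */
    static UUID expiredEventId(UUID ticketId, Instant expiresAt) {
        String name = ticketId + "|" + expiresAt.toEpochMilli();
        return UUID.nameUUIDFromBytes(name.getBytes(StandardCharsets.UTF_8));
    }
}
```

Because a worker crash followed by a replay recomputes the identical id, the script sees the same outbox field and can collapse the second attempt.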

      TODO(Phase 1) — close the alreadyRevoked TOCTOU (reviewer R3 INFO). alreadyRevoked is computed by the caller BEFORE this EVAL; a concurrent TicketRevoked landing in that window makes the script enqueue TicketExpired anyway, so consumers can see BOTH events for one ticket. The deterministic UUIDv3 dedup (ADR-0014 D-4) + the audit pipeline's idempotency mitigate it, but the revocation-suppression branch is bypassed in the race. The robust fix is to move the revocation check INSIDE expiry-decide.lua against a Redis-resident revocation marker (e.g. EXISTS the revoked-ticket key / a revoked-set membership test) so the decision is atomic — this lands with the Phase-1 due-time worker that first calls this method (it is unreachable in Phase 0b, which only tests the primitive).

      Parameters:
      ticketId - the due ticket (the :expiry-due member)
      expiredEventId - deterministic UUIDv3 outbox event id
      expiredEntry - the TicketExpired entry to enqueue (status PENDING)
      dueCutoff - only decide members scored ≤ this instant
      alreadyRevoked - true when the ticket was already revoked (suppress the expiry)
      Returns:
      the decision outcome
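A hypothetical in-memory model of the decision branches, following the current contract in which alreadyRevoked is supplied by the caller. Several details are assumptions rather than documented behaviour: the Outcome names (the real RedisOutboxBackend.ExpiryDecision constants may differ), the check order (revocation before the duplicate check), the NOT_DUE result for a missing or not-yet-due member, and scoring the new entry at dueCutoff as "now". The sketch leaves :expiry-due cleanup out because the Javadoc does not specify it.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of expiry-decide.lua; synchronized stands in for the atomic EVAL. */
final class ExpiryDecideModel {
    enum Outcome { ENQUEUED, SUPPRESSED_REVOKED, ALREADY_ENQUEUED, NOT_DUE }

    final Map<String, Long> expiryDue = new HashMap<>(); // :expiry-due, ticketId -> expiresAt ms
    final Map<String, String> entries = new HashMap<>(); // outbox Hash, eventId -> JSON
    final Map<String, Long> pending = new HashMap<>();   // :pending, eventId -> score

    synchronized Outcome decide(String ticketId, String expiredEventId, String expiredJson,
                                long dueCutoffMs, boolean alreadyRevoked) {
        Long expiresAt = expiryDue.get(ticketId);
        if (expiresAt == null || expiresAt > dueCutoffMs) {
            return Outcome.NOT_DUE;                     // only members scored <= dueCutoff
        }
        if (alreadyRevoked) {
            return Outcome.SUPPRESSED_REVOKED;          // caller-computed flag (the TOCTOU window)
        }
        if (entries.containsKey(expiredEventId)) {
            return Outcome.ALREADY_ENQUEUED;            // deterministic id collapses a replay
        }
        entries.put(expiredEventId, expiredJson);       // enqueue TicketExpired (PENDING)
        pending.put(expiredEventId, dueCutoffMs);       // due immediately in this sketch
        return Outcome.ENQUEUED;
    }
}
```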