Class DedupGuard

java.lang.Object
com.aim2be.platform.dedup.DedupGuard

public class DedupGuard extends Object
Single public API for consumer-side Kafka dedup, per ADR-0014 D-4.

Pattern: a Kafka consumer calls tryClaim(String, String, Instant, String, int, long) immediately after deserialising a message. If the call returns true, the consumer proceeds with its business logic (this consumer pod won the race to claim the event). If it returns false, the message is silently skipped: a different pod (or this pod, before a restart) has already claimed it.
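The calling pattern above can be sketched as follows. This is a minimal sketch, not the real listener wiring: ClaimGuard is a hypothetical interface that only mirrors the explicit-scope tryClaim signature, IncomingEvent and the "order-service" scope are made-up names, and InMemoryGuard is a toy fake with no transactions and no week_start.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical interface mirroring the explicit-scope tryClaim signature documented here.
interface ClaimGuard {
    boolean tryClaim(String consumerScopeId, String eventId, Instant eventTime,
                     String topic, int partition, long offset);
}

// Hypothetical message shape; a real listener would take these from the payload and ConsumerRecord.
record IncomingEvent(String eventId, Instant eventTime, String topic,
                     int partition, long offset, String body) {}

class OrderConsumer {
    private final ClaimGuard guard;
    final List<String> handled = new ArrayList<>(); // business side effects, recorded for the demo

    OrderConsumer(ClaimGuard guard) {
        this.guard = guard;
    }

    void onMessage(IncomingEvent e) {
        // Claim first, right after deserialisation and before any business logic.
        boolean won = guard.tryClaim("order-service", e.eventId(), e.eventTime(),
                                     e.topic(), e.partition(), e.offset());
        if (!won) {
            return; // duplicate: skip silently
        }
        handled.add(e.body()); // business logic runs only for the winning claim
    }
}

// In-memory fake for the demo only: ignores week_start and has no transactions.
class InMemoryGuard implements ClaimGuard {
    private final Set<String> seen = new HashSet<>();

    @Override
    public boolean tryClaim(String consumerScopeId, String eventId, Instant eventTime,
                            String topic, int partition, long offset) {
        return seen.add(consumerScopeId + "|" + eventId); // true only the first time a key is seen
    }
}
```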

Two propagation variants of the claim exist; both share the same INSERT-OR-NOTHING core and differ only in delivery guarantee on the business-failure path:

  • tryClaim (Propagation.REQUIRES_NEW): the claim commits in a SEPARATE transaction, so a later business-logic failure does NOT roll it back → at-most-once-on-failure (no poison-message redelivery loop). Use for idempotent or loss-tolerant consumers.
  • tryClaimSameTx (Propagation.MANDATORY): the claim JOINS the caller's transaction, so a business-logic failure rolls the claim back and the event is redelivered → at-least-once (REQUIRES a DLT + retry limit on the consumer). Use when every event must be processed at least once.
See each method's Javadoc for the full contrast table and tradeoffs.

Internally, the call performs an atomic INSERT-OR-NOTHING into processed_kafka_events keyed by (consumer_scope_id, event_id, week_start) where week_start is derived from the event's logical timestamp (the Kafka producer's intent, not the consumer's clock) — per ADR-0014 §L-10 this stabilises idempotency under reprocessing.
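The dedup contract reduces to an atomic "insert unless the key exists". The sketch below models it in memory, assuming a concurrent Set as a stand-in for the processed_kafka_events table; DedupKey and InMemoryProcessedEvents are hypothetical names, and nothing here reproduces the real repository or SQL.

```java
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.temporal.TemporalAdjusters;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Mirrors the documented compound PK (consumer_scope_id, event_id, week_start).
record DedupKey(String consumerScopeId, String eventId, LocalDate weekStart) {}

class InMemoryProcessedEvents {
    // Set.add on a concurrent key set is atomic: among racing callers with the same key,
    // exactly one sees true. This plays the role of INSERT-OR-NOTHING on the PK.
    private final Set<DedupKey> rows = ConcurrentHashMap.newKeySet();

    boolean insertOrNothing(String consumerScopeId, String eventId, Instant eventTime) {
        // week_start comes from the event's logical timestamp, not the consumer's clock,
        // so reprocessing the same record later yields the same key.
        LocalDate weekStart = eventTime.atOffset(ZoneOffset.UTC).toLocalDate()
                .with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
        return rows.add(new DedupKey(consumerScopeId, eventId, weekStart));
    }
}
```

Because the scope is part of the key, a pod-suffixed scope such as "notification-service-pod-7" claims the same event independently of its siblings, which is the mechanism behind per-pod broadcast scoping (ADR-0014 §L-12).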

Week-start derivation normalises to UTC explicitly: the boundary is "Monday 00:00 UTC". This prevents the operator's local timezone (CET/CEST) from assigning different weeks to events near the boundary. The choice is documented because it has a visible effect: a Sunday 23:30 UTC event maps to the Monday that opened its ISO week (six days earlier), while a Monday 00:30 UTC event one hour later maps to that same Monday, i.e. the following week. Postgres partition routing follows the same boundary.
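A java.time sketch of the Monday 00:00 UTC rule, assuming the guard's derivation is equivalent to "normalise to UTC, then step back to the previous-or-same Monday". WeekStart is a hypothetical helper, and its in(...) variant exists only to show what a local-zone derivation would do differently.

```java
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.temporal.TemporalAdjusters;

final class WeekStart {
    private WeekStart() {}

    // Documented rule: evaluate the instant in UTC, then step back to Monday.
    static LocalDate utc(Instant eventTime) {
        return in(eventTime, ZoneOffset.UTC);
    }

    // Same rule in an arbitrary zone; shown only to illustrate the pitfall that UTC avoids.
    static LocalDate in(Instant eventTime, ZoneId zone) {
        return eventTime.atZone(zone).toLocalDate()
                .with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
    }
}
```

For 2024-01-07T23:30Z (a Sunday), utc(...) yields 2024-01-01, whereas in(..., ZoneId.of("Europe/Paris")) yields 2024-01-08 because that instant is already 00:30 Monday in CET. For 2024-01-08T00:30Z both yield 2024-01-08.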

Per-pod broadcast consumers (notification-service push) get per-pod dedup by passing a pod-suffixed scope (e.g. "notification-service-pod-7") — see ADR-0014 §L-12.

Observability:

  • OTel span im2be.dedup.try_claim with attributes consumer.scope_id, event.id, event.week_start, kafka.topic, kafka.partition, kafka.offset, and dedup.outcome (claimed | duplicate).
  • Micrometer counter im2be_dedup_claims_total labelled consumer_scope_id + outcome — see DedupMetricsBinder.

  • Constructor Details

    • DedupGuard

      public DedupGuard(ProcessedKafkaEventRepository repository, DedupMetricsBinder metrics, Supplier<io.opentelemetry.api.trace.Tracer> tracerSupplier, Clock clock, String defaultScopeId)
      Constructs a guard wired to a JPA repository, a Micrometer binder, a deferred OTel tracer supplier (re-resolved per tryClaim call so a late-registered OTel SDK is honoured), a clock for firstSeenAt stamping, and an optional default scope id used by tryClaim(String, Instant, String, int, long) (the no-scope overload).
      Parameters:
      repository - the JPA repository (must not be null)
      metrics - the Micrometer binder for counter increments (must not be null)
      tracerSupplier - deferred Tracer supplier (must not be null; must not itself return null when invoked). Re-invoked per tryClaim call — a late OTel SDK registration becomes effective without bean re-creation. Tests may pass a constant () -> TracerProvider.noop().get("test").
      clock - clock used to stamp first_seen_at on inserted rows; pass Clock.systemUTC() in production and a fixed clock in tests that assert on the timestamp
      defaultScopeId - default consumer_scope_id used by the no-scope tryClaim overload; may be null or blank — callers that hit the overload with no default configured will get an IllegalStateException
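      The constructor's two deferred behaviours (re-resolving the tracer on each call, and failing only when a no-scope overload is actually used without a default) can be illustrated with plain JDK types. FakeTracer and DeferredWiring are hypothetical stand-ins; the real class takes an OpenTelemetry Tracer supplier plus the collaborators listed above.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical stand-in for an OpenTelemetry Tracer; only its name matters here.
record FakeTracer(String name) {}

class DeferredWiring {
    private final Supplier<FakeTracer> tracerSupplier;
    private final String defaultScopeId;

    DeferredWiring(Supplier<FakeTracer> tracerSupplier, String defaultScopeId) {
        this.tracerSupplier = Objects.requireNonNull(tracerSupplier, "tracerSupplier");
        this.defaultScopeId = defaultScopeId; // null or blank is accepted at construction time
    }

    // Re-invokes the supplier on every call instead of caching a tracer at construction,
    // so a tracer registered after construction is picked up by the next call.
    String tracerNameForThisCall() {
        return Objects.requireNonNull(tracerSupplier.get(), "supplier returned null").name();
    }

    // The failure surfaces only when a no-scope overload is actually used.
    String requireDefaultScope() {
        if (defaultScopeId == null || defaultScopeId.isBlank()) {
            throw new IllegalStateException("no default consumer scope id configured");
        }
        return defaultScopeId;
    }
}
```

      Holding the supplier rather than its result is what lets a later registration (modelled in a test by swapping the value behind an AtomicReference and passing registry::get) take effect without re-creating the bean.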
  • Method Details

    • tryClaim

      @Transactional(propagation=REQUIRES_NEW) public boolean tryClaim(String consumerScopeId, String eventId, Instant eventTime, String topic, int partition, long offset)
      Atomically attempts to mark an event as processed for the given consumer.

      Per ADR-0014 D-4: the compound PK (consumer_scope_id, event_id, week_start) is the dedup contract. The week_start is derived from the event's timestamp (NOT ingestion) per §L-10 — this stabilises idempotency under reprocessing. Per §L-12: instance-id-style scoping for per-pod broadcast consumers is achievable by passing a different consumer_scope_id per pod.

      Transactional: the INSERT runs in its own transaction, so a subsequent business-logic failure does NOT roll back the dedup mark. Rolling back the mark would let a retry hand the same event to a fresh handler invocation, which is exactly what this variant exists to prevent. Consumers that need at-least-once-with-rollback semantics should NOT use this method; use tryClaimSameTx(String, String, Instant, String, int, long) instead.

      Parameters:
      consumerScopeId - logical consumer identity; must not be blank
      eventId - event identifier (UUID rendered as text); must not be blank
      eventTime - event's logical timestamp (used to derive the week-aligned partition row); must not be null
      topic - origin Kafka topic (forensic); must not be null
      partition - origin Kafka partition (forensic)
      offset - origin Kafka offset (forensic)
      Returns:
      true iff this call won the race (consumer should proceed with handler); false iff the row already existed (consumer should skip the message)
      Throws:
      IllegalArgumentException - if consumerScopeId or eventId is blank
      NullPointerException - if eventTime or topic is null
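      The documented argument contract, written out as a hypothetical validation helper. The check order and the treatment of a null scope or event id (assumed here to count as blank) are assumptions; only the exception types come from the Throws list above.

```java
import java.time.Instant;
import java.util.Objects;

final class ClaimArgs {
    private ClaimArgs() {}

    // Hypothetical helper: blank identifiers -> IllegalArgumentException, null time/topic -> NPE.
    static void validate(String consumerScopeId, String eventId, Instant eventTime, String topic) {
        if (consumerScopeId == null || consumerScopeId.isBlank()) { // null treated as blank (assumption)
            throw new IllegalArgumentException("consumerScopeId must not be blank");
        }
        if (eventId == null || eventId.isBlank()) {
            throw new IllegalArgumentException("eventId must not be blank");
        }
        Objects.requireNonNull(eventTime, "eventTime");
        Objects.requireNonNull(topic, "topic");
        // partition and offset are forensic only and are not validated here.
    }
}
```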
    • tryClaimSameTx

      @Transactional(propagation=MANDATORY) public boolean tryClaimSameTx(String consumerScopeId, String eventId, Instant eventTime, String topic, int partition, long offset)
      At-least-once sibling of tryClaim(String, String, Instant, String, int, long): the dedup claim JOINS the caller's transaction (Propagation.MANDATORY) instead of committing in a separate one.

      Semantic contrast — read this before choosing an overload. Both methods share the identical INSERT-OR-NOTHING core; they differ ONLY in transaction propagation, and that single difference flips the delivery guarantee on the business-logic failure path:

      tryClaim vs tryClaimSameTx

      Claim transaction
        tryClaim (REQUIRES_NEW): separate; commits independently of the caller.
        tryClaimSameTx (MANDATORY): the caller's; commits or rolls back atomically with it.
      Business logic then fails (caller tx rolls back)
        tryClaim (REQUIRES_NEW): the claim is already committed → the event is NOT redelivered to a fresh handler → at-most-once-on-failure (the event is effectively dropped; there is NO poison-message redelivery loop).
        tryClaimSameTx (MANDATORY): the claim rolls back WITH the business tx → the same event is redelivered and re-processed on the next poll → at-least-once.
      Requires an active caller transaction?
        tryClaim (REQUIRES_NEW): no; opens its own.
        tryClaimSameTx (MANDATORY): yes; fails fast with IllegalTransactionStateException if none is in progress.
      Use when
        tryClaim (REQUIRES_NEW): the consumer is idempotent OR loss-tolerant, e.g. push notifications, where re-sending on every retry is worse than occasionally dropping one.
        tryClaimSameTx (MANDATORY): the consumer MUST process every event at least once AND has a DLT + retry limit (otherwise a poison message redelivers forever; see the warning below).
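      The failing-handler row is the crux of this comparison. The toy model below replays one event whose handler fails once and then succeeds, assuming a single consumer thread and a broker that redelivers whenever the caller's transaction rolls back. PropagationModel is hypothetical and models neither Spring nor Kafka faithfully.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the two propagation modes: no Spring, no database, one consumer thread.
class PropagationModel {
    enum Mode { REQUIRES_NEW, MANDATORY }

    private final Set<String> committedClaims = new HashSet<>();
    int handlerSuccesses = 0;

    // One delivery. The handler fails while attempt <= failuresBeforeSuccess (a transient fault).
    // Returns true when the caller's transaction rolled back, i.e. the broker will redeliver.
    boolean deliver(String eventId, Mode mode, int attempt, int failuresBeforeSuccess) {
        Set<String> callerTxWrites = new HashSet<>(); // claims written inside the caller's tx
        boolean claimed;
        if (mode == Mode.REQUIRES_NEW) {
            claimed = committedClaims.add(eventId); // own tx: committed immediately
        } else {
            claimed = !committedClaims.contains(eventId) && callerTxWrites.add(eventId);
        }
        if (!claimed) {
            return false; // duplicate: skipped, nothing to roll back
        }
        if (attempt <= failuresBeforeSuccess) {
            return true; // business failure: caller tx rolls back, callerTxWrites are discarded
        }
        committedClaims.addAll(callerTxWrites); // caller tx commits (empty for REQUIRES_NEW)
        handlerSuccesses++;
        return false;
    }

    // Keeps redelivering until the broker stops (or maxDeliveries is hit); returns deliveries made.
    int deliverUntilSettled(String eventId, Mode mode, int failuresBeforeSuccess, int maxDeliveries) {
        int attempt = 1;
        while (deliver(eventId, mode, attempt, failuresBeforeSuccess) && attempt < maxDeliveries) {
            attempt++;
        }
        return attempt;
    }
}
```

      With one transient failure, both modes settle after two deliveries, but REQUIRES_NEW ends with zero successful handler runs (the second delivery is skipped as a duplicate) while MANDATORY ends with one.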

      Propagation choice — MANDATORY, not REQUIRED, is deliberate. MANDATORY asserts the consumer already runs inside a transaction and fails fast (throwing IllegalTransactionStateException) when it does not — surfacing a misconfiguration at the first call instead of silently opening a private transaction that would defeat the at-least-once contract (a REQUIRED claim with no enclosing tx would commit on its own, degrading silently to at-most-once). This mirrors the OutboxPublisher.publish(...) fail-fast philosophy (ADR-0014 D-7): a claim that is supposed to share the business transaction boundary must verify that boundary exists.
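      The fail-fast difference between MANDATORY and REQUIRED can be shown with a thread-local flag. This is a toy that assumes a single, non-nested transaction per thread: ToyTx and ClaimPropagation are hypothetical, and the JDK IllegalStateException stands in for Spring's IllegalTransactionStateException (which is not an IllegalStateException subclass).

```java
import java.util.ArrayList;
import java.util.List;

// Toy transaction context: a thread-local flag standing in for Spring's transaction state.
final class ToyTx {
    private static final ThreadLocal<Boolean> ACTIVE = ThreadLocal.withInitial(() -> false);

    private ToyTx() {}

    // Runs body "inside a transaction"; no nesting support (the flag is simply cleared afterwards).
    static void runInTx(Runnable body) {
        ACTIVE.set(true);
        try {
            body.run();
        } finally {
            ACTIVE.set(false);
        }
    }

    static boolean active() {
        return ACTIVE.get();
    }
}

class ClaimPropagation {
    final List<String> log = new ArrayList<>();

    // MANDATORY-style: refuse to run without the caller's transaction.
    void claimMandatory(String eventId) {
        if (!ToyTx.active()) {
            throw new IllegalStateException("no existing transaction for MANDATORY claim");
        }
        log.add("joined:" + eventId);
    }

    // REQUIRED-style: silently falls back to a private transaction when none exists.
    void claimRequired(String eventId) {
        log.add((ToyTx.active() ? "joined:" : "private:") + eventId);
    }
}
```

      The REQUIRED-style call outside a transaction does not fail; it logs "private:...", which is the silent degradation to at-most-once that the MANDATORY choice is meant to rule out.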

      WARNING — poison-message redelivery. Because the claim rolls back with the business transaction, a deterministically-failing event (a "poison message") will be redelivered and re-fail forever unless the consumer caps redelivery. Callers of tryClaimSameTx MUST configure a Dead-Letter Topic plus a retry/back-off limit (e.g. spring-kafka DefaultErrorHandler with a DeadLetterPublishingRecoverer and a bounded BackOff); otherwise the partition stalls on the first un-processable record. tryClaim has no such requirement precisely because it never redelivers.
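      The control flow that a bounded back-off plus a DLT provides can be sketched without spring-kafka. BoundedRedelivery is hypothetical; in a real consumer the retry cap and dead-letter publishing come from the error handler (for example DefaultErrorHandler with a DeadLetterPublishingRecoverer), not from a hand-written loop. In this sketch maxAttempts counts every delivery, including the first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy redelivery loop for one partition, processed strictly in order.
class BoundedRedelivery {
    final List<String> deadLetters = new ArrayList<>();
    final List<String> processed = new ArrayList<>();

    // A record whose handler keeps failing is retried up to maxAttempts deliveries and then
    // parked in deadLetters, so the records behind it on the partition can still be consumed.
    void consume(List<String> records, Predicate<String> handlerSucceeds, int maxAttempts) {
        for (String record : records) {
            int attempts = 0;
            boolean ok = false;
            while (!ok && attempts < maxAttempts) {
                attempts++;
                ok = handlerSucceeds.test(record); // a failure = business tx (and claim) rolled back
            }
            if (ok) {
                processed.add(record);
            } else {
                deadLetters.add(record);
            }
        }
    }
}
```

      Each failed attempt corresponds to one rollback of the business transaction and, with tryClaimSameTx, of the claim; the cap is what lets the partition advance past the poison record.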

      All other behaviour (compound-PK contract, logical-timestamp week-start derivation per §L-10, OTel span, Micrometer counter, validation) is identical to tryClaim — see that method and the class Javadoc.

      Parameters:
      consumerScopeId - logical consumer identity; must not be blank
      eventId - event identifier (UUID rendered as text); must not be blank
      eventTime - event's logical timestamp (used to derive the week-aligned partition row); must not be null
      topic - origin Kafka topic (forensic); must not be null
      partition - origin Kafka partition (forensic)
      offset - origin Kafka offset (forensic)
      Returns:
      true iff this call won the race (consumer should proceed with handler); false iff the row already existed (consumer should skip the message)
      Throws:
      org.springframework.transaction.IllegalTransactionStateException - if invoked with no active transaction on the calling thread (MANDATORY)
      IllegalArgumentException - if consumerScopeId or eventId is blank
      NullPointerException - if eventTime or topic is null
    • tryClaim

      @Transactional(propagation=REQUIRES_NEW) public boolean tryClaim(String eventId, Instant eventTime, String topic, int partition, long offset)
      No-scope convenience overload. It uses the defaultScopeId configured at construction time (typically bound from im2be.dedup.default-scope-id). Consumers that always use a single scope (the common case: one canonical service name per pod) can call this overload to avoid threading the scope through every handler.

      Internally it delegates to the shared tryClaimInternal core, NOT to the public explicit-scope overload: calling that overload on this would be a self-invocation that bypasses the Spring proxy, so its @Transactional(REQUIRES_NEW) annotation would not be applied. Instead, the proxy intercepts THIS public entry point, and the internal core runs within the transaction the proxy establishes.

      Per-pod broadcast consumers (ADR-0014 §L-12) cannot use this overload — they MUST use the explicit-scope overload with a pod-suffixed scope.

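      Parameters:
      eventId - event identifier (UUID rendered as text); must not be blank
      eventTime - event's logical timestamp; must not be null
      topic - origin Kafka topic (forensic); must not be null
      partition - origin Kafka partition (forensic)
      offset - origin Kafka offset (forensic)
      Returns:
      true iff this call won the race (consumer should proceed with handler); false iff the row already existed (consumer should skip the message)
      Throws: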
      IllegalStateException - if no default scope was configured at construction time (i.e. im2be.dedup.default-scope-id is unset or blank)
      IllegalArgumentException - if eventId is blank
      NullPointerException - if eventTime or topic is null
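      Why delegating to a public sibling would lose its annotation: a proxy only sees calls that arrive through the proxy reference. The demo below uses a JDK dynamic proxy as a stand-in for Spring's transaction proxy (Spring often uses CGLIB subclass proxies instead, but self-invocation behaves the same way); Claims, ClaimsImpl and ProxyDemo are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.List;

interface Claims {
    boolean claimWithScope(String scope, String eventId);

    boolean claimDefault(String eventId);
}

class ClaimsImpl implements Claims {
    public boolean claimWithScope(String scope, String eventId) {
        return core(scope, eventId);
    }

    // Calls the public sibling through `this`: the proxy never sees this inner call.
    public boolean claimDefault(String eventId) {
        return this.claimWithScope("default-scope", eventId);
    }

    private boolean core(String scope, String eventId) {
        return !scope.isBlank() && !eventId.isBlank(); // placeholder for the real INSERT
    }
}

final class ProxyDemo {
    private ProxyDemo() {}

    // Wraps target in a JDK proxy that records each intercepted method, as a tx interceptor would.
    static Claims intercepting(Claims target, List<String> intercepted) {
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName()); // a transaction would begin here
            return method.invoke(target, args);
        };
        return (Claims) Proxy.newProxyInstance(
                Claims.class.getClassLoader(), new Class<?>[] {Claims.class}, handler);
    }
}
```

      Only claimDefault is recorded; the inner claimWithScope call goes straight to the target, so an annotation on it would never run. The documented overloads avoid depending on that inner call by carrying their own @Transactional annotation and delegating to the shared tryClaimInternal core.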
    • tryClaimSameTx

      @Transactional(propagation=MANDATORY) public boolean tryClaimSameTx(String eventId, Instant eventTime, String topic, int partition, long offset)
      No-scope convenience overload of tryClaimSameTx(String, String, Instant, String, int, long) — the at-least-once (Propagation.MANDATORY) counterpart of the no-scope tryClaim(String, Instant, String, int, long).

      Uses the defaultScopeId configured at construction time (typically bound from im2be.dedup.default-scope-id). The at-most-once vs at-least-once tradeoff, the MANDATORY-not-REQUIRED rationale, and the mandatory-DLT poison-message warning are all documented on tryClaimSameTx(String, String, Instant, String, int, long) — consult that method before adopting this variant.

      Like the no-scope tryClaim overload, it delegates to the shared tryClaimInternal core rather than to the public explicit-scope overload: calling that overload on this would be a self-invocation that bypasses the Spring proxy, so its @Transactional(MANDATORY) annotation would not be applied. Instead, the proxy intercepts THIS public entry point, verifies that a caller transaction exists, and the internal core runs within that transaction.

      Per-pod broadcast consumers (ADR-0014 §L-12) cannot use this overload — they MUST use the explicit-scope overload with a pod-suffixed scope.

      Parameters:
      eventId - event identifier (UUID rendered as text); must not be blank
      eventTime - event's logical timestamp; must not be null
      topic - origin Kafka topic (forensic); must not be null
      partition - origin Kafka partition (forensic)
      offset - origin Kafka offset (forensic)
      Returns:
      true iff this call won the race (consumer should proceed with handler); false iff the row already existed (consumer should skip the message)
      Throws:
      org.springframework.transaction.IllegalTransactionStateException - if invoked with no active transaction on the calling thread (MANDATORY)
      IllegalStateException - if no default scope was configured at construction time (i.e. im2be.dedup.default-scope-id is unset or blank)
      IllegalArgumentException - if eventId is blank
      NullPointerException - if eventTime or topic is null