Class DedupGuard

java.lang.Object
com.aim2be.platform.dedup.DedupGuard

public class DedupGuard extends Object
Single public API for consumer-side Kafka dedup, per ADR-0014 D-4.

Pattern: a Kafka consumer calls tryClaim(String, String, Instant, String, int, long) immediately after deserialising a message. If the call returns true, this consumer pod won the race to claim the event and proceeds with its business logic. If it returns false, the message is silently skipped: a different pod (or this pod, before a restart) has already claimed it.
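A minimal sketch of the claim-then-handle pattern follows. It assumes a hypothetical `Claimer` interface standing in for `DedupGuard.tryClaim`, a hypothetical `ConsumedEvent` record holding the deserialised fields, and a hypothetical scope "order-service". In real code the guard would be the injected bean and the fields would come from the Kafka record.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for DedupGuard.tryClaim(scope, eventId, eventTime, topic, partition, offset). */
interface Claimer {
    boolean tryClaim(String scope, String eventId, Instant eventTime,
                     String topic, int partition, long offset);
}

/** Hypothetical deserialised message: business payload plus the fields tryClaim needs. */
record ConsumedEvent(String eventId, Instant eventTime, String topic,
                     int partition, long offset, String payload) {}

class OrderConsumer {
    private final Claimer guard;
    final List<String> handled = new ArrayList<>(); // payloads that reached business logic

    OrderConsumer(Claimer guard) {
        this.guard = guard;
    }

    /** Called right after deserialisation; returns true iff business logic ran. */
    boolean onMessage(ConsumedEvent e) {
        boolean won = guard.tryClaim("order-service", e.eventId(), e.eventTime(),
                                     e.topic(), e.partition(), e.offset());
        if (!won) {
            return false;           // duplicate: skip silently, someone already claimed it
        }
        handled.add(e.payload());   // business logic goes here
        return true;
    }
}
```

The claim is the first thing the handler does, so a duplicate never reaches any side-effecting code.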

Internally, the call performs an atomic INSERT-OR-NOTHING into processed_kafka_events keyed by (consumer_scope_id, event_id, week_start). The week_start column is derived from the event's logical timestamp (the Kafka producer's intent, not the consumer's clock); per ADR-0014 §L-10, this keeps the dedup key stable when events are reprocessed.
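The INSERT-OR-NOTHING contract can be illustrated without a database. The sketch below is an in-memory analogue, not the real persistence path: a concurrent set's `add` returns true only for the caller that actually inserted the key, mirroring an insert that reports one affected row to exactly one writer. `DedupKey` and `InMemoryClaims` are hypothetical names.

```java
import java.time.LocalDate;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical mirror of the compound key (consumer_scope_id, event_id, week_start). */
record DedupKey(String consumerScopeId, String eventId, LocalDate weekStart) {}

/** In-memory analogue of processed_kafka_events; illustration only, not durable. */
class InMemoryClaims {
    private final Set<DedupKey> rows = ConcurrentHashMap.newKeySet();

    /** true iff this caller inserted the key; false if it was already present. */
    boolean claim(DedupKey key) {
        return rows.add(key); // atomic check-and-insert: at most one caller sees true
    }
}
```

Because the scope is part of the key, the same event id claimed under two different scopes yields two independent rows.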

Week-start derivation normalises to UTC explicitly: the week boundary is "Monday 00:00 UTC". This prevents the operator's local timezone (CET/CEST) from assigning different weeks to events near the boundary. The choice is documented because its effect is visible: a Sunday 23:30 UTC event maps to the Monday that opened its ISO week (six days earlier), whereas a Monday 00:30 UTC event, only an hour later, maps to that Monday itself, i.e. the following week. Postgres partition routing follows the same assignment.
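The UTC-pinned week start can be computed with plain java.time. This is a sketch under the assumption that week_start is stored as a date; the class name `WeekStart` is hypothetical, while `TemporalAdjusters.previousOrSame` is the standard JDK adjuster.

```java
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.temporal.TemporalAdjusters;

class WeekStart {
    /** Monday (UTC) of the ISO week containing eventTime; never consults the JVM default zone. */
    static LocalDate of(Instant eventTime) {
        return eventTime.atZone(ZoneOffset.UTC)   // pin to UTC, not ZoneId.systemDefault()
                .toLocalDate()
                .with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
    }
}
```

With this, `2024-05-05T23:30:00Z` (a Sunday) yields 2024-04-29 and `2024-05-06T00:30:00Z` yields 2024-05-06. Evaluated in `Europe/Berlin` instead, the Sunday instant is already Monday 01:30 CEST and would land in the later week, which is exactly the drift the UTC pin avoids.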

Per-pod broadcast consumers (notification-service push) get per-pod dedup by passing a pod-suffixed scope (e.g. "notification-service-pod-7") — see ADR-0014 §L-12.
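A per-pod scope only needs to be a distinct, stable string per pod. The helper below is hypothetical (neither `ScopeIds` nor `perPod` is part of this API), and the source of the pod id, for example a StatefulSet ordinal, is an assumption about the deployment.

```java
class ScopeIds {
    /** Hypothetical helper: ("notification-service", "7") -> "notification-service-pod-7". */
    static String perPod(String serviceScope, String podId) {
        if (serviceScope == null || serviceScope.isBlank() || podId == null || podId.isBlank()) {
            throw new IllegalArgumentException("service scope and pod id must be non-blank");
        }
        return serviceScope + "-pod-" + podId;
    }
}
```

A stable pod id keeps the scope identical across restarts, so a restarted pod still skips events it handled before; an id that changes on every restart (such as a random pod name) would start each run with an empty dedup history for that scope.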

Observability:

  • OTel span im2be.dedup.try_claim with attributes consumer.scope_id, event.id, event.week_start, kafka.topic, kafka.partition, kafka.offset, dedup.outcome = claimed | duplicate.
  • Micrometer counter im2be_dedup_claims_total labelled consumer_scope_id + outcome — see DedupMetricsBinder.

  • Constructor Details

    • DedupGuard

      public DedupGuard(ProcessedKafkaEventRepository repository, DedupMetricsBinder metrics, Supplier<io.opentelemetry.api.trace.Tracer> tracerSupplier, Clock clock, String defaultScopeId)
      Constructs a guard wired to a JPA repository, a Micrometer binder, a deferred OTel tracer supplier (re-resolved per tryClaim call so a late-registered OTel SDK is honoured), a clock for firstSeenAt stamping, and an optional default scope id used by tryClaim(String, Instant, String, int, long) (the no-scope overload).
      Parameters:
      repository - the JPA repository (must not be null)
      metrics - the Micrometer binder for counter increments (must not be null)
      tracerSupplier - deferred Tracer supplier (must not be null; must not itself return null when invoked). Re-invoked per tryClaim call — a late OTel SDK registration becomes effective without bean re-creation. Tests may pass a constant () -> TracerProvider.noop().get("test").
      clock - clock used to stamp first_seen_at on inserted rows; pass Clock.systemUTC() in production and a fixed clock in tests that assert on the timestamp
      defaultScopeId - default consumer_scope_id used by the no-scope tryClaim overload; may be null or blank — callers that hit the overload with no default configured will get an IllegalStateException
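The two constructor arguments that matter most in tests are the deferred tracer supplier and the clock. The sketch below models both with plain JDK types, using a hypothetical `Stamper` class in place of the guard and a `Supplier<String>` in place of `Supplier<Tracer>`: the supplier is invoked on every call rather than captured once, so a value registered after construction is picked up, and `Clock.fixed` makes the stamped first_seen_at deterministic.

```java
import java.time.Clock;
import java.time.Instant;
import java.util.Objects;
import java.util.function.Supplier;

class Stamper {
    private final Supplier<String> tracerName; // stand-in for Supplier<Tracer>
    private final Clock clock;

    Stamper(Supplier<String> tracerName, Clock clock) {
        this.tracerName = Objects.requireNonNull(tracerName, "tracerSupplier");
        this.clock = Objects.requireNonNull(clock, "clock");
    }

    /** Re-resolves the supplier on every call, as the guard does per tryClaim. */
    String currentTracer() {
        return Objects.requireNonNull(tracerName.get(), "supplier returned null");
    }

    /** The value that would be written to first_seen_at. */
    Instant firstSeenAt() {
        return Instant.now(clock);
    }
}
```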
  • Method Details

    • tryClaim

      @Transactional(propagation=REQUIRES_NEW) public boolean tryClaim(String consumerScopeId, String eventId, Instant eventTime, String topic, int partition, long offset)
      Atomically attempts to mark an event as processed for the given consumer.

      Per ADR-0014 D-4: the compound PK (consumer_scope_id, event_id, week_start) is the dedup contract. The week_start is derived from the event's timestamp (NOT ingestion) per §L-10 — this stabilises idempotency under reprocessing. Per §L-12: instance-id-style scoping for per-pod broadcast consumers is achievable by passing a different consumer_scope_id per pod.

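      Transactional: the INSERT runs in its own transaction (REQUIRES_NEW) and commits when tryClaim returns, so a subsequent business-logic failure does NOT roll back the dedup mark. Rolling back the mark would let a retry hand the same event to a fresh handler invocation, which is exactly what dedup exists to prevent. The consequence is that a handler failing after a successful claim is not re-run on redelivery: handler execution is at-most-once per (consumer_scope_id, event_id, week_start). Operators who want at-least-once-with-rollback semantics should NOT use this guard.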
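The at-most-once consequence of committing the claim independently can be modelled in memory. In this sketch, the hypothetical `IndependentClaimModel` records a claim before running a handler that may throw; because nothing rolls the claim back, the redelivery after a failure is skipped.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model: the claim is committed before the handler runs, as with REQUIRES_NEW. */
class IndependentClaimModel {
    private final Set<String> committedClaims = new HashSet<>();
    int handlerRuns = 0;

    /** Returns true iff the handler completed; false if the event was skipped or the handler failed. */
    boolean deliver(String eventId, boolean handlerFails) {
        if (!committedClaims.add(eventId)) {
            return false;   // already claimed: the redelivery is skipped
        }
        // The claim above is already "committed"; nothing below can undo it.
        try {
            handlerRuns++;
            if (handlerFails) {
                throw new IllegalStateException("business logic failed");
            }
            return true;
        } catch (IllegalStateException ex) {
            return false;   // only the business work would roll back here, not the claim
        }
    }
}
```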

      Parameters:
      consumerScopeId - logical consumer identity; must not be blank
      eventId - event identifier (UUID rendered as text); must not be blank
      eventTime - event's logical timestamp (used to derive the week-aligned partition row); must not be null
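      topic - origin Kafka topic; stored for forensics only, not part of the dedup key; must not be null
      partition - origin Kafka partition; stored for forensics only, not part of the dedup key
      offset - origin Kafka offset; stored for forensics only, not part of the dedup key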
      Returns:
      true iff this call won the race (consumer should proceed with handler); false iff the row already existed (consumer should skip the message)
      Throws:
      IllegalArgumentException - if consumerScopeId or eventId is blank
      NullPointerException - if eventTime or topic is null
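The documented preconditions can be checked before any database work. This is a hypothetical sketch (`ClaimPreconditions` is not part of the API), and it assumes that a null scope or event id is treated like a blank one, which the Throws clauses above do not state.

```java
import java.time.Instant;
import java.util.Objects;

/** Hypothetical precondition check mirroring the documented Throws clauses of tryClaim. */
class ClaimPreconditions {
    static void check(String consumerScopeId, String eventId, Instant eventTime, String topic) {
        // Assumption: null is rejected the same way as blank.
        if (consumerScopeId == null || consumerScopeId.isBlank()) {
            throw new IllegalArgumentException("consumerScopeId must not be blank");
        }
        if (eventId == null || eventId.isBlank()) {
            throw new IllegalArgumentException("eventId must not be blank");
        }
        Objects.requireNonNull(eventTime, "eventTime");
        Objects.requireNonNull(topic, "topic");
    }
}
```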
    • tryClaim

      @Transactional(propagation=REQUIRES_NEW) public boolean tryClaim(String eventId, Instant eventTime, String topic, int partition, long offset)
      No-scope convenience overload. It uses the defaultScopeId configured at construction time (typically bound from im2be.dedup.default-scope-id). Consumers that always use a single scope (the common case: one canonical service name shared by every pod) can call this overload instead of threading the scope through every handler. Parameters and return value have the same meaning as in the explicit-scope overload.

      Implementation note: this method delegates to the shared tryClaimInternal core, NOT to the public explicit-scope overload. A self-invocation through this would bypass the Spring proxy, so the callee's @Transactional(REQUIRES_NEW) would not be applied. The proxy intercepts THIS public entry point instead, and the internal core runs inside the transaction it opened.

      Per-pod broadcast consumers (ADR-0014 §L-12) cannot use this overload — they MUST use the explicit-scope overload with a pod-suffixed scope.

      Throws:
      IllegalStateException - if no default scope was configured at construction time (i.e. im2be.dedup.default-scope-id is unset or blank)
      IllegalArgumentException - if eventId is blank
      NullPointerException - if eventTime or topic is null
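Because a null or blank default is accepted at construction, the IllegalStateException surfaces only when the no-scope overload is called. A minimal sketch of that resolution step, with the hypothetical class `DefaultScopeResolver`:

```java
class DefaultScopeResolver {
    private final String defaultScopeId; // may be null or blank, as the constructor allows

    DefaultScopeResolver(String defaultScopeId) {
        this.defaultScopeId = defaultScopeId;
    }

    /** The scope the no-scope overload would use; throws if none is configured. */
    String resolve() {
        if (defaultScopeId == null || defaultScopeId.isBlank()) {
            throw new IllegalStateException(
                "no default scope configured (im2be.dedup.default-scope-id is unset or blank)");
        }
        return defaultScopeId;
    }
}
```

Deferring the check lets services that only ever use the explicit-scope overload, such as per-pod broadcast consumers, start without configuring a default.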