Class DedupGuard
Pattern: a Kafka consumer calls
tryClaim(String, String, Instant, String, int, long) immediately
after deserialising a message. If the call returns true, the
consumer proceeds with its business logic (this consumer pod won the race
to claim the event). If it returns false, the message is silently skipped: a
different pod (or this same pod, before a restart) already claimed it.
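A minimal sketch of the claim-then-process pattern follows. It assumes a hypothetical ClaimGuard interface that mirrors the explicit-scope tryClaim signature, so the example compiles without DedupGuard or Kafka on the classpath, and a hypothetical DecodedEvent record standing in for a deserialised message. The in-memory guard keys only on scope and event id (it ignores week_start) and is meant for local experiments, not as a substitute for the table-backed guard.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in mirroring DedupGuard's explicit-scope tryClaim signature.
interface ClaimGuard {
    boolean tryClaim(String consumerScopeId, String eventId, Instant eventTime,
                     String topic, int partition, long offset);
}

// Hypothetical decoded message; a real consumer would build this after deserialisation.
record DecodedEvent(String eventId, Instant eventTime, String topic, int partition, long offset) {}

final class ClaimThenProcess {
    private final ClaimGuard guard;
    private final String scopeId;
    final List<String> handled = new ArrayList<>(); // records which events reached the handler

    ClaimThenProcess(ClaimGuard guard, String scopeId) {
        this.guard = guard;
        this.scopeId = scopeId;
    }

    /** Returns true if the handler ran, false if the message was skipped as a duplicate. */
    boolean onMessage(DecodedEvent e) {
        // Claim first, immediately after deserialisation.
        if (!guard.tryClaim(scopeId, e.eventId(), e.eventTime(), e.topic(), e.partition(), e.offset())) {
            return false; // another pod (or an earlier run of this pod) already claimed it
        }
        handled.add(e.eventId()); // business logic would run here
        return true;
    }

    /** In-memory guard for experiments only: exactly one winner per (scope, eventId). */
    static ClaimGuard inMemoryGuard() {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        return (scope, id, time, topic, partition, offset) -> seen.add(scope + "|" + id);
    }
}
```

With the in-memory guard, delivering the same DecodedEvent twice runs the handler once; the second onMessage call returns false.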
Internally, the call performs an atomic INSERT-OR-NOTHING into
processed_kafka_events keyed by
(consumer_scope_id, event_id, week_start), where week_start
is derived from the event's logical timestamp (the Kafka producer's
intent, not the consumer's clock). Per ADR-0014 §L-10 this stabilises
idempotency under reprocessing: replaying an old event yields the same key it had on first delivery.
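The insert-or-nothing contract can be modelled in memory to show why exactly one caller wins per key. In PostgreSQL such a statement is typically written as INSERT ... ON CONFLICT DO NOTHING, but this page does not say which SQL DedupGuard issues. The class and record names below are hypothetical; the record's components simply mirror the compound key columns.

```java
import java.time.LocalDate;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory analogue of processed_kafka_events.
final class InsertOrNothingSketch {
    // Mirrors the compound key (consumer_scope_id, event_id, week_start).
    record DedupKey(String consumerScopeId, String eventId, LocalDate weekStart) {
        DedupKey {
            Objects.requireNonNull(consumerScopeId, "consumerScopeId");
            Objects.requireNonNull(eventId, "eventId");
            Objects.requireNonNull(weekStart, "weekStart");
        }
    }

    private final Set<DedupKey> rows = ConcurrentHashMap.newKeySet();

    /**
     * Set.add on a concurrent key set is atomic and returns true for exactly one
     * caller per key, like an INSERT that either creates the row or does nothing
     * because the key already exists.
     */
    boolean insertOrNothing(DedupKey key) {
        return rows.add(key);
    }
}
```

Because record equality is component-wise, two keys built from the same scope, event id and week start collide, while the same event id under a different scope is a separate row.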
Week-start derivation normalises to UTC explicitly: the boundary
is "Monday 00:00 UTC". This keeps the operator's local timezone (CET/CEST)
from assigning events near the boundary to different weeks. The
choice is documented because its effect is visible: a Sunday
23:30 UTC event maps to the Monday that opened its ISO week (six days earlier),
while a Monday 00:30 UTC event, one hour later, maps to that same Monday, i.e. the
start of the next week. Postgres partition routing follows week_start, so the two
rows are routed to different partitions.
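A minimal sketch of the Monday 00:00 UTC rule using java.time, assuming week_start can be represented as a LocalDate (the actual column type is not stated here). The second method, which computes the same thing in an arbitrary zone, is included only to show the drift that UTC normalisation avoids.

```java
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.temporal.TemporalAdjusters;

final class WeekStartSketch {
    /** Monday of the ISO week containing eventTime, with the boundary at Monday 00:00 UTC. */
    static LocalDate weekStartUtc(Instant eventTime) {
        return weekStartIn(eventTime, ZoneOffset.UTC);
    }

    /** Same rule evaluated in another zone; only for comparison with the UTC result. */
    static LocalDate weekStartIn(Instant eventTime, ZoneId zone) {
        return eventTime.atZone(zone)
                .toLocalDate()
                // Stay on the date if it is already Monday, otherwise step back to Monday.
                .with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
    }
}
```

For Sunday 2024-01-07T23:30Z, weekStartUtc gives 2024-01-01, while weekStartIn with Europe/Berlin (CET, UTC+1 in January) sees local Monday 00:30 and gives 2024-01-08; that is the boundary disagreement the UTC rule removes.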
Per-pod broadcast consumers (notification-service push) get per-pod
dedup by passing a pod-suffixed scope (e.g. "notification-service-pod-7")
— see ADR-0014 §L-12.
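The effect of pod-suffixed scopes on broadcast consumers can be sketched as follows. The helper names and the "service-podId" joining convention are assumptions generalised from the "notification-service-pod-7" example; where the pod id comes from (an environment variable, downward API, etc.) is not specified by DedupGuard.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class PodScopeSketch {
    /** Builds a pod-suffixed scope such as "notification-service-pod-7" (hypothetical helper). */
    static String podScope(String serviceName, String podId) {
        if (serviceName == null || serviceName.isBlank() || podId == null || podId.isBlank()) {
            throw new IllegalArgumentException("serviceName and podId must not be blank");
        }
        return serviceName + "-" + podId;
    }

    /**
     * Counts how many pods win a claim for one event against a shared in-memory
     * key set (a stand-in for processed_kafka_events). With pod-suffixed scopes
     * every pod gets its own row; with one shared scope only the first pod wins.
     */
    static int podsThatClaim(String serviceName, List<String> podIds, String eventId, boolean perPodScope) {
        Set<String> rows = ConcurrentHashMap.newKeySet();
        int winners = 0;
        for (String pod : podIds) {
            String scope = perPodScope ? podScope(serviceName, pod) : serviceName;
            if (rows.add(scope + "|" + eventId)) {
                winners++;
            }
        }
        return winners;
    }
}
```

For a push-style broadcast, every pod must handle the event for its own connected clients, so each pod needs its own row; a shared scope would let only one pod through.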
Observability:
- OTel span im2be.dedup.try_claim with attributes consumer.scope_id, event.id, event.week_start, kafka.topic, kafka.partition, kafka.offset and dedup.outcome ∈ {claimed, duplicate}.
- Micrometer counter im2be_dedup_claims_total labelled consumer_scope_id + outcome; see DedupMetricsBinder.
Field Summary
Fields: OUTCOME_CLAIMED, OUTCOME_DUPLICATE (outcome attribute values and metric tag values)
Constructor Summary
Constructors
DedupGuard(ProcessedKafkaEventRepository repository, DedupMetricsBinder metrics, Supplier<io.opentelemetry.api.trace.Tracer> tracerSupplier, Clock clock, String defaultScopeId)
Constructs a guard wired to a JPA repository, a Micrometer binder, a deferred OTel tracer supplier (re-resolved per tryClaim call so a late-registered OTel SDK is honoured), a clock for firstSeenAt stamping, and an optional default scope id used by tryClaim(String, Instant, String, int, long) (the no-scope overload).
Method Summary
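boolean tryClaim(String consumerScopeId, String eventId, Instant eventTime, String topic, int partition, long offset)
Atomically attempts to mark an event as processed for the given consumer.
boolean tryClaim(String eventId, Instant eventTime, String topic, int partition, long offset)
No-scope convenience overload.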
Field Details
OUTCOME_CLAIMED
Outcome attribute values and metric tag values.
OUTCOME_DUPLICATE
Constructor Details
DedupGuard
public DedupGuard(ProcessedKafkaEventRepository repository, DedupMetricsBinder metrics, Supplier<io.opentelemetry.api.trace.Tracer> tracerSupplier, Clock clock, String defaultScopeId)
Constructs a guard wired to a JPA repository, a Micrometer binder, a deferred OTel tracer supplier (re-resolved per tryClaim call so a late-registered OTel SDK is honoured), a clock for firstSeenAt stamping, and an optional default scope id used by tryClaim(String, Instant, String, int, long) (the no-scope overload).
Parameters:
repository - the JPA repository (must not be null)
metrics - the Micrometer binder for counter increments (must not be null)
tracerSupplier - deferred Tracer supplier (must not be null, and must not itself return null when invoked). Re-invoked per tryClaim call, so a late OTel SDK registration takes effect without re-creating the bean. Tests may pass a constant () -> TracerProvider.noop().get("test").
clock - clock used to stamp first_seen_at on inserted rows; pass Clock.systemUTC() in production and a fixed clock in tests that assert on the timestamp
defaultScopeId - default consumer_scope_id used by the no-scope tryClaim overload; may be null or blank, but callers that hit that overload with no default configured will get an IllegalStateException
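The per-call tracerSupplier contract and the injected Clock can be illustrated without OpenTelemetry on the classpath. In this sketch a plain String stands in for io.opentelemetry.api.trace.Tracer, and REGISTERED_TRACER is a hypothetical global slot that an SDK might fill after the bean is built; neither is part of DedupGuard. The sketch contrasts capturing the tracer once at construction with re-resolving it on every call.

```java
import java.time.Clock;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class DeferredSupplierSketch {
    // Hypothetical global slot that an SDK might populate after beans are created.
    static final AtomicReference<String> REGISTERED_TRACER = new AtomicReference<>("noop");

    private final String eagerTracer;              // resolved once, at construction
    private final Supplier<String> tracerSupplier; // resolved again on every call
    private final Clock clock;

    DeferredSupplierSketch(Supplier<String> tracerSupplier, Clock clock) {
        this.tracerSupplier = Objects.requireNonNull(tracerSupplier, "tracerSupplier");
        this.clock = Objects.requireNonNull(clock, "clock");
        this.eagerTracer = tracerSupplier.get();
    }

    String eagerTracer() {
        return eagerTracer;
    }

    /** Re-resolves the tracer per call, as the tracerSupplier parameter describes. */
    String tracerForThisCall() {
        String tracer = tracerSupplier.get();
        if (tracer == null) {
            // The docs say the supplier must not return null; failing fast here is an assumption.
            throw new IllegalStateException("tracerSupplier returned null");
        }
        return tracer;
    }

    /** The timestamp an inserted row would receive as first_seen_at. */
    Instant firstSeenAt() {
        return Instant.now(clock);
    }
}
```

In a test, passing Clock.fixed(instant, ZoneOffset.UTC) pins firstSeenAt() to that instant, and updating REGISTERED_TRACER after construction is visible to tracerForThisCall() but not to the eagerly captured value.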
Method Details
tryClaim
@Transactional(propagation=REQUIRES_NEW) public boolean tryClaim(String consumerScopeId, String eventId, Instant eventTime, String topic, int partition, long offset)
Atomically attempts to mark an event as processed for the given consumer.
Per ADR-0014 D-4: the compound PK (consumer_scope_id, event_id, week_start) is the dedup contract. The week_start is derived from the event's timestamp (NOT ingestion time) per §L-10; this stabilises idempotency under reprocessing. Per §L-12: instance-id-style scoping for per-pod broadcast consumers is achievable by passing a different consumer_scope_id per pod.
Transactional: the INSERT runs in its own transaction so that a subsequent business-logic failure does NOT roll back the dedup mark. Rolling back the mark would let a retry deliver the same event a second time to a fresh handler invocation, the opposite of what dedup is for. Operators who want at-least-once-with-rollback semantics should NOT use this guard.
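The consequence of committing the claim independently of the handler can be modelled without Spring. In this hypothetical sketch the claim is recorded before the handler runs and is never undone, standing in for a REQUIRES_NEW transaction that commits on its own. It also shows why at-least-once-with-rollback users should look elsewhere: a handler that fails after the claim is not re-run on redelivery.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

final class IndependentClaimSketch {
    // Stands in for rows committed by the claim's own transaction; handler failures never remove them.
    private final Set<String> committedClaims = new HashSet<>();
    int handlerRuns = 0;

    /** Claim first (committed immediately), then run the handler, which may throw. */
    void deliver(String eventId, Consumer<String> handler) {
        if (!committedClaims.add(eventId)) {
            return; // duplicate: skipped even if the earlier handler run failed
        }
        handlerRuns++;
        handler.accept(eventId); // an exception here propagates, but the claim above stays
    }
}
```

A first delivery whose handler throws still leaves the claim in place, so a redelivery of the same event id is skipped and handlerRuns stays at 1.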
Parameters:
consumerScopeId - logical consumer identity; must not be blank
eventId - event identifier (UUID rendered as text); must not be blank
eventTime - event's logical timestamp (used to derive the week-aligned partition row); must not be null
topic - origin Kafka topic (forensic)
partition - origin Kafka partition (forensic)
offset - origin Kafka offset (forensic)
Returns:
true iff this call won the race (consumer should proceed with the handler); false iff the row already existed (consumer should skip the message)
Throws:
IllegalArgumentException - if consumerScopeId or eventId is blank
NullPointerException - if eventTime or topic is null
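The documented exceptions translate into a short validation step. This is a sketch, not DedupGuard's code: the helper name is hypothetical, and treating a null scope or event id as blank (IllegalArgumentException) is an assumption, since the docs only specify the blank case.

```java
import java.time.Instant;
import java.util.Objects;

final class ClaimArgsSketch {
    /** Throws the exception types listed for the explicit-scope tryClaim overload. */
    static void validate(String consumerScopeId, String eventId, Instant eventTime, String topic) {
        // Assumption: null is treated like blank for the two identifiers.
        if (consumerScopeId == null || consumerScopeId.isBlank()) {
            throw new IllegalArgumentException("consumerScopeId must not be blank");
        }
        if (eventId == null || eventId.isBlank()) {
            throw new IllegalArgumentException("eventId must not be blank");
        }
        Objects.requireNonNull(eventTime, "eventTime"); // NullPointerException, as documented
        Objects.requireNonNull(topic, "topic");
    }
}
```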
tryClaim
@Transactional(propagation=REQUIRES_NEW) public boolean tryClaim(String eventId, Instant eventTime, String topic, int partition, long offset)
No-scope convenience overload. Internally delegates to the shared tryClaimInternal core (NOT to the public explicit-scope overload, because self-invocation through this would bypass the Spring @Transactional(REQUIRES_NEW) proxy). The proxy intercepts THIS public entry point; the internal core runs within the transaction it establishes. Uses the defaultScopeId configured at construction time (typically bound from im2be.dedup.default-scope-id). Consumers that always use a single scope (the common case: one canonical service name per pod) can call this overload to avoid threading the scope through every handler.
Per-pod broadcast consumers (ADR-0014 §L-12) cannot use this overload; they MUST use the explicit-scope overload with a pod-suffixed scope.
Throws:
IllegalStateException - if no default scope was configured at construction time (i.e. im2be.dedup.default-scope-id is unset or blank)
IllegalArgumentException - if eventId is blank
NullPointerException - if eventTime or topic is null
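The self-invocation pitfall behind the tryClaimInternal delegation can be reproduced with a plain JDK dynamic proxy (java.lang.reflect.Proxy), one of the proxy mechanisms Spring AOP uses. Spring may instead use CGLIB class-based proxies, but in proxy mode a call through this reaches the raw target in the same way. The Claims interface and its methods are hypothetical, and the handler only records which calls it intercepted, as a stand-in for opening a transaction.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.List;

// Hypothetical interface with an explicit-scope and a no-scope entry point.
interface Claims {
    boolean explicitScope(String scope, String eventId);
    boolean noScope(String eventId);
}

final class SelfInvocationSketch {
    // Plain target: noScope() calls explicitScope() through `this`, not through the proxy.
    static final class Target implements Claims {
        public boolean explicitScope(String scope, String eventId) { return true; }
        public boolean noScope(String eventId) { return this.explicitScope("default-scope", eventId); }
    }

    /** Wraps the target in a JDK dynamic proxy that logs every call it intercepts. */
    static Claims proxied(Claims target, List<String> intercepted) {
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName()); // stand-in for "begin REQUIRES_NEW transaction"
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the target's own exception
            }
        };
        return (Claims) Proxy.newProxyInstance(
                Claims.class.getClassLoader(), new Class<?>[] {Claims.class}, handler);
    }
}
```

Calling noScope on the proxy logs only "noScope": the inner explicitScope call never passes through the handler, so any advice attached to it would be skipped.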