Class DedupGuard
Pattern: a Kafka consumer calls
tryClaim(String, String, Instant, String, int, long) immediately
after deserialising a message. If the call returns true the
consumer proceeds with its business logic (this consumer pod won the race
to claim the event). If false the message is silently skipped — a
different pod (or this pod on a previous restart) already processed it.
Two propagation variants of the claim exist; both share the same INSERT-OR-NOTHING core and differ only in delivery guarantee on the business-failure path:
- tryClaim (Propagation.REQUIRES_NEW): the claim commits in a SEPARATE transaction, so a later business-logic failure does NOT roll it back → at-most-once-on-failure (no poison-message redelivery loop). Use for idempotent or loss-tolerant consumers.
- tryClaimSameTx (Propagation.MANDATORY): the claim JOINS the caller's transaction, so a business-logic failure rolls the claim back and the event is redelivered → at-least-once (REQUIRES a DLT + retry-limit on the consumer). Use when every event must be processed at least once.
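The difference between the two variants on the business-failure path can be sketched with a small in-memory simulation. This is an illustration only, not the DedupGuard implementation: the class ClaimPropagationSketch, its Handler interface, and the method names are hypothetical, a HashSet stands in for processed_kafka_events, and the rollback is simulated by removing the claim again.

```java
import java.util.HashSet;
import java.util.Set;

/** In-memory stand-in for the two claim variants (assumption: not the real guard). */
class ClaimPropagationSketch {

    /** Hypothetical business handler; may throw to simulate a business failure. */
    interface Handler {
        void handle(String eventId);
    }

    // Plays the role of committed rows in processed_kafka_events.
    private final Set<String> committedClaims = new HashSet<>();

    /** REQUIRES_NEW analogue: the claim is committed before the handler runs. */
    boolean consumeSeparateTx(String eventId, Handler handler) {
        if (!committedClaims.add(eventId)) {
            return false; // duplicate: skip the message
        }
        handler.handle(eventId); // a failure here leaves the claim in place
        return true;
    }

    /** MANDATORY analogue: the claim shares the fate of the handler. */
    boolean consumeSameTx(String eventId, Handler handler) {
        if (!committedClaims.add(eventId)) {
            return false; // duplicate: skip the message
        }
        try {
            handler.handle(eventId);
            return true;
        } catch (RuntimeException e) {
            committedClaims.remove(eventId); // simulated rollback of the claim
            throw e;
        }
    }

    boolean isClaimed(String eventId) {
        return committedClaims.contains(eventId);
    }
}
```

In this sketch, a handler that throws leaves the claim behind in consumeSeparateTx (a redelivery is skipped as a duplicate), whereas consumeSameTx removes it so that a redelivery is processed again.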
Internally, the call performs an atomic INSERT-OR-NOTHING into
processed_kafka_events keyed by
(consumer_scope_id, event_id, week_start) where week_start
is derived from the event's logical timestamp (the Kafka producer's
intent, not the consumer's clock) — per ADR-0014 §L-10 this stabilises
idempotency under reprocessing.
Week-start derivation uses UTC normalisation explicitly: the boundary
is "Monday 00:00 UTC". This prevents the operator's local timezone (CET/CEST)
from causing different week assignments for events near the boundary. The
choice is documented because it has a visible effect: a Sunday
23:30 UTC event maps to the Monday that began that ISO week (six days earlier); a Monday 00:30
UTC event, one hour later, maps to that same Monday, i.e. the start of the following week. Postgres partition routing follows.
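A minimal sketch of a UTC week-start derivation, assuming week_start is represented as a LocalDate; the class and method names (WeekStartSketch, weekStartUtc) are hypothetical and the actual derivation inside DedupGuard may differ.

```java
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.temporal.TemporalAdjusters;

class WeekStartSketch {

    /** Returns the Monday (UTC calendar date) of the week containing eventTime. */
    static LocalDate weekStartUtc(Instant eventTime) {
        return eventTime.atZone(ZoneOffset.UTC)   // normalise to UTC, never the JVM default zone
                .toLocalDate()
                .with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
    }

    public static void main(String[] args) {
        // 2024-01-07 is a Sunday, 2024-01-08 is a Monday.
        System.out.println(weekStartUtc(Instant.parse("2024-01-07T23:30:00Z"))); // 2024-01-01
        System.out.println(weekStartUtc(Instant.parse("2024-01-08T00:30:00Z"))); // 2024-01-08
    }
}
```

Had the first instant been converted in CET (UTC+1 in January) instead, its local time would already be Monday 00:30 and it would land in the week of 2024-01-08, which is the divergence the explicit UTC normalisation avoids.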
Per-pod broadcast consumers (notification-service push) get per-pod
dedup by passing a pod-suffixed scope (e.g. "notification-service-pod-7")
— see ADR-0014 §L-12.
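The compound key and the effect of a pod-suffixed scope can be illustrated with an in-memory analogue. This is a sketch under stated assumptions, not the repository code: InsertOrNothingSketch is a hypothetical class, and ConcurrentHashMap.putIfAbsent stands in for the atomic INSERT-OR-NOTHING that the real guard performs against processed_kafka_events.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class InsertOrNothingSketch {

    // Key mirrors (consumer_scope_id, event_id, week_start); the value is a placeholder.
    private final ConcurrentMap<List<Object>, Boolean> rows = new ConcurrentHashMap<>();

    /** Returns true only for the first caller that presents a given key. */
    boolean tryClaim(String consumerScopeId, String eventId, LocalDate weekStart) {
        List<Object> key = Arrays.<Object>asList(consumerScopeId, eventId, weekStart);
        // putIfAbsent is atomic: exactly one caller per key observes null.
        return rows.putIfAbsent(key, Boolean.TRUE) == null;
    }
}
```

With this model, a second claim for the same (scope, event, week) returns false, while the same event claimed under "notification-service-pod-8" instead of "notification-service-pod-7" succeeds, because the scope is part of the key.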
Observability:
- OTel span im2be.dedup.try_claim with attributes consumer.scope_id, event.id, event.week_start, kafka.topic, kafka.partition, kafka.offset, dedup.outcome ∈ claimed | duplicate.
- Micrometer counter im2be_dedup_claims_total labelled consumer_scope_id + outcome; see DedupMetricsBinder.
-
Field Summary
Fields -
Constructor Summary
Constructors
Constructor: DedupGuard(ProcessedKafkaEventRepository repository, DedupMetricsBinder metrics, Supplier<io.opentelemetry.api.trace.Tracer> tracerSupplier, Clock clock, String defaultScopeId)
Description: Constructs a guard wired to a JPA repository, a Micrometer binder, a deferred OTel tracer supplier (re-resolved per tryClaim call so a late-registered OTel SDK is honoured), a clock for firstSeenAt stamping, and an optional default scope id used by tryClaim(String, Instant, String, int, long) (the no-scope overload).
Method Summary
Modifier and Type, Method, Description
- boolean tryClaim(String consumerScopeId, String eventId, Instant eventTime, String topic, int partition, long offset)
  Atomically attempts to mark an event as processed for the given consumer.
- boolean tryClaim(String eventId, Instant eventTime, String topic, int partition, long offset)
  No-scope convenience overload.
- boolean tryClaimSameTx(String consumerScopeId, String eventId, Instant eventTime, String topic, int partition, long offset)
  At-least-once sibling of tryClaim(String, String, Instant, String, int, long): the dedup claim JOINS the caller's transaction (Propagation.MANDATORY) instead of committing in a separate one.
- boolean tryClaimSameTx(String eventId, Instant eventTime, String topic, int partition, long offset)
  No-scope convenience overload of tryClaimSameTx(String, String, Instant, String, int, long), the at-least-once (Propagation.MANDATORY) counterpart of the no-scope tryClaim(String, Instant, String, int, long).
-
Field Details
-
OUTCOME_CLAIMED
Outcome attribute values + metric tag values.
-
OUTCOME_DUPLICATE
-
-
Constructor Details
-
DedupGuard
public DedupGuard(ProcessedKafkaEventRepository repository, DedupMetricsBinder metrics, Supplier<io.opentelemetry.api.trace.Tracer> tracerSupplier, Clock clock, String defaultScopeId)
Constructs a guard wired to a JPA repository, a Micrometer binder, a deferred OTel tracer supplier (re-resolved per tryClaim call so a late-registered OTel SDK is honoured), a clock for firstSeenAt stamping, and an optional default scope id used by tryClaim(String, Instant, String, int, long) (the no-scope overload).
Parameters:
repository - the JPA repository (must not be null)
metrics - the Micrometer binder for counter increments (must not be null)
tracerSupplier - deferred Tracer supplier (must not be null; must not itself return null when invoked). Re-invoked per tryClaim call, so a late OTel SDK registration becomes effective without bean re-creation. Tests may pass a constant () -> TracerProvider.noop().get("test").
clock - clock used to stamp first_seen_at on inserted rows; pass Clock.systemUTC() in production and a fixed clock in tests that assert on the timestamp
defaultScopeId - default consumer_scope_id used by the no-scope tryClaim overload; may be null or blank, in which case callers that hit the overload with no default configured will get an IllegalStateException
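Why a Supplier rather than a resolved instance matters can be shown with a generic sketch. The names here (DeferredSupplierSketch, currentTracerName) are hypothetical, and a String stands in for the OTel Tracer so the example needs no external dependency; it only illustrates the per-call re-resolution described above.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class DeferredSupplierSketch {

    // Held as a Supplier so the value is looked up on every call, not once at construction.
    private final Supplier<String> tracerNameSupplier;

    DeferredSupplierSketch(Supplier<String> tracerNameSupplier) {
        this.tracerNameSupplier = Objects.requireNonNull(tracerNameSupplier, "tracerNameSupplier");
    }

    /** Re-resolves the supplier on each invocation, as the guard is described to do per tryClaim call. */
    String currentTracerName() {
        return Objects.requireNonNull(tracerNameSupplier.get(), "supplier returned null");
    }

    public static void main(String[] args) {
        AtomicReference<String> registry = new AtomicReference<>("noop");
        DeferredSupplierSketch sketch = new DeferredSupplierSketch(registry::get);
        System.out.println(sketch.currentTracerName()); // noop
        registry.set("sdk"); // simulates a late SDK registration
        System.out.println(sketch.currentTracerName()); // sdk
    }
}
```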
-
-
Method Details
-
tryClaim
@Transactional(propagation=REQUIRES_NEW) public boolean tryClaim(String consumerScopeId, String eventId, Instant eventTime, String topic, int partition, long offset)
Atomically attempts to mark an event as processed for the given consumer.
Per ADR-0014 D-4: the compound PK (consumer_scope_id, event_id, week_start) is the dedup contract. The week_start is derived from the event's timestamp (NOT ingestion) per §L-10; this stabilises idempotency under reprocessing. Per §L-12: instance-id-style scoping for per-pod broadcast consumers is achievable by passing a different consumer_scope_id per pod.
Transactional: the INSERT runs in its own transaction so that a subsequent business-logic failure does NOT roll back the dedup mark. Rolling back the mark would let a retry deliver the same event twice to a fresh handler invocation, which is the opposite of what this method's dedup is for. Callers who want at-least-once-with-rollback semantics should NOT use this method; use tryClaimSameTx(String, String, Instant, String, int, long) instead.
Parameters:
consumerScopeId - logical consumer identity; must not be blank
eventId - event identifier (UUID rendered as text); must not be blank
eventTime - event's logical timestamp (used to derive the week-aligned partition row); must not be null
topic - origin Kafka topic (forensic); must not be null
partition - origin Kafka partition (forensic)
offset - origin Kafka offset (forensic)
Returns:
true iff this call won the race (consumer should proceed with handler); false iff the row already existed (consumer should skip the message)
Throws:
IllegalArgumentException - if consumerScopeId or eventId is blank
NullPointerException - if eventTime or topic is null
-
tryClaimSameTx
@Transactional(propagation=MANDATORY) public boolean tryClaimSameTx(String consumerScopeId, String eventId, Instant eventTime, String topic, int partition, long offset)
At-least-once sibling of tryClaim(String, String, Instant, String, int, long): the dedup claim JOINS the caller's transaction (Propagation.MANDATORY) instead of committing in a separate one.
Semantic contrast (read this before choosing an overload). Both methods share the identical INSERT-OR-NOTHING core; they differ ONLY in transaction propagation, and that single difference flips the delivery guarantee on the business-logic failure path:
tryClaim vs tryClaimSameTx
Claim transaction
- tryClaim (REQUIRES_NEW): Separate; commits independently of the caller.
- tryClaimSameTx (MANDATORY): The caller's; commits/rolls back atomically with it.
Business logic then fails (caller tx rolls back)
- tryClaim (REQUIRES_NEW): Claim is already committed → event is NOT redelivered to a fresh handler → at-most-once-on-failure (the event is effectively dropped; there is NO poison-message redelivery loop).
- tryClaimSameTx (MANDATORY): Claim rolls back WITH the business tx → the same event is redelivered and re-processed on the next poll → at-least-once.
Requires an active caller transaction?
- tryClaim (REQUIRES_NEW): No; opens its own.
- tryClaimSameTx (MANDATORY): Yes; fails fast with IllegalTransactionStateException if none is in progress.
Use when
- tryClaim (REQUIRES_NEW): The consumer is idempotent OR loss-tolerant, e.g. push notifications, where re-sending on every retry is worse than occasionally dropping one.
- tryClaimSameTx (MANDATORY): The consumer MUST process every event at least once AND has a DLT + retry-limit (or a poison message redelivers forever; see the warning below).
Propagation choice: MANDATORY, not REQUIRED, is deliberate. MANDATORY asserts the consumer already runs inside a transaction and fails fast (throwing IllegalTransactionStateException) when it does not, surfacing a misconfiguration at the first call instead of silently opening a private transaction that would defeat the at-least-once contract (a REQUIRED claim with no enclosing tx would commit on its own, degrading silently to at-most-once). This mirrors the OutboxPublisher.publish(...) fail-fast philosophy (ADR-0014 D-7): a claim that is supposed to share the business transaction boundary must verify that boundary exists.
WARNING: poison-message redelivery. Because the claim rolls back with the business transaction, a deterministically-failing event (a "poison message") will be redelivered and re-fail forever unless the consumer caps redelivery. Callers of tryClaimSameTx MUST configure a Dead-Letter Topic plus a retry/back-off limit (e.g. spring-kafka DefaultErrorHandler with a DeadLetterPublishingRecoverer and a bounded BackOff); otherwise the partition stalls on the first un-processable record. tryClaim has no such requirement precisely because it never redelivers.
All other behaviour (compound-PK contract, logical-timestamp week-start derivation per §L-10, OTel span, Micrometer counter, validation) is identical to tryClaim; see that method and the class Javadoc.
Parameters:
consumerScopeId - logical consumer identity; must not be blank
eventId - event identifier (UUID rendered as text); must not be blank
eventTime - event's logical timestamp (used to derive the week-aligned partition row); must not be null
topic - origin Kafka topic (forensic); must not be null
partition - origin Kafka partition (forensic)
offset - origin Kafka offset (forensic)
Returns:
true iff this call won the race (consumer should proceed with handler); false iff the row already existed (consumer should skip the message)
Throws:
org.springframework.transaction.IllegalTransactionStateException - if invoked with no active transaction on the calling thread (MANDATORY)
IllegalArgumentException - if consumerScopeId or eventId is blank
NullPointerException - if eventTime or topic is null
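The need for a retry limit plus a dead-letter destination can be sketched without any Kafka dependency. This is a plain-Java simulation, not spring-kafka configuration: BoundedRetrySketch and its methods are hypothetical, an in-memory list stands in for the Dead-Letter Topic, and each loop iteration stands in for one redelivery after the claim has rolled back.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

class BoundedRetrySketch {

    private final int maxAttempts;
    private final List<String> deadLetters = new ArrayList<>(); // stand-in for a DLT

    BoundedRetrySketch(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
    }

    /** Delivers the event up to maxAttempts times; returns the number of attempts made. */
    int deliver(String eventId, Consumer<String> handler) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(eventId);
                return attempt; // processed successfully
            } catch (RuntimeException e) {
                // Claim rolled back with the business tx: the event comes back on the next poll.
            }
        }
        deadLetters.add(eventId); // retries exhausted: park the poison message and move on
        return maxAttempts;
    }

    List<String> deadLetters() {
        return Collections.unmodifiableList(deadLetters);
    }
}
```

Without the maxAttempts bound the loop would never terminate for a deterministically failing event, which is the partition stall described in the warning.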
-
tryClaim
@Transactional(propagation=REQUIRES_NEW) public boolean tryClaim(String eventId, Instant eventTime, String topic, int partition, long offset)
No-scope convenience overload. Internally delegates to the shared tryClaimInternal core (NOT the public explicit-scope overload, because self-invocation through this would bypass the Spring @Transactional(REQUIRES_NEW) proxy). The proxy intercepts THIS public entry point; the internal core runs within the established transaction.
Uses the defaultScopeId configured at construction time (typically bound from im2be.dedup.default-scope-id). Consumers that always use a single scope (the common case: one canonical service name per pod) can call this overload to avoid threading the scope through every handler.
Per-pod broadcast consumers (ADR-0014 §L-12) cannot use this overload; they MUST use the explicit-scope overload with a pod-suffixed scope.
Throws:
IllegalStateException - if no default scope was configured at construction time (i.e. im2be.dedup.default-scope-id is unset or blank)
IllegalArgumentException - if eventId is blank
NullPointerException - if eventTime or topic is null
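The self-invocation pitfall mentioned above can be reproduced with a JDK dynamic proxy. This is an analogy rather than Spring's own proxy machinery: SelfInvocationSketch, Guard, and Target are hypothetical names, and a counter stands in for the transactional interceptor.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

class SelfInvocationSketch {

    interface Guard {
        boolean explicitScope(String scope, String eventId);
        boolean defaultScope(String eventId);
    }

    static class Target implements Guard {
        @Override
        public boolean explicitScope(String scope, String eventId) {
            return true;
        }

        @Override
        public boolean defaultScope(String eventId) {
            // Self-invocation: this call goes straight to the target object,
            // so the proxy (and any interceptor on it) never sees it.
            return this.explicitScope("default", eventId);
        }
    }

    /** Wraps target in a proxy that counts every call arriving through the proxy. */
    static Guard proxied(Guard target, AtomicInteger intercepted) {
        return (Guard) Proxy.newProxyInstance(
                Guard.class.getClassLoader(),
                new Class<?>[] {Guard.class},
                (proxy, method, args) -> {
                    intercepted.incrementAndGet(); // stand-in for "begin transaction"
                    return method.invoke(target, args);
                });
    }
}
```

Calling defaultScope through the proxy is intercepted once; the nested explicitScope call is not intercepted at all, which is why an annotation on the inner method would have no effect and the annotation has to sit on the public entry point that callers actually invoke.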
-
tryClaimSameTx
@Transactional(propagation=MANDATORY) public boolean tryClaimSameTx(String eventId, Instant eventTime, String topic, int partition, long offset)
No-scope convenience overload of tryClaimSameTx(String, String, Instant, String, int, long), the at-least-once (Propagation.MANDATORY) counterpart of the no-scope tryClaim(String, Instant, String, int, long).
Uses the defaultScopeId configured at construction time (typically bound from im2be.dedup.default-scope-id). The at-most-once vs at-least-once tradeoff, the MANDATORY-not-REQUIRED rationale, and the mandatory-DLT poison-message warning are all documented on tryClaimSameTx(String, String, Instant, String, int, long); consult that method before adopting this variant.
Like the no-scope tryClaim overload, it delegates to the shared tryClaimInternal core (NOT the public explicit-scope overload, because self-invocation through this would bypass the Spring @Transactional(MANDATORY) proxy). The proxy intercepts THIS public entry point; the internal core runs within the established transaction.
Per-pod broadcast consumers (ADR-0014 §L-12) cannot use this overload; they MUST use the explicit-scope overload with a pod-suffixed scope.
Parameters:
eventId - event identifier (UUID rendered as text); must not be blank
eventTime - event's logical timestamp; must not be null
topic - origin Kafka topic (forensic); must not be null
partition - origin Kafka partition (forensic)
offset - origin Kafka offset (forensic)
Returns:
true iff this call won the race (consumer should proceed with handler); false iff the row already existed (consumer should skip the message)
Throws:
org.springframework.transaction.IllegalTransactionStateException - if invoked with no active transaction on the calling thread (MANDATORY)
IllegalStateException - if no default scope was configured at construction time (i.e. im2be.dedup.default-scope-id is unset or blank)
IllegalArgumentException - if eventId is blank
NullPointerException - if eventTime or topic is null
-