Class KafkaHeaderUtils

java.lang.Object
com.aim2be.platform.dedup.KafkaHeaderUtils

public final class KafkaHeaderUtils extends Object
Shared Kafka header-parsing helpers for the dedup flow.

Currently exposes resolveEventId(ConsumerRecord), the canonical derivation of the dedup event_id from a consumed Kafka record. Before this class existed, each of the five consuming services (user, family, notification, calendar, diary) carried its own private copy of this helper in its KafkaConsumerService. The copies were byte-for-byte identical except for a cosmetic StringBuilder in diary. Extracting the helper here puts the header-name constant and the header-or-coordinates fallback in exactly one place. Consumer adoption is a separate phase: the PR that introduces this class does NOT modify the services.

Lives in processed-kafka-events (rather than a new module) because the resolved event_id feeds straight into DedupGuard.tryClaim(String, String, java.time.Instant, String, int, long) / DedupGuard.tryClaimSameTx(String, String, java.time.Instant, String, int, long) — it is dedup-adjacent, and consumers that use the guard already depend on this module.

Stateless and final — utility class, never instantiated.

  • Field Details

    • HEADER_EVENT_ID

      public static final String HEADER_EVENT_ID
      Kafka header carrying the producer-side event_id (the outbox-publisher's UUIDv7). Matches the HEADER_EVENT_ID = "event_id" constant duplicated across the five consumer services.
  • Method Details

    • resolveEventId

      public static String resolveEventId(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record)
      Resolves the dedup event_id for a consumed Kafka record.

      Canonical behaviour (extracted unchanged from the five consumer services' resolveEventId helpers):

      1. Read the "event_id" header via Headers.lastHeader(String) (the LAST value wins if the producer set the header more than once).
      2. If that header is present and its byte[] value is non-null, decode it as UTF-8 and trim it. If the result is non-blank, return it — this is the producer's intended, reprocessing-stable event id.
      3. Otherwise (header absent, null bytes, or empty/blank after trim) fall back to a deterministic Kafka-coordinates pseudo-id of the form topic:partition:offset. A replay from the same offset yields the same pseudo-id and therefore stays deduped; such replays are the dominant duplicate path.

      The header bytes are decoded with StandardCharsets.UTF_8, matching the producer's encoding and every consumer copy.
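
      The three steps above can be sketched without the Kafka client on the classpath. This is a minimal illustration, not the class's actual source: the list of key/value entries and the lastHeaderValue helper are hypothetical stand-ins for ConsumerRecord.headers() and Headers.lastHeader(String), the topic name "orders" and the sample ids are made up, and String.isBlank() assumes Java 11 or later.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

class ResolveEventIdSketch {
    static final String HEADER_EVENT_ID = "event_id";

    // Hypothetical stand-in for Headers.lastHeader(String): returns the value
    // of the LAST header with the given key, or null if no such header exists.
    static byte[] lastHeaderValue(List<Map.Entry<String, byte[]>> headers, String key) {
        byte[] last = null;
        for (Map.Entry<String, byte[]> header : headers) {
            if (header.getKey().equals(key)) {
                last = header.getValue();
            }
        }
        return last;
    }

    // Header-or-coordinates resolution, following steps 1-3 of the description.
    static String resolveEventId(List<Map.Entry<String, byte[]>> headers,
                                 String topic, int partition, long offset) {
        byte[] raw = lastHeaderValue(headers, HEADER_EVENT_ID);      // step 1
        if (raw != null) {                                           // step 2
            String id = new String(raw, StandardCharsets.UTF_8).trim();
            if (!id.isBlank()) {
                return id;
            }
        }
        return topic + ":" + partition + ":" + offset;               // step 3
    }

    public static void main(String[] args) {
        List<Map.Entry<String, byte[]>> setTwice = List.of(
                Map.entry("event_id", "first".getBytes(StandardCharsets.UTF_8)),
                Map.entry("event_id", " 0190-last ".getBytes(StandardCharsets.UTF_8)));
        List<Map.Entry<String, byte[]>> blank = List.of(
                Map.entry("event_id", "   ".getBytes(StandardCharsets.UTF_8)));
        List<Map.Entry<String, byte[]>> none = List.of();

        System.out.println(resolveEventId(setTwice, "orders", 3, 42L)); // 0190-last
        System.out.println(resolveEventId(blank, "orders", 3, 42L));    // orders:3:42
        System.out.println(resolveEventId(none, "orders", 3, 42L));     // orders:3:42
    }
}
```

      In the sketch a blank header and a missing header resolve to the same coordinates pseudo-id, so a record that is redelivered from the same offset maps to the same event_id either way.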

      Parameters:
      record - the consumed Kafka record (must not be null); the key/value types are irrelevant to id resolution, hence the wildcard bound
      Returns:
      the producer's "event_id" header value when present and non-blank, else the topic:partition:offset fallback; never null or blank