Interface ProcessedKafkaEventRepository

All Superinterfaces:
org.springframework.data.repository.CrudRepository<ProcessedKafkaEvent,ProcessedKafkaEventId>, org.springframework.data.jpa.repository.JpaRepository<ProcessedKafkaEvent,ProcessedKafkaEventId>, org.springframework.data.repository.ListCrudRepository<ProcessedKafkaEvent,ProcessedKafkaEventId>, org.springframework.data.repository.ListPagingAndSortingRepository<ProcessedKafkaEvent,ProcessedKafkaEventId>, org.springframework.data.repository.PagingAndSortingRepository<ProcessedKafkaEvent,ProcessedKafkaEventId>, org.springframework.data.repository.query.QueryByExampleExecutor<ProcessedKafkaEvent>, org.springframework.data.repository.Repository<ProcessedKafkaEvent,ProcessedKafkaEventId>

public interface ProcessedKafkaEventRepository extends org.springframework.data.jpa.repository.JpaRepository<ProcessedKafkaEvent,ProcessedKafkaEventId>
Spring Data repository for ProcessedKafkaEvent.

The dedup contract is enforced at INSERT time via Postgres ON CONFLICT (consumer_scope_id, event_id, week_start) DO NOTHING. Calling CrudRepository.save(S) would issue a Hibernate SELECT-then-INSERT that races between concurrent consumer pods; this repository exposes the atomic native variant via insertOrNothing(java.lang.String, java.lang.String, java.time.LocalDate, java.time.Instant, java.lang.String, int, long).

The offset column is a reserved word in PostgreSQL and is quoted explicitly in the native SQL ("offset").

  • Method Summary

    Modifier and Type
    Method
    Description
    long
    Returns the count of dedup rows for the given consumer scope + week.
    int
    insertOrNothing(String consumerScopeId, String eventId, LocalDate weekStart, Instant firstSeenAt, String topic, int partition, long offset)
    Atomic INSERT-OR-NOTHING into processed_kafka_events.

    Methods inherited from interface org.springframework.data.repository.CrudRepository

    count, delete, deleteAll, deleteAll, deleteAllById, deleteById, existsById, findById, save

    Methods inherited from interface org.springframework.data.jpa.repository.JpaRepository

    deleteAllByIdInBatch, deleteAllInBatch, deleteAllInBatch, deleteInBatch, findAll, findAll, flush, getById, getOne, getReferenceById, saveAllAndFlush, saveAndFlush

    Methods inherited from interface org.springframework.data.repository.ListCrudRepository

    findAll, findAllById, saveAll

    Methods inherited from interface org.springframework.data.repository.ListPagingAndSortingRepository

    findAll

    Methods inherited from interface org.springframework.data.repository.PagingAndSortingRepository

    findAll

    Methods inherited from interface org.springframework.data.repository.query.QueryByExampleExecutor

    count, exists, findAll, findBy, findOne
  • Method Details

    • insertOrNothing

      @Modifying @Transactional @Query(value="INSERT INTO processed_kafka_events (consumer_scope_id, event_id, week_start, first_seen_at, topic, partition, \"offset\") VALUES (:consumerScopeId, :eventId, :weekStart, :firstSeenAt, :topic, :partition, :offset) ON CONFLICT (consumer_scope_id, event_id, week_start) DO NOTHING", nativeQuery=true) int insertOrNothing(@Param("consumerScopeId") String consumerScopeId, @Param("eventId") String eventId, @Param("weekStart") LocalDate weekStart, @Param("firstSeenAt") Instant firstSeenAt, @Param("topic") String topic, @Param("partition") int partition, @Param("offset") long offset)
      Atomic INSERT-OR-NOTHING into processed_kafka_events.

      Returns 1 when the row was newly inserted (this consumer won the race and should proceed with the message handler); returns 0 when an identical compound key already existed (this consumer is a straggler and should silently skip).

      Uses positional parameters via Spring Data's @Param binding (NOT SpEL) — keeps the native SQL auditable and avoids the SpEL-on- native-query rendering quirks that bite under nested entity binding.

      Parameters:
      consumerScopeId - composite-key component — logical consumer identity
      eventId - composite-key component — event UUID as text
      weekStart - composite-key component — Monday of the event's week
      firstSeenAt - wall-clock at first-insert (defaults to now() if null via the DDL column default)
      topic - origin Kafka topic (forensic)
      partition - origin Kafka partition (forensic)
      offset - origin Kafka offset (forensic)
      Returns:
      1 on first insert, 0 on conflict
    • countByIdConsumerScopeIdAndIdWeekStart

      long countByIdConsumerScopeIdAndIdWeekStart(String consumerScopeId, LocalDate weekStart)
      Returns the count of dedup rows for the given consumer scope + week. Polled by the im2be_dedup_active_partitions_total gauge for operational visibility into per-scope dedup-table size.
      Parameters:
      consumerScopeId - scope to count
      weekStart - week-aligned partition
      Returns:
      row count in the matching partition