Interface ProcessedKafkaEventRepository
- All Superinterfaces:
org.springframework.data.repository.CrudRepository<ProcessedKafkaEvent,,ProcessedKafkaEventId> org.springframework.data.jpa.repository.JpaRepository<ProcessedKafkaEvent,,ProcessedKafkaEventId> org.springframework.data.repository.ListCrudRepository<ProcessedKafkaEvent,,ProcessedKafkaEventId> org.springframework.data.repository.ListPagingAndSortingRepository<ProcessedKafkaEvent,,ProcessedKafkaEventId> org.springframework.data.repository.PagingAndSortingRepository<ProcessedKafkaEvent,,ProcessedKafkaEventId> org.springframework.data.repository.query.QueryByExampleExecutor<ProcessedKafkaEvent>,org.springframework.data.repository.Repository<ProcessedKafkaEvent,ProcessedKafkaEventId>
ProcessedKafkaEvent.
The dedup contract is enforced at INSERT time via Postgres
ON CONFLICT (consumer_scope_id, event_id, week_start) DO NOTHING.
Calling CrudRepository.save(S) would issue a Hibernate SELECT-then-INSERT
that races between concurrent consumer pods; this repository exposes the
atomic native variant via insertOrNothing(java.lang.String, java.lang.String, java.time.LocalDate, java.time.Instant, java.lang.String, int, long).
The offset column is a reserved word in PostgreSQL and is
quoted explicitly in the native SQL ("offset").
-
Method Summary
Modifier and TypeMethodDescriptionlongcountByIdConsumerScopeIdAndIdWeekStart(String consumerScopeId, LocalDate weekStart) Returns the count of dedup rows for the given consumer scope + week.intinsertOrNothing(String consumerScopeId, String eventId, LocalDate weekStart, Instant firstSeenAt, String topic, int partition, long offset) Atomic INSERT-OR-NOTHING intoprocessed_kafka_events.Methods inherited from interface org.springframework.data.repository.CrudRepository
count, delete, deleteAll, deleteAll, deleteAllById, deleteById, existsById, findById, saveMethods inherited from interface org.springframework.data.jpa.repository.JpaRepository
deleteAllByIdInBatch, deleteAllInBatch, deleteAllInBatch, deleteInBatch, findAll, findAll, flush, getById, getOne, getReferenceById, saveAllAndFlush, saveAndFlushMethods inherited from interface org.springframework.data.repository.ListCrudRepository
findAll, findAllById, saveAllMethods inherited from interface org.springframework.data.repository.ListPagingAndSortingRepository
findAllMethods inherited from interface org.springframework.data.repository.PagingAndSortingRepository
findAllMethods inherited from interface org.springframework.data.repository.query.QueryByExampleExecutor
count, exists, findAll, findBy, findOne
-
Method Details
-
insertOrNothing
@Modifying @Transactional @Query(value="INSERT INTO processed_kafka_events (consumer_scope_id, event_id, week_start, first_seen_at, topic, partition, \"offset\") VALUES (:consumerScopeId, :eventId, :weekStart, :firstSeenAt, :topic, :partition, :offset) ON CONFLICT (consumer_scope_id, event_id, week_start) DO NOTHING", nativeQuery=true) int insertOrNothing(@Param("consumerScopeId") String consumerScopeId, @Param("eventId") String eventId, @Param("weekStart") LocalDate weekStart, @Param("firstSeenAt") Instant firstSeenAt, @Param("topic") String topic, @Param("partition") int partition, @Param("offset") long offset) Atomic INSERT-OR-NOTHING intoprocessed_kafka_events.Returns
1when the row was newly inserted (this consumer won the race and should proceed with the message handler); returns0when an identical compound key already existed (this consumer is a straggler and should silently skip).Uses positional parameters via Spring Data's
@Parambinding (NOT SpEL) — keeps the native SQL auditable and avoids the SpEL-on- native-query rendering quirks that bite under nested entity binding.- Parameters:
consumerScopeId- composite-key component — logical consumer identityeventId- composite-key component — event UUID as textweekStart- composite-key component — Monday of the event's weekfirstSeenAt- wall-clock at first-insert (defaults tonow()ifnullvia the DDL column default)topic- origin Kafka topic (forensic)partition- origin Kafka partition (forensic)offset- origin Kafka offset (forensic)- Returns:
1on first insert,0on conflict
-
countByIdConsumerScopeIdAndIdWeekStart
Returns the count of dedup rows for the given consumer scope + week. Polled by theim2be_dedup_active_partitions_totalgauge for operational visibility into per-scope dedup-table size.- Parameters:
consumerScopeId- scope to countweekStart- week-aligned partition- Returns:
- row count in the matching partition
-