fix(outbox): build dedicated raw-bytes producer for the relay (#332) #19
Header
Project: im2be-platform-libs · Module: outbox-publisher · Issue: #332 (ADR-0015 error-event stream) · Type: fix (silent-failure)
TL;DR
BLOCKED→FIXED: the outbox relay's autoconfig injected a KafkaTemplate<byte[],byte[]> but no byte[] producer existed, so Spring wired the service's String/JSON default by generic erasure → every outbox row failed with SerializationException: Can't convert key of class [B to StringSerializer, silently breaking ADR-0014 delivery (and the ADR-0015 error-event stream on top of it).
Summary
Scope: OutboxAutoConfiguration + its IT/context tests. The relay now builds its own raw-bytes producer instead of depending on an ambiently-present byte[] KafkaTemplate. Verified: full module build green, zero compile warnings (failOnWarning=true), the integration test rewired to reproduce the production wiring. Commit c5a26e3.
Findings / Changes
outbox-publisher/.../OutboxAutoConfiguration.java: buildOutboxKafkaTemplate(KafkaProperties, ObjectProvider<SslBundles>) derives props from the service's spring.kafka.* via KafkaProperties.buildProducerProperties(SslBundles), then forces key+value ByteArraySerializer, acks=all, enable.idempotence=true. The outboxPublisher + outboxPollerWorker @Bean methods now take KafkaProperties + ObjectProvider<SslBundles> and call the helper, instead of injecting KafkaTemplate<byte[],byte[]>.
application-it.properties: byte[] → String + keep acks=1. Closes the test gap that masked the prod bug; the IT now exercises the exact prod wiring.
OutboxBackendSelectionTest.java: […] KafkaProperties (autoconfig no longer injects a KafkaTemplate); dropped the now-unused raw-mock @SuppressWarnings("unchecked").
README.md: […] @Bean).
Why built inside the bean method (not a @Bean)
KafkaAutoConfiguration.kafkaTemplate / kafkaProducerFactory are @ConditionalOnMissingBean. Contributing a competing KafkaTemplate / ProducerFactory bean would suppress the service's default producer and break every non-outbox Kafka send. Keeping the byte[] template private to the relay leaves the service default untouched.
Why force (not putIfAbsent) acks=all + idempotence
A durable error-event relay must never silently inherit a weaker acks from a service's general-purpose producer: it owns its at-least-once guarantee. No-op for the 7 services (already acks=all + idempotent); load-bearing only against a weaker config (e.g. the IT's acks=1). Forcing both together keeps them internally consistent (idempotence requires acks=all).
Verdict
READY: fixes a silent prod data-loss path with a regression test that now reproduces it. No public API change (constructors unchanged). Gating the deploy on the operator (image rebuild of the 6 local services).
Risk indicators
[…] KafkaProperties in context test.
Footer
im2be-platform-libs • fix • #332 • files=4 (+81/-9) • build=SUCCESS warnings=0 tests=4IT+units(0 fail) • 2026-05-31
hib-pr-reviewer review — PR #19 (affinity-intelligence-rework/im2be-platform-libs)
Round 1: head c5a26e376601, base main, trigger opened
TL;DR: NEEDS_WORK. Kept 2 agreed findings (1 major, 1 minor), both independently confirmed by Read/Grep; no prior run history; first arbitration for this PR.
Summary
Reconciliation complete — first run for this PR.
No prior Memora run history found. Both reviewer A and reviewer B independently raised the same two findings against the same file (OutboxAutoConfiguration.java). Code inspection at HEAD (c5a26e3) fully confirmed both:
Major (line 396): new DefaultKafkaProducerFactory<>(props) is instantiated inline, passed directly into new KafkaTemplate<>(...), and never exposed as a Spring bean. DefaultKafkaProducerFactory implements DisposableBean specifically to flush/close its internal KafkaProducer pool; because it is never managed by the application context, its destroy() is never called. A grep across the entire outbox package (*.java) confirmed that neither OutboxPublisher nor OutboxPollerWorker implements DisposableBean or Closeable, or carries @PreDestroy, leaving no secondary cleanup path. Records buffered at shutdown (non-zero linger.ms) may be dropped, a direct correctness gap for an at-least-once relay.
Minor (lines 314 and 342): buildOutboxKafkaTemplate is a private static method with no state caching. outboxPublisher and outboxPollerWorker (both active by default, matchIfMissing = true) each invoke it independently, producing two separate DefaultKafkaProducerFactory instances with two independent KafkaProducer thread pools and broker TCP connections, for no isolation benefit.
Both findings kept. Severity preserved as-is. Recommended fix: convert the static helper to an instance method, store the factory as a field on OutboxAutoConfiguration (which IS a Spring singleton), and add @PreDestroy to call factory.destroy() at context shutdown; this simultaneously addresses the lifecycle leak and eliminates the duplicate-factory problem.
Blast Radius
The diff touches one auto-configuration class plus its test and properties files. The affected surface is limited to the outbox relay submodule, but OutboxAutoConfiguration is the sole wiring point for all relay beans consumed by 7 downstream services, making the lifecycle leak a production-correctness concern on any service that hosts the outbox.
BLAST_SCORE: 4/10
Risk Indicators
buildOutboxKafkaTemplate, OutboxPublisher, OutboxPollerWorker
CI status (head c5a26e376601)
Overall: ✗ failure
2 checks: 2 pending
Findings (2)
[MAJOR] DefaultKafkaProducerFactory is never destroyed — producer threads and connections leak on shutdown
outbox-publisher/src/main/java/com/aim2be/platform/outbox/OutboxAutoConfiguration.java:396
Line 396: return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
The DefaultKafkaProducerFactory is constructed inline and never held by any Spring-managed bean, so Spring's DisposableBean contract is never invoked on it at context shutdown. DefaultKafkaProducerFactory implements DisposableBean specifically to flush and close its internal KafkaProducer pool; skipping destroy() means:
Records buffered at shutdown (linger.ms > 0) are silently dropped at JVM exit, a correctness gap for an at-least-once relay during rolling deploys.
OutboxPublisher and OutboxPollerWorker do not implement DisposableBean or Closeable, or carry @PreDestroy (confirmed by grep across the outbox package), so there is no secondary cleanup path.
Fix: convert buildOutboxKafkaTemplate from private static to a non-static instance method on OutboxAutoConfiguration (which is itself a Spring-managed singleton), cache the factory as an instance field, and add a @PreDestroy method that calls destroy() on the cached factory.
This also resolves the minor duplicate-factory finding.
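The shape of that fix can be modelled in plain Java without Spring or Kafka on the classpath. This is only a sketch: FakeProducerFactory, FakeTemplate and Config are hypothetical stand-ins for DefaultKafkaProducerFactory, KafkaTemplate and OutboxAutoConfiguration, and the field and method names are illustrative. In the real class the close method would carry @PreDestroy.

```java
// Plain-JDK model of the cache-and-destroy fix. FakeProducerFactory and
// FakeTemplate are hypothetical stand-ins, not the real Spring Kafka types.
final class OutboxProducerLifecycleSketch {

    /** Stand-in for DefaultKafkaProducerFactory; only records whether destroy() ran. */
    static final class FakeProducerFactory {
        private boolean destroyed;

        void destroy() {
            destroyed = true; // the real factory would close its producer here
        }

        boolean isDestroyed() {
            return destroyed;
        }
    }

    /** Stand-in for KafkaTemplate: a thin wrapper over a (possibly shared) factory. */
    static final class FakeTemplate {
        final FakeProducerFactory factory;

        FakeTemplate(FakeProducerFactory factory) {
            this.factory = factory;
        }
    }

    /** Stand-in for OutboxAutoConfiguration, which Spring manages as a singleton. */
    static final class Config {
        private FakeProducerFactory outboxProducerFactory; // cached after the first build
        int factoriesCreated;                               // bookkeeping for the demo only

        // Instance method (not static) so both bean methods reach the same field.
        FakeTemplate buildOutboxKafkaTemplate() {
            if (outboxProducerFactory == null) {
                outboxProducerFactory = new FakeProducerFactory();
                factoriesCreated++;
            }
            return new FakeTemplate(outboxProducerFactory);
        }

        // In the real configuration class this method would carry @PreDestroy.
        void closeOutboxProducerFactory() {
            if (outboxProducerFactory != null) {
                outboxProducerFactory.destroy();
            }
        }
    }

    public static void main(String[] args) {
        Config config = new Config();
        FakeTemplate publisherTemplate = config.buildOutboxKafkaTemplate();
        FakeTemplate pollerTemplate = config.buildOutboxKafkaTemplate();

        System.out.println("factories created: " + config.factoriesCreated);
        System.out.println("shared factory: " + (publisherTemplate.factory == pollerTemplate.factory));

        config.closeOutboxProducerFactory();
        System.out.println("destroyed: " + publisherTemplate.factory.isDestroyed());
    }
}
```

Each caller still gets its own template wrapper, but only one factory is ever built, and the configuration object is the single owner responsible for closing it.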
[MINOR] buildOutboxKafkaTemplate called twice — two independent producer factories open doubled broker connections
outbox-publisher/src/main/java/com/aim2be/platform/outbox/OutboxAutoConfiguration.java:314
Lines 314 and 342 each call buildOutboxKafkaTemplate(kafkaProperties, sslBundles) independently. The method is private static with no caching, so each invocation allocates a fresh DefaultKafkaProducerFactory. Both outboxPublisher and outboxPollerWorker are active by default (matchIfMissing = true), meaning at runtime the relay holds two separate KafkaProducer instances with independent sender threads, metadata-fetch cycles, and TCP connections to the Kafka broker, for no isolation benefit, as both beans form a single logical relay.
Resolved as a side-effect of the lifecycle-leak fix above: caching the factory as an instance field means both bean methods share one DefaultKafkaProducerFactory and one broker connection pool.
Verdict
NEEDS_WORK
hib-pr-reviewer • round 1 • 2 findings (1M/1m) • 2026-05-31T15:25:32.968Z → 2026-05-31T15:26:48.138Z • posted-as: pr-reviewer-bot • model: auto
hib-pr-reviewer review — PR #19 (affinity-intelligence-rework/im2be-platform-libs)
Round 2: head 1b60f3fe6cce, base main, trigger synchronize
TL;DR: CONDITIONAL_APPROVE. Kept 1 unique-to-A minor finding (verified at line 434: missing MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION cap under forced idempotence); both round-1 blocking findings confirmed resolved.
Summary
Arbitration summary — Round 2
Prior-round findings: Both round-1 findings (producer-factory resource leak, duplicate-factory construction) confirmed resolved by both reviewers. Not re-raised.
Unique-to-A (1 finding): Reviewer A flagged that buildOutboxKafkaTemplate forces enable.idempotence=true (line 434) but does not also cap max.in.flight.requests.per.connection to ≤ 5. Verified via Read of lines 425–438: the four forced knobs are KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG, ACKS_CONFIG, and ENABLE_IDEMPOTENCE_CONFIG; MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION is absent. A service-level override above 5 carried in via buildProducerProperties() would survive unmodified and cause ConfigException at the first lazy-producer send. Finding is kept as minor.
Unique-to-B: Reviewer B raised no new findings. Nothing to evaluate.
Net result: 1 agreed-or-kept finding, 0 dropped, verdict CONDITIONAL_APPROVE.
Blast Radius
Changes are tightly scoped to OutboxAutoConfiguration (a single auto-configuration class), its companion test, and README documentation. The new buildOutboxKafkaTemplate helper is private and called only during single-threaded context refresh; no exported surface is altered. The blast radius is low: only services that import outbox-publisher are affected, and the behavioral change (dedicated byte[] producer) is isolated from the service's default KafkaTemplate.
BLAST_SCORE: 2/10
Risk Indicators
buildOutboxKafkaTemplate, closeOutboxProducerFactory
CI status (head 1b60f3fe6cce)
Overall: ✗ failure
2 checks: 2 pending
Findings (1)
[MINOR] Forcing enable.idempotence=true without capping max.in.flight.requests.per.connection ≤ 5 can produce a silent ConfigException at first send
outbox-publisher/src/main/java/com/aim2be/platform/outbox/OutboxAutoConfiguration.java:434
Lines 431–434 force four producer knobs (ByteArraySerializer × 2, acks=all, enable.idempotence=true), but omit MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION. The props map is first populated by kafkaProperties.buildProducerProperties(sslBundles.getIfAvailable()) (line 430), which faithfully copies every spring.kafka.producer.properties.* override the service has declared. If a future adopter sets spring.kafka.producer.properties.max.in.flight.requests.per.connection above 5 for throughput (the Kafka default is 5, so this requires a deliberate override), that value will be present in props and will not be overwritten. Kafka enforces the ≤ 5 constraint eagerly during KafkaProducer construction; because DefaultKafkaProducerFactory is lazy, this ConfigException surfaces only at the first enqueue attempt, not at application startup, making it hard to diagnose.
The 7 existing services are unaffected (they already configure idempotence and thus already comply), but the risk is structural: every other forced durability knob here has a corresponding explicit override; this one does not.
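The scenario can be exercised without a broker. Spring Boot binds spring.kafka.producer.properties.* as a map of strings, so such an override would reach the props map as "10" rather than 10. The sketch below is a minimal plain-Java model, assuming a hypothetical helper capMaxInFlight that is not part of the PR; it clamps the setting to 5 while tolerating both String and Integer values.

```java
import java.util.HashMap;
import java.util.Map;

final class MaxInFlightCapSketch {
    // Literal value of ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION.
    static final String MAX_IN_FLIGHT = "max.in.flight.requests.per.connection";
    // Highest value the review cites as compatible with enable.idempotence=true.
    static final int IDEMPOTENT_MAX_IN_FLIGHT = 5;

    /** Hypothetical helper: caps the setting at 5, keeping lower values the service chose. */
    static void capMaxInFlight(Map<String, Object> props) {
        props.merge(MAX_IN_FLIGHT, IDEMPOTENT_MAX_IN_FLIGHT, (existing, cap) ->
                // existing may be a String ("10") or an Integer, so parse via toString().
                Integer.valueOf(Math.min(Integer.parseInt(existing.toString().trim()), (Integer) cap)));
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(MAX_IN_FLIGHT, "10"); // service override, carried in as a String
        capMaxInFlight(props);
        System.out.println(props.get(MAX_IN_FLIGHT)); // prints 5
    }
}
```

A plain (Integer) cast on the existing value would throw ClassCastException for the String case, which is why the sketch parses instead of casting.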
Fix: add props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5) alongside the other forced knobs. 5 is Kafka's maximum permissible value under idempotence and preserves ordering guarantees consistent with the relay's at-least-once contract. Alternatively, use props.merge(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5, (existing, cap) -> Math.min(Integer.parseInt(existing.toString()), (Integer) cap)) if preserving compliant values already set by the service is desirable (parsing rather than casting, because values carried in from spring.kafka.producer.properties.* are Strings); the simpler forced-5 approach, however, is consistent with how acks and idempotence are handled here.
Verdict
CONDITIONAL_APPROVE
hib-pr-reviewer • round 2 • 1 finding (1m) • 2026-05-31T15:32:46.264Z → 2026-05-31T15:34:12.020Z • posted-as: pr-reviewer-bot • model: auto
hib-pr-reviewer review — PR #19 (affinity-intelligence-rework/im2be-platform-libs)
Round 3: head a3e09ff0a9db, base main, trigger synchronize
TL;DR: NO_NEW_FINDINGS. No new findings this round.
Summary
[quorum-converged] A=0 = B=0. All three prior-round findings are resolved. The full file confirms:
Round-1 factory-destroy leak → closed. @PreDestroy closeOutboxProducerFactory() (lines 165–170) guards against null and calls outboxProducerFactory.destroy(), which flushes buffered records and releases the sender thread + broker TCP connections at context shutdown.
Round-1 doubled connections → closed. buildOutboxKafkaTemplate (lines 427–448) gates factory creation behind if (outboxProducerFactory == null) and caches the result in the field. outboxPublisher (line 345) and outboxPollerWorker (line 373) receive distinct KafkaTemplate wrapper instances, but they share a single underlying DefaultKafkaProducerFactory and therefore a single broker connection pool.
Round-2 idempotence ConfigException risk → closed. Line 444 now pins MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5 (Kafka's idempotence-compatible maximum), consistent with the forced enable.idempotence=true and acks=all. Kafka's AbstractConfig correctly handles the Integer and Boolean literal types used for these overrides.
Test coverage is sound: OutboxBackendSelectionTest replaces the KafkaTemplate mock with a real KafkaProperties bean (KafkaProperties::new); construction of DefaultKafkaProducerFactory is deferred to first send, so no broker is required during the gate tests. application-it.properties now deliberately configures String serializers (mirroring the 7 production services) plus a weaker acks=1, proving that the relay's own buildOutboxKafkaTemplate overrides the service-default config and that the pre-fix masking (byte-array IT defaults hiding the SerializationException) cannot recur.
No new actionable findings.
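The override behaviour the IT is described as proving can be modelled in plain Java. This is a sketch under assumptions, not the PR's code: applyOutboxOverrides is a hypothetical name, the keys are the literal Kafka config names, and the serializer is given as a class-name string so that the example needs no Kafka dependency (the real helper presumably uses the ProducerConfig constants and ByteArraySerializer.class).

```java
import java.util.HashMap;
import java.util.Map;

final class ForcedProducerOverridesSketch {
    static final String BYTE_ARRAY_SERIALIZER =
            "org.apache.kafka.common.serialization.ByteArraySerializer";

    /** Hypothetical helper: copies the service's producer props, then forces the relay's knobs. */
    static Map<String, Object> applyOutboxOverrides(Map<String, Object> serviceProps) {
        Map<String, Object> props = new HashMap<>(serviceProps); // service map stays untouched
        // put(), not putIfAbsent(): a weaker service setting must not survive.
        props.put("key.serializer", BYTE_ARRAY_SERIALIZER);
        props.put("value.serializer", BYTE_ARRAY_SERIALIZER);
        props.put("acks", "all");
        props.put("enable.idempotence", true);
        props.put("max.in.flight.requests.per.connection", 5);
        return props;
    }

    public static void main(String[] args) {
        // Mirrors the IT setup described above: String serializers and acks=1.
        Map<String, Object> service = new HashMap<>();
        service.put("bootstrap.servers", "localhost:9092");
        service.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        service.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        service.put("acks", "1");

        Map<String, Object> relay = applyOutboxOverrides(service);
        System.out.println("relay acks = " + relay.get("acks"));     // prints relay acks = all
        System.out.println("service acks = " + service.get("acks")); // prints service acks = 1
    }
}
```

Settings the relay does not own, such as bootstrap.servers, pass through unchanged, which is the point of deriving the map from the service's configuration first.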
CI status (head a3e09ff0a9db)
Overall: ✗ failure
2 checks: 2 pending
Findings
No new findings this round.
Quorum converged on empty findings (A + B both returned 0).
Verdict
NO_NEW_FINDINGS
hib-pr-reviewer • round 3 • 0 findings • 2026-05-31T15:35:50.957Z → 2026-05-31T15:38:31.231Z • posted-as: pr-reviewer-bot • model: auto • [bookkeeping fallback]