feat(realtime): Kafka→Centrifugo notification relay (im2be.create.notification → user:<id>) #6
Header
realtime-service • Stage B slice-1 • PR-B Kafka→Centrifugo notification relay (ADR-0002 §3a Option 2)
TL;DR
NEEDS_REVIEW: realtime-service becomes the canonical producer of user-notification publishes to Centrifugo. A kafkajs consumer reads `NotificationEvent` JSON off `im2be.create.notification` and server-publishes each event to `user:<recipientId>`. This closes the last piece of the slice-1 notification→client mechanism (the publish source).
Summary
Implements Option 2 of the §3a decision, with a plain-JSON contract (no Avro, no Confluent wire format): `JSON.parse` the message value → `NotificationEvent` (`recipientId` UUID = target user). Publishes via the Centrifugo server API (`POST /api/publish`, `apikey` auth) to `user:<recipientId>` (the `user` namespace, matching subscribe_proxy). At-least-once: malformed → log + skip (commit, so the partition is not poisoned); publish failure → throw (no commit → redelivery). Resilient: a relay start failure is non-fatal (the proxy keeps serving); kafkajs handles reconnect; 5 s publish timeout via `AbortController`. No PII logged (`eventId`/`eventType`/`entityType` only). kafkajs pinned to `2.2.4`; global `fetch` (no HTTP-client dependency).
Findings (changes)
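| File | Change |
|---|---|
| `src/relay/centrifugo-publisher.ts` | `publishToUserChannel`; throws on transport error, non-2xx, non-JSON, or a 200 with an error body |
| `src/relay/notification-relay.ts` | `handleNotificationMessage` (testable core) |
| `src/config.ts` | env vars (`KAFKA_BOOTSTRAP_SERVERS`/`NOTIFICATION_TOPIC`/`KAFKA_CONSUMER_GROUP`/`CENTRIFUGO_API_URL`/`CENTRIFUGO_API_KEY`/`NOTIFICATION_RELAY_ENABLED`) + `readBoolean` |
| `src/types.ts` | `NotificationEvent` + `isNotificationEvent` guard (UUID-validates `recipientId`) |
| `src/main.ts` | relay start/stop wiring (SIGTERM/SIGINT stop the relay before the HTTP server drains) |
| `src/metrics.ts` / `src/otel.ts` | `realtime_notification_relay_total` counter; `centrifugo.relay_publish` span |
| `tests/` | +16 tests |
| `package.json` / `package-lock.json` | `kafkajs` 2.2.4 (pinned exact) |
Verdict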
NEEDS_REVIEW.
`tsc --noEmit` clean; 149/149 vitest (133 baseline + 16). The live relay e2e check (inject a `NotificationEvent` → client receives it) is the parent's deploy-time verification. Decisions for review: the payload omits `type` + `recipientId` (the recipient is encoded in the channel); permissive RFC-4122 UUID validation; `fromBeginning: false` (live push, no backlog replay).
Footer
realtime-service • PR R0 (author) • PR-B Kafka-relay • 2026-06-01
Make realtime-service the canonical producer of user-notification publishes to Centrifugo (ADR-0002 §3a Option 2, operator-chosen).

Contract: consume `NotificationEvent` JSON messages (plain UTF-8/Jackson; no Avro, no Confluent wire format, no schema registry) off the Kafka topic `im2be.create.notification`. For each valid event, publish to the target user's Centrifugo channel `user:<recipientId>` (the `user` namespace, matching subscribe_proxy's `USER_CHANNEL_PREFIX`) via the Centrifugo server HTTP API: `POST {CENTRIFUGO_API_URL}/api/publish`, header `Authorization: apikey <key>`, body `{channel:"user:<recipientId>", data:{...}}`.

Delivery is at-least-once: kafkajs commits offsets after `eachMessage` resolves. A malformed message (non-JSON, non-`notificationEvent`, missing/non-UUID `recipientId`, empty value) is LOGGED + SKIPPED (the offset is committed, so the partition is not poisoned). A publish failure (non-2xx, Centrifugo error body, or transport fault) THROWS, so the offset is NOT committed and kafkajs redelivers.

Resilience: the relay is OPTIONAL. A `start()` failure (e.g. Kafka unreachable) is logged and swallowed so the Centrifugo proxy callbacks keep serving; kafkajs handles reconnect/retry internally. `NOTIFICATION_RELAY_ENABLED=false` disables it entirely (proxy-only mode). SIGTERM/SIGINT `stop()` the relay before the HTTP server drains.

Config (`src/config.ts`, with a `readBoolean` helper): `KAFKA_BOOTSTRAP_SERVERS` (`localhost:9092`), `NOTIFICATION_TOPIC` (`im2be.create.notification`), `KAFKA_CONSUMER_GROUP` (`realtime-service-centrifugo-relay`), `CENTRIFUGO_API_URL` (`http://localhost:8000`), `CENTRIFUGO_API_KEY` (`""`), `NOTIFICATION_RELAY_ENABLED` (`true`).

New modules: `src/relay/centrifugo-publisher.ts` (injectable fetch, throws on failure) and `src/relay/notification-relay.ts` (`createNotificationRelay` + testable `handleNotificationMessage`). `NotificationEvent` type + `isNotificationEvent` guard in `src/types.ts`.
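The consume-side contract above (guard → skip or publish, throw on publish failure) can be sketched as follows. This is a minimal illustration under assumptions, not the PR's code. The names `isNotificationEvent` and `handleNotificationMessage` come from the PR, but their bodies, the `PublishFn` callback shape, and the `RelayOutcome` type are hypothetical. Because the handler returns an outcome and never commits anything itself, the commit decision stays with kafkajs: a resolved promise lets the offset be committed, and a rejection does not.

```typescript
// Illustrative sketch: field names follow this PR's NotificationEvent description;
// PublishFn and RelayOutcome are hypothetical helpers.
export interface NotificationEvent {
  type: "notificationEvent";
  recipientId: string;
  actorId: string;
  entityId: string;
  eventType: string;
  entityType: string;
  message: string;
  eventId: string;
  extraId: string | null;
  extraIdLong: number | null;
}

// Permissive RFC-4122-shaped check: 8-4-4-4-12 hex digits, any version/variant nibble.
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

// Lenient guard: only the routing fields (type, recipientId) are validated.
export function isNotificationEvent(x: unknown): x is NotificationEvent {
  if (typeof x !== "object" || x === null) return false;
  const candidate = x as Record<string, unknown>;
  const recipientId = candidate["recipientId"];
  return (
    candidate["type"] === "notificationEvent" &&
    typeof recipientId === "string" &&
    UUID_RE.test(recipientId)
  );
}

export type RelayOutcome = "published" | "skipped_malformed";
export type PublishFn = (channel: string, data: Record<string, unknown>) => Promise<void>;

// Resolving lets kafkajs mark the offset for commit; a rejection (publish failure)
// leaves it uncommitted so the message is redelivered.
export async function handleNotificationMessage(
  value: Uint8Array | null,
  publish: PublishFn,
): Promise<RelayOutcome> {
  if (value === null || value.length === 0) return "skipped_malformed";

  let parsed: unknown;
  try {
    parsed = JSON.parse(new TextDecoder().decode(value));
  } catch {
    return "skipped_malformed"; // non-JSON: skip rather than poison the partition
  }
  if (!isNotificationEvent(parsed)) return "skipped_malformed";

  // type + recipientId are omitted: the recipient is encoded in the channel name.
  const data: Record<string, unknown> = {
    eventType: parsed.eventType,
    entityType: parsed.entityType,
    message: parsed.message,
    eventId: parsed.eventId,
    actorId: parsed.actorId,
    entityId: parsed.entityId,
  };
  await publish(`user:${parsed.recipientId}`, data); // a rejection propagates to eachMessage
  return "published";
}
```

In a kafkajs `eachMessage` callback, `message.value` is a `Buffer` (a `Uint8Array` subclass), so it could be passed straight through. The caller would increment the outcome counter and count a rejection as `publish_failed` before rethrowing.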
Relay metrics: counter `realtime_notification_relay_total` {outcome=published|skipped_malformed|publish_failed}. OTEL span `centrifugo.relay_publish`. Logging discipline (rule 01/02): structured logs only; never log the message body or the `recipientId` UUID, only `eventId`, `eventType`, `entityType`, and outcome. Dependency: kafkajs 2.2.4 (pinned exact); the HTTP publish uses global `fetch` (Node 22), so no HTTP-client dependency is added. Tests: +16 (133 → 149). Typecheck clean.
hib-pr-reviewer review — PR #6 (affinity-intelligence-rework/im2be-realtime-service)
Round 1: head `5b4b1b796001`, base `main`, trigger `opened`
TL;DR: NEEDS_WORK. Kept 7 findings (5 minor, 2 info): all 5 from A verified, all 3 from B verified, A5+B2 merged into one agreed info finding; 0 dropped.
Summary
Arbitration — PR #6 Kafka→Centrifugo relay
Actions taken: read `notification-relay.ts` (244 lines) and `centrifugo-publisher.ts` (151 lines) in full; read `main.ts` lines 160-188 to verify the B1 leak path. No prior run history found in Memora (first arbitration). Memora run-summary persisted (memory 485).
- `stop()` happy-path disconnect unwrapped
- `[]` with no downstream guard
- `${deps.apiUrl}/api/publish` confirmed; no normalization
- `SpanKind.PRODUCER` wrong (agreed)
- `run()` throws → `relay = null`; shutdown guard `relay !== null` never calls `stop()`
- `response.text()` has no timeout: `clearTimeout` at line 117 in `finally`, `response.text()` at line 128

A5 vs B2 SpanKind fix direction: A recommends `SpanKind.CLIENT` (primary latency is outbound HTTP); B recommends `SpanKind.CONSUMER` (the outer span represents Kafka consumption). Both positions have merit, and both are fixes over the current `PRODUCER`. Preserved as a single agreed info finding; the author should consult OTel messaging semconv §5 to choose.
B1 nuance: `stop()`'s `!running` branch already guards `consumer.disconnect()` correctly; the gap is purely in `main.ts` setting `relay = null` before `stop()` can be called. Option B from the finding (call `relay.stop()` before nulling) is the minimal fix.
Result: 7 kept (5 minor, 2 info), 0 dropped. All unique-to-one findings survived verification.
Blast Radius
The diff introduces a new Kafka consumer subsystem (kafkajs 2.2.4) and a Centrifugo HTTP publisher, both wired into `main.ts` startup/shutdown. It is purely additive to the existing proxy surface: the relay is optional, and its failure is non-fatal to the HTTP server. Config, metrics, otel, and types are each extended but not modified. Risk is contained to the relay path itself; the consumer-leak finding (B1) is the only cross-module concern.
BLAST_SCORE: 4/10
Risk Indicators
`publishToUserChannel`, `loadConfig`
`package-lock.json`, `package.json`
CI status (head `5b4b1b796001`): no CI checks reported for this commit.
Findings (7)
[MINOR] Happy-path `stop()` leaves `consumer.disconnect()` unwrapped; the shutdown confirmation log can be lost
`src/relay/notification-relay.ts:239`
The `!running` branch (lines 226-237) wraps `consumer.disconnect()` in a try/catch that logs on failure and returns. The happy-path branch at line 239 makes the identical call without any guard.
If `consumer.disconnect()` throws, the error propagates out of `stop()` into `main.ts`'s error-catch block, which logs only `err_name`. The `"notification relay stopped"` confirmation log is never emitted, so graceful-shutdown observability is inconsistent with the `!running` branch.
Fix:
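Here is a minimal sketch of the symmetric guard. It assumes a consumer that only needs to expose `disconnect()` and a structured `log` callback; `createStop`, the `Logger` type, and the `"notification relay disconnect failed"` message are hypothetical. Both branches go through the same best-effort disconnect, which logs only `err_name`, so `stop()` never rejects during shutdown.

```typescript
// Hypothetical shapes: only disconnect() from the kafkajs consumer is needed here.
interface DisconnectableConsumer {
  disconnect(): Promise<void>;
}
type Logger = (level: "info" | "error", msg: string, fields?: Record<string, unknown>) => void;

function errName(err: unknown): string {
  return err instanceof Error ? err.name : typeof err;
}

export function createStop(
  consumer: DisconnectableConsumer,
  log: Logger,
  state: { running: boolean },
): () => Promise<void> {
  // Shared best-effort disconnect: logs err_name only and never rethrows.
  async function safeDisconnect(): Promise<boolean> {
    try {
      await consumer.disconnect();
      return true;
    } catch (err) {
      log("error", "notification relay disconnect failed", { err_name: errName(err) });
      return false;
    }
  }

  return async function stop(): Promise<void> {
    if (!state.running) {
      await safeDisconnect(); // never-started / failed-start path (existing guard)
      return;
    }
    state.running = false;
    if (!(await safeDisconnect())) return; // happy path, now guarded the same way
    log("info", "notification relay stopped");
  };
}
```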
[MINOR] Kafka consumer connection leaked when `consumer.run()` throws after `consumer.connect()` succeeds
`src/relay/notification-relay.ts:207`
Inside `start()` (line 207), if `consumer.connect()` and `consumer.subscribe()` succeed but `consumer.run()` throws, `running` stays `false` and `start()` propagates the error. `main.ts` line 179 then sets `relay = null`.
The shutdown closure's `if (relay !== null)` check now fails, so `relay.stop()` is never called. Although `stop()` already contains a `!running` guard that correctly attempts `consumer.disconnect()`, that guard is unreachable because the `relay` reference has been nulled.
The Kafka broker retains the group member, blocking consumer-group rebalance until the TCP keepalive times out.
Fix (`main.ts`):
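The `main.ts` change could look roughly like this. The sketch assumes a relay object exposing `start()`/`stop()`, which is how `createNotificationRelay`'s result appears to be used; `startRelayBestEffort`, `RelayLike`, and the log message are hypothetical. On a start failure it calls `stop()` before dropping the reference, so a consumer that connected but failed in `run()` still reaches the `!running` disconnect path.

```typescript
// Hypothetical relay shape; the PR's NotificationRelay may differ.
interface RelayLike {
  start(): Promise<void>;
  stop(): Promise<void>;
}
type ErrorLog = (msg: string, fields: Record<string, unknown>) => void;

// Returns the started relay, or null (proxy-only mode) if start() failed.
export async function startRelayBestEffort(
  relay: RelayLike,
  logError: ErrorLog,
): Promise<RelayLike | null> {
  try {
    await relay.start();
    return relay;
  } catch (err) {
    logError("notification relay failed to start", {
      err_name: err instanceof Error ? err.name : typeof err,
    });
    try {
      // Reap a half-started consumer now, before the reference is dropped.
      await relay.stop();
    } catch {
      // Best effort: the HTTP server keeps serving regardless.
    }
    return null;
  }
}
```

The shutdown closure can keep its existing `relay !== null` check unchanged.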
This exercises the existing `!running` disconnect guard in `stop()` and keeps the fix self-contained.
[MINOR] Empty broker array not guarded; kafkajs throws an opaque error instead of a clear `ConfigError`
`src/relay/notification-relay.ts:180`
Line 180 builds the broker array from `KAFKA_BOOTSTRAP_SERVERS`.
If `KAFKA_BOOTSTRAP_SERVERS` is set to only delimiters (e.g. `","` or `" , "`), the resulting array is `[]`. kafkajs then throws an internal error during `consumer.connect()`, which `main.ts` catches and logs only as `err_name`. The operator receives no indication that the env var is the root cause.
Fix: add a guard in `loadConfig` (`src/config.ts`), after the existing cross-field checks, that throws a `ConfigError` naming `KAFKA_BOOTSTRAP_SERVERS` when the parsed broker list is empty.
[MINOR] Startup log omits whether `centrifugoApiKey` is set; silent misconfiguration risk in production
`src/relay/notification-relay.ts:216`
The relay startup log (lines 216-223) records `topic`, `group_id`, and `centrifugo_api_url` but nothing about the API key. When `CENTRIFUGO_API_KEY` is unset, `config.centrifugoApiKey` is `""` (the explicit default), and every publish sends a bare `Authorization: apikey` prefix with no key. Against an unauthenticated dev Centrifugo this passes silently. Against a secured production instance every publish fails with 401/403, and no boot-time log entry points to the env-var root cause.
Fix: add a `WARN` at startup when the key is empty. Do NOT log the key value itself.
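One way to surface the key's presence without its value is sketched below, under assumptions. `RelayLogConfig`, `startupLogEntries`, the config field names other than `centrifugoApiKey`, and the warning text are hypothetical. The entries are returned rather than written, which keeps the rule easy to test. The info entry carries only a boolean, and a separate `warn` entry fires when the key is empty.

```typescript
// Hypothetical config subset; only centrifugoApiKey is named in this PR.
interface RelayLogConfig {
  notificationTopic: string;
  kafkaConsumerGroup: string;
  centrifugoApiUrl: string;
  centrifugoApiKey: string;
}
interface LogEntry {
  level: "info" | "warn";
  msg: string;
  fields: Record<string, unknown>;
}

export function startupLogEntries(cfg: RelayLogConfig): LogEntry[] {
  const keySet = cfg.centrifugoApiKey.length > 0; // boolean only, never the value
  const entries: LogEntry[] = [
    {
      level: "info",
      msg: "notification relay started",
      fields: {
        topic: cfg.notificationTopic,
        group_id: cfg.kafkaConsumerGroup,
        centrifugo_api_url: cfg.centrifugoApiUrl,
        centrifugo_api_key_set: keySet,
      },
    },
  ];
  if (!keySet) {
    entries.push({
      level: "warn",
      msg: "CENTRIFUGO_API_KEY is empty; publishes to a secured Centrifugo will be rejected",
      fields: { centrifugo_api_key_set: false },
    });
  }
  return entries;
}
```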
[MINOR] Trailing slash in `CENTRIFUGO_API_URL` produces a double-slash URL `…//api/publish`
`src/relay/centrifugo-publisher.ts:91`
Line 91 builds the URL as `${deps.apiUrl}/api/publish`.
The `CentrifugoPublisherDeps` JSDoc says "no trailing slash" and `.env.example` ships without one, but `readString` in `config.ts` does not strip trailing slashes. If an operator sets `CENTRIFUGO_API_URL=http://centrifugo:8000/`, the constructed URL becomes `http://centrifugo:8000//api/publish`. Some reverse-proxy configurations return 404 for the double-slash path. Every publish then throws and every message is redelivered, which is semantically correct but operationally confusing.
Fix: normalise once at config load time:
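Below is a sketch of load-time normalisation. It assumes a plain `env` record in place of the project's `readString`; `readCentrifugoApiUrl`, `parseBrokers`, and the local `ConfigError` class are hypothetical stand-ins. The same load-time pass is also a natural home for the empty-broker check from the earlier finding, so the sketch shows both.

```typescript
// Local stand-in for the project's ConfigError.
export class ConfigError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ConfigError";
  }
}

type Env = Record<string, string | undefined>;

// Strip one or more trailing slashes so `${base}/api/publish` has a single slash.
export function readCentrifugoApiUrl(env: Env): string {
  const raw = env["CENTRIFUGO_API_URL"] ?? "http://localhost:8000";
  return raw.trim().replace(/\/+$/, "");
}

// Split/trim/filter the broker list. An all-delimiter value fails fast, but only
// when the relay is enabled (proxy-only mode never connects to Kafka).
export function parseBrokers(raw: string, relayEnabled: boolean): string[] {
  const brokers = raw
    .split(",")
    .map((b) => b.trim())
    .filter((b) => b.length > 0);
  if (relayEnabled && brokers.length === 0) {
    throw new ConfigError("KAFKA_BOOTSTRAP_SERVERS contains no broker addresses");
  }
  return brokers;
}
```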
Or strip at use-site in the publisher.
[INFO] `SpanKind.PRODUCER` is semantically incorrect for a Kafka-consumer + HTTP-client span
`src/relay/notification-relay.ts:80`
Line 80 uses `SpanKind.PRODUCER`.
Both reviewers independently flagged this. `SpanKind.PRODUCER` is reserved for spans that produce (write) messages to a messaging system. This span represents a Kafka consumer receipt followed by an outbound HTTP call to Centrifugo, and neither is a messaging-system publish. Using `PRODUCER` causes APM tools (Jaeger, Grafana Tempo) and the OTel collector's messaging-topology views to misclassify the relay.
Reviewer A recommends `SpanKind.CLIENT` (the primary latency contribution is the synchronous HTTP call to Centrifugo).
Reviewer B recommends `SpanKind.CONSUMER` (the outer span's lifecycle represents Kafka message consumption).
Both are correct fixes. Per OTel messaging semconv §5, if one span covers both the Kafka receipt and the HTTP egress, `SpanKind.CLIENT` is preferred to describe the synchronous-request nature; if the intent is to model the messaging leg only, `SpanKind.CONSUMER` is appropriate. The author should pick one and document the reasoning.
[INFO] `response.text()` body read has no timeout protection; the `AbortController` timer is cleared before the body read begins
`src/relay/centrifugo-publisher.ts:117`
The `AbortController` timer is cleared in the `finally` block at lines 117-118, before `response.text()` is called at line 128.
If Centrifugo sends HTTP response headers immediately but stalls on the body (overloaded server, large response), `response.text()` hangs indefinitely. Because this executes inside `eachMessage`, the kafkajs run loop blocks: there are no offset commits and no new messages from that partition until the process is killed.
Practical risk is low for Centrifugo's small `{"result":{}}` responses, but the structural gap would silently block the relay under a misbehaving instance.
Fix: extend the `AbortController`'s live region to cover the body read.
Verdict
NEEDS_WORK
hib-pr-reviewer • round 1 • 7 findings (5m/2i) • 2026-06-01T11:16:19.444Z → 2026-06-01T11:18:59.526Z • posted-as: pr-reviewer-bot • model: auto
R2 verdict findings (kept=7):
(1) MINOR notification-relay.ts:238 (happy-path `stop()` disconnect unwrapped): wrapped `consumer.disconnect()` in a try/catch on the `running=true` branch. A throw is now logged (`err_name`) and `stop()` returns early, instead of the error propagating and the "notification relay stopped" log being silently skipped. Mirrors the existing `!running` guard.
(2) MINOR main.ts:177 (consumer connection leak when `run()` throws after `connect()`): added a best-effort `relay.stop()` BEFORE `relay = null` in the start try/catch, so the `!running` disconnect guard reaps the live Kafka group member now rather than at keepalive timeout. Existing error log unchanged.
(3) MINOR config.ts (empty broker array not guarded): added a relay-enabled cross-field validation. When `NOTIFICATION_RELAY_ENABLED=true`, an all-delimiter/whitespace `KAFKA_BOOTSTRAP_SERVERS` that collapses to zero brokers after split/trim/filter now throws a `ConfigError` naming `KAFKA_BOOTSTRAP_SERVERS` as the root cause. A disabled relay still boots.
(4) MINOR notification-relay.ts:216 (startup log omitted whether the API key is set): added `centrifugo_api_key_set: centrifugoApiKey.length > 0` to the "notification relay started" log (boolean only, never the key value; rule 01).
(5) MINOR centrifugo-publisher.ts:91 (trailing slash in `CENTRIFUGO_API_URL` → `//api/publish`): normalised via `base = apiUrl.replace(/\/+$/, "")` before building the URL; this strips one or many trailing slashes.
(6) INFO notification-relay.ts:80 (`SpanKind.PRODUCER` wrong for a consume+publish span): changed to `SpanKind.CONSUMER` (the trigger is a Kafka consume). The existing import already covers it.
(7) INFO centrifugo-publisher.ts:117 (body `text()` read had no timeout because the timer was cleared in `finally` before the body read): wrapped fetch + status check + body read in one `try` whose `finally` clears the timer. The `AbortController` window now covers the body read too, and the signal still aborts a hung body. Error semantics unchanged (`CentrifugoPublishError` on any failure).
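Fixes (5) and (7) together can be sketched as a publisher whose single `try`/`finally` keeps the abort timer alive through `response.text()`. The function name and `CentrifugoPublishError` come from this PR. The `PublisherDeps` shape, the `FetchLike` type, and the parameter order are assumptions. The 5 s default mirrors the publish timeout described in the summary.

```typescript
export class CentrifugoPublishError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "CentrifugoPublishError";
  }
}

// Minimal fetch shape so tests can inject a fake; global fetch satisfies it.
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string; signal: AbortSignal },
) => Promise<{ ok: boolean; status: number; text(): Promise<string> }>;

export interface PublisherDeps {
  apiUrl: string;
  apiKey: string;
  fetchImpl?: FetchLike;
  timeoutMs?: number;
}

export async function publishToUserChannel(
  deps: PublisherDeps,
  recipientId: string,
  data: Record<string, unknown>,
): Promise<void> {
  const base = deps.apiUrl.replace(/\/+$/, ""); // fix (5): no "//api/publish"
  const doFetch: FetchLike = deps.fetchImpl ?? fetch;
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), deps.timeoutMs ?? 5000);
  try {
    const response = await doFetch(`${base}/api/publish`, {
      method: "POST",
      headers: { "Content-Type": "application/json", Authorization: `apikey ${deps.apiKey}` },
      body: JSON.stringify({ channel: `user:${recipientId}`, data }),
      signal: controller.signal,
    });
    if (!response.ok) throw new CentrifugoPublishError(`HTTP ${response.status}`);
    const text = await response.text(); // fix (7): still inside the abort window
    let body: unknown;
    try {
      body = JSON.parse(text);
    } catch {
      throw new CentrifugoPublishError("non-JSON response body");
    }
    // Centrifugo can answer 200 with an {"error": ...} body.
    if (typeof body === "object" && body !== null && "error" in body) {
      throw new CentrifugoPublishError("Centrifugo error body");
    }
  } catch (err) {
    if (err instanceof CentrifugoPublishError) throw err;
    throw new CentrifugoPublishError(`transport failure: ${err instanceof Error ? err.name : "unknown"}`);
  } finally {
    clearTimeout(timer); // cleared only after the body has been read (or the call failed)
  }
}
```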
Verification:
- `npm run typecheck` → clean (`tsc --noEmit`, 0 errors)
- `npx vitest run` → 153 passed (149 prior + 4 new: 2 config empty-broker enabled/disabled, 2 publisher trailing-slash single/multi)

hib-pr-reviewer review — PR #6 (affinity-intelligence-rework/im2be-realtime-service)
Round 2: head `ba27bf0731fb`, base `main`, trigger `synchronize`
TL;DR: CONDITIONAL_APPROVE. Verified 2 unique-to-one findings (1 info, 1 minor); all 7 R1 findings confirmed resolved; no blocking issues.
Summary
Arbitration Round 2 — PR #6 im2be-realtime-service
Memora context: No prior run history found (both queries returned empty); this arbitration stores the canonical run-2 record.
Prior-round resolution: Both reviewers independently confirm all 7 R1 findings are resolved — stop() error handling, connection leak on run() throw, empty broker guard, startup log API-key flag, trailing-slash double-slash URL, SpanKind.CONSUMER, and response.text() timeout race.
Reconciliation of round-2 findings:
Finding A (info, src/types.ts:320): unique to A, verified ✅. Lines 320-332 of src/types.ts confirm that the guard returns `x is NotificationEvent` after checking only `candidate["type"] === "notificationEvent"` (line 325) and a UUID regex on `recipientId` (lines 328-330). The six other required-by-interface fields (`actorId`, `entityId`, `eventType`, `entityType`, `message`, `eventId`) plus `extraId`/`extraIdLong` receive no presence check. A code comment at lines 312-315 explicitly documents this as intentional leniency. The finding is accurate; kept as info.
Finding B (minor, notification-relay.ts:112): unique to B, verified ✅. Lines 285-306 of src/types.ts confirm that `extraId: string | null` (line 295) and `extraIdLong: number | null` (line 297) are declared on the `NotificationEvent` interface. Lines 108-119 of notification-relay.ts confirm that the forwarded `data` object contains only `eventType`, `entityType`, `message`, `eventId`, `actorId`, and `entityId`; `extraId` and `extraIdLong` are absent. The JSDoc at types.ts lines 282-283 enumerates the forwarded fields without them, suggesting intentional exclusion, but neither that comment nor the relay comment at line 108 explains why they are dropped. Reviewer B's fix options are sound. Kept as minor.
Kept 2 findings (0 agreed / 2 unique-to-one verified), dropped 0. No blocking issues.
Blast Radius
The PR introduces a Kafka→Centrifugo relay subsystem spanning 7 source files (config, types, metrics, otel, main, centrifugo-publisher, notification-relay). The relay is wired into the main shutdown sequence, so a relay lifecycle bug can affect HTTP server drain timing. The failure path is designed to degrade gracefully to proxy-only. However, the new exported surface (`createNotificationRelay`, `NotificationRelay`, `NotificationEvent`) is consumed by main.ts, and the type guard is part of the public types module.
BLAST_SCORE: 6/10
Risk Indicators
`centrifugoApiKey` (config read + `Authorization` header injection in centrifugo-publisher.ts), `isNotificationEvent` (UUID validation gate before channel-name interpolation), `kafkaConsumerGroup` (offset commit state / at-least-once delivery boundary)
`package-lock.json`, `package.json`
CI status (head `ba27bf0731fb`): no CI checks reported for this commit.
Findings (2)
[INFO] `isNotificationEvent` only validates `type` + `recipientId`; the remaining required fields are typed `string` but unverified at runtime
`src/types.ts:320`
The guard at line 320 returns `x is NotificationEvent` after validating only `type === "notificationEvent"` (line 325) and `recipientId` as a UUID (lines 328-330). The six other fields declared as `string` on the interface (`actorId`, `entityId`, `eventType`, `entityType`, `message`, `eventId`), plus `extraId: string | null` and `extraIdLong: number | null`, receive no runtime presence check. A malformed producer message that omits any of these fields passes the guard.
Current impact is contained. The code comment at lines 312-315 explicitly documents this intentional design choice (leniency on non-routing fields), and `handleNotificationMessage` writes the fields into a `Record<string, unknown>`, whose `undefined`-valued keys `JSON.stringify` silently drops.
The latent risk is forward-looking: TypeScript callers downstream of the guard get a non-null `string` guarantee from the type system that the runtime does not honour. Any future code that reads e.g. `parsed.actorId` and depends on it being a valid string will silently receive `undefined` for messages from a non-conforming producer. Consider narrowing the return type to a partial shape that accurately reflects what is actually guaranteed at runtime, or adding presence checks for the forwarded fields.
[MINOR] `extraId` and `extraIdLong` silently dropped from the Centrifugo payload; exclusion undocumented
`src/relay/notification-relay.ts:112`
`NotificationEvent` declares `extraId: string | null` (src/types.ts:295) and `extraIdLong: number | null` (src/types.ts:297). The `data` object built at lines 112-119 forwards six of the eight non-routing fields but omits both. Nothing fails at runtime; Centrifugo subscribers simply receive events without these fields.
The comment at lines 108-111 explains why `type` and `recipientId` are excluded, but nothing explains the omission of `extraId`/`extraIdLong`. The JSDoc at src/types.ts:282-283 enumerates only the forwarded fields, which implies intentional exclusion, but gives no rationale.
If web/mobile Centrifugo subscribers use these fields (e.g. to deep-link to a specific comment or sub-entity), they will silently malfunction on any event where the producer populates them.
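Fix, one of:
- forward `extraId` / `extraIdLong` in the `data` object, or
- document the exclusion in a comment where `data` is built (e.g. `// extraId / extraIdLong are internal back-end correlation ids; front-end Centrifugo clients do not consume them per the front-end contract`). Without that comment, every future reader of `NotificationEvent` will encounter the same ambiguity.
Verdict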
CONDITIONAL_APPROVE
hib-pr-reviewer • round 2 • 2 findings (1m/1i) • 2026-06-01T11:31:34.083Z → 2026-06-01T11:33:26.026Z • posted-as: pr-reviewer-bot • model: auto