feat(realtime): Kafka→Centrifugo notification relay (im2be.create.notification → user:<id>) #6

Merged
hibryda merged 2 commits from feat/kafka-notification-relay into main 2026-06-01 13:42:08 +02:00

Header

realtime-service • Stage B slice-1 • PR-B Kafka→Centrifugo notification relay (ADR-0002 §3a Option 2)

TL;DR

NEEDS_REVIEW — realtime-service becomes the canonical producer of user-notification publishes to Centrifugo: a kafkajs consumer reads NotificationEvent JSON off im2be.create.notification and server-publishes each to user:<recipientId>. Closes the last piece of the slice-1 notification→client mechanism (the publish source).

Summary

Implements Option 2 of the §3a decision. Plain-JSON contract (no Avro/Confluent-wire-format): JSON.parse the message value → NotificationEvent (recipientId UUID = target user). Publishes via the Centrifugo server API (POST /api/publish, apikey auth) to user:<recipientId> (the user namespace, matching subscribe_proxy). At-least-once: malformed→log+skip (commit, don't poison the partition); publish-fail→throw (no commit → redelivery). Resilient: relay start failure is non-fatal (proxy keeps serving); kafkajs handles reconnect; 5s publish timeout via AbortController. No PII logged (eventId/eventType/entityType only). kafkajs pinned 2.2.4; global fetch (no HTTP-client dep).

Findings (changes)

| File | Change |
|---|---|
| src/relay/centrifugo-publisher.ts | NEW — publishToUserChannel; throws on transport/non-2xx/non-JSON/200-error-body |
| src/relay/notification-relay.ts | NEW — kafkajs consumer + handleNotificationMessage (testable core) |
| src/config.ts | +6 env fields (KAFKA_BOOTSTRAP_SERVERS/NOTIFICATION_TOPIC/KAFKA_CONSUMER_GROUP/CENTRIFUGO_API_URL/CENTRIFUGO_API_KEY/NOTIFICATION_RELAY_ENABLED) + readBoolean |
| src/types.ts | NotificationEvent + isNotificationEvent guard (UUID-validates recipientId) |
| src/main.ts | start relay after listen (non-fatal); stop first on SIGTERM/SIGINT |
| src/metrics.ts / src/otel.ts | relay counter + span |
| tests/ | +16 tests (publisher 6, relay 7, config 3) |
| package.json/package-lock.json | kafkajs 2.2.4 (exact) |

Verdict

NEEDS_REVIEW. tsc --noEmit clean; 149/149 vitest (133 baseline + 16). Live relay e2e (inject NotificationEvent → client receives) is the parent's deploy-time verification. Decisions for review: payload omits type+recipientId (recipient encoded in channel); permissive RFC-4122 UUID validation; fromBeginning:false (live-push, no backlog replay).

realtime-service • PR R0 (author) • PR-B Kafka-relay • 2026-06-01

Make realtime-service the canonical producer of user-notification publishes
to Centrifugo (ADR-0002 §3a Option 2, operator-chosen).

Contract: consume NotificationEvent JSON messages (plain UTF-8/Jackson — no
Avro, no Confluent wire-format, no schema registry) off the Kafka topic
im2be.create.notification, and for each valid event publish to the target
user's Centrifugo channel user:<recipientId> (the `user` namespace, matching
subscribe_proxy's USER_CHANNEL_PREFIX) via the Centrifugo server HTTP API:
POST {CENTRIFUGO_API_URL}/api/publish, header `Authorization: apikey <key>`,
body {channel:"user:<recipientId>", data:{...}}.
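
As a rough illustration of the request shape described above, the following sketch builds the URL, headers, and body for one publish. The helper name `buildPublishRequest` and its return type are hypothetical and not taken from the PR; only the endpoint path, the `apikey` authorization scheme, and the `user:<recipientId>` channel come from the contract text.

```typescript
// Hypothetical helper for illustration only; the PR's publisher may be
// structured differently.
interface PublishRequest {
  url: string;
  init: { method: "POST"; headers: Record<string, string>; body: string };
}

function buildPublishRequest(
  apiUrl: string, // e.g. http://localhost:8000 (no trailing slash assumed here)
  apiKey: string,
  recipientId: string,
  data: Record<string, unknown>,
): PublishRequest {
  return {
    url: `${apiUrl}/api/publish`,
    init: {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        // Scheme is "apikey", followed by the configured key.
        Authorization: `apikey ${apiKey}`,
      },
      // The recipient is encoded in the channel name, not in the payload.
      body: JSON.stringify({ channel: `user:${recipientId}`, data }),
    },
  };
}
```

Keeping this step pure (no I/O) makes the channel naming and header format easy to assert in a unit test without a running Centrifugo.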

Delivery is at-least-once: kafkajs commits offsets after eachMessage resolves.
A malformed message (non-JSON, non-notificationEvent, missing/non-UUID
recipientId, empty value) is LOGGED + SKIPPED (offset committed — do not
poison the partition). A publish failure (non-2xx, Centrifugo error body, or
transport fault) THROWS, so the offset is NOT committed and kafkajs
redelivers.
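
The skip-versus-throw split above can be sketched as a small handler that returns an outcome for malformed input and lets publish errors propagate. This is a minimal sketch under assumptions: `handleMessage`, `Publish`, and the UUID pattern are illustrative names and choices, not the PR's `handleNotificationMessage`.

```typescript
type Outcome = "published" | "skipped_malformed";

// Hypothetical injected publisher; it is expected to throw on any failure.
type Publish = (recipientId: string, data: Record<string, unknown>) => Promise<void>;

// Permissive UUID shape check (assumption: any version/variant accepted).
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

async function handleMessage(value: Uint8Array | null, publish: Publish): Promise<Outcome> {
  // Empty value: resolve normally so the consumer can commit and move on.
  if (value === null || value.length === 0) return "skipped_malformed";

  let parsed: unknown;
  try {
    parsed = JSON.parse(new TextDecoder().decode(value));
  } catch {
    return "skipped_malformed"; // non-JSON: skip, do not poison the partition
  }
  if (typeof parsed !== "object" || parsed === null) return "skipped_malformed";

  // type and recipientId are stripped from the forwarded payload.
  const { type, recipientId, ...data } = parsed as Record<string, unknown>;
  if (type !== "notificationEvent") return "skipped_malformed";
  if (typeof recipientId !== "string" || !UUID_RE.test(recipientId)) return "skipped_malformed";

  // Not wrapped in try/catch: a rejection propagates to the caller, so the
  // message handler rejects and the offset is not committed.
  await publish(recipientId, data);
  return "published";
}
```

Returning an outcome rather than logging inside the handler keeps the core testable; the caller can map the outcome to the metrics label and the log line.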

Resilience: the relay is OPTIONAL. A start() failure (e.g. Kafka unreachable)
is logged and swallowed so the Centrifugo proxy callbacks keep serving;
kafkajs handles reconnect/retry internally. NOTIFICATION_RELAY_ENABLED=false
disables it entirely (proxy-only mode). SIGTERM/SIGINT stop() the relay before
the HTTP server drains.

Config (src/config.ts, with readBoolean helper): KAFKA_BOOTSTRAP_SERVERS
(localhost:9092), NOTIFICATION_TOPIC (im2be.create.notification),
KAFKA_CONSUMER_GROUP (realtime-service-centrifugo-relay), CENTRIFUGO_API_URL
(http://localhost:8000), CENTRIFUGO_API_KEY (""), NOTIFICATION_RELAY_ENABLED
(true).
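
One plausible shape for the `readBoolean` helper is sketched below. The accepted spellings ("true"/"false"/"1"/"0") and the error message are assumptions for illustration; the PR's actual helper may accept a different set of values.

```typescript
class ConfigError extends Error {}

// Assumed semantics: unset or blank falls back to the default; anything
// other than the recognised spellings is rejected rather than guessed at.
function readBoolean(
  env: Record<string, string | undefined>,
  name: string,
  fallback: boolean,
): boolean {
  const raw = env[name];
  if (raw === undefined || raw.trim() === "") return fallback;
  const value = raw.trim().toLowerCase();
  if (value === "true" || value === "1") return true;
  if (value === "false" || value === "0") return false;
  throw new ConfigError(`${name} must be a boolean, got "${raw}"`);
}
```

Rejecting unknown spellings at load time avoids a relay that is silently enabled or disabled because of a typo such as `NOTIFICATION_RELAY_ENABLED=flase`.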

New modules: src/relay/centrifugo-publisher.ts (injectable fetch, throws on
failure), src/relay/notification-relay.ts (createNotificationRelay +
testable handleNotificationMessage). NotificationEvent type + isNotificationEvent
guard in src/types.ts. Relay metrics counter realtime_notification_relay_total
{outcome=published|skipped_malformed|publish_failed}. OTEL span
centrifugo.relay_publish.

Logging discipline (rule 01/02): structured logs only; never log the message
body or recipientId UUID — only eventId, eventType, entityType, outcome.
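
A simple way to enforce this rule is to build log fields from an allow-list instead of passing the event object to the logger. The sketch below assumes a hypothetical `toLogFields` helper; only the four permitted field names come from the rule above.

```typescript
// Only these event keys may appear in logs; everything else is dropped.
const SAFE_LOG_KEYS = ["eventId", "eventType", "entityType"] as const;

function toLogFields(
  event: Record<string, unknown>,
  outcome: string,
): Record<string, unknown> {
  const fields: Record<string, unknown> = { outcome };
  for (const key of SAFE_LOG_KEYS) {
    // Copy only string values, so a malformed event cannot inject objects.
    if (typeof event[key] === "string") fields[key] = event[key];
  }
  return fields;
}
```

With an allow-list, a new field added to the event later is excluded from logs by default rather than leaked by default.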

Dependency: kafkajs 2.2.4 (pinned exact); uses global fetch (Node 22) for the
HTTP publish — no HTTP-client dep added.

Tests: +16 (133 → 149). typecheck clean.

Superseded by round 2.

hib-pr-reviewer review — PR #6 (affinity-intelligence-rework/im2be-realtime-service)

Round 1 — head 5b4b1b796001, base main, trigger opened

TL;DR: NEEDS_WORK — kept 7 findings (5 minor, 2 info): all 5 from A verified, all 3 from B verified, A5+B2 merged as one agreed info finding; 0 dropped.

Summary

Arbitration — PR #6 Kafka→Centrifugo relay

Actions taken: Read notification-relay.ts (244 lines) and centrifugo-publisher.ts (151 lines) in full; read main.ts lines 160-188 to verify the B1 leak path. No prior run history found in Memora (first arbitration). Memora run-summary persisted (memory 485).

| Finding | Source | Verified? | Decision |
|---|---|---|---|
| A1 — stop() happy-path disconnect unwrapped | A only | lines 238-240 confirm no try/catch | KEEP |
| A2 — empty broker array not guarded | A only | line 180 filter can produce [] with no downstream guard | KEEP |
| A3 — no startup WARN for empty API key | A only | lines 216-223 log block confirmed; no key-empty guard | KEEP |
| A4 — trailing slash → double-slash URL | A only | line 91 ${deps.apiUrl}/api/publish confirmed; no normalize | KEEP |
| A5/B2 — SpanKind.PRODUCER wrong (agreed) | BOTH | line 80 confirmed | KEEP (info) |
| B1 — consumer leaked when run() throws | B only | main.ts 179 sets relay = null; shutdown guard relay !== null never calls stop() | KEEP |
| B3 — response.text() has no timeout | B only | clearTimeout at line 117 in finally, response.text() at line 128 | KEEP (info) |

A5 vs B2 SpanKind fix direction: A recommends SpanKind.CLIENT (primary latency is outbound HTTP); B recommends SpanKind.CONSUMER (outer span represents Kafka consumption). Both positions have merit; both are fixes over the current PRODUCER. Preserved as a single agreed info finding; author should consult OTel messaging semconv §5 to choose.

B1 nuance: stop()'s !running branch already guards consumer.disconnect() correctly — the gap is purely in main.ts setting relay = null before stop() can be called. Option B from the finding (call relay.stop() before nulling) is the minimal fix.

Result: 7 kept (5 minor, 2 info), 0 dropped. All unique-to-one findings survived verification.

Blast Radius

The diff introduces a new Kafka consumer subsystem (kafkajs 2.2.4) and a Centrifugo HTTP publisher, both wired into main.ts startup/shutdown. It is purely additive to the existing proxy surface — the relay is optional and its failure is non-fatal to the HTTP server. Config, metrics, otel, and types are each extended but not modified. Risk is contained to the relay path itself, with the consumer-leak finding (B1) being the only cross-module concern.

BLAST_SCORE: 4/10

Risk Indicators

| Indicator | Value |
|---|---|
| Sensitive functions | publishToUserChannel, loadConfig |
| Migration touched | (none) |
| Test delta | +292 / -0 lines in test files |
| Dependency changes | package-lock.json, package.json |

CI status (head 5b4b1b796001)

No CI checks reported for this commit.

Findings (7)

[MINOR] Happy-path stop() leaves consumer.disconnect() unwrapped — shutdown confirmation log can be lost

src/relay/notification-relay.ts:239

The !running branch (lines 226-237) wraps consumer.disconnect() in a try/catch that logs on failure and returns. The happy-path branch at line 239 makes the identical call without any guard:

running = false;
await consumer.disconnect();   // line 239 — no try/catch
deps.logger.info({}, "notification relay stopped");

If consumer.disconnect() throws, the error propagates out of stop() into main.ts's error-catch block (which logs only err_name). The "notification relay stopped" confirmation log is never emitted, making graceful-shutdown observability inconsistent with the !running branch.

Fix:

running = false;
try {
  await consumer.disconnect();
} catch (err) {
  deps.logger.warn(
    { err_name: err instanceof Error ? err.name : "UnknownError" },
    "notification relay disconnect failed",
  );
  return;
}
deps.logger.info({}, "notification relay stopped");

[MINOR] Kafka consumer connection leaked when consumer.run() throws after consumer.connect() succeeds

src/relay/notification-relay.ts:207

Inside start() (line 207), if consumer.connect() and consumer.subscribe() succeed but consumer.run() throws, running stays false and start() propagates the error.

main.ts line 179 then does:

} catch (err) {
  relay = null;   // reference dropped

The shutdown closure's if (relay !== null) check now fails — relay.stop() is never called. Although stop() already contains a !running guard that correctly attempts consumer.disconnect(), it is unreachable because the relay reference has been nulled.

The Kafka broker retains the group member, blocking consumer-group rebalance until the TCP keepalive times out.

Fix (main.ts):

try {
  await relay.start();
} catch (err) {
  try { await relay.stop(); } catch { /* best-effort */ }
  relay = null;
  ...
}

This exercises the existing !running disconnect guard in stop() and keeps the fix self-contained.
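
The same start-then-clean-up pattern can be written as a self-contained helper that is easy to unit-test with a fake relay. This is a sketch only: `Relay`, `startRelayOrNull`, and the `onError` callback are hypothetical names, and the real `main.ts` wiring may differ.

```typescript
interface Relay {
  start(): Promise<void>;
  stop(): Promise<void>;
}

// Returns the relay on success, or null after a best-effort stop() on failure,
// so a half-started consumer is disconnected before the reference is dropped.
async function startRelayOrNull(
  relay: Relay,
  onError: (errName: string) => void,
): Promise<Relay | null> {
  try {
    await relay.start();
    return relay;
  } catch (err) {
    try {
      await relay.stop(); // best-effort: releases any connection start() opened
    } catch {
      // Ignored: the start failure is the error worth reporting.
    }
    onError(err instanceof Error ? err.name : "UnknownError");
    return null;
  }
}
```

The caller can then keep `relay = await startRelayOrNull(...)` and leave the existing `relay !== null` shutdown check unchanged.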

[MINOR] Empty broker array not guarded — kafkajs throws an opaque error instead of a clear ConfigError

src/relay/notification-relay.ts:180

Line 180:

brokers: config.kafkaBootstrapServers.split(",").map((b) => b.trim()).filter((b) => b.length > 0),

If KAFKA_BOOTSTRAP_SERVERS is set to only delimiters (e.g. "," or " , ") the resulting array is []. kafkajs will throw an internal error during consumer.connect(), caught by main.ts and logged only as err_name. The operator receives no indication that the env var is the root cause.

Fix — add a guard in loadConfig (src/config.ts) after the existing cross-field checks:

if (config.notificationRelayEnabled) {
  const brokers = config.kafkaBootstrapServers
    .split(",").map(b => b.trim()).filter(Boolean);
  if (brokers.length === 0) {
    throw new ConfigError(
      "KAFKA_BOOTSTRAP_SERVERS must contain at least one broker address"
    );
  }
}

[MINOR] Startup log omits whether centrifugoApiKey is set — silent misconfiguration risk in production

src/relay/notification-relay.ts:216

The relay startup log (lines 216-223) records topic, group_id, and centrifugo_api_url but nothing about the API key. When CENTRIFUGO_API_KEY is unset, config.centrifugoApiKey is "" (the explicit default) and every publish sends Authorization: apikey (bare prefix). Against an unauthenticated dev Centrifugo this passes silently; against a secured production instance every publish fails with 401/403 with no boot-time log entry pointing to the env-var root cause.

Fix — add a WARN at startup when the key is empty:

if (config.centrifugoApiKey === "") {
  deps.logger.warn(
    { centrifugo_api_url: config.centrifugoApiUrl },
    "CENTRIFUGO_API_KEY is empty — unauthenticated publishes (local dev only)",
  );
}

Do NOT log the key value itself.

[MINOR] Trailing slash in CENTRIFUGO_API_URL produces double-slash URL …//api/publish

src/relay/centrifugo-publisher.ts:91

Line 91:

const url = `${deps.apiUrl}/api/publish`;

The CentrifugoPublisherDeps JSDoc says "no trailing slash" and .env.example ships without one, but readString in config.ts does not strip trailing slashes. If an operator sets CENTRIFUGO_API_URL=http://centrifugo:8000/, the constructed URL becomes http://centrifugo:8000//api/publish. Some reverse-proxy configurations return 404 for the double-slash path, so every publish throws and the relay keeps retrying the same message: semantically correct, but operationally confusing.

Fix — normalise once at config load time:

centrifugoApiUrl: readString(env, "CENTRIFUGO_API_URL", "http://localhost:8000").replace(/\/$/, ""),

Or strip at use-site in the publisher.

[INFO] SpanKind.PRODUCER is semantically incorrect for a Kafka-consumer + HTTP-client span

src/relay/notification-relay.ts:80

Line 80:

const span = getTracer().startSpan(SPAN_NAME_RELAY, { kind: SpanKind.PRODUCER });

Both reviewers independently flagged this. SpanKind.PRODUCER is reserved for spans that produce (write) messages to a messaging system. This span represents a Kafka consumer receipt followed by an outbound HTTP call to Centrifugo — neither of which is a messaging-system publish. Using PRODUCER causes APM tools (Jaeger, Grafana Tempo) and the OTel collector's messaging-topology views to mis-classify the relay.

Reviewer A recommends SpanKind.CLIENT (the primary latency contribution is the synchronous HTTP call to Centrifugo).
Reviewer B recommends SpanKind.CONSUMER (the outer span's lifecycle represents Kafka message consumption).

Both are correct fixes. Per OTel messaging semconv §5, if one span covers both the Kafka receipt and the HTTP egress, SpanKind.CLIENT is preferred to describe the synchronous-request nature; if the intent is to model the messaging leg only, SpanKind.CONSUMER is appropriate. Author should pick one and document the reasoning.

[INFO] response.text() body read has no timeout protection — AbortController cleared before the body read begins

src/relay/centrifugo-publisher.ts:117

The AbortController timer is cleared in the finally block at line 117-118, before response.text() is called at line 128:

} finally {
  clearTimeout(timer);          // timer gone — line 117-118
}
// ...
bodyText = await response.text();  // no timeout — line 128

If Centrifugo sends HTTP response headers immediately but stalls on the body (overloaded server, large response), response.text() hangs indefinitely. Because this executes inside eachMessage, the kafkajs run loop blocks: no offset commits, no new messages from that partition, until the process is killed.

Practical risk is low for Centrifugo's small {"result":{}} responses, but the structural gap would silently block the relay under a misbehaving instance.

Fix — extend the AbortController's live region to cover the body read:

try {
  response = await fetchFn(url, { method: "POST", headers, body: payload, signal: controller.signal });
  if (!response.ok) { throw new CentrifugoPublishError(`centrifugo publish non-2xx status: ${response.status}`); }
  bodyText = await response.text();  // now also guarded
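} catch (err) {
  // Re-throw our own error unchanged so the non-2xx status is not lost.
  if (err instanceof CentrifugoPublishError) throw err;
  const reason = err instanceof Error ? err.name : "UnknownError";
  throw new CentrifugoPublishError(`centrifugo publish error: ${reason}`);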
} finally {
  clearTimeout(timer);
}

Verdict

NEEDS_WORK


hib-pr-reviewer • round 1 • 7 findings (5m/2i) • 2026-06-01T11:16:19.444Z → 2026-06-01T11:18:59.526Z • posted-as: pr-reviewer-bot • model: auto

R2 verdict findings (kept=7):

(1) MINOR notification-relay.ts:238 — happy-path stop() disconnect unwrapped.
    Wrapped consumer.disconnect() in try/catch on the running=true branch so a
    throw is logged (err_name) and returns early instead of skipping the
    "notification relay stopped" log. Mirrors the existing !running guard.
(2) MINOR main.ts:177 — consumer connection leak when run() throws after
    connect(). Added best-effort relay.stop() BEFORE relay=null in the start
    try/catch so the !running disconnect guard reaps the live Kafka group
    member now rather than at keepalive timeout. Existing error log unchanged.
(3) MINOR config.ts — empty broker array not guarded. Added a relay-enabled
    cross-field validation: when NOTIFICATION_RELAY_ENABLED=true, an
    all-delimiter/whitespace KAFKA_BOOTSTRAP_SERVERS that collapses to zero
    brokers after split/trim/filter now throws ConfigError naming
    KAFKA_BOOTSTRAP_SERVERS as the root cause. Disabled relay still boots.
(4) MINOR notification-relay.ts:216 — startup log omitted whether the API key
    is set. Added centrifugo_api_key_set: centrifugoApiKey.length > 0 to the
    "notification relay started" log (boolean only — never the key value, rule 01).
(5) MINOR centrifugo-publisher.ts:91 — trailing slash in CENTRIFUGO_API_URL →
    //api/publish. Normalised via base = apiUrl.replace(/\/+$/, "") before
    building the URL; strips one or many trailing slashes.
(6) INFO notification-relay.ts:80 — SpanKind.PRODUCER wrong for a
    consume+publish span. Changed to SpanKind.CONSUMER (trigger is a Kafka
    consume). Import already covers it.
(7) INFO centrifugo-publisher.ts:117 — body text() read had no timeout (timer
    cleared in finally before the body read). Wrapped fetch + status check +
    body read in one try whose finally clears the timer, so the
    AbortController window now covers the body read too; signal still aborts a
    hung body. Error semantics unchanged (CentrifugoPublishError on any failure).
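
Items (5) and (7) can be illustrated together: one publish attempt that normalises the base URL and keeps the abort timer alive until the body has been read. This is a sketch under assumptions, not the PR's `publishToUserChannel`: `publishOnce`, the `FetchFn` type, and the error messages for the body checks are illustrative.

```typescript
class CentrifugoPublishError extends Error {}

// Minimal structural type for an injectable fetch (assumption for testing).
type FetchFn = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string; signal: AbortSignal },
) => Promise<{ ok: boolean; status: number; text(): Promise<string> }>;

async function publishOnce(
  fetchFn: FetchFn,
  apiUrl: string,
  apiKey: string,
  channel: string,
  data: unknown,
  timeoutMs = 5000,
): Promise<void> {
  const base = apiUrl.replace(/\/+$/, ""); // strip one or many trailing slashes
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  let bodyText = "";
  try {
    const response = await fetchFn(`${base}/api/publish`, {
      method: "POST",
      headers: { "Content-Type": "application/json", Authorization: `apikey ${apiKey}` },
      body: JSON.stringify({ channel, data }),
      signal: controller.signal,
    });
    if (!response.ok) {
      throw new CentrifugoPublishError(`centrifugo publish non-2xx status: ${response.status}`);
    }
    bodyText = await response.text(); // still inside the abort window
  } catch (err) {
    if (err instanceof CentrifugoPublishError) throw err;
    const reason = err instanceof Error ? err.name : "UnknownError";
    throw new CentrifugoPublishError(`centrifugo publish error: ${reason}`);
  } finally {
    clearTimeout(timer); // cleared only after the body read has settled
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(bodyText);
  } catch {
    throw new CentrifugoPublishError("centrifugo publish non-JSON body");
  }
  // A 200 response can still carry an error object in the body.
  if (typeof parsed === "object" && parsed !== null && "error" in parsed) {
    throw new CentrifugoPublishError("centrifugo publish error body");
  }
}
```

Injecting `fetchFn` lets tests simulate a slow body, a non-2xx status, or a 200 with an error body without network access.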

Verification:
- npm run typecheck → clean (tsc --noEmit, 0 errors)
- npx vitest run → 153 passed (149 prior + 4 new: 2 config empty-broker
  enabled/disabled, 2 publisher trailing-slash single/multi)

hib-pr-reviewer review — PR #6 (affinity-intelligence-rework/im2be-realtime-service)

Round 2 — head ba27bf0731fb, base main, trigger synchronize

TL;DR: CONDITIONAL_APPROVE — verified 2 unique-to-one findings (1 info, 1 minor); all 7 R1 findings confirmed resolved; no blocking issues.

Summary

Arbitration Round 2 — PR #6 im2be-realtime-service

Memora context: No prior run history found (both queries returned empty); this arbitration stores the canonical run-2 record.

Prior-round resolution: Both reviewers independently confirm all 7 R1 findings are resolved — stop() error handling, connection leak on run() throw, empty broker guard, startup log API-key flag, trailing-slash double-slash URL, SpanKind.CONSUMER, and response.text() timeout race.

Reconciliation of round-2 findings:

  • Finding A (info, src/types.ts:320) — unique to A, verified . Read lines 320-332 of src/types.ts confirms the guard returns x is NotificationEvent after checking only candidate["type"] === "notificationEvent" (line 325) and a UUID regex on recipientId (lines 328-330). The six other required-by-interface fields (actorId, entityId, eventType, entityType, message, eventId) plus extraId/extraIdLong receive no presence check. Code comment at lines 312-315 explicitly documents this as intentional leniency. Finding is accurate; kept as info.

  • Finding B (minor, notification-relay.ts:112): unique to B, verified ✅. Read lines 285-306 of src/types.ts confirms extraId: string | null (line 295) and extraIdLong: number | null (line 297) are declared on the NotificationEvent interface. Read lines 108-119 of notification-relay.ts confirms the forwarded data object contains only eventType, entityType, message, eventId, actorId, entityId; extraId and extraIdLong are absent. The JSDoc at types.ts lines 282-283 enumerates the forwarded fields without them, suggesting intentional exclusion, but neither that comment nor the relay comment at line 108 explains why they are dropped. Reviewer B's fix options are sound. Kept as minor.

Kept 2 findings (0 agreed / 2 unique-to-one verified), dropped 0. No blocking issues.

Blast Radius

The PR introduces a Kafka→Centrifugo relay subsystem spanning 7 source files (config, types, metrics, otel, main, centrifugo-publisher, notification-relay). The relay is wired into the main shutdown sequence, so a relay lifecycle bug can affect HTTP server drain timing. The failure path is designed to degrade gracefully to proxy-only, but the new exported surface (createNotificationRelay, NotificationRelay, NotificationEvent) is consumed by main.ts and the type guard is part of the public types module.

BLAST_SCORE: 6/10

Risk Indicators

| Indicator | Value |
|---|---|
| Sensitive functions | centrifugoApiKey (config read + Authorization header injection in centrifugo-publisher.ts), isNotificationEvent (UUID validation gate before channel-name interpolation), kafkaConsumerGroup (offset commit state / at-least-once delivery boundary) |
| Migration touched | none |
| Test delta | +336 / -0 lines in test files |
| Dependency changes | package-lock.json, package.json |

CI status (head ba27bf0731fb)

No CI checks reported for this commit.

Findings (2)

[INFO] isNotificationEvent only validates type + recipientId; remaining required fields typed string but unverified at runtime

src/types.ts:320

The guard at line 320 returns x is NotificationEvent after validating only type === "notificationEvent" (line 325) and recipientId as a UUID (lines 328-330). The six other fields declared as string on the interface — actorId, entityId, eventType, entityType, message, eventId — plus extraId: string | null and extraIdLong: number | null receive no runtime presence check. A malformed producer message that omits any of these fields passes the guard successfully.

Current impact is contained: the code comment at lines 312-315 explicitly documents this intentional design choice (leniency on non-routing fields), and handleNotificationMessage writes them into a Record<string, unknown> before JSON.stringify silently drops any undefined-valued keys.

The latent risk is forward-looking: TypeScript callers downstream of the guard get a non-null string guarantee from the type system that the runtime does not honour. Any future code that reads e.g. parsed.actorId and depends on it being a valid string will silently receive undefined for messages from a non-conforming producer. Consider narrowing the return type to a partial shape that accurately reflects what is actually guaranteed at runtime, or adding presence checks for the forwarded fields.
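
One way to make the types match the runtime guarantee is to split the guard in two, as in the sketch below. Everything here is illustrative: `RoutableNotification`, `isRoutable`, `hasForwardedFields`, and the UUID regex are assumptions, not the definitions in src/types.ts. The first guard promises only what is actually checked; the second adds presence checks for the six forwarded string fields.

```typescript
// Only the two routing fields are guaranteed after the lenient guard.
interface RoutableNotification {
  type: "notificationEvent";
  recipientId: string;
}

// Assumed UUID shape; the regex used in src/types.ts may be stricter or looser.
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

function isRoutable(x: unknown): x is RoutableNotification {
  if (typeof x !== "object" || x === null) return false;
  const candidate = x as Record<string, unknown>;
  const recipientId = candidate["recipientId"];
  return (
    candidate["type"] === "notificationEvent" &&
    typeof recipientId === "string" &&
    UUID_RE.test(recipientId)
  );
}

// The six fields the relay forwards, per the finding above.
const FORWARDED = ["actorId", "entityId", "eventType", "entityType", "message", "eventId"] as const;
type ForwardedFields = Record<(typeof FORWARDED)[number], string>;

// Optional second step: callers that read the forwarded fields as strings
// narrow further, so the type system never promises more than was checked.
function hasForwardedFields(
  x: RoutableNotification,
): x is RoutableNotification & ForwardedFields {
  const candidate = x as unknown as Record<string, unknown>;
  return FORWARDED.every((key) => typeof candidate[key] === "string");
}
```

With this split, routing can stay lenient (only `isRoutable` gates the publish) while any code that depends on, say, `actorId` being a string has to pass through `hasForwardedFields` first.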

[MINOR] extraId and extraIdLong silently dropped from Centrifugo payload — exclusion undocumented

src/relay/notification-relay.ts:112

NotificationEvent declares extraId: string | null (src/types.ts:295) and extraIdLong: number | null (src/types.ts:297). The data object built at lines 112-119 forwards six of the eight non-routing fields but omits both. JSON.stringify silently drops them with no runtime error, so Centrifugo subscribers receive events with these fields absent.

The comment at lines 108-111 explains why type and recipientId are excluded; nothing explains the omission of extraId/extraIdLong. The JSDoc at src/types.ts:282-283 enumerates only the forwarded fields, which implies intentional exclusion, but provides no rationale.

If web/mobile Centrifugo subscribers use these fields (e.g. to deep-link to a specific comment or sub-entity), they will silently malfunction on any event where the producer populates them.

Fix — one of:

  1. Add them to the forwarded payload:
const data: Record<string, unknown> = {
  eventType:   parsed.eventType,
  entityType:  parsed.entityType,
  message:     parsed.message,
  eventId:     parsed.eventId,
  actorId:     parsed.actorId,
  entityId:    parsed.entityId,
  extraId:     parsed.extraId,      // ← add
  extraIdLong: parsed.extraIdLong,  // ← add
};
  2. Or, if the exclusion is intentional, add a comment at line 108 explaining why (e.g. // extraId / extraIdLong are internal back-end correlation ids; front-end Centrifugo clients do not consume them per the front-end contract). Without that comment every future reader of NotificationEvent will encounter the same ambiguity.
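
If the first option is taken, one detail is worth keeping in mind: JSON.stringify keeps keys whose value is null but drops keys whose value is undefined, so a producer that omits extraId entirely yields a different wire shape from one that sends null. The sketch below is an assumption about how that could be normalized; `ExtraFields` and `buildExtraPayload` are hypothetical names, not part of notification-relay.ts.

```typescript
// Hypothetical input shape: the interface declares `string | null` and
// `number | null`, but a non-conforming producer may omit the keys entirely.
interface ExtraFields {
  extraId?: string | null;
  extraIdLong?: number | null;
}

// Normalise "missing" to null so subscribers always see both keys.
function buildExtraPayload(parsed: ExtraFields): Record<string, unknown> {
  return {
    extraId: parsed.extraId ?? null,
    extraIdLong: parsed.extraIdLong ?? null,
  };
}

// Without normalisation the undefined key disappears from the wire format.
const raw = JSON.stringify({ eventId: "e1", extraId: null, extraIdLong: undefined });
// raw is '{"eventId":"e1","extraId":null}'

const normalised = JSON.stringify(buildExtraPayload({}));
// normalised is '{"extraId":null,"extraIdLong":null}'
```

Whether subscribers should see an explicit null or an absent key is a contract decision for the front-end; the sketch only shows that the two are not equivalent once serialized.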

Verdict

CONDITIONAL_APPROVE


hib-pr-reviewer • round 2 • 2 findings (1m/1i) • 2026-06-01T11:31:34.083Z → 2026-06-01T11:33:26.026Z • posted-as: pr-reviewer-bot • model: auto

hibryda deleted branch feat/kafka-notification-relay 2026-06-01 13:42:08 +02:00