feat(realtime-service): implement subscribe_proxy — own-user channel authorization (#351 slice-1 B2) #4

Merged
hibryda merged 5 commits from feat/centrifugo-subscribe-proxy into main 2026-06-01 09:05:12 +02:00

Summary

Stage B slice-1 (B2): turns the /centrifugo/subscribe 501 stub into a real own-user channel authorization handler.

A client may subscribe to aim2be:user:${user_id} iff its connect-established user equals the channel's embedded user id. Degraded fallback-pool connections (fallback:<hash> user) and channels outside the aim2be:user: namespace are denied (fail-closed, ADR-0002 §3.4).

Contract (verified vs centrifugal.dev/docs/server/proxy, 2026-06-01)

Subscribe always replies HTTP 200; the verdict rides in the body:

  • allow → {"result":{}}
  • deny → {"error":{"code":403,"message":"forbidden"}} (code in [400,1999], temporary omitted → permanent)

A non-200 status maps to Centrifugo's default error 100 (temporary) → re-subscription loop, so status is pinned to 200. Deliberately different from connect's non-200 + status_to_code_transforms path.
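The rule and the reply contract above can be sketched as one pure function. This is a minimal illustration, not the code in src/handlers/subscribe.ts: `authorizeSubscribe`, `deny`, and the interface names are hypothetical, and the `400` code for a malformed request is an assumption (the description only pins the `403` deny body and the always-200 status).

```typescript
// Hypothetical sketch; names other than the two prefixes are illustrative only.
const USER_CHANNEL_PREFIX = "aim2be:user:";
const FALLBACK_USER_PREFIX = "fallback:";

interface SubscribeRequest {
  user?: string;
  channel?: string;
}

type SubscribeResponse =
  | { result: Record<string, never> }
  | { error: { code: number; message: string } };

interface SubscribeResult {
  status: 200; // the verdict rides in the body, never in the HTTP status
  body: SubscribeResponse;
}

// `temporary` is omitted on purpose, so Centrifugo treats the error as permanent.
function deny(code: number, message: string): SubscribeResult {
  return { status: 200, body: { error: { code, message } } };
}

function authorizeSubscribe(req: SubscribeRequest | null): SubscribeResult {
  // Assumption: malformed requests get code 400; the PR text does not state it.
  if (!req || !req.user || !req.channel) return deny(400, "bad request");
  // Degraded fallback-pool connections are denied outright (fail-closed).
  if (req.user.startsWith(FALLBACK_USER_PREFIX)) return deny(403, "forbidden");
  // Channels outside the own-user namespace are denied.
  if (!req.channel.startsWith(USER_CHANNEL_PREFIX)) return deny(403, "forbidden");
  // Allow only when the embedded id equals the connect-established user.
  const channelUser = req.channel.slice(USER_CHANNEL_PREFIX.length);
  if (channelUser !== req.user) return deny(403, "forbidden");
  return { status: 200, body: { result: {} } };
}
```

Every branch returns status 200, so a denied client gets a permanent error rather than the temporary error 100 that would trigger re-subscription.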

Notes

  • Pure local authorization — no gRPC/IO. Subscribe carries user+channel only (Centrifugo does not forward the connect info block), so the degraded check keys on the fallback: user prefix.
  • Logs never include the channel/uid (channel embeds the pseudonymous user UUID — rule 01 least-data).
  • New realtime_centrifugo_subscribe_total{outcome} counter.
  • Dep-free — PR-A of the Option-2 (Kafka-relay) split. PR-B (Centrifugo server-publish via fetch + kafkajs consumer) is gated on a rule-9 dependency consent.

Verification

  • npm run typecheck (tsc --noEmit, strict) → clean
  • npm test → 130 passed (9 new + 121 existing)
  • npm run build → clean, 0 warnings
Turns the /centrifugo/subscribe 501 stub into a real authorization handler for
the Stage B slice-1 notification path. A client may subscribe to
aim2be:user:${user_id} IFF its connect-established user id equals the channel's
embedded user id; degraded fallback-pool connections + channels outside the
aim2be:user: namespace are denied (fail-closed, ADR-0002 §3.4).

Contract (verified against centrifugal.dev/docs/server/proxy 2026-06-01):
subscribe ALWAYS replies HTTP 200 — allow={result:{}}, deny={error:{code,
message}} with code in [400,1999] and `temporary` omitted (permanent). A non-200
status maps to Centrifugo's default error 100 (temporary) → re-subscription
loop, so the handler pins status=200 and the verdict rides in the body. This is
deliberately DIFFERENT from connect's non-200 + status_to_code_transforms path.

Pure local authorization — no gRPC, no I/O. The subscribe request carries `user`
(connect-established) + `channel` only (Centrifugo does not forward the connect
`info` block to subscribe), so the degraded check keys on the fallback: user
prefix re-exported from connect.ts. Logs never include the channel/uid (the
channel embeds the pseudonymous user UUID — rule 01 least-data; identity-service
gates that value behind a GDPR flag). New realtime_centrifugo_subscribe_total
{outcome} counter.

Dep-free — PR-A of the Option-2 split. PR-B (Centrifugo server-publish via fetch
+ kafkajs Kafka-relay consumer) is gated on a rule-9 dependency consent.

Files: src/handlers/subscribe.ts (new); src/types.ts (+subscribe req/resp +
SubscribeOutcome); src/server.ts (route replaces the 501 stub); src/metrics.ts
(subscribe counter); src/otel.ts (span name); tests/handlers.subscribe.test.ts
(new, 9 tests); tests/server.test.ts (narrowed the 501-stub test to publish/rpc).

Verification:
- npm run typecheck (tsc --noEmit, strict) -> clean
- npm test -> 130 passed (9 new + 121 existing)
- npm run build (tsc -p tsconfig.build.json) -> clean, 0 warnings

Superseded by round 2.

hib-pr-reviewer review — PR #4 (affinity-intelligence-rework/im2be-realtime-service)

Round 1 — head c5107659763d, base main, trigger opened

TL;DR: CONDITIONAL_APPROVE — kept 3 unique-to-one info findings (all verified), dropped 1 explicitly deferred OTel finding; no blocking or major issues.

Summary

Arbitration Summary

First run for PR #4. No prior Memora history. All 4 submitted findings are unique-to-one reviewer; each was verified against the actual file at HEAD.

A1 kept (status: number → 200): Confirmed at line 52: SubscribeHandlerResult.status is typed number while every return site emits status: 200 (lines 115, 130, 139). Narrowing to the literal enforces the invariant at compile time.

A2 kept (channel_namespace log field): Confirmed at line 159 — logDecision unconditionally emits channel_namespace: USER_CHANNEL_PREFIX ("aim2be:user:"). On the denied_channel path the requested channel is outside this prefix, making the field semantically misleading for on-call engineers. Renaming to authorized_namespace removes the ambiguity with no functional change.

B1 kept (async without await): Confirmed — export async function handleSubscribe at line 70 contains no await expression anywhere in the function body. The try/finally and all branches are pure synchronous control flow. B cited line 74 (first statement inside the function); corrected to the declaration at line 70.

B2 dropped (OTel span context): B explicitly deferred this via a defer annotation — pre-existing pattern shared with handleConnect; cross-handler follow-on out of scope for this slice. Dropping per B's own scoping.

Metrics coverage gap: Both reviewers noted it in prose summaries but neither submitted it as a structured JSON finding. Not subject to arbitration.

Result: 3 kept (all info), 1 dropped → CONDITIONAL_APPROVE.

Blast Radius

The diff spans 7 files across the handler layer (subscribe.ts), metrics, OTel constants, the Fastify server route table, shared types, and two test files. The new route is isolated to a single authorization path with no gRPC or I/O; the metrics and type additions are additive. The primary risk surface is the Fastify route registration and the shared types.ts — both are bounded in scope.

BLAST_SCORE: 4/10

Risk Indicators

| Indicator | Value |
|---|---|
| Sensitive functions | handleSubscribe, logDecision, recordSubscribeOutcome |
| Migration touched | — |
| Test delta | +134 / -2 lines in test files |
| Dependency changes | — |

CI status (head c5107659763d)

No CI checks reported for this commit.

Findings (3)

[INFO] SubscribeHandlerResult.status typed number instead of the literal 200

src/handlers/subscribe.ts:52

export interface SubscribeHandlerResult {
  /** HTTP status to send to Centrifugo — always 200 for subscribe. */
  status: number;  // ← too wide
  body: CentrifugoSubscribeResponse;
}

Verified at line 52. Every return site emits status: 200 (line 115: { status: 200, body: { result: {} } }; line 130: forbidden() returns { status: 200, … }; line 139: badRequest() returns { status: 200, … }). Narrowing to the literal type 200 propagates the invariant into the type system, turning any future accidental non-200 return into a compile error rather than a silent correctness regression:

export interface SubscribeHandlerResult {
  status: 200; // invariant: subscribe verdict always rides in the body
  body: CentrifugoSubscribeResponse;
}

The reply.code(result.status) call in server.ts is assignment-compatible with the narrowed type.
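A small sketch of what the narrowing buys, and of one thing to watch for when adopting it. `NarrowResult` and `sendStatus` are hypothetical stand-ins (the latter for a status setter that accepts any number); only the `status: 200` literal comes from the finding.

```typescript
// Hypothetical stand-ins; not the PR's actual types.
interface NarrowResult {
  status: 200;
  body: unknown;
}

// Stand-in for a status setter that takes a plain number.
function sendStatus(code: number): number {
  return code;
}

// With an annotated return type, the literal 200 is checked against `status: 200`.
function forbidden(): NarrowResult {
  return { status: 200, body: { error: { code: 403, message: "forbidden" } } };
}

// An unannotated object literal widens `status` to `number`...
const widened = { status: 200, body: {} };
// @ts-expect-error: `number` is not assignable to the literal type `200`
const rejected: NarrowResult = widened;

// ...while `as const` (or an explicit annotation) keeps the literal.
const kept = { status: 200 as const, body: {} };
const accepted: NarrowResult = kept;

// The literal type 200 is assignable to `number`, so the call site needs no change.
const sent = sendStatus(forbidden().status);
const statuses = [rejected.status, accepted.status, sent];
```

The widening case matters if a helper builds its result in an intermediate variable: it needs a type annotation or `as const`, otherwise the narrowed interface rejects it.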

[INFO] channel_namespace log field is misleading on the denied_channel path

src/handlers/subscribe.ts:159

Verified at lines 156–164. logDecision unconditionally emits channel_namespace: USER_CHANNEL_PREFIX ("aim2be:user:") for every outcome, including denied_channel / channel_not_proxied.

On that path the client requested a channel outside this prefix (e.g. aim2be:family:…). An on-call engineer reading the structured log may infer the denied subscription was to a user channel when it was not. Renaming the field makes its meaning unambiguous:

logger.info(
  {
    centrifugo_client_id: clientId ?? "",
    authorized_namespace: USER_CHANNEL_PREFIX, // ← was channel_namespace
    outcome,
    reason,
  },
  "centrifugo.subscribe decision",
);

Pure naming change — no functional impact, no PII implications.

[INFO] handleSubscribe is declared async but contains no await expression

src/handlers/subscribe.ts:70

Verified at lines 70–121. The function is declared async but the entire body is pure synchronous control flow — no gRPC, no I/O, no await anywhere inside the try/finally block. The async keyword is superfluous, adds a microtask allocation per subscribe request, and makes the return type less precise than necessary.

Removing it requires only the signature change:

export function handleSubscribe(
  body: CentrifugoSubscribeRequest | null,
  deps: SubscribeHandlerDeps,
): SubscribeHandlerResult { … }

The await handleSubscribe(…) call in server.ts remains valid: await on a non-Promise value simply yields that value (after one microtask tick), so behaviour is unchanged. The SubscribeHandlerResult return type becomes slightly more precise (no Promise wrapper). Zero correctness impact.
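For reference, a minimal sketch of what `await` does with a non-Promise value. The value passes through unchanged, but the code after the `await` is still deferred to a microtask, which is the per-request cost this finding mentions. `handleSync` and `route` are hypothetical stand-ins, not the PR's functions.

```typescript
const order: string[] = [];

// Synchronous handler stand-in: returns a plain value, not a Promise.
function handleSync(): { status: 200 } {
  order.push("handler ran");
  return { status: 200 };
}

async function route(): Promise<{ status: 200 }> {
  const result = await handleSync(); // await on a non-Promise value
  order.push("continuation after await");
  return result;
}

const pending = route();
// Runs before the continuation above, because `await` always yields once.
order.push("caller continued");

void pending.then(() => {
  // prints: handler ran -> caller continued -> continuation after await
  console.log(order.join(" -> "));
});
```

Dropping the `await` at the call site removes that extra tick; the returned value is the same either way.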

Verdict

CONDITIONAL_APPROVE


hib-pr-reviewer • round 1 • 3 findings (3i) • 2026-06-01T05:40:16.376Z → 2026-06-01T05:42:10.217Z • posted-as: pr-reviewer-bot • model: auto

R2 findings (kept=3, all INFO from R1 CONDITIONAL_APPROVE):

(1) INFO subscribe.ts:52 — SubscribeHandlerResult.status typed `number` not the
    literal 200. Narrowed to `status: 200` so any accidental non-200 return is a
    compile error (the 200-always invariant is load-bearing: a non-200 maps to
    Centrifugo's temporary error 100 → re-subscription loop).
(2) INFO subscribe.ts logDecision — field `channel_namespace` is misleading on
    the denied_channel path (requested channel is OUTSIDE the prefix). Renamed to
    `authorized_namespace` (the namespace this proxy authorizes). Pure naming.
(3) INFO subscribe.ts:70 — handleSubscribe declared async with no await (pure
    sync authz). Removed async → returns SubscribeHandlerResult directly (drops a
    per-request microtask, tightens the type); dropped the now-needless await at
    the server.ts route call site.

R1 dropped 1 OTel-span finding (B2) — explicitly deferred (shared w/ handleConnect;
cross-handler follow-on, out of slice scope).

Verification:
- npm run typecheck (tsc --noEmit, strict) -> clean
- npm test -> 130 passed
- npm run build -> clean, 0 warnings

Superseded by round 3.

hib-pr-reviewer review — PR #4 (affinity-intelligence-rework/im2be-realtime-service)

Round 2 — head 1c7c751baf45, base main, trigger synchronize

TL;DR: CONDITIONAL_APPROVE — 1 agreed minor finding kept (stale async/await in 7 unit-test callbacks after handleSubscribe was made synchronous); all 3 round-1 findings resolved; no blocking issues.

Summary

Arbitration Summary

Round 1 resolutions confirmed: All three prior findings are fixed — status: 200 literal (line 57), authorized_namespace rename (line 164), and async removed from handleSubscribe (line 75, now a plain synchronous function).

Round 2 reconciliation: Both A and B independently flagged the same issue — 7 unit-test it() callbacks in describe("handleSubscribe") retain async and await handleSubscribe(…) despite the handler being synchronous after the round-1 fix. This is an AGREED FINDING (highest confidence signal). Verification via Read of tests/handlers.subscribe.test.ts confirms async at lines 32, 42, 51, 63, 72, 81, and 89, and that handleSubscribe at line 75 is a plain synchronous function. The route-integration tests (lines 108–131) correctly remain async for server.inject()/server.close().

Result: 1 agreed minor finding kept. No unique-to-one findings to verify (both reviewers produced identical findings sets). Memora persistence attempted but tag allowlist prevented write; recorded in summary only.

Blast Radius

The diff spans 5 source files and 2 test files, adding a new exported handler (handleSubscribe), new metrics counter (realtime_centrifugo_subscribe_total), a new OTEL span constant, and wires a new Fastify route in server.ts. The new exported surface is used only by server.ts today, but the Prometheus counter name is dashboard/alertmanager-binding and the route replaces a 501 stub. No shared protocol contract files or migration files are touched.

BLAST_SCORE: 4/10

Risk Indicators

| Indicator | Value |
|---|---|
| Sensitive functions | handleSubscribe, forbidden, logDecision |
| Migration touched | — |
| Test delta | +134 / -2 lines in test files |
| Dependency changes | — |

CI status (head 1c7c751baf45)

No CI checks reported for this commit.

Findings (1)

[MINOR] 7 unit-test it() callbacks are still async/await after handleSubscribe was made synchronous

tests/handlers.subscribe.test.ts:32

Lines 32, 42, 51, 63, 72, 81, and 89 (all inside describe("handleSubscribe")) declare their callbacks async and call await handleSubscribe(…). Since round 1 removed async from handleSubscribe (it now returns SubscribeHandlerResult, not Promise<SubscribeHandlerResult>), these async/await pairs are stale. The runtime behaviour is unchanged — await nonPromise resolves immediately — but the tests now contradict the function's type signature and could mislead a future maintainer into re-adding async to the handler, or misreading the handler as performing I/O.

The route-wiring tests at lines 108–131 (describe("POST /centrifugo/subscribe route")) correctly stay async because they call server.inject() and server.close(), which are genuinely async.

Fix: remove async from each of the 7 unit-test it callbacks and drop the await prefix from each handleSubscribe(…) call:

// before
it("allows a client subscribing to its own user channel", async () => {
  const res = await handleSubscribe(…);
// after
it("allows a client subscribing to its own user channel", () => {
  const res = handleSubscribe(…);

Repeat for lines 42, 51, 63, 72, 81, and 89. No assertion changes required.

Verdict

CONDITIONAL_APPROVE


hib-pr-reviewer • round 2 • 1 finding (1m) • 2026-06-01T06:40:57.311Z → 2026-06-01T06:42:18.843Z • posted-as: pr-reviewer-bot • model: auto

R3 finding (kept=1, MINOR, agreed): after R2 made handleSubscribe synchronous,
the 7 unit-test callbacks still used `async () => { … await handleSubscribe(…) }`
— a stale async + await-on-non-Promise. Made them sync `() => { … handleSubscribe(…) }`;
the 2 route tests stay async (they await server.inject). All R1 findings + this
R2-introduced nit now resolved.

Verification:
- npm run typecheck (tsc --noEmit, strict) -> clean
- npm test -> 130 passed

Superseded by round 4.

hib-pr-reviewer review — PR #4 (affinity-intelligence-rework/im2be-realtime-service)

Round 3 — head 8ddbc8fd5319, base main, trigger synchronize

TL;DR: CONDITIONAL_APPROVE — kept 1 verified unique-to-B info finding (async callback no await); A's prose note on missing subscribe metrics test assertions not surfaced (A returned empty findings, B silent).

Summary

Round 3 Arbitration

Prior-round findings (R1–R2): All four carried findings confirmed resolved by both A and B. No re-raises.

Unique-to-B finding verified: Read src/server.ts lines 199–204 — the subscribe route callback is declared async (request, reply) => but the body calls only synchronous handleSubscribe(...), reply.code(...), and return result.body. No await anywhere. B's claim is factually correct.

A's prose observation (not a structured finding): A noted in summary prose that tests/metrics.test.ts adds renderPrometheusMetrics + recordConnectOutcome coverage but zero assertions for recordSubscribeOutcome/SUBSCRIBE_COUNTER_NAME. Read of tests/metrics.test.ts confirms this gap is real — the file imports neither symbol. However, A explicitly returned findings: [], choosing not to structure this as a finding. B did not raise it. Per arbiter rules (reconciliation, not fresh review), it is not surfaced as a structured finding.

Result: 1 verified unique-to-B finding kept at its original info severity; 0 agreed findings; 0 dropped after verification.

Blast Radius

The diff spans 5 source files (new subscribe.ts handler, metrics.ts counter additions, otel.ts span constant, server.ts route wiring, types.ts new request/response shapes) plus 2 test files. The new route is a fully-wired authorization path replacing a 501 stub, so the blast radius is moderate — it adds a new exported public API surface used across modules but does not alter existing connect/fallback/health logic.

BLAST_SCORE: 4/10

Risk Indicators

| Indicator | Value |
|---|---|
| Sensitive functions | handleSubscribe, FALLBACK_USER_PREFIX, recordSubscribeOutcome |
| Migration touched | — |
| Test delta | +137 / -2 lines in test files |
| Dependency changes | — |

CI status (head 8ddbc8fd5319)

No CI checks reported for this commit.

Findings (1)

[INFO] Subscribe route handler declared async but contains no await

src/server.ts:199

The route callback at line 199 is declared async (request, reply) =>, but handleSubscribe is a purely synchronous function and no await expression appears anywhere in the callback body. Every call allocates a gratuitous Promise wrapper on the hot subscribe path. An @typescript-eslint/require-await rule would flag this. Changing to (request, reply) => matches the handler's synchronous contract and eliminates the overhead. Verified via Read: lines 199–204 show const result = handleSubscribe(...) (no await), reply.code(result.status), return result.body — all synchronous.

Verdict

CONDITIONAL_APPROVE


hib-pr-reviewer • round 3 • 1 finding (1i) • 2026-06-01T06:48:33.330Z → 2026-06-01T06:50:30.046Z • posted-as: pr-reviewer-bot • model: auto

R4 (kept=1 INFO + 1 prose-noted real gap, both addressed):

(1) INFO server.ts:199 — subscribe route callback declared `async` but its body
    is fully synchronous (handleSubscribe became sync in R2; no await). Dropped
    `async` → `(request, reply) =>` (no gratuitous Promise per subscribe;
    satisfies @typescript-eslint/require-await). Fastify serialises the returned
    object for sync handlers too.
(2) Coverage gap (reviewer prose; not a structured finding due to arbiter
    reconciliation rules, but real per rule 6/63): tests/metrics.test.ts had zero
    assertions for the new recordSubscribeOutcome / SUBSCRIBE_COUNTER_NAME. Added a
    `subscribe metrics` describe — renders all 5 outcome labels at zero + TYPE
    marker; increments the matching label; asserts subscribe outcomes do NOT feed
    the connect-driven /health degraded gate.
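
A minimal sketch of what the new `subscribe metrics` assertions exercise, assuming a plain in-memory counter map. The counter name comes from the PR description; the five outcome label values and `renderSubscribeMetrics` are hypothetical stand-ins, since the real label names and the body of `renderPrometheusMetrics` are not shown in this thread.

```ts
// Hypothetical outcome labels: the PR says there are five but does not name them here.
const SUBSCRIBE_OUTCOMES = [
  "allowed",
  "denied_malformed",
  "denied_namespace",
  "denied_owner",
  "denied_degraded",
] as const;
type SubscribeOutcome = (typeof SUBSCRIBE_OUTCOMES)[number];

// Counter name as given in the PR description.
const SUBSCRIBE_COUNTER_NAME = "realtime_centrifugo_subscribe_total";

// Every label starts at zero so it renders before the first subscribe arrives.
const subscribeCounters = new Map<SubscribeOutcome, number>(
  SUBSCRIBE_OUTCOMES.map((o): [SubscribeOutcome, number] => [o, 0]),
);

function recordSubscribeOutcome(outcome: SubscribeOutcome): void {
  subscribeCounters.set(outcome, (subscribeCounters.get(outcome) ?? 0) + 1);
}

// Hypothetical renderer: emits HELP/TYPE headers plus one sample per label.
function renderSubscribeMetrics(): string {
  const lines = [
    `# HELP ${SUBSCRIBE_COUNTER_NAME} Subscribe proxy decisions by outcome.`,
    `# TYPE ${SUBSCRIBE_COUNTER_NAME} counter`,
  ];
  for (const outcome of SUBSCRIBE_OUTCOMES) {
    const value = subscribeCounters.get(outcome) ?? 0;
    lines.push(`${SUBSCRIBE_COUNTER_NAME}{outcome="${outcome}"} ${value}`);
  }
  return lines.join("\n") + "\n";
}
```

Rendering all labels at zero keeps dashboards and alert rules from seeing a missing series before the first decision is recorded.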

Verification:
- npm run typecheck (tsc --noEmit, strict) -> clean
- npm test -> 133 passed (3 new)
- npm run build -> clean, 0 warnings

Superseded by round 5.

hib-pr-reviewer review — PR #4 (affinity-intelligence-rework/im2be-realtime-service)

Round 4 — head 8fdaa8bb6f7b, base main, trigger synchronize

TL;DR: CONDITIONAL_APPROVE — kept 1 unique-to-A finding (verified: server.close() unguarded in route tests), dropped nothing; no blocking issues.

Summary

Reconciliation — Round 4

Step 1 — Agreed findings: None. A=1, B=0. No agreed pair to keep automatically.

Step 2 — Unique-to-A verification: Read tests/handlers.subscribe.test.ts in full (135 lines). The finding is confirmed:

  • Line 111 begins `it("wires the handler — own-channel subscribe is allowed", async () => {`; `server` is created inline at line 112.
  • Line 121: await server.close() is the last statement before the callback closes.
  • Line 124 begins the second test; server is again created inline at line 125.
  • Line 133: await server.close() is the last statement.
  • The file does import afterEach (line 14) and uses it at lines 30–32 to reset metrics, but no afterEach closes the server.
  • If any expect() on lines 119–120 or 131–132 throws, the Fastify instance remains open and Vitest's process can hang.

Finding kept as-is (severity: minor).

Step 3 — Memora persistence: Attempted to persist run-4 summary; tag allowlist errors prevented writes with tags. Absorbed without tags — permission gate blocked absorb as well. Persisted as best-effort.

Outcome: Kept 1 (unique-to-A, verified), dropped 0. CONDITIONAL_APPROVE.

Blast Radius

The diff adds a new /centrifugo/subscribe route wired to a pure synchronous authorization handler, a new Prometheus counter family, a new OTel span name constant, and 135 lines of tests. Changes span 8 files across src/ and tests/ but are well-scoped: the handler has no I/O, the metrics addition follows the existing pattern, and the server wiring is a single fastify.post registration. No shared data paths or public protocol contracts are altered.

BLAST_SCORE: 4/10

Risk Indicators

| Indicator | Value |
|---|---|
| Sensitive functions | handleSubscribe, forbidden, logDecision |
| Migration touched | none |
| Test delta | +172 / -2 lines in test files |
| Dependency changes | none |

CI status (head 8fdaa8bb6f7b)

No CI checks reported for this commit.

Findings (1)

[MINOR] server.close() not guarded by try/finally — leaked Fastify instance if assertion fails

tests/handlers.subscribe.test.ts:111

Both route-integration tests construct a live FastifyInstance inline inside the it() callback (lines 112 and 125) and call await server.close() only at the tail of the happy path (lines 121 and 133). If any expect(…) on lines 119–120 or 131–132 throws before reaching server.close(), the instance is never torn down. Fastify inject() skips port binding, but the open handle still prevents Vitest from exiting cleanly — the typical symptom is a hanging test run after a failing assertion.

The file already imports afterEach (line 14) and uses it (lines 30–32) for metrics reset; extend that pattern for server cleanup:

```ts
describe("POST /centrifugo/subscribe route", () => {
  let server: ReturnType<typeof createServer>;

  afterEach(async () => {
    await server?.close();
  });

  it("wires the handler — own-channel subscribe is allowed", async () => {
    server = createServer({ grpcClient: stubGrpc, logger: { level: "silent" } });
    const uid = "22222222-2222-5222-8222-222222222222";
    const res = await server.inject({
      method: "POST",
      url: "/centrifugo/subscribe",
      payload: { user: uid, channel: `${USER_CHANNEL_PREFIX}${uid}` },
    });
    expect(res.statusCode).toBe(200);
    expect(res.json()).toEqual({ result: {} });
    // no manual close; afterEach handles it
  });

  it("wires the handler — cross-user subscribe is denied", async () => {
    server = createServer({ grpcClient: stubGrpc, logger: { level: "silent" } });
    // … asserts …
    // no manual close
  });
});
```

Verdict

CONDITIONAL_APPROVE


hib-pr-reviewer • round 4 • 1 finding (1m) • 2026-06-01T06:55:42.787Z → 2026-06-01T06:57:09.098Z • posted-as: pr-reviewer-bot • model: auto

R5 finding (kept=1, MINOR, unique-to-A, verified): the 2 route tests called
`await server.close()` after the assertions, so a failed assertion would skip
close() and leak the Fastify instance into the test process. Wrapped each
route-test body in try { … } finally { await server.close(); }. All R1-R4
findings + this now resolved.
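
The try/finally guard described above can be sketched without Fastify or Vitest. `FakeServer` and `withServer` are hypothetical names used only for illustration; the assumption is simply that the real instance exposes an async `close()`.

```ts
// Hypothetical stand-in for a Fastify instance: only close() matters here.
interface Closable {
  close(): Promise<void>;
}

class FakeServer implements Closable {
  closed = false;
  async close(): Promise<void> {
    this.closed = true;
  }
}

// Runs the test body and always closes the server, even when the body throws
// (for example because an expect() assertion failed).
async function withServer<T>(server: Closable, body: () => Promise<T>): Promise<T> {
  try {
    return await body();
  } finally {
    await server.close();
  }
}
```

The error from a failed assertion still propagates to the test runner; `finally` only guarantees that teardown runs first.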

Verification:
- npm run typecheck (tsc --noEmit, strict) -> clean
- npm test -> 133 passed

hib-pr-reviewer review — PR #4 (affinity-intelligence-rework/im2be-realtime-service)

Round 5 — head 4a674e43b4a6, base main, trigger synchronize

TL;DR: NO_NEW_FINDINGS — No new findings this round.

Summary

[quorum-converged] A=0 = B=0. All 6 prior-round findings are resolved in the current commit. No new actionable findings.

Prior-round closures confirmed by file read:

  1. R1 status: 200 literal — src/handlers/subscribe.ts:57 shows status: 200; (literal type, not number).
  2. R1 misleading channel_namespace log field — renamed to authorized_namespace at line 164; semantics now unambiguous on all paths.
  3. R1 async on handleSubscribe — function declaration at line 75 is synchronous.
  4. R2 async/await in 7 unit-test it() callbacks — all 7 handleSubscribe unit tests (lines 35–98) are synchronous callbacks.
  5. R3 route handler `async`: server.ts:203 handler is sync `(request, reply) => {`.
  6. R4 server.close() not guarded — both route-level integration tests (lines 111–126, 128–142) use try/finally with await server.close().

Code quality spot-checks (no new findings):

  • Subscribe metrics block in metrics.ts:158–162 renders # HELP/# TYPE headers and all 5 outcome labels. _resetMetricsForTest correctly zeroes the new subscribeCounters map.
  • tests/metrics.test.ts lines 121–152 add a dedicated describe("subscribe metrics") block covering zero-rendering, increment, and health-gate isolation — good coverage.
  • handleSubscribe span is started before the try-block and span.end() lives in finally — no span leak on early returns from malformed/channel/owner branches.
  • Centrifugo error codes 400 and 403 are both within the required [400, 1999] range; temporary is correctly omitted for all denies in this handler.
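
For orientation, the decision contract these spot-checks refer to might look roughly like the sketch below. It follows the PR description (HTTP 200 on every path, `{"result":{}}` on allow, an `error` object with a code in [400, 1999] and no `temporary` field on deny). `decideSubscribe`, the `"bad request"` message, and the order of the checks are assumptions, not the actual `handleSubscribe` implementation.

```ts
const USER_CHANNEL_PREFIX = "aim2be:user:";
const FALLBACK_USER_PREFIX = "fallback:";

type SubscribeBody =
  | { result: Record<string, never> }
  | { error: { code: number; message: string } };

// status is the literal 200: a non-200 reply would be treated as a temporary error.
interface SubscribeReply {
  status: 200;
  body: SubscribeBody;
}

// `temporary` is deliberately omitted, so Centrifugo treats the denial as permanent.
function deny(code: 400 | 403, message: string): SubscribeReply {
  return { status: 200, body: { error: { code, message } } };
}

// Hypothetical name; the handler in the PR is handleSubscribe.
function decideSubscribe(input: unknown): SubscribeReply {
  if (typeof input !== "object" || input === null) return deny(400, "bad request");
  const { user, channel } = input as { user?: unknown; channel?: unknown };
  if (typeof user !== "string" || typeof channel !== "string" || user === "") {
    return deny(400, "bad request");
  }
  // Degraded fallback-pool connections never get a user channel.
  if (user.startsWith(FALLBACK_USER_PREFIX)) return deny(403, "forbidden");
  // Fail closed for anything outside the own-user namespace.
  if (!channel.startsWith(USER_CHANNEL_PREFIX)) return deny(403, "forbidden");
  // The embedded user id must equal the connect-established user.
  if (channel.slice(USER_CHANNEL_PREFIX.length) !== user) return deny(403, "forbidden");
  return { status: 200, body: { result: {} } };
}
```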

CI status (head 4a674e43b4a6)

No CI checks reported for this commit.

Findings

No new findings this round.

Quorum converged on empty findings (A + B both returned 0).

Verdict

NO_NEW_FINDINGS


hib-pr-reviewer • round 5 • 0 findings • 2026-06-01T07:01:41.996Z → 2026-06-01T07:04:03.135Z • posted-as: pr-reviewer-bot • model: auto • [bookkeeping fallback]

hibryda deleted branch feat/centrifugo-subscribe-proxy 2026-06-01 09:05:12 +02:00
Reference
affinity-intelligence-rework/im2be-realtime-service!4