feat(realtime-service): implement subscribe_proxy — own-user channel authorization (#351 slice-1 B2) #4
Loading…
Add table
Add a link
Reference in a new issue
No description provided.
Delete branch "feat/centrifugo-subscribe-proxy"
Deleting a branch is permanent. Although the deleted branch may continue to exist for a short time before it actually gets removed, it CANNOT be undone in most cases. Continue?
Summary
Stage B slice-1 (B2): turns the
/centrifugo/subscribe501 stub into a real own-user channel authorization handler.A client may subscribe to
aim2be:user:${user_id}iff its connect-establisheduserequals the channel's embedded user id. Degraded fallback-pool connections (fallback:<hash>user) and channels outside theaim2be:user:namespace are denied (fail-closed, ADR-0002 §3.4).Contract (verified vs centrifugal.dev/docs/server/proxy, 2026-06-01)
Subscribe always replies HTTP 200; the verdict rides in the body:
{"result":{}}{"error":{"code":403,"message":"forbidden"}}(code in [400,1999],temporaryomitted → permanent)A non-200 status maps to Centrifugo's default error
100(temporary) → re-subscription loop, so status is pinned to 200. Deliberately different from connect's non-200 +status_to_code_transformspath.Notes
user+channelonly (Centrifugo does not forward the connectinfoblock), so the degraded check keys on thefallback:user prefix.realtime_centrifugo_subscribe_total{outcome}counter.fetch+kafkajsconsumer) is gated on a rule-9 dependency consent.Verification
npm run typecheck(tsc --noEmit, strict) → cleannpm test→ 130 passed (9 new + 121 existing)npm run build→ clean, 0 warningsTurns the /centrifugo/subscribe 501 stub into a real authorization handler for the Stage B slice-1 notification path. A client may subscribe to aim2be:user:${user_id} IFF its connect-established user id equals the channel's embedded user id; degraded fallback-pool connections + channels outside the aim2be:user: namespace are denied (fail-closed, ADR-0002 §3.4). Contract (verified against centrifugal.dev/docs/server/proxy 2026-06-01): subscribe ALWAYS replies HTTP 200 — allow={result:{}}, deny={error:{code, message}} with code in [400,1999] and `temporary` omitted (permanent). A non-200 status maps to Centrifugo's default error 100 (temporary) → re-subscription loop, so the handler pins status=200 and the verdict rides in the body. This is deliberately DIFFERENT from connect's non-200 + status_to_code_transforms path. Pure local authorization — no gRPC, no I/O. The subscribe request carries `user` (connect-established) + `channel` only (Centrifugo does not forward the connect `info` block to subscribe), so the degraded check keys on the fallback: user prefix re-exported from connect.ts. Logs never include the channel/uid (the channel embeds the pseudonymous user UUID — rule 01 least-data; identity-service gates that value behind a GDPR flag). New realtime_centrifugo_subscribe_total {outcome} counter. Dep-free — PR-A of the Option-2 split. PR-B (Centrifugo server-publish via fetch + kafkajs Kafka-relay consumer) is gated on a rule-9 dependency consent. Files: src/handlers/subscribe.ts (new); src/types.ts (+subscribe req/resp + SubscribeOutcome); src/server.ts (route replaces the 501 stub); src/metrics.ts (subscribe counter); src/otel.ts (span name); tests/handlers.subscribe.test.ts (new, 9 tests); tests/server.test.ts (narrowed the 501-stub test to publish/rpc). Verification: - npm run typecheck (tsc --noEmit, strict) -> clean - npm test -> 130 passed (9 new + 121 existing) - npm run build (tsc -p tsconfig.build.json) -> clean, 0 warningsShow previous round
hib-pr-reviewer review — PR #4 (affinity-intelligence-rework/im2be-realtime-service)
Round 1 — head
c5107659763d, basemain, triggeropenedTL;DR: CONDITIONAL_APPROVE — kept 3 unique-to-one info findings (all verified), dropped 1 explicitly deferred OTel finding; no blocking or major issues.
Summary
Arbitration Summary
First run for PR #4. No prior Memora history. All 4 submitted findings are unique-to-one reviewer; each was verified against the actual file at HEAD.
A1 kept (
status: number→200): Confirmed at line 52 —SubscribeHandlerResult.status: numberwhile every return site emitsstatus: 200(lines 115, 130, 139). Narrowing to the literal enforces the invariant at compile time.A2 kept (
channel_namespacelog field): Confirmed at line 159 —logDecisionunconditionally emitschannel_namespace: USER_CHANNEL_PREFIX("aim2be:user:"). On thedenied_channelpath the requested channel is outside this prefix, making the field semantically misleading for on-call engineers. Renaming toauthorized_namespaceremoves the ambiguity with no functional change.B1 kept (
asyncwithoutawait): Confirmed —export async function handleSubscribeat line 70 contains noawaitexpression anywhere in the function body. Thetry/finallyand all branches are pure synchronous control flow. B cited line 74 (first statement inside the function); corrected to the declaration at line 70.B2 dropped (OTel span context): B explicitly deferred this via a
deferannotation — pre-existing pattern shared withhandleConnect; cross-handler follow-on out of scope for this slice. Dropping per B's own scoping.Metrics coverage gap: Both reviewers noted it in prose summaries but neither submitted it as a structured JSON finding. Not subject to arbitration.
Result: 3 kept (all
info), 1 dropped → CONDITIONAL_APPROVE.Blast Radius
The diff spans 7 files across the handler layer (
subscribe.ts), metrics, OTel constants, the Fastify server route table, shared types, and two test files. The new route is isolated to a single authorization path with no gRPC or I/O; the metrics and type additions are additive. The primary risk surface is the Fastify route registration and the sharedtypes.ts— both are bounded in scope.BLAST_SCORE: 4/10
Risk Indicators
handleSubscribe,logDecision,recordSubscribeOutcomeCI status (head
c5107659763d)No CI checks reported for this commit.
Findings (3)
[INFO]
SubscribeHandlerResult.statustypednumberinstead of the literal200src/handlers/subscribe.ts:52
Verified at line 52. Every return site emits
status: 200(line 115:{ status: 200, body: { result: {} } }; line 130:forbidden()returns{ status: 200, … }; line 139:badRequest()returns{ status: 200, … }). Narrowing to the literal type200propagates the invariant into the type system, turning any future accidental non-200 return into a compile error rather than a silent correctness regression:The
reply.code(result.status)call inserver.tsis assignment-compatible with the narrowed type.[INFO]
channel_namespacelog field is misleading on thedenied_channelpathsrc/handlers/subscribe.ts:159
Verified at lines 156–164.
logDecisionunconditionally emitschannel_namespace: USER_CHANNEL_PREFIX("aim2be:user:") for every outcome, includingdenied_channel/channel_not_proxied.On that path the client requested a channel outside this prefix (e.g.
aim2be:family:…). An on-call engineer reading the structured log may infer the denied subscription was to a user channel when it was not. Renaming the field makes its meaning unambiguous:Pure naming change — no functional impact, no PII implications.
[INFO]
handleSubscribeis declaredasyncbut contains noawaitexpressionsrc/handlers/subscribe.ts:70
Verified at lines 70–121. The function is declared
asyncbut the entire body is pure synchronous control flow — no gRPC, no I/O, noawaitanywhere inside thetry/finallyblock. Theasynckeyword is superfluous, adds a microtask allocation per subscribe request, and makes the return type less precise than necessary.Removing it requires only the signature change:
The
await handleSubscribe(…)call inserver.tsremains valid —awaiton a non-Promise value is a no-op. TheSubscribeHandlerResultreturn type is slightly more precise (noPromisewrapper). Zero correctness impact.Verdict
CONDITIONAL_APPROVE
hib-pr-reviewer • round 1 • 3 findings (3i) • 2026-06-01T05:40:16.376Z → 2026-06-01T05:42:10.217Z • posted-as: pr-reviewer-bot • model: auto
R2 findings (kept=3, all INFO from R1 CONDITIONAL_APPROVE): (1) INFO subscribe.ts:52 — SubscribeHandlerResult.status typed `number` not the literal 200. Narrowed to `status: 200` so any accidental non-200 return is a compile error (the 200-always invariant is load-bearing: a non-200 maps to Centrifugo's temporary error 100 → re-subscription loop). (2) INFO subscribe.ts logDecision — field `channel_namespace` is misleading on the denied_channel path (requested channel is OUTSIDE the prefix). Renamed to `authorized_namespace` (the namespace this proxy authorizes). Pure naming. (3) INFO subscribe.ts:70 — handleSubscribe declared async with no await (pure sync authz). Removed async → returns SubscribeHandlerResult directly (drops a per-request microtask, tightens the type); dropped the now-needless await at the server.ts route call site. R1 dropped 1 OTel-span finding (B2) — explicitly deferred (shared w/ handleConnect; cross-handler follow-on, out of slice scope). Verification: - npm run typecheck (tsc --noEmit, strict) -> clean - npm test -> 130 passed - npm run build -> clean, 0 warningsShow previous round
hib-pr-reviewer review — PR #4 (affinity-intelligence-rework/im2be-realtime-service)
Round 2 — head
1c7c751baf45, basemain, triggersynchronizeTL;DR: CONDITIONAL_APPROVE — 1 agreed minor finding kept (stale async/await in 7 unit-test callbacks after handleSubscribe was made synchronous); all 3 round-1 findings resolved; no blocking issues.
Summary
Arbitration Summary
Round 1 resolutions confirmed: All three prior findings are fixed —
status: 200literal (line 57),authorized_namespacerename (line 164), andasyncremoved fromhandleSubscribe(line 75, now a plain synchronous function).Round 2 reconciliation: Both A and B independently flagged the same issue — 7 unit-test
it()callbacks indescribe("handleSubscribe")retainasyncandawait handleSubscribe(…)despite the handler being synchronous after the round-1 fix. This is an AGREED FINDING (highest confidence signal). Verification viaReadoftests/handlers.subscribe.test.tsconfirmsasyncat lines 32, 42, 51, 63, 72, 81, and 89, and thathandleSubscribeat line 75 is a plain synchronous function. The route-integration tests (lines 108–131) correctly remainasyncforserver.inject()/server.close().Result: 1 agreed minor finding kept. No unique-to-one findings to verify (both reviewers produced identical findings sets). Memora persistence attempted but tag allowlist prevented write; recorded in summary only.
Blast Radius
The diff spans 5 source files and 2 test files, adding a new exported handler (
handleSubscribe), new metrics counter (realtime_centrifugo_subscribe_total), a new OTEL span constant, and wires a new Fastify route inserver.ts. The new exported surface is used only byserver.tstoday, but the Prometheus counter name is dashboard/alertmanager-binding and the route replaces a 501 stub. No shared protocol contract files or migration files are touched.BLAST_SCORE: 4/10
Risk Indicators
handleSubscribe,forbidden,logDecisionCI status (head
1c7c751baf45)No CI checks reported for this commit.
Findings (1)
[MINOR] 7 unit-test
it()callbacks are stillasync/awaitafterhandleSubscribewas made synchronoustests/handlers.subscribe.test.ts:32
Lines 32, 42, 51, 63, 72, 81, and 89 (all inside
describe("handleSubscribe")) declare their callbacksasyncand callawait handleSubscribe(…). Since round 1 removedasyncfromhandleSubscribe(it now returnsSubscribeHandlerResult, notPromise<SubscribeHandlerResult>), theseasync/awaitpairs are stale. The runtime behaviour is unchanged —await nonPromiseresolves immediately — but the tests now contradict the function's type signature and could mislead a future maintainer into re-addingasyncto the handler, or misreading the handler as performing I/O.The route-wiring tests at lines 108–131 (
describe("POST /centrifugo/subscribe route")) correctly stayasyncbecause they callserver.inject()andserver.close(), which are genuinely async.Fix: remove
asyncfrom each of the 7 unit-testitcallbacks and drop theawaitprefix from eachhandleSubscribe(…)call:Repeat for lines 42, 51, 63, 72, 81, and 89. No assertion changes required.
Verdict
CONDITIONAL_APPROVE
hib-pr-reviewer • round 2 • 1 finding (1m) • 2026-06-01T06:40:57.311Z → 2026-06-01T06:42:18.843Z • posted-as: pr-reviewer-bot • model: auto
R3 finding (kept=1, MINOR, agreed): after R2 made handleSubscribe synchronous, the 7 unit-test callbacks still used `async () => { … await handleSubscribe(…) }` — a stale async + await-on-non-Promise. Made them sync `() => { … handleSubscribe(…) }`; the 2 route tests stay async (they await server.inject). All R1 findings + this R2-introduced nit now resolved. Verification: - npm run typecheck (tsc --noEmit, strict) -> clean - npm test -> 130 passedShow previous round
hib-pr-reviewer review — PR #4 (affinity-intelligence-rework/im2be-realtime-service)
Round 3 — head
8ddbc8fd5319, basemain, triggersynchronizeTL;DR: CONDITIONAL_APPROVE — kept 1 verified unique-to-B info finding (async callback no await); A's prose note on missing subscribe metrics test assertions not surfaced (A returned empty findings, B silent).
Summary
Round 3 Arbitration
Prior-round findings (R1–R2): All four carried findings confirmed resolved by both A and B. No re-raises.
Unique-to-B finding verified: Read
src/server.tslines 199–204 — the subscribe route callback is declaredasync (request, reply) =>but the body calls only synchronoushandleSubscribe(...),reply.code(...), andreturn result.body. Noawaitanywhere. B's claim is factually correct.A's prose observation (not a structured finding): A noted in summary prose that
tests/metrics.test.tsaddsrenderPrometheusMetrics+recordConnectOutcomecoverage but zero assertions forrecordSubscribeOutcome/SUBSCRIBE_COUNTER_NAME. Read oftests/metrics.test.tsconfirms this gap is real — the file imports neither symbol. However, A explicitly returnedfindings: [], choosing not to structure this as a finding. B did not raise it. Per arbiter rules (reconciliation, not fresh review), it is not surfaced as a structured finding.Result: 1 verified unique-to-B finding kept at its original
infoseverity; 0 agreed findings; 0 dropped after verification.Blast Radius
The diff spans 5 source files (new subscribe.ts handler, metrics.ts counter additions, otel.ts span constant, server.ts route wiring, types.ts new request/response shapes) plus 2 test files. The new route is a fully-wired authorization path replacing a 501 stub, so the blast radius is moderate — it adds a new exported public API surface used across modules but does not alter existing connect/fallback/health logic.
BLAST_SCORE: 4/10
Risk Indicators
handleSubscribe,FALLBACK_USER_PREFIX,recordSubscribeOutcomeCI status (head
8ddbc8fd5319)No CI checks reported for this commit.
Findings (1)
[INFO] Subscribe route handler declared
asyncbut contains noawaitsrc/server.ts:199
The route callback at line 199 is declared
async (request, reply) =>, buthandleSubscribeis a purely synchronous function and noawaitexpression appears anywhere in the callback body. Every call allocates a gratuitous Promise wrapper on the hot subscribe path. An@typescript-eslint/require-awaitrule would flag this. Changing to(request, reply) =>matches the handler's synchronous contract and eliminates the overhead. Verified via Read: lines 199–204 showconst result = handleSubscribe(...)(no await),reply.code(result.status),return result.body— all synchronous.Verdict
CONDITIONAL_APPROVE
hib-pr-reviewer • round 3 • 1 finding (1i) • 2026-06-01T06:48:33.330Z → 2026-06-01T06:50:30.046Z • posted-as: pr-reviewer-bot • model: auto
R4 (kept=1 INFO + 1 prose-noted real gap, both addressed): (1) INFO server.ts:199 — subscribe route callback declared `async` but its body is fully synchronous (handleSubscribe became sync in R2; no await). Dropped `async` → `(request, reply) =>` (no gratuitous Promise per subscribe; satisfies @typescript-eslint/require-await). Fastify serialises the returned object for sync handlers too. (2) Coverage gap (reviewer prose; not a structured finding due to arbiter reconciliation rules, but real per rule 6/63): tests/metrics.test.ts had zero assertions for the new recordSubscribeOutcome / SUBSCRIBE_COUNTER_NAME. Added a `subscribe metrics` describe — renders all 5 outcome labels at zero + TYPE marker; increments the matching label; asserts subscribe outcomes do NOT feed the connect-driven /health degraded gate. Verification: - npm run typecheck (tsc --noEmit, strict) -> clean - npm test -> 133 passed (3 new) - npm run build -> clean, 0 warningsShow previous round
hib-pr-reviewer review — PR #4 (affinity-intelligence-rework/im2be-realtime-service)
Round 4 — head
8fdaa8bb6f7b, basemain, triggersynchronizeTL;DR: CONDITIONAL_APPROVE — kept 1 unique-to-A finding (verified: server.close() unguarded in route tests), dropped nothing; no blocking issues.
Summary
Reconciliation — Round 4
Step 1 — Agreed findings: None. A=1, B=0. No agreed pair to keep automatically.
Step 2 — Unique-to-A verification: Read
tests/handlers.subscribe.test.tsin full (135 lines). The finding is confirmed:it("wires the handler — own-channel subscribe is allowed", async () => {—serveris created inline at line 112.await server.close()is the last statement before the callback closes.serveris again created inline at line 125.await server.close()is the last statement.afterEach(line 14) and uses it at lines 30–32 to reset metrics, but noafterEachcloses the server.expect()on lines 119–120 or 131–132 throws, the Fastify instance remains open and Vitest's process can hang.Finding kept as-is (severity: minor).
Step 3 — Memora persistence: Attempted to persist run-4 summary; tag allowlist errors prevented writes with tags. Absorbed without tags — permission gate blocked absorb as well. Persisted as best-effort.
Outcome: Kept 1 (unique-to-A, verified), dropped 0. CONDITIONAL_APPROVE.
Blast Radius
The diff adds a new
/centrifugo/subscriberoute wired to a pure synchronous authorization handler, a new Prometheus counter family, a new OTel span name constant, and 135 lines of tests. Changes span 8 files across src/ and tests/ but are well-scoped: the handler has no I/O, the metrics addition follows the existing pattern, and the server wiring is a singlefastify.postregistration. No shared data paths or public protocol contracts are altered.BLAST_SCORE: 4/10
Risk Indicators
handleSubscribe,forbidden,logDecisionCI status (head
8fdaa8bb6f7b)No CI checks reported for this commit.
Findings (1)
[MINOR]
server.close()not guarded bytry/finally— leaked Fastify instance if assertion failstests/handlers.subscribe.test.ts:111
Both route-integration tests construct a live
FastifyInstanceinline inside theit()callback (lines 112 and 125) and callawait server.close()only at the tail of the happy path (lines 121 and 133). If anyexpect(…)on lines 119–120 or 131–132 throws before reachingserver.close(), the instance is never torn down. Fastifyinject()skips port binding, but the open handle still prevents Vitest from exiting cleanly — the typical symptom is a hanging test run after a failing assertion.The file already imports
afterEach(line 14) and uses it (lines 30–32) for metrics reset; extend that pattern for server cleanup:Verdict
CONDITIONAL_APPROVE
hib-pr-reviewer • round 4 • 1 finding (1m) • 2026-06-01T06:55:42.787Z → 2026-06-01T06:57:09.098Z • posted-as: pr-reviewer-bot • model: auto
R5 finding (kept=1, MINOR, unique-to-A, verified): the 2 route tests called `await server.close()` after the assertions, so a failed assertion would skip close() and leak the Fastify instance into the test process. Wrapped each route-test body in try { … } finally { await server.close(); }. All R1-R4 findings + this now resolved. Verification: - npm run typecheck (tsc --noEmit, strict) -> clean - npm test -> 133 passedhib-pr-reviewer review — PR #4 (affinity-intelligence-rework/im2be-realtime-service)
Round 5 — head
4a674e43b4a6, basemain, triggersynchronizeTL;DR: NO_NEW_FINDINGS — No new findings this round.
Summary
[quorum-converged] A=0 = B=0. All 6 prior-round findings are resolved in the current commit. No new actionable findings.
Prior-round closures confirmed by file read:
status: 200literal —src/handlers/subscribe.ts:57showsstatus: 200;(literal type, notnumber).channel_namespacelog field — renamed toauthorized_namespaceat line 164; semantics now unambiguous on all paths.asynconhandleSubscribe— function declaration at line 75 is synchronous.async/awaitin 7 unit-testit()callbacks — all 7handleSubscribeunit tests (lines 35–98) are synchronous callbacks.async—server.ts:203handler is sync(request, reply) => {.server.close()not guarded — both route-level integration tests (lines 111–126, 128–142) usetry/finallywithawait server.close().Code quality spot-checks (no new findings):
metrics.ts:158–162renders# HELP/# TYPEheaders and all 5 outcome labels._resetMetricsForTestcorrectly zeroes the newsubscribeCountersmap.tests/metrics.test.tslines 121–152 add a dedicateddescribe("subscribe metrics")block covering zero-rendering, increment, and health-gate isolation — good coverage.handleSubscribespan is started before the try-block andspan.end()lives infinally— no span leak on early returns from malformed/channel/owner branches.[400, 1999]range;temporaryis correctly omitted for all denies in this handler.CI status (head
4a674e43b4a6)No CI checks reported for this commit.
Findings
No new findings this round.
Quorum converged on empty findings (A + B both returned 0).
Verdict
NO_NEW_FINDINGS
hib-pr-reviewer • round 5 • 0 findings • 2026-06-01T07:01:41.996Z → 2026-06-01T07:04:03.135Z • posted-as: pr-reviewer-bot • model: auto • [bookkeeping fallback]