aim2be Centrifugo proxy + Kafka-to-Centrifugo bridge (Node.js + TypeScript). Per planning .planning/22-stage-b-centrifugo-topology.md.

im2be-realtime-service

aim2be Centrifugo proxy + (future) Kafka-to-Centrifugo bridge. Node.js 22 + TypeScript + Fastify. Single sharded service hosting the Centrifugo v6 named-proxy callbacks per .planning/22-stage-b-centrifugo-topology.md §5.

Status: PR-OPAQUE-5 shipped the connect_proxy endpoint with full identity-service Validate forwarding. PR-OPAQUE-7 (this PR) wires consumption of the static fallback ticket pool per ADR-0002 §3.4 (see Fallback pool consumption below). The subscribe / publish / rpc endpoints remain stubs that return HTTP 501 pending future PRs.

Tech stack

  • Node 22 (ES2022 target, ESM modules).
  • Fastify 5 — HTTP server with Pino-default structured logging.
  • @grpc/grpc-js + @grpc/proto-loader — dynamic gRPC client to identity-service.
  • @opentelemetry/sdk-trace-node + OTLP HTTP exporter — traces via OTLP.
  • vitest — unit tests (no network).

What it does today

        Client (PWA / Capacitor)
                 |
                 |  WebSocket(token=<opaque-ticket>)
                 v
        +--------+--------+
        |  Centrifugo v6  |  proxies[*].endpoint =
        | (3-replica HA)  |    http://realtime-service:9730/centrifugo/<type>
        +--------+--------+
                 |
                 |  POST /centrifugo/connect
                 |  body: { client, transport, protocol, encoding,
                 |          data: { token: <opaque-ticket> } }
                 v
   +-------------+-------------+
   |   im2be-realtime-service  |
   |  - handleConnect(body)    |
   |  - extract ticket         |
   |  - gRPC Validate(ticket)  |---+
   +-------------+-------------+   |
                 ^                 |
                 |                 | gRPC realtime.v1
                 |                 | ConnectProxyService.Validate
                 |                 v
                 |     +-----------+-----------+
                 +-----+   identity-service    |
                       |  ConnectProxyService  |
                       |   (Redis GETDEL ...)  |
                       +-----------------------+

The handler translates the gRPC response into the Centrifugo connect-response wire shape:

| Identity-service result | Centrifugo response | HTTP status |
|---|---|---|
| Success{degraded=false} | { result: { user, info, expire_at } } | 200 |
| Success{degraded=true} (identity-side fallback) | { result: { user, info: { degraded: true, ... } } } | 200 |
| Disconnect branch | { disconnect: { code, reason } } | 401 |
| gRPC NOT_FOUND / INVALID_ARGUMENT / UNAUTHENTICATED / PERMISSION_DENIED | { disconnect: { code: 4401, reason: "invalid ticket" } } | 401 |
| Error branch | { disconnect: { code: 4503, reason: "ticket validation temporarily unavailable" } } | 503 |
| gRPC UNAVAILABLE / DEADLINE_EXCEEDED, outage NOT yet degraded | { disconnect: { code: 4503, reason: ... } } | 503 |
| gRPC UNAVAILABLE / DEADLINE_EXCEEDED, degraded AND ticket in fallback pool | { result: { user: "fallback:<sha256-prefix>", info: { degraded: true, fallback_path: "pool" }, expire_at: <nowS + ttlS> } } | 200 |
| gRPC UNAVAILABLE / DEADLINE_EXCEEDED, degraded BUT ticket NOT in pool | { disconnect: { code: 4503, reason: ... } } | 503 |
| Any other failure | { error: { code: 100, message: "internal error" } } | 500 |

Centrifugo's client.proxy.connect.http.status_to_code_transforms config maps 401 → terminal disconnect 4401 and 5xx → built-in retry-error code 100 (planning §2.2). The explicit 4503 disconnect on transient gRPC failures overrides the retry to surface "ticket plane is down" cleanly to the client.
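A minimal TypeScript sketch of how the gRPC-error rows of the table could be translated into Centrifugo connect responses. GrpcCode copies the standard numeric gRPC status codes (the same values @grpc/grpc-js exposes as status); mapGrpcError, ProxyReply and FallbackGate are hypothetical names, not the service's actual API, and reusing the Error-branch reason string for the transient rows is an assumption (the table elides it).

```typescript
// Sketch only: names are hypothetical; the numbers are the standard gRPC status codes.
const GrpcCode = {
  INVALID_ARGUMENT: 3,
  DEADLINE_EXCEEDED: 4,
  NOT_FOUND: 5,
  PERMISSION_DENIED: 7,
  UNAVAILABLE: 14,
  UNAUTHENTICATED: 16,
} as const;

interface ProxyReply {
  httpStatus: number;
  body: Record<string, unknown>;
}

// Hypothetical view of degraded-mode state used by the UNAVAILABLE / DEADLINE_EXCEEDED rows.
interface FallbackGate {
  degraded: boolean; // outage has lasted longer than FALLBACK_DEGRADED_WINDOW_MS
  tryConsume(ticket: string): string | null; // fallback user id, or null if the ticket is not in the pool
}

// Assumption: the Error-branch reason string is reused for the transient rows.
const UNAVAILABLE_REASON = "ticket validation temporarily unavailable";

function mapGrpcError(code: number, ticket: string, gate: FallbackGate, nowS: number, ttlS: number): ProxyReply {
  switch (code) {
    case GrpcCode.NOT_FOUND:
    case GrpcCode.INVALID_ARGUMENT:
    case GrpcCode.UNAUTHENTICATED:
    case GrpcCode.PERMISSION_DENIED:
      // Terminal: the ticket itself is bad.
      return { httpStatus: 401, body: { disconnect: { code: 4401, reason: "invalid ticket" } } };
    case GrpcCode.UNAVAILABLE:
    case GrpcCode.DEADLINE_EXCEEDED: {
      // Transient: the pool is only consulted (and a ticket only consumed) once degraded.
      const user = gate.degraded ? gate.tryConsume(ticket) : null;
      if (user !== null) {
        return {
          httpStatus: 200,
          body: { result: { user, info: { degraded: true, fallback_path: "pool" }, expire_at: nowS + ttlS } },
        };
      }
      return { httpStatus: 503, body: { disconnect: { code: 4503, reason: UNAVAILABLE_REASON } } };
    }
    default:
      return { httpStatus: 500, body: { error: { code: 100, message: "internal error" } } };
  }
}
```

Checking gate.degraded before calling tryConsume matters: a non-degraded transient failure must not burn a single-use pool ticket.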

The info object Centrifugo passes verbatim to subscribe / publish / rpc proxies carries:

{
  "family_id": "...",
  "subscription_tier": "free|pro|family|unspecified",
  "is_admin": false,
  "parental_state": { "screen_time_lock": false, "feature_lock_chat": false, "feature_lock_diary": false, "feature_lock_flowcoach": false },
  "degraded": true
}

degraded is only set when ADR-0002 §3.4 fallback-pool tickets are in play.
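Consumers of info (for example the future subscribe / publish / rpc proxies) could model the object above in TypeScript. This is a sketch: ConnectInfo and parseConnectInfo are hypothetical names, the fields mirror the JSON above, fallback_path is taken from the fallback-pool row of the response table, and the parser only performs a shallow type check.

```typescript
// Hypothetical model of the connect `info` object; field names mirror the JSON above.
type SubscriptionTier = "free" | "pro" | "family" | "unspecified";

interface ParentalState {
  screen_time_lock: boolean;
  feature_lock_chat: boolean;
  feature_lock_diary: boolean;
  feature_lock_flowcoach: boolean;
}

interface ConnectInfo {
  family_id?: string;
  subscription_tier?: SubscriptionTier;
  is_admin?: boolean;
  parental_state?: ParentalState;
  degraded?: boolean; // only present when fallback-pool tickets are in play
  fallback_path?: string; // "pool" on connections accepted by realtime-service's own fallback path
}

const TIERS: readonly string[] = ["free", "pro", "family", "unspecified"];

// Shallow narrowing of an untrusted JSON value to ConnectInfo.
// Returns null on a type mismatch; parental_state and fallback_path are not inspected.
function parseConnectInfo(raw: unknown): ConnectInfo | null {
  if (typeof raw !== "object" || raw === null || Array.isArray(raw)) return null;
  const o = raw as Record<string, unknown>;
  const familyId = o["family_id"];
  const tier = o["subscription_tier"];
  const isAdmin = o["is_admin"];
  const degraded = o["degraded"];
  if (familyId !== undefined && typeof familyId !== "string") return null;
  if (tier !== undefined && (typeof tier !== "string" || !TIERS.includes(tier))) return null;
  if (isAdmin !== undefined && typeof isAdmin !== "boolean") return null;
  if (degraded !== undefined && typeof degraded !== "boolean") return null;
  return raw as ConnectInfo;
}
```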

Env vars

| Name | Default | Purpose |
|---|---|---|
| LOG_LEVEL | info | Pino log level. |
| LISTEN_HOST | 0.0.0.0 | HTTP bind host. |
| LISTEN_PORT | 9730 | HTTP listen port (aim2be 9000-range allocation). |
| IDENTITY_VALIDATE_GRPC_ADDRESS | localhost:50051 | identity-service ConnectProxyService gRPC target. |
| IDENTITY_VALIDATE_NEGOTIATION_TYPE | plaintext | plaintext (intra-pod loopback through the Envoy sidecar) or tls (direct connection from a dev laptop). |
| IDENTITY_VALIDATE_DEADLINE_MS | 2500 | Per-call Validate deadline, in milliseconds. |
| OTEL_EXPORTER_OTLP_ENDPOINT | http://localhost:4318/v1/traces | OTLP HTTP traces endpoint. |
| OTEL_SERVICE_NAME | aim2be-realtime-service | OTel service.name resource attribute. |
| OTEL_TRACES_SAMPLER_ARG | 1.0 | TraceIdRatioBased sampler ratio (0..1). |
| CENTRIFUGO_PROXY_SECRET | (empty: disabled) | HMAC-SHA256 key shared with Centrifugo's proxy_secret. When set, /centrifugo/* requires X-Centrifugo-Proxy-Signature = hmac_sha256(secret, raw_body). Must be set in production: Envoy mTLS alone cannot pin the caller to Centrifugo specifically, so without this guard any compromised mesh pod could race to consume single-use tickets. An empty value logs a loud WARN on startup. |
| FALLBACK_TICKETS_PATH | (empty: disabled) | On-disk path of the static fallback ticket pool (ADR-0002 §3.4), mounted from the centrifugo-fallback-tickets Kubernetes Secret. The default is empty, i.e. disabled; production overlays MUST set it explicitly via ConfigMap to the mount path (e.g. /var/run/secrets/centrifugo-fallback-tickets/fallback-tickets.txt). An explicit empty value is also treated as disabled. A missing file is tolerated with a startup WARN. |
| FALLBACK_DEGRADED_WINDOW_MS | 5000 | Consecutive-failure window. Once Validate calls to identity-service have failed with UNAVAILABLE / DEADLINE_EXCEEDED continuously for longer than this window, the next /centrifugo/connect call MAY consume a fallback-pool ticket. Cross-field invariants, enforced only when the pool is enabled: (a) 0 is rejected, because any single gRPC fault would immediately drain the pool; (b) values below IDENTITY_VALIDATE_DEADLINE_MS are rejected, because an UNAVAILABLE fault (a TCP RST arrives in under 1 ms) would open the gate before the next legitimate call could complete a full deadline cycle. |
| FALLBACK_LOW_WATERMARK | 1000 | Pool-depletion alert threshold: Alertmanager rules fire when the realtime_fallback_pool_size gauge drops below this value. A WARN is logged exactly once at startup (if the initial size is already below the watermark) and exactly once during consumption (when the size crosses from >= watermark to < watermark). Set to 0 to disable. |
| FALLBACK_CONNECT_TTL_S | 300 | TTL in seconds used to compute expire_at on connections accepted via the fallback pool. Once the deadline passes, Centrifugo disconnects the client, and the reconnect picks up the real identity if identity-service has recovered. Without this TTL, a fallback-accepted client would persist indefinitely with info.degraded=true even after the outage ends. Allowed range: 60..3600 s, enforced only when the pool is enabled (a leftover ConfigMap value in a pool-disabled CI/staging deployment must not crash boot). |

See .env.example for a copy-paste starting point.
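The FALLBACK_* cross-field invariants in the table above can be sketched as a small startup validator. Assumptions: loadFallbackConfig, readInt and FallbackConfig are hypothetical names; an unset or empty numeric variable falls back to its documented default; only the rules stated in the table are enforced.

```typescript
// Hypothetical sketch of the FALLBACK_* startup validation described in the Env vars table.
interface FallbackConfig {
  fallbackTicketsPath: string; // "" means the pool is disabled
  degradedWindowMs: number;
  validateDeadlineMs: number;
  lowWatermark: number; // 0 disables the low-watermark WARN
  connectTtlS: number;
}

type Env = Record<string, string | undefined>;

// Assumption: unset or empty numeric variables fall back to the documented default.
function readInt(env: Env, name: string, fallback: number): number {
  const raw = env[name];
  if (raw === undefined || raw === "") return fallback;
  const n = Number(raw);
  if (!Number.isInteger(n) || n < 0) {
    throw new Error(`${name} must be a non-negative integer, got "${raw}"`);
  }
  return n;
}

function loadFallbackConfig(env: Env): FallbackConfig {
  const cfg: FallbackConfig = {
    fallbackTicketsPath: env["FALLBACK_TICKETS_PATH"] ?? "",
    degradedWindowMs: readInt(env, "FALLBACK_DEGRADED_WINDOW_MS", 5000),
    validateDeadlineMs: readInt(env, "IDENTITY_VALIDATE_DEADLINE_MS", 2500),
    lowWatermark: readInt(env, "FALLBACK_LOW_WATERMARK", 1000),
    connectTtlS: readInt(env, "FALLBACK_CONNECT_TTL_S", 300),
  };

  // Cross-field invariants apply only when the pool is enabled, so a leftover
  // ConfigMap value in a pool-disabled deployment cannot crash boot.
  if (cfg.fallbackTicketsPath !== "") {
    if (cfg.degradedWindowMs === 0) {
      throw new Error("FALLBACK_DEGRADED_WINDOW_MS=0: any single gRPC fault would drain the pool");
    }
    if (cfg.degradedWindowMs < cfg.validateDeadlineMs) {
      throw new Error("FALLBACK_DEGRADED_WINDOW_MS must be >= IDENTITY_VALIDATE_DEADLINE_MS");
    }
    if (cfg.connectTtlS < 60 || cfg.connectTtlS > 3600) {
      throw new Error("FALLBACK_CONNECT_TTL_S must be within 60..3600");
    }
  }
  return cfg;
}
```

In a service like this it would be called once during startup, e.g. loadFallbackConfig(process.env), so a bad combination fails at boot rather than at the first outage.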

Dev quick-start

npm install
npm test           # vitest — no network
npm run typecheck  # tsc --noEmit
npm run dev        # tsx watch src/main.ts

Hit the connect endpoint:

curl -X POST http://127.0.0.1:9730/centrifugo/connect \
  -H 'content-type: application/json' \
  -d '{"client":"test-uuid","data":{"token":"some-opaque-ticket"}}'

(For a happy-path response you need a running identity-service ConnectProxyService gRPC server with a populated Redis ticket; for development you can also point at the test stub in identity-service unit tests.)
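When CENTRIFUGO_PROXY_SECRET is set, the curl call above also needs an X-Centrifugo-Proxy-Signature header computed over the exact bytes sent as the body. A sketch using node:crypto follows; the hex encoding of the digest is an assumption (the Env vars table only specifies hmac_sha256(secret, raw_body)), and signProxyBody, verifyProxySignature and the dev-secret value are hypothetical.

```typescript
import { Buffer } from "node:buffer";
import { createHmac, timingSafeEqual } from "node:crypto";

// X-Centrifugo-Proxy-Signature = hmac_sha256(secret, raw_body).
// Assumption: the digest is hex-encoded.
function signProxyBody(secret: string, rawBody: string): string {
  return createHmac("sha256", secret).update(rawBody, "utf8").digest("hex");
}

// Constant-time comparison, the way a server-side check would typically be written.
function verifyProxySignature(secret: string, rawBody: string, header: string | undefined): boolean {
  if (header === undefined) return false;
  const expected = Buffer.from(signProxyBody(secret, rawBody), "utf8");
  const received = Buffer.from(header, "utf8");
  // timingSafeEqual throws on a length mismatch, so compare lengths first.
  return received.length === expected.length && timingSafeEqual(received, expected);
}

// Same bytes as the curl -d payload above; "dev-secret" is a placeholder.
const body = JSON.stringify({ client: "test-uuid", data: { token: "some-opaque-ticket" } });
console.log(`X-Centrifugo-Proxy-Signature: ${signProxyBody("dev-secret", body)}`);
```

Run it with tsx and pass the printed line to curl as an extra -H argument. JSON.stringify here yields exactly the string given to curl -d, which matters because the HMAC covers the raw body: reformatting the JSON (even adding a space) changes the signature.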

Build + Docker

npm run build
docker build -t aim2be-realtime-service:local .
docker run --rm -p 9730:9730 aim2be-realtime-service:local

Production deployment manifests are planned for flux-applications/apps/realtime-service/{base,dev,prod} (TBD: L-1 wiring).

Architecture references

  • ADR-0002 (Centrifugo Connection Token: Opaque Ticket via gRPC).
  • .planning/22-stage-b-centrifugo-topology.md: proxy callback contracts (§2), opaque-ticket flow (§3), realtime-service shape (§5).
  • .planning/23-stage-b-proto-contracts.md: realtime.v1.ConnectProxyService proto.
  • identity-service/.../ConnectProxyServiceImpl.java: server-side Validate implementation.
  • Centrifugo v6.7.x docs: https://centrifugal.dev/docs/server/proxy (connect / subscribe / publish / rpc proxy callbacks).

Fallback pool consumption (ADR-0002 §3.4)

When identity-service ConnectProxyService.Validate is UNAVAILABLE / DEADLINE_EXCEEDED for longer than FALLBACK_DEGRADED_WINDOW_MS (default 5 s), the realtime-service enters degraded mode and may accept a connection by matching the supplied ticket against an in-memory copy of the static fallback ticket pool.

identity-service Validate → UNAVAILABLE / DEADLINE_EXCEEDED
   │
   v
OutageDetector.notifyFailure()                     ←─ tracks consecutive-failure window
   │
   v
outage_age > FALLBACK_DEGRADED_WINDOW_MS ?
   │ no  → respond 503 (current behaviour)
   │ yes
   v
ticket in fallbackPool ?
   │ no  → respond 503 (no bypass for non-pool tickets)
   │ yes
   v
pool.delete(ticket)                                ←─ single-use; per-process monotonic depletion
gauge.dec(realtime_fallback_pool_size)
respond 200 { result: { user: "fallback:<sha256(ticket)[..12]>", info: { degraded: true, ... }, expire_at: nowS + ttlS } }
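The window check in the flow above can be sketched as a small class. Apart from the OutageDetector.notifyFailure() name shown in the diagram, everything here (notifyResponse, isDegraded, consecutiveFailureCount, the injected clock) is a hypothetical illustration, not the service's actual code.

```typescript
// Hypothetical sketch: tracks how long identity-service Validate has been failing
// with UNAVAILABLE / DEADLINE_EXCEEDED without an intervening response.
class OutageDetector {
  private readonly windowMs: number;
  private readonly now: () => number;
  private firstFailureAtMs: number | null = null;
  private failures = 0;

  // The clock is injectable so the window can be exercised deterministically.
  constructor(windowMs: number, now: () => number = Date.now) {
    this.windowMs = windowMs;
    this.now = now;
  }

  // Call on every UNAVAILABLE / DEADLINE_EXCEEDED from Validate.
  notifyFailure(): void {
    if (this.firstFailureAtMs === null) this.firstFailureAtMs = this.now();
    this.failures += 1;
  }

  // Call on every Validate response of any branch: the outage (if any) is over.
  notifyResponse(): void {
    this.firstFailureAtMs = null;
    this.failures = 0;
  }

  // outage_age > FALLBACK_DEGRADED_WINDOW_MS ?
  isDegraded(): boolean {
    return this.firstFailureAtMs !== null && this.now() - this.firstFailureAtMs > this.windowMs;
  }

  get consecutiveFailureCount(): number {
    return this.failures;
  }
}
```

The outage age is measured from the first failure of the current streak, so a single slow call never opens the gate; the failures must be continuous for the whole window.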

Per-process semantics. The pool is loaded ONCE at startup into an in-memory Set<string> and drains monotonically until process restart. It is NOT re-populated when identity-service recovers — that would defeat the single-use contract. Operator-initiated rotation via mint-fallback-tickets.sh --force + pod rollout is the recovery path.
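A sketch of these per-process semantics: a Set<string> loaded once, pool.delete(ticket) as the single-use step, and the low-watermark WARN described under FALLBACK_LOW_WATERMARK. Assumptions: FallbackPool and its warn callback are hypothetical; sha256(ticket)[..12] is read as the first 12 hex characters of the digest; the realtime_fallback_pool_size gauge update is omitted.

```typescript
import { createHash } from "node:crypto";

// Hypothetical per-process pool: loaded once, drained monotonically, never refilled.
class FallbackPool {
  private readonly tickets: Set<string>;
  private readonly lowWatermark: number;
  private readonly warn: (msg: string, size: number) => void;

  constructor(tickets: Iterable<string>, lowWatermark: number, warn: (msg: string, size: number) => void) {
    this.tickets = new Set(tickets);
    this.lowWatermark = lowWatermark;
    this.warn = warn;
    // Startup WARN if the initial size is already below the watermark (0 disables it).
    if (lowWatermark > 0 && this.tickets.size < lowWatermark) {
      warn("fallback ticket pool below low-watermark at startup", this.tickets.size);
    }
  }

  get size(): number {
    return this.tickets.size;
  }

  // Single-use consumption: returns the fallback user id, or null if the ticket is not in the pool.
  consume(ticket: string): string | null {
    const before = this.tickets.size;
    if (!this.tickets.delete(ticket)) return null;
    const after = this.tickets.size;
    // WARN only on the crossing from >= watermark to < watermark.
    if (this.lowWatermark > 0 && before >= this.lowWatermark && after < this.lowWatermark) {
      this.warn("fallback ticket pool dropped below low-watermark", after);
    }
    // Assumption: first 12 hex chars of sha256(ticket); the ticket itself is never exposed.
    const digest = createHash("sha256").update(ticket, "utf8").digest("hex");
    return `fallback:${digest.slice(0, 12)}`;
  }
}
```

Because the set only ever shrinks, the size can cross the watermark at most once per process, which is what lets "exactly once" hold without any extra bookkeeping.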

Downstream fail-closed contract. info.degraded=true in the Centrifugo connect-response result means downstream subscribe_proxy / publish_proxy / rpc_proxy callbacks (future PRs) MUST fail-closed on any sensitive operation. The fallback connection carries NO real user / family / subscription context — only the degraded flag.
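A future downstream proxy handler could enforce this contract with a guard along these lines. isSensitiveOpAllowed is a hypothetical name and deciding which operations count as sensitive is left to the caller; the design choice shown is that a missing info object or a non-boolean degraded value is denied rather than trusted.

```typescript
// Hypothetical fail-closed guard for future subscribe_proxy / publish_proxy / rpc_proxy handlers.
// `info` is whatever Centrifugo forwarded from the connect response, so treat it as untrusted.
function isSensitiveOpAllowed(info: unknown): boolean {
  // No info object at all: there is no user context to authorise against, so deny.
  if (typeof info !== "object" || info === null) return false;
  const degraded = (info as { degraded?: unknown }).degraded;
  // Only an absent flag or an explicit false allows the operation;
  // true, or any malformed value such as the string "true", is denied.
  return degraded === undefined || degraded === false;
}
```

A fallback-accepted connection arrives with info = { degraded: true, fallback_path: "pool" }, so the guard returns false and the handler would reply with an error instead of forwarding the operation.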

Observability.

| Signal | Where | Meaning |
|---|---|---|
| realtime_fallback_pool_size gauge | /metrics | Number of tickets currently remaining in the pool. Alert when < 1000 (ADR-0002 §3.4). |
| realtime_centrifugo_connect_total{outcome="fallback_pool_consumed"} | /metrics | Successful fallback acceptances. |
| identity_outage.degraded | /health JSON | Boolean; true when realtime-service believes identity-service has been unreachable for longer than FALLBACK_DEGRADED_WINDOW_MS. |
| identity_outage.consecutive_failure_count | /health JSON | Number of consecutive UNAVAILABLE / DEADLINE_EXCEEDED failures; reset to 0 on every Validate response of any branch. |
| fallback_pool.size | /health JSON | Mirrors the gauge. |
| fallback_pool.status | /health JSON | Outcome of the startup load: loaded (healthy), disabled (default), missing (ENOENT: Secret unprovisioned), unreadable (EACCES / EIO / other: wrong fsGroup / defaultMode / mount), or empty (corrupt or half-rotated file). Lets alerts pivot on the root cause without scraping logs. |
| INFO log "centrifugo.connect accepted via fallback pool" | stdout | One line per acceptance. NEVER logs the ticket value; only the remaining pool size and outage status. |
| WARN log "fallback ticket pool dropped below low-watermark" | stderr | Emitted exactly once per process when consumption crosses below FALLBACK_LOW_WATERMARK. |

Local-dev fallback path. The default FALLBACK_TICKETS_PATH is empty (disabled), so local dev never opts in. Production overlays explicitly set the path to the mount of the centrifugo-fallback-tickets Secret. When the path is set but the file is missing (e.g. the Secret is not yet provisioned), the loader tolerates the absence with a startup WARN rather than crashing; the pool stays at size 0 and every degraded-mode connect falls through to 503.
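The tolerant load described above, together with the fallback_pool.status values in the Observability table, can be sketched as follows. Assumptions: loadFallbackTickets and PoolLoadResult are hypothetical names; the file holds one ticket per line, with surrounding whitespace and blank lines ignored; the caller, not the loader, emits the startup WARN.

```typescript
import { readFile } from "node:fs/promises";

type PoolStatus = "loaded" | "disabled" | "missing" | "unreadable" | "empty";

interface PoolLoadResult {
  status: PoolStatus;
  tickets: string[];
}

// Hypothetical loader mapping file-system outcomes onto the fallback_pool.status values.
// It never rejects, so a bad mount degrades to a size-0 pool instead of crashing boot.
async function loadFallbackTickets(path: string): Promise<PoolLoadResult> {
  if (path === "") return { status: "disabled", tickets: [] };

  let text: string;
  try {
    text = await readFile(path, "utf8");
  } catch (err) {
    const code = typeof err === "object" && err !== null ? (err as { code?: unknown }).code : undefined;
    // ENOENT: Secret not provisioned. Anything else (EACCES, EIO, EISDIR, ...): unreadable mount.
    return { status: code === "ENOENT" ? "missing" : "unreadable", tickets: [] };
  }

  // Assumption: one ticket per line.
  const tickets = text
    .split(/\r?\n/)
    .map((line) => line.trim())
    .filter((line) => line.length > 0);
  return tickets.length === 0 ? { status: "empty", tickets: [] } : { status: "loaded", tickets };
}
```

Keeping the errno classification in one place is what lets /health report missing and unreadable separately, so an alert can point at an unprovisioned Secret versus a wrong fsGroup / defaultMode.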

Out of scope for this PR

  • /centrifugo/subscribe — Channel authorisation against family-service. (Planning §4.2.)
  • /centrifugo/publish — Outbound RPC validation + payload-size enforcement. (Planning §2.3.)
  • /centrifugo/rpc — RPC fan-out to family/diary/calendar/notification.
  • Kafka consumer + outbound centrifuge-go client. (Planning §5.)
  • SPIRE/SPIFFE workload-API client. (Planning §1.2.)
  • Flux deployment overlay for the fallback-tickets Secret mount + the FALLBACK_* env vars. The application code in this PR consumes the pool; the deployment-side wiring is a separate follow-up PR (PR-OPAQUE-5-FLUX-FOLLOWUP-fallback-mount).