im2be-realtime-service
aim2be Centrifugo proxy + (future) Kafka-to-Centrifugo bridge. Node.js 22 + TypeScript + Fastify. Single sharded service hosting the Centrifugo v6 named-proxy callbacks per .planning/22-stage-b-centrifugo-topology.md §5.
Status: PR-OPAQUE-5 shipped the connect_proxy endpoint with full identity-service Validate forwarding. PR-OPAQUE-7 (this PR) wires the static fallback ticket pool consumption per ADR-0002 §3.4 — see Fallback pool consumption below. subscribe / publish / rpc remain stubs returning HTTP 501 pending future PRs.
Tech stack
- Node 22 (ES2022 target, ESM modules).
- Fastify 5 — HTTP server with Pino-default structured logging.
- @grpc/grpc-js + @grpc/proto-loader — dynamic gRPC client to identity-service.
- @opentelemetry/sdk-trace-node + OTLP HTTP exporter — traces via OTLP.
- vitest — unit tests (no network).
What it does today
Client (PWA / Capacitor)
|
| WebSocket(token=<opaque-ticket>)
v
+--------+--------+
| Centrifugo v6 | proxies[*].endpoint =
| (3-replica HA) | http://realtime-service:9730/centrifugo/<type>
+--------+--------+
|
| POST /centrifugo/connect
| body: { client, transport, protocol, encoding,
| data: { token: <opaque-ticket> } }
v
+-------------+-------------+
| im2be-realtime-service |
| - handleConnect(body) |
| - extract ticket |
| - gRPC Validate(ticket) |---+
+-------------+-------------+ |
^ |
| | gRPC realtime.v1
| | ConnectProxyService.Validate
| v
| +-----------+-----------+
+-----+ identity-service |
| ConnectProxyService |
| (Redis GETDEL ...) |
+-----------------------+
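The "extract ticket" step in the diagram can be sketched as a small pure function. This is a minimal illustration, assuming the body fields shown above; `ConnectRequestBody` and `extractTicket` are hypothetical names, not necessarily those used in src.

```typescript
// Hypothetical shape of the connect-proxy request body, limited to the
// fields shown in the diagram above.
interface ConnectRequestBody {
  client?: string;
  transport?: string;
  protocol?: string;
  encoding?: string;
  data?: { token?: unknown };
}

// Returns the opaque ticket, or null when the body carries no usable token.
function extractTicket(body: unknown): string | null {
  if (typeof body !== "object" || body === null) return null;
  const data = (body as ConnectRequestBody).data;
  if (typeof data !== "object" || data === null) return null;
  const token = data.token;
  return typeof token === "string" && token.length > 0 ? token : null;
}
```

Returning null rather than throwing lets the caller decide how a malformed body maps onto a response.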
The handler translates the gRPC response into the Centrifugo connect-response wire shape:
| Identity-service result | Centrifugo response | HTTP status |
|---|---|---|
| Success{degraded=false} | { result: { user, info, expire_at } } | 200 |
| Success{degraded=true} (identity-side fallback) | { result: { user, info: { degraded: true, ... } } } | 200 |
| Disconnect branch | { disconnect: { code, reason } } | 401 |
| gRPC NOT_FOUND / INVALID_ARGUMENT / UNAUTHENTICATED / PERMISSION_DENIED | { disconnect: { code: 4401, reason: "invalid ticket" } } | 401 |
| Error branch | { disconnect: { code: 4503, reason: "ticket validation temporarily unavailable" } } | 503 |
| gRPC UNAVAILABLE / DEADLINE_EXCEEDED, outage NOT yet degraded | { disconnect: { code: 4503, reason: ... } } | 503 |
| gRPC UNAVAILABLE / DEADLINE_EXCEEDED, degraded AND ticket in fallback pool | { result: { user: "fallback:<sha256-prefix>", info: { degraded: true, fallback_path: "pool" }, expire_at: <nowS + ttlS> } } | 200 |
| gRPC UNAVAILABLE / DEADLINE_EXCEEDED, degraded BUT ticket NOT in pool | { disconnect: { code: 4503, reason: ... } } | 503 |
| any other failure | { error: { code: 100, message: "internal error" } } | 500 |
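The mapping in the table can be sketched as one function over a discriminated union. This is an illustrative simplification: `ValidateOutcome`, `ProxyReply` and `toProxyReply` are hypothetical names, the three 503 rows are collapsed into a single `unavailable` case, and the 503 reason string is assumed to match the Error-branch text.

```typescript
// Hypothetical, simplified view of what the handler learned from Validate.
type ValidateOutcome =
  | { kind: "success"; user: string; info: Record<string, unknown>; expireAt: number }
  | { kind: "disconnect"; code: number; reason: string }
  | { kind: "invalid_ticket" } // NOT_FOUND / INVALID_ARGUMENT / UNAUTHENTICATED / PERMISSION_DENIED
  | { kind: "unavailable" }    // Error branch, or transient gRPC failure with no fallback acceptance
  | { kind: "internal" };      // any other failure

interface ProxyReply {
  status: number;
  body: Record<string, unknown>;
}

function toProxyReply(o: ValidateOutcome): ProxyReply {
  switch (o.kind) {
    case "success":
      return { status: 200, body: { result: { user: o.user, info: o.info, expire_at: o.expireAt } } };
    case "disconnect":
      return { status: 401, body: { disconnect: { code: o.code, reason: o.reason } } };
    case "invalid_ticket":
      return { status: 401, body: { disconnect: { code: 4401, reason: "invalid ticket" } } };
    case "unavailable":
      return {
        status: 503,
        body: { disconnect: { code: 4503, reason: "ticket validation temporarily unavailable" } },
      };
    case "internal":
      return { status: 500, body: { error: { code: 100, message: "internal error" } } };
  }
}
```

A fallback-pool acceptance would travel through the `success` case with a `fallback:` user and `info.degraded` set to true.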
Centrifugo's client.proxy.connect.http.status_to_code_transforms config maps 401 → terminal disconnect 4401 and 5xx → built-in retry-error code 100 (planning §2.2). The explicit 4503 disconnect on transient gRPC failures overrides the retry to surface "ticket plane is down" cleanly to the client.
The info object Centrifugo passes verbatim to subscribe / publish / rpc proxies carries:
{
"family_id": "...",
"subscription_tier": "free|pro|family|unspecified",
"is_admin": false,
"parental_state": { "screen_time_lock": false, "feature_lock_chat": false, "feature_lock_diary": false, "feature_lock_flowcoach": false },
"degraded": true
}
degraded is only set when ADR-0002 §3.4 fallback-pool tickets are in play.
Env vars
| Name | Default | Purpose |
|---|---|---|
| LOG_LEVEL | info | Pino log level. |
| LISTEN_HOST | 0.0.0.0 | HTTP bind host. |
| LISTEN_PORT | 9730 | HTTP listen port (aim2be 9000-range allocation). |
| IDENTITY_VALIDATE_GRPC_ADDRESS | localhost:50051 | identity-service ConnectProxyService gRPC target. |
| IDENTITY_VALIDATE_NEGOTIATION_TYPE | plaintext | plaintext (intra-pod loopback through Envoy sidecar) or tls (direct dev-laptop). |
| IDENTITY_VALIDATE_DEADLINE_MS | 2500 | Per-call deadline. |
| OTEL_EXPORTER_OTLP_ENDPOINT | http://localhost:4318/v1/traces | OTLP HTTP traces endpoint. |
| OTEL_SERVICE_NAME | aim2be-realtime-service | OTel service.name resource attribute. |
| OTEL_TRACES_SAMPLER_ARG | 1.0 | TraceIdRatioBased sampler ratio (0..1). |
| CENTRIFUGO_PROXY_SECRET | (empty = disabled) | HMAC-SHA256 key shared with Centrifugo proxy_secret. When set, /centrifugo/* requires X-Centrifugo-Proxy-Signature = hmac_sha256(secret, raw_body). Must be set in production: Envoy mTLS alone cannot pin the caller to Centrifugo specifically, so without this guard any compromised mesh pod could race-consume single-use tickets. An empty value logs a loud WARN on startup. |
| FALLBACK_TICKETS_PATH | (empty = disabled) | On-disk path of the static fallback ticket pool (ADR-0002 §3.4). Mounted from the centrifugo-fallback-tickets Kubernetes Secret. Production overlays MUST explicitly set this to the mount path (e.g. /var/run/secrets/centrifugo-fallback-tickets/fallback-tickets.txt) via ConfigMap. An explicit empty value is also honoured as disabled. A missing file is tolerated with a startup WARN. |
| FALLBACK_DEGRADED_WINDOW_MS | 5000 | Consecutive-failure window. Once identity-service has been UNAVAILABLE / DEADLINE_EXCEEDED continuously for longer than this window, the next /centrifugo/connect call MAY consume a fallback-pool ticket. Cross-field invariants (enforced only when the pool is enabled): (a) a value of 0 is rejected, because any single gRPC fault would immediately drain the pool; (b) a value < IDENTITY_VALIDATE_DEADLINE_MS is rejected, because an UNAVAILABLE fault (TCP RST in <1 ms) would open the gate before the next legitimate call could complete a deadline cycle. |
| FALLBACK_LOW_WATERMARK | 1000 | Pool-depletion alert threshold. The realtime_fallback_pool_size gauge crossing below this value fires alertmanager rules. A WARN is logged exactly once on startup (when the initial size is below the watermark) and exactly once on consumption (when crossing from >= watermark to < watermark). Set to 0 to disable. |
| FALLBACK_CONNECT_TTL_S | 300 | TTL (seconds) applied to expire_at on fallback-pool-accepted connections. Once the deadline passes, Centrifugo disconnects the client; the reconnect picks up the real identity if identity-service has recovered. Without this, a fallback-accepted client would persist indefinitely with info.degraded=true even after the outage ends. Allowed range: 60..3600 s, enforced only when the pool is enabled (a leftover ConfigMap value in a pool-disabled CI/staging deployment must not crash boot). |
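The cross-field invariants for the FALLBACK_* variables can be sketched as a boot-time check. This is only an illustration of the rules stated in the table; `FallbackConfig` and `validateFallbackConfig` are hypothetical names, and the real config loader may report errors differently.

```typescript
// Hypothetical parsed view of the relevant env vars.
interface FallbackConfig {
  ticketsPath: string;        // FALLBACK_TICKETS_PATH ("" = pool disabled)
  degradedWindowMs: number;   // FALLBACK_DEGRADED_WINDOW_MS
  validateDeadlineMs: number; // IDENTITY_VALIDATE_DEADLINE_MS
  connectTtlS: number;        // FALLBACK_CONNECT_TTL_S
}

// Returns a list of violations; an empty list means the config is acceptable.
function validateFallbackConfig(cfg: FallbackConfig): string[] {
  const errors: string[] = [];
  // All invariants are enforced only when the pool is enabled.
  if (cfg.ticketsPath === "") return errors;

  if (cfg.degradedWindowMs === 0) {
    errors.push("FALLBACK_DEGRADED_WINDOW_MS must not be 0");
  } else if (cfg.degradedWindowMs < cfg.validateDeadlineMs) {
    errors.push("FALLBACK_DEGRADED_WINDOW_MS must be >= IDENTITY_VALIDATE_DEADLINE_MS");
  }
  if (cfg.connectTtlS < 60 || cfg.connectTtlS > 3600) {
    errors.push("FALLBACK_CONNECT_TTL_S must be within 60..3600");
  }
  return errors;
}
```

Collecting violations into a list, rather than throwing on the first one, lets a single boot attempt report every misconfigured value.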
See .env.example for a copy-paste starting point.
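The CENTRIFUGO_PROXY_SECRET guard can be sketched with Node's built-in crypto module. This is a minimal sketch under one loud assumption: the signature header is taken to be hex-encoded, which the table above does not specify. `signBody` and `verifyProxySignature` are hypothetical helper names.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Computes hmac_sha256(secret, raw_body). Hex encoding is an assumption.
function signBody(secret: string, rawBody: Buffer): string {
  return createHmac("sha256", secret).update(rawBody).digest("hex");
}

// Returns true when the request may proceed.
function verifyProxySignature(
  secret: string,
  rawBody: Buffer,
  signatureHex: string | undefined,
): boolean {
  if (secret === "") return true; // guard disabled (empty secret)
  if (signatureHex === undefined) return false;
  const expected = createHmac("sha256", secret).update(rawBody).digest();
  const provided = Buffer.from(signatureHex, "hex");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return provided.length === expected.length && timingSafeEqual(provided, expected);
}
```

The check has to run over the raw request bytes, before JSON parsing, because re-serialising the parsed body would not necessarily reproduce the signed payload.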
Dev quick-start
npm install
npm test # vitest — no network
npm run typecheck # tsc --noEmit
npm run dev # tsx watch src/main.ts
Hit the connect endpoint:
curl -X POST http://127.0.0.1:9730/centrifugo/connect \
-H 'content-type: application/json' \
-d '{"client":"test-uuid","data":{"token":"some-opaque-ticket"}}'
(For a happy-path response you need a running identity-service ConnectProxyService gRPC server with a populated Redis ticket; for development you can also point at the test stub in identity-service unit tests.)
Build + Docker
npm run build
docker build -t aim2be-realtime-service:local .
docker run --rm -p 9730:9730 aim2be-realtime-service:local
Production deployment manifests live in flux-applications/apps/realtime-service/{base,dev,prod} (TBD — L-1 wiring).
Architecture references
- ADR-0002: Centrifugo Connection Token: Opaque Ticket via gRPC.
- .planning/22-stage-b-centrifugo-topology.md: proxy callback contracts (§2), opaque-ticket flow (§3), realtime-service shape (§5).
- .planning/23-stage-b-proto-contracts.md: realtime.v1.ConnectProxyService proto.
- identity-service/.../ConnectProxyServiceImpl.java: server-side Validate implementation.
- Centrifugo v6.7.x docs: https://centrifugal.dev/docs/server/proxy (connect / subscribe / publish / rpc proxy callbacks).
Fallback pool consumption (ADR-0002 §3.4)
When identity-service ConnectProxyService.Validate is UNAVAILABLE / DEADLINE_EXCEEDED for longer than FALLBACK_DEGRADED_WINDOW_MS (default 5 s), the realtime-service enters degraded mode and may accept a connection by matching the supplied ticket against an in-memory copy of the static fallback ticket pool.
identity-service Validate → UNAVAILABLE / DEADLINE_EXCEEDED
│
v
OutageDetector.notifyFailure() ←─ tracks consecutive-failure window
│
v
outage_age > FALLBACK_DEGRADED_WINDOW_MS ?
│ no → respond 503 (current behaviour)
│ yes
v
ticket in fallbackPool ?
│ no → respond 503 (no bypass for non-pool tickets)
│ yes
v
pool.delete(ticket) ←─ single-use; per-process monotonic depletion
gauge.dec(realtime_fallback_pool_size)
respond 200 { result: { user: "fallback:<sha256(ticket)[..12]>", info: { degraded: true, ... }, expire_at: nowS + ttlS } }
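The consecutive-failure window in the flow above can be sketched as a small class with an injectable clock. Only `OutageDetector.notifyFailure()` appears in this README; `notifyResponse`, `isDegraded` and the constructor signature are hypothetical, and the real class may track state differently.

```typescript
class OutageDetector {
  private readonly windowMs: number;
  private readonly now: () => number;
  private firstFailureAtMs: number | null = null;
  private failures = 0;

  constructor(windowMs: number, now: () => number = () => Date.now()) {
    this.windowMs = windowMs;
    this.now = now;
  }

  // Called on UNAVAILABLE / DEADLINE_EXCEEDED; the first failure starts the window.
  notifyFailure(): void {
    if (this.firstFailureAtMs === null) this.firstFailureAtMs = this.now();
    this.failures += 1;
  }

  // Called on any Validate response, whatever the branch; ends the outage.
  notifyResponse(): void {
    this.firstFailureAtMs = null;
    this.failures = 0;
  }

  get consecutiveFailureCount(): number {
    return this.failures;
  }

  // True once the outage has lasted strictly longer than the window.
  isDegraded(): boolean {
    return this.firstFailureAtMs !== null && this.now() - this.firstFailureAtMs > this.windowMs;
  }
}
```

Injecting the clock keeps the window logic testable without timers, which fits the "no network" unit-test setup.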
Per-process semantics. The pool is loaded ONCE at startup into an in-memory Set<string> and drains monotonically until process restart. It is NOT re-populated when identity-service recovers — that would defeat the single-use contract. Operator-initiated rotation via mint-fallback-tickets.sh --force + pod rollout is the recovery path.
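The single-use, drain-only behaviour maps naturally onto `Set.prototype.delete`, which reports whether the element was present. The sketch below is illustrative: `FallbackPool` and `fallbackUserId` are hypothetical names, and the `fallback:` user id is assumed to use the first 12 hex characters of the SHA-256 digest.

```typescript
import { createHash } from "node:crypto";

class FallbackPool {
  private readonly tickets: Set<string>;

  // Loaded once; there is deliberately no method to add tickets later.
  constructor(tickets: Iterable<string>) {
    this.tickets = new Set(tickets);
  }

  get size(): number {
    return this.tickets.size;
  }

  // Returns true exactly once per pooled ticket; false for unknown or already-used tickets.
  consume(ticket: string): boolean {
    return this.tickets.delete(ticket);
  }
}

// Derives a user id that never exposes the ticket itself (hex prefix is an assumption).
function fallbackUserId(ticket: string): string {
  const digest = createHash("sha256").update(ticket).digest("hex");
  return "fallback:" + digest.slice(0, 12);
}
```

Because lookup and removal happen in one synchronous call, two concurrent connects presenting the same ticket within one process cannot both succeed.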
Downstream fail-closed contract. info.degraded=true in the Centrifugo connect-response result means downstream subscribe_proxy / publish_proxy / rpc_proxy callbacks (future PRs) MUST fail-closed on any sensitive operation. The fallback connection carries NO real user / family / subscription context — only the degraded flag.
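A downstream proxy handler could enforce the fail-closed contract with a guard like the one below. This is a sketch of one possible policy, not the planned implementation; `allowSensitiveOperation` is a hypothetical name, and treating a missing or malformed info object as denied is an assumption.

```typescript
// Returns true only when the connection info positively looks non-degraded.
function allowSensitiveOperation(info: unknown): boolean {
  // No info at all: deny, since there is nothing to authorise against.
  if (typeof info !== "object" || info === null) return false;
  const degraded = (info as { degraded?: unknown }).degraded;
  // Anything other than an absent or literally false flag is treated as degraded.
  return degraded === undefined || degraded === false;
}
```

Checking for "absent or exactly false" rather than "not true" means an unexpected value such as a string still denies the operation.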
Observability.
| Signal | Where | Meaning |
|---|---|---|
| realtime_fallback_pool_size gauge | /metrics | Current remaining tickets in the pool. Alert when < 1000 (ADR-0002 §3.4). |
| realtime_centrifugo_connect_total{outcome="fallback_pool_consumed"} | /metrics | Successful fallback acceptances. |
| identity_outage.degraded | /health JSON | Boolean: true when realtime-service believes identity-service is currently unreachable past the window. |
| identity_outage.consecutive_failure_count | /health JSON | Reset to 0 on every Validate response of any branch. |
| fallback_pool.size | /health JSON | Mirrors the gauge. |
| fallback_pool.status | /health JSON | Static load outcome: loaded (healthy), disabled (default), missing (ENOENT: Secret unprovisioned), unreadable (EACCES / EIO / other: wrong fsGroup / defaultMode / mount), or empty (corrupt/half-rotated file). Lets alerts pivot on the root cause without scraping logs. |
| INFO log centrifugo.connect accepted via fallback pool | stdout | Per-acceptance log line. NEVER logs the ticket value; only the remaining pool size + outage status. |
| WARN log fallback ticket pool dropped below low-watermark | stderr | Emitted exactly once per process when consumption crosses below FALLBACK_LOW_WATERMARK. |
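The "exactly once" behaviour of the low-watermark WARN follows from the pool only ever shrinking: the transition from >= watermark to < watermark can happen at most once per process. A minimal sketch of the two conditions, with hypothetical function names:

```typescript
// Startup check: warn when the pool is already below the watermark at load time.
function belowWatermarkAtStartup(initialSize: number, watermark: number): boolean {
  return watermark > 0 && initialSize < watermark;
}

// Consumption check: true only for the single consumption that crosses the watermark.
function crossedLowWatermark(sizeBefore: number, sizeAfter: number, watermark: number): boolean {
  return watermark > 0 && sizeBefore >= watermark && sizeAfter < watermark;
}
```

With the default watermark of 1000, only the consumption that takes the pool from 1000 to 999 tickets satisfies the second condition; a watermark of 0 disables both checks.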
Local-dev fallback path. The default FALLBACK_TICKETS_PATH is empty = disabled, so local dev never opts in. Production overlays explicitly set the path to the mount of the centrifugo-fallback-tickets Secret. When the path is set but the file is missing (e.g. Secret not yet provisioned), the loader tolerates the absence with a startup WARN rather than crashing: the pool stays at size 0 and every degraded-mode connect falls through to 503.
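The tolerant loader and the fallback_pool.status values can be sketched together. This is a sketch under stated assumptions: the file is taken to hold one ticket per line, "empty" is taken to mean that no tickets were parsed, and `loadFallbackPool` with its injectable reader is a hypothetical signature.

```typescript
import { readFileSync } from "node:fs";

type PoolStatus = "loaded" | "disabled" | "missing" | "unreadable" | "empty";

interface LoadResult {
  status: PoolStatus;
  tickets: Set<string>;
}

// Never throws: every failure mode becomes a status with an empty pool.
function loadFallbackPool(
  path: string,
  readFile: (p: string) => string = (p) => readFileSync(p, "utf8"),
): LoadResult {
  if (path === "") return { status: "disabled", tickets: new Set<string>() };

  let raw: string;
  try {
    raw = readFile(path);
  } catch (err) {
    const code = (err as { code?: unknown }).code;
    // ENOENT means the Secret is unprovisioned; anything else is a mount/permission problem.
    const status: PoolStatus = code === "ENOENT" ? "missing" : "unreadable";
    return { status, tickets: new Set<string>() };
  }

  // Assumed file format: one ticket per line, blank lines ignored.
  const tickets = new Set<string>(
    raw.split("\n").map((line) => line.trim()).filter((line) => line.length > 0),
  );
  return { status: tickets.size === 0 ? "empty" : "loaded", tickets };
}
```

Whatever the status, an empty set means every degraded-mode connect falls through to 503, matching the behaviour described above.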
Out of scope for this PR
- /centrifugo/subscribe: Channel authorisation against family-service. (Planning §4.2.)
- /centrifugo/publish: Outbound RPC validation + payload-size enforcement. (Planning §2.3.)
- /centrifugo/rpc: RPC fan-out to family/diary/calendar/notification.
- Kafka consumer + outbound centrifuge-go client. (Planning §5.)
- SPIRE/SPIFFE workload-API client. (Planning §1.2.)
- Flux deployment overlay for the fallback-tickets Secret mount + the FALLBACK_* env vars. The application code in this PR consumes the pool; the deployment-side wiring is a separate follow-up PR (PR-OPAQUE-5-FLUX-FOLLOWUP-fallback-mount).