agent-orchestrator/ui-electrobun/src/bun/relay-client.ts
Hibryda 0f75cb8e32 fix(electrobun): complete all 16 Codex #3 findings
CRITICAL:
- Message persistence race: snapshot batchEnd before async save
- Double-start guard: startingProjects Set prevents concurrent launches
- Symlink path traversal: fs.realpathSync() in path-guard.ts
- Relay false success: connect() returns { ok, machineId, error }

HIGH:
- Session restore skips if active session exists
- Remote remove: new RPC, cleans backend map
- Task board poll token: stale responses discarded after drag-drop
- Health concurrent tools: toolsInFlight counter (was boolean)
- bttask transactions: delete wraps comments+task, addComment validates
- PTY buffer cleared on reconnect
- PTY large paste: chunked String.fromCharCode (8KB chunks)
- Sidecar max line: 10MB limit prevents unbounded memory
- btmsg authorization: group validation, channel membership checks

MEDIUM:
- Session retention: max 5 per project, purgeSession/untrackProject
- Relay IPv6: URL parser replaces string split
- PTY schema: fixed misleading base64 comment
2026-03-22 02:52:04 +01:00

351 lines
10 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* WebSocket client for connecting to agor-relay instances.
*
* Features:
* - Token-based auth handshake (Bearer header)
* - Exponential backoff reconnection (1s initial delay, doubling up to a 30s cap)
* - TCP probe before full WS upgrade on reconnect
* - Per-connection command routing
* - Event forwarding to webview via callback
*/
import { randomUUID } from "crypto";
import { Socket } from "net";
// ── Types ──────────────────────────────────────────────────────────────────
/** Lifecycle state of a relay connection, as passed to status listeners. */
export type ConnectionStatus = "connecting" | "connected" | "disconnected" | "error";
/** Command frame sent to a relay (serialized with JSON.stringify). */
export interface RelayCommand {
/** Frame id: a random UUID for commands from sendCommand(); "" for heartbeat pings. */
id: string;
/** Command name, e.g. "ping" for heartbeats; other values are relay-defined. */
type: string;
/** Command arguments; sent to the relay unmodified. */
payload: Record<string, unknown>;
}
/** Event frame received from a relay and forwarded to event listeners. */
export interface RelayEvent {
/** Event name. Frames with type "pong" are consumed for latency tracking and not forwarded. */
type: string;
/** Presumably the remote session this event belongs to — TODO confirm against the relay protocol. */
sessionId?: string;
/** Set by RelayClient to the local machine id before forwarding (any relay-sent value is overwritten). */
machineId?: string;
/** Event-specific data; its shape depends on `type` and is not validated here. */
payload?: unknown;
}
/** Listener for relay events; receives the local machine id and the event. */
export type EventCallback = (machineId: string, event: RelayEvent) => void;
/** Listener for connection status changes; `error` carries a message when status is "error". */
export type StatusCallback = (machineId: string, status: ConnectionStatus, error?: string) => void;
/** Internal per-relay connection state tracked by RelayClient. */
interface MachineConnection {
/** Locally generated UUID identifying this connection (not assigned by the relay). */
machineId: string;
/** Display label; connect() defaults it to the URL. */
label: string;
/** WebSocket URL passed to connect(). */
url: string;
/** Token sent as `Authorization: Bearer <token>` during the WebSocket handshake. */
token: string;
/** Current connection state. */
status: ConnectionStatus;
/** Round-trip time in ms of the last ping/pong; null until a pong arrives. */
latencyMs: number | null;
/** Socket stored once the handshake opens; cleared when the connection is cleaned up. */
ws: WebSocket | null;
/** Interval timer sending a ping every 15s while connected. */
heartbeatTimer: ReturnType<typeof setInterval> | null;
/** Pending reconnect attempt, if any. */
reconnectTimer: ReturnType<typeof setTimeout> | null;
/** Set by disconnect(); suppresses further reconnect attempts. */
cancelled: boolean;
/** Date.now() when the last ping was sent; 0 if none has been sent. */
lastPingSent: number;
}
// ── Relay Client ───────────────────────────────────────────────────────────
/**
 * Manages WebSocket connections to one or more agor-relay instances.
 *
 * Every call to {@link RelayClient.connect} registers a new machine under a
 * freshly generated UUID. While connected, a ping is sent every 15s and the
 * matching pong is used to measure latency. After a drop the connection is
 * re-established with exponential backoff (1s, doubling up to 30s) until
 * {@link RelayClient.disconnect} or {@link RelayClient.removeMachine} is called.
 */
export class RelayClient {
  private machines = new Map<string, MachineConnection>();
  private eventListeners: EventCallback[] = [];
  private statusListeners: StatusCallback[] = [];

  /** Register an event listener for relay events from any machine (`pong` frames are not forwarded). */
  onEvent(cb: EventCallback): void {
    this.eventListeners.push(cb);
  }

  /** Register a listener for connection status changes. */
  onStatus(cb: StatusCallback): void {
    this.statusListeners.push(cb);
  }

  /**
   * Connect to an agor-relay instance.
   *
   * Fix #4 (Codex audit): Returns { ok, machineId, error } instead of always
   * returning machineId even on failure. On failure the machine stays tracked
   * and background reconnection is scheduled (unless it was disconnected or
   * removed meanwhile), so `machineId` is still returned for later
   * `disconnect()` / `removeMachine()` calls.
   *
   * @param url - WebSocket URL of the relay.
   * @param token - Bearer token sent in the `Authorization` header.
   * @param label - Display label; defaults to `url`.
   * @returns `ok: true` once the socket is open, otherwise `ok: false` with an error message.
   */
  async connect(url: string, token: string, label?: string): Promise<{ ok: boolean; machineId?: string; error?: string }> {
    const machineId = randomUUID();
    const machine: MachineConnection = {
      machineId,
      label: label ?? url,
      url,
      token,
      status: "connecting",
      latencyMs: null,
      ws: null,
      heartbeatTimer: null,
      reconnectTimer: null,
      cancelled: false,
      lastPingSent: 0,
    };
    this.machines.set(machineId, machine);
    this.emitStatus(machineId, "connecting");
    try {
      await this.openWebSocket(machine);
      return { ok: true, machineId };
    } catch (err) {
      const msg = err instanceof Error ? err.message : String(err);
      // The caller may have disconnected/removed this machine while the
      // handshake was in flight; don't resurrect it with a retry loop.
      if (this.isActive(machine)) {
        machine.status = "error";
        this.emitStatus(machineId, "error", msg);
        this.scheduleReconnect(machine);
      }
      return { ok: false, machineId, error: msg };
    }
  }

  /**
   * Disconnect from a relay and stop reconnection attempts, including an
   * attempt whose handshake is currently in flight. Unknown ids are ignored.
   */
  disconnect(machineId: string): void {
    const machine = this.machines.get(machineId);
    if (!machine) return;
    machine.cancelled = true;
    this.cleanupConnection(machine);
    machine.status = "disconnected";
    this.emitStatus(machineId, "disconnected");
  }

  /** Remove a machine entirely from tracking (disconnecting it first). */
  removeMachine(machineId: string): void {
    this.disconnect(machineId);
    this.machines.delete(machineId);
  }

  /**
   * Send a command to a connected relay.
   * @throws Error if the machine is unknown or not currently connected.
   */
  sendCommand(machineId: string, type: string, payload: Record<string, unknown>): void {
    const machine = this.machines.get(machineId);
    if (!machine?.ws || machine.status !== "connected") {
      throw new Error(`Machine ${machineId} not connected`);
    }
    const cmd: RelayCommand = {
      id: randomUUID(),
      type,
      payload,
    };
    machine.ws.send(JSON.stringify(cmd));
  }

  /** Get the status of a specific machine, or null if it is not tracked. */
  getStatus(machineId: string): { status: ConnectionStatus; latencyMs: number | null } | null {
    const machine = this.machines.get(machineId);
    if (!machine) return null;
    return { status: machine.status, latencyMs: machine.latencyMs };
  }

  /** List all tracked machines. */
  listMachines(): Array<{
    machineId: string;
    label: string;
    url: string;
    status: ConnectionStatus;
    latencyMs: number | null;
  }> {
    return Array.from(this.machines.values()).map((m) => ({
      machineId: m.machineId,
      label: m.label,
      url: m.url,
      status: m.status,
      latencyMs: m.latencyMs,
    }));
  }

  // ── Internal ─────────────────────────────────────────────────────────────

  /** True while the machine is still tracked and has not been disconnected. */
  private isActive(machine: MachineConnection): boolean {
    return !machine.cancelled && this.machines.get(machine.machineId) === machine;
  }

  /**
   * Open a WebSocket for `machine` and resolve once it is open.
   *
   * Rejects on timeout (10s), on an error or close before opening, or when
   * the machine was disconnected/removed while the handshake was in flight.
   * Events from a socket that is no longer `machine.ws` are ignored, so a
   * superseded socket cannot tear down or feed the current connection.
   */
  private openWebSocket(machine: MachineConnection): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      // NOTE(review): the options object with `headers` is a non-standard
      // WebSocket extension (hence the cast); assumes the runtime honours it.
      const ws = new WebSocket(machine.url, {
        headers: {
          Authorization: `Bearer ${machine.token}`,
        },
      } as unknown as string[]);

      // The handshake outcome is decided exactly once; later events
      // (e.g. an `open` racing the timeout) must not flip it.
      let settled = false;

      const timeout = setTimeout(() => {
        if (settled) return;
        settled = true;
        ws.close();
        reject(new Error("Connection timeout (10s)"));
      }, 10_000);

      ws.addEventListener("open", () => {
        if (settled) {
          // Timed out already: the caller saw a failure, drop this late socket.
          try { ws.close(); } catch { /* ignore */ }
          return;
        }
        settled = true;
        clearTimeout(timeout);
        if (!this.isActive(machine)) {
          // disconnect()/removeMachine() ran during the handshake.
          try { ws.close(); } catch { /* ignore */ }
          reject(new Error("Connection cancelled"));
          return;
        }
        machine.ws = ws;
        machine.status = "connected";
        this.emitStatus(machine.machineId, "connected");
        this.startHeartbeat(machine);
        resolve();
      });

      ws.addEventListener("message", (ev) => {
        if (machine.ws !== ws) return;
        this.handleMessage(machine, String(ev.data));
      });

      ws.addEventListener("close", () => {
        clearTimeout(timeout);
        if (!settled) {
          // Closed before opening without an error event: fail instead of hanging.
          settled = true;
          reject(new Error("Connection closed before open"));
          return;
        }
        if (machine.ws !== ws || machine.status !== "connected") return;
        this.cleanupConnection(machine);
        machine.status = "disconnected";
        this.emitStatus(machine.machineId, "disconnected");
        if (this.isActive(machine)) {
          this.scheduleReconnect(machine);
        }
      });

      ws.addEventListener("error", () => {
        clearTimeout(timeout);
        const errMsg = "WebSocket error";
        if (!settled) {
          settled = true;
          reject(new Error(errMsg));
          return;
        }
        if (machine.ws !== ws || machine.status !== "connected") return;
        this.cleanupConnection(machine);
        machine.status = "error";
        this.emitStatus(machine.machineId, "error", errMsg);
        if (this.isActive(machine)) {
          this.scheduleReconnect(machine);
        }
      });
    });
  }

  /**
   * Parse one incoming frame: `pong` frames update latency, every other
   * well-formed event (a JSON object with a string `type`) is stamped with the
   * local machine id and forwarded to all event listeners. Invalid frames are
   * logged and dropped; listener exceptions are logged and do not stop delivery.
   */
  private handleMessage(machine: MachineConnection, data: string): void {
    let parsed: unknown;
    try {
      parsed = JSON.parse(data);
    } catch {
      console.error(`[relay] Invalid JSON from ${machine.machineId}`);
      return;
    }
    // JSON.parse also yields null, numbers, strings and arrays; reading
    // `.type` / assigning `.machineId` on those would throw or misbehave.
    if (
      typeof parsed !== "object" ||
      parsed === null ||
      Array.isArray(parsed) ||
      typeof (parsed as { type?: unknown }).type !== "string"
    ) {
      console.error(`[relay] Malformed event from ${machine.machineId}`);
      return;
    }
    const event = parsed as RelayEvent;

    // Handle pong for latency measurement
    if (event.type === "pong") {
      if (machine.lastPingSent > 0) {
        machine.latencyMs = Date.now() - machine.lastPingSent;
      }
      return;
    }

    // Forward all other events
    event.machineId = machine.machineId;
    for (const cb of this.eventListeners) {
      try {
        cb(machine.machineId, event);
      } catch (err) {
        console.error("[relay] Event listener error:", err);
      }
    }
  }

  /** (Re)start the 15s ping interval for a connected machine. */
  private startHeartbeat(machine: MachineConnection): void {
    this.stopHeartbeat(machine);
    machine.heartbeatTimer = setInterval(() => {
      if (machine.ws?.readyState === WebSocket.OPEN) {
        machine.lastPingSent = Date.now();
        machine.ws.send(JSON.stringify({ id: "", type: "ping", payload: {} }));
      }
    }, 15_000);
  }

  private stopHeartbeat(machine: MachineConnection): void {
    if (machine.heartbeatTimer) {
      clearInterval(machine.heartbeatTimer);
      machine.heartbeatTimer = null;
    }
  }

  /** Stop heartbeat and pending reconnect, and close the socket (best-effort). */
  private cleanupConnection(machine: MachineConnection): void {
    this.stopHeartbeat(machine);
    if (machine.reconnectTimer) {
      clearTimeout(machine.reconnectTimer);
      machine.reconnectTimer = null;
    }
    if (machine.ws) {
      try { machine.ws.close(); } catch { /* ignore */ }
      machine.ws = null;
    }
  }

  /**
   * Retry the connection with exponential backoff (1s, doubling to 30s).
   * Each attempt first runs a cheap TCP probe; the loop stops as soon as the
   * machine is disconnected or removed, including during an in-flight probe.
   */
  private scheduleReconnect(machine: MachineConnection): void {
    if (machine.reconnectTimer) {
      clearTimeout(machine.reconnectTimer);
      machine.reconnectTimer = null;
    }
    let delay = 1_000;
    const maxDelay = 30_000;

    const retryLater = (): void => {
      delay = Math.min(delay * 2, maxDelay);
      if (this.isActive(machine)) {
        machine.reconnectTimer = setTimeout(() => void attempt(), delay);
      }
    };

    const attempt = async (): Promise<void> => {
      machine.reconnectTimer = null;
      if (!this.isActive(machine)) return;
      machine.status = "connecting";
      this.emitStatus(machine.machineId, "connecting");
      // TCP probe first — avoids full WS overhead if host unreachable
      const probeOk = await this.tcpProbe(machine.url);
      // disconnect()/removeMachine() may have run while probing.
      if (!this.isActive(machine)) return;
      if (!probeOk) {
        retryLater();
        return;
      }
      try {
        await this.openWebSocket(machine);
        // Success — the next drop starts a fresh backoff sequence.
      } catch {
        retryLater();
      }
    };

    machine.reconnectTimer = setTimeout(() => void attempt(), delay);
  }

  /**
   * Derive the TCP host/port a WebSocket URL connects to, or null if the URL
   * cannot be parsed. Without an explicit port this uses the scheme default
   * (443 for wss/https, 80 otherwise), which is what the WebSocket itself
   * dials. IPv6 literals are returned without brackets, because WHATWG
   * `URL.hostname` keeps them ("[::1]") while net.Socket expects "::1".
   */
  private probeTarget(wsUrl: string): { host: string; port: number } | null {
    let parsed: URL;
    try {
      parsed = new URL(wsUrl);
    } catch {
      return null;
    }
    const secure = parsed.protocol === "wss:" || parsed.protocol === "https:";
    const port = parsed.port ? Number(parsed.port) : secure ? 443 : 80;
    const host = parsed.hostname.replace(/^\[(.*)\]$/, "$1");
    return { host, port };
  }

  /**
   * TCP-only probe to check if the relay host is reachable (5s timeout).
   * Fix #15 (Codex audit): Uses URL() to correctly parse IPv6, ports, etc.
   * Never rejects; resolves false on parse failure, error or timeout.
   */
  private tcpProbe(wsUrl: string): Promise<boolean> {
    const target = this.probeTarget(wsUrl);
    if (!target) return Promise.resolve(false);
    return new Promise((resolve) => {
      const socket = new Socket();
      const timer = setTimeout(() => finish(false), 5_000);
      function finish(ok: boolean): void {
        clearTimeout(timer);
        socket.destroy();
        resolve(ok);
      }
      // `on` (not `once`) so a late error after destroy() is still handled.
      socket.on("error", () => finish(false));
      socket.connect(target.port, target.host, () => finish(true));
    });
  }

  private emitStatus(machineId: string, status: ConnectionStatus, error?: string): void {
    for (const cb of this.statusListeners) {
      try {
        cb(machineId, status, error);
      } catch (err) {
        console.error("[relay] Status listener error:", err);
      }
    }
  }
}