feat(electrobun): multi-machine relay + OTEL telemetry

Multi-machine relay:
- relay-client.ts: WebSocket client for agor-relay with token auth,
  exponential backoff (1s-30s), TCP probe, heartbeat (15s ping)
- machines-store.svelte.ts: remote machine state tracking
- RemoteMachinesSettings.svelte: machine list, add/connect/disconnect UI
- 7 RPC types (remote.connect/disconnect/list/send/status + events)

Telemetry:
- telemetry.ts: OTEL spans + OTLP/HTTP export to Tempo,
  controlled by AGOR_OTLP_ENDPOINT env var
- telemetry-bridge.ts: tel.info/warn/error frontend convenience API
- telemetry.log RPC for frontend→Bun tracing
This commit is contained in:
Hibryda 2026-03-22 01:46:03 +01:00
parent ec30c69c3e
commit 88206205fe
11 changed files with 1458 additions and 15 deletions

View file

@ -10,9 +10,15 @@ import { SidecarManager } from "./sidecar-manager.ts";
import type { PtyRPCSchema } from "../shared/pty-rpc-schema.ts";
import { randomUUID } from "crypto";
import { SearchDb } from "./search-db.ts";
import { checkForUpdates, getLastCheckTimestamp } from "./updater.ts";
import { RelayClient } from "./relay-client.ts";
import { initTelemetry, telemetry } from "./telemetry.ts";
import { homedir } from "os";
import { join } from "path";
/** Current app version — sourced from electrobun.config.ts at build time. */
const APP_VERSION = "0.0.1";
const DEV_SERVER_PORT = 9760; // Project convention: 9700+ range
const DEV_SERVER_URL = `http://localhost:${DEV_SERVER_PORT}`;
@ -21,8 +27,12 @@ const DEV_SERVER_URL = `http://localhost:${DEV_SERVER_PORT}`;
// Module-level service instances, created once when this file is loaded.
const ptyClient = new PtyClient();
const sidecarManager = new SidecarManager();
const searchDb = new SearchDb();
// Client for remote agor-relay machines; used by the "remote.*" RPC handlers.
const relayClient = new RelayClient();
// Plugin directory under the current user's home: <home>/.config/agor/plugins
const PLUGINS_DIR = join(homedir(), ".config", "agor", "plugins");
// Initialize telemetry (console-only unless AGOR_OTLP_ENDPOINT is set)
initTelemetry();
async function connectToDaemon(retries = 5, delayMs = 500): Promise<boolean> {
for (let attempt = 1; attempt <= retries; attempt++) {
try {
@ -874,6 +884,103 @@ const rpc = BrowserView.defineRPC<PtyRPCSchema>({
return { ok: false, content: "", error };
}
},
// ── Updater handlers ──────────────────────────────────────────────────
// Asks the updater for release metadata; on failure reports a "no update"
// result carrying the error message instead of throwing to the webview.
"updater.check": async () => {
  try {
    const update = await checkForUpdates(APP_VERSION);
    return { ...update, error: undefined };
  } catch (cause) {
    const message = cause instanceof Error ? cause.message : String(cause);
    console.error("[updater.check]", cause);
    const checkedAt = Date.now();
    return {
      available: false,
      version: "",
      downloadUrl: "",
      releaseNotes: "",
      checkedAt,
      error: message,
    };
  }
},
// Reports the running app version together with the last update-check timestamp.
"updater.getVersion": () => {
  const version = APP_VERSION;
  const lastCheck = getLastCheckTimestamp();
  return { version, lastCheck };
},
// ── Remote machine (relay) handlers ──────────────────────────────────
// Registers a relay machine and starts connecting; replies with its machine ID.
"remote.connect": async ({ url, token, label }) => {
  let machineId: string;
  try {
    machineId = await relayClient.connect(url, token, label);
  } catch (cause) {
    const message = cause instanceof Error ? cause.message : String(cause);
    console.error("[remote.connect]", cause);
    return { ok: false, error: message };
  }
  return { ok: true, machineId };
},
// Disconnects the given machine; failures are logged and reported as { ok: false }.
"remote.disconnect": ({ machineId }) => {
  try {
    relayClient.disconnect(machineId);
  } catch (cause) {
    const message = cause instanceof Error ? cause.message : String(cause);
    console.error("[remote.disconnect]", cause);
    return { ok: false, error: message };
  }
  return { ok: true };
},
// Lists all tracked relay machines; falls back to an empty list on error.
"remote.list": () => {
  try {
    const machines = relayClient.listMachines();
    return { machines };
  } catch (cause) {
    console.error("[remote.list]", cause);
    return { machines: [] };
  }
},
// Sends a command to a connected machine; sendCommand throws when it is not connected.
"remote.send": ({ machineId, command, payload }) => {
  try {
    relayClient.sendCommand(machineId, command, payload);
  } catch (cause) {
    const message = cause instanceof Error ? cause.message : String(cause);
    console.error("[remote.send]", cause);
    return { ok: false, error: message };
  }
  return { ok: true };
},
// Reports status and latency for one machine; unknown IDs are reported as
// "disconnected" with an explanatory error.
"remote.status": ({ machineId }) => {
  try {
    const info = relayClient.getStatus(machineId);
    if (info) {
      const { status, latencyMs } = info;
      return { status, latencyMs };
    }
    return { status: "disconnected" as const, latencyMs: null, error: "Machine not found" };
  } catch (cause) {
    const message = cause instanceof Error ? cause.message : String(cause);
    console.error("[remote.status]", cause);
    return { status: "error" as const, latencyMs: null, error: message };
  }
},
// ── Telemetry handler ────────────────────────────────────────────────
// Forwards a frontend log line to the Bun-side telemetry manager, tagging the
// message with a "[frontend]" prefix.
"telemetry.log": ({ level, message, attributes }) => {
  try {
    const text = `[frontend] ${message}`;
    const attrs = attributes ?? {};
    telemetry.log(level, text, attrs);
    return { ok: true };
  } catch (cause) {
    console.error("[telemetry.log]", cause);
    return { ok: false };
  }
},
},
messages: {},
@ -900,6 +1007,29 @@ ptyClient.on("session_closed", (msg) => {
}
});
// ── Forward relay events to WebView ─────────────────────────────────────────
// Relay events are re-emitted to the webview as "remote.event" messages.
relayClient.onEvent((machineId, event) => {
  try {
    const { type: eventType, sessionId, payload } = event;
    rpc.send["remote.event"]({ machineId, eventType, sessionId, payload });
  } catch (cause) {
    console.error("[remote.event] forward error:", cause);
  }
});
// Connection status changes are re-emitted as "remote.statusChange" messages.
relayClient.onStatus((machineId, status, error) => {
  try {
    const change = { machineId, status, error };
    rpc.send["remote.statusChange"](change);
  } catch (cause) {
    console.error("[remote.statusChange] forward error:", cause);
  }
});
// ── App window ───────────────────────────────────────────────────────────────
async function getMainViewUrl(): Promise<string> {

View file

@ -0,0 +1,343 @@
/**
* WebSocket client for connecting to agor-relay instances.
*
* Features:
* - Token-based auth handshake (Bearer header)
* - Exponential backoff reconnection (1s–30s cap)
* - TCP probe before full WS upgrade on reconnect
* - Per-connection command routing
* - Event forwarding to webview via callback
*/
import { randomUUID } from "crypto";
import { Socket } from "net";
// ── Types ──────────────────────────────────────────────────────────────────
/** Connection lifecycle states reported for a tracked relay machine. */
export type ConnectionStatus = "connecting" | "connected" | "disconnected" | "error";
/** Message sent from this client to a relay; serialized as JSON over the WebSocket. */
export interface RelayCommand {
/** Per-command identifier (RelayClient.sendCommand generates a random UUID). */
id: string;
/** Command name, e.g. "ping" for the heartbeat. */
type: string;
/** Command-specific arguments. */
payload: Record<string, unknown>;
}
/** Message received from a relay, parsed from a JSON WebSocket frame. */
export interface RelayEvent {
/** Event name; "pong" is consumed by RelayClient for latency and not forwarded. */
type: string;
sessionId?: string;
/** Overwritten by RelayClient with the local machine ID before forwarding. */
machineId?: string;
payload?: unknown;
}
/** Listener invoked for every forwarded relay event, with the originating machine ID. */
export type EventCallback = (machineId: string, event: RelayEvent) => void;
/** Listener invoked on connection status changes; `error` is set for "error" transitions. */
export type StatusCallback = (machineId: string, status: ConnectionStatus, error?: string) => void;
/** Internal per-machine connection record tracked by RelayClient. */
interface MachineConnection {
/** Identifier generated in RelayClient.connect() (a random UUID). */
machineId: string;
/** Display label; connect() falls back to the URL when none is given. */
label: string;
/** WebSocket URL of the relay. */
url: string;
/** Auth token, sent as a Bearer Authorization header when opening the socket. */
token: string;
status: ConnectionStatus;
/** Delay between the last ping and its pong, in ms; null until a pong is received. */
latencyMs: number | null;
/** Live socket; assigned on "open" and reset to null when the connection is cleaned up. */
ws: WebSocket | null;
/** Interval handle for the 15s heartbeat ping, or null when not running. */
heartbeatTimer: ReturnType<typeof setInterval> | null;
/** Timeout handle for the next scheduled reconnect attempt, or null. */
reconnectTimer: ReturnType<typeof setTimeout> | null;
/** Set by disconnect() to stop further reconnect attempts. */
cancelled: boolean;
/** Date.now() timestamp of the last ping sent; 0 means no ping sent yet. */
lastPingSent: number;
}
// ── Relay Client ───────────────────────────────────────────────────────────
/**
 * Tracks WebSocket connections to agor-relay instances.
 *
 * Each `connect()` call registers a machine under a fresh random ID and keeps
 * trying to reach it (exponential backoff, 1s doubling up to 30s) until
 * `disconnect()` / `removeMachine()` is called. Incoming events and status
 * changes are fanned out to the listeners registered via `onEvent` / `onStatus`.
 */
export class RelayClient {
  private machines = new Map<string, MachineConnection>();
  private eventListeners: EventCallback[] = [];
  private statusListeners: StatusCallback[] = [];

  /** Register an event listener for relay events from any machine. */
  onEvent(cb: EventCallback): void {
    this.eventListeners.push(cb);
  }

  /** Register a listener for connection status changes. */
  onStatus(cb: StatusCallback): void {
    this.statusListeners.push(cb);
  }

  /**
   * Connect to an agor-relay instance.
   *
   * Resolves with the new machine ID once the first connection attempt has
   * either succeeded or failed; a failed attempt does not reject but is
   * reported through the status listeners and retried in the background.
   */
  async connect(url: string, token: string, label?: string): Promise<string> {
    const machineId = randomUUID();
    const machine: MachineConnection = {
      machineId,
      label: label ?? url,
      url,
      token,
      status: "connecting",
      latencyMs: null,
      ws: null,
      heartbeatTimer: null,
      reconnectTimer: null,
      cancelled: false,
      lastPingSent: 0,
    };
    this.machines.set(machineId, machine);
    this.emitStatus(machineId, "connecting");

    try {
      await this.openWebSocket(machine);
    } catch (err) {
      // If disconnect() ran while the handshake was in flight, "disconnected"
      // has already been reported; do not overwrite it or start retrying.
      if (!machine.cancelled) {
        const msg = err instanceof Error ? err.message : String(err);
        machine.status = "error";
        this.emitStatus(machineId, "error", msg);
        this.scheduleReconnect(machine);
      }
    }
    return machineId;
  }

  /** Disconnect from a relay and stop reconnection attempts. Unknown IDs are ignored. */
  disconnect(machineId: string): void {
    const machine = this.machines.get(machineId);
    if (!machine) return;
    machine.cancelled = true;
    this.cleanupConnection(machine);
    machine.status = "disconnected";
    this.emitStatus(machineId, "disconnected");
  }

  /** Remove a machine entirely from tracking. */
  removeMachine(machineId: string): void {
    this.disconnect(machineId);
    this.machines.delete(machineId);
  }

  /**
   * Send a command to a connected relay.
   *
   * @throws Error when the machine is unknown or not currently connected.
   */
  sendCommand(machineId: string, type: string, payload: Record<string, unknown>): void {
    const machine = this.machines.get(machineId);
    if (!machine?.ws || machine.status !== "connected") {
      throw new Error(`Machine ${machineId} not connected`);
    }
    const cmd: RelayCommand = {
      id: randomUUID(),
      type,
      payload,
    };
    machine.ws.send(JSON.stringify(cmd));
  }

  /** Get the status of a specific machine, or null when the ID is not tracked. */
  getStatus(machineId: string): { status: ConnectionStatus; latencyMs: number | null } | null {
    const machine = this.machines.get(machineId);
    if (!machine) return null;
    return { status: machine.status, latencyMs: machine.latencyMs };
  }

  /** List all tracked machines (the auth token is deliberately not included). */
  listMachines(): Array<{
    machineId: string;
    label: string;
    url: string;
    status: ConnectionStatus;
    latencyMs: number | null;
  }> {
    return Array.from(this.machines.values()).map((m) => ({
      machineId: m.machineId,
      label: m.label,
      url: m.url,
      status: m.status,
      latencyMs: m.latencyMs,
    }));
  }

  // ── Internal ─────────────────────────────────────────────────────────────

  /**
   * Open a socket for `machine`. Resolves on "open"; rejects on error, on a
   * close before open, after a 10s timeout, or when the machine was cancelled
   * while the handshake was in flight.
   */
  private openWebSocket(machine: MachineConnection): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      // The DOM typings only describe `protocols` as second argument; this
      // relies on the runtime accepting an options object with `headers`
      // (Bun does), hence the cast.
      const ws = new WebSocket(machine.url, {
        headers: {
          Authorization: `Bearer ${machine.token}`,
        },
      } as unknown as string[]);

      const timeout = setTimeout(() => {
        ws.close();
        reject(new Error("Connection timeout (10s)"));
      }, 10_000);

      ws.addEventListener("open", () => {
        clearTimeout(timeout);
        if (machine.cancelled) {
          // disconnect() was called during the handshake: honour it instead
          // of silently re-activating the machine.
          ws.close();
          reject(new Error("Connection cancelled"));
          return;
        }
        machine.ws = ws;
        machine.status = "connected";
        this.emitStatus(machine.machineId, "connected");
        this.startHeartbeat(machine);
        resolve();
      });

      ws.addEventListener("message", (ev) => {
        // Ignore frames from a socket that is no longer the live one.
        if (machine.ws !== ws) return;
        this.handleMessage(machine, String(ev.data));
      });

      ws.addEventListener("close", () => {
        clearTimeout(timeout);
        // Settles the promise if the socket closed before opening without an
        // "error" event; a no-op when it is already resolved or rejected.
        reject(new Error("WebSocket closed before open"));
        // Only the live socket may change machine state; a late close from a
        // replaced socket must not tear down the newer connection.
        if (machine.ws !== ws || machine.status !== "connected") return;
        this.cleanupConnection(machine);
        machine.status = "disconnected";
        this.emitStatus(machine.machineId, "disconnected");
        if (!machine.cancelled) {
          this.scheduleReconnect(machine);
        }
      });

      ws.addEventListener("error", () => {
        clearTimeout(timeout);
        const errMsg = "WebSocket error";
        if (machine.ws !== ws) {
          // Handshake failure (or a stale socket): just fail the attempt.
          reject(new Error(errMsg));
          return;
        }
        this.cleanupConnection(machine);
        machine.status = "error";
        this.emitStatus(machine.machineId, "error", errMsg);
        if (!machine.cancelled) {
          this.scheduleReconnect(machine);
        }
      });
    });
  }

  /** Parse one incoming frame; "pong" updates latency, everything else is forwarded. */
  private handleMessage(machine: MachineConnection, data: string): void {
    let parsed: unknown;
    try {
      parsed = JSON.parse(data);
    } catch {
      console.error(`[relay] Invalid JSON from ${machine.machineId}`);
      return;
    }
    // JSON.parse may yield null or a primitive; only objects carrying a string
    // `type` are valid events.
    if (
      typeof parsed !== "object" ||
      parsed === null ||
      typeof (parsed as { type?: unknown }).type !== "string"
    ) {
      console.error(`[relay] Malformed event from ${machine.machineId}`);
      return;
    }
    const event = parsed as RelayEvent;

    // Handle pong for latency measurement
    if (event.type === "pong") {
      if (machine.lastPingSent > 0) {
        machine.latencyMs = Date.now() - machine.lastPingSent;
        // A duplicate pong must not be measured against the same ping again.
        machine.lastPingSent = 0;
      }
      return;
    }

    // Forward all other events, stamped with the local machine ID
    event.machineId = machine.machineId;
    for (const cb of this.eventListeners) {
      try {
        cb(machine.machineId, event);
      } catch (err) {
        console.error("[relay] Event listener error:", err);
      }
    }
  }

  /** (Re)start the 15s heartbeat ping for `machine`. */
  private startHeartbeat(machine: MachineConnection): void {
    this.stopHeartbeat(machine);
    machine.heartbeatTimer = setInterval(() => {
      if (machine.ws?.readyState === WebSocket.OPEN) {
        machine.lastPingSent = Date.now();
        machine.ws.send(JSON.stringify({ id: "", type: "ping", payload: {} }));
      }
    }, 15_000);
  }

  private stopHeartbeat(machine: MachineConnection): void {
    if (machine.heartbeatTimer) {
      clearInterval(machine.heartbeatTimer);
      machine.heartbeatTimer = null;
    }
  }

  /** Stop timers and close the live socket (if any) without changing `status`. */
  private cleanupConnection(machine: MachineConnection): void {
    this.stopHeartbeat(machine);
    if (machine.reconnectTimer) {
      clearTimeout(machine.reconnectTimer);
      machine.reconnectTimer = null;
    }
    if (machine.ws) {
      const ws = machine.ws;
      // Detach first so the socket's own close/error handlers see it as stale.
      machine.ws = null;
      try { ws.close(); } catch { /* ignore */ }
    }
  }

  /** Start a reconnect loop: first try after 1s, then doubling up to 30s. */
  private scheduleReconnect(machine: MachineConnection): void {
    const maxDelay = 30_000;
    let delay = 1_000;

    // Never run two reconnect loops for the same machine.
    if (machine.reconnectTimer) {
      clearTimeout(machine.reconnectTimer);
      machine.reconnectTimer = null;
    }

    const retryLater = (): void => {
      delay = Math.min(delay * 2, maxDelay);
      if (!machine.cancelled) {
        machine.reconnectTimer = setTimeout(() => void attempt(), delay);
      }
    };

    const attempt = async (): Promise<void> => {
      machine.reconnectTimer = null;
      if (machine.cancelled || !this.machines.has(machine.machineId)) return;

      machine.status = "connecting";
      this.emitStatus(machine.machineId, "connecting");

      // TCP probe first — avoids full WS overhead if host unreachable
      const probeOk = await this.tcpProbe(machine.url);
      // disconnect() may have run while the probe was in flight.
      if (machine.cancelled) return;
      if (!probeOk) {
        retryLater();
        return;
      }

      try {
        await this.openWebSocket(machine);
        // Success — a later drop starts a fresh loop with the delay reset.
      } catch {
        retryLater();
      }
    };

    machine.reconnectTimer = setTimeout(() => void attempt(), delay);
  }

  /** TCP-only probe to check if the relay host is reachable (5s timeout). */
  private tcpProbe(url: string): Promise<boolean> {
    return new Promise((resolve) => {
      const endpoint = this.parseEndpoint(url);
      if (!endpoint) { resolve(false); return; }

      const socket = new Socket();
      const finish = (reachable: boolean): void => {
        clearTimeout(timer);
        socket.destroy();
        resolve(reachable);
      };
      const timer = setTimeout(() => finish(false), 5_000);

      // Attach the error handler before connecting so no failure is unhandled.
      socket.once("error", () => finish(false));
      socket.connect(endpoint.port, endpoint.hostname, () => finish(true));
    });
  }

  /**
   * Extract the TCP endpoint a WebSocket URL connects to.
   *
   * Without an explicit port the scheme default is used (443 for wss/https,
   * 80 for ws/http) so the probe targets the same port as the WebSocket
   * itself. IPv6 brackets are stripped. Returns null for unparseable URLs or
   * other schemes.
   */
  private parseEndpoint(url: string): { hostname: string; port: number } | null {
    let parsed: URL;
    try {
      parsed = new URL(url);
    } catch {
      return null;
    }

    let defaultPort: number;
    switch (parsed.protocol) {
      case "wss:":
      case "https:":
        defaultPort = 443;
        break;
      case "ws:":
      case "http:":
        defaultPort = 80;
        break;
      default:
        return null;
    }

    const hostname = parsed.hostname.replace(/^\[|\]$/g, "");
    if (!hostname) return null;
    const port = parsed.port ? Number(parsed.port) : defaultPort;
    return { hostname, port };
  }

  /** Notify all status listeners; a throwing listener does not affect the others. */
  private emitStatus(machineId: string, status: ConnectionStatus, error?: string): void {
    for (const cb of this.statusListeners) {
      try {
        cb(machineId, status, error);
      } catch (err) {
        console.error("[relay] Status listener error:", err);
      }
    }
  }
}

View file

@ -0,0 +1,193 @@
/**
* OpenTelemetry integration for the Bun process.
*
* Controlled by AGOR_OTLP_ENDPOINT env var:
* - Set (e.g. "http://localhost:4318") -> OTLP/HTTP trace export + console
* - Absent -> console-only (no network calls)
*
* Provides structured span creation for agent sessions, PTY operations, and
* RPC calls. Frontend events are forwarded via the telemetry.log RPC.
*/
// ── Types ──────────────────────────────────────────────────────────────────
/** Severity levels accepted by the telemetry log API; selects the console method used. */
export type LogLevel = "info" | "warn" | "error";
/** Flat key/value attributes attached to spans and log entries (primitives only). */
export interface SpanAttributes {
[key: string]: string | number | boolean;
}
/** Bookkeeping record for a span that has been started but not yet ended. */
interface ActiveSpan {
name: string;
attributes: SpanAttributes;
/** Date.now() timestamp (ms) captured when the span was started. */
startTime: number;
}
// ── Telemetry Manager ──────────────────────────────────────────────────────
/**
 * Minimal telemetry manager: always logs to the console and, when enabled by
 * `init()`, additionally POSTs each finished span as OTLP/HTTP JSON.
 */
class TelemetryManager {
  private enabled = false;
  private endpoint = "";
  private activeSpans = new Map<string, ActiveSpan>();
  private spanCounter = 0;
  private serviceName = "agent-orchestrator-electrobun";
  private serviceVersion = "3.0.0-dev";

  /**
   * Initialize telemetry. Call once at startup.
   *
   * Export is enabled when AGOR_OTLP_ENDPOINT is non-empty and AGOR_TEST is
   * not "1". The endpoint may be a collector base URL or already include the
   * `/v1/traces` path.
   */
  init(): void {
    const endpoint = (process.env.AGOR_OTLP_ENDPOINT ?? "").trim();
    const isTest = process.env.AGOR_TEST === "1";

    if (endpoint && !isTest) {
      this.enabled = true;
      this.endpoint = this.resolveTracesUrl(endpoint);
      console.log(`[telemetry] OTLP export enabled -> ${this.endpoint}`);
      return;
    }

    this.enabled = false;
    this.endpoint = "";
    console.log(
      endpoint
        ? "[telemetry] Console-only (AGOR_TEST=1)"
        : "[telemetry] Console-only (AGOR_OTLP_ENDPOINT not set)",
    );
  }

  /** Start a named span. Returns a spanId to pass to endSpan(). */
  span(name: string, attributes: SpanAttributes = {}): string {
    const spanId = `span_${++this.spanCounter}_${Date.now()}`;
    this.activeSpans.set(spanId, {
      name,
      attributes,
      startTime: Date.now(),
    });
    this.consoleLog("info", `[span:start] ${name}`, attributes);
    return spanId;
  }

  /**
   * End a span and, when export is enabled, send it via OTLP.
   * Unknown or already-ended span IDs are ignored. A `durationMs` attribute
   * is added to the span's attributes.
   */
  endSpan(spanId: string, extraAttributes: SpanAttributes = {}): void {
    const active = this.activeSpans.get(spanId);
    if (!active) return;
    this.activeSpans.delete(spanId);

    const durationMs = Date.now() - active.startTime;
    const allAttributes = { ...active.attributes, ...extraAttributes, durationMs };
    this.consoleLog("info", `[span:end] ${active.name} (${durationMs}ms)`, allAttributes);

    if (this.enabled) {
      // Fire-and-forget: exportSpan handles its own network errors.
      void this.exportSpan(active.name, active.startTime, durationMs, allAttributes);
    }
  }

  /** Log a structured message. Used for frontend-forwarded events. */
  log(level: LogLevel, message: string, attributes: SpanAttributes = {}): void {
    this.consoleLog(level, message, attributes);
    if (this.enabled) {
      void this.exportLog(level, message, attributes);
    }
  }

  /**
   * Shutdown: drops all spans that were started but never ended.
   * Exports already in flight are not awaited.
   */
  shutdown(): void {
    this.activeSpans.clear();
    if (this.enabled) {
      console.log("[telemetry] Shutdown");
    }
  }

  // ── Internal ─────────────────────────────────────────────────────────────

  /** Build the traces URL from a base endpoint, without duplicating the path. */
  private resolveTracesUrl(endpoint: string): string {
    const base = endpoint.replace(/\/+$/, "");
    return base.endsWith("/v1/traces") ? base : `${base}/v1/traces`;
  }

  private consoleLog(level: LogLevel, message: string, attrs: SpanAttributes): void {
    const attrStr = Object.keys(attrs).length > 0
      ? ` ${JSON.stringify(attrs)}`
      : "";

    switch (level) {
      case "error": console.error(`[tel] ${message}${attrStr}`); break;
      case "warn": console.warn(`[tel] ${message}${attrStr}`); break;
      default: console.log(`[tel] ${message}${attrStr}`); break;
    }
  }

  /**
   * Map one attribute to an OTLP AnyValue: integers use `intValue`, other
   * finite numbers `doubleValue`. Non-finite numbers cannot be represented
   * as JSON numbers and are sent as strings.
   */
  private toOtlpValue(value: string | number | boolean): Record<string, string | number | boolean> {
    if (typeof value === "number") {
      if (Number.isInteger(value)) return { intValue: value };
      if (Number.isFinite(value)) return { doubleValue: value };
      return { stringValue: String(value) };
    }
    if (typeof value === "boolean") return { boolValue: value };
    return { stringValue: String(value) };
  }

  /** POST a single finished span; failures are logged as warnings, never thrown. */
  private async exportSpan(
    name: string,
    startTimeMs: number,
    durationMs: number,
    attributes: SpanAttributes,
  ): Promise<void> {
    // Every exported span gets its own random trace, so spans are not linked.
    const traceId = this.randomHex(32);
    const spanId = this.randomHex(16);
    const startNs = BigInt(startTimeMs) * 1_000_000n;
    const endNs = BigInt(startTimeMs + durationMs) * 1_000_000n;

    const otlpPayload = {
      resourceSpans: [{
        resource: {
          attributes: [
            { key: "service.name", value: { stringValue: this.serviceName } },
            { key: "service.version", value: { stringValue: this.serviceVersion } },
          ],
        },
        scopeSpans: [{
          scope: { name: this.serviceName },
          spans: [{
            traceId,
            spanId,
            name,
            kind: 1, // INTERNAL
            startTimeUnixNano: startNs.toString(),
            endTimeUnixNano: endNs.toString(),
            attributes: Object.entries(attributes).map(([key, value]) => ({
              key,
              value: this.toOtlpValue(value),
            })),
            status: { code: 1 }, // OK
          }],
        }],
      }],
    };

    try {
      const resp = await fetch(this.endpoint, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify(otlpPayload),
        signal: AbortSignal.timeout(5_000),
      });
      // fetch only rejects on network failure; surface collector rejections too.
      if (!resp.ok) {
        console.warn(`[telemetry] OTLP export failed: HTTP ${resp.status}`);
      }
    } catch (err) {
      console.warn("[telemetry] OTLP export failed:", err instanceof Error ? err.message : err);
    }
  }

  private async exportLog(
    level: LogLevel,
    message: string,
    attributes: SpanAttributes,
  ): Promise<void> {
    // Wrap log as a zero-duration span for Tempo compatibility
    await this.exportSpan(
      `log.${level}`,
      Date.now(),
      0,
      { ...attributes, "log.message": message, "log.level": level },
    );
  }

  /** Random lowercase hex string of `length` characters (`length` should be even). */
  private randomHex(length: number): string {
    const bytes = new Uint8Array(length / 2);
    crypto.getRandomValues(bytes);
    return Array.from(bytes, (b) => b.toString(16).padStart(2, "0")).join("");
  }
}
// ── Singleton ──────────────────────────────────────────────────────────────
export const telemetry = new TelemetryManager();
/**
 * Initialize telemetry. Call once at app startup.
 * Thin wrapper that delegates to the module-level `telemetry` singleton's init().
 */
export function initTelemetry(): void {
telemetry.init();
}

View file

@ -0,0 +1,117 @@
/**
* Auto-updater: checks GitHub Releases API for newer versions.
*
* Electrobun doesn't have a built-in updater mechanism yet, so this module
* only detects available updates and returns metadata — no download/install.
*/
import { settingsDb } from "./settings-db.ts";
// ── Config ──────────────────────────────────────────────────────────────────
// GitHub repository whose latest release is queried for updates.
const GITHUB_OWNER = "DexterFromLab";
const GITHUB_REPO = "agents-orchestrator";
// GitHub REST endpoint for the repository's latest release.
const RELEASES_URL = `https://api.github.com/repos/${GITHUB_OWNER}/${GITHUB_REPO}/releases/latest`;
/** Minimum interval between automatic checks (1 hour). */
const MIN_CHECK_INTERVAL_MS = 60 * 60 * 1000;
// settingsDb keys under which the last check time and last seen remote version are stored.
const SETTINGS_KEY_LAST_CHECK = "updater_last_check";
const SETTINGS_KEY_LAST_VERSION = "updater_last_version";
// ── Types ───────────────────────────────────────────────────────────────────
/** Result of one update check, as returned by checkForUpdates(). */
export interface UpdateCheckResult {
/** True when the remote version compares newer than the current version. */
available: boolean;
/** Remote version taken from the release tag, with a leading "v" removed. */
version: string;
/** URL of a .deb/.AppImage release asset when one exists, otherwise the release page. */
downloadUrl: string;
/** Release body text; empty string when the release has none. */
releaseNotes: string;
/** Date.now() timestamp (ms) taken when the check started. */
checkedAt: number;
}
/** Subset of the GitHub "latest release" response fields that this module reads. */
interface GitHubRelease {
tag_name: string;
html_url: string;
body: string | null;
assets: Array<{
name: string;
browser_download_url: string;
}>;
}
// ── Semver comparison ───────────────────────────────────────────────────────
/**
 * Parse a version string into its numeric [major, minor, patch] core.
 *
 * Surrounding whitespace, a leading "v"/"V", and any pre-release ("-...") or
 * build ("+...") suffix are ignored. Missing or non-numeric components count
 * as 0, so the result never contains NaN.
 */
function parseSemver(v: string): [number, number, number] {
  const core = v.trim().replace(/^v/i, "").split(/[-+]/)[0] ?? "";
  const parts = core.split(".");
  const toInt = (part: string | undefined): number => {
    const n = parseInt(part ?? "0", 10);
    return Number.isNaN(n) ? 0 : n;
  };
  return [toInt(parts[0]), toInt(parts[1]), toInt(parts[2])];
}
/**
 * True when `remote` has a higher major.minor.patch than `local`.
 * Components are compared in order; the first differing one decides.
 */
function isNewer(remote: string, local: string): boolean {
  const [remoteMajor, remoteMinor, remotePatch] = parseSemver(remote);
  const [localMajor, localMinor, localPatch] = parseSemver(local);
  const leading: Array<[number, number]> = [
    [remoteMajor, localMajor],
    [remoteMinor, localMinor],
  ];
  for (const [r, l] of leading) {
    if (r !== l) {
      return r > l;
    }
  }
  return remotePatch > localPatch;
}
// ── Core ────────────────────────────────────────────────────────────────────
/**
 * Check GitHub Releases API for a newer version.
 * Returns update metadata (never downloads or installs anything).
 *
 * On success the check timestamp and the remote version are persisted via
 * settingsDb.
 *
 * @param currentVersion Version of the running app, compared against the release tag.
 * @returns Metadata about the latest release and whether it is newer.
 * @throws Error when the request fails or times out (10s), GitHub answers with
 *   a non-2xx status, or the response carries no usable `tag_name`.
 */
export async function checkForUpdates(
  currentVersion: string,
): Promise<UpdateCheckResult> {
  const now = Date.now();

  const resp = await fetch(RELEASES_URL, {
    headers: {
      Accept: "application/vnd.github.v3+json",
      "User-Agent": "AgentOrchestrator-Updater",
    },
    signal: AbortSignal.timeout(10_000),
  });
  if (!resp.ok) {
    throw new Error(`GitHub API returned ${resp.status}: ${resp.statusText}`);
  }

  // The body is external input: validate the fields we rely on instead of
  // trusting the declared shape.
  const release = (await resp.json()) as Partial<GitHubRelease> | null;
  if (!release || typeof release.tag_name !== "string" || release.tag_name === "") {
    throw new Error("GitHub API returned a release without a tag_name");
  }
  const remoteVersion = release.tag_name.replace(/^v/, "");

  // Find a .deb or .AppImage asset as download URL, fallback to release page
  const assets = Array.isArray(release.assets) ? release.assets : [];
  const linuxAsset = assets.find(
    (a) =>
      typeof a?.name === "string" &&
      (a.name.endsWith(".deb") || a.name.endsWith(".AppImage")),
  );
  const downloadUrl = linuxAsset?.browser_download_url ?? release.html_url ?? "";

  // Persist check timestamp
  settingsDb.setSetting(SETTINGS_KEY_LAST_CHECK, String(now));
  settingsDb.setSetting(SETTINGS_KEY_LAST_VERSION, remoteVersion);

  return {
    available: isNewer(remoteVersion, currentVersion),
    version: remoteVersion,
    downloadUrl,
    releaseNotes: typeof release.body === "string" ? release.body : "",
    checkedAt: now,
  };
}
/**
 * Timestamp (ms) of the most recent persisted update check.
 * Yields 0 when nothing is stored or the stored value is not a number.
 */
export function getLastCheckTimestamp(): number {
  const stored = settingsDb.getSetting(SETTINGS_KEY_LAST_CHECK);
  if (!stored) {
    return 0;
  }
  const parsed = parseInt(stored, 10);
  return parsed || 0;
}
/**
 * Return the last known remote version (empty string if never checked).
 * Reads the value stored under SETTINGS_KEY_LAST_VERSION, which
 * checkForUpdates() writes after a successful check; a null/undefined stored
 * value is mapped to "".
 */
export function getLastKnownVersion(): string {
return settingsDb.getSetting(SETTINGS_KEY_LAST_VERSION) ?? "";
}