agent-orchestrator/ui-electrobun/src/bun/sidecar-manager.ts
Hibryda ef0183de7f feat(electrobun): agent execution layer — sidecar manager + message adapters + store
- SidecarManager: spawns claude/codex/ollama runners via Bun.spawn(),
  NDJSON stdio protocol, Claude CLI auto-detection, env stripping,
  AbortController stop, Deno/Node runtime detection
- MessageAdapter: parses Claude stream-json, Codex ThreadEvent, Ollama
  chunks into common AgentMessage format
- agent-store.svelte.ts: per-project reactive session state, RPC event
  listeners for agent.message/status/cost
- AgentPane: wired to real sessions (start/stop/prompt), stop button,
  thinking/system message rendering
- ProjectCard: status dot from real agent status, cost/tokens from store
- 5 new RPC types (agent.start/stop/prompt/list + events)
2026-03-22 01:03:05 +01:00

428 lines
13 KiB
TypeScript

// Sidecar Manager — spawns and manages agent sidecar processes via Bun.spawn()
// Each session runs a provider-specific runner (.mjs) communicating via NDJSON on stdio.
import { join } from "path";
import { homedir } from "os";
import { existsSync } from "fs";
import { parseMessage, type AgentMessage, type ProviderId } from "./message-adapter.ts";
// ── Types ────────────────────────────────────────────────────────────────────
/**
 * Lifecycle status of a session as tracked by SidecarManager.
 * NOTE(review): nothing in this file ever assigns "idle"; it is only accepted
 * by writePrompt — confirm whether another module is expected to set it.
 */
export type SessionStatus = "running" | "idle" | "done" | "error";
/** Snapshot of one session; listSessions() hands out shallow copies of this. */
export interface SessionState {
// Caller-supplied identifier; also the key in the manager's session map.
sessionId: string;
// Selects the runner script and the message parser used for this session.
provider: ProviderId;
status: SessionStatus;
// Overwritten with the latest `totalCostUsd` seen in a cost message.
costUsd: number;
// Running sums of the token counts carried by cost messages.
inputTokens: number;
outputTokens: number;
// Date.now() taken when the session is created (milliseconds since epoch).
startedAt: number;
}
/** Optional settings for startSession(). */
export interface StartSessionOptions {
// cwd, model, systemPrompt and maxTurns are forwarded unchanged in the
// initial "query" message sent to the runner.
cwd?: string;
model?: string;
systemPrompt?: string;
maxTurns?: number;
// Forwarded in the "query" message; "bypassPermissions" is used when omitted.
permissionMode?: string;
// Exported to the runner as CLAUDE_CONFIG_DIR and forwarded in the "query" message.
claudeConfigDir?: string;
// Merged last into the runner's environment (so it overrides everything else)
// and also forwarded in the "query" message.
extraEnv?: Record<string, string>;
}
// Receives each non-empty batch of parsed messages for a session.
type MessageCallback = (sessionId: string, messages: AgentMessage[]) => void;
// Receives status changes; in this file `error` is only supplied together with
// the "error" status.
type StatusCallback = (sessionId: string, status: SessionStatus, error?: string) => void;
/** Internal bookkeeping for one spawned runner process. */
interface ActiveSession {
state: SessionState;
// Handle returned by Bun.spawn() for the runner process.
proc: ReturnType<typeof Bun.spawn>;
// Its signal is passed to Bun.spawn(); aborting it is how the process is killed.
controller: AbortController;
// Listeners registered through SidecarManager.onMessage / onStatus.
onMessage: MessageCallback[];
onStatus: StatusCallback[];
}
// ── Environment stripping ────────────────────────────────────────────────────
/** Variables whose names start with one of these are not inherited by runners. */
const STRIP_PREFIXES = ["CLAUDE", "CODEX", "OLLAMA", "ANTHROPIC_"];
/** Exceptions to STRIP_PREFIXES: names starting with one of these are kept. */
const WHITELIST_PREFIXES = ["CLAUDE_CODE_EXPERIMENTAL_"];

/**
 * Builds the environment handed to a runner process.
 *
 * Starts from `process.env`, drops provider-related variables (unless
 * whitelisted), then applies `claudeConfigDir` and finally `extraEnv`, so
 * `extraEnv` wins over every other source.
 *
 * @param extraEnv Extra variables merged in last.
 * @param claudeConfigDir When non-empty, exported as `CLAUDE_CONFIG_DIR`.
 * @returns A fresh object; `process.env` itself is not modified.
 */
function buildCleanEnv(extraEnv?: Record<string, string>, claudeConfigDir?: string): Record<string, string> {
  const startsWithAny = (name: string, prefixes: string[]): boolean =>
    prefixes.some((prefix) => name.startsWith(prefix));

  const result: Record<string, string> = {};
  for (const [name, value] of Object.entries(process.env)) {
    if (value === undefined) continue;
    // Skip provider variables unless they are explicitly whitelisted.
    if (startsWithAny(name, STRIP_PREFIXES) && !startsWithAny(name, WHITELIST_PREFIXES)) continue;
    result[name] = value;
  }

  if (claudeConfigDir) result["CLAUDE_CONFIG_DIR"] = claudeConfigDir;
  if (extraEnv) Object.assign(result, extraEnv);
  return result;
}
// ── Claude CLI detection ─────────────────────────────────────────────────────
/**
 * Locates the Claude CLI executable.
 *
 * Checks a fixed list of install locations first (in order), then falls back
 * to asking `which claude`.
 *
 * @returns The path of the first existing candidate, or `undefined` when none
 *          of the locations exist and `which` yields no existing path.
 */
function findClaudeCli(): string | undefined {
  const home = homedir();
  const knownLocations = [
    join(home, ".local", "bin", "claude"),
    join(home, ".claude", "local", "claude"),
    "/usr/local/bin/claude",
    "/usr/bin/claude",
  ];

  const installed = knownLocations.find((location) => existsSync(location));
  if (installed !== undefined) return installed;

  try {
    const lookup = Bun.spawnSync(["which", "claude"]);
    const resolved = new TextDecoder().decode(lookup.stdout).trim();
    // Only trust the answer if it names a file that is actually there.
    return resolved && existsSync(resolved) ? resolved : undefined;
  } catch {
    // `which` could not be run — treat as not found
    return undefined;
  }
}
// ── Runner resolution ────────────────────────────────────────────────────────
/**
 * Computes the path of the runner script for a provider.
 *
 * The path is built relative to this module's directory: three levels up,
 * then `sidecar/dist/<provider>-runner.mjs`. Existence is not checked here.
 */
function resolveRunnerPath(provider: ProviderId): string {
  const runnerFile = `${provider}-runner.mjs`;
  return join(import.meta.dir, "..", "..", "..", "sidecar", "dist", runnerFile);
}
/**
 * Picks the JavaScript runtime used to execute runner scripts.
 *
 * Asks `which` for `deno` first and `node` second and returns the first
 * non-empty answer. If neither lookup yields a path (or `which` cannot be
 * run), the bare command name "node" is returned as a last resort.
 */
function findNodeRuntime(): string {
  for (const candidate of ["deno", "node"]) {
    try {
      const lookup = Bun.spawnSync(["which", candidate]);
      const resolved = new TextDecoder().decode(lookup.stdout).trim();
      if (resolved) return resolved;
    } catch {
      // lookup failed for this candidate — try the next one
    }
  }
  return "node";
}
// ── SidecarManager ───────────────────────────────────────────────────────────
/**
 * Spawns one runner process per agent session and relays traffic with it.
 *
 * Commands are written to the runner's stdin as NDJSON (one JSON object per
 * line); the runner's stdout is read as NDJSON, parsed through the message
 * adapter and delivered to registered callbacks. The runner's stderr is
 * copied to the console log.
 */
export class SidecarManager {
  /** Registered sessions keyed by the caller-supplied session id. */
  private sessions = new Map<string, ActiveSession>();
  private claudePath: string | undefined;
  private nodeRuntime: string;

  /** Detects the Claude CLI and the JavaScript runtime once, up front. */
  constructor() {
    this.claudePath = findClaudeCli();
    this.nodeRuntime = findNodeRuntime();
    if (this.claudePath) {
      console.log(`[sidecar] Claude CLI found at ${this.claudePath}`);
    } else {
      console.warn("[sidecar] Claude CLI not found — Claude sessions will fail");
    }
    console.log(`[sidecar] Node runtime: ${this.nodeRuntime}`);
  }

  /**
   * Start an agent session with the given provider.
   *
   * @param sessionId Unique id for the new session; rejected if already registered.
   * @param provider Selects the runner script and the message parser.
   * @param prompt Initial prompt sent to the runner in a "query" message.
   * @param options Optional settings forwarded to the runner / its environment.
   * @returns `{ ok: true }` once the runner is spawned and the query is written,
   *          otherwise `{ ok: false, error }`. Spawn failures are reported this
   *          way too rather than thrown.
   */
  startSession(
    sessionId: string,
    provider: ProviderId,
    prompt: string,
    options: StartSessionOptions = {},
  ): { ok: boolean; error?: string } {
    if (this.sessions.has(sessionId)) {
      return { ok: false, error: "Session already exists" };
    }
    if (provider === "claude" && !this.claudePath) {
      return { ok: false, error: "Claude CLI not found. Install Claude Code first." };
    }
    const runnerPath = resolveRunnerPath(provider);
    if (!existsSync(runnerPath)) {
      return { ok: false, error: `Runner not found: ${runnerPath}` };
    }

    const controller = new AbortController();
    const env = buildCleanEnv(options.extraEnv, options.claudeConfigDir);

    // NOTE(review): the runtime may resolve to Deno, yet the runner is launched
    // as `<runtime> <script>` with no `run` subcommand or permission flags —
    // confirm the runners work under Deno when started this way.
    let proc: ReturnType<typeof Bun.spawn>;
    try {
      proc = Bun.spawn([this.nodeRuntime, runnerPath], {
        stdin: "pipe",
        stdout: "pipe",
        stderr: "pipe",
        env,
        signal: controller.signal,
      });
    } catch (err) {
      // Honour the result-object contract instead of letting the spawn error escape.
      const reason = err instanceof Error ? err.message : String(err);
      return { ok: false, error: `Failed to spawn runner: ${reason}` };
    }

    const state: SessionState = {
      sessionId,
      provider,
      status: "running",
      costUsd: 0,
      inputTokens: 0,
      outputTokens: 0,
      startedAt: Date.now(),
    };
    const session: ActiveSession = {
      state,
      proc,
      controller,
      onMessage: [],
      onStatus: [],
    };
    this.sessions.set(sessionId, session);

    // Both readers handle their own errors, so they are safe to fire and forget.
    void this.readStdout(sessionId, session);
    void this.readStderr(sessionId, session);

    // Monitor process exit
    void proc.exited.then((exitCode) => {
      // Ignore the exit if this session was removed (or its id reused by a
      // newer session) in the meantime, so another session is never touched.
      if (this.sessions.get(sessionId) !== session) return;
      session.state.status = exitCode === 0 ? "done" : "error";
      this.emitStatus(
        sessionId,
        session.state.status,
        exitCode !== 0 ? `Exit code: ${exitCode}` : undefined,
      );
    });

    // Send the query command to the runner
    this.writeToProcess(sessionId, {
      type: "query",
      sessionId,
      prompt,
      cwd: options.cwd,
      model: options.model,
      systemPrompt: options.systemPrompt,
      maxTurns: options.maxTurns,
      permissionMode: options.permissionMode ?? "bypassPermissions",
      claudeConfigDir: options.claudeConfigDir,
      extraEnv: options.extraEnv,
    });
    return { ok: true };
  }

  /**
   * Stop a running session.
   *
   * Sends a "stop" command to the runner, then — if the session is still
   * marked "running" after a 3 second grace period — kills the process,
   * reports "done" and unregisters the session.
   */
  stopSession(sessionId: string): { ok: boolean; error?: string } {
    const session = this.sessions.get(sessionId);
    if (!session) {
      return { ok: false, error: "Session not found" };
    }
    // Send stop command to runner first
    this.writeToProcess(sessionId, { type: "stop", sessionId });

    // Abort after a grace period if still running
    setTimeout(() => {
      // Only act on the very session that was asked to stop; the id may have
      // been removed or reused for a new session while the timer was pending.
      if (this.sessions.get(sessionId) !== session) return;
      if (session.state.status !== "running") return;
      session.controller.abort();
      session.state.status = "done";
      this.emitStatus(sessionId, "done");
      this.sessions.delete(sessionId);
    }, 3000);
    return { ok: true };
  }

  /**
   * Send a follow-up prompt to a running session.
   *
   * Accepted only while the session is "running" or "idle"; on success the
   * session is (re)marked "running" and listeners are notified.
   */
  writePrompt(sessionId: string, prompt: string): { ok: boolean; error?: string } {
    const session = this.sessions.get(sessionId);
    if (!session) {
      return { ok: false, error: "Session not found" };
    }
    if (session.state.status !== "running" && session.state.status !== "idle") {
      return { ok: false, error: `Session is ${session.state.status}` };
    }
    this.writeToProcess(sessionId, { type: "query", sessionId, prompt });
    session.state.status = "running";
    this.emitStatus(sessionId, "running");
    return { ok: true };
  }

  /** List all sessions with their state (shallow copies, safe to mutate). */
  listSessions(): SessionState[] {
    return Array.from(this.sessions.values()).map((s) => ({ ...s.state }));
  }

  /**
   * Register a callback for messages from a specific session.
   * Does nothing when the session is not registered.
   */
  onMessage(sessionId: string, callback: MessageCallback): void {
    this.sessions.get(sessionId)?.onMessage.push(callback);
  }

  /**
   * Register a callback for status changes of a specific session.
   * Does nothing when the session is not registered.
   */
  onStatus(sessionId: string, callback: StatusCallback): void {
    this.sessions.get(sessionId)?.onStatus.push(callback);
  }

  /**
   * Clean up a session and make sure its runner process does not outlive it.
   *
   * The process is killed whenever it has not exited yet, not only while the
   * status is "running": a runner can report "agent_stopped"/"agent_error"
   * and stay alive, and after removal nothing else holds a handle to it.
   */
  removeSession(sessionId: string): void {
    const session = this.sessions.get(sessionId);
    if (!session) return;
    if (session.state.status === "running" || session.proc.exitCode === null) {
      session.controller.abort();
    }
    this.sessions.delete(sessionId);
  }

  // ── Internal ───────────────────────────────────────────────────────────────

  /** Serialises `msg` as one NDJSON line onto the runner's stdin (best effort). */
  private writeToProcess(sessionId: string, msg: Record<string, unknown>): void {
    const session = this.sessions.get(sessionId);
    if (!session) return;
    try {
      const line = JSON.stringify(msg) + "\n";
      session.proc.stdin.write(line);
      // Flush explicitly so the command is not left sitting in the sink's buffer.
      session.proc.stdin.flush();
    } catch (err) {
      console.error(`[sidecar] Write error for ${sessionId}:`, err);
    }
  }

  /**
   * Reads the runner's stdout, splits it into lines and dispatches each
   * non-empty line to handleNdjsonLine. A failure while handling one line is
   * logged and does not stop the stream from being read.
   */
  private async readStdout(sessionId: string, session: ActiveSession): Promise<void> {
    const decoder = new TextDecoder();
    let buffer = "";

    const dispatch = (rawLine: string): void => {
      const line = rawLine.trim();
      if (!line) return;
      try {
        this.handleNdjsonLine(sessionId, session, line);
      } catch (err) {
        console.error(`[sidecar] Failed to handle line from ${sessionId}:`, err);
      }
    };

    try {
      for await (const chunk of session.proc.stdout) {
        buffer += decoder.decode(chunk, { stream: true });
        let newlineIdx: number;
        while ((newlineIdx = buffer.indexOf("\n")) !== -1) {
          const line = buffer.slice(0, newlineIdx);
          buffer = buffer.slice(newlineIdx + 1);
          dispatch(line);
        }
      }
      // The last line may arrive without a trailing newline; don't drop it.
      buffer += decoder.decode();
      dispatch(buffer);
    } catch (err) {
      // Stream closed — expected on process exit
      if (!session.controller.signal.aborted) {
        console.error(`[sidecar] stdout read error for ${sessionId}:`, err);
      }
    }
  }

  /**
   * Copies the runner's stderr to the console, one log entry per non-blank
   * line. Partial lines are buffered so a line split across two chunks is
   * logged once, whole.
   */
  private async readStderr(sessionId: string, session: ActiveSession): Promise<void> {
    const decoder = new TextDecoder();
    let pending = "";

    const logLine = (rawLine: string): void => {
      const text = rawLine.trim();
      if (text) console.log(`[sidecar:${sessionId}] ${text}`);
    };

    try {
      for await (const chunk of session.proc.stderr) {
        pending += decoder.decode(chunk, { stream: true });
        const lines = pending.split("\n");
        // The final element is an unfinished line (or ""); keep it for later.
        pending = lines.pop() ?? "";
        for (const line of lines) logLine(line);
      }
      pending += decoder.decode();
      logLine(pending);
    } catch {
      // Stream closed — expected
    }
  }

  /**
   * Handles one NDJSON line from the runner.
   *
   * Sidecar-level events ("ready", "pong", "agent_started", "agent_stopped",
   * "agent_error") only update/emit status. Everything else is passed to the
   * message adapter (unwrapping an "agent_event" envelope first); cost
   * messages update the session totals and all parsed messages are delivered
   * to the message callbacks.
   */
  private handleNdjsonLine(sessionId: string, session: ActiveSession, line: string): void {
    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      console.warn(`[sidecar] Invalid JSON from ${sessionId}: ${line.slice(0, 100)}`);
      return;
    }
    // Valid JSON is not necessarily an object (`null`, `42`, `[]`, ...); only
    // objects are protocol messages, and `null` would crash the checks below.
    if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
      console.warn(`[sidecar] Ignoring non-object JSON from ${sessionId}: ${line.slice(0, 100)}`);
      return;
    }
    const raw = parsed as Record<string, unknown>;

    // Handle sidecar-level events (not forwarded to message adapter)
    const type = raw.type;
    if (type === "ready" || type === "pong") return;
    if (type === "agent_started") {
      session.state.status = "running";
      this.emitStatus(sessionId, "running");
      return;
    }
    if (type === "agent_stopped") {
      session.state.status = "done";
      this.emitStatus(sessionId, "done");
      return;
    }
    if (type === "agent_error") {
      session.state.status = "error";
      const errorMsg = typeof raw.message === "string" ? raw.message : "Unknown error";
      this.emitStatus(sessionId, "error", errorMsg);
      return;
    }

    // Extract the inner event for agent_event wrapper
    const event =
      type === "agent_event" && typeof raw.event === "object" && raw.event !== null
        ? (raw.event as Record<string, unknown>)
        : raw;

    // Parse through message adapter
    const messages = parseMessage(session.state.provider, event);

    // Update session state from cost messages: the cost is taken as the latest
    // reported total, while token counts are accumulated.
    for (const msg of messages) {
      if (msg.type !== "cost") continue;
      const cost = msg.content as Record<string, unknown>;
      if (typeof cost.totalCostUsd === "number") session.state.costUsd = cost.totalCostUsd;
      if (typeof cost.inputTokens === "number") session.state.inputTokens += cost.inputTokens;
      if (typeof cost.outputTokens === "number") session.state.outputTokens += cost.outputTokens;
    }

    // Emit to callbacks; one failing listener must not starve the others.
    if (messages.length > 0) {
      for (const cb of session.onMessage) {
        try {
          cb(sessionId, messages);
        } catch (err) {
          console.error(`[sidecar] Message callback error for ${sessionId}:`, err);
        }
      }
    }
  }

  /**
   * Notifies the status listeners of the session registered under
   * `sessionId`; a no-op when no such session is registered. Listener
   * exceptions are logged and swallowed.
   */
  private emitStatus(sessionId: string, status: SessionStatus, error?: string): void {
    const session = this.sessions.get(sessionId);
    if (!session) return;
    for (const cb of session.onStatus) {
      try {
        cb(sessionId, status, error);
      } catch (err) {
        console.error(`[sidecar] Status callback error for ${sessionId}:`, err);
      }
    }
  }
}