@agor/stores: - theme.svelte.ts, notifications.svelte.ts, health.svelte.ts extracted - Original files replaced with re-exports (zero consumer changes needed) - pnpm workspace + Vite/tsconfig aliases configured BackendAdapter tests (58 new): - backend-adapter.test.ts: 9 tests (lifecycle, singleton, testing seam) - tauri-adapter.test.ts: 28 tests (invoke mapping, command names, params) - electrobun-adapter.test.ts: 21 tests (RPC names, capabilities, stubs) Total: 523 tests passing (was 465, +58)
505 lines
16 KiB
TypeScript
505 lines
16 KiB
TypeScript
// Sidecar Manager — spawns and manages agent sidecar processes via Bun.spawn().
// Each session runs a provider-specific runner (.mjs) communicating via NDJSON on stdio.
|
|
|
|
import { join } from "path";
|
|
import { homedir } from "os";
|
|
import { existsSync } from "fs";
|
|
import { parseMessage, type AgentMessage, type ProviderId } from "./message-adapter.ts";
|
|
|
|
// ── Types ────────────────────────────────────────────────────────────────────
|
|
|
|
/**
 * Lifecycle state of a sidecar session.
 *
 * In this file "running" is assigned when a session starts, on an
 * `agent_started` event and when a follow-up prompt is written; "done" and
 * "error" come from runner events or the process exit code. "idle" is accepted
 * by `writePrompt` but never assigned here.
 */
export type SessionStatus =
  | "running"
  | "idle"
  | "done"
  | "error";
|
|
|
|
/** Snapshot of a session as returned (by copy) from `SidecarManager.listSessions()`. */
export interface SessionState {
  /** Caller-chosen identifier; also the key in the manager's session map. */
  sessionId: string;
  /** Provider whose runner script backs this session. */
  provider: ProviderId;
  /** Current lifecycle state. */
  status: SessionStatus;
  /** Overwritten with `totalCostUsd` from each parsed "cost" message. */
  costUsd: number;
  /** Sum of `inputTokens` over all parsed "cost" messages. */
  inputTokens: number;
  /** Sum of `outputTokens` over all parsed "cost" messages. */
  outputTokens: number;
  /** `Date.now()` captured when the session was started. */
  startedAt: number;
}
|
|
|
|
/** Optional settings accepted by `SidecarManager.startSession()`. */
export interface StartSessionOptions {
  /** Forwarded to the runner in the initial query message. */
  cwd?: string;
  /** Forwarded to the runner in the initial query message. */
  model?: string;
  /** Forwarded to the runner in the initial query message. */
  systemPrompt?: string;
  /** Forwarded to the runner in the initial query message. */
  maxTurns?: number;
  /** Forwarded to the runner; "bypassPermissions" is sent when omitted. */
  permissionMode?: string;
  /** Exported to the runner as CLAUDE_CONFIG_DIR and repeated in the query message. */
  claudeConfigDir?: string;
  /** Extra environment variables; provider-prefixed keys are rejected before use. */
  extraEnv?: Record<string, string>;
  /** Added to the query message only when the array is non-empty. */
  additionalDirectories?: string[];
  /** Added to the query message only when set to a non-empty string. */
  worktreeName?: string;
}
|
|
|
|
/** Listener invoked with the batch of adapter messages parsed from one NDJSON line. */
type MessageCallback = (
  sessionId: string,
  messages: AgentMessage[],
) => void;

/** Listener invoked on every status emission; `error` is only supplied for failures. */
type StatusCallback = (
  sessionId: string,
  status: SessionStatus,
  error?: string,
) => void;
|
|
|
|
/** Internal bookkeeping for one spawned runner process. */
interface ActiveSession {
  /** Mutable public state; copied before being handed to callers. */
  state: SessionState;
  /** Handle of the spawned runner process. */
  proc: ReturnType<typeof Bun.spawn>;
  /** Its signal is passed to `Bun.spawn`; aborting it stops the process. */
  controller: AbortController;
  /** Message listeners registered for this session. */
  onMessage: MessageCallback[];
  /** Status listeners registered for this session. */
  onStatus: StatusCallback[];
}
|
|
|
|
// ── Environment stripping (Fix #14) ──────────────────────────────────────────
|
|
|
|
/** Environment variable name prefixes that are not passed through to runner processes. */
const STRIP_PREFIXES = [
  "CLAUDE",
  "CODEX",
  "OLLAMA",
  "ANTHROPIC_",
];

/** Prefixes exempt from stripping when copying the parent environment. */
const WHITELIST_PREFIXES = [
  "CLAUDE_CODE_EXPERIMENTAL_",
];
|
|
|
|
/**
 * Filters caller-supplied environment overrides.
 *
 * Any key starting with one of STRIP_PREFIXES is dropped with a warning; the
 * whitelist is not consulted here.
 *
 * @param extraEnv Overrides requested by the caller, if any.
 * @returns The surviving entries, or `undefined` when nothing was supplied or
 *          nothing survived.
 */
function validateExtraEnv(extraEnv: Record<string, string> | undefined): Record<string, string> | undefined {
  if (!extraEnv) return undefined;

  const accepted: Record<string, string> = {};
  Object.entries(extraEnv).forEach(([name, value]) => {
    const isProviderKey = STRIP_PREFIXES.some((prefix) => name.startsWith(prefix));
    if (isProviderKey) {
      console.warn(`[sidecar] Rejected extraEnv key "${name}" — provider-prefixed keys not allowed`);
      return;
    }
    accepted[name] = value;
  });

  if (Object.keys(accepted).length === 0) return undefined;
  return accepted;
}
|
|
|
|
/**
 * Builds the environment handed to a runner process.
 *
 * Order of application: the parent environment minus stripped provider
 * variables (whitelisted prefixes are kept), then CLAUDE_CONFIG_DIR when
 * given, then the validated `extraEnv` entries, which win on key collisions.
 *
 * @param extraEnv Caller overrides; validated with `validateExtraEnv`.
 * @param claudeConfigDir Value for CLAUDE_CONFIG_DIR, if any.
 * @returns A new environment object.
 */
function buildCleanEnv(extraEnv?: Record<string, string>, claudeConfigDir?: string): Record<string, string> {
  const matchesAny = (name: string, prefixes: string[]): boolean =>
    prefixes.some((prefix) => name.startsWith(prefix));

  const env: Record<string, string> = {};
  for (const [name, value] of Object.entries(process.env)) {
    if (value === undefined) continue;
    // Drop provider variables unless an explicit whitelist prefix rescues them.
    if (matchesAny(name, STRIP_PREFIXES) && !matchesAny(name, WHITELIST_PREFIXES)) continue;
    env[name] = value;
  }

  if (claudeConfigDir) {
    env["CLAUDE_CONFIG_DIR"] = claudeConfigDir;
  }

  const overrides = validateExtraEnv(extraEnv);
  return overrides ? Object.assign(env, overrides) : env;
}
|
|
|
|
// ── Claude CLI detection ─────────────────────────────────────────────────────
|
|
|
|
/**
 * Locates the Claude CLI executable.
 *
 * Checks a fixed list of install locations first, then falls back to
 * `which claude`.
 *
 * @returns The first existing path, or `undefined` when none is found.
 */
function findClaudeCli(): string | undefined {
  const home = homedir();
  const knownLocations = [
    join(home, ".local", "bin", "claude"),
    join(home, ".claude", "local", "claude"),
    "/usr/local/bin/claude",
    "/usr/bin/claude",
  ];

  const installed = knownLocations.find((candidate) => existsSync(candidate));
  if (installed !== undefined) return installed;

  try {
    const lookup = Bun.spawnSync(["which", "claude"]);
    const resolved = new TextDecoder().decode(lookup.stdout).trim();
    if (resolved && existsSync(resolved)) return resolved;
  } catch {
    // not found
  }
  return undefined;
}
|
|
|
|
// ── Runner resolution ────────────────────────────────────────────────────────
|
|
|
|
/**
 * Computes the path of the runner script for a provider:
 * `<three directories above this module>/sidecar/dist/<provider>-runner.mjs`.
 *
 * @param provider Provider whose runner is wanted.
 * @returns The runner path; its existence is not checked here.
 */
function resolveRunnerPath(provider: ProviderId): string {
  const runnerFile = `${provider}-runner.mjs`;
  return join(import.meta.dir, "..", "..", "..", "sidecar", "dist", runnerFile);
}
|
|
|
|
/**
 * Picks the JavaScript runtime used to execute runner scripts.
 *
 * Asks `which` for "deno" first and "node" second, returning the first
 * non-empty answer.
 *
 * @returns The resolved executable path, or the bare name "node" when neither
 *          lookup yields one.
 */
function findNodeRuntime(): string {
  for (const runtime of ["deno", "node"]) {
    try {
      const lookup = Bun.spawnSync(["which", runtime]);
      const resolved = new TextDecoder().decode(lookup.stdout).trim();
      if (resolved) return resolved;
    } catch {
      /* fallthrough */
    }
  }

  return "node"; // last resort
}
|
|
|
|
// ── Cleanup grace period ─────────────────────────────────────────────────────
|
|
|
|
/** How long a finished (done/error) session stays listed before it is removed: 60 s. */
const CLEANUP_GRACE_MS = 60 * 1000;

/** Fix #12 (Codex audit): largest NDJSON line tolerated without a newline (10 MB) — guards against OOM on malformed output. */
const MAX_LINE_SIZE = 1024 * 1024 * 10;

/** Feature 5: ceiling on the total pending stdout buffer per session (50 MB). */
const MAX_PENDING_BUFFER = 1024 * 1024 * 50;
|
|
|
|
// ── SidecarManager ───────────────────────────────────────────────────────────
|
|
|
|
/**
 * Spawns one runner process per agent session and tracks its lifecycle.
 *
 * Each session talks to its runner over stdio using newline-delimited JSON:
 * commands are written to stdin, events are read from stdout, and stderr is
 * mirrored to the console. Finished sessions are dropped automatically after
 * CLEANUP_GRACE_MS.
 */
export class SidecarManager {
  private sessions = new Map<string, ActiveSession>();
  private cleanupTimers = new Map<string, ReturnType<typeof setTimeout>>();
  private claudePath: string | undefined;
  private nodeRuntime: string;

  /** Resolves the Claude CLI and the script runtime once, logging what was found. */
  constructor() {
    this.claudePath = findClaudeCli();
    this.nodeRuntime = findNodeRuntime();

    if (this.claudePath) {
      console.log(`[sidecar] Claude CLI found at ${this.claudePath}`);
    } else {
      console.warn("[sidecar] Claude CLI not found — Claude sessions will fail");
    }
    console.log(`[sidecar] Node runtime: ${this.nodeRuntime}`);
  }

  /**
   * Start an agent session with the given provider.
   *
   * @param sessionId Unique id for the new session.
   * @param provider Provider whose runner script is launched.
   * @param prompt Initial prompt sent in the first query message.
   * @param options Optional settings forwarded to the runner.
   * @returns `{ ok: true }` on success, otherwise `{ ok: false, error }`. Spawn
   *          failures are reported through the return value, not thrown.
   */
  startSession(
    sessionId: string,
    provider: ProviderId,
    prompt: string,
    options: StartSessionOptions = {},
  ): { ok: boolean; error?: string } {
    if (this.sessions.has(sessionId)) {
      return { ok: false, error: "Session already exists" };
    }

    if (provider === "claude" && !this.claudePath) {
      return { ok: false, error: "Claude CLI not found. Install Claude Code first." };
    }

    const runnerPath = resolveRunnerPath(provider);
    if (!existsSync(runnerPath)) {
      return { ok: false, error: `Runner not found: ${runnerPath}` };
    }

    const controller = new AbortController();
    // Validate once so a rejected key is warned about a single time; the
    // already-clean result is reused for both the process env and the query.
    const extraEnv = validateExtraEnv(options.extraEnv);
    const env = buildCleanEnv(extraEnv, options.claudeConfigDir);

    let proc: ReturnType<typeof Bun.spawn>;
    try {
      proc = Bun.spawn([this.nodeRuntime, runnerPath], {
        stdin: "pipe",
        stdout: "pipe",
        stderr: "pipe",
        env,
        signal: controller.signal,
      });
    } catch (err) {
      // Honour the { ok, error } contract instead of throwing at the caller.
      console.error(`[sidecar] Failed to spawn runner for ${sessionId}:`, err);
      const reason = err instanceof Error ? err.message : String(err);
      return { ok: false, error: `Failed to spawn runner: ${reason}` };
    }

    const state: SessionState = {
      sessionId,
      provider,
      status: "running",
      costUsd: 0,
      inputTokens: 0,
      outputTokens: 0,
      startedAt: Date.now(),
    };

    const session: ActiveSession = {
      state,
      proc,
      controller,
      onMessage: [],
      onStatus: [],
    };

    this.sessions.set(sessionId, session);

    // Start reading stdout NDJSON
    this.readStdout(sessionId, session);

    // Read stderr for logging
    this.readStderr(sessionId, session);

    // Monitor process exit
    proc.exited.then((exitCode) => {
      // The id may have been removed, or reused by a newer session, by the time
      // this process exits; never touch a session that is not ours.
      if (this.sessions.get(sessionId) !== session) return;

      // A process killed through our own abort is a requested stop, not a failure.
      const failed = exitCode !== 0 && !controller.signal.aborted;
      session.state.status = failed ? "error" : "done";
      this.emitStatus(sessionId, session.state.status, failed ? `Exit code: ${exitCode}` : undefined);
      // Schedule cleanup (Fix #2)
      this.scheduleCleanup(sessionId);
    });

    // Send the query command to the runner
    const queryMsg: Record<string, unknown> = {
      type: "query",
      sessionId,
      prompt,
      cwd: options.cwd,
      model: options.model,
      systemPrompt: options.systemPrompt,
      maxTurns: options.maxTurns,
      permissionMode: options.permissionMode ?? "bypassPermissions",
      claudeConfigDir: options.claudeConfigDir,
      extraEnv,
    };

    if (options.additionalDirectories?.length) {
      queryMsg.additionalDirectories = options.additionalDirectories;
    }
    if (options.worktreeName) {
      queryMsg.worktreeName = options.worktreeName;
    }

    this.writeToProcess(sessionId, queryMsg);

    return { ok: true };
  }

  /**
   * Stop a running session.
   *
   * Sends a "stop" command to the runner, then aborts the process if the
   * session is still "running" three seconds later.
   *
   * @returns `{ ok: false }` when the session id is unknown.
   */
  stopSession(sessionId: string): { ok: boolean; error?: string } {
    const session = this.sessions.get(sessionId);
    if (!session) {
      return { ok: false, error: "Session not found" };
    }

    // Send stop command to runner first
    this.writeToProcess(sessionId, { type: "stop", sessionId });

    // Abort after a grace period if still running
    setTimeout(() => {
      // Skip if the session was removed or its id was reused in the meantime.
      if (this.sessions.get(sessionId) !== session) return;
      if (session.state.status === "running") {
        session.controller.abort();
        session.state.status = "done";
        this.emitStatus(sessionId, "done");
        this.scheduleCleanup(sessionId);
      }
    }, 3000);

    return { ok: true };
  }

  /**
   * Send a follow-up prompt to a running session.
   *
   * @returns `{ ok: false }` when the session is unknown or is neither
   *          "running" nor "idle".
   */
  writePrompt(sessionId: string, prompt: string): { ok: boolean; error?: string } {
    const session = this.sessions.get(sessionId);
    if (!session) {
      return { ok: false, error: "Session not found" };
    }
    if (session.state.status !== "running" && session.state.status !== "idle") {
      return { ok: false, error: `Session is ${session.state.status}` };
    }

    this.writeToProcess(sessionId, { type: "query", sessionId, prompt });
    session.state.status = "running";
    this.emitStatus(sessionId, "running");

    return { ok: true };
  }

  /** List all sessions with their state (shallow copies, safe for callers to mutate). */
  listSessions(): SessionState[] {
    return Array.from(this.sessions.values()).map((s) => ({ ...s.state }));
  }

  /** Register a callback for messages from a specific session; ignored for unknown ids. */
  onMessage(sessionId: string, callback: MessageCallback): void {
    const session = this.sessions.get(sessionId);
    if (session) {
      session.onMessage.push(callback);
    }
  }

  /** Register a callback for status changes of a specific session; ignored for unknown ids. */
  onStatus(sessionId: string, callback: StatusCallback): void {
    const session = this.sessions.get(sessionId);
    if (session) {
      session.onStatus.push(callback);
    }
  }

  /**
   * Remove a session immediately and cancel its pending cleanup timer.
   *
   * The runner process is aborted when the session is still "running" or when
   * the process has not reported an exit yet, so that a session marked
   * done/error by a runner event does not leave its process behind.
   */
  removeSession(sessionId: string): void {
    const session = this.sessions.get(sessionId);
    if (session) {
      const processAlive = session.proc.exitCode === null && session.proc.signalCode === null;
      if (session.state.status === "running" || processAlive) {
        session.controller.abort();
      }
      this.sessions.delete(sessionId);
    }
    // Cancel any cleanup timer
    const timer = this.cleanupTimers.get(sessionId);
    if (timer) {
      clearTimeout(timer);
      this.cleanupTimers.delete(sessionId);
    }
  }

  // ── Cleanup scheduling (Fix #2) ─────────────────────────────────────────

  /** (Re)arms the timer that drops a done/error session after CLEANUP_GRACE_MS. */
  private scheduleCleanup(sessionId: string): void {
    // Cancel any existing timer
    const existing = this.cleanupTimers.get(sessionId);
    if (existing) clearTimeout(existing);

    const timer = setTimeout(() => {
      this.cleanupTimers.delete(sessionId);
      const session = this.sessions.get(sessionId);
      if (session && (session.state.status === "done" || session.state.status === "error")) {
        this.sessions.delete(sessionId);
      }
    }, CLEANUP_GRACE_MS);

    this.cleanupTimers.set(sessionId, timer);
  }

  // ── Internal ───────────────────────────────────────────────────────────────

  /** Serialises `msg` as one NDJSON line on the runner's stdin; write errors are logged, not thrown. */
  private writeToProcess(sessionId: string, msg: Record<string, unknown>): void {
    const session = this.sessions.get(sessionId);
    if (!session) return;

    try {
      const line = JSON.stringify(msg) + "\n";
      session.proc.stdin.write(line);
    } catch (err) {
      console.error(`[sidecar] Write error for ${sessionId}:`, err);
    }
  }

  /**
   * Consumes the runner's stdout, splitting it into NDJSON lines and handing
   * each non-empty line to `handleNdjsonLine`. Resolves when the stream ends.
   */
  private async readStdout(sessionId: string, session: ActiveSession): Promise<void> {
    const reader = session.proc.stdout;
    const decoder = new TextDecoder();
    let buffer = "";

    try {
      for await (const chunk of reader) {
        buffer += decoder.decode(chunk, { stream: true });

        // Fix #12 (Codex audit): Guard against unbounded buffer growth
        if (buffer.length > MAX_LINE_SIZE && !buffer.includes("\n")) {
          console.error(`[sidecar] Buffer exceeded ${MAX_LINE_SIZE} bytes without newline for ${sessionId}, truncating`);
          buffer = "";
          continue;
        }

        // Feature 5: Backpressure guard — keep only the tail if the buffer exceeds 50MB
        if (buffer.length > MAX_PENDING_BUFFER) {
          console.warn(`[sidecar] Buffer exceeded ${MAX_PENDING_BUFFER} bytes for ${sessionId}, pausing read`);
          // Drain what we can and skip the rest
          buffer = buffer.slice(-MAX_LINE_SIZE);
        }

        let newlineIdx: number;
        while ((newlineIdx = buffer.indexOf("\n")) !== -1) {
          const line = buffer.slice(0, newlineIdx).trim();
          buffer = buffer.slice(newlineIdx + 1);
          if (!line) continue;

          this.handleNdjsonLine(sessionId, session, line);
        }
      }

      // Flush bytes the streaming decoder may still be holding back.
      buffer += decoder.decode();

      // Parse any residual data left in the buffer after stream ends
      const residual = buffer.trim();
      if (residual) {
        this.handleNdjsonLine(sessionId, session, residual);
      }
    } catch (err) {
      // Stream closed — expected on process exit
      if (!session.controller.signal.aborted) {
        console.error(`[sidecar] stdout read error for ${sessionId}:`, err);
      }
    }
  }

  /**
   * Mirrors the runner's stderr to the console, one trimmed non-empty line per
   * log entry. Lines are buffered across chunks so a line split by a chunk
   * boundary is logged once, in full.
   */
  private async readStderr(sessionId: string, session: ActiveSession): Promise<void> {
    const reader = session.proc.stderr;
    const decoder = new TextDecoder();
    let pending = "";

    const logLine = (line: string): void => {
      const text = line.trim();
      if (text) {
        console.log(`[sidecar:${sessionId}] ${text}`);
      }
    };

    try {
      for await (const chunk of reader) {
        pending += decoder.decode(chunk, { stream: true });

        const lines = pending.split("\n");
        // The last piece has no terminating newline yet; keep it for the next chunk.
        pending = lines.pop() ?? "";
        for (const line of lines) {
          logLine(line);
        }

        // Do not let output without newlines accumulate without bound.
        if (pending.length > MAX_LINE_SIZE) {
          logLine(pending);
          pending = "";
        }
      }
    } catch {
      // Stream closed — expected
    }

    // Emit whatever was left without a trailing newline.
    logLine(pending + decoder.decode());
  }

  /**
   * Interprets one NDJSON line from the runner.
   *
   * Lifecycle events (`agent_started`, `agent_stopped`, `agent_error`) update
   * the session status; `ready` and `pong` are ignored; everything else goes
   * through the message adapter and on to the message listeners. A malformed
   * line is logged and skipped so that it cannot stop the stdout reader.
   */
  private handleNdjsonLine(sessionId: string, session: ActiveSession, line: string): void {
    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      console.warn(`[sidecar] Invalid JSON from ${sessionId}: ${line.slice(0, 100)}`);
      return;
    }

    // Valid JSON that is not an object (null, number, string, array) has no
    // `type` field; reading one from `null` would throw and end the reader.
    if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
      console.warn(`[sidecar] Ignoring non-object JSON from ${sessionId}: ${line.slice(0, 100)}`);
      return;
    }
    const raw = parsed as Record<string, unknown>;

    // Handle sidecar-level events (not forwarded to message adapter)
    const type = raw.type;
    if (type === "ready" || type === "pong") return;

    if (type === "agent_started") {
      session.state.status = "running";
      this.emitStatus(sessionId, "running");
      return;
    }

    if (type === "agent_stopped") {
      session.state.status = "done";
      this.emitStatus(sessionId, "done");
      return;
    }

    if (type === "agent_error") {
      session.state.status = "error";
      const errorMsg = typeof raw.message === "string" ? raw.message : "Unknown error";
      this.emitStatus(sessionId, "error", errorMsg);
      return;
    }

    // Extract the inner event for agent_event wrapper
    const event =
      type === "agent_event" && typeof raw.event === "object" && raw.event !== null
        ? (raw.event as Record<string, unknown>)
        : raw;

    // Parse through message adapter; a failure on one line must not abort the stream.
    let messages: ReturnType<typeof parseMessage>;
    try {
      messages = parseMessage(session.state.provider, event);
    } catch (err) {
      console.error(`[sidecar] Message parse error for ${sessionId}:`, err);
      return;
    }

    // Update session state from cost messages
    for (const msg of messages) {
      if (msg.type === "cost") {
        const cost = msg.content as Record<string, unknown>;
        if (typeof cost.totalCostUsd === "number") session.state.costUsd = cost.totalCostUsd;
        if (typeof cost.inputTokens === "number") session.state.inputTokens += cost.inputTokens;
        if (typeof cost.outputTokens === "number") session.state.outputTokens += cost.outputTokens;
      }
    }

    // Emit to callbacks
    if (messages.length > 0) {
      for (const cb of session.onMessage) {
        try {
          cb(sessionId, messages);
        } catch (err) {
          console.error(`[sidecar] Message callback error for ${sessionId}:`, err);
        }
      }
    }
  }

  /** Notifies every status listener of the session; a throwing listener is logged and skipped. */
  private emitStatus(sessionId: string, status: SessionStatus, error?: string): void {
    const session = this.sessions.get(sessionId);
    if (!session) return;
    for (const cb of session.onStatus) {
      try {
        cb(sessionId, status, error);
      } catch (err) {
        console.error(`[sidecar] Status callback error for ${sessionId}:`, err);
      }
    }
  }
}
|