agent-orchestrator/sidecar/claude-runner.ts

344 lines
11 KiB
TypeScript

// Claude Runner — Node.js sidecar entry point for Claude Code provider
// Spawned by Rust SidecarManager, communicates via stdio NDJSON
// Uses @anthropic-ai/claude-agent-sdk for Claude session management
import { stdin, stdout, stderr } from 'process';
import { createInterface } from 'readline';
import { execSync } from 'child_process';
import { existsSync } from 'fs';
import { join } from 'path';
import { homedir } from 'os';
import { query, listSessions, type Query } from '@anthropic-ai/claude-agent-sdk';
import { readFileSync, writeFileSync, readdirSync, statSync } from 'fs';
// Line-oriented reader over stdin; every line received is parsed as one JSON message.
const rl = createInterface({ input: stdin });

/** A running agent: the SDK query stream plus the controller used to cancel it. */
interface ActiveSession {
  query: Query;
  controller: AbortController;
}

// Running agent sessions, indexed by the caller-supplied session ID.
const sessions = new Map<string, ActiveSession>();
/**
 * Writes one message to stdout as a single NDJSON line
 * (the JSON encoding of `msg` followed by a newline).
 */
function send(msg: Record<string, unknown>): void {
  const payload = JSON.stringify(msg);
  stdout.write(`${payload}\n`);
}
/**
 * Writes a diagnostic line to stderr, prefixed with `[sidecar] `.
 * stdout is not touched, so protocol messages and diagnostics stay on separate streams.
 */
function log(message: string): void {
  const line = '[sidecar] ' + message + '\n';
  stderr.write(line);
}
// Each stdin line is one JSON message: parse it, then dispatch without awaiting
// so a long-running query never blocks later messages (e.g. a 'stop').
rl.on('line', (line: string) => {
  let parsed: Record<string, unknown>;
  try {
    parsed = JSON.parse(line);
  } catch {
    log(`Invalid JSON: ${line}`);
    return;
  }

  // handleMessage is async, so any failure surfaces as a rejection handled here.
  handleMessage(parsed).catch((err: unknown) => {
    log(`Unhandled error in message handler: ${err}`);
  });
});
/**
 * Request to start an agent session. handleMessage routes messages with
 * `type: 'query'` to handleQuery, which reads the fields below.
 */
interface QueryMessage {
/** Discriminator checked by handleMessage. */
type: 'query';
/** Caller-chosen ID: keys the active-session map and is included in every reply for this session. */
sessionId: string;
/** Prompt text handed to the SDK `query()` call. */
prompt: string;
/** Working directory for the agent; handleQuery falls back to `process.cwd()` when empty or omitted. */
cwd?: string;
/** Forwarded to the SDK as `maxTurns` when set. */
maxTurns?: number;
/** Forwarded to the SDK as `maxBudgetUsd` when set. */
maxBudgetUsd?: number;
/**
 * SDK session ID to resume.
 * @deprecated Use resumeMode='resume' + resumeSessionId instead.
 * (Supplying this field without `resumeMode` is still honored as the legacy form.)
 */
resumeSessionId?: string;
/** Session continuity: 'new' (default), 'continue' (most recent), 'resume' (specific). */
resumeMode?: 'new' | 'continue' | 'resume';
/** Permission mode passed to the SDK; handleQuery uses 'bypassPermissions' when omitted. */
permissionMode?: string;
/** Setting sources requested by the host — presumably SDK `settingSources` values; confirm against the caller. */
settingSources?: string[];
/** System prompt; passed to the SDK only when non-empty. */
systemPrompt?: string;
/** Model identifier forwarded to the SDK as `model` when set. */
model?: string;
/** When set, exported to the spawned CLI as `CLAUDE_CONFIG_DIR`. */
claudeConfigDir?: string;
/** Forwarded to the SDK as `additionalDirectories` when set. */
additionalDirectories?: string[];
/** When set, passed to the CLI as the `worktree` extra argument. */
worktreeName?: string;
/** Extra environment variables, applied last so they override everything else in the child environment. */
extraEnv?: Record<string, string>;
}
/**
 * Request to cancel a running agent session. handleMessage routes messages
 * with `type: 'stop'` to handleStop.
 */
interface StopMessage {
/** Discriminator checked by handleMessage. */
type: 'stop';
/** ID of the session to abort; must match the `sessionId` of an active query. */
sessionId: string;
}
/**
 * Routes one parsed message by its `type` field.
 *
 * - 'ping'  → replies with `{ type: 'pong' }`
 * - 'query' → runs handleQuery and resolves when that session ends
 * - 'stop'  → runs handleStop
 * - anything else → replies with an 'error' message naming the unknown type
 */
async function handleMessage(msg: Record<string, unknown>): Promise<void> {
  const kind = msg.type;

  if (kind === 'ping') {
    send({ type: 'pong' });
    return;
  }

  if (kind === 'query') {
    await handleQuery(msg as unknown as QueryMessage);
    return;
  }

  if (kind === 'stop') {
    handleStop(msg as unknown as StopMessage);
    return;
  }

  send({ type: 'error', message: `Unknown message type: ${kind}` });
}
/**
 * Builds the environment handed to the spawned Claude CLI.
 *
 * Starts from this process's environment with CLAUDE* and ANTHROPIC_* variables
 * stripped (to prevent nesting detection by the spawned CLI), except
 * CLAUDE_CODE_EXPERIMENTAL_* feature flags, which pass through. Then applies
 * CLAUDE_CONFIG_DIR (multi-account support) and finally `extraEnv`, which wins
 * over everything else.
 */
function buildAgentEnv(
  claudeConfigDir?: string,
  extraEnv?: Record<string, string>,
): Record<string, string | undefined> {
  const env: Record<string, string | undefined> = {};

  for (const [key, value] of Object.entries(process.env)) {
    const isFeatureFlag = key.startsWith('CLAUDE_CODE_EXPERIMENTAL_');
    const isClaudeVar = key.startsWith('CLAUDE') || key.startsWith('ANTHROPIC_');
    if (isFeatureFlag || !isClaudeVar) {
      env[key] = value;
    }
  }

  if (claudeConfigDir) {
    env['CLAUDE_CONFIG_DIR'] = claudeConfigDir;
  }

  // e.g. BTMSG_AGENT_ID for agent communication
  if (extraEnv) {
    for (const [key, value] of Object.entries(extraEnv)) {
      env[key] = value;
    }
  }

  return env;
}

/**
 * Starts an agent session through the SDK and streams its output to the host.
 *
 * Messages emitted (all carry `sessionId`):
 * - 'error'         — a session with this ID is already running
 * - 'agent_error'   — the Claude CLI was not found, or the session failed
 * - 'agent_started' — the SDK query was created and registered
 * - 'agent_event'   — one per SDK message, forwarded unchanged in `event`
 * - 'agent_stopped' — `exitCode: 0` on normal completion, or
 *                     `exitCode: null, signal: 'SIGTERM'` when aborted via handleStop
 *
 * The returned promise resolves once the session has ended; it does not reject,
 * since failures are reported as messages.
 */
async function handleQuery(msg: QueryMessage): Promise<void> {
  const {
    sessionId,
    prompt,
    cwd,
    maxTurns,
    maxBudgetUsd,
    resumeSessionId,
    resumeMode,
    permissionMode,
    settingSources,
    systemPrompt,
    model,
    claudeConfigDir,
    additionalDirectories,
    worktreeName,
    extraEnv,
  } = msg;

  if (sessions.has(sessionId)) {
    send({ type: 'error', sessionId, message: 'Session already running' });
    return;
  }

  log(`Starting agent session ${sessionId} via SDK`);
  const controller = new AbortController();
  const cleanEnv = buildAgentEnv(claudeConfigDir, extraEnv);

  try {
    if (!claudePath) {
      send({ type: 'agent_error', sessionId, message: 'Claude CLI not found. Install Claude Code first.' });
      return;
    }

    const workDir = cwd || process.cwd();

    // Build resume/continue options based on resumeMode. Session files are
    // sanitized synchronously here, before the SDK gets a chance to read them.
    let resumeOpt: string | undefined;
    let continueOpt: boolean | undefined;
    if (resumeMode === 'continue') {
      continueOpt = true;
      log(`Session ${sessionId}: continuing most recent session`);
      sanitizeSessionFiles(workDir);
    } else if (resumeMode === 'resume' && resumeSessionId) {
      resumeOpt = resumeSessionId;
      log(`Session ${sessionId}: resuming SDK session ${resumeSessionId}`);
      sanitizeSessionFile(workDir, resumeSessionId);
    } else if (resumeSessionId && !resumeMode) {
      // Legacy: direct resumeSessionId without resumeMode
      resumeOpt = resumeSessionId;
      sanitizeSessionFile(workDir, resumeSessionId);
    } else if (resumeMode === 'resume') {
      // Nothing to resume from; make the fallback visible instead of silent.
      log(`Session ${sessionId}: resumeMode='resume' without resumeSessionId, starting a new session`);
    }

    const effectivePermissionMode = permissionMode ?? 'bypassPermissions';

    const q = query({
      prompt,
      options: {
        pathToClaudeCodeExecutable: claudePath,
        abortController: controller,
        cwd: workDir,
        env: cleanEnv,
        maxTurns: maxTurns ?? undefined,
        maxBudgetUsd: maxBudgetUsd ?? undefined,
        resume: resumeOpt,
        continue: continueOpt,
        permissionMode: effectivePermissionMode as 'bypassPermissions' | 'default',
        allowDangerouslySkipPermissions: effectivePermissionMode === 'bypassPermissions',
        // Previously accepted but dropped: forward the host's setting sources
        // to the SDK. Omitted (SDK default) when the host does not supply any.
        ...(settingSources ? { settingSources: settingSources as Array<'user' | 'project' | 'local'> } : {}),
        ...(systemPrompt ? { systemPrompt } : {}),
        model: model ?? undefined,
        additionalDirectories: additionalDirectories ?? undefined,
        extraArgs: worktreeName ? { worktree: worktreeName } : undefined,
      },
    });

    sessions.set(sessionId, { query: q, controller });
    send({ type: 'agent_started', sessionId });

    for await (const message of q) {
      // Forward SDK messages as-is — they use the same format as CLI stream-json
      const sdkMsg = message as Record<string, unknown>;
      send({
        type: 'agent_event',
        sessionId,
        event: sdkMsg,
      });
    }

    // Session completed normally
    sessions.delete(sessionId);
    send({
      type: 'agent_stopped',
      sessionId,
      exitCode: 0,
      signal: null,
    });
  } catch (err: unknown) {
    sessions.delete(sessionId);
    const errMsg = err instanceof Error ? err.message : String(err);

    if (controller.signal.aborted) {
      // The failure is the result of handleStop aborting the controller.
      log(`Agent session ${sessionId} aborted`);
      send({
        type: 'agent_stopped',
        sessionId,
        exitCode: null,
        signal: 'SIGTERM',
      });
    } else {
      log(`Agent session ${sessionId} error: ${errMsg}`);
      send({
        type: 'agent_error',
        sessionId,
        message: errMsg,
      });
    }
  }
}
/**
 * Cancels a running agent session by aborting its controller.
 * Replies with an 'error' message when no session with that ID is active.
 * The session entry itself is removed by handleQuery once the aborted query unwinds.
 */
function handleStop(msg: StopMessage): void {
  const active = sessions.get(msg.sessionId);

  if (active) {
    log(`Stopping agent session ${msg.sessionId}`);
    active.controller.abort();
    return;
  }

  send({ type: 'error', sessionId: msg.sessionId, message: 'Session not found' });
}
/**
 * Locates the Claude CLI executable.
 *
 * Checks a list of well-known install locations first, then asks the platform's
 * PATH lookup (`where` on Windows, `command -v` elsewhere).
 *
 * @returns The path of the first match, or `undefined` when the CLI cannot be found.
 */
function findClaudeCli(): string | undefined {
  // Check common locations
  const home = homedir();
  const candidates = [
    join(home, '.local', 'bin', 'claude'),
    join(home, '.claude', 'local', 'claude'),
    '/usr/local/bin/claude',
    '/usr/bin/claude',
  ];
  const installed = candidates.find((candidate) => existsSync(candidate));
  if (installed) return installed;

  // Fall back to a PATH lookup. The command is chosen per platform rather than
  // chaining `which … || where … 2>nul`: on POSIX shells that chain ran
  // `where claude 2>nul` whenever `which` failed, creating a stray file named
  // `nul` in the working directory.
  const lookup = process.platform === 'win32' ? 'where claude' : 'command -v claude';
  try {
    const output = execSync(lookup, {
      encoding: 'utf-8',
      // Discard the lookup's stderr instead of relying on shell redirection.
      stdio: ['ignore', 'pipe', 'ignore'],
    });
    // `where` may print several matches separated by CRLF; take the first
    // non-empty line, and report "not found" rather than an empty string.
    return output
      .split(/\r?\n/)
      .map((entry) => entry.trim())
      .find((entry) => entry.length > 0);
  } catch {
    // A non-zero exit status means the CLI is not on PATH.
    return undefined;
  }
}
// Resolve the CLI once at startup. A missing CLI is only a warning here;
// handleQuery reports it to the host as an 'agent_error' for each query.
const claudePath = findClaudeCli();
log(
  claudePath
    ? `Found Claude CLI at ${claudePath}`
    : 'WARNING: Claude CLI not found — agent sessions will fail',
);
// ── Session sanitizer ──────────────────────────────────────────────────────
// Fixes "cache_control cannot be set for empty text blocks" API error on resume.
// The SDK stores empty text blocks with cache_control during streaming;
// the API rejects them on replay. We strip them before the SDK reads the file.
/**
 * Turns a working-directory path into the directory name used under
 * `~/.claude/projects`: every UTF-16 code unit that is not an ASCII letter
 * or digit becomes `-`.
 */
function encodeCwd(cwdPath: string): string {
  const asciiAlphanumeric = /^[a-zA-Z0-9]$/;
  return cwdPath
    .split('')
    .map((unit) => (asciiAlphanumeric.test(unit) ? unit : '-'))
    .join('');
}
/**
 * Sanitizes the stored transcript of one SDK session, located at
 * `~/.claude/projects/<encoded cwd>/<sdkSessionId>.jsonl`.
 * Best-effort: failures are logged and never thrown.
 *
 * NOTE(review): the path is always built under the home directory's `.claude`
 * folder, even when a query sets a custom CLAUDE_CONFIG_DIR — confirm this is intended.
 */
function sanitizeSessionFile(cwdPath: string, sdkSessionId: string): void {
  const projectDir = join(homedir(), '.claude', 'projects', encodeCwd(cwdPath));
  const filePath = join(projectDir, `${sdkSessionId}.jsonl`);

  try {
    sanitizeJsonlFile(filePath);
  } catch (err) {
    log(`sanitize: could not clean ${filePath}: ${err}`);
  }
}
/**
 * For 'continue' mode: sanitizes the most recently modified session transcript
 * stored for `cwdPath` (the newest `*.jsonl` in `~/.claude/projects/<encoded cwd>`).
 *
 * The lookup is done on the filesystem, synchronously, so the file is clean
 * before the caller starts the SDK query. The SDK's `listSessions()` is not
 * used here because it returns a Promise: reading `.length` off its result is
 * always `undefined`, so nothing was ever sanitized, and a rejection would
 * have gone unhandled.
 *
 * Best-effort: a missing directory or unreadable file is skipped silently;
 * sanitization failures are logged by sanitizeSessionFile.
 */
function sanitizeSessionFiles(cwdPath: string): void {
  const suffix = '.jsonl';
  const dir = join(homedir(), '.claude', 'projects', encodeCwd(cwdPath));

  let transcripts: string[];
  try {
    transcripts = readdirSync(dir).filter((name) => name.endsWith(suffix));
  } catch {
    // Directory doesn't exist: there is no previous session to continue.
    return;
  }

  // Single pass with one stat per file (instead of stat-ing inside a sort comparator).
  let newest: string | undefined;
  let newestMtime = -Infinity;
  for (const name of transcripts) {
    let mtime: number;
    try {
      mtime = statSync(join(dir, name)).mtimeMs;
    } catch {
      continue; // file vanished or is unreadable; ignore it
    }
    if (mtime > newestMtime) {
      newest = name;
      newestMtime = mtime;
    }
  }

  if (newest === undefined) return;

  // Reuse the single-session path so path building and error logging stay in one place.
  sanitizeSessionFile(cwdPath, newest.slice(0, -suffix.length));
}
/**
 * Rewrites a JSONL transcript in place with empty text blocks removed.
 *
 * Each non-blank line is parsed as JSON and passed to cleanContentBlocks;
 * lines that change are re-serialized, while blank, unparseable and unchanged
 * lines are kept exactly as they were. The file is written back only if at
 * least one line changed. Does nothing when the file does not exist.
 */
function sanitizeJsonlFile(filePath: string): void {
  if (!existsSync(filePath)) return;

  const rows = readFileSync(filePath, 'utf-8').split('\n');
  let dirty = false;

  rows.forEach((raw, index) => {
    if (raw.trim() === '') return;
    try {
      const record = JSON.parse(raw);
      if (cleanContentBlocks(record)) {
        dirty = true;
        rows[index] = JSON.stringify(record);
      }
    } catch {
      /* skip unparseable lines */
    }
  });

  if (!dirty) return;

  writeFileSync(filePath, rows.join('\n'));
  log(`sanitize: cleaned empty text blocks from ${filePath}`);
}
/**
 * True for a content block of type 'text' whose text is missing, empty or
 * whitespace-only (with or without cache_control). Anything that is not an
 * object — including `null` — is never considered an empty text block.
 */
function isEmptyTextBlock(block: unknown): boolean {
  if (typeof block !== 'object' || block === null) return false;
  const { type, text } = block as Record<string, unknown>;
  if (type !== 'text') return false;
  // Non-string text: treat falsy values as empty, keep anything else untouched.
  return typeof text === 'string' ? text.trim() === '' : !text;
}

/**
 * Removes empty text blocks from `holder.content` when it is an array.
 * @returns `true` when at least one block was removed.
 */
function stripEmptyTextBlocks(holder: Record<string, unknown>): boolean {
  const content = holder.content;
  if (!Array.isArray(content)) return false;

  const kept = content.filter((block) => !isEmptyTextBlock(block));
  if (kept.length === content.length) return false;

  holder.content = kept;
  return true;
}

/**
 * Strips empty text blocks from one transcript record, in place.
 *
 * Two locations are checked: `obj.message.content` and the top-level
 * `obj.content` (some formats). Malformed input — a non-object record, `null`
 * entries in a content array, non-string `text` — is tolerated rather than
 * raising a TypeError.
 *
 * @returns `true` when the record was modified.
 */
function cleanContentBlocks(obj: Record<string, unknown>): boolean {
  if (typeof obj !== 'object' || obj === null) return false;

  const message = obj.message;
  const changedInMessage =
    typeof message === 'object' && message !== null
      ? stripEmptyTextBlocks(message as Record<string, unknown>)
      : false;

  // Evaluated unconditionally so both locations are always cleaned.
  const changedAtTopLevel = stripEmptyTextBlocks(obj);

  return changedInMessage || changedAtTopLevel;
}
// Announce startup: a diagnostic line on stderr, then the 'ready' message on stdout.
log('Sidecar started');
send({ type: 'ready' });