feat(providers): add Codex and Ollama provider runners with message adapters

This commit is contained in:
Hibryda 2026-03-11 03:56:05 +01:00
parent 4ae7ca6634
commit 3e34fda59a
9 changed files with 985 additions and 2 deletions

222
v2/sidecar/codex-runner.ts Normal file
View file

@ -0,0 +1,222 @@
// Codex Runner — Node.js sidecar entry point for OpenAI Codex provider
// Spawned by Rust SidecarManager, communicates via stdio NDJSON
// Uses @openai/codex-sdk for Codex session management
import { stdin, stdout, stderr } from 'process';
import { createInterface } from 'readline';
import { execSync } from 'child_process';
import { existsSync } from 'fs';
import { join } from 'path';
import { homedir } from 'os';
const rl = createInterface({ input: stdin });
const sessions = new Map<string, { controller: AbortController }>();
function send(msg: Record<string, unknown>) {
stdout.write(JSON.stringify(msg) + '\n');
}
function log(message: string) {
stderr.write(`[codex-sidecar] ${message}\n`);
}
rl.on('line', (line: string) => {
try {
const msg = JSON.parse(line);
handleMessage(msg).catch((err: unknown) => {
log(`Unhandled error in message handler: ${err}`);
});
} catch {
log(`Invalid JSON: ${line}`);
}
});
interface QueryMessage {
type: 'query';
sessionId: string;
prompt: string;
cwd?: string;
maxTurns?: number;
resumeSessionId?: string;
permissionMode?: string;
systemPrompt?: string;
model?: string;
providerConfig?: Record<string, unknown>;
}
interface StopMessage {
type: 'stop';
sessionId: string;
}
async function handleMessage(msg: Record<string, unknown>) {
switch (msg.type) {
case 'ping':
send({ type: 'pong' });
break;
case 'query':
await handleQuery(msg as unknown as QueryMessage);
break;
case 'stop':
handleStop(msg as unknown as StopMessage);
break;
default:
send({ type: 'error', message: `Unknown message type: ${msg.type}` });
}
}
async function handleQuery(msg: QueryMessage) {
const { sessionId, prompt, cwd, maxTurns, resumeSessionId, permissionMode, model, providerConfig } = msg;
if (sessions.has(sessionId)) {
send({ type: 'error', sessionId, message: 'Session already running' });
return;
}
log(`Starting Codex session ${sessionId}`);
const controller = new AbortController();
// Strip CODEX*/OPENAI* env vars to prevent nesting issues
const cleanEnv: Record<string, string | undefined> = {};
for (const [key, value] of Object.entries(process.env)) {
if (!key.startsWith('CODEX') && !key.startsWith('OPENAI')) {
cleanEnv[key] = value;
}
}
// Re-inject the API key
const apiKey = process.env.CODEX_API_KEY || process.env.OPENAI_API_KEY;
if (apiKey) {
cleanEnv['CODEX_API_KEY'] = apiKey;
}
// Dynamically import SDK — fails gracefully if not installed
let Codex: any;
try {
const sdk = await import('@openai/codex-sdk');
Codex = sdk.Codex ?? sdk.default;
} catch {
send({ type: 'agent_error', sessionId, message: 'Codex SDK not installed. Run: npm install @openai/codex-sdk' });
return;
}
if (!apiKey) {
send({ type: 'agent_error', sessionId, message: 'No API key. Set CODEX_API_KEY or OPENAI_API_KEY.' });
return;
}
try {
// Map permission mode to Codex sandbox/approval settings
const sandbox = mapSandboxMode(providerConfig?.sandbox as string | undefined, permissionMode);
const approvalPolicy = permissionMode === 'bypassPermissions' ? 'never' : 'on-request';
const codex = new Codex({
env: cleanEnv as Record<string, string>,
config: {
model: model ?? 'gpt-5.4',
approval_policy: approvalPolicy,
sandbox: sandbox,
},
});
const threadOpts: Record<string, unknown> = {
workingDirectory: cwd || process.cwd(),
};
const thread = resumeSessionId
? codex.resumeThread(resumeSessionId)
: codex.startThread(threadOpts);
sessions.set(sessionId, { controller });
send({ type: 'agent_started', sessionId });
const streamResult = await thread.runStreamed(prompt);
for await (const event of streamResult.events) {
if (controller.signal.aborted) break;
// Forward raw Codex events — the message adapter parses them
send({
type: 'agent_event',
sessionId,
event: event as Record<string, unknown>,
});
}
sessions.delete(sessionId);
send({
type: 'agent_stopped',
sessionId,
exitCode: 0,
signal: null,
});
} catch (err: unknown) {
sessions.delete(sessionId);
const errMsg = err instanceof Error ? err.message : String(err);
if (controller.signal.aborted) {
log(`Codex session ${sessionId} aborted`);
send({
type: 'agent_stopped',
sessionId,
exitCode: null,
signal: 'SIGTERM',
});
} else {
log(`Codex session ${sessionId} error: ${errMsg}`);
send({
type: 'agent_error',
sessionId,
message: errMsg,
});
}
}
}
function handleStop(msg: StopMessage) {
const { sessionId } = msg;
const session = sessions.get(sessionId);
if (!session) {
send({ type: 'error', sessionId, message: 'Session not found' });
return;
}
log(`Stopping Codex session ${sessionId}`);
session.controller.abort();
}
function mapSandboxMode(
configSandbox: string | undefined,
permissionMode: string | undefined,
): string {
if (configSandbox) return configSandbox;
if (permissionMode === 'bypassPermissions') return 'danger-full-access';
return 'workspace-write';
}
function findCodexCli(): string | undefined {
const candidates = [
join(homedir(), '.local', 'bin', 'codex'),
'/usr/local/bin/codex',
'/usr/bin/codex',
];
for (const p of candidates) {
if (existsSync(p)) return p;
}
try {
return execSync('which codex 2>/dev/null || where codex 2>nul', { encoding: 'utf-8' }).trim().split('\n')[0];
} catch {
return undefined;
}
}
const codexPath = findCodexCli();
if (codexPath) {
log(`Found Codex CLI at ${codexPath}`);
} else {
log('Codex CLI not found — will use SDK if available');
}
log('Codex sidecar started');
send({ type: 'ready' });

269
v2/sidecar/ollama-runner.ts Normal file
View file

@ -0,0 +1,269 @@
// Ollama Runner — Node.js sidecar entry point for local Ollama provider
// Spawned by Rust SidecarManager, communicates via stdio NDJSON
// Uses direct HTTP to Ollama REST API (no external dependencies)
import { stdin, stdout, stderr } from 'process';
import { createInterface } from 'readline';
const rl = createInterface({ input: stdin });
const sessions = new Map<string, { controller: AbortController }>();
function send(msg: Record<string, unknown>) {
stdout.write(JSON.stringify(msg) + '\n');
}
function log(message: string) {
stderr.write(`[ollama-sidecar] ${message}\n`);
}
rl.on('line', (line: string) => {
try {
const msg = JSON.parse(line);
handleMessage(msg).catch((err: unknown) => {
log(`Unhandled error in message handler: ${err}`);
});
} catch {
log(`Invalid JSON: ${line}`);
}
});
interface QueryMessage {
type: 'query';
sessionId: string;
prompt: string;
cwd?: string;
model?: string;
systemPrompt?: string;
providerConfig?: Record<string, unknown>;
}
interface StopMessage {
type: 'stop';
sessionId: string;
}
async function handleMessage(msg: Record<string, unknown>) {
switch (msg.type) {
case 'ping':
send({ type: 'pong' });
break;
case 'query':
await handleQuery(msg as unknown as QueryMessage);
break;
case 'stop':
handleStop(msg as unknown as StopMessage);
break;
default:
send({ type: 'error', message: `Unknown message type: ${msg.type}` });
}
}
async function handleQuery(msg: QueryMessage) {
const { sessionId, prompt, cwd, model, systemPrompt, providerConfig } = msg;
if (sessions.has(sessionId)) {
send({ type: 'error', sessionId, message: 'Session already running' });
return;
}
const ollamaHost = (providerConfig?.host as string) || process.env.OLLAMA_HOST || 'http://127.0.0.1:11434';
const ollamaModel = model || 'qwen3:8b';
const numCtx = (providerConfig?.num_ctx as number) || 32768;
const think = (providerConfig?.think as boolean) ?? false;
log(`Starting Ollama session ${sessionId} with model ${ollamaModel}`);
// Health check
try {
const healthRes = await fetch(`${ollamaHost}/api/version`);
if (!healthRes.ok) {
send({ type: 'agent_error', sessionId, message: `Ollama not reachable at ${ollamaHost} (HTTP ${healthRes.status})` });
return;
}
} catch (err: unknown) {
const errMsg = err instanceof Error ? err.message : String(err);
send({ type: 'agent_error', sessionId, message: `Cannot connect to Ollama at ${ollamaHost}: ${errMsg}` });
return;
}
const controller = new AbortController();
sessions.set(sessionId, { controller });
send({ type: 'agent_started', sessionId });
// Emit init event
send({
type: 'agent_event',
sessionId,
event: {
type: 'system',
subtype: 'init',
session_id: sessionId,
model: ollamaModel,
cwd: cwd || process.cwd(),
},
});
// Build messages array
const messages: Array<{ role: string; content: string }> = [];
if (systemPrompt && typeof systemPrompt === 'string') {
messages.push({ role: 'system', content: systemPrompt });
}
messages.push({ role: 'user', content: prompt });
try {
const res = await fetch(`${ollamaHost}/api/chat`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
model: ollamaModel,
messages,
stream: true,
options: { num_ctx: numCtx },
think,
}),
signal: controller.signal,
});
if (!res.ok) {
const errBody = await res.text();
let errMsg: string;
try {
const parsed = JSON.parse(errBody);
errMsg = parsed.error || errBody;
} catch {
errMsg = errBody;
}
send({ type: 'agent_error', sessionId, message: `Ollama error (${res.status}): ${errMsg}` });
sessions.delete(sessionId);
return;
}
if (!res.body) {
send({ type: 'agent_error', sessionId, message: 'No response body from Ollama' });
sessions.delete(sessionId);
return;
}
// Parse NDJSON stream
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
if (controller.signal.aborted) break;
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
const chunk = JSON.parse(trimmed) as Record<string, unknown>;
// Check for mid-stream error
if (typeof chunk.error === 'string') {
send({
type: 'agent_event',
sessionId,
event: { type: 'error', message: chunk.error },
});
continue;
}
// Forward as chunk event for the message adapter
send({
type: 'agent_event',
sessionId,
event: {
type: 'chunk',
message: chunk.message,
done: chunk.done,
done_reason: chunk.done_reason,
model: chunk.model,
prompt_eval_count: chunk.prompt_eval_count,
eval_count: chunk.eval_count,
eval_duration: chunk.eval_duration,
total_duration: chunk.total_duration,
},
});
} catch {
log(`Failed to parse Ollama chunk: ${trimmed}`);
}
}
}
// Process remaining buffer
if (buffer.trim()) {
try {
const chunk = JSON.parse(buffer.trim()) as Record<string, unknown>;
send({
type: 'agent_event',
sessionId,
event: {
type: 'chunk',
message: chunk.message,
done: chunk.done,
done_reason: chunk.done_reason,
model: chunk.model,
prompt_eval_count: chunk.prompt_eval_count,
eval_count: chunk.eval_count,
eval_duration: chunk.eval_duration,
total_duration: chunk.total_duration,
},
});
} catch {
log(`Failed to parse final Ollama buffer: ${buffer}`);
}
}
sessions.delete(sessionId);
send({
type: 'agent_stopped',
sessionId,
exitCode: 0,
signal: null,
});
} catch (err: unknown) {
sessions.delete(sessionId);
const errMsg = err instanceof Error ? err.message : String(err);
if (controller.signal.aborted) {
log(`Ollama session ${sessionId} aborted`);
send({
type: 'agent_stopped',
sessionId,
exitCode: null,
signal: 'SIGTERM',
});
} else {
log(`Ollama session ${sessionId} error: ${errMsg}`);
send({
type: 'agent_error',
sessionId,
message: errMsg,
});
}
}
}
function handleStop(msg: StopMessage) {
const { sessionId } = msg;
const session = sessions.get(sessionId);
if (!session) {
send({ type: 'error', sessionId, message: 'Session not found' });
return;
}
log(`Stopping Ollama session ${sessionId}`);
session.controller.abort();
}
log('Ollama sidecar started');
send({ type: 'ready' });