/** * Agent session store — manages per-project agent state and RPC communication. * * Listens for agent.message, agent.status, agent.cost events from Bun process. * Exposes reactive Svelte 5 rune state per project. */ import { appRpc } from './rpc.ts'; import { recordActivity, recordToolDone, recordTokenSnapshot, setProjectStatus } from './health-store.svelte.ts'; // ── Types ──────────────────────────────────────────────────────────────────── export type AgentStatus = 'idle' | 'running' | 'done' | 'error'; export type MsgRole = 'user' | 'assistant' | 'tool-call' | 'tool-result' | 'thinking' | 'system'; export interface AgentMessage { id: string; seqId: number; role: MsgRole; content: string; toolName?: string; toolInput?: string; toolPath?: string; timestamp: number; } export interface AgentSession { sessionId: string; projectId: string; provider: string; status: AgentStatus; messages: AgentMessage[]; costUsd: number; inputTokens: number; outputTokens: number; model: string; error?: string; } interface StartOptions { cwd?: string; model?: string; systemPrompt?: string; maxTurns?: number; permissionMode?: string; claudeConfigDir?: string; extraEnv?: Record; additionalDirectories?: string[]; worktreeName?: string; } // ── Toast callback (set by App.svelte) ──────────────────────────────────────── type ToastFn = (message: string, variant: 'success' | 'warning' | 'error' | 'info') => void; let _toastFn: ToastFn | null = null; /** Register a toast callback for agent notifications. */ export function setAgentToastFn(fn: ToastFn): void { _toastFn = fn; } function emitToast(message: string, variant: 'success' | 'warning' | 'error' | 'info') { _toastFn?.(message, variant); } // ── Stall detection ─────────────────────────────────────────────────────────── const stallTimers = new Map>(); const DEFAULT_STALL_MS = 15 * 60 * 1000; // 15 minutes function resetStallTimer(sessionId: string, projectId: string): void { const existing = stallTimers.get(sessionId); if (existing) clearTimeout(existing); const timer = setTimeout(() => { stallTimers.delete(sessionId); const session = sessions[sessionId]; if (session && session.status === 'running') { emitToast(`Agent stalled on ${projectId} (no activity for 15 min)`, 'warning'); } }, DEFAULT_STALL_MS); stallTimers.set(sessionId, timer); } function clearStallTimer(sessionId: string): void { const timer = stallTimers.get(sessionId); if (timer) { clearTimeout(timer); stallTimers.delete(sessionId); } } // ── Env var validation (Fix #14) ───────────────────────────────────────────── const BLOCKED_ENV_PREFIXES = ['CLAUDE', 'CODEX', 'OLLAMA', 'ANTHROPIC_']; function validateExtraEnv(env: Record | undefined): Record | undefined { if (!env) return undefined; const clean: Record = {}; for (const [key, value] of Object.entries(env)) { const blocked = BLOCKED_ENV_PREFIXES.some(p => key.startsWith(p)); if (blocked) { console.warn(`[agent-store] Rejected extraEnv key "${key}" — provider-prefixed keys are not allowed`); continue; } clean[key] = value; } return Object.keys(clean).length > 0 ? clean : undefined; } // ── Internal state ─────────────────────────────────────────────────────────── // Map projectId -> sessionId for lookup const projectSessionMap = new Map(); // Map sessionId -> reactive session state let sessions = $state>({}); // Grace period timers for cleanup after done/error const cleanupTimers = new Map>(); // Debounce timer for message persistence const msgPersistTimers = new Map>(); // Fix #12: Track last persisted index per session to avoid re-saving entire history const lastPersistedIndex = new Map(); // Fix #2 (Codex audit): Guard against double-start race const startingProjects = new Set(); // Feature 1: Monotonic seqId counter per session for dedup on restore const seqCounters = new Map(); function nextSeqId(sessionId: string): number { const current = seqCounters.get(sessionId) ?? 0; const next = current + 1; seqCounters.set(sessionId, next); return next; } // ── Session persistence helpers ───────────────────────────────────────────── function persistSession(session: AgentSession): void { appRpc.request['session.save']({ projectId: session.projectId, sessionId: session.sessionId, provider: session.provider, status: session.status, costUsd: session.costUsd, inputTokens: session.inputTokens, outputTokens: session.outputTokens, model: session.model, error: session.error, createdAt: session.messages[0]?.timestamp ?? Date.now(), updatedAt: Date.now(), }).catch((err: unknown) => { console.error('[session.save] persist error:', err); }); } function persistMessages(session: AgentSession): void { // Debounce: batch message saves every 2 seconds const existing = msgPersistTimers.get(session.sessionId); if (existing) clearTimeout(existing); const timer = setTimeout(() => { msgPersistTimers.delete(session.sessionId); // Fix #12: Only persist NEW messages (from lastPersistedIndex onward) const startIdx = lastPersistedIndex.get(session.sessionId) ?? 0; const newMsgs = session.messages.slice(startIdx); if (newMsgs.length === 0) return; // Fix #1 (Codex audit): Snapshot batch end BEFORE async save to avoid race const batchEnd = session.messages.length; const msgs = newMsgs.map((m) => ({ sessionId: session.sessionId, msgId: m.id, role: m.role, content: m.content, toolName: m.toolName, toolInput: m.toolInput, timestamp: m.timestamp, seqId: m.seqId, })); appRpc.request['session.messages.save']({ messages: msgs }).then(() => { lastPersistedIndex.set(session.sessionId, batchEnd); }).catch((err: unknown) => { console.error('[session.messages.save] persist error:', err); }); }, 2000); msgPersistTimers.set(session.sessionId, timer); } // ── RPC event listeners (registered once) ──────────────────────────────────── let listenersRegistered = false; function ensureListeners() { if (listenersRegistered) return; listenersRegistered = true; // agent.message — raw messages from sidecar, converted to display format appRpc.addMessageListener('agent.message', (payload: { sessionId: string; messages: Array<{ id: string; type: string; parentId?: string; content: unknown; timestamp: number; }>; }) => { const session = sessions[payload.sessionId]; if (!session) return; const converted: AgentMessage[] = []; for (const raw of payload.messages) { const msg = convertRawMessage(raw); if (msg) converted.push(msg); } if (converted.length > 0) { // Feature 1: Assign monotonic seqId to each message for dedup for (const msg of converted) { msg.seqId = nextSeqId(payload.sessionId); } session.messages = [...session.messages, ...converted]; persistMessages(session); // Reset stall timer on activity resetStallTimer(payload.sessionId, session.projectId); // Fix #14: Wire health store — record activity on every message batch for (const msg of converted) { if (msg.role === 'tool-call') { recordActivity(session.projectId, msg.toolName); } else if (msg.role === 'tool-result') { recordToolDone(session.projectId); } else { recordActivity(session.projectId); } } } }); // agent.status — session status changes appRpc.addMessageListener('agent.status', (payload: { sessionId: string; status: string; error?: string; }) => { const session = sessions[payload.sessionId]; if (!session) return; session.status = normalizeStatus(payload.status); if (payload.error) session.error = payload.error; // Fix #14: Wire health store — update project status setProjectStatus(session.projectId, session.status === 'done' ? 'done' : session.status === 'error' ? 'error' : session.status === 'running' ? 'running' : 'idle'); // Persist on every status change persistSession(session); // Emit toast notification on completion if (session.status === 'done') { clearStallTimer(payload.sessionId); emitToast(`Agent completed on ${session.projectId}`, 'success'); } else if (session.status === 'error') { clearStallTimer(payload.sessionId); emitToast(`Agent error on ${session.projectId}: ${payload.error ?? 'unknown'}`, 'error'); } // Schedule cleanup after done/error (Fix #2) if (session.status === 'done' || session.status === 'error') { // Flush any pending message persistence immediately const pendingTimer = msgPersistTimers.get(session.sessionId); if (pendingTimer) { clearTimeout(pendingTimer); msgPersistTimers.delete(session.sessionId); } persistMessages(session); scheduleCleanup(session.sessionId, session.projectId); // Fix #14 (Codex audit): Enforce max sessions per project on completion enforceMaxSessions(session.projectId); } }); // agent.cost — token/cost updates appRpc.addMessageListener('agent.cost', (payload: { sessionId: string; costUsd: number; inputTokens: number; outputTokens: number; }) => { const session = sessions[payload.sessionId]; if (!session) return; session.costUsd = payload.costUsd; session.inputTokens = payload.inputTokens; session.outputTokens = payload.outputTokens; // Fix #14: Wire health store — record token/cost snapshot recordTokenSnapshot(session.projectId, payload.inputTokens + payload.outputTokens, payload.costUsd); }); } // ── Cleanup scheduling (Fix #2) ────────────────────────────────────────────── const CLEANUP_GRACE_MS = 60_000; // 60 seconds after done/error function scheduleCleanup(sessionId: string, projectId: string) { // Cancel any existing timer for this session const existing = cleanupTimers.get(sessionId); if (existing) clearTimeout(existing); const timer = setTimeout(() => { cleanupTimers.delete(sessionId); // Only clean up if session is still in done/error state const session = sessions[sessionId]; if (session && (session.status === 'done' || session.status === 'error')) { // Keep session data (messages, cost) but remove from projectSessionMap // so starting a new session on this project works cleanly const currentMapped = projectSessionMap.get(projectId); if (currentMapped === sessionId) { projectSessionMap.delete(projectId); } } }, CLEANUP_GRACE_MS); cleanupTimers.set(sessionId, timer); } // ── Message conversion ─────────────────────────────────────────────────────── function convertRawMessage(raw: { id: string; type: string; parentId?: string; content: unknown; timestamp: number; }): AgentMessage | null { const c = raw.content as Record | undefined; switch (raw.type) { case 'text': return { id: raw.id, seqId: 0, role: 'assistant', content: String(c?.text ?? ''), timestamp: raw.timestamp, }; case 'thinking': return { id: raw.id, seqId: 0, role: 'thinking', content: String(c?.text ?? ''), timestamp: raw.timestamp, }; case 'tool_call': { const name = String(c?.name ?? 'Tool'); const input = c?.input as Record | undefined; const path = extractToolPath(name, input); return { id: raw.id, seqId: 0, role: 'tool-call', content: formatToolInput(name, input), toolName: name, toolInput: JSON.stringify(input, null, 2), toolPath: path, timestamp: raw.timestamp, }; } case 'tool_result': { const output = c?.output; const text = typeof output === 'string' ? output : JSON.stringify(output, null, 2); return { id: raw.id, seqId: 0, role: 'tool-result', content: truncateOutput(text, 500), timestamp: raw.timestamp, }; } case 'init': { const model = String(c?.model ?? ''); const sid = String(c?.sessionId ?? ''); for (const s of Object.values(sessions)) { if (s.sessionId === raw.id || (sid && s.sessionId.includes(sid.slice(0, 8)))) { if (model) s.model = model; } } return { id: raw.id, seqId: 0, role: 'system', content: `Session initialized${model ? ` (${model})` : ''}`, timestamp: raw.timestamp, }; } case 'error': return { id: raw.id, seqId: 0, role: 'system', content: `Error: ${String(c?.message ?? 'Unknown error')}`, timestamp: raw.timestamp, }; case 'cost': case 'status': case 'compaction': case 'unknown': return null; default: return null; } } function extractToolPath(name: string, input: Record | undefined): string | undefined { if (!input) return undefined; if (typeof input.file_path === 'string') return input.file_path; if (typeof input.path === 'string') return input.path; if (name === 'Bash' && typeof input.command === 'string') { return input.command.length > 80 ? input.command.slice(0, 80) + '...' : input.command; } return undefined; } function formatToolInput(name: string, input: Record | undefined): string { if (!input) return ''; if (name === 'Bash' && typeof input.command === 'string') return input.command; if (typeof input.file_path === 'string') return input.file_path; return JSON.stringify(input, null, 2); } function truncateOutput(text: string, maxLines: number): string { const lines = text.split('\n'); if (lines.length <= maxLines) return text; return lines.slice(0, maxLines).join('\n') + `\n... (${lines.length - maxLines} more lines)`; } function normalizeStatus(status: string): AgentStatus { if (status === 'running' || status === 'idle' || status === 'done' || status === 'error') { return status; } return 'idle'; } // ── Public API ─────────────────────────────────────────────────────────────── /** Start an agent session for a project (Fix #5: reads permission_mode + system_prompt from settings). */ export async function startAgent( projectId: string, provider: string, prompt: string, options: StartOptions = {}, ): Promise<{ ok: boolean; error?: string }> { ensureListeners(); // Fix #2 (Codex audit): Prevent double-start race if (startingProjects.has(projectId)) { return { ok: false, error: 'Session start already in progress' }; } startingProjects.add(projectId); try { return await _startAgentInner(projectId, provider, prompt, options); } finally { startingProjects.delete(projectId); } } async function _startAgentInner( projectId: string, provider: string, prompt: string, options: StartOptions, ): Promise<{ ok: boolean; error?: string }> { // If there's an existing done/error session for this project, clear it first clearSession(projectId); const sessionId = `${projectId}-${Date.now()}`; // Read settings defaults if not explicitly provided (Fix #5) let permissionMode = options.permissionMode; let systemPrompt = options.systemPrompt; let defaultModel = options.model; let cwd = options.cwd; try { const { settings } = await appRpc.request['settings.getAll']({}); if (!permissionMode && settings['permission_mode']) { permissionMode = settings['permission_mode']; } if (!systemPrompt && settings['system_prompt_template']) { systemPrompt = settings['system_prompt_template']; } if (!cwd && settings['default_cwd']) { cwd = settings['default_cwd']; } // Read default model from provider_settings if not specified if (!defaultModel && settings['provider_settings']) { try { const providerSettings = JSON.parse(settings['provider_settings']); const provConfig = providerSettings[provider]; if (provConfig?.defaultModel) defaultModel = provConfig.defaultModel; } catch { /* ignore parse errors */ } } } catch { /* use provided or defaults */ } // Create reactive session state sessions[sessionId] = { sessionId, projectId, provider, status: 'running', messages: [{ id: `${sessionId}-user-0`, seqId: nextSeqId(sessionId), role: 'user', content: prompt, timestamp: Date.now(), }], costUsd: 0, inputTokens: 0, outputTokens: 0, model: defaultModel ?? 'claude-opus-4-5', }; projectSessionMap.set(projectId, sessionId); resetStallTimer(sessionId, projectId); const result = await appRpc.request['agent.start']({ sessionId, provider: provider as 'claude' | 'codex' | 'ollama', prompt, cwd, model: defaultModel, systemPrompt: systemPrompt, maxTurns: options.maxTurns, permissionMode: permissionMode, claudeConfigDir: options.claudeConfigDir, extraEnv: validateExtraEnv(options.extraEnv), }); if (!result.ok) { sessions[sessionId].status = 'error'; sessions[sessionId].error = result.error; } return result; } /** Stop a running agent session for a project. */ export async function stopAgent(projectId: string): Promise<{ ok: boolean; error?: string }> { const sessionId = projectSessionMap.get(projectId); if (!sessionId) return { ok: false, error: 'No session for project' }; const result = await appRpc.request['agent.stop']({ sessionId }); if (result.ok) { const session = sessions[sessionId]; if (session) session.status = 'done'; } return result; } /** Send a follow-up prompt to a running session. */ export async function sendPrompt(projectId: string, prompt: string): Promise<{ ok: boolean; error?: string }> { const sessionId = projectSessionMap.get(projectId); if (!sessionId) return { ok: false, error: 'No session for project' }; const session = sessions[sessionId]; if (!session) return { ok: false, error: 'Session not found' }; // Add user message immediately session.messages = [...session.messages, { id: `${sessionId}-user-${Date.now()}`, seqId: nextSeqId(sessionId), role: 'user', content: prompt, timestamp: Date.now(), }]; session.status = 'running'; return appRpc.request['agent.prompt']({ sessionId, prompt }); } /** Get the current session for a project (reactive). */ export function getSession(projectId: string): AgentSession | undefined { const sessionId = projectSessionMap.get(projectId); if (!sessionId) return undefined; return sessions[sessionId]; } /** Check if a project has an active session. */ export function hasSession(projectId: string): boolean { return projectSessionMap.has(projectId); } /** * Clear a done/error session for a project (Fix #2). * Removes from projectSessionMap so a new session can start. * Keeps session data in sessions map for history access. */ export function clearSession(projectId: string): void { const sessionId = projectSessionMap.get(projectId); if (!sessionId) return; const session = sessions[sessionId]; if (session && (session.status === 'done' || session.status === 'error')) { projectSessionMap.delete(projectId); // Cancel any pending cleanup timer const timer = cleanupTimers.get(sessionId); if (timer) { clearTimeout(timer); cleanupTimers.delete(sessionId); } } } /** * Load the last session for a project from SQLite (for restart recovery). * Restores session state + messages into the reactive store. * Only restores done/error sessions (running sessions are gone after restart). */ export async function loadLastSession(projectId: string): Promise { ensureListeners(); // Fix #5 (Codex audit): Don't overwrite an active (running/starting) session const existingSessionId = projectSessionMap.get(projectId); if (existingSessionId) { const existing = sessions[existingSessionId]; if (existing && (existing.status === 'running' || startingProjects.has(projectId))) { return false; } } try { const { session } = await appRpc.request['session.load']({ projectId }); if (!session) return false; // Only restore completed sessions (running sessions can't be resumed) if (session.status !== 'done' && session.status !== 'error') return false; // Load messages for this session const { messages: storedMsgs } = await appRpc.request['session.messages.load']({ sessionId: session.sessionId, }); // Feature 1: Deduplicate by seqId and resume counter from max const seqIdSet = new Set(); const restoredMessages: AgentMessage[] = []; let maxSeqId = 0; for (const m of storedMsgs as Array<{ msgId: string; role: string; content: string; toolName?: string; toolInput?: string; timestamp: number; seqId?: number; }>) { const sid = m.seqId ?? 0; if (sid > 0 && seqIdSet.has(sid)) continue; // deduplicate if (sid > 0) seqIdSet.add(sid); if (sid > maxSeqId) maxSeqId = sid; restoredMessages.push({ id: m.msgId, seqId: sid, role: m.role as MsgRole, content: m.content, toolName: m.toolName, toolInput: m.toolInput, timestamp: m.timestamp, }); } // Resume seqId counter from max if (maxSeqId > 0) seqCounters.set(session.sessionId, maxSeqId); sessions[session.sessionId] = { sessionId: session.sessionId, projectId: session.projectId, provider: session.provider, status: normalizeStatus(session.status), messages: restoredMessages, costUsd: session.costUsd, inputTokens: session.inputTokens, outputTokens: session.outputTokens, model: session.model, error: session.error, }; projectSessionMap.set(projectId, session.sessionId); return true; } catch (err) { console.error('[loadLastSession] error:', err); return false; } } // ── Fix #14 (Codex audit): Session memory management ───────────────────────── // Feature 6: Configurable retention — defaults, overridable via settings let retentionCount = 5; let retentionDays = 30; /** Update retention settings (called from ProjectSettings). */ export function setRetentionConfig(count: number, days: number): void { retentionCount = Math.max(1, Math.min(50, count)); retentionDays = Math.max(1, Math.min(365, days)); } /** Load retention settings from backend on startup. */ export async function loadRetentionConfig(): Promise { try { const { settings } = await appRpc.request['settings.getAll']({}); if (settings['session_retention_count']) { retentionCount = Math.max(1, parseInt(settings['session_retention_count'], 10) || 5); } if (settings['session_retention_days']) { retentionDays = Math.max(1, parseInt(settings['session_retention_days'], 10) || 30); } } catch { /* use defaults */ } } const MAX_SESSIONS_PER_PROJECT = 5; // legacy fallback /** * Purge a session entirely from the sessions map. * Call when a project is deleted or to free memory. */ export function purgeSession(sessionId: string): void { delete sessions[sessionId]; lastPersistedIndex.delete(sessionId); seqCounters.delete(sessionId); clearStallTimer(sessionId); const pendingTimer = msgPersistTimers.get(sessionId); if (pendingTimer) { clearTimeout(pendingTimer); msgPersistTimers.delete(sessionId); } const cleanupTimer = cleanupTimers.get(sessionId); if (cleanupTimer) { clearTimeout(cleanupTimer); cleanupTimers.delete(sessionId); } } /** * Purge all sessions for a project. * Call when a project is removed from the workspace. */ export function purgeProjectSessions(projectId: string): void { const sessionId = projectSessionMap.get(projectId); if (sessionId) { purgeSession(sessionId); projectSessionMap.delete(projectId); } // Also purge any orphaned sessions for this project for (const [sid, session] of Object.entries(sessions)) { if (session.projectId === projectId) { purgeSession(sid); } } } /** Enforce max sessions per project — keep only the most recent N + prune by age. */ function enforceMaxSessions(projectId: string): void { const now = Date.now(); const maxAgeMs = retentionDays * 24 * 60 * 60 * 1000; const projectSessions = Object.entries(sessions) .filter(([, s]) => s.projectId === projectId && s.status !== 'running') .sort(([, a], [, b]) => { const aTs = a.messages[a.messages.length - 1]?.timestamp ?? 0; const bTs = b.messages[b.messages.length - 1]?.timestamp ?? 0; return bTs - aTs; // newest first }); // Feature 6: Prune by retention count if (projectSessions.length > retentionCount) { const toRemove = projectSessions.slice(retentionCount); for (const [sid] of toRemove) { purgeSession(sid); } } // Feature 6: Prune by age (retention days) for (const [sid, s] of projectSessions) { const lastTs = s.messages[s.messages.length - 1]?.timestamp ?? 0; if (lastTs > 0 && (now - lastTs) > maxAgeMs) { purgeSession(sid); } } } /** Initialize listeners on module load. */ ensureListeners();