fix(electrobun): partial Codex #3 fixes — message persistence race, double-start guard
This commit is contained in:
parent
4e86e97fd9
commit
c145e37316
1 changed files with 94 additions and 1 deletions
|
|
@ -121,6 +121,8 @@ const cleanupTimers = new Map<string, ReturnType<typeof setTimeout>>();
|
||||||
const msgPersistTimers = new Map<string, ReturnType<typeof setTimeout>>();
|
const msgPersistTimers = new Map<string, ReturnType<typeof setTimeout>>();
|
||||||
// Fix #12: Track last persisted index per session to avoid re-saving entire history
|
// Fix #12: Track last persisted index per session to avoid re-saving entire history
|
||||||
const lastPersistedIndex = new Map<string, number>();
|
const lastPersistedIndex = new Map<string, number>();
|
||||||
|
// Fix #2 (Codex audit): Guard against double-start race
|
||||||
|
const startingProjects = new Set<string>();
|
||||||
|
|
||||||
// ── Session persistence helpers ─────────────────────────────────────────────
|
// ── Session persistence helpers ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
@ -153,6 +155,8 @@ function persistMessages(session: AgentSession): void {
|
||||||
const startIdx = lastPersistedIndex.get(session.sessionId) ?? 0;
|
const startIdx = lastPersistedIndex.get(session.sessionId) ?? 0;
|
||||||
const newMsgs = session.messages.slice(startIdx);
|
const newMsgs = session.messages.slice(startIdx);
|
||||||
if (newMsgs.length === 0) return;
|
if (newMsgs.length === 0) return;
|
||||||
|
// Fix #1 (Codex audit): Snapshot batch end BEFORE async save to avoid race
|
||||||
|
const batchEnd = session.messages.length;
|
||||||
const msgs = newMsgs.map((m) => ({
|
const msgs = newMsgs.map((m) => ({
|
||||||
sessionId: session.sessionId,
|
sessionId: session.sessionId,
|
||||||
msgId: m.id,
|
msgId: m.id,
|
||||||
|
|
@ -163,7 +167,7 @@ function persistMessages(session: AgentSession): void {
|
||||||
timestamp: m.timestamp,
|
timestamp: m.timestamp,
|
||||||
}));
|
}));
|
||||||
appRpc.request['session.messages.save']({ messages: msgs }).then(() => {
|
appRpc.request['session.messages.save']({ messages: msgs }).then(() => {
|
||||||
lastPersistedIndex.set(session.sessionId, session.messages.length);
|
lastPersistedIndex.set(session.sessionId, batchEnd);
|
||||||
}).catch((err: unknown) => {
|
}).catch((err: unknown) => {
|
||||||
console.error('[session.messages.save] persist error:', err);
|
console.error('[session.messages.save] persist error:', err);
|
||||||
});
|
});
|
||||||
|
|
@ -434,6 +438,25 @@ export async function startAgent(
|
||||||
): Promise<{ ok: boolean; error?: string }> {
|
): Promise<{ ok: boolean; error?: string }> {
|
||||||
ensureListeners();
|
ensureListeners();
|
||||||
|
|
||||||
|
// Fix #2 (Codex audit): Prevent double-start race
|
||||||
|
if (startingProjects.has(projectId)) {
|
||||||
|
return { ok: false, error: 'Session start already in progress' };
|
||||||
|
}
|
||||||
|
startingProjects.add(projectId);
|
||||||
|
|
||||||
|
try {
|
||||||
|
return await _startAgentInner(projectId, provider, prompt, options);
|
||||||
|
} finally {
|
||||||
|
startingProjects.delete(projectId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function _startAgentInner(
|
||||||
|
projectId: string,
|
||||||
|
provider: string,
|
||||||
|
prompt: string,
|
||||||
|
options: StartOptions,
|
||||||
|
): Promise<{ ok: boolean; error?: string }> {
|
||||||
// If there's an existing done/error session for this project, clear it first
|
// If there's an existing done/error session for this project, clear it first
|
||||||
clearSession(projectId);
|
clearSession(projectId);
|
||||||
|
|
||||||
|
|
@ -583,6 +606,16 @@ export function clearSession(projectId: string): void {
|
||||||
*/
|
*/
|
||||||
export async function loadLastSession(projectId: string): Promise<boolean> {
|
export async function loadLastSession(projectId: string): Promise<boolean> {
|
||||||
ensureListeners();
|
ensureListeners();
|
||||||
|
|
||||||
|
// Fix #5 (Codex audit): Don't overwrite an active (running/starting) session
|
||||||
|
const existingSessionId = projectSessionMap.get(projectId);
|
||||||
|
if (existingSessionId) {
|
||||||
|
const existing = sessions[existingSessionId];
|
||||||
|
if (existing && (existing.status === 'running' || startingProjects.has(projectId))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const { session } = await appRpc.request['session.load']({ projectId });
|
const { session } = await appRpc.request['session.load']({ projectId });
|
||||||
if (!session) return false;
|
if (!session) return false;
|
||||||
|
|
@ -628,5 +661,65 @@ export async function loadLastSession(projectId: string): Promise<boolean> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Fix #14 (Codex audit): Session memory management ─────────────────────────
|
||||||
|
|
||||||
|
const MAX_SESSIONS_PER_PROJECT = 5;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Purge a session entirely from the sessions map.
|
||||||
|
* Call when a project is deleted or to free memory.
|
||||||
|
*/
|
||||||
|
export function purgeSession(sessionId: string): void {
|
||||||
|
delete sessions[sessionId];
|
||||||
|
lastPersistedIndex.delete(sessionId);
|
||||||
|
clearStallTimer(sessionId);
|
||||||
|
const pendingTimer = msgPersistTimers.get(sessionId);
|
||||||
|
if (pendingTimer) {
|
||||||
|
clearTimeout(pendingTimer);
|
||||||
|
msgPersistTimers.delete(sessionId);
|
||||||
|
}
|
||||||
|
const cleanupTimer = cleanupTimers.get(sessionId);
|
||||||
|
if (cleanupTimer) {
|
||||||
|
clearTimeout(cleanupTimer);
|
||||||
|
cleanupTimers.delete(sessionId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Purge all sessions for a project.
|
||||||
|
* Call when a project is removed from the workspace.
|
||||||
|
*/
|
||||||
|
export function purgeProjectSessions(projectId: string): void {
|
||||||
|
const sessionId = projectSessionMap.get(projectId);
|
||||||
|
if (sessionId) {
|
||||||
|
purgeSession(sessionId);
|
||||||
|
projectSessionMap.delete(projectId);
|
||||||
|
}
|
||||||
|
// Also purge any orphaned sessions for this project
|
||||||
|
for (const [sid, session] of Object.entries(sessions)) {
|
||||||
|
if (session.projectId === projectId) {
|
||||||
|
purgeSession(sid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Enforce max sessions per project — keep only the most recent N. */
|
||||||
|
function enforceMaxSessions(projectId: string): void {
|
||||||
|
const projectSessions = Object.entries(sessions)
|
||||||
|
.filter(([, s]) => s.projectId === projectId && s.status !== 'running')
|
||||||
|
.sort(([, a], [, b]) => {
|
||||||
|
const aTs = a.messages[a.messages.length - 1]?.timestamp ?? 0;
|
||||||
|
const bTs = b.messages[b.messages.length - 1]?.timestamp ?? 0;
|
||||||
|
return bTs - aTs; // newest first
|
||||||
|
});
|
||||||
|
|
||||||
|
if (projectSessions.length > MAX_SESSIONS_PER_PROJECT) {
|
||||||
|
const toRemove = projectSessions.slice(MAX_SESSIONS_PER_PROJECT);
|
||||||
|
for (const [sid] of toRemove) {
|
||||||
|
purgeSession(sid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Initialize listeners on module load. */
|
/** Initialize listeners on module load. */
|
||||||
ensureListeners();
|
ensureListeners();
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue