/** * btmsg — Inter-agent messaging SQLite store. * DB: ~/.local/share/agor/btmsg.db (shared with btmsg CLI + bttask). * Uses bun:sqlite. Schema matches Rust btmsg.rs. */ import { Database } from "bun:sqlite"; import { homedir } from "os"; import { join } from "path"; import { randomUUID } from "crypto"; import { openDb } from "./db-utils.ts"; // ── DB path ────────────────────────────────────────────────────────────────── const DATA_DIR = join(homedir(), ".local", "share", "agor"); const DB_PATH = join(DATA_DIR, "btmsg.db"); // ── Types ──────────────────────────────────────────────────────────────────── export interface BtmsgAgent { id: string; name: string; role: string; groupId: string; tier: number; model: string | null; status: string; unreadCount: number; } export interface BtmsgMessage { id: string; fromAgent: string; toAgent: string; content: string; read: boolean; replyTo: string | null; createdAt: string; senderName: string | null; senderRole: string | null; } export interface BtmsgChannel { id: string; name: string; groupId: string; createdBy: string; memberCount: number; createdAt: string; } export interface BtmsgChannelMessage { id: string; channelId: string; fromAgent: string; content: string; createdAt: string; senderName: string; senderRole: string; } export interface DeadLetter { id: number; fromAgent: string; toAgent: string; content: string; error: string; createdAt: string; } export interface AuditEntry { id: number; agentId: string; eventType: string; detail: string; createdAt: string; } // ── Schema (create-if-absent, matches Rust open_db_or_create) ──────────────── const SCHEMA = ` CREATE TABLE IF NOT EXISTS agents ( id TEXT PRIMARY KEY, name TEXT NOT NULL, role TEXT NOT NULL, group_id TEXT NOT NULL, tier INTEGER NOT NULL DEFAULT 2, model TEXT, cwd TEXT, system_prompt TEXT, status TEXT DEFAULT 'stopped', last_active_at TEXT, created_at TEXT DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS contacts ( agent_id TEXT NOT NULL, contact_id TEXT NOT NULL, PRIMARY KEY (agent_id, contact_id) ); CREATE TABLE IF NOT EXISTS messages ( id TEXT PRIMARY KEY, from_agent TEXT NOT NULL, to_agent TEXT NOT NULL, content TEXT NOT NULL, read INTEGER DEFAULT 0, reply_to TEXT, group_id TEXT NOT NULL, sender_group_id TEXT, created_at TEXT DEFAULT (datetime('now')) ); CREATE INDEX IF NOT EXISTS idx_messages_to ON messages(to_agent, read); CREATE INDEX IF NOT EXISTS idx_messages_from ON messages(from_agent); CREATE TABLE IF NOT EXISTS channels ( id TEXT PRIMARY KEY, name TEXT NOT NULL, group_id TEXT NOT NULL, created_by TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS channel_members ( channel_id TEXT NOT NULL, agent_id TEXT NOT NULL, joined_at TEXT DEFAULT (datetime('now')), PRIMARY KEY (channel_id, agent_id) ); CREATE TABLE IF NOT EXISTS channel_messages ( id TEXT PRIMARY KEY, channel_id TEXT NOT NULL, from_agent TEXT NOT NULL, content TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')) ); CREATE INDEX IF NOT EXISTS idx_channel_messages ON channel_messages(channel_id, created_at); CREATE TABLE IF NOT EXISTS heartbeats ( agent_id TEXT PRIMARY KEY, timestamp INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS dead_letter_queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, from_agent TEXT NOT NULL, to_agent TEXT NOT NULL, content TEXT NOT NULL, error TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS audit_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent_id TEXT NOT NULL, event_type TEXT NOT NULL, detail TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS seen_messages ( session_id TEXT NOT NULL, message_id TEXT NOT NULL, seen_at INTEGER NOT NULL DEFAULT (unixepoch()), PRIMARY KEY (session_id, message_id) ); CREATE INDEX IF NOT EXISTS idx_seen_messages_session ON seen_messages(session_id); `; // Also create tasks/task_comments (shared DB with bttask) const TASK_SCHEMA = ` CREATE TABLE IF NOT EXISTS tasks ( id TEXT PRIMARY KEY, title TEXT NOT NULL, description TEXT DEFAULT '', status TEXT DEFAULT 'todo', priority TEXT DEFAULT 'medium', assigned_to TEXT, created_by TEXT NOT NULL, group_id TEXT NOT NULL, parent_task_id TEXT, sort_order INTEGER DEFAULT 0, created_at TEXT DEFAULT (datetime('now')), updated_at TEXT DEFAULT (datetime('now')), version INTEGER DEFAULT 1 ); CREATE INDEX IF NOT EXISTS idx_tasks_group ON tasks(group_id); CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status); CREATE TABLE IF NOT EXISTS task_comments ( id TEXT PRIMARY KEY, task_id TEXT NOT NULL, agent_id TEXT NOT NULL, content TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')) ); CREATE INDEX IF NOT EXISTS idx_task_comments_task ON task_comments(task_id); `; // ── BtmsgDb class ──────────────────────────────────────────────────────────── export class BtmsgDb { private db: Database; constructor() { this.db = openDb(DB_PATH, { busyTimeout: 5000, foreignKeys: true }); this.db.exec(SCHEMA); this.db.exec(TASK_SCHEMA); } /** Expose the underlying Database handle for shared-DB consumers (bttask). */ getHandle(): Database { return this.db; } // ── Agents ─────────────────────────────────────────────────────────────── registerAgent( id: string, name: string, role: string, groupId: string, tier: number, model?: string, ): void { this.db.query( `INSERT INTO agents (id, name, role, group_id, tier, model) VALUES (?1, ?2, ?3, ?4, ?5, ?6) ON CONFLICT(id) DO UPDATE SET name=excluded.name, role=excluded.role, group_id=excluded.group_id, tier=excluded.tier, model=excluded.model` ).run(id, name, role, groupId, tier, model ?? null); } getAgents(groupId: string): BtmsgAgent[] { return this.db.query<{ id: string; name: string; role: string; group_id: string; tier: number; model: string | null; status: string | null; unread_count: number; }, [string]>( `SELECT a.*, (SELECT COUNT(*) FROM messages m WHERE m.to_agent = a.id AND m.read = 0) as unread_count FROM agents a WHERE a.group_id = ? ORDER BY a.tier, a.role, a.name` ).all(groupId).map(r => ({ id: r.id, name: r.name, role: r.role, groupId: r.group_id, tier: r.tier, model: r.model, status: r.status ?? 'stopped', unreadCount: r.unread_count, })); } // ── Direct messages ────────────────────────────────────────────────────── /** * Fix #13 (Codex audit): Validate sender and recipient are in the same group. */ sendMessage(fromAgent: string, toAgent: string, content: string): string { // Get sender's group_id const sender = this.db.query<{ group_id: string }, [string]>( "SELECT group_id FROM agents WHERE id = ?" ).get(fromAgent); if (!sender) throw new Error(`Sender agent '${fromAgent}' not found`); // Validate recipient exists and is in the same group const recipient = this.db.query<{ group_id: string }, [string]>( "SELECT group_id FROM agents WHERE id = ?" ).get(toAgent); if (!recipient) throw new Error(`Recipient agent '${toAgent}' not found`); if (sender.group_id !== recipient.group_id) { throw new Error(`Cross-group messaging denied: '${fromAgent}' (${sender.group_id}) -> '${toAgent}' (${recipient.group_id})`); } const id = randomUUID(); this.db.query( `INSERT INTO messages (id, from_agent, to_agent, content, group_id, sender_group_id) VALUES (?1, ?2, ?3, ?4, ?5, ?5)` ).run(id, fromAgent, toAgent, content, sender.group_id); return id; } listMessages(agentId: string, otherId: string, limit = 50): BtmsgMessage[] { return this.db.query<{ id: string; from_agent: string; to_agent: string; content: string; read: number; reply_to: string | null; created_at: string; sender_name: string | null; sender_role: string | null; }, [string, string, string, string, number]>( `SELECT m.id, m.from_agent, m.to_agent, m.content, m.read, m.reply_to, m.created_at, a.name AS sender_name, a.role AS sender_role FROM messages m JOIN agents a ON m.from_agent = a.id WHERE (m.from_agent = ?1 AND m.to_agent = ?2) OR (m.from_agent = ?3 AND m.to_agent = ?4) ORDER BY m.created_at ASC LIMIT ?5` ).all(agentId, otherId, otherId, agentId, limit).map(r => ({ id: r.id, fromAgent: r.from_agent, toAgent: r.to_agent, content: r.content, read: r.read !== 0, replyTo: r.reply_to, createdAt: r.created_at, senderName: r.sender_name, senderRole: r.sender_role, })); } markRead(agentId: string, messageIds: string[]): void { if (messageIds.length === 0) return; const stmt = this.db.prepare( "UPDATE messages SET read = 1 WHERE id = ? AND to_agent = ?" ); const tx = this.db.transaction(() => { for (const mid of messageIds) stmt.run(mid, agentId); }); tx(); } // ── Channels ───────────────────────────────────────────────────────────── /** * Fix #13 (Codex audit): Auto-add creator to channel membership on create. */ createChannel(name: string, groupId: string, createdBy: string): string { const id = randomUUID(); const tx = this.db.transaction(() => { this.db.query( `INSERT INTO channels (id, name, group_id, created_by) VALUES (?1, ?2, ?3, ?4)` ).run(id, name, groupId, createdBy); // Auto-add creator as channel member this.db.query( `INSERT OR IGNORE INTO channel_members (channel_id, agent_id) VALUES (?1, ?2)` ).run(id, createdBy); }); tx(); return id; } listChannels(groupId: string): BtmsgChannel[] { return this.db.query<{ id: string; name: string; group_id: string; created_by: string; member_count: number; created_at: string; }, [string]>( `SELECT c.id, c.name, c.group_id, c.created_by, c.created_at, (SELECT COUNT(*) FROM channel_members cm WHERE cm.channel_id = c.id) AS member_count FROM channels c WHERE c.group_id = ? ORDER BY c.name` ).all(groupId).map(r => ({ id: r.id, name: r.name, groupId: r.group_id, createdBy: r.created_by, memberCount: r.member_count, createdAt: r.created_at, })); } getChannelMessages(channelId: string, limit = 100): BtmsgChannelMessage[] { return this.db.query<{ id: string; channel_id: string; from_agent: string; content: string; created_at: string; sender_name: string; sender_role: string; }, [string, number]>( `SELECT cm.id, cm.channel_id, cm.from_agent, cm.content, cm.created_at, COALESCE(a.name, cm.from_agent) AS sender_name, COALESCE(a.role, 'unknown') AS sender_role FROM channel_messages cm LEFT JOIN agents a ON cm.from_agent = a.id WHERE cm.channel_id = ? ORDER BY cm.created_at ASC LIMIT ?` ).all(channelId, limit).map(r => ({ id: r.id, channelId: r.channel_id, fromAgent: r.from_agent, content: r.content, createdAt: r.created_at, senderName: r.sender_name, senderRole: r.sender_role, })); } /** * Fix #13 (Codex audit): Validate sender is a member of the channel. */ sendChannelMessage(channelId: string, fromAgent: string, content: string): string { const member = this.db.query<{ agent_id: string }, [string, string]>( "SELECT agent_id FROM channel_members WHERE channel_id = ? AND agent_id = ?" ).get(channelId, fromAgent); if (!member) { throw new Error(`Agent '${fromAgent}' is not a member of channel '${channelId}'`); } const id = randomUUID(); this.db.query( `INSERT INTO channel_messages (id, channel_id, from_agent, content) VALUES (?1, ?2, ?3, ?4)` ).run(id, channelId, fromAgent, content); return id; } // ── Feature 7: Channel membership management ───────────────────────────── joinChannel(channelId: string, agentId: string): void { // Validate channel exists const ch = this.db.query<{ id: string }, [string]>( "SELECT id FROM channels WHERE id = ?" ).get(channelId); if (!ch) throw new Error(`Channel '${channelId}' not found`); this.db.query( "INSERT OR IGNORE INTO channel_members (channel_id, agent_id) VALUES (?1, ?2)" ).run(channelId, agentId); } leaveChannel(channelId: string, agentId: string): void { this.db.query( "DELETE FROM channel_members WHERE channel_id = ? AND agent_id = ?" ).run(channelId, agentId); } getChannelMembers(channelId: string): Array<{ agentId: string; name: string; role: string }> { return this.db.query<{ agent_id: string; name: string; role: string; }, [string]>( `SELECT cm.agent_id, a.name, a.role FROM channel_members cm JOIN agents a ON cm.agent_id = a.id WHERE cm.channel_id = ? ORDER BY a.name` ).all(channelId).map(r => ({ agentId: r.agent_id, name: r.name, role: r.role, })); } // ── Heartbeats ─────────────────────────────────────────────────────────── heartbeat(agentId: string): void { const now = Math.floor(Date.now() / 1000); this.db.query( `INSERT INTO heartbeats (agent_id, timestamp) VALUES (?1, ?2) ON CONFLICT(agent_id) DO UPDATE SET timestamp = excluded.timestamp` ).run(agentId, now); } // ── Dead letter queue ──────────────────────────────────────────────────── getDeadLetters(limit = 50): DeadLetter[] { return this.db.query<{ id: number; from_agent: string; to_agent: string; content: string; error: string; created_at: string; }, [number]>( `SELECT id, from_agent, to_agent, content, error, created_at FROM dead_letter_queue ORDER BY created_at DESC LIMIT ?` ).all(limit).map(r => ({ id: r.id, fromAgent: r.from_agent, toAgent: r.to_agent, content: r.content, error: r.error, createdAt: r.created_at, })); } // ── Audit log ──────────────────────────────────────────────────────────── logAudit(agentId: string, eventType: string, detail: string): void { this.db.query( `INSERT INTO audit_log (agent_id, event_type, detail) VALUES (?1, ?2, ?3)` ).run(agentId, eventType, detail); } getAuditLog(limit = 100): AuditEntry[] { return this.db.query<{ id: number; agent_id: string; event_type: string; detail: string; created_at: string; }, [number]>( `SELECT id, agent_id, event_type, detail, created_at FROM audit_log ORDER BY created_at DESC LIMIT ?` ).all(limit).map(r => ({ id: r.id, agentId: r.agent_id, eventType: r.event_type, detail: r.detail, createdAt: r.created_at, })); } // ── Seen messages (per-session acknowledgment) ─────────────────────────── markSeen(sessionId: string, messageIds: string[]): void { if (messageIds.length === 0) return; const stmt = this.db.prepare( "INSERT OR IGNORE INTO seen_messages (session_id, message_id) VALUES (?, ?)" ); const tx = this.db.transaction(() => { for (const mid of messageIds) stmt.run(sessionId, mid); }); tx(); } pruneSeen(maxAgeSecs = 7 * 24 * 3600): number { const result = this.db.query( "DELETE FROM seen_messages WHERE seen_at < unixepoch() - ?" ).run(maxAgeSecs); return (result as { changes: number }).changes; } // ── Lifecycle ──────────────────────────────────────────────────────────── close(): void { this.db.close(); } } // Singleton export const btmsgDb = new BtmsgDb();