agent-orchestrator/ui-electrobun/src/bun/btmsg-db.ts
Hibryda 252fca70df feat(electrobun): file management — CodeMirror editor, PDF viewer, CSV table, real file I/O
- CodeEditor: CodeMirror 6 with Catppuccin theme, 15+ languages, Ctrl+S save,
  dirty tracking, save-on-blur
- PdfViewer: pdfjs-dist canvas rendering, zoom 0.5-3x, HiDPI, lazy page load
- CsvTable: RFC 4180 parser, delimiter auto-detect, sortable columns, sticky header
- FileBrowser: real filesystem via files.list/read/write RPC, lazy dir loading,
  file type routing (code→editor, pdf→viewer, csv→table, images→display)
- 10MB size gate, binary detection, base64 encoding for non-text files
2026-03-22 01:36:02 +01:00

379 lines
14 KiB
TypeScript

/**
* btmsg — Inter-agent messaging SQLite store.
* DB: ~/.local/share/agor/btmsg.db (shared with btmsg CLI + bttask).
* Uses bun:sqlite. Schema matches Rust btmsg.rs.
*/
import { Database } from "bun:sqlite";
import { homedir } from "os";
import { mkdirSync } from "fs";
import { join } from "path";
import { randomUUID } from "crypto";
// ── DB path ──────────────────────────────────────────────────────────────────
/** Directory that holds the database file: `<home>/.local/share/agor`. */
const DATA_DIR = join(join(homedir(), ".local", "share"), "agor");
/** Full path of the btmsg SQLite file inside `DATA_DIR`. */
const DB_PATH = join(DATA_DIR, "btmsg.db");
// ── Types ────────────────────────────────────────────────────────────────────
/** One row of the `agents` table as returned by `BtmsgDb.getAgents`. */
export interface BtmsgAgent {
  /** Primary key of the agent. */
  id: string;
  /** Value of the `name` column. */
  name: string;
  /** Value of the `role` column. */
  role: string;
  /** Group the agent belongs to (`group_id` column). */
  groupId: string;
  /** Value of the `tier` column; the schema default is 2. */
  tier: number;
  /** Value of the `model` column, or null when none is stored. */
  model: string | null;
  /** Value of the `status` column; `getAgents` reports 'stopped' when the column is NULL. */
  status: string;
  /** Number of rows in `messages` addressed to this agent with `read = 0`. */
  unreadCount: number;
}
/** A direct message between two agents as returned by `BtmsgDb.listMessages`. */
export interface BtmsgMessage {
  /** Primary key of the message. */
  id: string;
  /** Id of the sending agent (`from_agent` column). */
  fromAgent: string;
  /** Id of the receiving agent (`to_agent` column). */
  toAgent: string;
  /** Message body. */
  content: string;
  /** True when the integer `read` column is non-zero. */
  read: boolean;
  /** Value of the `reply_to` column, or null when it is not set. */
  replyTo: string | null;
  /** Value of the `created_at` column (schema default: `datetime('now')`). */
  createdAt: string;
  /** `name` of the sender's row in `agents`. */
  senderName: string | null;
  /** `role` of the sender's row in `agents`. */
  senderRole: string | null;
}
/** A channel together with its member count, as returned by `BtmsgDb.listChannels`. */
export interface BtmsgChannel {
  /** Primary key of the channel. */
  id: string;
  /** Value of the `name` column. */
  name: string;
  /** Group the channel belongs to (`group_id` column). */
  groupId: string;
  /** Value of the `created_by` column. */
  createdBy: string;
  /** Number of `channel_members` rows that reference this channel. */
  memberCount: number;
  /** Value of the `created_at` column (schema default: `datetime('now')`). */
  createdAt: string;
}
/** A message posted to a channel, as returned by `BtmsgDb.getChannelMessages`. */
export interface BtmsgChannelMessage {
  /** Primary key of the channel message. */
  id: string;
  /** Channel the message was posted to (`channel_id` column). */
  channelId: string;
  /** Id of the posting agent (`from_agent` column). */
  fromAgent: string;
  /** Message body. */
  content: string;
  /** Value of the `created_at` column (schema default: `datetime('now')`). */
  createdAt: string;
  /** Sender's `name` from `agents`; falls back to `fromAgent` when no agent row matches. */
  senderName: string;
  /** Sender's `role` from `agents`; falls back to 'unknown' when no agent row matches. */
  senderRole: string;
}
/** One row of `dead_letter_queue`, as returned by `BtmsgDb.getDeadLetters`. */
export interface DeadLetter {
  /** Auto-incremented primary key. */
  id: number;
  /** Value of the `from_agent` column. */
  fromAgent: string;
  /** Value of the `to_agent` column. */
  toAgent: string;
  /** Value of the `content` column. */
  content: string;
  /** Value of the `error` column. */
  error: string;
  /** Value of the `created_at` column (schema default: `datetime('now')`). */
  createdAt: string;
}
/** One row of `audit_log`, as returned by `BtmsgDb.getAuditLog`. */
export interface AuditEntry {
  /** Auto-incremented primary key. */
  id: number;
  /** Value of the `agent_id` column. */
  agentId: string;
  /** Value of the `event_type` column. */
  eventType: string;
  /** Value of the `detail` column. */
  detail: string;
  /** Value of the `created_at` column (schema default: `datetime('now')`). */
  createdAt: string;
}
// ── Schema (create-if-absent, matches Rust open_db_or_create) ────────────────
// Messaging tables and indexes, executed as one script by the BtmsgDb
// constructor. Every statement uses IF NOT EXISTS, so tables and indexes that
// are already present in the file are left as they are.
// Tables: agents, contacts, messages, channels, channel_members,
// channel_messages, heartbeats, dead_letter_queue, audit_log, seen_messages.
// The created_at columns default to datetime('now'); heartbeats.timestamp and
// seen_messages.seen_at are integers (seen_at defaults to unixepoch()).
const SCHEMA = `
CREATE TABLE IF NOT EXISTS agents (
id TEXT PRIMARY KEY, name TEXT NOT NULL, role TEXT NOT NULL,
group_id TEXT NOT NULL, tier INTEGER NOT NULL DEFAULT 2,
model TEXT, cwd TEXT, system_prompt TEXT,
status TEXT DEFAULT 'stopped', last_active_at TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS contacts (
agent_id TEXT NOT NULL, contact_id TEXT NOT NULL,
PRIMARY KEY (agent_id, contact_id)
);
CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY, from_agent TEXT NOT NULL, to_agent TEXT NOT NULL,
content TEXT NOT NULL, read INTEGER DEFAULT 0, reply_to TEXT,
group_id TEXT NOT NULL, sender_group_id TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_messages_to ON messages(to_agent, read);
CREATE INDEX IF NOT EXISTS idx_messages_from ON messages(from_agent);
CREATE TABLE IF NOT EXISTS channels (
id TEXT PRIMARY KEY, name TEXT NOT NULL, group_id TEXT NOT NULL,
created_by TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS channel_members (
channel_id TEXT NOT NULL, agent_id TEXT NOT NULL,
joined_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (channel_id, agent_id)
);
CREATE TABLE IF NOT EXISTS channel_messages (
id TEXT PRIMARY KEY, channel_id TEXT NOT NULL, from_agent TEXT NOT NULL,
content TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_channel_messages ON channel_messages(channel_id, created_at);
CREATE TABLE IF NOT EXISTS heartbeats (
agent_id TEXT PRIMARY KEY, timestamp INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS dead_letter_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT, from_agent TEXT NOT NULL,
to_agent TEXT NOT NULL, content TEXT NOT NULL, error TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT, agent_id TEXT NOT NULL,
event_type TEXT NOT NULL, detail TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS seen_messages (
session_id TEXT NOT NULL, message_id TEXT NOT NULL,
seen_at INTEGER NOT NULL DEFAULT (unixepoch()),
PRIMARY KEY (session_id, message_id)
);
CREATE INDEX IF NOT EXISTS idx_seen_messages_session ON seen_messages(session_id);
`;
// Also create tasks/task_comments (shared DB with bttask)
// Executed by the BtmsgDb constructor right after SCHEMA. Every statement uses
// IF NOT EXISTS, so existing tables and indexes are left as they are.
// tasks.status defaults to 'todo', tasks.priority to 'medium', tasks.version
// to 1; nothing in this file reads or writes these tables afterwards.
const TASK_SCHEMA = `
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY, title TEXT NOT NULL, description TEXT DEFAULT '',
status TEXT DEFAULT 'todo', priority TEXT DEFAULT 'medium',
assigned_to TEXT, created_by TEXT NOT NULL, group_id TEXT NOT NULL,
parent_task_id TEXT, sort_order INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now')), version INTEGER DEFAULT 1
);
CREATE INDEX IF NOT EXISTS idx_tasks_group ON tasks(group_id);
CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);
CREATE TABLE IF NOT EXISTS task_comments (
id TEXT PRIMARY KEY, task_id TEXT NOT NULL, agent_id TEXT NOT NULL,
content TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_task_comments_task ON task_comments(task_id);
`;
// ── BtmsgDb class ────────────────────────────────────────────────────────────
/**
 * Synchronous accessor for the btmsg SQLite database at `DB_PATH`.
 *
 * Constructing an instance creates `DATA_DIR` if needed, opens the database,
 * applies the connection PRAGMAs and runs `SCHEMA` and `TASK_SCHEMA`.
 * Call `close()` to release the handle.
 */
export class BtmsgDb {
  private readonly db: Database;

  /**
   * Open the database (creating directory, file and tables when absent).
   *
   * @throws Any error raised by `mkdirSync` or bun:sqlite. If a PRAGMA or
   *   schema statement fails, the freshly opened handle is closed before the
   *   error is rethrown so it does not leak.
   */
  constructor() {
    mkdirSync(DATA_DIR, { recursive: true });
    const db = new Database(DB_PATH);
    try {
      // Set the busy timeout first: the file is shared with other processes,
      // so the WAL switch and schema creation below should wait on a lock
      // instead of failing immediately.
      db.exec("PRAGMA busy_timeout = 5000");
      db.exec("PRAGMA journal_mode = WAL");
      db.exec("PRAGMA foreign_keys = ON");
      db.exec(SCHEMA);
      db.exec(TASK_SCHEMA);
    } catch (err) {
      db.close();
      throw err;
    }
    this.db = db;
  }

  // ── Agents ───────────────────────────────────────────────────────────────

  /**
   * Insert an agent, or update name/role/group/tier/model of the existing row
   * with the same id. Other columns of an existing row are left untouched.
   *
   * @param model Optional model name; stored as NULL when omitted.
   */
  registerAgent(
    id: string, name: string, role: string,
    groupId: string, tier: number, model?: string,
  ): void {
    this.db.query(
      `INSERT INTO agents (id, name, role, group_id, tier, model)
       VALUES (?1, ?2, ?3, ?4, ?5, ?6)
       ON CONFLICT(id) DO UPDATE SET
         name=excluded.name, role=excluded.role,
         group_id=excluded.group_id, tier=excluded.tier, model=excluded.model`
    ).run(id, name, role, groupId, tier, model ?? null);
  }

  /**
   * List the agents of a group, ordered by tier, role and name, each with the
   * number of unread direct messages addressed to it.
   *
   * @returns One entry per agent; a NULL status is reported as 'stopped'.
   */
  getAgents(groupId: string): BtmsgAgent[] {
    return this.db.query<{
      id: string; name: string; role: string; group_id: string;
      tier: number; model: string | null; status: string | null;
      unread_count: number;
    }, [string]>(
      `SELECT a.*, (SELECT COUNT(*) FROM messages m
         WHERE m.to_agent = a.id AND m.read = 0) as unread_count
       FROM agents a WHERE a.group_id = ? ORDER BY a.tier, a.role, a.name`
    ).all(groupId).map(r => ({
      id: r.id, name: r.name, role: r.role, groupId: r.group_id,
      tier: r.tier, model: r.model, status: r.status ?? 'stopped',
      unreadCount: r.unread_count,
    }));
  }

  // ── Direct messages ──────────────────────────────────────────────────────

  /**
   * Store a direct message from one agent to another.
   *
   * The sender's group id is written to both `group_id` and
   * `sender_group_id`. The recipient id is stored as given and is not checked
   * against the `agents` table.
   *
   * @returns The generated message id (a random UUID).
   * @throws Error when `fromAgent` has no row in `agents`.
   */
  sendMessage(fromAgent: string, toAgent: string, content: string): string {
    const sender = this.db.query<{ group_id: string }, [string]>(
      "SELECT group_id FROM agents WHERE id = ?"
    ).get(fromAgent);
    if (!sender) throw new Error(`Sender agent '${fromAgent}' not found`);
    const id = randomUUID();
    this.db.query(
      `INSERT INTO messages (id, from_agent, to_agent, content, group_id, sender_group_id)
       VALUES (?1, ?2, ?3, ?4, ?5, ?5)`
    ).run(id, fromAgent, toAgent, content, sender.group_id);
    return id;
  }

  /**
   * Return the conversation between two agents in chronological order.
   *
   * Only messages whose sender still has a row in `agents` are returned
   * (inner join). The limit is applied after ascending ordering, so when the
   * conversation is longer than `limit` the OLDEST `limit` messages are
   * returned.
   *
   * @param limit Maximum number of rows (default 50).
   */
  listMessages(agentId: string, otherId: string, limit = 50): BtmsgMessage[] {
    return this.db.query<{
      id: string; from_agent: string; to_agent: string; content: string;
      read: number; reply_to: string | null; created_at: string;
      sender_name: string | null; sender_role: string | null;
    }, [string, string, string, string, number]>(
      // created_at defaults to datetime('now') (one-second resolution), so
      // rowid is the tie-breaker that keeps same-second messages in insert order.
      `SELECT m.id, m.from_agent, m.to_agent, m.content, m.read,
              m.reply_to, m.created_at,
              a.name AS sender_name, a.role AS sender_role
       FROM messages m JOIN agents a ON m.from_agent = a.id
       WHERE (m.from_agent = ?1 AND m.to_agent = ?2)
          OR (m.from_agent = ?3 AND m.to_agent = ?4)
       ORDER BY m.created_at ASC, m.rowid ASC LIMIT ?5`
    ).all(agentId, otherId, otherId, agentId, limit).map(r => ({
      id: r.id, fromAgent: r.from_agent, toAgent: r.to_agent,
      content: r.content, read: r.read !== 0, replyTo: r.reply_to,
      createdAt: r.created_at, senderName: r.sender_name,
      senderRole: r.sender_role,
    }));
  }

  /**
   * Mark the given messages as read, but only those addressed to `agentId`.
   * All updates run in a single transaction; an empty list is a no-op.
   */
  markRead(agentId: string, messageIds: string[]): void {
    if (messageIds.length === 0) return;
    // query() (unlike prepare()) reuses the compiled statement across calls.
    const stmt = this.db.query(
      "UPDATE messages SET read = 1 WHERE id = ? AND to_agent = ?"
    );
    const tx = this.db.transaction(() => {
      for (const mid of messageIds) stmt.run(mid, agentId);
    });
    tx();
  }

  // ── Channels ─────────────────────────────────────────────────────────────

  /**
   * Create a channel in a group.
   *
   * No `channel_members` row is inserted here, not even for the creator.
   *
   * @returns The generated channel id (a random UUID).
   */
  createChannel(name: string, groupId: string, createdBy: string): string {
    const id = randomUUID();
    this.db.query(
      `INSERT INTO channels (id, name, group_id, created_by)
       VALUES (?1, ?2, ?3, ?4)`
    ).run(id, name, groupId, createdBy);
    return id;
  }

  /** List the channels of a group ordered by name, each with its member count. */
  listChannels(groupId: string): BtmsgChannel[] {
    return this.db.query<{
      id: string; name: string; group_id: string; created_by: string;
      member_count: number; created_at: string;
    }, [string]>(
      `SELECT c.id, c.name, c.group_id, c.created_by, c.created_at,
              (SELECT COUNT(*) FROM channel_members cm WHERE cm.channel_id = c.id) AS member_count
       FROM channels c WHERE c.group_id = ? ORDER BY c.name`
    ).all(groupId).map(r => ({
      id: r.id, name: r.name, groupId: r.group_id, createdBy: r.created_by,
      memberCount: r.member_count, createdAt: r.created_at,
    }));
  }

  /**
   * Return the messages of a channel in chronological order.
   *
   * Messages from senders without an `agents` row are kept (left join); their
   * sender name falls back to the raw agent id and their role to 'unknown'.
   * The limit is applied after ascending ordering, so the OLDEST `limit`
   * messages are returned.
   *
   * @param limit Maximum number of rows (default 100).
   */
  getChannelMessages(channelId: string, limit = 100): BtmsgChannelMessage[] {
    return this.db.query<{
      id: string; channel_id: string; from_agent: string;
      content: string; created_at: string;
      sender_name: string; sender_role: string;
    }, [string, number]>(
      // rowid breaks ties between messages created within the same second.
      `SELECT cm.id, cm.channel_id, cm.from_agent, cm.content, cm.created_at,
              COALESCE(a.name, cm.from_agent) AS sender_name,
              COALESCE(a.role, 'unknown') AS sender_role
       FROM channel_messages cm
       LEFT JOIN agents a ON cm.from_agent = a.id
       WHERE cm.channel_id = ?
       ORDER BY cm.created_at ASC, cm.rowid ASC LIMIT ?`
    ).all(channelId, limit).map(r => ({
      id: r.id, channelId: r.channel_id, fromAgent: r.from_agent,
      content: r.content, createdAt: r.created_at,
      senderName: r.sender_name, senderRole: r.sender_role,
    }));
  }

  /**
   * Post a message to a channel. Neither the channel id nor the sender's
   * membership is checked here.
   *
   * @returns The generated message id (a random UUID).
   */
  sendChannelMessage(channelId: string, fromAgent: string, content: string): string {
    const id = randomUUID();
    this.db.query(
      `INSERT INTO channel_messages (id, channel_id, from_agent, content)
       VALUES (?1, ?2, ?3, ?4)`
    ).run(id, channelId, fromAgent, content);
    return id;
  }

  // ── Heartbeats ───────────────────────────────────────────────────────────

  /** Record the current time (whole seconds since the Unix epoch) as the agent's latest heartbeat. */
  heartbeat(agentId: string): void {
    const now = Math.floor(Date.now() / 1000);
    this.db.query(
      `INSERT INTO heartbeats (agent_id, timestamp) VALUES (?1, ?2)
       ON CONFLICT(agent_id) DO UPDATE SET timestamp = excluded.timestamp`
    ).run(agentId, now);
  }

  // ── Dead letter queue ────────────────────────────────────────────────────

  /**
   * Return the newest dead-letter entries, most recent first.
   *
   * @param limit Maximum number of rows (default 50).
   */
  getDeadLetters(limit = 50): DeadLetter[] {
    return this.db.query<{
      id: number; from_agent: string; to_agent: string;
      content: string; error: string; created_at: string;
    }, [number]>(
      // The autoincrement id orders entries created within the same second.
      `SELECT id, from_agent, to_agent, content, error, created_at
       FROM dead_letter_queue ORDER BY created_at DESC, id DESC LIMIT ?`
    ).all(limit).map(r => ({
      id: r.id, fromAgent: r.from_agent, toAgent: r.to_agent,
      content: r.content, error: r.error, createdAt: r.created_at,
    }));
  }

  // ── Audit log ────────────────────────────────────────────────────────────

  /** Append an entry to the audit log. */
  logAudit(agentId: string, eventType: string, detail: string): void {
    this.db.query(
      `INSERT INTO audit_log (agent_id, event_type, detail)
       VALUES (?1, ?2, ?3)`
    ).run(agentId, eventType, detail);
  }

  /**
   * Return the newest audit entries (across all agents), most recent first.
   *
   * @param limit Maximum number of rows (default 100).
   */
  getAuditLog(limit = 100): AuditEntry[] {
    return this.db.query<{
      id: number; agent_id: string; event_type: string;
      detail: string; created_at: string;
    }, [number]>(
      // The autoincrement id orders entries created within the same second.
      `SELECT id, agent_id, event_type, detail, created_at
       FROM audit_log ORDER BY created_at DESC, id DESC LIMIT ?`
    ).all(limit).map(r => ({
      id: r.id, agentId: r.agent_id, eventType: r.event_type,
      detail: r.detail, createdAt: r.created_at,
    }));
  }

  // ── Seen messages (per-session acknowledgment) ───────────────────────────

  /**
   * Record that a session has seen the given messages. Pairs that are already
   * recorded are ignored. All inserts run in a single transaction; an empty
   * list is a no-op.
   */
  markSeen(sessionId: string, messageIds: string[]): void {
    if (messageIds.length === 0) return;
    // query() (unlike prepare()) reuses the compiled statement across calls.
    const stmt = this.db.query(
      "INSERT OR IGNORE INTO seen_messages (session_id, message_id) VALUES (?, ?)"
    );
    const tx = this.db.transaction(() => {
      for (const mid of messageIds) stmt.run(sessionId, mid);
    });
    tx();
  }

  /**
   * Delete seen-message records older than `maxAgeSecs` seconds.
   *
   * @param maxAgeSecs Age threshold in seconds (default: 7 days).
   * @returns Number of rows deleted.
   */
  pruneSeen(maxAgeSecs = 7 * 24 * 3600): number {
    const result = this.db.query(
      "DELETE FROM seen_messages WHERE seen_at < unixepoch() - ?"
    ).run(maxAgeSecs);
    return (result as { changes: number }).changes;
  }

  // ── Lifecycle ────────────────────────────────────────────────────────────

  /** Close the underlying database handle. */
  close(): void {
    this.db.close();
  }
}
// Singleton
// Shared module-level instance. Because it is constructed here, importing this
// module creates the data directory (if missing), opens the database file and
// runs the schema statements as a side effect of the import.
export const btmsgDb = new BtmsgDb();