// btmsg — Access to btmsg SQLite database // Database at ~/.local/share/agor/btmsg.db (created by btmsg CLI) // Path configurable via init() for test isolation. use rusqlite::{params, Connection, OpenFlags}; use serde::{Deserialize, Serialize}; use crate::error::AppError; use std::path::PathBuf; use std::sync::OnceLock; static DB_PATH: OnceLock = OnceLock::new(); /// Set the btmsg database path. Must be called before any db access. /// Called from lib.rs setup with AppConfig-resolved path. pub fn init(path: PathBuf) { let _ = DB_PATH.set(path); } fn db_path() -> PathBuf { DB_PATH.get().cloned().unwrap_or_else(|| { dirs::data_dir() .unwrap_or_else(|| PathBuf::from(".")) .join("agor") .join("btmsg.db") }) } fn open_db() -> Result { let path = db_path(); if !path.exists() { return Err(AppError::database("btmsg database not found. Run 'btmsg register' first.")); } let conn = Connection::open_with_flags(&path, OpenFlags::SQLITE_OPEN_READ_WRITE) .map_err(|e| AppError::database(format!("Failed to open btmsg.db: {e}")))?; conn.query_row("PRAGMA journal_mode=WAL", [], |_| Ok(())) .map_err(|e| AppError::database(format!("Failed to set WAL mode: {e}")))?; conn.query_row("PRAGMA busy_timeout = 5000", [], |_| Ok(())) .map_err(|e| AppError::database(format!("Failed to set busy_timeout: {e}")))?; conn.execute_batch("PRAGMA foreign_keys = ON") .map_err(|e| AppError::database(format!("Failed to enable foreign keys: {e}")))?; // Migration: add seen_messages table if not present conn.execute_batch( "CREATE TABLE IF NOT EXISTS seen_messages ( session_id TEXT NOT NULL, message_id TEXT NOT NULL, seen_at INTEGER NOT NULL DEFAULT (unixepoch()), PRIMARY KEY (session_id, message_id), FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_seen_messages_session ON seen_messages(session_id);" ).map_err(|e| format!("Migration error (seen_messages): {e}"))?; // Migration: add sender_group_id column to messages if not present // SQLite ALTER TABLE ADD COLUMN is a no-op if column already exists (errors silently) let _ = conn.execute_batch("ALTER TABLE messages ADD COLUMN sender_group_id TEXT"); Ok(conn) } #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct BtmsgAgent { pub id: String, pub name: String, pub role: String, pub group_id: String, pub tier: i32, pub model: Option, pub status: String, pub unread_count: i32, } #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct BtmsgMessage { pub id: String, pub from_agent: String, pub to_agent: String, pub content: String, pub read: bool, pub reply_to: Option, pub created_at: String, pub sender_name: Option, pub sender_role: Option, } #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct BtmsgFeedMessage { pub id: String, pub from_agent: String, pub to_agent: String, pub content: String, pub created_at: String, pub reply_to: Option, pub sender_name: String, pub sender_role: String, pub recipient_name: String, pub recipient_role: String, } #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct BtmsgChannel { pub id: String, pub name: String, pub group_id: String, pub created_by: String, pub member_count: i32, pub created_at: String, } #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct BtmsgChannelMessage { pub id: String, pub channel_id: String, pub from_agent: String, pub content: String, pub created_at: String, pub sender_name: String, pub sender_role: String, } pub fn get_agents(group_id: &str) -> Result, AppError> { let db = open_db()?; let mut stmt = db.prepare( "SELECT a.*, (SELECT COUNT(*) FROM messages m WHERE m.to_agent = a.id AND m.read = 0) as unread_count \ FROM agents a WHERE a.group_id = ? ORDER BY a.tier, a.role, a.name" ).map_err(|e| AppError::database(format!("Query error: {e}")))?; let agents = stmt.query_map(params![group_id], |row| { Ok(BtmsgAgent { id: row.get("id")?, name: row.get("name")?, role: row.get("role")?, group_id: row.get("group_id")?, tier: row.get("tier")?, model: row.get("model")?, status: row.get::<_, Option>("status")?.unwrap_or_else(|| "stopped".into()), unread_count: row.get("unread_count")?, }) }).map_err(|e| AppError::database(format!("Query error: {e}")))?; agents.collect::, _>>().map_err(|e| AppError::database(format!("Row error: {e}"))) } pub fn unread_count(agent_id: &str) -> Result { let db = open_db()?; db.query_row( "SELECT COUNT(*) FROM messages WHERE to_agent = ? AND read = 0", params![agent_id], |row| row.get(0), ).map_err(|e| AppError::database(format!("Query error: {e}"))) } pub fn unread_messages(agent_id: &str) -> Result, AppError> { let db = open_db()?; let mut stmt = db.prepare( "SELECT m.id, m.from_agent, m.to_agent, m.content, m.read, m.reply_to, m.created_at, \ a.name AS sender_name, a.role AS sender_role \ FROM messages m JOIN agents a ON m.from_agent = a.id \ WHERE m.to_agent = ? AND m.read = 0 ORDER BY m.created_at ASC" ).map_err(|e| AppError::database(format!("Query error: {e}")))?; let msgs = stmt.query_map(params![agent_id], |row| { Ok(BtmsgMessage { id: row.get("id")?, from_agent: row.get("from_agent")?, to_agent: row.get("to_agent")?, content: row.get("content")?, read: row.get::<_, i32>("read")? != 0, reply_to: row.get("reply_to")?, created_at: row.get("created_at")?, sender_name: row.get("sender_name")?, sender_role: row.get("sender_role")?, }) }).map_err(|e| AppError::database(format!("Query error: {e}")))?; msgs.collect::, _>>().map_err(|e| AppError::database(format!("Row error: {e}"))) } /// Get messages that have not been seen by this session. /// Unlike unread_messages (which uses the global `read` flag), /// this tracks per-session acknowledgment via the seen_messages table. pub fn unseen_messages(agent_id: &str, session_id: &str) -> Result, AppError> { let db = open_db()?; let mut stmt = db.prepare( "SELECT m.id, m.from_agent, m.to_agent, m.content, m.read, m.reply_to, m.created_at, \ a.name AS sender_name, a.role AS sender_role \ FROM messages m \ LEFT JOIN agents a ON a.id = m.from_agent \ WHERE m.to_agent = ?1 \ AND m.id NOT IN (SELECT message_id FROM seen_messages WHERE session_id = ?2) \ ORDER BY m.created_at ASC" ).map_err(|e| AppError::database(format!("Prepare unseen query: {e}")))?; let rows = stmt.query_map(params![agent_id, session_id], |row| { Ok(BtmsgMessage { id: row.get("id")?, from_agent: row.get("from_agent")?, to_agent: row.get("to_agent")?, content: row.get("content")?, read: row.get::<_, i32>("read")? != 0, reply_to: row.get("reply_to")?, created_at: row.get("created_at")?, sender_name: row.get("sender_name")?, sender_role: row.get("sender_role")?, }) }).map_err(|e| AppError::database(format!("Query unseen: {e}")))?; rows.collect::, _>>().map_err(|e| AppError::database(format!("Row error: {e}"))) } /// Mark specific message IDs as seen by this session. pub fn mark_messages_seen(session_id: &str, message_ids: &[String]) -> Result<(), AppError> { if message_ids.is_empty() { return Ok(()); } let db = open_db()?; let mut stmt = db.prepare( "INSERT OR IGNORE INTO seen_messages (session_id, message_id) VALUES (?1, ?2)" ).map_err(|e| AppError::database(format!("Prepare mark_seen: {e}")))?; for id in message_ids { stmt.execute(params![session_id, id]) .map_err(|e| AppError::database(format!("Insert seen: {e}")))?; } Ok(()) } /// Prune seen_messages entries older than the given threshold. /// Uses emergency aggressive pruning (3 days) when row count exceeds the threshold. pub fn prune_seen_messages(max_age_secs: i64, emergency_threshold: i64) -> Result { let db = open_db()?; let count: i64 = db.query_row( "SELECT COUNT(*) FROM seen_messages", [], |row| row.get(0) ).map_err(|e| AppError::database(format!("Count seen: {e}")))?; let threshold_secs = if count > emergency_threshold { // Emergency: prune more aggressively (3 days instead of configured max) max_age_secs.min(3 * 24 * 3600) } else { max_age_secs }; let deleted = db.execute( "DELETE FROM seen_messages WHERE seen_at < unixepoch() - ?1", params![threshold_secs], ).map_err(|e| AppError::database(format!("Prune seen: {e}")))?; Ok(deleted as u64) } pub fn history(agent_id: &str, other_id: &str, limit: i32) -> Result, AppError> { let db = open_db()?; let mut stmt = db.prepare( "SELECT m.id, m.from_agent, m.to_agent, m.content, m.read, m.reply_to, m.created_at, \ a.name AS sender_name, a.role AS sender_role \ FROM messages m JOIN agents a ON m.from_agent = a.id \ WHERE (m.from_agent = ?1 AND m.to_agent = ?2) OR (m.from_agent = ?2 AND m.to_agent = ?1) \ ORDER BY m.created_at ASC LIMIT ?3" ).map_err(|e| AppError::database(format!("Query error: {e}")))?; let msgs = stmt.query_map(params![agent_id, other_id, limit], |row| { Ok(BtmsgMessage { id: row.get("id")?, from_agent: row.get("from_agent")?, to_agent: row.get("to_agent")?, content: row.get("content")?, read: row.get::<_, i32>("read")? != 0, reply_to: row.get("reply_to")?, created_at: row.get("created_at")?, sender_name: row.get("sender_name")?, sender_role: row.get("sender_role")?, }) }).map_err(|e| AppError::database(format!("Query error: {e}")))?; msgs.collect::, _>>().map_err(|e| AppError::database(format!("Row error: {e}"))) } /// Default heartbeat staleness threshold: 5 minutes const STALE_HEARTBEAT_SECS: i64 = 300; pub fn send_message(from_agent: &str, to_agent: &str, content: &str) -> Result { let db = open_db()?; // Get sender's group and tier let (group_id, sender_tier): (String, i32) = db.query_row( "SELECT group_id, tier FROM agents WHERE id = ?", params![from_agent], |row| Ok((row.get(0)?, row.get(1)?)), ).map_err(|e| AppError::database(format!("Sender not found: {e}")))?; // Admin (tier 0) bypasses contact restrictions if sender_tier > 0 { let allowed: bool = db.query_row( "SELECT COUNT(*) > 0 FROM contacts WHERE agent_id = ? AND contact_id = ?", params![from_agent, to_agent], |row| row.get(0), ).map_err(|e| AppError::database(format!("Contact check error: {e}")))?; if !allowed { return Err(AppError::database(format!("Not allowed to message '{to_agent}'"))); } } // Check if recipient is stale (no heartbeat in 5 min) — route to dead letter queue // Skip heartbeat check for admin (tier 0) — admin messages always go through if sender_tier > 0 { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs() as i64; let cutoff = now - STALE_HEARTBEAT_SECS; let recipient_stale: bool = db .query_row( "SELECT COALESCE(h.timestamp, 0) < ?1 FROM agents a \ LEFT JOIN heartbeats h ON a.id = h.agent_id \ WHERE a.id = ?2", params![cutoff, to_agent], |row| row.get(0), ) .unwrap_or(false); if recipient_stale { // Queue to dead letter instead of delivering let error_msg = format!("Recipient '{}' is stale (no heartbeat in {} seconds)", to_agent, STALE_HEARTBEAT_SECS); db.execute( "INSERT INTO dead_letter_queue (from_agent, to_agent, content, error) VALUES (?1, ?2, ?3, ?4)", params![from_agent, to_agent, content, error_msg], ) .map_err(|e| AppError::database(format!("Dead letter insert error: {e}")))?; // Also log audit event let _ = db.execute( "INSERT INTO audit_log (agent_id, event_type, detail) VALUES (?1, 'dead_letter', ?2)", params![from_agent, format!("Message to '{}' routed to dead letter queue: {}", to_agent, error_msg)], ); return Err(AppError::database(error_msg)); } } let msg_id = uuid::Uuid::new_v4().to_string(); db.execute( "INSERT INTO messages (id, from_agent, to_agent, content, group_id, sender_group_id) \ VALUES (?1, ?2, ?3, ?4, ?5, (SELECT group_id FROM agents WHERE id = ?2))", params![msg_id, from_agent, to_agent, content, group_id], ).map_err(|e| AppError::database(format!("Insert error: {e}")))?; Ok(msg_id) } pub fn set_status(agent_id: &str, status: &str) -> Result<(), AppError> { let db = open_db()?; db.execute( "UPDATE agents SET status = ?, last_active_at = datetime('now') WHERE id = ?", params![status, agent_id], ).map_err(|e| AppError::database(format!("Update error: {e}")))?; Ok(()) } pub fn ensure_admin(group_id: &str) -> Result<(), AppError> { let db = open_db()?; let exists: bool = db.query_row( "SELECT COUNT(*) > 0 FROM agents WHERE id = 'admin'", [], |row| row.get(0), ).map_err(|e| AppError::database(format!("Query error: {e}")))?; if !exists { db.execute( "INSERT INTO agents (id, name, role, group_id, tier, status) \ VALUES ('admin', 'Operator', 'admin', ?, 0, 'active')", params![group_id], ).map_err(|e| AppError::database(format!("Insert error: {e}")))?; } // Ensure admin has bidirectional contacts with ALL agents in the group let mut stmt = db.prepare( "SELECT id FROM agents WHERE group_id = ? AND id != 'admin'" ).map_err(|e| AppError::database(format!("Query error: {e}")))?; let agent_ids: Vec = stmt.query_map(params![group_id], |row| row.get(0)) .map_err(|e| AppError::database(format!("Query error: {e}")))? .collect::, _>>() .map_err(|e| AppError::database(format!("Row error: {e}")))?; drop(stmt); for aid in &agent_ids { db.execute( "INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES ('admin', ?)", params![aid], ).map_err(|e| AppError::database(format!("Insert error: {e}")))?; db.execute( "INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES (?, 'admin')", params![aid], ).map_err(|e| AppError::database(format!("Insert error: {e}")))?; } Ok(()) } /// Register all agents from groups.json into the btmsg database. /// Creates/updates agent records and sets up contact permissions: /// - All Tier 1 agents get bidirectional contacts with each other /// - Manager gets contacts with ALL agents (Tier 1 + Tier 2) /// - Other Tier 1 agents get contacts with Manager /// Also ensures admin agent and review channels exist. pub fn register_agents_from_groups(groups: &crate::groups::GroupsFile) -> Result<(), AppError> { for group in &groups.groups { register_group_agents(group)?; } Ok(()) } /// Register all agents for a single group. fn register_group_agents(group: &crate::groups::GroupConfig) -> Result<(), AppError> { let db = open_db_or_create()?; let group_id = &group.id; // Collect agent IDs by tier for contact setup let mut tier1_ids: Vec = Vec::new(); let mut tier2_ids: Vec = Vec::new(); let mut manager_id: Option = None; // Register Tier 1 agents (from agents array) for agent in &group.agents { let tier = match agent.role.as_str() { "manager" | "architect" | "tester" | "reviewer" => 1, _ => 2, }; upsert_agent( &db, &agent.id, &agent.name, &agent.role, group_id, tier, agent.model.as_deref(), agent.cwd.as_deref(), agent.system_prompt.as_deref(), )?; if tier == 1 { tier1_ids.push(agent.id.clone()); if agent.role == "manager" { manager_id = Some(agent.id.clone()); } } else { tier2_ids.push(agent.id.clone()); } } // Register Tier 2 agents (from projects array) for project in &group.projects { upsert_agent( &db, &project.id, &project.name, "project", group_id, 2, None, Some(&project.cwd), None, )?; tier2_ids.push(project.id.clone()); } // Set up contact permissions // All Tier 1 agents: bidirectional contacts with each other for i in 0..tier1_ids.len() { for j in (i + 1)..tier1_ids.len() { db.execute( "INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES (?1, ?2)", params![tier1_ids[i], tier1_ids[j]], ).map_err(|e| AppError::database(format!("Contact insert error: {e}")))?; db.execute( "INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES (?1, ?2)", params![tier1_ids[j], tier1_ids[i]], ).map_err(|e| AppError::database(format!("Contact insert error: {e}")))?; } } // Manager gets contacts with ALL Tier 2 agents (bidirectional) if let Some(ref mgr_id) = manager_id { for t2_id in &tier2_ids { db.execute( "INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES (?1, ?2)", params![mgr_id, t2_id], ).map_err(|e| AppError::database(format!("Contact insert error: {e}")))?; db.execute( "INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES (?1, ?2)", params![t2_id, mgr_id], ).map_err(|e| AppError::database(format!("Contact insert error: {e}")))?; } } // Ensure review channels exist ensure_review_channels_for_group(&db, group_id); // Drop the connection before calling ensure_admin (which opens its own) drop(db); // Ensure admin agent exists with contacts to all agents let _ = ensure_admin(group_id); Ok(()) } fn upsert_agent( db: &Connection, id: &str, name: &str, role: &str, group_id: &str, tier: i32, model: Option<&str>, cwd: Option<&str>, system_prompt: Option<&str>, ) -> Result<(), AppError> { db.execute( "INSERT INTO agents (id, name, role, group_id, tier, model, cwd, system_prompt) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) ON CONFLICT(id) DO UPDATE SET name = excluded.name, role = excluded.role, group_id = excluded.group_id, tier = excluded.tier, model = excluded.model, cwd = excluded.cwd, system_prompt = excluded.system_prompt", params![id, name, role, group_id, tier, model, cwd, system_prompt], ) .map_err(|e| AppError::database(format!("Upsert agent error: {e}")))?; Ok(()) } /// Ensure #review-queue and #review-log channels exist for a group (public wrapper). fn ensure_review_channels_for_group(db: &Connection, group_id: &str) { for name in &["review-queue", "review-log"] { let exists: bool = db .query_row( "SELECT COUNT(*) > 0 FROM channels WHERE name = ?1 AND group_id = ?2", params![name, group_id], |row| row.get(0), ) .unwrap_or(false); if !exists { let id = uuid::Uuid::new_v4().to_string(); let _ = db.execute( "INSERT INTO channels (id, name, group_id, created_by) VALUES (?1, ?2, ?3, 'system')", params![id, name, group_id], ); } } } /// Open btmsg database, creating it with schema if it doesn't exist. fn open_db_or_create() -> Result { let path = db_path(); if let Some(parent) = path.parent() { std::fs::create_dir_all(parent) .map_err(|e| AppError::database(format!("Failed to create data dir: {e}")))?; } let conn = Connection::open_with_flags( &path, OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE, ) .map_err(|e| AppError::database(format!("Failed to open/create btmsg.db: {e}")))?; conn.query_row("PRAGMA journal_mode=WAL", [], |_| Ok(())) .map_err(|e| AppError::database(format!("Failed to set WAL mode: {e}")))?; conn.query_row("PRAGMA busy_timeout = 5000", [], |_| Ok(())) .map_err(|e| AppError::database(format!("Failed to set busy_timeout: {e}")))?; // Create tables if they don't exist (same schema as Python btmsg CLI) conn.execute_batch( "CREATE TABLE IF NOT EXISTS agents ( id TEXT PRIMARY KEY, name TEXT NOT NULL, role TEXT NOT NULL, group_id TEXT NOT NULL, tier INTEGER NOT NULL DEFAULT 2, model TEXT, cwd TEXT, system_prompt TEXT, status TEXT DEFAULT 'stopped', last_active_at TEXT, created_at TEXT DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS contacts ( agent_id TEXT NOT NULL, contact_id TEXT NOT NULL, PRIMARY KEY (agent_id, contact_id), FOREIGN KEY (agent_id) REFERENCES agents(id), FOREIGN KEY (contact_id) REFERENCES agents(id) ); CREATE TABLE IF NOT EXISTS messages ( id TEXT PRIMARY KEY, from_agent TEXT NOT NULL, to_agent TEXT NOT NULL, content TEXT NOT NULL, read INTEGER DEFAULT 0, reply_to TEXT, group_id TEXT NOT NULL, sender_group_id TEXT, created_at TEXT DEFAULT (datetime('now')), FOREIGN KEY (from_agent) REFERENCES agents(id), FOREIGN KEY (to_agent) REFERENCES agents(id) ); CREATE INDEX IF NOT EXISTS idx_messages_to ON messages(to_agent, read); CREATE INDEX IF NOT EXISTS idx_messages_from ON messages(from_agent); CREATE INDEX IF NOT EXISTS idx_messages_group ON messages(group_id); CREATE INDEX IF NOT EXISTS idx_messages_reply ON messages(reply_to); CREATE TABLE IF NOT EXISTS channels ( id TEXT PRIMARY KEY, name TEXT NOT NULL, group_id TEXT NOT NULL, created_by TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS channel_members ( channel_id TEXT NOT NULL, agent_id TEXT NOT NULL, joined_at TEXT DEFAULT (datetime('now')), PRIMARY KEY (channel_id, agent_id), FOREIGN KEY (channel_id) REFERENCES channels(id), FOREIGN KEY (agent_id) REFERENCES agents(id) ); CREATE TABLE IF NOT EXISTS channel_messages ( id TEXT PRIMARY KEY, channel_id TEXT NOT NULL, from_agent TEXT NOT NULL, content TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')), FOREIGN KEY (channel_id) REFERENCES channels(id), FOREIGN KEY (from_agent) REFERENCES agents(id) ); CREATE INDEX IF NOT EXISTS idx_channel_messages ON channel_messages(channel_id, created_at); CREATE TABLE IF NOT EXISTS tasks ( id TEXT PRIMARY KEY, title TEXT NOT NULL, description TEXT DEFAULT '', status TEXT DEFAULT 'todo', priority TEXT DEFAULT 'medium', assigned_to TEXT, created_by TEXT NOT NULL, group_id TEXT NOT NULL, parent_task_id TEXT, sort_order INTEGER DEFAULT 0, created_at TEXT DEFAULT (datetime('now')), updated_at TEXT DEFAULT (datetime('now')), FOREIGN KEY (assigned_to) REFERENCES agents(id), FOREIGN KEY (created_by) REFERENCES agents(id) ); CREATE TABLE IF NOT EXISTS task_comments ( id TEXT PRIMARY KEY, task_id TEXT NOT NULL, agent_id TEXT NOT NULL, content TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')), FOREIGN KEY (task_id) REFERENCES tasks(id), FOREIGN KEY (agent_id) REFERENCES agents(id) ); CREATE INDEX IF NOT EXISTS idx_tasks_group ON tasks(group_id); CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status); CREATE INDEX IF NOT EXISTS idx_tasks_assigned ON tasks(assigned_to); CREATE INDEX IF NOT EXISTS idx_task_comments_task ON task_comments(task_id); CREATE TABLE IF NOT EXISTS heartbeats ( agent_id TEXT PRIMARY KEY, timestamp INTEGER NOT NULL, FOREIGN KEY (agent_id) REFERENCES agents(id) ); CREATE TABLE IF NOT EXISTS dead_letter_queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, from_agent TEXT NOT NULL, to_agent TEXT NOT NULL, content TEXT NOT NULL, error TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')), FOREIGN KEY (from_agent) REFERENCES agents(id), FOREIGN KEY (to_agent) REFERENCES agents(id) ); CREATE INDEX IF NOT EXISTS idx_dead_letter_from ON dead_letter_queue(from_agent); CREATE INDEX IF NOT EXISTS idx_dead_letter_to ON dead_letter_queue(to_agent); CREATE TABLE IF NOT EXISTS audit_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent_id TEXT NOT NULL, event_type TEXT NOT NULL, detail TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')), FOREIGN KEY (agent_id) REFERENCES agents(id) ); CREATE INDEX IF NOT EXISTS idx_audit_log_agent ON audit_log(agent_id); CREATE INDEX IF NOT EXISTS idx_audit_log_type ON audit_log(event_type); CREATE TABLE IF NOT EXISTS seen_messages ( session_id TEXT NOT NULL, message_id TEXT NOT NULL, seen_at INTEGER NOT NULL DEFAULT (unixepoch()), PRIMARY KEY (session_id, message_id), FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_seen_messages_session ON seen_messages(session_id);" ).map_err(|e| AppError::database(format!("Schema creation error: {e}")))?; // Enable foreign keys for ON DELETE CASCADE support conn.execute_batch("PRAGMA foreign_keys = ON") .map_err(|e| AppError::database(format!("Failed to enable foreign keys: {e}")))?; Ok(conn) } // ---- Heartbeat monitoring ---- #[allow(dead_code)] // Constructed in get_agent_heartbeats, called via Tauri IPC #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct AgentHeartbeat { pub agent_id: String, pub agent_name: String, pub agent_role: String, pub timestamp: i64, } pub fn record_heartbeat(agent_id: &str) -> Result<(), AppError> { let db = open_db()?; let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map_err(|e| AppError::database(format!("Time error: {e}")))? .as_secs() as i64; db.execute( "INSERT INTO heartbeats (agent_id, timestamp) VALUES (?1, ?2) \ ON CONFLICT(agent_id) DO UPDATE SET timestamp = excluded.timestamp", params![agent_id, now], ) .map_err(|e| AppError::database(format!("Heartbeat upsert error: {e}")))?; Ok(()) } #[allow(dead_code)] // Called via Tauri IPC command btmsg_get_agent_heartbeats pub fn get_agent_heartbeats(group_id: &str) -> Result, AppError> { let db = open_db()?; let mut stmt = db .prepare( "SELECT h.agent_id, a.name AS agent_name, a.role AS agent_role, h.timestamp \ FROM heartbeats h JOIN agents a ON h.agent_id = a.id \ WHERE a.group_id = ? ORDER BY h.timestamp DESC", ) .map_err(|e| AppError::database(format!("Query error: {e}")))?; let rows = stmt .query_map(params![group_id], |row| { Ok(AgentHeartbeat { agent_id: row.get("agent_id")?, agent_name: row.get("agent_name")?, agent_role: row.get("agent_role")?, timestamp: row.get("timestamp")?, }) }) .map_err(|e| AppError::database(format!("Query error: {e}")))?; rows.collect::, _>>() .map_err(|e| AppError::database(format!("Row error: {e}"))) } pub fn get_stale_agents(group_id: &str, threshold_secs: i64) -> Result, AppError> { let db = open_db()?; let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map_err(|e| AppError::database(format!("Time error: {e}")))? .as_secs() as i64; let cutoff = now - threshold_secs; // Agents in the group that either have no heartbeat or heartbeat older than cutoff let mut stmt = db .prepare( "SELECT a.id FROM agents a LEFT JOIN heartbeats h ON a.id = h.agent_id \ WHERE a.group_id = ? AND a.tier > 0 AND (h.timestamp IS NULL OR h.timestamp < ?)", ) .map_err(|e| AppError::database(format!("Query error: {e}")))?; let ids = stmt .query_map(params![group_id, cutoff], |row| row.get::<_, String>(0)) .map_err(|e| AppError::database(format!("Query error: {e}")))?; ids.collect::, _>>() .map_err(|e| AppError::database(format!("Row error: {e}"))) } // ---- Dead letter queue ---- #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct DeadLetter { pub id: i64, pub from_agent: String, pub to_agent: String, pub content: String, pub error: String, pub created_at: String, } pub fn get_dead_letters(group_id: &str, limit: i32) -> Result, AppError> { let db = open_db()?; let mut stmt = db .prepare( "SELECT d.id, d.from_agent, d.to_agent, d.content, d.error, d.created_at \ FROM dead_letter_queue d \ JOIN agents a ON d.to_agent = a.id \ WHERE a.group_id = ? \ ORDER BY d.created_at DESC LIMIT ?", ) .map_err(|e| AppError::database(format!("Query error: {e}")))?; let rows = stmt .query_map(params![group_id, limit], |row| { Ok(DeadLetter { id: row.get("id")?, from_agent: row.get("from_agent")?, to_agent: row.get("to_agent")?, content: row.get("content")?, error: row.get("error")?, created_at: row.get("created_at")?, }) }) .map_err(|e| AppError::database(format!("Query error: {e}")))?; rows.collect::, _>>() .map_err(|e| AppError::database(format!("Row error: {e}"))) } #[allow(dead_code)] // Called via Tauri IPC command btmsg_queue_dead_letter pub fn queue_dead_letter( from_agent: &str, to_agent: &str, content: &str, error: &str, ) -> Result<(), AppError> { let db = open_db()?; db.execute( "INSERT INTO dead_letter_queue (from_agent, to_agent, content, error) VALUES (?1, ?2, ?3, ?4)", params![from_agent, to_agent, content, error], ) .map_err(|e| AppError::database(format!("Dead letter insert error: {e}")))?; Ok(()) } pub fn clear_dead_letters(group_id: &str) -> Result<(), AppError> { let db = open_db()?; db.execute( "DELETE FROM dead_letter_queue WHERE to_agent IN (SELECT id FROM agents WHERE group_id = ?)", params![group_id], ) .map_err(|e| AppError::database(format!("Delete error: {e}")))?; Ok(()) } /// Clear all communications for a group: messages, channel messages, seen tracking, dead letters pub fn clear_all_communications(group_id: &str) -> Result<(), AppError> { let db = open_db()?; let agent_ids_clause = "(SELECT id FROM agents WHERE group_id = ?1)"; // Order matters: seen_messages references messages, so delete seen first db.execute( &format!("DELETE FROM seen_messages WHERE message_id IN (SELECT id FROM messages WHERE group_id = ?1 OR from_agent IN {agent_ids_clause} OR to_agent IN {agent_ids_clause})"), params![group_id], ).map_err(|e| AppError::database(format!("Clear seen_messages error: {e}")))?; db.execute( &format!("DELETE FROM messages WHERE group_id = ?1 OR from_agent IN {agent_ids_clause} OR to_agent IN {agent_ids_clause}"), params![group_id], ).map_err(|e| AppError::database(format!("Clear messages error: {e}")))?; db.execute( &format!("DELETE FROM channel_messages WHERE channel_id IN (SELECT id FROM channels WHERE group_id = ?1) OR from_agent IN {agent_ids_clause}"), params![group_id], ).map_err(|e| AppError::database(format!("Clear channel_messages error: {e}")))?; db.execute( &format!("DELETE FROM dead_letter_queue WHERE from_agent IN {agent_ids_clause} OR to_agent IN {agent_ids_clause}"), params![group_id], ).map_err(|e| AppError::database(format!("Clear dead_letter_queue error: {e}")))?; Ok(()) } // ---- Audit log ---- #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct AuditEntry { pub id: i64, pub agent_id: String, pub event_type: String, pub detail: String, pub created_at: String, } pub fn log_audit_event(agent_id: &str, event_type: &str, detail: &str) -> Result<(), AppError> { let db = open_db_or_create()?; db.execute( "INSERT INTO audit_log (agent_id, event_type, detail) VALUES (?1, ?2, ?3)", params![agent_id, event_type, detail], ) .map_err(|e| AppError::database(format!("Audit log insert error: {e}")))?; Ok(()) } pub fn get_audit_log(group_id: &str, limit: i32, offset: i32) -> Result, AppError> { let db = open_db()?; let mut stmt = db .prepare( "SELECT al.id, al.agent_id, al.event_type, al.detail, al.created_at \ FROM audit_log al \ JOIN agents a ON al.agent_id = a.id \ WHERE a.group_id = ? \ ORDER BY al.created_at DESC, al.id DESC LIMIT ? OFFSET ?", ) .map_err(|e| AppError::database(format!("Query error: {e}")))?; let rows = stmt .query_map(params![group_id, limit, offset], |row| { Ok(AuditEntry { id: row.get("id")?, agent_id: row.get("agent_id")?, event_type: row.get("event_type")?, detail: row.get("detail")?, created_at: row.get("created_at")?, }) }) .map_err(|e| AppError::database(format!("Query error: {e}")))?; rows.collect::, _>>() .map_err(|e| AppError::database(format!("Row error: {e}"))) } pub fn get_audit_log_for_agent( agent_id: &str, limit: i32, ) -> Result, AppError> { let db = open_db()?; let mut stmt = db .prepare( "SELECT id, agent_id, event_type, detail, created_at \ FROM audit_log WHERE agent_id = ? ORDER BY created_at DESC, id DESC LIMIT ?", ) .map_err(|e| AppError::database(format!("Query error: {e}")))?; let rows = stmt .query_map(params![agent_id, limit], |row| { Ok(AuditEntry { id: row.get("id")?, agent_id: row.get("agent_id")?, event_type: row.get("event_type")?, detail: row.get("detail")?, created_at: row.get("created_at")?, }) }) .map_err(|e| AppError::database(format!("Query error: {e}")))?; rows.collect::, _>>() .map_err(|e| AppError::database(format!("Row error: {e}"))) } pub fn all_feed(group_id: &str, limit: i32) -> Result, AppError> { let db = open_db()?; let mut stmt = db.prepare( "SELECT m.id, m.from_agent, m.to_agent, m.content, m.created_at, m.reply_to, \ a1.name AS sender_name, a1.role AS sender_role, \ a2.name AS recipient_name, a2.role AS recipient_role \ FROM messages m \ JOIN agents a1 ON m.from_agent = a1.id \ JOIN agents a2 ON m.to_agent = a2.id \ WHERE m.group_id = ? \ ORDER BY m.created_at DESC LIMIT ?" ).map_err(|e| AppError::database(format!("Query error: {e}")))?; let msgs = stmt.query_map(params![group_id, limit], |row| { Ok(BtmsgFeedMessage { id: row.get("id")?, from_agent: row.get("from_agent")?, to_agent: row.get("to_agent")?, content: row.get("content")?, created_at: row.get("created_at")?, reply_to: row.get("reply_to")?, sender_name: row.get("sender_name")?, sender_role: row.get("sender_role")?, recipient_name: row.get("recipient_name")?, recipient_role: row.get("recipient_role")?, }) }).map_err(|e| AppError::database(format!("Query error: {e}")))?; msgs.collect::, _>>().map_err(|e| AppError::database(format!("Row error: {e}"))) } pub fn mark_read_conversation(reader_id: &str, sender_id: &str) -> Result<(), AppError> { let db = open_db()?; db.execute( "UPDATE messages SET read = 1 WHERE to_agent = ? AND from_agent = ? AND read = 0", params![reader_id, sender_id], ).map_err(|e| AppError::database(format!("Update error: {e}")))?; Ok(()) } pub fn get_channels(group_id: &str) -> Result, AppError> { let db = open_db()?; let mut stmt = db.prepare( "SELECT c.id, c.name, c.group_id, c.created_by, \ (SELECT COUNT(*) FROM channel_members cm WHERE cm.channel_id = c.id) AS member_count, \ c.created_at \ FROM channels c WHERE c.group_id = ? ORDER BY c.name" ).map_err(|e| AppError::database(format!("Query error: {e}")))?; let channels = stmt.query_map(params![group_id], |row| { Ok(BtmsgChannel { id: row.get("id")?, name: row.get("name")?, group_id: row.get("group_id")?, created_by: row.get("created_by")?, member_count: row.get("member_count")?, created_at: row.get("created_at")?, }) }).map_err(|e| AppError::database(format!("Query error: {e}")))?; channels.collect::, _>>().map_err(|e| AppError::database(format!("Row error: {e}"))) } pub fn get_channel_messages(channel_id: &str, limit: i32) -> Result, AppError> { let db = open_db()?; let mut stmt = db.prepare( "SELECT cm.id, cm.channel_id, cm.from_agent, cm.content, cm.created_at, \ a.name AS sender_name, a.role AS sender_role \ FROM channel_messages cm JOIN agents a ON cm.from_agent = a.id \ WHERE cm.channel_id = ? ORDER BY cm.created_at ASC LIMIT ?" ).map_err(|e| AppError::database(format!("Query error: {e}")))?; let msgs = stmt.query_map(params![channel_id, limit], |row| { Ok(BtmsgChannelMessage { id: row.get("id")?, channel_id: row.get("channel_id")?, from_agent: row.get("from_agent")?, content: row.get("content")?, created_at: row.get("created_at")?, sender_name: row.get("sender_name")?, sender_role: row.get("sender_role")?, }) }).map_err(|e| AppError::database(format!("Query error: {e}")))?; msgs.collect::, _>>().map_err(|e| AppError::database(format!("Row error: {e}"))) } pub fn send_channel_message(channel_id: &str, from_agent: &str, content: &str) -> Result { let db = open_db()?; // Verify channel exists let _: String = db.query_row( "SELECT id FROM channels WHERE id = ?", params![channel_id], |row| row.get(0), ).map_err(|e| AppError::database(format!("Channel not found: {e}")))?; // Check membership (admin bypasses) let sender_tier: i32 = db.query_row( "SELECT tier FROM agents WHERE id = ?", params![from_agent], |row| row.get(0), ).map_err(|e| AppError::database(format!("Sender not found: {e}")))?; if sender_tier > 0 { let is_member: bool = db.query_row( "SELECT COUNT(*) > 0 FROM channel_members WHERE channel_id = ? AND agent_id = ?", params![channel_id, from_agent], |row| row.get(0), ).map_err(|e| AppError::database(format!("Membership check error: {e}")))?; if !is_member { return Err(AppError::database("Not a member of this channel")); } } let msg_id = uuid::Uuid::new_v4().to_string(); db.execute( "INSERT INTO channel_messages (id, channel_id, from_agent, content) VALUES (?1, ?2, ?3, ?4)", params![msg_id, channel_id, from_agent, content], ).map_err(|e| AppError::database(format!("Insert error: {e}")))?; Ok(msg_id) } pub fn create_channel(name: &str, group_id: &str, created_by: &str) -> Result { let db = open_db()?; let channel_id = uuid::Uuid::new_v4().to_string()[..8].to_string(); db.execute( "INSERT INTO channels (id, name, group_id, created_by) VALUES (?1, ?2, ?3, ?4)", params![channel_id, name, group_id, created_by], ).map_err(|e| AppError::database(format!("Insert error: {e}")))?; // Auto-add creator as member db.execute( "INSERT INTO channel_members (channel_id, agent_id) VALUES (?1, ?2)", params![channel_id, created_by], ).map_err(|e| AppError::database(format!("Insert error: {e}")))?; Ok(channel_id) } pub fn add_channel_member(channel_id: &str, agent_id: &str) -> Result<(), AppError> { let db = open_db()?; db.execute( "INSERT OR IGNORE INTO channel_members (channel_id, agent_id) VALUES (?1, ?2)", params![channel_id, agent_id], ).map_err(|e| AppError::database(format!("Insert error: {e}")))?; Ok(()) } #[cfg(test)] mod tests { use super::*; use rusqlite::Connection; /// Create an in-memory DB with the btmsg schema for testing. fn test_db() -> Connection { let conn = Connection::open_in_memory().unwrap(); conn.execute_batch( "CREATE TABLE agents ( id TEXT PRIMARY KEY, name TEXT NOT NULL, role TEXT NOT NULL, group_id TEXT NOT NULL, tier INTEGER NOT NULL DEFAULT 1, model TEXT, cwd TEXT, system_prompt TEXT, status TEXT DEFAULT 'stopped', last_active_at TEXT ); CREATE TABLE messages ( id TEXT PRIMARY KEY, from_agent TEXT NOT NULL, to_agent TEXT NOT NULL, content TEXT NOT NULL, group_id TEXT NOT NULL, read INTEGER DEFAULT 0, reply_to TEXT, created_at TEXT DEFAULT (datetime('now')) ); CREATE TABLE contacts ( agent_id TEXT NOT NULL, contact_id TEXT NOT NULL, PRIMARY KEY (agent_id, contact_id) ); CREATE TABLE channels ( id TEXT PRIMARY KEY, name TEXT NOT NULL, group_id TEXT NOT NULL, created_by TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')) ); CREATE TABLE channel_members ( channel_id TEXT NOT NULL, agent_id TEXT NOT NULL, PRIMARY KEY (channel_id, agent_id) ); CREATE TABLE channel_messages ( id TEXT PRIMARY KEY, channel_id TEXT NOT NULL, from_agent TEXT NOT NULL, content TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')) ); CREATE TABLE heartbeats ( agent_id TEXT PRIMARY KEY, timestamp INTEGER NOT NULL ); CREATE TABLE dead_letter_queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, from_agent TEXT NOT NULL, to_agent TEXT NOT NULL, content TEXT NOT NULL, error TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')) ); CREATE TABLE audit_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent_id TEXT NOT NULL, event_type TEXT NOT NULL, detail TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')) );", ) .unwrap(); conn } fn seed_agents(conn: &Connection) { conn.execute( "INSERT INTO agents (id, name, role, group_id, tier, model, system_prompt, status) VALUES ('a1', 'Coder', 'developer', 'g1', 1, 'claude-4', 'You are a coder', 'active')", [], ).unwrap(); conn.execute( "INSERT INTO agents (id, name, role, group_id, tier, model, system_prompt, status) VALUES ('a2', 'Reviewer', 'reviewer', 'g1', 2, NULL, 'You review code', 'sleeping')", [], ).unwrap(); conn.execute( "INSERT INTO agents (id, name, role, group_id, tier, model, system_prompt, status) VALUES ('admin', 'Operator', 'admin', 'g1', 0, NULL, NULL, 'active')", [], ).unwrap(); } // ---- CRITICAL REGRESSION: get_agents named column access ---- // Bug: positional index 7 returned system_prompt instead of status (column 8). // Fix: named access via row.get("status"). #[test] fn test_get_agents_returns_status_not_system_prompt() { let conn = test_db(); seed_agents(&conn); let mut stmt = conn.prepare( "SELECT a.*, (SELECT COUNT(*) FROM messages m WHERE m.to_agent = a.id AND m.read = 0) as unread_count \ FROM agents a WHERE a.group_id = ? ORDER BY a.tier, a.role, a.name" ).unwrap(); let agents: Vec = stmt.query_map(params!["g1"], |row| { Ok(BtmsgAgent { id: row.get("id")?, name: row.get("name")?, role: row.get("role")?, group_id: row.get("group_id")?, tier: row.get("tier")?, model: row.get("model")?, status: row.get::<_, Option>("status")?.unwrap_or_else(|| "stopped".into()), unread_count: row.get("unread_count")?, }) }).unwrap().collect::, _>>().unwrap(); assert_eq!(agents.len(), 3); // admin (tier 0) comes first assert_eq!(agents[0].id, "admin"); assert_eq!(agents[0].status, "active"); assert_eq!(agents[0].tier, 0); // Coder — status must be "active", NOT "You are a coder" (system_prompt) let coder = agents.iter().find(|a| a.id == "a1").unwrap(); assert_eq!(coder.status, "active"); assert_eq!(coder.name, "Coder"); assert_eq!(coder.model, Some("claude-4".to_string())); // Reviewer — status must be "sleeping", NOT "You review code" (system_prompt) let reviewer = agents.iter().find(|a| a.id == "a2").unwrap(); assert_eq!(reviewer.status, "sleeping"); assert_eq!(reviewer.model, None); } #[test] fn test_get_agents_unread_count() { let conn = test_db(); seed_agents(&conn); // Send 2 unread messages to a1 conn.execute( "INSERT INTO messages (id, from_agent, to_agent, content, group_id, read) VALUES ('m1', 'a2', 'a1', 'hi', 'g1', 0)", [], ).unwrap(); conn.execute( "INSERT INTO messages (id, from_agent, to_agent, content, group_id, read) VALUES ('m2', 'admin', 'a1', 'task', 'g1', 0)", [], ).unwrap(); // Send 1 read message to a1 conn.execute( "INSERT INTO messages (id, from_agent, to_agent, content, group_id, read) VALUES ('m3', 'a2', 'a1', 'old', 'g1', 1)", [], ).unwrap(); let mut stmt = conn.prepare( "SELECT a.*, (SELECT COUNT(*) FROM messages m WHERE m.to_agent = a.id AND m.read = 0) as unread_count \ FROM agents a WHERE a.group_id = ? ORDER BY a.tier, a.role, a.name" ).unwrap(); let agents: Vec = stmt.query_map(params!["g1"], |row| { Ok(BtmsgAgent { id: row.get("id")?, name: row.get("name")?, role: row.get("role")?, group_id: row.get("group_id")?, tier: row.get("tier")?, model: row.get("model")?, status: row.get::<_, Option>("status")?.unwrap_or_else(|| "stopped".into()), unread_count: row.get("unread_count")?, }) }).unwrap().collect::, _>>().unwrap(); let coder = agents.iter().find(|a| a.id == "a1").unwrap(); assert_eq!(coder.unread_count, 2); let reviewer = agents.iter().find(|a| a.id == "a2").unwrap(); assert_eq!(reviewer.unread_count, 0); } // ---- REGRESSION: all_feed JOIN alias disambiguation ---- // Bug: JOINed queries had duplicate "name" columns. Fixed with AS aliases. #[test] fn test_all_feed_returns_correct_sender_and_recipient_names() { let conn = test_db(); seed_agents(&conn); conn.execute( "INSERT INTO messages (id, from_agent, to_agent, content, group_id) VALUES ('m1', 'a1', 'a2', 'review this', 'g1')", [], ).unwrap(); let mut stmt = conn.prepare( "SELECT m.id, m.from_agent, m.to_agent, m.content, m.created_at, m.reply_to, \ a1.name AS sender_name, a1.role AS sender_role, \ a2.name AS recipient_name, a2.role AS recipient_role \ FROM messages m \ JOIN agents a1 ON m.from_agent = a1.id \ JOIN agents a2 ON m.to_agent = a2.id \ WHERE m.group_id = ? \ ORDER BY m.created_at DESC LIMIT ?" ).unwrap(); let msgs: Vec = stmt.query_map(params!["g1", 100], |row| { Ok(BtmsgFeedMessage { id: row.get("id")?, from_agent: row.get("from_agent")?, to_agent: row.get("to_agent")?, content: row.get("content")?, created_at: row.get("created_at")?, reply_to: row.get("reply_to")?, sender_name: row.get("sender_name")?, sender_role: row.get("sender_role")?, recipient_name: row.get("recipient_name")?, recipient_role: row.get("recipient_role")?, }) }).unwrap().collect::, _>>().unwrap(); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].sender_name, "Coder"); assert_eq!(msgs[0].sender_role, "developer"); assert_eq!(msgs[0].recipient_name, "Reviewer"); assert_eq!(msgs[0].recipient_role, "reviewer"); } // ---- REGRESSION: unread_messages JOIN alias ---- #[test] fn test_unread_messages_returns_sender_info() { let conn = test_db(); seed_agents(&conn); conn.execute( "INSERT INTO messages (id, from_agent, to_agent, content, group_id, read) VALUES ('m1', 'a1', 'a2', 'check this', 'g1', 0)", [], ).unwrap(); let mut stmt = conn.prepare( "SELECT m.id, m.from_agent, m.to_agent, m.content, m.read, m.reply_to, m.created_at, \ a.name AS sender_name, a.role AS sender_role \ FROM messages m JOIN agents a ON m.from_agent = a.id \ WHERE m.to_agent = ? AND m.read = 0 ORDER BY m.created_at ASC" ).unwrap(); let msgs: Vec = stmt.query_map(params!["a2"], |row| { Ok(BtmsgMessage { id: row.get("id")?, from_agent: row.get("from_agent")?, to_agent: row.get("to_agent")?, content: row.get("content")?, read: row.get::<_, i32>("read")? != 0, reply_to: row.get("reply_to")?, created_at: row.get("created_at")?, sender_name: row.get("sender_name")?, sender_role: row.get("sender_role")?, }) }).unwrap().collect::, _>>().unwrap(); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].sender_name, Some("Coder".to_string())); assert_eq!(msgs[0].sender_role, Some("developer".to_string())); assert!(!msgs[0].read); } // ---- REGRESSION: channel_messages JOIN alias ---- #[test] fn test_channel_messages_returns_sender_info() { let conn = test_db(); seed_agents(&conn); conn.execute( "INSERT INTO channels (id, name, group_id, created_by) VALUES ('ch1', 'general', 'g1', 'admin')", [], ).unwrap(); conn.execute( "INSERT INTO channel_members (channel_id, agent_id) VALUES ('ch1', 'a1')", [], ).unwrap(); conn.execute( "INSERT INTO channel_messages (id, channel_id, from_agent, content) VALUES ('cm1', 'ch1', 'a1', 'hello channel')", [], ).unwrap(); let mut stmt = conn.prepare( "SELECT cm.id, cm.channel_id, cm.from_agent, cm.content, cm.created_at, \ a.name AS sender_name, a.role AS sender_role \ FROM channel_messages cm JOIN agents a ON cm.from_agent = a.id \ WHERE cm.channel_id = ? ORDER BY cm.created_at ASC LIMIT ?" ).unwrap(); let msgs: Vec = stmt.query_map(params!["ch1", 100], |row| { Ok(BtmsgChannelMessage { id: row.get("id")?, channel_id: row.get("channel_id")?, from_agent: row.get("from_agent")?, content: row.get("content")?, created_at: row.get("created_at")?, sender_name: row.get("sender_name")?, sender_role: row.get("sender_role")?, }) }).unwrap().collect::, _>>().unwrap(); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].sender_name, "Coder"); assert_eq!(msgs[0].sender_role, "developer"); } // ---- serde camelCase serialization ---- #[test] fn test_btmsg_agent_serializes_to_camel_case() { let agent = BtmsgAgent { id: "a1".into(), name: "Test".into(), role: "dev".into(), group_id: "g1".into(), tier: 1, model: None, status: "active".into(), unread_count: 5, }; let json = serde_json::to_value(&agent).unwrap(); // Verify camelCase keys (matches TypeScript interface) assert!(json.get("groupId").is_some(), "expected camelCase 'groupId'"); assert!(json.get("unreadCount").is_some(), "expected camelCase 'unreadCount'"); assert!(json.get("group_id").is_none(), "should not have snake_case 'group_id'"); assert!(json.get("unread_count").is_none(), "should not have snake_case 'unread_count'"); } #[test] fn test_btmsg_message_serializes_to_camel_case() { let msg = BtmsgMessage { id: "m1".into(), from_agent: "a1".into(), to_agent: "a2".into(), content: "hi".into(), read: false, reply_to: None, created_at: "2026-01-01".into(), sender_name: Some("Coder".into()), sender_role: Some("dev".into()), }; let json = serde_json::to_value(&msg).unwrap(); assert!(json.get("fromAgent").is_some(), "expected camelCase 'fromAgent'"); assert!(json.get("toAgent").is_some(), "expected camelCase 'toAgent'"); assert!(json.get("replyTo").is_some(), "expected camelCase 'replyTo'"); assert!(json.get("createdAt").is_some(), "expected camelCase 'createdAt'"); assert!(json.get("senderName").is_some(), "expected camelCase 'senderName'"); assert!(json.get("senderRole").is_some(), "expected camelCase 'senderRole'"); } #[test] fn test_btmsg_feed_message_serializes_to_camel_case() { let msg = BtmsgFeedMessage { id: "m1".into(), from_agent: "a1".into(), to_agent: "a2".into(), content: "hi".into(), created_at: "2026-01-01".into(), reply_to: None, sender_name: "Coder".into(), sender_role: "dev".into(), recipient_name: "Reviewer".into(), recipient_role: "reviewer".into(), }; let json = serde_json::to_value(&msg).unwrap(); assert!(json.get("recipientName").is_some(), "expected camelCase 'recipientName'"); assert!(json.get("recipientRole").is_some(), "expected camelCase 'recipientRole'"); } // ---- Agent registration tests ---- #[test] fn test_upsert_agent_inserts_new() { let conn = test_db(); upsert_agent(&conn, "mgr1", "Manager", "manager", "g1", 1, Some("claude-4"), None, Some("You manage")).unwrap(); let agent = conn.query_row( "SELECT * FROM agents WHERE id = 'mgr1'", [], |row| Ok(( row.get::<_, String>("name").unwrap(), row.get::<_, String>("role").unwrap(), row.get::<_, i32>("tier").unwrap(), row.get::<_, Option>("model").unwrap(), row.get::<_, Option>("system_prompt").unwrap(), )), ).unwrap(); assert_eq!(agent.0, "Manager"); assert_eq!(agent.1, "manager"); assert_eq!(agent.2, 1); assert_eq!(agent.3, Some("claude-4".to_string())); assert_eq!(agent.4, Some("You manage".to_string())); } #[test] fn test_upsert_agent_updates_existing() { let conn = test_db(); upsert_agent(&conn, "a1", "OldName", "manager", "g1", 1, None, None, None).unwrap(); upsert_agent(&conn, "a1", "NewName", "architect", "g1", 1, Some("model-x"), None, None).unwrap(); let (name, role, model): (String, String, Option) = conn.query_row( "SELECT name, role, model FROM agents WHERE id = 'a1'", [], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)), ).unwrap(); assert_eq!(name, "NewName"); assert_eq!(role, "architect"); assert_eq!(model, Some("model-x".to_string())); } #[test] fn test_ensure_review_channels_for_group_creates_both() { let conn = test_db(); ensure_review_channels_for_group(&conn, "g1"); let queue: i64 = conn.query_row( "SELECT COUNT(*) FROM channels WHERE name = 'review-queue' AND group_id = 'g1'", [], |row| row.get(0), ).unwrap(); let log: i64 = conn.query_row( "SELECT COUNT(*) FROM channels WHERE name = 'review-log' AND group_id = 'g1'", [], |row| row.get(0), ).unwrap(); assert_eq!(queue, 1); assert_eq!(log, 1); // Idempotent ensure_review_channels_for_group(&conn, "g1"); let queue2: i64 = conn.query_row( "SELECT COUNT(*) FROM channels WHERE name = 'review-queue' AND group_id = 'g1'", [], |row| row.get(0), ).unwrap(); assert_eq!(queue2, 1); } #[test] fn test_upsert_preserves_status() { let conn = test_db(); upsert_agent(&conn, "a1", "Agent", "manager", "g1", 1, None, None, None).unwrap(); // Manually set status to 'active' conn.execute("UPDATE agents SET status = 'active' WHERE id = 'a1'", []).unwrap(); // Upsert should NOT overwrite status (ON CONFLICT only updates name/role/etc) upsert_agent(&conn, "a1", "Agent Updated", "manager", "g1", 1, None, None, None).unwrap(); let status: String = conn.query_row( "SELECT status FROM agents WHERE id = 'a1'", [], |row| row.get(0), ).unwrap(); assert_eq!(status, "active", "upsert should preserve existing status"); } // ---- Heartbeat tests ---- #[test] fn test_heartbeat_upsert_and_query() { let conn = test_db(); seed_agents(&conn); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs() as i64; // Insert heartbeat for a1 conn.execute( "INSERT INTO heartbeats (agent_id, timestamp) VALUES ('a1', ?1) \ ON CONFLICT(agent_id) DO UPDATE SET timestamp = excluded.timestamp", params![now], ).unwrap(); // Query heartbeats for group let mut stmt = conn.prepare( "SELECT h.agent_id, a.name AS agent_name, a.role AS agent_role, h.timestamp \ FROM heartbeats h JOIN agents a ON h.agent_id = a.id \ WHERE a.group_id = ? ORDER BY h.timestamp DESC" ).unwrap(); let heartbeats: Vec = stmt.query_map(params!["g1"], |row| { Ok(AgentHeartbeat { agent_id: row.get("agent_id")?, agent_name: row.get("agent_name")?, agent_role: row.get("agent_role")?, timestamp: row.get("timestamp")?, }) }).unwrap().collect::, _>>().unwrap(); assert_eq!(heartbeats.len(), 1); assert_eq!(heartbeats[0].agent_id, "a1"); assert_eq!(heartbeats[0].agent_name, "Coder"); assert_eq!(heartbeats[0].timestamp, now); // Upsert overwrites let later = now + 10; conn.execute( "INSERT INTO heartbeats (agent_id, timestamp) VALUES ('a1', ?1) \ ON CONFLICT(agent_id) DO UPDATE SET timestamp = excluded.timestamp", params![later], ).unwrap(); let ts: i64 = conn.query_row( "SELECT timestamp FROM heartbeats WHERE agent_id = 'a1'", [], |row| row.get(0), ).unwrap(); assert_eq!(ts, later); } #[test] fn test_stale_agents_detection() { let conn = test_db(); seed_agents(&conn); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs() as i64; // a1 has a recent heartbeat, a2 has a stale one conn.execute("INSERT INTO heartbeats (agent_id, timestamp) VALUES ('a1', ?)", params![now]).unwrap(); conn.execute("INSERT INTO heartbeats (agent_id, timestamp) VALUES ('a2', ?)", params![now - 600]).unwrap(); // Stale threshold: 300 seconds let cutoff = now - 300; let mut stmt = conn.prepare( "SELECT a.id FROM agents a LEFT JOIN heartbeats h ON a.id = h.agent_id \ WHERE a.group_id = ? AND a.tier > 0 AND (h.timestamp IS NULL OR h.timestamp < ?)" ).unwrap(); let stale: Vec = stmt.query_map(params!["g1", cutoff], |row| { row.get::<_, String>(0) }).unwrap().collect::, _>>().unwrap(); assert_eq!(stale.len(), 1); assert_eq!(stale[0], "a2"); } // ---- Dead letter queue tests ---- #[test] fn test_dead_letter_queue_insert_and_query() { let conn = test_db(); seed_agents(&conn); conn.execute( "INSERT INTO dead_letter_queue (from_agent, to_agent, content, error) VALUES ('a1', 'a2', 'hello', 'stale')", [], ).unwrap(); let mut stmt = conn.prepare( "SELECT d.id, d.from_agent, d.to_agent, d.content, d.error, d.created_at \ FROM dead_letter_queue d JOIN agents a ON d.to_agent = a.id \ WHERE a.group_id = ? ORDER BY d.created_at DESC LIMIT 50" ).unwrap(); let letters: Vec = stmt.query_map(params!["g1"], |row| { Ok(DeadLetter { id: row.get("id")?, from_agent: row.get("from_agent")?, to_agent: row.get("to_agent")?, content: row.get("content")?, error: row.get("error")?, created_at: row.get("created_at")?, }) }).unwrap().collect::, _>>().unwrap(); assert_eq!(letters.len(), 1); assert_eq!(letters[0].from_agent, "a1"); assert_eq!(letters[0].to_agent, "a2"); assert_eq!(letters[0].content, "hello"); assert_eq!(letters[0].error, "stale"); } #[test] fn test_dead_letter_clear() { let conn = test_db(); seed_agents(&conn); conn.execute( "INSERT INTO dead_letter_queue (from_agent, to_agent, content, error) VALUES ('a1', 'a2', 'msg', 'err')", [], ).unwrap(); let count: i64 = conn.query_row( "SELECT COUNT(*) FROM dead_letter_queue", [], |row| row.get(0), ).unwrap(); assert_eq!(count, 1); conn.execute( "DELETE FROM dead_letter_queue WHERE to_agent IN (SELECT id FROM agents WHERE group_id = ?)", params!["g1"], ).unwrap(); let count: i64 = conn.query_row( "SELECT COUNT(*) FROM dead_letter_queue", [], |row| row.get(0), ).unwrap(); assert_eq!(count, 0); } // ---- Audit log tests ---- #[test] fn test_audit_log_insert_and_query() { let conn = test_db(); seed_agents(&conn); conn.execute( "INSERT INTO audit_log (agent_id, event_type, detail) VALUES ('a1', 'status_change', 'Started running')", [], ).unwrap(); conn.execute( "INSERT INTO audit_log (agent_id, event_type, detail) VALUES ('a1', 'wake_event', 'Auto-wake triggered')", [], ).unwrap(); let mut stmt = conn.prepare( "SELECT al.id, al.agent_id, al.event_type, al.detail, al.created_at \ FROM audit_log al JOIN agents a ON al.agent_id = a.id \ WHERE a.group_id = ? ORDER BY al.created_at DESC, al.id DESC LIMIT ? OFFSET ?" ).unwrap(); let entries: Vec = stmt.query_map(params!["g1", 50, 0], |row| { Ok(AuditEntry { id: row.get("id")?, agent_id: row.get("agent_id")?, event_type: row.get("event_type")?, detail: row.get("detail")?, created_at: row.get("created_at")?, }) }).unwrap().collect::, _>>().unwrap(); assert_eq!(entries.len(), 2); // Reverse chronological (by id tiebreaker) — most recent first assert_eq!(entries[0].event_type, "wake_event"); assert_eq!(entries[1].event_type, "status_change"); } #[test] fn test_audit_log_for_agent() { let conn = test_db(); seed_agents(&conn); conn.execute( "INSERT INTO audit_log (agent_id, event_type, detail) VALUES ('a1', 'status_change', 'started')", [], ).unwrap(); conn.execute( "INSERT INTO audit_log (agent_id, event_type, detail) VALUES ('a2', 'btmsg_sent', 'sent msg')", [], ).unwrap(); let mut stmt = conn.prepare( "SELECT id, agent_id, event_type, detail, created_at \ FROM audit_log WHERE agent_id = ? ORDER BY created_at DESC LIMIT ?" ).unwrap(); let entries: Vec = stmt.query_map(params!["a1", 50], |row| { Ok(AuditEntry { id: row.get("id")?, agent_id: row.get("agent_id")?, event_type: row.get("event_type")?, detail: row.get("detail")?, created_at: row.get("created_at")?, }) }).unwrap().collect::, _>>().unwrap(); assert_eq!(entries.len(), 1); assert_eq!(entries[0].agent_id, "a1"); assert_eq!(entries[0].event_type, "status_change"); } #[test] fn test_agent_heartbeat_serializes_to_camel_case() { let hb = AgentHeartbeat { agent_id: "a1".into(), agent_name: "Coder".into(), agent_role: "developer".into(), timestamp: 1234567890, }; let json = serde_json::to_value(&hb).unwrap(); assert!(json.get("agentId").is_some(), "expected camelCase 'agentId'"); assert!(json.get("agentName").is_some(), "expected camelCase 'agentName'"); assert!(json.get("agentRole").is_some(), "expected camelCase 'agentRole'"); assert!(json.get("agent_id").is_none(), "should not have snake_case 'agent_id'"); } #[test] fn test_dead_letter_serializes_to_camel_case() { let dl = DeadLetter { id: 1, from_agent: "a1".into(), to_agent: "a2".into(), content: "hello".into(), error: "stale".into(), created_at: "2026-01-01".into(), }; let json = serde_json::to_value(&dl).unwrap(); assert!(json.get("fromAgent").is_some(), "expected camelCase 'fromAgent'"); assert!(json.get("toAgent").is_some(), "expected camelCase 'toAgent'"); assert!(json.get("createdAt").is_some(), "expected camelCase 'createdAt'"); } #[test] fn test_audit_entry_serializes_to_camel_case() { let entry = AuditEntry { id: 1, agent_id: "a1".into(), event_type: "status_change".into(), detail: "started".into(), created_at: "2026-01-01".into(), }; let json = serde_json::to_value(&entry).unwrap(); assert!(json.get("agentId").is_some(), "expected camelCase 'agentId'"); assert!(json.get("eventType").is_some(), "expected camelCase 'eventType'"); assert!(json.get("createdAt").is_some(), "expected camelCase 'createdAt'"); assert!(json.get("agent_id").is_none(), "should not have snake_case"); } // ---- WAL checkpoint tests ---- #[test] fn test_checkpoint_wal_on_temp_database() { let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("test_wal.db"); // Create a WAL-mode database with some data { let conn = Connection::open(&db_path).unwrap(); conn.query_row("PRAGMA journal_mode=WAL", [], |_| Ok(())).unwrap(); conn.execute_batch("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT);").unwrap(); for i in 0..100 { conn.execute("INSERT INTO t (val) VALUES (?)", params![format!("row-{i}")]).unwrap(); } // Keep connection open while we checkpoint from another connection // to simulate the real scenario (app holds a conn, background task checkpoints) // Run checkpoint from a separate connection (like the background task does) let result = crate::checkpoint_wal(&db_path); assert!(result.is_ok(), "checkpoint_wal should succeed: {:?}", result); } // Verify data is still intact after checkpoint let conn = Connection::open(&db_path).unwrap(); let count: i64 = conn.query_row("SELECT COUNT(*) FROM t", [], |row| row.get(0)).unwrap(); assert_eq!(count, 100, "all rows should survive checkpoint"); } #[test] fn test_checkpoint_wal_nonexistent_db_is_ok() { let result = crate::checkpoint_wal(std::path::Path::new("/tmp/nonexistent_agor_test.db")); assert!(result.is_ok(), "checkpoint_wal should return Ok for missing DB"); } #[test] fn test_contact_permissions_bidirectional() { let conn = test_db(); upsert_agent(&conn, "mgr", "Manager", "manager", "g1", 1, None, None, None).unwrap(); upsert_agent(&conn, "arch", "Architect", "architect", "g1", 1, None, None, None).unwrap(); // Set up bidirectional contacts (simulating what register_group_agents does) conn.execute("INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES ('mgr', 'arch')", []).unwrap(); conn.execute("INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES ('arch', 'mgr')", []).unwrap(); let fwd: bool = conn.query_row( "SELECT COUNT(*) > 0 FROM contacts WHERE agent_id = 'mgr' AND contact_id = 'arch'", [], |row| row.get(0), ).unwrap(); let rev: bool = conn.query_row( "SELECT COUNT(*) > 0 FROM contacts WHERE agent_id = 'arch' AND contact_id = 'mgr'", [], |row| row.get(0), ).unwrap(); assert!(fwd, "forward contact should exist"); assert!(rev, "reverse contact should exist"); } }