feat: add agent health monitoring, audit log, and dead letter queue

heartbeats + dead_letter_queue + audit_log tables in btmsg.db. 15s
heartbeat polling in ProjectBox, stale detection, ProjectHeader heart
indicator. AuditLogTab for Manager. register_agents_from_groups() with
bidirectional contacts and review channel creation.
This commit is contained in:
Hibryda 2026-03-12 04:57:29 +01:00
parent b2932273ba
commit 5c31668760
10 changed files with 1624 additions and 4 deletions

View file

@ -31,7 +31,7 @@ fn open_db() -> Result<Connection, String> {
}
let conn = Connection::open_with_flags(&path, OpenFlags::SQLITE_OPEN_READ_WRITE)
.map_err(|e| format!("Failed to open btmsg.db: {e}"))?;
conn.pragma_update(None, "journal_mode", "WAL")
conn.query_row("PRAGMA journal_mode=WAL", [], |_| Ok(()))
.map_err(|e| format!("Failed to set WAL mode: {e}"))?;
conn.pragma_update(None, "busy_timeout", 5000)
.map_err(|e| format!("Failed to set busy_timeout: {e}"))?;
@ -188,6 +188,9 @@ pub fn history(agent_id: &str, other_id: &str, limit: i32) -> Result<Vec<BtmsgMe
msgs.collect::<Result<Vec<_>, _>>().map_err(|e| format!("Row error: {e}"))
}
/// Default heartbeat staleness threshold: 5 minutes
const STALE_HEARTBEAT_SECS: i64 = 300;
pub fn send_message(from_agent: &str, to_agent: &str, content: &str) -> Result<String, String> {
let db = open_db()?;
@ -211,6 +214,41 @@ pub fn send_message(from_agent: &str, to_agent: &str, content: &str) -> Result<S
}
}
// Check if recipient is stale (no heartbeat in 5 min) — route to dead letter queue
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64;
let cutoff = now - STALE_HEARTBEAT_SECS;
let recipient_stale: bool = db
.query_row(
"SELECT COALESCE(h.timestamp, 0) < ?1 FROM agents a \
LEFT JOIN heartbeats h ON a.id = h.agent_id \
WHERE a.id = ?2",
params![cutoff, to_agent],
|row| row.get(0),
)
.unwrap_or(false);
if recipient_stale {
// Queue to dead letter instead of delivering
let error_msg = format!("Recipient '{}' is stale (no heartbeat in {} seconds)", to_agent, STALE_HEARTBEAT_SECS);
db.execute(
"INSERT INTO dead_letter_queue (from_agent, to_agent, content, error) VALUES (?1, ?2, ?3, ?4)",
params![from_agent, to_agent, content, error_msg],
)
.map_err(|e| format!("Dead letter insert error: {e}"))?;
// Also log audit event
let _ = db.execute(
"INSERT INTO audit_log (agent_id, event_type, detail) VALUES (?1, 'dead_letter', ?2)",
params![from_agent, format!("Message to '{}' routed to dead letter queue: {}", to_agent, error_msg)],
);
return Err(error_msg);
}
let msg_id = uuid::Uuid::new_v4().to_string();
db.execute(
"INSERT INTO messages (id, from_agent, to_agent, content, group_id) VALUES (?1, ?2, ?3, ?4, ?5)",
@ -270,6 +308,540 @@ pub fn ensure_admin(group_id: &str) -> Result<(), String> {
Ok(())
}
/// Register all agents from groups.json into the btmsg database.
/// Creates/updates agent records and sets up contact permissions:
/// - All Tier 1 agents get bidirectional contacts with each other
/// - Manager gets contacts with ALL agents (Tier 1 + Tier 2)
/// - Other Tier 1 agents get contacts with Manager
/// Also ensures admin agent and review channels exist.
pub fn register_agents_from_groups(groups: &crate::groups::GroupsFile) -> Result<(), String> {
for group in &groups.groups {
register_group_agents(group)?;
}
Ok(())
}
/// Register all agents for a single group.
fn register_group_agents(group: &crate::groups::GroupConfig) -> Result<(), String> {
let db = open_db_or_create()?;
let group_id = &group.id;
// Collect agent IDs by tier for contact setup
let mut tier1_ids: Vec<String> = Vec::new();
let mut tier2_ids: Vec<String> = Vec::new();
let mut manager_id: Option<String> = None;
// Register Tier 1 agents (from agents array)
for agent in &group.agents {
let tier = match agent.role.as_str() {
"manager" | "architect" | "tester" | "reviewer" => 1,
_ => 2,
};
upsert_agent(
&db,
&agent.id,
&agent.name,
&agent.role,
group_id,
tier,
agent.model.as_deref(),
agent.cwd.as_deref(),
agent.system_prompt.as_deref(),
)?;
if tier == 1 {
tier1_ids.push(agent.id.clone());
if agent.role == "manager" {
manager_id = Some(agent.id.clone());
}
} else {
tier2_ids.push(agent.id.clone());
}
}
// Register Tier 2 agents (from projects array)
for project in &group.projects {
upsert_agent(
&db,
&project.id,
&project.name,
"project",
group_id,
2,
None,
Some(&project.cwd),
None,
)?;
tier2_ids.push(project.id.clone());
}
// Set up contact permissions
// All Tier 1 agents: bidirectional contacts with each other
for i in 0..tier1_ids.len() {
for j in (i + 1)..tier1_ids.len() {
db.execute(
"INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES (?1, ?2)",
params![tier1_ids[i], tier1_ids[j]],
).map_err(|e| format!("Contact insert error: {e}"))?;
db.execute(
"INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES (?1, ?2)",
params![tier1_ids[j], tier1_ids[i]],
).map_err(|e| format!("Contact insert error: {e}"))?;
}
}
// Manager gets contacts with ALL Tier 2 agents (bidirectional)
if let Some(ref mgr_id) = manager_id {
for t2_id in &tier2_ids {
db.execute(
"INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES (?1, ?2)",
params![mgr_id, t2_id],
).map_err(|e| format!("Contact insert error: {e}"))?;
db.execute(
"INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES (?1, ?2)",
params![t2_id, mgr_id],
).map_err(|e| format!("Contact insert error: {e}"))?;
}
}
// Ensure review channels exist
ensure_review_channels_for_group(&db, group_id);
// Drop the connection before calling ensure_admin (which opens its own)
drop(db);
// Ensure admin agent exists with contacts to all agents
let _ = ensure_admin(group_id);
Ok(())
}
fn upsert_agent(
db: &Connection,
id: &str,
name: &str,
role: &str,
group_id: &str,
tier: i32,
model: Option<&str>,
cwd: Option<&str>,
system_prompt: Option<&str>,
) -> Result<(), String> {
db.execute(
"INSERT INTO agents (id, name, role, group_id, tier, model, cwd, system_prompt)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
ON CONFLICT(id) DO UPDATE SET
name = excluded.name,
role = excluded.role,
group_id = excluded.group_id,
tier = excluded.tier,
model = excluded.model,
cwd = excluded.cwd,
system_prompt = excluded.system_prompt",
params![id, name, role, group_id, tier, model, cwd, system_prompt],
)
.map_err(|e| format!("Upsert agent error: {e}"))?;
Ok(())
}
/// Ensure #review-queue and #review-log channels exist for a group (public wrapper).
fn ensure_review_channels_for_group(db: &Connection, group_id: &str) {
for name in &["review-queue", "review-log"] {
let exists: bool = db
.query_row(
"SELECT COUNT(*) > 0 FROM channels WHERE name = ?1 AND group_id = ?2",
params![name, group_id],
|row| row.get(0),
)
.unwrap_or(false);
if !exists {
let id = uuid::Uuid::new_v4().to_string();
let _ = db.execute(
"INSERT INTO channels (id, name, group_id, created_by) VALUES (?1, ?2, ?3, 'system')",
params![id, name, group_id],
);
}
}
}
/// Open btmsg database, creating it with schema if it doesn't exist.
fn open_db_or_create() -> Result<Connection, String> {
let path = db_path();
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| format!("Failed to create data dir: {e}"))?;
}
let conn = Connection::open_with_flags(
&path,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
)
.map_err(|e| format!("Failed to open/create btmsg.db: {e}"))?;
conn.query_row("PRAGMA journal_mode=WAL", [], |_| Ok(()))
.map_err(|e| format!("Failed to set WAL mode: {e}"))?;
conn.pragma_update(None, "busy_timeout", 5000)
.map_err(|e| format!("Failed to set busy_timeout: {e}"))?;
// Create tables if they don't exist (same schema as Python btmsg CLI)
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS agents (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
role TEXT NOT NULL,
group_id TEXT NOT NULL,
tier INTEGER NOT NULL DEFAULT 2,
model TEXT,
cwd TEXT,
system_prompt TEXT,
status TEXT DEFAULT 'stopped',
last_active_at TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS contacts (
agent_id TEXT NOT NULL,
contact_id TEXT NOT NULL,
PRIMARY KEY (agent_id, contact_id),
FOREIGN KEY (agent_id) REFERENCES agents(id),
FOREIGN KEY (contact_id) REFERENCES agents(id)
);
CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY,
from_agent TEXT NOT NULL,
to_agent TEXT NOT NULL,
content TEXT NOT NULL,
read INTEGER DEFAULT 0,
reply_to TEXT,
group_id TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (from_agent) REFERENCES agents(id),
FOREIGN KEY (to_agent) REFERENCES agents(id)
);
CREATE INDEX IF NOT EXISTS idx_messages_to ON messages(to_agent, read);
CREATE INDEX IF NOT EXISTS idx_messages_from ON messages(from_agent);
CREATE INDEX IF NOT EXISTS idx_messages_group ON messages(group_id);
CREATE INDEX IF NOT EXISTS idx_messages_reply ON messages(reply_to);
CREATE TABLE IF NOT EXISTS channels (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
group_id TEXT NOT NULL,
created_by TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS channel_members (
channel_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
joined_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (channel_id, agent_id),
FOREIGN KEY (channel_id) REFERENCES channels(id),
FOREIGN KEY (agent_id) REFERENCES agents(id)
);
CREATE TABLE IF NOT EXISTS channel_messages (
id TEXT PRIMARY KEY,
channel_id TEXT NOT NULL,
from_agent TEXT NOT NULL,
content TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (channel_id) REFERENCES channels(id),
FOREIGN KEY (from_agent) REFERENCES agents(id)
);
CREATE INDEX IF NOT EXISTS idx_channel_messages ON channel_messages(channel_id, created_at);
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT DEFAULT '',
status TEXT DEFAULT 'todo',
priority TEXT DEFAULT 'medium',
assigned_to TEXT,
created_by TEXT NOT NULL,
group_id TEXT NOT NULL,
parent_task_id TEXT,
sort_order INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (assigned_to) REFERENCES agents(id),
FOREIGN KEY (created_by) REFERENCES agents(id)
);
CREATE TABLE IF NOT EXISTS task_comments (
id TEXT PRIMARY KEY,
task_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
content TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (task_id) REFERENCES tasks(id),
FOREIGN KEY (agent_id) REFERENCES agents(id)
);
CREATE INDEX IF NOT EXISTS idx_tasks_group ON tasks(group_id);
CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);
CREATE INDEX IF NOT EXISTS idx_tasks_assigned ON tasks(assigned_to);
CREATE INDEX IF NOT EXISTS idx_task_comments_task ON task_comments(task_id);
CREATE TABLE IF NOT EXISTS heartbeats (
agent_id TEXT PRIMARY KEY,
timestamp INTEGER NOT NULL,
FOREIGN KEY (agent_id) REFERENCES agents(id)
);
CREATE TABLE IF NOT EXISTS dead_letter_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
from_agent TEXT NOT NULL,
to_agent TEXT NOT NULL,
content TEXT NOT NULL,
error TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (from_agent) REFERENCES agents(id),
FOREIGN KEY (to_agent) REFERENCES agents(id)
);
CREATE INDEX IF NOT EXISTS idx_dead_letter_from ON dead_letter_queue(from_agent);
CREATE INDEX IF NOT EXISTS idx_dead_letter_to ON dead_letter_queue(to_agent);
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_id TEXT NOT NULL,
event_type TEXT NOT NULL,
detail TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (agent_id) REFERENCES agents(id)
);
CREATE INDEX IF NOT EXISTS idx_audit_log_agent ON audit_log(agent_id);
CREATE INDEX IF NOT EXISTS idx_audit_log_type ON audit_log(event_type);"
).map_err(|e| format!("Schema creation error: {e}"))?;
Ok(conn)
}
// ---- Heartbeat monitoring ----
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AgentHeartbeat {
pub agent_id: String,
pub agent_name: String,
pub agent_role: String,
pub timestamp: i64,
}
pub fn record_heartbeat(agent_id: &str) -> Result<(), String> {
let db = open_db()?;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| format!("Time error: {e}"))?
.as_secs() as i64;
db.execute(
"INSERT INTO heartbeats (agent_id, timestamp) VALUES (?1, ?2) \
ON CONFLICT(agent_id) DO UPDATE SET timestamp = excluded.timestamp",
params![agent_id, now],
)
.map_err(|e| format!("Heartbeat upsert error: {e}"))?;
Ok(())
}
pub fn get_agent_heartbeats(group_id: &str) -> Result<Vec<AgentHeartbeat>, String> {
let db = open_db()?;
let mut stmt = db
.prepare(
"SELECT h.agent_id, a.name AS agent_name, a.role AS agent_role, h.timestamp \
FROM heartbeats h JOIN agents a ON h.agent_id = a.id \
WHERE a.group_id = ? ORDER BY h.timestamp DESC",
)
.map_err(|e| format!("Query error: {e}"))?;
let rows = stmt
.query_map(params![group_id], |row| {
Ok(AgentHeartbeat {
agent_id: row.get("agent_id")?,
agent_name: row.get("agent_name")?,
agent_role: row.get("agent_role")?,
timestamp: row.get("timestamp")?,
})
})
.map_err(|e| format!("Query error: {e}"))?;
rows.collect::<Result<Vec<_>, _>>()
.map_err(|e| format!("Row error: {e}"))
}
pub fn get_stale_agents(group_id: &str, threshold_secs: i64) -> Result<Vec<String>, String> {
let db = open_db()?;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| format!("Time error: {e}"))?
.as_secs() as i64;
let cutoff = now - threshold_secs;
// Agents in the group that either have no heartbeat or heartbeat older than cutoff
let mut stmt = db
.prepare(
"SELECT a.id FROM agents a LEFT JOIN heartbeats h ON a.id = h.agent_id \
WHERE a.group_id = ? AND a.tier > 0 AND (h.timestamp IS NULL OR h.timestamp < ?)",
)
.map_err(|e| format!("Query error: {e}"))?;
let ids = stmt
.query_map(params![group_id, cutoff], |row| row.get::<_, String>(0))
.map_err(|e| format!("Query error: {e}"))?;
ids.collect::<Result<Vec<_>, _>>()
.map_err(|e| format!("Row error: {e}"))
}
// ---- Dead letter queue ----
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DeadLetter {
pub id: i64,
pub from_agent: String,
pub to_agent: String,
pub content: String,
pub error: String,
pub created_at: String,
}
pub fn queue_dead_letter(
from_agent: &str,
to_agent: &str,
content: &str,
error: &str,
) -> Result<(), String> {
let db = open_db()?;
db.execute(
"INSERT INTO dead_letter_queue (from_agent, to_agent, content, error) VALUES (?1, ?2, ?3, ?4)",
params![from_agent, to_agent, content, error],
)
.map_err(|e| format!("Dead letter insert error: {e}"))?;
Ok(())
}
pub fn get_dead_letters(group_id: &str, limit: i32) -> Result<Vec<DeadLetter>, String> {
let db = open_db()?;
let mut stmt = db
.prepare(
"SELECT d.id, d.from_agent, d.to_agent, d.content, d.error, d.created_at \
FROM dead_letter_queue d \
JOIN agents a ON d.to_agent = a.id \
WHERE a.group_id = ? \
ORDER BY d.created_at DESC LIMIT ?",
)
.map_err(|e| format!("Query error: {e}"))?;
let rows = stmt
.query_map(params![group_id, limit], |row| {
Ok(DeadLetter {
id: row.get("id")?,
from_agent: row.get("from_agent")?,
to_agent: row.get("to_agent")?,
content: row.get("content")?,
error: row.get("error")?,
created_at: row.get("created_at")?,
})
})
.map_err(|e| format!("Query error: {e}"))?;
rows.collect::<Result<Vec<_>, _>>()
.map_err(|e| format!("Row error: {e}"))
}
pub fn clear_dead_letters(group_id: &str) -> Result<(), String> {
let db = open_db()?;
db.execute(
"DELETE FROM dead_letter_queue WHERE to_agent IN (SELECT id FROM agents WHERE group_id = ?)",
params![group_id],
)
.map_err(|e| format!("Delete error: {e}"))?;
Ok(())
}
// ---- Audit log ----
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AuditEntry {
pub id: i64,
pub agent_id: String,
pub event_type: String,
pub detail: String,
pub created_at: String,
}
pub fn log_audit_event(agent_id: &str, event_type: &str, detail: &str) -> Result<(), String> {
let db = open_db_or_create()?;
db.execute(
"INSERT INTO audit_log (agent_id, event_type, detail) VALUES (?1, ?2, ?3)",
params![agent_id, event_type, detail],
)
.map_err(|e| format!("Audit log insert error: {e}"))?;
Ok(())
}
pub fn get_audit_log(group_id: &str, limit: i32, offset: i32) -> Result<Vec<AuditEntry>, String> {
let db = open_db()?;
let mut stmt = db
.prepare(
"SELECT al.id, al.agent_id, al.event_type, al.detail, al.created_at \
FROM audit_log al \
JOIN agents a ON al.agent_id = a.id \
WHERE a.group_id = ? \
ORDER BY al.created_at DESC, al.id DESC LIMIT ? OFFSET ?",
)
.map_err(|e| format!("Query error: {e}"))?;
let rows = stmt
.query_map(params![group_id, limit, offset], |row| {
Ok(AuditEntry {
id: row.get("id")?,
agent_id: row.get("agent_id")?,
event_type: row.get("event_type")?,
detail: row.get("detail")?,
created_at: row.get("created_at")?,
})
})
.map_err(|e| format!("Query error: {e}"))?;
rows.collect::<Result<Vec<_>, _>>()
.map_err(|e| format!("Row error: {e}"))
}
pub fn get_audit_log_for_agent(
agent_id: &str,
limit: i32,
) -> Result<Vec<AuditEntry>, String> {
let db = open_db()?;
let mut stmt = db
.prepare(
"SELECT id, agent_id, event_type, detail, created_at \
FROM audit_log WHERE agent_id = ? ORDER BY created_at DESC, id DESC LIMIT ?",
)
.map_err(|e| format!("Query error: {e}"))?;
let rows = stmt
.query_map(params![agent_id, limit], |row| {
Ok(AuditEntry {
id: row.get("id")?,
agent_id: row.get("agent_id")?,
event_type: row.get("event_type")?,
detail: row.get("detail")?,
created_at: row.get("created_at")?,
})
})
.map_err(|e| format!("Query error: {e}"))?;
rows.collect::<Result<Vec<_>, _>>()
.map_err(|e| format!("Row error: {e}"))
}
pub fn all_feed(group_id: &str, limit: i32) -> Result<Vec<BtmsgFeedMessage>, String> {
let db = open_db()?;
let mut stmt = db.prepare(
@ -476,6 +1048,25 @@ mod tests {
from_agent TEXT NOT NULL,
content TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE heartbeats (
agent_id TEXT PRIMARY KEY,
timestamp INTEGER NOT NULL
);
CREATE TABLE dead_letter_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
from_agent TEXT NOT NULL,
to_agent TEXT NOT NULL,
content TEXT NOT NULL,
error TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_id TEXT NOT NULL,
event_type TEXT NOT NULL,
detail TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);",
)
.unwrap();
@ -785,4 +1376,373 @@ mod tests {
assert!(json.get("recipientName").is_some(), "expected camelCase 'recipientName'");
assert!(json.get("recipientRole").is_some(), "expected camelCase 'recipientRole'");
}
// ---- Agent registration tests ----
#[test]
fn test_upsert_agent_inserts_new() {
let conn = test_db();
upsert_agent(&conn, "mgr1", "Manager", "manager", "g1", 1, Some("claude-4"), None, Some("You manage")).unwrap();
let agent = conn.query_row(
"SELECT * FROM agents WHERE id = 'mgr1'", [],
|row| Ok((
row.get::<_, String>("name").unwrap(),
row.get::<_, String>("role").unwrap(),
row.get::<_, i32>("tier").unwrap(),
row.get::<_, Option<String>>("model").unwrap(),
row.get::<_, Option<String>>("system_prompt").unwrap(),
)),
).unwrap();
assert_eq!(agent.0, "Manager");
assert_eq!(agent.1, "manager");
assert_eq!(agent.2, 1);
assert_eq!(agent.3, Some("claude-4".to_string()));
assert_eq!(agent.4, Some("You manage".to_string()));
}
#[test]
fn test_upsert_agent_updates_existing() {
let conn = test_db();
upsert_agent(&conn, "a1", "OldName", "manager", "g1", 1, None, None, None).unwrap();
upsert_agent(&conn, "a1", "NewName", "architect", "g1", 1, Some("model-x"), None, None).unwrap();
let (name, role, model): (String, String, Option<String>) = conn.query_row(
"SELECT name, role, model FROM agents WHERE id = 'a1'", [],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
).unwrap();
assert_eq!(name, "NewName");
assert_eq!(role, "architect");
assert_eq!(model, Some("model-x".to_string()));
}
#[test]
fn test_ensure_review_channels_for_group_creates_both() {
let conn = test_db();
ensure_review_channels_for_group(&conn, "g1");
let queue: i64 = conn.query_row(
"SELECT COUNT(*) FROM channels WHERE name = 'review-queue' AND group_id = 'g1'",
[], |row| row.get(0),
).unwrap();
let log: i64 = conn.query_row(
"SELECT COUNT(*) FROM channels WHERE name = 'review-log' AND group_id = 'g1'",
[], |row| row.get(0),
).unwrap();
assert_eq!(queue, 1);
assert_eq!(log, 1);
// Idempotent
ensure_review_channels_for_group(&conn, "g1");
let queue2: i64 = conn.query_row(
"SELECT COUNT(*) FROM channels WHERE name = 'review-queue' AND group_id = 'g1'",
[], |row| row.get(0),
).unwrap();
assert_eq!(queue2, 1);
}
#[test]
fn test_upsert_preserves_status() {
let conn = test_db();
upsert_agent(&conn, "a1", "Agent", "manager", "g1", 1, None, None, None).unwrap();
// Manually set status to 'active'
conn.execute("UPDATE agents SET status = 'active' WHERE id = 'a1'", []).unwrap();
// Upsert should NOT overwrite status (ON CONFLICT only updates name/role/etc)
upsert_agent(&conn, "a1", "Agent Updated", "manager", "g1", 1, None, None, None).unwrap();
let status: String = conn.query_row(
"SELECT status FROM agents WHERE id = 'a1'", [],
|row| row.get(0),
).unwrap();
assert_eq!(status, "active", "upsert should preserve existing status");
}
// ---- Heartbeat tests ----
#[test]
fn test_heartbeat_upsert_and_query() {
let conn = test_db();
seed_agents(&conn);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
// Insert heartbeat for a1
conn.execute(
"INSERT INTO heartbeats (agent_id, timestamp) VALUES ('a1', ?1) \
ON CONFLICT(agent_id) DO UPDATE SET timestamp = excluded.timestamp",
params![now],
).unwrap();
// Query heartbeats for group
let mut stmt = conn.prepare(
"SELECT h.agent_id, a.name AS agent_name, a.role AS agent_role, h.timestamp \
FROM heartbeats h JOIN agents a ON h.agent_id = a.id \
WHERE a.group_id = ? ORDER BY h.timestamp DESC"
).unwrap();
let heartbeats: Vec<AgentHeartbeat> = stmt.query_map(params!["g1"], |row| {
Ok(AgentHeartbeat {
agent_id: row.get("agent_id")?,
agent_name: row.get("agent_name")?,
agent_role: row.get("agent_role")?,
timestamp: row.get("timestamp")?,
})
}).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(heartbeats.len(), 1);
assert_eq!(heartbeats[0].agent_id, "a1");
assert_eq!(heartbeats[0].agent_name, "Coder");
assert_eq!(heartbeats[0].timestamp, now);
// Upsert overwrites
let later = now + 10;
conn.execute(
"INSERT INTO heartbeats (agent_id, timestamp) VALUES ('a1', ?1) \
ON CONFLICT(agent_id) DO UPDATE SET timestamp = excluded.timestamp",
params![later],
).unwrap();
let ts: i64 = conn.query_row(
"SELECT timestamp FROM heartbeats WHERE agent_id = 'a1'", [],
|row| row.get(0),
).unwrap();
assert_eq!(ts, later);
}
#[test]
fn test_stale_agents_detection() {
let conn = test_db();
seed_agents(&conn);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
// a1 has a recent heartbeat, a2 has a stale one
conn.execute("INSERT INTO heartbeats (agent_id, timestamp) VALUES ('a1', ?)", params![now]).unwrap();
conn.execute("INSERT INTO heartbeats (agent_id, timestamp) VALUES ('a2', ?)", params![now - 600]).unwrap();
// Stale threshold: 300 seconds
let cutoff = now - 300;
let mut stmt = conn.prepare(
"SELECT a.id FROM agents a LEFT JOIN heartbeats h ON a.id = h.agent_id \
WHERE a.group_id = ? AND a.tier > 0 AND (h.timestamp IS NULL OR h.timestamp < ?)"
).unwrap();
let stale: Vec<String> = stmt.query_map(params!["g1", cutoff], |row| {
row.get::<_, String>(0)
}).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(stale.len(), 1);
assert_eq!(stale[0], "a2");
}
// ---- Dead letter queue tests ----
#[test]
fn test_dead_letter_queue_insert_and_query() {
let conn = test_db();
seed_agents(&conn);
conn.execute(
"INSERT INTO dead_letter_queue (from_agent, to_agent, content, error) VALUES ('a1', 'a2', 'hello', 'stale')",
[],
).unwrap();
let mut stmt = conn.prepare(
"SELECT d.id, d.from_agent, d.to_agent, d.content, d.error, d.created_at \
FROM dead_letter_queue d JOIN agents a ON d.to_agent = a.id \
WHERE a.group_id = ? ORDER BY d.created_at DESC LIMIT 50"
).unwrap();
let letters: Vec<DeadLetter> = stmt.query_map(params!["g1"], |row| {
Ok(DeadLetter {
id: row.get("id")?,
from_agent: row.get("from_agent")?,
to_agent: row.get("to_agent")?,
content: row.get("content")?,
error: row.get("error")?,
created_at: row.get("created_at")?,
})
}).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(letters.len(), 1);
assert_eq!(letters[0].from_agent, "a1");
assert_eq!(letters[0].to_agent, "a2");
assert_eq!(letters[0].content, "hello");
assert_eq!(letters[0].error, "stale");
}
#[test]
fn test_dead_letter_clear() {
let conn = test_db();
seed_agents(&conn);
conn.execute(
"INSERT INTO dead_letter_queue (from_agent, to_agent, content, error) VALUES ('a1', 'a2', 'msg', 'err')",
[],
).unwrap();
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM dead_letter_queue", [], |row| row.get(0),
).unwrap();
assert_eq!(count, 1);
conn.execute(
"DELETE FROM dead_letter_queue WHERE to_agent IN (SELECT id FROM agents WHERE group_id = ?)",
params!["g1"],
).unwrap();
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM dead_letter_queue", [], |row| row.get(0),
).unwrap();
assert_eq!(count, 0);
}
// ---- Audit log tests ----
#[test]
fn test_audit_log_insert_and_query() {
let conn = test_db();
seed_agents(&conn);
conn.execute(
"INSERT INTO audit_log (agent_id, event_type, detail) VALUES ('a1', 'status_change', 'Started running')",
[],
).unwrap();
conn.execute(
"INSERT INTO audit_log (agent_id, event_type, detail) VALUES ('a1', 'wake_event', 'Auto-wake triggered')",
[],
).unwrap();
let mut stmt = conn.prepare(
"SELECT al.id, al.agent_id, al.event_type, al.detail, al.created_at \
FROM audit_log al JOIN agents a ON al.agent_id = a.id \
WHERE a.group_id = ? ORDER BY al.created_at DESC, al.id DESC LIMIT ? OFFSET ?"
).unwrap();
let entries: Vec<AuditEntry> = stmt.query_map(params!["g1", 50, 0], |row| {
Ok(AuditEntry {
id: row.get("id")?,
agent_id: row.get("agent_id")?,
event_type: row.get("event_type")?,
detail: row.get("detail")?,
created_at: row.get("created_at")?,
})
}).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(entries.len(), 2);
// Reverse chronological (by id tiebreaker) — most recent first
assert_eq!(entries[0].event_type, "wake_event");
assert_eq!(entries[1].event_type, "status_change");
}
#[test]
fn test_audit_log_for_agent() {
let conn = test_db();
seed_agents(&conn);
conn.execute(
"INSERT INTO audit_log (agent_id, event_type, detail) VALUES ('a1', 'status_change', 'started')",
[],
).unwrap();
conn.execute(
"INSERT INTO audit_log (agent_id, event_type, detail) VALUES ('a2', 'btmsg_sent', 'sent msg')",
[],
).unwrap();
let mut stmt = conn.prepare(
"SELECT id, agent_id, event_type, detail, created_at \
FROM audit_log WHERE agent_id = ? ORDER BY created_at DESC LIMIT ?"
).unwrap();
let entries: Vec<AuditEntry> = stmt.query_map(params!["a1", 50], |row| {
Ok(AuditEntry {
id: row.get("id")?,
agent_id: row.get("agent_id")?,
event_type: row.get("event_type")?,
detail: row.get("detail")?,
created_at: row.get("created_at")?,
})
}).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].agent_id, "a1");
assert_eq!(entries[0].event_type, "status_change");
}
#[test]
fn test_agent_heartbeat_serializes_to_camel_case() {
let hb = AgentHeartbeat {
agent_id: "a1".into(),
agent_name: "Coder".into(),
agent_role: "developer".into(),
timestamp: 1234567890,
};
let json = serde_json::to_value(&hb).unwrap();
assert!(json.get("agentId").is_some(), "expected camelCase 'agentId'");
assert!(json.get("agentName").is_some(), "expected camelCase 'agentName'");
assert!(json.get("agentRole").is_some(), "expected camelCase 'agentRole'");
assert!(json.get("agent_id").is_none(), "should not have snake_case 'agent_id'");
}
#[test]
fn test_dead_letter_serializes_to_camel_case() {
let dl = DeadLetter {
id: 1,
from_agent: "a1".into(),
to_agent: "a2".into(),
content: "hello".into(),
error: "stale".into(),
created_at: "2026-01-01".into(),
};
let json = serde_json::to_value(&dl).unwrap();
assert!(json.get("fromAgent").is_some(), "expected camelCase 'fromAgent'");
assert!(json.get("toAgent").is_some(), "expected camelCase 'toAgent'");
assert!(json.get("createdAt").is_some(), "expected camelCase 'createdAt'");
}
#[test]
fn test_audit_entry_serializes_to_camel_case() {
let entry = AuditEntry {
id: 1,
agent_id: "a1".into(),
event_type: "status_change".into(),
detail: "started".into(),
created_at: "2026-01-01".into(),
};
let json = serde_json::to_value(&entry).unwrap();
assert!(json.get("agentId").is_some(), "expected camelCase 'agentId'");
assert!(json.get("eventType").is_some(), "expected camelCase 'eventType'");
assert!(json.get("createdAt").is_some(), "expected camelCase 'createdAt'");
assert!(json.get("agent_id").is_none(), "should not have snake_case");
}
#[test]
fn test_contact_permissions_bidirectional() {
let conn = test_db();
upsert_agent(&conn, "mgr", "Manager", "manager", "g1", 1, None, None, None).unwrap();
upsert_agent(&conn, "arch", "Architect", "architect", "g1", 1, None, None, None).unwrap();
// Set up bidirectional contacts (simulating what register_group_agents does)
conn.execute("INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES ('mgr', 'arch')", []).unwrap();
conn.execute("INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES ('arch', 'mgr')", []).unwrap();
let fwd: bool = conn.query_row(
"SELECT COUNT(*) > 0 FROM contacts WHERE agent_id = 'mgr' AND contact_id = 'arch'",
[], |row| row.get(0),
).unwrap();
let rev: bool = conn.query_row(
"SELECT COUNT(*) > 0 FROM contacts WHERE agent_id = 'arch' AND contact_id = 'mgr'",
[], |row| row.get(0),
).unwrap();
assert!(fwd, "forward contact should exist");
assert!(rev, "reverse contact should exist");
}
}

View file

@ -1,4 +1,5 @@
use crate::btmsg;
use crate::groups;
#[tauri::command]
pub fn btmsg_get_agents(group_id: String) -> Result<Vec<btmsg::BtmsgAgent>, String> {
@ -69,3 +70,51 @@ pub fn btmsg_create_channel(name: String, group_id: String, created_by: String)
pub fn btmsg_add_channel_member(channel_id: String, agent_id: String) -> Result<(), String> {
btmsg::add_channel_member(&channel_id, &agent_id)
}
/// Register all agents from a GroupsFile into the btmsg database.
/// Creates/updates agent records, sets up contact permissions, ensures review channels.
#[tauri::command]
pub fn btmsg_register_agents(config: groups::GroupsFile) -> Result<(), String> {
btmsg::register_agents_from_groups(&config)
}
// ---- Heartbeat monitoring ----
#[tauri::command]
pub fn btmsg_record_heartbeat(agent_id: String) -> Result<(), String> {
btmsg::record_heartbeat(&agent_id)
}
#[tauri::command]
pub fn btmsg_get_stale_agents(group_id: String, threshold_secs: i64) -> Result<Vec<String>, String> {
btmsg::get_stale_agents(&group_id, threshold_secs)
}
// ---- Dead letter queue ----
#[tauri::command]
pub fn btmsg_get_dead_letters(group_id: String, limit: i32) -> Result<Vec<btmsg::DeadLetter>, String> {
btmsg::get_dead_letters(&group_id, limit)
}
#[tauri::command]
pub fn btmsg_clear_dead_letters(group_id: String) -> Result<(), String> {
btmsg::clear_dead_letters(&group_id)
}
// ---- Audit log ----
#[tauri::command]
pub fn audit_log_event(agent_id: String, event_type: String, detail: String) -> Result<(), String> {
btmsg::log_audit_event(&agent_id, &event_type, &detail)
}
#[tauri::command]
pub fn audit_log_list(group_id: String, limit: i32, offset: i32) -> Result<Vec<btmsg::AuditEntry>, String> {
btmsg::get_audit_log(&group_id, limit, offset)
}
#[tauri::command]
pub fn audit_log_for_agent(agent_id: String, limit: i32) -> Result<Vec<btmsg::AuditEntry>, String> {
btmsg::get_audit_log_for_agent(&agent_id, limit)
}

View file

@ -0,0 +1,57 @@
/**
* Audit log bridge reads/writes audit events via Tauri IPC.
* Used by agent-dispatcher, wake-scheduler, and AgentSession for event tracking.
*/
import { invoke } from '@tauri-apps/api/core';
import type { AgentId, GroupId } from '../types/ids';
export interface AuditEntry {
id: number;
agentId: string;
eventType: string;
detail: string;
createdAt: string;
}
/** Audit event types */
export type AuditEventType =
| 'prompt_injection'
| 'wake_event'
| 'btmsg_sent'
| 'btmsg_received'
| 'status_change'
| 'heartbeat_missed'
| 'dead_letter';
/**
* Log an audit event for an agent.
*/
export async function logAuditEvent(
agentId: AgentId,
eventType: AuditEventType,
detail: string,
): Promise<void> {
return invoke('audit_log_event', { agentId, eventType, detail });
}
/**
* Get audit log entries for a group (reverse chronological).
*/
export async function getAuditLog(
groupId: GroupId,
limit: number = 200,
offset: number = 0,
): Promise<AuditEntry[]> {
return invoke('audit_log_list', { groupId, limit, offset });
}
/**
* Get audit log entries for a specific agent.
*/
export async function getAuditLogForAgent(
agentId: AgentId,
limit: number = 50,
): Promise<AuditEntry[]> {
return invoke('audit_log_for_agent', { agentId, limit });
}

View file

@ -159,3 +159,53 @@ export async function createChannel(name: string, groupId: GroupId, createdBy: A
export async function addChannelMember(channelId: string, agentId: AgentId): Promise<void> {
return invoke('btmsg_add_channel_member', { channelId, agentId });
}
/**
* Register all agents from groups config into the btmsg database.
* Creates/updates agent records, sets up contact permissions, ensures review channels.
* Should be called whenever groups are loaded or switched.
*/
export async function registerAgents(config: import('../types/groups').GroupsFile): Promise<void> {
return invoke('btmsg_register_agents', { config });
}
// ---- Heartbeat monitoring ----
/**
* Record a heartbeat for an agent (upserts timestamp).
*/
export async function recordHeartbeat(agentId: AgentId): Promise<void> {
return invoke('btmsg_record_heartbeat', { agentId });
}
/**
* Get stale agents in a group (no heartbeat within threshold).
*/
export async function getStaleAgents(groupId: GroupId, thresholdSecs: number = 300): Promise<string[]> {
return invoke('btmsg_get_stale_agents', { groupId, thresholdSecs });
}
// ---- Dead letter queue ----
export interface DeadLetter {
id: number;
fromAgent: string;
toAgent: string;
content: string;
error: string;
createdAt: string;
}
/**
* Get dead letter queue entries for a group.
*/
export async function getDeadLetters(groupId: GroupId, limit: number = 50): Promise<DeadLetter[]> {
return invoke('btmsg_get_dead_letters', { groupId, limit });
}
/**
* Clear all dead letters for a group.
*/
export async function clearDeadLetters(groupId: GroupId): Promise<void> {
return invoke('btmsg_clear_dead_letters', { groupId });
}

View file

@ -3,6 +3,8 @@
import type { ProjectConfig, GroupAgentRole } from '../../types/groups';
import { generateAgentPrompt } from '../../utils/agent-prompts';
import { getActiveGroup } from '../../stores/workspace.svelte';
import { logAuditEvent } from '../../adapters/audit-bridge';
import type { AgentId } from '../../types/ids';
import {
loadProjectAgentState,
loadAgentMessages,
@ -75,6 +77,12 @@
? '[Context Refresh] Review your role and available tools above. Check your inbox with `btmsg inbox` and review the task board with `bttask board`.'
: '[Context Refresh] Review the instructions above and continue your work.';
contextRefreshPrompt = refreshMsg;
// Audit: log prompt injection event
logAuditEvent(
project.id as unknown as AgentId,
'prompt_injection',
`Context refresh triggered after ${Math.floor(elapsed / 60_000)} min idle`,
).catch(() => {});
}
}, 60_000); // Check every minute
}

View file

@ -0,0 +1,300 @@
<script lang="ts">
import { onMount, onDestroy } from 'svelte';
import { getAuditLog, type AuditEntry, type AuditEventType } from '../../adapters/audit-bridge';
import { getGroupAgents, type BtmsgAgent } from '../../adapters/btmsg-bridge';
import type { GroupId, AgentId } from '../../types/ids';
interface Props {
groupId: GroupId;
}
let { groupId }: Props = $props();
const EVENT_TYPES: AuditEventType[] = [
'prompt_injection',
'wake_event',
'btmsg_sent',
'btmsg_received',
'status_change',
'heartbeat_missed',
'dead_letter',
];
const EVENT_COLORS: Record<string, string> = {
prompt_injection: 'var(--ctp-mauve)',
wake_event: 'var(--ctp-peach)',
btmsg_sent: 'var(--ctp-blue)',
btmsg_received: 'var(--ctp-teal)',
status_change: 'var(--ctp-green)',
heartbeat_missed: 'var(--ctp-yellow)',
dead_letter: 'var(--ctp-red)',
};
const ROLE_COLORS: Record<string, string> = {
manager: 'var(--ctp-mauve)',
architect: 'var(--ctp-blue)',
tester: 'var(--ctp-green)',
reviewer: 'var(--ctp-peach)',
project: 'var(--ctp-text)',
admin: 'var(--ctp-overlay1)',
};
let entries = $state<AuditEntry[]>([]);
let agents = $state<BtmsgAgent[]>([]);
let loading = $state(true);
let error = $state<string | null>(null);
let pollTimer: ReturnType<typeof setInterval> | null = null;
// Filters
let enabledTypes = $state<Set<string>>(new Set(EVENT_TYPES));
let selectedAgent = $state<string>('all');
let filteredEntries = $derived.by(() => {
return entries
.filter(e => enabledTypes.has(e.eventType))
.filter(e => selectedAgent === 'all' || e.agentId === selectedAgent)
.slice(0, 200);
});
function agentName(agentId: string): string {
const agent = agents.find(a => a.id === agentId);
return agent?.name ?? agentId;
}
function agentRole(agentId: string): string {
const agent = agents.find(a => a.id === agentId);
return agent?.role ?? 'unknown';
}
function toggleType(type: string) {
const next = new Set(enabledTypes);
if (next.has(type)) {
next.delete(type);
} else {
next.add(type);
}
enabledTypes = next;
}
function formatTime(createdAt: string): string {
try {
const d = new Date(createdAt + 'Z');
return d.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit', second: '2-digit' });
} catch {
return createdAt;
}
}
async function fetchData() {
try {
const [auditData, agentData] = await Promise.all([
getAuditLog(groupId, 200, 0),
getGroupAgents(groupId),
]);
entries = auditData;
agents = agentData;
error = null;
} catch (e) {
error = String(e);
} finally {
loading = false;
}
}
onMount(() => {
fetchData();
pollTimer = setInterval(fetchData, 5_000);
});
onDestroy(() => {
if (pollTimer) clearInterval(pollTimer);
});
</script>
<div class="audit-log-tab">
<div class="audit-toolbar">
<div class="filter-types">
{#each EVENT_TYPES as type}
<button
class="type-chip"
class:active={enabledTypes.has(type)}
style="--chip-color: {EVENT_COLORS[type] ?? 'var(--ctp-overlay1)'}"
onclick={() => toggleType(type)}
>
{type.replace(/_/g, ' ')}
</button>
{/each}
</div>
<select
class="agent-select"
bind:value={selectedAgent}
>
<option value="all">All agents</option>
{#each agents.filter(a => a.id !== 'admin') as agent}
<option value={agent.id}>{agent.name} ({agent.role})</option>
{/each}
</select>
</div>
<div class="audit-entries">
{#if loading}
<div class="audit-empty">Loading audit log...</div>
{:else if error}
<div class="audit-empty audit-error">Error: {error}</div>
{:else if filteredEntries.length === 0}
<div class="audit-empty">No audit events yet</div>
{:else}
{#each filteredEntries as entry (entry.id)}
<div class="audit-entry">
<span class="entry-time">{formatTime(entry.createdAt)}</span>
<span
class="entry-agent"
style="color: {ROLE_COLORS[agentRole(entry.agentId)] ?? 'var(--ctp-text)'}"
>
{agentName(entry.agentId)}
</span>
<span
class="entry-type"
style="--badge-color: {EVENT_COLORS[entry.eventType] ?? 'var(--ctp-overlay1)'}"
>
{entry.eventType.replace(/_/g, ' ')}
</span>
<span class="entry-detail">{entry.detail}</span>
</div>
{/each}
{/if}
</div>
</div>
<style>
.audit-log-tab {
display: flex;
flex-direction: column;
height: 100%;
overflow: hidden;
font-size: 0.8rem;
flex: 1;
}
.audit-toolbar {
display: flex;
align-items: center;
gap: 0.5rem;
padding: 0.375rem 0.5rem;
border-bottom: 1px solid var(--ctp-surface0);
background: var(--ctp-mantle);
flex-shrink: 0;
flex-wrap: wrap;
}
.filter-types {
display: flex;
gap: 0.25rem;
flex-wrap: wrap;
flex: 1;
}
.type-chip {
font-size: 0.6rem;
padding: 0.125rem 0.375rem;
border: 1px solid var(--chip-color);
border-radius: 0.25rem;
background: transparent;
color: var(--chip-color);
cursor: pointer;
text-transform: capitalize;
transition: background 0.12s, color 0.12s;
font-family: inherit;
opacity: 0.4;
}
.type-chip.active {
background: color-mix(in srgb, var(--chip-color) 15%, transparent);
opacity: 1;
}
.type-chip:hover {
background: color-mix(in srgb, var(--chip-color) 25%, transparent);
opacity: 1;
}
.agent-select {
font-size: 0.7rem;
padding: 0.125rem 0.375rem;
border: 1px solid var(--ctp-surface1);
border-radius: 0.25rem;
background: var(--ctp-surface0);
color: var(--ctp-text);
cursor: pointer;
font-family: inherit;
}
.audit-entries {
flex: 1;
overflow-y: auto;
padding: 0.25rem 0;
}
.audit-empty {
display: flex;
align-items: center;
justify-content: center;
height: 100%;
color: var(--ctp-overlay0);
font-size: 0.8rem;
}
.audit-error {
color: var(--ctp-red);
}
.audit-entry {
display: flex;
align-items: baseline;
gap: 0.5rem;
padding: 0.1875rem 0.5rem;
border-bottom: 1px solid color-mix(in srgb, var(--ctp-surface0) 50%, transparent);
line-height: 1.4;
}
.audit-entry:hover {
background: var(--ctp-surface0);
}
.entry-time {
font-size: 0.65rem;
color: var(--ctp-overlay0);
font-family: var(--font-mono, monospace);
white-space: nowrap;
flex-shrink: 0;
}
.entry-agent {
font-size: 0.7rem;
font-weight: 600;
white-space: nowrap;
flex-shrink: 0;
min-width: 5rem;
}
.entry-type {
font-size: 0.6rem;
padding: 0.0625rem 0.3125rem;
border-radius: 0.1875rem;
background: color-mix(in srgb, var(--badge-color) 15%, transparent);
color: var(--badge-color);
font-weight: 600;
white-space: nowrap;
flex-shrink: 0;
text-transform: capitalize;
}
.entry-detail {
font-size: 0.7rem;
color: var(--ctp-subtext0);
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
min-width: 0;
}
</style>

View file

@ -15,7 +15,11 @@
import ArchitectureTab from './ArchitectureTab.svelte';
import TestingTab from './TestingTab.svelte';
import MetricsPanel from './MetricsPanel.svelte';
import { getTerminalTabs, getActiveGroup } from '../../stores/workspace.svelte';
import AuditLogTab from './AuditLogTab.svelte';
import {
getTerminalTabs, getActiveGroup,
getFocusFlashProjectId, onProjectTabSwitch, onTerminalToggle,
} from '../../stores/workspace.svelte';
import { getProjectHealth, setStallThreshold } from '../../stores/health.svelte';
import { fsWatchProject, fsUnwatchProject, onFsWriteDetected, fsWatcherStatus } from '../../adapters/fs-watcher-bridge';
import { recordExternalWrite } from '../../stores/conflicts.svelte';
@ -24,6 +28,7 @@
import { registerManager, unregisterManager, updateManagerConfig } from '../../stores/wake-scheduler.svelte';
import { setReviewQueueDepth } from '../../stores/health.svelte';
import { reviewQueueCount } from '../../adapters/bttask-bridge';
import { getStaleAgents } from '../../adapters/btmsg-bridge';
interface Props {
project: ProjectConfig;
@ -38,13 +43,16 @@
let mainSessionId = $state<string | null>(null);
let terminalExpanded = $state(false);
type ProjectTab = 'model' | 'docs' | 'context' | 'files' | 'ssh' | 'memories' | 'metrics' | 'tasks' | 'architecture' | 'selenium' | 'tests';
type ProjectTab = 'model' | 'docs' | 'context' | 'files' | 'ssh' | 'memories' | 'metrics' | 'tasks' | 'architecture' | 'selenium' | 'tests' | 'audit';
let activeTab = $state<ProjectTab>('model');
let activeGroup = $derived(getActiveGroup());
let agentRole = $derived(project.agentRole);
let isAgent = $derived(project.isAgent ?? false);
// Heartbeat status for Tier 1 agents
let heartbeatStatus = $state<'healthy' | 'stale' | 'dead' | null>(null);
// PERSISTED-LAZY: track which tabs have been activated at least once
let everActivated = $state<Record<string, boolean>>({});
@ -52,6 +60,23 @@
let projectHealth = $derived(getProjectHealth(project.id));
let termTabCount = $derived(termTabs.length);
// Focus flash animation (triggered by keyboard quick-jump)
let flashProjectId = $derived(getFocusFlashProjectId());
let isFlashing = $derived(flashProjectId === project.id);
// Tab name -> index mapping for keyboard switching
const TAB_INDEX_MAP: ProjectTab[] = [
'model', // 1
'docs', // 2
'context', // 3
'files', // 4
'ssh', // 5
'memories', // 6
'metrics', // 7
'tasks', // 8
'architecture',// 9
];
/** Activate a tab — for lazy tabs, mark as ever-activated */
function switchTab(tab: ProjectTab) {
activeTab = tab;
@ -64,6 +89,23 @@
terminalExpanded = !terminalExpanded;
}
// Listen for keyboard-driven tab switches
$effect(() => {
const unsubTab = onProjectTabSwitch((pid, tabIndex) => {
if (pid !== project.id) return;
const tabName = TAB_INDEX_MAP[tabIndex - 1];
if (tabName) switchTab(tabName);
});
const unsubTerm = onTerminalToggle((pid) => {
if (pid !== project.id) return;
terminalExpanded = !terminalExpanded;
});
return () => {
unsubTab();
unsubTerm();
};
});
// Sync per-project stall threshold to health store
$effect(() => {
setStallThreshold(project.id, project.stallThresholdMin ?? null);
@ -112,6 +154,35 @@
return () => clearInterval(timer);
});
// Heartbeat monitoring for Tier 1 agents
$effect(() => {
if (!project.isAgent) return;
const groupId = activeGroup?.id;
if (!groupId) return;
const pollHeartbeat = () => {
// 300s = healthy threshold, 600s = dead threshold
getStaleAgents(groupId as unknown as GroupId, 300)
.then(staleIds => {
if (staleIds.includes(project.id)) {
// Check if truly dead (>10 min)
getStaleAgents(groupId as unknown as GroupId, 600)
.then(deadIds => {
heartbeatStatus = deadIds.includes(project.id) ? 'dead' : 'stale';
})
.catch(() => { heartbeatStatus = 'stale'; });
} else {
heartbeatStatus = 'healthy';
}
})
.catch(() => { heartbeatStatus = null; });
};
pollHeartbeat();
const timer = setInterval(pollHeartbeat, 15_000); // 15s poll
return () => clearInterval(timer);
});
// S-1 Phase 2: start filesystem watcher for this project's CWD
$effect(() => {
const cwd = project.cwd;
@ -162,6 +233,7 @@
<div
class="project-box"
class:active
class:focus-flash={isFlashing}
style="--accent: var({accentVar})"
data-testid="project-box"
data-project-id={project.id}
@ -171,6 +243,7 @@
{slotIndex}
{active}
health={projectHealth}
{heartbeatStatus}
onclick={onactivate}
/>
@ -223,6 +296,9 @@
<button class="ptab ptab-role" class:active={activeTab === 'selenium'} onclick={() => switchTab('selenium')}>Selenium</button>
<button class="ptab ptab-role" class:active={activeTab === 'tests'} onclick={() => switchTab('tests')}>Tests</button>
{/if}
{#if isAgent && agentRole === 'manager'}
<button class="ptab ptab-role" class:active={activeTab === 'audit'} onclick={() => switchTab('audit')}>Audit</button>
{/if}
</div>
<div class="project-content-area">
@ -281,6 +357,11 @@
<TestingTab cwd={project.cwd} mode="tests" />
</div>
{/if}
{#if everActivated['audit'] && activeGroup}
<div class="content-pane" style:display={activeTab === 'audit' ? 'flex' : 'none'}>
<AuditLogTab groupId={activeGroup.id} />
</div>
{/if}
</div>
<div class="terminal-section" style:display={activeTab === 'model' ? 'flex' : 'none'}>
@ -321,6 +402,21 @@
border-color: var(--accent);
}
.project-box.focus-flash {
animation: focus-flash 0.4s ease-out;
}
@keyframes focus-flash {
0% {
border-color: var(--ctp-blue);
box-shadow: 0 0 0 2px color-mix(in srgb, var(--ctp-blue) 40%, transparent);
}
100% {
border-color: var(--accent);
box-shadow: none;
}
}
.project-tabs {
display: flex;
gap: 0;

View file

@ -10,10 +10,12 @@
slotIndex: number;
active: boolean;
health: ProjectHealth | null;
/** Heartbeat status for Tier 1 agents: 'healthy' | 'stale' | 'dead' | null */
heartbeatStatus?: 'healthy' | 'stale' | 'dead' | null;
onclick: () => void;
}
let { project, slotIndex, active, health, onclick }: Props = $props();
let { project, slotIndex, active, health, heartbeatStatus = null, onclick }: Props = $props();
let accentVar = $derived(PROJECT_ACCENTS[slotIndex % PROJECT_ACCENTS.length]);
@ -82,6 +84,18 @@
<span class="project-id">({project.identifier})</span>
</div>
<div class="header-info">
{#if heartbeatStatus && project.isAgent}
<span
class="info-heartbeat"
class:hb-healthy={heartbeatStatus === 'healthy'}
class:hb-stale={heartbeatStatus === 'stale'}
class:hb-dead={heartbeatStatus === 'dead'}
title={heartbeatStatus === 'healthy' ? 'Agent healthy' : heartbeatStatus === 'stale' ? 'Agent stale — no heartbeat recently' : 'Agent dead — no heartbeat'}
>
{heartbeatStatus === 'healthy' ? '♥' : heartbeatStatus === 'stale' ? '♥' : '♡'}
</span>
<span class="info-sep">·</span>
{/if}
{#if health && health.externalConflictCount > 0}
<button
class="info-conflict info-conflict-external"
@ -273,6 +287,25 @@
background: color-mix(in srgb, var(--ctp-peach) 25%, transparent);
}
.info-heartbeat {
font-size: 0.65rem;
font-weight: 600;
white-space: nowrap;
}
.hb-healthy {
color: var(--ctp-green);
}
.hb-stale {
color: var(--ctp-yellow);
animation: pulse 1.5s ease-in-out infinite;
}
.hb-dead {
color: var(--ctp-red);
}
.info-profile {
font-size: 0.65rem;
color: var(--ctp-blue);

View file

@ -8,6 +8,7 @@ import { getAllProjectHealth, getHealthAggregates } from './health.svelte';
import { getAllWorkItems } from './workspace.svelte';
import { listTasks } from '../adapters/bttask-bridge';
import { getAgentSession } from './agents.svelte';
import { logAuditEvent } from '../adapters/audit-bridge';
import type { GroupId } from '../types/ids';
// --- Types ---
@ -258,4 +259,11 @@ async function evaluateAndEmit(reg: ManagerRegistration): Promise<void> {
context,
mode,
});
// Audit: log wake event
logAuditEvent(
reg.agentId,
'wake_event',
`Auto-wake triggered (strategy=${reg.strategy}, mode=${mode}, score=${evaluation.totalScore.toFixed(2)})`,
).catch(() => {});
}

View file

@ -6,6 +6,7 @@ import { clearHealthTracking } from '../stores/health.svelte';
import { clearAllConflicts } from '../stores/conflicts.svelte';
import { clearWakeScheduler } from '../stores/wake-scheduler.svelte';
import { waitForPendingPersistence } from '../agent-dispatcher';
import { registerAgents } from '../adapters/btmsg-bridge';
export type WorkspaceTab = 'sessions' | 'docs' | 'context' | 'settings' | 'comms';
@ -29,6 +30,58 @@ let activeProjectId = $state<string | null>(null);
/** Terminal tabs per project (keyed by project ID) */
let projectTerminals = $state<Record<string, TerminalTab[]>>({});
// --- Focus flash event (keyboard quick-jump visual feedback) ---
let focusFlashProjectId = $state<string | null>(null);
export function getFocusFlashProjectId(): string | null {
return focusFlashProjectId;
}
export function triggerFocusFlash(projectId: string): void {
focusFlashProjectId = projectId;
// Auto-clear after animation duration
setTimeout(() => {
focusFlashProjectId = null;
}, 400);
}
// --- Project tab switching (keyboard-driven) ---
type ProjectTabSwitchCallback = (projectId: string, tabIndex: number) => void;
let projectTabSwitchCallbacks: ProjectTabSwitchCallback[] = [];
export function onProjectTabSwitch(cb: ProjectTabSwitchCallback): () => void {
projectTabSwitchCallbacks.push(cb);
return () => {
projectTabSwitchCallbacks = projectTabSwitchCallbacks.filter(c => c !== cb);
};
}
export function emitProjectTabSwitch(projectId: string, tabIndex: number): void {
for (const cb of projectTabSwitchCallbacks) {
cb(projectId, tabIndex);
}
}
// --- Terminal toggle (keyboard-driven) ---
type TerminalToggleCallback = (projectId: string) => void;
let terminalToggleCallbacks: TerminalToggleCallback[] = [];
export function onTerminalToggle(cb: TerminalToggleCallback): () => void {
terminalToggleCallbacks.push(cb);
return () => {
terminalToggleCallbacks = terminalToggleCallbacks.filter(c => c !== cb);
};
}
export function emitTerminalToggle(projectId: string): void {
for (const cb of terminalToggleCallbacks) {
cb(projectId);
}
}
// --- Getters ---
export function getGroupsConfig(): GroupsFile | null {
@ -139,6 +192,10 @@ export async function loadWorkspace(initialGroupId?: string): Promise<void> {
groupsConfig = config;
projectTerminals = {};
// Register all agents from config into btmsg database
// (creates agent records, contact permissions, review channels)
registerAgents(config).catch(e => console.warn('Failed to register agents:', e));
// CLI --group flag takes priority, then explicit param, then persisted
let cliGroup: string | null = null;
if (!initialGroupId) {
@ -170,6 +227,8 @@ export async function loadWorkspace(initialGroupId?: string): Promise<void> {
export async function saveWorkspace(): Promise<void> {
if (!groupsConfig) return;
await saveGroups(groupsConfig);
// Re-register agents after config changes (new agents, permission updates)
registerAgents(groupsConfig).catch(e => console.warn('Failed to register agents:', e));
}
// --- Group/project mutation ---