#!/usr/bin/env python3
"""
btmsg — Group Agent Messenger for BTerminal Mission Control.

Hierarchical communication system for multi-agent orchestration.
Agents communicate through messages stored in SQLite.
Agent identity set via BTMSG_AGENT_ID environment variable.

Usage: btmsg <command> [args]

Commands:
    inbox [--all]                    Show unread messages (--all for all)
    read <msg-id>                    Read message and mark as read
    send <agent-id> <message>        Send message to agent (must be in contacts)
    reply <msg-id> <message>         Reply to a message
    contacts                         List allowed contacts for current agent
    history <agent-id> [--limit N]   Show conversation history with agent
    status                           Show all agents and their status
    register <id> <name> <role> <group-id> [--tier N] [--model M] [--cwd PATH]
                                     Register an agent
    allow <from-id> <to-id> [--bidirectional]
                                     Add contact permission
    whoami                           Show current agent identity
    unread                           Print the unread message count
    notify                           Print a notification line if there is unread mail
"""

import sqlite3
import sys
import os
import uuid
import json
from contextlib import closing
from pathlib import Path
from datetime import datetime, timezone

DB_PATH = Path.home() / ".local" / "share" / "bterminal" / "btmsg.db"

# Agent roles and their tiers
ROLES = {
    'manager': 1,
    'architect': 1,
    'tester': 1,
    'reviewer': 1,
    'project': 2,
}

# Colors for terminal output
C_RESET = "\033[0m"
C_BOLD = "\033[1m"
C_DIM = "\033[2m"
C_RED = "\033[31m"
C_GREEN = "\033[32m"
C_YELLOW = "\033[33m"
C_BLUE = "\033[34m"
C_MAGENTA = "\033[35m"
C_CYAN = "\033[36m"

ROLE_COLORS = {
    'manager': C_MAGENTA,
    'architect': C_BLUE,
    'tester': C_GREEN,
    'reviewer': C_CYAN,
    'project': C_YELLOW,
}


def get_db():
    """Open the message database and return the connection.

    Creates the parent directory of DB_PATH if needed, sets the row factory
    to sqlite3.Row (so columns can be read by name) and switches the journal
    to WAL mode. The caller is responsible for closing the connection.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    db = sqlite3.connect(str(DB_PATH))
    db.row_factory = sqlite3.Row
    db.execute("PRAGMA journal_mode=WAL")
    return db


def init_db():
    """Create the agents, contacts and messages tables (and indexes) if missing."""
    with closing(get_db()) as db:
        db.executescript("""
            CREATE TABLE IF NOT EXISTS agents (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                role TEXT NOT NULL,
                group_id TEXT NOT NULL,
                tier INTEGER NOT NULL DEFAULT 2,
                model TEXT,
                cwd TEXT,
                system_prompt TEXT,
                status TEXT DEFAULT 'stopped',
                last_active_at TEXT,
                created_at TEXT DEFAULT (datetime('now'))
            );

            CREATE TABLE IF NOT EXISTS contacts (
                agent_id TEXT NOT NULL,
                contact_id TEXT NOT NULL,
                PRIMARY KEY (agent_id, contact_id),
                FOREIGN KEY (agent_id) REFERENCES agents(id),
                FOREIGN KEY (contact_id) REFERENCES agents(id)
            );

            CREATE TABLE IF NOT EXISTS messages (
                id TEXT PRIMARY KEY,
                from_agent TEXT NOT NULL,
                to_agent TEXT NOT NULL,
                content TEXT NOT NULL,
                read INTEGER DEFAULT 0,
                reply_to TEXT,
                group_id TEXT NOT NULL,
                created_at TEXT DEFAULT (datetime('now')),
                FOREIGN KEY (from_agent) REFERENCES agents(id),
                FOREIGN KEY (to_agent) REFERENCES agents(id)
            );

            CREATE INDEX IF NOT EXISTS idx_messages_to ON messages(to_agent, read);
            CREATE INDEX IF NOT EXISTS idx_messages_from ON messages(from_agent);
            CREATE INDEX IF NOT EXISTS idx_messages_group ON messages(group_id);
            CREATE INDEX IF NOT EXISTS idx_messages_reply ON messages(reply_to);
        """)
        db.commit()


def get_agent_id():
    """Return the current agent ID from BTMSG_AGENT_ID, exiting with status 1 if unset."""
    agent_id = os.environ.get("BTMSG_AGENT_ID")
    if not agent_id:
        print(f"{C_RED}Error: BTMSG_AGENT_ID not set.{C_RESET}")
        print(f"{C_DIM}Set it with: export BTMSG_AGENT_ID=<agent-id>{C_RESET}")
        sys.exit(1)
    return agent_id


def get_agent(db, agent_id):
    """Return the agents row whose id equals agent_id exactly, or None."""
    return db.execute("SELECT * FROM agents WHERE id = ?", (agent_id,)).fetchone()


def like_prefix(prefix):
    """Build a LIKE pattern matching values that start with *prefix* literally.

    The LIKE wildcards (percent and underscore) and the escape character
    itself are escaped, so user input cannot widen the match. The result
    must be used together with an ESCAPE clause naming the backslash.
    """
    escaped = prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    return escaped + "%"


def find_agent(db, ident):
    """Resolve an agent by exact id, falling back to an id prefix.

    When several agents share the prefix, the one with the smallest id is
    returned so the choice is at least deterministic. Returns None when
    nothing matches.
    """
    agent = get_agent(db, ident)
    if agent is not None:
        return agent
    return db.execute(
        "SELECT * FROM agents WHERE id LIKE ? ESCAPE '\\' ORDER BY id LIMIT 1",
        (like_prefix(ident),),
    ).fetchone()


def count_unread(db, agent_id):
    """Return the number of unread messages addressed to agent_id."""
    return db.execute(
        "SELECT COUNT(*) FROM messages WHERE to_agent = ? AND read = 0",
        (agent_id,),
    ).fetchone()[0]


def can_message(db, from_id, to_id):
    """Return True if a contacts row allows from_id to message to_id."""
    row = db.execute(
        "SELECT 1 FROM contacts WHERE agent_id = ? AND contact_id = ?",
        (from_id, to_id),
    ).fetchone()
    return row is not None


def format_time(ts_str):
    """Format a stored timestamp as HH:MM:SS for display.

    Returns "?" for an empty value. If the value is not an ISO-format
    string, its first 19 characters are returned unchanged instead.
    """
    if not ts_str:
        return "?"
    try:
        return datetime.fromisoformat(ts_str).strftime("%H:%M:%S")
    except (ValueError, TypeError):
        # str() so a non-string value does not raise again while slicing.
        return str(ts_str)[:19]


def format_role(role):
    """Wrap the role name in its terminal color (no color for unknown roles)."""
    color = ROLE_COLORS.get(role, C_RESET)
    return f"{color}{role}{C_RESET}"


def short_id(msg_id):
    """Return the first 8 characters of a message ID, or "?" if empty."""
    return msg_id[:8] if msg_id else "?"


# ─── Commands ────────────────────────────────────────────────

def cmd_inbox(args):
    """Show unread messages (oldest first), or the latest 50 with --all."""
    agent_id = get_agent_id()
    show_all = "--all" in args

    with closing(get_db()) as db:
        agent = get_agent(db, agent_id)
        if not agent:
            print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
            return

        select = (
            "SELECT m.*, a.name as sender_name, a.role as sender_role "
            "FROM messages m JOIN agents a ON m.from_agent = a.id "
        )
        # rowid breaks ties between messages created within the same second.
        if show_all:
            where = ("WHERE m.to_agent = ? "
                     "ORDER BY m.created_at DESC, m.rowid DESC LIMIT 50")
        else:
            where = ("WHERE m.to_agent = ? AND m.read = 0 "
                     "ORDER BY m.created_at ASC, m.rowid ASC")
        rows = db.execute(select + where, (agent_id,)).fetchall()

        if not rows:
            label = "messages" if show_all else "unread messages"
            print(f"{C_DIM}No {label}.{C_RESET}")
            return

        label = "All messages" if show_all else f"Unread messages ({len(rows)})"
        print(f"\n{C_BOLD}📬 {label} for {agent['name']}:{C_RESET}\n")

        for row in rows:
            read_mark = " " if row['read'] else f"{C_GREEN}●{C_RESET}"
            role_str = format_role(row['sender_role'])
            time_str = format_time(row['created_at'])
            reply_str = (
                f" {C_DIM}↩ {short_id(row['reply_to'])}{C_RESET}"
                if row['reply_to'] else ""
            )
            print(f" {read_mark} [{short_id(row['id'])}] {C_DIM}{time_str}{C_RESET} "
                  f"{C_BOLD}{row['sender_name']}{C_RESET} ({role_str}){reply_str}")
            # Show first 120 chars of content
            preview = row['content'][:120].replace('\n', ' ')
            if len(row['content']) > 120:
                preview += "..."
            print(f" {preview}\n")


def cmd_read(args):
    """Print one message addressed to the current agent and mark it as read.

    args[0] is a message ID or a prefix of one (as shown by inbox).
    """
    if not args:
        print(f"{C_RED}Usage: btmsg read <msg-id>{C_RESET}")
        return

    agent_id = get_agent_id()
    msg_id_prefix = args[0]

    with closing(get_db()) as db:
        # Support short IDs (prefix match); wildcards in the input are escaped.
        row = db.execute(
            "SELECT m.*, a.name as sender_name, a.role as sender_role "
            "FROM messages m JOIN agents a ON m.from_agent = a.id "
            "WHERE m.id LIKE ? ESCAPE '\\' AND m.to_agent = ?",
            (like_prefix(msg_id_prefix), agent_id),
        ).fetchone()

        if not row:
            print(f"{C_RED}Message not found.{C_RESET}")
            return

        role_str = format_role(row['sender_role'])
        time_str = format_time(row['created_at'])
        rule = '─' * 60

        print(f"\n{C_BOLD}{rule}{C_RESET}")
        print(f" {C_DIM}ID:{C_RESET} {row['id']}")
        print(f" {C_DIM}From:{C_RESET} {C_BOLD}{row['sender_name']}{C_RESET} ({role_str})")
        print(f" {C_DIM}Time:{C_RESET} {time_str}")
        if row['reply_to']:
            print(f" {C_DIM}Re:{C_RESET} {short_id(row['reply_to'])}")
        print(f"{C_BOLD}{rule}{C_RESET}\n")
        print(f" {row['content']}\n")
        print(f"{C_BOLD}{rule}{C_RESET}")
        print(f"{C_DIM}Reply: btmsg reply {short_id(row['id'])} \"your message\"{C_RESET}\n")

        # Mark as read
        db.execute("UPDATE messages SET read = 1 WHERE id = ?", (row['id'],))
        db.commit()


def cmd_send(args):
    """Send a message to an agent that is in the sender's contacts.

    args[0] is the recipient's agent ID (or a prefix of it); the remaining
    arguments are joined with spaces to form the message text.
    """
    if len(args) < 2:
        print(f"{C_RED}Usage: btmsg send <agent-id> <message>{C_RESET}")
        return

    agent_id = get_agent_id()
    to_id = args[0]
    content = " ".join(args[1:])

    with closing(get_db()) as db:
        # Verify sender exists
        sender = get_agent(db, agent_id)
        if not sender:
            print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
            return

        # Verify recipient exists (exact id first, then prefix match)
        recipient = find_agent(db, to_id)
        if not recipient:
            print(f"{C_RED}Agent '{to_id}' not found.{C_RESET}")
            return
        to_id = recipient['id']

        # Check contact permission
        if not can_message(db, agent_id, to_id):
            print(f"{C_RED}Not allowed to message '{recipient['name']}' "
                  f"({recipient['role']}).{C_RESET}")
            print(f"{C_DIM}Check your contacts: btmsg contacts{C_RESET}")
            return

        msg_id = str(uuid.uuid4())
        db.execute(
            "INSERT INTO messages (id, from_agent, to_agent, content, group_id) "
            "VALUES (?, ?, ?, ?, ?)",
            (msg_id, agent_id, to_id, content, sender['group_id']),
        )
        db.commit()

    print(f"{C_GREEN}✓ Sent to {recipient['name']} ({recipient['role']}){C_RESET} "
          f"[{short_id(msg_id)}]")


def cmd_reply(args):
    """Reply to a message previously received by the current agent.

    args[0] is the original message ID (or a prefix); the remaining
    arguments form the reply text. The reply is addressed to the original
    sender and is subject to the same contact check as send.
    """
    if len(args) < 2:
        print(f"{C_RED}Usage: btmsg reply <msg-id> <message>{C_RESET}")
        return

    agent_id = get_agent_id()
    msg_id_prefix = args[0]
    content = " ".join(args[1:])

    with closing(get_db()) as db:
        # Find original message (prefix match, wildcards escaped)
        original = db.execute(
            "SELECT * FROM messages WHERE id LIKE ? ESCAPE '\\' AND to_agent = ?",
            (like_prefix(msg_id_prefix), agent_id),
        ).fetchone()

        if not original:
            print(f"{C_RED}Message not found.{C_RESET}")
            return

        sender = get_agent(db, agent_id)
        if not sender:
            print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
            return

        # Reply goes back to sender
        to_id = original['from_agent']
        recipient = get_agent(db, to_id)
        rname = recipient['name'] if recipient else to_id

        # Check contact permission
        if not can_message(db, agent_id, to_id):
            print(f"{C_RED}Not allowed to reply to '{rname}'.{C_RESET}")
            return

        reply_id = str(uuid.uuid4())
        db.execute(
            "INSERT INTO messages (id, from_agent, to_agent, content, reply_to, group_id) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (reply_id, agent_id, to_id, content, original['id'], sender['group_id']),
        )
        db.commit()

        print(f"{C_GREEN}✓ Replied to {rname}{C_RESET} [{short_id(reply_id)}]")


def cmd_contacts(args):
    """List the agents the current agent may message, grouped by tier."""
    agent_id = get_agent_id()

    with closing(get_db()) as db:
        agent = get_agent(db, agent_id)
        if not agent:
            print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
            return

        rows = db.execute(
            "SELECT a.* FROM contacts c JOIN agents a ON c.contact_id = a.id "
            "WHERE c.agent_id = ? ORDER BY a.tier, a.role, a.name",
            (agent_id,),
        ).fetchall()

        print(f"\n{C_BOLD}📋 Contacts for {agent['name']} "
              f"({format_role(agent['role'])}):{C_RESET}\n")

        if not rows:
            print(f" {C_DIM}No contacts configured.{C_RESET}")
            return

        current_tier = None
        for row in rows:
            # Rows are sorted by tier, so a change of tier starts a new section.
            if row['tier'] != current_tier:
                current_tier = row['tier']
                tier_label = "Management" if current_tier == 1 else "Execution"
                print(f" {C_DIM}── Tier {current_tier}: {tier_label} ──{C_RESET}")

            if row['status'] == 'active':
                status_dot = f"{C_GREEN}●{C_RESET}"
            else:
                status_dot = f"{C_DIM}○{C_RESET}"
            print(f" {status_dot} {C_BOLD}{row['id']}{C_RESET} — {row['name']} "
                  f"({format_role(row['role'])})")

        print()


def cmd_history(args):
    """Show the most recent messages exchanged with another agent.

    args[0] is the other agent's ID (or a prefix). "--limit N" sets how
    many messages to show (default 20); a non-integer N is ignored. The
    newest N messages are selected and printed oldest-first.
    """
    if not args:
        print(f"{C_RED}Usage: btmsg history <agent-id> [--limit N]{C_RESET}")
        return

    agent_id = get_agent_id()
    other_id = args[0]
    limit = 20

    if "--limit" in args:
        idx = args.index("--limit")
        if idx + 1 < len(args):
            try:
                limit = int(args[idx + 1])
            except ValueError:
                pass  # best effort: keep the default limit

    with closing(get_db()) as db:
        # Support prefix match for other_id
        other = find_agent(db, other_id)
        if not other:
            print(f"{C_RED}Agent '{other_id}' not found.{C_RESET}")
            return
        other_id = other['id']

        agent = get_agent(db, agent_id)
        if not agent:
            print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
            return

        # Take the newest `limit` rows (rowid breaks same-second ties), then
        # flip them so the conversation reads top-to-bottom in time order.
        rows = db.execute(
            "SELECT m.*, a.name as sender_name, a.role as sender_role "
            "FROM messages m JOIN agents a ON m.from_agent = a.id "
            "WHERE (m.from_agent = ? AND m.to_agent = ?) "
            " OR (m.from_agent = ? AND m.to_agent = ?) "
            "ORDER BY m.created_at DESC, m.rowid DESC LIMIT ?",
            (agent_id, other_id, other_id, agent_id, limit),
        ).fetchall()
        rows.reverse()

        print(f"\n{C_BOLD}💬 History: {agent['name']} ↔ {other['name']}{C_RESET}\n")

        if not rows:
            print(f" {C_DIM}No messages yet.{C_RESET}\n")
            return

        for row in rows:
            time_str = format_time(row['created_at'])
            is_me = row['from_agent'] == agent_id
            arrow = f"{C_CYAN}→{C_RESET}" if is_me else f"{C_YELLOW}←{C_RESET}"
            name = "You" if is_me else row['sender_name']
            print(f" {C_DIM}{time_str}{C_RESET} {arrow} {C_BOLD}{name}{C_RESET}: "
                  f"{row['content']}")

        print()


def cmd_status(args):
    """Show agents and their status, with unread counts.

    When BTMSG_AGENT_ID names a registered agent only that agent's group is
    listed; otherwise every registered agent is listed.
    """
    with closing(get_db()) as db:
        # Get current agent's group, or show all
        agent_id = os.environ.get("BTMSG_AGENT_ID")
        agent = get_agent(db, agent_id) if agent_id else None

        if agent:
            rows = db.execute(
                "SELECT * FROM agents WHERE group_id = ? ORDER BY tier, role, name",
                (agent['group_id'],),
            ).fetchall()
        else:
            rows = db.execute(
                "SELECT * FROM agents ORDER BY group_id, tier, role, name"
            ).fetchall()

        if not rows:
            print(f"{C_DIM}No agents registered.{C_RESET}")
            return

        # Count unread per agent in a single grouped query.
        unread_counts = {
            r['to_agent']: r['n']
            for r in db.execute(
                "SELECT to_agent, COUNT(*) AS n FROM messages "
                "WHERE read = 0 GROUP BY to_agent"
            )
        }

        print(f"\n{C_BOLD}🤖 Agent Status{C_RESET}\n")

        current_tier = None
        for row in rows:
            if row['tier'] != current_tier:
                current_tier = row['tier']
                tier_name = "Management" if current_tier == 1 else "Execution"
                # Print the real tier number so custom tiers are not shown as 2.
                print(f" {C_BOLD}TIER {current_tier} — {tier_name}{C_RESET}")

            status = row['status'] or 'stopped'
            if status == 'active':
                status_str = f"{C_GREEN}● active{C_RESET}"
            elif status == 'sleeping':
                status_str = f"{C_YELLOW}◐ sleeping{C_RESET}"
            else:
                status_str = f"{C_DIM}○ stopped{C_RESET}"

            unread = unread_counts.get(row['id'], 0)
            unread_str = f" {C_RED}({unread} unread){C_RESET}" if unread > 0 else ""
            model_str = f" {C_DIM}[{row['model']}]{C_RESET}" if row['model'] else ""

            print(f" {status_str} {C_BOLD}{row['name']}{C_RESET} "
                  f"({format_role(row['role'])}) — {row['id']}{model_str}{unread_str}")

        print()


def cmd_register(args):
    """Register a new agent, or update an existing one with the same id.

    Positional args: id, name, role, group id. Optional flags: --tier N
    (defaults to the role's tier from ROLES, or 2 for unknown roles),
    --model M and --cwd PATH. Unrecognized extra arguments are skipped.
    An existing agent has its model and cwd overwritten, including with
    NULL when the corresponding flag is omitted.
    """
    if len(args) < 4:
        print(f"{C_RED}Usage: btmsg register <id> <name> <role> <group-id> "
              f"[--tier N] [--model M] [--cwd PATH]{C_RESET}")
        print(f"{C_DIM}Roles: {', '.join(ROLES.keys())}{C_RESET}")
        return

    agent_id, name, role, group_id = args[:4]
    tier = ROLES.get(role, 2)
    model = None
    cwd = None

    # Parse optional flags
    i = 4
    while i < len(args):
        flag = args[i]
        has_value = i + 1 < len(args)
        if flag == "--tier" and has_value:
            try:
                tier = int(args[i + 1])
            except ValueError:
                # Report bad input instead of crashing with a traceback.
                print(f"{C_RED}Invalid tier '{args[i + 1]}': must be an integer.{C_RESET}")
                return
            i += 2
        elif flag == "--model" and has_value:
            model = args[i + 1]
            i += 2
        elif flag == "--cwd" and has_value:
            cwd = args[i + 1]
            i += 2
        else:
            i += 1

    with closing(get_db()) as db:
        # Check if agent already exists
        if get_agent(db, agent_id):
            print(f"{C_YELLOW}Agent '{agent_id}' already exists. Updating...{C_RESET}")
            db.execute(
                "UPDATE agents SET name=?, role=?, group_id=?, tier=?, model=?, cwd=? "
                "WHERE id=?",
                (name, role, group_id, tier, model, cwd, agent_id),
            )
        else:
            db.execute(
                "INSERT INTO agents (id, name, role, group_id, tier, model, cwd) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                (agent_id, name, role, group_id, tier, model, cwd),
            )
        db.commit()

    print(f"{C_GREEN}✓ Registered: {name} ({role}, tier {tier}){C_RESET}")


def cmd_allow(args):
    """Allow args[0] to message args[1] (both ways with --bidirectional / -b)."""
    if len(args) < 2:
        print(f"{C_RED}Usage: btmsg allow <from-id> <to-id> [--bidirectional]{C_RESET}")
        return

    from_id = args[0]
    to_id = args[1]
    bidir = "--bidirectional" in args or "-b" in args

    with closing(get_db()) as db:
        # Verify both agents exist
        for aid in (from_id, to_id):
            if not get_agent(db, aid):
                print(f"{C_RED}Agent '{aid}' not found.{C_RESET}")
                return

        pairs = [(from_id, to_id)]
        if bidir:
            pairs.append((to_id, from_id))
        # OR IGNORE makes re-adding an existing permission a no-op.
        db.executemany(
            "INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES (?, ?)",
            pairs,
        )
        db.commit()

    direction = "↔" if bidir else "→"
    print(f"{C_GREEN}✓ Contact: {from_id} {direction} {to_id}{C_RESET}")


def cmd_whoami(args):
    """Show the current agent's identity, settings and unread count."""
    agent_id = get_agent_id()

    with closing(get_db()) as db:
        agent = get_agent(db, agent_id)
        if not agent:
            print(f"{C_YELLOW}Agent ID: {agent_id} (not registered){C_RESET}")
            return

        unread = count_unread(db, agent_id)

        print(f"\n{C_BOLD}🤖 {agent['name']}{C_RESET}")
        print(f" {C_DIM}ID:{C_RESET} {agent['id']}")
        print(f" {C_DIM}Role:{C_RESET} {format_role(agent['role'])} (Tier {agent['tier']})")
        print(f" {C_DIM}Group:{C_RESET} {agent['group_id']}")
        if agent['model']:
            print(f" {C_DIM}Model:{C_RESET} {agent['model']}")
        if agent['cwd']:
            print(f" {C_DIM}CWD:{C_RESET} {agent['cwd']}")
        print(f" {C_DIM}Unread:{C_RESET} {unread} messages")
        print()


def cmd_unread_count(args):
    """Print just the unread count (for notifications); "0" if no agent ID is set."""
    agent_id = os.environ.get("BTMSG_AGENT_ID")
    if not agent_id:
        print("0")
        return

    with closing(get_db()) as db:
        count = count_unread(db, agent_id)
    print(count)


def cmd_notify(args):
    """Print a notification line if there are unread messages (for agent injection).

    Prints nothing when BTMSG_AGENT_ID is unset or there is no unread mail.
    """
    agent_id = os.environ.get("BTMSG_AGENT_ID")
    if not agent_id:
        return

    with closing(get_db()) as db:
        count = count_unread(db, agent_id)
        if count <= 0:
            return

        # Get latest unread sender
        latest = db.execute(
            "SELECT a.name, a.role FROM messages m JOIN agents a ON m.from_agent = a.id "
            "WHERE m.to_agent = ? AND m.read = 0 ORDER BY m.created_at DESC LIMIT 1",
            (agent_id,),
        ).fetchone()

        sender_info = f" (latest from {latest['name']})" if latest else ""
        print(f"📬 You have {count} unread message{'s' if count != 1 else ''}!{sender_info}")
        print(" Use: btmsg inbox")


def cmd_help(args=None):
    """Print the module docstring as help text."""
    print(__doc__)


# ─── Main dispatch ───────────────────────────────────────────

COMMANDS = {
    'inbox': cmd_inbox,
    'read': cmd_read,
    'send': cmd_send,
    'reply': cmd_reply,
    'contacts': cmd_contacts,
    'history': cmd_history,
    'status': cmd_status,
    'register': cmd_register,
    'allow': cmd_allow,
    'whoami': cmd_whoami,
    'unread': cmd_unread_count,
    'notify': cmd_notify,
    'help': cmd_help,
    '--help': cmd_help,
    '-h': cmd_help,
}


def main():
    """Initialize the database and dispatch sys.argv[1] to its command handler.

    With no command the help text is shown and the process exits 0; an
    unknown command prints an error plus the help text and exits 1.
    """
    init_db()

    if len(sys.argv) < 2:
        cmd_help()
        sys.exit(0)

    command = sys.argv[1]
    args = sys.argv[2:]

    handler = COMMANDS.get(command)
    if not handler:
        print(f"{C_RED}Unknown command: {command}{C_RESET}")
        cmd_help()
        sys.exit(1)

    handler(args)


if __name__ == "__main__":
    main()