agent-orchestrator/btmsg
DexterFromLab a158ed9544 feat(orchestration): multi-agent communication, unified agents, env passthrough
- btmsg: admin role (tier 0), channel messaging (create/list/send/history),
  admin global feed, mark-read conversations
- Rust btmsg module: admin bypass, channels, feed, 8 new Tauri commands
- CommsTab: sidebar chat interface with activity feed, DMs, channels (Ctrl+M)
- Agent unification: Tier 1 agents rendered as ProjectBoxes via agentToProject()
  converter, getAllWorkItems() combines agents + projects in ProjectGrid
- GroupAgentsPanel: click-to-navigate agents to their ProjectBox
- Agent system prompts: generateAgentPrompt() builds comprehensive introductory
  context (role, environment, team, btmsg/bttask docs, workflow instructions)
- AgentSession passes group context to prompt generator via $derived.by()
- BTMSG_AGENT_ID env var passthrough: extra_env field flows through full chain
  (agent-bridge → Rust AgentQueryOptions → NDJSON → sidecar runners → cleanEnv)
- workspace store: updateAgent() for Tier 1 agent config persistence
2026-03-11 14:53:39 +01:00

1183 lines
37 KiB
Python
Executable file

#!/usr/bin/env python3
"""
btmsg — Group Agent Messenger for BTerminal Mission Control.
Hierarchical communication system for multi-agent orchestration.
Agents communicate through messages stored in SQLite.
Agent identity set via BTMSG_AGENT_ID environment variable.
Usage: btmsg <command> [args]
Commands:
inbox Show unread messages (--all for all)
read <msg-id> Read message and mark as read
send <to> <msg> Send message to agent (must be in contacts)
reply <msg-id> <msg> Reply to a message
contacts List allowed contacts for current agent
history <agent-id> Show conversation history with agent
status Show all agents and their status
register <id> <name> <role> <group> [--tier N] [--model M]
Register an agent
allow <from> <to> Add contact permission
whoami Show current agent identity
graph Visual hierarchy with status dots and links
"""
import sqlite3
import sys
import os
import uuid
import json
from pathlib import Path
from datetime import datetime, timezone
DB_PATH = Path.home() / ".local" / "share" / "bterminal" / "btmsg.db"
# Agent roles and their tiers
ROLES = {
'admin': 0,
'manager': 1,
'architect': 1,
'tester': 1,
'reviewer': 1,
'project': 2,
}
# Colors for terminal output
C_RESET = "\033[0m"
C_BOLD = "\033[1m"
C_DIM = "\033[2m"
C_RED = "\033[31m"
C_GREEN = "\033[32m"
C_YELLOW = "\033[33m"
C_BLUE = "\033[34m"
C_MAGENTA = "\033[35m"
C_CYAN = "\033[36m"
ROLE_COLORS = {
'admin': C_CYAN,
'manager': C_MAGENTA,
'architect': C_BLUE,
'tester': C_GREEN,
'reviewer': C_CYAN,
'project': C_YELLOW,
}
def get_db():
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
db = sqlite3.connect(str(DB_PATH))
db.row_factory = sqlite3.Row
db.execute("PRAGMA journal_mode=WAL")
return db
def init_db():
db = get_db()
db.executescript("""
CREATE TABLE IF NOT EXISTS agents (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
role TEXT NOT NULL,
group_id TEXT NOT NULL,
tier INTEGER NOT NULL DEFAULT 2,
model TEXT,
cwd TEXT,
system_prompt TEXT,
status TEXT DEFAULT 'stopped',
last_active_at TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS contacts (
agent_id TEXT NOT NULL,
contact_id TEXT NOT NULL,
PRIMARY KEY (agent_id, contact_id),
FOREIGN KEY (agent_id) REFERENCES agents(id),
FOREIGN KEY (contact_id) REFERENCES agents(id)
);
CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY,
from_agent TEXT NOT NULL,
to_agent TEXT NOT NULL,
content TEXT NOT NULL,
read INTEGER DEFAULT 0,
reply_to TEXT,
group_id TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (from_agent) REFERENCES agents(id),
FOREIGN KEY (to_agent) REFERENCES agents(id)
);
CREATE INDEX IF NOT EXISTS idx_messages_to ON messages(to_agent, read);
CREATE INDEX IF NOT EXISTS idx_messages_from ON messages(from_agent);
CREATE INDEX IF NOT EXISTS idx_messages_group ON messages(group_id);
CREATE INDEX IF NOT EXISTS idx_messages_reply ON messages(reply_to);
CREATE TABLE IF NOT EXISTS channels (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
group_id TEXT NOT NULL,
created_by TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (created_by) REFERENCES agents(id)
);
CREATE TABLE IF NOT EXISTS channel_members (
channel_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
joined_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (channel_id, agent_id),
FOREIGN KEY (channel_id) REFERENCES channels(id),
FOREIGN KEY (agent_id) REFERENCES agents(id)
);
CREATE TABLE IF NOT EXISTS channel_messages (
id TEXT PRIMARY KEY,
channel_id TEXT NOT NULL,
from_agent TEXT NOT NULL,
content TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (channel_id) REFERENCES channels(id),
FOREIGN KEY (from_agent) REFERENCES agents(id)
);
CREATE INDEX IF NOT EXISTS idx_channel_messages ON channel_messages(channel_id, created_at);
""")
db.commit()
db.close()
def get_agent_id():
"""Get current agent ID from environment."""
agent_id = os.environ.get("BTMSG_AGENT_ID")
if not agent_id:
print(f"{C_RED}Error: BTMSG_AGENT_ID not set.{C_RESET}")
print(f"{C_DIM}Set it with: export BTMSG_AGENT_ID=<your-agent-id>{C_RESET}")
sys.exit(1)
return agent_id
def get_agent(db, agent_id):
"""Get agent record, or None."""
row = db.execute("SELECT * FROM agents WHERE id = ?", (agent_id,)).fetchone()
return row
def can_message(db, from_id, to_id):
"""Check if from_agent is allowed to message to_agent."""
# Admin (tier 0) can message anyone
sender = get_agent(db, from_id)
if sender and sender['tier'] == 0:
return True
row = db.execute(
"SELECT 1 FROM contacts WHERE agent_id = ? AND contact_id = ?",
(from_id, to_id)
).fetchone()
return row is not None
def format_time(ts_str):
"""Format timestamp for display."""
if not ts_str:
return "?"
try:
dt = datetime.fromisoformat(ts_str)
return dt.strftime("%H:%M:%S")
except (ValueError, TypeError):
return ts_str[:19]
def format_role(role):
"""Colorize role name."""
color = ROLE_COLORS.get(role, C_RESET)
return f"{color}{role}{C_RESET}"
def short_id(msg_id):
"""Show first 8 chars of message ID."""
return msg_id[:8] if msg_id else "?"
# ─── Commands ────────────────────────────────────────────────
def cmd_inbox(args):
"""Show unread messages (or all with --all)."""
agent_id = get_agent_id()
show_all = "--all" in args
db = get_db()
agent = get_agent(db, agent_id)
if not agent:
print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
return
if show_all:
rows = db.execute(
"SELECT m.*, a.name as sender_name, a.role as sender_role "
"FROM messages m JOIN agents a ON m.from_agent = a.id "
"WHERE m.to_agent = ? ORDER BY m.created_at DESC LIMIT 50",
(agent_id,)
).fetchall()
else:
rows = db.execute(
"SELECT m.*, a.name as sender_name, a.role as sender_role "
"FROM messages m JOIN agents a ON m.from_agent = a.id "
"WHERE m.to_agent = ? AND m.read = 0 ORDER BY m.created_at ASC",
(agent_id,)
).fetchall()
if not rows:
label = "messages" if show_all else "unread messages"
print(f"{C_DIM}No {label}.{C_RESET}")
return
label = "All messages" if show_all else f"Unread messages ({len(rows)})"
print(f"\n{C_BOLD}📬 {label} for {agent['name']}:{C_RESET}\n")
for row in rows:
read_mark = " " if row['read'] else f"{C_GREEN}{C_RESET}"
role_str = format_role(row['sender_role'])
time_str = format_time(row['created_at'])
reply_str = f" {C_DIM}{short_id(row['reply_to'])}{C_RESET}" if row['reply_to'] else ""
print(f" {read_mark} [{short_id(row['id'])}] {C_DIM}{time_str}{C_RESET} "
f"{C_BOLD}{row['sender_name']}{C_RESET} ({role_str}){reply_str}")
# Show first 120 chars of content
preview = row['content'][:120].replace('\n', ' ')
if len(row['content']) > 120:
preview += "..."
print(f" {preview}\n")
db.close()
def cmd_read(args):
"""Read a specific message and mark as read."""
if not args:
print(f"{C_RED}Usage: btmsg read <msg-id>{C_RESET}")
return
agent_id = get_agent_id()
msg_id_prefix = args[0]
db = get_db()
# Support short IDs (prefix match)
row = db.execute(
"SELECT m.*, a.name as sender_name, a.role as sender_role "
"FROM messages m JOIN agents a ON m.from_agent = a.id "
"WHERE m.id LIKE ? AND m.to_agent = ?",
(msg_id_prefix + "%", agent_id)
).fetchone()
if not row:
print(f"{C_RED}Message not found.{C_RESET}")
db.close()
return
role_str = format_role(row['sender_role'])
time_str = format_time(row['created_at'])
print(f"\n{C_BOLD}{'' * 60}{C_RESET}")
print(f" {C_DIM}ID:{C_RESET} {row['id']}")
print(f" {C_DIM}From:{C_RESET} {C_BOLD}{row['sender_name']}{C_RESET} ({role_str})")
print(f" {C_DIM}Time:{C_RESET} {time_str}")
if row['reply_to']:
print(f" {C_DIM}Re:{C_RESET} {short_id(row['reply_to'])}")
print(f"{C_BOLD}{'' * 60}{C_RESET}\n")
print(f" {row['content']}\n")
print(f"{C_BOLD}{'' * 60}{C_RESET}")
print(f"{C_DIM}Reply: btmsg reply {short_id(row['id'])} \"your message\"{C_RESET}\n")
# Mark as read
db.execute("UPDATE messages SET read = 1 WHERE id = ?", (row['id'],))
db.commit()
db.close()
def cmd_send(args):
"""Send a message to an agent."""
if len(args) < 2:
print(f"{C_RED}Usage: btmsg send <to-agent-id> <message>{C_RESET}")
return
agent_id = get_agent_id()
to_id = args[0]
content = " ".join(args[1:])
db = get_db()
# Verify sender exists
sender = get_agent(db, agent_id)
if not sender:
print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
db.close()
return
# Verify recipient exists (support prefix match)
recipient = get_agent(db, to_id)
if not recipient:
# Try prefix match
row = db.execute("SELECT * FROM agents WHERE id LIKE ?", (to_id + "%",)).fetchone()
if row:
recipient = row
to_id = row['id']
else:
print(f"{C_RED}Agent '{to_id}' not found.{C_RESET}")
db.close()
return
# Check contact permission
if not can_message(db, agent_id, to_id):
print(f"{C_RED}Not allowed to message '{recipient['name']}' ({recipient['role']}).{C_RESET}")
print(f"{C_DIM}Check your contacts: btmsg contacts{C_RESET}")
db.close()
return
msg_id = str(uuid.uuid4())
db.execute(
"INSERT INTO messages (id, from_agent, to_agent, content, group_id) "
"VALUES (?, ?, ?, ?, ?)",
(msg_id, agent_id, to_id, content, sender['group_id'])
)
db.commit()
db.close()
print(f"{C_GREEN}✓ Sent to {recipient['name']} ({recipient['role']}){C_RESET} [{short_id(msg_id)}]")
def cmd_reply(args):
"""Reply to a message."""
if len(args) < 2:
print(f"{C_RED}Usage: btmsg reply <msg-id> <message>{C_RESET}")
return
agent_id = get_agent_id()
msg_id_prefix = args[0]
content = " ".join(args[1:])
db = get_db()
# Find original message (prefix match)
original = db.execute(
"SELECT * FROM messages WHERE id LIKE ? AND to_agent = ?",
(msg_id_prefix + "%", agent_id)
).fetchone()
if not original:
print(f"{C_RED}Message not found.{C_RESET}")
db.close()
return
# Reply goes back to sender
sender = get_agent(db, agent_id)
if not sender:
print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
db.close()
return
to_id = original['from_agent']
# Check contact permission
if not can_message(db, agent_id, to_id):
recipient = get_agent(db, to_id)
rname = recipient['name'] if recipient else to_id
print(f"{C_RED}Not allowed to reply to '{rname}'.{C_RESET}")
db.close()
return
reply_id = str(uuid.uuid4())
db.execute(
"INSERT INTO messages (id, from_agent, to_agent, content, reply_to, group_id) "
"VALUES (?, ?, ?, ?, ?, ?)",
(reply_id, agent_id, to_id, content, original['id'], sender['group_id'])
)
db.commit()
recipient = get_agent(db, to_id)
rname = recipient['name'] if recipient else to_id
print(f"{C_GREEN}✓ Replied to {rname}{C_RESET} [{short_id(reply_id)}]")
db.close()
def cmd_contacts(args):
"""List allowed contacts for current agent."""
agent_id = get_agent_id()
db = get_db()
agent = get_agent(db, agent_id)
if not agent:
print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
db.close()
return
rows = db.execute(
"SELECT a.* FROM contacts c JOIN agents a ON c.contact_id = a.id "
"WHERE c.agent_id = ? ORDER BY a.tier, a.role, a.name",
(agent_id,)
).fetchall()
print(f"\n{C_BOLD}📋 Contacts for {agent['name']} ({format_role(agent['role'])}):{C_RESET}\n")
if not rows:
print(f" {C_DIM}No contacts configured.{C_RESET}")
db.close()
return
current_tier = None
for row in rows:
if row['tier'] != current_tier:
current_tier = row['tier']
tier_label = "Management" if current_tier == 1 else "Execution"
print(f" {C_DIM}── Tier {current_tier}: {tier_label} ──{C_RESET}")
status_dot = f"{C_GREEN}{C_RESET}" if row['status'] == 'active' else f"{C_DIM}{C_RESET}"
print(f" {status_dot} {C_BOLD}{row['id']}{C_RESET}{row['name']} ({format_role(row['role'])})")
print()
db.close()
def cmd_history(args):
"""Show conversation history with another agent."""
if not args:
print(f"{C_RED}Usage: btmsg history <agent-id> [--limit N]{C_RESET}")
return
agent_id = get_agent_id()
other_id = args[0]
limit = 20
if "--limit" in args:
idx = args.index("--limit")
if idx + 1 < len(args):
try:
limit = int(args[idx + 1])
except ValueError:
pass
db = get_db()
# Support prefix match for other_id
other = get_agent(db, other_id)
if not other:
row = db.execute("SELECT * FROM agents WHERE id LIKE ?", (other_id + "%",)).fetchone()
if row:
other = row
other_id = row['id']
else:
print(f"{C_RED}Agent '{other_id}' not found.{C_RESET}")
db.close()
return
agent = get_agent(db, agent_id)
if not agent:
print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
db.close()
return
rows = db.execute(
"SELECT m.*, a.name as sender_name, a.role as sender_role "
"FROM messages m JOIN agents a ON m.from_agent = a.id "
"WHERE (m.from_agent = ? AND m.to_agent = ?) "
" OR (m.from_agent = ? AND m.to_agent = ?) "
"ORDER BY m.created_at ASC LIMIT ?",
(agent_id, other_id, other_id, agent_id, limit)
).fetchall()
print(f"\n{C_BOLD}💬 History: {agent['name']}{other['name']}{C_RESET}\n")
if not rows:
print(f" {C_DIM}No messages yet.{C_RESET}\n")
db.close()
return
for row in rows:
time_str = format_time(row['created_at'])
is_me = row['from_agent'] == agent_id
arrow = f"{C_CYAN}{C_RESET}" if is_me else f"{C_YELLOW}{C_RESET}"
name = "You" if is_me else row['sender_name']
print(f" {C_DIM}{time_str}{C_RESET} {arrow} {C_BOLD}{name}{C_RESET}: {row['content']}")
print()
db.close()
def cmd_status(args):
"""Show all agents in the group and their status."""
db = get_db()
# Get current agent's group, or show all
agent_id = os.environ.get("BTMSG_AGENT_ID")
if agent_id:
agent = get_agent(db, agent_id)
if agent:
group_id = agent['group_id']
rows = db.execute(
"SELECT * FROM agents WHERE group_id = ? ORDER BY tier, role, name",
(group_id,)
).fetchall()
else:
rows = db.execute("SELECT * FROM agents ORDER BY group_id, tier, role, name").fetchall()
else:
rows = db.execute("SELECT * FROM agents ORDER BY group_id, tier, role, name").fetchall()
if not rows:
print(f"{C_DIM}No agents registered.{C_RESET}")
db.close()
return
# Count unread per agent
unread_counts = {}
for row in rows:
count = db.execute(
"SELECT COUNT(*) FROM messages WHERE to_agent = ? AND read = 0",
(row['id'],)
).fetchone()[0]
unread_counts[row['id']] = count
print(f"\n{C_BOLD}🤖 Agent Status{C_RESET}\n")
current_tier = None
for row in rows:
if row['tier'] != current_tier:
current_tier = row['tier']
tier_label = "TIER 1 — Management" if current_tier == 1 else "TIER 2 — Execution"
print(f" {C_BOLD}{tier_label}{C_RESET}")
status = row['status'] or 'stopped'
if status == 'active':
status_str = f"{C_GREEN}● active{C_RESET}"
elif status == 'sleeping':
status_str = f"{C_YELLOW}◐ sleeping{C_RESET}"
else:
status_str = f"{C_DIM}○ stopped{C_RESET}"
unread = unread_counts.get(row['id'], 0)
unread_str = f" {C_RED}({unread} unread){C_RESET}" if unread > 0 else ""
model_str = f" {C_DIM}[{row['model']}]{C_RESET}" if row['model'] else ""
print(f" {status_str} {C_BOLD}{row['name']}{C_RESET} "
f"({format_role(row['role'])}) — {row['id']}{model_str}{unread_str}")
print()
db.close()
def cmd_register(args):
"""Register a new agent."""
if len(args) < 4:
print(f"{C_RED}Usage: btmsg register <id> <name> <role> <group-id> [--tier N] [--model M] [--cwd PATH]{C_RESET}")
print(f"{C_DIM}Roles: {', '.join(ROLES.keys())}{C_RESET}")
return
agent_id = args[0]
name = args[1]
role = args[2]
group_id = args[3]
tier = ROLES.get(role, 2)
model = None
cwd = None
# Parse optional flags
i = 4
while i < len(args):
if args[i] == "--tier" and i + 1 < len(args):
tier = int(args[i + 1])
i += 2
elif args[i] == "--model" and i + 1 < len(args):
model = args[i + 1]
i += 2
elif args[i] == "--cwd" and i + 1 < len(args):
cwd = args[i + 1]
i += 2
else:
i += 1
db = get_db()
# Check if agent already exists
existing = get_agent(db, agent_id)
if existing:
print(f"{C_YELLOW}Agent '{agent_id}' already exists. Updating...{C_RESET}")
db.execute(
"UPDATE agents SET name=?, role=?, group_id=?, tier=?, model=?, cwd=? WHERE id=?",
(name, role, group_id, tier, model, cwd, agent_id)
)
else:
db.execute(
"INSERT INTO agents (id, name, role, group_id, tier, model, cwd) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(agent_id, name, role, group_id, tier, model, cwd)
)
db.commit()
db.close()
print(f"{C_GREEN}✓ Registered: {name} ({role}, tier {tier}){C_RESET}")
def cmd_allow(args):
"""Add contact permission between agents."""
if len(args) < 2:
print(f"{C_RED}Usage: btmsg allow <from-agent> <to-agent> [--bidirectional]{C_RESET}")
return
from_id = args[0]
to_id = args[1]
bidir = "--bidirectional" in args or "-b" in args
db = get_db()
# Verify both agents exist
for aid in [from_id, to_id]:
if not get_agent(db, aid):
print(f"{C_RED}Agent '{aid}' not found.{C_RESET}")
db.close()
return
db.execute(
"INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES (?, ?)",
(from_id, to_id)
)
if bidir:
db.execute(
"INSERT OR IGNORE INTO contacts (agent_id, contact_id) VALUES (?, ?)",
(to_id, from_id)
)
db.commit()
db.close()
direction = "" if bidir else ""
print(f"{C_GREEN}✓ Contact: {from_id} {direction} {to_id}{C_RESET}")
def cmd_whoami(args):
"""Show current agent identity."""
agent_id = get_agent_id()
db = get_db()
agent = get_agent(db, agent_id)
if not agent:
print(f"{C_YELLOW}Agent ID: {agent_id} (not registered){C_RESET}")
db.close()
return
unread = db.execute(
"SELECT COUNT(*) FROM messages WHERE to_agent = ? AND read = 0",
(agent_id,)
).fetchone()[0]
print(f"\n{C_BOLD}🤖 {agent['name']}{C_RESET}")
print(f" {C_DIM}ID:{C_RESET} {agent['id']}")
print(f" {C_DIM}Role:{C_RESET} {format_role(agent['role'])} (Tier {agent['tier']})")
print(f" {C_DIM}Group:{C_RESET} {agent['group_id']}")
if agent['model']:
print(f" {C_DIM}Model:{C_RESET} {agent['model']}")
if agent['cwd']:
print(f" {C_DIM}CWD:{C_RESET} {agent['cwd']}")
print(f" {C_DIM}Unread:{C_RESET} {unread} messages")
print()
db.close()
def cmd_unread_count(args):
"""Print just the unread count (for notifications)."""
agent_id = os.environ.get("BTMSG_AGENT_ID")
if not agent_id:
print("0")
return
db = get_db()
count = db.execute(
"SELECT COUNT(*) FROM messages WHERE to_agent = ? AND read = 0",
(agent_id,)
).fetchone()[0]
db.close()
print(count)
def cmd_notify(args):
"""Print notification line if there are unread messages (for agent injection)."""
agent_id = os.environ.get("BTMSG_AGENT_ID")
if not agent_id:
return
db = get_db()
count = db.execute(
"SELECT COUNT(*) FROM messages WHERE to_agent = ? AND read = 0",
(agent_id,)
).fetchone()[0]
if count > 0:
# Get latest unread sender
latest = db.execute(
"SELECT a.name, a.role FROM messages m JOIN agents a ON m.from_agent = a.id "
"WHERE m.to_agent = ? AND m.read = 0 ORDER BY m.created_at DESC LIMIT 1",
(agent_id,)
).fetchone()
sender_info = f" (latest from {latest['name']})" if latest else ""
print(f"📬 You have {count} unread message{'s' if count != 1 else ''}!{sender_info}")
print(f" Use: btmsg inbox")
db.close()
def cmd_graph(args):
"""Show visual hierarchy graph with agent status."""
db = get_db()
# Get current group
agent_id = os.environ.get("BTMSG_AGENT_ID")
group_id = None
if agent_id:
agent = get_agent(db, agent_id)
if agent:
group_id = agent['group_id']
if group_id:
agents = db.execute(
"SELECT * FROM agents WHERE group_id = ? ORDER BY tier, role, name",
(group_id,)
).fetchall()
else:
agents = db.execute("SELECT * FROM agents ORDER BY group_id, tier, role, name").fetchall()
if not agents:
print(f"{C_DIM}No agents registered.{C_RESET}")
db.close()
return
# Build contact map
contacts_map = {}
for a in agents:
rows = db.execute(
"SELECT contact_id FROM contacts WHERE agent_id = ?", (a['id'],)
).fetchall()
contacts_map[a['id']] = [r['contact_id'] for r in rows]
# Unread counts
unread = {}
for a in agents:
count = db.execute(
"SELECT COUNT(*) FROM messages WHERE to_agent = ? AND read = 0",
(a['id'],)
).fetchone()[0]
unread[a['id']] = count
# Status dot
def dot(agent):
s = agent['status'] or 'stopped'
if s == 'active':
return f"{C_GREEN}{C_RESET}"
elif s == 'sleeping':
return f"{C_YELLOW}{C_RESET}"
else:
return f"{C_RED}{C_RESET}"
# Agent label with status
def label(agent):
d = dot(agent)
name = agent['name']
role_c = ROLE_COLORS.get(agent['role'], C_RESET)
unread_str = f" {C_RED}{unread[agent['id']]}{C_RESET}" if unread.get(agent['id'], 0) > 0 else ""
model_str = f" {C_DIM}[{agent['model']}]{C_RESET}" if agent['model'] else ""
return f"{d} {C_BOLD}{name}{C_RESET} {role_c}({agent['role']}){C_RESET}{model_str}{unread_str}"
# Separate by tier
tier1 = [a for a in agents if a['tier'] == 1]
tier2 = [a for a in agents if a['tier'] == 2]
# Find manager, architect, tester
manager = next((a for a in tier1 if a['role'] == 'manager'), None)
architect = next((a for a in tier1 if a['role'] == 'architect'), None)
tester = next((a for a in tier1 if a['role'] == 'tester'), None)
other_tier1 = [a for a in tier1 if a['role'] not in ('manager', 'architect', 'tester')]
# Draw graph
group_name = group_id or "all"
print(f"\n{C_BOLD} ╔══════════════════════════════════════════════════════╗{C_RESET}")
print(f"{C_BOLD} ║ Agent Hierarchy — {group_name:<35s}{C_RESET}")
print(f"{C_BOLD} ╚══════════════════════════════════════════════════════╝{C_RESET}")
# USER at top
print(f"\n {C_BOLD}{C_CYAN}👤 USER{C_RESET}")
print(f" {C_DIM}{C_RESET}")
# TIER 1
print(f" {C_DIM} │ ┌─────────────────────────────────────────────┐{C_RESET}")
print(f" {C_DIM} │ │{C_RESET} {C_BOLD}TIER 1 — Management{C_RESET} {C_DIM}{C_RESET}")
print(f" {C_DIM} │ │{C_RESET} {C_DIM}{C_RESET}")
if manager:
print(f" {C_DIM} ├──│{C_RESET} {label(manager)}")
if architect:
print(f" {C_DIM} │ ├── {C_RESET}{label(architect)}")
if tester:
print(f" {C_DIM} │ │ └── {C_RESET}{label(tester)}")
elif tester:
print(f" {C_DIM} │ └── {C_RESET}{label(tester)}")
for a in other_tier1:
print(f" {C_DIM} │ ├── {C_RESET}{label(a)}")
else:
for a in tier1:
print(f" {C_DIM}{C_RESET} {label(a)}")
print(f" {C_DIM}{C_RESET} {C_DIM}{C_RESET}")
print(f" {C_DIM} └─────────────────────────────────────────────┘{C_RESET}")
if tier2:
# Connection lines from manager to tier 2
print(f" {C_DIM}{C_RESET}")
print(f" {C_DIM} │ ┌─────────────────────────────────────────────┐{C_RESET}")
print(f" {C_DIM} │ │{C_RESET} {C_BOLD}TIER 2 — Execution{C_RESET} {C_DIM}{C_RESET}")
print(f" {C_DIM} │ │{C_RESET} {C_DIM}{C_RESET}")
for i, a in enumerate(tier2):
is_last = i == len(tier2) - 1
connector = "" if is_last else ""
print(f" {C_DIM} ├──│{C_RESET} {connector}── {label(a)}")
print(f" {C_DIM}{C_RESET} {C_DIM}{C_RESET}")
print(f" {C_DIM} └─────────────────────────────────────────────┘{C_RESET}")
# Legend
print(f"\n {C_DIM}Legend: {C_GREEN}{C_RESET}{C_DIM} active {C_YELLOW}{C_RESET}{C_DIM} sleeping {C_RED}{C_RESET}{C_DIM} stopped {C_RED}{C_RESET}{C_DIM} unread{C_RESET}")
# Communication lines summary
print(f"\n {C_BOLD}Communication links:{C_RESET}")
for a in agents:
targets = contacts_map.get(a['id'], [])
if targets:
target_names = []
for t_id in targets:
t = get_agent(db, t_id)
if t:
target_names.append(t['name'])
if target_names:
role_c = ROLE_COLORS.get(a['role'], C_RESET)
print(f" {role_c}{a['name']}{C_RESET}{', '.join(target_names)}")
print()
db.close()
def cmd_feed(args):
"""Show all messages in the group (admin only)."""
agent_id = get_agent_id()
db = get_db()
agent = get_agent(db, agent_id)
if not agent or agent['tier'] != 0:
print(f"{C_RED}Admin access required (tier 0).{C_RESET}")
db.close()
return
limit = 50
if "--limit" in args:
idx = args.index("--limit")
if idx + 1 < len(args):
try:
limit = int(args[idx + 1])
except ValueError:
pass
rows = db.execute(
"SELECT m.*, a1.name as sender_name, a1.role as sender_role, "
"a2.name as recipient_name, a2.role as recipient_role "
"FROM messages m "
"JOIN agents a1 ON m.from_agent = a1.id "
"JOIN agents a2 ON m.to_agent = a2.id "
"WHERE m.group_id = ? ORDER BY m.created_at DESC LIMIT ?",
(agent['group_id'], limit)
).fetchall()
print(f"\n{C_BOLD}📡 Activity Feed — All Messages{C_RESET}\n")
if not rows:
print(f" {C_DIM}No messages yet.{C_RESET}\n")
db.close()
return
for row in reversed(rows):
time_str = format_time(row['created_at'])
sender_role = format_role(row['sender_role'])
recipient_role = format_role(row['recipient_role'])
print(f" {C_DIM}{time_str}{C_RESET} {C_BOLD}{row['sender_name']}{C_RESET} ({sender_role}) "
f"{C_BOLD}{row['recipient_name']}{C_RESET} ({recipient_role})")
preview = row['content'][:200].replace('\n', ' ')
if len(row['content']) > 200:
preview += "..."
print(f" {preview}\n")
db.close()
def cmd_channel(args):
"""Channel management commands."""
if not args:
print(f"{C_RED}Usage: btmsg channel <create|list|send|history|add> [args]{C_RESET}")
return
subcmd = args[0]
subargs = args[1:]
handlers = {
'create': cmd_channel_create,
'list': cmd_channel_list,
'send': cmd_channel_send,
'history': cmd_channel_history,
'add': cmd_channel_add,
}
handler = handlers.get(subcmd)
if handler:
handler(subargs)
else:
print(f"{C_RED}Unknown channel command: {subcmd}{C_RESET}")
def cmd_channel_create(args):
"""Create a new channel."""
if not args:
print(f"{C_RED}Usage: btmsg channel create <name>{C_RESET}")
return
agent_id = get_agent_id()
name = args[0]
db = get_db()
agent = get_agent(db, agent_id)
if not agent:
print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
db.close()
return
channel_id = str(uuid.uuid4())[:8]
db.execute(
"INSERT INTO channels (id, name, group_id, created_by) VALUES (?, ?, ?, ?)",
(channel_id, name, agent['group_id'], agent_id)
)
db.execute(
"INSERT INTO channel_members (channel_id, agent_id) VALUES (?, ?)",
(channel_id, agent_id)
)
db.commit()
db.close()
print(f"{C_GREEN}✓ Channel #{name} created [{channel_id}]{C_RESET}")
def cmd_channel_list(args):
"""List channels."""
agent_id = get_agent_id()
db = get_db()
agent = get_agent(db, agent_id)
if not agent:
print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
db.close()
return
rows = db.execute(
"SELECT c.*, "
"(SELECT COUNT(*) FROM channel_members cm WHERE cm.channel_id = c.id) as member_count, "
"(SELECT content FROM channel_messages cm2 WHERE cm2.channel_id = c.id "
"ORDER BY cm2.created_at DESC LIMIT 1) as last_msg "
"FROM channels c WHERE c.group_id = ? ORDER BY c.name",
(agent['group_id'],)
).fetchall()
print(f"\n{C_BOLD}📢 Channels{C_RESET}\n")
if not rows:
print(f" {C_DIM}No channels yet. Create one: btmsg channel create <name>{C_RESET}\n")
db.close()
return
for row in rows:
last = row['last_msg'] or ""
if len(last) > 60:
last = last[:60] + "..."
print(f" {C_CYAN}#{row['name']}{C_RESET} ({row['member_count']} members) [{row['id']}]")
if last:
print(f" {C_DIM}{last}{C_RESET}")
print()
db.close()
def cmd_channel_send(args):
"""Send message to a channel."""
if len(args) < 2:
print(f"{C_RED}Usage: btmsg channel send <channel-name> <message>{C_RESET}")
return
agent_id = get_agent_id()
channel_ref = args[0]
content = " ".join(args[1:])
db = get_db()
agent = get_agent(db, agent_id)
if not agent:
print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
db.close()
return
channel = db.execute(
"SELECT * FROM channels WHERE id = ? OR name = ?",
(channel_ref, channel_ref)
).fetchone()
if not channel:
print(f"{C_RED}Channel '{channel_ref}' not found.{C_RESET}")
db.close()
return
if agent['tier'] > 0:
is_member = db.execute(
"SELECT 1 FROM channel_members WHERE channel_id = ? AND agent_id = ?",
(channel['id'], agent_id)
).fetchone()
if not is_member:
print(f"{C_RED}Not a member of #{channel['name']}.{C_RESET}")
db.close()
return
msg_id = str(uuid.uuid4())
db.execute(
"INSERT INTO channel_messages (id, channel_id, from_agent, content) VALUES (?, ?, ?, ?)",
(msg_id, channel['id'], agent_id, content)
)
db.commit()
db.close()
print(f"{C_GREEN}✓ Sent to #{channel['name']}{C_RESET} [{short_id(msg_id)}]")
def cmd_channel_history(args):
"""Show channel message history."""
if not args:
print(f"{C_RED}Usage: btmsg channel history <channel-name> [--limit N]{C_RESET}")
return
channel_ref = args[0]
limit = 30
if "--limit" in args:
idx = args.index("--limit")
if idx + 1 < len(args):
try:
limit = int(args[idx + 1])
except ValueError:
pass
db = get_db()
channel = db.execute(
"SELECT * FROM channels WHERE id = ? OR name = ?",
(channel_ref, channel_ref)
).fetchone()
if not channel:
print(f"{C_RED}Channel '{channel_ref}' not found.{C_RESET}")
db.close()
return
rows = db.execute(
"SELECT cm.*, a.name as sender_name, a.role as sender_role "
"FROM channel_messages cm JOIN agents a ON cm.from_agent = a.id "
"WHERE cm.channel_id = ? ORDER BY cm.created_at ASC LIMIT ?",
(channel['id'], limit)
).fetchall()
print(f"\n{C_BOLD}📢 #{channel['name']}{C_RESET}\n")
if not rows:
print(f" {C_DIM}No messages yet.{C_RESET}\n")
db.close()
return
for row in rows:
time_str = format_time(row['created_at'])
role_str = format_role(row['sender_role'])
print(f" {C_DIM}{time_str}{C_RESET} {C_BOLD}{row['sender_name']}{C_RESET} ({role_str}): {row['content']}")
print()
db.close()
def cmd_channel_add(args):
"""Add member to a channel."""
if len(args) < 2:
print(f"{C_RED}Usage: btmsg channel add <channel-name> <agent-id>{C_RESET}")
return
channel_ref = args[0]
member_id = args[1]
db = get_db()
channel = db.execute(
"SELECT * FROM channels WHERE id = ? OR name = ?",
(channel_ref, channel_ref)
).fetchone()
if not channel:
print(f"{C_RED}Channel '{channel_ref}' not found.{C_RESET}")
db.close()
return
member = get_agent(db, member_id)
if not member:
print(f"{C_RED}Agent '{member_id}' not found.{C_RESET}")
db.close()
return
db.execute(
"INSERT OR IGNORE INTO channel_members (channel_id, agent_id) VALUES (?, ?)",
(channel['id'], member_id)
)
db.commit()
db.close()
print(f"{C_GREEN}✓ Added {member['name']} to #{channel['name']}{C_RESET}")
def cmd_help(args=None):
"""Show help."""
print(__doc__)
# ─── Main dispatch ───────────────────────────────────────────
COMMANDS = {
'inbox': cmd_inbox,
'read': cmd_read,
'send': cmd_send,
'reply': cmd_reply,
'contacts': cmd_contacts,
'history': cmd_history,
'status': cmd_status,
'register': cmd_register,
'allow': cmd_allow,
'whoami': cmd_whoami,
'graph': cmd_graph,
'unread': cmd_unread_count,
'notify': cmd_notify,
'feed': cmd_feed,
'channel': cmd_channel,
'help': cmd_help,
'--help': cmd_help,
'-h': cmd_help,
}
def main():
init_db()
if len(sys.argv) < 2:
cmd_help()
sys.exit(0)
command = sys.argv[1]
args = sys.argv[2:]
handler = COMMANDS.get(command)
if not handler:
print(f"{C_RED}Unknown command: {command}{C_RESET}")
cmd_help()
sys.exit(1)
handler(args)
if __name__ == "__main__":
main()