BTerminal/bttask
DexterFromLab f2dcedc460 feat(orchestration): add bttask CLI + GroupAgentsPanel + btmsg Tauri bridge
Phase 2: bttask CLI (Python, SQLite) — task management with role-based
visibility. Kanban board view. Manager/Architect can create tasks,
Tier 2 agents receive tasks via btmsg only.

Phase 3: GroupAgentConfig in groups.json + Rust backend. GroupAgentsPanel
Svelte component above ProjectGrid with status dots, role icons,
unread badges, start/stop buttons.

Phase 4: btmsg Rust bridge (btmsg.rs) — read/write access to btmsg.db.
6 Tauri commands for agent status, messages, and history.
GroupAgentsPanel polls btmsg.db every 5s for live status updates.
2026-03-11 14:03:11 +01:00

710 lines
21 KiB
Python
Executable file

#!/usr/bin/env python3
"""
bttask — Group Task Manager for BTerminal Mission Control.
Hierarchical task management for multi-agent orchestration.
Tasks stored in SQLite, role-based visibility.
Agent identity set via BTMSG_AGENT_ID environment variable.
Usage: bttask <command> [args]
Commands:
list [--all] Show tasks (filtered by role visibility)
add <title> Create task (Manager/Architect only)
assign <id> <agent> Assign task to agent (Manager only)
status <id> <state> Set task status (todo/progress/review/done/blocked)
comment <id> <text> Add comment to task
show <id> Show task details with comments
board Kanban board view
delete <id> Delete task (Manager only)
priorities Reorder tasks by priority
"""
import sqlite3
import sys
import os
import uuid
from pathlib import Path
from datetime import datetime
DB_PATH = Path.home() / ".local" / "share" / "bterminal" / "btmsg.db"
TASK_STATES = ['todo', 'progress', 'review', 'done', 'blocked']
# Roles that can create tasks
CREATOR_ROLES = {'manager', 'architect'}
# Roles that can assign tasks
ASSIGNER_ROLES = {'manager'}
# Roles that can see full task list
VIEWER_ROLES = {'manager', 'architect', 'tester'}
# Colors
C_RESET = "\033[0m"
C_BOLD = "\033[1m"
C_DIM = "\033[2m"
C_RED = "\033[31m"
C_GREEN = "\033[32m"
C_YELLOW = "\033[33m"
C_BLUE = "\033[34m"
C_MAGENTA = "\033[35m"
C_CYAN = "\033[36m"
C_WHITE = "\033[37m"
STATE_COLORS = {
'todo': C_WHITE,
'progress': C_CYAN,
'review': C_YELLOW,
'done': C_GREEN,
'blocked': C_RED,
}
STATE_ICONS = {
'todo': '',
'progress': '',
'review': '',
'done': '',
'blocked': '',
}
PRIORITY_COLORS = {
'critical': C_RED,
'high': C_YELLOW,
'medium': C_WHITE,
'low': C_DIM,
}
def get_db():
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
db = sqlite3.connect(str(DB_PATH))
db.row_factory = sqlite3.Row
db.execute("PRAGMA journal_mode=WAL")
return db
def init_db():
db = get_db()
db.executescript("""
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT DEFAULT '',
status TEXT DEFAULT 'todo',
priority TEXT DEFAULT 'medium',
assigned_to TEXT,
created_by TEXT NOT NULL,
group_id TEXT NOT NULL,
parent_task_id TEXT,
sort_order INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (assigned_to) REFERENCES agents(id),
FOREIGN KEY (created_by) REFERENCES agents(id)
);
CREATE TABLE IF NOT EXISTS task_comments (
id TEXT PRIMARY KEY,
task_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
content TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (task_id) REFERENCES tasks(id),
FOREIGN KEY (agent_id) REFERENCES agents(id)
);
CREATE INDEX IF NOT EXISTS idx_tasks_group ON tasks(group_id);
CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);
CREATE INDEX IF NOT EXISTS idx_tasks_assigned ON tasks(assigned_to);
CREATE INDEX IF NOT EXISTS idx_task_comments_task ON task_comments(task_id);
""")
db.commit()
db.close()
def get_agent_id():
agent_id = os.environ.get("BTMSG_AGENT_ID")
if not agent_id:
print(f"{C_RED}Error: BTMSG_AGENT_ID not set.{C_RESET}")
sys.exit(1)
return agent_id
def get_agent(db, agent_id):
return db.execute("SELECT * FROM agents WHERE id = ?", (agent_id,)).fetchone()
def short_id(task_id):
return task_id[:8] if task_id else "?"
def format_time(ts_str):
if not ts_str:
return "?"
try:
dt = datetime.fromisoformat(ts_str)
return dt.strftime("%m-%d %H:%M")
except (ValueError, TypeError):
return ts_str[:16]
def format_state(state):
icon = STATE_ICONS.get(state, '?')
color = STATE_COLORS.get(state, C_RESET)
return f"{color}{icon} {state}{C_RESET}"
def format_priority(priority):
color = PRIORITY_COLORS.get(priority, C_RESET)
return f"{color}{priority}{C_RESET}"
def check_role(db, agent_id, allowed_roles, action="do this"):
agent = get_agent(db, agent_id)
if not agent:
print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
return None
if agent['role'] not in allowed_roles:
print(f"{C_RED}Permission denied: {agent['role']} cannot {action}.{C_RESET}")
print(f"{C_DIM}Required roles: {', '.join(allowed_roles)}{C_RESET}")
return None
return agent
def find_task(db, task_id_prefix, group_id=None):
"""Find task by ID prefix, optionally filtered by group."""
if group_id:
return db.execute(
"SELECT * FROM tasks WHERE id LIKE ? AND group_id = ?",
(task_id_prefix + "%", group_id)
).fetchone()
return db.execute(
"SELECT * FROM tasks WHERE id LIKE ?", (task_id_prefix + "%",)
).fetchone()
# ─── Commands ────────────────────────────────────────────────
def cmd_list(args):
"""List tasks visible to current agent."""
agent_id = get_agent_id()
show_all = "--all" in args
db = get_db()
agent = get_agent(db, agent_id)
if not agent:
print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
db.close()
return
# Tier 2 agents cannot see task list
if agent['role'] not in VIEWER_ROLES:
print(f"{C_RED}Access denied: project agents don't see the task list.{C_RESET}")
print(f"{C_DIM}Tasks are assigned to you via btmsg messages.{C_RESET}")
db.close()
return
group_id = agent['group_id']
if show_all:
rows = db.execute(
"SELECT t.*, a.name as assignee_name FROM tasks t "
"LEFT JOIN agents a ON t.assigned_to = a.id "
"WHERE t.group_id = ? ORDER BY t.sort_order, t.created_at",
(group_id,)
).fetchall()
else:
rows = db.execute(
"SELECT t.*, a.name as assignee_name FROM tasks t "
"LEFT JOIN agents a ON t.assigned_to = a.id "
"WHERE t.group_id = ? AND t.status != 'done' "
"ORDER BY t.sort_order, t.created_at",
(group_id,)
).fetchall()
if not rows:
print(f"{C_DIM}No tasks.{C_RESET}")
db.close()
return
label = "All tasks" if show_all else "Active tasks"
print(f"\n{C_BOLD}📋 {label} ({len(rows)}):{C_RESET}\n")
for row in rows:
state_str = format_state(row['status'])
priority_str = format_priority(row['priority'])
assignee = row['assignee_name'] or f"{C_DIM}unassigned{C_RESET}"
print(f" {state_str} [{short_id(row['id'])}] {C_BOLD}{row['title']}{C_RESET}")
print(f" {priority_str}{assignee} {C_DIM}{format_time(row['updated_at'])}{C_RESET}")
# Show comment count
count = db.execute(
"SELECT COUNT(*) FROM task_comments WHERE task_id = ?", (row['id'],)
).fetchone()[0]
if count > 0:
print(f" {C_DIM}💬 {count} comment{'s' if count != 1 else ''}{C_RESET}")
print()
db.close()
def cmd_add(args):
"""Create a new task."""
if not args:
print(f"{C_RED}Usage: bttask add <title> [--desc TEXT] [--priority critical|high|medium|low] [--assign AGENT] [--parent TASK_ID]{C_RESET}")
return
agent_id = get_agent_id()
db = get_db()
agent = check_role(db, agent_id, CREATOR_ROLES, "create tasks")
if not agent:
db.close()
return
# Parse args
title_parts = []
description = ""
priority = "medium"
assign_to = None
parent_id = None
i = 0
while i < len(args):
if args[i] == "--desc" and i + 1 < len(args):
description = args[i + 1]
i += 2
elif args[i] == "--priority" and i + 1 < len(args):
priority = args[i + 1]
if priority not in PRIORITY_COLORS:
print(f"{C_RED}Invalid priority: {priority}. Use: critical, high, medium, low{C_RESET}")
db.close()
return
i += 2
elif args[i] == "--assign" and i + 1 < len(args):
assign_to = args[i + 1]
i += 2
elif args[i] == "--parent" and i + 1 < len(args):
parent_id = args[i + 1]
i += 2
else:
title_parts.append(args[i])
i += 1
title = " ".join(title_parts)
if not title:
print(f"{C_RED}Title is required.{C_RESET}")
db.close()
return
# Verify assignee if specified
if assign_to:
assignee = get_agent(db, assign_to)
if not assignee:
# prefix match
row = db.execute("SELECT * FROM agents WHERE id LIKE ?", (assign_to + "%",)).fetchone()
if row:
assign_to = row['id']
else:
print(f"{C_RED}Agent '{assign_to}' not found.{C_RESET}")
db.close()
return
# Resolve parent task
if parent_id:
parent = find_task(db, parent_id, agent['group_id'])
if not parent:
print(f"{C_RED}Parent task '{parent_id}' not found.{C_RESET}")
db.close()
return
parent_id = parent['id']
# Get max sort_order
max_order = db.execute(
"SELECT COALESCE(MAX(sort_order), 0) FROM tasks WHERE group_id = ?",
(agent['group_id'],)
).fetchone()[0]
task_id = str(uuid.uuid4())
db.execute(
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_by, "
"group_id, parent_task_id, sort_order) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(task_id, title, description, priority, assign_to, agent_id,
agent['group_id'], parent_id, max_order + 1)
)
db.commit()
db.close()
print(f"{C_GREEN}✓ Created: {title}{C_RESET} [{short_id(task_id)}]")
if assign_to:
print(f" {C_DIM}Assigned to: {assign_to}{C_RESET}")
def cmd_assign(args):
"""Assign task to an agent."""
if len(args) < 2:
print(f"{C_RED}Usage: bttask assign <task-id> <agent-id>{C_RESET}")
return
agent_id = get_agent_id()
db = get_db()
agent = check_role(db, agent_id, ASSIGNER_ROLES, "assign tasks")
if not agent:
db.close()
return
task = find_task(db, args[0], agent['group_id'])
if not task:
print(f"{C_RED}Task not found.{C_RESET}")
db.close()
return
assignee_id = args[1]
assignee = get_agent(db, assignee_id)
if not assignee:
row = db.execute("SELECT * FROM agents WHERE id LIKE ?", (assignee_id + "%",)).fetchone()
if row:
assignee = row
assignee_id = row['id']
else:
print(f"{C_RED}Agent '{assignee_id}' not found.{C_RESET}")
db.close()
return
db.execute(
"UPDATE tasks SET assigned_to = ?, updated_at = datetime('now') WHERE id = ?",
(assignee_id, task['id'])
)
db.commit()
db.close()
print(f"{C_GREEN}✓ Assigned [{short_id(task['id'])}] to {assignee['name']}{C_RESET}")
def cmd_status(args):
"""Change task status."""
if len(args) < 2:
print(f"{C_RED}Usage: bttask status <task-id> <{'/'.join(TASK_STATES)}>{C_RESET}")
return
agent_id = get_agent_id()
new_status = args[1]
if new_status not in TASK_STATES:
print(f"{C_RED}Invalid status: {new_status}. Use: {', '.join(TASK_STATES)}{C_RESET}")
return
db = get_db()
agent = get_agent(db, agent_id)
if not agent:
print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
db.close()
return
task = find_task(db, args[0], agent['group_id'])
if not task:
print(f"{C_RED}Task not found.{C_RESET}")
db.close()
return
# Tier 2 can only update tasks assigned to them
if agent['role'] not in VIEWER_ROLES and task['assigned_to'] != agent_id:
print(f"{C_RED}Cannot update task not assigned to you.{C_RESET}")
db.close()
return
old_status = task['status']
db.execute(
"UPDATE tasks SET status = ?, updated_at = datetime('now') WHERE id = ?",
(new_status, task['id'])
)
# Auto-add comment for status change
comment_id = str(uuid.uuid4())
db.execute(
"INSERT INTO task_comments (id, task_id, agent_id, content) VALUES (?, ?, ?, ?)",
(comment_id, task['id'], agent_id, f"Status: {old_status}{new_status}")
)
db.commit()
db.close()
print(f"{C_GREEN}✓ [{short_id(task['id'])}] {format_state(old_status)}{format_state(new_status)}{C_RESET}")
def cmd_comment(args):
"""Add comment to a task."""
if len(args) < 2:
print(f"{C_RED}Usage: bttask comment <task-id> <text>{C_RESET}")
return
agent_id = get_agent_id()
db = get_db()
agent = get_agent(db, agent_id)
if not agent:
print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
db.close()
return
task = find_task(db, args[0], agent['group_id'])
if not task:
print(f"{C_RED}Task not found.{C_RESET}")
db.close()
return
content = " ".join(args[1:])
comment_id = str(uuid.uuid4())
db.execute(
"INSERT INTO task_comments (id, task_id, agent_id, content) VALUES (?, ?, ?, ?)",
(comment_id, task['id'], agent_id, content)
)
db.execute(
"UPDATE tasks SET updated_at = datetime('now') WHERE id = ?", (task['id'],)
)
db.commit()
db.close()
print(f"{C_GREEN}✓ Comment added to [{short_id(task['id'])}]{C_RESET}")
def cmd_show(args):
"""Show task details with comments."""
if not args:
print(f"{C_RED}Usage: bttask show <task-id>{C_RESET}")
return
agent_id = get_agent_id()
db = get_db()
agent = get_agent(db, agent_id)
if not agent:
print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
db.close()
return
task = find_task(db, args[0], agent['group_id'])
if not task:
print(f"{C_RED}Task not found.{C_RESET}")
db.close()
return
# Get assignee name
assignee_name = "unassigned"
if task['assigned_to']:
assignee = get_agent(db, task['assigned_to'])
if assignee:
assignee_name = assignee['name']
# Get creator name
creator = get_agent(db, task['created_by'])
creator_name = creator['name'] if creator else task['created_by']
print(f"\n{C_BOLD}{'' * 60}{C_RESET}")
print(f" {format_state(task['status'])} {C_BOLD}{task['title']}{C_RESET}")
print(f"{C_BOLD}{'' * 60}{C_RESET}")
print(f" {C_DIM}ID:{C_RESET} {task['id']}")
print(f" {C_DIM}Priority:{C_RESET} {format_priority(task['priority'])}")
print(f" {C_DIM}Assigned:{C_RESET} {assignee_name}")
print(f" {C_DIM}Created:{C_RESET} {creator_name} @ {format_time(task['created_at'])}")
print(f" {C_DIM}Updated:{C_RESET} {format_time(task['updated_at'])}")
if task['description']:
print(f"\n {task['description']}")
if task['parent_task_id']:
parent = find_task(db, task['parent_task_id'])
if parent:
print(f" {C_DIM}Parent:{C_RESET} [{short_id(parent['id'])}] {parent['title']}")
# Subtasks
subtasks = db.execute(
"SELECT * FROM tasks WHERE parent_task_id = ? ORDER BY sort_order",
(task['id'],)
).fetchall()
if subtasks:
print(f"\n {C_BOLD}Subtasks:{C_RESET}")
for st in subtasks:
print(f" {format_state(st['status'])} [{short_id(st['id'])}] {st['title']}")
# Comments
comments = db.execute(
"SELECT c.*, a.name as agent_name, a.role as agent_role "
"FROM task_comments c JOIN agents a ON c.agent_id = a.id "
"WHERE c.task_id = ? ORDER BY c.created_at ASC",
(task['id'],)
).fetchall()
if comments:
print(f"\n {C_BOLD}Comments ({len(comments)}):{C_RESET}")
for c in comments:
time_str = format_time(c['created_at'])
print(f" {C_DIM}{time_str}{C_RESET} {C_BOLD}{c['agent_name']}{C_RESET}: {c['content']}")
print(f"\n{C_BOLD}{'' * 60}{C_RESET}\n")
db.close()
def cmd_board(args):
"""Kanban board view."""
agent_id = get_agent_id()
db = get_db()
agent = get_agent(db, agent_id)
if not agent:
print(f"{C_RED}Agent '{agent_id}' not registered.{C_RESET}")
db.close()
return
if agent['role'] not in VIEWER_ROLES:
print(f"{C_RED}Access denied: project agents don't see the task board.{C_RESET}")
db.close()
return
group_id = agent['group_id']
# Get all tasks grouped by status
all_tasks = db.execute(
"SELECT t.*, a.name as assignee_name FROM tasks t "
"LEFT JOIN agents a ON t.assigned_to = a.id "
"WHERE t.group_id = ? ORDER BY t.sort_order, t.created_at",
(group_id,)
).fetchall()
columns = {}
for state in TASK_STATES:
columns[state] = [t for t in all_tasks if t['status'] == state]
# Calculate column width
col_width = 20
# Header
print(f"\n{C_BOLD} 📋 Task Board{C_RESET}\n")
# Column headers
header_line = " "
for state in TASK_STATES:
icon = STATE_ICONS[state]
color = STATE_COLORS[state]
count = len(columns[state])
col_header = f"{color}{icon} {state.upper()} ({count}){C_RESET}"
header_line += col_header.ljust(col_width + len(color) + len(C_RESET) + 5)
print(header_line)
print(f" {'' * (col_width * len(TASK_STATES) + 10)}")
# Find max rows
max_rows = max(len(columns[s]) for s in TASK_STATES) if all_tasks else 0
for row_idx in range(max_rows):
line = " "
for state in TASK_STATES:
tasks_in_col = columns[state]
if row_idx < len(tasks_in_col):
t = tasks_in_col[row_idx]
title = t['title'][:col_width - 2]
assignee = (t['assignee_name'] or "?")[:8]
priority_c = PRIORITY_COLORS.get(t['priority'], C_RESET)
cell = f"{priority_c}{short_id(t['id'])}{C_RESET} {title}"
# Pad to column width (accounting for color codes)
visible_len = len(short_id(t['id'])) + 1 + len(title)
padding = max(0, col_width - visible_len)
line += cell + " " * padding + " "
else:
line += " " * (col_width + 2)
print(line)
# Second line with assignee
line2 = " "
for state in TASK_STATES:
tasks_in_col = columns[state]
if row_idx < len(tasks_in_col):
t = tasks_in_col[row_idx]
assignee = (t['assignee_name'] or "unassigned")[:col_width - 2]
cell = f"{C_DIM}{assignee}{C_RESET}"
visible_len = 4 + len(assignee)
padding = max(0, col_width - visible_len)
line2 += cell + " " * padding + " "
else:
line2 += " " * (col_width + 2)
print(line2)
print()
if not all_tasks:
print(f" {C_DIM}No tasks. Create one: bttask add \"Task title\"{C_RESET}")
print()
db.close()
def cmd_delete(args):
"""Delete a task."""
if not args:
print(f"{C_RED}Usage: bttask delete <task-id>{C_RESET}")
return
agent_id = get_agent_id()
db = get_db()
agent = check_role(db, agent_id, ASSIGNER_ROLES, "delete tasks")
if not agent:
db.close()
return
task = find_task(db, args[0], agent['group_id'])
if not task:
print(f"{C_RED}Task not found.{C_RESET}")
db.close()
return
title = task['title']
db.execute("DELETE FROM task_comments WHERE task_id = ?", (task['id'],))
db.execute("DELETE FROM tasks WHERE id = ?", (task['id'],))
db.commit()
db.close()
print(f"{C_GREEN}✓ Deleted: {title}{C_RESET}")
def cmd_help(args=None):
"""Show help."""
print(__doc__)
# ─── Main dispatch ───────────────────────────────────────────
COMMANDS = {
'list': cmd_list,
'add': cmd_add,
'assign': cmd_assign,
'status': cmd_status,
'comment': cmd_comment,
'show': cmd_show,
'board': cmd_board,
'delete': cmd_delete,
'help': cmd_help,
'--help': cmd_help,
'-h': cmd_help,
}
def main():
init_db()
if len(sys.argv) < 2:
cmd_help()
sys.exit(0)
command = sys.argv[1]
args = sys.argv[2:]
handler = COMMANDS.get(command)
if not handler:
print(f"{C_RED}Unknown command: {command}{C_RESET}")
cmd_help()
sys.exit(1)
handler(args)
if __name__ == "__main__":
main()