agent-orchestrator/consult
DexterFromLab f6b3a3e080 feat: add consult (multi-model tribunal) CLI tool
Fix NoneType bug in generate_report (counterArgument can be None).
Add consult to install.sh alongside ctx for symlink-based installation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 13:17:48 +01:00

1268 lines
47 KiB
Python
Executable file

#!/usr/bin/env python3
"""consult — Query external AI models via OpenRouter.
Usage:
consult "question" Ask default model
consult -m google/gemini-2.5-pro "q" Ask specific model
consult -f file.py "review this code" Include file as context
cat log.txt | consult "analyze errors" Pipe stdin as context
consult models List enabled models
consult debate "problem statement" Multi-model adversarial debate
"""
import argparse
import json
import os
import re
import sys
import threading
import time
import urllib.error
import urllib.request
# Configuration lives alongside the BTerminal app config.
CONFIG_DIR = os.path.expanduser("~/.config/bterminal")
CONFIG_FILE = os.path.join(CONFIG_DIR, "consult.json")
# OpenRouter endpoints: chat completions + model/pricing catalog.
OPENROUTER_API = "https://openrouter.ai/api/v1/chat/completions"
OPENROUTER_MODELS_API = "https://openrouter.ai/api/v1/models"
# Pseudo-models served by the local `claude` CLI instead of OpenRouter;
# "alias" is the value passed to `claude --model`.
CLAUDE_CODE_MODELS = {
    "claude-code/opus": {"name": "Claude Opus 4.6", "alias": "opus", "source": "claude-code"},
    "claude-code/sonnet": {"name": "Claude Sonnet 4.6", "alias": "sonnet", "source": "claude-code"},
    "claude-code/haiku": {"name": "Claude Haiku 4.5", "alias": "haiku", "source": "claude-code"},
}
# Written to CONFIG_FILE on first run; edited via BTerminal or by hand.
DEFAULT_CONFIG = {
    "api_key": "",  # OpenRouter API key; required for any non-claude-code model
    "default_model": "google/gemini-2.5-pro",
    # "enabled" toggles whether a model is offered (see `consult models`).
    "models": {
        "google/gemini-2.5-pro": {"enabled": True, "name": "Gemini 2.5 Pro", "source": "openrouter"},
        "openai/gpt-4o": {"enabled": True, "name": "GPT-4o", "source": "openrouter"},
        "openai/o3-mini": {"enabled": True, "name": "o3-mini", "source": "openrouter"},
        "deepseek/deepseek-r1": {"enabled": True, "name": "DeepSeek R1", "source": "openrouter"},
        "anthropic/claude-sonnet-4": {"enabled": False, "name": "Claude Sonnet 4", "source": "openrouter"},
        "meta-llama/llama-4-maverick": {"enabled": False, "name": "Llama 4 Maverick", "source": "openrouter"},
    },
    # Per-role model assignment for the debate pipeline ("" = unset).
    "tribunal": {
        "analyst_model": "",
        "advocate_model": "",
        "critic_model": "",
        "arbiter_model": "",
        "max_rounds": 3,
    },
}
def load_config():
    """Load config from CONFIG_FILE, creating it with defaults if missing.

    Migrates existing configs by adding any Claude Code pseudo-models that
    are not yet present (disabled by default), persisting the result.

    Returns:
        dict: the parsed configuration.
    """
    if os.path.isfile(CONFIG_FILE):
        try:
            with open(CONFIG_FILE) as f:
                cfg = json.load(f)
            # Ensure Claude Code models exist
            models = cfg.setdefault("models", {})
            changed = False
            for mid, info in CLAUDE_CODE_MODELS.items():
                if mid not in models:
                    models[mid] = dict(info, enabled=False)
                    changed = True
            if changed:
                with open(CONFIG_FILE, "w") as f:
                    json.dump(cfg, f, indent=2)
            return cfg
        except (json.JSONDecodeError, IOError):
            pass  # corrupt/unreadable config: fall through and rewrite defaults
    os.makedirs(CONFIG_DIR, exist_ok=True)
    # Deep copy (via JSON round-trip) instead of dict.copy(): a shallow copy
    # shares the nested "models"/"tribunal" dicts, so callers mutating the
    # returned config would silently corrupt module-level DEFAULT_CONFIG.
    cfg = json.loads(json.dumps(DEFAULT_CONFIG))
    with open(CONFIG_FILE, "w") as f:
        json.dump(cfg, f, indent=2)
    return cfg
def get_api_key():
    """Return the configured OpenRouter API key, or exit(1) when unset."""
    key = load_config().get("api_key", "")
    if key:
        return key
    print(
        "Error: No API key. Set it in BTerminal Consult tab "
        "or edit ~/.config/bterminal/consult.json",
        file=sys.stderr,
    )
    sys.exit(1)
# ── OpenRouter API call (non-streaming, JSON mode) ──────────────────────
def call_openrouter(api_key, model, system_prompt, user_prompt):
    """Single non-streaming call. Returns (text, usage_dict).

    Raises:
        RuntimeError: if the API reports an error — either in a 2xx JSON
            body or via a non-2xx HTTP status (whose JSON error body is
            parsed and surfaced instead of a bare HTTPError).
    """
    payload = json.dumps({
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "max_tokens": 16384,
        "temperature": 0.7,
    }).encode()
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://github.com/DexterFromLab/BTerminal",
        "X-Title": "BTerminal Consult Tribunal",
    }
    req = urllib.request.Request(OPENROUTER_API, data=payload, headers=headers)
    try:
        resp = urllib.request.urlopen(req, timeout=300)
        body = resp.read().decode()
    except urllib.error.HTTPError as e:
        # Non-2xx: urlopen raises before the JSON "error" check below could
        # run, which used to lose the API's error message entirely.
        try:
            err = json.loads(e.read().decode())
        except (ValueError, UnicodeDecodeError):
            raise RuntimeError(f"HTTP {e.code}: {e.reason}")
        msg = err.get("error") if isinstance(err, dict) else None
        if isinstance(msg, dict):
            msg = msg.get("message", str(msg))
        raise RuntimeError(str(msg or err))
    data = json.loads(body)
    if "error" in data:
        raise RuntimeError(data["error"].get("message", str(data["error"])))
    text = data.get("choices", [{}])[0].get("message", {}).get("content", "")
    usage = data.get("usage", {})
    return text, usage
# ── Claude Code API call ────────────────────────────────────────────────
def is_claude_code_model(model_id):
    """Return True when *model_id* names a local Claude Code pseudo-model."""
    prefix = "claude-code/"
    return model_id[:len(prefix)] == prefix
def get_claude_alias(model_id):
    """Map a model ID such as 'claude-code/opus' to its Claude CLI alias."""
    known = CLAUDE_CODE_MODELS.get(model_id)
    if known:
        return known["alias"]
    # Unknown ID: fall back to the path segment after the first slash.
    return model_id.split("/", 1)[-1]
def call_claude_code(model_id, system_prompt, user_prompt):
    """Call Claude Code CLI. Returns (text, usage_dict).

    Runs `claude -p --model <alias> <prompt>` as a subprocess. System and
    user prompts are concatenated because the CLI takes a single prompt.

    Raises:
        RuntimeError: on a 10-minute timeout or non-zero CLI exit status.
    """
    import subprocess
    alias = get_claude_alias(model_id)
    full_prompt = f"{system_prompt}\n\n{user_prompt}" if system_prompt else user_prompt
    env = os.environ.copy()
    # Unset CLAUDECODE so the nested `claude` invocation does not believe
    # it is already running inside a Claude Code session.
    env.pop("CLAUDECODE", None)
    try:
        result = subprocess.run(
            ["claude", "-p", "--model", alias, full_prompt],
            capture_output=True, text=True, timeout=600, env=env,
        )
    except subprocess.TimeoutExpired:
        raise RuntimeError(f"Claude Code timeout (10min) — prompt too large or model too slow")
    if result.returncode != 0:
        raise RuntimeError(f"Claude Code error: {result.stderr.strip()}")
    text = result.stdout.strip()
    # Claude Code CLI doesn't report token usage, estimate roughly
    # (~4 characters per token heuristic).
    usage = {
        "prompt_tokens": len(full_prompt) // 4,
        "completion_tokens": len(text) // 4,
    }
    return text, usage
def call_model(api_key, model, system_prompt, user_prompt):
    """Dispatch a chat request to the backend that serves *model*.

    Claude Code pseudo-models go through the local CLI; everything else
    is sent to OpenRouter. Returns (text, usage_dict) either way.
    """
    if not is_claude_code_model(model):
        return call_openrouter(api_key, model, system_prompt, user_prompt)
    return call_claude_code(model, system_prompt, user_prompt)
# ── JSON extraction ─────────────────────────────────────────────────────
def extract_json(text):
    """Extract a JSON object from an LLM reply (tolerates fences, preamble)."""
    _FAIL = object()  # sentinel: distinguishes "parse failed" from a parsed None

    def attempt(candidate):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            return _FAIL

    # 1) The whole reply is already valid JSON.
    parsed = attempt(text)
    if parsed is not _FAIL:
        return parsed
    # 2) A fenced code block.
    fence = re.search(r"```(?:json)?\s*([\s\S]*?)```", text)
    if fence:
        parsed = attempt(fence.group(1).strip())
        if parsed is not _FAIL:
            return parsed
    # 3) Scan for the first balanced top-level {...} span, honoring strings
    # and backslash escapes so braces inside string values are ignored.
    start = text.find("{")
    if start < 0:
        return None
    depth, in_str, esc = 0, False, False
    for idx in range(start, len(text)):
        ch = text[idx]
        if esc:
            esc = False
        elif in_str and ch == "\\":
            esc = True
        elif ch == '"':
            in_str = not in_str
        elif not in_str:
            if ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    parsed = attempt(text[start:idx + 1])
                    return None if parsed is _FAIL else parsed
    return None
def coerce(data):
    """Normalize LLM output quirks (numeric strings, enum case, stray dicts)."""
    if not isinstance(data, dict):
        return data
    # Numeric fields sometimes arrive as strings; unparseable -> 0.5.
    for key in ("confidence", "confidenceChange"):
        val = data.get(key)
        if isinstance(val, str):
            try:
                data[key] = float(val)
            except ValueError:
                data[key] = 0.5
    # Enum-like fields: canonicalize to UPPER_SNAKE_CASE.
    for key in ("severity", "likelihood", "impact", "category", "verdict"):
        val = data.get(key)
        if isinstance(val, str):
            data[key] = val.upper().replace(" ", "_")

    def flatten(item):
        # Reduce a list entry to a plain string: dicts yield their first
        # name/label/title/description value (or their JSON dump), anything
        # else is stringified.
        if isinstance(item, dict):
            return item.get("name", item.get("label", item.get(
                "title", item.get("description", json.dumps(item)))))
        return item if isinstance(item, str) else str(item)

    for key in ("dimensions", "constraints", "criteria", "tradeoffs",
                "assumptions", "unresolved", "actionItems"):
        val = data.get(key)
        if isinstance(val, list):
            data[key] = [flatten(item) for item in val]
    # Nested object lists: coerce each dict element recursively.
    for key in ("solutions", "objections", "defenses", "riskMatrix", "revisedSolutions"):
        val = data.get(key)
        if isinstance(val, list):
            data[key] = [coerce(item) if isinstance(item, dict) else item for item in val]
    return data
# ── Pricing ─────────────────────────────────────────────────────────────
# Process-wide cache: model id -> {"input": $/Mtok, "output": $/Mtok}.
_pricing_cache = None
def fetch_pricing(api_key):
    """Fetch per-token pricing from OpenRouter, cached for the process.

    Returns a dict mapping model id to USD per million input/output tokens.
    Best-effort: any failure caches and returns an empty dict so cost
    reporting degrades silently instead of breaking the run.
    """
    global _pricing_cache
    if _pricing_cache is not None:
        return _pricing_cache
    try:
        req = urllib.request.Request(
            OPENROUTER_MODELS_API,
            headers={"Authorization": f"Bearer {api_key}"},
        )
        resp = urllib.request.urlopen(req, timeout=15)
        data = json.loads(resp.read().decode())
        pricing = {}
        for m in data.get("data", []):
            p = m.get("pricing", {})
            if p.get("prompt") and p.get("completion"):
                # API reports $/token; convert to $/million tokens.
                pricing[m["id"]] = {
                    "input": float(p["prompt"]) * 1_000_000,
                    "output": float(p["completion"]) * 1_000_000,
                }
        _pricing_cache = pricing
        return pricing
    except Exception:
        # Pricing is cosmetic (cost estimates only) — never fail the run.
        _pricing_cache = {}
        return {}
def calc_cost(pricing, model, usage):
    """Return (input_tokens, output_tokens, dollar_cost) for one call.

    Unknown models fall back to $3/M input and $15/M output tokens.
    """
    rates = pricing.get(model, {"input": 3, "output": 15})
    tokens_in = usage.get("prompt_tokens", 0)
    tokens_out = usage.get("completion_tokens", 0)
    dollars = (tokens_in * rates["input"] + tokens_out * rates["output"]) / 1_000_000
    return tokens_in, tokens_out, dollars
# ── Agent prompts ───────────────────────────────────────────────────────
# System prompts for the four tribunal roles; keys are the role names
# run_agent() receives. Every prompt demands JSON-only responses, which
# extract_json()/coerce() then parse and normalize.
SYSTEM_PROMPTS = {
    "analyst": """You are the Analyst — the first mind to touch the problem. Your job is to decompose the problem into a structured frame that downstream agents (Advocates, Critics, Arbiter) can work from.
Your output is the shared foundation. If you miss a dimension, the entire debate will have a blind spot.
1. Restate the problem in precise, unambiguous language.
2. Identify dimensions — the axes along which solutions will vary.
3. Surface constraints — hard limits stated or implied.
4. Define success criteria — what "solved" looks like.
5. Provide context — relevant background.
6. Suggest perspectives — 2-3 distinct lenses for solution generation.
IMPORTANT: Respond with valid JSON only. No markdown code fences, no explanation outside the JSON.""",
    "advocate": """You are the Advocate — a solution generator and defender.
Mode 1 (Generation): Produce 2-3 distinct approaches from different perspectives. Each solution must be genuinely different. Include: title, approach (specific enough to implement), tradeoffs (genuine downsides), assumptions (falsifiable), confidence (0-1), perspective.
Mode 2 (Defense): For each objection, respond with:
- DEFENDED: objection is wrong, provide counter-argument
- CONCEDED: objection is valid, describe how solution changes
- PARTIALLY_CONCEDED: has merit but doesn't invalidate core approach
Do NOT defend everything reflexively. Intellectual honesty makes the output more useful.
IMPORTANT: Respond with valid JSON only. No markdown code fences, no explanation outside the JSON.""",
    "critic": """You are the Critic — an adversarial evaluator who stress-tests solutions. Find real flaws, not contrarian noise.
1. Identify genuine weaknesses that would cause real problems
2. Challenge hidden assumptions
3. Find edge cases where the solution breaks
4. Assess feasibility given constraints
5. Flag missing considerations
Categories: FEASIBILITY, SCALABILITY, COST, COMPLEXITY, SECURITY, MAINTAINABILITY, PERFORMANCE, UX, CORRECTNESS, MISSING_REQUIREMENT
Severity: CRITICAL, HIGH, MEDIUM, LOW
Every objection must have a concrete argument and evidence. Suggest mitigations where possible.
IMPORTANT: Respond with valid JSON only. No markdown code fences, no explanation outside the JSON.""",
    "arbiter": """You are the Arbiter — the final decision-maker who synthesizes an adversarial debate into a clear, actionable recommendation.
1. Weigh all solutions, objections, and defenses
2. Assess what survived adversarial scrutiny
3. Evaluate whether concessions fatally weakened solutions or led to improvements
4. Identify the strongest path given success criteria and constraints
5. Name unresolved questions
6. Provide concrete, executable action items
Confidence: 0.9+ clear winner, 0.7-0.9 good with minor concerns, 0.5-0.7 significant uncertainty, <0.5 no clear winner.
IMPORTANT: Respond with valid JSON only. No markdown code fences, no explanation outside the JSON.""",
}
# ── User prompts ────────────────────────────────────────────────────────
def analyst_prompt(problem):
    """Build the Analyst user prompt requesting the problem-frame JSON."""
    return f"""Analyze the following problem and produce a structured problem frame.
Problem: {problem}
Produce a JSON object with:
- problem: {{ statement, dimensions (array of strings), constraints (array of strings), criteria (array of strings), context (string) }}
- suggestedPerspectives: string[] (2-3 distinct perspectives)"""
def advocate_generate_prompt(problem, problem_frame, perspectives):
    """Build the Advocate (generation mode) prompt from the Analyst's frame."""
    ctx = json.dumps({"problemFrame": problem_frame, "perspectives": perspectives}, indent=2)
    return f"""Generate solutions for the following problem.
Problem: {problem}
Context:
{ctx}
Generate 2-3 distinct solutions from different perspectives. Each solution must have clear tradeoffs and explicit assumptions.
Produce a JSON object with:
- solutions: [{{ id (e.g. "S-1"), title, approach (detailed), tradeoffs (array), assumptions (array), confidence (0-1), perspective }}]"""
def advocate_defend_prompt(problem, problem_frame, solutions, objections):
    """Build the Advocate (defense mode) prompt for a batch of objections."""
    ctx = json.dumps({
        "problemFrame": problem_frame, "solutions": solutions,
        "defenseMode": True, "objections": objections,
    }, indent=2)
    return f"""Defend solutions against objections.
Problem: {problem}
Context:
{ctx}
For each objection respond with DEFENDED, CONCEDED, or PARTIALLY_CONCEDED.
Produce a JSON object with:
- defenses: [{{ objectionId, verdict ("DEFENDED"|"CONCEDED"|"PARTIALLY_CONCEDED"), counterArgument, modification (string|null), confidenceChange (number -1 to +1) }}]
- revisedSolutions: (optional) updated solutions incorporating concessions"""
def critic_prompt(problem, problem_frame, solutions, targeted, prev_objections=None, prev_defenses=None):
    """Build the Critic prompt.

    In targeted mode (rounds after the first) previous objections/defenses
    are included — truncated to 200 chars each to limit context size — and
    the Critic is told to cover only new ground.
    """
    ctx = {"problemFrame": problem_frame, "solutions": solutions, "targeted": targeted}
    if prev_objections:
        ctx["previousObjections"] = [
            {"id": o.get("id", "?"), "targetSolutionId": o.get("targetSolutionId", "?"),
             "category": o.get("category", "?"), "argument": (o.get("argument") or "")[:200]}
            for o in prev_objections
        ]
    if prev_defenses:
        ctx["previousDefenses"] = [
            {"objectionId": d.get("objectionId", "?"), "verdict": d.get("verdict", "?"),
             "counterArgument": (d.get("counterArgument") or "")[:200]}
            for d in prev_defenses
        ]
    ctx_str = json.dumps(ctx, indent=2)
    focus = ("Focus ONLY on aspects not already covered by previous objections."
             if targeted else "This is a FULL critique. Evaluate every solution thoroughly.")
    return f"""Critically evaluate the following solutions.
Problem: {problem}
Context:
{ctx_str}
{focus}
Produce a JSON object with:
- objections: [{{ id (e.g. "OBJ-1"), targetSolutionId, category, severity, argument, evidence (string|null), suggestedMitigation (string|null) }}]
- overallAssessment: string"""
def arbiter_prompt(problem, problem_frame, solutions, all_objections, all_defenses, rounds_summary, converged, convergence_round):
    """Build the Arbiter prompt carrying the entire debate transcript."""
    ctx = json.dumps({
        "problemFrame": problem_frame, "solutions": solutions,
        "allObjections": all_objections, "allDefenses": all_defenses,
        "rounds": rounds_summary, "converged": converged,
        "convergenceRound": convergence_round,
    }, indent=2)
    return f"""Synthesize the full debate and produce a final ruling.
Problem: {problem}
Context:
{ctx}
Produce a JSON object with:
- recommendation: string (specific and actionable)
- confidence: number (0-1)
- rationale: string (reference specific debate points)
- dissent: string|null
- unresolved: string[]
- actionItems: string[]
- riskMatrix: [{{ risk, likelihood, impact, mitigation }}]"""
# ── Convergence ─────────────────────────────────────────────────────────
def extract_delta(new_objections, all_objections):
    """Return the objections in *new_objections* not already known.

    An objection is a duplicate when its id is already present or when the
    first 80 chars of its argument match a known one case-insensitively.
    Uses .get() throughout: objections come from LLM JSON and may lack an
    "id" (the previous o["id"] subscript raised KeyError in that case).
    """
    existing_ids = {o.get("id") for o in all_objections}
    existing_args = {(o.get("argument") or "").lower()[:80] for o in all_objections}
    return [
        o for o in new_objections
        if o.get("id") not in existing_ids
        and (o.get("argument") or "").lower()[:80] not in existing_args
    ]
# ── Report generation ───────────────────────────────────────────────────
def severity_badge(s):
    """Short bracketed tag for a severity level, e.g. 'HIGH' -> '[H]'."""
    known = {"CRITICAL": "[C]", "HIGH": "[H]", "MEDIUM": "[M]", "LOW": "[L]"}
    if s in known:
        return known[s]
    # Unknown severities are shown verbatim in brackets.
    return f"[{s}]"
def confidence_bar(c):
    """Render confidence in [0, 1] as a ten-cell bar plus a percentage."""
    cells = round(c * 10)
    bar = "\u2588" * cells
    bar += "\u2591" * (10 - cells)
    return f"{bar} {c * 100:.0f}%"
def generate_report(problem, problem_frame, solutions, rounds, all_objections,
                    all_defenses, ruling, converged, convergence_round,
                    total_rounds, duration, cost_data, agent_models):
    """Render the tribunal run as a Markdown report string.

    Objection/defense fields come straight from LLM JSON, so they are read
    defensively: .get() with defaults, and None-safe slicing — a key may be
    present with an explicit None value (same failure mode the earlier
    counterArgument fix addressed).
    """
    lines = ["# Tribunal Report", "", f"**Date:** {time.strftime('%Y-%m-%d')}", ""]
    # Models used
    lines.append("## Models")
    lines.append("")
    lines.append("| Role | Model |")
    lines.append("|------|-------|")
    for role in ("analyst", "advocate", "critic", "arbiter"):
        lines.append(f"| {role.title()} | `{agent_models.get(role, '?')}` |")
    lines.append("")
    # Problem
    lines.append("## Problem")
    lines.append("")
    lines.append(problem_frame.get("statement", problem))
    dims = problem_frame.get("dimensions", [])
    if dims:
        lines.append(f"\n**Dimensions:** {', '.join(dims)}")
    lines.append("")
    # Stats
    lines.append("## Debate Statistics")
    lines.append("")
    lines.append(f"- **Solutions proposed:** {len(solutions)}")
    lines.append(f"- **Total objections:** {len(all_objections)}")
    lines.append(f"- **Total defenses:** {len(all_defenses)}")
    lines.append(f"- **Rounds:** {total_rounds}")
    conv_str = f"yes (round {convergence_round})" if converged else "no"
    lines.append(f"- **Converged:** {conv_str}")
    lines.append(f"- **Duration:** {duration:.1f}s")
    if cost_data:
        lines.append(f"- **Estimated cost:** ${cost_data['total_cost']:.2f}")
    lines.append("")
    # Solutions
    lines.append("## Solutions")
    lines.append("")
    for s in solutions:
        lines.append(f"### {s.get('id', '?')}: {s.get('title', 'Untitled')}")
        lines.append("")
        lines.append(f"**Perspective:** {s.get('perspective', '?')} | "
                     f"**Confidence:** {confidence_bar(s.get('confidence', 0))}")
        lines.append("")
        lines.append(f"**Approach:** {s.get('approach', '?')}")
        lines.append("")
        tradeoffs = s.get("tradeoffs", [])
        if tradeoffs:
            lines.append("**Tradeoffs:**")
            for t in tradeoffs:
                lines.append(f"- {t}")
            lines.append("")
        lines.append("---")
        lines.append("")
    # Objections table
    if all_objections:
        lines.append("## Objections")
        lines.append("")
        lines.append("| ID | Target | Category | Severity | Argument |")
        lines.append("|----|--------|----------|----------|----------|")
        for o in all_objections:
            # "argument" may be missing OR present as None — never slice None.
            full_arg = o.get("argument") or ""
            arg = full_arg[:80] + ("..." if len(full_arg) > 80 else "")
            sev = o.get("severity", "?")
            lines.append(f"| {o.get('id', '?')} | {o.get('targetSolutionId', '?')} | "
                         f"{o.get('category', '?')} | {severity_badge(sev)} {sev} | {arg} |")
        lines.append("")
    # Defenses
    if all_defenses:
        lines.append("## Defenses")
        lines.append("")
        for d in all_defenses:
            v = d.get("verdict", "?")
            icon = {"DEFENDED": "\u2713", "CONCEDED": "\u2717", "PARTIALLY_CONCEDED": "~"}.get(v, "?")
            ca = (d.get("counterArgument") or "")[:120]
            lines.append(f"- **{icon} {d.get('objectionId', '?')}** \u2192 {v}: {ca}")
            if d.get("modification"):
                lines.append(f" - *Modification:* {d['modification'][:200]}")
        lines.append("")
    # Ruling
    if ruling:
        lines.append("## Final Ruling")
        lines.append("")
        lines.append(f"**Confidence:** {confidence_bar(ruling.get('confidence', 0))}")
        lines.append("")
        lines.append("### Recommendation")
        lines.append("")
        lines.append(ruling.get("recommendation", "No recommendation"))
        lines.append("")
        lines.append("### Rationale")
        lines.append("")
        lines.append(ruling.get("rationale", ""))
        lines.append("")
        if ruling.get("unresolved"):
            lines.append("### Unresolved Issues")
            lines.append("")
            for u in ruling["unresolved"]:
                lines.append(f"- {u}")
            lines.append("")
        if ruling.get("actionItems"):
            lines.append("### Action Items")
            lines.append("")
            for i, a in enumerate(ruling["actionItems"], 1):
                lines.append(f"{i}. {a}")
            lines.append("")
        if ruling.get("riskMatrix"):
            lines.append("### Risk Matrix")
            lines.append("")
            lines.append("| Risk | Likelihood | Impact | Mitigation |")
            lines.append("|------|-----------|--------|------------|")
            for r in ruling["riskMatrix"]:
                lines.append(f"| {r.get('risk','')} | {r.get('likelihood','')} | "
                             f"{r.get('impact','')} | {r.get('mitigation','')} |")
            lines.append("")
    # Cost breakdown
    if cost_data and cost_data.get("by_agent"):
        lines.append("## Cost Breakdown")
        lines.append("")
        lines.append("| Agent | Model | Calls | Tokens In | Tokens Out | Cost |")
        lines.append("|-------|-------|-------|-----------|------------|------|")
        for name, stats in cost_data["by_agent"].items():
            lines.append(f"| {name} | `{stats['model']}` | {stats['calls']} | "
                         f"~{stats['input']//1000}K | ~{stats['output']//1000}K | "
                         f"${stats['cost']:.3f} |")
        lines.append(f"\n**Total: ${cost_data['total_cost']:.2f}**")
        lines.append("")
    return "\n".join(lines)
# ── Main debate pipeline ────────────────────────────────────────────────
def log(msg):
    """Write a `[tribunal]`-prefixed status line to stderr."""
    sys.stderr.write(f"[tribunal] {msg}\n")
class Spinner:
    """Animated spinner showing elapsed time during long operations.

    Renders frames to stderr from a daemon thread; stop() joins the
    thread, clears the spinner line, and optionally prints a final
    status message.
    """
    # Braille-dot animation frames, cycled one per tick.
    FRAMES = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
    def __init__(self, message):
        # message: label shown next to the spinner.
        self._message = message
        self._stop = threading.Event()
        self._thread = None
    def start(self):
        """Begin animating on a background daemon thread."""
        self._stop.clear()
        self._thread = threading.Thread(target=self._spin, daemon=True)
        self._thread.start()
    def _spin(self):
        # Worker-thread loop: redraw the line until the stop event is set.
        start = time.time()
        i = 0
        while not self._stop.is_set():
            elapsed = time.time() - start
            frame = self.FRAMES[i % len(self.FRAMES)]
            sys.stderr.write(f"\r[tribunal] {self._message} {frame} {elapsed:.0f}s")
            sys.stderr.flush()
            i += 1
            self._stop.wait(0.1)  # ~10 fps; returns early once stop() is called
    def stop(self, final_message=None):
        """Stop the animation, clear the line, optionally print a message."""
        self._stop.set()
        if self._thread:
            self._thread.join()
        sys.stderr.write("\r\033[K")  # carriage return + erase-to-end-of-line
        if final_message:
            sys.stderr.write(f"[tribunal] {final_message}\n")
        sys.stderr.flush()
def run_agent(api_key, role, model, user_prompt, retries=1):
    """Call agent and parse JSON response. Returns (parsed_dict, usage).

    Shows a spinner while the model runs. RuntimeErrors (API-reported
    failures, CLI errors, timeouts) are retried up to *retries* times and
    then reported as a soft failure of (None, zeroed usage); any other
    exception propagates after stopping the spinner. A response with no
    parseable JSON also yields None (with real usage counts).
    """
    source = "Claude Code" if is_claude_code_model(model) else "OpenRouter"
    label = f"{role.title()} ({model} [{source}])"
    for attempt in range(retries + 1):
        spinner = Spinner(label)
        spinner.start()
        start = time.time()
        try:
            text, usage = call_model(api_key, model, SYSTEM_PROMPTS[role], user_prompt)
        except RuntimeError as e:
            elapsed = time.time() - start
            if attempt < retries:
                spinner.stop(f"{role.title()} FAILED ({elapsed:.1f}s) — retrying...")
                continue
            spinner.stop(f"{role.title()} FAILED ({elapsed:.1f}s) — {e}")
            return None, {"prompt_tokens": 0, "completion_tokens": 0}
        except Exception:
            # Unexpected failure: clean up the spinner line, then re-raise.
            elapsed = time.time() - start
            spinner.stop(f"{role.title()} FAILED ({elapsed:.1f}s)")
            raise
        elapsed = time.time() - start
        spinner.stop(f"{role.title()} done ({elapsed:.1f}s)")
        break
    parsed = extract_json(text)
    if parsed is None:
        log(f"WARNING: {role} returned no parseable JSON")
        return None, usage
    return coerce(parsed), usage
def debate(problem, analyst_model, advocate_model, critic_model, arbiter_model,
           max_rounds=3, single_pass=False, output_dir=".tribunal"):
    """Run the full multi-model adversarial debate pipeline.

    Phases: Analyst frames the problem; Advocate generates solutions;
    Critic/Advocate alternate objection/defense rounds until convergence
    (no new objections) or *max_rounds*; Arbiter issues the final ruling.
    Writes tribunal-report.md and tribunal-data.json into *output_dir*.
    All progress output goes to stderr.

    Args:
        problem: problem statement to debate.
        analyst_model, advocate_model, critic_model, arbiter_model: model ids.
        max_rounds: cap on critic/defense rounds.
        single_pass: stop after the first full round.
        output_dir: directory for the report and raw JSON data.
    """
    all_models = [analyst_model, advocate_model, critic_model, arbiter_model]
    # Only require an OpenRouter key if at least one model needs it.
    needs_openrouter = any(not is_claude_code_model(m) for m in all_models)
    api_key = get_api_key() if needs_openrouter else ""
    start_time = time.time()
    # Resolve models
    agent_models = {
        "analyst": analyst_model,
        "advocate": advocate_model,
        "critic": critic_model,
        "arbiter": arbiter_model,
    }
    print(file=sys.stderr)
    log("Multi-model adversarial debate")
    log(f"Analyst: {analyst_model}")
    log(f"Advocate: {advocate_model}")
    log(f"Critic: {critic_model}")
    log(f"Arbiter: {arbiter_model}")
    log(f"Max rounds: {max_rounds}")
    print(file=sys.stderr)
    # Fetch pricing
    pricing = fetch_pricing(api_key)
    cost_data = {"total_input": 0, "total_output": 0, "total_cost": 0, "by_agent": {}}
    def track(agent_name, model, usage):
        # Accumulate token and cost totals, overall and per agent.
        inp, out, cost = calc_cost(pricing, model, usage)
        cost_data["total_input"] += inp
        cost_data["total_output"] += out
        cost_data["total_cost"] += cost
        if agent_name not in cost_data["by_agent"]:
            cost_data["by_agent"][agent_name] = {"input": 0, "output": 0, "cost": 0, "calls": 0, "model": model}
        a = cost_data["by_agent"][agent_name]
        a["input"] += inp
        a["output"] += out
        a["cost"] += cost
        a["calls"] += 1
    # ── ANSI colors ──
    C_BOLD = "\033[1m"
    C_DIM = "\033[2m"
    C_CYAN = "\033[36m"
    C_GREEN = "\033[32m"
    C_YELLOW = "\033[33m"
    C_RED = "\033[31m"
    C_MAGENTA = "\033[35m"
    C_RESET = "\033[0m"
    def section(title):
        # Colored section banner on stderr.
        # NOTE(review): the repeated literal below reads as an empty string —
        # looks like a box-drawing glyph lost in transit; confirm upstream.
        width = 60
        print(f"\n{C_CYAN}{C_BOLD}{'' * width}", file=sys.stderr)
        print(f" {title}", file=sys.stderr)
        print(f"{'' * width}{C_RESET}\n", file=sys.stderr)
    def bullet(text, indent=2):
        # Indented list item on stderr.
        # NOTE(review): bullet glyph between C_DIM/C_RESET may be lost — confirm.
        print(f"{' ' * indent}{C_DIM}{C_RESET} {text}", file=sys.stderr)
    # ── Phase 0: Analyst ──
    section("Phase 0: ANALYST — framing the problem")
    result, usage = run_agent(api_key, "analyst", analyst_model, analyst_prompt(problem))
    track("analyst", analyst_model, usage)
    if result and "problem" in result:
        problem_frame = result["problem"]
        perspectives = result.get("suggestedPerspectives", ["pragmatist", "innovator"])
    else:
        # Analyst failed or unparseable: continue with a minimal fallback frame.
        log("Analyst parse failed, using minimal frame")
        problem_frame = {"statement": problem, "dimensions": ["feasibility", "complexity"],
                         "constraints": [], "criteria": ["correctness"], "context": ""}
        perspectives = ["pragmatist", "innovator"]
    # Show analyst output
    stmt = problem_frame.get("statement", problem)
    print(f"\n {C_BOLD}Problem frame:{C_RESET}", file=sys.stderr)
    # Wrap long statement
    for i in range(0, len(stmt), 76):
        print(f" {stmt[i:i+76]}", file=sys.stderr)
    dims = problem_frame.get("dimensions", [])
    if dims:
        print(f"\n {C_BOLD}Dimensions ({len(dims)}):{C_RESET}", file=sys.stderr)
        for d in dims:
            label = d[:90] + "..." if len(d) > 90 else d
            bullet(label, indent=4)
    constraints = problem_frame.get("constraints", [])
    if constraints:
        print(f"\n {C_BOLD}Constraints ({len(constraints)}):{C_RESET}", file=sys.stderr)
        for c in constraints:
            label = c[:90] + "..." if len(c) > 90 else c
            bullet(label, indent=4)
    criteria = problem_frame.get("criteria", [])
    if criteria:
        print(f"\n {C_BOLD}Success criteria ({len(criteria)}):{C_RESET}", file=sys.stderr)
        for c in criteria:
            label = c[:90] + "..." if len(c) > 90 else c
            bullet(label, indent=4)
    print(f"\n {C_BOLD}Perspectives:{C_RESET} {', '.join(perspectives)}", file=sys.stderr)
    print(file=sys.stderr)
    # ── Phase 1: Advocate generates solutions ──
    section("Phase 1: ADVOCATE — generating solutions")
    result, usage = run_agent(api_key, "advocate", advocate_model,
                              advocate_generate_prompt(problem, problem_frame, perspectives))
    track("advocate", advocate_model, usage)
    solutions = result.get("solutions", []) if result else []
    if not solutions:
        log("ERROR: No solutions generated, aborting")
        return
    # Show solutions
    for s in solutions:
        sid = s.get("id", "?")
        title = s.get("title", "Untitled")
        conf = s.get("confidence", 0)
        persp = s.get("perspective", "?")
        print(f"\n {C_GREEN}{C_BOLD}{sid}: {title}{C_RESET}", file=sys.stderr)
        print(f" {C_DIM}Perspective:{C_RESET} {persp} | {C_DIM}Confidence:{C_RESET} {confidence_bar(conf)}", file=sys.stderr)
        approach = s.get("approach", "")
        if approach:
            short = approach[:200] + "..." if len(approach) > 200 else approach
            print(f" {short}", file=sys.stderr)
        tradeoffs = s.get("tradeoffs", [])
        if tradeoffs:
            print(f" {C_YELLOW}Tradeoffs:{C_RESET}", file=sys.stderr)
            for t in tradeoffs[:4]:
                bullet(t[:100], indent=6)
    print(file=sys.stderr)
    # ── Phase 2+: Adversarial rounds ──
    all_objections = []
    all_defenses = []
    rounds = []
    converged = False
    convergence_round = 0
    round_num = 1
    while not converged and round_num <= max_rounds:
        # Round 1 is a full critique; later rounds only target new ground.
        is_full = round_num <= 1
        # Critic
        mode = "full" if is_full else "targeted"
        section(f"Round {round_num}: CRITIC — {mode} critique")
        result, usage = run_agent(
            api_key, "critic", critic_model,
            critic_prompt(problem, problem_frame, solutions, not is_full,
                          all_objections if all_objections else None,
                          all_defenses if all_defenses else None))
        track(f"critic-r{round_num}", critic_model, usage)
        new_objections = result.get("objections", []) if result else []
        # Convergence check
        delta = extract_delta(new_objections, all_objections)
        if not delta:
            converged = True
            convergence_round = round_num
            print(f"\n {C_GREEN}{C_BOLD}✓ No new objections — debate converged{C_RESET}", file=sys.stderr)
            rounds.append({"round": round_num, "agent": "critic", "objections": 0, "defenses": 0})
            break
        all_objections.extend(delta)
        rounds.append({"round": round_num, "agent": "critic",
                       "objections": len(delta), "defenses": 0})
        # Show objections
        for o in delta:
            oid = o.get("id", "?")
            target = o.get("targetSolutionId", "?")
            cat = o.get("category", "?")
            sev = o.get("severity", "?")
            arg = (o.get("argument") or "")[:150]
            sev_color = C_RED if sev in ("CRITICAL", "HIGH") else C_YELLOW
            print(f"\n {sev_color}{severity_badge(sev)}{C_RESET} {C_BOLD}{oid}{C_RESET}{target} ({cat})", file=sys.stderr)
            print(f" {arg}", file=sys.stderr)
            mit = o.get("suggestedMitigation")
            if mit:
                print(f" {C_DIM}Mitigation: {mit[:120]}{C_RESET}", file=sys.stderr)
        assessment = result.get("overallAssessment", "") if result else ""
        if assessment:
            print(f"\n {C_BOLD}Assessment:{C_RESET} {assessment[:300]}", file=sys.stderr)
        # Advocate defense
        section(f"Round {round_num}: ADVOCATE — defense")
        result, usage = run_agent(
            api_key, "advocate", advocate_model,
            advocate_defend_prompt(problem, problem_frame, solutions, delta))
        track(f"advocate-r{round_num}", advocate_model, usage)
        defenses = result.get("defenses", []) if result else []
        if result and result.get("revisedSolutions"):
            # Advocate conceded points and reworked the solution set.
            solutions = result["revisedSolutions"]
            log(f"Solutions revised based on concessions")
        all_defenses.extend(defenses)
        rounds.append({"round": round_num, "agent": "advocate",
                       "objections": 0, "defenses": len(defenses)})
        # Show defenses
        for d in defenses:
            did = d.get("objectionId", "?")
            verdict = d.get("verdict", "?")
            ca = (d.get("counterArgument") or "")[:150]
            # NOTE(review): DEFENDED/CONCEDED icon literals below appear empty —
            # likely ✓/✗ glyphs lost in transit; confirm against upstream.
            if verdict == "DEFENDED":
                icon, color = "", C_GREEN
            elif verdict == "CONCEDED":
                icon, color = "", C_RED
            else:
                icon, color = "~", C_YELLOW
            print(f"\n {color}{icon} {did}{C_RESET}{C_BOLD}{verdict}{C_RESET}", file=sys.stderr)
            print(f" {ca}", file=sys.stderr)
            mod = d.get("modification")
            if mod:
                print(f" {C_MAGENTA}Modification: {mod[:150]}{C_RESET}", file=sys.stderr)
        print(file=sys.stderr)
        round_num += 1
        if single_pass:
            break
    # ── Final: Arbiter ──
    section("FINAL: ARBITER — synthesis & ruling")
    result, usage = run_agent(
        api_key, "arbiter", arbiter_model,
        arbiter_prompt(problem, problem_frame, solutions, all_objections,
                       all_defenses, rounds, converged, convergence_round))
    track("arbiter", arbiter_model, usage)
    ruling = result
    # Show ruling
    if ruling:
        conf = ruling.get("confidence", 0)
        print(f"\n {C_BOLD}Confidence:{C_RESET} {confidence_bar(conf)}", file=sys.stderr)
        rec = ruling.get("recommendation", "")
        if rec:
            print(f"\n {C_GREEN}{C_BOLD}Recommendation:{C_RESET}", file=sys.stderr)
            for i in range(0, len(rec), 76):
                print(f" {rec[i:i+76]}", file=sys.stderr)
        rationale = ruling.get("rationale", "")
        if rationale:
            print(f"\n {C_BOLD}Rationale:{C_RESET}", file=sys.stderr)
            for i in range(0, len(rationale), 76):
                print(f" {rationale[i:i+76]}", file=sys.stderr)
        dissent = ruling.get("dissent")
        if dissent:
            print(f"\n {C_YELLOW}{C_BOLD}Dissent:{C_RESET} {dissent}", file=sys.stderr)
        unresolved = ruling.get("unresolved", [])
        if unresolved:
            print(f"\n {C_BOLD}Unresolved:{C_RESET}", file=sys.stderr)
            for u in unresolved:
                bullet(u, indent=4)
        actions = ruling.get("actionItems", [])
        if actions:
            print(f"\n {C_BOLD}Action items:{C_RESET}", file=sys.stderr)
            for i, a in enumerate(actions, 1):
                print(f" {C_GREEN}{i}.{C_RESET} {a}", file=sys.stderr)
        risks = ruling.get("riskMatrix", [])
        if risks:
            print(f"\n {C_BOLD}Risk matrix:{C_RESET}", file=sys.stderr)
            for r in risks:
                risk = r.get("risk", "?")
                lh = r.get("likelihood", "?")
                imp = r.get("impact", "?")
                sev_color = C_RED if imp in ("HIGH", "CRITICAL") else C_YELLOW
                print(f" {sev_color}{C_RESET} {risk} ({C_DIM}likelihood:{C_RESET} {lh}, {C_DIM}impact:{C_RESET} {imp})", file=sys.stderr)
                mit = r.get("mitigation", "")
                if mit:
                    print(f" {C_DIM}{mit[:120]}{C_RESET}", file=sys.stderr)
    print(file=sys.stderr)
    duration = time.time() - start_time
    # round_num was incremented past the last completed round unless we
    # broke out on convergence, hence the conditional correction.
    total_rounds = round_num - (0 if converged else 1)
    # Generate and save report
    report = generate_report(
        problem, problem_frame, solutions, rounds, all_objections,
        all_defenses, ruling, converged, convergence_round,
        total_rounds, duration, cost_data, agent_models)
    os.makedirs(output_dir, exist_ok=True)
    report_path = os.path.join(output_dir, "tribunal-report.md")
    with open(report_path, "w") as f:
        f.write(report)
    # Also save raw JSON
    raw_path = os.path.join(output_dir, "tribunal-data.json")
    with open(raw_path, "w") as f:
        json.dump({
            "problem": problem_frame, "solutions": solutions,
            "objections": all_objections, "defenses": all_defenses,
            "ruling": ruling, "converged": converged,
            "convergenceRound": convergence_round,
            "totalRounds": total_rounds, "duration": duration,
            "cost": cost_data, "models": agent_models,
        }, f, indent=2)
    # Summary
    log(f"Report: {report_path}")
    log(f"Data: {raw_path}")
    print(file=sys.stderr)
    log("Summary:")
    log(f" Solutions: {len(solutions)}")
    log(f" Objections: {len(all_objections)}")
    log(f" Defenses: {len(all_defenses)}")
    log(f" Rounds: {total_rounds}")
    conv = f"yes (round {convergence_round})" if converged else "no"
    log(f" Converged: {conv}")
    log(f" Duration: {duration:.1f}s")
    log(f" Cost: ${cost_data['total_cost']:.2f}")
    if ruling:
        log(f" Confidence: {ruling.get('confidence', 0)*100:.0f}%")
# ── Original consult functions ──────────────────────────────────────────
def ask(question, model=None, files=None, system_prompt=None):
    """Send one question (plus optional file/stdin context) to a model.

    Streams the answer to stdout. OpenRouter models are called over HTTP
    with SSE streaming; Claude Code models go through the local CLI via
    call_claude_code().

    Args:
        question: Question text; may be empty when context is piped in.
        model: Model ID; defaults to the configured default model.
        files: Optional list of file paths appended as fenced context blocks.
        system_prompt: Optional system prompt prepended to the conversation.

    Exits with status 1 on a missing API key, empty input, or request failure.
    """
    cfg = load_config()
    model = model or cfg.get("default_model", "google/gemini-2.5-pro")
    # Only OpenRouter models need an API key; Claude Code uses the local CLI.
    if not is_claude_code_model(model):
        api_key = cfg.get("api_key", "")
        if not api_key:
            print(
                "Error: No API key. Set it in BTerminal Consult tab "
                "or edit ~/.config/bterminal/consult.json",
                file=sys.stderr,
            )
            sys.exit(1)
    content = question or ""
    if files:
        for fpath in files:
            try:
                with open(fpath) as f:
                    fc = f.read()
                content += f"\n\n--- {fpath} ---\n```\n{fc}\n```"
            except Exception as e:
                # Best effort: warn and continue with the remaining files.
                print(f"Warning: Cannot read {fpath}: {e}", file=sys.stderr)
    # Piped stdin becomes one extra fenced context block.
    if not sys.stdin.isatty():
        stdin_data = sys.stdin.read()
        if stdin_data.strip():
            content += f"\n\n--- stdin ---\n```\n{stdin_data}\n```"
    if not content.strip():
        print("Error: No question provided.", file=sys.stderr)
        sys.exit(1)
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": content})
    model_name = cfg.get("models", {}).get(model, {}).get("name", model)
    source = "Claude Code" if is_claude_code_model(model) else "OpenRouter"
    # FIX: the banner previously ran the display name and model ID together
    # ("[Gemini 2.5 Progoogle/gemini-2.5-pro ...]"); separate them.
    print(f"[{model_name} — {model} ({source})]", file=sys.stderr)
    # Claude Code models use the CLI directly (no streaming, no API key).
    if is_claude_code_model(model):
        combined = content
        if system_prompt:
            combined = f"{system_prompt}\n\n{content}"
        try:
            text, _ = call_claude_code(model, "", combined)
            print(text)
        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)
            sys.exit(1)
        return
    payload = json.dumps({"model": model, "messages": messages, "stream": True}).encode()
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://github.com/DexterFromLab/BTerminal",
        "X-Title": "BTerminal Consult",
    }
    req = urllib.request.Request(OPENROUTER_API, data=payload, headers=headers)
    try:
        # FIX: close the HTTP response when done (it was previously leaked).
        with urllib.request.urlopen(req, timeout=120) as resp:
            # Read the SSE stream line by line and print content deltas.
            while True:
                line = resp.readline()
                if not line:
                    break
                line = line.decode("utf-8").strip()
                if not line.startswith("data: "):
                    continue
                data = line[6:]
                if data == "[DONE]":
                    break
                try:
                    chunk = json.loads(data)
                    delta = chunk["choices"][0].get("delta", {})
                    text = delta.get("content", "")
                    if text:
                        sys.stdout.write(text)
                        sys.stdout.flush()
                except (json.JSONDecodeError, KeyError, IndexError):
                    # Malformed/partial SSE chunks are skipped silently.
                    pass
        print()
    except urllib.error.HTTPError as e:
        body = e.read().decode()
        try:
            err = json.loads(body)
            msg = err.get("error", {}).get("message", body)
        except json.JSONDecodeError:
            msg = body
        print(f"Error ({e.code}): {msg}", file=sys.stderr)
        sys.exit(1)
    except urllib.error.URLError as e:
        print(f"Connection error: {e.reason}", file=sys.stderr)
        sys.exit(1)
def list_models():
    """Print enabled models (tagged by source) and a disabled-model count.

    Reads the consult config; each enabled model is listed as
    "[CC]" (Claude Code) or "[OR]" (OpenRouter), with "[default]"
    appended to the configured default model.
    """
    cfg = load_config()
    default = cfg.get("default_model", "")
    models = cfg.get("models", {})
    enabled = {k: v for k, v in models.items() if v.get("enabled")}
    if not enabled:
        print("No models enabled. Use BTerminal Consult tab to enable models.")
        return
    print("Enabled models:")
    for mid in sorted(enabled):
        info = enabled[mid]
        star = " [default]" if mid == default else ""
        source = info.get("source", "openrouter")
        src_tag = "[CC]" if source == "claude-code" else "[OR]"
        # FIX: the model ID and display name were previously concatenated
        # with no delimiter; separate them for readability.
        print(f" {src_tag} {mid} — {info.get('name', mid)}{star}")
    disabled = {k: v for k, v in models.items() if not v.get("enabled")}
    if disabled:
        print(f"\nDisabled models: {len(disabled)} (enable in BTerminal Consult tab)")
def print_help():
    """Show CLI usage and options, then the enabled-model list and config path."""
    help_lines = (
        "consult — Query external AI models via OpenRouter\n",
        "Usage:",
        ' consult "question" Ask default model',
        ' consult -m model_id "question" Ask specific model',
        ' consult -f file.py "review this code" Include file as context',
        ' cat log.txt | consult "analyze errors" Pipe stdin as context',
        ' consult -s "system prompt" "question" Custom system prompt',
        " consult models List enabled models",
        ' consult debate "problem" Multi-model debate',
        "",
        "Options:",
        " -m, --model MODEL Model ID (default: from config)",
        " -f, --file FILE Include file(s) as context (repeatable)",
        " -s, --system PROMPT System prompt",
        " -h, --help Show this help",
        "",
        "Debate options:",
        " --analyst MODEL Model for problem analysis",
        " --advocate MODEL Model for solution generation",
        " --critic MODEL Model for adversarial critique",
        " --arbiter MODEL Model for final synthesis",
        " --rounds N Max debate rounds (default: 3)",
        " --single-pass One round only",
        " --output DIR Output directory (default: .tribunal)",
        "",
    )
    for line in help_lines:
        print(line)
    list_models()
    print(f"\nConfig: {CONFIG_FILE}")
def main():
    """CLI entry point: dispatch to help, model listing, debate, or a query."""
    argv = sys.argv
    # No arguments, or an explicit help flag, prints the full help text.
    if len(argv) == 1 or argv[1] in ("-h", "--help"):
        print_help()
        sys.exit(0)
    if argv[1] == "models":
        list_models()
        return
    # ── debate subcommand ──
    if argv[1] == "debate":
        cfg = load_config()
        trib = cfg.get("tribunal", {})
        fallback = cfg.get("default_model", "google/gemini-2.5-pro")
        # Per-role model flags default to the tribunal config, then to
        # the global default model.
        debate_parser = argparse.ArgumentParser(prog="consult debate", add_help=False)
        debate_parser.add_argument("_debate", nargs=1)  # swallow the "debate" word
        debate_parser.add_argument("problem", nargs="*")
        debate_parser.add_argument("--analyst", default=trib.get("analyst_model") or fallback)
        debate_parser.add_argument("--advocate", default=trib.get("advocate_model") or fallback)
        debate_parser.add_argument("--critic", default=trib.get("critic_model") or fallback)
        debate_parser.add_argument("--arbiter", default=trib.get("arbiter_model") or fallback)
        debate_parser.add_argument("--rounds", type=int, default=trib.get("max_rounds", 3))
        debate_parser.add_argument("--single-pass", action="store_true")
        debate_parser.add_argument("--output", default=".tribunal")
        opts = debate_parser.parse_args()
        statement = " ".join(opts.problem) if opts.problem else ""
        if not statement:
            print("Error: No problem statement provided.", file=sys.stderr)
            print('Usage: consult debate "your problem statement"', file=sys.stderr)
            sys.exit(1)
        try:
            debate(
                statement,
                analyst_model=opts.analyst,
                advocate_model=opts.advocate,
                critic_model=opts.critic,
                arbiter_model=opts.arbiter,
                max_rounds=opts.rounds,
                single_pass=opts.single_pass,
                output_dir=opts.output,
            )
        except urllib.error.HTTPError as e:
            body = e.read().decode()
            try:
                # Prefer the structured OpenRouter error message when present.
                msg = json.loads(body).get("error", {}).get("message", body)
            except json.JSONDecodeError:
                msg = body
            print(f"Error ({e.code}): {msg}", file=sys.stderr)
            sys.exit(1)
        except urllib.error.URLError as e:
            print(f"Connection error: {e.reason}", file=sys.stderr)
            sys.exit(1)
        return
    # ── regular query ──
    query_parser = argparse.ArgumentParser(prog="consult", add_help=False)
    query_parser.add_argument("question", nargs="*")
    query_parser.add_argument("-m", "--model")
    query_parser.add_argument("-f", "--file", action="append", dest="files")
    query_parser.add_argument("-s", "--system")
    opts = query_parser.parse_args()
    query = " ".join(opts.question) if opts.question else ""
    # Nothing to ask and nothing piped in: show usage instead of blocking.
    if not query and not opts.files and sys.stdin.isatty():
        query_parser.print_help()
        sys.exit(0)
    ask(query, opts.model, opts.files, opts.system)
# Script entry point: run the CLI dispatcher when executed directly.
if __name__ == "__main__":
    main()