Bring over comprehensive documentation, CLI tools, and project scaffolding from the archived v2/ branch onto the rebuilt flat main. All v2/ path references updated to match flat layout. - docs/: architecture, decisions, phases, progress, findings, etc. - docker/tempo: telemetry stack (Grafana + Tempo) - CLAUDE.md, .claude/CLAUDE.md: comprehensive project guides - CHANGELOG.md, TODO.md, README.md: project meta - consult, ctx: CLI tools - .gitignore: merged entries from both branches
1268 lines
47 KiB
Python
Executable file
1268 lines
47 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""consult — Query external AI models via OpenRouter.
|
|
|
|
Usage:
|
|
consult "question" Ask default model
|
|
consult -m google/gemini-2.5-pro "q" Ask specific model
|
|
consult -f file.py "review this code" Include file as context
|
|
cat log.txt | consult "analyze errors" Pipe stdin as context
|
|
consult models List enabled models
|
|
consult debate "problem statement" Multi-model adversarial debate
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import threading
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
|
|
# Config lives under the BTerminal app's config directory.
CONFIG_DIR = os.path.expanduser("~/.config/bterminal")
CONFIG_FILE = os.path.join(CONFIG_DIR, "consult.json")

# OpenRouter endpoints: chat completions, and the model catalog (used for pricing).
OPENROUTER_API = "https://openrouter.ai/api/v1/chat/completions"
OPENROUTER_MODELS_API = "https://openrouter.ai/api/v1/models"

# Models served by the local Claude Code CLI instead of OpenRouter.
# "alias" is the value passed to `claude --model`.
CLAUDE_CODE_MODELS = {
    "claude-code/opus": {"name": "Claude Opus 4.6", "alias": "opus", "source": "claude-code"},
    "claude-code/sonnet": {"name": "Claude Sonnet 4.6", "alias": "sonnet", "source": "claude-code"},
    "claude-code/haiku": {"name": "Claude Haiku 4.5", "alias": "haiku", "source": "claude-code"},
}

# Written to CONFIG_FILE on first run; see load_config().
DEFAULT_CONFIG = {
    "api_key": "",
    "default_model": "google/gemini-2.5-pro",
    "models": {
        "google/gemini-2.5-pro": {"enabled": True, "name": "Gemini 2.5 Pro", "source": "openrouter"},
        "openai/gpt-4o": {"enabled": True, "name": "GPT-4o", "source": "openrouter"},
        "openai/o3-mini": {"enabled": True, "name": "o3-mini", "source": "openrouter"},
        "deepseek/deepseek-r1": {"enabled": True, "name": "DeepSeek R1", "source": "openrouter"},
        "anthropic/claude-sonnet-4": {"enabled": False, "name": "Claude Sonnet 4", "source": "openrouter"},
        "meta-llama/llama-4-maverick": {"enabled": False, "name": "Llama 4 Maverick", "source": "openrouter"},
    },
    # Per-role model choices for `consult debate`; empty strings here are
    # presumably resolved to defaults by the CLI entry point — verify in main.
    "tribunal": {
        "analyst_model": "",
        "advocate_model": "",
        "critic_model": "",
        "arbiter_model": "",
        "max_rounds": 3,
    },
}
|
|
|
|
|
|
def load_config():
    """Load the consult config, creating it with defaults on first run.

    When a config is loaded, any Claude Code models missing from it are
    merged in (disabled) and the file is rewritten. A corrupt or unreadable
    config falls through to the fresh-default path.

    Returns the config dict.
    """
    if os.path.isfile(CONFIG_FILE):
        try:
            with open(CONFIG_FILE) as f:
                cfg = json.load(f)
            # Merge in any Claude Code models added since this config was written.
            models = cfg.setdefault("models", {})
            changed = False
            for mid, info in CLAUDE_CODE_MODELS.items():
                if mid not in models:
                    models[mid] = dict(info, enabled=False)
                    changed = True
            if changed:
                with open(CONFIG_FILE, "w") as f:
                    json.dump(cfg, f, indent=2)
            return cfg
        except (json.JSONDecodeError, IOError):
            # Corrupt/unreadable config: fall through and rewrite defaults.
            pass
    os.makedirs(CONFIG_DIR, exist_ok=True)
    # Deep copy via JSON round-trip: a shallow .copy() would hand the caller
    # references to DEFAULT_CONFIG's nested dicts, so edits to the returned
    # config would silently mutate the module-level defaults.
    cfg = json.loads(json.dumps(DEFAULT_CONFIG))
    # Seed Claude Code models (disabled) so a fresh config matches a loaded one.
    for mid, info in CLAUDE_CODE_MODELS.items():
        cfg["models"].setdefault(mid, dict(info, enabled=False))
    with open(CONFIG_FILE, "w") as f:
        json.dump(cfg, f, indent=2)
    return cfg
|
|
|
|
|
|
def get_api_key():
    """Return the configured OpenRouter API key; exit(1) with guidance when unset."""
    key = load_config().get("api_key", "")
    if key:
        return key
    print(
        "Error: No API key. Set it in BTerminal Consult tab "
        "or edit ~/.config/bterminal/consult.json",
        file=sys.stderr,
    )
    sys.exit(1)
|
|
|
|
|
|
# ── OpenRouter API call (non-streaming, JSON mode) ──────────────────────
|
|
|
|
|
|
def call_openrouter(api_key, model, system_prompt, user_prompt):
    """Single non-streaming chat-completions call. Returns (text, usage_dict).

    Raises RuntimeError for both HTTP-level and API-level errors, so
    run_agent's retry logic catches one exception type for all failures.
    """
    payload = json.dumps({
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "max_tokens": 16384,
        "temperature": 0.7,
    }).encode()

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        # OpenRouter attribution headers (optional, but recommended by the API).
        "HTTP-Referer": "https://github.com/DexterFromLab/BTerminal",
        "X-Title": "BTerminal Consult Tribunal",
    }

    req = urllib.request.Request(OPENROUTER_API, data=payload, headers=headers)
    try:
        # Context manager closes the HTTP response even if parsing fails
        # (the original leaked the connection).
        with urllib.request.urlopen(req, timeout=300) as resp:
            data = json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        # The error body usually carries the API's message; surface it as a
        # RuntimeError so callers handle/retry uniformly.
        detail = ""
        try:
            detail = e.read().decode(errors="replace")[:500]
        except Exception:
            pass
        raise RuntimeError(f"OpenRouter HTTP {e.code}: {detail or e.reason}") from e

    if "error" in data:
        raise RuntimeError(data["error"].get("message", str(data["error"])))

    text = data.get("choices", [{}])[0].get("message", {}).get("content", "")
    usage = data.get("usage", {})
    return text, usage
|
|
|
|
|
|
# ── Claude Code API call ────────────────────────────────────────────────
|
|
|
|
|
|
def is_claude_code_model(model_id):
    """True when *model_id* names a local Claude Code model rather than an OpenRouter one."""
    prefix = "claude-code/"
    return model_id[:len(prefix)] == prefix
|
|
|
|
|
|
def get_claude_alias(model_id):
    """Map a model ID to the Claude CLI alias (e.g. 'claude-code/opus' -> 'opus').

    Unknown IDs fall back to everything after the first '/'.
    """
    entry = CLAUDE_CODE_MODELS.get(model_id)
    return entry["alias"] if entry else model_id.split("/", 1)[-1]
|
|
|
|
|
|
def call_claude_code(model_id, system_prompt, user_prompt):
    """Call the Claude Code CLI. Returns (text, usage_dict).

    Raises RuntimeError on timeout, a missing `claude` binary, or a
    non-zero exit status, so callers handle all failures uniformly.
    """
    import subprocess

    alias = get_claude_alias(model_id)
    full_prompt = f"{system_prompt}\n\n{user_prompt}" if system_prompt else user_prompt

    # Drop CLAUDECODE so a nested invocation doesn't believe it is already
    # running inside a Claude Code session.
    env = os.environ.copy()
    env.pop("CLAUDECODE", None)

    try:
        result = subprocess.run(
            ["claude", "-p", "--model", alias, full_prompt],
            capture_output=True, text=True, timeout=600, env=env,
        )
    except subprocess.TimeoutExpired as e:
        raise RuntimeError("Claude Code timeout (10min) — prompt too large or model too slow") from e
    except FileNotFoundError as e:
        # Previously this escaped as a raw traceback when the CLI wasn't installed.
        raise RuntimeError("Claude Code CLI not found — is `claude` on your PATH?") from e

    if result.returncode != 0:
        raise RuntimeError(f"Claude Code error: {result.stderr.strip()}")

    text = result.stdout.strip()
    # The CLI doesn't report token usage; estimate at ~4 chars/token.
    usage = {
        "prompt_tokens": len(full_prompt) // 4,
        "completion_tokens": len(text) // 4,
    }
    return text, usage
|
|
|
|
|
|
def call_model(api_key, model, system_prompt, user_prompt):
    """Route the request: claude-code/* models use the local CLI, all others OpenRouter."""
    if not is_claude_code_model(model):
        return call_openrouter(api_key, model, system_prompt, user_prompt)
    return call_claude_code(model, system_prompt, user_prompt)
|
|
|
|
|
|
# ── JSON extraction ─────────────────────────────────────────────────────
|
|
|
|
|
|
def extract_json(text):
    """Pull the first JSON object out of an LLM reply.

    Tries, in order: the whole text as JSON, a ``` fenced block, then the
    first balanced {...} span (string/escape aware). Returns the parsed
    object or None.
    """
    # 1) The whole reply may already be valid JSON.
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # 2) A ```json ... ``` (or bare ```) fence.
    fence = re.search(r"```(?:json)?\s*([\s\S]*?)```", text)
    if fence:
        try:
            return json.loads(fence.group(1).strip())
        except json.JSONDecodeError:
            pass

    # 3) Scan for the first balanced brace span, honouring strings and escapes.
    open_at = text.find("{")
    if open_at < 0:
        return None
    level = 0
    inside_string = False
    escaped = False
    for pos in range(open_at, len(text)):
        c = text[pos]
        if escaped:
            escaped = False
        elif c == "\\" and inside_string:
            escaped = True
        elif c == '"':
            inside_string = not inside_string
        elif not inside_string:
            if c == "{":
                level += 1
            elif c == "}":
                level -= 1
                if level == 0:
                    try:
                        return json.loads(text[open_at:pos + 1])
                    except json.JSONDecodeError:
                        return None
    return None
|
|
|
|
|
|
def coerce(data):
    """Normalize common LLM output quirks in a parsed response dict.

    - numeric fields sent as strings -> float (0.5 fallback on junk)
    - enum-ish fields -> UPPER_SNAKE_CASE
    - string-list fields -> every element reduced to a string
    - nested structured lists -> recursively coerced

    Mutates and returns *data*; non-dict input is returned unchanged.
    """
    if not isinstance(data, dict):
        return data

    # Numeric fields: "0.8" -> 0.8; unparsable strings fall back to 0.5.
    for k in ("confidence", "confidenceChange"):
        if k in data and isinstance(data[k], str):
            try:
                data[k] = float(data[k])
            except ValueError:
                data[k] = 0.5

    # Enum fields: "high risk" -> "HIGH_RISK".
    for k in ("severity", "likelihood", "impact", "category", "verdict"):
        if k in data and isinstance(data[k], str):
            data[k] = data[k].upper().replace(" ", "_")

    # String-list fields: a dict element is reduced to its first available
    # name/label/title/description key (else its JSON dump); everything else
    # is str()'d. (The original also had an `x if isinstance(x, str)` arm
    # inside the dict branch — unreachable, since a dict is never a str.)
    for k in ("dimensions", "constraints", "criteria", "tradeoffs",
              "assumptions", "unresolved", "actionItems"):
        if k in data and isinstance(data[k], list):
            data[k] = [
                x.get("name", x.get("label", x.get("title",
                    x.get("description", json.dumps(x)))))
                if isinstance(x, dict) else str(x)
                for x in data[k]
            ]

    # Structured lists: recurse into dict elements, leave others untouched.
    for k in ("solutions", "objections", "defenses", "riskMatrix", "revisedSolutions"):
        if k in data and isinstance(data[k], list):
            data[k] = [coerce(x) if isinstance(x, dict) else x for x in data[k]]

    return data
|
|
|
|
|
|
# ── Pricing ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
# Process-lifetime cache for fetch_pricing(): None until the first call,
# then a dict of model id -> {"input": ..., "output": ...} (USD per 1M tokens).
_pricing_cache = None
|
|
|
|
|
|
def fetch_pricing(api_key):
    """Fetch per-token pricing for all OpenRouter models, cached per process.

    Returns {model_id: {"input": $/1M tokens, "output": $/1M tokens}}.
    Any failure caches and returns an empty dict so we never retry mid-run.
    """
    global _pricing_cache
    if _pricing_cache is not None:
        return _pricing_cache

    try:
        req = urllib.request.Request(
            OPENROUTER_MODELS_API,
            headers={"Authorization": f"Bearer {api_key}"},
        )
        # Context manager closes the HTTP response even on parse errors
        # (the original leaked the connection).
        with urllib.request.urlopen(req, timeout=15) as resp:
            data = json.loads(resp.read().decode())
        pricing = {}
        for m in data.get("data", []):
            p = m.get("pricing", {})
            if p.get("prompt") and p.get("completion"):
                # The API reports $/token as strings; normalize to $/1M tokens.
                pricing[m["id"]] = {
                    "input": float(p["prompt"]) * 1_000_000,
                    "output": float(p["completion"]) * 1_000_000,
                }
        _pricing_cache = pricing
        return pricing
    except Exception:
        # Pricing is cosmetic (cost display only): degrade silently.
        _pricing_cache = {}
        return {}
|
|
|
|
|
|
def calc_cost(pricing, model, usage):
    """Return (input_tokens, output_tokens, dollar_cost) for one API call.

    Models absent from *pricing* fall back to a $3/$15 per-1M-token guess.
    """
    rates = pricing.get(model, {"input": 3, "output": 15})
    tokens_in = usage.get("prompt_tokens", 0)
    tokens_out = usage.get("completion_tokens", 0)
    dollars = (tokens_in * rates["input"] + tokens_out * rates["output"]) / 1_000_000
    return tokens_in, tokens_out, dollars
|
|
|
|
|
|
# ── Agent prompts ───────────────────────────────────────────────────────
|
|
|
|
# Role system prompts for the tribunal pipeline. All four demand JSON-only
# output; extract_json() and coerce() clean up models that ignore that.
SYSTEM_PROMPTS = {
    # Phase 0: decomposes the problem into the shared frame everyone works from.
    "analyst": """You are the Analyst — the first mind to touch the problem. Your job is to decompose the problem into a structured frame that downstream agents (Advocates, Critics, Arbiter) can work from.

Your output is the shared foundation. If you miss a dimension, the entire debate will have a blind spot.

1. Restate the problem in precise, unambiguous language.
2. Identify dimensions — the axes along which solutions will vary.
3. Surface constraints — hard limits stated or implied.
4. Define success criteria — what "solved" looks like.
5. Provide context — relevant background.
6. Suggest perspectives — 2-3 distinct lenses for solution generation.

IMPORTANT: Respond with valid JSON only. No markdown code fences, no explanation outside the JSON.""",

    # Phase 1 (generation) and per-round defenses.
    "advocate": """You are the Advocate — a solution generator and defender.

Mode 1 (Generation): Produce 2-3 distinct approaches from different perspectives. Each solution must be genuinely different. Include: title, approach (specific enough to implement), tradeoffs (genuine downsides), assumptions (falsifiable), confidence (0-1), perspective.

Mode 2 (Defense): For each objection, respond with:
- DEFENDED: objection is wrong, provide counter-argument
- CONCEDED: objection is valid, describe how solution changes
- PARTIALLY_CONCEDED: has merit but doesn't invalidate core approach

Do NOT defend everything reflexively. Intellectual honesty makes the output more useful.

IMPORTANT: Respond with valid JSON only. No markdown code fences, no explanation outside the JSON.""",

    # Per-round adversarial critique; categories/severity match coerce()'s enum handling.
    "critic": """You are the Critic — an adversarial evaluator who stress-tests solutions. Find real flaws, not contrarian noise.

1. Identify genuine weaknesses that would cause real problems
2. Challenge hidden assumptions
3. Find edge cases where the solution breaks
4. Assess feasibility given constraints
5. Flag missing considerations

Categories: FEASIBILITY, SCALABILITY, COST, COMPLEXITY, SECURITY, MAINTAINABILITY, PERFORMANCE, UX, CORRECTNESS, MISSING_REQUIREMENT
Severity: CRITICAL, HIGH, MEDIUM, LOW

Every objection must have a concrete argument and evidence. Suggest mitigations where possible.

IMPORTANT: Respond with valid JSON only. No markdown code fences, no explanation outside the JSON.""",

    # Final phase: synthesizes the whole debate into one ruling.
    "arbiter": """You are the Arbiter — the final decision-maker who synthesizes an adversarial debate into a clear, actionable recommendation.

1. Weigh all solutions, objections, and defenses
2. Assess what survived adversarial scrutiny
3. Evaluate whether concessions fatally weakened solutions or led to improvements
4. Identify the strongest path given success criteria and constraints
5. Name unresolved questions
6. Provide concrete, executable action items

Confidence: 0.9+ clear winner, 0.7-0.9 good with minor concerns, 0.5-0.7 significant uncertainty, <0.5 no clear winner.

IMPORTANT: Respond with valid JSON only. No markdown code fences, no explanation outside the JSON.""",
}
|
|
|
|
|
|
# ── User prompts ────────────────────────────────────────────────────────
|
|
|
|
|
|
def analyst_prompt(problem):
    """Build the Analyst user prompt: frame *problem* and request the schema downstream agents rely on."""
    return f"""Analyze the following problem and produce a structured problem frame.

Problem: {problem}

Produce a JSON object with:
- problem: {{ statement, dimensions (array of strings), constraints (array of strings), criteria (array of strings), context (string) }}
- suggestedPerspectives: string[] (2-3 distinct perspectives)"""
|
|
|
|
|
|
def advocate_generate_prompt(problem, problem_frame, perspectives):
    """Build the Advocate generation prompt: problem frame + suggested perspectives, asking for 2-3 distinct solutions."""
    ctx = json.dumps({"problemFrame": problem_frame, "perspectives": perspectives}, indent=2)
    return f"""Generate solutions for the following problem.

Problem: {problem}

Context:
{ctx}

Generate 2-3 distinct solutions from different perspectives. Each solution must have clear tradeoffs and explicit assumptions.

Produce a JSON object with:
- solutions: [{{ id (e.g. "S-1"), title, approach (detailed), tradeoffs (array), assumptions (array), confidence (0-1), perspective }}]"""
|
|
|
|
|
|
def advocate_defend_prompt(problem, problem_frame, solutions, objections):
    """Build the Advocate defense prompt: current solutions plus this round's objections to answer."""
    ctx = json.dumps({
        "problemFrame": problem_frame, "solutions": solutions,
        "defenseMode": True, "objections": objections,
    }, indent=2)
    return f"""Defend solutions against objections.

Problem: {problem}

Context:
{ctx}

For each objection respond with DEFENDED, CONCEDED, or PARTIALLY_CONCEDED.

Produce a JSON object with:
- defenses: [{{ objectionId, verdict ("DEFENDED"|"CONCEDED"|"PARTIALLY_CONCEDED"), counterArgument, modification (string|null), confidenceChange (number -1 to +1) }}]
- revisedSolutions: (optional) updated solutions incorporating concessions"""
|
|
|
|
|
|
def critic_prompt(problem, problem_frame, solutions, targeted, prev_objections=None, prev_defenses=None):
    """Build the Critic prompt.

    targeted=True restricts the critique to ground not already covered;
    prior rounds are included as trimmed summaries (arguments cut to 200
    chars) to keep the context small.
    """
    ctx = {"problemFrame": problem_frame, "solutions": solutions, "targeted": targeted}
    if prev_objections:
        # Only the fields the Critic needs to avoid repeating itself.
        ctx["previousObjections"] = [
            {"id": o.get("id", "?"), "targetSolutionId": o.get("targetSolutionId", "?"),
             "category": o.get("category", "?"), "argument": (o.get("argument") or "")[:200]}
            for o in prev_objections
        ]
    if prev_defenses:
        ctx["previousDefenses"] = [
            {"objectionId": d.get("objectionId", "?"), "verdict": d.get("verdict", "?"),
             "counterArgument": (d.get("counterArgument") or "")[:200]}
            for d in prev_defenses
        ]
    ctx_str = json.dumps(ctx, indent=2)
    focus = ("Focus ONLY on aspects not already covered by previous objections."
             if targeted else "This is a FULL critique. Evaluate every solution thoroughly.")
    return f"""Critically evaluate the following solutions.

Problem: {problem}

Context:
{ctx_str}

{focus}

Produce a JSON object with:
- objections: [{{ id (e.g. "OBJ-1"), targetSolutionId, category, severity, argument, evidence (string|null), suggestedMitigation (string|null) }}]
- overallAssessment: string"""
|
|
|
|
|
|
def arbiter_prompt(problem, problem_frame, solutions, all_objections, all_defenses, rounds_summary, converged, convergence_round):
    """Build the Arbiter prompt: the entire debate record, requesting the final ruling JSON."""
    ctx = json.dumps({
        "problemFrame": problem_frame, "solutions": solutions,
        "allObjections": all_objections, "allDefenses": all_defenses,
        "rounds": rounds_summary, "converged": converged,
        "convergenceRound": convergence_round,
    }, indent=2)
    return f"""Synthesize the full debate and produce a final ruling.

Problem: {problem}

Context:
{ctx}

Produce a JSON object with:
- recommendation: string (specific and actionable)
- confidence: number (0-1)
- rationale: string (reference specific debate points)
- dissent: string|null
- unresolved: string[]
- actionItems: string[]
- riskMatrix: [{{ risk, likelihood, impact, mitigation }}]"""
|
|
|
|
|
|
# ── Convergence ─────────────────────────────────────────────────────────
|
|
|
|
|
|
def extract_delta(new_objections, all_objections):
    """Return the objections from *new_objections* not already present in *all_objections*.

    Duplicates are detected by exact id, or by a case-insensitive match on
    the first 80 characters of the argument text.
    """
    seen_ids = {o["id"] for o in all_objections}
    seen_prefixes = {(o.get("argument") or "").lower()[:80] for o in all_objections}
    fresh = []
    for obj in new_objections:
        if obj["id"] in seen_ids:
            continue
        if (obj.get("argument") or "").lower()[:80] in seen_prefixes:
            continue
        fresh.append(obj)
    return fresh
|
|
|
|
|
|
# ── Report generation ───────────────────────────────────────────────────
|
|
|
|
|
|
def severity_badge(s):
    """Short bracketed marker for a severity level; unknown levels echo as [LEVEL]."""
    if s in ("CRITICAL", "HIGH", "MEDIUM", "LOW"):
        return f"[{s[0]}]"
    return f"[{s}]"
|
|
|
|
|
|
def confidence_bar(c):
    """Render a confidence in [0, 1] as a 10-cell block bar plus a percentage."""
    cells = round(c * 10)
    bar = "\u2588" * cells + "\u2591" * (10 - cells)
    return f"{bar} {c*100:.0f}%"
|
|
|
|
|
|
def generate_report(problem, problem_frame, solutions, rounds, all_objections,
                    all_defenses, ruling, converged, convergence_round,
                    total_rounds, duration, cost_data, agent_models):
    """Render the whole debate as a Markdown report string.

    Sections (in order): models table, problem frame, debate statistics,
    solutions, objections table, defenses, final ruling, cost breakdown.
    NOTE(review): the `rounds` parameter is not referenced in this body;
    round info arrives via total_rounds/converged/convergence_round.
    """
    lines = ["# Tribunal Report", "", f"**Date:** {time.strftime('%Y-%m-%d')}", ""]

    # Models used
    lines.append("## Models")
    lines.append("")
    lines.append(f"| Role | Model |")
    lines.append(f"|------|-------|")
    for role in ("analyst", "advocate", "critic", "arbiter"):
        lines.append(f"| {role.title()} | `{agent_models.get(role, '?')}` |")
    lines.append("")

    # Problem
    lines.append("## Problem")
    lines.append("")
    lines.append(problem_frame.get("statement", problem))
    dims = problem_frame.get("dimensions", [])
    if dims:
        lines.append(f"\n**Dimensions:** {', '.join(dims)}")
    lines.append("")

    # Stats
    lines.append("## Debate Statistics")
    lines.append("")
    lines.append(f"- **Solutions proposed:** {len(solutions)}")
    lines.append(f"- **Total objections:** {len(all_objections)}")
    lines.append(f"- **Total defenses:** {len(all_defenses)}")
    lines.append(f"- **Rounds:** {total_rounds}")
    conv_str = f"yes (round {convergence_round})" if converged else "no"
    lines.append(f"- **Converged:** {conv_str}")
    lines.append(f"- **Duration:** {duration:.1f}s")
    if cost_data:
        lines.append(f"- **Estimated cost:** ${cost_data['total_cost']:.2f}")
    lines.append("")

    # Solutions
    lines.append("## Solutions")
    lines.append("")
    for s in solutions:
        lines.append(f"### {s.get('id', '?')}: {s.get('title', 'Untitled')}")
        lines.append("")
        lines.append(f"**Perspective:** {s.get('perspective', '?')} | "
                     f"**Confidence:** {confidence_bar(s.get('confidence', 0))}")
        lines.append("")
        lines.append(f"**Approach:** {s.get('approach', '?')}")
        lines.append("")
        tradeoffs = s.get("tradeoffs", [])
        if tradeoffs:
            lines.append("**Tradeoffs:**")
            for t in tradeoffs:
                lines.append(f"- {t}")
            lines.append("")
        lines.append("---")
        lines.append("")

    # Objections table
    if all_objections:
        lines.append("## Objections")
        lines.append("")
        lines.append("| ID | Target | Category | Severity | Argument |")
        lines.append("|----|--------|----------|----------|----------|")
        for o in all_objections:
            # NOTE(review): id/targetSolutionId/category/severity are accessed
            # with [] here (required keys) while argument uses .get.
            arg = o.get("argument", "")[:80] + ("..." if len(o.get("argument", "")) > 80 else "")
            lines.append(f"| {o['id']} | {o['targetSolutionId']} | {o['category']} | "
                         f"{severity_badge(o['severity'])} {o['severity']} | {arg} |")
        lines.append("")

    # Defenses
    if all_defenses:
        lines.append("## Defenses")
        lines.append("")
        for d in all_defenses:
            v = d.get("verdict", "?")
            icon = {"DEFENDED": "\u2713", "CONCEDED": "\u2717", "PARTIALLY_CONCEDED": "~"}.get(v, "?")
            ca = (d.get("counterArgument") or "")[:120]
            lines.append(f"- **{icon} {d['objectionId']}** \u2192 {v}: {ca}")
            if d.get("modification"):
                lines.append(f"  - *Modification:* {d['modification'][:200]}")
        lines.append("")

    # Ruling
    if ruling:
        lines.append("## Final Ruling")
        lines.append("")
        lines.append(f"**Confidence:** {confidence_bar(ruling.get('confidence', 0))}")
        lines.append("")
        lines.append("### Recommendation")
        lines.append("")
        lines.append(ruling.get("recommendation", "No recommendation"))
        lines.append("")
        lines.append("### Rationale")
        lines.append("")
        lines.append(ruling.get("rationale", ""))
        lines.append("")
        if ruling.get("unresolved"):
            lines.append("### Unresolved Issues")
            lines.append("")
            for u in ruling["unresolved"]:
                lines.append(f"- {u}")
            lines.append("")
        if ruling.get("actionItems"):
            lines.append("### Action Items")
            lines.append("")
            for i, a in enumerate(ruling["actionItems"], 1):
                lines.append(f"{i}. {a}")
            lines.append("")
        if ruling.get("riskMatrix"):
            lines.append("### Risk Matrix")
            lines.append("")
            lines.append("| Risk | Likelihood | Impact | Mitigation |")
            lines.append("|------|-----------|--------|------------|")
            for r in ruling["riskMatrix"]:
                lines.append(f"| {r.get('risk','')} | {r.get('likelihood','')} | "
                             f"{r.get('impact','')} | {r.get('mitigation','')} |")
            lines.append("")

    # Cost breakdown
    if cost_data and cost_data.get("by_agent"):
        lines.append("## Cost Breakdown")
        lines.append("")
        lines.append("| Agent | Model | Calls | Tokens In | Tokens Out | Cost |")
        lines.append("|-------|-------|-------|-----------|------------|------|")
        for name, stats in cost_data["by_agent"].items():
            lines.append(f"| {name} | `{stats['model']}` | {stats['calls']} | "
                         f"~{stats['input']//1000}K | ~{stats['output']//1000}K | "
                         f"${stats['cost']:.3f} |")
        lines.append(f"\n**Total: ${cost_data['total_cost']:.2f}**")
        lines.append("")

    return "\n".join(lines)
|
|
|
|
|
|
# ── Main debate pipeline ────────────────────────────────────────────────
|
|
|
|
|
|
def log(msg):
    """Emit a `[tribunal]`-prefixed status line on stderr."""
    sys.stderr.write(f"[tribunal] {msg}\n")
|
|
|
|
|
|
class Spinner:
    """Braille-frame spinner on stderr showing elapsed seconds during a long call.

    Usage: s = Spinner("label"); s.start(); ...; s.stop("done").
    stop() before start() is safe (no thread to join).
    """

    FRAMES = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"

    def __init__(self, message):
        self._message = message
        self._stop = threading.Event()
        self._thread = None

    def start(self):
        """Begin animating on a daemon thread (won't block interpreter exit)."""
        self._stop.clear()
        self._thread = threading.Thread(target=self._spin, daemon=True)
        self._thread.start()

    def _spin(self):
        # Redraw the same line (\r) ~10x/sec until stop() is signalled.
        began = time.time()
        tick = 0
        while not self._stop.is_set():
            glyph = self.FRAMES[tick % len(self.FRAMES)]
            sys.stderr.write(f"\r[tribunal] {self._message} {glyph} {time.time() - began:.0f}s")
            sys.stderr.flush()
            tick += 1
            self._stop.wait(0.1)

    def stop(self, final_message=None):
        """Stop the animation, erase the spinner line, optionally print a final status."""
        self._stop.set()
        if self._thread:
            self._thread.join()
        sys.stderr.write("\r\033[K")  # clear the in-progress line
        if final_message:
            sys.stderr.write(f"[tribunal] {final_message}\n")
        sys.stderr.flush()
|
|
|
|
|
|
def run_agent(api_key, role, model, user_prompt, retries=1):
    """Call one agent and parse its JSON reply.

    Returns (parsed_dict, usage). parsed_dict is None when the call failed
    after all retries or the reply held no parseable JSON; usage is the
    provider-reported (or zeroed) token counts either way. A spinner runs
    on stderr for the duration of each attempt.
    """
    source = "Claude Code" if is_claude_code_model(model) else "OpenRouter"
    label = f"{role.title()} ({model} [{source}])"

    for attempt in range(retries + 1):
        spinner = Spinner(label)
        spinner.start()
        start = time.time()
        try:
            text, usage = call_model(api_key, model, SYSTEM_PROMPTS[role], user_prompt)
        except RuntimeError as e:
            # RuntimeError is the providers' "expected failure" type: retry
            # until attempts are exhausted, then give up gracefully.
            elapsed = time.time() - start
            if attempt < retries:
                spinner.stop(f"{role.title()} FAILED ({elapsed:.1f}s) — retrying...")
                continue
            spinner.stop(f"{role.title()} FAILED ({elapsed:.1f}s) — {e}")
            return None, {"prompt_tokens": 0, "completion_tokens": 0}
        except Exception:
            # Unexpected errors still clear the spinner line before propagating.
            elapsed = time.time() - start
            spinner.stop(f"{role.title()} FAILED ({elapsed:.1f}s)")
            raise
        elapsed = time.time() - start
        spinner.stop(f"{role.title()} done ({elapsed:.1f}s)")
        break

    parsed = extract_json(text)
    if parsed is None:
        log(f"WARNING: {role} returned no parseable JSON")
        return None, usage
    return coerce(parsed), usage
|
|
|
|
|
|
def debate(problem, analyst_model, advocate_model, critic_model, arbiter_model,
           max_rounds=3, single_pass=False, output_dir=".tribunal"):
    """Run a multi-model adversarial debate and write a tribunal report.

    Pipeline:
      Phase 0  ANALYST  — frames the problem (statement/dimensions/constraints/criteria).
      Phase 1  ADVOCATE — generates candidate solutions per suggested perspective.
      Rounds   CRITIC raises objections, ADVOCATE defends; repeats until no new
               objections appear (convergence) or ``max_rounds`` is reached.
      Final    ARBITER  — synthesizes a ruling (confidence, risks, action items).

    All progress is rendered to stderr. ``tribunal-report.md`` and
    ``tribunal-data.json`` are written into ``output_dir``.

    Args:
        problem: Free-form problem statement to debate.
        analyst_model, advocate_model, critic_model, arbiter_model: model IDs
            for each role (OpenRouter IDs or ``claude-code/*`` aliases).
        max_rounds: Maximum number of critic/advocate rounds (default 3).
        single_pass: If True, stop after the first critic/advocate round.
        output_dir: Directory for report artifacts (default ``.tribunal``).
    """
    all_models = [analyst_model, advocate_model, critic_model, arbiter_model]
    # An OpenRouter key is only required if at least one role is NOT served
    # by the local claude-code CLI.
    needs_openrouter = any(not is_claude_code_model(m) for m in all_models)
    api_key = get_api_key() if needs_openrouter else ""
    start_time = time.time()

    # Resolve models
    agent_models = {
        "analyst": analyst_model,
        "advocate": advocate_model,
        "critic": critic_model,
        "arbiter": arbiter_model,
    }

    print(file=sys.stderr)
    log("Multi-model adversarial debate")
    log(f"Analyst: {analyst_model}")
    log(f"Advocate: {advocate_model}")
    log(f"Critic: {critic_model}")
    log(f"Arbiter: {arbiter_model}")
    log(f"Max rounds: {max_rounds}")
    print(file=sys.stderr)

    # Fetch pricing
    pricing = fetch_pricing(api_key)

    cost_data = {"total_input": 0, "total_output": 0, "total_cost": 0, "by_agent": {}}

    def track(agent_name, model, usage):
        # Accumulate token counts and dollar cost, both overall and per agent.
        inp, out, cost = calc_cost(pricing, model, usage)
        cost_data["total_input"] += inp
        cost_data["total_output"] += out
        cost_data["total_cost"] += cost
        if agent_name not in cost_data["by_agent"]:
            cost_data["by_agent"][agent_name] = {"input": 0, "output": 0, "cost": 0, "calls": 0, "model": model}
        a = cost_data["by_agent"][agent_name]
        a["input"] += inp
        a["output"] += out
        a["cost"] += cost
        a["calls"] += 1

    # ── ANSI colors ──
    C_BOLD = "\033[1m"
    C_DIM = "\033[2m"
    C_CYAN = "\033[36m"
    C_GREEN = "\033[32m"
    C_YELLOW = "\033[33m"
    C_RED = "\033[31m"
    C_MAGENTA = "\033[35m"
    C_RESET = "\033[0m"

    def section(title):
        # Render a colored horizontal-rule section header on stderr.
        width = 60
        print(f"\n{C_CYAN}{C_BOLD}{'─' * width}", file=sys.stderr)
        print(f"  {title}", file=sys.stderr)
        print(f"{'─' * width}{C_RESET}\n", file=sys.stderr)

    def bullet(text, indent=2):
        # Dim bullet line with configurable indentation.
        print(f"{' ' * indent}{C_DIM}•{C_RESET} {text}", file=sys.stderr)

    # ── Phase 0: Analyst ──
    section("Phase 0: ANALYST — framing the problem")
    result, usage = run_agent(api_key, "analyst", analyst_model, analyst_prompt(problem))
    track("analyst", analyst_model, usage)

    if result and "problem" in result:
        problem_frame = result["problem"]
        perspectives = result.get("suggestedPerspectives", ["pragmatist", "innovator"])
    else:
        # Fall back to a bare frame so the debate can still proceed.
        log("Analyst parse failed, using minimal frame")
        problem_frame = {"statement": problem, "dimensions": ["feasibility", "complexity"],
                         "constraints": [], "criteria": ["correctness"], "context": ""}
        perspectives = ["pragmatist", "innovator"]

    # Show analyst output
    stmt = problem_frame.get("statement", problem)
    print(f"\n  {C_BOLD}Problem frame:{C_RESET}", file=sys.stderr)
    # Wrap long statement
    for i in range(0, len(stmt), 76):
        print(f"    {stmt[i:i+76]}", file=sys.stderr)

    dims = problem_frame.get("dimensions", [])
    if dims:
        print(f"\n  {C_BOLD}Dimensions ({len(dims)}):{C_RESET}", file=sys.stderr)
        for d in dims:
            label = d[:90] + "..." if len(d) > 90 else d
            bullet(label, indent=4)

    constraints = problem_frame.get("constraints", [])
    if constraints:
        print(f"\n  {C_BOLD}Constraints ({len(constraints)}):{C_RESET}", file=sys.stderr)
        for c in constraints:
            label = c[:90] + "..." if len(c) > 90 else c
            bullet(label, indent=4)

    criteria = problem_frame.get("criteria", [])
    if criteria:
        print(f"\n  {C_BOLD}Success criteria ({len(criteria)}):{C_RESET}", file=sys.stderr)
        for c in criteria:
            label = c[:90] + "..." if len(c) > 90 else c
            bullet(label, indent=4)

    print(f"\n  {C_BOLD}Perspectives:{C_RESET} {', '.join(perspectives)}", file=sys.stderr)
    print(file=sys.stderr)

    # ── Phase 1: Advocate generates solutions ──
    section("Phase 1: ADVOCATE — generating solutions")
    result, usage = run_agent(api_key, "advocate", advocate_model,
                              advocate_generate_prompt(problem, problem_frame, perspectives))
    track("advocate", advocate_model, usage)

    solutions = result.get("solutions", []) if result else []

    if not solutions:
        # Nothing to debate without at least one candidate solution.
        log("ERROR: No solutions generated, aborting")
        return

    # Show solutions
    for s in solutions:
        sid = s.get("id", "?")
        title = s.get("title", "Untitled")
        conf = s.get("confidence", 0)
        persp = s.get("perspective", "?")
        print(f"\n  {C_GREEN}{C_BOLD}{sid}: {title}{C_RESET}", file=sys.stderr)
        print(f"     {C_DIM}Perspective:{C_RESET} {persp} | {C_DIM}Confidence:{C_RESET} {confidence_bar(conf)}", file=sys.stderr)
        approach = s.get("approach", "")
        if approach:
            short = approach[:200] + "..." if len(approach) > 200 else approach
            print(f"     {short}", file=sys.stderr)
        tradeoffs = s.get("tradeoffs", [])
        if tradeoffs:
            print(f"     {C_YELLOW}Tradeoffs:{C_RESET}", file=sys.stderr)
            for t in tradeoffs[:4]:
                bullet(t[:100], indent=6)
        print(file=sys.stderr)

    # ── Phase 2+: Adversarial rounds ──
    all_objections = []
    all_defenses = []
    rounds = []
    converged = False
    convergence_round = 0
    round_num = 1

    while not converged and round_num <= max_rounds:
        # Round 1 gets a full critique; later rounds are targeted, fed with
        # prior objections/defenses so the critic only raises what's new.
        is_full = round_num <= 1

        # Critic
        mode = "full" if is_full else "targeted"
        section(f"Round {round_num}: CRITIC — {mode} critique")
        result, usage = run_agent(
            api_key, "critic", critic_model,
            critic_prompt(problem, problem_frame, solutions, not is_full,
                          all_objections if all_objections else None,
                          all_defenses if all_defenses else None))
        track(f"critic-r{round_num}", critic_model, usage)

        new_objections = result.get("objections", []) if result else []

        # Convergence check
        delta = extract_delta(new_objections, all_objections)
        if not delta:
            converged = True
            convergence_round = round_num
            print(f"\n  {C_GREEN}{C_BOLD}✓ No new objections — debate converged{C_RESET}", file=sys.stderr)
            rounds.append({"round": round_num, "agent": "critic", "objections": 0, "defenses": 0})
            break

        all_objections.extend(delta)
        rounds.append({"round": round_num, "agent": "critic",
                       "objections": len(delta), "defenses": 0})

        # Show objections
        for o in delta:
            oid = o.get("id", "?")
            target = o.get("targetSolutionId", "?")
            cat = o.get("category", "?")
            sev = o.get("severity", "?")
            arg = (o.get("argument") or "")[:150]
            sev_color = C_RED if sev in ("CRITICAL", "HIGH") else C_YELLOW
            print(f"\n  {sev_color}{severity_badge(sev)}{C_RESET} {C_BOLD}{oid}{C_RESET} → {target} ({cat})", file=sys.stderr)
            print(f"     {arg}", file=sys.stderr)
            mit = o.get("suggestedMitigation")
            if mit:
                print(f"     {C_DIM}Mitigation: {mit[:120]}{C_RESET}", file=sys.stderr)

        assessment = result.get("overallAssessment", "") if result else ""
        if assessment:
            print(f"\n  {C_BOLD}Assessment:{C_RESET} {assessment[:300]}", file=sys.stderr)

        # Advocate defense
        section(f"Round {round_num}: ADVOCATE — defense")
        result, usage = run_agent(
            api_key, "advocate", advocate_model,
            advocate_defend_prompt(problem, problem_frame, solutions, delta))
        track(f"advocate-r{round_num}", advocate_model, usage)

        defenses = result.get("defenses", []) if result else []
        if result and result.get("revisedSolutions"):
            # Advocate may concede and ship revised solutions for later rounds
            # and the final ruling.
            solutions = result["revisedSolutions"]
            log(f"Solutions revised based on concessions")
        all_defenses.extend(defenses)
        rounds.append({"round": round_num, "agent": "advocate",
                       "objections": 0, "defenses": len(defenses)})

        # Show defenses
        for d in defenses:
            did = d.get("objectionId", "?")
            verdict = d.get("verdict", "?")
            ca = (d.get("counterArgument") or "")[:150]
            if verdict == "DEFENDED":
                icon, color = "✓", C_GREEN
            elif verdict == "CONCEDED":
                icon, color = "✗", C_RED
            else:
                icon, color = "~", C_YELLOW
            print(f"\n  {color}{icon} {did}{C_RESET} → {C_BOLD}{verdict}{C_RESET}", file=sys.stderr)
            print(f"     {ca}", file=sys.stderr)
            mod = d.get("modification")
            if mod:
                print(f"     {C_MAGENTA}Modification: {mod[:150]}{C_RESET}", file=sys.stderr)

        print(file=sys.stderr)
        round_num += 1
        if single_pass:
            break

    # ── Final: Arbiter ──
    section("FINAL: ARBITER — synthesis & ruling")
    result, usage = run_agent(
        api_key, "arbiter", arbiter_model,
        arbiter_prompt(problem, problem_frame, solutions, all_objections,
                       all_defenses, rounds, converged, convergence_round))
    track("arbiter", arbiter_model, usage)

    ruling = result

    # Show ruling
    if ruling:
        conf = ruling.get("confidence", 0)
        print(f"\n  {C_BOLD}Confidence:{C_RESET} {confidence_bar(conf)}", file=sys.stderr)

        rec = ruling.get("recommendation", "")
        if rec:
            print(f"\n  {C_GREEN}{C_BOLD}Recommendation:{C_RESET}", file=sys.stderr)
            for i in range(0, len(rec), 76):
                print(f"    {rec[i:i+76]}", file=sys.stderr)

        rationale = ruling.get("rationale", "")
        if rationale:
            print(f"\n  {C_BOLD}Rationale:{C_RESET}", file=sys.stderr)
            for i in range(0, len(rationale), 76):
                print(f"    {rationale[i:i+76]}", file=sys.stderr)

        dissent = ruling.get("dissent")
        if dissent:
            print(f"\n  {C_YELLOW}{C_BOLD}Dissent:{C_RESET} {dissent}", file=sys.stderr)

        unresolved = ruling.get("unresolved", [])
        if unresolved:
            print(f"\n  {C_BOLD}Unresolved:{C_RESET}", file=sys.stderr)
            for u in unresolved:
                bullet(u, indent=4)

        actions = ruling.get("actionItems", [])
        if actions:
            print(f"\n  {C_BOLD}Action items:{C_RESET}", file=sys.stderr)
            for i, a in enumerate(actions, 1):
                print(f"    {C_GREEN}{i}.{C_RESET} {a}", file=sys.stderr)

        risks = ruling.get("riskMatrix", [])
        if risks:
            print(f"\n  {C_BOLD}Risk matrix:{C_RESET}", file=sys.stderr)
            for r in risks:
                risk = r.get("risk", "?")
                lh = r.get("likelihood", "?")
                imp = r.get("impact", "?")
                sev_color = C_RED if imp in ("HIGH", "CRITICAL") else C_YELLOW
                print(f"    {sev_color}▸{C_RESET} {risk} ({C_DIM}likelihood:{C_RESET} {lh}, {C_DIM}impact:{C_RESET} {imp})", file=sys.stderr)
                mit = r.get("mitigation", "")
                if mit:
                    print(f"      {C_DIM}→ {mit[:120]}{C_RESET}", file=sys.stderr)

    print(file=sys.stderr)

    duration = time.time() - start_time
    # round_num was incremented past the last completed round unless the loop
    # broke out on convergence, so subtract one in the non-converged case.
    total_rounds = round_num - (0 if converged else 1)

    # Generate and save report
    report = generate_report(
        problem, problem_frame, solutions, rounds, all_objections,
        all_defenses, ruling, converged, convergence_round,
        total_rounds, duration, cost_data, agent_models)

    os.makedirs(output_dir, exist_ok=True)
    report_path = os.path.join(output_dir, "tribunal-report.md")
    with open(report_path, "w") as f:
        f.write(report)

    # Also save raw JSON
    raw_path = os.path.join(output_dir, "tribunal-data.json")
    with open(raw_path, "w") as f:
        json.dump({
            "problem": problem_frame, "solutions": solutions,
            "objections": all_objections, "defenses": all_defenses,
            "ruling": ruling, "converged": converged,
            "convergenceRound": convergence_round,
            "totalRounds": total_rounds, "duration": duration,
            "cost": cost_data, "models": agent_models,
        }, f, indent=2)

    # Summary
    log(f"Report: {report_path}")
    log(f"Data: {raw_path}")
    print(file=sys.stderr)
    log("Summary:")
    log(f"  Solutions: {len(solutions)}")
    log(f"  Objections: {len(all_objections)}")
    log(f"  Defenses: {len(all_defenses)}")
    log(f"  Rounds: {total_rounds}")
    conv = f"yes (round {convergence_round})" if converged else "no"
    log(f"  Converged: {conv}")
    log(f"  Duration: {duration:.1f}s")
    log(f"  Cost: ${cost_data['total_cost']:.2f}")
    if ruling:
        log(f"  Confidence: {ruling.get('confidence', 0)*100:.0f}%")
|
|
|
|
|
|
# ── Original consult functions ──────────────────────────────────────────
|
|
|
|
|
|
def ask(question, model=None, files=None, system_prompt=None):
    """Send a single question to one model and stream the answer to stdout.

    Context is assembled in order: the question text, each ``files`` entry
    (inlined as a fenced block), then piped stdin (when stdin is not a TTY).
    Claude Code models are invoked via the local CLI; all other models go
    through the OpenRouter streaming (SSE) API.

    Args:
        question: The user's question (may be empty if files/stdin supply content).
        model: Model ID; falls back to the configured default.
        files: Optional list of file paths to include as context.
        system_prompt: Optional system message.

    Exits the process with status 1 on missing API key, empty input, or
    API/connection errors.
    """
    cfg = load_config()
    model = model or cfg.get("default_model", "google/gemini-2.5-pro")

    # Only OpenRouter models need the API key; claude-code models use the CLI.
    if not is_claude_code_model(model):
        api_key = cfg.get("api_key", "")
        if not api_key:
            print(
                "Error: No API key. Set it in BTerminal Consult tab "
                "or edit ~/.config/bterminal/consult.json",
                file=sys.stderr,
            )
            sys.exit(1)

    content = question or ""

    # Inline each file as a fenced block; unreadable files warn and are skipped.
    if files:
        for fpath in files:
            try:
                with open(fpath) as f:
                    fc = f.read()
                content += f"\n\n--- {fpath} ---\n```\n{fc}\n```"
            except Exception as e:
                print(f"Warning: Cannot read {fpath}: {e}", file=sys.stderr)

    # Piped input (e.g. `cat log.txt | consult "..."`) becomes extra context.
    if not sys.stdin.isatty():
        stdin_data = sys.stdin.read()
        if stdin_data.strip():
            content += f"\n\n--- stdin ---\n```\n{stdin_data}\n```"

    if not content.strip():
        print("Error: No question provided.", file=sys.stderr)
        sys.exit(1)

    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": content})

    model_name = cfg.get("models", {}).get(model, {}).get("name", model)
    source = "Claude Code" if is_claude_code_model(model) else "OpenRouter"
    print(f"[{model_name} — {model} ({source})]", file=sys.stderr)

    # Claude Code models use CLI directly; the CLI takes a single prompt
    # string, so any system prompt is prepended to the content.
    if is_claude_code_model(model):
        combined = content
        if system_prompt:
            combined = f"{system_prompt}\n\n{content}"
        try:
            text, _ = call_claude_code(model, "", combined)
            print(text)
        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)
            sys.exit(1)
        return

    payload = json.dumps({"model": model, "messages": messages, "stream": True}).encode()

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://github.com/DexterFromLab/BTerminal",
        "X-Title": "BTerminal Consult",
    }

    req = urllib.request.Request(OPENROUTER_API, data=payload, headers=headers)

    try:
        # BUGFIX: use the response as a context manager so the HTTP connection
        # is always closed (the original never closed `resp`, leaking it on
        # every call and on early `break`).
        with urllib.request.urlopen(req, timeout=120) as resp:
            while True:
                line = resp.readline()
                if not line:
                    break
                line = line.decode("utf-8").strip()
                # SSE framing: only "data: ..." lines carry payload chunks.
                if not line.startswith("data: "):
                    continue
                data = line[6:]
                if data == "[DONE]":
                    break
                try:
                    chunk = json.loads(data)
                    delta = chunk["choices"][0].get("delta", {})
                    text = delta.get("content", "")
                    if text:
                        sys.stdout.write(text)
                        sys.stdout.flush()
                except (json.JSONDecodeError, KeyError, IndexError):
                    # Ignore malformed or keep-alive frames.
                    pass
        print()
    except urllib.error.HTTPError as e:
        body = e.read().decode()
        try:
            err = json.loads(body)
            msg = err.get("error", {}).get("message", body)
        except json.JSONDecodeError:
            msg = body
        print(f"Error ({e.code}): {msg}", file=sys.stderr)
        sys.exit(1)
    except urllib.error.URLError as e:
        print(f"Connection error: {e.reason}", file=sys.stderr)
        sys.exit(1)
|
|
|
|
|
|
def list_models():
    """Print enabled models (tagged by source, default starred) and a
    disabled-model count, based on the user config."""
    config = load_config()
    default_id = config.get("default_model", "")
    all_models = config.get("models", {})

    enabled = {mid: meta for mid, meta in all_models.items() if meta.get("enabled")}
    if not enabled:
        print("No models enabled. Use BTerminal Consult tab to enable models.")
        return

    print("Enabled models:")
    for model_id in sorted(enabled):
        meta = enabled[model_id]
        # [CC] = local Claude Code CLI, [OR] = OpenRouter API.
        src_tag = "[CC]" if meta.get("source", "openrouter") == "claude-code" else "[OR]"
        star = " [default]" if model_id == default_id else ""
        display = meta.get("name", model_id)
        print(f"  {src_tag} {model_id} — {display}{star}")

    disabled_count = sum(1 for meta in all_models.values() if not meta.get("enabled"))
    if disabled_count:
        print(f"\nDisabled models: {disabled_count} (enable in BTerminal Consult tab)")
|
|
|
|
|
|
def print_help():
    """Print CLI usage and options, then the enabled-model list and config path."""
    help_lines = [
        "consult — Query external AI models via OpenRouter",
        "",
        "Usage:",
        '  consult "question"                       Ask default model',
        '  consult -m model_id "question"           Ask specific model',
        '  consult -f file.py "review this code"    Include file as context',
        '  cat log.txt | consult "analyze errors"   Pipe stdin as context',
        '  consult -s "system prompt" "question"    Custom system prompt',
        "  consult models                           List enabled models",
        '  consult debate "problem"                 Multi-model debate',
        "",
        "Options:",
        "  -m, --model MODEL    Model ID (default: from config)",
        "  -f, --file FILE      Include file(s) as context (repeatable)",
        "  -s, --system PROMPT  System prompt",
        "  -h, --help           Show this help",
        "",
        "Debate options:",
        "  --analyst MODEL    Model for problem analysis",
        "  --advocate MODEL   Model for solution generation",
        "  --critic MODEL     Model for adversarial critique",
        "  --arbiter MODEL    Model for final synthesis",
        "  --rounds N         Max debate rounds (default: 3)",
        "  --single-pass      One round only",
        "  --output DIR       Output directory (default: .tribunal)",
        "",
    ]
    # One write instead of many print() calls; output is byte-identical.
    print("\n".join(help_lines))
    list_models()
    print(f"\nConfig: {CONFIG_FILE}")
|
|
|
|
|
|
def main():
    """CLI entry point: dispatch to help, ``models``, ``debate``, or a plain query.

    Dispatch is on sys.argv[1] before argparse runs, so subcommand names are
    never mistaken for question text.
    """
    # No args, or -h/--help as the first arg → help (exit 0).
    if len(sys.argv) == 1 or sys.argv[1] in ("-h", "--help"):
        print_help()
        sys.exit(0)

    if sys.argv[1] == "models":
        list_models()
        return

    # ── debate subcommand ──
    if sys.argv[1] == "debate":
        cfg = load_config()
        tribunal_cfg = cfg.get("tribunal", {})
        default_model = cfg.get("default_model", "google/gemini-2.5-pro")

        parser = argparse.ArgumentParser(prog="consult debate", add_help=False)
        parser.add_argument("_debate", nargs=1)  # consume "debate"
        parser.add_argument("problem", nargs="*")
        # Per-role model flags fall back to config overrides, then the
        # global default model.
        parser.add_argument("--analyst", default=tribunal_cfg.get("analyst_model") or default_model)
        parser.add_argument("--advocate", default=tribunal_cfg.get("advocate_model") or default_model)
        parser.add_argument("--critic", default=tribunal_cfg.get("critic_model") or default_model)
        parser.add_argument("--arbiter", default=tribunal_cfg.get("arbiter_model") or default_model)
        parser.add_argument("--rounds", type=int, default=tribunal_cfg.get("max_rounds", 3))
        parser.add_argument("--single-pass", action="store_true")
        parser.add_argument("--output", default=".tribunal")

        args = parser.parse_args()
        # Unquoted multi-word problem statements are joined back together.
        problem = " ".join(args.problem) if args.problem else ""

        if not problem:
            print("Error: No problem statement provided.", file=sys.stderr)
            print('Usage: consult debate "your problem statement"', file=sys.stderr)
            sys.exit(1)

        try:
            debate(
                problem,
                analyst_model=args.analyst,
                advocate_model=args.advocate,
                critic_model=args.critic,
                arbiter_model=args.arbiter,
                max_rounds=args.rounds,
                single_pass=args.single_pass,
                output_dir=args.output,
            )
        except urllib.error.HTTPError as e:
            # Surface the API's error message when the body is JSON;
            # otherwise print the raw body.
            body = e.read().decode()
            try:
                err = json.loads(body)
                msg = err.get("error", {}).get("message", body)
            except json.JSONDecodeError:
                msg = body
            print(f"Error ({e.code}): {msg}", file=sys.stderr)
            sys.exit(1)
        except urllib.error.URLError as e:
            print(f"Connection error: {e.reason}", file=sys.stderr)
            sys.exit(1)
        return

    # ── regular query ──
    parser = argparse.ArgumentParser(prog="consult", add_help=False)
    parser.add_argument("question", nargs="*")
    parser.add_argument("-m", "--model")
    parser.add_argument("-f", "--file", action="append", dest="files")
    parser.add_argument("-s", "--system")

    args = parser.parse_args()
    question = " ".join(args.question) if args.question else ""

    # Nothing to ask: no question, no files, and no piped stdin.
    if not question and not args.files and sys.stdin.isatty():
        parser.print_help()
        sys.exit(0)

    ask(question, args.model, args.files, args.system)
|
|
|
|
|
|
# Script entry point — run the CLI dispatcher only when executed directly.
if __name__ == "__main__":
    main()
|