#!/usr/bin/env python3 """consult — Query external AI models via OpenRouter. Usage: consult "question" Ask default model consult -m google/gemini-2.5-pro "q" Ask specific model consult -f file.py "review this code" Include file as context cat log.txt | consult "analyze errors" Pipe stdin as context consult models List enabled models consult debate "problem statement" Multi-model adversarial debate """ import argparse import json import os import re import sys import threading import time import urllib.error import urllib.request CONFIG_DIR = os.path.expanduser("~/.config/bterminal") CONFIG_FILE = os.path.join(CONFIG_DIR, "consult.json") OPENROUTER_API = "https://openrouter.ai/api/v1/chat/completions" OPENROUTER_MODELS_API = "https://openrouter.ai/api/v1/models" CLAUDE_CODE_MODELS = { "claude-code/opus": {"name": "Claude Opus 4.6", "alias": "opus", "source": "claude-code"}, "claude-code/sonnet": {"name": "Claude Sonnet 4.6", "alias": "sonnet", "source": "claude-code"}, "claude-code/haiku": {"name": "Claude Haiku 4.5", "alias": "haiku", "source": "claude-code"}, } DEFAULT_CONFIG = { "api_key": "", "default_model": "google/gemini-2.5-pro", "models": { "google/gemini-2.5-pro": {"enabled": True, "name": "Gemini 2.5 Pro", "source": "openrouter"}, "openai/gpt-4o": {"enabled": True, "name": "GPT-4o", "source": "openrouter"}, "openai/o3-mini": {"enabled": True, "name": "o3-mini", "source": "openrouter"}, "deepseek/deepseek-r1": {"enabled": True, "name": "DeepSeek R1", "source": "openrouter"}, "anthropic/claude-sonnet-4": {"enabled": False, "name": "Claude Sonnet 4", "source": "openrouter"}, "meta-llama/llama-4-maverick": {"enabled": False, "name": "Llama 4 Maverick", "source": "openrouter"}, }, "tribunal": { "analyst_model": "", "advocate_model": "", "critic_model": "", "arbiter_model": "", "max_rounds": 3, }, } def load_config(): if os.path.isfile(CONFIG_FILE): try: with open(CONFIG_FILE) as f: cfg = json.load(f) # Ensure Claude Code models exist models = cfg.setdefault("models", {}) changed = False for mid, info in CLAUDE_CODE_MODELS.items(): if mid not in models: models[mid] = dict(info, enabled=False) changed = True if changed: with open(CONFIG_FILE, "w") as f: json.dump(cfg, f, indent=2) return cfg except (json.JSONDecodeError, IOError): pass os.makedirs(CONFIG_DIR, exist_ok=True) cfg = DEFAULT_CONFIG.copy() with open(CONFIG_FILE, "w") as f: json.dump(cfg, f, indent=2) return cfg def get_api_key(): cfg = load_config() key = cfg.get("api_key", "") if not key: print( "Error: No API key. Set it in BTerminal Consult tab " "or edit ~/.config/bterminal/consult.json", file=sys.stderr, ) sys.exit(1) return key # ── OpenRouter API call (non-streaming, JSON mode) ────────────────────── def call_openrouter(api_key, model, system_prompt, user_prompt): """Single non-streaming call. Returns (text, usage_dict).""" payload = json.dumps({ "model": model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], "max_tokens": 16384, "temperature": 0.7, }).encode() headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "HTTP-Referer": "https://github.com/DexterFromLab/BTerminal", "X-Title": "BTerminal Consult Tribunal", } req = urllib.request.Request(OPENROUTER_API, data=payload, headers=headers) resp = urllib.request.urlopen(req, timeout=300) data = json.loads(resp.read().decode()) if "error" in data: raise RuntimeError(data["error"].get("message", str(data["error"]))) text = data.get("choices", [{}])[0].get("message", {}).get("content", "") usage = data.get("usage", {}) return text, usage # ── Claude Code API call ──────────────────────────────────────────────── def is_claude_code_model(model_id): """Check if model ID refers to a Claude Code local model.""" return model_id.startswith("claude-code/") def get_claude_alias(model_id): """Get Claude CLI alias from model ID (e.g. 'claude-code/opus' -> 'opus').""" info = CLAUDE_CODE_MODELS.get(model_id) if info: return info["alias"] return model_id.split("/", 1)[-1] def call_claude_code(model_id, system_prompt, user_prompt): """Call Claude Code CLI. Returns (text, usage_dict).""" import subprocess alias = get_claude_alias(model_id) full_prompt = f"{system_prompt}\n\n{user_prompt}" if system_prompt else user_prompt env = os.environ.copy() env.pop("CLAUDECODE", None) try: result = subprocess.run( ["claude", "-p", "--model", alias, full_prompt], capture_output=True, text=True, timeout=600, env=env, ) except subprocess.TimeoutExpired: raise RuntimeError(f"Claude Code timeout (10min) — prompt too large or model too slow") if result.returncode != 0: raise RuntimeError(f"Claude Code error: {result.stderr.strip()}") text = result.stdout.strip() # Claude Code CLI doesn't report token usage, estimate roughly usage = { "prompt_tokens": len(full_prompt) // 4, "completion_tokens": len(text) // 4, } return text, usage def call_model(api_key, model, system_prompt, user_prompt): """Dispatch to the right API based on model source.""" if is_claude_code_model(model): return call_claude_code(model, system_prompt, user_prompt) return call_openrouter(api_key, model, system_prompt, user_prompt) # ── JSON extraction ───────────────────────────────────────────────────── def extract_json(text): """Extract JSON object from LLM response (handles fences, preamble).""" # Direct parse try: return json.loads(text) except json.JSONDecodeError: pass # Code fence m = re.search(r"```(?:json)?\s*([\s\S]*?)```", text) if m: try: return json.loads(m.group(1).strip()) except json.JSONDecodeError: pass # Balanced brace extraction start = text.find("{") if start == -1: return None depth = 0 in_str = False esc = False for i in range(start, len(text)): ch = text[i] if esc: esc = False continue if ch == "\\" and in_str: esc = True continue if ch == '"': in_str = not in_str continue if in_str: continue if ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: try: return json.loads(text[start:i + 1]) except json.JSONDecodeError: return None return None def coerce(data): """Coerce LLM output quirks (string numbers, lowercase enums, etc.).""" if not isinstance(data, dict): return data for k in ("confidence", "confidenceChange"): if k in data and isinstance(data[k], str): try: data[k] = float(data[k]) except ValueError: data[k] = 0.5 for k in ("severity", "likelihood", "impact", "category", "verdict"): if k in data and isinstance(data[k], str): data[k] = data[k].upper().replace(" ", "_") for k in ("dimensions", "constraints", "criteria", "tradeoffs", "assumptions", "unresolved", "actionItems"): if k in data and isinstance(data[k], list): data[k] = [ (x if isinstance(x, str) else x.get("name", x.get("label", x.get("title", x.get("description", json.dumps(x)))))) if isinstance(x, dict) else str(x) for x in data[k] ] for k in ("solutions", "objections", "defenses", "riskMatrix", "revisedSolutions"): if k in data and isinstance(data[k], list): data[k] = [coerce(x) if isinstance(x, dict) else x for x in data[k]] return data # ── Pricing ───────────────────────────────────────────────────────────── _pricing_cache = None def fetch_pricing(api_key): global _pricing_cache if _pricing_cache is not None: return _pricing_cache try: req = urllib.request.Request( OPENROUTER_MODELS_API, headers={"Authorization": f"Bearer {api_key}"}, ) resp = urllib.request.urlopen(req, timeout=15) data = json.loads(resp.read().decode()) pricing = {} for m in data.get("data", []): p = m.get("pricing", {}) if p.get("prompt") and p.get("completion"): pricing[m["id"]] = { "input": float(p["prompt"]) * 1_000_000, "output": float(p["completion"]) * 1_000_000, } _pricing_cache = pricing return pricing except Exception: _pricing_cache = {} return {} def calc_cost(pricing, model, usage): p = pricing.get(model, {"input": 3, "output": 15}) inp = usage.get("prompt_tokens", 0) out = usage.get("completion_tokens", 0) cost = (inp * p["input"] + out * p["output"]) / 1_000_000 return inp, out, cost # ── Agent prompts ─────────────────────────────────────────────────────── SYSTEM_PROMPTS = { "analyst": """You are the Analyst — the first mind to touch the problem. Your job is to decompose the problem into a structured frame that downstream agents (Advocates, Critics, Arbiter) can work from. Your output is the shared foundation. If you miss a dimension, the entire debate will have a blind spot. 1. Restate the problem in precise, unambiguous language. 2. Identify dimensions — the axes along which solutions will vary. 3. Surface constraints — hard limits stated or implied. 4. Define success criteria — what "solved" looks like. 5. Provide context — relevant background. 6. Suggest perspectives — 2-3 distinct lenses for solution generation. IMPORTANT: Respond with valid JSON only. No markdown code fences, no explanation outside the JSON.""", "advocate": """You are the Advocate — a solution generator and defender. Mode 1 (Generation): Produce 2-3 distinct approaches from different perspectives. Each solution must be genuinely different. Include: title, approach (specific enough to implement), tradeoffs (genuine downsides), assumptions (falsifiable), confidence (0-1), perspective. Mode 2 (Defense): For each objection, respond with: - DEFENDED: objection is wrong, provide counter-argument - CONCEDED: objection is valid, describe how solution changes - PARTIALLY_CONCEDED: has merit but doesn't invalidate core approach Do NOT defend everything reflexively. Intellectual honesty makes the output more useful. IMPORTANT: Respond with valid JSON only. No markdown code fences, no explanation outside the JSON.""", "critic": """You are the Critic — an adversarial evaluator who stress-tests solutions. Find real flaws, not contrarian noise. 1. Identify genuine weaknesses that would cause real problems 2. Challenge hidden assumptions 3. Find edge cases where the solution breaks 4. Assess feasibility given constraints 5. Flag missing considerations Categories: FEASIBILITY, SCALABILITY, COST, COMPLEXITY, SECURITY, MAINTAINABILITY, PERFORMANCE, UX, CORRECTNESS, MISSING_REQUIREMENT Severity: CRITICAL, HIGH, MEDIUM, LOW Every objection must have a concrete argument and evidence. Suggest mitigations where possible. IMPORTANT: Respond with valid JSON only. No markdown code fences, no explanation outside the JSON.""", "arbiter": """You are the Arbiter — the final decision-maker who synthesizes an adversarial debate into a clear, actionable recommendation. 1. Weigh all solutions, objections, and defenses 2. Assess what survived adversarial scrutiny 3. Evaluate whether concessions fatally weakened solutions or led to improvements 4. Identify the strongest path given success criteria and constraints 5. Name unresolved questions 6. Provide concrete, executable action items Confidence: 0.9+ clear winner, 0.7-0.9 good with minor concerns, 0.5-0.7 significant uncertainty, <0.5 no clear winner. IMPORTANT: Respond with valid JSON only. No markdown code fences, no explanation outside the JSON.""", } # ── User prompts ──────────────────────────────────────────────────────── def analyst_prompt(problem): return f"""Analyze the following problem and produce a structured problem frame. Problem: {problem} Produce a JSON object with: - problem: {{ statement, dimensions (array of strings), constraints (array of strings), criteria (array of strings), context (string) }} - suggestedPerspectives: string[] (2-3 distinct perspectives)""" def advocate_generate_prompt(problem, problem_frame, perspectives): ctx = json.dumps({"problemFrame": problem_frame, "perspectives": perspectives}, indent=2) return f"""Generate solutions for the following problem. Problem: {problem} Context: {ctx} Generate 2-3 distinct solutions from different perspectives. Each solution must have clear tradeoffs and explicit assumptions. Produce a JSON object with: - solutions: [{{ id (e.g. "S-1"), title, approach (detailed), tradeoffs (array), assumptions (array), confidence (0-1), perspective }}]""" def advocate_defend_prompt(problem, problem_frame, solutions, objections): ctx = json.dumps({ "problemFrame": problem_frame, "solutions": solutions, "defenseMode": True, "objections": objections, }, indent=2) return f"""Defend solutions against objections. Problem: {problem} Context: {ctx} For each objection respond with DEFENDED, CONCEDED, or PARTIALLY_CONCEDED. Produce a JSON object with: - defenses: [{{ objectionId, verdict ("DEFENDED"|"CONCEDED"|"PARTIALLY_CONCEDED"), counterArgument, modification (string|null), confidenceChange (number -1 to +1) }}] - revisedSolutions: (optional) updated solutions incorporating concessions""" def critic_prompt(problem, problem_frame, solutions, targeted, prev_objections=None, prev_defenses=None): ctx = {"problemFrame": problem_frame, "solutions": solutions, "targeted": targeted} if prev_objections: ctx["previousObjections"] = [ {"id": o.get("id", "?"), "targetSolutionId": o.get("targetSolutionId", "?"), "category": o.get("category", "?"), "argument": (o.get("argument") or "")[:200]} for o in prev_objections ] if prev_defenses: ctx["previousDefenses"] = [ {"objectionId": d.get("objectionId", "?"), "verdict": d.get("verdict", "?"), "counterArgument": (d.get("counterArgument") or "")[:200]} for d in prev_defenses ] ctx_str = json.dumps(ctx, indent=2) focus = ("Focus ONLY on aspects not already covered by previous objections." if targeted else "This is a FULL critique. Evaluate every solution thoroughly.") return f"""Critically evaluate the following solutions. Problem: {problem} Context: {ctx_str} {focus} Produce a JSON object with: - objections: [{{ id (e.g. "OBJ-1"), targetSolutionId, category, severity, argument, evidence (string|null), suggestedMitigation (string|null) }}] - overallAssessment: string""" def arbiter_prompt(problem, problem_frame, solutions, all_objections, all_defenses, rounds_summary, converged, convergence_round): ctx = json.dumps({ "problemFrame": problem_frame, "solutions": solutions, "allObjections": all_objections, "allDefenses": all_defenses, "rounds": rounds_summary, "converged": converged, "convergenceRound": convergence_round, }, indent=2) return f"""Synthesize the full debate and produce a final ruling. Problem: {problem} Context: {ctx} Produce a JSON object with: - recommendation: string (specific and actionable) - confidence: number (0-1) - rationale: string (reference specific debate points) - dissent: string|null - unresolved: string[] - actionItems: string[] - riskMatrix: [{{ risk, likelihood, impact, mitigation }}]""" # ── Convergence ───────────────────────────────────────────────────────── def extract_delta(new_objections, all_objections): existing_ids = {o["id"] for o in all_objections} existing_args = {(o.get("argument") or "").lower()[:80] for o in all_objections} return [ o for o in new_objections if o["id"] not in existing_ids and (o.get("argument") or "").lower()[:80] not in existing_args ] # ── Report generation ─────────────────────────────────────────────────── def severity_badge(s): return {"CRITICAL": "[C]", "HIGH": "[H]", "MEDIUM": "[M]", "LOW": "[L]"}.get(s, f"[{s}]") def confidence_bar(c): filled = round(c * 10) return "\u2588" * filled + "\u2591" * (10 - filled) + f" {c*100:.0f}%" def generate_report(problem, problem_frame, solutions, rounds, all_objections, all_defenses, ruling, converged, convergence_round, total_rounds, duration, cost_data, agent_models): lines = ["# Tribunal Report", "", f"**Date:** {time.strftime('%Y-%m-%d')}", ""] # Models used lines.append("## Models") lines.append("") lines.append(f"| Role | Model |") lines.append(f"|------|-------|") for role in ("analyst", "advocate", "critic", "arbiter"): lines.append(f"| {role.title()} | `{agent_models.get(role, '?')}` |") lines.append("") # Problem lines.append("## Problem") lines.append("") lines.append(problem_frame.get("statement", problem)) dims = problem_frame.get("dimensions", []) if dims: lines.append(f"\n**Dimensions:** {', '.join(dims)}") lines.append("") # Stats lines.append("## Debate Statistics") lines.append("") lines.append(f"- **Solutions proposed:** {len(solutions)}") lines.append(f"- **Total objections:** {len(all_objections)}") lines.append(f"- **Total defenses:** {len(all_defenses)}") lines.append(f"- **Rounds:** {total_rounds}") conv_str = f"yes (round {convergence_round})" if converged else "no" lines.append(f"- **Converged:** {conv_str}") lines.append(f"- **Duration:** {duration:.1f}s") if cost_data: lines.append(f"- **Estimated cost:** ${cost_data['total_cost']:.2f}") lines.append("") # Solutions lines.append("## Solutions") lines.append("") for s in solutions: lines.append(f"### {s.get('id', '?')}: {s.get('title', 'Untitled')}") lines.append("") lines.append(f"**Perspective:** {s.get('perspective', '?')} | " f"**Confidence:** {confidence_bar(s.get('confidence', 0))}") lines.append("") lines.append(f"**Approach:** {s.get('approach', '?')}") lines.append("") tradeoffs = s.get("tradeoffs", []) if tradeoffs: lines.append("**Tradeoffs:**") for t in tradeoffs: lines.append(f"- {t}") lines.append("") lines.append("---") lines.append("") # Objections table if all_objections: lines.append("## Objections") lines.append("") lines.append("| ID | Target | Category | Severity | Argument |") lines.append("|----|--------|----------|----------|----------|") for o in all_objections: arg = o.get("argument", "")[:80] + ("..." if len(o.get("argument", "")) > 80 else "") lines.append(f"| {o['id']} | {o['targetSolutionId']} | {o['category']} | " f"{severity_badge(o['severity'])} {o['severity']} | {arg} |") lines.append("") # Defenses if all_defenses: lines.append("## Defenses") lines.append("") for d in all_defenses: v = d.get("verdict", "?") icon = {"DEFENDED": "\u2713", "CONCEDED": "\u2717", "PARTIALLY_CONCEDED": "~"}.get(v, "?") ca = (d.get("counterArgument") or "")[:120] lines.append(f"- **{icon} {d['objectionId']}** \u2192 {v}: {ca}") if d.get("modification"): lines.append(f" - *Modification:* {d['modification'][:200]}") lines.append("") # Ruling if ruling: lines.append("## Final Ruling") lines.append("") lines.append(f"**Confidence:** {confidence_bar(ruling.get('confidence', 0))}") lines.append("") lines.append("### Recommendation") lines.append("") lines.append(ruling.get("recommendation", "No recommendation")) lines.append("") lines.append("### Rationale") lines.append("") lines.append(ruling.get("rationale", "")) lines.append("") if ruling.get("unresolved"): lines.append("### Unresolved Issues") lines.append("") for u in ruling["unresolved"]: lines.append(f"- {u}") lines.append("") if ruling.get("actionItems"): lines.append("### Action Items") lines.append("") for i, a in enumerate(ruling["actionItems"], 1): lines.append(f"{i}. {a}") lines.append("") if ruling.get("riskMatrix"): lines.append("### Risk Matrix") lines.append("") lines.append("| Risk | Likelihood | Impact | Mitigation |") lines.append("|------|-----------|--------|------------|") for r in ruling["riskMatrix"]: lines.append(f"| {r.get('risk','')} | {r.get('likelihood','')} | " f"{r.get('impact','')} | {r.get('mitigation','')} |") lines.append("") # Cost breakdown if cost_data and cost_data.get("by_agent"): lines.append("## Cost Breakdown") lines.append("") lines.append("| Agent | Model | Calls | Tokens In | Tokens Out | Cost |") lines.append("|-------|-------|-------|-----------|------------|------|") for name, stats in cost_data["by_agent"].items(): lines.append(f"| {name} | `{stats['model']}` | {stats['calls']} | " f"~{stats['input']//1000}K | ~{stats['output']//1000}K | " f"${stats['cost']:.3f} |") lines.append(f"\n**Total: ${cost_data['total_cost']:.2f}**") lines.append("") return "\n".join(lines) # ── Main debate pipeline ──────────────────────────────────────────────── def log(msg): print(f"[tribunal] {msg}", file=sys.stderr) class Spinner: """Animated spinner showing elapsed time during long operations.""" FRAMES = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" def __init__(self, message): self._message = message self._stop = threading.Event() self._thread = None def start(self): self._stop.clear() self._thread = threading.Thread(target=self._spin, daemon=True) self._thread.start() def _spin(self): start = time.time() i = 0 while not self._stop.is_set(): elapsed = time.time() - start frame = self.FRAMES[i % len(self.FRAMES)] sys.stderr.write(f"\r[tribunal] {self._message} {frame} {elapsed:.0f}s") sys.stderr.flush() i += 1 self._stop.wait(0.1) def stop(self, final_message=None): self._stop.set() if self._thread: self._thread.join() sys.stderr.write("\r\033[K") if final_message: sys.stderr.write(f"[tribunal] {final_message}\n") sys.stderr.flush() def run_agent(api_key, role, model, user_prompt, retries=1): """Call agent and parse JSON response. Returns (parsed_dict, usage).""" source = "Claude Code" if is_claude_code_model(model) else "OpenRouter" label = f"{role.title()} ({model} [{source}])" for attempt in range(retries + 1): spinner = Spinner(label) spinner.start() start = time.time() try: text, usage = call_model(api_key, model, SYSTEM_PROMPTS[role], user_prompt) except RuntimeError as e: elapsed = time.time() - start if attempt < retries: spinner.stop(f"{role.title()} FAILED ({elapsed:.1f}s) — retrying...") continue spinner.stop(f"{role.title()} FAILED ({elapsed:.1f}s) — {e}") return None, {"prompt_tokens": 0, "completion_tokens": 0} except Exception: elapsed = time.time() - start spinner.stop(f"{role.title()} FAILED ({elapsed:.1f}s)") raise elapsed = time.time() - start spinner.stop(f"{role.title()} done ({elapsed:.1f}s)") break parsed = extract_json(text) if parsed is None: log(f"WARNING: {role} returned no parseable JSON") return None, usage return coerce(parsed), usage def debate(problem, analyst_model, advocate_model, critic_model, arbiter_model, max_rounds=3, single_pass=False, output_dir=".tribunal"): all_models = [analyst_model, advocate_model, critic_model, arbiter_model] needs_openrouter = any(not is_claude_code_model(m) for m in all_models) api_key = get_api_key() if needs_openrouter else "" start_time = time.time() # Resolve models agent_models = { "analyst": analyst_model, "advocate": advocate_model, "critic": critic_model, "arbiter": arbiter_model, } print(file=sys.stderr) log("Multi-model adversarial debate") log(f"Analyst: {analyst_model}") log(f"Advocate: {advocate_model}") log(f"Critic: {critic_model}") log(f"Arbiter: {arbiter_model}") log(f"Max rounds: {max_rounds}") print(file=sys.stderr) # Fetch pricing pricing = fetch_pricing(api_key) cost_data = {"total_input": 0, "total_output": 0, "total_cost": 0, "by_agent": {}} def track(agent_name, model, usage): inp, out, cost = calc_cost(pricing, model, usage) cost_data["total_input"] += inp cost_data["total_output"] += out cost_data["total_cost"] += cost if agent_name not in cost_data["by_agent"]: cost_data["by_agent"][agent_name] = {"input": 0, "output": 0, "cost": 0, "calls": 0, "model": model} a = cost_data["by_agent"][agent_name] a["input"] += inp a["output"] += out a["cost"] += cost a["calls"] += 1 # ── ANSI colors ── C_BOLD = "\033[1m" C_DIM = "\033[2m" C_CYAN = "\033[36m" C_GREEN = "\033[32m" C_YELLOW = "\033[33m" C_RED = "\033[31m" C_MAGENTA = "\033[35m" C_RESET = "\033[0m" def section(title): width = 60 print(f"\n{C_CYAN}{C_BOLD}{'─' * width}", file=sys.stderr) print(f" {title}", file=sys.stderr) print(f"{'─' * width}{C_RESET}\n", file=sys.stderr) def bullet(text, indent=2): print(f"{' ' * indent}{C_DIM}•{C_RESET} {text}", file=sys.stderr) # ── Phase 0: Analyst ── section("Phase 0: ANALYST — framing the problem") result, usage = run_agent(api_key, "analyst", analyst_model, analyst_prompt(problem)) track("analyst", analyst_model, usage) if result and "problem" in result: problem_frame = result["problem"] perspectives = result.get("suggestedPerspectives", ["pragmatist", "innovator"]) else: log("Analyst parse failed, using minimal frame") problem_frame = {"statement": problem, "dimensions": ["feasibility", "complexity"], "constraints": [], "criteria": ["correctness"], "context": ""} perspectives = ["pragmatist", "innovator"] # Show analyst output stmt = problem_frame.get("statement", problem) print(f"\n {C_BOLD}Problem frame:{C_RESET}", file=sys.stderr) # Wrap long statement for i in range(0, len(stmt), 76): print(f" {stmt[i:i+76]}", file=sys.stderr) dims = problem_frame.get("dimensions", []) if dims: print(f"\n {C_BOLD}Dimensions ({len(dims)}):{C_RESET}", file=sys.stderr) for d in dims: label = d[:90] + "..." if len(d) > 90 else d bullet(label, indent=4) constraints = problem_frame.get("constraints", []) if constraints: print(f"\n {C_BOLD}Constraints ({len(constraints)}):{C_RESET}", file=sys.stderr) for c in constraints: label = c[:90] + "..." if len(c) > 90 else c bullet(label, indent=4) criteria = problem_frame.get("criteria", []) if criteria: print(f"\n {C_BOLD}Success criteria ({len(criteria)}):{C_RESET}", file=sys.stderr) for c in criteria: label = c[:90] + "..." if len(c) > 90 else c bullet(label, indent=4) print(f"\n {C_BOLD}Perspectives:{C_RESET} {', '.join(perspectives)}", file=sys.stderr) print(file=sys.stderr) # ── Phase 1: Advocate generates solutions ── section("Phase 1: ADVOCATE — generating solutions") result, usage = run_agent(api_key, "advocate", advocate_model, advocate_generate_prompt(problem, problem_frame, perspectives)) track("advocate", advocate_model, usage) solutions = result.get("solutions", []) if result else [] if not solutions: log("ERROR: No solutions generated, aborting") return # Show solutions for s in solutions: sid = s.get("id", "?") title = s.get("title", "Untitled") conf = s.get("confidence", 0) persp = s.get("perspective", "?") print(f"\n {C_GREEN}{C_BOLD}{sid}: {title}{C_RESET}", file=sys.stderr) print(f" {C_DIM}Perspective:{C_RESET} {persp} | {C_DIM}Confidence:{C_RESET} {confidence_bar(conf)}", file=sys.stderr) approach = s.get("approach", "") if approach: short = approach[:200] + "..." if len(approach) > 200 else approach print(f" {short}", file=sys.stderr) tradeoffs = s.get("tradeoffs", []) if tradeoffs: print(f" {C_YELLOW}Tradeoffs:{C_RESET}", file=sys.stderr) for t in tradeoffs[:4]: bullet(t[:100], indent=6) print(file=sys.stderr) # ── Phase 2+: Adversarial rounds ── all_objections = [] all_defenses = [] rounds = [] converged = False convergence_round = 0 round_num = 1 while not converged and round_num <= max_rounds: is_full = round_num <= 1 # Critic mode = "full" if is_full else "targeted" section(f"Round {round_num}: CRITIC — {mode} critique") result, usage = run_agent( api_key, "critic", critic_model, critic_prompt(problem, problem_frame, solutions, not is_full, all_objections if all_objections else None, all_defenses if all_defenses else None)) track(f"critic-r{round_num}", critic_model, usage) new_objections = result.get("objections", []) if result else [] # Convergence check delta = extract_delta(new_objections, all_objections) if not delta: converged = True convergence_round = round_num print(f"\n {C_GREEN}{C_BOLD}✓ No new objections — debate converged{C_RESET}", file=sys.stderr) rounds.append({"round": round_num, "agent": "critic", "objections": 0, "defenses": 0}) break all_objections.extend(delta) rounds.append({"round": round_num, "agent": "critic", "objections": len(delta), "defenses": 0}) # Show objections for o in delta: oid = o.get("id", "?") target = o.get("targetSolutionId", "?") cat = o.get("category", "?") sev = o.get("severity", "?") arg = (o.get("argument") or "")[:150] sev_color = C_RED if sev in ("CRITICAL", "HIGH") else C_YELLOW print(f"\n {sev_color}{severity_badge(sev)}{C_RESET} {C_BOLD}{oid}{C_RESET} → {target} ({cat})", file=sys.stderr) print(f" {arg}", file=sys.stderr) mit = o.get("suggestedMitigation") if mit: print(f" {C_DIM}Mitigation: {mit[:120]}{C_RESET}", file=sys.stderr) assessment = result.get("overallAssessment", "") if result else "" if assessment: print(f"\n {C_BOLD}Assessment:{C_RESET} {assessment[:300]}", file=sys.stderr) # Advocate defense section(f"Round {round_num}: ADVOCATE — defense") result, usage = run_agent( api_key, "advocate", advocate_model, advocate_defend_prompt(problem, problem_frame, solutions, delta)) track(f"advocate-r{round_num}", advocate_model, usage) defenses = result.get("defenses", []) if result else [] if result and result.get("revisedSolutions"): solutions = result["revisedSolutions"] log(f"Solutions revised based on concessions") all_defenses.extend(defenses) rounds.append({"round": round_num, "agent": "advocate", "objections": 0, "defenses": len(defenses)}) # Show defenses for d in defenses: did = d.get("objectionId", "?") verdict = d.get("verdict", "?") ca = (d.get("counterArgument") or "")[:150] if verdict == "DEFENDED": icon, color = "✓", C_GREEN elif verdict == "CONCEDED": icon, color = "✗", C_RED else: icon, color = "~", C_YELLOW print(f"\n {color}{icon} {did}{C_RESET} → {C_BOLD}{verdict}{C_RESET}", file=sys.stderr) print(f" {ca}", file=sys.stderr) mod = d.get("modification") if mod: print(f" {C_MAGENTA}Modification: {mod[:150]}{C_RESET}", file=sys.stderr) print(file=sys.stderr) round_num += 1 if single_pass: break # ── Final: Arbiter ── section("FINAL: ARBITER — synthesis & ruling") result, usage = run_agent( api_key, "arbiter", arbiter_model, arbiter_prompt(problem, problem_frame, solutions, all_objections, all_defenses, rounds, converged, convergence_round)) track("arbiter", arbiter_model, usage) ruling = result # Show ruling if ruling: conf = ruling.get("confidence", 0) print(f"\n {C_BOLD}Confidence:{C_RESET} {confidence_bar(conf)}", file=sys.stderr) rec = ruling.get("recommendation", "") if rec: print(f"\n {C_GREEN}{C_BOLD}Recommendation:{C_RESET}", file=sys.stderr) for i in range(0, len(rec), 76): print(f" {rec[i:i+76]}", file=sys.stderr) rationale = ruling.get("rationale", "") if rationale: print(f"\n {C_BOLD}Rationale:{C_RESET}", file=sys.stderr) for i in range(0, len(rationale), 76): print(f" {rationale[i:i+76]}", file=sys.stderr) dissent = ruling.get("dissent") if dissent: print(f"\n {C_YELLOW}{C_BOLD}Dissent:{C_RESET} {dissent}", file=sys.stderr) unresolved = ruling.get("unresolved", []) if unresolved: print(f"\n {C_BOLD}Unresolved:{C_RESET}", file=sys.stderr) for u in unresolved: bullet(u, indent=4) actions = ruling.get("actionItems", []) if actions: print(f"\n {C_BOLD}Action items:{C_RESET}", file=sys.stderr) for i, a in enumerate(actions, 1): print(f" {C_GREEN}{i}.{C_RESET} {a}", file=sys.stderr) risks = ruling.get("riskMatrix", []) if risks: print(f"\n {C_BOLD}Risk matrix:{C_RESET}", file=sys.stderr) for r in risks: risk = r.get("risk", "?") lh = r.get("likelihood", "?") imp = r.get("impact", "?") sev_color = C_RED if imp in ("HIGH", "CRITICAL") else C_YELLOW print(f" {sev_color}▸{C_RESET} {risk} ({C_DIM}likelihood:{C_RESET} {lh}, {C_DIM}impact:{C_RESET} {imp})", file=sys.stderr) mit = r.get("mitigation", "") if mit: print(f" {C_DIM}→ {mit[:120]}{C_RESET}", file=sys.stderr) print(file=sys.stderr) duration = time.time() - start_time total_rounds = round_num - (0 if converged else 1) # Generate and save report report = generate_report( problem, problem_frame, solutions, rounds, all_objections, all_defenses, ruling, converged, convergence_round, total_rounds, duration, cost_data, agent_models) os.makedirs(output_dir, exist_ok=True) report_path = os.path.join(output_dir, "tribunal-report.md") with open(report_path, "w") as f: f.write(report) # Also save raw JSON raw_path = os.path.join(output_dir, "tribunal-data.json") with open(raw_path, "w") as f: json.dump({ "problem": problem_frame, "solutions": solutions, "objections": all_objections, "defenses": all_defenses, "ruling": ruling, "converged": converged, "convergenceRound": convergence_round, "totalRounds": total_rounds, "duration": duration, "cost": cost_data, "models": agent_models, }, f, indent=2) # Summary log(f"Report: {report_path}") log(f"Data: {raw_path}") print(file=sys.stderr) log("Summary:") log(f" Solutions: {len(solutions)}") log(f" Objections: {len(all_objections)}") log(f" Defenses: {len(all_defenses)}") log(f" Rounds: {total_rounds}") conv = f"yes (round {convergence_round})" if converged else "no" log(f" Converged: {conv}") log(f" Duration: {duration:.1f}s") log(f" Cost: ${cost_data['total_cost']:.2f}") if ruling: log(f" Confidence: {ruling.get('confidence', 0)*100:.0f}%") # ── Original consult functions ────────────────────────────────────────── def ask(question, model=None, files=None, system_prompt=None): cfg = load_config() model = model or cfg.get("default_model", "google/gemini-2.5-pro") if not is_claude_code_model(model): api_key = cfg.get("api_key", "") if not api_key: print( "Error: No API key. Set it in BTerminal Consult tab " "or edit ~/.config/bterminal/consult.json", file=sys.stderr, ) sys.exit(1) content = question or "" if files: for fpath in files: try: with open(fpath) as f: fc = f.read() content += f"\n\n--- {fpath} ---\n```\n{fc}\n```" except Exception as e: print(f"Warning: Cannot read {fpath}: {e}", file=sys.stderr) if not sys.stdin.isatty(): stdin_data = sys.stdin.read() if stdin_data.strip(): content += f"\n\n--- stdin ---\n```\n{stdin_data}\n```" if not content.strip(): print("Error: No question provided.", file=sys.stderr) sys.exit(1) messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) messages.append({"role": "user", "content": content}) model_name = cfg.get("models", {}).get(model, {}).get("name", model) source = "Claude Code" if is_claude_code_model(model) else "OpenRouter" print(f"[{model_name} — {model} ({source})]", file=sys.stderr) # Claude Code models use CLI directly if is_claude_code_model(model): combined = content if system_prompt: combined = f"{system_prompt}\n\n{content}" try: text, _ = call_claude_code(model, "", combined) print(text) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) return payload = json.dumps({"model": model, "messages": messages, "stream": True}).encode() headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "HTTP-Referer": "https://github.com/DexterFromLab/BTerminal", "X-Title": "BTerminal Consult", } req = urllib.request.Request(OPENROUTER_API, data=payload, headers=headers) try: resp = urllib.request.urlopen(req, timeout=120) while True: line = resp.readline() if not line: break line = line.decode("utf-8").strip() if not line.startswith("data: "): continue data = line[6:] if data == "[DONE]": break try: chunk = json.loads(data) delta = chunk["choices"][0].get("delta", {}) text = delta.get("content", "") if text: sys.stdout.write(text) sys.stdout.flush() except (json.JSONDecodeError, KeyError, IndexError): pass print() except urllib.error.HTTPError as e: body = e.read().decode() try: err = json.loads(body) msg = err.get("error", {}).get("message", body) except json.JSONDecodeError: msg = body print(f"Error ({e.code}): {msg}", file=sys.stderr) sys.exit(1) except urllib.error.URLError as e: print(f"Connection error: {e.reason}", file=sys.stderr) sys.exit(1) def list_models(): cfg = load_config() default = cfg.get("default_model", "") models = cfg.get("models", {}) enabled = {k: v for k, v in models.items() if v.get("enabled")} if not enabled: print("No models enabled. Use BTerminal Consult tab to enable models.") return print("Enabled models:") for mid in sorted(enabled): info = enabled[mid] star = " [default]" if mid == default else "" source = info.get("source", "openrouter") src_tag = "[CC]" if source == "claude-code" else "[OR]" print(f" {src_tag} {mid} — {info.get('name', mid)}{star}") disabled = {k: v for k, v in models.items() if not v.get("enabled")} if disabled: print(f"\nDisabled models: {len(disabled)} (enable in BTerminal Consult tab)") def print_help(): print("consult — Query external AI models via OpenRouter\n") print("Usage:") print(' consult "question" Ask default model') print(' consult -m model_id "question" Ask specific model') print(' consult -f file.py "review this code" Include file as context') print(' cat log.txt | consult "analyze errors" Pipe stdin as context') print(' consult -s "system prompt" "question" Custom system prompt') print(" consult models List enabled models") print(' consult debate "problem" Multi-model debate') print() print("Options:") print(" -m, --model MODEL Model ID (default: from config)") print(" -f, --file FILE Include file(s) as context (repeatable)") print(" -s, --system PROMPT System prompt") print(" -h, --help Show this help") print() print("Debate options:") print(" --analyst MODEL Model for problem analysis") print(" --advocate MODEL Model for solution generation") print(" --critic MODEL Model for adversarial critique") print(" --arbiter MODEL Model for final synthesis") print(" --rounds N Max debate rounds (default: 3)") print(" --single-pass One round only") print(" --output DIR Output directory (default: .tribunal)") print() list_models() print(f"\nConfig: {CONFIG_FILE}") def main(): if len(sys.argv) == 1 or sys.argv[1] in ("-h", "--help"): print_help() sys.exit(0) if sys.argv[1] == "models": list_models() return # ── debate subcommand ── if sys.argv[1] == "debate": cfg = load_config() tribunal_cfg = cfg.get("tribunal", {}) default_model = cfg.get("default_model", "google/gemini-2.5-pro") parser = argparse.ArgumentParser(prog="consult debate", add_help=False) parser.add_argument("_debate", nargs=1) # consume "debate" parser.add_argument("problem", nargs="*") parser.add_argument("--analyst", default=tribunal_cfg.get("analyst_model") or default_model) parser.add_argument("--advocate", default=tribunal_cfg.get("advocate_model") or default_model) parser.add_argument("--critic", default=tribunal_cfg.get("critic_model") or default_model) parser.add_argument("--arbiter", default=tribunal_cfg.get("arbiter_model") or default_model) parser.add_argument("--rounds", type=int, default=tribunal_cfg.get("max_rounds", 3)) parser.add_argument("--single-pass", action="store_true") parser.add_argument("--output", default=".tribunal") args = parser.parse_args() problem = " ".join(args.problem) if args.problem else "" if not problem: print("Error: No problem statement provided.", file=sys.stderr) print('Usage: consult debate "your problem statement"', file=sys.stderr) sys.exit(1) try: debate( problem, analyst_model=args.analyst, advocate_model=args.advocate, critic_model=args.critic, arbiter_model=args.arbiter, max_rounds=args.rounds, single_pass=args.single_pass, output_dir=args.output, ) except urllib.error.HTTPError as e: body = e.read().decode() try: err = json.loads(body) msg = err.get("error", {}).get("message", body) except json.JSONDecodeError: msg = body print(f"Error ({e.code}): {msg}", file=sys.stderr) sys.exit(1) except urllib.error.URLError as e: print(f"Connection error: {e.reason}", file=sys.stderr) sys.exit(1) return # ── regular query ── parser = argparse.ArgumentParser(prog="consult", add_help=False) parser.add_argument("question", nargs="*") parser.add_argument("-m", "--model") parser.add_argument("-f", "--file", action="append", dest="files") parser.add_argument("-s", "--system") args = parser.parse_args() question = " ".join(args.question) if args.question else "" if not question and not args.files and sys.stdin.isatty(): parser.print_help() sys.exit(0) ask(question, args.model, args.files, args.system) if __name__ == "__main__": main()