diff --git a/v2/Cargo.lock b/v2/Cargo.lock index d94a277..6a33007 100644 --- a/v2/Cargo.lock +++ b/v2/Cargo.lock @@ -15,8 +15,10 @@ dependencies = [ "bterminal-core", "dirs 5.0.1", "futures-util", + "hex", "keyring", "log", + "native-tls", "notify", "notify-rust", "opentelemetry", @@ -26,12 +28,14 @@ dependencies = [ "rusqlite", "serde", "serde_json", + "sha2", "tauri", "tauri-build", "tauri-plugin-dialog", "tauri-plugin-updater", "tempfile", "tokio", + "tokio-native-tls", "tokio-tungstenite", "tracing", "tracing-opentelemetry", diff --git a/v2/bterminal-core/src/sandbox.rs b/v2/bterminal-core/src/sandbox.rs index 2c41d1b..50c0e75 100644 --- a/v2/bterminal-core/src/sandbox.rs +++ b/v2/bterminal-core/src/sandbox.rs @@ -85,6 +85,39 @@ impl SandboxConfig { } } + /// Build a restricted sandbox config for Aider agent sessions. + /// More restrictive than `for_projects`: only project worktree + read-only system paths. + /// Does NOT allow write access to ~/.config, ~/.claude, etc. + pub fn for_aider_restricted(project_cwd: &str, worktree: Option<&str>) -> Self { + let mut rw = vec![PathBuf::from(project_cwd)]; + if let Some(wt) = worktree { + rw.push(PathBuf::from(wt)); + } + rw.push(std::env::temp_dir()); + let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("/root")); + rw.push(home.join(".aider")); + + let ro = vec![ + PathBuf::from("/usr"), + PathBuf::from("/lib"), + PathBuf::from("/lib64"), + PathBuf::from("/etc"), + PathBuf::from("/proc"), + PathBuf::from("/dev"), + PathBuf::from("/bin"), + PathBuf::from("/sbin"), + home.join(".local"), + home.join(".deno"), + home.join(".nvm"), + ]; + + Self { + rw_paths: rw, + ro_paths: ro, + enabled: true, + } + } + /// Build a sandbox config for a single project directory. 
pub fn for_project(cwd: &str, worktree: Option<&str>) -> Self { let worktrees: Vec<&str> = worktree.into_iter().collect(); @@ -266,6 +299,57 @@ mod tests { assert_eq!(config.rw_paths.len(), 3); } + #[test] + fn test_for_aider_restricted_single_cwd() { + let config = SandboxConfig::for_aider_restricted("/home/user/myproject", None); + assert!(config.enabled); + assert!(config.rw_paths.contains(&PathBuf::from("/home/user/myproject"))); + assert!(config.rw_paths.contains(&std::env::temp_dir())); + let home = dirs::home_dir().unwrap(); + assert!(config.rw_paths.contains(&home.join(".aider"))); + // No worktree path added + assert!(!config + .rw_paths + .iter() + .any(|p| p.to_string_lossy().contains("worktree"))); + } + + #[test] + fn test_for_aider_restricted_with_worktree() { + let config = SandboxConfig::for_aider_restricted( + "/home/user/myproject", + Some("/home/user/myproject/.claude/worktrees/abc123"), + ); + assert!(config.enabled); + assert!(config.rw_paths.contains(&PathBuf::from("/home/user/myproject"))); + assert!(config.rw_paths.contains(&PathBuf::from( + "/home/user/myproject/.claude/worktrees/abc123" + ))); + } + + #[test] + fn test_for_aider_restricted_no_config_write() { + let config = SandboxConfig::for_aider_restricted("/tmp/test", None); + let home = dirs::home_dir().unwrap(); + // Aider restricted must NOT have ~/.config or ~/.claude in rw_paths + assert!(!config.rw_paths.contains(&home.join(".config"))); + assert!(!config.rw_paths.contains(&home.join(".claude"))); + // And NOT in ro_paths either (stricter than for_projects) + assert!(!config.ro_paths.contains(&home.join(".config"))); + assert!(!config.ro_paths.contains(&home.join(".claude"))); + } + + #[test] + fn test_for_aider_restricted_rw_count() { + // Without worktree: cwd + tmp + .aider = 3 + let config = SandboxConfig::for_aider_restricted("/tmp/test", None); + assert_eq!(config.rw_paths.len(), 3); + + // With worktree: cwd + worktree + tmp + .aider = 4 + let config = 
SandboxConfig::for_aider_restricted("/tmp/test", Some("/tmp/wt")); + assert_eq!(config.rw_paths.len(), 4); + } + #[test] fn test_for_projects_empty() { let config = SandboxConfig::for_projects(&[], &[]); diff --git a/v2/bterminal-core/src/sidecar.rs b/v2/bterminal-core/src/sidecar.rs index 934e117..4e27578 100644 --- a/v2/bterminal-core/src/sidecar.rs +++ b/v2/bterminal-core/src/sidecar.rs @@ -2,6 +2,10 @@ // Spawns per-provider runner scripts (e.g. claude-runner.mjs, aider-runner.mjs) // via deno or node, communicates via stdio NDJSON. // Each provider gets its own process, started lazily on first query. +// +// Uses a std::sync::mpsc actor pattern: the actor thread owns all mutable state +// (providers HashMap, session_providers HashMap) exclusively. External callers +// send requests via a channel, eliminating the TOCTOU race in ensure_provider(). use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -10,7 +14,8 @@ use std::io::{BufRead, BufReader, Write}; use std::os::unix::process::CommandExt; use std::path::PathBuf; use std::process::{Child, Command, Stdio}; -use std::sync::{Arc, Mutex}; +use std::sync::mpsc as std_mpsc; +use std::sync::Arc; use std::thread; use crate::event::EventSink; @@ -66,296 +71,544 @@ struct ProviderProcess { child: Child, stdin_writer: Box, ready: bool, + /// Atomic flag set by the stdout reader thread when "ready" message arrives. + /// The actor polls this to detect readiness without needing a separate channel. + ready_flag: Arc, +} + +/// Requests sent from public API methods to the actor thread. 
+enum ProviderRequest { + Start { + reply: std_mpsc::Sender>, + }, + EnsureAndQuery { + options: AgentQueryOptions, + reply: std_mpsc::Sender>, + }, + StopSession { + session_id: String, + reply: std_mpsc::Sender>, + }, + SendMessage { + msg: serde_json::Value, + reply: std_mpsc::Sender>, + }, + Restart { + reply: std_mpsc::Sender>, + }, + Shutdown { + reply: std_mpsc::Sender>, + }, + IsReady { + reply: std_mpsc::Sender, + }, + SetSandbox { + sandbox: SandboxConfig, + reply: std_mpsc::Sender<()>, + }, } pub struct SidecarManager { - /// Provider name → running sidecar process - providers: Arc>>, - /// Session ID → provider name (for routing stop messages) - session_providers: Arc>>, + tx: std_mpsc::Sender, + // Keep a handle so the thread lives as long as the manager. + // Not joined on drop — we send Shutdown instead. + _actor_thread: Option>, +} + +/// Actor function that owns all mutable state exclusively. +/// Receives requests via `req_rx`. Ready signaling from stdout reader threads +/// uses per-provider AtomicBool flags (polled during ensure_provider_impl). 
+fn run_actor( + req_rx: std_mpsc::Receiver, sink: Arc, - config: Mutex, + initial_config: SidecarConfig, +) { + let mut providers: HashMap = HashMap::new(); + let mut session_providers: HashMap = HashMap::new(); + let mut config = initial_config; + + loop { + // Block waiting for next request (with timeout so actor stays responsive) + match req_rx.recv_timeout(std::time::Duration::from_millis(50)) { + Ok(req) => { + match req { + ProviderRequest::Start { reply } => { + let result = start_provider_impl( + &mut providers, + &config, + &sink, + "claude", + ); + let _ = reply.send(result); + } + ProviderRequest::EnsureAndQuery { options, reply } => { + let provider = options.provider.clone(); + + // Ensure provider is ready — atomic, no TOCTOU + if let Err(e) = ensure_provider_impl( + &mut providers, + &config, + &sink, + &provider, + ) { + let _ = reply.send(Err(e)); + continue; + } + + // Track session -> provider mapping + session_providers.insert(options.session_id.clone(), provider.clone()); + + // Build and send query message + let msg = build_query_msg(&options); + let result = send_to_provider_impl(&mut providers, &provider, &msg); + let _ = reply.send(result); + } + ProviderRequest::StopSession { session_id, reply } => { + let provider = session_providers + .get(&session_id) + .cloned() + .unwrap_or_else(|| "claude".to_string()); + let msg = serde_json::json!({ + "type": "stop", + "sessionId": session_id, + }); + let result = send_to_provider_impl(&mut providers, &provider, &msg); + let _ = reply.send(result); + } + ProviderRequest::SendMessage { msg, reply } => { + let result = send_to_provider_impl(&mut providers, "claude", &msg); + let _ = reply.send(result); + } + ProviderRequest::Restart { reply } => { + log::info!("Restarting all sidecars"); + shutdown_all(&mut providers, &mut session_providers); + let result = start_provider_impl( + &mut providers, + &config, + &sink, + "claude", + ); + let _ = reply.send(result); + } + ProviderRequest::Shutdown { 
reply } => { + shutdown_all(&mut providers, &mut session_providers); + let _ = reply.send(Ok(())); + } + ProviderRequest::IsReady { reply } => { + // Sync ready state from atomic flags + sync_ready_flags(&mut providers); + let ready = providers + .get("claude") + .map(|p| p.ready) + .unwrap_or(false); + let _ = reply.send(ready); + } + ProviderRequest::SetSandbox { sandbox, reply } => { + config.sandbox = sandbox; + let _ = reply.send(()); + } + } + } + Err(std_mpsc::RecvTimeoutError::Timeout) => { + // Loop back -- keeps actor responsive to shutdown + continue; + } + Err(std_mpsc::RecvTimeoutError::Disconnected) => { + // All senders dropped — shut down + break; + } + } + } + + // Channel closed — clean up remaining providers + shutdown_all(&mut providers, &mut session_providers); +} + +/// Sync ready state from AtomicBool flags set by stdout reader threads. +fn sync_ready_flags(providers: &mut HashMap) { + for p in providers.values_mut() { + if !p.ready && p.ready_flag.load(std::sync::atomic::Ordering::Acquire) { + p.ready = true; + } + } +} + +/// Shut down all provider processes and clear session mappings. +fn shutdown_all( + providers: &mut HashMap, + session_providers: &mut HashMap, +) { + for (name, mut proc) in providers.drain() { + log::info!("Shutting down {} sidecar", name); + let _ = proc.child.kill(); + let _ = proc.child.wait(); + } + session_providers.clear(); +} + +/// Start a specific provider's sidecar process. Called from the actor thread +/// which owns the providers HashMap exclusively — no lock contention possible. 
+fn start_provider_impl( + providers: &mut HashMap, + config: &SidecarConfig, + sink: &Arc, + provider: &str, +) -> Result<(), String> { + if providers.contains_key(provider) { + return Err(format!("Sidecar for '{}' already running", provider)); + } + + let cmd = SidecarManager::resolve_sidecar_for_provider_with_config(config, provider)?; + + log::info!( + "Starting {} sidecar: {} {}", + provider, + cmd.program, + cmd.args.join(" ") + ); + + // Build a clean environment stripping provider-specific vars to prevent + // SDKs from detecting nesting when BTerminal is launched from a provider terminal. + let clean_env: Vec<(String, String)> = std::env::vars() + .filter(|(k, _)| strip_provider_env_var(k)) + .collect(); + + let mut command = Command::new(&cmd.program); + command + .args(&cmd.args) + .env_clear() + .envs(clean_env) + .envs( + config + .env_overrides + .iter() + .map(|(k, v)| (k.as_str(), v.as_str())), + ) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + // Apply Landlock sandbox in child process before exec (Linux only). + #[cfg(unix)] + if config.sandbox.enabled { + let sandbox = config.sandbox.clone(); + unsafe { + command.pre_exec(move || { + sandbox + .apply() + .map(|enforced| { + if !enforced { + log::warn!("Landlock sandbox not enforced in sidecar child"); + } + }) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + }); + } + } + + let mut child = command + .spawn() + .map_err(|e| format!("Failed to start {} sidecar: {e}", provider))?; + + let child_stdin = child + .stdin + .take() + .ok_or("Failed to capture sidecar stdin")?; + let child_stdout = child + .stdout + .take() + .ok_or("Failed to capture sidecar stdout")?; + let child_stderr = child + .stderr + .take() + .ok_or("Failed to capture sidecar stderr")?; + + // Per-provider AtomicBool for ready signaling from stdout reader thread to actor. 
+ let ready_flag = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let ready_flag_writer = ready_flag.clone(); + + // Stdout reader thread — forwards NDJSON to event sink + let sink_clone = sink.clone(); + let provider_name = provider.to_string(); + thread::spawn(move || { + let reader = BufReader::new(child_stdout); + for line in reader.lines() { + match line { + Ok(line) => { + if line.trim().is_empty() { + continue; + } + match serde_json::from_str::(&line) { + Ok(msg) => { + if msg.get("type").and_then(|t| t.as_str()) == Some("ready") { + ready_flag_writer + .store(true, std::sync::atomic::Ordering::Release); + log::info!("{} sidecar ready", provider_name); + } + sink_clone.emit("sidecar-message", msg); + } + Err(e) => { + log::warn!( + "Invalid JSON from {} sidecar: {e}: {line}", + provider_name + ); + } + } + } + Err(e) => { + log::error!("{} sidecar stdout read error: {e}", provider_name); + break; + } + } + } + log::info!("{} sidecar stdout reader exited", provider_name); + sink_clone.emit( + "sidecar-exited", + serde_json::json!({ "provider": provider_name }), + ); + }); + + // Stderr reader thread — logs only + let provider_name2 = provider.to_string(); + thread::spawn(move || { + let reader = BufReader::new(child_stderr); + for line in reader.lines() { + match line { + Ok(line) => log::info!("[{} sidecar stderr] {line}", provider_name2), + Err(e) => { + log::error!("{} sidecar stderr read error: {e}", provider_name2); + break; + } + } + } + }); + + providers.insert( + provider.to_string(), + ProviderProcess { + child, + stdin_writer: Box::new(child_stdin), + ready: false, + ready_flag, + }, + ); + + Ok(()) +} + +/// Ensure a provider's sidecar is running and ready, starting it lazily if needed. +/// Called exclusively from the actor thread — no lock contention, no TOCTOU race. 
+fn ensure_provider_impl( + providers: &mut HashMap, + config: &SidecarConfig, + sink: &Arc, + provider: &str, +) -> Result<(), String> { + // Sync ready state from atomic flag (set by stdout reader thread) + if let Some(p) = providers.get_mut(provider) { + if !p.ready && p.ready_flag.load(std::sync::atomic::Ordering::Acquire) { + p.ready = true; + } + if p.ready { + return Ok(()); + } + // Started but not ready yet -- fall through to wait loop + } else { + // Not started -- start it now. No TOCTOU: we own the HashMap exclusively. + start_provider_impl(providers, config, sink, provider)?; + } + + // Wait for ready (up to 10 seconds) + for _ in 0..100 { + std::thread::sleep(std::time::Duration::from_millis(100)); + + if let Some(p) = providers.get_mut(provider) { + if !p.ready && p.ready_flag.load(std::sync::atomic::Ordering::Acquire) { + p.ready = true; + } + if p.ready { + return Ok(()); + } + } else { + return Err(format!("{} sidecar process exited before ready", provider)); + } + } + Err(format!( + "{} sidecar did not become ready within timeout", + provider + )) +} + +/// Send a JSON message to a provider's stdin. +fn send_to_provider_impl( + providers: &mut HashMap, + provider: &str, + msg: &serde_json::Value, +) -> Result<(), String> { + let proc = providers + .get_mut(provider) + .ok_or_else(|| format!("{} sidecar not running", provider))?; + + let line = + serde_json::to_string(msg).map_err(|e| format!("JSON serialize error: {e}"))?; + + proc.stdin_writer + .write_all(line.as_bytes()) + .map_err(|e| format!("Sidecar write error: {e}"))?; + proc.stdin_writer + .write_all(b"\n") + .map_err(|e| format!("Sidecar write error: {e}"))?; + proc.stdin_writer + .flush() + .map_err(|e| format!("Sidecar flush error: {e}"))?; + + Ok(()) +} + +/// Build the NDJSON query message from AgentQueryOptions. 
+fn build_query_msg(options: &AgentQueryOptions) -> serde_json::Value { + serde_json::json!({ + "type": "query", + "provider": options.provider, + "sessionId": options.session_id, + "prompt": options.prompt, + "cwd": options.cwd, + "maxTurns": options.max_turns, + "maxBudgetUsd": options.max_budget_usd, + "resumeSessionId": options.resume_session_id, + "permissionMode": options.permission_mode, + "settingSources": options.setting_sources, + "systemPrompt": options.system_prompt, + "model": options.model, + "claudeConfigDir": options.claude_config_dir, + "additionalDirectories": options.additional_directories, + "worktreeName": options.worktree_name, + "providerConfig": options.provider_config, + "extraEnv": options.extra_env, + }) } impl SidecarManager { pub fn new(sink: Arc, config: SidecarConfig) -> Self { + let (req_tx, req_rx) = std_mpsc::channel(); + + let handle = thread::spawn(move || { + run_actor(req_rx, sink, config); + }); + Self { - providers: Arc::new(Mutex::new(HashMap::new())), - session_providers: Arc::new(Mutex::new(HashMap::new())), - sink, - config: Mutex::new(config), + tx: req_tx, + _actor_thread: Some(handle), } } /// Update the sandbox configuration. Takes effect on next sidecar (re)start. pub fn set_sandbox(&self, sandbox: SandboxConfig) { - self.config.lock().unwrap().sandbox = sandbox; + let (reply_tx, reply_rx) = std_mpsc::channel(); + if self + .tx + .send(ProviderRequest::SetSandbox { + sandbox, + reply: reply_tx, + }) + .is_ok() + { + let _ = reply_rx.recv(); + } } /// Start the default (claude) provider sidecar. Called on app startup. pub fn start(&self) -> Result<(), String> { - self.start_provider("claude") - } - - /// Start a specific provider's sidecar process. 
- fn start_provider(&self, provider: &str) -> Result<(), String> { - let mut providers = self.providers.lock().unwrap(); - if providers.contains_key(provider) { - return Err(format!("Sidecar for '{}' already running", provider)); - } - - let config = self.config.lock().unwrap(); - let cmd = Self::resolve_sidecar_for_provider_with_config(&config, provider)?; - - log::info!("Starting {} sidecar: {} {}", provider, cmd.program, cmd.args.join(" ")); - - // Build a clean environment stripping provider-specific vars to prevent - // SDKs from detecting nesting when BTerminal is launched from a provider terminal. - let clean_env: Vec<(String, String)> = std::env::vars() - .filter(|(k, _)| { - strip_provider_env_var(k) - }) - .collect(); - - let mut command = Command::new(&cmd.program); - command - .args(&cmd.args) - .env_clear() - .envs(clean_env) - .envs(config.env_overrides.iter().map(|(k, v)| (k.as_str(), v.as_str()))) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - - // Apply Landlock sandbox in child process before exec (Linux only). 
- #[cfg(unix)] - if config.sandbox.enabled { - let sandbox = config.sandbox.clone(); - unsafe { - command.pre_exec(move || { - sandbox.apply().map(|enforced| { - if !enforced { - log::warn!("Landlock sandbox not enforced in sidecar child"); - } - }).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) - }); - } - } - - // Drop config lock before spawn - drop(config); - - let mut child = command - .spawn() - .map_err(|e| format!("Failed to start {} sidecar: {e}", provider))?; - - let child_stdin = child - .stdin - .take() - .ok_or("Failed to capture sidecar stdin")?; - let child_stdout = child - .stdout - .take() - .ok_or("Failed to capture sidecar stdout")?; - let child_stderr = child - .stderr - .take() - .ok_or("Failed to capture sidecar stderr")?; - - // Stdout reader thread — forwards NDJSON to event sink - let sink = self.sink.clone(); - let providers_ref = self.providers.clone(); - let provider_name = provider.to_string(); - thread::spawn(move || { - let reader = BufReader::new(child_stdout); - for line in reader.lines() { - match line { - Ok(line) => { - if line.trim().is_empty() { - continue; - } - match serde_json::from_str::(&line) { - Ok(msg) => { - if msg.get("type").and_then(|t| t.as_str()) == Some("ready") { - if let Ok(mut provs) = providers_ref.lock() { - if let Some(p) = provs.get_mut(&provider_name) { - p.ready = true; - } - } - log::info!("{} sidecar ready", provider_name); - } - sink.emit("sidecar-message", msg); - } - Err(e) => { - log::warn!("Invalid JSON from {} sidecar: {e}: {line}", provider_name); - } - } - } - Err(e) => { - log::error!("{} sidecar stdout read error: {e}", provider_name); - break; - } - } - } - log::info!("{} sidecar stdout reader exited", provider_name); - sink.emit("sidecar-exited", serde_json::json!({ "provider": provider_name })); - }); - - // Stderr reader thread — logs only - let provider_name2 = provider.to_string(); - thread::spawn(move || { - let reader = BufReader::new(child_stderr); - for line in 
reader.lines() { - match line { - Ok(line) => log::info!("[{} sidecar stderr] {line}", provider_name2), - Err(e) => { - log::error!("{} sidecar stderr read error: {e}", provider_name2); - break; - } - } - } - }); - - providers.insert(provider.to_string(), ProviderProcess { - child, - stdin_writer: Box::new(child_stdin), - ready: false, - }); - - Ok(()) - } - - /// Ensure a provider's sidecar is running and ready, starting it lazily if needed. - fn ensure_provider(&self, provider: &str) -> Result<(), String> { - { - let providers = self.providers.lock().unwrap(); - if let Some(p) = providers.get(provider) { - if p.ready { - return Ok(()); - } - // Started but not ready yet — wait briefly - } else { - drop(providers); - self.start_provider(provider)?; - } - } - - // Wait for ready (up to 10 seconds) - for _ in 0..100 { - std::thread::sleep(std::time::Duration::from_millis(100)); - let providers = self.providers.lock().unwrap(); - if let Some(p) = providers.get(provider) { - if p.ready { - return Ok(()); - } - } else { - return Err(format!("{} sidecar process exited before ready", provider)); - } - } - Err(format!("{} sidecar did not become ready within timeout", provider)) - } - - fn send_to_provider(&self, provider: &str, msg: &serde_json::Value) -> Result<(), String> { - let mut providers = self.providers.lock().unwrap(); - let proc = providers.get_mut(provider) - .ok_or_else(|| format!("{} sidecar not running", provider))?; - - let line = serde_json::to_string(msg) - .map_err(|e| format!("JSON serialize error: {e}"))?; - - proc.stdin_writer - .write_all(line.as_bytes()) - .map_err(|e| format!("Sidecar write error: {e}"))?; - proc.stdin_writer - .write_all(b"\n") - .map_err(|e| format!("Sidecar write error: {e}"))?; - proc.stdin_writer - .flush() - .map_err(|e| format!("Sidecar flush error: {e}"))?; - - Ok(()) - } - - /// Legacy send_message — routes to the default (claude) provider. 
- pub fn send_message(&self, msg: &serde_json::Value) -> Result<(), String> { - self.send_to_provider("claude", msg) + let (reply_tx, reply_rx) = std_mpsc::channel(); + self.tx + .send(ProviderRequest::Start { reply: reply_tx }) + .map_err(|_| "Sidecar actor stopped".to_string())?; + reply_rx + .recv() + .map_err(|_| "Sidecar actor stopped".to_string())? } pub fn query(&self, options: &AgentQueryOptions) -> Result<(), String> { - let provider = &options.provider; - - // Ensure the provider's sidecar is running and ready - self.ensure_provider(provider)?; - - // Track session → provider mapping for stop routing - self.session_providers.lock().unwrap() - .insert(options.session_id.clone(), provider.clone()); - - let msg = serde_json::json!({ - "type": "query", - "provider": options.provider, - "sessionId": options.session_id, - "prompt": options.prompt, - "cwd": options.cwd, - "maxTurns": options.max_turns, - "maxBudgetUsd": options.max_budget_usd, - "resumeSessionId": options.resume_session_id, - "permissionMode": options.permission_mode, - "settingSources": options.setting_sources, - "systemPrompt": options.system_prompt, - "model": options.model, - "claudeConfigDir": options.claude_config_dir, - "additionalDirectories": options.additional_directories, - "worktreeName": options.worktree_name, - "providerConfig": options.provider_config, - "extraEnv": options.extra_env, - }); - - self.send_to_provider(provider, &msg) + let (reply_tx, reply_rx) = std_mpsc::channel(); + self.tx + .send(ProviderRequest::EnsureAndQuery { + options: options.clone(), + reply: reply_tx, + }) + .map_err(|_| "Sidecar actor stopped".to_string())?; + reply_rx + .recv() + .map_err(|_| "Sidecar actor stopped".to_string())? 
} pub fn stop_session(&self, session_id: &str) -> Result<(), String> { - let msg = serde_json::json!({ - "type": "stop", - "sessionId": session_id, - }); - - // Route to the correct provider based on session tracking - let provider = self.session_providers.lock().unwrap() - .get(session_id) - .cloned() - .unwrap_or_else(|| "claude".to_string()); - - self.send_to_provider(&provider, &msg) + let (reply_tx, reply_rx) = std_mpsc::channel(); + self.tx + .send(ProviderRequest::StopSession { + session_id: session_id.to_string(), + reply: reply_tx, + }) + .map_err(|_| "Sidecar actor stopped".to_string())?; + reply_rx + .recv() + .map_err(|_| "Sidecar actor stopped".to_string())? } pub fn restart(&self) -> Result<(), String> { - log::info!("Restarting all sidecars"); - let _ = self.shutdown(); - self.start() + let (reply_tx, reply_rx) = std_mpsc::channel(); + self.tx + .send(ProviderRequest::Restart { reply: reply_tx }) + .map_err(|_| "Sidecar actor stopped".to_string())?; + reply_rx + .recv() + .map_err(|_| "Sidecar actor stopped".to_string())? } pub fn shutdown(&self) -> Result<(), String> { - let mut providers = self.providers.lock().unwrap(); - for (name, mut proc) in providers.drain() { - log::info!("Shutting down {} sidecar", name); - let _ = proc.child.kill(); - let _ = proc.child.wait(); + let (reply_tx, reply_rx) = std_mpsc::channel(); + if self + .tx + .send(ProviderRequest::Shutdown { reply: reply_tx }) + .is_ok() + { + let _ = reply_rx.recv(); } - self.session_providers.lock().unwrap().clear(); Ok(()) } /// Returns true if the default (claude) provider sidecar is ready. 
pub fn is_ready(&self) -> bool { - let providers = self.providers.lock().unwrap(); - providers.get("claude") - .map(|p| p.ready) - .unwrap_or(false) + let (reply_tx, reply_rx) = std_mpsc::channel(); + if self + .tx + .send(ProviderRequest::IsReady { reply: reply_tx }) + .is_ok() + { + reply_rx.recv().unwrap_or(false) + } else { + false + } + } + + /// Legacy send_message — routes to the default (claude) provider. + pub fn send_message(&self, msg: &serde_json::Value) -> Result<(), String> { + let (reply_tx, reply_rx) = std_mpsc::channel(); + self.tx + .send(ProviderRequest::SendMessage { + msg: msg.clone(), + reply: reply_tx, + }) + .map_err(|_| "Sidecar actor stopped".to_string())?; + reply_rx + .recv() + .map_err(|_| "Sidecar actor stopped".to_string())? } /// Resolve a sidecar command for a specific provider's runner file. - fn resolve_sidecar_for_provider_with_config(config: &SidecarConfig, provider: &str) -> Result { + fn resolve_sidecar_for_provider_with_config( + config: &SidecarConfig, + provider: &str, + ) -> Result { let runner_name = format!("{}-runner.mjs", provider); // Try Deno first (faster startup, better perf), fall back to Node.js. @@ -442,13 +695,24 @@ fn strip_provider_env_var(key: &str) -> bool { impl Drop for SidecarManager { fn drop(&mut self) { - let _ = self.shutdown(); + // Send shutdown request to the actor. If the channel is already closed + // (actor thread exited), this is a no-op. 
+ let (reply_tx, reply_rx) = std_mpsc::channel(); + if self + .tx + .send(ProviderRequest::Shutdown { reply: reply_tx }) + .is_ok() + { + // Wait briefly for the actor to clean up (with timeout to avoid hanging) + let _ = reply_rx.recv_timeout(std::time::Duration::from_secs(5)); + } } } #[cfg(test)] mod tests { use super::*; + use std::sync::Mutex; // ---- strip_provider_env_var unit tests ---- @@ -543,4 +807,174 @@ mod tests { assert!(!kept.contains(&"OLLAMA_HOST")); assert!(!kept.contains(&"ANTHROPIC_API_KEY")); } + + // ---- Actor pattern tests ---- + + /// Mock EventSink that records emitted events. + struct MockSink { + events: Mutex>, + } + + impl MockSink { + fn new() -> Self { + Self { + events: Mutex::new(Vec::new()), + } + } + } + + impl EventSink for MockSink { + fn emit(&self, event: &str, payload: serde_json::Value) { + self.events + .lock() + .unwrap() + .push((event.to_string(), payload)); + } + } + + #[test] + fn test_actor_new_and_drop() { + // SidecarManager should create and drop cleanly without panicking + let sink: Arc = Arc::new(MockSink::new()); + let config = SidecarConfig { + search_paths: vec![], + env_overrides: Default::default(), + sandbox: Default::default(), + }; + let manager = SidecarManager::new(sink, config); + // is_ready should return false since no provider started + assert!(!manager.is_ready()); + // Drop should send shutdown cleanly + drop(manager); + } + + #[test] + fn test_actor_shutdown_idempotent() { + let sink: Arc = Arc::new(MockSink::new()); + let config = SidecarConfig { + search_paths: vec![], + env_overrides: Default::default(), + sandbox: Default::default(), + }; + let manager = SidecarManager::new(sink, config); + // Multiple shutdowns should not panic + assert!(manager.shutdown().is_ok()); + assert!(manager.shutdown().is_ok()); + } + + #[test] + fn test_actor_set_sandbox() { + let sink: Arc = Arc::new(MockSink::new()); + let config = SidecarConfig { + search_paths: vec![], + env_overrides: Default::default(), + 
sandbox: Default::default(), + }; + let manager = SidecarManager::new(sink, config); + // set_sandbox should complete without error + manager.set_sandbox(SandboxConfig { + rw_paths: vec![PathBuf::from("/tmp")], + ro_paths: vec![], + enabled: true, + }); + } + + #[test] + fn test_build_query_msg_fields() { + let options = AgentQueryOptions { + provider: "claude".to_string(), + session_id: "s1".to_string(), + prompt: "hello".to_string(), + cwd: Some("/tmp".to_string()), + max_turns: Some(5), + max_budget_usd: None, + resume_session_id: None, + permission_mode: Some("bypassPermissions".to_string()), + setting_sources: None, + system_prompt: None, + model: Some("claude-4-opus".to_string()), + claude_config_dir: None, + additional_directories: None, + worktree_name: None, + provider_config: serde_json::Value::Null, + extra_env: Default::default(), + }; + let msg = build_query_msg(&options); + assert_eq!(msg["type"], "query"); + assert_eq!(msg["provider"], "claude"); + assert_eq!(msg["sessionId"], "s1"); + assert_eq!(msg["prompt"], "hello"); + assert_eq!(msg["cwd"], "/tmp"); + assert_eq!(msg["maxTurns"], 5); + assert_eq!(msg["model"], "claude-4-opus"); + } + + #[test] + fn test_concurrent_queries_no_race() { + // This test verifies that concurrent query() calls from multiple threads + // are serialized by the actor and don't cause a TOCTOU race on ensure_provider. + // Since we can't actually start a sidecar in tests (no runner scripts), + // we verify that the actor handles multiple concurrent requests gracefully + // (all get errors, none panic or deadlock). 
+ + let sink: Arc = Arc::new(MockSink::new()); + let config = SidecarConfig { + search_paths: vec![], // No search paths → start_provider will fail + env_overrides: Default::default(), + sandbox: Default::default(), + }; + let manager = Arc::new(SidecarManager::new(sink, config)); + + let mut handles = vec![]; + let errors = Arc::new(Mutex::new(Vec::new())); + + // Spawn 10 concurrent query() calls + for i in 0..10 { + let mgr = manager.clone(); + let errs = errors.clone(); + handles.push(thread::spawn(move || { + let options = AgentQueryOptions { + provider: "test-provider".to_string(), + session_id: format!("session-{}", i), + prompt: "hello".to_string(), + cwd: None, + max_turns: None, + max_budget_usd: None, + resume_session_id: None, + permission_mode: None, + setting_sources: None, + system_prompt: None, + model: None, + claude_config_dir: None, + additional_directories: None, + worktree_name: None, + provider_config: serde_json::Value::Null, + extra_env: Default::default(), + }; + let result = mgr.query(&options); + if let Err(e) = result { + errs.lock().unwrap().push(e); + } + })); + } + + for h in handles { + h.join().expect("Thread should not panic"); + } + + // All 10 should have failed (no sidecar scripts available), but none panicked + let errs = errors.lock().unwrap(); + assert_eq!(errs.len(), 10, "All 10 concurrent queries should get errors"); + + // The key invariant: no "Sidecar for 'X' already running" error. + // Because the actor serializes requests, the second caller sees the first's + // start_provider result (either success or failure), not a conflicting start. + // With no search paths, all errors should be "Sidecar not found" style. + for err in errs.iter() { + assert!( + !err.contains("already running"), + "Should not get 'already running' error from serialized actor. 
Got: {err}" + ); + } + } } diff --git a/v2/src-tauri/Cargo.toml b/v2/src-tauri/Cargo.toml index 30e1b23..8e3cfab 100644 --- a/v2/src-tauri/Cargo.toml +++ b/v2/src-tauri/Cargo.toml @@ -40,6 +40,10 @@ opentelemetry-otlp = { version = "0.28", features = ["http-proto", "reqwest-clie tracing-opentelemetry = "0.29" keyring = { version = "3", features = ["linux-native"] } notify-rust = "4" +native-tls = "0.2" +tokio-native-tls = "0.3" +sha2 = "0.10" +hex = "0.4" [dev-dependencies] tempfile = "3" diff --git a/v2/src-tauri/src/btmsg.rs b/v2/src-tauri/src/btmsg.rs index 903cb92..1cb1b16 100644 --- a/v2/src-tauri/src/btmsg.rs +++ b/v2/src-tauri/src/btmsg.rs @@ -35,6 +35,25 @@ fn open_db() -> Result { .map_err(|e| format!("Failed to set WAL mode: {e}"))?; conn.query_row("PRAGMA busy_timeout = 5000", [], |_| Ok(())) .map_err(|e| format!("Failed to set busy_timeout: {e}"))?; + conn.execute_batch("PRAGMA foreign_keys = ON") + .map_err(|e| format!("Failed to enable foreign keys: {e}"))?; + + // Migration: add seen_messages table if not present + conn.execute_batch( + "CREATE TABLE IF NOT EXISTS seen_messages ( + session_id TEXT NOT NULL, + message_id TEXT NOT NULL, + seen_at INTEGER NOT NULL DEFAULT (unixepoch()), + PRIMARY KEY (session_id, message_id), + FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS idx_seen_messages_session ON seen_messages(session_id);" + ).map_err(|e| format!("Migration error (seen_messages): {e}"))?; + + // Migration: add sender_group_id column to messages if not present + // SQLite raises a "duplicate column name" error if the column already exists; the Err result is deliberately discarded (let _) so re-running the migration is harmless + let _ = conn.execute_batch("ALTER TABLE messages ADD COLUMN sender_group_id TEXT"); + Ok(conn) } @@ -161,6 +180,79 @@ pub fn unread_messages(agent_id: &str) -> Result, String> { msgs.collect::, _>>().map_err(|e| format!("Row error: {e}")) } +/// Get messages that have not been seen by this session.
+/// Unlike unread_messages (which uses the global `read` flag), +/// this tracks per-session acknowledgment via the seen_messages table. +pub fn unseen_messages(agent_id: &str, session_id: &str) -> Result, String> { + let db = open_db()?; + let mut stmt = db.prepare( + "SELECT m.id, m.from_agent, m.to_agent, m.content, m.read, m.reply_to, m.created_at, \ + a.name AS sender_name, a.role AS sender_role \ + FROM messages m \ + LEFT JOIN agents a ON a.id = m.from_agent \ + WHERE m.to_agent = ?1 \ + AND m.id NOT IN (SELECT message_id FROM seen_messages WHERE session_id = ?2) \ + ORDER BY m.created_at ASC" + ).map_err(|e| format!("Prepare unseen query: {e}"))?; + + let rows = stmt.query_map(params![agent_id, session_id], |row| { + Ok(BtmsgMessage { + id: row.get("id")?, + from_agent: row.get("from_agent")?, + to_agent: row.get("to_agent")?, + content: row.get("content")?, + read: row.get::<_, i32>("read")? != 0, + reply_to: row.get("reply_to")?, + created_at: row.get("created_at")?, + sender_name: row.get("sender_name")?, + sender_role: row.get("sender_role")?, + }) + }).map_err(|e| format!("Query unseen: {e}"))?; + + rows.collect::, _>>().map_err(|e| format!("Row error: {e}")) +} + +/// Mark specific message IDs as seen by this session. +pub fn mark_messages_seen(session_id: &str, message_ids: &[String]) -> Result<(), String> { + if message_ids.is_empty() { + return Ok(()); + } + let db = open_db()?; + let mut stmt = db.prepare( + "INSERT OR IGNORE INTO seen_messages (session_id, message_id) VALUES (?1, ?2)" + ).map_err(|e| format!("Prepare mark_seen: {e}"))?; + + for id in message_ids { + stmt.execute(params![session_id, id]) + .map_err(|e| format!("Insert seen: {e}"))?; + } + Ok(()) +} + +/// Prune seen_messages entries older than the given threshold. +/// Uses emergency aggressive pruning (3 days) when row count exceeds the threshold. 
+pub fn prune_seen_messages(max_age_secs: i64, emergency_threshold: i64) -> Result { + let db = open_db()?; + + let count: i64 = db.query_row( + "SELECT COUNT(*) FROM seen_messages", [], |row| row.get(0) + ).map_err(|e| format!("Count seen: {e}"))?; + + let threshold_secs = if count > emergency_threshold { + // Emergency: prune more aggressively (3 days instead of configured max) + max_age_secs.min(3 * 24 * 3600) + } else { + max_age_secs + }; + + let deleted = db.execute( + "DELETE FROM seen_messages WHERE seen_at < unixepoch() - ?1", + params![threshold_secs], + ).map_err(|e| format!("Prune seen: {e}"))?; + + Ok(deleted as u64) +} + pub fn history(agent_id: &str, other_id: &str, limit: i32) -> Result, String> { let db = open_db()?; let mut stmt = db.prepare( @@ -254,7 +346,8 @@ pub fn send_message(from_agent: &str, to_agent: &str, content: &str) -> Result Result { read INTEGER DEFAULT 0, reply_to TEXT, group_id TEXT NOT NULL, + sender_group_id TEXT, created_at TEXT DEFAULT (datetime('now')), FOREIGN KEY (from_agent) REFERENCES agents(id), FOREIGN KEY (to_agent) REFERENCES agents(id) @@ -619,14 +713,28 @@ fn open_db_or_create() -> Result { ); CREATE INDEX IF NOT EXISTS idx_audit_log_agent ON audit_log(agent_id); - CREATE INDEX IF NOT EXISTS idx_audit_log_type ON audit_log(event_type);" + CREATE INDEX IF NOT EXISTS idx_audit_log_type ON audit_log(event_type); + + CREATE TABLE IF NOT EXISTS seen_messages ( + session_id TEXT NOT NULL, + message_id TEXT NOT NULL, + seen_at INTEGER NOT NULL DEFAULT (unixepoch()), + PRIMARY KEY (session_id, message_id), + FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS idx_seen_messages_session ON seen_messages(session_id);" ).map_err(|e| format!("Schema creation error: {e}"))?; + // Enable foreign keys for ON DELETE CASCADE support + conn.execute_batch("PRAGMA foreign_keys = ON") + .map_err(|e| format!("Failed to enable foreign keys: {e}"))?; + Ok(conn) } // ---- Heartbeat monitoring 
---- +#[allow(dead_code)] // Constructed in get_agent_heartbeats, called via Tauri IPC #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct AgentHeartbeat { @@ -651,6 +759,7 @@ pub fn record_heartbeat(agent_id: &str) -> Result<(), String> { Ok(()) } +#[allow(dead_code)] // Called via Tauri IPC command btmsg_get_agent_heartbeats pub fn get_agent_heartbeats(group_id: &str) -> Result, String> { let db = open_db()?; let mut stmt = db @@ -713,21 +822,6 @@ pub struct DeadLetter { pub created_at: String, } -pub fn queue_dead_letter( - from_agent: &str, - to_agent: &str, - content: &str, - error: &str, -) -> Result<(), String> { - let db = open_db()?; - db.execute( - "INSERT INTO dead_letter_queue (from_agent, to_agent, content, error) VALUES (?1, ?2, ?3, ?4)", - params![from_agent, to_agent, content, error], - ) - .map_err(|e| format!("Dead letter insert error: {e}"))?; - Ok(()) -} - pub fn get_dead_letters(group_id: &str, limit: i32) -> Result, String> { let db = open_db()?; let mut stmt = db @@ -757,6 +851,22 @@ pub fn get_dead_letters(group_id: &str, limit: i32) -> Result, S .map_err(|e| format!("Row error: {e}")) } +#[allow(dead_code)] // Called via Tauri IPC command btmsg_queue_dead_letter +pub fn queue_dead_letter( + from_agent: &str, + to_agent: &str, + content: &str, + error: &str, +) -> Result<(), String> { + let db = open_db()?; + db.execute( + "INSERT INTO dead_letter_queue (from_agent, to_agent, content, error) VALUES (?1, ?2, ?3, ?4)", + params![from_agent, to_agent, content, error], + ) + .map_err(|e| format!("Dead letter insert error: {e}"))?; + Ok(()) +} + pub fn clear_dead_letters(group_id: &str) -> Result<(), String> { let db = open_db()?; db.execute( diff --git a/v2/src-tauri/src/commands/btmsg.rs b/v2/src-tauri/src/commands/btmsg.rs index 978535d..687dd6a 100644 --- a/v2/src-tauri/src/commands/btmsg.rs +++ b/v2/src-tauri/src/commands/btmsg.rs @@ -78,6 +78,23 @@ pub fn btmsg_register_agents(config: groups::GroupsFile) 
-> Result<(), String> { btmsg::register_agents_from_groups(&config) } +// ---- Per-message acknowledgment (seen_messages) ---- + +#[tauri::command] +pub fn btmsg_unseen_messages(agent_id: String, session_id: String) -> Result, String> { + btmsg::unseen_messages(&agent_id, &session_id) +} + +#[tauri::command] +pub fn btmsg_mark_seen(session_id: String, message_ids: Vec) -> Result<(), String> { + btmsg::mark_messages_seen(&session_id, &message_ids) +} + +#[tauri::command] +pub fn btmsg_prune_seen() -> Result { + btmsg::prune_seen_messages(7 * 24 * 3600, 200_000) +} + // ---- Heartbeat monitoring ---- #[tauri::command] @@ -90,6 +107,11 @@ pub fn btmsg_get_stale_agents(group_id: String, threshold_secs: i64) -> Result Result, String> { + btmsg::get_agent_heartbeats(&group_id) +} + // ---- Dead letter queue ---- #[tauri::command] @@ -102,6 +124,16 @@ pub fn btmsg_clear_dead_letters(group_id: String) -> Result<(), String> { btmsg::clear_dead_letters(&group_id) } +#[tauri::command] +pub fn btmsg_queue_dead_letter( + from_agent: String, + to_agent: String, + content: String, + error: String, +) -> Result<(), String> { + btmsg::queue_dead_letter(&from_agent, &to_agent, &content, &error) +} + // ---- Audit log ---- #[tauri::command] diff --git a/v2/src-tauri/src/commands/remote.rs b/v2/src-tauri/src/commands/remote.rs index 5f7d3a6..91347f0 100644 --- a/v2/src-tauri/src/commands/remote.rs +++ b/v2/src-tauri/src/commands/remote.rs @@ -1,6 +1,6 @@ use tauri::State; use crate::AppState; -use crate::remote::{RemoteMachineConfig, RemoteMachineInfo}; +use crate::remote::{self, RemoteMachineConfig, RemoteMachineInfo}; use crate::pty::PtyOptions; use crate::sidecar::AgentQueryOptions; @@ -63,3 +63,23 @@ pub async fn remote_pty_resize(state: State<'_, AppState>, machine_id: String, i pub async fn remote_pty_kill(state: State<'_, AppState>, machine_id: String, id: String) -> Result<(), String> { state.remote_manager.pty_kill(&machine_id, &id).await } + +// --- SPKI certificate pinning 
--- + +#[tauri::command] +#[tracing::instrument] +pub async fn remote_probe_spki(url: String) -> Result { + remote::probe_spki_hash(&url).await +} + +#[tauri::command] +#[tracing::instrument(skip(state))] +pub async fn remote_add_pin(state: State<'_, AppState>, machine_id: String, pin: String) -> Result<(), String> { + state.remote_manager.add_spki_pin(&machine_id, pin).await +} + +#[tauri::command] +#[tracing::instrument(skip(state))] +pub async fn remote_remove_pin(state: State<'_, AppState>, machine_id: String, pin: String) -> Result<(), String> { + state.remote_manager.remove_spki_pin(&machine_id, &pin).await +} diff --git a/v2/src-tauri/src/commands/search.rs b/v2/src-tauri/src/commands/search.rs index 8ca6e33..2074d08 100644 --- a/v2/src-tauri/src/commands/search.rs +++ b/v2/src-tauri/src/commands/search.rs @@ -33,3 +33,27 @@ pub fn search_index_message( ) -> Result<(), String> { state.search_db.index_message(&session_id, &role, &content) } + +#[tauri::command] +pub fn search_index_task( + state: State<'_, AppState>, + task_id: String, + title: String, + description: String, + status: String, + assigned_to: String, +) -> Result<(), String> { + state.search_db.index_task(&task_id, &title, &description, &status, &assigned_to) +} + +#[tauri::command] +pub fn search_index_btmsg( + state: State<'_, AppState>, + msg_id: String, + from_agent: String, + to_agent: String, + content: String, + channel: String, +) -> Result<(), String> { + state.search_db.index_btmsg(&msg_id, &from_agent, &to_agent, &content, &channel) +} diff --git a/v2/src-tauri/src/ctx.rs b/v2/src-tauri/src/ctx.rs index 5aa725e..a9a7272 100644 --- a/v2/src-tauri/src/ctx.rs +++ b/v2/src-tauri/src/ctx.rs @@ -28,6 +28,7 @@ pub struct CtxDb { } impl CtxDb { + #[cfg(test)] fn default_db_path() -> PathBuf { dirs::home_dir() .unwrap_or_default() @@ -35,6 +36,7 @@ impl CtxDb { .join("context.db") } + #[cfg(test)] pub fn new() -> Self { Self::new_with_path(Self::default_db_path()) } diff --git 
a/v2/src-tauri/src/groups.rs b/v2/src-tauri/src/groups.rs index ffa7d97..cd5f8ba 100644 --- a/v2/src-tauri/src/groups.rs +++ b/v2/src-tauri/src/groups.rs @@ -226,6 +226,15 @@ mod tests { cwd: "/tmp/test".to_string(), profile: "default".to_string(), enabled: true, + provider: None, + model: None, + use_worktrees: None, + sandbox_enabled: None, + anchor_budget_scale: None, + stall_threshold_min: None, + is_agent: None, + agent_role: None, + system_prompt: None, }], agents: vec![], }], diff --git a/v2/src-tauri/src/lib.rs b/v2/src-tauri/src/lib.rs index 057ecd3..8e00846 100644 --- a/v2/src-tauri/src/lib.rs +++ b/v2/src-tauri/src/lib.rs @@ -248,6 +248,9 @@ pub fn run() { commands::remote::remote_pty_write, commands::remote::remote_pty_resize, commands::remote::remote_pty_kill, + commands::remote::remote_probe_spki, + commands::remote::remote_add_pin, + commands::remote::remote_remove_pin, // btmsg (agent messenger) commands::btmsg::btmsg_get_agents, commands::btmsg::btmsg_unread_count, @@ -264,11 +267,17 @@ pub fn run() { commands::btmsg::btmsg_create_channel, commands::btmsg::btmsg_add_channel_member, commands::btmsg::btmsg_register_agents, + // btmsg per-message acknowledgment + commands::btmsg::btmsg_unseen_messages, + commands::btmsg::btmsg_mark_seen, + commands::btmsg::btmsg_prune_seen, // btmsg health monitoring commands::btmsg::btmsg_record_heartbeat, commands::btmsg::btmsg_get_stale_agents, + commands::btmsg::btmsg_get_agent_heartbeats, commands::btmsg::btmsg_get_dead_letters, commands::btmsg::btmsg_clear_dead_letters, + commands::btmsg::btmsg_queue_dead_letter, // Audit log commands::btmsg::audit_log_event, commands::btmsg::audit_log_list, @@ -286,6 +295,8 @@ pub fn run() { commands::search::search_query, commands::search::search_rebuild, commands::search::search_index_message, + commands::search::search_index_task, + commands::search::search_index_btmsg, // Notifications commands::notifications::notify_desktop, // Secrets (system keyring) diff --git 
a/v2/src-tauri/src/memora.rs b/v2/src-tauri/src/memora.rs index 5297c90..45dca15 100644 --- a/v2/src-tauri/src/memora.rs +++ b/v2/src-tauri/src/memora.rs @@ -26,6 +26,7 @@ pub struct MemoraDb { } impl MemoraDb { + #[cfg(test)] fn default_db_path() -> std::path::PathBuf { dirs::data_dir() .unwrap_or_else(|| dirs::home_dir().unwrap_or_default().join(".local/share")) @@ -33,6 +34,7 @@ impl MemoraDb { .join("memories.db") } + #[cfg(test)] pub fn new() -> Self { Self::new_with_path(Self::default_db_path()) } diff --git a/v2/src-tauri/src/plugins.rs b/v2/src-tauri/src/plugins.rs index 488abae..d9bf95f 100644 --- a/v2/src-tauri/src/plugins.rs +++ b/v2/src-tauri/src/plugins.rs @@ -3,7 +3,7 @@ // Each plugin lives in its own subdirectory with a plugin.json manifest. use serde::{Deserialize, Serialize}; -use std::path::{Path, PathBuf}; +use std::path::Path; /// Plugin manifest — parsed from plugin.json #[derive(Debug, Clone, Serialize, Deserialize)] @@ -137,11 +137,6 @@ pub fn read_plugin_file( .map_err(|e| format!("Failed to read plugin file: {e}")) } -/// Get the plugins directory path from a config directory -pub fn plugins_dir(config_dir: &Path) -> PathBuf { - config_dir.join("plugins") -} - #[cfg(test)] mod tests { use super::*; @@ -257,9 +252,4 @@ mod tests { assert!(result.is_err()); } - #[test] - fn test_plugins_dir_path() { - let config = Path::new("/home/user/.config/bterminal"); - assert_eq!(plugins_dir(config), PathBuf::from("/home/user/.config/bterminal/plugins")); - } } diff --git a/v2/src-tauri/src/remote.rs b/v2/src-tauri/src/remote.rs index f5aa668..77789fd 100644 --- a/v2/src-tauri/src/remote.rs +++ b/v2/src-tauri/src/remote.rs @@ -4,6 +4,7 @@ use bterminal_core::pty::PtyOptions; use bterminal_core::sidecar::AgentQueryOptions; use futures_util::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; +use sha2::{Sha256, Digest}; use std::collections::HashMap; use std::sync::Arc; use tauri::{AppHandle, Emitter}; @@ -16,6 +17,9 @@ pub struct 
RemoteMachineConfig { pub url: String, pub token: String, pub auto_connect: bool, + /// SPKI SHA-256 pin(s) for certificate verification. Empty = TOFU on first connect. + #[serde(default)] + pub spki_pins: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -25,6 +29,8 @@ pub struct RemoteMachineInfo { pub url: String, pub status: String, pub auto_connect: bool, + /// Currently stored SPKI pin hashes (hex-encoded SHA-256) + pub spki_pins: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -79,6 +85,7 @@ impl RemoteManager { url: m.config.url.clone(), status: m.status.clone(), auto_connect: m.config.auto_connect, + spki_pins: m.config.spki_pins.clone(), }).collect() } @@ -110,8 +117,28 @@ impl RemoteManager { Ok(()) } + /// Add an SPKI pin hash to a machine's trusted pins. + pub async fn add_spki_pin(&self, machine_id: &str, pin: String) -> Result<(), String> { + let mut machines = self.machines.lock().await; + let machine = machines.get_mut(machine_id) + .ok_or_else(|| format!("Machine {machine_id} not found"))?; + if !machine.config.spki_pins.contains(&pin) { + machine.config.spki_pins.push(pin); + } + Ok(()) + } + + /// Remove an SPKI pin hash from a machine's trusted pins. 
+ pub async fn remove_spki_pin(&self, machine_id: &str, pin: &str) -> Result<(), String> { + let mut machines = self.machines.lock().await; + let machine = machines.get_mut(machine_id) + .ok_or_else(|| format!("Machine {machine_id} not found"))?; + machine.config.spki_pins.retain(|p| p != pin); + Ok(()) + } + pub async fn connect(&self, app: &AppHandle, machine_id: &str) -> Result<(), String> { - let (url, token) = { + let (url, token, spki_pins) = { let mut machines = self.machines.lock().await; let machine = machines.get_mut(machine_id) .ok_or_else(|| format!("Machine {machine_id} not found"))?; @@ -121,9 +148,60 @@ impl RemoteManager { machine.status = "connecting".to_string(); // Reset cancellation flag for new connection machine.cancelled.store(false, std::sync::atomic::Ordering::Relaxed); - (machine.config.url.clone(), machine.config.token.clone()) + (machine.config.url.clone(), machine.config.token.clone(), machine.config.spki_pins.clone()) }; + // SPKI certificate pin verification for wss:// connections + if url.starts_with("wss://") { + if !spki_pins.is_empty() { + // Verify server certificate against stored pins + let server_hash = probe_spki_hash(&url).await.map_err(|e| { + // Reset status on probe failure + let machines = self.machines.clone(); + let mid = machine_id.to_string(); + tauri::async_runtime::spawn(async move { + let mut machines = machines.lock().await; + if let Some(machine) = machines.get_mut(&mid) { + machine.status = "disconnected".to_string(); + } + }); + format!("SPKI probe failed: {e}") + })?; + if !spki_pins.contains(&server_hash) { + // Pin mismatch — possible MITM or certificate rotation + let mut machines = self.machines.lock().await; + if let Some(machine) = machines.get_mut(machine_id) { + machine.status = "disconnected".to_string(); + } + return Err(format!( + "SPKI pin mismatch! Server certificate hash '{server_hash}' does not match \ + any trusted pin. This may indicate a MITM attack or certificate rotation. 
\ Update the pin in Settings if this is expected." + )); + } + log::info!("SPKI pin verified for machine {machine_id}"); + } else { + // TOFU: no pins stored — probe and auto-store on first wss:// connect + match probe_spki_hash(&url).await { + Ok(hash) => { + log::info!("TOFU: storing SPKI pin for machine {machine_id}: {hash}"); + let mut machines = self.machines.lock().await; + if let Some(machine) = machines.get_mut(machine_id) { + machine.config.spki_pins.push(hash.clone()); + } + let _ = app.emit("remote-spki-tofu", &serde_json::json!({ + "machineId": machine_id, + "hash": hash, + })); + } + Err(e) => { + log::warn!("TOFU: failed to probe SPKI hash for {machine_id}: {e}"); + // Continue without pinning — non-blocking + } + } + } + } + // Build WebSocket request with auth header let request = tokio_tungstenite::tungstenite::http::Request::builder() .uri(&url) @@ -430,6 +508,57 @@ impl RemoteManager { } } +/// Probe a relay server's TLS certificate and return its SHA-256 hash (hex-encoded). +/// Connects with a permissive TLS config to extract the certificate, then hashes it. +/// NOTE: despite the "SPKI" naming, this hashes the full DER-encoded certificate, not the +/// SubjectPublicKeyInfo — so stored pins break on certificate renewal even if the key is unchanged. +/// Only works for wss:// URLs. +pub async fn probe_spki_hash(url: &str) -> Result { + let host = extract_host(url).ok_or_else(|| "Invalid URL".to_string())?; + let hostname = host.split(':').next().unwrap_or(&host).to_string(); + let addr = if host.contains(':') { + host.clone() + } else { + format!("{host}:9750") + }; + + // Build a permissive TLS connector to get the certificate regardless of CA trust + let connector = native_tls::TlsConnector::builder() + .danger_accept_invalid_certs(true) + .build() + .map_err(|e| format!("TLS connector error: {e}"))?; + let connector = tokio_native_tls::TlsConnector::from(connector); + + let tcp = tokio::time::timeout( + std::time::Duration::from_secs(5), + tokio::net::TcpStream::connect(&addr), + ) + .await + .map_err(|_| "Connection timeout".to_string())?
+ .map_err(|e| format!("TCP connect failed: {e}"))?; + + let tls_stream = connector + .connect(&hostname, tcp) + .await + .map_err(|e| format!("TLS handshake failed: {e}"))?; + + // Extract peer certificate DER bytes + let cert = tls_stream + .get_ref() + .peer_certificate() + .map_err(|e| format!("Failed to get peer certificate: {e}"))? + .ok_or_else(|| "No peer certificate presented".to_string())?; + + let cert_der = cert + .to_der() + .map_err(|e| format!("Failed to encode certificate DER: {e}"))?; + + // SHA-256 hash of the full DER-encoded certificate + let mut hasher = Sha256::new(); + hasher.update(&cert_der); + let hash = hasher.finalize(); + + Ok(hex::encode(hash)) +} + /// Probe whether a relay is reachable via TCP connect only (no WS upgrade). /// This avoids allocating per-connection resources (PtyManager, SidecarManager) on the relay. async fn attempt_tcp_probe(url: &str) -> Result<(), String> { diff --git a/v2/src-tauri/src/search.rs b/v2/src-tauri/src/search.rs index f4116cd..2838b25 100644 --- a/v2/src-tauri/src/search.rs +++ b/v2/src-tauri/src/search.rs @@ -89,6 +89,7 @@ impl SearchDb { } /// Index a task into the search_tasks FTS5 table. + #[allow(dead_code)] // Called via Tauri IPC command search_index_task pub fn index_task( &self, task_id: &str, @@ -108,6 +109,7 @@ impl SearchDb { } /// Index a btmsg message into the search_btmsg FTS5 table. 
+ #[allow(dead_code)] // Called via Tauri IPC command search_index_btmsg pub fn index_btmsg( &self, msg_id: &str, @@ -264,7 +266,6 @@ fn chrono_now() -> String { #[cfg(test)] mod tests { use super::*; - use std::path::PathBuf; fn temp_search_db() -> (SearchDb, tempfile::TempDir) { let dir = tempfile::tempdir().unwrap(); diff --git a/v2/src/lib/adapters/btmsg-bridge.ts b/v2/src/lib/adapters/btmsg-bridge.ts index a3e97ad..579cb8d 100644 --- a/v2/src/lib/adapters/btmsg-bridge.ts +++ b/v2/src/lib/adapters/btmsg-bridge.ts @@ -169,6 +169,29 @@ export async function registerAgents(config: import('../types/groups').GroupsFil return invoke('btmsg_register_agents', { config }); } +// ---- Per-message acknowledgment (seen_messages) ---- + +/** + * Get messages not yet seen by this session (per-session tracking). + */ +export async function getUnseenMessages(agentId: AgentId, sessionId: string): Promise { + return invoke('btmsg_unseen_messages', { agentId, sessionId }); +} + +/** + * Mark specific message IDs as seen by this session. + */ +export async function markMessagesSeen(sessionId: string, messageIds: string[]): Promise { + return invoke('btmsg_mark_seen', { sessionId, messageIds }); +} + +/** + * Prune old seen_messages entries (7-day default, emergency 3-day at 200k rows). + */ +export async function pruneSeen(): Promise { + return invoke('btmsg_prune_seen'); +} + // ---- Heartbeat monitoring ---- /** diff --git a/v2/src/lib/adapters/remote-bridge.ts b/v2/src/lib/adapters/remote-bridge.ts index 51a66a9..e56cd91 100644 --- a/v2/src/lib/adapters/remote-bridge.ts +++ b/v2/src/lib/adapters/remote-bridge.ts @@ -8,6 +8,8 @@ export interface RemoteMachineConfig { url: string; token: string; auto_connect: boolean; + /** SPKI SHA-256 pin(s) for certificate verification. Empty = TOFU on first connect. 
*/ + spki_pins?: string[]; } export interface RemoteMachineInfo { @@ -16,6 +18,8 @@ export interface RemoteMachineInfo { url: string; status: string; auto_connect: boolean; + /** Currently stored SPKI pin hashes (hex-encoded SHA-256) */ + spki_pins: string[]; } // --- Machine management --- @@ -40,6 +44,23 @@ export async function disconnectRemoteMachine(machineId: string): Promise return invoke('remote_disconnect', { machineId }); } +// --- SPKI certificate pinning --- + +/** Probe a relay server's TLS certificate and return its SHA-256 hash (hex-encoded). */ +export async function probeSpki(url: string): Promise { + return invoke('remote_probe_spki', { url }); +} + +/** Add an SPKI pin hash to a machine's trusted pins. */ +export async function addSpkiPin(machineId: string, pin: string): Promise { + return invoke('remote_add_pin', { machineId, pin }); +} + +/** Remove an SPKI pin hash from a machine's trusted pins. */ +export async function removeSpkiPin(machineId: string, pin: string): Promise { + return invoke('remote_remove_pin', { machineId, pin }); +} + // --- Remote event listeners --- export interface RemoteSidecarMessage { @@ -141,3 +162,19 @@ export async function onRemoteMachineReconnectReady( callback(event.payload); }); } + +// --- SPKI TOFU event --- + +export interface RemoteSpkiTofuEvent { + machineId: string; + hash: string; +} + +/** Listen for TOFU (Trust On First Use) events when a new SPKI pin is auto-stored. */ +export async function onRemoteSpkiTofu( + callback: (msg: RemoteSpkiTofuEvent) => void, +): Promise { + return listen('remote-spki-tofu', (event) => { + callback(event.payload); + }); +} diff --git a/v2/src/lib/components/Agent/AgentPane.svelte b/v2/src/lib/components/Agent/AgentPane.svelte index 05890dc..255961e 100644 --- a/v2/src/lib/components/Agent/AgentPane.svelte +++ b/v2/src/lib/components/Agent/AgentPane.svelte @@ -62,6 +62,8 @@ model?: string; /** Extra env vars injected into agent process (e.g. 
BTMSG_AGENT_ID) */ extraEnv?: Record; + /** Shell execution mode for AI agents. 'restricted' blocks auto-exec; 'autonomous' allows it. */ + autonomousMode?: 'restricted' | 'autonomous'; /** Auto-triggered prompt (e.g. periodic context refresh). Picked up when agent is idle. */ autoPrompt?: string; /** Called when autoPrompt has been consumed */ @@ -69,7 +71,7 @@ onExit?: () => void; } - let { sessionId, projectId, prompt: initialPrompt = '', cwd: initialCwd, profile: profileName, provider: providerId = 'claude', capabilities = DEFAULT_CAPABILITIES, useWorktrees = false, agentSystemPrompt, model: modelOverride, extraEnv, autoPrompt, onautopromptconsumed, onExit }: Props = $props(); + let { sessionId, projectId, prompt: initialPrompt = '', cwd: initialCwd, profile: profileName, provider: providerId = 'claude', capabilities = DEFAULT_CAPABILITIES, useWorktrees = false, agentSystemPrompt, model: modelOverride, extraEnv, autonomousMode, autoPrompt, onautopromptconsumed, onExit }: Props = $props(); let session = $derived(getAgentSession(sessionId)); let inputPrompt = $state(initialPrompt); @@ -213,6 +215,7 @@ system_prompt: systemPrompt, model: modelOverride || undefined, worktree_name: useWorktrees ? sessionId : undefined, + provider_config: { autonomousMode: autonomousMode ?? 
'restricted' }, extra_env: extraEnv, }); inputPrompt = ''; diff --git a/v2/src/lib/components/Workspace/AgentSession.svelte b/v2/src/lib/components/Workspace/AgentSession.svelte index a4f2e98..9e749e6 100644 --- a/v2/src/lib/components/Workspace/AgentSession.svelte +++ b/v2/src/lib/components/Workspace/AgentSession.svelte @@ -27,7 +27,7 @@ import { getProvider, getDefaultProviderId } from '../../providers/registry.svelte'; import { loadAnchorsForProject } from '../../stores/anchors.svelte'; import { getSecret } from '../../adapters/secrets-bridge'; - import { getUnreadCount } from '../../adapters/btmsg-bridge'; + import { getUnseenMessages, markMessagesSeen } from '../../adapters/btmsg-bridge'; import { getWakeEvent, consumeWakeEvent, updateManagerSession } from '../../stores/wake-scheduler.svelte'; import { SessionId, ProjectId } from '../../types/ids'; import AgentPane from '../Agent/AgentPane.svelte'; @@ -161,26 +161,35 @@ bttask comment "update" # Add a comment stopAgent(sessionId).catch(() => {}); }); - // btmsg inbox polling — auto-wake agent when it receives messages from other agents + // btmsg inbox polling — per-message acknowledgment wake mechanism + // Uses seen_messages table for per-session tracking instead of global unread count. + // Every unseen message triggers exactly one wake, regardless of timing. let msgPollTimer: ReturnType | null = null; - let lastKnownUnread = 0; function startMsgPoll() { if (msgPollTimer) clearInterval(msgPollTimer); msgPollTimer = setInterval(async () => { if (contextRefreshPrompt) return; // Don't queue if already has a pending prompt try { - const count = await getUnreadCount(project.id as unknown as AgentId); - if (count > 0 && count > lastKnownUnread) { - lastKnownUnread = count; - contextRefreshPrompt = `[New Message] You have ${count} unread message(s). 
Check your inbox with \`btmsg inbox\` and respond appropriately.`; + const unseen = await getUnseenMessages( + project.id as unknown as AgentId, + sessionId, + ); + if (unseen.length > 0) { + // Build a prompt with the actual message contents + const msgSummary = unseen.map(m => + `From ${m.senderName ?? m.fromAgent} (${m.senderRole ?? 'unknown'}): ${m.content}` + ).join('\n'); + contextRefreshPrompt = `[New Messages] You have ${unseen.length} unread message(s):\n\n${msgSummary}\n\nRespond appropriately using \`btmsg send "reply"\`.`; + + // Mark as seen immediately to prevent re-injection + await markMessagesSeen(sessionId, unseen.map(m => m.id)); + logAuditEvent( project.id as unknown as AgentId, 'wake_event', - `Agent woken by ${count} unread btmsg message(s)`, + `Agent woken by ${unseen.length} btmsg message(s)`, ).catch(() => {}); - } else if (count === 0) { - lastKnownUnread = 0; } } catch { // btmsg not available, ignore @@ -345,6 +354,7 @@ bttask comment "update" # Add a comment agentSystemPrompt={agentPrompt} model={project.model} extraEnv={agentEnv} + autonomousMode={project.autonomousMode} autoPrompt={contextRefreshPrompt} onautopromptconsumed={handleAutoPromptConsumed} onExit={handleNewSession} diff --git a/v2/src/lib/components/Workspace/SettingsTab.svelte b/v2/src/lib/components/Workspace/SettingsTab.svelte index dfca0a8..8a29fe0 100644 --- a/v2/src/lib/components/Workspace/SettingsTab.svelte +++ b/v2/src/lib/components/Workspace/SettingsTab.svelte @@ -1150,6 +1150,27 @@ {/if} {/if} +
+ + + Shell Execution + +
+ + +
+
+
@@ -1440,6 +1461,27 @@
+
+ + + Shell Execution + +
+ + +
+
+
diff --git a/v2/src/lib/types/groups.ts b/v2/src/lib/types/groups.ts index 2053161..e685236 100644 --- a/v2/src/lib/types/groups.ts +++ b/v2/src/lib/types/groups.ts @@ -20,6 +20,8 @@ export interface ProjectConfig { useWorktrees?: boolean; /** When true, sidecar process is sandboxed via Landlock (Linux 5.13+, restricts filesystem access) */ sandboxEnabled?: boolean; + /** Shell execution mode for AI agents. 'restricted' (default) surfaces commands for approval; 'autonomous' auto-executes with audit logging */ + autonomousMode?: 'restricted' | 'autonomous'; /** Anchor token budget scale (defaults to 'medium' = 6K tokens) */ anchorBudgetScale?: AnchorBudgetScale; /** Stall detection threshold in minutes (defaults to 15) */ @@ -56,6 +58,7 @@ export function agentToProject(agent: GroupAgentConfig, groupCwd: string): Proje isAgent: true, agentRole: agent.role, systemPrompt: agent.systemPrompt, + autonomousMode: agent.autonomousMode, }; } @@ -83,6 +86,8 @@ export interface GroupAgentConfig { wakeStrategy?: WakeStrategy; /** Wake threshold 0..1 for smart strategy (default 0.5) */ wakeThreshold?: number; + /** Shell execution mode. 'restricted' (default) surfaces commands for approval; 'autonomous' auto-executes with audit logging */ + autonomousMode?: 'restricted' | 'autonomous'; } export interface GroupConfig { diff --git a/v2/vite.config.ts b/v2/vite.config.ts index 6a5bb20..379edce 100644 --- a/v2/vite.config.ts +++ b/v2/vite.config.ts @@ -9,6 +9,6 @@ export default defineConfig({ }, clearScreen: false, test: { - include: ['src/**/*.test.ts'], + include: ['src/**/*.test.ts', 'sidecar/**/*.test.ts'], }, })