use std::collections::HashMap; use std::io::{Read, Write as IoWrite}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize}; use tokio::sync::{broadcast, Mutex}; use crate::protocol::SessionInfo; const OUTPUT_CHANNEL_CAP: usize = 256; /// A live (or recently exited) PTY session. pub struct Session { pub id: String, pub pid: u32, pub shell: String, pub cwd: String, pub cols: u16, pub rows: u16, pub created_at: u64, writer: Arc>>, /// Master PTY handle — needed for resize (TIOCSWINSZ). master: Arc>>, pub tx: broadcast::Sender>, pub alive: Arc, #[allow(dead_code)] pub exit_code: Arc>>, } impl Session { /// Snapshot metadata for ListSessions responses. pub fn snapshot(&self) -> SessionInfo { SessionInfo { id: self.id.clone(), pid: self.pid, shell: self.shell.clone(), cwd: self.cwd.clone(), cols: self.cols, rows: self.rows, created_at: self.created_at, alive: self.alive.load(Ordering::Relaxed), } } /// Write raw bytes into the PTY master (keyboard input, paste, etc.). pub async fn write_input(&self, data: &[u8]) -> Result<(), String> { let mut w = self.writer.lock().await; w.write_all(data) .map_err(|e| format!("PTY write for {}: {e}", self.id)) } /// Resize the PTY (issues TIOCSWINSZ) and update cached dimensions. pub async fn resize(&mut self, cols: u16, rows: u16) -> Result<(), String> { let master = self.master.lock().await; master.resize(PtySize { rows, cols, pixel_width: 0, pixel_height: 0 }) .map_err(|e| format!("PTY resize for {}: {e}", self.id))?; self.cols = cols; self.rows = rows; Ok(()) } /// Return a new receiver subscribed to this session's broadcast output. pub fn subscribe(&self) -> broadcast::Receiver> { self.tx.subscribe() } } // --------------------------------------------------------------------------- // Session manager // --------------------------------------------------------------------------- /// Owns the full set of PTY sessions. pub struct SessionManager { sessions: HashMap, default_shell: String, } impl SessionManager { pub fn new(default_shell: String) -> Self { Self { sessions: HashMap::new(), default_shell, } } /// Spawn a new PTY session. /// /// Returns `(pid, output_rx)` on success. `on_exit` is called from the /// blocking reader task once the child process exits. pub fn create_session( &mut self, id: String, shell: Option, cwd: Option, env: Option>, cols: u16, rows: u16, on_exit: impl FnOnce(String, Option) + Send + 'static, ) -> Result<(u32, broadcast::Receiver>), String> { if self.sessions.contains_key(&id) { return Err(format!("session {id} already exists")); } let shell_path = shell.unwrap_or_else(|| self.default_shell.clone()); let pty_system = native_pty_system(); let pair = pty_system .openpty(PtySize { rows, cols, pixel_width: 0, pixel_height: 0, }) .map_err(|e| format!("openpty: {e}"))?; let mut cmd = CommandBuilder::new(&shell_path); if let Some(ref dir) = cwd { cmd.cwd(dir); } if let Some(ref vars) = env { for (k, v) in vars { cmd.env(k, v); } } let child = pair .slave .spawn_command(cmd) .map_err(|e| format!("spawn: {e}"))?; let pid = child.process_id().unwrap_or(0); let cwd_str = cwd.unwrap_or_else(|| { std::env::current_dir() .map(|p| p.to_string_lossy().into_owned()) .unwrap_or_else(|_| "/".into()) }); // Take the writer before moving `pair.master` into the reader task. let writer = pair .master .take_writer() .map_err(|e| format!("take_writer: {e}"))?; // Clone a reader; the master handle itself moves into the blocking task // so the PTY stays open until the reader is done. let reader = pair .master .try_clone_reader() .map_err(|e| format!("clone_reader: {e}"))?; let (tx, rx) = broadcast::channel(OUTPUT_CHANNEL_CAP); let alive = Arc::new(AtomicBool::new(true)); let exit_code = Arc::new(Mutex::new(None::)); // Keep a reference to the master for resize operations. // The reader task gets the master handle to keep the PTY fd alive. let master_for_session: Box = pair.master; // Spawn the blocking reader task. let tx_clone = tx.clone(); let alive_clone = alive.clone(); let exit_code_clone = exit_code.clone(); let id_clone = id.clone(); tokio::task::spawn_blocking(move || { read_pty_output( reader, tx_clone, alive_clone, exit_code_clone, id_clone, on_exit, child, ); }); let session = Session { id: id.clone(), pid, shell: shell_path, cwd: cwd_str, cols, rows, created_at: unix_now(), writer: Arc::new(Mutex::new(writer)), master: Arc::new(Mutex::new(master_for_session)), tx, alive, exit_code, }; log::info!("created session {id} pid={pid}"); self.sessions.insert(id, session); Ok((pid, rx)) } pub fn get(&self, id: &str) -> Option<&Session> { self.sessions.get(id) } pub fn get_mut(&mut self, id: &str) -> Option<&mut Session> { self.sessions.get_mut(id) } pub fn list(&self) -> Vec { self.sessions.values().map(|s| s.snapshot()).collect() } /// Remove a session entry. The reader task will notice the PTY is closed /// and stop on its own. pub fn close_session(&mut self, id: &str) -> Result<(), String> { if self.sessions.remove(id).is_some() { log::info!("closed session {id}"); Ok(()) } else { Err(format!("session {id} not found")) } } } // --------------------------------------------------------------------------- // Private helpers // --------------------------------------------------------------------------- fn unix_now() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) .map(|d| d.as_secs()) .unwrap_or(0) } /// Blocking PTY reader — lives inside `tokio::task::spawn_blocking`. /// /// `_master` is held here so the PTY file descriptor is not closed until this /// task finishes. #[allow(clippy::too_many_arguments)] fn read_pty_output( mut reader: Box, tx: broadcast::Sender>, alive: Arc, exit_code_cell: Arc>>, id: String, on_exit: impl FnOnce(String, Option), mut child: Box, ) { let mut buf = [0u8; 4096]; loop { match reader.read(&mut buf) { Ok(0) => break, Ok(n) => { let _ = tx.send(buf[..n].to_vec()); } Err(e) => { log::debug!("session {id} reader error: {e}"); break; } } } alive.store(false, Ordering::Relaxed); // `exit_code()` on portable-pty returns u32 directly (not Option). let code = child .wait() .ok() .map(|status| status.exit_code() as i32); // Write exit code using try_lock spin — the lock is never held for long. loop { if let Ok(mut guard) = exit_code_cell.try_lock() { *guard = code; break; } std::thread::sleep(std::time::Duration::from_millis(1)); } log::info!("session {id} exited with code {code:?}"); on_exit(id, code); // `_master` drops here — PTY closed. }