Was only updating cached dimensions without calling PTY resize. Shell thought terminal was wrong size → double prompts, escape code leaks. - Session stores master PTY handle (Arc<Mutex<Box<dyn MasterPty>>>) - resize() calls master.resize(PtySize) → issues TIOCSWINSZ - Reader task no longer owns master handle (uses cloned reader only)
281 lines
8.5 KiB
Rust
281 lines
8.5 KiB
Rust
use std::collections::HashMap;
|
|
use std::io::{Read, Write as IoWrite};
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
use std::sync::Arc;
|
|
use std::time::{SystemTime, UNIX_EPOCH};
|
|
|
|
use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize};
|
|
use tokio::sync::{broadcast, Mutex};
|
|
|
|
use crate::protocol::SessionInfo;
|
|
|
|
const OUTPUT_CHANNEL_CAP: usize = 256;
|
|
|
|
/// A live (or recently exited) PTY session.
|
|
pub struct Session {
|
|
pub id: String,
|
|
pub pid: u32,
|
|
pub shell: String,
|
|
pub cwd: String,
|
|
pub cols: u16,
|
|
pub rows: u16,
|
|
pub created_at: u64,
|
|
writer: Arc<Mutex<Box<dyn IoWrite + Send>>>,
|
|
/// Master PTY handle — needed for resize (TIOCSWINSZ).
|
|
master: Arc<Mutex<Box<dyn MasterPty + Send>>>,
|
|
pub tx: broadcast::Sender<Vec<u8>>,
|
|
pub alive: Arc<AtomicBool>,
|
|
#[allow(dead_code)]
|
|
pub exit_code: Arc<Mutex<Option<i32>>>,
|
|
}
|
|
|
|
impl Session {
|
|
/// Snapshot metadata for ListSessions responses.
|
|
pub fn snapshot(&self) -> SessionInfo {
|
|
SessionInfo {
|
|
id: self.id.clone(),
|
|
pid: self.pid,
|
|
shell: self.shell.clone(),
|
|
cwd: self.cwd.clone(),
|
|
cols: self.cols,
|
|
rows: self.rows,
|
|
created_at: self.created_at,
|
|
alive: self.alive.load(Ordering::Relaxed),
|
|
}
|
|
}
|
|
|
|
/// Write raw bytes into the PTY master (keyboard input, paste, etc.).
|
|
pub async fn write_input(&self, data: &[u8]) -> Result<(), String> {
|
|
let mut w = self.writer.lock().await;
|
|
w.write_all(data)
|
|
.map_err(|e| format!("PTY write for {}: {e}", self.id))
|
|
}
|
|
|
|
/// Resize the PTY (issues TIOCSWINSZ) and update cached dimensions.
|
|
pub async fn resize(&mut self, cols: u16, rows: u16) -> Result<(), String> {
|
|
let master = self.master.lock().await;
|
|
master.resize(PtySize { rows, cols, pixel_width: 0, pixel_height: 0 })
|
|
.map_err(|e| format!("PTY resize for {}: {e}", self.id))?;
|
|
self.cols = cols;
|
|
self.rows = rows;
|
|
Ok(())
|
|
}
|
|
|
|
/// Return a new receiver subscribed to this session's broadcast output.
|
|
pub fn subscribe(&self) -> broadcast::Receiver<Vec<u8>> {
|
|
self.tx.subscribe()
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Session manager
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Owns the full set of PTY sessions.
|
|
pub struct SessionManager {
|
|
sessions: HashMap<String, Session>,
|
|
default_shell: String,
|
|
}
|
|
|
|
impl SessionManager {
|
|
pub fn new(default_shell: String) -> Self {
|
|
Self {
|
|
sessions: HashMap::new(),
|
|
default_shell,
|
|
}
|
|
}
|
|
|
|
/// Spawn a new PTY session.
|
|
///
|
|
/// Returns `(pid, output_rx)` on success. `on_exit` is called from the
|
|
/// blocking reader task once the child process exits.
|
|
pub fn create_session(
|
|
&mut self,
|
|
id: String,
|
|
shell: Option<String>,
|
|
cwd: Option<String>,
|
|
env: Option<HashMap<String, String>>,
|
|
cols: u16,
|
|
rows: u16,
|
|
on_exit: impl FnOnce(String, Option<i32>) + Send + 'static,
|
|
) -> Result<(u32, broadcast::Receiver<Vec<u8>>), String> {
|
|
if self.sessions.contains_key(&id) {
|
|
return Err(format!("session {id} already exists"));
|
|
}
|
|
|
|
let shell_path = shell.unwrap_or_else(|| self.default_shell.clone());
|
|
let pty_system = native_pty_system();
|
|
let pair = pty_system
|
|
.openpty(PtySize {
|
|
rows,
|
|
cols,
|
|
pixel_width: 0,
|
|
pixel_height: 0,
|
|
})
|
|
.map_err(|e| format!("openpty: {e}"))?;
|
|
|
|
let mut cmd = CommandBuilder::new(&shell_path);
|
|
if let Some(ref dir) = cwd {
|
|
cmd.cwd(dir);
|
|
}
|
|
if let Some(ref vars) = env {
|
|
for (k, v) in vars {
|
|
cmd.env(k, v);
|
|
}
|
|
}
|
|
|
|
let child = pair
|
|
.slave
|
|
.spawn_command(cmd)
|
|
.map_err(|e| format!("spawn: {e}"))?;
|
|
|
|
let pid = child.process_id().unwrap_or(0);
|
|
|
|
let cwd_str = cwd.unwrap_or_else(|| {
|
|
std::env::current_dir()
|
|
.map(|p| p.to_string_lossy().into_owned())
|
|
.unwrap_or_else(|_| "/".into())
|
|
});
|
|
|
|
// Take the writer before moving `pair.master` into the reader task.
|
|
let writer = pair
|
|
.master
|
|
.take_writer()
|
|
.map_err(|e| format!("take_writer: {e}"))?;
|
|
|
|
// Clone a reader; the master handle itself moves into the blocking task
|
|
// so the PTY stays open until the reader is done.
|
|
let reader = pair
|
|
.master
|
|
.try_clone_reader()
|
|
.map_err(|e| format!("clone_reader: {e}"))?;
|
|
|
|
let (tx, rx) = broadcast::channel(OUTPUT_CHANNEL_CAP);
|
|
let alive = Arc::new(AtomicBool::new(true));
|
|
let exit_code = Arc::new(Mutex::new(None::<i32>));
|
|
|
|
// Keep a reference to the master for resize operations.
|
|
// The reader task gets the master handle to keep the PTY fd alive.
|
|
let master_for_session: Box<dyn MasterPty + Send> = pair.master;
|
|
|
|
// Spawn the blocking reader task.
|
|
let tx_clone = tx.clone();
|
|
let alive_clone = alive.clone();
|
|
let exit_code_clone = exit_code.clone();
|
|
let id_clone = id.clone();
|
|
tokio::task::spawn_blocking(move || {
|
|
read_pty_output(
|
|
reader,
|
|
tx_clone,
|
|
alive_clone,
|
|
exit_code_clone,
|
|
id_clone,
|
|
on_exit,
|
|
child,
|
|
);
|
|
});
|
|
|
|
let session = Session {
|
|
id: id.clone(),
|
|
pid,
|
|
shell: shell_path,
|
|
cwd: cwd_str,
|
|
cols,
|
|
rows,
|
|
created_at: unix_now(),
|
|
writer: Arc::new(Mutex::new(writer)),
|
|
master: Arc::new(Mutex::new(master_for_session)),
|
|
tx,
|
|
alive,
|
|
exit_code,
|
|
};
|
|
|
|
log::info!("created session {id} pid={pid}");
|
|
self.sessions.insert(id, session);
|
|
Ok((pid, rx))
|
|
}
|
|
|
|
pub fn get(&self, id: &str) -> Option<&Session> {
|
|
self.sessions.get(id)
|
|
}
|
|
|
|
pub fn get_mut(&mut self, id: &str) -> Option<&mut Session> {
|
|
self.sessions.get_mut(id)
|
|
}
|
|
|
|
pub fn list(&self) -> Vec<SessionInfo> {
|
|
self.sessions.values().map(|s| s.snapshot()).collect()
|
|
}
|
|
|
|
/// Remove a session entry. The reader task will notice the PTY is closed
|
|
/// and stop on its own.
|
|
pub fn close_session(&mut self, id: &str) -> Result<(), String> {
|
|
if self.sessions.remove(id).is_some() {
|
|
log::info!("closed session {id}");
|
|
Ok(())
|
|
} else {
|
|
Err(format!("session {id} not found"))
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Private helpers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
fn unix_now() -> u64 {
|
|
SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)
|
|
.map(|d| d.as_secs())
|
|
.unwrap_or(0)
|
|
}
|
|
|
|
/// Blocking PTY reader — lives inside `tokio::task::spawn_blocking`.
|
|
///
|
|
/// `_master` is held here so the PTY file descriptor is not closed until this
|
|
/// task finishes.
|
|
#[allow(clippy::too_many_arguments)]
|
|
fn read_pty_output(
|
|
mut reader: Box<dyn Read + Send>,
|
|
tx: broadcast::Sender<Vec<u8>>,
|
|
alive: Arc<AtomicBool>,
|
|
exit_code_cell: Arc<Mutex<Option<i32>>>,
|
|
id: String,
|
|
on_exit: impl FnOnce(String, Option<i32>),
|
|
mut child: Box<dyn portable_pty::Child + Send>,
|
|
) {
|
|
let mut buf = [0u8; 4096];
|
|
loop {
|
|
match reader.read(&mut buf) {
|
|
Ok(0) => break,
|
|
Ok(n) => {
|
|
let _ = tx.send(buf[..n].to_vec());
|
|
}
|
|
Err(e) => {
|
|
log::debug!("session {id} reader error: {e}");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
alive.store(false, Ordering::Relaxed);
|
|
|
|
// `exit_code()` on portable-pty returns u32 directly (not Option).
|
|
let code = child
|
|
.wait()
|
|
.ok()
|
|
.map(|status| status.exit_code() as i32);
|
|
|
|
// Write exit code using try_lock spin — the lock is never held for long.
|
|
loop {
|
|
if let Ok(mut guard) = exit_code_cell.try_lock() {
|
|
*guard = code;
|
|
break;
|
|
}
|
|
std::thread::sleep(std::time::Duration::from_millis(1));
|
|
}
|
|
|
|
log::info!("session {id} exited with code {code:?}");
|
|
on_exit(id, code);
|
|
// `_master` drops here — PTY closed.
|
|
}
|