From b0cce7ae4f177d73516fa5d12840a59ddf9feacb Mon Sep 17 00:00:00 2001 From: Hibryda Date: Fri, 6 Mar 2026 19:49:19 +0100 Subject: [PATCH] feat(v2): add relay response propagation and reconnection with exponential backoff Relay (bterminal-relay): command handlers now send structured responses (pty_created, pong, error) back via shared event channel with commandId for correlation. New send_error() helper replaces log-only error reporting. RemoteManager (remote.rs): exponential backoff reconnection on disconnect (1s/2s/4s/8s/16s/30s cap). Uses attempt_ws_connect() probe with 5s timeout. Emits remote-machine-reconnecting and remote-machine-reconnect-ready events. Handles pty_created relay event as remote-pty-created Tauri event. --- v2/bterminal-relay/src/main.rs | 54 +++++++++++++----- v2/src-tauri/src/remote.rs | 100 +++++++++++++++++++++++++++++++-- 2 files changed, 134 insertions(+), 20 deletions(-) diff --git a/v2/bterminal-relay/src/main.rs b/v2/bterminal-relay/src/main.rs index 472289b..ec03f96 100644 --- a/v2/bterminal-relay/src/main.rs +++ b/v2/bterminal-relay/src/main.rs @@ -165,8 +165,9 @@ async fn handle_connection( log::info!("Client connected: {peer}"); - // Set up event channel + // Set up event channel — shared between EventSink and command response sender let (event_tx, mut event_rx) = mpsc::unbounded_channel::<RelayEvent>(); + let sink_tx = event_tx.clone(); let sink: Arc<dyn EventSink> = Arc::new(WsEventSink { tx: event_tx }); // Create managers for this connection @@ -204,12 +205,13 @@ // Process incoming commands let pty_mgr = pty_manager.clone(); let sidecar_mgr = sidecar_manager.clone(); + let response_tx = sink_tx; let command_reader = tokio::spawn(async move { while let Some(msg) = ws_rx.next().await { match msg { Ok(Message::Text(text)) => { if let Ok(cmd) = serde_json::from_str::<RelayCommand>(&text) { - handle_relay_command(&pty_mgr, &sidecar_mgr, cmd).await; + handle_relay_command(&pty_mgr, &sidecar_mgr, &response_tx, cmd).await; } } 
Ok(Message::Close(_)) => break, @@ -238,25 +240,35 @@ async fn handle_relay_command( pty: &PtyManager, sidecar: &SidecarManager, + response_tx: &mpsc::UnboundedSender<RelayEvent>, cmd: RelayCommand, ) { match cmd.type_.as_str() { "ping" => { - // Pong is handled by the event sink — no-op here since we'd need - // the event channel. The WsEventSink handles it via the ready signal pattern. - // For pong, we emit directly. + let _ = response_tx.send(RelayEvent { + type_: "pong".to_string(), + session_id: None, + payload: None, + }); } "pty_create" => { let options: PtyOptions = match serde_json::from_value(cmd.payload) { Ok(opts) => opts, Err(e) => { - log::error!("Invalid pty_create payload: {e}"); + send_error(response_tx, &cmd.id, &format!("Invalid pty_create payload: {e}")); return; } }; match pty.spawn(options) { - Ok(id) => log::info!("Spawned remote PTY: {id}"), - Err(e) => log::error!("Failed to spawn remote PTY: {e}"), + Ok(pty_id) => { + log::info!("Spawned remote PTY: {pty_id}"); + let _ = response_tx.send(RelayEvent { + type_: "pty_created".to_string(), + session_id: Some(pty_id), + payload: Some(serde_json::json!({ "commandId": cmd.id })), + }); + } + Err(e) => send_error(response_tx, &cmd.id, &format!("Failed to spawn PTY: {e}")), } } "pty_write" => { @@ -265,7 +277,7 @@ async fn handle_relay_command( cmd.payload.get("data").and_then(|v| v.as_str()), ) { if let Err(e) = pty.write(id, data) { - log::error!("Remote PTY write error: {e}"); + send_error(response_tx, &cmd.id, &format!("PTY write error: {e}")); } } } @@ -276,14 +288,14 @@ async fn handle_relay_command( cmd.payload.get("rows").and_then(|v| v.as_u64()), ) { if let Err(e) = pty.resize(id, cols as u16, rows as u16) { - log::error!("Remote PTY resize error: {e}"); + send_error(response_tx, &cmd.id, &format!("PTY resize error: {e}")); } } } "pty_close" => { if let Some(id) = cmd.payload.get("id").and_then(|v| v.as_str()) { if let Err(e) = pty.kill(id) { - log::error!("Remote PTY kill 
error: {e}"); + send_error(response_tx, &cmd.id, &format!("PTY kill error: {e}")); } } } @@ -291,24 +303,24 @@ let options: AgentQueryOptions = match serde_json::from_value(cmd.payload) { Ok(opts) => opts, Err(e) => { - log::error!("Invalid agent_query payload: {e}"); + send_error(response_tx, &cmd.id, &format!("Invalid agent_query payload: {e}")); return; } }; if let Err(e) = sidecar.query(&options) { - log::error!("Remote agent query error: {e}"); + send_error(response_tx, &cmd.id, &format!("Agent query error: {e}")); } } "agent_stop" => { if let Some(session_id) = cmd.payload.get("sessionId").and_then(|v| v.as_str()) { if let Err(e) = sidecar.stop_session(session_id) { - log::error!("Remote agent stop error: {e}"); + send_error(response_tx, &cmd.id, &format!("Agent stop error: {e}")); } } } "sidecar_restart" => { if let Err(e) = sidecar.restart() { - log::error!("Remote sidecar restart error: {e}"); + send_error(response_tx, &cmd.id, &format!("Sidecar restart error: {e}")); } } other => { @@ -316,3 +328,15 @@ } } } + +fn send_error(tx: &mpsc::UnboundedSender<RelayEvent>, cmd_id: &str, message: &str) { + log::error!("{message}"); + let _ = tx.send(RelayEvent { + type_: "error".to_string(), + session_id: None, + payload: Some(serde_json::json!({ + "commandId": cmd_id, + "message": message, + })), + }); +} diff --git a/v2/src-tauri/src/remote.rs b/v2/src-tauri/src/remote.rs index c2726fb..e7e8dd8 100644 --- a/v2/src-tauri/src/remote.rs +++ b/v2/src-tauri/src/remote.rs @@ -196,6 +196,14 @@ impl RemoteManager { "payload": event.payload, })); } + "pty_created" => { + // Relay confirmed PTY spawn — emit with real PTY ID + let _ = app_handle.emit("remote-pty-created", &serde_json::json!({ + "machineId": mid, + "ptyId": event.session_id, + "commandId": event.payload.as_ref().and_then(|p| p.get("commandId")).and_then(|v| v.as_str()), + })); + } "pong" => {} // heartbeat response, ignore "error" => { let _ = 
app_handle.emit("remote-error", &serde_json::json!({ @@ -218,16 +226,71 @@ impl RemoteManager { } } - // Mark disconnected - if let Ok(mut machines) = machines_ref.try_lock() { - if let Some(machine) = machines.get_mut(&mid) { - machine.status = "disconnected".to_string(); - machine.connection = None; + // Mark disconnected and clear connection + { + if let Ok(mut machines) = machines_ref.try_lock() { + if let Some(machine) = machines.get_mut(&mid) { + machine.status = "disconnected".to_string(); + machine.connection = None; + } } } let _ = app_handle.emit("remote-machine-disconnected", &serde_json::json!({ "machineId": mid, })); + + // Exponential backoff reconnection (1s, 2s, 4s, 8s, 16s, 30s cap) + let reconnect_machines = machines_ref.clone(); + let reconnect_app = app_handle.clone(); + let reconnect_mid = mid.clone(); + tokio::spawn(async move { + let mut delay = std::time::Duration::from_secs(1); + let max_delay = std::time::Duration::from_secs(30); + + loop { + tokio::time::sleep(delay).await; + + // Check if machine still exists and wants reconnection + let should_reconnect = { + let machines = reconnect_machines.lock().await; + machines.get(&reconnect_mid) + .map(|m| m.status == "disconnected" && m.connection.is_none()) + .unwrap_or(false) + }; + + if !should_reconnect { + log::info!("Reconnection cancelled for machine {reconnect_mid}"); + break; + } + + log::info!("Attempting reconnection to {reconnect_mid} (backoff: {}s)", delay.as_secs()); + let _ = reconnect_app.emit("remote-machine-reconnecting", &serde_json::json!({ + "machineId": reconnect_mid, + "backoffSecs": delay.as_secs(), + })); + + // Try to get config for reconnection + let config = { + let machines = reconnect_machines.lock().await; + machines.get(&reconnect_mid).map(|m| (m.config.url.clone(), m.config.token.clone())) + }; + + if let Some((url, token)) = config { + if attempt_ws_connect(&url, &token).await.is_ok() { + log::info!("Reconnection probe succeeded for {reconnect_mid}"); + // Mark 
as ready for reconnection — frontend should call connect() + let _ = reconnect_app.emit("remote-machine-reconnect-ready", &serde_json::json!({ + "machineId": reconnect_mid, + })); + break; + } + } else { + break; // Machine removed + } + + delay = std::cmp::min(delay * 2, max_delay); + } + }); }); // Combine reader + writer into one handle @@ -349,6 +412,33 @@ impl RemoteManager { } } +/// Probe whether a relay is reachable (connect + immediate close). +async fn attempt_ws_connect(url: &str, token: &str) -> Result<(), String> { + let request = tokio_tungstenite::tungstenite::http::Request::builder() + .uri(url) + .header("Authorization", format!("Bearer {token}")) + .header("Sec-WebSocket-Key", tokio_tungstenite::tungstenite::handshake::client::generate_key()) + .header("Sec-WebSocket-Version", "13") + .header("Connection", "Upgrade") + .header("Upgrade", "websocket") + .header("Host", extract_host(url).unwrap_or_default()) + .body(()) + .map_err(|e| format!("Request build failed: {e}"))?; + + let (ws, _) = tokio::time::timeout( + std::time::Duration::from_secs(5), + tokio_tungstenite::connect_async(request), + ) + .await + .map_err(|_| "Connection timeout".to_string())? + .map_err(|e| format!("Connection failed: {e}"))?; + + // Close immediately — this was just a probe + let (mut tx, _) = ws.split(); + let _ = tx.close().await; + Ok(()) +} + fn extract_host(url: &str) -> Option<String> { url.replace("wss://", "") .replace("ws://", "")