feat(v2): add relay response propagation and reconnection with exponential backoff
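Relay (bterminal-relay): command handlers now send structured responses (pty_created, pong, error) back to the client over the shared event channel; pty_created and error responses carry the originating commandId for correlation. A new send_error() helper logs each failure and also reports it to the client, replacing log-only error reporting. RemoteManager (remote.rs): reconnects after a disconnect using exponential backoff (1s, 2s, 4s, 8s, 16s, then capped at 30s). Each attempt probes the relay with attempt_ws_connect(), which has a 5s timeout. It emits remote-machine-reconnecting before each attempt and remote-machine-reconnect-ready once a probe succeeds, at which point the frontend is expected to call connect(). The relay's pty_created event is forwarded as the remote-pty-created Tauri event.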
This commit is contained in:
parent
0a17c09a46
commit
b0cce7ae4f
2 changed files with 134 additions and 20 deletions
|
|
@ -165,8 +165,9 @@ async fn handle_connection(
|
||||||
|
|
||||||
log::info!("Client connected: {peer}");
|
log::info!("Client connected: {peer}");
|
||||||
|
|
||||||
// Set up event channel
|
// Set up event channel — shared between EventSink and command response sender
|
||||||
let (event_tx, mut event_rx) = mpsc::unbounded_channel::<RelayEvent>();
|
let (event_tx, mut event_rx) = mpsc::unbounded_channel::<RelayEvent>();
|
||||||
|
let sink_tx = event_tx.clone();
|
||||||
let sink: Arc<dyn EventSink> = Arc::new(WsEventSink { tx: event_tx });
|
let sink: Arc<dyn EventSink> = Arc::new(WsEventSink { tx: event_tx });
|
||||||
|
|
||||||
// Create managers for this connection
|
// Create managers for this connection
|
||||||
|
|
@ -204,12 +205,13 @@ async fn handle_connection(
|
||||||
// Process incoming commands
|
// Process incoming commands
|
||||||
let pty_mgr = pty_manager.clone();
|
let pty_mgr = pty_manager.clone();
|
||||||
let sidecar_mgr = sidecar_manager.clone();
|
let sidecar_mgr = sidecar_manager.clone();
|
||||||
|
let response_tx = sink_tx;
|
||||||
let command_reader = tokio::spawn(async move {
|
let command_reader = tokio::spawn(async move {
|
||||||
while let Some(msg) = ws_rx.next().await {
|
while let Some(msg) = ws_rx.next().await {
|
||||||
match msg {
|
match msg {
|
||||||
Ok(Message::Text(text)) => {
|
Ok(Message::Text(text)) => {
|
||||||
if let Ok(cmd) = serde_json::from_str::<RelayCommand>(&text) {
|
if let Ok(cmd) = serde_json::from_str::<RelayCommand>(&text) {
|
||||||
handle_relay_command(&pty_mgr, &sidecar_mgr, cmd).await;
|
handle_relay_command(&pty_mgr, &sidecar_mgr, &response_tx, cmd).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(Message::Close(_)) => break,
|
Ok(Message::Close(_)) => break,
|
||||||
|
|
@ -238,25 +240,35 @@ async fn handle_connection(
|
||||||
async fn handle_relay_command(
|
async fn handle_relay_command(
|
||||||
pty: &PtyManager,
|
pty: &PtyManager,
|
||||||
sidecar: &SidecarManager,
|
sidecar: &SidecarManager,
|
||||||
|
response_tx: &mpsc::UnboundedSender<RelayEvent>,
|
||||||
cmd: RelayCommand,
|
cmd: RelayCommand,
|
||||||
) {
|
) {
|
||||||
match cmd.type_.as_str() {
|
match cmd.type_.as_str() {
|
||||||
"ping" => {
|
"ping" => {
|
||||||
// Pong is handled by the event sink — no-op here since we'd need
|
let _ = response_tx.send(RelayEvent {
|
||||||
// the event channel. The WsEventSink handles it via the ready signal pattern.
|
type_: "pong".to_string(),
|
||||||
// For pong, we emit directly.
|
session_id: None,
|
||||||
|
payload: None,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
"pty_create" => {
|
"pty_create" => {
|
||||||
let options: PtyOptions = match serde_json::from_value(cmd.payload) {
|
let options: PtyOptions = match serde_json::from_value(cmd.payload) {
|
||||||
Ok(opts) => opts,
|
Ok(opts) => opts,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::error!("Invalid pty_create payload: {e}");
|
send_error(response_tx, &cmd.id, &format!("Invalid pty_create payload: {e}"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
match pty.spawn(options) {
|
match pty.spawn(options) {
|
||||||
Ok(id) => log::info!("Spawned remote PTY: {id}"),
|
Ok(pty_id) => {
|
||||||
Err(e) => log::error!("Failed to spawn remote PTY: {e}"),
|
log::info!("Spawned remote PTY: {pty_id}");
|
||||||
|
let _ = response_tx.send(RelayEvent {
|
||||||
|
type_: "pty_created".to_string(),
|
||||||
|
session_id: Some(pty_id),
|
||||||
|
payload: Some(serde_json::json!({ "commandId": cmd.id })),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => send_error(response_tx, &cmd.id, &format!("Failed to spawn PTY: {e}")),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"pty_write" => {
|
"pty_write" => {
|
||||||
|
|
@ -265,7 +277,7 @@ async fn handle_relay_command(
|
||||||
cmd.payload.get("data").and_then(|v| v.as_str()),
|
cmd.payload.get("data").and_then(|v| v.as_str()),
|
||||||
) {
|
) {
|
||||||
if let Err(e) = pty.write(id, data) {
|
if let Err(e) = pty.write(id, data) {
|
||||||
log::error!("Remote PTY write error: {e}");
|
send_error(response_tx, &cmd.id, &format!("PTY write error: {e}"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -276,14 +288,14 @@ async fn handle_relay_command(
|
||||||
cmd.payload.get("rows").and_then(|v| v.as_u64()),
|
cmd.payload.get("rows").and_then(|v| v.as_u64()),
|
||||||
) {
|
) {
|
||||||
if let Err(e) = pty.resize(id, cols as u16, rows as u16) {
|
if let Err(e) = pty.resize(id, cols as u16, rows as u16) {
|
||||||
log::error!("Remote PTY resize error: {e}");
|
send_error(response_tx, &cmd.id, &format!("PTY resize error: {e}"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"pty_close" => {
|
"pty_close" => {
|
||||||
if let Some(id) = cmd.payload.get("id").and_then(|v| v.as_str()) {
|
if let Some(id) = cmd.payload.get("id").and_then(|v| v.as_str()) {
|
||||||
if let Err(e) = pty.kill(id) {
|
if let Err(e) = pty.kill(id) {
|
||||||
log::error!("Remote PTY kill error: {e}");
|
send_error(response_tx, &cmd.id, &format!("PTY kill error: {e}"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -291,24 +303,24 @@ async fn handle_relay_command(
|
||||||
let options: AgentQueryOptions = match serde_json::from_value(cmd.payload) {
|
let options: AgentQueryOptions = match serde_json::from_value(cmd.payload) {
|
||||||
Ok(opts) => opts,
|
Ok(opts) => opts,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::error!("Invalid agent_query payload: {e}");
|
send_error(response_tx, &cmd.id, &format!("Invalid agent_query payload: {e}"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if let Err(e) = sidecar.query(&options) {
|
if let Err(e) = sidecar.query(&options) {
|
||||||
log::error!("Remote agent query error: {e}");
|
send_error(response_tx, &cmd.id, &format!("Agent query error: {e}"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"agent_stop" => {
|
"agent_stop" => {
|
||||||
if let Some(session_id) = cmd.payload.get("sessionId").and_then(|v| v.as_str()) {
|
if let Some(session_id) = cmd.payload.get("sessionId").and_then(|v| v.as_str()) {
|
||||||
if let Err(e) = sidecar.stop_session(session_id) {
|
if let Err(e) = sidecar.stop_session(session_id) {
|
||||||
log::error!("Remote agent stop error: {e}");
|
send_error(response_tx, &cmd.id, &format!("Agent stop error: {e}"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"sidecar_restart" => {
|
"sidecar_restart" => {
|
||||||
if let Err(e) = sidecar.restart() {
|
if let Err(e) = sidecar.restart() {
|
||||||
log::error!("Remote sidecar restart error: {e}");
|
send_error(response_tx, &cmd.id, &format!("Sidecar restart error: {e}"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
other => {
|
other => {
|
||||||
|
|
@ -316,3 +328,15 @@ async fn handle_relay_command(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn send_error(tx: &mpsc::UnboundedSender<RelayEvent>, cmd_id: &str, message: &str) {
|
||||||
|
log::error!("{message}");
|
||||||
|
let _ = tx.send(RelayEvent {
|
||||||
|
type_: "error".to_string(),
|
||||||
|
session_id: None,
|
||||||
|
payload: Some(serde_json::json!({
|
||||||
|
"commandId": cmd_id,
|
||||||
|
"message": message,
|
||||||
|
})),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -196,6 +196,14 @@ impl RemoteManager {
|
||||||
"payload": event.payload,
|
"payload": event.payload,
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
"pty_created" => {
|
||||||
|
// Relay confirmed PTY spawn — emit with real PTY ID
|
||||||
|
let _ = app_handle.emit("remote-pty-created", &serde_json::json!({
|
||||||
|
"machineId": mid,
|
||||||
|
"ptyId": event.session_id,
|
||||||
|
"commandId": event.payload.as_ref().and_then(|p| p.get("commandId")).and_then(|v| v.as_str()),
|
||||||
|
}));
|
||||||
|
}
|
||||||
"pong" => {} // heartbeat response, ignore
|
"pong" => {} // heartbeat response, ignore
|
||||||
"error" => {
|
"error" => {
|
||||||
let _ = app_handle.emit("remote-error", &serde_json::json!({
|
let _ = app_handle.emit("remote-error", &serde_json::json!({
|
||||||
|
|
@ -218,16 +226,71 @@ impl RemoteManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mark disconnected
|
// Mark disconnected and clear connection
|
||||||
if let Ok(mut machines) = machines_ref.try_lock() {
|
{
|
||||||
if let Some(machine) = machines.get_mut(&mid) {
|
if let Ok(mut machines) = machines_ref.try_lock() {
|
||||||
machine.status = "disconnected".to_string();
|
if let Some(machine) = machines.get_mut(&mid) {
|
||||||
machine.connection = None;
|
machine.status = "disconnected".to_string();
|
||||||
|
machine.connection = None;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let _ = app_handle.emit("remote-machine-disconnected", &serde_json::json!({
|
let _ = app_handle.emit("remote-machine-disconnected", &serde_json::json!({
|
||||||
"machineId": mid,
|
"machineId": mid,
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
// Exponential backoff reconnection (1s, 2s, 4s, 8s, 16s, 30s cap)
|
||||||
|
let reconnect_machines = machines_ref.clone();
|
||||||
|
let reconnect_app = app_handle.clone();
|
||||||
|
let reconnect_mid = mid.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut delay = std::time::Duration::from_secs(1);
|
||||||
|
let max_delay = std::time::Duration::from_secs(30);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::time::sleep(delay).await;
|
||||||
|
|
||||||
|
// Check if machine still exists and wants reconnection
|
||||||
|
let should_reconnect = {
|
||||||
|
let machines = reconnect_machines.lock().await;
|
||||||
|
machines.get(&reconnect_mid)
|
||||||
|
.map(|m| m.status == "disconnected" && m.connection.is_none())
|
||||||
|
.unwrap_or(false)
|
||||||
|
};
|
||||||
|
|
||||||
|
if !should_reconnect {
|
||||||
|
log::info!("Reconnection cancelled for machine {reconnect_mid}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
log::info!("Attempting reconnection to {reconnect_mid} (backoff: {}s)", delay.as_secs());
|
||||||
|
let _ = reconnect_app.emit("remote-machine-reconnecting", &serde_json::json!({
|
||||||
|
"machineId": reconnect_mid,
|
||||||
|
"backoffSecs": delay.as_secs(),
|
||||||
|
}));
|
||||||
|
|
||||||
|
// Try to get config for reconnection
|
||||||
|
let config = {
|
||||||
|
let machines = reconnect_machines.lock().await;
|
||||||
|
machines.get(&reconnect_mid).map(|m| (m.config.url.clone(), m.config.token.clone()))
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some((url, token)) = config {
|
||||||
|
if attempt_ws_connect(&url, &token).await.is_ok() {
|
||||||
|
log::info!("Reconnection probe succeeded for {reconnect_mid}");
|
||||||
|
// Mark as ready for reconnection — frontend should call connect()
|
||||||
|
let _ = reconnect_app.emit("remote-machine-reconnect-ready", &serde_json::json!({
|
||||||
|
"machineId": reconnect_mid,
|
||||||
|
}));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break; // Machine removed
|
||||||
|
}
|
||||||
|
|
||||||
|
delay = std::cmp::min(delay * 2, max_delay);
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
// Combine reader + writer into one handle
|
// Combine reader + writer into one handle
|
||||||
|
|
@ -349,6 +412,33 @@ impl RemoteManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Probe whether a relay is reachable (connect + immediate close).
|
||||||
|
async fn attempt_ws_connect(url: &str, token: &str) -> Result<(), String> {
|
||||||
|
let request = tokio_tungstenite::tungstenite::http::Request::builder()
|
||||||
|
.uri(url)
|
||||||
|
.header("Authorization", format!("Bearer {token}"))
|
||||||
|
.header("Sec-WebSocket-Key", tokio_tungstenite::tungstenite::handshake::client::generate_key())
|
||||||
|
.header("Sec-WebSocket-Version", "13")
|
||||||
|
.header("Connection", "Upgrade")
|
||||||
|
.header("Upgrade", "websocket")
|
||||||
|
.header("Host", extract_host(url).unwrap_or_default())
|
||||||
|
.body(())
|
||||||
|
.map_err(|e| format!("Request build failed: {e}"))?;
|
||||||
|
|
||||||
|
let (ws, _) = tokio::time::timeout(
|
||||||
|
std::time::Duration::from_secs(5),
|
||||||
|
tokio_tungstenite::connect_async(request),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|_| "Connection timeout".to_string())?
|
||||||
|
.map_err(|e| format!("Connection failed: {e}"))?;
|
||||||
|
|
||||||
|
// Close immediately — this was just a probe
|
||||||
|
let (mut tx, _) = ws.split();
|
||||||
|
let _ = tx.close().await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn extract_host(url: &str) -> Option<String> {
|
fn extract_host(url: &str) -> Option<String> {
|
||||||
url.replace("wss://", "")
|
url.replace("wss://", "")
|
||||||
.replace("ws://", "")
|
.replace("ws://", "")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue