diff --git a/hh/src/app.rs b/hh/src/app.rs index e430bf7..1bc6596 100644 --- a/hh/src/app.rs +++ b/hh/src/app.rs @@ -497,13 +497,19 @@ impl Drop for TermGuard { pub async fn run(params: net::ConnParams, mut session: Session, mut theme: Theme) -> Result<()> { let (tx, mut rx) = unbounded_channel::(); let app_tx = tx.clone(); - let mut write = net::open(&session, tx.clone()).await?; + let write = net::open(&session, tx.clone()).await?; // Carries the result of a background reconnect handshake back to the loop. let (recon_tx, mut recon_rx) = unbounded_channel::>(); // All outgoing frames funnel through here so background tasks (file chunks, // PTY relay) can transmit without owning the socket. - let (out_tx, mut out_rx) = unbounded_channel::(); + let (out_tx, out_rx) = unbounded_channel::(); + // The websocket writer runs on its own task so a slow / backpressured socket + // (notably while relaying a sandbox PTY stream to a remote peer) can never + // stall the UI loop's keyboard + chat handling. On reconnect the loop hands + // the writer a fresh sink through `sink_tx`. + let (sink_tx, sink_rx) = unbounded_channel::(); + tokio::spawn(writer_task(write, out_rx, sink_rx)); let (pty_tx, mut pty_rx): (UnboundedSender>, UnboundedReceiver>) = unbounded_channel(); let (broker_tx, mut broker_rx) = unbounded_channel::(); @@ -789,7 +795,7 @@ pub async fn run(params: net::ConnParams, mut session: Session, mut theme: Theme session = s; match net::open(&session, tx.clone()).await { Ok(w) => { - write = w; + let _ = sink_tx.send(w); app.sys("⛧ websocket re-attached — syncing…"); // If we host the sandbox, re-announce it so the // rest of the house re-syncs the shared shell. @@ -823,22 +829,6 @@ pub async fn run(params: net::ConnParams, mut session: Session, mut theme: Theme send_frame(&out_tx, &session.room, json!({"_sbx":"data","b64": STANDARD.encode(&bytes)})); } } - outgoing = out_rx.recv() => { - if let Some(first) = outgoing { - // Flush a batch to keep file-chunk bursts from redrawing per frame. - let mut batch = vec![first]; - while let Ok(m) = out_rx.try_recv() { - batch.push(m); - if batch.len() >= 64 { break; } - } - for m in batch { - if write.send(m).await.is_err() { - app.connected = false; - break; - } - } - } - } _ = sigterm.recv() => { break Ok(()); } _ = sighup.recv() => { break Ok(()); } _ = tick.tick() => { app.spin = app.spin.wrapping_add(1); } @@ -876,6 +866,45 @@ enum BrokerMsg { Failed, } +/// Owns the websocket write half and drains all outgoing frames off the UI +/// loop. Because `sink.send().await` can block on a backpressured socket (e.g. +/// while the server relays a sandbox PTY stream to a slow remote peer), keeping +/// it here means that stall never starves keyboard input or chat rendering. A +/// reconnect delivers a fresh sink via `sink_rx`; the reader task independently +/// surfaces `Net::Closed`, so a dead sink here just drops frames until then. +async fn writer_task( + mut sink: net::WsSink, + mut out_rx: UnboundedReceiver, + mut sink_rx: UnboundedReceiver, +) { + loop { + tokio::select! { + biased; + // Swap in a reconnect's fresh sink before draining more frames. + new_sink = sink_rx.recv() => { + match new_sink { + Some(s) => sink = s, + None => {} + } + } + msg = out_rx.recv() => { + let Some(first) = msg else { return }; // app exiting + // Coalesce a burst (file chunks / PTY relay) into one batch. + let mut batch = vec![first]; + while let Ok(m) = out_rx.try_recv() { + batch.push(m); + if batch.len() >= 64 { break; } + } + for m in batch { + if sink.send(m).await.is_err() { + break; // sink dead — wait for a reconnect to replace it + } + } + } + } + } +} + #[allow(clippy::too_many_arguments)] fn handle_command( line: &str,