fix(ui): move websocket writer off the UI loop to stop input/chat lag

Outgoing frames were drained inline in the main select! loop with a blocking
sink.send().await. While a sandbox streams its PTY to the room, those _sbx:data
frames flood the socket; if the server backpressures (e.g. relaying to a remote
peer), each await stalled the loop, so keystrokes and incoming chat arrived in
laggy bursts. Hand the write half to a dedicated writer task; reconnect passes
it a fresh sink. Disconnects are still detected by the reader (Net::Closed).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
leetcrypt 2026-06-01 23:04:59 -07:00
parent ee9d0f7ff9
commit ea67796551

View File

@ -497,13 +497,19 @@ impl Drop for TermGuard {
pub async fn run(params: net::ConnParams, mut session: Session, mut theme: Theme) -> Result<()> { pub async fn run(params: net::ConnParams, mut session: Session, mut theme: Theme) -> Result<()> {
let (tx, mut rx) = unbounded_channel::<Net>(); let (tx, mut rx) = unbounded_channel::<Net>();
let app_tx = tx.clone(); let app_tx = tx.clone();
let mut write = net::open(&session, tx.clone()).await?; let write = net::open(&session, tx.clone()).await?;
// Carries the result of a background reconnect handshake back to the loop. // Carries the result of a background reconnect handshake back to the loop.
let (recon_tx, mut recon_rx) = unbounded_channel::<std::result::Result<Session, String>>(); let (recon_tx, mut recon_rx) = unbounded_channel::<std::result::Result<Session, String>>();
// All outgoing frames funnel through here so background tasks (file chunks, // All outgoing frames funnel through here so background tasks (file chunks,
// PTY relay) can transmit without owning the socket. // PTY relay) can transmit without owning the socket.
let (out_tx, mut out_rx) = unbounded_channel::<WsMsg>(); let (out_tx, out_rx) = unbounded_channel::<WsMsg>();
// The websocket writer runs on its own task so a slow / backpressured socket
// (notably while relaying a sandbox PTY stream to a remote peer) can never
// stall the UI loop's keyboard + chat handling. On reconnect the loop hands
// the writer a fresh sink through `sink_tx`.
let (sink_tx, sink_rx) = unbounded_channel::<net::WsSink>();
tokio::spawn(writer_task(write, out_rx, sink_rx));
let (pty_tx, mut pty_rx): (UnboundedSender<Vec<u8>>, UnboundedReceiver<Vec<u8>>) = let (pty_tx, mut pty_rx): (UnboundedSender<Vec<u8>>, UnboundedReceiver<Vec<u8>>) =
unbounded_channel(); unbounded_channel();
let (broker_tx, mut broker_rx) = unbounded_channel::<BrokerMsg>(); let (broker_tx, mut broker_rx) = unbounded_channel::<BrokerMsg>();
@ -789,7 +795,7 @@ pub async fn run(params: net::ConnParams, mut session: Session, mut theme: Theme
session = s; session = s;
match net::open(&session, tx.clone()).await { match net::open(&session, tx.clone()).await {
Ok(w) => { Ok(w) => {
write = w; let _ = sink_tx.send(w);
app.sys("⛧ websocket re-attached — syncing…"); app.sys("⛧ websocket re-attached — syncing…");
// If we host the sandbox, re-announce it so the // If we host the sandbox, re-announce it so the
// rest of the house re-syncs the shared shell. // rest of the house re-syncs the shared shell.
@ -823,22 +829,6 @@ pub async fn run(params: net::ConnParams, mut session: Session, mut theme: Theme
send_frame(&out_tx, &session.room, json!({"_sbx":"data","b64": STANDARD.encode(&bytes)})); send_frame(&out_tx, &session.room, json!({"_sbx":"data","b64": STANDARD.encode(&bytes)}));
} }
} }
outgoing = out_rx.recv() => {
if let Some(first) = outgoing {
// Flush a batch to keep file-chunk bursts from redrawing per frame.
let mut batch = vec![first];
while let Ok(m) = out_rx.try_recv() {
batch.push(m);
if batch.len() >= 64 { break; }
}
for m in batch {
if write.send(m).await.is_err() {
app.connected = false;
break;
}
}
}
}
_ = sigterm.recv() => { break Ok(()); } _ = sigterm.recv() => { break Ok(()); }
_ = sighup.recv() => { break Ok(()); } _ = sighup.recv() => { break Ok(()); }
_ = tick.tick() => { app.spin = app.spin.wrapping_add(1); } _ = tick.tick() => { app.spin = app.spin.wrapping_add(1); }
@ -876,6 +866,45 @@ enum BrokerMsg {
Failed, Failed,
} }
/// Owns the websocket write half and drains all outgoing frames off the UI
/// loop. Because `sink.send().await` can block on a backpressured socket (e.g.
/// while the server relays a sandbox PTY stream to a slow remote peer), keeping
/// it here means that stall never starves keyboard input or chat rendering. A
/// reconnect delivers a fresh sink via `sink_rx`; the reader task independently
/// surfaces `Net::Closed`, so a dead sink here just drops frames until then.
async fn writer_task(
mut sink: net::WsSink,
mut out_rx: UnboundedReceiver<WsMsg>,
mut sink_rx: UnboundedReceiver<net::WsSink>,
) {
loop {
tokio::select! {
biased;
// Swap in a reconnect's fresh sink before draining more frames.
new_sink = sink_rx.recv() => {
match new_sink {
Some(s) => sink = s,
None => {}
}
}
msg = out_rx.recv() => {
let Some(first) = msg else { return }; // app exiting
// Coalesce a burst (file chunks / PTY relay) into one batch.
let mut batch = vec![first];
while let Ok(m) = out_rx.try_recv() {
batch.push(m);
if batch.len() >= 64 { break; }
}
for m in batch {
if sink.send(m).await.is_err() {
break; // sink dead — wait for a reconnect to replace it
}
}
}
}
}
}
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn handle_command( fn handle_command(
line: &str, line: &str,