feat(ai): stream agent replies token-by-token to the room

Closes the cross-language half of token streaming (perf-plan A3). On the
CPU-only box perceived latency is time-to-first-token, so showing the reply
as it generates makes a slow model feel live.

- Agent: OllamaProvider.stream() runs on a worker thread; bridge relays
  cumulative previews as throttled (~5/sec) `_ai:"stream"` control frames,
  then a `done` frame clears the preview as the final persisted chat message
  is posted. Providers without stream() fall back to blocking complete().
- Rust client: new Net::AiStream variant + parse_ai branch; App.ai_stream
  map holds the in-progress text per agent; draw_chat renders it as a dim,
  italic preview bubble below history. Cleared on done and on agent leave.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
leetcrypt 2026-06-02 22:42:08 -07:00
parent 26c651e9ac
commit 69bce5ead8
5 changed files with 116 additions and 20 deletions

View File

@ -232,6 +232,13 @@ class AgentBridge(Client):
frame = json.dumps({"_ai": "typing", "name": self.name, "on": on})
await ws.send(self.room_fernet.encrypt(frame.encode()).decode())
async def _send_stream(self, ws, text: str, done: bool) -> None:
"""Emit an incremental reply preview. ``text`` is the reply so far
(cumulative); ``done`` clears the client's live bubble. A control frame —
never stored or shown as a permanent chat line."""
frame = json.dumps({"_ai": "stream", "name": self.name, "text": text, "done": done})
await ws.send(self.room_fernet.encrypt(frame.encode()).decode())
async def _send_chat(self, ws, text: str) -> None:
await ws.send(self.room_fernet.encrypt(text.encode()).decode())
@ -385,23 +392,62 @@ class AgentBridge(Client):
self.success(f"answered /ai {question.strip().lower()} for {asker}")
return
self.transcript.append(Msg("user", f"{asker}: {question}"))
context = await self._model_messages(question)
stream_fn = getattr(self.provider, "stream", None)
await self._send_typing(ws, True)
try:
context = await self._model_messages(question)
reply = await asyncio.to_thread(
self.provider.complete,
self.system_prompt,
context,
)
if stream_fn is not None:
reply = await self._stream_reply(ws, stream_fn, context)
else:
reply = await asyncio.to_thread(
self.provider.complete, self.system_prompt, context)
except Exception as e: # noqa: BLE001 — surface any provider failure in-room
reply = f"[ai error: {e}]"
finally:
await self._send_typing(ws, False)
if stream_fn is not None:
await self._send_stream(ws, "", True) # clear the live preview
reply = reply.strip() or "[empty reply]"
self.transcript.append(Msg("assistant", reply))
await self._send_chat(ws, reply)
self.success(f"replied to {asker}")
async def _stream_reply(self, ws, stream_fn, context: list[Msg]) -> str:
"""Run the provider's blocking token generator on a worker thread and
relay cumulative previews to the room, throttled so a fast model can't
flood the websocket. Returns the full reply text."""
loop = asyncio.get_running_loop()
q: asyncio.Queue = asyncio.Queue()
done = object()
def produce():
try:
for piece in stream_fn(self.system_prompt, context):
loop.call_soon_threadsafe(q.put_nowait, piece)
except Exception as e: # noqa: BLE001 — relay failure to the consumer
loop.call_soon_threadsafe(q.put_nowait, e)
finally:
loop.call_soon_threadsafe(q.put_nowait, done)
worker = loop.run_in_executor(None, produce)
parts: list[str] = []
last_emit = 0.0
try:
while True:
item = await q.get()
if item is done:
break
if isinstance(item, Exception):
raise item
parts.append(item)
now = loop.time()
if now - last_emit >= 0.2: # ~5 previews/sec max
await self._send_stream(ws, "".join(parts), False)
last_emit = now
finally:
await worker
return "".join(parts)
async def run_async(self) -> None:
self.srp_authenticate()
url = f"{self.ws_url}/ws/chat?user_id={self.user_id}&ws_token={self.ws_token}"

View File

@ -17,11 +17,12 @@ KV-cache quant) are no-ops here.
on CPU); `token_budget` default 3000→2000 to fit. `--num-ctx`, `--num-thread`,
`--num-predict` flags added. `num_thread` defaults to Ollama's own (= physical
cores, 4 here); benchmark 4/6/8.
3. **Token streaming.** *(partial — provider half done)* `OllamaProvider.stream()`
now yields deltas from Ollama's `stream=True` chat endpoint. Still TODO (commit 2):
have the agent emit `_ai:"stream"` delta frames and the Rust client render an
in-progress bubble. On CPU, perceived latency is TTFT — this will make a slow
reply feel live.
3. **Token streaming.** *(done)* `OllamaProvider.stream()` yields deltas from
Ollama's `stream=True` chat endpoint; the agent relays them as throttled
(~5/sec) cumulative `_ai:"stream"` frames off a worker thread, and the Rust
client renders a dim in-progress preview bubble (cleared by a `done` frame
when the final, persisted message lands). On CPU, perceived latency is TTFT —
this makes a slow reply feel live.
4. **Keep model warm + single-flight.** *(partial)* `keep_alive` already 30m
(prevents mid-session reload). `OLLAMA_NUM_PARALLEL=1` is a **server-side env**
read by `ollama serve`, not settable from the agent — set it where Ollama is

View File

@ -96,6 +96,14 @@ pub enum Net {
name: String,
on: bool,
},
/// Incremental reply text from a streaming AI agent. `text` is the reply so
/// far (cumulative); `done` clears the live preview (the final, persisted
/// chat message arrives separately as a normal `Message`).
AiStream {
name: String,
text: String,
done: bool,
},
/// A local system notice produced off-thread (e.g. async Ollama probe).
Sys(String),
Err(String),
@ -141,6 +149,9 @@ pub struct App {
pub password: String,
/// AI agents currently generating a reply — drives the "thinking" spinner.
pub ai_typing: std::collections::HashSet<String>,
/// Live, in-progress reply text per streaming agent, shown as a transient
/// preview bubble until the final message lands. Keyed by agent name.
pub ai_stream: std::collections::HashMap<String, String>,
/// Monotonic tick counter used to animate the AI spinner.
pub spin: usize,
/// When set, agents we summon are auto-granted sandbox drive on each launch
@ -173,6 +184,7 @@ impl App {
error: None,
password: String::new(),
ai_typing: std::collections::HashSet::new(),
ai_stream: std::collections::HashMap::new(),
spin: 0,
agent_sbx_allow: false,
}
@ -238,6 +250,7 @@ impl App {
if let Some(p) = self.users.iter().position(|u| u.user_id == uid) {
let name = self.users.remove(p).username;
self.ai_typing.remove(&name); // a departed agent isn't thinking
self.ai_stream.remove(&name); // …nor streaming a reply
self.sys(format!("{name} left"));
}
}
@ -248,6 +261,16 @@ impl App {
self.ai_typing.remove(&name);
}
}
Net::AiStream { name, text, done } => {
if done {
self.ai_stream.remove(&name);
} else {
// Streaming has started → drop the bare "thinking" spinner;
// the live preview now signals the agent is working.
self.ai_typing.remove(&name);
self.ai_stream.insert(name, text);
}
}
Net::SbxStatus {
backend,
ready,

View File

@ -227,17 +227,24 @@ fn parse_sbx(text: &str, sender: &str) -> Option<Net> {
}
}
/// Parse a decrypted `{"_ai":"typing",...}` frame — an AI agent signalling that
/// it is (or has finished) generating a reply, so the UI can show a spinner.
/// Parse a decrypted `{"_ai":...}` frame from an AI agent. `"typing"` toggles the
/// thinking spinner; `"stream"` carries the cumulative reply text for a live
/// preview bubble (`done` clears it once the final message is posted).
fn parse_ai(text: &str) -> Option<Net> {
let v: Value = serde_json::from_str(text).ok()?;
if v["_ai"].as_str()? != "typing" {
return None;
let name = || v["name"].as_str().unwrap_or("ai").to_string();
match v["_ai"].as_str()? {
"typing" => Some(Net::AiTyping {
name: name(),
on: v["on"].as_bool().unwrap_or(false),
}),
"stream" => Some(Net::AiStream {
name: name(),
text: v["text"].as_str().unwrap_or("").to_string(),
done: v["done"].as_bool().unwrap_or(false),
}),
_ => None,
}
Some(Net::AiTyping {
name: v["name"].as_str().unwrap_or("ai").to_string(),
on: v["on"].as_bool().unwrap_or(false),
})
}
/// Parse a decrypted `{"_perm":"acl",...}` frame.

View File

@ -357,7 +357,26 @@ fn fmt_line<'a>(l: &'a ChatLine, app: &App, theme: &Theme) -> Line<'a> {
fn draw_chat(f: &mut Frame, area: ratatui::layout::Rect, app: &App, theme: &Theme) {
let inner_h = area.height.saturating_sub(2) as usize; // rows inside the border
let text_w = area.width.saturating_sub(2).max(1); // wrap width inside the border
let lines: Vec<Line> = app.lines.iter().map(|l| fmt_line(l, app, theme)).collect();
let mut lines: Vec<Line> = app.lines.iter().map(|l| fmt_line(l, app, theme)).collect();
// Live preview bubbles for agents currently streaming a reply, rendered
// below the committed history. Dim + italic so they read as in-progress;
// replaced by the real message once the agent posts it.
let mut streaming: Vec<(&String, &String)> = app.ai_stream.iter().collect();
streaming.sort_unstable_by_key(|(name, _)| name.as_str());
for (name, text) in streaming {
lines.push(Line::from(vec![
Span::styled(
format!("{name} "),
Style::default().fg(theme.other).add_modifier(Modifier::BOLD),
),
Span::styled("", Style::default().fg(theme.dim)),
Span::styled(
text.as_str(),
Style::default().fg(theme.dim).add_modifier(Modifier::ITALIC),
),
]));
}
// Measure the TRUE wrapped height and scroll to the bottom. Selecting N
// logical lines and letting the Paragraph top-anchor them clips any wrapped