From 69bce5ead8409d1c453bd0e0a45a41088a900dfc Mon Sep 17 00:00:00 2001 From: leetcrypt Date: Tue, 2 Jun 2026 22:42:08 -0700 Subject: [PATCH] feat(ai): stream agent replies token-by-token to the room Closes the cross-language half of token streaming (perf-plan A3). On the CPU-only box perceived latency is time-to-first-token, so showing the reply as it generates makes a slow model feel live. - Agent: OllamaProvider.stream() runs on a worker thread; bridge relays cumulative previews as throttled (~5/sec) `_ai:"stream"` control frames, then a `done` frame clears the preview as the final persisted chat message is posted. Providers without stream() fall back to blocking complete(). - Rust client: new Net::AiStream variant + parse_ai branch; App.ai_stream map holds the in-progress text per agent; draw_chat renders it as a dim, italic preview bubble below history. Cleared on done and on agent leave. Co-Authored-By: Claude Opus 4.6 --- cmd_chat/agent/bridge.py | 58 +++++++++++++++++++++++++++++++++++----- docs/ai-perf-plan.md | 11 ++++---- hh/src/app.rs | 23 ++++++++++++++++ hh/src/net.rs | 23 ++++++++++------ hh/src/ui.rs | 21 ++++++++++++++- 5 files changed, 116 insertions(+), 20 deletions(-) diff --git a/cmd_chat/agent/bridge.py b/cmd_chat/agent/bridge.py index 358d58c..3cdbe59 100644 --- a/cmd_chat/agent/bridge.py +++ b/cmd_chat/agent/bridge.py @@ -232,6 +232,13 @@ class AgentBridge(Client): frame = json.dumps({"_ai": "typing", "name": self.name, "on": on}) await ws.send(self.room_fernet.encrypt(frame.encode()).decode()) + async def _send_stream(self, ws, text: str, done: bool) -> None: + """Emit an incremental reply preview. ``text`` is the reply so far + (cumulative); ``done`` clears the client's live bubble. 
A control frame — + never stored or shown as a permanent chat line.""" + frame = json.dumps({"_ai": "stream", "name": self.name, "text": text, "done": done}) + await ws.send(self.room_fernet.encrypt(frame.encode()).decode()) + async def _send_chat(self, ws, text: str) -> None: await ws.send(self.room_fernet.encrypt(text.encode()).decode()) @@ -385,23 +392,62 @@ class AgentBridge(Client): self.success(f"answered /ai {question.strip().lower()} for {asker}") return self.transcript.append(Msg("user", f"{asker}: {question}")) + context = await self._model_messages(question) + stream_fn = getattr(self.provider, "stream", None) await self._send_typing(ws, True) try: - context = await self._model_messages(question) - reply = await asyncio.to_thread( - self.provider.complete, - self.system_prompt, - context, - ) + if stream_fn is not None: + reply = await self._stream_reply(ws, stream_fn, context) + else: + reply = await asyncio.to_thread( + self.provider.complete, self.system_prompt, context) except Exception as e: # noqa: BLE001 — surface any provider failure in-room reply = f"[ai error: {e}]" finally: await self._send_typing(ws, False) + if stream_fn is not None: + await self._send_stream(ws, "", True) # clear the live preview reply = reply.strip() or "[empty reply]" self.transcript.append(Msg("assistant", reply)) await self._send_chat(ws, reply) self.success(f"replied to {asker}") + async def _stream_reply(self, ws, stream_fn, context: list[Msg]) -> str: + """Run the provider's blocking token generator on a worker thread and + relay cumulative previews to the room, throttled so a fast model can't + flood the websocket. 
Returns the full reply text.""" + loop = asyncio.get_running_loop() + q: asyncio.Queue = asyncio.Queue() + done = object() + + def produce(): + try: + for piece in stream_fn(self.system_prompt, context): + loop.call_soon_threadsafe(q.put_nowait, piece) + except Exception as e: # noqa: BLE001 — relay failure to the consumer + loop.call_soon_threadsafe(q.put_nowait, e) + finally: + loop.call_soon_threadsafe(q.put_nowait, done) + + worker = loop.run_in_executor(None, produce) + parts: list[str] = [] + last_emit = 0.0 + try: + while True: + item = await q.get() + if item is done: + break + if isinstance(item, Exception): + raise item + parts.append(item) + now = loop.time() + if now - last_emit >= 0.2: # ~5 previews/sec max + await self._send_stream(ws, "".join(parts), False) + last_emit = now + finally: + await worker + return "".join(parts) + async def run_async(self) -> None: self.srp_authenticate() url = f"{self.ws_url}/ws/chat?user_id={self.user_id}&ws_token={self.ws_token}" diff --git a/docs/ai-perf-plan.md b/docs/ai-perf-plan.md index a016e70..c804978 100644 --- a/docs/ai-perf-plan.md +++ b/docs/ai-perf-plan.md @@ -17,11 +17,12 @@ KV-cache quant) are no-ops here. on CPU); `token_budget` default 3000→2000 to fit. `--num-ctx`, `--num-thread`, `--num-predict` flags added. `num_thread` defaults to Ollama's own (= physical cores, 4 here); benchmark 4/6/8. -3. **Token streaming.** *(partial — provider half done)* `OllamaProvider.stream()` - now yields deltas from Ollama's `stream=True` chat endpoint. Still TODO (commit 2): - have the agent emit `_ai:"stream"` delta frames and the Rust client render an - in-progress bubble. On CPU, perceived latency is TTFT — this will make a slow - reply feel live. +3. 
**Token streaming.** *(done)* `OllamaProvider.stream()` yields deltas from + Ollama's `stream=True` chat endpoint; the agent relays them as throttled + (~5/sec) cumulative `_ai:"stream"` frames off a worker thread, and the Rust + client renders a dim in-progress preview bubble (cleared by a `done` frame + when the final, persisted message lands). On CPU, perceived latency is TTFT — + this makes a slow reply feel live. 4. **Keep model warm + single-flight.** *(partial)* `keep_alive` already 30m (prevents mid-session reload). `OLLAMA_NUM_PARALLEL=1` is a **server-side env** read by `ollama serve`, not settable from the agent — set it where Ollama is diff --git a/hh/src/app.rs b/hh/src/app.rs index 000bb2e..2d9b78f 100644 --- a/hh/src/app.rs +++ b/hh/src/app.rs @@ -96,6 +96,14 @@ pub enum Net { name: String, on: bool, }, + /// Incremental reply text from a streaming AI agent. `text` is the reply so + /// far (cumulative); `done` clears the live preview (the final, persisted + /// chat message arrives separately as a normal `Message`). + AiStream { + name: String, + text: String, + done: bool, + }, /// A local system notice produced off-thread (e.g. async Ollama probe). Sys(String), Err(String), @@ -141,6 +149,9 @@ pub struct App { pub password: String, /// AI agents currently generating a reply — drives the "thinking" spinner. pub ai_typing: std::collections::HashSet<String>, + /// Live, in-progress reply text per streaming agent, shown as a transient + /// preview bubble until the final message lands. Keyed by agent name. + pub ai_stream: std::collections::HashMap<String, String>, /// Monotonic tick counter used to animate the AI spinner. 
pub spin: usize, /// When set, agents we summon are auto-granted sandbox drive on each launch @@ -173,6 +184,7 @@ impl App { error: None, password: String::new(), ai_typing: std::collections::HashSet::new(), + ai_stream: std::collections::HashMap::new(), spin: 0, agent_sbx_allow: false, } @@ -238,6 +250,7 @@ impl App { if let Some(p) = self.users.iter().position(|u| u.user_id == uid) { let name = self.users.remove(p).username; self.ai_typing.remove(&name); // a departed agent isn't thinking + self.ai_stream.remove(&name); // …nor streaming a reply self.sys(format!("{name} left")); } } @@ -248,6 +261,16 @@ impl App { self.ai_typing.remove(&name); } } + Net::AiStream { name, text, done } => { + if done { + self.ai_stream.remove(&name); + } else { + // Streaming has started → drop the bare "thinking" spinner; + // the live preview now signals the agent is working. + self.ai_typing.remove(&name); + self.ai_stream.insert(name, text); + } + } Net::SbxStatus { backend, ready, diff --git a/hh/src/net.rs b/hh/src/net.rs index 52c7297..201bacd 100644 --- a/hh/src/net.rs +++ b/hh/src/net.rs @@ -227,17 +227,24 @@ fn parse_sbx(text: &str, sender: &str) -> Option<Net> { } } -/// Parse a decrypted `{"_ai":"typing",...}` frame — an AI agent signalling that -/// it is (or has finished) generating a reply, so the UI can show a spinner. +/// Parse a decrypted `{"_ai":...}` frame from an AI agent. `"typing"` toggles the +/// thinking spinner; `"stream"` carries the cumulative reply text for a live +/// preview bubble (`done` clears it once the final message is posted). fn parse_ai(text: &str) -> Option<Net> { let v: Value = serde_json::from_str(text).ok()?; - if v["_ai"].as_str()? != "typing" { - return None; + let name = || v["name"].as_str().unwrap_or("ai").to_string(); + match v["_ai"].as_str()? 
{ + "typing" => Some(Net::AiTyping { + name: name(), + on: v["on"].as_bool().unwrap_or(false), + }), + "stream" => Some(Net::AiStream { + name: name(), + text: v["text"].as_str().unwrap_or("").to_string(), + done: v["done"].as_bool().unwrap_or(false), + }), + _ => None, } - Some(Net::AiTyping { - name: v["name"].as_str().unwrap_or("ai").to_string(), - on: v["on"].as_bool().unwrap_or(false), - }) } /// Parse a decrypted `{"_perm":"acl",...}` frame. diff --git a/hh/src/ui.rs b/hh/src/ui.rs index 3dad3e3..4194eda 100644 --- a/hh/src/ui.rs +++ b/hh/src/ui.rs @@ -357,7 +357,26 @@ fn fmt_line<'a>(l: &'a ChatLine, app: &App, theme: &Theme) -> Line<'a> { fn draw_chat(f: &mut Frame, area: ratatui::layout::Rect, app: &App, theme: &Theme) { let inner_h = area.height.saturating_sub(2) as usize; // rows inside the border let text_w = area.width.saturating_sub(2).max(1); // wrap width inside the border - let lines: Vec<Line> = app.lines.iter().map(|l| fmt_line(l, app, theme)).collect(); + let mut lines: Vec<Line> = app.lines.iter().map(|l| fmt_line(l, app, theme)).collect(); + + // Live preview bubbles for agents currently streaming a reply, rendered + // below the committed history. Dim + italic so they read as in-progress; + // replaced by the real message once the agent posts it. + let mut streaming: Vec<(&String, &String)> = app.ai_stream.iter().collect(); + streaming.sort_unstable_by_key(|(name, _)| name.as_str()); + for (name, text) in streaming { + lines.push(Line::from(vec![ + Span::styled( + format!("{name} "), + Style::default().fg(theme.other).add_modifier(Modifier::BOLD), + ), + Span::styled("⠿ ", Style::default().fg(theme.dim)), + Span::styled( + text.as_str(), + Style::default().fg(theme.dim).add_modifier(Modifier::ITALIC), + ), + ])); + } // Measure the TRUE wrapped height and scroll to the bottom. Selecting N // logical lines and letting the Paragraph top-anchor them clips any wrapped