Live chain-of-thought in a chatbot: how to actually stream the tool calls (not just the text)
Real-Time Agent Telemetry: Streaming Tool Execution Alongside LLM Output
Current Situation Analysis
Modern LLM chatbots have standardized on token-by-token text streaming. Users expect the typewriter effect, and frameworks deliver it efficiently. However, this creates a critical UX blind spot: the model's actual work remains invisible. When an agent decides to query a database, scrape a webpage, or call an external API, the interface typically freezes or shows a generic loading spinner. The user stares at a blinking cursor while the system burns seconds on network I/O, parsing, and guardrail evaluation.
This problem is often overlooked because most AI frameworks treat tool execution as an internal orchestration step rather than a user-facing event. Developers prioritize reducing time-to-first-token and optimizing text generation latency, assuming transparency is secondary to speed. In practice, perceived latency is driven as much by uncertainty as by raw milliseconds: a 4-second operation with visible progress steps can feel faster than a 2-second operation that reveals nothing.
Production experience points to the same gap. Support tickets for agent-based applications often cite "unexplained delays" or "unclear failure states," and user trust suffers when people cannot audit the decision path. Debugging production agent loops is also much harder without real-time visibility into tool invocation sequences, error propagation, and post-processing modifications. The industry has optimized for text delivery while largely ignoring the execution graph that actually powers the response.
WOW Moment: Key Findings
Exposing the full execution stream transforms the interface from a passive text receiver into an interactive audit trail. The following comparison illustrates the operational impact of streaming telemetry alongside content generation:
| Approach | Perceived Latency | Debuggability | User Trust | Implementation Complexity |
|---|---|---|---|---|
| Text-Only Streaming | High (black box during tool execution) | Low (post-hoc logs only) | Moderate (speculative) | Low |
| Full Telemetry Streaming | Low (progress visible in real-time) | High (live step tracking) | High (transparent decision path) | Medium |
| Batched Tool + Text | Very High (long pauses) | Medium (structured but delayed) | Low (opaque workflow) | Low |
This finding matters because it shifts the architectural priority from "deliver tokens fast" to "deliver context fast." When tool calls surface as discrete, timestamped events, you gain three immediate advantages:
- Graceful degradation: Users can see which step is stalling and abort the turn instead of waiting on a spinner.
- Live debugging: Engineering teams can trace execution paths without querying centralized logging systems.
- Predictable state transitions: The UI can differentiate between generation, execution, and finalization phases, enabling precise loading states and error boundaries.
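The "predictable state transitions" point can be made concrete with a small reducer. This is a sketch in Python (to match the server code); the `Phase` names, the `TRANSITIONS` mapping, and `next_phase` are assumptions for illustration, while the event names are the ones the stream in this article emits.

```python
from enum import Enum


class Phase(Enum):
    IDLE = "idle"
    GENERATING = "generating"
    EXECUTING = "executing"
    FINALIZED = "finalized"
    ERRORED = "errored"


# Assumed mapping from stream event name to the UI phase it implies.
TRANSITIONS = {
    "session_open": Phase.GENERATING,
    "content_delta": Phase.GENERATING,
    "tool_invocation": Phase.EXECUTING,
    "turn_finalization": Phase.FINALIZED,
    "stream_error": Phase.ERRORED,
}


def next_phase(current: Phase, event_name: str) -> Phase:
    """Return the phase after an event. Terminal phases never change."""
    if current in (Phase.FINALIZED, Phase.ERRORED):
        return current
    # Events without a mapping (e.g. stream_closed) leave the phase as it is.
    return TRANSITIONS.get(event_name, current)


phase = Phase.IDLE
history = []
for name in ["session_open", "tool_invocation", "content_delta", "turn_finalization", "stream_closed"]:
    phase = next_phase(phase, name)
    history.append(phase.value)
print(history)  # ['generating', 'executing', 'generating', 'finalized', 'finalized']
```

Because every transition is keyed on a discrete event, the UI can swap a generic spinner for a tool-specific indicator the moment a `tool_invocation` frame arrives, and render error boundaries only in the `errored` phase.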
Core Solution
Building a real-time telemetry stream requires coordinating three distinct layers: the event emitter, the secure wire contract, and the client-side consumer. Each layer must handle asynchronous deltas, partial objects, and state reconciliation without blocking the primary generation loop.
Step 1: Backend Event Architecture
The server must emit three distinct event categories as they occur in the agent lifecycle:
- tool_invocation: Fires when the model decides to execute an external function. Contains metadata but deliberately excludes payloads.
- content_delta: Fires continuously as the language model generates tokens.
- turn_finalization: Fires once post-processing guards complete. Contains the authoritative response text and final tool execution results.

The endpoint below also brackets each turn with session_open, stream_error, and stream_closed control events.
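Server-Sent Events (SSE) is a good fit for the transport: it is a unidirectional, text-based protocol over plain HTTP that most proxies handle well. The browser's native EventSource adds automatic reconnection, but EventSource only issues GET requests; with the fetch-based POST client used here, reconnection and retry are your responsibility. FastAPI's StreamingResponse handles the generator pattern cleanly.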
```python
import asyncio
import json
import logging

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()
logger = logging.getLogger(__name__)


def _format_sse(event_type: str, payload: dict) -> str:
    # One SSE frame: an event line, a data line, and a blank line as terminator.
    return f"event: {event_type}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"


@app.post("/v1/agent/stream")
async def stream_agent_response(request: Request):
    body = await request.json()
    session_id = body.get("session_id")
    prompt = body.get("prompt")

    async def _event_generator():
        yield _format_sse("session_open", {"session_id": session_id})
        try:
            async with asyncio.timeout(120):  # Python 3.11+
                # agent_orchestrator and format_final_turn are application code (not shown);
                # sanitize_tool_metadata is defined in Step 2.
                async for raw_event in agent_orchestrator.run(prompt, session_id):
                    event_kind = raw_event.get("category")
                    if event_kind == "tool_invocation":
                        yield _format_sse("tool_invocation", sanitize_tool_metadata(raw_event["meta"]))
                    elif event_kind == "content_delta":
                        yield _format_sse("content_delta", {"chunk": raw_event["text"]})
                    elif event_kind == "turn_finalization":
                        yield _format_sse("turn_finalization", format_final_turn(raw_event["turn_data"]))
        except asyncio.CancelledError:
            # Client went away: let cancellation propagate so the orchestrator is torn down.
            raise
        except asyncio.TimeoutError:
            yield _format_sse("stream_error", {"code": "TURN_TIMEOUT", "detail": "Execution exceeded 120s"})
        except Exception:
            # Keep exception details in server logs; raw messages can leak internals to the client.
            logger.exception("Agent stream failed (session_id=%s)", session_id)
            yield _format_sse("stream_error", {"code": "INTERNAL", "detail": "Internal error"})
        # Deliberately not in a `finally`: that would also yield during cancellation,
        # when the client is already gone.
        yield _format_sse("stream_closed", {"status": "complete"})

    return StreamingResponse(
        _event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable nginx proxy buffering for this response
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*",
        },
    )
```
Architecture Rationale:
- asyncio.timeout(120) (Python 3.11+) prevents zombie connections when external APIs hang.
- CancelledError must never be swallowed. When the client disconnects, the server cancels the generator, and re-raising lets that cancellation reach the orchestration loop. Swallowing it keeps the upstream LLM call alive, wasting compute and tokens.
- X-Accel-Buffering: no disables nginx's proxy buffering for this response. Without it, nginx may accumulate the response before forwarding it, destroying the streaming contract. Other proxies and CDNs, such as Cloudflare, have their own buffering behavior and settings.
Step 2: Secure Wire Contract
Never serialize raw tool payloads to the client. Tool inputs frequently contain API keys, database connection strings, or sensitive user data. The wire contract must be strictly narrower than the internal representation.
```python
from typing import TypedDict


class ToolInvocationWire(TypedDict):
    execution_id: str
    function_name: str
    server_namespace: str
    status: str
    started_at: float


def sanitize_tool_metadata(internal_obj: object) -> ToolInvocationWire:
    """Strips all input/output payloads. Returns only routing and status metadata."""
    # Copy an explicit allowlist of attributes; nothing else on internal_obj reaches the wire.
    return {
        "execution_id": getattr(internal_obj, "run_id", "unknown"),
        "function_name": getattr(internal_obj, "target_func", "unknown"),
        "server_namespace": getattr(internal_obj, "provider", "unknown"),
        "status": getattr(internal_obj, "state", "pending"),
        "started_at": getattr(internal_obj, "timestamp", 0.0),
    }
```
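Why this matters: The explicit field allowlist in sanitize_tool_metadata is the real security boundary: nothing outside those five attributes can reach the client. The ToolInvocationWire return annotation adds a static check on top, so a type checker such as mypy or pyright will flag a returned dict that carries extra keys (for example an input field). TypedDict is not enforced at runtime, so if you need runtime rejection, validate the payload explicitly or use a Pydantic model configured to forbid extra fields. Using getattr with defaults handles partial objects that arrive before execution completes, preventing AttributeError during mid-stream serialization.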
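Because TypedDict is only checked statically, a runtime guard can back it up. A minimal stdlib-only sketch, assuming you want to reject any payload whose keys differ from the declared wire fields; `assert_wire_shape` and the `SimpleNamespace` stand-in for an internal tool object are illustrative, not part of the original code.

```python
from types import SimpleNamespace
from typing import TypedDict


class ToolInvocationWire(TypedDict):
    execution_id: str
    function_name: str
    server_namespace: str
    status: str
    started_at: float


def sanitize_tool_metadata(internal_obj: object) -> ToolInvocationWire:
    return {
        "execution_id": getattr(internal_obj, "run_id", "unknown"),
        "function_name": getattr(internal_obj, "target_func", "unknown"),
        "server_namespace": getattr(internal_obj, "provider", "unknown"),
        "status": getattr(internal_obj, "state", "pending"),
        "started_at": getattr(internal_obj, "timestamp", 0.0),
    }


def assert_wire_shape(payload: dict) -> None:
    """Hypothetical runtime guard: the payload must carry exactly the declared wire keys."""
    expected = set(ToolInvocationWire.__required_keys__ | ToolInvocationWire.__optional_keys__)
    extra, missing = set(payload) - expected, expected - set(payload)
    if extra or missing:
        raise ValueError(f"wire shape mismatch: extra={sorted(extra)} missing={sorted(missing)}")


# A partially populated internal object that also carries a secret-bearing input payload.
internal = SimpleNamespace(run_id="run-42", target_func="web_search",
                           input={"api_key": "sk-secret"}, state="running")
wire = sanitize_tool_metadata(internal)
assert_wire_shape(wire)  # passes: only allowlisted keys were copied
print(wire)
```

The `input` attribute never appears in `wire`, and missing attributes (`provider`, `timestamp`) fall back to their defaults instead of raising.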
Step 3: Client-Side Stream Consumption
Browsers cannot use EventSource for POST requests. You must use the Fetch API with ReadableStream. The consumer must handle UTF-8 chunk boundaries, SSE framing, and state reconciliation.
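```typescript
// Assumed module path: StreamHandlers is the interface declared in client/streamManager.ts.
import type { StreamHandlers } from "./streamManager";

// Mirrors the Python ToolInvocationWire; keep the two definitions in sync.
export interface ToolInvocationWire {
  execution_id: string;
  function_name: string;
  server_namespace: string;
  status: string;
  started_at: number;
}

export interface StreamError {
  code: string;
  detail: string;
}

// The real shape depends on format_final_turn on the server (not shown).
export type FinalTurnData = Record<string, unknown>;

interface StreamEvent {
  event: string;
  data: Record<string, unknown>;
}

export async function consumeAgentStream(
  sessionId: string,
  prompt: string,
  signal: AbortSignal,
  handlers: StreamHandlers,
): Promise<void> {
  const response = await fetch("/v1/agent/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json", Accept: "text/event-stream" },
    body: JSON.stringify({ session_id: sessionId, prompt }),
    signal,
  });
  if (!response.ok) throw new Error(`Stream request failed: HTTP ${response.status}`);
  if (!response.body) throw new Error("ReadableStream not supported");

  const dispatch = (frame: string) => {
    const parsed = parseSSEFrame(frame);
    if (!parsed) return;
    switch (parsed.event) {
      case "tool_invocation":
        handlers.onToolInvocation(parsed.data as unknown as ToolInvocationWire);
        break;
      case "content_delta":
        handlers.onContentDelta(String(parsed.data.chunk ?? ""));
        break;
      case "turn_finalization":
        handlers.onFinalTurn(parsed.data as FinalTurnData);
        break;
      case "stream_error":
        handlers.onError(parsed.data as unknown as StreamError);
        break;
    }
  };

  const reader = response.body.getReader();
  const decoder = new TextDecoder("utf-8");
  let buffer = "";
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    // { stream: true } holds back incomplete multi-byte sequences until the next chunk.
    buffer += decoder.decode(value, { stream: true });
    const frames = buffer.split("\n\n");
    buffer = frames.pop() ?? ""; // the last element is an incomplete frame (or "")
    for (const frame of frames) {
      if (frame.trim()) dispatch(frame);
    }
  }
  buffer += decoder.decode(); // flush any bytes still held by the decoder
  if (buffer.trim()) dispatch(buffer);
  handlers.onClose();
}

function parseSSEFrame(raw: string): StreamEvent | null {
  let eventType = "";
  const dataLines: string[] = [];
  for (const line of raw.split("\n")) {
    if (line.startsWith("event:")) eventType = line.slice(6).trim();
    // Per the SSE format, multiple data: lines in one frame are joined with "\n".
    else if (line.startsWith("data:")) dataLines.push(line.slice(5).trim());
  }
  if (!eventType || dataLines.length === 0) return null;
  try {
    return { event: eventType, data: JSON.parse(dataLines.join("\n")) };
  } catch {
    return null; // skip malformed JSON rather than abort the whole stream
  }
}
```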
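The buffer-split loop is language-agnostic, so it can be unit-tested outside the browser. Below is a Python port of the same logic; `SSEFrameBuffer` and `parse_sse_frame` are hypothetical helper names, and the sketch assumes the "\n" line endings this server emits (the SSE format also permits "\r\n"). It shows a frame split across two network chunks being held back until its terminating blank line arrives.

```python
import json


def parse_sse_frame(raw: str):
    """Return (event, data) for one frame, or None if it lacks an event or data line."""
    event, data_lines = "", []
    for line in raw.split("\n"):
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
    if not event or not data_lines:
        return None
    return event, json.loads("\n".join(data_lines))


class SSEFrameBuffer:
    """Accumulates decoded text and returns only complete frames (terminated by a blank line)."""

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, text: str) -> list:
        self._buffer += text
        *complete, self._buffer = self._buffer.split("\n\n")  # last piece is incomplete or ""
        parsed = (parse_sse_frame(frame) for frame in complete if frame.strip())
        return [frame for frame in parsed if frame is not None]


chunks = ['event: content_delta\ndata: {"chu',
          'nk": "Hel"}\n\nevent: content_delta\n',
          'data: {"chunk": "lo"}\n\n']
buf = SSEFrameBuffer()
for chunk in chunks:
    print(buf.feed(chunk))
# []
# [('content_delta', {'chunk': 'Hel'})]
# [('content_delta', {'chunk': 'lo'})]
```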
Critical Implementation Detail: The turn_finalization event must replace the accumulated content, not append to it. Post-processing guards (anti-drift filters, PII redaction, formatting normalization) frequently modify the raw token stream. Appending causes duplicate rendering and broken markdown. Replacing ensures the UI reflects the authoritative, sanitized output.
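The replace-not-append rule fits in a tiny accumulator. A sketch in Python for testability; `TurnBuffer` is hypothetical, and the `text` key on the final payload is an assumption, since format_final_turn is not shown.

```python
class TurnBuffer:
    """Holds the text shown for the current assistant turn."""

    def __init__(self) -> None:
        self.text = ""
        self.finalized = False

    def on_content_delta(self, chunk: str) -> None:
        if not self.finalized:  # ignore stray deltas that arrive after finalization
            self.text += chunk

    def on_turn_finalization(self, final: dict) -> None:
        # Replace, never append: the guarded output is authoritative.
        self.text = final["text"]
        self.finalized = True


turn = TurnBuffer()
for chunk in ["Contact me at ", "jane@example.com", " for **details"]:
    turn.on_content_delta(chunk)
turn.on_turn_finalization({"text": "Contact me at [redacted] for **details**"})
print(turn.text)  # Contact me at [redacted] for **details**
```

Appending here would render the raw email address and the unclosed bold marker followed by the redacted copy; replacing shows only the guarded version.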
Step 4: Scroll Management
Naive auto-scroll triggers on every content delta, which can mean dozens of scroll calls per second during fast generation. It also fights user intent when they scroll up to review earlier context. A common remedy is a sticky-bottom threshold.
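```typescript
import { useEffect, useRef, type RefObject } from "react";

// Assumes messagesRef points at the scrollable message container (overflow-y: auto).
function useStickyScroll(messagesRef: RefObject<HTMLDivElement>, thresholdPx = 250) {
  const lastMessageCount = useRef(0);
  // No dependency array: runs after every render, including each streamed delta.
  useEffect(() => {
    const container = messagesRef.current;
    if (!container) return;
    // Measure the container's own scroll position rather than the window's.
    const distanceFromBottom =
      container.scrollHeight - container.scrollTop - container.clientHeight;
    const isNearBottom = distanceFromBottom < thresholdPx;
    const hasNewMessage = container.childElementCount > lastMessageCount.current;
    lastMessageCount.current = container.childElementCount;
    if (hasNewMessage || isNearBottom) {
      // Animate only for new messages; per-delta smooth scrolls would pile up animations.
      container.scrollTo({ top: container.scrollHeight, behavior: hasNewMessage ? "smooth" : "auto" });
    }
  });
}
```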
The 250px threshold balances responsiveness with user control. New messages always trigger scroll. Deltas only trigger scroll if the user is already viewing the tail. This eliminates scroll-jacking while maintaining real-time visibility.
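The scroll decision itself is plain arithmetic, so it can be tested apart from React. A Python sketch; `should_autoscroll` is a hypothetical helper whose inputs mirror the DOM's `scrollHeight`, `scrollTop`, and `clientHeight` for a scrollable container.

```python
def should_autoscroll(scroll_height: float, scroll_top: float, client_height: float,
                      child_count: int, last_child_count: int, threshold_px: float = 250) -> bool:
    """Scroll when a new message appeared, or when the viewport is already near the tail."""
    distance_from_bottom = scroll_height - scroll_top - client_height
    has_new_message = child_count > last_child_count
    return has_new_message or distance_from_bottom < threshold_px


# Reading the tail: 2000 - 1300 - 600 = 100px from the bottom, so a delta keeps the view pinned.
print(should_autoscroll(2000, 1300, 600, child_count=5, last_child_count=5))  # True
# Scrolled up to review: 2000 - 200 - 600 = 1200px away, so a delta leaves the view alone.
print(should_autoscroll(2000, 200, 600, child_count=5, last_child_count=5))   # False
# A new message always scrolls, even when the user has scrolled up.
print(should_autoscroll(2000, 200, 600, child_count=6, last_child_count=5))   # True
```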
Pitfall Guide
1. Swallowing Cancellation Errors
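Explanation: When a user closes the tab or navigates away, the browser aborts the fetch request and the server cancels the streaming generator, which surfaces inside it as asyncio.CancelledError. Since Python 3.8, CancelledError derives from BaseException, so except Exception no longer catches it; the real risks are a bare except:, an except BaseException:, or an except asyncio.CancelledError clause that does not re-raise. Any of these keeps the generator running, attempts to write frames to a closed socket, and, critically, fails to cancel the upstream LLM process.
Fix: If you handle CancelledError at all, re-raise it, and place that clause before any broad exception handler. This ensures the orchestration loop terminates cleanly and compute is released.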
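A small asyncio experiment shows both halves of this pitfall: `except Exception` does not catch `CancelledError`, and letting it propagate runs the upstream generator's cleanup. `upstream_llm`, `event_generator`, and the `state` flags are hypothetical stand-ins for the orchestrator; `task.cancel()` approximates what the server does when the client disconnects.

```python
import asyncio

state = {"upstream_closed": False, "caught_by_except_exception": False}


async def upstream_llm():
    """Hypothetical token source; the finally block stands in for releasing the LLM call."""
    try:
        while True:
            await asyncio.sleep(0.01)
            yield "tok"
    finally:
        state["upstream_closed"] = True


async def event_generator():
    try:
        async for token in upstream_llm():
            yield token
    except asyncio.CancelledError:
        raise  # explicit re-raise: never swallow cancellation
    except Exception:
        state["caught_by_except_exception"] = True  # not reached on cancellation (3.8+)


async def client_disconnects() -> dict:
    async def consume():
        async for _ in event_generator():
            pass

    task = asyncio.create_task(consume())
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return dict(state)  # snapshot taken before the event loop shuts down


result = asyncio.run(client_disconnects())
print(issubclass(asyncio.CancelledError, Exception), result)
```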
2. Appending Final Turn Data
Explanation: Streaming content_delta events represent raw generation. The turn_finalization event contains post-processed output. Guardrails may strip markdown, redact sensitive phrases, or reformat structure. Appending creates duplicate text and broken rendering.
Fix: Treat turn_finalization as a state replacement operation. Clear accumulated deltas and render the authoritative payload.
3. Missing Reverse Proxy Headers
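Explanation: Reverse proxies often buffer upstream responses; nginx, for example, has proxy_buffering enabled by default. Cloudflare, AWS ALB, and other intermediaries have their own behavior and settings. SSE requires unbuffered delivery. Without X-Accel-Buffering: no (nginx) or the equivalent setting for your proxy or CDN, the client may receive nothing until the entire turn completes.
Fix: Explicitly set buffering headers on the SSE endpoint. Verify with curl -N (which disables curl's own output buffering) that events arrive incrementally.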
4. UTF-8 Corruption in Chunks
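Explanation: Calling TextDecoder.decode() without { stream: true } treats each chunk as a complete UTF-8 sequence. Multi-byte characters split across network boundaries then decode into replacement characters (U+FFFD), or throw a TypeError if the decoder was created with fatal: true.
Fix: Pass { stream: true } on every decode(chunk, { stream: true }) call and finish with a final decode() to flush. The option belongs on decode(), not on the TextDecoder constructor; it makes the decoder keep incomplete bytes across chunks and correctly reassemble split sequences.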
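The same failure is easy to reproduce in Python, whose `codecs` incremental decoder plays the role of `decode(chunk, { stream: true })`. The two-chunk split inside the euro sign (three bytes in UTF-8) is contrived for illustration.

```python
import codecs

payload = "Price: 5€".encode("utf-8")  # '€' is the three bytes E2 82 AC
chunks = [payload[:-2], payload[-2:]]  # the network boundary falls inside the euro sign

# Naive: decode each chunk on its own, replacing invalid sequences.
naive = "".join(chunk.decode("utf-8", errors="replace") for chunk in chunks)

# Streaming: the incremental decoder holds incomplete bytes until the next chunk arrives.
decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
streamed = "".join(decoder.decode(chunk) for chunk in chunks) + decoder.decode(b"", final=True)

print(repr(naive), repr(streamed))
```

The naive result ends in U+FFFD replacement characters; the streamed result is the original string. The final `decode(b"", final=True)` call corresponds to the trailing `decoder.decode()` flush on the client.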
5. Leaking Tool Payloads
Explanation: Tool inputs contain queries, URLs, credentials, and user data. Serializing the full internal object exposes this data to the browser's network tab and potentially to third-party analytics.
Fix: Implement a strict wire contract. Use typed dictionaries or interfaces that explicitly exclude input, output, and credentials fields. Validate serialization paths in CI.
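For the CI step, one option is a test that walks every payload the stream emits and fails on denylisted keys at any depth. A sketch; the `FORBIDDEN_KEYS` set and the `find_forbidden_keys` and `check_sse_frame` helpers are assumptions to adapt to your own field names.

```python
import json

# Assumed denylist; extend it with whatever your internal tool objects carry.
FORBIDDEN_KEYS = {"input", "output", "arguments", "credentials", "api_key", "authorization"}


def find_forbidden_keys(obj, path="$"):
    """Yield JSONPath-like locations of forbidden keys anywhere in a decoded payload."""
    if isinstance(obj, dict):
        for key, value in obj.items():
            child = f"{path}.{key}"
            if key.lower() in FORBIDDEN_KEYS:
                yield child
            yield from find_forbidden_keys(value, child)
    elif isinstance(obj, list):
        for i, item in enumerate(obj):
            yield from find_forbidden_keys(item, f"{path}[{i}]")


def check_sse_frame(frame: str) -> list:
    """Parse the data lines of one SSE frame and return any forbidden key paths."""
    data_lines = [line[len("data:"):] for line in frame.split("\n") if line.startswith("data:")]
    return list(find_forbidden_keys(json.loads("\n".join(data_lines))))


safe = 'event: tool_invocation\ndata: {"execution_id": "r1", "function_name": "web_search"}\n\n'
leaky = 'event: tool_invocation\ndata: {"execution_id": "r1", "meta": {"input": {"q": "x"}}}\n\n'
print(check_sse_frame(safe), check_sse_frame(leaky))  # [] ['$.meta.input']
```

In CI you would feed it frames captured from a recorded or mocked agent run and assert the result is empty for every frame.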
6. Aggressive Auto-Scroll
Explanation: Triggering scrollIntoView() on every delta creates animation thrashing and yanks users away from content they're reading.
Fix: Implement the sticky-bottom pattern with a distance threshold. Only auto-scroll on new turn boundaries or when the viewport is already near the tail.
7. Zombie Connections from Hanging Tools
Explanation: External MCP servers or search APIs may hang indefinitely. The SSE socket remains open, consuming server memory and client resources.
Fix: Wrap the orchestration loop inside the generator in asyncio.timeout() (Python 3.11+) or an equivalent timeout mechanism on older versions. Return a structured error event when the threshold is exceeded, allowing the UI to display a timeout message and reset state.
Production Bundle
Action Checklist
- Define strict wire types that exclude all tool inputs, outputs, and credentials
- Wrap the orchestration loop in asyncio.timeout to prevent zombie connections
- Never swallow CancelledError; if you catch it, re-raise it before any broad exception handler
- Add X-Accel-Buffering: no and equivalent proxy headers to SSE endpoints
- Call TextDecoder.decode(chunk, { stream: true }) for UTF-8 safety
- Replace state on turn_finalization instead of appending deltas
- Implement sticky-bottom scroll logic with a 200-300px threshold
- Add a client-side AbortController for manual stream cancellation
- Log tool execution durations server-side for performance monitoring
- Validate SSE framing with curl -N before deploying to production
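For the duration-logging item, a minimal sketch wraps each tool call in a context manager. `timed_tool` and the `agent.tools` logger name are hypothetical; `time.perf_counter` is used because it is monotonic and unaffected by wall-clock changes.

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("agent.tools")


@contextmanager
def timed_tool(function_name: str, execution_id: str):
    """Log how long a tool call took, including calls that raise or are cancelled."""
    start = time.perf_counter()
    status = "ok"
    try:
        yield
    except BaseException:
        status = "error"
        raise
    finally:
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.info("tool=%s id=%s status=%s duration_ms=%.1f",
                    function_name, execution_id, status, elapsed_ms)


logging.basicConfig(level=logging.INFO)
with timed_tool("web_search", "run-42"):
    time.sleep(0.05)  # stand-in for the real tool call
```

Logging the same `execution_id` that goes out in the `tool_invocation` frame lets you join client-visible steps with server-side timings.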
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Real-time agent chat | SSE + Fetch streaming | Native browser support, simple framing, proxy-friendly | Low |
| Bidirectional control (pause/resume/override) | WebSocket | Full duplex, lower latency for control messages | Medium |
| Low-complexity internal tools | Polling with short intervals | Easier debugging, no streaming infrastructure | Low (but poor UX) |
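| High-throughput public API | SSE over HTTP/2 | Plain HTTP streams work with standard load balancers; HTTP/2 multiplexing avoids the HTTP/1.1 per-origin browser connection limit (about six) | Low |
| Mobile/Unstable networks | SSE with client-side retry | EventSource reconnects natively (GET only); fetch-based POST streams need explicit retry with backoff. Typically lower battery drain than polling | Low |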
Configuration Template
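```python
# server/sse_config.py
import json

from fastapi.responses import StreamingResponse

SSE_HEADERS = {
    "Cache-Control": "no-cache",
    "X-Accel-Buffering": "no",  # nginx: disable proxy buffering for this response
    "Connection": "keep-alive",
    # These CORS headers only cover the actual response; cross-origin POSTs with a JSON
    # body also need the OPTIONS preflight handled (e.g., FastAPI's CORSMiddleware).
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Headers": "Content-Type, Authorization",
}


def create_sse_response(generator, media_type="text/event-stream"):
    # Wrap an async generator of preformatted SSE frames in a streaming response.
    return StreamingResponse(generator, media_type=media_type, headers=SSE_HEADERS)


def format_event(event_name: str, payload: dict) -> str:
    # One SSE frame: event line, data line, blank-line terminator.
    return f"event: {event_name}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"
```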
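```typescript
// client/streamManager.ts
// "./agentStream" is an assumed module path for consumeAgentStream and the wire types.
import { consumeAgentStream, type FinalTurnData, type StreamError, type ToolInvocationWire } from "./agentStream";

export class AgentStreamManager {
  private controller: AbortController | null = null;

  async start(sessionId: string, prompt: string, handlers: StreamHandlers) {
    this.cancel(); // allow only one in-flight stream per manager
    const controller = new AbortController();
    this.controller = controller;
    try {
      await consumeAgentStream(sessionId, prompt, controller.signal, handlers);
    } catch (err) {
      // cancel() rejects the pending fetch/read with an AbortError: expected, not a failure.
      if (!(err instanceof DOMException && err.name === "AbortError")) throw err;
    }
  }

  cancel() {
    this.controller?.abort();
    this.controller = null;
  }
}

export interface StreamHandlers {
  onToolInvocation: (data: ToolInvocationWire) => void;
  onContentDelta: (chunk: string) => void;
  onFinalTurn: (data: FinalTurnData) => void;
  onError: (error: StreamError) => void;
  onClose: () => void;
}
```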
Quick Start Guide
- Initialize the backend endpoint: Create a FastAPI route that accepts POST requests, validates the session, and returns a StreamingResponse with the required SSE headers.
- Implement the generator: Wrap your agent orchestration loop in an async generator. Emit tool_invocation, content_delta, and turn_finalization events using the format_event helper.
- Define wire contracts: Create strict TypeScript/Python types that exclude tool payloads. Serialize only execution metadata, status, and timestamps.
- Build the client consumer: Use fetch with a ReadableStream reader, TextDecoder.decode(chunk, { stream: true }), and a buffer-split loop. Route events to state handlers.
- Add scroll management: Implement the sticky-bottom pattern with a distance threshold. Test with rapid token generation and manual scroll-up to verify behavior.