d-stream context overflow.
4. Idempotent Tool Execution: Each tool call is assigned a deterministic execution key based on its tool_use_id. This prevents duplicate runs if the agent loop retries due to transient failures.
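The idempotency idea in item 4 can be sketched roughly as follows. This is a minimal in-memory illustration, not part of the orchestrator shown below: IdempotentToolRunner and fake_dispatch are hypothetical names, and it assumes a retry replays the same assistant message, so the tool_use_id is unchanged. A production system would likely keep the completed-key map in a durable store.

```python
from typing import Any, Callable, Dict


class IdempotentToolRunner:
    """Runs each tool call at most once per tool_use_id (hypothetical helper)."""

    def __init__(self, dispatch: Callable[[str, Dict[str, Any]], Any]):
        self._dispatch = dispatch              # the real tool dispatcher
        self._completed: Dict[str, Any] = {}   # execution key -> cached output

    def run(self, tool_use_id: str, name: str, args: Dict[str, Any]) -> Any:
        # The tool_use_id doubles as the execution key.
        if tool_use_id in self._completed:
            return self._completed[tool_use_id]
        output = self._dispatch(name, args)
        # Only successful runs are cached; an exception propagates uncached.
        self._completed[tool_use_id] = output
        return output


calls = []


def fake_dispatch(name: str, args: Dict[str, Any]) -> str:
    """Stand-in tool that records every real execution."""
    calls.append(name)
    return f"{name} ok"


runner = IdempotentToolRunner(fake_dispatch)
first = runner.run("toolu_01", "search", {"query": "sunrise"})
second = runner.run("toolu_01", "search", {"query": "sunrise"})  # simulated retry
```

After the simulated retry, fake_dispatch has run only once and both calls return the same cached output.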
Implementation
import anthropic
import json
import logging
from typing import Callable, Dict, List, Any

logger = logging.getLogger(__name__)


class StreamingAgentOrchestrator:
    def __init__(self, api_key: str, model_id: str = "claude-sonnet-4-6"):
        self.client = anthropic.Anthropic(api_key=api_key)
        self.model_id = model_id
        # Tool inputs keyed by content block index; cleared after every turn.
        self.tool_buffer: Dict[int, Dict[str, Any]] = {}
        self.text_accumulator: List[str] = []

    def _route_stream_events(
        self,
        event_stream: Any,
        text_callback: Callable[[str], None],
        tools: List[Dict]
    ) -> anthropic.types.Message:
        """Consumes SSE events, routes text to callback, buffers tool inputs."""
        for event in event_stream:
            if event.type == "content_block_start":
                if event.content_block.type == "tool_use":
                    self.tool_buffer[event.index] = {
                        "tool_id": event.content_block.id,
                        "tool_name": event.content_block.name,
                        "raw_json": ""
                    }
            elif event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    chunk = event.delta.text
                    self.text_accumulator.append(chunk)
                    text_callback(chunk)
                elif event.delta.type == "input_json_delta":
                    if event.index in self.tool_buffer:
                        self.tool_buffer[event.index]["raw_json"] += event.delta.partial_json
        # Every event has been consumed here, so this does not delay rendering.
        return event_stream.get_final_message()

    def _execute_tool_batch(self, tool_buffer: Dict[int, Dict[str, Any]]) -> List[Dict]:
        """Parses buffered JSON and runs tools. Returns formatted results."""
        results = []
        for idx, tool_data in tool_buffer.items():
            try:
                # The buffer can be empty when a tool is called without arguments.
                parsed_args = json.loads(tool_data["raw_json"] or "{}")
            except json.JSONDecodeError as exc:
                logger.warning(f"Tool {tool_data['tool_name']} failed JSON parse: {exc}")
                # Report the failure to the model instead of running with empty args.
                results.append({
                    "type": "tool_result",
                    "tool_use_id": tool_data["tool_id"],
                    "content": f"Invalid tool input JSON: {exc}",
                    "is_error": True
                })
                continue
            # Production note: wrap execute_tool in retry/idempotency logic
            output = self._invoke_external_tool(tool_data["tool_name"], parsed_args)
            results.append({
                "type": "tool_result",
                "tool_use_id": tool_data["tool_id"],
                "content": str(output)
            })
        return results

    def _invoke_external_tool(self, name: str, args: Dict) -> Any:
        """Placeholder for actual tool execution logic."""
        # In production, route to a task queue or direct function dispatcher
        return f"Executed {name} with {args}"

    def run_agent_loop(
        self,
        conversation_history: List[Dict],
        available_tools: List[Dict],
        text_callback: Callable[[str], None]
    ) -> str:
        """Main execution loop with context budgeting and streaming."""
        conversation_history = self._enforce_context_budget(conversation_history)
        with self.client.messages.stream(
            model=self.model_id,
            max_tokens=4096,
            tools=available_tools,
            messages=conversation_history
        ) as stream:
            final_response = self._route_stream_events(stream, text_callback, available_tools)
        # The stream is closed before any tool runs.
        if final_response.stop_reason == "tool_use":
            tool_outputs = self._execute_tool_batch(self.tool_buffer)
            self.tool_buffer.clear()
            self.text_accumulator.clear()
            conversation_history.append({"role": "assistant", "content": final_response.content})
            conversation_history.append({"role": "user", "content": tool_outputs})
            return self.run_agent_loop(conversation_history, available_tools, text_callback)
        final_text = "".join(self.text_accumulator)
        # Reset per-run state so a reused orchestrator starts clean.
        self.tool_buffer.clear()
        self.text_accumulator.clear()
        return final_text

    def _enforce_context_budget(self, messages: List[Dict], safety_margin: float = 0.85) -> List[Dict]:
        """Estimates tokens and prunes/truncates to prevent context overflow."""
        estimated_tokens = self._approximate_token_count(messages)
        max_budget = 160000 * safety_margin
        if estimated_tokens > max_budget:
            # Truncate large tool outputs first. Tool results are "tool_result"
            # blocks inside user messages; the Messages API has no "tool" role.
            for msg in messages:
                if msg.get("role") != "user" or not isinstance(msg.get("content"), list):
                    continue
                for block in msg["content"]:
                    if (
                        isinstance(block, dict)
                        and block.get("type") == "tool_result"
                        and isinstance(block.get("content"), str)
                        and len(block["content"]) > 2000
                    ):
                        block["content"] = block["content"][:2000] + "... [truncated]"
            # Keep the first message and drop the oldest assistant/user pair after
            # it, so a tool_use block and its tool_result are removed together.
            while self._approximate_token_count(messages) > max_budget and len(messages) > 2:
                messages = [messages[0]] + messages[3:]
        return messages

    def _approximate_token_count(self, messages: List[Dict]) -> int:
        """Fast heuristic estimation. Replace with prompt-token-counter in production."""
        char_count = sum(len(str(m.get("content", ""))) for m in messages)
        return int(char_count * 0.25)  # Rough 4 chars per token average
Why This Architecture Works
The orchestrator separates consumption from execution. The streaming loop never blocks on tool calls, and text reaches the UI immediately via the callback. Tool inputs accumulate in a dictionary keyed by event.index, the position of each content block in the final message, so a response containing several tool calls never mixes their inputs. Once the event loop has consumed the stream, get_final_message() returns the fully assembled message. Only then does the system parse JSON and invoke tools. Context budgeting runs before each API call, which reduces the risk of overflowing the context window partway through a long loop. As a result, tools run only on complete, parsed inputs while the user still sees text in real time.
Pitfall Guide
1. Parsing Partial JSON Deltas
Explanation: input_json_delta events deliver JSON characters sequentially. Calling json.loads() on a chunk like {"query": "su raises a JSONDecodeError.
Fix: Accumulate all deltas for a given event.index into a string buffer. Parse only after the stream emits message_stop or content_block_stop.
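A small illustration of why the buffer is needed, using made-up fragments in place of real partial_json values from input_json_delta events:

```python
import json

# Simulated partial_json fragments for a single tool_use block (made-up values).
fragments = ['{"query": "su', 'nrise tim', 'es", "limit"', ': 3}']

# Parsing the first fragment on its own fails, as described above.
try:
    json.loads(fragments[0])
    first_chunk_parsed = True
except json.JSONDecodeError:
    first_chunk_parsed = False

# Accumulate every fragment, then parse once the block is complete.
buffer = ""
for fragment in fragments:
    buffer += fragment

args = json.loads(buffer)
```

Only the fully accumulated string is valid JSON; every intermediate state of the buffer would raise the same error as the first fragment.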
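2. Sharing One Buffer Across Multiple Tool Calls
Explanation: Anthropic can return multiple tool calls in a single response, each as its own content block with its own index. Every delta carries the index of the block it belongs to. A single buffer variable ignores that index, so later tool inputs overwrite or run into earlier ones.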
Fix: Use a dictionary keyed by event.index. Each index corresponds to a distinct content block. Clear the buffer after execution to prevent state leakage across turns.
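The per-index buffer can be sketched without a live API call. The events below are hand-built stand-ins that only mimic the attributes the routing code reads (type, index, content_block, delta); collect_tool_calls and the start/delta/stop helpers are illustrative names, not SDK functions.

```python
import json
from types import SimpleNamespace as NS
from typing import Any, Dict, Iterable


def collect_tool_calls(events: Iterable[Any]) -> Dict[int, Dict[str, Any]]:
    """Buffers tool input per content block index and parses it at block stop."""
    buffers: Dict[int, Dict[str, Any]] = {}
    parsed: Dict[int, Dict[str, Any]] = {}
    for event in events:
        if event.type == "content_block_start" and event.content_block.type == "tool_use":
            buffers[event.index] = {"name": event.content_block.name, "raw_json": ""}
        elif event.type == "content_block_delta" and event.delta.type == "input_json_delta":
            if event.index in buffers:
                buffers[event.index]["raw_json"] += event.delta.partial_json
        elif event.type == "content_block_stop" and event.index in buffers:
            done = buffers.pop(event.index)  # remove so no state leaks forward
            parsed[event.index] = {
                "name": done["name"],
                "args": json.loads(done["raw_json"] or "{}"),
            }
    return parsed


def start(index: int, name: str) -> NS:
    block = NS(type="tool_use", id=f"toolu_{index}", name=name)
    return NS(type="content_block_start", index=index, content_block=block)


def delta(index: int, text: str) -> NS:
    return NS(type="content_block_delta", index=index,
              delta=NS(type="input_json_delta", partial_json=text))


def stop(index: int) -> NS:
    return NS(type="content_block_stop", index=index)


# Two tool calls in one simulated response.
events = [
    start(1, "get_weather"), delta(1, '{"city": "Os'), delta(1, 'lo"}'), stop(1),
    start(2, "get_time"), delta(2, '{"tz": "Europe/Oslo"}'), stop(2),
]
tool_calls = collect_tool_calls(events)
```

Each tool call ends up under its own index with its own parsed arguments, and the working buffer is empty once every block has stopped.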
3. Forwarding Partial Streams to External Messaging APIs
Explanation: Platforms like Slack, Discord, and WhatsApp enforce strict rate limits on message edits. Streaming 100 chunks and calling chat_update per chunk triggers 429 errors and stalls the pipeline.
Fix: Stream internally to a buffer or local UI. Send exactly one chat_postMessage call with the complete response. Use streaming only for first-party interfaces.
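One way to apply this fix is to hand the orchestrator a callback that only buffers, then send once when the loop returns. In this sketch BufferedReply is a hypothetical helper and the send callable stands in for whatever wraps the platform's post-message call; no real messaging SDK is used.

```python
from typing import Callable, List


class BufferedReply:
    """Collects streamed chunks and sends them as one message (hypothetical helper)."""

    def __init__(self, send: Callable[[str], None]):
        self._send = send            # e.g. a wrapper around the platform's post call
        self._chunks: List[str] = []

    def on_text(self, chunk: str) -> None:
        # Used as the text callback: no network call per chunk.
        self._chunks.append(chunk)

    def flush(self) -> None:
        # Called once after the agent loop finishes.
        text = "".join(self._chunks)
        self._chunks.clear()
        if text:
            self._send(text)


sent: List[str] = []
reply = BufferedReply(sent.append)
for chunk in ["Deploy ", "finished ", "successfully."]:
    reply.on_text(chunk)
reply.flush()
```

Three chunks produce a single outbound message, so the number of external API calls no longer depends on how finely the model's output is chunked.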
4. Context Window Exhaustion in Recursive Loops
Explanation: Agentic loops append assistant messages and tool results to the message list on every turn. Without pruning, the token count grows with each iteration. Once the prompt no longer fits the context window, the API rejects the request or the response is cut short.
Fix: Implement pre-flight token estimation. Truncate large tool outputs first. Drop oldest user/assistant pairs while preserving the system prompt. Apply a safety margin (e.g., 85% of limit).
5. Assuming get_final_message() Bypasses Streaming Logic
Explanation: Developers sometimes call get_final_message() inside the streaming loop, expecting it to return partial data. It blocks until the stream closes, defeating real-time rendering.
Fix: Call get_final_message() only after the event loop has consumed the stream, as the implementation above does at the end of _route_stream_events. Process live events inside the loop for UI updates. Reserve the final message for state inspection and tool extraction.
6. Mixing Extended Thinking Blocks with Text Deltas
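Explanation: When extended thinking is enabled through the thinking request parameter, the model emits thinking content blocks before tool use or text. These blocks stream as thinking_delta and signature_delta events rather than text_delta, and they should not be rendered to users.
Fix: Filter event.content_block.type == "thinking" during routing. Buffer thinking blocks separately if audit logging is required. Never pass thinking deltas to the text callback. When continuing after a tool call, append the assistant content unmodified so the thinking blocks stay in the history.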
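The routing rule can be sketched with simulated events. The objects below only imitate the attributes read by the router; route_deltas is an illustrative name, and the thinking text and signature value are made up.

```python
from types import SimpleNamespace as NS
from typing import Any, Callable, Iterable, List


def route_deltas(events: Iterable[Any], on_text: Callable[[str], None]) -> List[str]:
    """Sends text deltas to the UI callback and keeps thinking deltas in a log."""
    thinking_log: List[str] = []
    for event in events:
        if event.type != "content_block_delta":
            continue
        if event.delta.type == "text_delta":
            on_text(event.delta.text)
        elif event.delta.type == "thinking_delta":
            thinking_log.append(event.delta.thinking)  # audit only, never rendered
        # Other delta types (signature_delta, input_json_delta) are skipped here.
    return thinking_log


events = [
    NS(type="content_block_delta", index=0,
       delta=NS(type="thinking_delta", thinking="Check the forecast tool first.")),
    NS(type="content_block_delta", index=0,
       delta=NS(type="signature_delta", signature="made-up-signature")),
    NS(type="content_block_delta", index=1,
       delta=NS(type="text_delta", text="Let me check.")),
]

shown: List[str] = []
audit_log = route_deltas(events, shown.append)
```

Only the text delta reaches the callback; the thinking content is available separately for logging.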
7. Treating Token Estimation as a Hard Guarantee
Explanation: Approximate counters (character-based or tokenizer-based) drift by 3-7% depending on formatting, tool schemas, and system prompts. Relying on exact counts causes unexpected overflows.
Fix: Treat estimation as a safety margin check, not a boundary condition. Apply conservative thresholds (80-85%). Implement graceful fallbacks like message summarization or hard truncation when limits approach.
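A rough sketch of a threshold-based fallback. The 80% and 85% thresholds come from the range above; mapping the band between them to summarization and everything beyond it to hard truncation is an assumption made for illustration, as is the function name budget_action.

```python
def budget_action(estimated_tokens: int, context_limit: int,
                  soft: float = 0.80, hard: float = 0.85) -> str:
    """Picks a fallback from the estimated share of the context window in use."""
    ratio = estimated_tokens / context_limit
    if ratio < soft:
        return "send"        # comfortably inside the budget
    if ratio < hard:
        return "summarize"   # approaching the limit: compress older turns
    return "truncate"        # estimate is too close to trust: cut hard


low = budget_action(100_000, 160_000)    # ratio 0.625
near = budget_action(130_000, 160_000)   # ratio 0.8125
over = budget_action(140_000, 160_000)   # ratio 0.875
```

Because the estimate can drift, the decision is made on bands rather than on an exact comparison against the limit.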
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Internal chat UI with real-time feedback | Buffered event processing with live text callback | Preserves UX while guaranteeing tool execution safety | Neutral (same API calls) |
| Slack/Discord/WhatsApp bot integration | Post-stream buffering + single message send | Avoids rate limits, prevents broken UX from partial text | Lower (fewer API edits) |
| Long-running agentic loop (>10 turns) | Context budgeting with tool truncation + sliding window | Prevents context overflow, reduces token waste | Lower (fewer dropped requests) |
| Multi-modal or extended thinking enabled | Separate thinking buffer + filtered text routing | Prevents rendering internal reasoning, maintains schema compliance | Neutral (requires event filtering) |
Configuration Template
# production_agent_config.py
import json
import anthropic
from typing import Callable, Dict, List, Any


class ProductionStreamingAgent:
    def __init__(self, api_key: str, model: str = "claude-sonnet-4-6", context_limit: int = 160000):
        self.client = anthropic.Anthropic(api_key=api_key)
        self.model = model
        self.context_limit = context_limit
        self.tool_buffer: Dict[int, Dict[str, Any]] = {}
        self.text_sink: List[str] = []

    def configure_streaming(self, messages: List[Dict], tools: List[Dict], on_text: Callable[[str], None]) -> str:
        messages = self._budget_context(messages)
        with self.client.messages.stream(
            model=self.model,
            max_tokens=4096,
            tools=tools,
            messages=messages
        ) as stream:
            for event in stream:
                self._dispatch_event(event, on_text)
            final_msg = stream.get_final_message()
        if final_msg.stop_reason == "tool_use":
            results = self._run_tools()
            messages.append({"role": "assistant", "content": final_msg.content})
            messages.append({"role": "user", "content": results})
            return self.configure_streaming(messages, tools, on_text)
        return "".join(self.text_sink)

    def _dispatch_event(self, event, on_text):
        if event.type == "content_block_start" and event.content_block.type == "tool_use":
            self.tool_buffer[event.index] = {"id": event.content_block.id, "name": event.content_block.name, "json": ""}
        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                self.text_sink.append(event.delta.text)
                on_text(event.delta.text)
            elif event.delta.type == "input_json_delta" and event.index in self.tool_buffer:
                self.tool_buffer[event.index]["json"] += event.delta.partial_json

    def _run_tools(self) -> List[Dict]:
        outputs = []
        for idx, data in self.tool_buffer.items():
            try:
                # The buffer can be empty when a tool is called without arguments.
                args = json.loads(data["json"] or "{}")
            except json.JSONDecodeError:
                args = {}
            outputs.append({"type": "tool_result", "tool_use_id": data["id"], "content": str(self._execute(data["name"], args))})
        self.tool_buffer.clear()
        self.text_sink.clear()
        return outputs

    def _execute(self, name: str, args: Dict) -> Any:
        # Route to actual tool dispatcher
        return f"Result from {name}"

    def _estimate(self, messages: List[Dict]) -> float:
        # Rough heuristic: about 4 characters per token.
        return sum(len(str(m.get("content", ""))) * 0.25 for m in messages)

    def _budget_context(self, messages: List[Dict]) -> List[Dict]:
        budget = self.context_limit * 0.85
        if self._estimate(messages) > budget:
            # Tool results are "tool_result" blocks inside user messages;
            # the Messages API has no "tool" role.
            for m in messages:
                if m.get("role") != "user" or not isinstance(m.get("content"), list):
                    continue
                for block in m["content"]:
                    if (
                        isinstance(block, dict)
                        and block.get("type") == "tool_result"
                        and isinstance(block.get("content"), str)
                        and len(block["content"]) > 2000
                    ):
                        block["content"] = block["content"][:2000] + "... [truncated]"
            # Keep the first message; drop the oldest assistant/user pair after it.
            while self._estimate(messages) > budget and len(messages) > 2:
                messages = [messages[0]] + messages[3:]
        return messages
Quick Start Guide
- Install dependencies: pip install anthropic (add prompt-token-counter and tool-output-truncate for production token estimation and truncation)
- Initialize the orchestrator: Instantiate StreamingAgentOrchestrator with your API key and target model (claude-sonnet-4-6)
- Define your tool schema: Pass a list of tool definitions matching Anthropic's tools parameter format
- Run the loop: Call run_agent_loop() with your conversation history, tool list, and a text callback function. The orchestrator handles buffering, context budgeting, and recursive execution automatically.
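The inputs for the last two steps might look like the sketch below. The get_weather tool, its fields, and the sample user message are invented for illustration; the name, description, and input_schema keys follow the tools parameter format. The final call is left commented out because it needs the StreamingAgentOrchestrator class in scope and a valid API key.

```python
import sys
from typing import Any, Dict, List

# A made-up tool definition in the shape expected by the tools parameter.
weather_tool: Dict[str, Any] = {
    "name": "get_weather",
    "description": "Return the current weather for a city.",
    "input_schema": {
        "type": "object",
        "properties": {
            "city": {"type": "string", "description": "City name"},
        },
        "required": ["city"],
    },
}

conversation_history: List[Dict[str, Any]] = [
    {"role": "user", "content": "What is the weather in Oslo?"},
]


def print_chunk(chunk: str) -> None:
    """Text callback: write each streamed chunk to the terminal immediately."""
    sys.stdout.write(chunk)
    sys.stdout.flush()


# With the orchestrator class defined and ANTHROPIC_API_KEY set:
# import os
# agent = StreamingAgentOrchestrator(api_key=os.environ["ANTHROPIC_API_KEY"])
# final_text = agent.run_agent_loop(conversation_history, [weather_tool], print_chunk)
```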