self, client: Anthropic, model: str, max_output_tokens: int = 1024):
self.client = client
self.model = model
self.max_output_tokens = max_output_tokens
self.turn_history: List[Dict[str, Any]] = []
self.system_instruction: str = ""
def append_turn(self, role: str, content: str) -> None:
    """Record one conversation turn at the end of ``self.turn_history``.

    Args:
        role: Value stored under the ``"role"`` key of the new entry.
        content: Value stored under the ``"content"`` key of the new entry.
    """
    turn = {"role": role, "content": content}
    self.turn_history.append(turn)
def generate_response(self) -> Dict[str, Any]:
    """Send the accumulated turns to the Messages API and record the reply.

    The request carries ``self.model``, ``self.max_output_tokens`` and
    ``self.turn_history``. ``self.system_instruction`` is sent as the
    top-level ``system`` field only when it is non-empty.

    Returns:
        A dict with ``"text"`` (every text block of the reply concatenated),
        ``"stop_reason"`` and ``"usage"`` copied from the API response.

    Raises:
        RuntimeError: If the API call raises ``APIStatusError``; the original
            exception is chained as the cause.
    """
    payload = {
        "model": self.model,
        "max_tokens": self.max_output_tokens,
        "messages": self.turn_history,
    }
    if self.system_instruction:
        payload["system"] = self.system_instruction

    # Keep only the network call inside ``try`` so the handler's scope is
    # exactly the operation it is meant to translate.
    try:
        response = self.client.messages.create(**payload)
    except APIStatusError as exc:
        raise RuntimeError(f"Anthropic API failed: {exc.status_code} {exc.message}") from exc

    # Indexing content[0] breaks when the reply is empty or starts with a
    # non-text block, so collect every text block instead.
    reply_text = "".join(
        block.text
        for block in response.content
        if getattr(block, "type", None) == "text"
    )
    # An empty assistant turn would poison the history for the next request.
    if reply_text:
        self.append_turn("assistant", reply_text)

    return {
        "text": reply_text,
        "stop_reason": response.stop_reason,
        "usage": response.usage,
    }
**Architecture Rationale:**
- Encapsulating state prevents accidental mutation across concurrent requests.
- Explicit `system` parameter separation keeps instructions outside the turn list, enabling prompt caching optimizations.
- Centralized error handling ensures consistent failure modes before business logic executes.
### 2. Streaming with Structured Consumption
Streaming reduces perceived latency but introduces complexity in token boundary handling. The SDK's context manager simplifies consumption while preserving access to the final message object.
```python
def stream_completion(manager: DialogueManager, prompt: str) -> str:
    """Stream a reply to ``prompt`` and record both turns on ``manager``.

    The prompt is appended as a user turn, the request is streamed, and the
    streamed text is appended as the assistant turn.

    Args:
        manager: Object providing ``client``, ``model``, ``max_output_tokens``,
            ``turn_history``, ``system_instruction`` and ``append_turn``.
        prompt: User text for this turn.

    Returns:
        The concatenated text blocks of the final message.
    """
    manager.append_turn("user", prompt)

    request = {
        "model": manager.model,
        "max_tokens": manager.max_output_tokens,
        "messages": manager.turn_history,
    }
    # Forward the system instruction when set, so streamed turns follow the
    # same instructions as non-streamed ones.
    if manager.system_instruction:
        request["system"] = manager.system_instruction

    accumulated = []
    with manager.client.messages.stream(**request) as stream_ctx:
        for chunk in stream_ctx.text_stream:
            accumulated.append(chunk)
            # In production, emit to WebSocket/SSE here
        final_msg = stream_ctx.get_final_message()

    streamed_text = "".join(accumulated)
    # An empty assistant turn would poison the history for the next request.
    if streamed_text:
        manager.append_turn("assistant", streamed_text)

    # Indexing content[0] breaks when the message is empty or starts with a
    # non-text block, so join every text block instead.
    return "".join(
        block.text
        for block in final_msg.content
        if getattr(block, "type", None) == "text"
    )
```

**Architecture Rationale:**
- Accumulating chunks locally prevents partial token corruption.
- `get_final_message()` provides access to `usage` and `stop_reason` after streaming completes.
- Separating accumulation from emission allows clean integration with async frameworks or real-time transports.
### 3. Vision Payload Construction
Vision requests require base64 encoding and strict media type declarations. The API accepts multiple content blocks per turn, enabling mixed text and image inputs.
import base64
from pathlib import Path
def analyze_visual_input(
    client: Anthropic,
    image_path: str,
    query: str,
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 1024,
) -> str:
    """Ask a question about a local image file.

    The file is read whole, base64-encoded, and sent as an image block
    followed by a text block in a single user turn.

    Args:
        client: Client whose ``messages.create`` is called.
        image_path: Path to a ``.jpg``/``.jpeg``/``.png``/``.gif``/``.webp`` file.
        query: Text sent alongside the image.
        model: Model identifier for the request.
        max_tokens: Output token limit for the request.

    Returns:
        The text of the first content block of the response.

    Raises:
        ValueError: If the file extension is not a supported image type.
    """
    media_types = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
    }
    image_file = Path(image_path)
    suffix = image_file.suffix.lower()
    # Declare the real media type instead of always claiming PNG, and reject
    # unsupported files before reading them into memory.
    if suffix not in media_types:
        raise ValueError(
            f"Unsupported image extension {suffix!r}; expected one of {sorted(media_types)}"
        )

    encoded_image = base64.standard_b64encode(image_file.read_bytes()).decode("utf-8")
    image_block = {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": media_types[suffix],
            "data": encoded_image,
        },
    }
    response = client.messages.create(
        model=model,
        max_tokens=max_tokens,
        messages=[{
            "role": "user",
            "content": [image_block, {"type": "text", "text": query}],
        }],
    )
    return response.content[0].text
**Architecture Rationale:**
- Content blocks enforce explicit typing, preventing ambiguous payload structures.
- Base64 encoding is handled synchronously for simplicity; production systems should stream files or use presigned URLs to avoid memory spikes.
- The API supports `image/jpeg`, `image/png`, `image/gif`, and `image/webp`. Validate extensions before encoding.
### 4. Tool Use Orchestration
Tool calling requires explicit loop management. The model signals intent via `stop_reason == "tool_use"`, and the application must execute the function, format the result as a `tool_result` block, and re-invoke the API.
from typing import Callable, Dict, Any, List
def execute_tool_chain(
    client: Anthropic,
    initial_prompt: str,
    tool_definitions: List[Dict[str, Any]],
    tool_registry: Dict[str, Callable[..., str]],
    max_iterations: int = 5,
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 1024,
) -> str:
    """Run a tool-use loop until the model stops requesting tools.

    Each round sends the conversation, executes every tool the model asked
    for, and feeds the results back as ``tool_result`` blocks.

    Args:
        client: Client whose ``messages.create`` is called.
        initial_prompt: First user message of the conversation.
        tool_definitions: Tool schemas passed as ``tools`` on every request.
        tool_registry: Maps tool names to callables invoked with the tool
            input as keyword arguments.
        max_iterations: Maximum number of API calls before giving up.
        model: Model identifier for every request.
        max_tokens: Output token limit for every request.

    Returns:
        The concatenated text blocks of the first response that does not
        request a tool.

    Raises:
        ValueError: If the model requests a tool missing from ``tool_registry``.
        RuntimeError: If ``max_iterations`` calls all ended in tool requests.
    """
    conversation: List[Dict[str, Any]] = [{"role": "user", "content": initial_prompt}]

    for _ in range(max_iterations):
        response = client.messages.create(
            model=model,
            max_tokens=max_tokens,
            tools=tool_definitions,
            messages=conversation,
        )
        tool_calls = [
            block for block in response.content
            if getattr(block, "type", None) == "tool_use"
        ]
        if response.stop_reason != "tool_use" or not tool_calls:
            # Join text blocks rather than indexing content[0], which fails
            # on an empty reply or a leading non-text block.
            return "".join(
                block.text
                for block in response.content
                if getattr(block, "type", None) == "text"
            )

        # Validate every requested tool before running any, so a bad request
        # cannot leave earlier tools' side effects half-applied.
        for call in tool_calls:
            if call.name not in tool_registry:
                raise ValueError(f"Unregistered tool requested: {call.name}")

        # A single turn may contain several tool_use blocks; each one needs
        # its own tool_result in the follow-up user message.
        results = [
            {
                "type": "tool_result",
                "tool_use_id": call.id,
                "content": tool_registry[call.name](**call.input),
            }
            for call in tool_calls
        ]
        conversation.append({"role": "assistant", "content": response.content})
        conversation.append({"role": "user", "content": results})

    raise RuntimeError("Tool-use loop exceeded maximum iterations")
**Architecture Rationale:**
- `max_iterations` prevents infinite loops caused by ambiguous tool schemas or model hallucination.
- Explicit `tool_use_id` binding ensures result routing matches the original request.
- Separating tool definitions from execution logic enables dependency injection and testing.
Pitfall Guide
1. Context Window Overflow
Explanation: Appending every historical turn without pruning or summarization eventually exceeds the model's context limit, triggering truncation or API errors.
Fix: Implement a sliding window or token-aware truncation strategy. Summarize older turns when approaching 75% of the context limit. Track usage.input_tokens to enforce boundaries proactively.
2. Hardcoded max_tokens Limits
Explanation: Setting a fixed output limit without considering task complexity causes premature truncation (stop_reason == "max_tokens"), forcing retry loops and wasting tokens.
Fix: Dynamically adjust `max_tokens` based on task type. Use `response.stop_reason` to detect truncation and, on critical paths, retry with a progressively larger limit up to a hard cap.
3. Unbounded Tool Loops
Explanation: Failing to enforce iteration limits or validate tool outputs allows the model to cycle indefinitely between tool calls, exhausting rate limits and budget.
Fix: Always cap iterations (3–5 is standard). Validate tool results against expected schemas before re-injection. Log loop depth for observability.
4. Blocking Stream Consumption
Explanation: Synchronously iterating stream.text_stream without yielding or emitting chunks blocks the event loop, negating latency benefits and causing timeout errors in web frameworks.
Fix: Use async generators or non-blocking I/O. Emit chunks to a message queue or WebSocket immediately. Buffer only for final message reconstruction when required.
5. System Prompt Drift
Explanation: Modifying the system parameter across turns breaks prompt caching and introduces inconsistent behavior. The model treats each unique system prompt as a new instruction set.
Fix: Version system prompts externally. Keep the system string identical across turns in a single session. Use environment variables or configuration files to manage prompt versions.
6. Image Payload Bloat
Explanation: Encoding high-resolution images directly into base64 payloads inflates request size, increases latency, and may exceed API limits.
Fix: Resize images to ≤1080px on the longest side before encoding. Validate file size (<5MB recommended). Consider presigned URLs for large assets to reduce payload overhead.
7. Ignoring Usage Metrics
Explanation: Treating response.usage as optional metadata leads to untracked token consumption, making cost attribution and budget forecasting impossible.
Fix: Log input_tokens and output_tokens on every call. Implement a token accounting middleware that aggregates usage per session, user, or feature flag. Alert on threshold breaches.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High-volume classification or routing | claude-haiku-4-5 with streaming | Lowest latency and cost; sufficient for pattern matching | ~90% reduction vs Opus |
| Standard conversational UI or moderate tool-use | claude-sonnet-4-6 with context window management | Balanced reasoning, speed, and pricing; reliable default | Baseline operational cost |
| Multi-step planning, code generation, or deep analysis | claude-opus-4-7 with explicit tool orchestration | Highest reasoning depth; justifies premium for complex tasks | ~5x cost vs Sonnet, but reduces retry loops |
| Real-time dashboard with image analysis | claude-sonnet-4-6 + presigned URLs + async streaming | Avoids base64 payload bloat; maintains low latency | Moderate cost; infrastructure overhead for URL signing |
| Batch processing with strict SLAs | claude-haiku-4-5 + token budgeting + parallel execution | Maximizes throughput; predictable latency | Lowest cost; requires queue management |
Configuration Template
# config/claude_integration.py
import os
from dataclasses import dataclass, field
from typing import Optional

from anthropic import Anthropic
@dataclass
class ClaudeConfig:
    """Settings for the Claude integration, with a ``validate`` sanity check."""

    # Read from the environment when the instance is created, not when the
    # class is defined, so a key exported after import is still picked up.
    api_key: str = field(default_factory=lambda: os.getenv("ANTHROPIC_API_KEY", ""))
    default_model: str = "claude-sonnet-4-6"
    max_output_tokens: int = 1024
    max_tool_iterations: int = 5
    system_prompt: Optional[str] = None
    enable_streaming: bool = True
    token_budget_limit: int = 50000  # per session

    def validate(self) -> None:
        """Check the configured values.

        Raises:
            ValueError: If ``api_key`` is empty, ``max_output_tokens`` is below
                1, ``max_tool_iterations`` is outside 1..10, or
                ``token_budget_limit`` is below 1000.
        """
        if not self.api_key:
            raise ValueError("ANTHROPIC_API_KEY is required")
        if self.max_output_tokens < 1:
            raise ValueError("max_output_tokens must be >= 1")
        if not 1 <= self.max_tool_iterations <= 10:
            raise ValueError("max_tool_iterations must be between 1 and 10")
        if self.token_budget_limit < 1000:
            raise ValueError("token_budget_limit must be >= 1000")
def build_client(config: ClaudeConfig) -> Anthropic:
    """Validate ``config`` and return an ``Anthropic`` client using its API key.

    Any exception raised by ``config.validate()`` propagates to the caller
    before a client is constructed.
    """
    config.validate()
    validated_key = config.api_key
    return Anthropic(api_key=validated_key)
Quick Start Guide
- Install SDK & Set Credentials: Run `pip install anthropic`. Export your key: `export ANTHROPIC_API_KEY="sk-ant-..."`. Verify with `echo $ANTHROPIC_API_KEY`.
- Initialize Client & Test Connection: Create a minimal script using `Anthropic()` and call `client.messages.create()` with `claude-sonnet-4-6`. Confirm response text and usage metrics.
- Implement Context Manager: Wrap message history in a class that tracks turns, enforces token limits, and separates the `system` prompt. Test multi-turn consistency.
- Add Tool Safety: Define one tool with JSON Schema. Implement the loop with `max_iterations=3`. Verify `stop_reason` handling and `tool_result` binding.
- Deploy Observability: Log `input_tokens`, `output_tokens`, `stop_reason`, and loop depth. Set up alerts for budget thresholds and truncation events. Validate end-to-end flow under load.