str
state: ExecutionState
output_payload: Optional[str] = None
execution_metrics: Dict[str, int] = Field(default_factory=dict)
diagnostic: Optional[str] = None
### Step 2: Externalize State and Context
Inference must remain stateless. Conversation history, intermediate results, and tool outputs are persisted in an external store. A context manager service handles compression, truncation, and retrieval before each inference step.
```python
# src/storage/context_vault.py
import json
import redis.asyncio as redis
from typing import List, Dict
class ContextVault:
    """Redis-backed store for per-session interaction logs.

    Each session's log is stored as a single JSON document under the key
    ``ctx:<session_id>`` and is written with an expiry of ``ttl_seconds``.
    """

    def __init__(self, redis_client: "redis.Redis", ttl_seconds: int = 3600):
        """Bind the vault to an async Redis client.

        Args:
            redis_client: Client whose awaitable ``get``/``set`` methods are
                used for all storage access.
            ttl_seconds: Expiry applied to every write, in seconds.
        """
        self._client = redis_client
        self._ttl = ttl_seconds

    @staticmethod
    def _key(session_id: str) -> str:
        """Return the Redis key under which a session's log is stored."""
        return f"ctx:{session_id}"

    async def persist(self, session_id: str, interaction_log: List[Dict]) -> None:
        """Overwrite the stored log for ``session_id`` and reset its expiry."""
        payload = json.dumps(interaction_log)
        await self._client.set(self._key(session_id), payload, ex=self._ttl)

    async def retrieve(self, session_id: str) -> List[Dict]:
        """Return the stored log, or an empty list when nothing is stored."""
        raw = await self._client.get(self._key(session_id))
        return json.loads(raw) if raw else []

    async def compact(self, session_id: str, max_entries: int) -> List[Dict]:
        """Trim the stored log to its newest ``max_entries`` entries.

        The trimmed log is written back only when something was dropped.

        Args:
            session_id: Session whose log is compacted.
            max_entries: Maximum number of most-recent entries to keep.
                Zero or a negative value keeps nothing.

        Returns:
            The log after compaction.
        """
        history = await self.retrieve(session_id)
        if len(history) <= max_entries:
            return history
        # history[-0:] is the *whole* list and a negative limit would slice
        # from the wrong end, so non-positive limits need an explicit branch.
        truncated = history[-max_entries:] if max_entries > 0 else []
        await self.persist(session_id, truncated)
        return truncated
```

### Step 3: Implement the Execution Loop
The core loop follows a plan → act → observe pattern. It enforces token budgets, validates tool schemas before invocation, and integrates distributed tracing.
# src/engine/processor.py
import asyncio
import time
from opentelemetry import trace
from tenacity import retry, stop_after_attempt, wait_exponential_jitter
from src.contracts.workflow import WorkflowRequest, WorkflowResponse, ExecutionState
from src.storage.context_vault import ContextVault
from src.clients.tool_catalog import ToolCatalogClient
from src.clients.llm_gateway import LLMGateway
# Module-level OpenTelemetry tracer, named after this module; the workflow
# spans below are created from it.
tracer = trace.get_tracer(__name__)
class WorkflowProcessor:
    """Runs one workflow request as a bounded inference / tool-call loop.

    Each iteration calls the LLM with the session history and the resolved
    tool list. The loop ends when the model reports ``"stop"``, when the
    request's token cap is exceeded, or when its iteration limit is reached.
    """

    def __init__(self, service_id: str, config: dict):
        """Build the processor's collaborators from ``config``.

        Args:
            service_id: Identifier used to resolve tools and tag spans.
            config: Must contain ``"model"``, ``"redis_url"`` and
                ``"registry_url"``.
        """
        # Local import: this module does not import redis at the top level.
        import redis.asyncio as redis

        self._service_id = service_id
        self._llm = LLMGateway(model=config["model"], timeout=25)
        # ContextVault calls .get/.set on what it is given, so it needs a
        # client object rather than the raw URL string.
        self._vault = ContextVault(redis.from_url(config["redis_url"]))
        self._catalog = ToolCatalogClient(config["registry_url"])
        self._metrics = {}

    async def execute(self, request: WorkflowRequest) -> WorkflowResponse:
        """Run the workflow and always return a response for ``Exception``s.

        Any ``Exception`` raised by the loop is recorded on the span and
        turned into a ``TERMINATED`` response whose ``diagnostic`` is the
        exception text. ``latency_ms`` is added to the response metrics.

        Args:
            request: The workflow request to process.

        Returns:
            The loop's response, or a ``TERMINATED`` response on failure.
        """
        start_time = time.monotonic()
        # start_as_current_span makes this the parent of the inference spans
        # and ends it on every exit path, including cancellation.
        with tracer.start_as_current_span("workflow.execution") as span:
            span.set_attribute("service.id", self._service_id)
            span.set_attribute("workflow.id", request.request_id)
            try:
                result = await self._run_cycle(request, span)
            except Exception as exc:
                span.record_exception(exc)
                result = WorkflowResponse(
                    request_id=request.request_id,
                    state=ExecutionState.TERMINATED,
                    diagnostic=str(exc),
                )
            # Computed after the try block (not in a ``finally``) so that a
            # BaseException such as CancelledError propagates untouched
            # instead of hitting an unbound ``result``.
            elapsed = int((time.monotonic() - start_time) * 1000)
            result.execution_metrics["latency_ms"] = elapsed
            return result

    async def _run_cycle(self, request: WorkflowRequest, span) -> WorkflowResponse:
        """Drive the inference / tool loop for one request.

        Args:
            request: Supplies ``request_id``, ``session_context`` and the
                ``capability_limits`` keys ``"max_iterations"`` and
                ``"token_cap"``.
            span: Parent span; receives the current iteration number.

        Returns:
            A ``FINALIZED`` response. ``diagnostic`` is set when the loop
            stopped because of the token cap or the iteration limit.

        Raises:
            KeyError: If a required capability limit is missing.
        """
        available_tools = await self._catalog.resolve(self._service_id)
        history = await self._vault.compact(request.session_context, max_entries=20)
        max_iterations = request.capability_limits["max_iterations"]
        token_cap = request.capability_limits["token_cap"]

        accumulated_tokens = 0
        iteration_count = 0
        # Stays None when the loop body never runs (max_iterations <= 0).
        last_content = None

        while iteration_count < max_iterations:
            span.set_attribute("workflow.iteration", iteration_count)
            with tracer.start_as_current_span("workflow.inference") as inf_span:
                response = await self._invoke_with_backoff(history, available_tools)
                inf_span.set_attribute("llm.prompt_tokens", response.usage.prompt_tokens)
                inf_span.set_attribute("llm.completion_tokens", response.usage.completion_tokens)
            last_content = response.content
            accumulated_tokens += response.usage.total_tokens

            if accumulated_tokens > token_cap:
                return WorkflowResponse(
                    request_id=request.request_id,
                    state=ExecutionState.FINALIZED,
                    output_payload=response.content,
                    execution_metrics={
                        "iterations": iteration_count + 1,
                        "tokens_consumed": accumulated_tokens,
                    },
                    diagnostic="token_cap_reached",
                )

            if response.completion_reason == "stop":
                history.append({"role": "assistant", "content": response.content})
                await self._vault.persist(request.session_context, history)
                return WorkflowResponse(
                    request_id=request.request_id,
                    state=ExecutionState.FINALIZED,
                    output_payload=response.content,
                    execution_metrics={
                        "iterations": iteration_count + 1,
                        "tokens_consumed": accumulated_tokens,
                    },
                )

            if response.tool_invocations:
                execution_results = await self._dispatch_tools(response.tool_invocations)
                history.append({"role": "assistant", "content": response.content})
                history.extend(execution_results)
                # Persist every tool round so results survive the
                # token-cap and iteration-limit exits as well as failures.
                await self._vault.persist(request.session_context, history)
            # NOTE(review): a response that is neither "stop" nor a tool call
            # leaves history unchanged, so the same prompt is sent again on
            # the next iteration - confirm this is intended.

            iteration_count += 1

        return WorkflowResponse(
            request_id=request.request_id,
            state=ExecutionState.FINALIZED,
            output_payload=last_content,
            execution_metrics={
                "iterations": iteration_count,
                "tokens_consumed": accumulated_tokens,
            },
            diagnostic="iteration_limit_reached",
        )

    # Up to 3 attempts, with jittered exponential waits capped at 10 seconds.
    @retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter(max=10))
    async def _invoke_with_backoff(self, history, tools):
        """Call the LLM gateway with the history and tool list, with retries."""
        return await self._llm.generate(history, tools=tools)

    async def _dispatch_tools(self, invocations):
        """Validate and execute tool calls sequentially, in the given order.

        Args:
            invocations: Items exposing ``tool_name`` and ``arguments``.

        Returns:
            One ``{"role": "tool", "content": ...}`` dict per invocation.
            Calls that fail validation are not executed and yield
            ``"schema_validation_failed"`` as their content.
        """
        results = []
        for call in invocations:
            validated = await self._catalog.validate(call.tool_name, call.arguments)
            if not validated:
                results.append({"role": "tool", "content": "schema_validation_failed"})
                continue
            output = await self._catalog.execute(call.tool_name, call.arguments)
            results.append({"role": "tool", "content": str(output)})
        return results
Architecture Decisions & Rationale
- Async Queue over Synchronous HTTP: Long-running agent tasks use Kafka or RabbitMQ for task submission. This prevents connection timeouts and allows horizontal scaling of workers independent of API gateways.
- Schema-First Tool Validation: Tools are registered with JSON Schema contracts. Validation occurs before LLM output reaches backend services, preventing malformed payloads from triggering downstream failures.
- Externalized Context Management: Conversation history lives in Redis or a vector database. The context vault handles truncation and compression, ensuring the LLM never receives unbounded payloads.
- Distributed Tracing Integration: OpenTelemetry spans track iteration counts, token consumption, and tool execution latency. This data feeds directly into cost attribution and performance dashboards.
Pitfall Guide
1. Context Window Bleed
Explanation: Developers append raw conversation history to every inference step without compression or truncation. Per-call token costs then grow linearly with conversation length and eventually exceed the model's context limit.
Fix: Implement a context vault that enforces a sliding window, summarizes older turns, and strips tool outputs that are no longer semantically relevant.
2. Blocking Tool Execution
Explanation: Tool execution runs synchronously in the same event loop as LLM inference. A slow external API call blocks the entire agent, causing P99 latency spikes.
Fix: Offload tool execution to a separate worker pool. Use async I/O or message queues to decouple inference from external system calls.
3. Implicit State Leakage
Explanation: Conversation state is stored in process memory or global variables. Container restarts or horizontal scaling lose session data.
Fix: Enforce stateless inference. Persist all interaction logs in external stores (Redis, PostgreSQL, or vector DBs) keyed by session ID.
4. Unbounded Retry Storms
Explanation: LLM provider rate limits or temporary outages trigger aggressive retries without backoff or circuit breaking, amplifying load.
Fix: Implement exponential backoff with jitter. Add circuit breakers that trip after consecutive failures and degrade gracefully to cached or simplified responses.
5. Unversioned Tool Schemas
Explanation: Tool signatures change without versioning or validation. Agents invoke outdated parameters, causing silent failures or data corruption.
Fix: Maintain a centralized tool registry with semantic versioning. Validate all invocations against the published schema before execution.
6. Missing Idempotency Keys
Explanation: Retrying a tool call that modifies external state (e.g., sending an email, updating a database) causes duplicate actions.
Fix: Generate idempotency keys at the request layer. Pass them to downstream services and implement check-then-act patterns in tool handlers.
7. Monolithic Deployment Boundaries
Explanation: Updating a single tool integration requires redeploying the entire agent service, causing unnecessary downtime and rollbacks.
Fix: Package each agent capability as an independent container. Use Kubernetes Deployments with independent replica counts and rollout strategies.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Short-lived tasks (<2s) | Synchronous HTTP (FastAPI) | Lower latency, simpler client integration | Baseline |
| Long-running workflows (>5s) | Async Queue (Kafka/RabbitMQ) | Prevents timeout cascades, enables worker scaling | +15% infra, -40% timeout failures |
| High-concurrency sessions | External Context Vault (Redis Cluster) | Stateless scaling, session persistence across pods | +10% memory cost, +90% reliability |
| Multi-provider LLM routing | Gateway with fallback routing | Avoids single-provider outages, optimizes cost | +5% latency, -30% token spend |
| Strict compliance environments | On-prem vector DB + local LLM | Data sovereignty, audit trails, no external egress | +200% infra cost, 100% data control |
Configuration Template
# k8s/agent-deployment.yaml
# Deployment for the agent worker pods (three replicas).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-search-service
  labels:
    app: agent-search
    tier: ai-workload
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agent-search
  template:
    metadata:
      labels:
        # Must match spec.selector.matchLabels above.
        app: agent-search
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
    spec:
      containers:
        - name: agent-worker
          image: registry.internal/agent-search:v1.4.2
          ports:
            - containerPort: 8000
          env:
            - name: LLM_MODEL
              value: "gpt-4o-mini"
            # The Redis connection string is read from a Secret rather than
            # being inlined in the manifest.
            - name: REDIS_URL
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: redis-connection
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              value: "http://otel-collector.monitoring:4317"
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "2000m"
              memory: "1Gi"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 15
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
---
# Cluster-internal Service: port 80 forwards to container port 8000 on pods
# labelled app=agent-search.
apiVersion: v1
kind: Service
metadata:
  name: agent-search-svc
spec:
  selector:
    app: agent-search
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
Quick Start Guide
- Initialize the service scaffold: Create a FastAPI application with `/run`, `/tasks`, `/health`, and `/ready` endpoints. Wire up Pydantic models for request/response contracts.
- Deploy external dependencies: Spin up a Redis instance for context storage and a Kafka cluster for async task routing. Configure connection strings in environment variables.
- Register tool schemas: Publish JSON Schema definitions for all capabilities to a centralized registry. Implement a client that validates and executes tools at runtime.
- Instrument observability: Add OpenTelemetry SDK initialization. Create spans for inference, tool execution, and context retrieval. Export metrics to Prometheus and traces to Jaeger/Tempo.
- Containerize and deploy: Build a Docker image with health checks. Apply the Kubernetes deployment template. Verify scaling behavior by simulating concurrent session loads and monitoring P99 latency.