execution when thresholds are breached, preventing silent overages.
3. Async-First Execution: All patterns leverage asyncio for concurrent dispatch. Timeouts and exception handling are baked into the execution layer, ensuring partial failures do not cascade.
4. Deterministic Tracing: Every routing decision, stage completion, and revision cycle is logged with a correlation ID. This enables replay, cost attribution, and latency profiling without external dependencies.
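Correlation-tagged, structured tracing needs nothing beyond the standard library. One approach is a custom `logging.Formatter` that emits one JSON object per line (JSONL). This is a minimal sketch: the `JsonlFormatter` class, the `make_trace_logger` helper, and the `event`/`correlation_id` field names are illustrative assumptions, not part of the orchestrator below.

```python
import io
import json
import logging


class JsonlFormatter(logging.Formatter):
    """Render each log record as a single JSON line (hypothetical helper)."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "ts": record.created,
            "level": record.levelname,
            "msg": record.getMessage(),
            # Keys passed via `extra=` become attributes on the record.
            "correlation_id": getattr(record, "correlation_id", None),
            "event": getattr(record, "event", None),
        }
        return json.dumps(entry)


def make_trace_logger(stream: io.TextIOBase) -> logging.Logger:
    trace_logger = logging.getLogger("orchestrator.trace")
    trace_logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonlFormatter())
    trace_logger.addHandler(handler)
    trace_logger.propagate = False  # keep trace lines out of the root logger
    return trace_logger


buffer = io.StringIO()
trace = make_trace_logger(buffer)
trace.info("routed", extra={"correlation_id": "corr_42", "event": "route"})
record = json.loads(buffer.getvalue().splitlines()[0])
```

Each line can be appended to a file and replayed or aggregated later by filtering on `correlation_id`.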
Implementation
```python
import asyncio
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)


class PatternType(Enum):
    ROUTER = "router"
    PIPELINE = "pipeline"
    BROADCAST = "broadcast"
    SUPERVISOR = "supervisor"


@dataclass
class TaskContext:
    correlation_id: str
    input_payload: str
    accumulated_state: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)


@dataclass
class CostLedger:
    total_budget_usd: float
    current_spend_usd: float = 0.0
    token_count: int = 0

    def charge(self, tokens: int, rate_per_1k: float = 0.005) -> None:
        cost = (tokens / 1000.0) * rate_per_1k
        self.current_spend_usd += cost
        self.token_count += tokens
        # Hard global ceiling: halt the run once the shared pool is overdrawn.
        if self.current_spend_usd > self.total_budget_usd:
            raise RuntimeError(
                f"Budget exceeded: {self.current_spend_usd:.4f} > {self.total_budget_usd}"
            )


class AgentOrchestrator:
    def __init__(self, budget_usd: float = 0.50, default_timeout: float = 30.0):
        self.ledger = CostLedger(total_budget_usd=budget_usd)
        self.default_timeout = default_timeout
        self._registry: dict[str, Callable[[TaskContext], Awaitable[str]]] = {}

    def register_agent(self, name: str, handler: Callable[[TaskContext], Awaitable[str]]) -> None:
        self._registry[name] = handler

    @staticmethod
    def _estimate_tokens(text: str) -> int:
        # Rough heuristic (assumption): ~4 characters per token. Prefer the
        # token usage reported by your LLM provider when it is available.
        return max(1, len(text) // 4)

    async def _execute_with_timeout(self, handler: Callable, ctx: TaskContext) -> str:
        # Callable objects and partials may lack __name__.
        name = getattr(handler, "__name__", repr(handler))
        start = time.perf_counter()
        try:
            result = await asyncio.wait_for(handler(ctx), timeout=self.default_timeout)
        except asyncio.TimeoutError:
            logger.warning(f"Agent {name} timed out for correlation_id={ctx.correlation_id}")
            raise
        # Charge prompt + completion to the shared ledger; raises on budget breach.
        self.ledger.charge(self._estimate_tokens(ctx.input_payload + result))
        logger.info(
            f"Agent {name} finished in {time.perf_counter() - start:.2f}s "
            f"for correlation_id={ctx.correlation_id}"
        )
        return result

    # Pattern 1: Router
    async def route_execution(self, ctx: TaskContext, classifier: Callable[[str], Awaitable[str]]) -> str:
        intent = await classifier(ctx.input_payload)
        handler = self._registry.get(intent)
        if handler is None:
            raise ValueError(f"No handler registered for intent: {intent}")
        logger.info(f"Routing {ctx.correlation_id} to {intent}")
        return await self._execute_with_timeout(handler, ctx)

    # Pattern 2: Pipeline
    async def pipeline_execution(self, ctx: TaskContext, stages: list[str]) -> list[str]:
        outputs = []
        current_input = ctx.input_payload
        for stage_name in stages:
            handler = self._registry.get(stage_name)
            if handler is None:
                raise ValueError(f"Missing pipeline stage: {stage_name}")
            stage_ctx = TaskContext(
                correlation_id=ctx.correlation_id,
                input_payload=current_input,
                # Share one state dict so constraints found early reach later stages.
                accumulated_state=ctx.accumulated_state,
                metadata={"stage": stage_name},
            )
            result = await self._execute_with_timeout(handler, stage_ctx)
            outputs.append(result)
            current_input = result  # Chain output to next stage
        return outputs

    # Pattern 3: Broadcast
    async def broadcast_execution(self, ctx: TaskContext, agents: list[str]) -> dict:
        # Resolve every handler first so no coroutine is created and left unawaited.
        handlers = []
        for agent_name in agents:
            handler = self._registry.get(agent_name)
            if handler is None:
                raise ValueError(f"Missing broadcast agent: {agent_name}")
            handlers.append(handler)
        # gather returns results in the same order as `agents`, exceptions included.
        results = await asyncio.gather(
            *(self._execute_with_timeout(h, ctx) for h in handlers), return_exceptions=True
        )
        valid_results = []
        failures = []
        for agent_name, res in zip(agents, results):
            # BaseException also covers a cancelled child (CancelledError).
            if isinstance(res, BaseException):
                failures.append({"agent": agent_name, "error": repr(res)})
                logger.error(f"Broadcast agent {agent_name} failed: {res!r}")
            else:
                valid_results.append(res)
        if not valid_results:
            raise RuntimeError("All broadcast agents failed")
        return {"valid_outputs": valid_results, "failures": failures}

    # Pattern 4: Supervisor
    async def supervisor_execution(
        self,
        ctx: TaskContext,
        worker_name: str,
        reviewer_name: str,
        max_iterations: int = 3,
    ) -> str:
        worker = self._registry.get(worker_name)
        reviewer = self._registry.get(reviewer_name)
        if worker is None or reviewer is None:
            raise ValueError(f"Missing supervisor agents: {worker_name}, {reviewer_name}")
        draft = await self._execute_with_timeout(worker, ctx)
        iteration = 0
        while iteration < max_iterations:
            review_ctx = TaskContext(
                correlation_id=ctx.correlation_id,
                input_payload=f"Task: {ctx.input_payload}\nSubmission:\n{draft}",
                metadata={"iteration": iteration},
            )
            critique = await self._execute_with_timeout(reviewer, review_ctx)
            # Assumes the reviewer is prompted to open its reply with "APPROVED";
            # a plain substring check would also match "NOT APPROVED".
            if critique.strip().upper().startswith("APPROVED"):
                logger.info(f"Supervisor approved {ctx.correlation_id} at iteration {iteration}")
                return draft
            revision_ctx = TaskContext(
                correlation_id=ctx.correlation_id,
                input_payload=f"Task: {ctx.input_payload}\nPrevious:\n{draft}\nFeedback:\n{critique}",
                metadata={"iteration": iteration + 1},
            )
            draft = await self._execute_with_timeout(worker, revision_ctx)
            iteration += 1
        # The final revision has not been reviewed; callers may want to flag it.
        logger.warning(f"Supervisor loop exhausted for {ctx.correlation_id}")
        return draft
```
Why These Choices Matter
TaskContext over raw strings: Passing a structured object instead of a bare string keeps everything a call needs in one place, so nothing is silently dropped between stages. Fields such as the correlation ID and iteration metadata survive across async boundaries and make precise tracing possible.
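asyncio.wait_for wrapping: LLM APIs exhibit long-tail latency. Without explicit timeouts, a single stalled request blocks everything that awaits it: a pipeline never reaches its next stage, and a broadcast never returns. The wrapper turns that stall into a TimeoutError at a known SLA boundary.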
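The effect of the timeout wrapper can be seen in a small, self-contained simulation. Here `asyncio.sleep` stands in for network latency, and the agent names and timeout values are illustrative. The stalled call is cancelled at its deadline while the fast call still returns normally.

```python
import asyncio
from typing import Awaitable


async def fast_agent() -> str:
    await asyncio.sleep(0.01)
    return "fast"


async def stalled_agent() -> str:
    await asyncio.sleep(10)  # simulates a request stuck in the latency tail
    return "never"


async def call_with_timeout(coro: Awaitable[str], timeout: float) -> str:
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the underlying coroutine.
        return "timeout"


async def main() -> list[str]:
    return await asyncio.gather(
        call_with_timeout(fast_agent(), 0.5),
        call_with_timeout(stalled_agent(), 0.1),
    )


results = asyncio.run(main())
```

The whole batch finishes in roughly 0.1 seconds instead of 10, which is the bound the wrapper is meant to guarantee.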
Centralized CostLedger: Fragmented budget tracking is a production anti-pattern. By charging tokens at execution time and halting on breach, you prevent silent overages and enable real-time cost attribution.
asyncio.gather with return_exceptions=True: Broadcast patterns must never fail entirely because one agent times out. Collecting exceptions separately allows graceful degradation and fallback synthesis.
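A stripped-down simulation shows the degradation path. One hypothetical agent raises `ConnectionError` (standing in for a rate limit) while the other two succeed, and `gather` hands back the exception as a value instead of aborting the batch. The agent names are illustrative.

```python
import asyncio


async def ok_agent(name: str) -> str:
    await asyncio.sleep(0.01)
    return f"{name}: ok"


async def failing_agent(name: str) -> str:
    await asyncio.sleep(0.01)
    raise ConnectionError(f"{name}: rate limited")


async def broadcast() -> dict:
    names = ["search", "critic", "summarizer"]
    coros = [ok_agent("search"), failing_agent("critic"), ok_agent("summarizer")]
    results = await asyncio.gather(*coros, return_exceptions=True)
    valid, failures = [], []
    # gather preserves input order, so results[i] belongs to names[i].
    for name, res in zip(names, results):
        if isinstance(res, BaseException):
            failures.append({"agent": name, "error": str(res)})
        else:
            valid.append(res)
    return {"valid_outputs": valid, "failures": failures}


outcome = asyncio.run(broadcast())
```

Without `return_exceptions=True`, the `ConnectionError` would propagate out of `gather` and the two successful outputs would be lost to the caller.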
Pitfall Guide
1. Implicit Context Sharing
Explanation: Developers assume agents automatically share state because they run in the same process. LLMs are stateless; context windows reset per call. If Agent A extracts a constraint, Agent B will ignore it unless explicitly passed.
Fix: Always serialize accumulated state into TaskContext.input_payload or carry it in the dedicated TaskContext.accumulated_state field, and render that state into each agent's prompt. Validate payload completeness before dispatch.
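A minimal sketch of "serialize, then validate before dispatch", assuming each agent declares the state keys it depends on. The `build_prompt` helper, the prompt layout, and the `deadline` key are hypothetical; the `TaskContext` dataclass is repeated so the block runs on its own.

```python
import json
from dataclasses import dataclass, field


@dataclass
class TaskContext:  # same shape as the TaskContext in the implementation
    correlation_id: str
    input_payload: str
    accumulated_state: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)


def build_prompt(ctx: TaskContext, required_keys: set[str]) -> str:
    """Render accumulated state into the prompt; refuse to dispatch if keys are missing."""
    missing = required_keys - set(ctx.accumulated_state)
    if missing:
        raise ValueError(f"{ctx.correlation_id}: missing state keys {sorted(missing)}")
    state_block = json.dumps(ctx.accumulated_state, sort_keys=True)
    return f"Known constraints (JSON):\n{state_block}\n\nTask:\n{ctx.input_payload}"


ctx = TaskContext("corr_7", "Draft the migration plan.")
ctx.accumulated_state["deadline"] = "2024-Q3"  # e.g. extracted earlier by Agent A
prompt = build_prompt(ctx, required_keys={"deadline"})
```

Failing fast with a `ValueError` is cheaper than paying for a call whose output will silently ignore a constraint the model never saw.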
2. Unbounded Supervisor Loops
Explanation: A strict reviewer and a stubborn worker can cycle indefinitely, consuming tokens until the API rate limit or budget cap triggers.
Fix: Enforce hard iteration limits (max_iterations) and wall-clock timeouts. Implement cost decay: after iteration 2, switch to a cheaper model (and optionally lower the temperature to reduce variance between revisions) to minimize waste.
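The three controls can be combined in one loop: an iteration cap, a single wall-clock deadline shared by every call, and a model downgrade after two revisions. This is a sketch under stated assumptions: the model names, the `approve`/`revise` callable signatures, and `bounded_revisions` itself are hypothetical, and temperature handling is left out.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical model identifiers; substitute real ones from your provider.
PRIMARY_MODEL = "large-model"
FALLBACK_MODEL = "small-model"


def pick_model(iteration: int) -> str:
    """Cost decay: use the cheaper model from the third revision onward."""
    return PRIMARY_MODEL if iteration < 2 else FALLBACK_MODEL


async def bounded_revisions(
    approve: Callable[[str], Awaitable[bool]],
    revise: Callable[[str, str], Awaitable[str]],
    draft: str,
    max_iterations: int = 3,
    wall_clock_s: float = 5.0,
) -> tuple[str, list[str]]:
    """Revise until approved, max_iterations is hit, or the wall-clock budget runs out."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + wall_clock_s
    models_used: list[str] = []
    for iteration in range(max_iterations):
        try:
            # Each call gets only the time left on the shared deadline.
            if await asyncio.wait_for(approve(draft), timeout=deadline - loop.time()):
                return draft, models_used
            model = pick_model(iteration)
            draft = await asyncio.wait_for(revise(draft, model), timeout=deadline - loop.time())
            models_used.append(model)
        except asyncio.TimeoutError:
            break  # wall-clock budget exhausted; fall through with the latest draft
    return draft, models_used


async def never_approve(draft: str) -> bool:
    return False  # a maximally strict reviewer


async def append_revision(draft: str, model: str) -> str:
    return draft + "+"


final_draft, models = asyncio.run(bounded_revisions(never_approve, append_revision, "draft"))
```

Even against a reviewer that never approves, the loop stops after three revisions, and the last one runs on the cheaper model.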
3. Fragmented Budget Management
Explanation: Assigning independent budgets to each agent prevents global spend control. Three agents each capped at $0.10 can together spend $0.30, breaching a system-level limit of $0.25 even though no individual cap was exceeded.
Fix: Use a centralized ledger that deducts from a shared pool. Allocate per-stage caps as soft limits, but enforce a hard global ceiling that halts all execution when breached.
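A sketch of that split between soft and hard limits, assuming a variant of `CostLedger` that tracks spend per stage. `SharedLedger`, `BudgetExceeded`, and the stage names are hypothetical; the $0.005 per 1k token rate matches the default in the implementation.

```python
import logging
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)


class BudgetExceeded(RuntimeError):
    """Raised when the shared pool's hard ceiling is breached."""


@dataclass
class SharedLedger:
    """Hypothetical variant of CostLedger: per-stage soft caps over one global pool."""
    hard_cap_usd: float
    stage_soft_caps_usd: dict[str, float]
    spent_by_stage: dict[str, float] = field(default_factory=dict)

    @property
    def total_spent(self) -> float:
        return sum(self.spent_by_stage.values())

    def charge(self, stage: str, tokens: int, rate_per_1k: float = 0.005) -> None:
        cost = (tokens / 1000.0) * rate_per_1k
        self.spent_by_stage[stage] = self.spent_by_stage.get(stage, 0.0) + cost
        soft_cap = self.stage_soft_caps_usd.get(stage)
        if soft_cap is not None and self.spent_by_stage[stage] > soft_cap:
            # Soft limit: log it and let the global pool absorb the overrun.
            logger.warning("stage %s over soft cap: %.4f > %.4f",
                           stage, self.spent_by_stage[stage], soft_cap)
        if self.total_spent > self.hard_cap_usd:
            # Hard limit: raise so the caller aborts the whole run.
            raise BudgetExceeded(
                f"{stage}: total ${self.total_spent:.2f} exceeds global cap ${self.hard_cap_usd:.2f}"
            )


ledger = SharedLedger(hard_cap_usd=0.25,
                      stage_soft_caps_usd={"research": 0.10, "draft": 0.10, "polish": 0.10})
ledger.charge("research", 20_000)  # $0.10
ledger.charge("draft", 20_000)     # $0.10, running total $0.20
halted = ""
try:
    ledger.charge("polish", 20_000)  # pushes the total to $0.30 and trips the ceiling
except BudgetExceeded as exc:
    halted = str(exc)
```

No stage exceeds its own $0.10 cap, yet the run halts, which is exactly the case per-agent budgets miss.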
4. Ignoring Partial Failures in Broadcast
Explanation: Assuming all parallel agents succeed leads to unhandled exceptions and crashed aggregators. Network blips, rate limits, or model refusals are common.
Fix: Always use asyncio.gather(..., return_exceptions=True). Filter valid results, log failures with correlation IDs, and implement a fallback synthesis strategy that weights successful outputs.
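One simple way to weight successful outputs is a majority vote, reported alongside agreement and coverage so downstream code can decide whether the answer is trustworthy. This sketch consumes the `{"valid_outputs", "failures"}` shape returned by `broadcast_execution`; the `synthesize` helper and the voting strategy are illustrative choices, not the only fallback.

```python
from collections import Counter


def synthesize(valid_outputs: list[str], failures: list[dict], expected_agents: int) -> dict:
    """Hypothetical fallback: majority vote over successful outputs."""
    if not valid_outputs:
        raise RuntimeError("All broadcast agents failed")
    votes = Counter(o.strip().lower() for o in valid_outputs)
    answer, count = votes.most_common(1)[0]
    return {
        "answer": answer,
        "agreement": count / len(valid_outputs),       # share of survivors that agree
        "coverage": len(valid_outputs) / expected_agents,  # share of agents that answered
        "failed_agents": [f["agent"] for f in failures],
    }


result = synthesize(
    ["Paris", "paris", "Lyon"],
    [{"agent": "critic", "error": "timeout"}],
    expected_agents=4,
)
```

Free-form generations rarely match verbatim, so in practice the vote would run over normalized or extracted fields rather than raw text.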
5. Over-Provisioning Routing Models
Explanation: Using a high-capability reasoning model for intent classification wastes budget and increases latency. Routing is a classification problem, not a generation problem.
Fix: Deploy lightweight classifiers, embedding-based matchers, or fine-tuned small models for routing. Reserve expensive models for execution stages where reasoning depth matters.
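To show how little machinery routing can need, the sketch below uses bag-of-words cosine similarity as a dependency-free stand-in for an embedding matcher (the configuration template names an embedding model for this role). The intent exemplars, the 0.2 threshold, and `async_classify` are hypothetical; the async wrapper matches the `classifier` signature that `route_execution` expects.

```python
import asyncio
import math
import re
from collections import Counter

# Hypothetical intent exemplars; an embedding matcher would store vectors instead.
INTENT_EXEMPLARS = {
    "code_review": "review this pull request diff function bug code",
    "summarization": "summarize this article document summary tl;dr key points",
}


def _vector(text: str) -> Counter:
    return Counter(re.findall(r"[a-z0-9']+", text.lower()))


def _cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def classify(text: str, threshold: float = 0.2, fallback: str = "general_assistant") -> str:
    query = _vector(text)
    scores = {intent: _cosine(query, _vector(ex)) for intent, ex in INTENT_EXEMPLARS.items()}
    best = max(scores, key=scores.get)
    # Low-confidence matches go to the fallback intent instead of a wrong specialist.
    return best if scores[best] >= threshold else fallback


async def async_classify(text: str) -> str:
    return classify(text)
```

This runs in microseconds and costs nothing per call; the threshold and fallback keep ambiguous requests away from the wrong specialist.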
6. Synchronous Aggregation Assumptions
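Explanation: Treating parallel agent completion as ordered leads to race conditions and stale data. Agents finish in arbitrary order based on model load and prompt complexity. asyncio.gather returns results in the order the awaitables were passed, but asyncio.as_completed, callbacks, and queues deliver them in completion order.
Fix: Design aggregation logic to handle unordered results. Because every agent in one broadcast shares the same correlation ID, tag each output with its source agent name as well. Never assume sequential completion.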
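When results are consumed as they finish, pairing each output with its agent name makes completion order irrelevant. A minimal sketch with `asyncio.as_completed`; the agent names and delays are illustrative.

```python
import asyncio


async def agent(name: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stands in for variable model latency
    return f"{name} result"


async def tagged(name: str, coro) -> tuple[str, str]:
    # Carry the source name with the output so order no longer matters.
    return name, await coro


async def collect_as_completed(specs: dict[str, float]) -> tuple[list[str], dict[str, str]]:
    order, by_agent = [], {}
    tasks = [asyncio.create_task(tagged(name, agent(name, delay))) for name, delay in specs.items()]
    for next_done in asyncio.as_completed(tasks):
        name, output = await next_done
        order.append(name)
        by_agent[name] = output
    return order, by_agent


order, by_agent = asyncio.run(collect_as_completed({"slow": 0.2, "fast": 0.01, "medium": 0.1}))
```

`order` reflects completion time, not submission order, but `by_agent` maps every output to the right source either way.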
7. Missing Idempotency and Replay Support
Explanation: Production failures require debugging. Without deterministic tracing, you cannot reconstruct why a supervisor rejected a draft or why a router misclassified intent.
Fix: Attach correlation IDs to every context. Log routing decisions, token counts, and iteration states as structured records (for example, JSONL files or OpenTelemetry traces). Enable replay by serializing full context payloads.
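A sketch of replay support, assuming every context field is JSON-serializable. The `record` and `load_for_replay` helpers and the event names are hypothetical; `dataclasses.asdict` does the serialization, and the `TaskContext` dataclass is repeated so the block runs on its own.

```python
import io
import json
from dataclasses import asdict, dataclass, field


@dataclass
class TaskContext:  # same shape as the TaskContext in the implementation
    correlation_id: str
    input_payload: str
    accumulated_state: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)


def record(sink: io.TextIOBase, event: str, ctx: TaskContext, **extra) -> None:
    """Append one JSONL line holding the full context needed to replay this step."""
    sink.write(json.dumps({"event": event, "context": asdict(ctx), **extra}) + "\n")


def load_for_replay(sink_text: str, correlation_id: str) -> list[TaskContext]:
    contexts = []
    for line in sink_text.splitlines():
        entry = json.loads(line)
        if entry["context"]["correlation_id"] == correlation_id:
            contexts.append(TaskContext(**entry["context"]))
    return contexts


log = io.StringIO()
record(log, "route", TaskContext("corr_1", "summarize Q3"), intent="summarization", tokens=812)
record(log, "review", TaskContext("corr_2", "draft v1", metadata={"iteration": 0}))
replayed = load_for_replay(log.getvalue(), "corr_1")
```

The rebuilt contexts can be fed back through the same pattern method to reproduce a routing decision or a rejected draft.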
Production Bundle
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Mutually exclusive user intents (e.g., code review vs. summarization) | Router | Minimizes token spend by executing only one specialist | Low |
| Sequential refinement required (research → draft → polish) | Pipeline | Guarantees state propagation and stage-level quality gates | Medium |
| Need diverse perspectives or parallel search strategies | Broadcast | Maximizes coverage but requires synthesis overhead | High |
| Output quality is critical and subjective (legal, medical, creative) | Supervisor | Iterative correction reduces hallucination risk | Medium-High |
| Strict budget cap with unpredictable payload sizes | Pipeline + Centralized Ledger | Soft per-stage allocations plus a hard global ceiling prevent overage | Controlled |
| High-throughput, low-latency SLA | Router + Lightweight Classifier | Avoids parallel execution and revision loops | Low |
Configuration Template
```yaml
orchestrator:
  budget_usd: 0.75
  default_timeout_seconds: 25.0
  token_rate_per_1k: 0.005
patterns:
  router:
    classifier_model: "text-embedding-3-small"
    fallback_intent: "general_assistant"
  pipeline:
    max_stages: 4
    stage_budget_split: [0.4, 0.35, 0.25]
  broadcast:
    max_parallel_agents: 3
    require_synthesis: true
    fallback_on_failure: "use_best_valid"
  supervisor:
    max_iterations: 3
    approval_keyword: "APPROVED"
    timeout_per_iteration: 20.0
observability:
  correlation_id_prefix: "corr_"
  log_format: "jsonl"
  trace_export: "opentelemetry"
```
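A sketch of wiring the template into the orchestrator, assuming the YAML has already been parsed into a plain dict (for example with PyYAML's `yaml.safe_load`, not shown). The helpers `stage_budgets` and `orchestrator_kwargs` are hypothetical, and treating `stage_budget_split` as fractions of `budget_usd` that must sum to 1.0 is an assumption the template does not state.

```python
import math

# A subset of the template above, as it would look after parsing.
CONFIG = {
    "orchestrator": {"budget_usd": 0.75, "default_timeout_seconds": 25.0, "token_rate_per_1k": 0.005},
    "patterns": {
        "pipeline": {"max_stages": 4, "stage_budget_split": [0.4, 0.35, 0.25]},
        "supervisor": {"max_iterations": 3, "approval_keyword": "APPROVED", "timeout_per_iteration": 20.0},
    },
}


def stage_budgets(config: dict) -> list[float]:
    """Turn split fractions into per-stage USD soft caps (assumed: fractions of budget_usd)."""
    budget = config["orchestrator"]["budget_usd"]
    pipeline = config["patterns"]["pipeline"]
    split = pipeline["stage_budget_split"]
    if len(split) > pipeline["max_stages"]:
        raise ValueError("more budget fractions than allowed pipeline stages")
    if not math.isclose(sum(split), 1.0):
        raise ValueError(f"stage_budget_split sums to {sum(split)}, expected 1.0")
    return [round(budget * frac, 4) for frac in split]


def orchestrator_kwargs(config: dict) -> dict:
    """Map config keys onto AgentOrchestrator's constructor arguments."""
    core = config["orchestrator"]
    return {"budget_usd": core["budget_usd"], "default_timeout": core["default_timeout_seconds"]}
```

Validating the split at load time catches a mistyped fraction before any tokens are spent.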
Quick Start Guide
- Initialize the orchestrator: Instantiate AgentOrchestrator with your global budget and timeout preferences. Register your specialist handlers using descriptive intent or stage names.
- Define your pattern: Choose Router for intent dispatch, Pipeline for sequential refinement, Broadcast for parallel exploration, or Supervisor for iterative quality control. Configure pattern-specific limits in the YAML template.
- Execute with context: Create a TaskContext with a unique correlation ID and payload. Dispatch through the chosen pattern method. The orchestrator handles timeouts, cost deduction, and error isolation automatically.
- Trace and iterate: Monitor structured logs for routing decisions, token consumption, and iteration states. Adjust budget splits, timeout thresholds, or agent assignments based on production telemetry.