    ),
    "claude-sonnet-4-6": ModelSpec(
        identifier="claude-sonnet-4-6",
        tier=TaskTier.STANDARD,
        input_price_per_m=3.00,
        output_price_per_m=15.00,
        max_context_tokens=4096,
        fallback_id="gpt-5.5"
    ),
    "gpt-5.5": ModelSpec(
        identifier="gpt-5.5",
        tier=TaskTier.STRUCTURED,
        input_price_per_m=3.00,
        output_price_per_m=12.00,
        max_context_tokens=4096,
        fallback_id="claude-sonnet-4-6"
    ),
    "deepseek-v3": ModelSpec(
        identifier="deepseek-v3",
        tier=TaskTier.BULK,
        input_price_per_m=0.27,
        output_price_per_m=1.10,
        max_context_tokens=4096,
        fallback_id="gpt-5.5"
    )
}
**Architecture Decision:** Using a frozen dataclass prevents runtime mutation of pricing or fallback chains. The `TaskTier` enum provides a type-safe routing target, while `fallback_id` links each model to its successor, forming a directed graph for retry logic. As configured, that graph is not acyclic: `claude-sonnet-4-6` and `gpt-5.5` fall back to each other, so the retry loop must be bounded by an attempt limit. This design isolates configuration from execution, so swapping providers or updating pricing does not touch business logic.
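To make the shape of the fallback graph concrete, the following is a minimal sketch that walks the fallback links and reports whether they loop back on themselves. The `FALLBACKS` mapping and both helper functions are hypothetical stand-ins introduced for illustration; only the model ids and their fallback targets come from the registry entries shown above.

```python
from typing import Dict, List, Optional

# Hypothetical stand-in for the registry: model id -> fallback id (or None).
# The ids and links mirror the registry entries visible above.
FALLBACKS: Dict[str, Optional[str]] = {
    "claude-sonnet-4-6": "gpt-5.5",
    "gpt-5.5": "claude-sonnet-4-6",
    "deepseek-v3": "gpt-5.5",
}


def walk_fallbacks(start: str, fallbacks: Dict[str, Optional[str]]) -> List[str]:
    """Follow fallback links from `start`, stopping at the first repeat or dead end."""
    path: List[str] = []
    seen = set()
    current: Optional[str] = start
    while current is not None and current not in seen:
        seen.add(current)
        path.append(current)
        current = fallbacks.get(current)
    return path


def has_cycle(start: str, fallbacks: Dict[str, Optional[str]]) -> bool:
    """Return True if following fallbacks from `start` revisits a model."""
    path = walk_fallbacks(start, fallbacks)
    # If the last model's fallback is already on the path, the chain loops.
    return fallbacks.get(path[-1]) in path
```

A check like this could run once at startup, so that a cyclic chain is a deliberate choice bounded by the attempt limit rather than an accident.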
### Step 2: Semantic Task Classifier
Naive substring matching is insufficient for production routing. We implement a heuristic classifier that evaluates prompt length, structural indicators, and domain-specific terminology. The classifier returns a `TaskTier` and a confidence score, allowing the orchestrator to apply fallback rules when confidence is low.
```python
import re
from typing import Tuple


class TaskClassifier:
    # Word-boundary patterns, compiled once when the class is defined.
    _STRUCTURAL_PATTERNS = re.compile(r"\b(json|schema|csv|extract|parse|format|return)\b", re.IGNORECASE)
    _REASONING_PATTERNS = re.compile(r"\b(refactor|architect|debug|race.condition|optimize|security|trade.off|compare)\b", re.IGNORECASE)
    _ROUTINE_PATTERNS = re.compile(r"\b(test|docstring|translate|boilerplate|lint|format|comment|rename)\b", re.IGNORECASE)

    def evaluate(self, prompt: str) -> Tuple[TaskTier, float]:
        # Whitespace-delimited word count, used as a rough proxy for token count.
        tokens = len(prompt.split())
        has_structure = bool(self._STRUCTURAL_PATTERNS.search(prompt))
        has_reasoning = bool(self._REASONING_PATTERNS.search(prompt))
        has_routine = bool(self._ROUTINE_PATTERNS.search(prompt))

        # Rules are checked in priority order; the first match wins.
        if has_structure and not has_reasoning:
            return TaskTier.STRUCTURED, 0.85
        if has_reasoning or tokens > 600:
            return TaskTier.COMPLEX, 0.90
        if has_routine:
            return TaskTier.BULK, 0.80
        # No signal matched: default to the standard tier with low confidence.
        return TaskTier.STANDARD, 0.65
```
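**Architecture Decision:** Regular expressions with word-boundary anchors (`\b`) are used instead of naive `in` checks to avoid false positives from keywords embedded in longer words, such as `test` inside `latest`. The confidence score lets the orchestrator apply a confidence-threshold rule: if confidence drops below 0.7, the system defaults to the standard tier rather than guessing. The classifier's own default branch follows the same rule, returning `TaskTier.STANDARD` with a confidence of 0.65. This reduces the risk of misrouting complex architectural queries to bulk models.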
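The difference between a substring check and a word-boundary match can be seen in a small sketch. The pattern below is a shortened, illustrative subset of the routine keywords, and the prompts are made up for the example.

```python
import re

# Illustrative pattern only: a shortened, hypothetical subset of the routine keywords.
ROUTINE = re.compile(r"\b(test|lint|rename)\b", re.IGNORECASE)

prompt = "Summarize the latest release notes"

# Naive substring check: "test" occurs inside "latest", so this is a false positive.
naive_hit = "test" in prompt.lower()

# Word-boundary regex: "latest" does not contain "test" as a standalone word.
regex_hit = bool(ROUTINE.search(prompt))

# A prompt that actually asks for a test still matches.
real_hit = bool(ROUTINE.search("Write a unit test for parse_date"))

print(naive_hit, regex_hit, real_hit)  # prints "True False True"
```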
### Step 3: Execution Orchestrator with Fallback & Accounting
The orchestrator manages the request lifecycle: classification, model selection, API invocation, fallback chaining, and cost logging. We use a ledger pattern to track usage deterministically.
```python
import logging
import threading
import time
from typing import Any, Dict

from openai import OpenAI

logger = logging.getLogger(__name__)


class UsageLedger:
    def __init__(self):
        self.entries: list[Dict[str, Any]] = []
        self.cumulative_cost: float = 0.0
        # Guards the ledger when record() is called from several worker threads.
        self._lock = threading.Lock()

    def record(self, model_id: str, in_tokens: int, out_tokens: int) -> float:
        spec = MODEL_REGISTRY.get(model_id)
        if not spec:
            raise ValueError(f"Unknown model: {model_id}")
        # Prices are quoted per million tokens.
        cost = (in_tokens * spec.input_price_per_m + out_tokens * spec.output_price_per_m) / 1_000_000
        with self._lock:
            self.entries.append({
                "model": model_id,
                "input_tokens": in_tokens,
                "output_tokens": out_tokens,
                "cost": cost,
                "epoch": time.time()
            })
            self.cumulative_cost += cost
        return cost


class ExecutionOrchestrator:
    def __init__(self, api_client: OpenAI, ledger: UsageLedger):
        self.client = api_client
        self.ledger = ledger
        self.classifier = TaskClassifier()

    def execute(self, user_prompt: str, system_instruction: str | None = None, max_attempts: int = 2) -> Dict[str, Any]:
        tier, confidence = self.classifier.evaluate(user_prompt)
        # The registry is keyed by model id, so select the first spec registered for this tier.
        target_spec = next((s for s in MODEL_REGISTRY.values() if s.tier == tier), None)
        if target_spec is None:
            raise ValueError(f"No model registered for tier: {tier}")
        target_id = target_spec.identifier

        messages = []
        if system_instruction:
            messages.append({"role": "system", "content": system_instruction})
        messages.append({"role": "user", "content": user_prompt})

        current_id = target_id
        # max_attempts counts fallback hops, so at most max_attempts + 1 calls are made.
        for attempt in range(max_attempts + 1):
            try:
                response = self.client.chat.completions.create(
                    model=current_id,
                    messages=messages,
                    max_tokens=4096,
                    temperature=0.7
                )
                usage = response.usage
                self.ledger.record(current_id, usage.prompt_tokens, usage.completion_tokens)
                return {
                    "content": response.choices[0].message.content,
                    "model_used": current_id,
                    "tier_routed": tier.value,
                    "confidence": confidence,
                    "tokens": {"input": usage.prompt_tokens, "output": usage.completion_tokens}
                }
            except Exception as exc:
                logger.warning("Model %s failed (attempt %d): %s", current_id, attempt + 1, exc)
                spec = MODEL_REGISTRY.get(current_id)
                if spec and spec.fallback_id and attempt < max_attempts:
                    current_id = spec.fallback_id
                    logger.info("Rerouting to fallback: %s", current_id)
                else:
                    raise RuntimeError(f"Exhausted fallback chain for {target_id}") from exc
```
**Architecture Decision:** The orchestrator separates classification from execution. The fallback chain is driven by `ModelSpec.fallback_id`, creating a predictable retry path rather than random model swapping. Cost recording happens immediately after a successful response, ensuring ledger accuracy even if downstream processing fails. The `max_attempts` parameter caps the number of fallback hops (the loop makes at most `max_attempts + 1` calls), which prevents infinite retry loops during provider outages.
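As a worked example of the ledger arithmetic, the sketch below isolates the per-call cost formula in a standalone function. The name `request_cost` and the token counts are illustrative assumptions; the prices are the $3.00 and $15.00 per-million figures from the registry.

```python
def request_cost(in_tokens: int, out_tokens: int,
                 input_price_per_m: float, output_price_per_m: float) -> float:
    """Cost in dollars for one call, with prices quoted per million tokens."""
    return (in_tokens * input_price_per_m + out_tokens * output_price_per_m) / 1_000_000


# 1,200 prompt tokens and 800 completion tokens at $3.00 / $15.00 per million:
# (1200 * 3.00 + 800 * 15.00) / 1_000_000 = 15_600 / 1_000_000 = 0.0156
cost = request_cost(1200, 800, input_price_per_m=3.00, output_price_per_m=15.00)
print(f"${cost:.4f}")  # prints "$0.0156"
```

Note that the output side dominates here: 800 completion tokens cost more than three times as much as 1,200 prompt tokens.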
### Step 4: Asynchronous Batch Processing
Production workloads frequently require processing arrays of items (e.g., generating documentation for 50 functions). Synchronous execution creates bottlenecks. We implement a semaphore-controlled async batch processor that routes bulk items to the cheapest capable model.
```python
import asyncio
from typing import Any, Dict, List, Union


class BatchProcessor:
    def __init__(self, orchestrator: ExecutionOrchestrator, concurrency_limit: int = 5):
        self.orchestrator = orchestrator
        # Caps the number of in-flight requests.
        self.semaphore = asyncio.Semaphore(concurrency_limit)

    async def _process_item(self, item: str, template: str) -> Dict[str, Any]:
        async with self.semaphore:
            prompt = template.format(item=item)
            # execute() is blocking, so run it in a worker thread.
            return await asyncio.to_thread(
                self.orchestrator.execute, prompt, system_instruction=None, max_attempts=1
            )

    async def run(self, items: List[str], template: str) -> List[Union[Dict[str, Any], BaseException]]:
        tasks = [self._process_item(item, template) for item in items]
        # return_exceptions=True keeps one failure from aborting the batch;
        # failed items appear in the result list as exception objects.
        return await asyncio.gather(*tasks, return_exceptions=True)
```
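**Architecture Decision:** `asyncio.Semaphore` caps the number of in-flight requests, which reduces the risk of API rate limit violations. `asyncio.to_thread` runs the synchronous OpenAI client call in a worker thread so it does not block the event loop; as a consequence, any state shared across calls, such as the ledger, must be safe to update from multiple threads. The batch processor passes `max_attempts=1`, allowing a single fallback per item, to avoid cascading failures during high-throughput operations.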
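The concurrency cap can be observed without any network access. The following sketch replaces the API call with a short blocking sleep and records the highest number of simultaneous calls; `PeakTracker`, `run_bounded`, and `demo` are hypothetical names introduced only for this illustration.

```python
import asyncio
import threading
import time
from typing import List


class PeakTracker:
    """Records the highest number of simultaneous calls observed."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def blocking_call(self, item: str) -> str:
        # Stand-in for a blocking API call (hypothetical; no network involved).
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        time.sleep(0.01)
        with self._lock:
            self.active -= 1
        return item.upper()


async def run_bounded(items: List[str], tracker: PeakTracker, limit: int) -> List[str]:
    semaphore = asyncio.Semaphore(limit)

    async def worker(item: str) -> str:
        # At most `limit` workers hold the semaphore at any moment.
        async with semaphore:
            return await asyncio.to_thread(tracker.blocking_call, item)

    return await asyncio.gather(*(worker(i) for i in items))


def demo(limit: int = 3) -> PeakTracker:
    tracker = PeakTracker()
    results = asyncio.run(run_bounded([f"item-{n}" for n in range(12)], tracker, limit))
    assert len(results) == 12
    return tracker
```

Calling `demo(3)` should return a tracker whose `peak` never exceeds 3, even though 12 items are submitted at once.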
## Pitfall Guide

### 1. Naive Keyword Routing

**Explanation:** Relying solely on exact string matches causes misclassification when prompts use synonyms or domain-specific jargon.

**Fix:** Implement pattern-based regex matching with confidence scoring. Add a fallback tier for low-confidence classifications. Consider a lightweight secondary classifier (e.g., a small embedding model) for semantic routing in high-volume systems.
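A fallback tier for low-confidence classifications can be as small as one function. The sketch below assumes a 0.7 threshold; the `Tier` enum is a hypothetical stand-in for the article's `TaskTier`, and `apply_confidence_floor` is an illustrative name.

```python
from enum import Enum


class Tier(Enum):
    # Hypothetical stand-in for the article's TaskTier enum.
    COMPLEX = "complex"
    STRUCTURED = "structured"
    STANDARD = "standard"
    BULK = "bulk"


def apply_confidence_floor(tier: Tier, confidence: float, threshold: float = 0.7) -> Tier:
    """Fall back to the standard tier when the classifier is not confident enough."""
    return tier if confidence >= threshold else Tier.STANDARD
```

With this rule, a bulk classification at confidence 0.80 is kept, while the same classification at 0.65 is routed to the standard tier.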
### 2. Ignoring Context Window Economics

**Explanation:** Routing decisions often overlook input token volume. On a model priced at $5/M input and $25/M output, a 2,000-token prompt costs $0.01 before any output is generated, which exceeds the output cost whenever the response is shorter than 400 tokens.

**Fix:** Implement pre-flight token estimation. If input tokens exceed a threshold, automatically compress or summarize before routing. Adjust routing logic to factor in total token budget, not just task type.
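A pre-flight check might look like the sketch below. The four-characters-per-token estimate is a crude assumption rather than a real tokenizer, and the function names and the 2,000-token threshold are illustrative.

```python
def estimate_tokens(text: str) -> int:
    """Very rough estimate: about 4 characters per token (an assumption, not a tokenizer)."""
    return max(1, len(text) // 4)


def breakeven_output_tokens(in_tokens: int, input_price_per_m: float,
                            output_price_per_m: float) -> float:
    """Output length at which the output cost equals the input cost."""
    return in_tokens * input_price_per_m / output_price_per_m


def needs_compression(prompt: str, max_input_tokens: int = 2000) -> bool:
    """Flag prompts whose estimated size exceeds the input budget."""
    return estimate_tokens(prompt) > max_input_tokens


# 2,000 input tokens at $5/M input and $25/M output:
# 2000 * 5 / 25 = 400 output tokens before output cost overtakes input cost.
print(breakeven_output_tokens(2000, 5.0, 25.0))  # prints "400.0"
```

For production use, a provider-specific tokenizer would give a more reliable count than the character heuristic.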
### 3. Synchronous Fallback Chains

**Explanation:** Blocking retries during provider outages increase tail latency and degrade user experience.

**Fix:** Use asynchronous execution with circuit breakers. Track failure rates per model and temporarily disable failing endpoints. Implement exponential backoff with jitter to prevent thundering herd scenarios.
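The two mechanisms named in the fix can be sketched in a few lines. This is a minimal, single-threaded illustration with assumed defaults (three failures, a 30-second cooldown, a 0.5-second base delay), not a production-grade breaker; the class and function names are hypothetical.

```python
import random
import time
from typing import Callable, Optional


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Full-jitter exponential backoff: a random delay in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0.0, min(cap, base * (2 ** attempt)))


class CircuitBreaker:
    """Opens after `max_failures` consecutive failures; allows a trial call after `cooldown` seconds."""

    def __init__(self, max_failures: int = 3, cooldown: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_failures = max_failures
        self.cooldown = cooldown
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        if self._opened_at is None:
            return True
        # Once the cooldown has elapsed, let a trial request through (half-open state).
        return self._clock() - self._opened_at >= self.cooldown

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.max_failures:
            self._opened_at = self._clock()
```

One breaker per model id would let the orchestrator skip a failing endpoint and move straight to its fallback instead of waiting for another timeout.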
### 4. Unbounded Concurrency in Batch Jobs

**Explanation:** Spawning unlimited parallel requests triggers rate limits, causes 429 errors, and inflates costs through redundant retries.

**Fix:** Enforce concurrency limits using semaphores or token buckets. Monitor API quota headers (`x-ratelimit-remaining`) and dynamically adjust worker counts. Implement request queuing for bursty workloads.
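Where a semaphore limits how many requests run at once, a token bucket limits how many start per unit of time. The following is a minimal sketch of that idea; the `TokenBucket` class is hypothetical, is not thread-safe as written, and takes an injectable clock so its behavior can be checked deterministically.

```python
import time
from typing import Callable


class TokenBucket:
    """Allows bursts of up to `capacity` requests, refilled at `rate` permits per second."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = capacity
        self._last = clock()

    def try_acquire(self, cost: float = 1.0) -> bool:
        now = self._clock()
        # Refill in proportion to elapsed time, never above capacity.
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False
```

A caller that receives `False` can queue the request or sleep briefly before trying again, which smooths bursty workloads.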
### 5. Cost Drift from Untracked System Prompts

**Explanation:** System instructions and few-shot examples consume tokens but are frequently excluded from cost calculations, leading to budget overruns.

**Fix:** Track all tokens returned in the API response, including system prompt consumption. Use a unified ledger that records every API call regardless of role. Audit system prompt length quarterly and optimize for token efficiency.
### 6. Silent Model Degradation

**Explanation:** Providers occasionally update model weights or routing infrastructure, causing subtle quality drops without changing version strings.

**Fix:** Implement output validation checks (e.g., JSON schema verification, regex pattern matching). Track success rates per model tier and alert when quality metrics deviate by >5%. Maintain a shadow routing mode to compare model outputs before full deployment.
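A lightweight validation check for structured output can be written with the standard library alone. The sketch below verifies that a response parses as a JSON object and that required keys have the expected types; `parse_structured_output` and the example keys are hypothetical, and a full JSON Schema validator would be stricter.

```python
import json
from typing import Any, Dict, Optional


def parse_structured_output(raw: str, required: Dict[str, type]) -> Optional[Dict[str, Any]]:
    """Return the parsed object if it is a JSON object with the required typed keys, else None."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None
    for key, expected_type in required.items():
        # A missing key yields None, which fails the isinstance check.
        if not isinstance(data.get(key), expected_type):
            return None
    return data
```

Counting how often this returns `None` per model gives a simple success-rate metric to alert on.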
### 7. Hardcoded Pricing and Fallback Logic

**Explanation:** Embedding pricing and retry chains in application code requires deployments for every pricing update or provider change.

**Fix:** Externalize configuration to environment variables or a configuration service. Version pricing schemas and implement graceful degradation when configuration updates fail. Use feature flags to toggle routing strategies without code changes.
## Production Bundle

### Action Checklist

### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Real-time chat interface | Streaming + Sonnet 4.6 | Low latency, balanced reasoning, predictable token consumption | Moderate ($3/$15) |
| JSON extraction / API parsing | GPT-5.5 with schema enforcement | Superior structured output compliance, reduces post-processing | Moderate ($3/$12) |
| Architecture design / debugging | Claude Opus 4.7 | Highest reasoning fidelity, handles complex trade-off analysis | High ($5/$25) |
| Documentation / boilerplate / linting | DeepSeek V3 + batch processing | More than 10x cheaper than Sonnet 4.6 on both input and output, sufficient for routine transformations | Low ($0.27/$1.10) |
| High-throughput data transformation | DeepSeek V3 + async semaphore | Maximizes throughput while respecting rate limits | Low ($0.27/$1.10) |
### Configuration Template

```python
# routing_config.py
import os
from dataclasses import dataclass


@dataclass
class RoutingPolicy:
    confidence_threshold: float = 0.7
    max_retry_attempts: int = 2
    batch_concurrency: int = 5
    fallback_timeout_ms: int = 3000
    cost_alert_threshold: float = 100.0  # Monthly budget cap


ROUTING_POLICY = RoutingPolicy()

# Environment-driven overrides
ROUTING_POLICY.confidence_threshold = float(os.getenv("ROUTING_CONFIDENCE", "0.7"))
ROUTING_POLICY.max_retry_attempts = int(os.getenv("ROUTING_MAX_RETRIES", "2"))
ROUTING_POLICY.batch_concurrency = int(os.getenv("BATCH_CONCURRENCY", "5"))
ROUTING_POLICY.cost_alert_threshold = float(os.getenv("COST_ALERT_THRESHOLD", "100.0"))
```
### Quick Start Guide

- **Initialize the client:** Configure an OpenAI-compatible client pointing to your preferred gateway or direct provider endpoint. Set environment variables for API keys and base URLs.
- **Deploy the registry:** Copy the `MODEL_REGISTRY` and `RoutingPolicy` into your configuration module. Adjust pricing and fallback chains to match your provider agreements.
- **Instantiate the orchestrator:** Create a `UsageLedger` and `ExecutionOrchestrator` instance. Pass your API client and ledger to the orchestrator constructor.
- **Route your first request:** Call `orchestrator.execute(user_prompt, system_instruction)` and inspect the returned dictionary for model used, tier routed, and token consumption.
- **Monitor and tune:** Review ledger entries after 100 requests. Adjust confidence thresholds, fallback chains, and concurrency limits based on observed latency, success rates, and cost distribution.