er:** Token estimation, provider selection, fallback logic
- Semantic Cache: Embedding-based lookup for similar prompts
- Async Queue: Batching, retry management, backpressure handling
- Model Provider Layer: Abstracted client with circuit breakers and cost tracking
- Response Formatter: Streaming orchestration, token counting, billing hooks
Step 2: Request Routing & Provider Abstraction
Hardcoding providers creates vendor lock-in and prevents cost optimization. Implement a routing layer that selects models based on task complexity, latency requirements, and current pricing.
from enum import Enum
from typing import Optional

import httpx
import structlog

logger = structlog.get_logger()


class TaskType(Enum):
    SIMPLE = "simple"
    REASONING = "reasoning"
    CODE = "code"
    VISION = "vision"


class ModelRouter:
    def __init__(self, providers: dict):
        # Maps provider IDs (e.g. "openai/gpt-4o") to client objects exposing is_healthy().
        self.providers = providers
        # Each chain is ordered by preference; later entries are fallbacks.
        self.fallback_chain = {
            TaskType.SIMPLE: ["openai/gpt-4o-mini", "anthropic/claude-3-haiku"],
            TaskType.REASONING: ["anthropic/claude-3-5-sonnet", "openai/gpt-4o"],
            TaskType.CODE: ["openai/gpt-4o", "anthropic/claude-3-5-sonnet"],
            TaskType.VISION: ["openai/gpt-4o", "anthropic/claude-3-5-sonnet"],
        }

    async def resolve_provider(self, task: TaskType, tokens: int) -> str:
        # Note: `tokens` is not used by this selection logic.
        chain = self.fallback_chain.get(task, self.fallback_chain[TaskType.SIMPLE])
        for provider_id in chain:
            client = self.providers.get(provider_id)
            if client and client.is_healthy():
                return provider_id
        raise RuntimeError("No healthy provider available")
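The router above relies on each client exposing `is_healthy()`, but the article does not show how that check is backed. The following is a minimal sketch of one option, a consecutive-failure circuit breaker. The class name `CircuitBreaker`, its parameters, and the `record_success`/`record_failure` methods are illustrative assumptions, not part of any provider SDK.

```python
import time


class CircuitBreaker:
    """Hypothetical helper: opens after N consecutive failures, then cools down."""

    def __init__(self, failure_threshold: int = 3, reset_after: float = 30.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after      # cool-down in seconds
        self._clock = clock                 # injectable so the breaker is testable
        self._failures = 0
        self._opened_at = None              # None means the circuit is closed

    def record_success(self) -> None:
        # Any success closes the circuit and clears the failure count.
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()

    def is_healthy(self) -> bool:
        if self._opened_at is None:
            return True
        # After the cool-down, report healthy again so one trial request
        # can go through (the half-open state).
        return self._clock() - self._opened_at >= self.reset_after
```

A provider client could hold one breaker, call `record_failure()` on timeouts or 5xx responses, and delegate its own `is_healthy()` to the breaker.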
Step 3: Semantic Caching & Async Batching
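A request served from cache skips the provider call entirely, so both token spend and latency fall in proportion to the cache hit rate. Use embedding-based similarity matching with a configurable threshold so that paraphrased prompts, not just identical ones, can hit the cache.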
import hashlib
import json
from typing import Optional

import redis
import numpy as np
from sentence_transformers import SentenceTransformer


class SemanticCache:
    def __init__(self, redis_url: str, threshold: float = 0.92):
        self.redis = redis.Redis.from_url(redis_url, decode_responses=True)
        self.model = SentenceTransformer("all-MiniLM-L6-v2")
        self.threshold = threshold

    def _embed(self, text: str) -> list[float]:
        return self.model.encode(text).tolist()

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        a, b = np.array(a), np.array(b)
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    async def get(self, prompt: str) -> Optional[str]:
        # Note: the Redis client used here is synchronous, so these calls
        # block the event loop while they run.
        emb = self._embed(prompt)
        # Linear scan over every cached entry: acceptable for small caches,
        # but a vector index is needed once the key count grows.
        for key in self.redis.scan_iter("cache:*"):
            stored_emb = self.redis.hget(key, "embedding")
            if stored_emb:
                # Parse with json.loads; never eval() data read from a shared store.
                sim = self._cosine_similarity(emb, json.loads(stored_emb))
                if sim >= self.threshold:
                    logger.info("cache_hit", key=key, similarity=sim)
                    return self.redis.hget(key, "response")
        return None

    async def set(self, prompt: str, response: str, ttl: int = 3600):
        emb = self._embed(prompt)
        # Built-in hash() on str is salted per process, so use a stable digest for the key.
        key = f"cache:{hashlib.sha256(prompt.encode('utf-8')).hexdigest()}"
        self.redis.hset(key, mapping={"embedding": json.dumps(emb), "response": response})
        self.redis.expire(key, ttl)
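The lookup above returns the first entry that clears the threshold, which is not necessarily the closest one. As a small illustration of how the threshold behaves, the sketch below picks the best match from an in-memory dictionary using only the standard library. The function names `cosine_similarity` and `best_match` and the toy two-dimensional vectors are assumptions made for the example; real embeddings from `all-MiniLM-L6-v2` have 384 dimensions.

```python
import math
from typing import Dict, Optional, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    # Treat a zero vector as having no similarity to anything.
    return dot / norm if norm else 0.0


def best_match(query: Sequence[float],
               entries: Dict[str, Sequence[float]],
               threshold: float = 0.92) -> Optional[str]:
    """Return the key of the most similar entry at or above the threshold."""
    best_key, best_sim = None, threshold
    for key, emb in entries.items():
        sim = cosine_similarity(query, emb)
        if sim >= best_sim:
            best_key, best_sim = key, sim
    return best_key


# Toy vectors standing in for stored prompt embeddings.
entries = {"a": [1.0, 0.0], "b": [0.9, 0.1], "c": [0.0, 1.0]}
closest = best_match([0.9, 0.1], entries)      # both "a" and "b" clear 0.92
no_hit = best_match([1.0, 1.0], entries)       # nothing clears 0.92
```

Raising the threshold trades hit rate for a lower risk of returning a cached answer to a question that only looks similar.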
Pair caching with an async task queue to batch requests and smooth provider load:
import asyncio

from celery import Celery

celery_app = Celery("ai_tasks", broker="redis://localhost:6379/0")


@celery_app.task(bind=True, max_retries=3)
def generate_response(self, tenant_id: str, prompt: str, task_type: str):
    # load_providers, estimate_tokens, call_provider_sync, and track_billing
    # are application helpers assumed to be defined elsewhere.
    try:
        router = ModelRouter(providers=load_providers())
        # resolve_provider is a coroutine, so run it to completion inside this sync task.
        provider = asyncio.run(
            router.resolve_provider(TaskType(task_type), estimate_tokens(prompt))
        )
        response = call_provider_sync(provider, prompt)
        track_billing(tenant_id, provider, prompt, response)
        return response
    except Exception as exc:
        # Exponential backoff: 1s, 2s, 4s between attempts.
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
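The task above retries on a fixed `2 ** retries` schedule, so workers that fail at the same moment also retry at the same moment. One common refinement is to add jitter and a cap. The sketch below shows the "full jitter" variant; the function name `backoff_seconds` and its default values are illustrative assumptions.

```python
import random
from typing import Optional


def backoff_seconds(retries: int, base: float = 1.0, cap: float = 60.0,
                    rng: Optional[random.Random] = None) -> float:
    """Full-jitter backoff: uniform in [0, min(cap, base * 2**retries)]."""
    source = rng if rng is not None else random
    upper = min(cap, base * (2 ** retries))
    return source.uniform(0.0, upper)
```

In the task, this could replace the fixed schedule with `countdown=backoff_seconds(self.request.retries)`.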
Step 4: Observability & Metered Billing
Production AI systems require structured telemetry. Track tokens, latency, provider costs, and fallback events per request.
import time

import structlog


class AIRequestTracer:
    def __init__(self, tenant_id: str, request_id: str):
        self.tenant_id = tenant_id
        self.request_id = request_id
        self.start = time.perf_counter()
        self.metrics = {"provider": None, "input_tokens": 0, "output_tokens": 0, "cost": 0.0}

    def record(self, provider: str, input_tok: int, output_tok: int, cost: float):
        self.metrics.update({
            "provider": provider,
            "input_tokens": input_tok,
            "output_tokens": output_tok,
            "cost": cost,
        })

    def finish(self):
        latency_ms = (time.perf_counter() - self.start) * 1000
        self.metrics["latency_ms"] = latency_ms
        logger.info("ai_request_complete", **self.metrics, tenant=self.tenant_id, req=self.request_id)
        # publish_to_billing is an application helper assumed to be defined elsewhere.
        publish_to_billing(self.tenant_id, self.metrics)
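The tracer takes `cost` as an input but does not show where the number comes from. A minimal sketch of the arithmetic follows, assuming per-million-token pricing. The model names and prices in `PRICES_PER_MILLION` are placeholders invented for the example, not real provider rates, and `request_cost` is a hypothetical helper.

```python
from decimal import Decimal

# Hypothetical prices in USD per 1M tokens. Real rates change over time
# and should be loaded from configuration, not hardcoded.
PRICES_PER_MILLION = {
    "example/small-model": {"input": Decimal("0.15"), "output": Decimal("0.60")},
    "example/large-model": {"input": Decimal("3.00"), "output": Decimal("15.00")},
}


def request_cost(provider: str, input_tok: int, output_tok: int,
                 prices=PRICES_PER_MILLION) -> Decimal:
    """Cost of one request; Decimal avoids float rounding drift in billing sums."""
    price = prices[provider]
    total = price["input"] * input_tok + price["output"] * output_tok
    return total / Decimal(1_000_000)


# 1,000 input tokens and 500 output tokens on the small placeholder model:
# (0.15 * 1000 + 0.60 * 500) / 1,000,000 = 0.00045 USD
example_cost = request_cost("example/small-model", 1000, 500)
```

The result can be passed to `record()` as `float(example_cost)` if the tracer keeps its `float` signature.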
Architecture Decisions
| Decision | Recommendation | Rationale |
|---|---|---|
| Synchronous vs Async | Async queue + streaming | Prevents thread blocking, enables batching, improves P95 latency |
| Cache Strategy | Semantic + TTL-based | Handles paraphrased queries; TTL prevents stale context |
| Provider Selection | Task-aware routing + fallback | Balances cost, capability, and availability |
| Token Counting | Provider-native + fallback estimator | Native counts are accurate; estimators prevent billing gaps |
| Multi-tenancy | Tenant-scoped rate limits + isolated queues | Prevents noisy-neighbor degradation and cost bleed |
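The multi-tenancy row recommends tenant-scoped rate limits. One way to sketch that is a token bucket per tenant, shown below as an in-process example. The class name `TenantRateLimiter` and its parameters are assumptions; a multi-worker deployment would need shared state (for example in Redis) instead of a local dictionary.

```python
import time
from collections import defaultdict


class TenantRateLimiter:
    """Hypothetical per-tenant token bucket: `rate_per_minute` sustained, same burst size."""

    def __init__(self, rate_per_minute: int = 120, clock=time.monotonic):
        self.capacity = float(rate_per_minute)
        self.refill_per_sec = rate_per_minute / 60.0
        self._clock = clock  # injectable for tests
        # tenant_id -> [tokens_remaining, last_refill_timestamp]
        self._buckets = defaultdict(lambda: [self.capacity, self._clock()])

    def allow(self, tenant_id: str) -> bool:
        bucket = self._buckets[tenant_id]
        now = self._clock()
        # Refill in proportion to elapsed time, never above capacity.
        elapsed = now - bucket[1]
        bucket[0] = min(self.capacity, bucket[0] + elapsed * self.refill_per_sec)
        bucket[1] = now
        if bucket[0] >= 1.0:
            bucket[0] -= 1.0
            return True
        return False
```

Because each tenant has its own bucket, one tenant exhausting its allowance does not reduce the allowance of another.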
Pitfall Guide
- Treating tokens as free variables: Token consumption compounds across retries, system prompts, and context windows. Implement strict token budgets and context truncation strategies before scaling.
- Hardcoding provider endpoints: Direct imports of openai or anthropic clients lock you into pricing changes and outages. Abstract providers behind a routing layer with health checks and circuit breakers.
- Ignoring context window economics: Doubling context length often doubles inference cost and latency. Implement dynamic context pruning, retrieval-augmented generation (RAG) with vector databases, and summary condensation for long conversations.
- Skipping fallback routing: Provider rate limits, regional outages, and model deprecations happen. Maintain a priority chain of alternative models with capability mapping and automatic failover.
- Overcomplicating prompt orchestration: Multi-agent prompt chains introduce latency multiplication and error propagation. Start with single-turn generation, add routing only when task complexity justifies it, and measure chain success rates.
- Neglecting data residency and PII scrubbing: AI providers may log prompts for training. Implement pre-processing pipelines that redact sensitive data, enforce tenant data isolation, and comply with GDPR/CCPA requirements.
- Measuring success by accuracy, not unit economics: A 2% accuracy improvement that increases inference cost by 40% destroys margins. Track cost-per-task, latency SLAs, and cache hit rates alongside model performance metrics.
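The first pitfall calls for token budgets and context truncation. A minimal sketch of that idea follows: keep the most recent messages that fit a budget. The 4-characters-per-token ratio in `estimate_tokens` is a rough assumption for English text, not a tokenizer, and both function names are illustrative stand-ins; provider-native counts should be preferred where available.

```python
def estimate_tokens(text: str) -> int:
    """Rough heuristic (about 4 characters per token); an assumption, not a tokenizer."""
    return max(1, len(text) // 4)


def truncate_history(messages: list, budget: int) -> list:
    """Keep the newest messages whose estimated tokens fit within the budget."""
    kept, used = [], 0
    # Walk from newest to oldest so recent context survives truncation.
    for msg in reversed(messages):
        cost = estimate_tokens(msg)
        if used + cost > budget:
            break
        kept.append(msg)
        used += cost
    # Restore chronological order for the prompt.
    return list(reversed(kept))


# Three 40-character messages, about 10 estimated tokens each.
history = ["a" * 40, "b" * 40, "c" * 40]
trimmed = truncate_history(history, budget=25)   # room for the last two only
```

A production version would typically pin the system prompt and summarize the dropped messages instead of discarding them.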
Production Bundle
Action Checklist
Decision Matrix
| Deployment Model | Cost Efficiency | Latency Control | Operational Overhead | Best For |
|---|---|---|---|---|
| Managed API (OpenAI/Anthropic) | Low | Medium | Minimal | MVP, low volume, rapid iteration |
| Serverless Functions | Medium | Medium-High | Low-Medium | Variable traffic, cost-sensitive workloads |
| Async Batching + Cache | High | High | Medium | Production SaaS, multi-tenant, metered billing |
| Self-Hosted Inference (vLLM/TGI) | Very High | Very High | High | High volume, data privacy, custom fine-tunes |
| Edge AI Deployment | Medium-High | Very High | High | Low-latency requirements, offline capability |
Configuration Template
# docker-compose.yml
version: "3.9"
services:
  api:
    build: ./app
    ports: ["8000:8000"]
    environment:
      - REDIS_URL=redis://redis:6379/0
      - CELERY_BROKER_URL=redis://redis:6379/1
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
    depends_on: [redis, celery-worker]
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
    volumes: ["redis_data:/data"]
  celery-worker:
    build: ./app
    command: celery -A tasks worker --loglevel=info --concurrency=4
    environment:
      - REDIS_URL=redis://redis:6379/0
      - CELERY_BROKER_URL=redis://redis:6379/1
    depends_on: [redis]
  nginx:
    image: nginx:alpine
    ports: ["80:80"]
    volumes: ["./nginx.conf:/etc/nginx/nginx.conf"]
    depends_on: [api]
volumes:
  redis_data:
# app/config.py
import os

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    redis_url: str = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    celery_broker: str = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/1")
    cache_threshold: float = 0.92
    max_concurrent_per_tenant: int = 50
    rate_limit_per_minute: int = 120
    fallback_chain: dict = {
        "simple": ["openai/gpt-4o-mini", "anthropic/claude-3-haiku"],
        "reasoning": ["anthropic/claude-3-5-sonnet", "openai/gpt-4o"],
        "code": ["openai/gpt-4o", "anthropic/claude-3-5-sonnet"],
    }


settings = Settings()
Quick Start Guide
- Initialize the routing layer: Create a provider abstraction that normalizes response formats, tracks token usage, and implements health checks. Deploy with at least two fallback providers per task category.
- Deploy semantic caching: Install sentence-transformers and redis. Configure embedding lookup with a 0.90-0.95 similarity threshold. Set TTL to 1-4 hours depending on domain volatility.
- Wire async processing: Replace synchronous requests or SDK calls with Celery/RQ tasks. Implement exponential backoff, max retries (3), and tenant-scoped queues. Enable streaming responses only after queue acceptance.
- Instrument unit economics: Add middleware that captures input/output tokens, latency, provider cost, and fallback events. Publish metrics to your billing system and dashboard. Set alerts for cost-per-task spikes >15%.
- Enforce production guardrails: Configure rate limits per tenant, implement PII redaction before provider calls, and establish circuit breakers for provider outages. Run load tests at 2x expected peak concurrency before launch.
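The guardrails step mentions PII redaction before provider calls. The sketch below shows the shape of such a pre-processing pass with two deliberately simple regular expressions. The patterns, the placeholder strings, and the function name `redact_pii` are assumptions for illustration; they will miss many real-world formats, so a production pipeline would likely rely on a dedicated PII detection tool.

```python
import re

# Deliberately simple patterns; real PII detection needs far broader coverage.
_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
_PHONE = re.compile(r"\+?\d[\d\s().-]{7,}\d")


def redact_pii(text: str) -> str:
    """Replace email addresses and phone-like digit runs with placeholders."""
    text = _EMAIL.sub("[EMAIL]", text)
    return _PHONE.sub("[PHONE]", text)


sample = "Mail jane.doe@example.com or call +1 (555) 123-4567 now"
redacted = redact_pii(sample)
```

Running redaction before the cache lookup as well as before the provider call keeps raw identifiers out of cached prompts too.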
AI SaaS products succeed when inference is treated as a measurable, cacheable, and routable infrastructure layer. Optimize for unit economics first, model capability second, and observability always. The architecture patterns outlined here reduce operational risk, stabilize margins, and scale predictably under production load.