request routes to the LLM API. The response is then written to both caches in the same step, so future exact and semantic matches benefit from the newly generated output without requiring separate write paths.
Implementation
The following implementation uses asynchronous I/O, dependency injection, and a protocol-based storage interface to ensure testability and production readiness.
import hashlib
import json
import time
from dataclasses import dataclass
from typing import Optional, Protocol

import numpy as np
import redis.asyncio as aioredis
from openai import AsyncOpenAI


class StorageBackend(Protocol):
    async def fetch(self, key: str) -> Optional[str]: ...
    async def persist(self, key: str, payload: str, ttl: int) -> None: ...


@dataclass
class RoutingMetrics:
    exact_matches: int = 0
    semantic_matches: int = 0
    api_calls: int = 0
    total_routed: int = 0
    cumulative_latency_ms: float = 0.0  # time spent serving cache hits

    @property
    def coverage_rate(self) -> float:
        if self.total_routed == 0:
            return 0.0
        return (self.exact_matches + self.semantic_matches) / self.total_routed


class PromptRouter:
    def __init__(
        self,
        redis_client: aioredis.Redis,
        llm_client: AsyncOpenAI,
        embed_client: AsyncOpenAI,
        semantic_threshold: float = 0.92,
        exact_ttl: int = 86400,
        semantic_ttl: int = 3600,
    ):
        self.redis = redis_client
        self.llm = llm_client
        self.embedder = embed_client
        self.threshold = semantic_threshold
        self.exact_ttl = exact_ttl
        self.semantic_ttl = semantic_ttl
        self.metrics = RoutingMetrics()

    def _build_exact_key(self, prompt: str, model: str) -> str:
        raw = f"{model}|{prompt}"
        return f"cache:exact:{hashlib.sha256(raw.encode()).hexdigest()}"

    def _build_semantic_key(self, prompt: str) -> str:
        return f"cache:semantic:{hashlib.sha256(prompt.encode()).hexdigest()}"

    async def _compute_embedding(self, text: str) -> np.ndarray:
        resp = await self.embedder.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return np.array(resp.data[0].embedding, dtype=np.float32)

    async def _scan_semantic_store(self, query_vec: np.ndarray) -> Optional[dict]:
        """Return the cached payload of the most similar entry, or None below the threshold."""
        cursor = 0
        best_match = None
        best_score = 0.0
        while True:
            cursor, keys = await self.redis.scan(cursor, match="cache:semantic:*", count=100)
            for key in keys:
                raw = await self.redis.get(key)
                if not raw:
                    continue  # entry expired between SCAN and GET
                entry = json.loads(raw)
                stored_vec = np.array(entry["vec"], dtype=np.float32)
                # Cosine similarity between the query and the stored embedding
                dot = np.dot(query_vec, stored_vec)
                norm = np.linalg.norm(query_vec) * np.linalg.norm(stored_vec)
                score = float(dot / norm) if norm > 0 else 0.0
                if score > best_score:
                    best_score = score
                    best_match = entry
            if cursor == 0:
                break
        if best_match and best_score >= self.threshold:
            return best_match["payload"]
        return None

    async def route(self, prompt: str, model: str = "gpt-4o-mini") -> dict:
        self.metrics.total_routed += 1
        t_start = time.perf_counter()

        # Tier 1: Exact match
        exact_key = self._build_exact_key(prompt, model)
        exact_raw = await self.redis.get(exact_key)
        if exact_raw:
            self.metrics.exact_matches += 1
            self.metrics.cumulative_latency_ms += (time.perf_counter() - t_start) * 1000
            payload = json.loads(exact_raw)
            payload["_source"] = "exact"
            return payload

        # Tier 2: Semantic match
        query_vec = await self._compute_embedding(prompt)
        semantic_hit = await self._scan_semantic_store(query_vec)
        if semantic_hit:
            self.metrics.semantic_matches += 1
            self.metrics.cumulative_latency_ms += (time.perf_counter() - t_start) * 1000
            semantic_hit["_source"] = "semantic"
            return semantic_hit

        # Fallback: LLM API
        self.metrics.api_calls += 1
        response = await self.llm.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        result = {
            "content": response.choices[0].message.content,
            "model": model,
            "usage": response.usage.model_dump(),
            "_source": "api",
        }

        # Dual write: exact tier first, then the semantic tier with its embedding
        await self.redis.setex(exact_key, self.exact_ttl, json.dumps(result))
        sem_key = self._build_semantic_key(prompt)
        sem_payload = {
            "prompt": prompt,
            "vec": query_vec.tolist(),
            "payload": result,
        }
        await self.redis.setex(sem_key, self.semantic_ttl, json.dumps(sem_payload))
        return result

    def report(self) -> None:
        m = self.metrics
        print(f"Routed      : {m.total_routed}")
        print(f"Exact hits  : {m.exact_matches}")
        print(f"Semantic    : {m.semantic_matches}")
        print(f"API calls   : {m.api_calls}")
        print(f"Coverage    : {m.coverage_rate:.1%}")
        # cumulative_latency_ms measures time spent serving cache hits, not time saved
        print(f"Hit latency : {m.cumulative_latency_ms:.0f} ms")
Why These Choices Matter
- Async I/O throughout: LLM routing is inherently I/O bound. Using `asyncio` prevents thread blocking during embedding generation and Redis lookups, allowing the router to handle concurrent requests without spawning worker pools.
- Protocol-based storage: The `StorageBackend` interface defines the fetch/persist contract for the persistence layer. Once the router is wired to this interface rather than to the Redis client directly, Redis, Memcached, or an in-memory store can be swapped in during testing without modifying routing logic.
- Cursor-based scanning: The semantic tier uses `SCAN` instead of `KEYS` to avoid blocking the Redis event loop. This is critical for production stability when the key space grows.
- Separate TTLs: Exact matches receive longer retention (24h) because deterministic duplicates rarely change meaning. Semantic matches expire faster (1h) to prevent stale intent mappings from accumulating as user phrasing evolves.
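To make the protocol-based storage point concrete, here is a minimal sketch of an in-memory backend with the same `fetch`/`persist` shape as `StorageBackend`. The class name `InMemoryBackend` and its injectable clock are assumptions made for illustration. Note that the `PromptRouter` shown above calls Redis directly, so it would need to accept a `StorageBackend` before a swap like this works.

```python
import asyncio
import time
from typing import Callable, Optional


class InMemoryBackend:
    """Hypothetical test double that matches the StorageBackend protocol."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # key -> (payload, absolute expiry time on the injected clock)
        self._entries: dict[str, tuple[str, float]] = {}

    async def fetch(self, key: str) -> Optional[str]:
        item = self._entries.get(key)
        if item is None:
            return None
        payload, expires_at = item
        if self._clock() >= expires_at:
            # Lazily evict expired entries on read.
            del self._entries[key]
            return None
        return payload

    async def persist(self, key: str, payload: str, ttl: int) -> None:
        self._entries[key] = (payload, self._clock() + ttl)


async def demo() -> tuple[Optional[str], Optional[str]]:
    now = [0.0]  # fake clock so the TTL can be tested without sleeping
    store = InMemoryBackend(clock=lambda: now[0])
    await store.persist("cache:exact:abc", '{"content": "hi"}', ttl=10)
    fresh = await store.fetch("cache:exact:abc")
    now[0] = 11.0  # advance past the TTL
    expired = await store.fetch("cache:exact:abc")
    return fresh, expired
```

Injecting the clock keeps TTL behavior deterministic in unit tests; a production backend would rely on the store's own expiry instead.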
Pitfall Guide
1. Linear Vector Scanning at Scale
Explanation: Iterating through all semantic keys works until the dataset reaches several thousand entries. Beyond that, scan latency grows linearly, negating the performance benefits of caching.
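Fix: Transition to a dedicated vector index (pgvector, Qdrant, or Weaviate) once semantic entries exceed 5,000. Approximate nearest neighbor (ANN) indexes keep lookups sub-linear (roughly logarithmic for graph-based indexes such as HNSW) at the cost of occasionally missing the true nearest entry.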
2. Embedding Model Version Drift
Explanation: If the embedding model updates silently, newly generated vectors will occupy a different mathematical space than cached vectors. Similarity scores will degrade, causing false misses.
Fix: Pin the embedding model version in configuration. Include a model_version field in every cached entry and invalidate or re-embed the store when the version changes.
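One way to apply this fix is to tag each semantic entry at write time and treat a version mismatch as a miss at read time. The sketch below is illustrative only: the helper names and the `EMBEDDING_MODEL_VERSION` tag format are assumptions, not part of the router above.

```python
import json
from typing import Optional

# Assumed tag format: model name plus a version label you control in configuration.
EMBEDDING_MODEL_VERSION = "text-embedding-3-small@v2"


def build_semantic_entry(prompt: str, vec: list[float], payload: dict) -> str:
    """Serialize a semantic cache entry tagged with the embedding version."""
    return json.dumps({
        "prompt": prompt,
        "vec": vec,
        "payload": payload,
        "model_version": EMBEDDING_MODEL_VERSION,
    })


def load_compatible_entry(
    raw: str, current_version: str = EMBEDDING_MODEL_VERSION
) -> Optional[dict]:
    """Return the entry only if its vector came from the current embedding version."""
    entry = json.loads(raw)
    if entry.get("model_version") != current_version:
        # Treat as a miss; the caller may delete or re-embed the entry.
        return None
    return entry
```

Entries written before the field existed have no `model_version` and are therefore also treated as misses, which is the conservative choice.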
3. Cache Stampede on Cold Starts
Explanation: When a popular prompt misses the cache, multiple concurrent requests may simultaneously trigger identical LLM calls, causing API rate limit violations and cost spikes.
Fix: Implement request coalescing using a distributed lock or an in-memory promise map. The first request computes the response; subsequent requests await the same future and populate the cache once resolved.
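The in-memory variant of this fix can be sketched with a map of in-flight futures. This is a single-process sketch under stated assumptions: the `Coalescer` class and `fake_llm_call` are hypothetical names, and a multi-instance deployment would still need a distributed lock.

```python
import asyncio
from typing import Awaitable, Callable


class Coalescer:
    """Share one in-flight computation per key among concurrent callers."""

    def __init__(self) -> None:
        self._inflight: dict[str, asyncio.Future] = {}

    async def run(self, key: str, compute: Callable[[], Awaitable[dict]]) -> dict:
        existing = self._inflight.get(key)
        if existing is not None:
            # Another caller is already computing this key; wait for its result.
            return await existing

        future = asyncio.get_running_loop().create_future()
        self._inflight[key] = future
        try:
            result = await compute()
            future.set_result(result)
            return result
        except Exception as exc:
            future.set_exception(exc)
            future.exception()  # mark as retrieved so asyncio does not warn if nobody waited
            raise
        finally:
            if not future.done():
                future.cancel()  # the leader was cancelled; release any waiters
            del self._inflight[key]


async def demo() -> tuple[int, list[dict]]:
    calls = 0

    async def fake_llm_call() -> dict:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)  # stand-in for real API latency
        return {"content": "answer"}

    coalescer = Coalescer()
    results = await asyncio.gather(
        *(coalescer.run("cache:exact:abc", fake_llm_call) for _ in range(5))
    )
    return calls, list(results)
```

In the router, `compute` would wrap the API call and the dual write, so the cache is populated once by the leading request while the others simply await its result.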
4. Threshold Overfitting
Explanation: Hardcoding a similarity threshold (e.g., 0.92) without domain validation leads to either aggressive false positives (returning irrelevant answers) or conservative false negatives (missing valid paraphrases).
Fix: Run a calibration script against a labeled dataset of 50–100 query pairs from your actual traffic. Plot precision/recall curves across thresholds and select the value that maximizes F1 for your use case.
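A calibration pass along these lines can be sketched in a few lines of plain Python. The function names and the sample pairs below are made up for illustration; in practice each pair would hold the cosine similarity of two real prompts and a human label saying whether they should share an answer.

```python
def f1_at_threshold(
    pairs: list[tuple[float, bool]], threshold: float
) -> tuple[float, float, float]:
    """Return (precision, recall, f1) when scores >= threshold count as cache hits."""
    tp = sum(1 for score, same in pairs if score >= threshold and same)
    fp = sum(1 for score, same in pairs if score >= threshold and not same)
    fn = sum(1 for score, same in pairs if score < threshold and same)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1


def best_threshold(pairs: list[tuple[float, bool]], candidates: list[float]) -> float:
    """Pick the candidate threshold with the highest F1 on the labeled pairs."""
    return max(candidates, key=lambda t: f1_at_threshold(pairs, t)[2])


# Illustrative data only: (similarity score, should these prompts share an answer?)
SAMPLE_PAIRS = [
    (0.97, True), (0.94, True), (0.91, True), (0.90, False),
    (0.88, True), (0.85, False), (0.80, False),
]
CANDIDATES = [0.80, 0.85, 0.88, 0.90, 0.95]
```

If false positives are costlier than extra API calls in your domain, weighting precision more heavily than F1 does may be the better selection rule.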
5. TTL Misalignment Between Tiers
Explanation: If the two tiers expire on different schedules, an identical prompt can miss the exact tier yet still hit a semantic entry (or the reverse). The same prompt then returns answers of different ages depending on which tier serves it, causing inconsistent routing behavior.
Fix: Synchronize expiration policies or implement a cache invalidation hook that purges both tiers when underlying knowledge sources update. Use shorter semantic TTLs to prioritize freshness over longevity.
6. Ignoring Contextual Prompt Variations
Explanation: Caching based solely on the user prompt ignores system instructions, temperature settings, or model parameters that drastically alter output. Two identical prompts with different temperatures will produce different responses.
Fix: Include all generation parameters (model, temperature, top_p, system prompt hash) in the exact cache key. For semantic caching, store the generation config alongside the payload and validate it before returning a hit.
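For the exact tier, one possible key builder folds the generation parameters into the hash. This is a sketch, not the router's `_build_exact_key`: the function signature and the default parameter values are assumptions.

```python
import hashlib
import json


def build_exact_key(
    prompt: str,
    model: str,
    system_prompt: str = "",
    temperature: float = 1.0,
    top_p: float = 1.0,
) -> str:
    """Derive an exact-cache key from the prompt plus every output-affecting parameter."""
    fingerprint = {
        "model": model,
        "prompt": prompt,
        # Hash the system prompt so long instructions do not bloat the fingerprint.
        "system_sha256": hashlib.sha256(system_prompt.encode()).hexdigest(),
        # Coerce to float so temperature=1 and temperature=1.0 map to the same key.
        "temperature": float(temperature),
        "top_p": float(top_p),
    }
    # sort_keys and fixed separators give a canonical, order-independent encoding.
    canonical = json.dumps(fingerprint, sort_keys=True, separators=(",", ":"))
    return f"cache:exact:{hashlib.sha256(canonical.encode()).hexdigest()}"
```

Serializing a dictionary rather than joining fields with a delimiter also avoids collisions when a prompt happens to contain the delimiter character.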
7. Silent Metric Degradation
Explanation: Hit rates naturally decline as user behavior shifts or new features launch. Without active monitoring, teams continue paying for API calls while assuming the cache is performing optimally.
Fix: Export routing metrics to a time-series database. Set alerts when coverage drops below a defined baseline (e.g., 35%). Track per-prompt hit frequency to identify eviction candidates or warming opportunities.
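Before a full time-series pipeline exists, a rolling-window check can approximate the alerting described above. The `CoverageMonitor` class, its window size, and its `min_samples` guard are assumptions for illustration; the `source` values match the `_source` field the router attaches to each response.

```python
from collections import deque


class CoverageMonitor:
    """Track cache coverage over the most recent requests and flag drops."""

    def __init__(self, baseline: float = 0.35, window: int = 1000) -> None:
        self.baseline = baseline
        # True for a cache hit, False for an API call; old outcomes fall off the left.
        self._outcomes: deque[bool] = deque(maxlen=window)

    def record(self, source: str) -> None:
        self._outcomes.append(source in ("exact", "semantic"))

    @property
    def coverage(self) -> float:
        if not self._outcomes:
            return 0.0
        return sum(self._outcomes) / len(self._outcomes)

    def should_alert(self, min_samples: int = 100) -> bool:
        # Require a minimum sample size so a cold start does not trigger alerts.
        return len(self._outcomes) >= min_samples and self.coverage < self.baseline
```

Unlike the cumulative counters in `RoutingMetrics`, a windowed rate reacts to recent drift instead of being averaged away by historical traffic.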
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| < 5k daily queries, high exact duplication | Exact-only Redis cache | Minimal overhead, catches bots/retries instantly | Reduces spend by ~25% |
| 5k–50k daily queries, natural language variation | Two-tier (Exact + Semantic) | Balances speed with intent matching, predictable latency | Reduces spend by 40–60% |
| > 50k daily queries, strict latency SLAs | Two-tier + External Vector DB | ANN indexing prevents scan bottlenecks, scales horizontally | Slight infra cost increase, but API savings outweigh it |
| Dynamic knowledge base, frequent updates | Semantic-only with short TTL + Invalidation hooks | Prevents stale responses, prioritizes accuracy over hit rate | Higher API cost, but lowers the risk of serving stale answers |
Configuration Template
# llm-router-config.yaml
cache:
  exact:
    ttl_seconds: 86400
    key_prefix: "cache:exact:"
  semantic:
    ttl_seconds: 3600
    key_prefix: "cache:semantic:"
    similarity_threshold: 0.92
    max_scan_batch: 100
models:
  llm: "gpt-4o-mini"
  embedding: "text-embedding-3-small"
  embedding_version: "v2"
redis:
  host: "${REDIS_HOST:localhost}"
  port: "${REDIS_PORT:6379}"
  db: 0
  decode_responses: true
observability:
  metrics_prefix: "llm_router"
  alert_coverage_threshold: 0.35
  log_level: "INFO"
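The `${REDIS_HOST:localhost}` placeholders in the template are not expanded by PyYAML on its own, so the loader has to resolve them after parsing. The helper below is one possible sketch; the function name `expand_env` and the `${NAME:default}` grammar it accepts are assumptions inferred from the template.

```python
import os
import re
from typing import Mapping, Optional

# Matches ${NAME} or ${NAME:default}; the default may be empty.
_PLACEHOLDER = re.compile(
    r"\$\{(?P<name>[A-Za-z_][A-Za-z0-9_]*)(?::(?P<default>[^}]*))?\}"
)


def expand_env(value: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Replace ${NAME:default} placeholders using the environment, else the default."""
    source = os.environ if env is None else env

    def _substitute(match: re.Match) -> str:
        name = match.group("name")
        default = match.group("default")
        if name in source:
            return source[name]
        if default is not None:
            return default
        raise KeyError(f"environment variable {name} is not set and has no default")

    return _PLACEHOLDER.sub(_substitute, value)
```

Because the template quotes the port, the expanded value is a string; convert it with `int(expand_env(...))` before passing it to the Redis client.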
Quick Start Guide
- Launch Redis: Run a local Redis instance using Docker: `docker run -d -p 6379:6379 redis:7-alpine`
- Install Dependencies: Execute `pip install redis openai numpy pyyaml` to pull required libraries.
- Initialize Router: Instantiate `PromptRouter` with your Redis client, OpenAI clients, and configuration values. Pass the semantic threshold and TTLs matching your workload.
- Route Requests: Call `await router.route(prompt, model)` for each incoming query. The router handles exact matching, semantic scanning, API fallback, and dual-write automatically.
- Monitor Coverage: Invoke `router.report()` periodically or export metrics to your dashboard. Adjust the similarity threshold and TTLs based on observed hit rates and latency targets.