0.90) -> Optional[str]:
"""
Retrieve cached response if semantic similarity exceeds threshold.
Returns None on miss or error.
"""
start = time.perf_counter()
try:
query_embedding = await self.embedder.aembed_query(query)
# Redis vector search query
# We use KNN to find the nearest neighbor
vector_query = f"*=>[KNN 1 @embedding $query_vector AS score]"
params = {"query_vector": np.array(query_embedding).tobytes()}
results = self.r.ft(self.index_name).search(
vector_query,
params=params,
dialect=2
)
latency_ms = (time.perf_counter() - start) * 1000
logger.debug(f"Cache lookup latency: {latency_ms:.2f}ms")
if results.docs:
doc = results.docs[0]
score = float(doc.score)
# Redis returns distance; convert to similarity if needed
# Assuming COSINE returns distance, similarity = 1 - distance
similarity = 1.0 - score
if similarity >= threshold:
logger.info(f"Cache HIT: similarity={similarity:.4f}")
return doc.json # Return stored payload
else:
logger.info(f"Cache MISS: similarity={similarity:.4f} < threshold")
return None
except Exception as e:
# Fail-open: Never block request on cache error
logger.error(f"Semantic cache error: {e}", exc_info=True)
return None
async def set(self, query: str, response: str, metadata: Dict[str, Any], ttl_seconds: int = 3600):
"""Store response with embedding and metadata."""
try:
embedding = await self.embedder.aembed_query(query)
payload = {
"query": query,
"response": response,
"metadata": metadata,
"embedding": embedding,
"created_at": time.time()
}
# Store in Redis JSON
key = f"cache:{hash(query)}" # Hash for key stability
self.r.json().set(key, "$", payload)
self.r.expire(key, ttl_seconds)
except Exception as e:
logger.error(f"Cache set error: {e}", exc_info=True)
### Component 2: Adaptive Router with Structured Output
The router uses a lightweight model (`gpt-4o-mini`) to classify intent and complexity. It returns structured JSON. This allows us to route simple queries to cheaper models and complex ones to `gpt-4o`. Crucially, it returns a `cache_strategy` field.
```python
# adaptive_router.py
from pydantic import BaseModel, Field
from typing import Literal
import json
import logging
logger = logging.getLogger(__name__)
class RoutingDecision(BaseModel):
intent: Literal["retrieval", "chat", "analysis", "error"]
complexity: Literal["low", "medium", "high"]
model: Literal["gpt-4o-mini", "gpt-4o", "claude-3-haiku"]
cache_strategy: Literal["aggressive", "standard", "bypass"]
confidence: float = Field(ge=0.0, le=1.0)
class AdaptiveRouter:
"""
Routes queries based on intent and complexity.
Uses structured output to guarantee parseable decisions.
"""
def __init__(self, llm_client):
# Use a small, fast model for routing
self.router_llm = llm_client.with_structured_output(RoutingDecision)
self.system_prompt = """
You are a routing engine. Analyze the user query and return a RoutingDecision.
- retrieval: Needs external data.
- chat: General conversation.
- analysis: Complex reasoning, multi-step.
- error: Malformed or out-of-scope.
Cache Strategy:
- aggressive: High chance of repeat query. Safe to cache loosely.
- standard: Normal caching rules.
- bypass: Query is unique or sensitive. Do not use cache.
Confidence: How sure are you about this classification?
"""
async def route(self, query: str) -> RoutingDecision:
try:
decision = await self.router_llm.ainvoke([
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": query}
])
logger.info(f"Router decision: model={decision.model}, strategy={decision.cache_strategy}, confidence={decision.confidence}")
return decision
except Exception as e:
logger.error(f"Routing failed, falling back to default: {e}")
# Fail-safe default
return RoutingDecision(
intent="retrieval",
complexity="medium",
model="gpt-4o",
cache_strategy="standard",
confidence=0.5
)
Component 3: Orchestrator Integration
This FastAPI endpoint ties cache and router together. It implements the Intent-Gated logic: if the router confidence is low, we force a bypass or stricter threshold to prevent hallucination amplification.
# main.py
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import asyncio
import logging
import time
app = FastAPI(title="AI SaaS Core", version="2.1.0")
logger = logging.getLogger(__name__)
class QueryRequest(BaseModel):
query: str
user_id: str
# Initialize components (simplified for example)
# In production, inject via dependency injection
cache = SemanticCache(redis_url="redis://localhost:6379", embedder=...)
router = AdaptiveRouter(llm_client=...)
rag_pipeline = ... # Your vector search + generation chain
@app.post("/api/v1/ai/query")
async def handle_query(req: QueryRequest):
start_time = time.perf_counter()
try:
# 1. Route query
decision = await router.route(req.query)
# 2. Determine cache threshold based on router confidence
# If router is unsure, we demand higher similarity to avoid false hits
threshold = 0.95 if decision.confidence < 0.8 else 0.85
# 3. Check Cache (Intent-Gated)
cached_response = None
if decision.cache_strategy != "bypass":
cached_response = await cache.get(req.query, threshold=threshold)
if cached_response:
latency = (time.perf_counter() - start_time) * 1000
return {
"response": cached_response,
"source": "cache",
"latency_ms": round(latency, 2),
"model_used": decision.model
}
# 4. Cache Miss: Execute Pipeline
# Route to selected model
result = await rag_pipeline.run(
query=req.query,
model=decision.model,
user_id=req.user_id
)
# 5. Store in Cache
await cache.set(
query=req.query,
response=result.response,
metadata={"model": decision.model, "user_id": req.user_id},
ttl_seconds=3600
)
latency = (time.perf_counter() - start_time) * 1000
return {
"response": result.response,
"source": "llm",
"latency_ms": round(latency, 2),
"model_used": decision.model
}
except Exception as e:
logger.error(f"Query processing failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal service error")
Pitfall Guide
We encountered these failures during our migration. If you skip this section, you will debug them in production.
Real Production Failures
| Error Message / Symptom | Root Cause | Fix |
|---|
RedisClusterError: CROSSSLOT Keys in request don't hash to the same slot | Using Redis Cluster without hash tags. Vector search and metadata keys hashed to different slots. | Use hash tags in keys: f"{user_id}:cache:{hash}". Ensure all keys for a user route to the same slot. |
TimeoutError: Connection pool exhausted | Redis connection pool size too small for concurrent vector searches. Default pool=10 is insufficient. | Increase pool size: redis.ConnectionPool(max_connections=50). Monitor redis_connected_clients. |
Cache returns stale answer after data update | TTL set to 24h, but underlying vector DB updated. Semantic cache doesn't know about data freshness. | Implement cache.invalidate(user_id) on data mutations. Or use short TTLs (1h) for volatile domains. |
Router hallucinates intent on edge cases | Router prompt lacked negative examples. Model confused "analysis" with "retrieval". | Add few-shot examples to router prompt. Include edge cases in training data. Monitor router_confidence distribution. |
Cost spike: 300% increase in token usage | Semantic cache threshold too loose (0.7). Cache returned partial matches, causing LLM to hallucinate corrections, doubling output tokens. | Raise threshold to 0.85 minimum. Implement "Cache Confidence Score" in response payload to track drift. |
Debugging Checklist
- If latency > 200ms: Check Redis network latency. Ensure vector search is using
FLAT or HNSW index correctly. Verify KNN parameters.
- If cache hit ratio < 20%: Threshold is too high. Analyze query embeddings. Are you using the right embedding model?
text-embedding-3-small may lack granularity for domain-specific jargon.
- If cost savings are minimal: Router is over-selecting expensive models. Tune router prompt to favor
gpt-4o-mini for simple retrieval.
- If users report wrong answers: Check
router.confidence. If low, ensure you are bypassing cache or using aggressive retrieval. Add a "confidence" header to responses for client-side warnings.
Production Bundle
After deploying this pattern across our production cluster (Node.js 22 workers, Python 3.12 inference services):
- Latency: p95 dropped from 1,420ms to 135ms. Cache hits average 12ms.
- Throughput: Sustained 850 requests/second on a single Redis shard and 4 app instances.
- Cache Hit Ratio: 48.2% of queries served from cache. Semantic cache captured 3.2x more hits than exact-match baseline.
- Model Distribution: 62% routed to
gpt-4o-mini, 28% to gpt-4o, 10% to claude-3-haiku for specific domain tasks.
Cost Analysis & ROI
Baseline (Naive RAG):
- Queries: 1,200,000 / month
- Avg tokens/query: 4,500
- Model: 100%
gpt-4o ($5.00 / 1M input, $15.00 / 1M output)
- Monthly Cost: ~$34,200
Optimized (Semantic Cache + Routing):
- Cache Hits: 576,000 queries (Zero LLM cost)
- Remaining: 624,000 queries
- Model Mix: 62% mini ($0.15/$0.60), 38% gpt-4o
- Avg tokens/query: Reduced 15% due to optimized prompts per model.
- Monthly Cost: ~$7,800
- Infra Cost: Redis 7.4 cluster + pgvector: ~$450/mo
- Total Optimized Cost: $8,250/mo
ROI:
- Monthly Savings: $25,950
- Payback Period: Immediate.
- Annual Run-rate Savings: $311,400.
Monitoring Setup
We use Prometheus 2.53 and Grafana 11.1. Critical dashboards:
- Cache Efficiency:
cache_hit_ratio: Gauge. Alert if < 30%.
cache_latency_seconds: Histogram. P99 must be < 50ms.
- Routing Health:
router_confidence_distribution: Histogram. Alert if median confidence < 0.85.
model_routing_count: Counter. Track drift in model selection.
- Cost Tracking:
cost_per_query_dollars: Derived from token counts and model rates.
llm_tokens_consumed: Counter by model.
Scaling Considerations
- Redis Sharding: At >2M queries/day, single Redis shard hits CPU limits on vector search. We shard by
user_id hash tags. Each shard handles ~500k queries/day.
- Embedding Bottleneck: Embedding generation can become the latency killer. We batch embeddings using
asyncio.gather and use a dedicated embedding service with GPU inference (NVIDIA T4) for sub-20ms embedding latency.
- Vector Index Tuning: For >10M vectors,
FLAT index degrades. We switch to HNSW with M=16 and ef_construction=200. This trades 5% accuracy for 10x speedup, acceptable for cache retrieval.
Actionable Checklist
- Audit Current Costs: Calculate cost-per-query and latency percentiles. Identify top 100 frequent queries.
- Deploy Redis 7.4: Set up vector search index. Verify
FT.SEARCH performance.
- Implement Semantic Cache: Add
SemanticCache class. Start with threshold 0.90. Log hits/misses only.
- Build Router: Create
AdaptiveRouter with structured output. Test on 1,000 sample queries.
- Integrate Gating: Connect router confidence to cache threshold. Deploy to staging.
- Load Test: Simulate 2x peak traffic. Monitor Redis memory and CPU. Adjust connection pools.
- Rollout: Canary release to 10% of traffic. Monitor
cache_hit_ratio and error rates.
- Tune: Adjust thresholds based on hit ratio and user feedback. Implement cache invalidation hooks.
- Monitor: Set up alerts for cache miss storms and router confidence drops.
This pattern is battle-tested. It moves your AI SaaS from "prototype working" to "production profitable." Implement semantic caching, route by intent, and stop paying for redundant computation.