uring serialization ensures that variations in JSON key order do not produce different hashes.
import hashlib
import json
from typing import Any, Dict
class CacheSignatureEngine:
    """Derives a stable cache key from a prompt payload.

    Two payloads that differ only in dictionary key order map to the same
    signature, because the payload is serialized with sorted keys before
    hashing.
    """

    @staticmethod
    def compute_signature(prompt_data: Dict[str, Any]) -> str:
        """Return ``"llm:resp:"`` followed by the SHA-256 hex digest of the payload.

        Args:
            prompt_data: JSON-serializable prompt payload.

        Returns:
            The namespaced cache key for ``prompt_data``.

        Raises:
            TypeError: Propagated from ``json.dumps`` when the payload holds
                a value that cannot be serialized to JSON.
        """
        # Canonical form: sorted keys make the byte string order-independent.
        canonical_bytes = json.dumps(prompt_data, sort_keys=True).encode("utf-8")
        hasher = hashlib.sha256()
        hasher.update(canonical_bytes)
        return "llm:resp:" + hasher.hexdigest()
2. Redis Response Repository
The repository handles storage, retrieval, and eviction tracking. Using Redis pipelines reduces round-trip latency. A sorted set records when each key was last written, which approximates LRU eviction order (reads do not refresh the timestamp) and is more efficient than list-based approaches for large datasets.
import redis
import time
from typing import Optional
class ResponseRepository:
    """Redis-backed store for cached LLM responses.

    Payloads are written as plain string keys with a TTL. Every write also
    records the key in a sorted set (``access_log_key``) scored by the write
    time, which gives an oldest-first ordering for size-based eviction.

    Note: ``fetch`` does not touch the sorted set, so the ordering reflects
    the last *write* of a key, not its last read.
    """

    def __init__(
        self,
        connection_url: str,
        default_ttl: int = 3600,
        max_tracked_keys: int = 10000,
    ):
        """Connect to Redis and configure defaults.

        Args:
            connection_url: Redis URL handed to ``redis.Redis.from_url``.
            default_ttl: Expiry in seconds used when ``persist`` is called
                without a usable ``ttl_override``.
            max_tracked_keys: Maximum number of entries kept in the access
                log; older entries are trimmed on every ``persist``.

        Raises:
            ValueError: If ``max_tracked_keys`` is negative.
        """
        # Validate first so a bad limit never produces a positive rank bound
        # in persist(), which would trim the wrong end of the log.
        if max_tracked_keys < 0:
            raise ValueError("max_tracked_keys must be non-negative")
        self.client = redis.Redis.from_url(connection_url, decode_responses=True)
        self.default_ttl = default_ttl
        self.max_tracked_keys = max_tracked_keys
        self.access_log_key = "llm:access_log"

    def fetch(self, key: str) -> Optional[str]:
        """Return the cached response stored under ``key``, or ``None`` on a miss."""
        return self.client.get(key)

    def persist(self, key: str, content: str, ttl_override: Optional[int] = None):
        """Store ``content`` under ``key`` with a TTL and record the write.

        Args:
            key: Cache key to write.
            content: Response text to store.
            ttl_override: Expiry in seconds. ``None`` or ``0`` falls back to
                ``default_ttl``.
        """
        expiry = ttl_override or self.default_ttl
        pipe = self.client.pipeline()
        # Store payload with expiration.
        pipe.set(key, content, ex=expiry)
        # Score the key by write time so the lowest ranks are the oldest writes.
        pipe.zadd(self.access_log_key, {key: time.time()})
        # Keep only the newest ``max_tracked_keys`` log entries. This trims the
        # log only: payload keys of trimmed entries remain until their TTL
        # expires.
        pipe.zremrangebyrank(self.access_log_key, 0, -(self.max_tracked_keys + 1))
        pipe.execute()

    def evict_stale(self, max_keys: int = 10000):
        """Delete the oldest tracked entries once the log exceeds ``max_keys``.

        Both the payload keys and their log entries are removed. As long as
        all writes go through ``persist``, the log never holds more than
        ``max_tracked_keys`` entries, so this only removes something when
        ``max_keys`` is smaller than that limit.

        Args:
            max_keys: Number of tracked entries allowed to remain.
        """
        overflow = self.client.zcard(self.access_log_key) - max_keys
        if overflow <= 0:
            return
        oldest_keys = self.client.zrange(self.access_log_key, 0, overflow - 1)
        if not oldest_keys:
            return
        # Queue both removals on one pipeline (redis-py wraps it in MULTI/EXEC
        # by default) so the payloads and the log cannot drift apart if the
        # second command never gets sent.
        pipe = self.client.pipeline()
        pipe.delete(*oldest_keys)
        pipe.zrem(self.access_log_key, *oldest_keys)
        pipe.execute()
3. Inference Gateway Integration
The gateway wraps the AI client, intercepting requests to check the cache before calling the provider. This pattern centralizes caching logic and keeps business code clean.
class InferenceGateway:
    """Caching proxy placed in front of an LLM provider client.

    ``resolve`` derives a cache key from the query, returns the stored
    response on a hit, and otherwise calls the provider and stores the result.
    """

    def __init__(self, provider_client: Any, repository: ResponseRepository):
        """Keep references to the provider and the cache repository.

        Args:
            provider_client: Object exposing an awaitable ``complete(query)``.
            repository: Cache store exposing ``fetch(key)`` and
                ``persist(key, content)``.
        """
        self.provider = provider_client
        self.repo = repository

    async def resolve(self, query: Dict[str, Any]) -> str:
        """Return the response for ``query``, using the cache when possible.

        Args:
            query: Prompt payload; also the input to the cache signature.

        Returns:
            The cached response on a hit, otherwise the provider's response.

        Exceptions raised by the provider or the repository propagate to the
        caller unchanged.
        """
        # Local import keeps this block self-contained; asyncio is stdlib.
        import asyncio

        signature = CacheSignatureEngine.compute_signature(query)
        loop = asyncio.get_running_loop()

        # The repository's calls are synchronous; run them on the default
        # executor so a slow Redis round trip does not stall the event loop.
        cached_response = await loop.run_in_executor(
            None, self.repo.fetch, signature
        )
        # Compare against None: an empty string is a valid stored response
        # and must count as a hit rather than trigger another provider call.
        if cached_response is not None:
            return cached_response

        # Cache miss: invoke the provider, then store the result.
        response = await self.provider.complete(query)
        await loop.run_in_executor(None, self.repo.persist, signature, response)
        return response
4. Semantic Caching Extension
For semantic caching, generate embeddings for incoming prompts and compare them against stored embeddings using cosine similarity. If similarity exceeds a threshold (e.g., 0.95), return the cached response associated with the nearest neighbor.
class SemanticCacheLayer:
    """Skeleton for similarity-based cache lookups.

    Holds an embedding model and a similarity threshold. The vector search
    itself is not implemented here, so lookups always report a miss.
    """

    def __init__(self, embedding_model: Any, similarity_threshold: float = 0.95):
        """Store the embedding model and the similarity cut-off.

        Args:
            embedding_model: Object exposing an awaitable ``encode(query)``.
            similarity_threshold: Cut-off intended for matching; stored but
                not yet consulted by any code in this class.
        """
        self.threshold = similarity_threshold
        self.model = embedding_model

    async def find_similar(self, query: Dict[str, Any]) -> Optional[str]:
        """Embed ``query`` and return ``None`` (search not implemented yet).

        Args:
            query: Prompt payload passed to the embedding model.

        Returns:
            Always ``None`` for now.
        """
        # The embedding is computed but unused until the nearest-neighbour
        # query is written (it depends on the Redis Vector Search module).
        _embedding = await self.model.encode(query)
        # TODO: query Redis for nearest neighbours and return the cached
        # response when similarity > self.threshold.
        return None
5. Cache Warming
Pre-populate the cache with high-frequency queries during deployment or off-peak hours to maximize initial hit rates.
async def seed_cache(gateway: InferenceGateway, frequent_queries: list):
    """Warm the cache by resolving each query in turn.

    Queries are awaited one at a time, in the order given. A failure on one
    query is printed and does not stop the remaining queries.

    Args:
        gateway: Gateway whose ``resolve`` is awaited for every query.
        frequent_queries: Prompt payloads to pre-load.
    """
    for prompt in frequent_queries:
        try:
            await gateway.resolve(prompt)
        except Exception as exc:
            # Best-effort warm-up: report the error and carry on.
            failure_note = f"Failed to warm cache for query: {exc}"
            print(failure_note)
Pitfall Guide
Production caching requires careful handling of edge cases. The following pitfalls are common in AI caching implementations.
| Pitfall | Explanation | Fix |
|---|---|---|
| Key Explosion | Prompts contain unique IDs or timestamps, preventing cache hits. | Normalize prompts by stripping dynamic tokens before key generation. |
| Stale Data | Cached responses become outdated as underlying data changes. | Implement short TTLs for volatile data; use versioned keys. |
| Cache Stampede | Multiple requests miss simultaneously, overwhelming the API. | Implement locking or single-flight mechanisms to coalesce requests. |
| Memory Bloat | Cache grows unbounded, evicting active data or crashing Redis. | Configure maxmemory-policy and implement size-based eviction. |
| Semantic Drift | Low similarity thresholds return irrelevant responses. | Tune cosine threshold; validate responses with a secondary check. |
| Blocking I/O | Synchronous Redis calls block the event loop in async apps. | Use an async Redis client (e.g., `redis.asyncio`, the successor to aioredis) or run sync calls in executors. |
| Metric Blindness | No visibility into hit rates or eviction patterns. | Instrument cache operations with Prometheus/Grafana metrics. |
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Static FAQs | Exact Match | Deterministic outputs; high hit rate; low complexity. | High savings; minimal compute overhead. |
| User Chat | Semantic | Captures paraphrased queries; improves UX. | Moderate savings; embedding compute cost. |
| Real-time Data | No Cache / Short TTL | Accuracy critical; data changes frequently. | No savings; ensures freshness. |
| Batch Processing | Exact Match + Warming | Predictable inputs; can pre-compute results. | Maximum savings; reduced API load. |
Configuration Template
Docker Compose for Redis with Memory Limits:
# Redis cache for LLM responses: capped memory, LRU eviction, snapshots off.
version: '3.8'
services:
  redis-cache:
    image: redis:7-alpine
    # --maxmemory / --maxmemory-policy bound the cache and evict the least
    # recently used keys; --save "" disables persistence snapshots.
    command: redis-server --maxmemory 512mb --maxmemory-policy allkeys-lru --save ""
    volumes:
      # NOTE(review): with snapshots disabled this volume holds nothing
      # durable; confirm it is still wanted.
      - redis-data:/data
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 3
volumes:
  redis-data:
Redis Configuration Best Practices:
- Set `maxmemory` to 70% of available RAM to leave headroom for fragmentation.
- Use `allkeys-lru` for caching workloads to evict least recently used keys.
- Disable persistence (`save ""`) if cache is ephemeral and can be rebuilt.
Quick Start Guide
- Deploy Redis: Run the Docker Compose configuration above to start a Redis instance with memory limits.
- Install Dependencies: Add `redis` and `openai` (or your provider SDK) to your project.
- Initialize Gateway: Create an `InferenceGateway` instance wrapping your AI client and `ResponseRepository`.
- Instrument Metrics: Add Prometheus counters for cache hits and misses to monitor performance.
- Validate: Run a load test with repeated queries and verify hit rates and latency improvements.