Building an External Control Plane for LLM Agents

By Codcompass Team · 92 min read

Current Situation Analysis

Large Language Models operate on probabilistic token prediction, not deterministic execution. When deployed in production environments, this fundamental characteristic introduces three operational risks that traditional software engineering patterns do not address: policy drift, unbounded resource consumption, and data exfiltration. Teams frequently discover that a model which performs flawlessly in staging will suddenly trigger unauthorized tool calls, leak customer identifiers, or enter recursive reasoning loops that exhaust compute budgets within minutes.

The industry standard response has been to embed safety constraints directly into system prompts or agent orchestration loops. This approach fails for two reasons. First, LLMs are inherently capable of ignoring or overriding their own instructions when faced with adversarial inputs or complex multi-step reasoning. Second, scattering validation logic across multiple agent endpoints creates policy fragmentation. A compliance rule enforced in one service may be silently bypassed in another, leaving audit trails incomplete and security postures inconsistent.

Enterprise deployment data consistently shows that organizations treating LLM outputs as trusted data streams experience a 30–40% incident rate involving PII leakage or policy violations within the first quarter of production use. The architectural correction is straightforward: treat the model as an untrusted compute node and enforce deterministic boundaries at the network edge. By externalizing governance into a dedicated control plane, engineering teams decouple security policy from model iteration, guarantee consistent enforcement across all agent routes, and maintain full observability without modifying internal agent logic.

WOW Moment: Key Findings

The architectural shift from embedded safety checks to an external middleware control plane produces measurable improvements across deployment stability, operational overhead, and compliance posture. The following comparison highlights the operational delta between the two approaches:

| Approach | Policy Consistency | Latency Overhead | Blast Radius | Maintenance Complexity |
|----------|--------------------|------------------|--------------|------------------------|
| Prompt-Embedded Safety | Fragmented (varies by route/model) | Low (~5–15ms) | High (agent logic tightly coupled) | High (scattered across codebase) |
| External Middleware Control Plane | Centralized (single enforcement boundary) | Moderate (~20–80ms) | Low (sandboxed, policy-driven) | Low (declarative, version-controlled) |

This finding matters because it redefines how engineering teams scale AI systems. When guardrails live outside the agent, policy updates become configuration changes rather than code deployments. Security teams can audit enforcement without reading model-specific orchestration logic. Most importantly, the control plane acts as a circuit breaker: it can terminate runaway tool chains, redact sensitive payloads before they reach downstream services, and emit structured telemetry for cost attribution. The latency trade-off is negligible compared to the risk mitigation gained, and the architectural separation enables multi-model routing without duplicating safety logic.

Core Solution

The control plane architecture relies on FastAPI's middleware stack to intercept, validate, and transform agent responses before they exit the HTTP boundary. This implementation treats the LLM as a black-box compute service and enforces deterministic rules at the transport layer.

Step 1: Design the Interceptor Boundary

FastAPI middleware executes around every request lifecycle. By positioning a safety interceptor after the agent handler but before the response reaches the client, we guarantee that no output bypasses validation. The middleware must handle three responsibilities: policy enforcement, data sanitization, and telemetry emission.

Step 2: Implement Policy Validation & Data Sanitization

We separate concerns by creating dedicated engines for compliance checking and PII redaction. Microsoft Presidio provides context-aware entity detection using spaCy-based NLP models, significantly reducing false positives compared to regex-only approaches. The policy engine evaluates output against a configurable blocklist, while the sanitizer applies deterministic replacement rules.

Step 3: Handle Response Reconstruction Safely

Starlette streams HTTP responses as asynchronous byte chunks. Reassembling the payload requires careful memory management. We implement a strict size limit to prevent unbounded buffering, and we run CPU-intensive NLP analysis in a thread pool to avoid blocking the asyncio event loop.

Step 4: Wire Observability & Cost Tracking

Every intercepted request emits structured metrics. Prometheus histograms track latency distribution, while counters track policy violations and redaction events. OpenTelemetry spans attach to the middleware lifecycle, enabling distributed tracing across agent tool calls and downstream dependencies.

Step 5: Enforce Dependency Injection for Sandboxing

Agent tool access is restricted through scoped dependency injection. The middleware validates that the requesting context holds appropriate permissions before allowing tool execution. This limits the blast radius if an agent enters an unintended execution path.
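
The scoping idea in Step 5 can be sketched without any framework. The names `ToolScope`, `require_tool`, `run_tool`, and the two example tools are hypothetical and exist only for illustration; in a FastAPI service the scope object would more likely be supplied through `Depends` than passed by hand.

```python
from dataclasses import dataclass


class ToolAccessDenied(Exception):
    """Raised when a request context tries to use a tool outside its scope."""


@dataclass(frozen=True)
class ToolScope:
    """Hypothetical per-request permission set, injected into tool runners."""
    principal: str
    allowed_tools: frozenset


def require_tool(scope: ToolScope, tool_name: str) -> None:
    """Fail closed: anything not explicitly allowed is rejected."""
    if tool_name not in scope.allowed_tools:
        raise ToolAccessDenied(f"{scope.principal} may not call {tool_name}")


def run_tool(scope: ToolScope, tool_name: str, registry: dict, **kwargs):
    """Check the injected scope before dispatching to the tool callable."""
    require_tool(scope, tool_name)
    return registry[tool_name](**kwargs)


# Illustrative registry and scope; these names are assumptions, not a library API.
registry = {
    "search_docs": lambda query: f"results for {query}",
    "delete_record": lambda record_id: f"deleted {record_id}",
}
read_only = ToolScope(principal="support-agent", allowed_tools=frozenset({"search_docs"}))
```

Because the check runs before the tool callable is even looked up, an agent that wanders into an unintended path fails with `ToolAccessDenied` instead of executing the tool.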

Implementation

```python
# control_plane/safety_interceptor.py
import asyncio
import logging
import time
from typing import Optional

from fastapi import Request
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from prometheus_client import Counter, Histogram
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse, Response

logger = logging.getLogger("control_plane.safety")

# ── Telemetry Definitions ───────────────────────────────────────
LATENCY_HISTOGRAM = Histogram(
    "agent_response_latency_seconds",
    "Time taken to process and validate agent output",
    buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
)
POLICY_BLOCKED = Counter(
    "agent_policy_blocks_total",
    "Count of responses blocked by compliance rules",
    labelnames=["rule_type"],
)
PII_REDACTED = Counter(
    "agent_pii_redactions_total",
    "Count of PII entities detected and masked",
    labelnames=["entity_category"],
)

# ── Configuration Constants ─────────────────────────────────────
MAX_RESPONSE_BYTES = 2 * 1024 * 1024  # 2MB hard limit
FORBIDDEN_PATTERNS = [
    "ignore previous instructions",
    "override system prompt",
    "disregard safety guidelines",
    "you are now acting as",
]


class ComplianceEngine:
    """Evaluates output against deterministic policy rules."""

    def __init__(self, patterns: list[str]) -> None:
        self._patterns = [p.lower() for p in patterns]

    def evaluate(self, payload: str) -> Optional[str]:
        """Return a violation label for the first matching pattern, else None."""
        lower_payload = payload.lower()
        for pattern in self._patterns:
            if pattern in lower_payload:
                return f"pattern_match:{pattern}"
        return None


class DataRedactor:
    """Context-aware PII detection and masking using Presidio."""

    def __init__(self, languages: Optional[list[str]] = None) -> None:
        # Note: a default AnalyzerEngine only loads an English model. Languages
        # other than "en" need an NLP engine configured for them in Presidio.
        self._analyzer = AnalyzerEngine()
        self._anonymizer = AnonymizerEngine()
        self._languages = languages or ["en"]
        self._target_entities = [
            "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER",
            "CREDIT_CARD", "IBAN_CODE", "IP_ADDRESS",
        ]
        self._replacement_map = {
            "PERSON": "[REDACTED_USER]",
            "EMAIL_ADDRESS": "[REDACTED_EMAIL]",
            "PHONE_NUMBER": "[REDACTED_PHONE]",
            "CREDIT_CARD": "[REDACTED_CARD]",
            "IBAN_CODE": "[REDACTED_IBAN]",
            "IP_ADDRESS": "[REDACTED_IP]",
        }

    def sanitize(self, text: str) -> str:
        """Detect target entities in every configured language and mask them."""
        all_findings = []
        for lang in self._languages:
            findings = self._analyzer.analyze(
                text=text,
                language=lang,
                entities=self._target_entities,
                score_threshold=0.65,
            )
            all_findings.extend(findings)

        if not all_findings:
            return text

        # Record entity type and confidence only, never the matched text.
        for finding in all_findings:
            PII_REDACTED.labels(entity_category=finding.entity_type).inc()
            logger.debug("PII detected: type=%s confidence=%.2f", finding.entity_type, finding.score)

        sanitized = self._anonymizer.anonymize(
            text=text,
            analyzer_results=all_findings,
            operators={
                entity: OperatorConfig("replace", {"new_value": replacement})
                for entity, replacement in self._replacement_map.items()
            },
        )
        return sanitized.text


class SafetyInterceptor(BaseHTTPMiddleware):
    """External control plane middleware for LLM agent responses."""

    def __init__(self, app, languages: Optional[list[str]] = None) -> None:
        super().__init__(app)
        self._compliance = ComplianceEngine(FORBIDDEN_PATTERNS)
        self._redactor = DataRedactor(languages)

    async def dispatch(self, request: Request, call_next) -> Response:
        start = time.perf_counter()
        response = await call_next(request)

        # Reassemble streamed body with memory guard
        body_chunks = []
        total_size = 0
        async for chunk in response.body_iterator:
            total_size += len(chunk)
            if total_size > MAX_RESPONSE_BYTES:
                logger.warning("Response exceeds size limit, terminating buffer")
                return JSONResponse(
                    status_code=413,
                    content={"error": "payload_too_large", "limit_bytes": MAX_RESPONSE_BYTES},
                )
            body_chunks.append(chunk)

        raw_payload = b"".join(body_chunks).decode("utf-8", errors="replace")

        # Run the checks in the default thread pool, off the event loop
        loop = asyncio.get_running_loop()
        violation = await loop.run_in_executor(None, self._compliance.evaluate, raw_payload)

        if violation:
            POLICY_BLOCKED.labels(rule_type=violation).inc()
            logger.info("Policy violation intercepted: %s", violation)
            return JSONResponse(
                status_code=403,
                content={"error": "compliance_block", "reason": violation},
            )

        sanitized_payload = await loop.run_in_executor(None, self._redactor.sanitize, raw_payload)

        latency = time.perf_counter() - start
        LATENCY_HISTOGRAM.observe(latency)
        logger.info("Agent response validated: %.3fs | %d chars", latency, len(sanitized_payload))

        # Copy headers but drop Content-Length: redaction can change the body
        # size, and Starlette sets the header itself when it is absent.
        headers = {
            name: value
            for name, value in response.headers.items()
            if name.lower() != "content-length"
        }
        return Response(
            content=sanitized_payload.encode("utf-8"),
            status_code=response.status_code,
            headers=headers,
            media_type=response.media_type,
        )
```

### Architecture Decisions & Rationale

1. **External Middleware over Agent Logic**: Embedding safety checks inside agent loops creates coupling. If you swap models or refactor orchestration, guardrails break. Middleware enforces policy at the transport boundary, guaranteeing execution regardless of internal routing.
2. **Thread Pool for NLP Analysis**: Presidio's spaCy models perform CPU-intensive tokenization and entity recognition. Running them directly in an async handler blocks the event loop, degrading throughput for all concurrent requests. `run_in_executor` isolates CPU work while preserving async I/O performance.
3. **Hard Payload Limit**: Streaming responses can theoretically grow indefinitely. A 2MB buffer limit prevents memory exhaustion attacks and forces agents to paginate or truncate outputs, which aligns with production API design standards.
4. **Metric-Driven Observability**: Histograms capture latency distribution, enabling p95/p99 alerting. Counters track violation trends. Both integrate natively with Prometheus/Grafana stacks, providing audit-ready telemetry without custom logging pipelines.
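5. **Header & Status Preservation**: Reconstructing the response without copying metadata strips `Content-Type`, `Cache-Control`, and error codes. Explicitly forwarding these fields maintains HTTP contract integrity for downstream clients. `Content-Length` is the exception: redaction changes the body size, so that header must be recomputed rather than copied.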

## Pitfall Guide

### 1. Unbounded Response Buffering
**Explanation**: Collecting streamed chunks without a size limit allows malicious or malfunctioning agents to allocate gigabytes of RAM, triggering OOM kills.
**Fix**: Implement a strict byte counter during chunk iteration. Terminate and return `413 Payload Too Large` when the threshold is crossed.
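
The byte-counter fix can be isolated into a small helper. This is a minimal sketch using only the standard library; `collect_bounded` and `fake_stream` are hypothetical names, and `fake_stream` merely stands in for a streamed response body.

```python
import asyncio
from typing import AsyncIterator, Optional


async def collect_bounded(chunks: AsyncIterator[bytes], limit: int) -> Optional[bytes]:
    """Join chunks into one body, or return None once `limit` bytes is exceeded."""
    buffered = []
    total = 0
    async for chunk in chunks:
        total += len(chunk)
        if total > limit:
            return None  # stop reading; the caller turns this into an error response
        buffered.append(chunk)
    return b"".join(buffered)


async def fake_stream(n_chunks: int, chunk_size: int) -> AsyncIterator[bytes]:
    """Stand-in for a streamed response body (hypothetical test helper)."""
    for _ in range(n_chunks):
        yield b"x" * chunk_size
```

The important property is that the limit is checked before the chunk is appended, so memory use never grows past the limit plus one chunk.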

### 2. Blocking the Async Event Loop
**Explanation**: Running NLP models or heavy regex operations directly in `dispatch()` stalls the asyncio loop, causing cascading timeouts across all endpoints.
**Fix**: Offload CPU-bound analysis to `asyncio.to_thread()` or `loop.run_in_executor()`. Keep I/O operations strictly async.
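
A small experiment makes the difference visible. In this sketch `cpu_heavy_analysis` is a hypothetical stand-in that simply sleeps to imitate a blocking library call, and a heartbeat task counts how often the event loop gets to run while the call is in flight.

```python
import asyncio
import time


def cpu_heavy_analysis(text: str) -> int:
    """Stand-in for a blocking NLP call; sleeps to simulate the work."""
    time.sleep(0.2)
    return len(text.split())


async def heartbeat(ticks: list) -> None:
    """Records a tick every 10 ms for as long as the event loop is free to run it."""
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def analyze_without_blocking(text: str):
    ticks: list = []
    beat = asyncio.create_task(heartbeat(ticks))
    # The blocking call runs in a worker thread, so the heartbeat keeps ticking.
    word_count = await asyncio.to_thread(cpu_heavy_analysis, text)
    beat.cancel()
    return word_count, len(ticks)
```

Calling `cpu_heavy_analysis` directly inside the coroutine would leave the heartbeat with a single tick. Threads help most when the library blocks on I/O or releases the GIL; pure-Python CPU work may need a process pool instead.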

### 3. Middleware Registration Order Blindness
**Explanation**: FastAPI executes middleware in reverse registration order (LIFO). Placing safety checks before authentication wastes compute on unauthorized requests.
**Fix**: Register authentication/authorization middleware last so it executes first. Follow with safety interceptors, then logging/telemetry.
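
The ordering rule is easier to reason about with a toy model. `MiniApp` below is not FastAPI; it is a hypothetical few-line imitation of the stacking behaviour, in which each added middleware wraps everything registered before it.

```python
from typing import Callable, List

Handler = Callable[[List[str]], None]


def make_middleware(name: str) -> Callable[[Handler], Handler]:
    """Build a wrapper that records when it is entered and exited."""
    def wrap(inner: Handler) -> Handler:
        def handler(trace: List[str]) -> None:
            trace.append(f"{name}:enter")
            inner(trace)
            trace.append(f"{name}:exit")
        return handler
    return wrap


class MiniApp:
    """Toy model of the stacking rule: each added middleware wraps all earlier ones."""

    def __init__(self, endpoint: Handler) -> None:
        self._stack = endpoint

    def add_middleware(self, name: str) -> None:
        self._stack = make_middleware(name)(self._stack)

    def handle(self) -> List[str]:
        trace: List[str] = []
        self._stack(trace)
        return trace


app = MiniApp(lambda trace: trace.append("endpoint"))
app.add_middleware("safety")  # registered first, so it ends up innermost
app.add_middleware("auth")    # registered last, so it ends up outermost
```

On the way in, `auth` sees the request before `safety`; on the way out, `safety` processes the response before `auth` does.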

### 4. Logging Raw Sensitive Values
**Explanation**: Accidentally logging PII during debugging or error handling violates GDPR/CCPA and creates audit liabilities.
**Fix**: Log only entity types, confidence scores, and redaction counts. Never log the original payload or matched substrings. Use structured logging with explicit PII filters.
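
One way to approximate an explicit PII filter with the standard library is a `logging.Filter` that scrubs the rendered message. The sketch below masks only email-like strings with a simple regex, so it is a last line of defence and far narrower than the Presidio-based redaction; `EmailScrubFilter` and `scrub` are hypothetical names.

```python
import logging
import re

EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


class EmailScrubFilter(logging.Filter):
    """Masks email-like substrings in the final log message (illustrative only)."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Render the message with its args first, then scrub the rendered text.
        rendered = record.getMessage()
        record.msg = EMAIL_RE.sub("[REDACTED_EMAIL]", rendered)
        record.args = None
        return True  # keep the record; only its content is altered


def scrub(message: str, *args) -> str:
    """Demonstration helper: run one record through the filter."""
    record = logging.LogRecord("demo", logging.INFO, "demo.py", 0, message, args, None)
    EmailScrubFilter().filter(record)
    return record.getMessage()
```

Attaching the filter to a handler with `handler.addFilter(EmailScrubFilter())` covers every record that reaches that handler, whereas a filter attached to a logger only sees records logged directly on that logger.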

### 5. Regex-Only PII Detection
**Explanation**: Regular expressions cannot distinguish between a credit card number and a product serial number, generating high false-positive rates.
**Fix**: Use context-aware NLP engines like Presidio that leverage linguistic patterns, surrounding tokens, and validation checksums (Luhn, IBAN) to reduce false positives.
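
The checksum part of that fix is simple enough to show on its own. This is a minimal sketch of Luhn validation layered on a regex candidate search; it illustrates why a checksum cuts false positives and is not a reproduction of Presidio's recognizers. The function and constant names are hypothetical.

```python
import re
from typing import List

# Runs of 13 to 19 digits, optionally separated by single spaces or hyphens.
CARD_CANDIDATE = re.compile(r"\b(?:\d[ -]?){13,19}\b")


def luhn_valid(digits: str) -> bool:
    """Return True when a digit string passes the Luhn checksum."""
    total = 0
    for index, char in enumerate(reversed(digits)):
        value = int(char)
        if index % 2 == 1:  # double every second digit, counting from the right
            value *= 2
            if value > 9:
                value -= 9
        total += value
    return total % 10 == 0


def find_card_numbers(text: str) -> List[str]:
    """The regex proposes candidates; the checksum discards most look-alikes."""
    hits = []
    for match in CARD_CANDIDATE.finditer(text):
        digits = re.sub(r"\D", "", match.group())
        if luhn_valid(digits):
            hits.append(digits)
    return hits
```

A regex alone would flag both numbers in a string such as `"Card 4111 1111 1111 1111, serial 1234567890123456"`; with the checksum only the first survives, because the serial number fails Luhn.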

### 6. Dropping Original Response Metadata
**Explanation**: Returning a new `Response` object without copying headers and status codes strips `Content-Type`, `ETag`, and error states, breaking client expectations.
**Fix**: Explicitly pass `status_code=response.status_code`, the original headers, and `media_type=response.media_type` when reconstructing the response, but drop the copied `Content-Length` so it is recomputed for the sanitized body.

### 7. Ignoring Streaming/SSE Responses
**Explanation**: Server-Sent Events and WebSocket upgrades cannot be buffered synchronously. Applying byte-level reconstruction breaks real-time feeds.
**Fix**: Detect `media_type` or `upgrade` headers early. Bypass buffering for streaming endpoints and apply chunk-level validation or skip sanitization for real-time UI updates.
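
The detection step might look like the following sketch. `should_bypass_buffering` and `STREAMING_MEDIA_TYPES` are hypothetical names, and the set of media types treated as streaming is an assumption that would need tuning per service.

```python
from typing import Mapping

STREAMING_MEDIA_TYPES = ("text/event-stream",)


def should_bypass_buffering(headers: Mapping[str, str]) -> bool:
    """Decide from response headers whether the body should pass through unbuffered."""
    normalized = {name.lower(): value.lower() for name, value in headers.items()}
    content_type = normalized.get("content-type", "")
    # Strip parameters such as "; charset=utf-8" before comparing.
    media_type = content_type.split(";", 1)[0].strip()
    if media_type in STREAMING_MEDIA_TYPES:
        return True
    # A protocol upgrade (for example WebSocket) is not a bufferable HTTP body.
    return "upgrade" in normalized
```

A middleware could call this on the response headers right after the downstream handler returns and hand the original response back unchanged when it yields `True`.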

## Production Bundle

### Action Checklist
- [ ] Define policy blocklist and PII entity targets in a version-controlled configuration file
- [ ] Implement payload size limits and early termination logic in the interceptor
- [ ] Offload NLP analysis to thread pools to prevent event loop blocking
- [ ] Register authentication middleware last to ensure LIFO execution order
- [ ] Configure Prometheus histograms for latency distribution and counters for violation tracking
- [ ] Add OpenTelemetry instrumentation to trace agent tool calls and middleware spans
- [ ] Validate header and status code preservation during response reconstruction
- [ ] Establish alerting thresholds for p95 latency spikes and policy violation rate increases
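
The first checklist item, keeping policy in a version-controlled file, could be approached as in the sketch below. The JSON field names and the `PolicyConfig` and `load_policy` helpers are assumptions made for illustration, not an established schema.

```python
import json
from dataclasses import dataclass, field


@dataclass(frozen=True)
class PolicyConfig:
    """Hypothetical policy document; field names are assumptions for illustration."""
    version: str
    forbidden_patterns: tuple = field(default_factory=tuple)
    pii_entities: tuple = field(default_factory=tuple)
    max_response_bytes: int = 2 * 1024 * 1024


def load_policy(raw_json: str) -> PolicyConfig:
    """Parse and validate a policy document, failing loudly on bad input."""
    data = json.loads(raw_json)
    patterns = tuple(p.strip().lower() for p in data.get("forbidden_patterns", []))
    # An empty pattern would match every payload, so reject it up front.
    if any(not p for p in patterns):
        raise ValueError("forbidden_patterns must not contain empty strings")
    return PolicyConfig(
        version=str(data["version"]),
        forbidden_patterns=patterns,
        pii_entities=tuple(data.get("pii_entities", [])),
        max_response_bytes=int(data.get("max_response_bytes", 2 * 1024 * 1024)),
    )


EXAMPLE_POLICY = """
{
  "version": "1",
  "forbidden_patterns": ["Ignore previous instructions", "override system prompt"],
  "pii_entities": ["EMAIL_ADDRESS", "PHONE_NUMBER"]
}
"""
policy = load_policy(EXAMPLE_POLICY)
```

Validating at load time means a malformed policy file stops the service from starting rather than silently blocking or passing everything.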

### Decision Matrix

| Scenario | Recommended Approach | Why | Cost Impact |
|----------|---------------------|-----|-------------|
| Single agent, low compliance requirements | Prompt-embedded safety + lightweight regex | Simpler stack, lower latency overhead | Minimal infrastructure cost |
| Multi-agent platform, strict regulatory compliance | External middleware control plane + Presidio | Centralized policy, audit-ready telemetry, consistent enforcement | Moderate compute overhead (~15–30ms/request) |
| High-throughput public API | Async interceptor with streaming bypass + size limits | Prevents OOM, maintains real-time UX, scales horizontally | Higher memory allocation for buffer management |
| Internal enterprise tooling | Full sanitization + dependency-injected sandboxing | Limits blast radius, enforces least-privilege tool access | Increased DevOps complexity for DI container management |

### Configuration Template

```python
# app/main.py
from fastapi import Depends, FastAPI

from control_plane.auth import AuthenticationGuard  # assumed module; not defined in this article
from control_plane.safety_interceptor import SafetyInterceptor
from control_plane.sandbox import ToolAccessDependency
from control_plane.telemetry import setup_opentelemetry, setup_prometheus

app = FastAPI(title="Agent Control Plane", version="1.0.0")

# Telemetry initialization
setup_prometheus()
setup_opentelemetry(app)

# Middleware stack (LIFO execution)
# Non-English languages need a matching NLP model configured in Presidio.
app.add_middleware(SafetyInterceptor, languages=["en", "es"])
app.add_middleware(AuthenticationGuard)  # Registered last → executes first


# Sandboxed tool access is injected per request. dependency_overrides is meant
# for swapping dependencies in tests, so it is not used here.
@app.post("/agent/query")
async def query_agent(payload: dict, tools=Depends(ToolAccessDependency)):
    # Agent logic executes here
    # Middleware intercepts response automatically
    return {"status": "processed"}

```

### Quick Start Guide

1. **Install dependencies**: `pip install fastapi presidio-analyzer presidio-anonymizer prometheus-client opentelemetry-api opentelemetry-sdk`
2. **Create the interceptor module**: Copy the `SafetyInterceptor`, `ComplianceEngine`, and `DataRedactor` classes into `control_plane/safety_interceptor.py`
3. **Register middleware**: Add `app.add_middleware(SafetyInterceptor)` to your FastAPI application before it starts serving requests; middleware cannot be added once the application has started
4. **Configure telemetry**: Initialize Prometheus metrics and OpenTelemetry exporters before starting the server
5. **Deploy and validate**: Send one test request containing a known PII pattern and another containing a policy trigger. Verify that the first response is redacted, the second is blocked and logged as a violation, and the Prometheus metrics increment correctly.