follows four discrete stages: embedding normalization, index construction, rank-weighted scoring, and trajectory smoothing.
Step 1: Embedding Pipeline & Normalization
All steps must be encoded with the same model. The all-MiniLM-L6-v2 architecture offers a good balance of dimensionality (384) and semantic quality for short text. Export the model to ONNX, optionally with INT8 quantization, to reduce inference latency. Crucially, every vector must be L2-normalized before indexing or querying. FAISS IndexFlatIP computes inner products; on unit-length vectors, the inner product equals cosine similarity. Skipping normalization lets vector magnitude distort the ranking and breaks the metric.
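As a minimal sketch of the normalization requirement, assuming embeddings arrive as a NumPy array, the snippet below checks that the inner product of unit-length vectors matches cosine similarity. The helper name `l2_normalize` and the random stand-in vectors are illustrative assumptions, not part of any library or of the original pipeline.

```python
import numpy as np

def l2_normalize(vecs: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    """Scale each row to unit L2 norm; eps guards against all-zero rows."""
    vecs = np.asarray(vecs, dtype=np.float32)
    norms = np.linalg.norm(vecs, axis=1, keepdims=True)
    return vecs / np.maximum(norms, eps)

rng = np.random.default_rng(0)
raw = rng.normal(size=(4, 384)).astype(np.float32)  # stand-in for model output
unit = l2_normalize(raw)

# Cosine similarity computed directly from the raw vectors
cosine = (raw @ raw.T) / (
    np.linalg.norm(raw, axis=1, keepdims=True) * np.linalg.norm(raw, axis=1)
)
# Plain inner product on the normalized vectors
inner = unit @ unit.T

assert np.allclose(np.linalg.norm(unit, axis=1), 1.0, atol=1e-5)
assert np.allclose(inner, cosine, atol=1e-5)
```

Running the same helper in both the index builder and the query path is one way to keep the two pipelines from drifting apart.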
Step 2: Index Construction
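Load the policy dataset into a FAISS flat index. The index holds the precomputed embeddings; the binary safety labels live in a parallel array addressed by the same row IDs. Flat indexes store every vector uncompressed and scan all of them on each query, trading memory and search time for exact, deterministic recall. That trade is acceptable for policy allowlists, which typically range from 5,000 to 15,000 entries.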
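To make the exact-search behavior concrete, here is a NumPy stand-in for what a flat inner-product index computes: score every stored vector, then keep the k best. This is a sketch for intuition, not FAISS itself; the function name `exact_top_k` and the toy 2-D vectors are assumptions made for illustration.

```python
import numpy as np

def exact_top_k(index_vecs: np.ndarray, query: np.ndarray, k: int):
    """Brute-force inner-product search: score every row, keep the k best."""
    scores = index_vecs @ query          # (N,) inner products
    top = np.argsort(-scores)[:k]        # row ids, best first
    return scores[top], top

# Toy policy set: 2-D unit vectors with a parallel label array (1 = unsafe)
policy_vecs = np.array(
    [[1.0, 0.0], [0.0, 1.0], [0.6, 0.8], [-1.0, 0.0]], dtype=np.float32
)
policy_labels = np.array([0, 1, 1, 0])

query = np.array([0.8, 0.6], dtype=np.float32)  # already unit length
sims, ids = exact_top_k(policy_vecs, query, k=2)
neighbor_labels = policy_labels[ids]            # labels looked up by row id
```

Because the labels are looked up by row ID, the label array must be kept in the same order as the vectors added to the index.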
Step 3: Rank-Weighted Scoring
Retrieve the top-k neighbors. Apply a softmax to their similarity scores to obtain weights that sum to 1. Multiply each weight by its neighbor's safety label (0 = safe, 1 = unsafe) and sum the products. The result is a score in [0, 1] that serves as a soft estimate of the likelihood of unsafe behavior.
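A small worked example may help here. The sketch below applies the softmax-weighted vote to five hypothetical similarity scores and compares it with an unweighted majority vote. The similarity values and labels are invented for illustration.

```python
import numpy as np

def soft_vote(sims: np.ndarray, labels: np.ndarray) -> float:
    """Softmax-weight the neighbor labels by similarity."""
    shifted = sims - sims.max()      # subtract the max for numerical stability
    weights = np.exp(shifted)
    weights /= weights.sum()
    return float(weights @ labels)

sims = np.array([0.92, 0.88, 0.71, 0.65, 0.40])  # hypothetical top-5 similarities
labels = np.array([1.0, 1.0, 0.0, 0.0, 0.0])     # 1 = unsafe, 0 = safe

score = soft_vote(sims, labels)   # similarity-weighted vote
majority = labels.mean()          # unweighted vote for comparison: 2/5
```

Here the two closest neighbors are unsafe, so the weighted score (about 0.47) lands above the unweighted 0.40, which is enough to cross a 0.45 warning level that a plain majority vote would miss.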
Step 4: Trajectory Smoothing
Single-step scores fluctuate due to linguistic variance. Apply an exponential moving average (EMA) across the session trajectory. The EMA dampens transient spikes while preserving sustained drift patterns.
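The difference between a transient spike and sustained drift can be sketched in a few lines. The raw scores below are hypothetical; the smoothing factor of 0.3 and the seeding of the EMA with the first score mirror the defaults used in this guide.

```python
def ema_trajectory(raw_scores, alpha=0.3):
    """Return the EMA-smoothed score after each step, seeded with the first score."""
    smoothed = []
    ema = None
    for score in raw_scores:
        ema = score if ema is None else alpha * score + (1 - alpha) * ema
        smoothed.append(ema)
    return smoothed

# One transient spike in an otherwise benign session (hypothetical scores)
spike = ema_trajectory([0.10, 0.10, 0.90, 0.10, 0.10])
# Sustained drift toward unsafe behavior
drift = ema_trajectory([0.10, 0.50, 0.80, 0.90, 0.90])
```

With these inputs the spike peaks at 0.34 after smoothing, which stays under a 0.45 warning level, while the drifting session passes 0.45 on its fourth step and keeps climbing.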
Implementation
```python
from dataclasses import dataclass
from typing import Optional, Tuple

import faiss
import numpy as np


@dataclass
class SafetyThresholds:
    warn_level: float = 0.45
    kill_level: float = 0.70


class TrajectoryGuardrail:
    def __init__(
        self,
        embeddings: np.ndarray,
        labels: np.ndarray,
        k_neighbors: int = 5,
        ema_alpha: float = 0.3,
        thresholds: Optional[SafetyThresholds] = None,
    ):
        """
        Initialize the guardrail with a pre-normalized policy index.

        Args:
            embeddings: (N, D) float32, L2-normalized policy vectors
            labels: (N,) int {0: safe, 1: unsafe}
            k_neighbors: Number of neighbors for rank-weighted voting
            ema_alpha: Smoothing factor for trajectory drift
            thresholds: Warning and session-termination boundaries
        """
        self.k = k_neighbors
        self.alpha = ema_alpha
        self.thresholds = thresholds or SafetyThresholds()
        self.labels = labels.astype(np.float32)

        # FAISS expects a C-contiguous float32 matrix
        embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
        dim = embeddings.shape[1]
        self.index = faiss.IndexFlatIP(dim)
        self.index.add(embeddings)

        # EMA state for trajectory smoothing
        self._ema_score = 0.0
        self._initialized = False

    def compute_soft_vote(self, query_vec: np.ndarray) -> float:
        """
        Retrieve top-k neighbors and compute rank-weighted unsafe probability.

        Args:
            query_vec: (1, D) float32, L2-normalized query vector

        Returns:
            Float in [0, 1] representing unsafe probability
        """
        similarities, indices = self.index.search(query_vec, self.k)

        # FAISS pads results with -1 when the index holds fewer than k vectors
        valid = indices[0] >= 0
        sims = similarities[0][valid]  # shape: (<= k,)
        neighbor_labels = self.labels[indices[0][valid]]

        # Softmax converts similarities to weights; subtracting the max
        # keeps the exponentials numerically stable
        exp_sims = np.exp(sims - sims.max())
        weights = exp_sims / exp_sims.sum()

        # Weighted sum of labels yields soft unsafe probability
        unsafe_prob = float(np.dot(weights, neighbor_labels))
        return unsafe_prob

    def evaluate_step(self, query_vec: np.ndarray) -> Tuple[float, str]:
        """
        Score a single step and apply EMA smoothing.

        Returns (smoothed_score, decision).
        """
        raw_score = self.compute_soft_vote(query_vec)

        # Seed the EMA with the first observation, then blend new scores in
        if not self._initialized:
            self._ema_score = raw_score
            self._initialized = True
        else:
            self._ema_score = (
                self.alpha * raw_score + (1 - self.alpha) * self._ema_score
            )

        decision = "ALLOW"
        if self._ema_score >= self.thresholds.kill_level:
            decision = "KILL_SESSION"
        elif self._ema_score >= self.thresholds.warn_level:
            decision = "WARN"
        return self._ema_score, decision

    def reset_trajectory(self) -> None:
        """Clear EMA state for new sessions."""
        self._ema_score = 0.0
        self._initialized = False
```
Architecture Decisions & Rationale
- FAISS IndexFlatIP over HNSW: Flat indexes guarantee exact nearest-neighbor retrieval. Approximate indexes introduce recall variance that compounds with soft voting. For policy allowlists under 20k vectors, flat search remains sub-millisecond on modern CPUs.
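- Softmax Weighting: Reciprocal-rank weighting (1/i) depends only on a neighbor's position and ignores how close it actually is. Softmax weights grow exponentially with similarity, so the closest neighbors carry the most influence while boundary context still contributes. Because cosine similarities are bounded in [-1, 1], the weights stay fairly flat unless the similarities are scaled before the softmax.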
- EMA Smoothing (α=0.3): A lower alpha prioritizes historical context, preventing single-step linguistic noise from triggering false positives. A higher alpha reacts faster to abrupt drift. 0.3 balances responsiveness with stability across 6–12 step trajectories.
- Threshold Separation: The 0.45/0.70 split creates a hysteresis band. Steps scoring between thresholds enter a warning state, allowing observability systems to log context without interrupting execution. This prevents cascading session terminations from transient embedding fluctuations.
- ONNX Quantization: The embedding model dominates latency (~10ms). Quantizing to INT8 reduces inference time by 40–60% with negligible semantic degradation. The scoring step itself runs in ~0.3ms, keeping total per-step overhead near 11ms.
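The weighting trade-off in the list above can be inspected numerically. The sketch below compares reciprocal-rank weights with softmax weights on hypothetical similarities. The `temperature` parameter is an assumption introduced here for illustration; the guardrail class in this guide uses a plain softmax, which corresponds to a temperature of 1.0.

```python
import numpy as np

def reciprocal_rank_weights(k: int) -> np.ndarray:
    """Weights proportional to 1/i for ranks i = 1..k, normalized to sum to 1."""
    w = 1.0 / np.arange(1, k + 1)
    return w / w.sum()

def softmax_weights(sims: np.ndarray, temperature: float = 1.0) -> np.ndarray:
    """Softmax over similarities; a smaller temperature concentrates weight on top hits."""
    z = (sims - sims.max()) / temperature
    w = np.exp(z)
    return w / w.sum()

sims = np.array([0.92, 0.88, 0.71, 0.65, 0.40])   # hypothetical similarities
rank_w = reciprocal_rank_weights(len(sims))       # ignores the similarity values
soft_w = softmax_weights(sims)                    # temperature 1.0
sharp_w = softmax_weights(sims, temperature=0.1)  # hypothetical sharper setting
```

On these inputs the plain softmax gives the top neighbor under a third of the total weight, while the sharper setting gives it more than half. Whether a temperature is worth adding is a tuning question to settle on a validation set.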
Pitfall Guide
| Pitfall | Explanation | Fix |
|---|---|---|
| Skipping L2 Normalization | FAISS IndexFlatIP computes dot products. Without normalization, magnitude dominates direction, breaking cosine similarity semantics. | Apply vec / np.linalg.norm(vec) to both index and query vectors. Verify with np.allclose(np.linalg.norm(vecs, axis=1), 1.0). |
| Hard Thresholding Without Smoothing | Single-step decisions amplify linguistic variance. A benign step with unusual phrasing can cross the threshold, causing false positives. | Implement EMA or rolling window averaging. Use the hysteresis band (warn vs kill) to absorb transient spikes. |
| Over-Expanding k Beyond 7 | Distant neighbors fall into unrelated semantic clusters. Their labels introduce noise that dilutes the vote and degrades precision. | Cap k at 5 or 7. Validate with a holdout adversarial set. Monitor precision/recall divergence as k increases. |
| Ignoring Embedding Latency | Scoring is fast, but model inference dominates. Synchronous embedding calls block the agent loop, causing timeout cascades. | Precompute embeddings for static policy steps. Use ONNX runtime with batched inference. Offload embedding to a dedicated worker thread or async task. |
| Label Imbalance in Index | Policy datasets are heavily skewed toward safe examples, so the top-k neighborhood is usually dominated by safe labels and the weighted vote masks unsafe signals. | Apply class-weighted voting, oversample unsafe examples synthetically when building the index, or use focal loss-style reweighting of the votes. |
| Static Thresholds Across Environments | Internal tooling and external API gateways require different risk tolerances. One threshold fits neither. | Parameterize thresholds per environment. Load from configuration. Allow runtime adjustment via feature flags during incident response. |
| Query Preprocessing Mismatch | If the query pipeline applies different tokenization, truncation, or normalization than the index pipeline, similarity scores become meaningless. | Share a single preprocessing function between index builder and runtime scorer. Unit test with known semantic pairs to verify distance consistency. |
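For the label-imbalance row, one possible shape for class-weighted voting is sketched below. The function name and the `unsafe_weight` value of 3.0 are hypothetical; any real setting would need tuning against a validation set, and the warn and kill thresholds would need recalibrating afterwards.

```python
import numpy as np

def class_weighted_vote(sims, labels, unsafe_weight=3.0):
    """Softmax vote in which unsafe neighbors count unsafe_weight times as much."""
    sims = np.asarray(sims, dtype=np.float64)
    labels = np.asarray(labels, dtype=np.float64)
    weights = np.exp(sims - sims.max())
    # Boost unsafe neighbors, then renormalize so the score stays in [0, 1]
    weights = weights * np.where(labels == 1.0, unsafe_weight, 1.0)
    weights /= weights.sum()
    return float(weights @ labels)

sims = [0.90, 0.85, 0.80, 0.78, 0.75]  # hypothetical similarities
labels = [0, 0, 1, 0, 0]               # one unsafe neighbor among four safe ones

plain = class_weighted_vote(sims, labels, unsafe_weight=1.0)
boosted = class_weighted_vote(sims, labels, unsafe_weight=3.0)
```

With a weight of 1.0 the function reduces to the ordinary soft vote, which makes it easy to compare the two settings side by side.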
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Internal dev tools, low-risk data | k=3, single-step threshold, no EMA | Faster iteration, lower memory, acceptable false positive rate | Minimal CPU overhead, reduced index size |
| Production agent workflows, mixed data | k=5, EMA smoothing, hysteresis thresholds | Balances precision/recall, absorbs linguistic noise, prevents cascading kills | ~11ms/step, standard CPU allocation |
| Regulated/financial environments, high-risk actions | k=5, EMA + secondary signals (Jaccard, plan match), dynamic thresholds | Defense-in-depth, auditability, compliance-ready decision trails | Higher compute budget, requires signal aggregator service |
| High-throughput batch processing | Precomputed embeddings, batched FAISS search, async scoring | Eliminates per-step latency bottleneck, scales linearly with worker count | Increased memory for batch buffers, requires queue infrastructure |
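For the batch-processing row, the scoring step vectorizes naturally. The sketch below uses NumPy as a stand-in for a batched flat-index search (FAISS `index.search` also accepts a matrix of queries) and computes the soft vote for every query in one pass. The function name, array sizes, and synthetic labels are assumptions for illustration.

```python
import numpy as np

def batched_soft_votes(index_vecs, labels, queries, k=5):
    """Score a batch of unit-norm queries against a unit-norm index in one pass."""
    sims = queries @ index_vecs.T                    # (B, N) inner products
    top_ids = np.argsort(-sims, axis=1)[:, :k]       # (B, k) best row ids per query
    top_sims = np.take_along_axis(sims, top_ids, axis=1)
    weights = np.exp(top_sims - top_sims.max(axis=1, keepdims=True))
    weights /= weights.sum(axis=1, keepdims=True)    # row-wise softmax
    return (weights * labels[top_ids]).sum(axis=1)   # (B,) unsafe scores

rng = np.random.default_rng(0)
index_vecs = rng.normal(size=(200, 16))
index_vecs /= np.linalg.norm(index_vecs, axis=1, keepdims=True)
labels = (rng.random(200) < 0.2).astype(np.float64)  # synthetic labels, roughly 20% unsafe
queries = index_vecs[:8] + 0.05 * rng.normal(size=(8, 16))
queries /= np.linalg.norm(queries, axis=1, keepdims=True)

scores = batched_soft_votes(index_vecs, labels, queries)
```

EMA state is per session, so a batch should either hold steps from different sessions or have its scores fed through the smoothing step in trajectory order afterwards.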
Configuration Template
```yaml
guardrail:
  model:
    name: "all-MiniLM-L6-v2"
    format: "onnx"
    quantization: "int8"
    dimension: 384
  index:
    type: "flat_ip"
    k_neighbors: 5
    policy_path: "/data/policy_embeddings.npy"
    labels_path: "/data/policy_labels.npy"
  scoring:
    ema_alpha: 0.3
    thresholds:
      warn: 0.45
      kill: 0.70
    normalization: "l2"
  runtime:
    embedding_batch_size: 32
    scoring_timeout_ms: 50
    fallback_decision: "WARN"
  metrics:
    enabled: true
    export_format: "prometheus"
```
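A configuration like this benefits from validation at load time. The sketch below assumes the template has already been parsed into a dictionary (for example by a YAML loader) and that `scoring` is nested under `guardrail`. The `ScoringConfig` class and `scoring_from_dict` helper are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ScoringConfig:
    ema_alpha: float
    warn: float
    kill: float

    def __post_init__(self):
        # Reject values that would make the guardrail behave unpredictably
        if not 0.0 < self.ema_alpha <= 1.0:
            raise ValueError("ema_alpha must be in (0, 1]")
        if not 0.0 <= self.warn < self.kill <= 1.0:
            raise ValueError("thresholds must satisfy 0 <= warn < kill <= 1")

def scoring_from_dict(cfg: dict) -> ScoringConfig:
    """Build a ScoringConfig from the parsed guardrail.scoring mapping."""
    scoring = cfg["guardrail"]["scoring"]
    return ScoringConfig(
        ema_alpha=float(scoring["ema_alpha"]),
        warn=float(scoring["thresholds"]["warn"]),
        kill=float(scoring["thresholds"]["kill"]),
    )

# Parsed form of the scoring section of the template
parsed = {
    "guardrail": {
        "scoring": {"ema_alpha": 0.3, "thresholds": {"warn": 0.45, "kill": 0.70}}
    }
}
config = scoring_from_dict(parsed)
```

Failing fast on an inverted or out-of-range threshold pair is cheaper than discovering it during an incident, particularly if thresholds are adjusted at runtime through feature flags.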
Quick Start Guide
- Install dependencies: pip install faiss-cpu numpy onnxruntime
- Prepare policy data: Export your safety allowlist as two NumPy arrays: policy_embeddings.npy (N, 384) and policy_labels.npy (N,). Ensure embeddings are L2-normalized.
- Initialize the guardrail: Load the arrays, instantiate TrajectoryGuardrail(embeddings, labels, k_neighbors=5, ema_alpha=0.3), and verify the index size with guardrail.index.ntotal.
- Run a test query: Encode a sample step using the ONNX model, normalize the output, and call evaluate_step(). Inspect the returned score and decision.
- Tune thresholds: Adjust the warn and kill levels based on your environment's risk tolerance. Monitor precision/recall on a validation set before locking configuration.
The rank-weighted voting mechanism turns semantic guardrails from brittle single-point checks into more resilient probabilistic filters. By spreading the decision across multiple neighbors, smoothing across trajectory history, and enforcing a strict normalization pipeline, you narrow the boundary-exploitation vector that plagues standard nearest-neighbor approaches. The result is a governance layer that operates at sub-15ms latency, is designed to hold precision under adversarial pressure, and integrates cleanly into existing agent orchestration frameworks.