nk', 'rmdir', 'shred']"
        action: "deny"
        severity: "critical"
      - id: "restrict_writes_to_tmp"
        description: "Allow writes only to temporary directories"
        expression: "tool_name == 'write_file' and not path.startswith('/tmp/')"
        action: "notify"
        severity: "medium"
  - id: "network_security"
    priority: 5
    rules:
      - id: "block_internal_scanning"
        description: "Prevent access to internal IP ranges"
        # Backslashes must be doubled inside YAML double-quoted scalars
        expression: "tool_name == 'http_request' and re.match(r'^10\\.|^192\\.168\\.', url)"
        action: "deny"
        severity: "high"
pii_config:
  enabled: true
  strategies:
    - type: "regex"
      patterns: ["email", "ssn", "credit_card"]
    - type: "local_llm"
      provider: "ollama"
      model: "gemma3:4b"
      endpoint: "http://localhost:11434"
  action_on_detection: "redact"
audit_config:
  enabled: true
  output_path: "./logs/agent_audit.jsonl"
  rotation:
    max_size_mb: 50
    backup_count: 10
```
#### 2. Safe Expression Evaluation
A critical component of the policy engine is the evaluation of rule conditions. Passing untrusted expressions straight to Python's `eval()` or `exec()` is strictly prohibited due to code-injection risk. Instead, the engine validates every expression with **Abstract Syntax Tree (AST)** parsing before it is compiled.

When a rule's expression is loaded, it is parsed into an AST. The evaluator traverses the tree and accepts only a whitelist of safe node types (e.g., comparisons, boolean logic, membership tests, and calls to approved functions such as `re.match`). Only expressions that pass this check are executed, and only inside a restricted namespace, so even a compromised policy file cannot run arbitrary code.
**Implementation Snippet (Evaluator Core):**
```python
import ast
import re

class SafeExpressionEvaluator:
    # Whitelisted node types. The base classes (operator, cmpop, boolop,
    # unaryop, expr_context) cover concrete nodes like Eq, In, And, and Load,
    # which also appear when walking the tree.
    ALLOWED_NODES = (
        ast.Expression, ast.Compare, ast.BoolOp, ast.UnaryOp, ast.BinOp,
        ast.Name, ast.Constant, ast.List, ast.Tuple,
        ast.Call, ast.Attribute,
        ast.operator, ast.cmpop, ast.boolop, ast.unaryop, ast.expr_context,
    )

    @classmethod
    def validate_syntax(cls, expression: str) -> bool:
        try:
            tree = ast.parse(expression, mode='eval')
        except SyntaxError:
            return False
        return all(isinstance(node, cls.ALLOWED_NODES) for node in ast.walk(tree))

    @classmethod
    def evaluate(cls, expression: str, context: dict) -> bool:
        if not cls.validate_syntax(expression):
            raise ValueError("Unsafe expression detected")
        # Restricted namespace: an empty __builtins__ blocks access to
        # open(), __import__(), and every other builtin.
        safe_globals = {
            "__builtins__": {},
            "re": re,
            "len": len,
            "str": str,
            "int": int,
        }
        tree = ast.parse(expression, mode='eval')
        return bool(eval(compile(tree, '<policy>', 'eval'), safe_globals, context))
```
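To see the whitelist in action, here is a self-contained sketch (a condensed variant of the evaluator above, for illustration only) showing a benign rule expression evaluating normally while a disallowed construct is rejected at the AST stage:

```python
import ast
import re

ALLOWED_NODES = (
    ast.Expression, ast.Compare, ast.BoolOp, ast.UnaryOp, ast.BinOp,
    ast.Name, ast.Constant, ast.List, ast.Tuple, ast.Call, ast.Attribute,
    ast.operator, ast.cmpop, ast.boolop, ast.unaryop, ast.expr_context,
)

def safe_eval(expression: str, context: dict) -> bool:
    tree = ast.parse(expression, mode="eval")
    for node in ast.walk(tree):
        if not isinstance(node, ALLOWED_NODES):
            raise ValueError(f"Disallowed node: {type(node).__name__}")
    # Empty __builtins__ blocks open(), __import__(), etc. at runtime
    safe_globals = {"__builtins__": {}, "re": re, "len": len}
    return bool(eval(compile(tree, "<policy>", "eval"), safe_globals, dict(context)))

# A benign rule expression evaluates against the tool-call context...
print(safe_eval("tool_name == 'rm' and '/etc' in path",
                {"tool_name": "rm", "path": "/etc/passwd"}))  # True

# ...while a lambda (arbitrary code) is rejected before evaluation.
try:
    safe_eval("(lambda: 1)()", {})
except ValueError as e:
    print("blocked:", e)
```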
#### 3. Integration via Decorator Pattern
The policy engine integrates with agent tools using a decorator pattern. This approach requires minimal changes to existing code. The decorator wraps the tool function, intercepts the call, constructs the evaluation context, and checks the policies before execution.
**Integration Example:**
```python
from agent_shield import PolicyEngine, AgentGuard, PolicyViolationError

# Initialize engine with policy file
engine = PolicyEngine.load("policies.yaml")
guard = AgentGuard(engine)

@guard.protect
def execute_database_query(query: str, connection_id: str):
    """Execute a SQL query against the database."""
    # Tool implementation
    return db.execute(query, connection_id)

# Usage within agent workflow
try:
    result = execute_database_query("SELECT * FROM users", "prod_db")
except PolicyViolationError as e:
    # Handle violation: log, alert, or fall back
    print(f"Action blocked: {e.rule_id} - {e.reason}")
```
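The library's decorator internals are not shown here, but the pattern itself is simple: wrap the function, build a context from the intercepted call, check it, and raise on a violation. A minimal sketch, where the `denied_tools` lookup is a purely illustrative stand-in for full rule evaluation:

```python
import functools

class PolicyViolationError(Exception):
    def __init__(self, rule_id: str, reason: str):
        super().__init__(f"{rule_id}: {reason}")
        self.rule_id = rule_id
        self.reason = reason

class AgentGuard:
    """Illustrative stand-in: a real engine evaluates the YAML rules."""
    def __init__(self, denied_tools):
        self.denied_tools = set(denied_tools)

    def protect(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build the evaluation context from the intercepted call
            context = {"tool_name": func.__name__, "args": args, "kwargs": kwargs}
            if context["tool_name"] in self.denied_tools:
                raise PolicyViolationError(
                    "deny_listed_tool", f"{func.__name__} is deny-listed")
            return func(*args, **kwargs)
        return wrapper

guard = AgentGuard(denied_tools={"delete_user"})

@guard.protect
def delete_user(user_id: str):
    return f"deleted {user_id}"

try:
    delete_user("42")
except PolicyViolationError as e:
    print("Action blocked:", e.rule_id)
```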
#### 4. PII Detection and Redaction
Data exfiltration is a primary risk. The system includes a dedicated PII detection layer that scans tool outputs before they are returned to the agent or external systems. This layer supports both regex-based pattern matching and local LLM inference for nuanced detection.
**PII Scanner Usage:**
```python
from agent_shield.pii import SensitiveDataScanner

scanner = SensitiveDataScanner(config=engine.pii_config)

# Scan tool output before it is returned
output_text = "User email is john.doe@example.com, SSN: 123-45-6789"
scan_result = scanner.analyze(output_text)

if scan_result.violations:
    # Redact sensitive data
    safe_output = scanner.redact(output_text)
    print(safe_output)
    # Output: "User email is [REDACTED_EMAIL], SSN: [REDACTED_SSN]"

    # Log the detection event
    engine.audit.log_pii_detection(scan_result)
```
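The regex strategy can be approximated in a few lines. The patterns below are deliberately simplified illustrations, not `agent_shield`'s actual pattern set; production matching for SSNs and credit cards needs stricter validation (e.g., Luhn checks):

```python
import re

# Illustrative patterns only; real-world matching needs stricter rules.
PII_PATTERNS = {
    "EMAIL": re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"),
    "SSN": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}

def redact(text: str) -> str:
    # Replace each match with a labeled placeholder
    for label, pattern in PII_PATTERNS.items():
        text = pattern.sub(f"[REDACTED_{label}]", text)
    return text

print(redact("User email is john.doe@example.com, SSN: 123-45-6789"))
# "User email is [REDACTED_EMAIL], SSN: [REDACTED_SSN]"
```

Precompiling the patterns at module load, as above, also addresses the regex-latency concern raised in the Pitfall Guide.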
#### 5. Audit Logging and Compliance
Every enforcement decision is recorded in a JSONL (JSON Lines) audit log. This format supports high-throughput writing and easy parsing. The logger includes rotation support to manage disk usage. Logs contain timestamps, tool names, actions taken, triggered rules, and context snapshots.
**Audit Logger Implementation:**
```python
import json
import logging
from logging.handlers import RotatingFileHandler

class ComplianceRecorder:
    def __init__(self, config: dict):
        self.logger = logging.getLogger("agent_audit")
        handler = RotatingFileHandler(
            config["output_path"],
            maxBytes=config["rotation"]["max_size_mb"] * 1024 * 1024,
            backupCount=config["rotation"]["backup_count"],
        )
        # Emit each record as a bare JSON object, one per line (JSONL)
        handler.setFormatter(logging.Formatter("%(message)s"))
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def record_enforcement(self, event: dict):
        log_entry = {
            "timestamp": event["timestamp"],
            "tool": event["tool_name"],
            "action": event["action"],
            "rule_id": event.get("rule_id"),
            "severity": event.get("severity"),
            "context_hash": event.get("context_hash"),
        }
        self.logger.info(json.dumps(log_entry))
```
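Because each audit record is a standalone JSON object on its own line, compliance queries reduce to line-by-line parsing. A small sketch using the field names written by `record_enforcement` above (the sample records are invented for illustration):

```python
import json

# Sample JSONL lines in the format record_enforcement produces
audit_lines = [
    '{"timestamp": "2025-01-01T12:00:00Z", "tool": "rm", '
    '"action": "deny", "severity": "critical"}',
    '{"timestamp": "2025-01-01T12:00:05Z", "tool": "write_file", '
    '"action": "notify", "severity": "medium"}',
]

def denied_events(lines):
    """Yield parsed events whose enforcement action was a denial."""
    for line in lines:
        event = json.loads(line)
        if event.get("action") == "deny":
            yield event

for event in denied_events(audit_lines):
    print(event["timestamp"], event["tool"], event["severity"])
```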
### Pitfall Guide
Implementing policy enforcement requires careful attention to detail. The following pitfalls are common in production deployments.
| Pitfall Name | Explanation | Fix |
|---|---|---|
| The Eval Trap | Using eval() or exec() to parse policy expressions allows attackers to execute arbitrary code if they can modify the policy file. | Always use AST-based parsing with a strict whitelist of allowed nodes and functions. Never pass user input directly to an evaluator. |
| Output Blindness | Focusing only on input validation while ignoring tool outputs. Agents may inadvertently leak PII or secrets in their responses. | Implement a mandatory output scanning layer. Apply PII detection and redaction to all tool return values before they reach the model. |
| Context Starvation | Rules fail to trigger because the evaluation context lacks necessary data (e.g., checking user_role but not passing it in the context). | Ensure all tool wrappers inject comprehensive context, including user metadata, session info, and environment variables. |
| Notification Noise | Overusing the notify action leads to alert fatigue. Security teams ignore logs if every minor infraction triggers a notification. | Reserve notify for informational logging. Use deny for critical violations. Implement log aggregation and alerting thresholds based on severity. |
| Policy Drift | Policies become outdated as the agent's capabilities evolve, leading to false positives or missed violations. | Integrate policy validation into the CI/CD pipeline. Use automated tests to verify rules against known tool behaviors. Review policies quarterly. |
| Performance Bottlenecks | Complex regex patterns or frequent LLM calls for PII detection introduce latency, degrading agent responsiveness. | Cache regex compilation. Use async I/O for LLM requests. Consider sampling strategies for high-volume tools. Benchmark latency impact. |
| Dashboard Exposure | Real-time dashboards expose sensitive audit data without authentication, risking information leakage. | Secure the dashboard with strong authentication (OAuth2/MFA). Implement role-based access control (RBAC) for log viewing. |
### Production Bundle
#### Action Checklist
#### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High-Volume Tool Calls | Regex-based PII detection | Low latency, high throughput. Suitable for known patterns. | Low (CPU only) |
| Complex/Unstructured Data | Local LLM (Ollama) PII detection | Higher accuracy for nuanced PII. Handles variations regex might miss. | Medium (GPU/VRAM required) |
| Critical Infrastructure | deny action with immediate alert | Zero tolerance for violations. Prevents damage instantly. | Low (Operational overhead) |
| Development/Staging | notify action with logging | Allows testing without blocking workflows. Collects data for tuning. | Low |
| Compliance Audits | JSONL logs with rotation | Tamper-evident, structured data. Meets retention requirements. | Low (Storage cost) |
#### Configuration Template
Copy this template to bootstrap your policy configuration. Adjust directives and rules to match your agent's capabilities.
```yaml
schema_version: "2.1"
metadata:
  name: "Agent Policy Template"
  version: "1.0.0"
directives:
  - id: "default_restrictions"
    priority: 100
    rules:
      - id: "deny_all_by_default"
        description: "Fallback rule to deny unlisted tools"
        expression: "tool_name not in allowed_tools"
        action: "deny"
        severity: "critical"
  - id: "data_safety"
    priority: 50
    rules:
      - id: "block_ssn_leakage"
        description: "Prevent SSN patterns in outputs"
        expression: "pii_detected and 'ssn' in pii_types"
        action: "deny"
        severity: "critical"
pii_config:
  enabled: true
  strategies:
    - type: "regex"
      patterns: ["email", "phone", "ssn", "credit_card"]
  action_on_detection: "redact"
audit_config:
  enabled: true
  output_path: "./audit/agent_events.jsonl"
  rotation:
    max_size_mb: 100
    backup_count: 20
```
#### Quick Start Guide

1. **Install Dependencies:**

   ```bash
   pip install agent-shield[full]
   ```

2. **Initialize Policy:**

   ```bash
   agent-shield init --template production --output policies.yaml
   ```

3. **Validate Configuration:**

   ```bash
   agent-shield validate policies.yaml
   ```

4. **Test a Tool Call:**

   ```bash
   agent-shield check --tool rm --arg path=/etc/passwd --policy policies.yaml
   # Expected: DENIED by rule block_destructive_ops
   ```

5. **Start Monitoring Dashboard:**

   ```bash
   agent-shield dashboard --policy policies.yaml --port 8080
   ```

   Open http://localhost:8080 to view real-time enforcement metrics and audit logs.