uty"
CLOUDFLARE = "cloudflare"
SYSdig = "sysdig"
class TriageState(BaseModel):
alert_id: str
source_platform: AlertSource
raw_payload: dict
sanitized_payload: dict
retrieved_context: List[str] = Field(default_factory=list)
selected_model: str = ""
triage_analysis: Optional[dict] = None
analyst_decision: Optional[str] = None
audit_log: List[dict] = Field(default_factory=list)
This schema guarantees that every node in the workflow receives predictable inputs and produces trackable outputs. It also isolates the raw payload from downstream processing, preventing accidental leakage.
### Step 2: Implement Pre-Flight Sanitization
Local deployment does not eliminate data exposure risks. Prompts, outputs, and logs are often persisted to disk or vector stores. A sanitization node must run before any model invocation.
```python
import re
from typing import Dict

SECRET_PATTERNS = [
    r"(?:AKIA|ASIA)[0-9A-Z]{16}",                                                # AWS access key IDs
    r"eyJ[A-Za-z0-9_-]+\.eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+",                     # JWTs
    r"(?:password|secret|token|key)\s*[:=]\s*['\"]?[A-Za-z0-9+/=_-]{8,}['\"]?",  # generic credential assignments
    r"\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b",                                  # payment card numbers
]

def scrub_sensitive_data(payload: Dict) -> Dict:
    """Recursively redact known secret formats from a nested alert payload."""
    cleaned = {}
    for key, value in payload.items():
        if isinstance(value, str):
            sanitized = value
            for pattern in SECRET_PATTERNS:
                sanitized = re.sub(pattern, "[REDACTED]", sanitized, flags=re.IGNORECASE)
            cleaned[key] = sanitized
        elif isinstance(value, dict):
            cleaned[key] = scrub_sensitive_data(value)
        else:
            cleaned[key] = value
    return cleaned
```
This function recursively traverses nested payloads and redacts known secret formats: AWS access key prefixes, JWT tokens, generic credential assignments, and payment card numbers. Production deployments should extend this with DLP rules specific to their data classification policy.
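One way to wire in such extensions is a small registry of organization-specific rules merged with the built-in patterns. The sketch below is illustrative only; the `CUSTOM_DLP_RULES` names and regexes are hypothetical placeholders, not part of the pipeline above.

```python
# Hypothetical registry of organization-specific DLP rules; the rule names
# and regexes below are placeholders for your own data classification policy.
CUSTOM_DLP_RULES = {
    "internal_employee_id": r"\bEMP-\d{6}\b",
    "customer_account_ref": r"\bACCT-[A-Z0-9]{10}\b",
}

def load_dlp_patterns(enabled_rules: list) -> list:
    """Merge built-in secret patterns with the enabled custom rules."""
    return SECRET_PATTERNS + [
        CUSTOM_DLP_RULES[name] for name in enabled_rules if name in CUSTOM_DLP_RULES
    ]
```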
### Step 3: Route to Specialized Models
Different alert types require different reasoning capabilities. A single general-purpose model will underperform on cloud IAM analysis, container runtime telemetry, and detection rule generation. Implement a deterministic router that maps alert metadata to the optimal local model.
```python
MODEL_ROUTING_TABLE = {
    "guardduty": "opennix/aws-security-assistant",
    "securityhub": "opennix/aws-security-assistant",
    "cloudtrail": "opennix/aws-security-assistant",
    "sysdig": "fdtn-ai/Foundation-Sec-1.1-8B-Instruct",
    "cloudflare": "fdtn-ai/Foundation-Sec-1.1-8B-Instruct",
    "detection_rule": "qwen-coder",
    "terraform_review": "qwen-coder",
}

def select_inference_engine(alert_source: AlertSource, task_type: str) -> str:
    source_key = alert_source.value
    # Code-generation tasks always route to the coder model, regardless of source
    if task_type in ("rule_generation", "code_review"):
        return MODEL_ROUTING_TABLE.get("detection_rule", "qwen-coder")
    return MODEL_ROUTING_TABLE.get(source_key, "fdtn-ai/Foundation-Sec-1.1-8B-Instruct")
```
This routing strategy prevents context window bloat and reduces inference latency by matching workload characteristics to model specialization. It also simplifies fallback logic: if the primary model fails, the harness can retry with a secondary model without breaking the workflow.
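A minimal sketch of that retry behavior, assuming a hypothetical `run_inference` helper that wraps your Ollama client and raises on timeout or connection failure:

```python
# Hedged sketch of retry-with-fallback; run_inference is a hypothetical
# helper wrapping the Ollama client, not a function defined above.
FALLBACK_MODEL = "fdtn-ai/Foundation-Sec-1.1-8B-Instruct"

def run_with_fallback(prompt: str, primary_model: str) -> str:
    try:
        return run_inference(primary_model, prompt)
    except (TimeoutError, ConnectionError):
        # A single retry on the fallback model keeps the workflow moving
        return run_inference(FALLBACK_MODEL, prompt)
```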
### Step 4: Construct the Stateful Graph
LangGraph provides the execution control layer. Define nodes for sanitization, context retrieval, model inference, and validation. Connect them with conditional edges that enforce human approval before final disposition.
```python
from datetime import datetime, timezone

from langgraph.graph import StateGraph, END
from pydantic_ai import Agent

# Define validation schema for model output
class TriageReport(BaseModel):
    summary: str
    severity_recommendation: str
    confidence_level: str
    key_evidence: List[str]
    missing_context: List[str]
    next_investigation_steps: List[str]
    requires_human_review: bool

triage_agent = Agent(
    model="ollama:fdtn-ai/Foundation-Sec-1.1-8B-Instruct",
    result_type=TriageReport,
    system_prompt="You are a SOC triage assistant. Analyze security alerts and return structured findings."
)

def execute_triage(state: TriageState) -> TriageState:
    prompt = f"Analyze the following sanitized alert:\n{state.sanitized_payload}"
    result = triage_agent.run_sync(prompt)
    state.triage_analysis = result.data.model_dump()
    state.selected_model = "fdtn-ai/Foundation-Sec-1.1-8B-Instruct"
    state.audit_log.append({
        "step": "inference",
        "model": state.selected_model,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    })
    return state

def check_approval_needed(state: TriageState) -> str:
    if state.triage_analysis and state.triage_analysis.get("requires_human_review"):
        return "await_analyst"
    return "finalize"

workflow = StateGraph(TriageState)
workflow.add_node("sanitize", lambda s: s.model_copy(update={"sanitized_payload": scrub_sensitive_data(s.raw_payload)}))
workflow.add_node("route", lambda s: s.model_copy(update={"selected_model": select_inference_engine(s.source_platform, "triage")}))
workflow.add_node("inference", execute_triage)
workflow.add_node("validate", lambda s: s)  # PydanticAI handles schema enforcement internally
workflow.add_node("await_analyst", lambda s: s)  # Placeholder for human-in-the-loop UI

workflow.set_entry_point("sanitize")
workflow.add_edge("sanitize", "route")
workflow.add_edge("route", "inference")
workflow.add_edge("inference", "validate")
workflow.add_conditional_edges("validate", check_approval_needed, {"await_analyst": "await_analyst", "finalize": END})
workflow.add_edge("await_analyst", END)  # Analyst disposition ends the graph run

app = workflow.compile()
```
This graph enforces a strict execution order. Sanitization runs first. Routing determines the model. Inference executes with schema validation. Conditional branching routes high-confidence findings directly to finalization, while uncertain results pause for analyst review. The `audit_log` field accumulates provenance data at each step, satisfying compliance requirements without external logging infrastructure.
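As a quick smoke test, the compiled graph can be invoked directly. The payload below is illustrative only, not a real finding format:

```python
# Minimal smoke test; the GuardDuty-style payload fields are illustrative.
initial_state = TriageState(
    alert_id="gd-finding-001",
    source_platform=AlertSource.GUARDDUTY,
    raw_payload={"title": "UnauthorizedAccess:IAMUser", "severity": 7},
)
final_state = app.invoke(initial_state)
print(final_state["triage_analysis"])  # validated TriageReport as a dict
```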
## Architecture Rationale
- Why LangGraph over CrewAI? CrewAI optimizes for role-based delegation and conversational multi-agent collaboration. SOC triage requires deterministic state transitions, auditability, and controlled tool execution. LangGraph's graph topology maps directly to incident response playbooks.
- Why PydanticAI for validation? Free-form LLM output cannot drive automated ticketing or escalation rules. PydanticAI enforces JSON schema compliance at the API boundary, guaranteeing that downstream systems receive parseable, type-safe objects.
- Why Ollama as the engine? Ollama provides standardized model management, quantization support, and a consistent REST interface. It abstracts hardware acceleration details while allowing seamless swapping between Foundation-Sec, AWS Security Assistant, and Qwen variants.
## Pitfall Guide
1. Unbounded Context Injection
Explanation: Feeding raw SIEM logs, full packet captures, or unfiltered CloudTrail dumps into the prompt overwhelms the context window, increases latency, and degrades reasoning quality.
Fix: Implement a context window budget. Retrieve only signals within a ±15 minute window, filter by matching IP/identity, and truncate non-essential fields. Use vector search with similarity thresholds instead of bulk dumps.
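A minimal sketch of such a budget, assuming each retrieved event is a dict carrying `timestamp` and `src_ip` fields (an assumption about your SIEM export format):

```python
from datetime import datetime, timedelta

CONTEXT_WINDOW = timedelta(minutes=15)  # ±15 minutes around the alert
MAX_CONTEXT_EVENTS = 20                 # hard cap keeps prompts bounded

def budget_context(alert_time: datetime, alert_ip: str, events: list) -> list:
    # Keep only events inside the time window that share the alert's source IP
    related = [
        e for e in events
        if abs(e["timestamp"] - alert_time) <= CONTEXT_WINDOW
        and e.get("src_ip") == alert_ip
    ]
    return related[:MAX_CONTEXT_EVENTS]
```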
2. Silent PII and Secret Leakage
Explanation: Local models do not automatically redact sensitive data. Prompts, completions, and debug logs often contain API keys, session tokens, or customer identifiers that violate data classification policies.
Fix: Run a pre-inference sanitization pipeline. Maintain a regularly updated regex/DLP rule set. Encrypt prompt/response logs at rest and restrict access to security engineering teams only.
3. Autonomous Remediation Without Approval
Explanation: Allowing the AI to execute IAM revocations, firewall rule changes, or container quarantines based on a single inference step introduces catastrophic failure modes.
Fix: Enforce a human-in-the-loop checkpoint for all write operations. The AI should only recommend actions. Execution must pass through a separate approval workflow with role-based access control.
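A sketch of that separation, with hypothetical role names and a commented-out dispatch hook standing in for the real remediation workflow:

```python
# Hypothetical approval gate: the model only recommends; execution requires
# a recorded analyst decision from a user holding an approver role.
APPROVER_ROLES = {"soc_lead", "incident_commander"}  # assumed role names

def execute_remediation(state: TriageState, analyst_role: str) -> None:
    if analyst_role not in APPROVER_ROLES:
        raise PermissionError("write actions require an approver role")
    if state.analyst_decision != "approved":
        raise RuntimeError("no recorded analyst approval for this alert")
    # dispatch_remediation(state)  # hand off to the separate, RBAC-gated workflow
```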
4. Single-Model Monoculture
Explanation: Relying on one general-purpose model for cloud IAM analysis, container telemetry, and detection rule drafting results in suboptimal accuracy and higher hallucination rates.
Fix: Implement deterministic model routing based on alert source and task type. Maintain a fallback model registry. Validate routing decisions with periodic accuracy benchmarks.
5. Ignoring Deterministic Fallbacks
Explanation: When the model times out, returns malformed JSON, or produces low-confidence output, the workflow stalls or propagates invalid data.
Fix: Design fail-closed behavior. If validation fails or confidence drops below a threshold, route the alert to the standard SOC queue with a note ("AI enrichment unavailable; proceed with manual triage"). Never block analyst workflows.
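A sketch of that fail-closed wrapper; `route_to_soc_queue` is a hypothetical hook into your existing queueing or ticketing system:

```python
# Fail-closed sketch: any inference or validation failure hands the alert
# back to the standard queue; route_to_soc_queue is a hypothetical hook.
def enrich_or_fail_closed(state: TriageState) -> TriageState:
    try:
        return execute_triage(state)  # raises on timeout or schema violation
    except Exception as exc:
        route_to_soc_queue(state.alert_id, note=f"AI enrichment unavailable: {exc}")
        return state  # unenriched, but the analyst workflow continues
```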
6. Missing Audit Provenance
Explanation: Compliance frameworks require traceable decision paths. Without logging model versions, prompt templates, context sources, and analyst overrides, incident reconstruction becomes impossible.
Fix: Append a structured audit entry at every graph node. Include timestamps, model identifiers, prompt hashes, and state transitions. Export logs to an immutable storage tier for retention.
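A sketch of one such entry builder; hashing the prompt records provenance without persisting raw prompt text:

```python
import hashlib
from datetime import datetime, timezone

def audit_entry(step: str, model: str, prompt: str) -> dict:
    """Build a structured provenance record for one graph node."""
    return {
        "step": step,
        "model": model,
        "prompt_sha256": hashlib.sha256(prompt.encode()).hexdigest(),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
```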
7. Premature Multi-Agent Orchestration
Explanation: Introducing CrewAI-style agent teams before establishing baseline workflow control adds unnecessary complexity, increases latency, and obscures failure points.
Fix: Start with a single deterministic graph. Add parallel context retrieval or secondary validation nodes only after the core pipeline achieves >90% structured output compliance and stable latency.
## Production Bundle
### Action Checklist

- Deploy Ollama and pull the routed models: Foundation-Sec, AWS Security Assistant, and a Qwen Coder variant.
- Enable pre-inference sanitization and review the regex/DLP rule set against your data classification policy.
- Define the state schema and deterministic routing table before wiring the graph.
- Enforce human approval for all write operations; the AI recommends, a separate workflow executes.
- Configure fail-closed fallbacks so failed or low-confidence enrichment routes to the standard SOC queue.
- Append structured audit entries at every node and export them to immutable storage.
### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High-volume cloud alerts (GuardDuty, Security Hub) | AWS Security Assistant + Ollama | Optimized for IAM, CloudTrail, and AWS service telemetry | Low (8B parameter quantized) |
| Cross-cloud/container runtime (Sysdig, GCP, Cloudflare) | Foundation-Sec-1.1-8B-Instruct | Balanced reasoning across heterogeneous cloud signals | Low-Medium |
| Detection rule drafting / Terraform review | Qwen Coder variant | Specialized in YAML, Sigma, HCL, and code generation | Low |
| Laptop/edge testing or CI validation | Small Llama/Gemma instruct (3B-7B) | Fast iteration, minimal VRAM requirements | Negligible |
| Production SOC with compliance requirements | LangGraph + PydanticAI + Ollama | Deterministic execution, schema enforcement, audit trails | Medium (infrastructure + engineering time) |
### Configuration Template

```yaml
# soc-triage-config.yaml
inference:
  engine: ollama
  default_model: fdtn-ai/Foundation-Sec-1.1-8B-Instruct
  fallback_model: opennix/aws-security-assistant
  max_context_tokens: 4096
  temperature: 0.1
  timeout_seconds: 30
routing:
  guardduty: opennix/aws-security-assistant
  securityhub: opennix/aws-security-assistant
  cloudtrail: opennix/aws-security-assistant
  sysdig: fdtn-ai/Foundation-Sec-1.1-8B-Instruct
  cloudflare: fdtn-ai/Foundation-Sec-1.1-8B-Instruct
  detection_rule: qwen-coder
  terraform_review: qwen-coder
sanitization:
  enabled: true
  patterns:
    - aws_key_prefix
    - jwt_token
    - generic_secret
    - payment_card
  log_redacted: false
workflow:
  graph_engine: langgraph
  validation: pydantic_ai
  require_human_approval: true
  approval_threshold: 0.7
  audit_retention_days: 365
  fail_closed: true
```
### Quick Start Guide

- Install the inference engine: Run `ollama pull fdtn-ai/Foundation-Sec-1.1-8B-Instruct` and `ollama pull opennix/aws-security-assistant` to cache models locally.
- Initialize the project: Create a virtual environment, install dependencies (`pip install langgraph pydantic-ai ollama`), and place the configuration template in your project root.
- Define the graph: Copy the state schema, sanitization function, routing logic, and LangGraph workflow into a single `triage_pipeline.py` file.
- Execute a test alert: Pass a JSON payload matching the `TriageState` schema to `app.invoke(initial_state)`. Verify that the output matches the `TriageReport` schema and that audit logs are populated.
- Integrate with your SIEM: Configure a webhook or event bridge to forward alerts to the pipeline (a minimal sketch follows this list). Route the structured output to your ticketing system, PagerDuty, or analyst dashboard. Enable human approval gates for high-severity findings.
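A minimal webhook sketch, assuming FastAPI (any HTTP framework works); the route shape is illustrative, and it simply forwards incoming alerts through the compiled graph:

```python
# Hedged webhook sketch; FastAPI is an assumption, and the /alerts/{source}
# route shape is illustrative, not a fixed contract.
from fastapi import FastAPI

api = FastAPI()

@api.post("/alerts/{source}")
def receive_alert(source: str, payload: dict):
    state = TriageState(
        alert_id=str(payload.get("id", "unknown")),
        source_platform=AlertSource(source),
        raw_payload=payload,
    )
    return app.invoke(state)  # final state, including triage_analysis
```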
This architecture transforms local AI from an experimental chatbot into a controlled, auditable enrichment engine. By prioritizing workflow determinism over model size, SOC teams achieve faster triage, stricter compliance, and sustainable operational control.