TInput) => Promise<TOutput>;
/**
 * A single executable step in the agent graph.
 *
 * Nodes are stored by the orchestrator's `registerNode` and looked up by id
 * on every iteration of its `execute` loop.
 */
interface AgentNode {
// Key the node is registered under; `AgentState.currentNode` and
// `AgentEdge.from` are matched against it. Registering a second node with
// the same id replaces the first (the registry is a Map keyed by id).
id: string;
// Node logic: receives the current state and resolves to the state the
// orchestrator continues with (logging, routing) after this step.
execute: (state: AgentState) => Promise<AgentState>;
// Optional pre-execution checks. The orchestrator invokes all of them with
// the current state and awaits them together; if any resolves to a falsy
// value, `execute` is not called and the run returns early with
// `data._error` set to 'GUARDRAIL_VIOLATION'.
guardrails?: GuardrailFn[];
}
/**
 * A conditional transition between two nodes.
 *
 * After a node runs, the orchestrator scans edges in registration order and
 * follows the first one whose `from` equals the node's id and whose
 * `condition` returns true. If no edge matches, the run ends.
 */
interface AgentEdge {
// Id of the node this edge leaves from.
from: string;
// Id of the node to run next; written to `AgentState.currentNode` when the
// edge is taken. Nothing checks at registration time that this id exists.
to: string;
// Predicate evaluated against the state produced by the `from` node.
condition: (state: AgentState) => boolean;
}
/**
 * The state object threaded through every node of a run.
 */
interface AgentState {
// Identifier for the run. The orchestrator code in this file never reads it;
// presumably intended for correlating logs/telemetry — confirm with callers.
executionId: string;
// Id of the node the orchestrator will execute next.
currentNode: string;
// Execution log; the orchestrator appends one entry after each node runs.
// (`ExecutionLog` is declared outside this section of the file.)
history: ExecutionLog[];
// Free-form payload that nodes read and extend.
data: Record<string, unknown>;
// Remaining budget. The orchestrator itself does not decrement these
// values; they are only read by guardrail functions in the examples.
budget: { tokens: number; steps: number };
}
/**
 * Pre-execution check attached to a node. Resolve to `true` to let the node
 * run; any falsy result makes the orchestrator stop the run before executing it.
 */
type GuardrailFn = (state: AgentState) => Promise<boolean>;
/**
 * One telemetry record. The orchestrator's `invokeTool` records events with
 * `type: 'tool_invocation'` and a `timestamp` taken from `Date.now()`
 * (milliseconds since the Unix epoch).
 */
type TelemetryEvent = { type: string; payload: Record<string, unknown>; timestamp: number };
// ─── Core Orchestrator ──────────────────────────────────────────────────────
/**
 * Runs a graph of {@link AgentNode}s connected by conditional {@link AgentEdge}s,
 * threading a single {@link AgentState} object through each step.
 *
 * Events emitted:
 * - `guardrail_failed` with `{ nodeId, state }` when a node's guardrail rejects.
 * - `step_complete` with `{ nodeId, step }` after each node has executed.
 *
 * Tools are registered with a schema; `invokeTool` validates input through
 * `schema.parse` before calling the tool and records a telemetry event.
 */
class StatefulAgentOrchestrator extends EventEmitter {
  private readonly nodes: Map<string, AgentNode> = new Map();
  private readonly edges: AgentEdge[] = [];
  private readonly toolRegistry: Map<string, { schema: z.ZodTypeAny; fn: ToolFn<any, any> }> = new Map();
  private readonly telemetry: TelemetryEvent[] = [];

  /**
   * @param maxSteps Upper bound on node executions per `execute` call (default 50).
   */
  constructor(private readonly maxSteps: number = 50) {
    super();
  }

  /** Registers (or replaces) a node under `node.id`. Chainable. */
  registerNode(node: AgentNode): this {
    this.nodes.set(node.id, node);
    return this;
  }

  /** Appends a routing edge. Edges are evaluated in registration order. Chainable. */
  registerEdge(edge: AgentEdge): this {
    this.edges.push(edge);
    return this;
  }

  /**
   * Registers (or replaces) a tool under `name`. Chainable.
   *
   * @param name   Key used by `invokeTool`.
   * @param schema Schema whose `parse` validates the tool input.
   * @param fn     Tool implementation; receives the parsed input.
   */
  registerTool<TIn, TOut>(
    name: string,
    schema: z.ZodType<TIn>,
    fn: ToolFn<TIn, TOut>
  ): this {
    this.toolRegistry.set(name, { schema, fn });
    return this;
  }

  /**
   * Executes nodes starting at `initialState.currentNode` until no edge
   * matches, a guardrail rejects, or `maxSteps` nodes have run.
   *
   * The caller's `initialState` object and its `history` array are never
   * mutated; history entries are appended to fresh arrays.
   *
   * @param initialState Starting state.
   * @returns The final state. On guardrail rejection, the state as it was
   *          before the rejected node, with `data._error` set to
   *          `'GUARDRAIL_VIOLATION'`.
   * @throws Error if `currentNode` refers to an id that was never registered,
   *         plus anything thrown by a guardrail, a node or an edge condition.
   */
  async execute(initialState: AgentState): Promise<AgentState> {
    let state: AgentState = { ...initialState };
    let stepCount = 0;

    while (stepCount < this.maxSteps) {
      const currentNode = this.nodes.get(state.currentNode);
      if (!currentNode) throw new Error(`Node ${state.currentNode} not registered`);

      // Guardrail evaluation: all guardrails are started together and must all pass.
      if (currentNode.guardrails) {
        const passed = await Promise.all(currentNode.guardrails.map((g) => g(state)));
        if (!passed.every(Boolean)) {
          this.emit('guardrail_failed', { nodeId: currentNode.id, state });
          return { ...state, data: { ...state.data, _error: 'GUARDRAIL_VIOLATION' } };
        }
      }

      // Execute node logic
      const produced = await currentNode.execute(state);
      stepCount++;

      // Log execution. The snapshot deliberately carries an empty `history`:
      // embedding the prior history (which itself holds earlier snapshots)
      // would make the log roughly double in size on every step.
      // The new entry is appended immutably so that an array shared with the
      // caller (nodes typically spread the incoming state) is left untouched.
      state = {
        ...produced,
        history: [
          ...produced.history,
          {
            nodeId: currentNode.id,
            timestamp: Date.now(),
            step: stepCount,
            stateSnapshot: JSON.parse(JSON.stringify({ ...produced, history: [] })),
          },
        ],
      };
      this.emit('step_complete', { nodeId: currentNode.id, step: stepCount });

      // Route to next node: first registered edge leaving this node whose condition holds.
      const nextEdge = this.edges.find((e) => e.from === currentNode.id && e.condition(state));
      if (!nextEdge) break;
      state.currentNode = nextEdge.to; // `state` is a fresh object owned by this loop
    }

    return state;
  }

  /**
   * Validates `input` against the tool's schema, runs the tool and records a
   * `tool_invocation` telemetry event containing the validated input and output.
   *
   * @throws Error if no tool is registered under `name`; also propagates
   *         whatever `schema.parse` or the tool itself throws (in which case
   *         no telemetry event is recorded).
   */
  async invokeTool<TIn, TOut>(name: string, input: TIn): Promise<TOut> {
    const tool = this.toolRegistry.get(name);
    if (!tool) throw new Error(`Tool ${name} not registered`);

    const validated = tool.schema.parse(input);
    const result = await tool.fn(validated);
    this.telemetry.push({
      type: 'tool_invocation',
      payload: { name, input: validated, output: result },
      timestamp: Date.now(),
    });
    return result;
  }

  /** Returns a shallow copy of the recorded telemetry events. */
  getTelemetry(): TelemetryEvent[] {
    return [...this.telemetry];
  }
}
// ─── Usage Example ──────────────────────────────────────────────────────────
/**
 * Usage example: wires two tools, three nodes and two edges into a
 * `StatefulAgentOrchestrator`, runs it once from `ingest_request`, and logs
 * the resulting `data` plus the number of telemetry events.
 *
 * Flow: ingest_request → route_decision → escalate (when the profile tier is
 * 'enterprise'). `escalate` has no outgoing edge, so the run ends there.
 */
async function bootstrapAgent() {
  const orchestrator = new StatefulAgentOrchestrator(30);

  // Define tools with strict schemas
  orchestrator.registerTool(
    'fetch_customer_profile',
    z.object({ customerId: z.string().uuid() }),
    async ({ customerId }) => ({ id: customerId, tier: 'enterprise', status: 'active' })
  );
  orchestrator.registerTool(
    'generate_support_ticket',
    z.object({ summary: z.string().min(10), priority: z.enum(['low', 'medium', 'high']) }),
    async ({ summary, priority }) => ({ ticketId: 'TK-9921', status: 'open', priority })
  );

  // Register nodes
  orchestrator.registerNode({
    id: 'ingest_request',
    execute: async (state) => {
      // Fail with an explicit message when the id is absent. A placeholder
      // such as 'default-uuid' could never satisfy the tool's uuid schema and
      // would only surface as a confusing validation error.
      const { requestId } = state.data;
      if (typeof requestId !== 'string') {
        throw new Error('ingest_request requires state.data.requestId to be a UUID string');
      }
      const profile = await orchestrator.invokeTool('fetch_customer_profile', {
        customerId: requestId
      });
      return { ...state, data: { ...state.data, profile } };
    },
    guardrails: [
      async (s) => s.budget.steps > 0 // Simple step budget guardrail
    ]
  });
  orchestrator.registerNode({
    id: 'route_decision',
    execute: async (state) => {
      // Narrow structural view of the profile instead of `any`.
      const tier = (state.data.profile as { tier?: unknown } | null | undefined)?.tier;
      return {
        ...state,
        data: { ...state.data, routingTarget: tier === 'enterprise' ? 'escalate' : 'standard' }
      };
    }
  });
  orchestrator.registerNode({
    id: 'escalate',
    execute: async (state) => {
      const ticket = await orchestrator.invokeTool('generate_support_ticket', {
        summary: 'Enterprise escalation required',
        priority: 'high'
      });
      return { ...state, data: { ...state.data, ticket } };
    }
  });

  // Define routing edges
  orchestrator.registerEdge({
    from: 'ingest_request',
    to: 'route_decision',
    condition: (s) => !!s.data.profile
  });
  orchestrator.registerEdge({
    from: 'route_decision',
    to: 'escalate',
    condition: (s) => s.data.routingTarget === 'escalate'
  });

  // Execute
  const initialState: AgentState = {
    executionId: 'exec-001',
    currentNode: 'ingest_request',
    history: [],
    data: { requestId: '550e8400-e29b-41d4-a716-446655440000' },
    budget: { tokens: 10000, steps: 20 }
  };
  const result = await orchestrator.execute(initialState);
  console.log('Execution complete:', result.data);
  console.log('Telemetry events:', orchestrator.getTelemetry().length);
}

// Run the example; any failure is reported on stderr.
bootstrapAgent().catch(console.error);
### Why This Architecture Works
The orchestrator separates concerns cleanly: state lives in a serializable object, tools are validated at the boundary, and routing is declarative. This eliminates the most common production failure mode — implicit state drift — by forcing every transition to pass through explicit conditions. The telemetry layer attaches directly to tool invocations and state mutations, providing a complete audit trail without external dependencies. Guardrails are awaited before each node executes, preventing costly LLM calls when constraints are violated. This pattern scales from single-agent workflows to multi-agent handoff systems because the execution loop remains deterministic regardless of node complexity.
## Pitfall Guide
### 1. Implicit State Accumulation
**Explanation**: Relying on LLM context windows to carry state across steps causes token bloat, non-deterministic behavior, and unbounded costs. Context windows are not databases.
**Fix**: Maintain a typed state object that persists across nodes. Serialize critical data at each step and pass only necessary payloads to the LLM. Use explicit state snapshots for rollback and auditing.
### 2. Over-Engineering Multi-Agent Communication
**Explanation**: Teams often design conversational agent-to-agent negotiation patterns before validating single-agent tool routing. This introduces latency, ambiguous delegation, and debugging nightmares.
**Fix**: Start with a single agent and structured tool routing. Only introduce multi-agent handoffs when a single execution path exceeds cognitive or computational boundaries. Use explicit routing tables instead of conversational negotiation.
### 3. Ignoring Guardrail Latency
**Explanation**: Guardrails that run synchronously on every step add cumulative latency. Teams often place them after LLM invocation, wasting tokens on invalid requests.
**Fix**: Run guardrails before tool calls and LLM invocations. Implement async validation with fallback paths. Cache repeated validation results where possible. Set execution budgets at the orchestrator level, not the node level.
### 4. Ambiguous Tool Definitions
**Explanation**: Tools with loose input/output schemas cause malformed calls, state corruption, and silent failures. LLMs will guess parameter types when schemas are under-specified.
**Fix**: Define every tool with strict Zod schemas. Validate inputs before execution and outputs before state mutation. Include error schemas in tool responses to handle failures gracefully. Never allow untyped `any` in production tool contracts.
### 5. Observability Afterthought
**Explanation**: Telemetry is often bolted on after deployment, resulting in missing execution traces, unstructured logs, and blind spots in failure paths.
**Fix**: Instrument the orchestrator from day one. Emit structured events for every state transition, tool invocation, and guardrail evaluation. Use execution IDs to correlate logs across steps. Store telemetry in a queryable format, not console output.
### 6. Visual-First for Complex Logic
**Explanation**: Drag-and-drop platforms accelerate prototyping but struggle with conditional branching, state persistence, and error recovery. Teams often hit a ceiling when workflows require deterministic control.
**Fix**: Use visual tools for validation and stakeholder demos. Migrate critical paths to code-based orchestration before production. Maintain a clear boundary between prototyping and deployment architectures.
### 7. Cost Blindness in Execution Loops
**Explanation**: Unbounded loops and missing step budgets cause runaway token consumption. Teams rarely instrument execution costs per node, making optimization impossible.
**Fix**: Define step and token budgets at the orchestrator level. Implement early termination conditions. Log cost metrics per execution path. Use fallback routes when budgets are exhausted.
## Production Bundle
### Action Checklist
- [ ] Define explicit state schema: Map every data field that persists across execution steps. Serialize state at each node transition.
- [ ] Implement entry/exit guardrails: Validate constraints before LLM calls and after tool execution. Set execution budgets at the orchestrator level.
- [ ] Schema-validate all tools: Use Zod or equivalent to enforce input/output contracts. Reject malformed calls before they reach the LLM.
- [ ] Instrument structured telemetry: Emit events for state transitions, tool invocations, and guardrail evaluations. Correlate logs with execution IDs.
- [ ] Design routing tables, not conversations: Map state conditions to target nodes explicitly. Avoid agent-to-agent negotiation unless absolutely necessary.
- [ ] Set execution budgets: Define maximum steps and token limits per execution path. Implement early termination and fallback routes.
- [ ] Test failure paths: Simulate tool failures, guardrail violations, and budget exhaustion. Verify state recovery and telemetry completeness.
### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|----------|---------------------|-----|-------------|
| Rapid prototyping / stakeholder demos | Visual orchestrators (Dify, Flowise) | Zero-code iteration, immediate feedback loops | Low initial cost, high migration cost if later taken to production |
| High-stakes automation / regulated workflows | Graph-based state machines (LangGraph, custom orchestrator) | Deterministic branching, explicit state, native tracing | Medium setup, low operational risk |
| TypeScript/JavaScript teams | TypeScript-native frameworks (Mastra, custom Zod-based) | End-to-end typing, IDE autocomplete, schema validation | Medium setup, high developer velocity |
| Python/R&D / academic multi-agent research | Role-based frameworks (CrewAI, AutoGen/AG2) | Conversational patterns, rapid experimentation | Low setup, high production migration cost |
| Enterprise RAG / data extraction | Integration-heavy frameworks (LangChain, Haystack) | Broad connector ecosystem, retrieval optimization | Medium setup, high token cost if unoptimized |
### Configuration Template
```typescript
// orchestrator.config.ts
import { z } from 'zod';
import { StatefulAgentOrchestrator } from './orchestrator';
/**
 * Builds an orchestrator wired for a retrieve → synthesize → validate loop.
 *
 * Graph:
 *   retrieve_context → synthesize_answer → quality_gate
 *   quality_gate → synthesize_answer   (response not validated: retry)
 *   quality_gate → complete            (response validated: terminal node)
 *
 * The retry loop is bounded only by the orchestrator's step limit (25).
 * Tool bodies and the synthesis step are placeholders to be replaced with
 * real implementations.
 *
 * @returns The configured orchestrator; the caller supplies the initial state
 *          (with `currentNode: 'retrieve_context'` and `data.userQuery`).
 */
export function createProductionOrchestrator(): StatefulAgentOrchestrator {
  const orchestrator = new StatefulAgentOrchestrator(25);

  // Tool registry with strict schemas
  orchestrator.registerTool(
    'query_knowledge_base',
    z.object({ query: z.string().min(5), maxResults: z.number().min(1).max(10) }),
    // Placeholder implementation: the inputs are not used yet.
    async ({ query, maxResults }) => ({ results: [], relevance: 0.85 })
  );
  orchestrator.registerTool(
    'validate_response',
    z.object({ content: z.string(), confidence: z.number().min(0).max(1) }),
    async ({ content, confidence }) => ({ valid: confidence > 0.7, content })
  );

  // Guardrail factory: passes while at least `minTokens` tokens remain.
  // Typed structurally so it only depends on the field it reads.
  const createBudgetGuardrail =
    (minTokens: number) =>
    async (state: { budget: { tokens: number } }): Promise<boolean> =>
      state.budget.tokens >= minTokens;

  // Node definitions
  orchestrator.registerNode({
    id: 'retrieve_context',
    execute: async (state) => {
      const context = await orchestrator.invokeTool('query_knowledge_base', {
        query: state.data.userQuery as string,
        maxResults: 5
      });
      return { ...state, data: { ...state.data, context } };
    },
    guardrails: [createBudgetGuardrail(2000)]
  });
  orchestrator.registerNode({
    id: 'synthesize_answer',
    execute: async (state) => {
      // LLM invocation would occur here with state.data.context
      const response = { content: 'Generated response', confidence: 0.88 };
      return { ...state, data: { ...state.data, response } };
    }
  });
  orchestrator.registerNode({
    id: 'quality_gate',
    execute: async (state) => {
      const response = state.data.response as { content: string; confidence: number };
      // Explicit type arguments: the tool's output type cannot be inferred
      // from its name, and `.valid` is read below.
      const validation = await orchestrator.invokeTool<
        { content: string; confidence: number },
        { valid: boolean; content: string }
      >('validate_response', {
        content: response.content,
        confidence: response.confidence
      });
      return { ...state, data: { ...state.data, validated: validation.valid } };
    }
  });
  // Terminal node: the 'quality_gate' → 'complete' edge needs a registered
  // target, otherwise the orchestrator throws "Node complete not registered".
  // It has no outgoing edges, so the run ends after it executes.
  orchestrator.registerNode({
    id: 'complete',
    execute: async (state) => state
  });

  // Routing edges
  orchestrator.registerEdge({ from: 'retrieve_context', to: 'synthesize_answer', condition: () => true });
  orchestrator.registerEdge({ from: 'synthesize_answer', to: 'quality_gate', condition: () => true });
  orchestrator.registerEdge({ from: 'quality_gate', to: 'synthesize_answer', condition: (s) => !s.data.validated });
  orchestrator.registerEdge({ from: 'quality_gate', to: 'complete', condition: (s) => s.data.validated === true });

  return orchestrator;
}
```

### Quick Start Guide
- Initialize the orchestrator: Import the `StatefulAgentOrchestrator` class and instantiate it with a step budget. Configure telemetry listeners to capture execution events.
- Register tools with schemas: Define every external function using Zod schemas. Validate inputs before execution and outputs before state mutation. Attach error handling to each tool.
- Define nodes and edges: Create execution nodes that mutate state explicitly. Map routing edges with deterministic conditions. Avoid implicit branching or conversational delegation.
- Execute and monitor: Pass an initial state object to the `execute` method. Listen for `step_complete` and `guardrail_failed` events. Retrieve telemetry after execution to audit performance and cost.