opaque AI calls into traceable workflows, enabling real-time monitoring and post-mortem analysis.
Implementation
// interfaces.ts
/**
 * Payload handed to the agents for a single task run.
 */
export interface AgentPayload {
// Task identifier; the orchestrator uses it as the key of the run's memory entry.
taskId: string;
// Request data; the planner and reflector embed it in their prompts via JSON.stringify.
input: Record<string, unknown>;
// Run metadata; not included in the planner or reflector prompts (they only send `input`).
metadata: {
step: string;
// NOTE(review): presumably epoch milliseconds (Date.now()) — confirm with callers.
timestamp: number;
// Trace/correlation identifier for this run.
traceId: string;
};
}
/**
 * Structured verdict produced by the reflection agent for one run.
 */
export interface ReflectionResult {
// The reflector's prompt asks the model for a value in the 0-1 range.
confidenceScore: number;
// Free-text findings about the evaluated results.
diagnostics: string[];
// Free-text suggestions for improving the run.
suggestedAdjustments: string[];
// Overall verdict requested from the model.
status: 'pass' | 'fail' | 'review';
}
/**
 * State the orchestrator builds up, phase by phase, during one execute() call.
 */
export interface ExecutionState {
// Steps produced by the planner.
plan: string[];
// Matches returned by the retriever (typed loosely as unknown[]).
searchResults: unknown[];
// Reflection verdict; null until the reflection phase has run.
reflection: ReflectionResult | null;
// Per-run key/value store; the orchestrator writes one entry keyed by taskId.
memory: Map<string, unknown>;
}
// agents/planner.ts
import { OpenAI } from 'openai';
/**
 * Turns a free-form request into an ordered list of actionable steps using a chat model.
 */
export class TaskPlanner {
  private readonly client: OpenAI;

  /** @param apiKey OpenAI API key used for every planning request. */
  constructor(apiKey: string) {
    this.client = new OpenAI({ apiKey });
  }

  /**
   * Asks the model to decompose `payload.input` into execution steps.
   *
   * @param payload Task payload; only `input` is sent to the model.
   * @returns Non-empty, trimmed step strings in the order the model produced them.
   * @throws Error if the model returns no content, invalid JSON, or a reply
   *   without a `steps` array of strings.
   */
  async decomposeIntent(payload: AgentPayload): Promise<string[]> {
    // `json_object` mode only permits a top-level JSON object, so the prompt must ask
    // for an object that wraps the array under a known key ("steps"). Asking for a
    // bare array contradicts the response format and leaves the key unpredictable.
    const prompt = `
Analyze the following request and extract actionable execution steps.
Return only a JSON object of the form {"steps": ["<step>", ...]} where "steps" is an array of strings. Do not include explanations.
Request: ${JSON.stringify(payload.input)}
`;
    const response = await this.client.chat.completions.create({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: prompt }],
      temperature: 0.2,
      response_format: { type: 'json_object' },
    });
    const content = response.choices[0]?.message?.content;
    if (!content) throw new Error('Planner returned empty response');
    return this.parseSteps(content);
  }

  /** Validates the planner's JSON reply and returns its cleaned `steps` array. */
  private parseSteps(content: string): string[] {
    let parsed: unknown;
    try {
      parsed = JSON.parse(content);
    } catch {
      throw new Error('Planner returned invalid JSON');
    }
    const steps =
      typeof parsed === 'object' && parsed !== null
        ? (parsed as { steps?: unknown }).steps
        : undefined;
    if (!Array.isArray(steps) || !steps.every((step): step is string => typeof step === 'string')) {
      throw new Error('Planner response is missing a "steps" array of strings');
    }
    // Drop blank entries so downstream phases never see empty steps.
    return steps.map((step) => step.trim()).filter((step) => step.length > 0);
  }
}
// agents/search.ts
import { OpenAI } from 'openai';
/**
 * Ranks candidate texts against a query by cosine similarity of OpenAI embeddings.
 */
export class SemanticRetriever {
  /** Maximum number of inputs the embeddings endpoint accepts in one request. */
  private static readonly maxInputsPerRequest = 2048;
  private readonly client: OpenAI;

  /** @param apiKey OpenAI API key used for embedding requests. */
  constructor(apiKey: string) {
    this.client = new OpenAI({ apiKey });
  }

  /**
   * Scores every candidate against `query` and keeps those above `minScore`.
   *
   * @param query Text to search for; a blank query yields no matches.
   * @param candidatePool Candidate texts; each non-blank entry is embedded alongside the query.
   * @param minScore Matches must score strictly above this cosine similarity (default 0.75).
   * @returns Matches sorted by descending score. `id` is `candidate_<index>`, where
   *   `index` is the candidate's position in `candidatePool`.
   * @throws Error if the embeddings response does not contain one vector per input.
   */
  async findMatches(
    query: string,
    candidatePool: string[],
    minScore = 0.75,
  ): Promise<{ id: string; score: number }[]> {
    // Keep the original positions so ids stay stable after skipping blank entries,
    // which the embeddings API rejects and which carry no meaning anyway.
    const candidates = candidatePool
      .map((text, index) => ({ text, index }))
      .filter((candidate) => candidate.text.trim() !== '');
    if (query.trim() === '' || candidates.length === 0) return [];

    // Candidates must be embedded too: similarity is only defined between vectors.
    const vectors = await this.getEmbeddings([query, ...candidates.map((c) => c.text)]);
    const [queryEmbedding, ...candidateEmbeddings] = vectors;
    if (queryEmbedding === undefined || candidateEmbeddings.length !== candidates.length) {
      throw new Error('Embeddings response did not return one vector per input');
    }

    return candidates
      .map((candidate, i) => ({
        id: `candidate_${candidate.index}`,
        // Length was verified above, so every index has a vector.
        score: this.cosineSimilarity(queryEmbedding, candidateEmbeddings[i]!),
      }))
      .filter((item) => item.score > minScore)
      .sort((a, b) => b.score - a.score);
  }

  /** Embeds `texts` in API-sized batches; returns one vector per input, in input order. */
  private async getEmbeddings(texts: string[]): Promise<number[][]> {
    const vectors: number[][] = [];
    const batchSize = SemanticRetriever.maxInputsPerRequest;
    for (let start = 0; start < texts.length; start += batchSize) {
      const response = await this.client.embeddings.create({
        model: 'text-embedding-3-large',
        input: texts.slice(start, start + batchSize),
      });
      // Each item carries the index of its input; order by it instead of trusting array order.
      const ordered = [...response.data].sort((a, b) => a.index - b.index);
      vectors.push(...ordered.map((item) => item.embedding));
    }
    return vectors;
  }

  /** Cosine similarity of two equal-length vectors; 0 when either has zero magnitude. */
  private cosineSimilarity(vecA: number[], vecB: number[]): number {
    if (vecA.length !== vecB.length) {
      throw new Error(`Embedding dimension mismatch: ${vecA.length} vs ${vecB.length}`);
    }
    let dot = 0;
    let sumSqA = 0;
    let sumSqB = 0;
    for (let i = 0; i < vecA.length; i++) {
      const a = vecA[i] ?? 0;
      const b = vecB[i] ?? 0;
      dot += a * b;
      sumSqA += a * a;
      sumSqB += b * b;
    }
    const denominator = Math.sqrt(sumSqA) * Math.sqrt(sumSqB);
    // Avoid NaN from 0/0 on degenerate vectors.
    return denominator === 0 ? 0 : dot / denominator;
  }
}
// agents/reflection.ts
import { OpenAI } from 'openai';
/**
 * Asks a chat model to grade execution results against the original request.
 */
export class QualityReflector {
  private readonly client: OpenAI;

  /** @param apiKey OpenAI API key used for evaluation requests. */
  constructor(apiKey: string) {
    this.client = new OpenAI({ apiKey });
  }

  /**
   * Evaluates `results` against `payload.input`.
   *
   * @returns A validated verdict. `confidenceScore` is clamped to [0, 1], and
   *   missing `diagnostics`/`suggestedAdjustments` default to empty arrays.
   * @throws Error if the model returns no content, invalid JSON, a non-numeric
   *   score, an unknown status, or non-string list entries.
   */
  async evaluate(payload: AgentPayload, results: unknown[]): Promise<ReflectionResult> {
    const prompt = `
Evaluate the execution results against the original request.
Return a JSON object with: confidenceScore (0-1), diagnostics (array of strings),
suggestedAdjustments (array of strings), status (pass/fail/review).
Request: ${JSON.stringify(payload.input)}
Results: ${JSON.stringify(results)}
`;
    const response = await this.client.chat.completions.create({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: prompt }],
      temperature: 0.1,
      response_format: { type: 'json_object' },
    });
    const content = response.choices[0]?.message?.content;
    if (!content) throw new Error('Reflector returned empty response');
    return this.parseReflection(content);
  }

  /** Parses and validates the model's JSON reply; never trusts it structurally. */
  private parseReflection(content: string): ReflectionResult {
    let parsed: unknown;
    try {
      parsed = JSON.parse(content);
    } catch {
      throw new Error('Reflector returned invalid JSON');
    }
    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
      throw new Error('Reflector response is not a JSON object');
    }
    const raw = parsed as Record<string, unknown>;

    const score = raw.confidenceScore;
    if (typeof score !== 'number' || !Number.isFinite(score)) {
      throw new Error('Reflector response has no numeric confidenceScore');
    }
    const status = raw.status;
    if (!this.isStatus(status)) {
      throw new Error(`Reflector response has invalid status: ${JSON.stringify(status)}`);
    }
    return {
      // The prompt asks for 0-1, but models occasionally overshoot.
      confidenceScore: Math.min(1, Math.max(0, score)),
      diagnostics: this.toStringList(raw.diagnostics, 'diagnostics'),
      suggestedAdjustments: this.toStringList(raw.suggestedAdjustments, 'suggestedAdjustments'),
      status,
    };
  }

  /** True for the three verdicts the interface allows. */
  private isStatus(value: unknown): value is ReflectionResult['status'] {
    return value === 'pass' || value === 'fail' || value === 'review';
  }

  /** Accepts an absent field as [] and otherwise requires an array of strings. */
  private toStringList(value: unknown, field: string): string[] {
    if (value === undefined) return [];
    if (!Array.isArray(value) || !value.every((item): item is string => typeof item === 'string')) {
      throw new Error(`Reflector response field "${field}" must be an array of strings`);
    }
    return [...value];
  }
}
// runtime/orchestrator.ts
import { TaskPlanner } from './agents/planner';
import { SemanticRetriever } from './agents/search';
import { QualityReflector } from './agents/reflection';
import { AgentPayload, ExecutionState } from './interfaces';
/**
 * Runs the plan -> retrieve -> reflect pipeline for one payload and records
 * per-trace telemetry for every phase.
 */
export class AgentOrchestrator {
  private readonly planner: TaskPlanner;
  private readonly retriever: SemanticRetriever;
  private readonly reflector: QualityReflector;
  /** Telemetry entries grouped by trace id; held in memory for this instance's lifetime. */
  private readonly telemetry: Map<string, unknown[]>;

  /** @param config.openaiKey API key shared by the planner, retriever, and reflector. */
  constructor(config: { openaiKey: string }) {
    this.planner = new TaskPlanner(config.openaiKey);
    this.retriever = new SemanticRetriever(config.openaiKey);
    this.reflector = new QualityReflector(config.openaiKey);
    this.telemetry = new Map();
  }

  /**
   * Executes all phases sequentially for `payload`.
   *
   * @param payload Task payload; `metadata.traceId` keys this run's telemetry.
   * @param candidatePool Candidate texts handed to the retriever.
   * @returns Fresh per-run state holding the plan, matches, reflection, and a
   *   memory entry keyed by `taskId`.
   * @throws Rethrows any agent error after recording a `<phase>.error` event.
   */
  async execute(payload: AgentPayload, candidatePool: string[]): Promise<ExecutionState> {
    // AgentPayload carries the trace id under `metadata`; there is no top-level traceId.
    const { traceId } = payload.metadata;
    const state: ExecutionState = {
      plan: [],
      searchResults: [],
      reflection: null,
      memory: new Map(),
    };

    // Phase 1: Planning
    state.plan = await this.runPhase(
      traceId,
      'planner',
      () => this.planner.decomposeIntent(payload),
      (plan) => ({ steps: plan.length }),
    );

    // Phase 2: Semantic Retrieval. It is skipped when the plan yields no query text,
    // because there is nothing to embed.
    const query = state.plan.join(' ');
    const hasQuery = query.trim() !== '';
    state.searchResults = await this.runPhase(
      traceId,
      'search',
      async () => (hasQuery ? this.retriever.findMatches(query, candidatePool) : []),
      (matches) => (hasQuery ? { matches: matches.length } : { matches: 0, skipped: true }),
    );

    // Phase 3: Reflection
    const reflection = await this.runPhase(
      traceId,
      'reflection',
      () => this.reflector.evaluate(payload, state.searchResults),
      (result) => ({ confidence: result.confidenceScore, status: result.status }),
    );
    state.reflection = reflection;

    // Phase 4: Memory Persistence (this run's in-process map only)
    state.memory.set(payload.taskId, {
      plan: state.plan,
      results: state.searchResults,
      reflection,
    });
    return state;
  }

  /** Returns a copy of the telemetry entries recorded for `traceId` (empty if none). */
  getTelemetry(traceId: string): unknown[] {
    return [...(this.telemetry.get(traceId) ?? [])];
  }

  /**
   * Runs one phase, logging `<phase>.start` and then either `<phase>.complete`
   * (with `summarize(result)`) or `<phase>.error`, each with its duration.
   */
  private async runPhase<T>(
    traceId: string,
    phase: string,
    work: () => Promise<T>,
    summarize: (result: T) => Record<string, unknown>,
  ): Promise<T> {
    this.logTelemetry(traceId, `${phase}.start`);
    const startedAt = Date.now();
    let result: T;
    try {
      result = await work();
    } catch (error: unknown) {
      // Record the failure so the trace shows where the run stopped, then propagate.
      this.logTelemetry(traceId, `${phase}.error`, {
        durationMs: Date.now() - startedAt,
        message: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
    this.logTelemetry(traceId, `${phase}.complete`, {
      ...summarize(result),
      durationMs: Date.now() - startedAt,
    });
    return result;
  }

  /** Appends one telemetry entry to the list for `traceId`, creating the list on first use. */
  private logTelemetry(traceId: string, event: string, data?: Record<string, unknown>): void {
    // Spread caller data first so it cannot overwrite the event name or timestamp.
    const entry = { ...data, event, timestamp: Date.now() };
    let entries = this.telemetry.get(traceId);
    if (entries === undefined) {
      entries = [];
      this.telemetry.set(traceId, entries);
    }
    entries.push(entry);
  }
}
Why This Structure Works
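- Explicit Contracts: Interfaces enforce type safety across agent boundaries. Because the orchestrator depends only on each agent's method signatures, you can change a model (e.g. a different gpt-4o snapshot or embedding model) or replace an agent's OpenAI client with another provider such as claude-3-5-sonnet inside that agent class, without touching the orchestrator.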
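- Decoupled Reflection: The reflection agent runs after retrieval as a separate call with its own prompt, not inside the generation step. Grading finished results independently reduces the risk that the model justifies its own reasoning instead of checking the output against the original request.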
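- Traceable Execution: Every phase emits start and completion events under the run's trace id. A phase with a start event but no matching completion shows where a workflow failed. The recorded step counts, match counts, and confidence scores show whether the planner misinterpreted intent or the retriever returned low-similarity matches.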
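- State Isolation: Each call to execute() creates a fresh ExecutionState, including its own memory map, so concurrent requests never share state. The state is filled in phase by phase within a run, and the memory entry is written only after all phases finish. This memory is in-process only; see "Missing State Serialization" below for durable storage.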
Pitfall Guide
1. Synchronous Agent Blocking
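Explanation: `await` does not block the Node.js event loop, but awaiting agents one after another serializes their network round trips, so end-to-end latency is the sum of every phase. Under concurrent load, many long-running workflows held open inside request handlers also tie up connections and memory.
Fix: Run independent calls concurrently with Promise.all or Promise.allSettled, and keep sequential await only for strict dependencies (e.g. retrieval needs the plan). For long or high-volume workflows, move execution to an async job queue (e.g., BullMQ, RabbitMQ).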
2. Unbounded Reflection Loops
Explanation: If the reflection agent returns status: 'review' and triggers a retry, you can create infinite loops that exhaust API quotas.
Fix: Implement a maximum retry counter (typically 2-3). After threshold, force-fail the workflow and route to human review or fallback logic.
3. Missing State Serialization
Explanation: Storing execution state in memory works locally but fails in distributed deployments or after process restarts.
Fix: Serialize ExecutionState to Redis or PostgreSQL after each phase. Use structured JSON schemas with versioning to handle schema drift.
4. Telemetry Overhead
Explanation: Logging every internal variable creates massive payloads that degrade performance and inflate storage costs.
Fix: Log only phase boundaries, confidence scores, error codes, and latency metrics. Use sampling for high-frequency events. Strip PII before persistence.
5. Hallucination in Planning Phase
Explanation: The planner may generate steps that are logically sound but technically impossible given available tools or data.
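Fix: Constrain the planner with a tool registry schema. Validate generated steps against available agent capabilities before execution. Use structured output modes (`response_format: { type: 'json_object' }`, or `{ type: 'json_schema', ... }` to enforce an exact schema), and still validate the parsed result before using it.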
6. Ignoring Embedding Drift
Explanation: Candidate data changes over time, but embeddings are static. Stale vectors cause semantic mismatch and degraded retrieval accuracy.
Fix: Implement incremental embedding updates. Schedule nightly re-indexing for dynamic datasets. Use versioned embedding models and track drift metrics.
7. Tight Coupling via Global State
Explanation: Sharing mutable objects across agents leads to race conditions and unpredictable behavior under load.
Fix: Pass immutable payloads between agents. Use dependency injection to provide read-only configuration. Never mutate shared state during execution.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Low-volume, high-accuracy workflows | Multi-agent with reflection loop | Ensures quality control and traceability | Higher API costs due to multiple calls |
| High-volume, latency-sensitive tasks | Single-prompt with tool calling | Reduces orchestration overhead and round trips | Lower accuracy on complex multi-step tasks |
| Dynamic datasets with frequent updates | Vector search + incremental re-embedding | Maintains semantic relevance without full re-index | Moderate storage and compute costs |
| Static datasets with strict compliance | Keyword search + rule-based filtering | Eliminates LLM hallucination risk and ensures auditability | Lower API costs, higher maintenance overhead |
Configuration Template
// config/runtime.config.ts
/**
 * Parses a TCP port from an environment-variable value.
 *
 * Unset or blank values fall back to `fallback`. Anything that is not a whole number
 * in 1-65535 is rejected, so a typo fails at startup instead of producing a NaN port.
 *
 * @throws Error naming `name` when the value is present but invalid.
 */
function parsePort(raw: string | undefined, fallback: number, name: string): number {
  const trimmed = raw?.trim() ?? '';
  if (trimmed === '') return fallback;
  // Digits only: rejects "0x1F", "6379abc", negatives and decimals that parseInt would accept.
  const port = /^\d+$/.test(trimmed) ? Number(trimmed) : Number.NaN;
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`${name} must be an integer between 1 and 65535, got "${raw}"`);
  }
  return port;
}

/**
 * Runtime settings for the agent pipeline, read from the environment when this module loads.
 */
export const agentConfig = {
  openai: {
    // NOTE(review): the non-null assertion is unchecked; an unset OPENAI_API_KEY yields undefined.
    apiKey: process.env.OPENAI_API_KEY!,
    models: {
      planner: 'gpt-4o',
      reflector: 'gpt-4o',
      embedding: 'text-embedding-3-large',
    },
    defaults: {
      temperature: 0.2,
      maxTokens: 1024,
      responseFormat: 'json_object',
    },
  },
  orchestration: {
    maxRetries: 2,
    reflectionThreshold: 0.75,
    telemetry: {
      enabled: true,
      retentionDays: 30,
      sampleRate: 1.0,
    },
  },
  storage: {
    type: 'redis',
    host: process.env.REDIS_HOST || 'localhost',
    port: parsePort(process.env.REDIS_PORT, 6379, 'REDIS_PORT'),
    keyPrefix: 'agent_runtime:',
  },
};
Quick Start Guide
- Initialize the runtime: Install dependencies (`npm i openai redis`) and set the OPENAI_API_KEY and REDIS_HOST environment variables. REDIS_PORT is optional and defaults to 6379.
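- Define your candidate pool: Prepare a string array of searchable items; this is the candidatePool argument passed to execute(). Embeddings are generated with text-embedding-3-large. For production, precompute and cache candidate embeddings in a vector store (or an in-memory cache for testing) rather than re-embedding them on every request.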
- Instantiate the orchestrator: Import AgentOrchestrator and construct it with `{ openaiKey: agentConfig.openai.apiKey }`. Then call `execute(payload, candidatePool)` with a structured payload containing taskId, input, and metadata (step, timestamp, and traceId).
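- Monitor telemetry: The orchestrator records telemetry events in a private, in-memory map keyed by trace id. Forward these entries to your observability platform, and evict old traces, since the map grows with every run. Track planner.complete, search.complete, and reflection.complete events to validate workflow health.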
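- Iterate with reflection: Tune reflectionThreshold and maxRetries based on production metrics. The orchestrator shown above does not read these values yet; wire them in when you add a bounded retry loop around the reflection phase. If confidence scores consistently drop below the threshold, refine the planner prompt or improve candidate data quality.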