ols: string[];
systemPrompt: string;
}
/** Contract for a dispatcher that hands one task to one of several specialists. */
interface SupervisorOrchestrator {
  /**
   * Chooses a specialist for `task` and runs it.
   * @param task - Free-form task description.
   * @param specialists - Candidate specialists to choose between.
   * @returns A loosely typed result record; the concrete keys are implementation-defined.
   */
  routeTask(task: string, specialists: Array<SpecialistConfig>): Promise<{ [key: string]: unknown }>;
}
/**
 * Supervisor that classifies a task with a cheap routing step, then runs the
 * chosen specialist while holding a session-scoped distributed lock.
 *
 * `uuidv4`, `acquireDistributedLock`, `releaseDistributedLock` and
 * `runAgentLoop` are not defined in this snippet and must be supplied by the
 * surrounding project.
 */
class TaskSupervisor implements SupervisorOrchestrator {
  private routingModel = 'haiku'; // $0.0003–$0.001 per classification
  // NOTE(review): written after every run but never read. It is keyed by a
  // session id minted inside routeTask, so it cannot deduplicate retries, and
  // it grows for the lifetime of the supervisor.
  private idempotencyKeys = new Map<string, string>();

  /**
   * Routes `task` to a specialist and executes it.
   *
   * @param task - Task description passed to the classifier and the specialist.
   * @param specialists - Candidates. The first one is used when the classifier
   *   returns an id that matches none of them.
   * @returns `{ sessionId, specialist, output }` for the specialist that ran.
   * @throws Error when `specialists` is empty.
   */
  async routeTask(task: string, specialists: SpecialistConfig[]): Promise<Record<string, unknown>> {
    // Fail fast: with no candidates there is no fallback, and reading
    // `target.id` below would throw an opaque TypeError.
    const fallback = specialists[0];
    if (fallback === undefined) {
      throw new Error('routeTask requires at least one specialist');
    }
    const sessionId = uuidv4();
    const routingDecision = await this.classifyIntent(task);
    const target = specialists.find((s) => s.id === routingDecision) ?? fallback;
    // Execute in isolated context with session-scoped state
    const result = await this.executeSpecialist(target, task, sessionId);
    this.idempotencyKeys.set(sessionId, routingDecision);
    return { sessionId, specialist: target.id, output: result };
  }

  /** Returns the id of the specialist that should handle `task`. */
  private async classifyIntent(task: string): Promise<string> {
    // Lightweight routing via Haiku or rule-based classifier
    // Avoids wasting high-cost tokens on dispatch logic
    return 'research-specialist'; // Placeholder for actual LLM call
  }

  /**
   * Runs the specialist's agent loop while holding a lock named after the session.
   * The lock is always released, even when the agent loop throws.
   */
  private async executeSpecialist(config: SpecialistConfig, task: string, sessionId: string): Promise<unknown> {
    // NOTE(review): the key derives from a session id that routeTask generates
    // per call, so two calls never contend for the same lock. Cross-call
    // exclusion would need a key naming the shared resource instead.
    const lockKey = `agent:lock:${sessionId}`;
    // 30000 is presumably a lock TTL in milliseconds — confirm against acquireDistributedLock.
    await this.acquireDistributedLock(lockKey, 30000);
    try {
      return await this.runAgentLoop(config, task);
    } finally {
      await this.releaseDistributedLock(lockKey);
    }
  }
}
```

**Architecture Rationale:** Routing uses a low-cost model (`haiku`) to preserve budget. Worker execution is wrapped in distributed locks to prevent TOCTOU races. Session IDs enable idempotent retries and traceability.
### 2. Sequential Pipeline
Linear stages require explicit schema validation between steps to prevent contamination propagation.
```typescript
/** One step of an ExecutionPipeline: a precondition check plus the transform it guards. */
interface PipelineStage<TInput, TOutput> {
  /** Identifier reported when this stage's validation rejects its input. */
  name: string;
  /** Returns true when `input` is acceptable for `execute`. */
  validate(input: TInput): boolean;
  /** Transforms the input into the value handed to the next stage. */
  execute(input: TInput): Promise<TOutput>;
}

/**
 * Runs registered stages in insertion order, feeding each stage the previous
 * stage's output. The first stage whose `validate` returns false aborts the run.
 */
class ExecutionPipeline {
  private stages: PipelineStage<any, any>[] = [];

  /** Appends `stage` and returns this pipeline so registrations can be chained. */
  addStage<TIn, TOut>(stage: PipelineStage<TIn, TOut>) {
    this.stages.push(stage);
    return this;
  }

  /**
   * Threads `initialInput` through every registered stage.
   * @returns The final stage's output, or `initialInput` unchanged when no stages exist.
   * @throws Error naming the first stage whose validation fails.
   */
  async run(initialInput: any): Promise<any> {
    let carried = initialInput;
    for (const stage of this.stages) {
      carried = await this.applyStage(stage, carried);
    }
    return carried;
  }

  /** Checks `input` against `stage`'s gate, then executes the stage. */
  private async applyStage(stage: PipelineStage<any, any>, input: any): Promise<any> {
    const accepted = stage.validate(input);
    if (!accepted) {
      throw new Error(`Validation failed at stage: ${stage.name}`);
    }
    return stage.execute(input);
  }
}
// Usage example with strict contracts.
// Stage inputs are annotated explicitly. `addStage` has no inference source
// for `TIn` other than the callback parameters, so unannotated `input`
// parameters would be typed `unknown` and `input.source` would not type-check.
type ExtractionInput = { source: string };
type SynthesisInput = { claims: Awaited<ReturnType<typeof extractClaims>> };
const researchPipeline = new ExecutionPipeline()
  .addStage({
    name: 'data-extraction',
    // Runtime guard kept: pipeline inputs are not type-checked at runtime.
    validate: (input: ExtractionInput) => typeof input.source === 'string' && input.source.length > 0,
    execute: async (input: ExtractionInput) => ({ claims: await extractClaims(input.source), gaps: [] }),
  })
  .addStage({
    name: 'synthesis',
    validate: (input: SynthesisInput) => Array.isArray(input.claims) && input.claims.length > 0,
    execute: async (input: SynthesisInput) => ({ report: await synthesizeReport(input.claims) }),
  });
```

**Architecture Rationale:** Schema validation gates halt execution before corrupted data propagates. Each stage declares explicit input/output contracts, enabling parallel development and independent evaluation.

### 3. Fan-Out Aggregation
Parallel dispatch requires explicit partial-failure policies. Silent degradation produces authoritative-looking but incomplete outputs.
/** Failure-handling policy for ParallelDispatcher. */
interface AggregationPolicy {
  /**
   * What to do when at least one task rejects:
   * - 'fail-all': reject the whole batch.
   * - 'return-best': return only the successful results.
   * - 'retry-failed': re-run each failed task once; reject if any retry fails.
   */
  onPartialFailure: 'fail-all' | 'return-best' | 'retry-failed';
  /** Per-attempt deadline in milliseconds; a non-positive or non-finite value disables it. */
  timeoutMs: number;
}

/**
 * Runs tasks concurrently and aggregates their outcomes according to an
 * AggregationPolicy. `withCircuitBreaker` and `runAgent` are not defined in
 * this snippet and must be provided by the surrounding project.
 */
class ParallelDispatcher {
  private policy: AggregationPolicy;
  constructor(policy: AggregationPolicy) {
    this.policy = policy;
  }

  /**
   * Dispatches every task in parallel and aggregates the outcomes.
   *
   * @param tasks - Tasks to run; each is executed independently.
   * @returns Results in input order. Under 'return-best', failed slots are
   *   dropped, so the array can be shorter than `tasks`.
   * @throws Error under 'fail-all' when any task fails. Under 'retry-failed',
   *   rejects with the first retry error.
   */
  async dispatchAndAggregate(tasks: Array<{ id: string; payload: any }>): Promise<any[]> {
    // allSettled: one rejection must not abort the rest of the batch.
    const settled = await Promise.allSettled(tasks.map((t) => this.executeTask(t)));
    const values: any[] = settled.map((r) => (r.status === 'fulfilled' ? r.value : undefined));
    const failedSlots = new Set(settled.flatMap((r, slot) => (r.status === 'rejected' ? [slot] : [])));
    if (failedSlots.size === 0) {
      return values;
    }
    switch (this.policy.onPartialFailure) {
      case 'fail-all':
        throw new Error(`Partial failure detected: ${failedSlots.size} tasks failed`);
      case 'return-best':
        return values.filter((_, slot) => !failedSlots.has(slot));
      case 'retry-failed':
        // Retry the failed *tasks* (not their rejection reasons), and keep each
        // retried result in its original slot so results stay aligned with `tasks`.
        return Promise.all(
          tasks.map((task, slot) => (failedSlots.has(slot) ? this.executeTask(task) : values[slot])),
        );
    }
    // Only reachable when an untyped caller supplies an unknown policy:
    // degrade to the successful subset.
    return values.filter((_, slot) => !failedSlots.has(slot));
  }

  /** Runs one task through the circuit breaker, bounded by `policy.timeoutMs`. */
  private async executeTask(task: { id: string; payload: any }): Promise<any> {
    return await this.withTimeout(this.withCircuitBreaker(() => this.runAgent(task.payload)), task.id);
  }

  /**
   * Rejects if `work` has not settled within `policy.timeoutMs`.
   * The underlying work is not cancelled; a late result is simply ignored.
   */
  private withTimeout(work: unknown, taskId: string): Promise<unknown> {
    const { timeoutMs } = this.policy;
    const pending = Promise.resolve(work);
    if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
      return pending;
    }
    return new Promise((resolve, reject) => {
      const timer = setTimeout(
        () => reject(new Error(`Task ${taskId} timed out after ${timeoutMs}ms`)),
        timeoutMs,
      );
      pending.then(
        (value) => {
          clearTimeout(timer);
          resolve(value);
        },
        (reason: unknown) => {
          clearTimeout(timer);
          reject(reason);
        },
      );
    });
  }
}
**Architecture Rationale:** `Promise.allSettled` prevents one failure from aborting the entire batch. Explicit aggregation policies force architectural decisions about degradation behavior. Circuit breakers prevent retry amplification.

### 4. Multi-Perspective Debate
Redundant execution with consensus voting reduces hallucination at a predictable cost premium.
/** Aggregation primitives for combining redundant model outputs. */
interface ConsensusEngine {
  /** Returns the most frequent entry in `results`. */
  vote(results: any[]): string;
  /** Returns the median of `results`. */
  median(results: number[]): number;
}

/**
 * Runs one classification per prompt variant and combines them by majority
 * vote. `runClassification` is not defined in this snippet and must be
 * supplied by the surrounding project.
 */
class DebateOrchestrator implements ConsensusEngine {
  private variants = ['thorough', 'concise', 'skeptical'];

  /**
   * Classifies `document` once per variant, concurrently, and returns the
   * majority label. Rejects if any variant rejects (Promise.all).
   */
  async classifyWithConsensus(document: string, candidates: string[]): Promise<string> {
    const executions = this.variants.map((v) => this.runClassification(document, candidates, v));
    const results = await Promise.all(executions);
    return this.vote(results.map((r) => r.classification));
  }

  /**
   * Returns the most frequent label.
   *
   * Ties go to the label seen first. So with no majority (e.g. three distinct
   * answers), the first variant's label wins.
   * @throws RangeError when `classifications` is empty.
   */
  vote(classifications: string[]): string {
    // Map rather than a plain object: labels such as "constructor" or
    // "__proto__" would otherwise collide with Object.prototype members.
    const tally = new Map<string, number>();
    for (const label of classifications) {
      tally.set(label, (tally.get(label) ?? 0) + 1);
    }
    let winner: string | undefined;
    let best = 0;
    for (const [label, count] of tally) {
      // Strict `>` keeps the earliest label on ties (Map iterates in insertion order).
      if (count > best) {
        winner = label;
        best = count;
      }
    }
    if (winner === undefined) {
      throw new RangeError('vote requires at least one classification');
    }
    return winner;
  }

  /**
   * Median of `numbers`; for an even count, the mean of the two middle values.
   * Sorts a copy, so the input array is not mutated.
   * @throws RangeError when `numbers` is empty.
   */
  median(numbers: number[]): number {
    if (numbers.length === 0) {
      throw new RangeError('median requires at least one number');
    }
    const sorted = [...numbers].sort((a, b) => a - b);
    const mid = Math.floor(sorted.length / 2);
    return sorted.length % 2 !== 0 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
  }
}
**Architecture Rationale:** Majority voting filters stochastic errors. Median aggregation handles numerical extraction outliers. Cost scales predictably (2.5–3.2×), making it suitable for high-stakes classification where accuracy outweighs latency.

### 5. Large-Scale Swarm
Swarm coordination requires event-driven state machines and shared message buses to avoid polling overhead.
import { EventEmitter } from 'events';
/** A worker participating in the swarm. */
interface SwarmNode {
  id: string;
  /** 'processing' while a task is assigned; 'completed' after the node's latest update. */
  state: 'idle' | 'processing' | 'completed';
  /** Findings accumulated from this node's update events. */
  findings: Record<string, any>;
}

/**
 * Hands out tasks to available nodes using per-node events on this emitter:
 * - `node:<id>:assign` is emitted with `{ task, context }` to assign work;
 * - nodes report back by emitting `node:<id>:update` with their findings.
 */
class SwarmCoordinator extends EventEmitter {
  private nodes = new Map<string, SwarmNode>();
  private sharedState = new Map<string, any>();
  /** Internal signal raised after any node's update has been merged. */
  private readonly nodeSettled = Symbol('node-settled');

  /**
   * Registers (or replaces) a node. The update listener is attached only the
   * first time an id is registered, so re-registering never merges an update twice.
   */
  registerNode(node: SwarmNode) {
    const isNew = !this.nodes.has(node.id);
    this.nodes.set(node.id, node);
    if (isNew) {
      this.on(`node:${node.id}:update`, (data) => this.mergeFindings(node.id, data));
    }
  }

  /**
   * Assigns `tasks` in order, one per free node. When every node is busy, it
   * waits for the next merged update.
   *
   * Resolves once every task has been *assigned*, not when the nodes finish.
   * Waits indefinitely if busy nodes never report back.
   * @throws Error when there are tasks but no registered nodes.
   */
  async coordinate(tasks: string[]) {
    const workQueue = [...tasks];
    if (workQueue.length > 0 && this.nodes.size === 0) {
      throw new Error('coordinate called with tasks but no registered nodes');
    }
    while (workQueue.length > 0) {
      // A node that has reported back ('completed') can take more work; only
      // 'processing' nodes are busy. Treating only 'idle' as free would
      // deadlock once every node has finished its first task.
      const freeNode = Array.from(this.nodes.values()).find((n) => n.state !== 'processing');
      if (!freeNode) {
        await this.waitForNodeUpdate();
        continue;
      }
      freeNode.state = 'processing';
      const task = workQueue.shift()!;
      this.emit(`node:${freeNode.id}:assign`, { task, context: this.sharedState });
    }
  }

  /** Resolves after the next node update has been merged. */
  private waitForNodeUpdate(): Promise<void> {
    return new Promise((resolve) => {
      this.once(this.nodeSettled, () => resolve());
    });
  }

  /**
   * Merges a node's update into its findings, marks it completed, stores the
   * raw update in shared state, and wakes any waiting coordinate loop.
   */
  private mergeFindings(nodeId: string, data: any) {
    const node = this.nodes.get(nodeId);
    if (node) {
      node.findings = { ...node.findings, ...data };
      node.state = 'completed';
      // Last write wins per node: each update replaces that node's
      // `contrib:<id>` entry. No cross-node conflict resolution happens here.
      this.sharedState.set(`contrib:${nodeId}`, data);
      this.emit(this.nodeSettled);
    }
  }
}
**Architecture Rationale:** Event-driven coordination replaces polling, reducing latency and CPU overhead. Shared state uses explicit merge strategies to prevent concurrent mutation. Debugging requires distributed tracing and node health probes.

## Pitfall Guide
| Pitfall | Explanation | Production Fix |
|---|---|---|
| Unbounded Retry Stacking | Independent retry layers compound failures exponentially. A 3-layer stack in which every layer makes 3 attempts issues 3³ = 27 calls per error. | Implement circuit breakers at the orchestration layer. Use exponential backoff with jitter. Cap total retries globally, not per layer. |
| Implicit State Leakage | Shared caches or global variables bleed context across sessions. Subsequent requests reason over foreign data. | Enforce session-scoped isolation. Prefix all cache keys with session_id. Use Redis namespaces or in-memory maps keyed by UUID. |
| Silent Partial Aggregation | Fan-out patterns return incomplete results without signaling degradation. Outputs appear authoritative but miss critical branches. | Define explicit aggregation policies (fail-all, return-best, retry-failed). Add metadata flags indicating partial completion. |
| Supervisor Bottleneck | Central routing becomes a single point of failure. High concurrency saturates the dispatcher, increasing p95 latency. | Shard supervisors by workload type. Implement health checks and automatic failover. Use async message queues (SQS, RabbitMQ) for decoupling. |
| Pipeline Contamination | Early-stage hallucinations propagate through linear stages, amplifying errors. | Insert schema validation gates between stages. Implement rollback mechanisms. Log intermediate outputs for audit trails. |
| Swarm Coordination Overhead | Polling-based state synchronization consumes resources and introduces race conditions. | Switch to event-driven architectures. Use Redis Pub/Sub or Kafka for state updates. Implement distributed locks for shared mutations. |
| Missing Idempotency Keys | Retries or duplicate dispatches create duplicate records or spurious webhooks. | Generate UUID-based idempotency keys per task. Store execution state in a durable ledger. Reject duplicate requests before agent invocation. |
## Production Bundle

### Action Checklist

### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| General-purpose automation with <6 sub-tasks | Supervisor + Specialists | Built-in fault containment, predictable latency, easy routing | Baseline (1.0–1.2x) |
| Linear data transformation or content generation | Sequential Pipeline | Deterministic flow, easy stage evaluation, low overhead | Moderate (1.0–1.5x) |
| Multi-source research or parallel code review | Fan-Out Aggregation | Bounded latency, independent sub-tasks, scalable | Linear (1.0–N×) |
| Legal, financial, or medical classification | Multi-Perspective Debate | Outlier rejection, 99.1% accuracy, stochastic error filtering | High (2.5–3.2x) |
| Complex research requiring 50–300+ agents | Large-Scale Swarm | Diverse perspectives, dynamic coordination, high throughput | Variable (1.5–4.0x) |
### Configuration Template
// orchestrator.config.ts
import { TaskSupervisor } from './supervisor';
import { ExecutionPipeline } from './pipeline';
import { ParallelDispatcher } from './fanout';
import { DebateOrchestrator } from './debate';
import { SwarmCoordinator } from './swarm';
/**
 * Default settings for each orchestration topology. The top-level keys are the
 * topology names accepted by `instantiateTopology`.
 *
 * NOTE(review): in the code shown here, only `instantiateTopology` reads this
 * object. None of the orchestrator classes defined above take a constructor
 * argument in these shapes. As a result, most of these values (worker limits,
 * timeouts, breaker thresholds, cost caps, node counts) are not enforced by
 * any visible code — confirm where they are consumed.
 */
export const AgentTopologyConfig = {
  supervisor: {
    // Same model name that TaskSupervisor hard-codes as its routingModel.
    routingModel: 'haiku',
    maxConcurrentWorkers: 50,
    sessionIsolation: true,
    idempotencyEnabled: true,
    // NOTE(review): no code shown here implements an escalate-to-human path.
    fallbackStrategy: 'escalate-to-human'
  },
  pipeline: {
    validationStrictness: 'strict',
    rollbackOnFailure: true,
    intermediateLogging: true,
    // Presumably a per-stage timeout in ms — the ExecutionPipeline shown has no timeout handling.
    stageTimeoutMs: 15000
  },
  fanout: {
    // One of the AggregationPolicy.onPartialFailure values ('fail-all' | 'return-best' | 'retry-failed').
    aggregationPolicy: 'return-best',
    // Presumably the tolerated fraction of failed tasks — TODO confirm.
    partialFailureThreshold: 0.2,
    circuitBreakerThreshold: 5,
    // Presumably backoff delays (ms) for successive retries — TODO confirm.
    retryBackoffMs: [1000, 2000, 5000]
  },
  debate: {
    // DebateOrchestrator hard-codes three prompt variants.
    variantCount: 3,
    consensusMethod: 'majority-vote',
    numericalAggregation: 'median',
    // Presumably the maximum cost relative to a single run — TODO confirm.
    costCapMultiplier: 3.5
  },
  swarm: {
    coordinationMode: 'event-driven',
    // NOTE(review): the SwarmCoordinator shown keeps shared state in an in-memory Map, not Redis.
    sharedStateBackend: 'redis',
    nodeHealthCheckIntervalMs: 5000,
    maxNodeCount: 300
  }
};
/**
 * Creates the orchestrator for the named topology.
 *
 * Only the fan-out dispatcher takes configuration. Its `aggregationPolicy`
 * setting is translated into the `{ onPartialFailure, timeoutMs }` shape that
 * ParallelDispatcher's constructor expects. The other classes take no
 * constructor arguments, so their AgentTopologyConfig entries are not applied
 * here (previously they were passed but silently ignored).
 *
 * @param type - Topology name; one of the keys of AgentTopologyConfig.
 * @throws Error for an unknown topology name or an unsupported fan-out aggregation policy.
 */
export function instantiateTopology(type: keyof typeof AgentTopologyConfig) {
  switch (type) {
    case 'supervisor': return new TaskSupervisor();
    case 'pipeline': return new ExecutionPipeline();
    case 'fanout': {
      const { aggregationPolicy } = AgentTopologyConfig.fanout;
      return new ParallelDispatcher({
        onPartialFailure: toPartialFailurePolicy(aggregationPolicy),
        // The fan-out config defines no per-task deadline; a non-finite value means "no timeout".
        timeoutMs: Number.POSITIVE_INFINITY,
      });
    }
    case 'debate': return new DebateOrchestrator();
    case 'swarm': return new SwarmCoordinator();
    default: throw new Error('Unknown topology');
  }
}

/** Partial-failure policies understood by ParallelDispatcher. */
const PARTIAL_FAILURE_POLICIES = ['fail-all', 'return-best', 'retry-failed'] as const;

/** Narrows a config string to a supported partial-failure policy, rejecting anything else. */
function toPartialFailurePolicy(value: string): (typeof PARTIAL_FAILURE_POLICIES)[number] {
  const match = PARTIAL_FAILURE_POLICIES.find((policy) => policy === value);
  if (match === undefined) {
    throw new Error(`Unsupported fan-out aggregation policy: ${value}`);
  }
  return match;
}
### Quick Start Guide
- Define your failure budget: Determine acceptable latency, cost multiplier, and accuracy threshold. This dictates topology selection before writing code.
- Initialize session isolation: Generate a `session_id` per request. Prefix all cache keys, database writes, and temporary files with this identifier to prevent state leakage.
- Select and instantiate the topology: Use the configuration template to create the appropriate orchestrator. Pass explicit aggregation policies, retry caps, and validation rules.
- Deploy with observability: Attach `trace_id` and `session_id` to every agent invocation. Enable distributed tracing, circuit breaker metrics, and cost monitoring dashboards.
- Validate under concurrency: Run load tests with 50–100 concurrent sessions. Verify that retry amplification is capped, partial failures are handled explicitly, and shared state remains consistent.