```typescript
interface ContractSpec {
  stage: string;
  preconditions: string[];
  postconditions: string[];
  policies: PolicyRule[];
}

interface PolicyRule {
  condition: string;
  action: 'allow' | 'deny' | 'route_to_dlq';
  reason: string;
}
```
Step 2: Build the Runtime Interceptor
The interceptor wraps executor functions. It evaluates policies first, checks preconditions, executes the handler, then validates postconditions. If any step fails, it captures the execution state, pushes it to a Dead Letter Queue (DLQ), and raises an error.
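```typescript
import { createHash } from 'node:crypto';

// Supporting types: the article does not define these, so the shapes below are assumptions.
interface ExecutionContext {
  input: Record<string, any>;
  output?: unknown;
  [key: string]: unknown;
}

interface DlqEntry {
  stage: string;
  type: 'POLICY_VIOLATION' | 'POLICY_ROUTED' | 'PRECONDITION_FAILURE' | 'POSTCONDITION_FAILURE' | 'HANDLER_ERROR';
  executionId: string;
  context: Record<string, unknown>;
  reason?: string;
  predicate?: string;
}

interface DeadLetterQueue {
  push(entry: DlqEntry): Promise<void>;
}

interface IdempotencyStore {
  markComplete(executionId: string): Promise<void>;
}

class PolicyDeniedError extends Error {}
class PreconditionError extends Error {}
class PostconditionError extends Error {}

class PipelineGuard {
  private dlq: DeadLetterQueue;
  private idempotencyStore: IdempotencyStore;

  constructor(dlq: DeadLetterQueue, idempotencyStore: IdempotencyStore) {
    this.dlq = dlq;
    this.idempotencyStore = idempotencyStore;
  }

  async execute<T>(
    spec: ContractSpec,
    context: ExecutionContext,
    handler: (ctx: ExecutionContext) => Promise<T>
  ): Promise<T> {
    const executionId = this.generateExecutionId(spec.stage, context);

    // 1. Policy evaluation (pre-execution). Any action other than 'allow' halts the
    //    stage; the DLQ entry type records whether it was 'deny' or 'route_to_dlq'.
    const policyResult = this.evaluatePolicies(spec.policies, context);
    if (policyResult.action !== 'allow') {
      await this.dlq.push({
        stage: spec.stage,
        type: policyResult.action === 'deny' ? 'POLICY_VIOLATION' : 'POLICY_ROUTED',
        reason: policyResult.reason,
        context: this.serializeContext(context),
        executionId
      });
      throw new PolicyDeniedError(policyResult.reason);
    }

    // 2. Precondition check
    for (const pre of spec.preconditions) {
      if (!this.evaluatePredicate(pre, context)) {
        await this.dlq.push({
          stage: spec.stage,
          type: 'PRECONDITION_FAILURE',
          predicate: pre,
          context: this.serializeContext(context),
          executionId
        });
        throw new PreconditionError(`Failed: ${pre}`);
      }
    }

    // 3. Execute handler; a thrown error is captured in the DLQ before it propagates
    let result: Awaited<T>;
    try {
      result = await handler(context);
    } catch (err) {
      await this.dlq.push({
        stage: spec.stage,
        type: 'HANDLER_ERROR',
        reason: err instanceof Error ? err.message : String(err),
        context: this.serializeContext(context),
        executionId
      });
      throw err;
    }

    // 4. Postcondition validation against the context plus the handler output
    const outputContext: ExecutionContext = { ...context, output: result };
    for (const post of spec.postconditions) {
      if (!this.evaluatePredicate(post, outputContext)) {
        await this.dlq.push({
          stage: spec.stage,
          type: 'POSTCONDITION_FAILURE',
          predicate: post,
          context: this.serializeContext(outputContext),
          executionId
        });
        throw new PostconditionError(`Failed: ${post}`);
      }
    }

    // 5. Mark as completed so replays can skip this stage
    await this.idempotencyStore.markComplete(executionId);
    return result;
  }

  // Returns the first matching rule in array order, or a default 'allow'.
  private evaluatePolicies(rules: PolicyRule[], ctx: ExecutionContext): PolicyRule {
    for (const rule of rules) {
      if (this.evaluatePredicate(rule.condition, ctx)) {
        return rule;
      }
    }
    return { condition: 'default', action: 'allow', reason: 'No matching policy' };
  }

  private evaluatePredicate(expr: string, ctx: ExecutionContext): boolean {
    // Simplified placeholder for demonstration. new Function is NOT a sandbox: the
    // expression can reach the global scope. In production, use a safe expression
    // evaluator (e.g., jsonpath, custom AST parser).
    const safeContext = { ...ctx };
    try {
      return Boolean(new Function('ctx', `return ${expr}`)(safeContext));
    } catch {
      // A predicate that throws counts as false: preconditions then fail,
      // but policy conditions simply do not match.
      return false;
    }
  }

  // Strips functions; circular references and BigInt values still make JSON.stringify throw.
  private serializeContext(ctx: ExecutionContext): Record<string, unknown> {
    return JSON.parse(JSON.stringify(ctx, (_key, value) =>
      typeof value === 'function' ? undefined : value
    ));
  }

  // Deterministic ID: stage name plus a truncated SHA-256 of the stage input
  private generateExecutionId(stage: string, ctx: ExecutionContext): string {
    const hash = createHash('sha256')
      .update(`${stage}:${JSON.stringify(ctx.input)}`)
      .digest('hex')
      .slice(0, 12);
    return `${stage}_${hash}`;
  }
}
```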
Step 3: Wire the Executor
The executor contains only domain logic. It receives a context that has already passed the policy and precondition checks, returns a result, and never checks pipeline invariants itself.
```typescript
// buildPrompt, llmClient, and MarketReport are assumed to be defined elsewhere in the application.
async function analyzeMarketTrends(context: ExecutionContext): Promise<MarketReport> {
  const prompt = buildPrompt(context.input);
  const response = await llmClient.complete(prompt, { maxTokens: 2000 });
  return {
    region: context.input.region,
    sentiment: response.sentiment,
    confidence: response.confidence,
    rawOutput: response.text
  };
}
```
Step 4: Orchestrate with Contracts
The pipeline definition ties specs to handlers. The runtime enforces boundaries.
```typescript
const marketSpec: ContractSpec = {
  stage: 'market_analysis',
  // `!= null` rejects both null and undefined regions before the "restricted" check
  preconditions: ['ctx.input.region != null && ctx.input.region !== "restricted"'],
  postconditions: ['ctx.output.confidence > 0.65'],
  policies: [
    { condition: 'ctx.input.budgetTokens > 3000', action: 'deny', reason: 'Budget exceeded' }
  ]
};

// dlqInstance, idempotencyStore, and executionContext are assumed to be created during setup.
const guard = new PipelineGuard(dlqInstance, idempotencyStore);
const report = await guard.execute(marketSpec, executionContext, analyzeMarketTrends);
```
Architecture Rationale
- External Spec: Contracts are declarative. This enables version control, audit trails, and non-engineer review of pipeline rules.
- Runtime Interception: Because the guard wraps execution, policies and preconditions are checked before any LLM call. Inputs that violate the contract never reach the model, so no tokens are spent on them.
- DLQ + State Snapshot: Failures are captured with the full serialized context, not just an error message. A failed stage can then be replayed from the captured input instead of having its state reconstructed from logs.
- Idempotency Keys: Each stage derives a deterministic ID from the stage name and a hash of its input, and the guard marks that ID complete after a successful run. A replay that checks the ID first can skip completed stages, preventing duplicate external actions.
- Safe Predicate Evaluation: The `evaluatePredicate` method passes the predicate only a shallow copy of the context, but `new Function` still runs with access to the global scope, so it is not a sandbox. In production, replace this placeholder with a sandboxed expression parser to prevent injection attacks.
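The following is a minimal sketch of a sandboxed alternative to the `new Function` placeholder. It assumes that contract predicates stay within a small grammar: clauses of the form `ctx.<path> <operator> <literal>` joined by `&&`, where the literal is a number, a quoted string, `null`, `true`, or `false`. The function name `evaluateRestricted` is hypothetical. Anything outside the grammar throws instead of executing, and property lookup follows only the context's own keys.

```typescript
type Literal = number | string | boolean | null;

// One clause: a ctx path, a comparison operator, and a literal.
// Longer operators come first in the alternation so '===' is not read as '=='.
const CLAUSE = /^\s*(ctx(?:\.[A-Za-z_]\w*)*)\s*(===|!==|==|!=|>=|<=|>|<)\s*(.+?)\s*$/;

function parseLiteral(raw: string): Literal {
  if (raw === 'null') return null;
  if (raw === 'true') return true;
  if (raw === 'false') return false;
  if (/^-?\d+(\.\d+)?$/.test(raw)) return Number(raw);
  const quoted = /^(['"])(.*)\1$/.exec(raw);
  if (quoted) return quoted[2];
  throw new Error(`Unsupported literal: ${raw}`);
}

// Walks own properties only, so keys such as "constructor" resolve to undefined.
function resolvePath(path: string, ctx: unknown): unknown {
  let current: unknown = ctx;
  for (const key of path.split('.').slice(1)) {
    if (current === null || typeof current !== 'object') return undefined;
    if (!Object.prototype.hasOwnProperty.call(current, key)) return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

function compare(left: unknown, op: string, right: Literal): boolean {
  switch (op) {
    case '===': return left === right;
    case '!==': return left !== right;
    case '==': return left == right; // loose, so `== null` also matches undefined
    case '!=': return left != right;
  }
  // Relational operators only apply to numbers; anything else is false.
  if (typeof left !== 'number' || typeof right !== 'number') return false;
  switch (op) {
    case '>': return left > right;
    case '>=': return left >= right;
    case '<': return left < right;
    default: return left <= right;
  }
}

// Hypothetical drop-in for evaluatePredicate; throws on anything outside the grammar.
function evaluateRestricted(expr: string, ctx: unknown): boolean {
  return expr.split('&&').every((clause) => {
    const match = CLAUSE.exec(clause);
    if (!match) throw new Error(`Unsupported expression: ${clause.trim()}`);
    const [, path, op, literal] = match;
    return compare(resolvePath(path, ctx), op, parseLiteral(literal));
  });
}
```

This grammar covers every predicate used in this article's examples. It does not support `||`, parentheses, or string literals that contain `&&`. If contracts need those, a full expression parser is the better choice.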
Pitfall Guide
1. Over-Contracting Local Logic
Explanation: Teams often push domain-specific validation (e.g., "sentiment must be positive") into pipeline contracts. Contracts should enforce workflow invariants, not business rules.
Fix: Keep contracts focused on structural guarantees (state presence, token limits, routing policies). Move domain validation inside executors or downstream processors.
2. Ignoring State Serialization Limits
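Explanation: Writing a DLQ entry fails when the context contains circular references or BigInt values, because `JSON.stringify` throws on both. Other non-JSON values are silently lost: functions and `undefined` are dropped, and `Map` and `Set` serialize as empty objects. Either way, replay capability breaks.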
Fix: Implement a strict serialization layer that strips functions, resolves circular refs, and converts non-JSON types to strings before DLQ ingestion. Validate payloads in CI.
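One way to build that serialization layer is a recursive converter that runs before `JSON.stringify`. The sketch below uses a hypothetical helper name, `toDlqPayload`. It drops functions, converts `BigInt` to a string, `Map` to an object, and `Set` to an array, and replaces circular references with a marker string instead of throwing. It tracks only the objects on the current path, so an object that is merely shared between two branches is still serialized in full.

```typescript
// Hypothetical helper: converts an execution context into a JSON-safe value.
// `ancestors` holds the objects on the current path, so only true cycles are replaced.
function toDlqPayload(value: unknown, ancestors: readonly object[] = []): unknown {
  if (typeof value === 'bigint') return value.toString();
  if (typeof value === 'function' || typeof value === 'symbol' || value === undefined) {
    return undefined; // omitted from objects, as JSON.stringify would do
  }
  if (value === null || typeof value !== 'object') return value;
  if (ancestors.includes(value)) return '[Circular]';

  const path = [...ancestors, value];
  if (value instanceof Date) {
    return Number.isNaN(value.getTime()) ? null : value.toISOString();
  }
  if (value instanceof Map) {
    const out: Record<string, unknown> = {};
    for (const [key, entry] of Array.from(value.entries())) {
      out[String(key)] = toDlqPayload(entry, path);
    }
    return out;
  }
  if (value instanceof Set || Array.isArray(value)) {
    // Array slots cannot be omitted, so unsupported values become null.
    return Array.from(value as Iterable<unknown>, (item) => toDlqPayload(item, path) ?? null);
  }
  const out: Record<string, unknown> = {};
  for (const [key, entry] of Object.entries(value)) {
    const converted = toDlqPayload(entry, path);
    if (converted !== undefined) out[key] = converted;
  }
  return out;
}
```

In CI, calling `JSON.stringify(toDlqPayload(ctx))` on representative contexts is one way to check that payloads survive ingestion.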
3. Policy Precedence Conflicts
Explanation: Multiple policies may match the same input. Without explicit precedence, the runtime applies the first match in array order, which may not be the intended rule.
Fix: Assign priority weights to policies. Evaluate in descending priority order. Log which policy triggered and why. Reject configurations with overlapping conditions.
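The sketch below shows one way to apply that fix. It assumes a hypothetical `priority` field on each rule, which the article's `PolicyRule` does not have. Rules are evaluated in descending priority, the winning rule is logged, and duplicate priorities are rejected. Detecting genuinely overlapping conditions would require analyzing the expressions themselves, so rejecting ties is a narrower stand-in for that check. Condition matching is passed in as a callback so that the sketch does not depend on any particular expression evaluator.

```typescript
type PolicyAction = 'allow' | 'deny' | 'route_to_dlq';

// Hypothetical rule shape: the article's PolicyRule plus an explicit priority weight.
interface PrioritizedRule {
  condition: string;
  action: PolicyAction;
  reason: string;
  priority: number; // higher values are evaluated first
}

function selectPolicy(
  rules: PrioritizedRule[],
  matches: (condition: string) => boolean,
  log: (message: string) => void = console.log
): PrioritizedRule | undefined {
  // Reject ties up front so precedence never depends on array order.
  const priorities = rules.map((rule) => rule.priority);
  if (new Set(priorities).size !== priorities.length) {
    throw new Error('Duplicate policy priorities make precedence ambiguous');
  }
  // Sort a copy in descending priority; the caller's array is left untouched.
  const ordered = [...rules].sort((a, b) => b.priority - a.priority);
  for (const rule of ordered) {
    if (matches(rule.condition)) {
      log(`policy matched: priority=${rule.priority} action=${rule.action} reason="${rule.reason}"`);
      return rule;
    }
  }
  return undefined; // the caller applies its default (e.g., allow)
}
```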
4. Missing Idempotency Keys
Explanation: Replaying a pipeline without idempotency tracking causes duplicate external API calls, double-billing, or data corruption.
Fix: Generate deterministic execution IDs from stage name + input hash. Store completion status in a fast key-value store. Skip stages where isComplete(id) === true.
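A minimal sketch of the skip logic follows. It assumes a hypothetical `CompletionStore` interface with `isComplete` and `markComplete`, backed here by an in-memory set as a stand-in for Redis or DynamoDB. IDs are derived the same way as in `generateExecutionId`. A stage is marked complete only after it succeeds, so a replay re-runs exactly the stages that failed or never ran.

```typescript
import { createHash } from 'node:crypto';

// Assumed store contract; production versions would use a fast key-value store.
interface CompletionStore {
  isComplete(id: string): Promise<boolean>;
  markComplete(id: string): Promise<void>;
}

class InMemoryCompletionStore implements CompletionStore {
  private readonly done = new Set<string>();
  async isComplete(id: string): Promise<boolean> {
    return this.done.has(id);
  }
  async markComplete(id: string): Promise<void> {
    this.done.add(id);
  }
}

// Stage name plus a truncated SHA-256 of the input, as in generateExecutionId.
function executionId(stage: string, input: unknown): string {
  const hash = createHash('sha256').update(`${stage}:${JSON.stringify(input)}`).digest('hex').slice(0, 12);
  return `${stage}_${hash}`;
}

interface Stage {
  name: string;
  run: (input: unknown) => Promise<void>;
}

// Runs stages in order, skipping completed ones; returns the names that actually ran.
async function replayPipeline(stages: Stage[], input: unknown, store: CompletionStore): Promise<string[]> {
  const executed: string[] = [];
  for (const stage of stages) {
    const id = executionId(stage.name, input);
    if (await store.isComplete(id)) continue; // finished on an earlier attempt
    await stage.run(input);
    await store.markComplete(id); // only after success, so a failed stage is retried
    executed.push(stage.name);
  }
  return executed;
}
```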
5. DLQ Bloat & Retention Neglect
Explanation: Unbounded DLQ growth increases storage costs and slows replay queries. Teams often forget to implement retention policies.
Fix: Set TTLs based on compliance requirements (e.g., 30 days for debugging, 1 year for audit). Archive older entries to cold storage. Implement automatic pruning jobs.
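At its core, a pruning job compares each entry's age against a per-category TTL. The sketch below assumes a hypothetical `DlqRecord` shape with a creation timestamp and a retention category, and uses the 30-day and 1-year windows from the text as example values. Moving the expired entries to cold storage is left to the caller.

```typescript
// Hypothetical DLQ record shape for retention purposes.
interface DlqRecord {
  executionId: string;
  createdAt: number; // epoch milliseconds
  category: 'debug' | 'audit';
}

const DAY_MS = 24 * 60 * 60 * 1000;

// Example windows from the text; set these from your actual compliance requirements.
const RETENTION_MS: Record<DlqRecord['category'], number> = {
  debug: 30 * DAY_MS,
  audit: 365 * DAY_MS,
};

// Splits entries into those still within their TTL and those due for archival or deletion.
function partitionByRetention(entries: DlqRecord[], now: number): { keep: DlqRecord[]; expired: DlqRecord[] } {
  const keep: DlqRecord[] = [];
  const expired: DlqRecord[] = [];
  for (const entry of entries) {
    const age = now - entry.createdAt;
    (age > RETENTION_MS[entry.category] ? expired : keep).push(entry);
  }
  return { keep, expired };
}
```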
6. Contract Version Drift
Explanation: Contracts evolve, but running pipelines may reference outdated specs. This causes silent failures or unexpected routing.
Fix: Version contracts semantically (v1.2.0). Bind pipeline deployments to specific contract versions. Implement migration scripts for in-flight executions.
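Binding a deployment to a contract version can start with a compatibility check that runs when a pipeline loads its specs. The sketch below makes two assumptions. It follows the usual semantic-versioning convention that only a major bump breaks compatibility. It also assumes that a deployment pinned to a version should reject anything older than that version. The function names are hypothetical, and stricter teams may prefer exact pinning instead.

```typescript
// Parses "1.2.0" or "v1.2.0" into numeric [major, minor, patch].
function parseSemver(version: string): [number, number, number] {
  const m = /^v?(\d+)\.(\d+)\.(\d+)$/.exec(version.trim());
  if (!m) throw new Error(`Not a semantic version: ${version}`);
  return [Number(m[1]), Number(m[2]), Number(m[3])];
}

// Accepts a contract only if it has the pinned major version and is not older than the pin.
function isCompatible(pinned: string, contract: string): boolean {
  const [pMajor, pMinor, pPatch] = parseSemver(pinned);
  const [cMajor, cMinor, cPatch] = parseSemver(contract);
  if (cMajor !== pMajor) return false;
  if (cMinor !== pMinor) return cMinor > pMinor;
  return cPatch >= pPatch;
}
```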
7. Silent Policy Fallbacks
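Explanation: In the reference `evaluatePredicate`, a condition that throws (for example, because a nested field it reads is missing) evaluates to `false`. No policy matches, so the runtime falls through to the default `allow`, which bypasses the safety gate.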
Fix: Fail closed. If a predicate cannot be evaluated, treat it as a violation. Log the missing context and route to DLQ for manual review.
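A fail-closed variant of policy selection has to tell "the condition is false" apart from "the condition could not be evaluated". In the sketch below, the function name is hypothetical and `PolicyRule` mirrors the interface from the contract definition. The injected `evaluate` callback is allowed to throw. Any throw becomes a `route_to_dlq` decision whose reason records the failing condition and the error, instead of falling through to the default `allow`.

```typescript
type PolicyAction = 'allow' | 'deny' | 'route_to_dlq';

// Mirrors the PolicyRule interface from the contract definition.
interface PolicyRule {
  condition: string;
  action: PolicyAction;
  reason: string;
}

// `evaluate` may throw, e.g. when a condition reads a field the context lacks.
function evaluatePoliciesFailClosed(
  rules: PolicyRule[],
  ctx: unknown,
  evaluate: (condition: string, ctx: unknown) => boolean
): PolicyRule {
  for (const rule of rules) {
    let matched: boolean;
    try {
      matched = evaluate(rule.condition, ctx);
    } catch (err) {
      const detail = err instanceof Error ? err.message : String(err);
      // Fail closed: an unevaluable condition is routed for manual review, never allowed.
      return {
        condition: rule.condition,
        action: 'route_to_dlq',
        reason: `Could not evaluate policy "${rule.condition}": ${detail}`
      };
    }
    if (matched) return rule;
  }
  // Reached only when every condition evaluated cleanly and none matched.
  return { condition: 'default', action: 'allow', reason: 'No matching policy' };
}
```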
Production Bundle
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Single-stage prototype | Inline assertions | Low overhead, fast iteration | Minimal |
| Multi-stage workflow with external APIs | Contract-driven runtime | Prevents duplicate calls, enables targeted replay | Reduces token waste by 30-40% |
| Compliance/audit requirements | Contract-driven runtime | Structured DLQ, versioned specs, full context preservation | Increases storage cost, reduces compliance risk |
| High-frequency trading/real-time | Contract-driven runtime + caching | Policy evaluation happens in-memory, blocks invalid requests before compute | Low latency, predictable spend |
| Team with mixed engineering/ops skills | Contract-driven runtime | Declarative specs enable non-engineers to review routing rules | Reduces on-call debugging time |
Configuration Template
```yaml
# pipeline-contracts.yaml
version: "1.0"
stages:
  - name: data_enrichment
    preconditions:
      - "ctx.input.company_id != null"
      - "ctx.input.region != 'restricted'"
    postconditions:
      - "ctx.output.enrichment_score >= 0.5"
    policies:
      - condition: "ctx.input.max_tokens > 2500"
        action: "deny"
        reason: "Token budget exceeded for enrichment stage"
      - condition: "ctx.input.region == 'eu'"
        action: "route_to_dlq"
        reason: "EU data residency policy requires manual review"
  - name: trend_analysis
    preconditions:
      - "ctx.previous_output != null"
      - "ctx.previous_output.enrichment_score >= 0.5"
    postconditions:
      - "ctx.output.confidence > 0.65"
    policies:
      - condition: "ctx.input.budget_remaining < 1000"
        action: "deny"
        reason: "Insufficient budget for analysis stage"
```
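The YAML layout differs slightly from `ContractSpec`: stages are keyed by `name`, not `stage`. Loading it therefore needs a small mapping and validation step. The sketch below assumes the file has already been parsed into a plain object by a YAML library of your choice, and the function names are hypothetical. It fails fast on a missing stage name, non-string predicates, or an unknown policy action, so a malformed contract never reaches the runtime.

```typescript
type PolicyAction = 'allow' | 'deny' | 'route_to_dlq';
interface PolicyRule { condition: string; action: PolicyAction; reason: string; }
interface ContractSpec {
  stage: string;
  preconditions: string[];
  postconditions: string[];
  policies: PolicyRule[];
}

const ACTIONS: readonly string[] = ['allow', 'deny', 'route_to_dlq'];

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Missing lists default to empty; anything other than a list of strings is rejected.
function stringList(value: unknown, field: string): string[] {
  if (value === undefined) return [];
  if (!Array.isArray(value) || !value.every((item) => typeof item === 'string')) {
    throw new Error(`${field} must be a list of strings`);
  }
  return value;
}

function toPolicy(raw: unknown, where: string): PolicyRule {
  if (
    !isRecord(raw) ||
    typeof raw.condition !== 'string' ||
    typeof raw.reason !== 'string' ||
    typeof raw.action !== 'string' ||
    !ACTIONS.includes(raw.action)
  ) {
    throw new Error(`${where} needs a string condition, a string reason, and a known action`);
  }
  return { condition: raw.condition, action: raw.action as PolicyAction, reason: raw.reason };
}

// Maps the parsed YAML layout (stages[].name) onto ContractSpec (stage).
function toContractSpecs(doc: unknown): ContractSpec[] {
  if (!isRecord(doc) || !Array.isArray(doc.stages)) {
    throw new Error('Expected a top-level "stages" list');
  }
  return doc.stages.map((raw: unknown, i: number): ContractSpec => {
    if (!isRecord(raw) || typeof raw.name !== 'string') {
      throw new Error(`stages[${i}].name must be a string`);
    }
    const policies = raw.policies === undefined ? [] : raw.policies;
    if (!Array.isArray(policies)) throw new Error(`stages[${i}].policies must be a list`);
    return {
      stage: raw.name,
      preconditions: stringList(raw.preconditions, `stages[${i}].preconditions`),
      postconditions: stringList(raw.postconditions, `stages[${i}].postconditions`),
      policies: policies.map((p: unknown, j: number) => toPolicy(p, `stages[${i}].policies[${j}]`)),
    };
  });
}
```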
Quick Start Guide
- Install the runtime package: `npm install @codcompass/pipeline-guard` (or implement the `PipelineGuard` class from the Core Solution section).
- Define your first contract: Create a `ContractSpec` object or YAML file declaring preconditions, postconditions, and routing policies for your initial stage.
- Wrap your executor: Pass your handler function through `guard.execute(spec, context, handler)`, and move pipeline-invariant assertions out of the handler and into the contract.
- Configure DLQ & Idempotency: Point the runtime to a message queue (SQS, RabbitMQ, or in-memory for dev) and a key-value store (Redis, DynamoDB) for completion tracking.
- Test failure paths: Inject invalid inputs, trigger policy violations, and verify that DLQ entries contain full context and that replays skip completed stages.
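For the in-memory development option, a DLQ can be a few lines of code. The sketch below is hypothetical, including the class and method names, though its `push` accepts the same entry fields the guard sends. It round-trips each entry through JSON, so payloads that could not be serialized fail in tests rather than in production. The `byType` method lets a failure-path test inspect what was captured.

```typescript
// Hypothetical entry shape mirroring the fields PipelineGuard pushes.
interface DlqEntry {
  stage: string;
  type: string;
  executionId: string;
  context: Record<string, unknown>;
  reason?: string;
  predicate?: string;
}

// In-memory dead letter queue for local development and failure-path tests.
class InMemoryDlq {
  private readonly entries: DlqEntry[] = [];

  async push(entry: DlqEntry): Promise<void> {
    // JSON round-trip: circular or BigInt payloads throw here, exposing serialization bugs early.
    this.entries.push(JSON.parse(JSON.stringify(entry)) as DlqEntry);
  }

  byType(type: string): DlqEntry[] {
    return this.entries.filter((entry) => entry.type === type);
  }
}
```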