chema drift errors from propagating. We define interfaces for source data and sink operations.
// types.ts
export interface PipelineRecord<T> {
idempotencyKey: string;
payload: T;
metadata: {
sourceTimestamp: Date;
attempt: number;
};
}
export interface PipelineMetrics {
recordsProcessed: number;
errors: number;
latencyMs: number;
}
2. Implement the Idempotent Sink
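The sink must tolerate duplicate deliveries: writing the same record twice has to leave the table in the same state as writing it once. We use an INSERT ... ON CONFLICT strategy for SQL backends or conditional writes for NoSQL.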
// sinks/IdempotentPostgresSink.ts
import { Pool } from 'pg';
import { PipelineRecord } from '../types';
export class IdempotentPostgresSink<T extends Record<string, any>> {
constructor(private pool: Pool, private tableName: string) {}
async write(records: PipelineRecord<T>[]): Promise<void> {
if (records.length === 0) return;
const client = await this.pool.connect();
try {
await client.query('BEGIN');
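// Upsert every record inside one transaction, keyed on the idempotency constraint
// Assumes table has a unique constraint on 'idempotency_key'
// tableName is interpolated into the SQL text, so it must come from trusted configuration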
const query = `
INSERT INTO ${this.tableName} (idempotency_key, data, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (idempotency_key) DO UPDATE SET
data = EXCLUDED.data,
updated_at = NOW()
`;
for (const record of records) {
await client.query(query, [
record.idempotencyKey,
JSON.stringify(record.payload)
]);
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error; // Caller handles retry/DLQ
} finally {
client.release();
}
}
}
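To see the idempotency property in isolation from Postgres, the following is a minimal in-memory sketch. `InMemoryIdempotentSink`, `rowCount`, and `get` are hypothetical names introduced for illustration, and `PipelineRecord` is repeated so the block is self-contained. A `Map` keyed on the idempotency key stands in for the table, and `Map.set` plays the role of `ON CONFLICT ... DO UPDATE`.

```typescript
// Repeated from types.ts so this sketch is self-contained.
interface PipelineRecord<T> {
  idempotencyKey: string;
  payload: T;
  metadata: { sourceTimestamp: Date; attempt: number };
}

// Hypothetical in-memory stand-in for a table with a unique idempotency_key.
class InMemoryIdempotentSink<T> {
  private rows = new Map<string, { data: T; updatedAt: Date }>();

  async write(records: PipelineRecord<T>[]): Promise<void> {
    for (const record of records) {
      // Map.set overwrites an existing key, so a replayed record
      // updates its row instead of creating a second one.
      this.rows.set(record.idempotencyKey, {
        data: record.payload,
        updatedAt: new Date(),
      });
    }
  }

  rowCount(): number {
    return this.rows.size;
  }

  get(key: string): T | undefined {
    const row = this.rows.get(key);
    return row ? row.data : undefined;
  }
}
```

Writing the same record twice through this sink leaves one row, which is the behavior the retry loop depends on. A stand-in like this may also be convenient in unit tests for code that only needs the `write` method.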
3. Build the Resilient Engine with Retry and DLQ
The engine orchestrates extraction, transformation, and loading. It includes exponential backoff retry logic and a Dead Letter Queue for poison pills.
// engine/PipelineEngine.ts
import { PipelineRecord, PipelineMetrics } from '../types';
import { IdempotentPostgresSink } from '../sinks/IdempotentPostgresSink';
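// T carries the same constraint as IdempotentPostgresSink so the sink type parameter is valid
export class PipelineEngine<T extends Record<string, any>> {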
private metrics: PipelineMetrics = { recordsProcessed: 0, errors: 0, latencyMs: 0 };
private dlq: PipelineRecord<T>[] = [];
constructor(
private sink: IdempotentPostgresSink<T>,
private maxRetries: number = 3,
private baseDelayMs: number = 1000
) {}
async processBatch(records: PipelineRecord<T>[]): Promise<void> {
const startTime = Date.now();
for (const record of records) {
try {
await this.executeWithRetry(record);
this.metrics.recordsProcessed++;
} catch (error) {
// executeWithRetry has already exhausted its retries, so quarantine the
// record instead of re-queueing it. Inserting into `records` while iterating
// over it would re-run the same record and mutate the caller's array.
this.metrics.errors++;
this.dlq.push(record);
console.error(`Record ${record.idempotencyKey} moved to DLQ after ${record.metadata.attempt} failed attempts`, error);
}
}
this.metrics.latencyMs = Date.now() - startTime;
this.emitMetrics();
}
private async executeWithRetry(record: PipelineRecord<T>): Promise<void> {
// One initial attempt plus up to maxRetries retries
let attempt = 0;
while (attempt <= this.maxRetries) {
try {
await this.sink.write([record]);
return; // Success
} catch (error) {
attempt++;
record.metadata.attempt = attempt; // Failed attempts so far
if (attempt > this.maxRetries) throw error;
// Exponential backoff with jitter: base * 2^attempt, scaled by a factor in [0.5, 1.5)
const delay = this.baseDelayMs * Math.pow(2, attempt) * (0.5 + Math.random());
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
private emitMetrics(): void {
// Integration with Prometheus/Datadog
console.log(`Pipeline Metrics: ${JSON.stringify(this.metrics)}`);
}
getDeadLetterQueue(): PipelineRecord<T>[] {
return this.dlq;
}
}
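The delay expression in executeWithRetry is easier to reason about, and to test, when pulled into a pure function. The sketch below mirrors that expression; `backoffDelayMs`, the injectable `random` parameter, and the `maxDelayMs` cap are assumptions added for illustration and are not part of the engine above.

```typescript
// Delay before the next retry, given how many attempts have failed so far.
// random is injectable so the jitter can be pinned in tests.
function backoffDelayMs(
  failedAttempts: number,
  baseDelayMs: number,
  random: () => number = Math.random,
  maxDelayMs: number = Infinity // hypothetical cap, not in the engine above
): number {
  const exponential = baseDelayMs * Math.pow(2, failedAttempts);
  const jitterFactor = 0.5 + random(); // in [0.5, 1.5)
  return Math.min(maxDelayMs, exponential * jitterFactor);
}

// With a 1000 ms base and the jitter pinned at its midpoint (factor 1.0):
const firstRetry = backoffDelayMs(1, 1000, () => 0.5);  // 2000
const secondRetry = backoffDelayMs(2, 1000, () => 0.5); // 4000
const thirdRetry = backoffDelayMs(3, 1000, () => 0.5);  // 8000
```

With the default settings (three retries, 1000 ms base), a record that never succeeds waits roughly 14 seconds in total before reaching the DLQ, give or take the jitter. A cap becomes relevant once maxRetries grows, since the uncapped delay doubles on every attempt.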
4. Composition and Usage
The engine is consumed by a runner that handles extraction and transformation.
// runner.ts
import { Pool } from 'pg';
import { IdempotentPostgresSink } from './sinks/IdempotentPostgresSink';
import { PipelineEngine } from './engine/PipelineEngine';
import { PipelineRecord } from './types';
async function main() {
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const sink = new IdempotentPostgresSink(pool, 'events');
const engine = new PipelineEngine(sink, 3, 1000);
// Mock extraction
const rawRecords = [
{ id: 'evt-001', data: { type: 'click', ts: Date.now() } },
{ id: 'evt-002', data: { type: 'view', ts: Date.now() } },
];
const pipelineRecords: PipelineRecord<any>[] = rawRecords.map(r => ({
idempotencyKey: `raw:${r.id}`, // Deterministic key
payload: r.data,
metadata: { sourceTimestamp: new Date(), attempt: 0 }
}));
await engine.processBatch(pipelineRecords);
// Handle DLQ records asynchronously
const dlq = engine.getDeadLetterQueue();
if (dlq.length > 0) {
console.warn(`DLQ contains ${dlq.length} records. Alerting ops team.`);
// Trigger alert or store in S3 for later analysis
}
await pool.end();
}
main().catch(console.error);
Rationale:
- Idempotency Key Strategy: Keys are derived from source IDs (raw:${id}), ensuring that retries of the same record map to the same database row.
- Transaction Management: The sink wraps each write call in a transaction, so a failed batch rolls back as a whole instead of leaving partial writes behind.
- Backoff with Jitter: Prevents thundering herd issues on the database during recovery.
- DLQ Separation: Poison pills do not block the pipeline; they are quarantined for inspection.
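The key strategy above relies on the source providing a stable ID. When it does not, one possible fallback is to hash the payload content. The sketch below is an assumption layered on top of the article's approach: `deriveIdempotencyKey` and `stableStringify` are hypothetical helpers, and the payload is assumed to contain only plain JSON values (no Date objects, no undefined).

```typescript
import { createHash } from 'node:crypto';

// Serialize with object keys sorted, so logically equal payloads produce
// the same string regardless of property insertion order.
function stableStringify(value: unknown): string {
  if (Array.isArray(value)) {
    return '[' + value.map(stableStringify).join(',') + ']';
  }
  if (value !== null && typeof value === 'object') {
    const obj = value as Record<string, unknown>;
    const entries = Object.keys(obj)
      .sort()
      .map(key => JSON.stringify(key) + ':' + stableStringify(obj[key]));
    return '{' + entries.join(',') + '}';
  }
  return JSON.stringify(value);
}

// Prefer the source ID; fall back to a SHA-256 digest of the payload.
function deriveIdempotencyKey(
  source: string,
  payload: unknown,
  sourceId?: string
): string {
  if (sourceId !== undefined) {
    return `${source}:${sourceId}`;
  }
  const digest = createHash('sha256')
    .update(stableStringify(payload))
    .digest('hex');
  return `${source}:sha256:${digest}`;
}
```

A content hash treats two records with identical payloads as the same record, which is only acceptable when identical payloads really are duplicates. Events that can legitimately repeat need a source-side ID or sequence number instead.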
Pitfall Guide
1. Missing Idempotency Guarantees
- Mistake: Implementing retries without ensuring the sink operation is idempotent.
- Consequence: Retries create duplicate records. Data integrity is compromised, and downstream analytics yield inflated metrics.
- Fix: Every sink must support upserts or conditional writes based on a deterministic key. Never rely on "best effort" inserts in a retry loop.
2. Silent Schema Drift
- Mistake: Assuming source schemas remain static. New fields are added, or types change without updating the pipeline.
- Consequence: Pipeline crashes or silently drops data. In TypeScript, this may be caught at compile time if types are strict, but runtime data often violates contracts.
- Fix: Implement runtime schema validation (e.g., Zod or Joi) at the ingestion boundary. Route records failing validation to a DLQ with metadata indicating the schema violation.
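As a rough illustration of validating at the ingestion boundary, here is a hand-rolled check for the click/view payload shape used in the runner's mock data. In practice a library such as Zod or Joi would replace the manual checks; `EventPayload`, `validateEventPayload`, and the `reason` strings are hypothetical names for this sketch.

```typescript
type EventType = 'click' | 'view';

interface EventPayload {
  type: EventType;
  ts: number;
}

// Either a typed payload or a reason that can travel with the record to the DLQ.
type ValidationResult =
  | { ok: true; value: EventPayload }
  | { ok: false; reason: string };

function isEventType(value: unknown): value is EventType {
  return value === 'click' || value === 'view';
}

function validateEventPayload(input: unknown): ValidationResult {
  if (typeof input !== 'object' || input === null) {
    return { ok: false, reason: 'payload is not an object' };
  }
  const candidate = input as Record<string, unknown>;
  const type = candidate['type'];
  const ts = candidate['ts'];
  if (!isEventType(type)) {
    return { ok: false, reason: `unexpected type: ${String(type)}` };
  }
  if (typeof ts !== 'number' || !isFinite(ts)) {
    return { ok: false, reason: 'ts must be a finite number' };
  }
  // Copy only the known fields so unexpected extras do not leak downstream.
  return { ok: true, value: { type, ts } };
}
```

Returning a result object rather than throwing keeps the caller in control: valid records proceed to the sink, while failures can be pushed to the DLQ together with the `reason` string as the schema-violation metadata.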
3. Blocking I/O in Single-Threaded Environments
- Mistake: Using synchronous blocking calls or heavy computation in Node.js without offloading to worker threads.
- Consequence: The event loop blocks, causing the pipeline to stall. Latency spikes, and backpressure causes memory accumulation.
- Fix: Use worker_threads for CPU-intensive transformations. Ensure all I/O is non-blocking. Implement stream processing for large datasets to avoid loading everything into memory.
4. Lack of Backpressure Handling
- Mistake: Fetching data as fast as possible regardless of sink capacity.
- Consequence: The sink becomes overwhelmed, leading to connection pool exhaustion, timeouts, and cascading failures.
- Fix: Implement flow control. If the sink returns errors or high latency, throttle the extraction rate. Use bounded queues with capacity limits.
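One way to picture the bounded queue mentioned in the fix is the sketch below. `BoundedQueue` is a hypothetical class written for this illustration, not a library API: `push` suspends the producer while the queue is full, so extraction slows to the pace at which the sink consumes.

```typescript
class BoundedQueue<T> {
  private items: T[] = [];
  private waitingProducers: Array<() => void> = [];
  private waitingConsumers: Array<(item: T) => void> = [];

  constructor(private capacity: number) {
    if (capacity < 1) throw new Error('capacity must be at least 1');
  }

  // Resolves once the item has been accepted; waits while the queue is full.
  async push(item: T): Promise<void> {
    const consumer = this.waitingConsumers.shift();
    if (consumer) {
      consumer(item); // Hand over directly to a consumer that is already waiting
      return;
    }
    while (this.items.length >= this.capacity) {
      await new Promise<void>(resolve => this.waitingProducers.push(resolve));
    }
    this.items.push(item);
  }

  // Resolves with the oldest item; waits while the queue is empty.
  async pop(): Promise<T> {
    if (this.items.length > 0) {
      const item = this.items.shift() as T;
      const producer = this.waitingProducers.shift();
      if (producer) producer(); // A slot is free, wake one blocked producer
      return item;
    }
    return new Promise<T>(resolve => this.waitingConsumers.push(resolve));
  }

  get size(): number {
    return this.items.length;
  }
}
```

An extractor that awaits `push` for every fetched record cannot run more than `capacity` records ahead of the loader, which bounds memory use and keeps the connection pool from being flooded.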
5. Treating Logs as Metrics
- Mistake: Relying on log aggregation to detect pipeline health.
- Consequence: Logs are verbose and expensive to query. You cannot set alerts on "records processed per minute" effectively using logs alone.
- Fix: Emit structured metrics (Prometheus counters/gauges) for throughput, error rates, and latency. Logs should contain correlation IDs for tracing, but metrics drive alerting.
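To make the distinction concrete, the sketch below keeps counters in memory and renders them in the Prometheus text exposition style. `CounterRegistry` and the metric names are hypothetical; a real deployment would more likely use an existing client library than hand-written rendering.

```typescript
class CounterRegistry {
  private counters = new Map<string, number>();

  // Counters only ever go up; rates are computed by the monitoring system.
  increment(name: string, by: number = 1): void {
    if (by < 0) throw new Error('counters cannot decrease');
    this.counters.set(name, this.value(name) + by);
  }

  value(name: string): number {
    const current = this.counters.get(name);
    return current === undefined ? 0 : current;
  }

  // One "# TYPE" line and one sample line per counter.
  render(): string {
    const lines: string[] = [];
    this.counters.forEach((value, name) => {
      lines.push(`# TYPE ${name} counter`);
      lines.push(`${name} ${value}`);
    });
    return lines.join('\n');
  }
}

const registry = new CounterRegistry();
registry.increment('pipeline_records_processed_total');
registry.increment('pipeline_records_processed_total');
registry.increment('pipeline_errors_total', 3);
```

Because the values are plain numbers attached to stable names, an alert such as "error rate above a threshold for five minutes" becomes a query over the counters rather than a search through log lines.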
6. No Dead Letter Queue Strategy
- Mistake: Failing the entire batch when a single record is malformed.
- Consequence: One bad record halts processing of valid data. This is known as the "poison pill" problem.
- Fix: Isolate record-level errors. If a record fails validation or sink write after max retries, move it to a DLQ and continue processing the batch.
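The isolation described in the fix can be sketched as a loop that catches per record rather than per batch. `processIsolated` and `BatchOutcome` are hypothetical names for this illustration, and `handle` stands in for whatever validation or sink write (including its retries) applies to a single record.

```typescript
interface BatchOutcome<T> {
  succeeded: T[];
  deadLettered: Array<{ record: T; reason: string }>;
}

async function processIsolated<T>(
  records: T[],
  handle: (record: T) => Promise<void>
): Promise<BatchOutcome<T>> {
  const outcome: BatchOutcome<T> = { succeeded: [], deadLettered: [] };
  for (const record of records) {
    try {
      await handle(record);
      outcome.succeeded.push(record);
    } catch (error) {
      // The failure is recorded and the loop continues with the next record.
      const reason = error instanceof Error ? error.message : String(error);
      outcome.deadLettered.push({ record, reason });
    }
  }
  return outcome;
}
```

Keeping the failure reason next to the quarantined record makes later inspection easier, since the DLQ entry explains itself without a log search.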
7. Hardcoded Configuration and Secrets
- Mistake: Embedding database URLs, API keys, or thresholds in code.
- Consequence: Security vulnerabilities and inability to rotate secrets or adjust behavior without code redeployment.
- Fix: Use environment variables with typed configuration loaders. Validate config on startup. Support hot-reloading for thresholds where appropriate.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High Volume, Low Latency | Stream Processing (Kafka/Kinesis + Flink/Node Workers) | Handles backpressure natively; sub-second latency; scalable. | High infrastructure cost; complex ops. |
| Batch Analytics, Daily Runs | Orchestrated Batch (Airflow/Dagster + TS Workers) | Simplicity; easy debugging; cost-effective for non-real-time. | Low infra cost; higher latency. |
| Microservices Data Sync | CDC + Message Queue (Debezium + Kafka) | Real-time sync; decouples services; handles schema evolution. | Medium cost; requires CDC setup. |
| Ad-Hoc Data Migration | Idempotent Script + DLQ | Fast implementation; safe for one-off loads. | Low cost; manual effort. |
| Unreliable Source API | Pull-based with Aggressive Retry + DLQ | Resilience against source downtime; rate limit handling. | Medium compute cost due to retries. |
Configuration Template
// pipeline.config.ts
import { z } from 'zod';
const PipelineConfigSchema = z.object({
database: z.object({
url: z.string().url(),
poolSize: z.number().int().min(1).max(50).default(10),
}),
pipeline: z.object({
batchSize: z.number().int().min(1).max(1000).default(100),
maxRetries: z.number().int().min(0).max(5).default(3),
baseRetryDelayMs: z.number().int().min(100).default(1000),
dlqBucket: z.string().optional(), // S3 bucket for DLQ persistence
}),
metrics: z.object({
enabled: z.boolean().default(true),
port: z.number().int().default(9090),
}),
});
export type PipelineConfig = z.infer<typeof PipelineConfigSchema>;
export function loadConfig(): PipelineConfig {
return PipelineConfigSchema.parse({
database: {
url: process.env.DATABASE_URL,
poolSize: parseInt(process.env.DB_POOL_SIZE || '10', 10),
},
pipeline: {
batchSize: parseInt(process.env.PIPELINE_BATCH_SIZE || '100', 10),
maxRetries: parseInt(process.env.PIPELINE_MAX_RETRIES || '3', 10),
baseRetryDelayMs: parseInt(process.env.PIPELINE_RETRY_DELAY || '1000', 10),
dlqBucket: process.env.DLQ_BUCKET,
},
metrics: {
enabled: process.env.METRICS_ENABLED !== 'false',
port: parseInt(process.env.METRICS_PORT || '9090', 10),
},
});
}
Quick Start Guide
- Initialize Project:
mkdir data-pipeline && cd data-pipeline
npm init -y
npm install typescript ts-node pg zod
npm install --save-dev @types/node @types/pg
npx tsc --init
- Add Configuration:
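Copy the pipeline.config.ts template and create a .env file with DATABASE_URL and the other variables. Node does not read .env files on its own, so load the file with a tool such as dotenv or export the variables in your shell.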
- Implement Sink:
Create sinks/IdempotentPostgresSink.ts using the code from Core Solution. Ensure your database table has a UNIQUE constraint on idempotency_key.
- Run Pipeline:
npx ts-node runner.ts
Monitor console output for metrics and DLQ alerts. Verify data in the database to confirm idempotency by running the script twice with the same input.
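For the table that the Implement Sink step requires, a setup helper might look like the sketch below. The column names come from the sink's INSERT statement; the column types (TEXT, JSONB, TIMESTAMPTZ) and the names `createTableSql`, `ensureTable`, and `SqlExecutor` are assumptions made for illustration. `SqlExecutor` is a minimal structural type, and a pg Pool is expected to satisfy it.

```typescript
// Anything with a query(sql) method, so the sketch does not depend on a driver.
interface SqlExecutor {
  query(sql: string): Promise<unknown>;
}

function createTableSql(tableName: string): string {
  // The name is interpolated into SQL text, so accept plain identifiers only.
  if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(tableName)) {
    throw new Error(`Unsafe table name: ${tableName}`);
  }
  return [
    `CREATE TABLE IF NOT EXISTS ${tableName} (`,
    '  idempotency_key TEXT NOT NULL UNIQUE,', // Target of ON CONFLICT
    '  data JSONB NOT NULL,',                  // Assumed type for the JSON payload
    '  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()',
    ')',
  ].join('\n');
}

async function ensureTable(db: SqlExecutor, tableName: string): Promise<void> {
  await db.query(createTableSql(tableName));
}
```

Calling `ensureTable(pool, 'events')` once before the first run would create the table if it is missing. The UNIQUE constraint is what makes the ON CONFLICT clause valid, so running the pipeline twice with the same input should leave the row count unchanged.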