des stream/promises.pipeline() to solve this. It returns a promise, automatically destroys all streams on error, and propagates failures to a single catch block.
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

// Streams sourcePath through gzip into targetPath without loading the file into memory.
async function archiveDataset(sourcePath: string, targetPath: string): Promise<void> {
  const source = createReadStream(sourcePath);
  const compressor = createGzip({ level: 6 });
  const sink = createWriteStream(targetPath);

  // Rejects on the first failure in any stage and destroys all three streams.
  await pipeline(source, compressor, sink);
}
Why this choice: pipeline() enforces a unidirectional flow with guaranteed cleanup. It eliminates the need for manual .on('error') listeners on every stage and prevents descriptor leaks during partial failures.
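The cleanup guarantee described above can be observed directly. The following is a minimal sketch, not part of the original walkthrough: `demonstrateCleanup` and the in-memory stages are hypothetical names chosen for illustration, and the middle stage is made to fail on purpose so the single catch block and the `destroyed` flags can be inspected.

```typescript
import { pipeline } from 'node:stream/promises';
import { Readable, Transform, Writable } from 'node:stream';

// Hypothetical helper: runs a three-stage pipeline whose middle stage fails
// on the chunk "b", then reports the error and each stage's destroyed flag.
export async function demonstrateCleanup(): Promise<{ message: string; destroyed: boolean[] }> {
  const source = Readable.from(['a', 'b', 'c']);

  const failing = new Transform({
    transform(chunk, _encoding, callback) {
      if (chunk.toString() === 'b') {
        callback(new Error('stage failed on "b"'));
        return;
      }
      callback(null, chunk);
    },
  });

  const sink = new Writable({
    write(_chunk, _encoding, callback) {
      callback();
    },
  });

  try {
    await pipeline(source, failing, sink);
    return { message: 'completed', destroyed: [] };
  } catch (err) {
    // One catch block sees the failure, wherever in the chain it happened.
    return {
      message: (err as Error).message,
      destroyed: [source.destroyed, failing.destroyed, sink.destroyed],
    };
  }
}
```

No per-stage `.on('error')` listener appears anywhere in the sketch; the rejection of the awaited promise is the only error path.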
Custom transforms should never block the event loop. Heavy computation inside _transform stalls the entire pipeline. Instead, transforms should parse, normalize, or route data, while heavy work is offloaded or batched.
import { Transform, TransformCallback } from 'stream';

interface TelemetryRecord {
  timestamp: number;
  metric: string;
  value: number;
  source: string;
}

export class TelemetryNormalizer extends Transform {
  // Holds the trailing partial line until the next chunk completes it.
  private buffer: string = '';

  constructor() {
    super({ objectMode: true, highWaterMark: 1024 });
  }

  _transform(rawChunk: Buffer | string, _encoding: BufferEncoding, callback: TransformCallback): void {
    this.buffer += rawChunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop() || '';
    for (const line of lines) {
      this.emitRecord(line);
    }
    callback();
  }

  // Emits the last line when the input does not end with a newline.
  _flush(callback: TransformCallback): void {
    this.emitRecord(this.buffer);
    this.buffer = '';
    callback();
  }

  private emitRecord(line: string): void {
    if (!line.trim()) return;
    try {
      const parsed = JSON.parse(line) as TelemetryRecord;
      const normalized: TelemetryRecord = {
        timestamp: parsed.timestamp * 1000,
        metric: parsed.metric.toLowerCase(),
        value: Number(parsed.value),
        source: parsed.source.trim()
      };
      this.push(normalized);
    } catch {
      // Skip malformed lines without breaking the pipeline
    }
  }
}
Why this choice: objectMode: true decouples the transport layer (buffers) from the business logic layer (JS objects). The highWaterMark of 1024 is counted in objects rather than bytes, so each side of the transform buffers roughly 1024 records at most before backpressure applies, which bounds memory when downstream consumers are slow. Malformed data is filtered silently to maintain pipeline continuity.
Step 3: Compose the Full Pipeline
Combine source, transforms, and sink into a single executable flow. Use Readable.from() to inject static or generated data when testing, or pair with database cursors for real workloads.
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { TelemetryNormalizer } from './TelemetryNormalizer';
import { MetricsAggregator } from './MetricsAggregator';

export async function runIngestionPipeline(inputPath: string, outputPath: string): Promise<void> {
  const reader = createReadStream(inputPath, { encoding: 'utf-8' });
  const normalizer = new TelemetryNormalizer();
  // The file writer is a byte stream, so the aggregator must emit strings or Buffers.
  const aggregator = new MetricsAggregator();
  const writer = createWriteStream(outputPath);

  await pipeline(reader, normalizer, aggregator, writer);
}
Architecture Rationale: Each stage has a single responsibility. The normalizer handles parsing and validation. The aggregator handles stateful computation. The writer handles persistence. This separation enables independent testing, swapping, and scaling of pipeline stages.
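Because each stage is independent, a single stage can be exercised in isolation by feeding it fixtures through Readable.from(), as suggested at the start of this step. The sketch below is one possible test harness under stated assumptions: `runStageWithFixtures` and `createUppercaseMetric` are hypothetical names, and the uppercase transform is only a stand-in for a real stage such as the normalizer.

```typescript
import { pipeline } from 'node:stream/promises';
import { Readable, Transform, Writable } from 'node:stream';

// Hypothetical stand-in for a stage under test.
export function createUppercaseMetric(): Transform {
  return new Transform({
    objectMode: true,
    transform(record: { metric: string; value: number }, _encoding, callback) {
      callback(null, { ...record, metric: record.metric.toUpperCase() });
    },
  });
}

// Feeds in-memory fixtures through one stage and collects whatever it emits.
export async function runStageWithFixtures<T>(fixtures: T[], stage: Transform): Promise<unknown[]> {
  const collected: unknown[] = [];
  const sink = new Writable({
    objectMode: true,
    write(record, _encoding, callback) {
      collected.push(record);
      callback();
    },
  });

  // Readable.from() turns any iterable into an object-mode readable stream.
  await pipeline(Readable.from(fixtures), stage, sink);
  return collected;
}
```

Swapping the fixture array for a database cursor or file stream leaves the stage itself untouched, which is the property the rationale above relies on.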
Pitfall Guide
1. Silent Failures with Manual .pipe()
Explanation: .pipe() neither forwards errors nor destroys the other stages when one fails. An 'error' event with no listener crashes the process, and an error handled on only one stage leaves the rest hanging: the readable stream stops emitting, the writable never receives end, and the process remains alive with open descriptors.
Fix: Always use pipeline() from stream/promises. If manual piping is unavoidable, attach an 'error' listener to every stage and have it destroy all the other stages, not only the one that failed.
2. Blocking the Event Loop Inside _transform
Explanation: CPU-intensive operations inside _transform block the event loop. While the transform is busy, no other stage can make progress and no I/O callbacks run, so the entire pipeline stalls.
Fix: Keep transforms lightweight. Offload heavy computation to worker threads, or split the work into slices and yield between them with setImmediate. Avoid process.nextTick for yielding: its callbacks run before the event loop continues, so pending I/O stays starved.
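The slicing approach can be sketched as follows. This is an illustration under assumptions rather than a recommended implementation: `SlicedScorer`, `expensiveScore`, and `scoreAll` are hypothetical names, the squaring function merely stands in for CPU-bound work, and truly heavy computation is usually better moved to a worker thread.

```typescript
import { pipeline } from 'node:stream/promises';
import { Readable, Transform, Writable, type TransformCallback } from 'node:stream';

// Hypothetical stand-in for CPU-bound work on a single value.
function expensiveScore(value: number): number {
  return value * value;
}

// Processes each incoming batch in slices and yields to the event loop
// between slices, so timers and I/O callbacks get a chance to run.
export class SlicedScorer extends Transform {
  private readonly sliceSize: number;

  constructor(sliceSize: number = 100) {
    super({ objectMode: true });
    this.sliceSize = Math.max(1, sliceSize); // guard against a zero-length slice
  }

  _transform(batch: number[], _encoding: BufferEncoding, callback: TransformCallback): void {
    let index = 0;
    const processSlice = (): void => {
      try {
        const end = Math.min(index + this.sliceSize, batch.length);
        for (; index < end; index++) {
          this.push(expensiveScore(batch[index]));
        }
      } catch (err) {
        callback(err as Error);
        return;
      }
      if (index < batch.length) {
        setImmediate(processSlice); // yield, then continue with the next slice
      } else {
        callback(); // batch finished; the stream may deliver the next one
      }
    };
    processSlice();
  }
}

// Usage helper: scores every value in every batch, preserving order.
export async function scoreAll(batches: number[][], sliceSize: number): Promise<number[]> {
  const scores: number[] = [];
  const sink = new Writable({
    objectMode: true,
    write(score: number, _encoding, callback) {
      scores.push(score);
      callback();
    },
  });
  await pipeline(Readable.from(batches), new SlicedScorer(sliceSize), sink);
  return scores;
}
```

The callback is invoked only after the final slice, so the stream does not hand over the next batch while the current one is still in progress.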
3. Ignoring objectMode Mismatches
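Explanation: Writing JS objects into a stream that is not in object mode throws ERR_INVALID_ARG_TYPE, because byte streams accept only strings, Buffers, and Uint8Arrays. The opposite direction does not throw, but the object-mode stage then receives raw Buffer chunks instead of parsed records. The transport layer and business layer must agree on data shape.
Fix: Set writableObjectMode and readableObjectMode so that each side matches its neighbor, or insert a serialization/deserialization transform to bridge the gap.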
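A bridging transform of the kind mentioned in the fix might look like the sketch below. It assumes newline-delimited JSON as the wire format; `createNdjsonSerializer` and `serializeToString` are hypothetical names used only for this example.

```typescript
import { pipeline } from 'node:stream/promises';
import { Readable, Transform, Writable } from 'node:stream';

// Accepts JS objects on the writable side and emits text on the readable side.
export function createNdjsonSerializer(): Transform {
  return new Transform({
    writableObjectMode: true,
    readableObjectMode: false,
    transform(record, _encoding, callback) {
      callback(null, JSON.stringify(record) + '\n');
    },
  });
}

// Usage helper: pushes records through the serializer into a byte-mode sink.
export async function serializeToString(records: object[]): Promise<string> {
  const chunks: Buffer[] = [];
  const byteSink = new Writable({
    write(chunk: Buffer, _encoding, callback) {
      chunks.push(chunk);
      callback();
    },
  });
  await pipeline(Readable.from(records), createNdjsonSerializer(), byteSink);
  return Buffer.concat(chunks).toString('utf8');
}
```

Without the serializer in the middle, the byte-mode sink would reject the first object with ERR_INVALID_ARG_TYPE.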
4. Misconfiguring highWaterMark
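Explanation: Setting highWaterMark too high lets each stage buffer more data, so memory grows during backpressure. Setting it too low forces frequent pause/resume cycles and reduces throughput.
Fix: Start with the defaults: 16 objects for object-mode streams, and for byte streams 16 KiB in older Node.js releases or 64 KiB in recent ones (Node.js 22 raised the default; fs.createReadStream has long used 64 KiB). Tune only after profiling with --trace-gc or clinic.js. In production, monitor stream.readableLength and stream.writableLength against the configured high-water marks.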
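To make the threshold concrete, the sketch below writes into a sink that never finishes its first write and records what write() returns each time. `observeBackpressure` is a hypothetical helper for illustration; the stalled sink simulates a slow consumer.

```typescript
import { Writable } from 'node:stream';

// Writes `writes` objects into a stalled object-mode sink and records the
// return value of each write() call. false means "stop and wait for 'drain'".
export function observeBackpressure(highWaterMark: number, writes: number): boolean[] {
  const slowSink = new Writable({
    objectMode: true,
    highWaterMark,
    write(_record, _encoding, _callback) {
      // Intentionally never calls the callback: simulates a stalled consumer.
    },
  });

  const results: boolean[] = [];
  for (let i = 0; i < writes; i++) {
    results.push(slowSink.write({ i }));
  }

  slowSink.destroy(); // discard the buffered objects
  return results;
}
```

With a highWaterMark of 3, write() keeps returning true until the third object is queued, then returns false. The stream still accepts further writes after that, which is why ignoring the return value leads to unbounded buffering.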
5. Forgetting to Destroy Streams on External Cancellation
Explanation: If a client disconnects or a timeout triggers, the pipeline continues running in the background, consuming CPU and I/O.
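Fix: Listen for the close event on the consumer side (for example, the HTTP response) and abort an AbortController whose signal was passed to pipeline() as the final { signal } option. pipeline() from stream/promises returns a promise rather than a stream, so there is no pipeline object to destroy; aborting the signal rejects the promise with an AbortError and tears down all stages.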
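One way to wire external cancellation is the signal option that pipeline() from stream/promises accepts as its last argument. The sketch below assumes an endless in-memory source in place of a real long-running read; `runCancellable` is a hypothetical name.

```typescript
import { pipeline } from 'node:stream/promises';
import { Readable, Writable } from 'node:stream';

// Runs an endless pipeline until the caller aborts the supplied signal.
export async function runCancellable(signal: AbortSignal): Promise<'completed' | 'cancelled'> {
  // Endless source standing in for a long-running read.
  async function* endless(): AsyncGenerator<string> {
    while (true) {
      await new Promise((resolve) => setTimeout(resolve, 5));
      yield 'tick\n';
    }
  }

  const sink = new Writable({
    write(_chunk, _encoding, callback) {
      callback();
    },
  });

  try {
    await pipeline(Readable.from(endless()), sink, { signal });
    return 'completed';
  } catch (err) {
    // Aborting the signal rejects the promise with an AbortError.
    if ((err as Error).name === 'AbortError') return 'cancelled';
    throw err;
  }
}
```

In an HTTP handler the caller would typically create an AbortController, call controller.abort() from the response's close listener, and pass controller.signal into a function like this one.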
6. Assuming pipeline() Returns Early
Explanation: pipeline() returns a promise that resolves only after the final stream emits finish. Developers sometimes treat it as fire-and-forget, missing completion signals or error states.
Fix: Always await pipeline() inside try/catch, or chain .then()/.catch(), so that both completion and failure are observed.
7. Mixing Async Iteration with Pipeline Cleanup
Explanation: Using for await...of on a stream bypasses pipeline()'s automatic cleanup. Exiting the loop early destroys the stream being iterated, but streams connected upstream of it with .pipe() remain open.
Fix: When using async iteration, wrap the loop in a try/finally block and destroy the upstream streams in finally, or stick to pipeline() for guaranteed lifecycle management.
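The try/finally pattern can be sketched like this. The helper names `takeFirst` and `demoEarlyExit` are hypothetical, and the PassThrough stage is only a placeholder for a real transform connected with .pipe().

```typescript
import { PassThrough, Readable, Transform } from 'node:stream';

// Reads at most `limit` items from `source` piped through `stage`, then tears
// both streams down whether the loop finished, broke early, or threw.
export async function takeFirst<T>(source: Readable, stage: Transform, limit: number): Promise<T[]> {
  const taken: T[] = [];
  const output = source.pipe(stage);
  try {
    if (limit > 0) {
      for await (const item of output) {
        taken.push(item as T);
        if (taken.length >= limit) break;
      }
    }
  } finally {
    // .pipe() does not propagate teardown upstream, so destroy both ends.
    output.destroy();
    source.destroy();
  }
  return taken;
}

// Usage helper: stops after two of four items and reports the source state.
export async function demoEarlyExit(): Promise<{ taken: string[]; sourceDestroyed: boolean }> {
  const source = Readable.from(['a', 'b', 'c', 'd']);
  const taken = await takeFirst<string>(source, new PassThrough({ objectMode: true }), 2);
  return { taken, sourceDestroyed: source.destroyed };
}
```

Calling destroy() on a stream that is already destroyed is harmless, so the finally block does not need to check which exit path was taken.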
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Payload < 10MB, random access needed | Buffer (fs.readFileSync, Buffer.concat) | Simpler API, no streaming overhead | Low CPU, predictable memory |
| Payload > 10MB or unknown size | Stream Pipeline (pipeline()) | Constant memory, backpressure handling | Low memory, higher I/O scheduling |
| Real-time data processing (logs, metrics) | Async Iterator (for await...of) | Clean syntax, easy integration with async logic | Same as streams, slightly higher GC pressure |
| CPU-heavy transformation per chunk | Worker Threads + Stream Queue | Prevents event loop blocking | Higher CPU cost, better throughput |
| Multi-tenant API proxy | Stream Pipeline with highWaterMark tuning | Isolates memory per connection | Linear scaling with connections |
Configuration Template
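import { pipeline } from 'stream/promises';
import { Transform, TransformCallback } from 'stream';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

export interface PipelineConfig {
  inputPath: string;
  outputPath: string;
  compressionLevel?: number;
  timeoutMs?: number;
}

// Keeps records that carry an `id` field.
// Assumption: the input file is newline-delimited JSON. Input and output are
// both text, so the filter sits between the file and gzip streams without an
// object-mode mismatch.
export class ValidationFilter extends Transform {
  private pending = '';

  _transform(chunk: Buffer | string, _enc: BufferEncoding, cb: TransformCallback): void {
    const lines = (this.pending + chunk.toString()).split('\n');
    this.pending = lines.pop() ?? '';
    for (const line of lines) {
      this.keepIfValid(line);
    }
    cb();
  }

  // Handles a final line that has no trailing newline.
  _flush(cb: TransformCallback): void {
    this.keepIfValid(this.pending);
    this.pending = '';
    cb();
  }

  private keepIfValid(line: string): void {
    if (!line.trim()) return;
    try {
      const data: unknown = JSON.parse(line);
      if (typeof data === 'object' && data !== null && 'id' in data) {
        this.push(line + '\n');
      }
    } catch {
      // Drop lines that are not valid JSON.
    }
  }
}

export async function executeSecurePipeline(config: PipelineConfig): Promise<void> {
  // Aborting the signal rejects the pipeline and destroys every stage.
  // Throwing inside a setTimeout callback would crash the process instead.
  const signal = AbortSignal.timeout(config.timeoutMs ?? 30000);

  const source = createReadStream(config.inputPath, { encoding: 'utf-8' });
  const validator = new ValidationFilter();
  const compressor = createGzip({ level: config.compressionLevel ?? 6 });
  const sink = createWriteStream(config.outputPath);

  await pipeline(source, validator, compressor, sink, { signal });
}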
Quick Start Guide
- Install dependencies: Ensure Node.js 18+ is active. Streams are built-in; no external packages required.
- Create a transform class: Extend Transform, set objectMode: true if passing JS objects, and implement _transform(chunk, encoding, callback).
- Wire the pipeline: Import pipeline from stream/promises. Pass source, transforms, and sink in order. Wrap in try/catch.
- Run and monitor: Execute with node --trace-gc to verify constant memory usage. Check writableLength and readableFlowing properties if backpressure occurs.
- Iterate: Add logging, metrics, or error boundaries. Swap stages independently to test throughput vs latency trade-offs.