ardown of all connected streams.
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
async function compressAndArchive(sourcePath: string, destPath: string): Promise<void> {
  await pipeline(
    createReadStream(sourcePath),
    createGzip(),
    createWriteStream(destPath)
  );
}
Rationale: pipeline enforces a fail-fast contract. Any stream error rejects the promise immediately, and the runtime calls .destroy() on all participants. This eliminates resource leaks and simplifies error handling to a single try/catch block.
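To make the fail-fast contract concrete, here is a minimal sketch that pipes a small in-memory source through a transform that fails on its second chunk. The names failingTransform and runWithCleanup are illustrative assumptions, not part of any API; the only library calls used are pipeline, Readable.from, and the Transform/Writable constructors.

```typescript
import { pipeline } from 'stream/promises';
import { Readable, Transform, Writable } from 'stream';

// Hypothetical transform that fails on the second chunk it receives.
function failingTransform(): Transform {
  let seen = 0;
  return new Transform({
    transform(chunk, _encoding, callback) {
      seen += 1;
      if (seen === 2) {
        callback(new Error('boom'));
        return;
      }
      callback(null, chunk);
    },
  });
}

interface RunResult {
  error: string | null;
  sourceDestroyed: boolean;
  sinkDestroyed: boolean;
}

// Runs the pipeline and reports the error plus the state of both ends.
async function runWithCleanup(): Promise<RunResult> {
  const source = Readable.from(['a', 'b', 'c']);
  const sink = new Writable({
    write(_chunk, _encoding, callback) {
      callback();
    },
  });
  let error: string | null = null;
  try {
    await pipeline(source, failingTransform(), sink);
  } catch (err) {
    // A failure anywhere in the chain surfaces here, in one place.
    error = err instanceof Error ? err.message : String(err);
  }
  return { error, sourceDestroyed: source.destroyed, sinkDestroyed: sink.destroyed };
}
```

The single catch block sees the transform's error, and both the source and the sink report destroyed afterwards without any manual cleanup code.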
2. Backpressure Management
Backpressure occurs when a producer generates data faster than a consumer can process it. Without regulation, internal buffers grow without bound, memory usage spikes, and garbage collection pressure stalls the event loop.
Node.js streams natively handle backpressure when connected via pipeline or .pipe(). The writable stream signals the readable stream to pause by returning false from .write(). The readable stream resumes only when the writable stream emits a drain event.
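When a producer cannot use pipeline or .pipe(), the same handshake can be written by hand. The sketch below is one possible shape, assuming a hypothetical helper named writeAll; it relies only on write(), end(), and the once helper from the events module.

```typescript
import { once } from 'events';
import { Writable } from 'stream';

// Writes every item in order, pausing whenever write() reports a full buffer.
// Returns how many times the producer had to wait for 'drain'.
async function writeAll(sink: Writable, items: string[]): Promise<number> {
  let waits = 0;
  for (const item of items) {
    if (!sink.write(item)) {
      waits += 1;
      // Resume only after the sink has worked through its buffer.
      await once(sink, 'drain');
    }
  }
  sink.end();
  await once(sink, 'finish');
  return waits;
}
```

Because the loop awaits drain before the next write, the sink's internal buffer stays near highWaterMark regardless of how many items the producer has queued.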
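In a custom writable implementation, backpressure is driven by when you invoke the callback: once chunks waiting on that callback fill the buffer up to highWaterMark, write() starts returning false to producers. The sketch below assumes a hypothetical flushBatch helper standing in for the real batched sink:
import { Writable } from 'stream';
// Hypothetical helper: replace with a real batched write (database insert, HTTP POST, ...)
async function flushBatch(batch: unknown[]): Promise<void> {
  /* persist the batch */
}
const BATCH_SIZE = 16;
const pending: unknown[] = [];
const batchWriter = new Writable({
  objectMode: true, // highWaterMark counts objects instead of bytes
  highWaterMark: 16, // Buffer up to 16 objects
  write(chunk, _encoding, callback) {
    pending.push(chunk);
    if (pending.length < BATCH_SIZE) {
      callback(); // Ready for the next chunk
      return;
    }
    // Hold the callback until the batch is flushed. Incoming chunks queue up
    // in the meantime and write() returns false, signaling upstream to pause.
    flushBatch(pending.splice(0)).then(() => callback(), callback);
  },
  final(callback) {
    // Flush the partial batch left over when the stream ends
    flushBatch(pending.splice(0)).then(() => callback(), callback);
  }
});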
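Rationale: Tuning highWaterMark balances memory usage against throughput. Lower values reduce memory footprint but trigger more frequent pause/resume cycles between producer and consumer. Higher values improve throughput but risk OOM under burst conditions. The unit depends on the mode: bytes for binary streams, object count for objectMode streams. Default to 64KB for binary data, 16-32 for object streams.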
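The effect of highWaterMark can be observed directly. The following sketch, built around a hypothetical chunksUntilFull helper, counts how many chunks a stalled sink buffers before write() starts returning false.

```typescript
import { Writable } from 'stream';

// Counts how many chunks a stalled sink buffers before write() returns false.
function chunksUntilFull(highWaterMark: number, chunkSize: number): number {
  const stalled = new Writable({
    highWaterMark,
    write(_chunk, _encoding, _callback) {
      // The callback is never invoked: this simulates a consumer that has
      // stopped draining, so every written chunk stays counted as buffered.
    },
  });
  const chunk = Buffer.alloc(chunkSize);
  let acceptedWithoutWarning = 0;
  while (stalled.write(chunk)) {
    acceptedWithoutWarning += 1;
  }
  stalled.destroy();
  // The write that returned false was still buffered, so count it too.
  return acceptedWithoutWarning + 1;
}
```

With 16 KiB chunks, a 64 KiB limit is reached on the fourth write and a 16 KiB limit on the first. write() returning false is advisory: the chunk is still buffered, which is why ignoring the signal leads to unbounded memory growth.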
Transform streams bridge transport and business logic. They read chunks, mutate them, and push results downstream. For structured data, enable objectMode to pass JavaScript objects instead of raw Buffers.
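import { Transform, TransformCallback } from 'stream';
import { StringDecoder } from 'string_decoder';
interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
}
class JsonLineParser extends Transform {
  private readonly decoder = new StringDecoder('utf8');
  private remainder = '';
  constructor() {
    // Bytes in, objects out: objectMode applies to the readable side only
    super({ readableObjectMode: true });
  }
  _transform(rawChunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback): void {
    // A chunk can end mid-line or mid-character, so carry the unfinished tail forward
    const lines = (this.remainder + this.decoder.write(rawChunk)).split('\n');
    this.remainder = lines.pop() ?? '';
    for (const line of lines) {
      this.emitLine(line);
    }
    callback();
  }
  _flush(callback: TransformCallback): void {
    // Parse whatever is left when the input ends without a trailing newline
    this.emitLine(this.remainder + this.decoder.end());
    callback();
  }
  private emitLine(line: string): void {
    if (!line.trim()) return;
    try {
      this.push(JSON.parse(line) as LogEntry);
    } catch {
      // Report malformed lines on stderr without breaking the pipeline
      process.stderr.write(`[PARSE_ERROR] ${line}\n`);
    }
  }
}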
Rationale: objectMode decouples parsing from I/O, allowing downstream transforms to work with typed data. However, it disables binary chunking optimizations. Use it only when the payload structure justifies the overhead. For pure binary transformations (compression, encryption, hashing), keep objectMode: false.
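As an illustration of downstream stages working with typed data, the sketch below chains an object-to-object filter with an object-to-bytes serializer. The helper names levelFilter, ndjsonSerializer, and filterToNdjson are assumptions made for this example; LogEntry mirrors the interface used above.

```typescript
import { pipeline } from 'stream/promises';
import { Readable, Transform, Writable } from 'stream';

interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
}

// Object in, object out: drops entries that do not match the wanted level.
function levelFilter(level: string): Transform {
  return new Transform({
    objectMode: true,
    transform(entry: LogEntry, _encoding, callback) {
      if (entry.level === level) {
        callback(null, entry);
      } else {
        callback();
      }
    },
  });
}

// Object in, bytes out: only the writable side runs in object mode.
function ndjsonSerializer(): Transform {
  return new Transform({
    writableObjectMode: true,
    transform(entry: LogEntry, _encoding, callback) {
      callback(null, JSON.stringify(entry) + '\n');
    },
  });
}

// Collects the serialized output in memory so the result is easy to inspect.
async function filterToNdjson(entries: LogEntry[], level: string): Promise<string> {
  const chunks: Buffer[] = [];
  const sink = new Writable({
    write(chunk: Buffer, _encoding, callback) {
      chunks.push(chunk);
      callback();
    },
  });
  await pipeline(Readable.from(entries), levelFilter(level), ndjsonSerializer(), sink);
  return Buffer.concat(chunks).toString('utf8');
}
```

Using writableObjectMode on the serializer keeps the byte-oriented tail of the pipeline (compression, file or socket writes) in binary mode, so object handling stays confined to the stages that need it.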
Pitfall Guide
| Pitfall | Explanation | Fix |
|---|---|---|
| Silent Stream Termination | Unhandled error events on streams cause uncaught exceptions that crash the process. Streams do not propagate errors to async/await unless wrapped in pipeline or explicitly listened to. | Always use stream/promises.pipeline. If using event emitters, attach .on('error', handler) to every stream in the chain. |
| Backpressure Blindness | Calling .write() in a tight loop without checking its return value ignores the internal buffer limit. The buffer grows until memory exhaustion. | Check .write() return value. If false, wait for the drain event before writing more. Prefer pipeline which handles this automatically. |
| Event/Promise API Collision | Mixing .on('data') with await pipeline() on the same stream causes race conditions. The event listener consumes chunks before the pipeline can process them, leading to missing data or double-processing. | Choose one paradigm per stream. Use pipeline for composition, or for await...of with Readable/readline for iteration. Never combine them. |
| Buffer Encoding Neglect | Streams emit Buffer objects by default. Calling .toString() on each chunk can split a multi-byte character that straddles a chunk boundary, and decoding non-text data as UTF-8 silently replaces invalid bytes with U+FFFD instead of throwing. | Call .setEncoding('utf8') on readable streams, or decode with StringDecoder from string_decoder, which holds back incomplete multi-byte sequences until the next chunk. Never decode binary payloads as text. |
| Resource Leakage on Early Exit | Aborting a stream mid-processing without calling .destroy() leaves file descriptors, TCP sockets, or database cursors open. The OS eventually hits EMFILE limits. | Wrap stream operations in try/finally blocks. Call .destroy() on all streams in the finally clause, or rely on pipeline which guarantees cleanup. |
| CPU-Bound Transform Blocking | Heavy computation inside _transform blocks the event loop, stalling I/O and causing backpressure to cascade upstream. The stream appears "frozen" even though data is available. | Offload CPU-heavy work to worker_threads or child_process. If the work must stay on the main thread, split it into slices and yield between them with setImmediate; queueMicrotask does not help, because microtasks run before the event loop returns to I/O. Keep _transform lightweight. |
| ObjectMode Overuse | Passing large JavaScript objects through objectMode streams defeats chunking benefits. Each object is fully materialized in memory before transformation, negating streaming advantages. | Stream raw bytes or delimited text. Parse and transform incrementally. Only use objectMode for small, fixed-size records or metadata routing. |
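The Buffer Encoding Neglect row is easy to reproduce. The sketch below compares per-chunk decoding with StringDecoder on input that is split in the middle of a multi-byte character; decodeNaively, decodeSafely, and splitAt are illustrative helper names.

```typescript
import { StringDecoder } from 'string_decoder';

// Naive decoding: each chunk is decoded on its own.
function decodeNaively(chunks: Buffer[]): string {
  return chunks.map((chunk) => chunk.toString('utf8')).join('');
}

// Boundary-safe decoding: StringDecoder holds back incomplete sequences
// until the bytes that complete them arrive.
function decodeSafely(chunks: Buffer[]): string {
  const decoder = new StringDecoder('utf8');
  return chunks.map((chunk) => decoder.write(chunk)).join('') + decoder.end();
}

// Splits the UTF-8 bytes of `text` at `index` to imitate a chunk boundary.
function splitAt(text: string, index: number): Buffer[] {
  const bytes = Buffer.from(text, 'utf8');
  return [bytes.subarray(0, index), bytes.subarray(index)];
}
```

The euro sign occupies three bytes in UTF-8, so splitAt('€', 1) separates its first byte from the rest. decodeSafely reassembles the character, while decodeNaively yields U+FFFD replacement characters and raises no error.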
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Log aggregation & analysis | readline + Transform pipeline | Line-delimited parsing avoids full-file buffering; enables real-time filtering | Reduces memory by 95%, allows smaller EC2/GCP instances |
| Paginated API ingestion | Readable generator + pipeline | Fetches pages on-demand, respects rate limits, prevents heap growth | Lowers egress costs by processing data before storage |
| Large file upload/download | pipeline with highWaterMark tuning | Native backpressure prevents OOM; configurable buffer balances speed/memory | Optimizes network throughput without scaling compute |
| Real-time binary transformation | Transform (binary mode) + pipeline | Zero-copy chunk processing; avoids object allocation overhead | Maximizes CPU efficiency, reduces GC pauses |
| In-memory data enrichment | objectMode Transform + worker_threads | Decouples I/O from CPU-heavy enrichment; prevents event loop blocking | Increases latency slightly, prevents cascading timeouts |
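The "Readable generator + pipeline" row can be sketched as follows. The Page shape, the FetchPage signature, and the paginate and ingest helpers are assumptions made for this example; a real API client would supply its own cursor format and fetch logic.

```typescript
import { pipeline } from 'stream/promises';
import { Readable, Writable } from 'stream';

// Hypothetical page shape: a batch of items plus a cursor for the next page.
interface Page<T> {
  items: T[];
  nextCursor: string | null;
}
type FetchPage<T> = (cursor: string | null) => Promise<Page<T>>;

// Yields records page by page. The generator only advances when the stream
// built on top of it asks for more, so a slow consumer delays further fetches.
async function* paginate<T>(fetchPage: FetchPage<T>): AsyncGenerator<T> {
  let cursor: string | null = null;
  do {
    const page: Page<T> = await fetchPage(cursor);
    yield* page.items;
    cursor = page.nextCursor;
  } while (cursor !== null);
}

// Streams every record into `handle`, one at a time, with backpressure.
async function ingest<T>(fetchPage: FetchPage<T>, handle: (item: T) => Promise<void>): Promise<void> {
  const sink = new Writable({
    objectMode: true,
    write(item: T, _encoding, callback) {
      handle(item).then(() => callback(), callback);
    },
  });
  await pipeline(Readable.from(paginate(fetchPage)), sink);
}
```

Readable.from buffers a small number of records ahead of the consumer (bounded by its highWaterMark), so heap usage stays bounded by a few pages rather than by the full result set.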
Configuration Template
// stream-engine.ts
import { pipeline } from 'stream/promises';
import { Readable, Transform, Writable } from 'stream';
import { createReadStream, createWriteStream } from 'fs';
import { Logger } from './logger';
export interface StreamConfig {
  highWaterMark?: number;
  objectMode?: boolean;
  timeoutMs?: number;
}
export class StreamEngine {
  private readonly config: Required<StreamConfig>;
  constructor(config: StreamConfig = {}) {
    this.config = {
      highWaterMark: config.highWaterMark ?? 65536,
      objectMode: config.objectMode ?? false,
      timeoutMs: config.timeoutMs ?? 30000,
    };
  }
  // Readable/Writable (rather than the NodeJS.*Stream interfaces) expose destroy()
  async execute(
    source: Readable,
    transforms: Transform[],
    sink: Writable,
    logger: Logger
  ): Promise<void> {
    const timeoutHandle = setTimeout(() => {
      logger.warn('Stream pipeline timeout exceeded, forcing teardown');
      // Destroying the source with an error makes pipeline() reject and tear down the rest
      source.destroy(new Error('Pipeline timeout'));
    }, this.config.timeoutMs);
    try {
      // The array form accepts a variable number of transforms
      await pipeline([source, ...transforms, sink]);
      logger.info('Pipeline completed successfully');
    } catch (error) {
      logger.error('Pipeline failed', { error: error instanceof Error ? error.message : String(error) });
      throw error;
    } finally {
      clearTimeout(timeoutHandle);
      // Ensure deterministic cleanup; destroy() is a no-op on streams that are already destroyed
      source.destroy();
      sink.destroy();
      transforms.forEach(t => t.destroy());
    }
  }
}
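// Usage example: rewrite a JSON Lines file as validated, compact JSON lines
const engine = new StreamEngine({ highWaterMark: 131072, timeoutMs: 60000 });
// Re-serializes each non-empty line; returns an Error instead of throwing on malformed JSON
function compactLines(lines: string[]): string | Error {
  try {
    return lines.filter(l => l.trim()).map(l => JSON.stringify(JSON.parse(l)) + '\n').join('');
  } catch (err) {
    return err as Error;
  }
}
let tail = '';
// Binary mode on both sides: the file sink accepts strings and Buffers, not objects
const parser = new Transform({
  transform(chunk, _encoding, cb) {
    // Chunks do not align with line boundaries, so hold back the unfinished last line
    const lines = (tail + chunk.toString('utf8')).split('\n');
    tail = lines.pop() ?? '';
    const result = compactLines(lines);
    if (result instanceof Error) cb(result);
    else cb(null, result);
  },
  flush(cb) {
    const result = compactLines([tail]);
    if (result instanceof Error) cb(result);
    else cb(null, result);
  }
});
await engine.execute(
  // Decoding in the source keeps multi-byte characters intact across chunk boundaries
  createReadStream('input.jsonl', { encoding: 'utf8' }),
  [parser],
  createWriteStream('output.json'),
  console
);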
Quick Start Guide
- Install dependencies: Ensure Node.js 18+ is installed. No external packages required; streams are native.
- Replace buffered I/O: Locate fs.readFileSync or await response.json() calls. Swap with createReadStream and pipeline.
- Define transforms: Create Transform classes for parsing, filtering, or enriching. Set objectMode: true only when passing structured data.
- Wire the pipeline: Pass source, transforms, and sink to stream/promises.pipeline. Wrap in try/catch for error handling.
- Validate & monitor: Run with --trace-gc to verify stable memory. Add OpenTelemetry spans to track pipeline duration and backpressure events. Deploy to staging with highWaterMark logging enabled.
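As a starting point for the "Replace buffered I/O" step, the sketch below counts matching lines with readline and for await...of instead of reading the whole input into memory. The function name countMatchingLines and the file name in the usage note are hypothetical.

```typescript
import { createInterface } from 'readline';
import { Readable } from 'stream';

// Counts lines containing `needle` while holding only one line in memory at a time.
async function countMatchingLines(input: Readable, needle: string): Promise<number> {
  // crlfDelay: Infinity treats \r\n as a single line break
  const lines = createInterface({ input, crlfDelay: Infinity });
  let matches = 0;
  for await (const line of lines) {
    if (line.includes(needle)) {
      matches += 1;
    }
  }
  return matches;
}
```

A call such as countMatchingLines(createReadStream('app.log'), 'ERROR') would replace a readFileSync(...).split('\n') pass over the same file, with memory use independent of file size.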