- Readable: Data source. Emits chunks to consumers. Example: fs.createReadStream, http.IncomingMessage.
- Writable: Data sink. Accepts chunks from producers. Example: fs.createWriteStream, http.ServerResponse.
- Duplex: Bidirectional; both readable and writable. Example: net.Socket, tls.TLSSocket.
- Transform: A Duplex that modifies data; output is derived from input. Example: zlib.createGzip, custom parsers.

For log processing, we need a Readable source, a Transform for parsing/filtering, and a Writable destination.
Step 2: Compose with pipeline (Not .pipe())
The legacy .pipe() method chains streams but fails to propagate errors upstream or clean up resources on failure. Modern Node.js provides pipeline from stream/promises, which:
- Returns a Promise that resolves on completion or rejects on error
- Automatically destroys all streams in the chain on failure
- Handles backpressure synchronization internally
- Supports async generators as intermediate stages
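As a minimal sketch (file names are placeholders), compare the error handling each approach requires for a simple copy:

```typescript
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';

// Legacy composition: .pipe() forwards data but not errors, so each stream
// needs its own listener and manual cleanup to avoid leaked descriptors.
const legacySource = createReadStream('input.log');
const legacySink = createWriteStream('copy.log');
legacySource.on('error', (err) => legacySink.destroy(err));
legacySink.on('error', (err) => legacySource.destroy(err));
legacySource.pipe(legacySink);

// Modern composition: a single awaitable call. A failure in either stream
// rejects the Promise and both streams are destroyed automatically.
await pipeline(createReadStream('input.log'), createWriteStream('output.log'));
```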
Step 3: Implement the Parsing Transform
Transform streams override _transform to modify data before pushing it downstream. The following example parses space-delimited access logs, extracts relevant fields, and filters for client/server errors:
```typescript
import { Transform, TransformCallback } from 'node:stream';

export interface LogEntry {
  timestamp: string;
  method: string;
  path: string;
  statusCode: number;
  responseTime: number;
}

export class AccessLogParser extends Transform {
  private buffer: string = '';

  constructor() {
    super({ objectMode: true });
  }

  _transform(
    chunk: Buffer,
    _encoding: BufferEncoding,
    callback: TransformCallback
  ): void {
    this.buffer += chunk.toString('utf-8');
    const lines = this.buffer.split('\n');
    // Preserve incomplete line for next chunk
    this.buffer = lines.pop() ?? '';

    for (const line of lines) {
      if (!line.trim()) continue;
      const parsed = this.parseLine(line);
      if (parsed && parsed.statusCode >= 400) {
        this.push(parsed);
      }
    }
    callback();
  }

  _flush(callback: TransformCallback): void {
    if (this.buffer.trim()) {
      const parsed = this.parseLine(this.buffer);
      if (parsed && parsed.statusCode >= 400) {
        this.push(parsed);
      }
    }
    callback();
  }

  private parseLine(raw: string): LogEntry | null {
    const parts = raw.split(' ');
    if (parts.length < 7) return null;

    const statusCode = parseInt(parts[5], 10);
    const responseTime = parseFloat(parts[6]);
    if (isNaN(statusCode) || isNaN(responseTime)) return null;

    return {
      timestamp: parts[0],
      method: parts[1],
      path: parts[2],
      statusCode,
      responseTime,
    };
  }
}
```
Architecture Rationale:
- objectMode: true allows emitting JavaScript objects instead of Buffers. This is critical when downstream consumers expect structured data.
- _flush handles trailing data that doesn't end with a newline, preventing data loss.
- Synchronous parsing inside _transform is acceptable here because string splitting and number conversion are sub-millisecond operations. Blocking the event loop with heavy computation would defeat the async nature of streams.
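Note that parseLine assumes a specific field order that the text above does not spell out, so the input line below is only an illustrative assumption: timestamp, method, path, two ignored fields (here protocol and bytes sent), status code, response time in milliseconds.

```typescript
import { AccessLogParser } from './AccessLogParser';

// Hypothetical input line: fields 3 and 4 are skipped by parseLine;
// field 5 is the status code, field 6 the response time.
const parser = new AccessLogParser();
parser.on('data', (entry) => console.log(entry));
// Expected output:
// { timestamp: '2024-05-01T12:00:00Z', method: 'GET', path: '/api/users',
//   statusCode: 503, responseTime: 12.4 }
parser.write('2024-05-01T12:00:00Z GET /api/users HTTP/1.1 512 503 12.4\n');
parser.end();
```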
Step 4: Wire the Pipeline with Error Boundaries
```typescript
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { AccessLogParser, LogEntry } from './AccessLogParser';

async function aggregateErrorLogs(
  inputPath: string,
  outputPath: string
): Promise<void> {
  const source = createReadStream(inputPath, { encoding: 'utf-8' });
  const sink = createWriteStream(outputPath, { flags: 'a' });
  const parser = new AccessLogParser();

  try {
    await pipeline(
      source,
      parser,
      async function* filterAndFormat(source: AsyncIterable<LogEntry>) {
        for await (const entry of source) {
          yield JSON.stringify(entry) + '\n';
        }
      },
      sink
    );
    console.log('Pipeline completed successfully');
  } catch (err) {
    console.error('Stream pipeline failed:', err);
    throw err;
  }
}
```
Why this structure works:
- Async generators act as lightweight middleware. They receive objects, transform them, and yield formatted strings without creating additional Transform classes.
- pipeline guarantees that if the sink fails (e.g., disk full), the source and parser are destroyed, releasing file descriptors.
- The try/catch block captures rejections from any stage, enabling centralized error handling and retry logic.
Pitfall Guide
1. Ignoring Backpressure Signals
Explanation: Fast producers can overwhelm slow consumers, causing internal buffers to fill. If you don't respect write() return values or drain events, memory grows unbounded.
Fix: When writing manually, check stream.write(chunk). If it returns false, wait for the drain event before sending more data. With pipeline, backpressure is handled automatically.
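A minimal sketch of manual backpressure handling (the output path is a placeholder):

```typescript
import { createWriteStream } from 'node:fs';
import { once } from 'node:events';

// Respect write()'s return value when producing data manually. When the
// internal buffer passes highWaterMark, write() returns false and we pause
// until the 'drain' event fires.
async function writeMany(lines: string[]): Promise<void> {
  const sink = createWriteStream('out.log'); // placeholder path
  for (const line of lines) {
    if (!sink.write(line + '\n')) {
      await once(sink, 'drain'); // back off until the buffer empties
    }
  }
  sink.end();
  await once(sink, 'finish');
}
```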
2. Mixing Callback and Promise APIs
Explanation: Using .pipe() alongside async/await or mixing stream with stream/promises creates unpredictable error propagation. Unhandled stream errors crash the Node.js process.
Fix: Standardize on stream/promises for all pipeline composition. Never mix .pipe() chains with await pipeline().
3. Blocking the Event Loop in _transform
Explanation: _transform runs on the event loop. CPU-intensive operations (regex on large strings, JSON.parse on massive payloads, crypto) block other requests.
Fix: Offload heavy computation to worker threads or use async transforms. Keep _transform lightweight: parsing, filtering, and formatting only.
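When the per-entry work is asynchronous but not CPU-bound, a sketch like the one below keeps _transform thin by deferring the callback until the awaited work settles (enrich is a hypothetical helper; genuinely CPU-bound work still belongs in a worker thread):

```typescript
import { Transform } from 'node:stream';

// Hypothetical async helper; stands in for an I/O-bound lookup per entry.
async function enrich(entry: unknown): Promise<unknown> {
  return entry;
}

// Minimal sketch of an async transform stage: the callback is invoked only
// after the awaited work settles, so failures propagate through the stream.
const enrichStage = new Transform({
  objectMode: true,
  transform(entry, _encoding, callback) {
    enrich(entry)
      .then((result) => callback(null, result))
      .catch((err) => callback(err instanceof Error ? err : new Error(String(err))));
  },
});
// enrichStage can then be dropped into pipeline() between parser and sink.
```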
4. Misconfiguring objectMode
Explanation: Setting objectMode: true on a stream that expects Buffers (or vice versa) causes type mismatches. Downstream consumers may receive [object Object] strings or throw ERR_INVALID_ARG_TYPE.
Fix: Explicitly declare objectMode in constructor options. Validate chunk types in development with assert.ok(Buffer.isBuffer(chunk)) or type guards.
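A small development-time guard along these lines catches mode mismatches early (shown with a plain type guard; assert.ok(Buffer.isBuffer(chunk)) works equally well):

```typescript
import { Transform, TransformCallback } from 'node:stream';

// Minimal sketch: fail fast if a byte-mode stage receives something other
// than a Buffer, e.g. because objectMode was enabled upstream by mistake.
class ByteOnlyStage extends Transform {
  _transform(chunk: unknown, _encoding: BufferEncoding, callback: TransformCallback): void {
    if (!Buffer.isBuffer(chunk)) {
      callback(new TypeError(`Expected Buffer, received ${typeof chunk}`));
      return;
    }
    callback(null, chunk); // pass the chunk through unchanged
  }
}
```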
5. Forgetting to Handle error Events
Explanation: Streams are EventEmitter instances. If no listener attaches to error, Node.js throws an unhandled exception and terminates the process.
Fix: Always attach error handlers, or rely on pipeline which forwards errors to the Promise rejection. Never leave a stream without an error boundary.
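For example, a standalone read stream used outside pipeline needs an explicit handler (the path is a placeholder):

```typescript
import { createReadStream } from 'node:fs';

// Without this listener, a missing file or permission error surfaces as an
// uncaught 'error' event and terminates the process.
const stream = createReadStream('/var/log/access.log'); // placeholder path
stream.on('error', (err) => {
  console.error('Read stream failed:', err.message);
});
```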
6. Leaking File Descriptors
Explanation: Creating streams in loops without proper cleanup exhausts OS file descriptors. createReadStream opens a file handle that must be closed.
Fix: Use pipeline for automatic cleanup. If managing streams manually, call stream.destroy() in finally blocks or on process signals (SIGTERM, SIGINT).
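When a stream is consumed manually, for instance with for await, a finally block keeps the descriptor from leaking on early exit. A minimal sketch:

```typescript
import { createReadStream } from 'node:fs';

// destroy() in finally releases the file descriptor even if the loop body
// throws or exits early.
async function countBytes(path: string): Promise<number> {
  const source = createReadStream(path);
  let total = 0;
  try {
    for await (const chunk of source) {
      total += (chunk as Buffer).length;
    }
    return total;
  } finally {
    source.destroy();
  }
}
```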
7. Tuning highWaterMark Blindly
Explanation: The default highWaterMark is 16 KiB for byte-mode streams (fs.createReadStream raises it to 64 KiB) and 16 objects for object-mode streams. Increasing it arbitrarily doesn't improve throughput and increases memory pressure.
Fix: Profile actual chunk sizes and consumer latency. Increase highWaterMark only when backpressure causes unnecessary I/O wait times. Monitor memory with process.memoryUsage() during load tests.
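A minimal sampling sketch for such a load test:

```typescript
// Sample resident set size periodically to see whether a larger
// highWaterMark actually trades memory for throughput.
const timer = setInterval(() => {
  const { rss, heapUsed } = process.memoryUsage();
  console.log(`rss=${(rss / 1e6).toFixed(1)}MB heapUsed=${(heapUsed / 1e6).toFixed(1)}MB`);
}, 1000);
timer.unref(); // don't keep the process alive just for sampling
```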
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Copying files < 50 MB | fs.copyFile or readFileSync | Lower overhead, simpler code | Negligible |
| Processing logs > 1 GB | pipeline + custom Transform | Constant memory, backpressure-safe | +5% CPU for parsing, -90% RAM |
| Real-time HTTP proxy | req.pipe(res) with pipeline | Zero-copy forwarding, low latency | Network-bound, minimal CPU |
| Batch JSON transformation | stream-json + async generator | Avoids JSON.parse heap explosion | +10% CPU, enables TB-scale jobs |
| Multi-file concatenation | Sequential pipeline with { end: false } | Prevents premature stream closure | Identical I/O cost, safer cleanup |
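The last row relies on the { end: false } pipeline option. A minimal sketch of sequential concatenation (file handling details are an assumption; this needs a Node.js release where pipeline supports the end option):

```typescript
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';

// Concatenate several inputs into one output. { end: false } keeps the sink
// open between files; we close it explicitly once every input has drained.
async function concatenate(inputs: string[], outputPath: string): Promise<void> {
  const sink = createWriteStream(outputPath);
  try {
    for (const file of inputs) {
      await pipeline(createReadStream(file), sink, { end: false });
    }
  } finally {
    sink.end();
  }
}
```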
Configuration Template
```typescript
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

export interface StreamPipelineConfig {
  inputPath: string;
  outputPath: string;
  compress?: boolean;
  highWaterMark?: number;
  onError?: (err: Error) => void;
}

export class ProductionPipeline {
  private config: StreamPipelineConfig;

  constructor(config: StreamPipelineConfig) {
    this.config = { compress: false, highWaterMark: 65536, ...config };
  }

  async execute(): Promise<void> {
    const source = createReadStream(this.config.inputPath, {
      highWaterMark: this.config.highWaterMark,
    });
    const sink = createWriteStream(this.config.outputPath, {
      highWaterMark: this.config.highWaterMark,
      flags: 'w',
    });

    // Assemble stages dynamically: source [-> gzip] -> sink
    const stages: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream> = [source];
    if (this.config.compress) {
      stages.push(createGzip({ level: 6 }));
    }
    stages.push(sink);

    try {
      await pipeline(stages);
    } catch (err) {
      const error = err instanceof Error ? err : new Error(String(err));
      this.config.onError?.(error);
      throw error;
    } finally {
      // pipeline already destroys the chain on failure; this is a no-op on
      // success and a safety net if setup threw before the pipeline ran.
      source.destroy();
      sink.destroy();
    }
  }
}
```
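Assuming the class above lives in a module named ProductionPipeline.ts (the module path and log locations below are placeholders), usage is a single awaited call:

```typescript
import { ProductionPipeline } from './ProductionPipeline'; // hypothetical module path

// Compress a log file, routing failures to a monitoring hook before rethrowing.
const job = new ProductionPipeline({
  inputPath: '/var/log/access.log',      // placeholder input
  outputPath: '/var/log/access.log.gz',  // placeholder output
  compress: true,
  onError: (err) => console.error('Pipeline failed:', err.message),
});

await job.execute();
```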
Quick Start Guide
- Install dependencies: npm install typescript @types/node (if not already present).
- Create a basic pipeline:

```typescript
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';

await pipeline(
  createReadStream('input.dat'),
  createWriteStream('output.dat')
);
```

- Add a transform stage: Insert a custom Transform class or zlib.createGzip() between source and sink (see the sketch after this list).
- Handle errors: Wrap await pipeline(...) in try/catch and log failures. Verify file descriptors close on error.
- Validate memory: Run the pipeline against a 2 GB file with node --max-old-space-size=256 (compile app.ts to JavaScript first, or use a TypeScript runner such as ts-node). Monitor process.memoryUsage().rss; it should remain stable under 50 MB.
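A minimal sketch of the transform-stage step, using the built-in gzip transform (file names are placeholders):

```typescript
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

// Same pipeline as above with one transform stage inserted between source
// and sink; any Transform (custom or built-in) fits in this slot.
await pipeline(
  createReadStream('input.dat'),
  createGzip(),
  createWriteStream('output.dat.gz')
);
```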