Async iteration provides a clean control flow:

```typescript
import { Readable } from 'stream';

async function consumeMetrics(source: Readable) {
  for await (const chunk of source) {
    // Process each 128KB chunk; decode to text before splitting into lines
    const lines = chunk.toString().split('\n');
    for (const line of lines) {
      if (line.includes('CRITICAL')) {
        handleAlert(line); // alerting hook defined elsewhere
      }
    }
  }
}
```
### 2. Transformation: Custom Processing Logic
Transform streams modify data as it flows. In object mode, streams handle JavaScript objects instead of buffers. This is essential for parsing structured data like JSON or CSV.
```typescript
import { Transform, TransformCallback } from 'stream';

interface MetricRecord {
  timestamp: number;
  value: number;
  tag: string;
}

// Extracts and normalizes metrics from raw log lines
class MetricExtractor extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(
    rawChunk: Buffer | string,
    _encoding: BufferEncoding,
    callback: TransformCallback
  ): void {
    try {
      // Decode and split; lines broken across chunk boundaries are not handled here
      const lines = rawChunk.toString().split('\n').filter(Boolean);
      for (const line of lines) {
        const match = line.match(/^(\d+):(\d+)\[(\w+)\]$/);
        if (match) {
          const record: MetricRecord = {
            timestamp: Number(match[1]),
            value: Number(match[2]),
            tag: match[3],
          };
          this.push(record);
        }
      }
      callback();
    } catch (err) {
      callback(err as Error);
    }
  }
}
```
### 3. Composition: Safe Pipeline Execution
The pipeline function from stream/promises is the standard for composing streams. Unlike pipe(), pipeline propagates errors across all stages and ensures resource cleanup, preventing file descriptor leaks.
```typescript
import { pipeline } from 'stream/promises';
import { Transform } from 'stream';
import { createGzip } from 'zlib';
import * as fs from 'fs';

async function archiveMetrics(inputPath: string, outputPath: string) {
  const reader = fs.createReadStream(inputPath);
  const extractor = new MetricExtractor();
  // Gzip accepts only bytes or strings, so serialize the MetricRecord objects
  // emitted by the extractor to newline-delimited JSON first
  const serializer = new Transform({
    writableObjectMode: true,
    transform(record, _encoding, done) {
      done(null, JSON.stringify(record) + '\n');
    },
  });
  const compressor = createGzip();
  const writer = fs.createWriteStream(outputPath);
  try {
    // Pipeline handles backpressure, error propagation, and cleanup
    await pipeline(reader, extractor, serializer, compressor, writer);
    console.log('Archive complete.');
  } catch (err) {
    // Any error in any stage bubbles up here
    console.error('Pipeline failed:', err);
    // Streams are automatically destroyed on error
  }
}
```
Architecture Rationale:

- pipeline over pipe: pipe() does not propagate errors to the source stream in all Node.js versions and requires manual error handling on each stream. pipeline centralizes error handling and guarantees cleanup.
- objectMode for Parsing: When transforming text to objects, objectMode lets the stream manage object references efficiently. However, it requires careful highWaterMark tuning, since the limit counts objects, not bytes.
- Async Iterators: for await...of simplifies consumption logic compared to event listeners, reducing boilerplate and improving readability for sequential processing.
## Pitfall Guide
Production streaming code often fails due to subtle interactions between flow control, memory, and error states. The following pitfalls are derived from real-world incidents.
### 1. Silent Failures with pipe

Explanation: Using stream.pipe(dest) does not automatically propagate errors from downstream to upstream. If a write stream fails, the read stream may continue emitting data, wasting resources.

Fix: Always use pipeline from stream/promises. If manual composition is required, attach error listeners to every stream and call destroy() on failure.
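If pipeline cannot be used, the manual wiring looks roughly like this; a minimal sketch, with placeholder file paths:

```typescript
import { createReadStream, createWriteStream } from 'fs';

// Manual composition: every stream needs its own error handler, and a failure
// on one side must explicitly destroy the other, because pipe() will not.
const source = createReadStream('metrics.log'); // placeholder path
const dest = createWriteStream('metrics.out');  // placeholder path

source.pipe(dest);

source.on('error', (err) => {
  dest.destroy(err); // stop the writer
  console.error('Read failed:', err);
});

dest.on('error', (err) => {
  source.destroy(err); // stop the reader so it does not keep emitting data
  console.error('Write failed:', err);
});
```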
### 2. objectMode Memory Leaks

Explanation: In objectMode, highWaterMark limits the number of objects, not bytes. A stream with highWaterMark: 16 holding large objects can consume significant memory. Developers often assume the memory footprint remains small.

Fix: Monitor heap usage when using objectMode. Reduce highWaterMark if memory spikes occur, or process objects in batches within the transform.
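One way to apply that fix is to lower highWaterMark and emit records in small arrays; a sketch, with arbitrary batch size and buffer limits:

```typescript
import { Transform, TransformCallback } from 'stream';

// Groups incoming objects into small arrays and keeps highWaterMark low, so
// only a handful of batches (rather than an unbounded object count) is buffered.
class BatchingTransform<T> extends Transform {
  private batch: T[] = [];

  constructor(private batchSize = 100) {
    super({ objectMode: true, highWaterMark: 4 }); // 4 batches, not 4 bytes
  }

  _transform(item: T, _encoding: BufferEncoding, callback: TransformCallback): void {
    this.batch.push(item);
    if (this.batch.length >= this.batchSize) {
      this.push(this.batch);
      this.batch = [];
    }
    callback();
  }

  _flush(callback: TransformCallback): void {
    if (this.batch.length > 0) this.push(this.batch); // emit the final partial batch
    callback();
  }
}
```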
### 3. Blocking the Event Loop

Explanation: Heavy synchronous computation inside _transform blocks the event loop, stalling the entire pipeline and preventing backpressure signals from propagating.

Fix: Offload CPU-intensive work to worker threads or use setImmediate to yield control periodically. For I/O-bound transforms, ensure async operations do not block the callback.
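A sketch of the setImmediate approach inside a transform; the per-slice line scan stands in for whatever CPU-heavy work the real pipeline performs:

```typescript
import { Transform, TransformCallback } from 'stream';

// Splits heavy work into slices and yields between them with setImmediate,
// so the event loop can service I/O and backpressure signals in the gaps.
class YieldingTransform extends Transform {
  private criticalCount = 0;

  _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback): void {
    const lines = chunk.toString().split('\n');

    const processSlice = (start: number): void => {
      const end = Math.min(start + 1_000, lines.length);
      for (let i = start; i < end; i++) {
        if (lines[i].includes('CRITICAL')) this.criticalCount++; // stand-in for heavy work
      }
      if (end < lines.length) {
        setImmediate(() => processSlice(end)); // yield before the next slice
      } else {
        this.push(chunk); // pass data through unchanged
        callback();       // complete only after every slice has run
      }
    };

    processSlice(0);
  }
}
```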
### 4. Ignoring Backpressure

Explanation: Writing to a stream faster than it can drain causes the internal buffer to fill. If the developer ignores the return value of write(), data may be queued indefinitely, leading to memory exhaustion.

Fix: pipeline handles backpressure automatically. For manual writes, check the return value of write(). If false, wait for the drain event before writing more.
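For the manual case, the standard pattern is to await drain whenever write() returns false; a minimal sketch using events.once:

```typescript
import { createWriteStream } from 'fs';
import { once } from 'events';

// Respects backpressure: pauses when the internal buffer is full and resumes
// only after the writable emits 'drain'.
async function writeAll(lines: string[], path: string): Promise<void> {
  const out = createWriteStream(path);
  for (const line of lines) {
    if (!out.write(line + '\n')) {
      await once(out, 'drain'); // buffer is full; wait before writing more
    }
  }
  out.end();
  await once(out, 'finish');
}
```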
### 5. Encoding Mismatch Chaos

Explanation: Mixing decoded strings and binary buffers in the same pipeline causes data corruption. A transform expecting strings may receive buffers if encoding is not set consistently.

Fix: Define encoding on readable streams or use objectMode consistently. Avoid implicit conversions; explicitly decode buffers in transforms if necessary.
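A sketch of explicit decoding inside a transform; StringDecoder also protects against multi-byte characters that are split across chunk boundaries:

```typescript
import { Transform, TransformCallback } from 'stream';
import { StringDecoder } from 'string_decoder';

// Decodes incoming chunks explicitly so downstream logic always sees UTF-8
// strings, regardless of whether upstream delivers Buffers or strings.
class Utf8Normalizer extends Transform {
  private decoder = new StringDecoder('utf8');

  _transform(chunk: Buffer | string, _encoding: BufferEncoding, callback: TransformCallback): void {
    const text = typeof chunk === 'string' ? chunk : this.decoder.write(chunk);
    this.push(text);
    callback();
  }

  _flush(callback: TransformCallback): void {
    const rest = this.decoder.end(); // flush any buffered partial character
    if (rest) this.push(rest);
    callback();
  }
}
```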
### 6. Resource Leaks on Error

Explanation: If a pipeline fails and streams are not destroyed, file descriptors and network sockets remain open. This leads to EMFILE errors over time.

Fix: Use pipeline for automatic cleanup. For manual streams, wrap logic in try/catch and call stream.destroy(err) on failure.
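For manually consumed streams, a try/finally keeps the cleanup unconditional; a sketch (recent Node versions already destroy a readable when a for await loop exits early, so the explicit destroy() mainly guards older runtimes and makes the intent visible):

```typescript
import { createReadStream } from 'fs';

// Manual consumption with cleanup on every exit path, so the underlying file
// descriptor is released even if processing throws mid-stream.
async function countLines(path: string): Promise<number> {
  const source = createReadStream(path, { encoding: 'utf8' });
  let count = 0;
  try {
    for await (const chunk of source) {
      count += (chunk as string).split('\n').length - 1;
    }
    return count;
  } finally {
    source.destroy(); // no-op if the stream already ended cleanly
  }
}
```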
### 7. Misusing PassThrough

Explanation: PassThrough is often used as a debugging tool but left in production code, adding unnecessary overhead. It also complicates error handling if not integrated correctly.

Fix: Use PassThrough only for testing or specific routing logic. Remove debug taps before deployment. Use dedicated logging transforms for production telemetry.
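A sketch of a debug tap isolated behind a helper so it is easy to strip before deployment; debugTap is a hypothetical utility, not a stream API:

```typescript
import { PassThrough } from 'stream';

// Observes chunks without modifying them. Handy during development, but it adds
// an extra stream hop per chunk, so remove it (or gate it) in production.
function debugTap(label: string): PassThrough {
  const tap = new PassThrough();
  tap.on('data', (chunk: Buffer) => {
    console.debug(`[${label}] ${chunk.length} bytes`);
  });
  return tap;
}

// Usage: await pipeline(source, debugTap('after-read'), transform, sink);
```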
## Production Bundle
### Action Checklist

- Compose streams with pipeline from stream/promises rather than pipe().
- Check the return value of write() and wait for drain when writing manually.
- Tune highWaterMark when using objectMode; the limit counts objects, not bytes.
- Keep _transform non-blocking; offload CPU-heavy work or yield with setImmediate.
- Set encodings explicitly, or decode buffers inside transforms.
- Destroy streams on failure so file descriptors and sockets are released.
### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Config File (<1MB) | Buffering (readFileSync) | Simplicity outweighs streaming overhead. | Low |
| Large Log Processing (>1GB) | Streaming Pipeline | Prevents OOM; enables continuous processing. | High (Saves RAM) |
| HTTP File Upload | Stream to Disk | Avoids buffering upload in memory; scales with concurrency. | High (Saves RAM) |
| Real-time Data Feed | Transform + Writable | Low latency; processes data as it arrives. | Medium |
| Complex Data Transformation | Custom Transform Stream | Encapsulates logic; reusable across pipelines. | Medium |
### Configuration Template
A reusable template for production streaming pipelines with telemetry and error handling.
```typescript
import { pipeline } from 'stream/promises';
import { Transform, TransformCallback } from 'stream';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

// Telemetry transform for monitoring pipeline health
class PipelineTelemetry extends Transform {
  private bytesProcessed = 0;
  private chunksProcessed = 0;

  constructor(private label: string) {
    super();
  }

  _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback): void {
    this.bytesProcessed += chunk.length;
    this.chunksProcessed++;
    this.push(chunk);
    callback();
  }

  _flush(callback: TransformCallback): void {
    console.log(`[Telemetry] ${this.label}: ${this.chunksProcessed} chunks, ${this.bytesProcessed} bytes`);
    callback();
  }
}

// Robust pipeline executor with typed error handling
export async function executePipeline(
  sourcePath: string,
  destPath: string,
  transform: Transform
): Promise<void> {
  const source = createReadStream(sourcePath, { highWaterMark: 128 * 1024 });
  const sink = createWriteStream(destPath);
  const compressor = createGzip();
  const telemetry = new PipelineTelemetry('MainPipeline');

  try {
    await pipeline(
      source,
      telemetry,
      transform,
      compressor,
      sink
    );
  } catch (err) {
    // pipeline automatically destroys all streams on error
    console.error('Pipeline execution failed:', err);
    throw err;
  }
}
```
### Quick Start Guide

- Import Pipeline: Use `import { pipeline } from 'stream/promises';` for safe composition.
- Create Source: Initialize a readable stream with `fs.createReadStream(path, { highWaterMark: 128 * 1024 })`.
- Define Transform: Implement a class extending `Transform` with `_transform` logic. Use `objectMode: true` if processing objects.
- Execute: Call `await pipeline(source, transform, destination)` inside a try/catch block.
- Verify: Check logs for completion and monitor memory usage to confirm a constant footprint.