The following implementation demonstrates a structured log parser that reads raw binary data, filters by severity, transforms records into CSV format, and writes to disk with explicit backpressure management.
Transform streams modify data as it passes through. We'll create a parser that handles line-buffering (since TCP/FS streams don't guarantee line boundaries) and filters records.
import { Transform, TransformCallback } from 'stream';

interface LogRecord {
  timestamp: string;
  level: string;
  message: string;
}

interface FilterOptions {
  minSeverity: number;
  objectMode?: boolean;
}

export class SeverityFilter extends Transform {
  private buffer: string = '';

  private readonly severityMap: Record<string, number> = {
    DEBUG: 0,
    INFO: 1,
    WARN: 2,
    ERROR: 3,
    FATAL: 4,
  };

  constructor(private readonly config: FilterOptions) {
    super({ objectMode: config.objectMode ?? true });
  }

  _transform(
    rawChunk: Buffer | string,
    _encoding: BufferEncoding,
    callback: TransformCallback
  ): void {
    // Upstream readers deliver Buffers, or strings when an encoding is set.
    this.buffer += typeof rawChunk === 'string' ? rawChunk : rawChunk.toString('utf-8');
    const lines = this.buffer.split(/\r?\n/);
    // Preserve incomplete trailing line for next chunk
    this.buffer = lines.pop() ?? '';
    for (const line of lines) {
      if (!line.trim()) continue;
      const parsed = this.parseLine(line);
      if (!parsed) continue;
      const severity = this.severityMap[parsed.level] ?? 0;
      if (severity >= this.config.minSeverity) {
        this.push(this.formatOutput(parsed));
      }
    }
    callback();
  }

  _flush(callback: TransformCallback): void {
    // Process whatever remains in the buffer when the source ends mid-line.
    if (this.buffer.trim()) {
      const parsed = this.parseLine(this.buffer);
      if (parsed) {
        const severity = this.severityMap[parsed.level] ?? 0;
        if (severity >= this.config.minSeverity) {
          this.push(this.formatOutput(parsed));
        }
      }
    }
    callback();
  }

  private parseLine(raw: string): LogRecord | null {
    const match = raw.match(/^\[(\d{4}-\d{2}-\d{2}T[\d:Z]+)\]\s+(\w+)\s+(.*)$/);
    if (!match) return null;
    return { timestamp: match[1], level: match[2], message: match[3] };
  }

  private formatOutput(record: LogRecord): string {
    return `${record.timestamp},${record.level},"${record.message.replace(/"/g, '""')}"\n`;
  }
}
Step 2: Wire the Pipeline with Safety Guarantees
Manual .pipe() chaining lacks automatic cleanup on error. stream/promises provides pipeline(), which ensures all streams are destroyed and handles backpressure natively.
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { SeverityFilter } from './SeverityFilter';

export async function runLogAggregation(
  sourcePath: string,
  destinationPath: string,
  threshold: number
): Promise<void> {
  const reader = createReadStream(sourcePath, {
    highWaterMark: 128 * 1024, // 128KB chunks
    encoding: 'utf-8',
  });
  const writer = createWriteStream(destinationPath, {
    flags: 'a',
    encoding: 'utf-8',
  });
  const filter = new SeverityFilter({ minSeverity: threshold });

  try {
    await pipeline(reader, filter, writer);
    console.log(`Aggregation complete. Output: ${destinationPath}`);
  } catch (err) {
    // pipeline() automatically destroys all streams on error
    console.error('Pipeline failed:', err);
    throw err;
  }
}
Architecture Decisions & Rationale
- highWaterMark Tuning: The 64KB default for fs streams works for most disk I/O. Increasing to 128KB reduces syscall frequency on fast SSDs, but returns diminish beyond 256KB due to V8 allocation overhead. Always benchmark against your storage backend.
- Object Mode vs Binary: The filter accepts raw text and pushes fully serialized CSV strings, so the downstream file writer never receives JavaScript objects. When a pipeline does pass structured records between transforms, declare the object-mode boundary explicitly and serialize before the final writable; implicit mode mixing surfaces as runtime type errors (see the sketch after this list).
- _flush Implementation: Network and file streams may end mid-line. _flush guarantees the final buffered segment is processed before the stream closes, preventing data loss.
- pipeline() over Manual Chaining: pipeline() attaches error listeners to every stream, propagates failures upward, and calls .destroy() on all components. This eliminates leaked file descriptors and orphaned streams in production.
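For pipelines that do pass structured records between stages, a minimal sketch of an explicit serialization boundary is shown below. RecordToCsv is a hypothetical second transform (not part of the SeverityFilter above) that accepts LogRecord objects on its writable side and emits plain CSV strings on its readable side:

import { Transform, TransformCallback } from 'stream';

interface LogRecord {
  timestamp: string;
  level: string;
  message: string;
}

// Hypothetical serializer: accepts LogRecord objects, emits CSV strings.
// The object-mode boundary is declared explicitly on each side of the transform.
export class RecordToCsv extends Transform {
  constructor() {
    super({ writableObjectMode: true, readableObjectMode: false });
  }

  _transform(record: LogRecord, _enc: BufferEncoding, callback: TransformCallback): void {
    const escaped = record.message.replace(/"/g, '""');
    callback(null, `${record.timestamp},${record.level},"${escaped}"\n`);
  }
}

Keeping the boundary inside one named transform puts the serialization rule in a single place instead of scattering it across the pipeline.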
Pitfall Guide
1. Ignoring Writable Backpressure
Explanation: Calling write() repeatedly without checking its return value fills the internal buffer. Once full, Node.js queues data in memory, defeating the purpose of streaming.
Fix: Check the boolean return value. If false, pause reading or wait for the drain event before resuming writes.
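A minimal sketch of manual backpressure handling; the helper and stream names are illustrative, and pipeline() performs the equivalent bookkeeping for you:

import { Readable, Writable } from 'stream';
import { once } from 'events';

// Illustrative helper: copy data manually while honoring backpressure.
async function copyWithBackpressure(source: Readable, destination: Writable): Promise<void> {
  for await (const chunk of source) {
    // write() returns false once the internal buffer exceeds highWaterMark.
    if (!destination.write(chunk)) {
      await once(destination, 'drain'); // pause until the buffer empties
    }
  }
  destination.end();
  await once(destination, 'finish');
}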
2. Silent Stream Failures
Explanation: Streams emit error events. If unhandled, they crash the process in modern Node.js. Developers often attach data and end listeners but forget error.
Fix: Always attach error handlers, or use pipeline() which routes errors to the returned promise.
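For a manually wired stream, attaching the handler might look like this (logs.raw is the sample input used later in the Quick Start):

import { createReadStream } from 'fs';

// Every manually wired stream needs its own 'error' listener;
// an unhandled 'error' event throws and brings the whole process down.
const source = createReadStream('logs.raw');
source.on('error', (err) => console.error('read failed:', err));
source.on('data', (chunk) => {
  // ... process chunk
});
source.on('end', () => console.log('done reading'));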
3. Blocking the Event Loop Inside Stream Callbacks
Explanation: Synchronous heavy computation (e.g., regex on massive strings, JSON.parse on unbounded data) inside stream callbacks blocks the event loop, stalling all other I/O.
Fix: Offload CPU-intensive work to worker threads, or break processing into smaller async steps using setImmediate() or queueMicrotask().
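A sketch of the setImmediate() approach, assuming the CPU-heavy work can be split per line; the batch size is illustrative and cross-chunk line buffering is omitted for brevity:

import { Transform, TransformCallback } from 'stream';

// Illustrative transform: processes lines in small batches and yields
// to the event loop with setImmediate() between batches.
class ChunkedProcessor extends Transform {
  _transform(chunk: Buffer, _enc: BufferEncoding, callback: TransformCallback): void {
    const lines = chunk.toString('utf-8').split('\n');
    const batchSize = 500; // illustrative batch size
    let index = 0;

    const processBatch = (): void => {
      for (const line of lines.slice(index, index + batchSize)) {
        this.push(line.toUpperCase() + '\n'); // stand-in for real CPU work
      }
      index += batchSize;
      if (index < lines.length) {
        setImmediate(processBatch); // let pending I/O run before the next batch
      } else {
        callback();
      }
    };

    processBatch();
  }
}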
4. Incorrect highWaterMark Configuration
Explanation: Setting highWaterMark too low increases syscall overhead and reduces throughput. Setting it too high increases memory pressure and latency spikes.
Fix: Start with 64KB–128KB. Profile with --trace-gc and monitor process.memoryUsage().heapUsed under load. Adjust based on I/O latency, not arbitrary numbers.
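A small, illustrative heap sampler for comparing runs with different highWaterMark values; the helper name and interval are assumptions, not part of the pipeline code above:

// Illustrative heap sampler: log heap usage while the pipeline runs,
// then compare runs with different highWaterMark values.
function sampleHeap(intervalMs = 1000): () => void {
  const timer = setInterval(() => {
    const usedMb = process.memoryUsage().heapUsed / 1024 / 1024;
    console.log(`heapUsed: ${usedMb.toFixed(1)} MB`);
  }, intervalMs);
  return () => clearInterval(timer);
}

// const stopSampling = sampleHeap();
// await runLogAggregation('logs.raw', 'filtered.csv', 2);
// stopSampling();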
5. Dropping Buffered Data at Stream End
Explanation: Streams may terminate with incomplete data in the buffer. Without _flush, trailing bytes are discarded.
Fix: Always implement _flush to process remaining buffer content before calling the callback.
6. Mixing Object and Binary Modes Implicitly
Explanation: Mixing modes implicitly, such as pushing JavaScript objects into a writable that is not in object mode, throws a runtime TypeError (typically ERR_INVALID_ARG_TYPE) instead of converting the data.
Fix: Declare objectMode explicitly in constructor options. Serialize/deserialize at pipeline boundaries.
7. Assuming end Means Success
Explanation: The end event fires when the readable source closes, not when the writable destination finishes flushing to disk.
Fix: Listen to finish on writable streams, or rely on pipeline() which resolves only after all data is flushed.
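A small illustration of the difference; manual .pipe() is used here only to make both events visible, and pipeline() already waits for the equivalent of finish:

import { createReadStream, createWriteStream } from 'fs';

const reader = createReadStream('logs.raw');
const writer = createWriteStream('filtered.csv');

reader.pipe(writer);

// 'end' only means the source has no more data; bytes may still sit in writer's buffer.
reader.on('end', () => console.log('source fully read'));

// 'finish' fires after writer.end() and once every buffered chunk has been written out.
writer.on('finish', () => console.log('destination fully written'));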
Production Bundle
Action Checklist
- Replace manual .pipe() chains with pipeline() from stream/promises.
- Implement _flush in any transform that buffers partial records.
- Respect backpressure: check write()'s return value, or let pipeline() manage it.
- Attach an error handler to every stream, or rely on pipeline()'s rejected promise.
- Declare objectMode explicitly and serialize records before the final writable.
- Benchmark highWaterMark against your storage backend and watch heapUsed under load.
- Treat the writable's finish event, not the readable's end, as the completion signal.
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Processing files <50MB | fs.readFile() + in-memory transform | Simpler code, negligible memory overhead | Lower dev time, identical infra cost |
| Processing files >1GB or unbounded network streams | pipeline() with Transform streams | Bounded memory, backpressure native, GC stable | Slightly higher CPU overhead, massive RAM savings |
| Real-time log forwarding | PassThrough + TCP/HTTP writable | Zero-copy forwarding, minimal transformation | Network bandwidth bound, CPU minimal |
| CPU-heavy data transformation | Worker threads + stream chunking | Prevents event loop blocking, maintains throughput | Higher infra cost (worker processes), better latency |
| High-latency storage (S3, NFS) | Larger highWaterMark (256KB–1MB) | Reduces round-trip overhead, batches I/O | Higher per-stream memory, better throughput |
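For contrast with the first row, a rough in-memory sketch for small files; filterSmallFile is a hypothetical helper that mirrors the filter's regex and CSV escaping without any streaming machinery:

import { readFile, writeFile } from 'fs/promises';

// Illustrative in-memory variant: the whole file fits comfortably in memory,
// so no transform streams or backpressure handling are needed.
async function filterSmallFile(inputPath: string, outputPath: string, minSeverity: number): Promise<void> {
  const severityMap: Record<string, number> = { DEBUG: 0, INFO: 1, WARN: 2, ERROR: 3, FATAL: 4 };
  const text = await readFile(inputPath, 'utf-8');

  const rows = text
    .split(/\r?\n/)
    .map((line) => line.match(/^\[(\S+)\]\s+(\w+)\s+(.*)$/))
    .filter((m): m is RegExpMatchArray => m !== null && (severityMap[m[2]] ?? 0) >= minSeverity)
    .map((m) => `${m[1]},${m[2]},"${m[3].replace(/"/g, '""')}"`);

  await writeFile(outputPath, rows.join('\n') + '\n', 'utf-8');
}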
Configuration Template
// stream.config.ts
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { SeverityFilter } from './SeverityFilter';

export interface PipelineConfig {
  inputPath: string;
  outputPath: string;
  minSeverity: number;
  chunkSizeBytes?: number;
  abortSignal?: AbortSignal;
}

export async function executeSecurePipeline(config: PipelineConfig): Promise<void> {
  const { inputPath, outputPath, minSeverity, chunkSizeBytes = 128 * 1024, abortSignal } = config;

  const reader = createReadStream(inputPath, {
    highWaterMark: chunkSizeBytes,
    encoding: 'utf-8',
    signal: abortSignal,
  });
  const writer = createWriteStream(outputPath, {
    flags: 'w',
    encoding: 'utf-8',
    signal: abortSignal,
  });
  const transformer = new SeverityFilter({ minSeverity });

  // pipeline() handles cleanup, backpressure, and error propagation
  await pipeline(reader, transformer, writer);
}
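A usage sketch for the abortSignal option, assumed to be appended to stream.config.ts so executeSecurePipeline is in scope; the 30-second budget is illustrative:

// Usage sketch: abort the pipeline if it exceeds an illustrative 30-second budget.
async function main(): Promise<void> {
  const controller = new AbortController();
  const timeout = setTimeout(() => controller.abort(), 30_000);
  try {
    await executeSecurePipeline({
      inputPath: 'logs.raw',
      outputPath: 'filtered.csv',
      minSeverity: 2,
      abortSignal: controller.signal,
    });
  } finally {
    clearTimeout(timeout);
  }
}

main().catch(console.error);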
Quick Start Guide
- Initialize the project: npm init -y && npm install typescript ts-node @types/node
- Create the transform module: Save the SeverityFilter class as src/SeverityFilter.ts
- Create the pipeline runner: Save the executeSecurePipeline function as src/pipeline.ts
- Execute: Run npx ts-node -e "import { executeSecurePipeline } from './src/pipeline'; executeSecurePipeline({ inputPath: 'logs.raw', outputPath: 'filtered.csv', minSeverity: 2 }).catch(console.error);"
- Verify: Check filtered.csv for WARN/ERROR/FATAL records. Monitor memory with node --inspect or process.memoryUsage() to confirm bounded allocation.