cher {
  private dbPool: ReturnType<typeof createPool>;

  // Pool options include numbers and booleans (e.g. connectionLimit), not only strings.
  constructor(dbConfig: Record<string, string | number | boolean>) {
    this.dbPool = createPool(dbConfig);
  }

  async fetchUserReport(userId: string): Promise<Buffer> {
    // Parameterized query: userId is never interpolated into the SQL string.
    const [rows] = await this.dbPool.execute(
      'SELECT report_path FROM user_reports WHERE user_id = ?',
      [userId]
    );
    if (!Array.isArray(rows) || rows.length === 0) {
      throw new Error('Report not found');
    }
    const filePath = (rows[0] as any).report_path;
    return await readFile(filePath);
  }

  async close(): Promise<void> {
    await this.dbPool.end();
  }
}
```

**Why this works:** Both `fs/promises` and `mysql2/promise` return a Promise instead of blocking the calling thread. File reads are dispatched to libuv's thread pool, while the MySQL query travels over a non-blocking socket that the event loop polls. In both cases the event loop keeps processing other requests while the I/O is in flight. Failures surface as rejected Promises, so callers can handle them at ordinary `try/catch` boundaries around `await` rather than in scattered callbacks.
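
The `try/catch` boundary mentioned above can be sketched as follows. This is a minimal illustration, not part of the original class: `readReportOrNull` is a hypothetical helper, and treating a missing file as `null` is an assumption made for the example.

```typescript
import { readFile } from 'fs/promises';

// Hypothetical helper: returns null when the file does not exist and rethrows
// every other failure so the caller's own try/catch (or error middleware) sees it.
export async function readReportOrNull(filePath: string): Promise<Buffer | null> {
  try {
    // The await is what routes a rejected Promise into the catch block.
    return await readFile(filePath);
  } catch (err) {
    if ((err as NodeJS.ErrnoException).code === 'ENOENT') {
      return null;
    }
    throw err;
  }
}
```

Returning the Promise without `await` inside the `try` would let the rejection bypass this `catch`, which is why the `await` is kept.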
### Step 2: Implement Streaming Pipelines for Large Payloads
Buffering entire files or database results in memory defeats the purpose of async I/O. Streams process data in chunks, applying backpressure to prevent memory exhaustion.
```typescript
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { Transform } from 'stream';
import { StringDecoder } from 'string_decoder';

export class DataTransformer {
  private readonly chunkSize: number;

  constructor(chunkSize = 64 * 1024) {
    this.chunkSize = chunkSize;
  }

  async processLargeFile(inputPath: string, outputPath: string): Promise<void> {
    const readStream = createReadStream(inputPath, { highWaterMark: this.chunkSize });
    const writeStream = createWriteStream(outputPath, { highWaterMark: this.chunkSize });

    // Decode incrementally (assuming UTF-8 input) so a multi-byte character
    // split across two chunks is not corrupted by a per-chunk toString().
    const decoder = new StringDecoder('utf8');
    const upperCaseTransform = new Transform({
      transform(chunk, _encoding, callback) {
        callback(null, decoder.write(chunk).toUpperCase());
      },
      flush(callback) {
        // Emit any bytes still buffered in the decoder at end of input.
        callback(null, decoder.end().toUpperCase());
      }
    });

    await pipeline(readStream, upperCaseTransform, writeStream);
  }
}
```

**Why this works:** `pipeline` from `stream/promises` handles backpressure, error propagation, and stream cleanup. The `highWaterMark` option sets each stream's buffering threshold: the read chunk size on the source, and the point at which the destination reports that it is full. The transform stream processes data incrementally, so heap usage stays bounded by the buffer sizes rather than growing with file size. If the destination slows down, backpressure pauses the source until the destination drains.
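
The error propagation and cleanup behavior can be observed with in-memory streams. The following is a small sketch under stated assumptions: `runFailingPipeline` is a hypothetical name, and the transform fails on purpose when it sees the chunk `b`.

```typescript
import { pipeline } from 'stream/promises';
import { Readable, Transform, Writable } from 'stream';

// Hypothetical demo: a transform fails midway, pipeline rejects with that
// error, and the source stream ends up destroyed instead of left dangling.
export async function runFailingPipeline(): Promise<{ message: string; sourceDestroyed: boolean }> {
  const source = Readable.from(['a', 'b', 'c']);
  const failing = new Transform({
    transform(chunk: Buffer, _encoding, callback) {
      if (chunk.toString() === 'b') {
        callback(new Error('bad chunk: b'));
        return;
      }
      callback(null, chunk);
    }
  });
  const sink = new Writable({
    write(_chunk, _encoding, callback) {
      callback();
    }
  });

  try {
    await pipeline(source, failing, sink);
    return { message: '', sourceDestroyed: source.destroyed };
  } catch (err) {
    return { message: (err as Error).message, sourceDestroyed: source.destroyed };
  }
}
```

A single `try/catch` around `pipeline` covers failures from any stage, which is the main reason to prefer it over chaining `.pipe()` calls by hand.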
### Step 3: Offload CPU-Intensive Work
The event loop cannot handle heavy computation without blocking I/O scheduling. CPU-bound tasks must run outside the main thread.
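```typescript
import { Worker, isMainThread, parentPort } from 'worker_threads';
import { cpus } from 'os';

type PendingTask = {
  worker: Worker;
  resolve: (value: any) => void;
  reject: (reason: Error) => void;
};

export class CpuTaskRouter {
  private workers: Worker[] = [];
  private pending = new Map<number, PendingTask>();
  private currentIndex = 0;
  private nextId = 0;

  constructor() {
    // Leave one core for the main thread's event loop.
    const workerCount = Math.max(1, cpus().length - 1);
    for (let i = 0; i < workerCount; i++) {
      // __filename must point at compiled JavaScript; a Worker cannot load a .ts file directly.
      const worker = new Worker(__filename);
      // Match each reply to its task by id so concurrent tasks on the same
      // worker cannot resolve each other's Promises.
      worker.on('message', ({ id, result }: { id: number; result: unknown }) => {
        this.pending.get(id)?.resolve(result);
        this.pending.delete(id);
      });
      // A crashed worker cannot report which task failed, so reject all of its in-flight tasks.
      worker.on('error', (err) => {
        this.pending.forEach((task, id) => {
          if (task.worker === worker) {
            task.reject(err);
            this.pending.delete(id);
          }
        });
      });
      this.workers.push(worker);
    }
  }

  async executeTask<T>(payload: unknown): Promise<T> {
    // Round-robin selection of the next worker.
    const worker = this.workers[this.currentIndex];
    this.currentIndex = (this.currentIndex + 1) % this.workers.length;
    const id = this.nextId++;
    return new Promise<T>((resolve, reject) => {
      this.pending.set(id, { worker, resolve, reject });
      worker.postMessage({ id, payload });
    });
  }

  shutdown(): void {
    this.workers.forEach(w => w.terminate());
  }
}

// Worker side: run the computation and echo the task id back with the result.
if (!isMainThread) {
  parentPort?.on('message', ({ id, payload }: { id: number; payload: unknown }) => {
    parentPort?.postMessage({ id, result: heavyComputation(payload) });
  });
}

// Placeholder workload; the input is ignored in this example.
function heavyComputation(data: unknown): number {
  let sum = 0;
  for (let i = 0; i < 1e7; i++) sum += Math.sqrt(i);
  return sum;
}
```
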
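**Why this works:** Each worker thread runs its own V8 isolate with a separate event loop. The main thread delegates computation, receives results via message passing, and remains free to handle network I/O. Round-robin distribution spreads tasks evenly across workers, although it does not account for how long each task runs. This pattern keeps CPU-bound work off the request path; note that message payloads are copied between threads, so very large payloads add serialization cost.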
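
For a one-off task, the same message-passing idea can be sketched without a pool. This is a minimal sketch, assuming the worker body is plain CommonJS JavaScript passed inline with `eval: true`; `sumSqrtInWorker` and `workerSource` are hypothetical names that do not appear in the router above.

```typescript
import { Worker } from 'worker_threads';

// Inline worker body (plain JavaScript). It reads its input from workerData
// and posts a single result back to the parent thread.
const workerSource = `
  const { parentPort, workerData } = require('worker_threads');
  let sum = 0;
  for (let i = 0; i < workerData.iterations; i++) sum += Math.sqrt(i);
  parentPort.postMessage(sum);
`;

// Hypothetical helper: spawns a worker, waits for one message, and rejects on
// a worker error or a non-zero exit code.
export function sumSqrtInWorker(iterations: number): Promise<number> {
  return new Promise<number>((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: { iterations } });
    worker.once('message', (value: number) => resolve(value));
    worker.once('error', reject);
    worker.once('exit', (code) => {
      if (code !== 0) reject(new Error(`Worker exited with code ${code}`));
    });
  });
}
```

Spawning a worker per task pays the thread start-up cost every time, so a pool such as the router above is usually the better fit for request-path work.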
## Pitfall Guide

### 1. Synchronous JSON Parsing on Untrusted Payloads

**Explanation:** `JSON.parse()` blocks the event loop until the entire string is processed. Parsing a 20MB payload can freeze the thread for 200–500ms, stalling every concurrent connection for that time.

**Fix:** Use a streaming parser such as `stream-json` or `JSONStream`. Process payloads incrementally and validate structure before full deserialization.
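
When the payload can be framed as newline-delimited JSON, incremental parsing is possible with the standard library alone. The sketch below swaps in `readline` rather than `stream-json`; the NDJSON framing and the name `parseNdjson` are assumptions for illustration.

```typescript
import { createInterface } from 'readline';
import { Readable } from 'stream';

// Hypothetical helper: yields one parsed record per non-empty line, so each
// JSON.parse call only blocks for the duration of a single small record.
export async function* parseNdjson(input: Readable): AsyncGenerator<unknown> {
  const lines = createInterface({ input, crlfDelay: Infinity });
  for await (const line of lines) {
    if (line.trim() === '') continue;
    yield JSON.parse(line);
  }
}
```

A consumer iterates with `for await (const record of parseNdjson(stream))` and can validate or reject each record before reading the next one.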
### 2. Ignoring Stream Backpressure

**Explanation:** Pushing data from a fast read stream into a slow write stream while ignoring backpressure (for example, calling `write()` inside a `data` handler and discarding its return value) lets the write buffer grow until the process crashes with `FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed`.

**Fix:** Always use `pipeline` from `stream/promises` or the callback-based `stream.pipeline`. These utilities pause the source when the destination buffer fills, which keeps memory usage bounded.
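
When data does not come from a stream and `pipeline` is not an option, backpressure can be respected by hand. A minimal sketch, assuming a hypothetical `writeAll` helper:

```typescript
import { once } from 'events';
import { Writable } from 'stream';
import { finished } from 'stream/promises';

// Hypothetical helper: writes every chunk, pausing whenever write() reports
// that the destination's internal buffer is full.
export async function writeAll(dest: Writable, chunks: Iterable<string | Buffer>): Promise<void> {
  for (const chunk of chunks) {
    // write() returns false once the buffered bytes reach highWaterMark.
    if (!dest.write(chunk)) {
      // Wait until the buffer has emptied before producing more data.
      await once(dest, 'drain');
    }
  }
  dest.end();
  // Resolves after all data is flushed; rejects if the stream errors.
  await finished(dest);
}
```

The return value of `write()` is the backpressure signal; discarding it is what lets memory grow without bound.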
### 3. Mixing Sync/Async in Array Iterations

**Explanation:** `Array.prototype.forEach` does not await the Promises its callback returns. Using it with `async` callbacks produces fire-and-forget behavior: the surrounding function continues before the work finishes, which leads to unhandled rejections and race conditions.

**Fix:** Use `for...of` with `await` for sequential execution, or `Promise.all()`/`Promise.allSettled()` for parallel execution. Never use `forEach` with async callbacks.
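
Both alternatives can be sketched side by side. In this illustration `double` is a hypothetical stand-in for any async operation, such as an HTTP call or a database query.

```typescript
// Hypothetical async operation that rejects on negative input.
async function double(n: number): Promise<number> {
  if (n < 0) throw new Error(`negative input: ${n}`);
  return n * 2;
}

// Sequential: each call starts only after the previous one has settled.
export async function runSequential(inputs: number[]): Promise<number[]> {
  const results: number[] = [];
  for (const n of inputs) {
    results.push(await double(n));
  }
  return results;
}

// Parallel: all calls start immediately; failures are collected, not thrown.
export async function runParallel(inputs: number[]): Promise<{ values: number[]; errors: unknown[] }> {
  const settled = await Promise.allSettled(inputs.map((n) => double(n)));
  const values: number[] = [];
  const errors: unknown[] = [];
  for (const outcome of settled) {
    if (outcome.status === 'fulfilled') values.push(outcome.value);
    else errors.push(outcome.reason);
  }
  return { values, errors };
}
```

`Promise.all()` would instead reject as soon as any input fails, which suits all-or-nothing work; `Promise.allSettled()` suits aggregation where partial results are still useful.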
### 4. Unhandled Promise Rejections

**Explanation:** Since Node.js 15, a rejected Promise with no `.catch()` or surrounding `try/catch` terminates the process by default. This is especially dangerous in middleware chains and event handlers.

**Fix:** Wrap all async route handlers in a higher-order function that catches errors and forwards them to Express error middleware. Register `process.on('unhandledRejection', handler)` as a safety net.
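
The higher-order wrapper can be sketched as below. To keep the example free of dependencies it uses generic type parameters in place of Express's `Request`, `Response`, and `NextFunction`; those stand-ins and the name `asyncHandler` are assumptions for illustration.

```typescript
// Structural stand-in for Express's NextFunction.
type Next = (err?: unknown) => void;
type AsyncRouteHandler<Req, Res> = (req: Req, res: Res, next: Next) => Promise<unknown>;

// Wraps an async handler so any rejection (or synchronous throw) is forwarded
// to next(err), where error middleware can handle it.
export function asyncHandler<Req, Res>(handler: AsyncRouteHandler<Req, Res>) {
  return (req: Req, res: Res, next: Next): void => {
    Promise.resolve()
      .then(() => handler(req, res, next))
      .catch(next);
  };
}
```

With Express, a route would be registered as `app.get('/path', asyncHandler(async (req, res) => { ... }))`, which removes the need for a `try/catch` in every handler.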
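### 5. Exhausting the libuv Thread Pool

**Explanation:** The default libuv thread pool size is 4. Heavy file I/O, `dns.lookup()` calls, and asynchronous crypto operations such as `crypto.pbkdf2` all share that pool and can exhaust it. Further async calls then wait in a queue, so their latency climbs as if they were running one after another.

**Fix:** Set the `UV_THREADPOOL_SIZE` environment variable based on workload (e.g., `UV_THREADPOOL_SIZE=16` for I/O-heavy services), and set it before the Node.js process starts. Monitor thread pool utilization with clinic.js or OpenTelemetry metrics.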
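### 6. Blocking Express Middleware

**Explanation:** Synchronous middleware (e.g., heavy validation, sync database lookups) runs on the main thread before the route handler. While it runs, no other request makes progress, regardless of how the route handlers themselves use async patterns.

**Fix:** Replace synchronous calls inside middleware with their async equivalents and move CPU-heavy validation to a worker thread; marking a function `async` does not by itself make its synchronous work non-blocking. Use `express-async-handler` or a custom wrapper to ensure errors propagate to error middleware without crashing the server.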
### 7. Memory Leaks from Unfinished Streams

**Explanation:** Streams that error or are abandoned without proper cleanup hold references to buffers and file descriptors, causing gradual memory growth and `EMFILE` errors.

**Fix:** Always attach `error` listeners, use `pipeline` for automatic cleanup, and implement timeout guards with `AbortController` or `setTimeout` that call `.destroy()` on idle streams.
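
A timeout guard built on `AbortController` can be sketched as follows. Note the assumptions: `pipelineWithTimeout` is a hypothetical helper, and it enforces a limit on the total pipeline duration rather than tracking idle time between chunks.

```typescript
import { pipeline } from 'stream/promises';
import { Readable, Writable } from 'stream';

// Hypothetical helper: runs source -> dest and aborts the whole pipeline if it
// has not finished within timeoutMs. On abort, pipeline destroys both streams
// and rejects with an error whose name is 'AbortError'.
export async function pipelineWithTimeout(
  source: Readable,
  dest: Writable,
  timeoutMs: number
): Promise<void> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    await pipeline(source, dest, { signal: controller.signal });
  } finally {
    // Always clear the timer so it cannot keep the process alive.
    clearTimeout(timer);
  }
}
```

Because `pipeline` performs the cleanup, the caller only has to decide how to report the `AbortError`.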
## Production Bundle

### Action Checklist

### Decision Matrix

| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Small config files (<1MB) | Async readFile | Simple, low memory, sufficient for startup | Minimal |
| Large reports/logs (>10MB) | Stream pipeline | Constant memory, backpressure prevents OOM | Moderate (stream setup) |
| CPU-heavy calculations | Worker Threads | Isolates event loop, scales with cores | Higher (thread overhead) |
| External API aggregation | Promise.allSettled | Parallel execution, graceful degradation | Low |
| Real-time data processing | Async iterators + streams | Backpressure-aware, handles variable throughput | Moderate |

### Configuration Template

```typescript
// server.ts
import express from 'express';
import { AsyncDataFetcher } from './AsyncDataFetcher';
import { DataTransformer } from './DataTransformer';
import { CpuTaskRouter } from './CpuTaskRouter';
import { registerMetrics } from './observability';

const app = express();
// Parse JSON bodies so req.body is populated for POST /api/process.
app.use(express.json());

const fetcher = new AsyncDataFetcher({
  host: process.env.DB_HOST!,
  user: process.env.DB_USER!,
  password: process.env.DB_PASS!,
  database: process.env.DB_NAME!,
  waitForConnections: true,
  connectionLimit: 20,
  queueLimit: 0
});
const transformer = new DataTransformer(128 * 1024);
const cpuRouter = new CpuTaskRouter();

registerMetrics();

app.get('/api/report/:userId', async (req, res, next) => {
  try {
    const report = await fetcher.fetchUserReport(req.params.userId);
    res.setHeader('Content-Type', 'application/pdf');
    res.send(report);
  } catch (err) {
    next(err);
  }
});

app.post('/api/process', async (req, res, next) => {
  try {
    const result = await cpuRouter.executeTask<number>(req.body);
    res.json({ result });
  } catch (err) {
    next(err);
  }
});

// Error middleware must be registered after the routes and take four arguments.
app.use((err: Error, _req: express.Request, res: express.Response, _next: express.NextFunction) => {
  console.error(err.stack);
  res.status(500).json({ error: 'Internal server error' });
});

const PORT = process.env.PORT || 3000;
const server = app.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});

process.on('SIGTERM', () => {
  // Stop accepting new connections first, then release the pool and workers.
  server.close(async () => {
    await fetcher.close();
    cpuRouter.shutdown();
    process.exit(0);
  });
});
```

### Quick Start Guide

- **Initialize project:** `npm init -y && npm install express mysql2 stream-json && npm install -D typescript ts-node @types/node @types/express`
- **Set environment variables:** Export `DB_HOST`, `DB_USER`, `DB_PASS`, `DB_NAME`, and `UV_THREADPOOL_SIZE=8`.
- **Run server:** `npx ts-node server.ts`
- **Verify async behavior:** Execute `npx autocannon -c 50 -d 10 http://localhost:3000/api/report/test-user` and confirm p99 latency remains under 200ms with stable memory usage.