ltaneously, we implement a worker pool pattern. This prevents overwhelming downstream services and respects rate limits.
type Task<T> = () => Promise<T>;
class ConcurrencyPool {
private running = 0;
private queue: Array<{ task: Task<any>; resolve: (value: any) => void; reject: (reason: any) => void }> = [];
constructor(private limit: number) {}
async run<T>(task: Task<T>): Promise<T> {
return new Promise<T>((resolve, reject) => {
this.queue.push({ task, resolve, reject });
this.drain();
});
}
private drain(): void {
while (this.running < this.limit && this.queue.length > 0) {
const { task, resolve, reject } = this.queue.shift()!;
this.running++;
task()
.then(resolve)
.catch(reject)
.finally(() => {
this.running--;
this.drain();
});
}
}
}
Why this architecture? Promise.all creates all promises immediately, which can exhaust file descriptors, connection pools, or trigger API rate limits. A concurrency pool defers promise creation until a worker slot opens, keeping resource usage predictable. The drain() method ensures the queue processes in FIFO order without blocking the event loop.
Step 2: Implement Structured Error Boundaries
Network calls and business logic must be separated. HTTP responses with 4xx or 5xx status codes do not reject fetch; they resolve successfully. We must explicitly validate the response and attach metadata to errors for downstream handling.
class ApiError extends Error {
constructor(
message: string,
public status: number,
public payload?: Record<string, unknown>
) {
super(message);
this.name = 'ApiError';
}
}
async function retrieveEndpoint(url: string, signal?: AbortSignal): Promise<unknown> {
const response = await fetch(url, { signal });
if (!response.ok) {
const body = await response.json().catch(() => ({}));
throw new ApiError(`Request failed: ${response.status}`, response.status, body);
}
return response.json();
}
Why this architecture? Custom error classes preserve stack traces while attaching contextual data (status codes, response bodies). This enables centralized error routing without string parsing. The AbortSignal integration ensures timeouts cancel in-flight requests, preventing memory leaks from dangling network sockets.
Step 3: Compose Operations with the Right Combinator
Different failure semantics require different combinators. Promise.all fails fast. Promise.allSettled isolates failures. Promise.any succeeds on first resolution. Choosing incorrectly leads to cascading outages or silent data loss.
async function syncInventoryBatch(
endpoints: string[],
pool: ConcurrencyPool
): Promise<Record<string, unknown>> {
const tasks = endpoints.map(url =>
pool.run(() => retrieveEndpoint(url))
);
const results = await Promise.allSettled(tasks);
const synced: Record<string, unknown> = {};
results.forEach((result, index) => {
const url = endpoints[index];
if (result.status === 'fulfilled') {
synced[url] = result.value;
} else {
console.warn(`Skipped ${url}: ${(result.reason as Error).message}`);
}
});
return synced;
}
Why this architecture? allSettled guarantees the pipeline completes regardless of individual failures. This is critical for batch operations where partial success is acceptable. The explicit iteration over results provides a single place to log, alert, or route failures to dead-letter queues.
Step 4: Stream Large Datasets with Async Generators
Loading entire datasets into memory causes heap spikes. Async generators yield items incrementally, allowing downstream consumers to process data as it arrives.
async function* paginateRecords(baseUrl: string, pageSize = 50): AsyncGenerator<unknown, void, unknown> {
let cursor: string | null = null;
do {
const url = cursor ? `${baseUrl}?cursor=${cursor}&limit=${pageSize}` : baseUrl;
const payload = await retrieveEndpoint(url);
const data = payload as { items: unknown[]; nextCursor: string | null };
yield* data.items;
cursor = data.nextCursor;
} while (cursor);
}
// Consumption
for await (const record of paginateRecords('/api/inventory')) {
await persistRecord(record);
}
Why this architecture? Generators decouple production from consumption. Memory usage remains constant regardless of dataset size. The for await...of loop naturally handles backpressure and cleanup, eliminating manual iterator management.
Pitfall Guide
1. Silent Promise Drops
Explanation: Calling an async function without await or .catch() creates an unhandled rejection. The operation runs in the background, but failures are swallowed until the runtime crashes or logs a warning.
Fix: Explicitly handle or intentionally suppress. Use void asyncOp().catch(logError) for fire-and-forget tasks, or await when the result matters.
2. Unbounded Parallelism
Explanation: Mapping an array to async functions (items.map(async i => await process(i))) creates all promises simultaneously. Large arrays exhaust connection pools, trigger rate limits, or cause OOM errors.
Fix: Use a concurrency limiter (ConcurrencyPool, p-limit, or async.queue) to cap simultaneous executions.
3. Callback-Promise Hybridization
Explanation: Mixing Node.js callback APIs (fs.readFile) with async/await breaks control flow. Errors thrown in callbacks don't propagate to try/catch blocks, and mixing paradigms increases cognitive load.
Fix: Use promise-native modules (fs/promises, util.promisify) or wrap legacy callbacks once at the boundary layer.
4. Overly Broad Try/Catch Blocks
Explanation: Wrapping synchronous code in try/catch inside an async function masks programming errors (typos, type mismatches) that should fail fast during development.
Fix: Scope try/catch strictly to async boundaries. Let synchronous errors bubble up to global handlers or test runners.
5. State Leakage in Promise Chains
Explanation: Mutating external variables inside .then() or await blocks creates race conditions. If the chain is reused or retried, stale state corrupts subsequent executions.
Fix: Return pure values from async functions. Pass state explicitly through parameters or use immutable data structures.
6. Ignoring HTTP Error Semantics
Explanation: Assuming fetch rejects on 4xx or 5xx responses leads to silent data corruption. The promise resolves, but the payload contains an error page or validation message.
Fix: Always check response.ok or response.status before parsing. Throw a structured error if the status indicates failure.
7. Microtask Queue Starvation
Explanation: Chaining thousands of synchronous operations inside .then() or await loops blocks the microtask queue. UI threads freeze, and I/O callbacks are delayed.
Fix: Yield control periodically using setTimeout(..., 0) or queueMicrotask() for CPU-heavy transformations. Prefer streaming or chunked processing.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|
| Independent API calls with strict SLA | Promise.all + timeout wrapper | Fails fast, minimizes latency | Higher risk of cascading failure |
| Batch data sync with 1000+ items | Concurrency pool + allSettled | Prevents rate limits, isolates failures | Lower infrastructure cost, predictable memory |
| Fallback data sources | Promise.any | Returns first successful response | Reduces latency, improves availability |
| Streaming large datasets | Async generators + for await...of | Constant memory footprint | Lower heap usage, better GC behavior |
| Idempotent external calls | Exponential backoff + deduplication map | Handles transient failures safely | Higher retry cost, improved success rate |
Configuration Template
// async-pipeline.config.ts
export interface PipelineConfig {
concurrency: number;
maxRetries: number;
baseDelayMs: number;
requestTimeoutMs: number;
deduplicate: boolean;
}
export const defaultConfig: PipelineConfig = {
concurrency: 5,
maxRetries: 3,
baseDelayMs: 1000,
requestTimeoutMs: 8000,
deduplicate: true,
};
export function validateConfig(config: Partial<PipelineConfig>): PipelineConfig {
const merged = { ...defaultConfig, ...config };
if (merged.concurrency < 1 || merged.concurrency > 50) {
throw new Error('Concurrency must be between 1 and 50');
}
if (merged.maxRetries < 0 || merged.maxRetries > 10) {
throw new Error('Retries must be between 0 and 10');
}
if (merged.requestTimeoutMs < 1000) {
throw new Error('Timeout must be at least 1000ms');
}
return merged;
}
Quick Start Guide
- Initialize the pipeline: Import
validateConfig and create a PipelineConfig object matching your service's rate limits and memory constraints.
- Instantiate the worker pool: Pass the concurrency limit to
ConcurrencyPool. This caps simultaneous operations and prevents downstream overload.
- Wrap network calls: Use
retrieveEndpoint with an AbortController signal. Configure timeout via setTimeout and abort on expiration.
- Execute with isolation: Map your tasks through the pool, then await
Promise.allSettled. Iterate results to separate successes from failures and route errors to logging or dead-letter queues.