Async Generators Are Underused — Here's How to Change That
Pull-Based Data Pipelines: Mastering Async Generators for Production Systems
Current Situation Analysis
Modern backend systems routinely process datasets that exceed available heap memory. Whether you're syncing millions of database records, parsing multi-gigabyte log exports, or relaying real-time AI model outputs, the traditional approaches to data ingestion consistently hit the same wall: unbounded memory growth and fragmented flow control.
The industry standard has historically relied on three patterns:
- Bulk Loading: Fetching entire result sets into memory before processing. This guarantees O(N) memory complexity and triggers heap spikes that force garbage collection pauses or OOM crashes.
- Event-Driven Streams: Piping data through `Readable`/`Writable` streams with event listeners (`data`, `end`, `error`). While memory-efficient, this scatters business logic across callback boundaries, making error propagation, retry logic, and sequential processing notoriously difficult to maintain.
- Manual Offset Pagination: Writing `while(true)` loops with `LIMIT`/`OFFSET` or cursor tracking. This works but couples data fetching mechanics directly to consumer logic, creating boilerplate that obscures the actual business operation.
These patterns persist not because they're optimal, but because they're culturally entrenched. Most engineering onboarding materials emphasize push-based architectures (events, promises, bulk arrays). The pull-based model—where the consumer explicitly requests the next unit of data only when ready—requires a mental shift that rarely appears in standard curricula.
The reality is that JavaScript's async generators (async function*) have been stable since Node.js 10 and are universally supported across modern runtimes. They natively combine lazy evaluation with asynchronous execution, providing built-in backpressure handling and flat memory profiles. Yet production codebases continue to reinvent pagination loops and stream plumbing where a single generator abstraction would remove most of that boilerplate.
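To make the pull-based model concrete, here is a minimal sketch. The names `numberedPages`, `consumeTwo`, and `events` are illustrative and not part of any library. It records when the producer does work, which shows that the generator body advances only when the consumer asks for the next value.

```typescript
// Illustrative only: records the order in which producer and consumer run.
const events: string[] = [];

async function* numberedPages(total: number): AsyncGenerator<number> {
  for (let page = 1; page <= total; page++) {
    events.push(`produce ${page}`);
    yield page; // execution pauses here until the consumer pulls again
  }
}

async function consumeTwo(): Promise<string[]> {
  for await (const page of numberedPages(1000)) {
    events.push(`consume ${page}`);
    if (page === 2) break; // stop early; pages 3..1000 are never produced
  }
  return events;
}
```

Although the generator is asked for 1000 pages, only two are ever produced, because production and consumption strictly alternate.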
WOW Moment: Key Findings
When you replace traditional data ingestion patterns with async generators, the architectural trade-offs shift dramatically. The following comparison isolates the operational impact across four critical dimensions:
| Approach | Memory Footprint | Backpressure Handling | Error Propagation | Code Cohesion |
|---|---|---|---|---|
| Bulk Loading | O(N) - scales linearly with dataset size | None - blocks until complete | Promise rejection at fetch time | High (single block) |
| Event Streams | O(1) - constant buffer size | Manual (pause/resume, drain events) | Fragmented across listeners | Low (scattered callbacks) |
| Async Generators | O(1) - constant buffer size | Automatic (pull-based flow control) | Native try/catch in consumer | High (single iterator) |
Why this matters: Async generators transform data ingestion from a plumbing problem into a control flow problem. The consumer dictates the pace, the generator manages the fetch mechanics, and memory usage remains constant regardless of dataset scale. This eliminates the need for manual stream pause/resume logic, removes offset tracking boilerplate, and aligns asynchronous data processing with the familiar for await...of syntax.
Core Solution
The implementation strategy revolves around three production-grade patterns. Each encapsulates data retrieval mechanics inside an async generator, exposing a clean iterable interface to downstream consumers.
Pattern 1: Database Cursor Pagination
When processing millions of records, offset-based pagination degrades in performance as the dataset grows. A cursor-based approach combined with an async generator maintains consistent query performance while keeping memory flat.
```typescript
import { PrismaClient } from '@prisma/client';

interface CursorPaginationOptions {
  batchSize?: number;
  filter?: Record<string, unknown>;
  // Must be a stable ordering on a unique column; the cursor below assumes `id`.
  orderBy?: Record<string, 'asc' | 'desc'>;
}

// T is the record type; records are assumed to expose a unique string `id`.
async function* createRecordCursor<T extends { id: string }>(
  prisma: PrismaClient,
  model: string, // delegate name as exposed on the client, e.g. 'user'
  options: CursorPaginationOptions = {}
): AsyncGenerator<T[]> {
  const { batchSize = 250, filter = {}, orderBy = { id: 'asc' } } = options;
  let lastCursor: string | undefined;

  while (true) {
    const queryOptions: Record<string, unknown> = {
      where: filter,
      take: batchSize,
      orderBy,
    };
    if (lastCursor !== undefined) {
      // Resume after the last record of the previous batch.
      queryOptions.cursor = { id: lastCursor };
      queryOptions.skip = 1;
    }

    // Dynamic delegate access bypasses Prisma's generated types.
    const batch: T[] = await (prisma as any)[model].findMany(queryOptions);
    if (batch.length === 0) break;

    yield batch;

    // A short batch means the result set is exhausted.
    if (batch.length < batchSize) break;
    lastCursor = batch[batch.length - 1].id;
  }
}

// Consumer implementation
// `calculateEngagementScore` is assumed to be defined elsewhere.
async function syncUserMetrics(prisma: PrismaClient) {
  const iterator = createRecordCursor(prisma, 'user', {
    batchSize: 500,
    filter: { status: 'ACTIVE' },
    // Ordering by a non-unique column such as lastLogin can skip or repeat rows.
    orderBy: { id: 'asc' },
  });

  for await (const userBatch of iterator) {
    await Promise.all(
      userBatch.map(user => calculateEngagementScore(user))
    );
  }
}
```
Architecture Rationale:
- Cursor-based pagination avoids the `OFFSET` performance degradation that sets in on deep pages (roughly past ~100k rows), since the database must still scan and discard every skipped row.
- The generator yields batches, not individual records, reducing database round-trips while memory stays bounded by the batch size rather than the dataset size.
- `for await...of` naturally pauses iteration until the consumer finishes processing the current batch, implementing automatic backpressure without explicit stream controls.
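The same cursor loop can be exercised without a database. The sketch below is a stand-in under stated assumptions, not Prisma's API: `Row`, `FetchPage`, `paginate`, and `fetchFromTable` are hypothetical names, and `fetchFromTable` simulates a table ordered by a unique numeric `id`.

```typescript
// Hypothetical record and page-fetcher types, used only for illustration.
interface Row { id: number; name: string }
type FetchPage = (afterId: number | undefined, take: number) => Promise<Row[]>;

async function* paginate(fetchPage: FetchPage, batchSize: number): AsyncGenerator<Row[]> {
  let cursor: number | undefined;
  while (true) {
    const batch = await fetchPage(cursor, batchSize);
    if (batch.length === 0) return;
    yield batch;
    if (batch.length < batchSize) return; // short page: nothing left to fetch
    cursor = batch[batch.length - 1].id;
  }
}

// In-memory stand-in for a table ordered by a unique id.
const table: Row[] = Array.from({ length: 7 }, (_, i) => ({ id: i + 1, name: `row-${i + 1}` }));

const fetchFromTable: FetchPage = async (afterId, take) =>
  table.filter(row => afterId === undefined || row.id > afterId).slice(0, take);

async function collectBatchSizes(): Promise<number[]> {
  const sizes: number[] = [];
  for await (const batch of paginate(fetchFromTable, 3)) {
    sizes.push(batch.length);
  }
  return sizes;
}
```

With seven rows and a batch size of three, the consumer sees batches of 3, 3, and 1, and the short final batch ends the iteration without an extra empty query.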
Pattern 2: Large File Line Iterator
Processing multi-gigabyte CSV or log files requires line-by-line extraction without loading the entire file into memory. Node's fs.createReadStream provides chunked I/O, but raw chunks split arbitrarily across line boundaries. The generator handles buffer management transparently.
```typescript
import { createReadStream } from 'fs';
import { Readable } from 'stream';

interface FileLineOptions {
  chunkSize?: number;
  encoding?: BufferEncoding;
  delimiter?: string;
}

// Yields non-empty, whitespace-trimmed lines without loading the whole file.
async function* createLineIterator(
  filePath: string,
  options: FileLineOptions = {}
): AsyncGenerator<string> {
  const { chunkSize = 32768, encoding = 'utf-8', delimiter = '\n' } = options;
  const stream: Readable = createReadStream(filePath, {
    highWaterMark: chunkSize,
    encoding,
  });
  let pendingBuffer = '';

  try {
    for await (const rawChunk of stream) {
      pendingBuffer += rawChunk;
      const segments = pendingBuffer.split(delimiter);
      // The last segment may be a partial line; hold it until the next chunk.
      pendingBuffer = segments.pop() ?? '';
      for (const segment of segments) {
        const trimmed = segment.trim();
        if (trimmed.length > 0) {
          yield trimmed;
        }
      }
    }
    // Flush the final line when the file does not end with a delimiter.
    if (pendingBuffer.trim().length > 0) {
      yield pendingBuffer.trim();
    }
  } finally {
    stream.destroy();
  }
}

// Consumer implementation
// `parseTransactionLine` and `validateAndStore` are assumed to be defined elsewhere.
async function ingestTransactionLogs(logPath: string) {
  const lineStream = createLineIterator(logPath, {
    chunkSize: 65536,
    // If the file never contains this delimiter, the whole file accumulates in pendingBuffer.
    delimiter: '\r\n',
  });

  for await (const rawLine of lineStream) {
    const record = parseTransactionLine(rawLine);
    await validateAndStore(record);
  }
}
```
Architecture Rationale:
- `highWaterMark` controls the internal buffer size, allowing fine-tuning between I/O throughput and memory pressure.
- The `finally` block guarantees stream destruction even if the consumer breaks early or throws an error.
- Buffer splitting logic isolates partial line handling, ensuring consumers never receive fragmented records.
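The partial-line handling can be checked in isolation from the file system. The following sketch assumes a hypothetical `splitLines` helper that accepts any async iterable of text chunks, plus a `fakeChunks` source whose chunk boundaries deliberately fall mid-line. Unlike the iterator above, it neither trims lines nor skips blank ones.

```typescript
// Splits an async stream of text chunks into complete lines.
async function* splitLines(
  chunks: AsyncIterable<string>,
  delimiter = '\n'
): AsyncGenerator<string> {
  let pending = '';
  for await (const chunk of chunks) {
    pending += chunk;
    const parts = pending.split(delimiter);
    pending = parts.pop() ?? ''; // last element is an incomplete line (or '')
    yield* parts;
  }
  if (pending.length > 0) yield pending; // final line without trailing delimiter
}

// Hypothetical chunk source whose boundaries fall in the middle of lines.
async function* fakeChunks(): AsyncGenerator<string> {
  yield 'alpha\nbra';
  yield 'vo\ncharl';
  yield 'ie';
}

async function readAllLines(): Promise<string[]> {
  const lines: string[] = [];
  for await (const line of splitLines(fakeChunks())) lines.push(line);
  return lines;
}
```

Because the trailing fragment of each chunk is carried over to the next one, the consumer receives whole lines even though no chunk ends on a line boundary.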
Pattern 3: LLM/SSE Token Relay
Large language models and Server-Sent Events deliver data incrementally. Accumulating the full response before forwarding introduces unacceptable latency. The generator consumes the vendor stream token-by-token and exposes a standardized iterable interface.
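```typescript
import Anthropic from '@anthropic-ai/sdk';
import { once } from 'events';

interface StreamCompletionOptions {
  model?: string;
  maxTokens?: number;
  temperature?: number;
}

async function* createTokenStream(
  client: Anthropic,
  prompt: string,
  options: StreamCompletionOptions = {}
): AsyncGenerator<string> {
  const { model = 'claude-3-5-sonnet-20241022', maxTokens = 2048, temperature = 0.7 } = options;
  const response = await client.messages.create({
    model,
    max_tokens: maxTokens,
    temperature,
    messages: [{ role: 'user', content: prompt }],
    stream: true,
  });

  for await (const event of response) {
    // Only text deltas carry output tokens; other event types are skipped.
    if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
      if (event.delta.text) {
        yield event.delta.text;
      }
    }
  }
}

// Consumer implementation (Express route handler)
// `app` (an Express application) and `anthropicClient` are assumed to be created elsewhere.
app.post('/generate', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  // Aborted when the client disconnects, so a pending drain wait cannot hang.
  const disconnected = new AbortController();
  res.on('close', () => disconnected.abort());

  const tokenIterator = createTokenStream(anthropicClient, req.body.prompt);
  try {
    for await (const tokenChunk of tokenIterator) {
      if (disconnected.signal.aborted) break;
      const flushed = res.write(`data: ${JSON.stringify({ content: tokenChunk })}\n\n`);
      // res.write() returns false when the socket buffer is full; wait for it to drain.
      if (!flushed) await once(res, 'drain', { signal: disconnected.signal });
    }
    res.write('data: [DONE]\n\n');
  } catch (error) {
    if (!disconnected.signal.aborted) {
      const message = error instanceof Error ? error.message : String(error);
      res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
    }
  } finally {
    res.end();
  }
});
```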
Architecture Rationale:
- Vendor SDK streams are wrapped to create a stable internal contract. Switching from Anthropic to OpenAI or Mistral only requires modifying the generator, not every downstream consumer.
- The `for await...of` loop in the HTTP handler requests the next token only after the current iteration has finished. `res.write()` itself does not wait for the socket, so matching network write capacity also requires awaiting the `drain` event whenever `res.write()` returns false. Without that step, a slow client causes unbounded buffer accumulation.
- SSE formatting is handled at the consumer layer, keeping the generator focused purely on data extraction.
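The interaction between the pull loop and a writable's buffer can be sketched independently of Express. The `relay` function below is a hypothetical helper, not part of any SDK. It relies only on `events.once` and the standard `Writable` contract, where `write()` returns false once the internal buffer reaches its high water mark.

```typescript
import { once } from 'node:events';
import { Writable } from 'node:stream';

// Writes every chunk from `source` into `sink`, pausing while the sink is saturated.
// Returns how many times the loop had to wait for a 'drain' event.
async function relay(source: AsyncIterable<string>, sink: Writable): Promise<number> {
  let waits = 0;
  for await (const chunk of source) {
    if (!sink.write(chunk)) {
      waits++;
      await once(sink, 'drain'); // resume only after the buffer has emptied
    }
  }
  sink.end();
  await once(sink, 'finish'); // all buffered data has been flushed
  return waits;
}
```

While the loop is parked on `drain`, it does not request another value from `source`, so the upstream generator stays suspended and nothing accumulates in memory. A production handler would additionally need to stop waiting when the peer disconnects.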
Pitfall Guide
Async generators introduce subtle failure modes that don't appear in traditional async/await patterns. Production deployments must account for these edge cases.
1. Ignoring Generator Cleanup on Early Termination
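Explanation: When a consumer breaks out of a for await...of loop, the runtime calls the generator's .return() method, which resumes the generator at its paused yield and unwinds it. Any database connection, file handle, or network socket that is not released in a finally block leaks at that point. Iterators driven manually with .next() get no automatic .return() call at all.
Fix: Always wrap resource acquisition in try/finally blocks inside the generator. The finally block executes when the consumer breaks early, throws inside the loop body, or calls .return() explicitly.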
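A small sketch of that cleanup guarantee follows. The `resource` object is a hypothetical stand-in for a connection or file handle, and `readFromResource` and `findFirstAtLeast` are illustrative names.

```typescript
// Hypothetical resource handle; `open` stands in for a connection or file descriptor.
const resource = { open: false };

async function* readFromResource(): AsyncGenerator<number> {
  resource.open = true;
  try {
    for (let i = 0; ; i++) {
      yield i; // endless source
    }
  } finally {
    resource.open = false; // runs on break, return, throw, or .return()
  }
}

async function findFirstAtLeast(threshold: number): Promise<number | undefined> {
  for await (const value of readFromResource()) {
    if (value >= threshold) return value; // early exit triggers the generator's finally
  }
  return undefined;
}
```

The loop exits as soon as the target value appears, and the runtime awaits the generator's `finally` block before `findFirstAtLeast` resolves, so the resource is already released when the caller sees the result.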
2. Mixing Synchronous and Asynchronous Yields
Explanation: Inside an async function*, yield implicitly awaits its operand, so yield value, yield promise, and yield await promise all deliver a resolved value to the consumer. Yielding a plain synchronous value is valid and does not break the iterator contract. The behavior worth knowing is that a rejected promise passed to yield throws inside the generator at that yield.
Fix: Yield plain values or promises directly, and annotate the generator with the resolved element type, AsyncGenerator<T>. Reserve an explicit await for cases where the generator needs the resolved value itself before yielding, for example to filter or transform it.
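The equivalence of the three yield forms can be observed directly. `mixedYields` and `collectMixed` in this sketch are illustrative names only.

```typescript
async function* mixedYields(): AsyncGenerator<number> {
  yield 1;                        // plain value
  yield Promise.resolve(2);       // promise: awaited by yield itself
  yield await Promise.resolve(3); // explicit await: same result, redundant here
}

async function collectMixed(): Promise<number[]> {
  const seen: number[] = [];
  for await (const value of mixedYields()) seen.push(value);
  return seen;
}
```

The consumer receives three plain numbers in order, never a promise, regardless of which form produced them.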
3. Unhandled Rejections in Consumer Loops
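Explanation: If the generator throws an error, it propagates to the consumer's for await...of loop. Without a try/catch, the enclosing async function rejects, and if no caller handles that rejection, Node.js 15 and later terminate the process on the unhandled rejection.
Fix: Wrap consumer iteration in try/catch at the level that can act on the failure. Log or transform the error appropriately. The generator's own finally block runs as the error unwinds it, so resource release belongs there rather than in the consumer.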
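The sketch below illustrates the propagation path with a simulated failure. `flakySource`, `consumeSafely`, and `cleanupLog` are hypothetical names introduced for this example.

```typescript
const cleanupLog: string[] = [];

async function* flakySource(): AsyncGenerator<number> {
  try {
    yield 1;
    yield 2;
    throw new Error('upstream unavailable'); // simulated mid-stream failure
  } finally {
    cleanupLog.push('generator cleaned up');
  }
}

async function consumeSafely(): Promise<{ processed: number[]; failure?: string }> {
  const processed: number[] = [];
  try {
    for await (const value of flakySource()) processed.push(value);
    return { processed };
  } catch (error) {
    // Values handled before the failure are kept; the error is reported, not rethrown.
    const failure = error instanceof Error ? error.message : String(error);
    return { processed, failure };
  }
}
```

The consumer keeps the two values it processed before the failure, the error surfaces in an ordinary `catch`, and the generator's cleanup has already run by the time the `catch` block executes.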
4. Blocking the Event Loop Inside the Generator
Explanation: Code inside a generator runs synchronously between await and yield points. If you perform CPU-intensive work (JSON parsing, cryptographic hashing, heavy computation) in one of those stretches, you block the event loop and degrade throughput.
Fix: Offload CPU-bound work to worker threads, or split it into slices separated by setImmediate so that timers and I/O callbacks can run in between. queueMicrotask does not help here: microtasks run before the event loop moves on, so they starve I/O just like a synchronous loop.
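One way to slice CPU-bound work is sketched below, assuming a Node.js runtime for `setImmediate`. `nextTurn`, `expensiveChecksum`, and `checksums` are hypothetical helpers, and the slice size of 100 is an arbitrary starting point rather than a recommendation.

```typescript
// Resolves on a later event-loop turn, after pending I/O callbacks and timers get a chance to run.
const nextTurn = (): Promise<void> =>
  new Promise<void>(resolve => setImmediate(() => resolve()));

// Hypothetical CPU-bound transform applied to each item.
function expensiveChecksum(text: string): number {
  let sum = 0;
  for (let i = 0; i < text.length; i++) {
    sum = (sum * 31 + text.charCodeAt(i)) >>> 0;
  }
  return sum;
}

async function* checksums(items: string[], sliceSize = 100): AsyncGenerator<number> {
  for (let i = 0; i < items.length; i++) {
    yield expensiveChecksum(items[i]);
    // After each slice, hand control back to the event loop.
    if ((i + 1) % sliceSize === 0) await nextTurn();
  }
}
```

Yielding alone is not enough here, because a consumer that does no I/O resumes the generator through microtasks. The explicit `setImmediate` pause is what lets other callbacks run between slices.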
5. Over-Engineering Simple Sequential Tasks
Explanation: Not every async operation needs a generator. If you're fetching a single record, processing a small array, or making one HTTP request, a generator adds unnecessary abstraction layers.
Fix: Reserve async generators for scenarios involving: (a) unknown dataset sizes, (b) continuous streams, (c) backpressure requirements, or (d) repeated batch operations. Use standard async/await for deterministic, single-shot operations.
6. Misconfiguring Stream High Water Marks
Explanation: When wrapping Node streams, the highWaterMark dictates internal buffer size. Setting it too high causes memory spikes; too low causes excessive I/O syscalls and context switching.
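Fix: Benchmark your specific workload. fs.createReadStream defaults to 64 KiB, which is a sensible starting point for text files. Larger values such as 256KB-1MB can reduce syscall overhead for bulk binary reads at the cost of a larger per-stream buffer. Monitor heap usage with process.memoryUsage() during load testing.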
7. Forgetting to Handle Consumer Break Conditions
Explanation: Consumers may exit early due to business logic (e.g., finding a target record, hitting a rate limit, or user cancellation). The generator must gracefully handle this without throwing or leaking state.
Fix: Design generators to be idempotent and stateless where possible. Use try/finally for cleanup, and avoid maintaining mutable state across yield boundaries unless explicitly required.
Production Bundle
Action Checklist
- Audit existing pagination loops and stream event handlers for generator migration candidates
- Implement `try/finally` blocks in all generator functions to guarantee resource cleanup
- Wrap consumer `for await...of` loops in `try/catch` to handle generator rejections gracefully
- Benchmark `highWaterMark` and batch sizes under production load before deployment
- Add TypeScript return type annotations (`AsyncGenerator<T, void, unknown>`) for strict type safety
- Implement circuit breakers or timeout wrappers around external API generators to prevent hanging iterators
- Profile heap memory with `--inspect` and Chrome DevTools to verify O(1) memory behavior
- Document generator contracts clearly: what triggers termination, what errors are thrown, and cleanup guarantees
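For the timeout item in the checklist, one possible shape is a wrapper generator that races each pull against a timer. `withTimeout` is a hypothetical helper, not a library function, and it makes a simplifying assumption: a source that has hung cannot be forced to stop, so the wrapper only requests cleanup without waiting for it.

```typescript
// Rejects if the source takes longer than `ms` to produce its next value.
async function* withTimeout<T>(source: AsyncIterable<T>, ms: number): AsyncGenerator<T> {
  const iterator = source[Symbol.asyncIterator]();
  try {
    while (true) {
      let timer: ReturnType<typeof setTimeout> | undefined;
      const timeout = new Promise<never>((_, reject) => {
        timer = setTimeout(() => reject(new Error(`no value within ${ms}ms`)), ms);
      });
      // Clear the timer as soon as either side settles, before yielding.
      const result = await Promise.race([iterator.next(), timeout])
        .finally(() => clearTimeout(timer));
      if (result.done) return;
      yield result.value;
    }
  } finally {
    // Ask the source to clean up; not awaited, since a hung source may never settle.
    iterator.return?.().catch(() => undefined);
  }
}
```

The timer measures only the time the source takes to answer each pull. Time the consumer spends processing a value does not count against the limit, because the timer is cleared before the value is yielded.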
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Dataset < 10k records, single fetch | Standard async/await + array | Generator overhead outweighs benefits | Lower infrastructure cost |
| Dataset > 100k records, batch processing | Async Generator with cursor pagination | Flat memory, automatic backpressure, cleaner consumer code | Reduced RAM costs, fewer OOM incidents |
| Real-time LLM/SSE streaming | Async Generator wrapping vendor SDK | Vendor abstraction, natural flow control, simplified HTTP relay | Lower latency, better UX, easier SDK swaps |
| Multi-GB log/CSV processing | Async Generator with stream buffering | Line-by-line extraction without heap spikes | Predictable memory, no crash recovery overhead |
| High-frequency WebSocket feed | Async Generator + internal queue | Decouples network I/O from business logic | Improved throughput, easier rate limiting |
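The "Async Generator + internal queue" row can be sketched as a small bridge between a push-style source and a pull-style consumer. `AsyncQueue` is a hypothetical class written for this illustration. It buffers without limit, so a real WebSocket feed would also need a size cap or a drop policy, since a push source cannot be paused the way a generator can.

```typescript
// Bridges a push-style producer to a pull-style consumer.
class AsyncQueue<T> implements AsyncIterable<T> {
  private buffered: T[] = [];
  private waiting: Array<(result: IteratorResult<T>) => void> = [];
  private closed = false;

  // Called by the push side, e.g. from a socket's message handler.
  push(item: T): void {
    if (this.closed) return;
    const resolve = this.waiting.shift();
    if (resolve) resolve({ value: item, done: false });
    else this.buffered.push(item);
  }

  // Signals end-of-stream to current and future pullers.
  close(): void {
    this.closed = true;
    for (const resolve of this.waiting.splice(0)) {
      resolve({ value: undefined, done: true });
    }
  }

  async *[Symbol.asyncIterator](): AsyncGenerator<T> {
    while (true) {
      if (this.buffered.length > 0) {
        yield this.buffered.shift() as T;
        continue;
      }
      if (this.closed) return;
      // Nothing buffered: park until push() or close() hands over a result.
      const result = await new Promise<IteratorResult<T>>(resolve => this.waiting.push(resolve));
      if (result.done) return;
      yield result.value;
    }
  }
}
```

The network handler only calls `push`, while business logic consumes the queue with `for await...of`, which keeps the two concerns decoupled as the table suggests.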
Configuration Template
```typescript
// generator-factory.ts
export interface GeneratorLifecycle {
  onInit?: () => Promise<void>;
  onCleanup?: () => Promise<void>;
  onError?: (error: Error) => void;
}

export function createSafeAsyncGenerator<T>(
  lifecycle: GeneratorLifecycle,
  generatorFn: () => AsyncGenerator<T>
): AsyncGenerator<T> {
  async function* wrapped(): AsyncGenerator<T> {
    // Tracks whether onInit completed, so cleanup never runs after a failed init.
    let initialized = false;
    try {
      await lifecycle.onInit?.();
      initialized = true;
      // yield* forwards early termination (.return()) to the source generator.
      yield* generatorFn();
    } catch (error) {
      lifecycle.onError?.(error as Error);
      throw error;
    } finally {
      if (initialized) {
        await lifecycle.onCleanup?.();
      }
    }
  }

  // Nothing runs, including onInit, until the consumer requests the first value.
  return wrapped();
}

// Usage example
// `database`, `logger`, `prisma`, and `createRecordCursor` are assumed to exist in scope.
const safeDbCursor = createSafeAsyncGenerator(
  {
    onInit: async () => { await database.connect(); },
    onCleanup: async () => { await database.disconnect(); },
    onError: (err) => { logger.warn('Cursor failed', err); }
  },
  () => createRecordCursor(prisma, 'user', { batchSize: 500 })
);
```
Quick Start Guide
- Identify a candidate: Find a `while(true)` pagination loop, event stream handler, or bulk data fetch that processes data sequentially.
- Extract fetch logic: Move the data retrieval mechanism into an `async function*`. Where the old code accumulated results and returned them, `yield` each batch or unit instead.
- Add cleanup: Wrap resource acquisition in `try/finally` to handle early consumer termination and error states.
- Update consumer: Replace the original loop with `for await (const item of generator) { ... }`. Add `try/catch` for error handling.
- Validate: Run load tests monitoring `process.memoryUsage().heapUsed`. Confirm memory remains constant as dataset size increases.
