re controls.
4. Worker pool consumes tasks, calls the LLM API, and streams tokens back through the original WebSocket.
5. Session registry manages lifecycle, cleaning up resources on disconnect or timeout.
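Step 5's timeout path can be sketched without any transport dependency. A registry records each session's last activity, and a periodic sweep evicts idle entries. This is a minimal sketch, not code from the gateway. `IdleSessionRegistry`, the `onEvict` callback, and the explicit `now` argument (injected so the sweep is deterministic) are illustrative assumptions.

```typescript
// Hypothetical idle-timeout registry (illustrative names, not a library API).
type EvictFn = (sessionId: string) => void;

class IdleSessionRegistry {
  private lastActivity = new Map<string, number>();
  private readonly idleTimeoutMs: number;
  private readonly onEvict: EvictFn;

  constructor(idleTimeoutMs: number, onEvict: EvictFn) {
    this.idleTimeoutMs = idleTimeoutMs;
    this.onEvict = onEvict;
  }

  // Record activity: connection opened, prompt received, token sent.
  touch(sessionId: string, now: number = Date.now()): void {
    this.lastActivity.set(sessionId, now);
  }

  // Explicit removal for 'close' and 'error' events.
  remove(sessionId: string): void {
    this.lastActivity.delete(sessionId);
  }

  // Evicts sessions idle longer than the timeout and returns their IDs.
  sweep(now: number = Date.now()): string[] {
    const evicted: string[] = [];
    for (const [sessionId, last] of this.lastActivity) {
      if (now - last > this.idleTimeoutMs) {
        this.lastActivity.delete(sessionId);
        this.onEvict(sessionId);
        evicted.push(sessionId);
      }
    }
    return evicted;
  }

  get size(): number {
    return this.lastActivity.size;
  }
}

// Usage with explicit timestamps (ms): sess_a has idled 90 s, sess_b only 40 s.
const closed: string[] = [];
const registry = new IdleSessionRegistry(60_000, (id) => closed.push(id));
registry.touch('sess_a', 0);
registry.touch('sess_b', 50_000);
const evicted = registry.sweep(90_000);
```

In a server, sweep() would run from a setInterval, and onEvict would call the same teardown routine used for close and error events.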
Implementation Architecture
The following TypeScript/Node.js implementation demonstrates the pattern using ws for transport, an in-process async queue per session, and OpenAI-compatible streaming. The structure prioritizes explicit lifecycle control, queue-depth reporting as a basic backpressure signal, and error boundaries.
```typescript
import 'dotenv/config'; // load OPENAI_API_KEY and PORT from .env (dotenv package)
import { WebSocketServer, WebSocket, RawData } from 'ws';
import OpenAI from 'openai';

interface QueuedTask {
  taskId: string;
  prompt: string;
}

interface SessionState {
  socket: WebSocket;
  queue: QueuedTask[];
  isProcessing: boolean;
  lastActivity: number;
}

class InferenceGateway {
  private sessions: Map<string, SessionState> = new Map();
  private openai: OpenAI;
  private wss: WebSocketServer;

  constructor(port: number) {
    this.openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
    this.wss = new WebSocketServer({ port });
    this.setupListeners();
  }

  private setupListeners(): void {
    this.wss.on('connection', (socket: WebSocket) => {
      const sessionId = this.generateSessionId();
      this.sessions.set(sessionId, {
        socket,
        queue: [],
        isProcessing: false,
        lastActivity: Date.now()
      });

      // ws delivers message payloads as Buffers; malformed JSON must not crash the process
      socket.on('message', (raw: RawData) => {
        let payload: unknown;
        try {
          payload = JSON.parse(raw.toString());
        } catch {
          this.send(socket, { type: 'ERROR', message: 'Malformed JSON payload' });
          return;
        }
        const { type, content } = (payload ?? {}) as { type?: unknown; content?: unknown };
        if (type === 'PROMPT' && typeof content === 'string') {
          this.enqueueTask(sessionId, content);
        }
      });

      socket.on('close', () => this.teardownSession(sessionId));
      socket.on('error', () => this.teardownSession(sessionId));
    });
  }

  private enqueueTask(sessionId: string, prompt: string): void {
    const session = this.sessions.get(sessionId);
    if (!session) return;

    const taskId = this.generateTaskId();
    session.queue.push({ taskId, prompt });
    session.lastActivity = Date.now();

    // Immediate acknowledgment, sent before any inference work starts
    this.send(session.socket, { type: 'ACK', taskId, queueDepth: session.queue.length });

    if (!session.isProcessing) {
      void this.processQueue(sessionId);
    }
  }

  // Drains one session's queue sequentially; other sessions proceed concurrently.
  private async processQueue(sessionId: string): Promise<void> {
    const session = this.sessions.get(sessionId);
    if (!session || session.isProcessing) return;
    session.isProcessing = true;

    // Stop as soon as the session is torn down, so no phantom tasks run
    while (session.queue.length > 0 && this.sessions.has(sessionId)) {
      const { taskId, prompt } = session.queue.shift()!;
      try {
        const stream = await this.openai.chat.completions.create({
          model: 'gpt-3.5-turbo',
          messages: [{ role: 'user', content: prompt }],
          stream: true
        });
        for await (const chunk of stream) {
          if (!this.sessions.has(sessionId)) {
            stream.controller.abort(); // client disconnected: cancel the upstream request
            break;
          }
          const token = chunk.choices[0]?.delta?.content;
          if (token) {
            this.send(session.socket, { type: 'TOKEN', taskId, data: token });
          }
        }
        this.send(session.socket, { type: 'COMPLETE', taskId });
      } catch (error) {
        this.send(session.socket, {
          type: 'ERROR',
          taskId,
          message: error instanceof Error ? error.message : 'Inference failed'
        });
      }
    }
    session.isProcessing = false;
  }

  // Serializes an envelope and skips sockets that are closing or closed
  private send(socket: WebSocket, message: Record<string, unknown>): void {
    if (socket.readyState === WebSocket.OPEN) {
      socket.send(JSON.stringify(message));
    }
  }

  private teardownSession(sessionId: string): void {
    const session = this.sessions.get(sessionId);
    if (session) {
      session.queue.length = 0; // drop pending prompts so they are never sent upstream
      session.socket.terminate();
      this.sessions.delete(sessionId);
    }
  }

  private generateSessionId(): string {
    return `sess_${Math.random().toString(36).substring(2, 11)}`;
  }

  private generateTaskId(): string {
    return `task_${Date.now()}_${Math.random().toString(36).substring(2, 7)}`;
  }
}

// Initialize gateway
const port = parseInt(process.env.PORT ?? '8080', 10);
new InferenceGateway(port);
console.log(`Inference gateway listening on port ${port}`);
```
Architecture Rationale
- Persistent Transport instead of Per-Request HTTP: WebSockets keep one TCP connection open per client, eliminating per-request handshake overhead and the connection churn that can exhaust connection pools under load. The bidirectional channel lets the server push tokens as they are generated, without polling.
- Immediate Acknowledgment: Returning an ACK payload within milliseconds decouples user perception from inference duration. Clients can update UI state, disable input fields, or show progress indicators while inference is still pending.
- In-Memory Queue with Sequential Processing: Each session maintains a lightweight FIFO queue. Tasks are processed sequentially per session, so responses arrive in the order prompts were sent (a prerequisite for multi-turn context), while multiple sessions run concurrently on the same event loop.
- Worker Isolation: Inference calls run inside their own error boundary, separate from transport handling. If the LLM API rate-limits or times out, the WebSocket stays open and the client receives an ERROR payload instead of the gateway crashing.
- Explicit Lifecycle Management: teardownSession ensures socket termination, queue cleanup, and memory release on disconnect. This prevents zombie connections and resource leaks in long-running deployments.
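The ACK-first flow is easiest to see from the client side. The reducer below folds gateway messages into per-task UI state. It is a sketch that assumes every TOKEN, COMPLETE, and ERROR envelope carries the taskId returned in the ACK; `applyMessage`, `TaskView`, and the status names are hypothetical.

```typescript
// Hypothetical client-side model of the gateway's message protocol.
type ServerMessage =
  | { type: 'ACK'; taskId: string; queueDepth: number }
  | { type: 'TOKEN'; taskId: string; data: string }
  | { type: 'COMPLETE'; taskId: string }
  | { type: 'ERROR'; taskId?: string; message: string };

type TaskStatus = 'queued' | 'streaming' | 'done' | 'failed';

interface TaskView {
  status: TaskStatus;
  text: string;
  error?: string;
}

// Pure reducer: returns a new Map so UI frameworks can detect the change.
function applyMessage(tasks: Map<string, TaskView>, msg: ServerMessage): Map<string, TaskView> {
  const next = new Map(tasks);
  const taskId = msg.taskId;
  if (taskId === undefined) return next; // session-level error; surface it elsewhere
  const current: TaskView = next.get(taskId) ?? { status: 'queued', text: '' };
  switch (msg.type) {
    case 'ACK':
      next.set(taskId, { status: 'queued', text: '' });
      break;
    case 'TOKEN':
      next.set(taskId, { ...current, status: 'streaming', text: current.text + msg.data });
      break;
    case 'COMPLETE':
      next.set(taskId, { ...current, status: 'done' });
      break;
    case 'ERROR':
      next.set(taskId, { ...current, status: 'failed', error: msg.message });
      break;
  }
  return next;
}
```

Keeping the reducer pure means the input field can be re-enabled as soon as the ACK arrives, independent of how long the tokens take.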
Pitfall Guide
1. Blocking the Event Loop with Synchronous SDK Calls
Explanation: Using synchronous HTTP clients or CPU-bound operations inside async handlers freezes the entire gateway. Node.js cannot process other WebSocket messages while waiting for a blocking call.
Fix: Always use async/await patterns with non-blocking I/O. Offload CPU-heavy tasks (e.g., prompt templating, embedding generation) to worker threads or external services.
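A minimal sketch of moving CPU-bound work off the event loop with Node's built-in worker_threads module. The `countWordsOffThread` helper and its word-count workload are hypothetical placeholders for real work such as tokenization; the worker source is passed as a string with `eval: true` only to keep the example in one file.

```typescript
import { Worker } from 'worker_threads';

// Plain JavaScript executed inside the worker thread (eval: true expects JS, not TS).
const WORKER_SOURCE = `
  const { parentPort, workerData } = require('worker_threads');
  // Placeholder CPU-bound task: count whitespace-separated words.
  const words = workerData.text.split(/\\s+/).filter(Boolean).length;
  parentPort.postMessage(words);
`;

// Runs the placeholder task on a separate thread so the event loop stays responsive.
function countWordsOffThread(text: string): Promise<number> {
  return new Promise((resolve, reject) => {
    const worker = new Worker(WORKER_SOURCE, { eval: true, workerData: { text } });
    worker.once('message', (count: number) => resolve(count));
    worker.once('error', reject);
    worker.once('exit', (code) => {
      if (code !== 0) reject(new Error(`Worker exited with code ${code}`));
    });
  });
}
```

Spawning one thread per call adds startup cost, so a long-lived pool of workers is the more common arrangement in production.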
2. Unbounded Queue Growth
Explanation: Without backpressure, rapid user input floods the session queue. Memory consumption grows linearly with queue depth, eventually triggering OOM kills.
Fix: Implement maxQueueSize thresholds. When exceeded, reply with a RATE_LIMITED message and either reject the new task or drop (or defer) the oldest queued ones. Integrate Redis-backed queues for persistence and distributed backpressure.
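A bounded queue with an explicit overflow policy might look like this. It is a sketch: `BoundedTaskQueue`, the `OverflowPolicy` values, and the reply shape (reusing the ACK and RATE_LIMITED type names) are assumptions. Both options from the fix are shown: rejecting the newest task or evicting the oldest.

```typescript
// Hypothetical bounded queue; names and reply shape are illustrative.
type OverflowPolicy = 'reject-newest' | 'drop-oldest';

interface EnqueueResult<T> {
  accepted: boolean;
  dropped?: T; // task evicted under 'drop-oldest'
  reply: { type: 'ACK' | 'RATE_LIMITED'; queueDepth: number };
}

class BoundedTaskQueue<T> {
  private items: T[] = [];
  private readonly maxQueueSize: number;
  private readonly policy: OverflowPolicy;

  constructor(maxQueueSize: number, policy: OverflowPolicy = 'reject-newest') {
    if (!Number.isInteger(maxQueueSize) || maxQueueSize < 1) {
      throw new RangeError('maxQueueSize must be a positive integer');
    }
    this.maxQueueSize = maxQueueSize;
    this.policy = policy;
  }

  enqueue(task: T): EnqueueResult<T> {
    if (this.items.length < this.maxQueueSize) {
      this.items.push(task);
      return { accepted: true, reply: { type: 'ACK', queueDepth: this.items.length } };
    }
    if (this.policy === 'drop-oldest') {
      const dropped = this.items.shift(); // evict the oldest to make room
      this.items.push(task);
      return { accepted: true, dropped, reply: { type: 'ACK', queueDepth: this.items.length } };
    }
    // Queue full: refuse the new task and tell the client why
    return { accepted: false, reply: { type: 'RATE_LIMITED', queueDepth: this.items.length } };
  }

  dequeue(): T | undefined {
    return this.items.shift();
  }

  get depth(): number {
    return this.items.length;
  }
}
```

Rejecting the newest task keeps already-acknowledged work intact; dropping the oldest favors freshness, which suits chat input where a newer prompt usually supersedes a stale one.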
3. Connection State Leakage
Explanation: Failing to clean up session objects on disconnect leaves dangling references. Over time, the gateway accumulates dead sessions, consuming memory and processing phantom tasks.
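Fix: Bind close and error events to explicit teardown routines. Implement heartbeat/ping intervals to detect stale connections. Use TTL-based expiration for session storage, or key per-connection state by the socket object in a WeakMap (WeakMap keys must be objects, so string session IDs do not qualify).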
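The heartbeat can follow the ping/pong pattern commonly used with ws: mark each connection not-alive before pinging, mark it alive again when a pong arrives, and terminate anything that did not answer since the previous tick. The sketch keeps that logic independent of ws by depending only on `ping()` and `terminate()`, which ws WebSocket instances provide. `HeartbeatMonitor` is a hypothetical name; in the gateway, `markAlive` would be called from `socket.on('pong', ...)` and `tick()` from a `setInterval` (for example, every 30 seconds).

```typescript
// Minimal structural type; ws WebSocket instances satisfy it.
interface PingableConnection {
  ping(): void;
  terminate(): void;
}

// Hypothetical liveness tracker following the ping/pong heartbeat pattern.
class HeartbeatMonitor<C extends PingableConnection> {
  private alive = new Map<C, boolean>();

  track(conn: C): void {
    this.alive.set(conn, true);
  }

  // Call from the connection's 'pong' handler.
  markAlive(conn: C): void {
    if (this.alive.has(conn)) this.alive.set(conn, true);
  }

  // Call from the 'close' handler so closed sockets are not pinged.
  untrack(conn: C): void {
    this.alive.delete(conn);
  }

  // Terminates connections that missed the previous ping, then pings the rest.
  tick(): C[] {
    const terminated: C[] = [];
    for (const [conn, isAlive] of this.alive) {
      if (!isAlive) {
        this.alive.delete(conn);
        conn.terminate();
        terminated.push(conn);
        continue;
      }
      this.alive.set(conn, false);
      conn.ping();
    }
    return terminated;
  }
}
```

A connection therefore gets one full interval to answer before it is terminated, which bounds how long a dead peer can hold session resources.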
4. Stream Fragmentation & JSON Corruption
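Explanation: WebSocket delivers each send() call to the receiver as one complete message, so transport-level truncation is not the real risk. The risk is that raw token strings carry no metadata: the client cannot distinguish content from control signals, or attribute tokens to the right request when several streams share one socket.
Fix: Wrap all transmissions in structured JSON envelopes with an explicit type field and a task identifier. Validate payloads on the client side. Length prefixes are only needed when several logical records are packed into a single binary message.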
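Client-side validation can be a single parser that rejects anything that is not a well-formed envelope before it reaches UI code. This sketch assumes every envelope carries a string taskId in addition to type (ERROR may omit it for session-level failures); `parseEnvelope` and the `Envelope` union are hypothetical.

```typescript
// Hypothetical envelope shapes mirroring the gateway's message types.
type Envelope =
  | { type: 'ACK'; taskId: string; queueDepth: number }
  | { type: 'TOKEN'; taskId: string; data: string }
  | { type: 'COMPLETE'; taskId: string }
  | { type: 'ERROR'; taskId?: string; message: string };

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

// Parses one WebSocket text message; returns null for anything malformed.
function parseEnvelope(raw: string): Envelope | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null;
  }
  if (!isRecord(value)) return null;
  const { type, taskId, queueDepth, data, message } = value;
  switch (type) {
    case 'ACK':
      return typeof taskId === 'string' && typeof queueDepth === 'number'
        ? { type: 'ACK', taskId, queueDepth }
        : null;
    case 'TOKEN':
      return typeof taskId === 'string' && typeof data === 'string'
        ? { type: 'TOKEN', taskId, data }
        : null;
    case 'COMPLETE':
      return typeof taskId === 'string' ? { type: 'COMPLETE', taskId } : null;
    case 'ERROR':
      if (typeof message !== 'string') return null;
      if (taskId === undefined) return { type: 'ERROR', message };
      return typeof taskId === 'string' ? { type: 'ERROR', taskId, message } : null;
    default:
      return null; // unknown type: ignore rather than guess
  }
}
```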
5. Over-Provisioning Fixed Worker Counts
Explanation: Hardcoding worker pools ignores traffic variance. During low load, resources sit idle. During spikes, queues back up and latency degrades.
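Fix: Implement dynamic scaling based on queue depth and CPU utilization. Use container orchestration to scale worker replicas automatically, for example a Kubernetes Horizontal Pod Autoscaler (HPA) fed with queue depth as a custom or external metric. Docker Swarm runs replicated services but does not autoscale on its own.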
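The Kubernetes HPA documents its core rule as desiredReplicas = ceil(currentReplicas × currentMetricValue / desiredMetricValue), skipping changes whose ratio falls within a tolerance (0.1 by default). With a per-replica queue-depth target, that reduces to ceil(totalQueueDepth / targetPerReplica), clamped to bounds. The function below is a hypothetical illustration of the arithmetic; in a cluster, the HPA (or a queue-aware autoscaler) would perform it.

```typescript
// Hypothetical scaling calculation modeled on the HPA formula.
interface ScalingInput {
  currentReplicas: number;
  totalQueueDepth: number;  // tasks waiting across all sessions
  targetPerReplica: number; // desired queued tasks per worker
  minReplicas: number;
  maxReplicas: number;
  tolerance?: number;       // ignore ratio changes within this band (default 0.1)
}

function desiredWorkerReplicas(input: ScalingInput): number {
  const { currentReplicas, totalQueueDepth, targetPerReplica, minReplicas, maxReplicas } = input;
  const tolerance = input.tolerance ?? 0.1;
  const clamp = (n: number): number => Math.min(maxReplicas, Math.max(minReplicas, n));

  const replicas = Math.max(currentReplicas, 1); // avoid dividing by zero
  const ratio = totalQueueDepth / replicas / targetPerReplica;
  if (Math.abs(ratio - 1) <= tolerance) {
    return clamp(currentReplicas); // close enough to target: avoid flapping
  }
  return clamp(Math.ceil(totalQueueDepth / targetPerReplica));
}
```

The tolerance band keeps small fluctuations in queue depth from triggering constant scale events.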
6. Silent API Failures
Explanation: LLM providers return rate limits, context window errors, or model deprecations. If unhandled, these failures terminate streams without client notification.
Fix: Wrap inference calls in try/catch blocks. Return structured error payloads with retry hints (WebSocket messages carry no HTTP headers, so use a field such as retryAfter). Implement exponential backoff and circuit breakers for upstream dependencies.
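Backoff and a circuit breaker can each be expressed in a few lines. This sketch uses capped exponential backoff with "full jitter" (a random delay between zero and the exponential cap) and a minimal three-state breaker. `backoffDelayMs`, `CircuitBreaker`, and their default thresholds are illustrative assumptions; the injectable `random` and `now` parameters exist only to make the behavior testable.

```typescript
// Capped exponential backoff with full jitter: random delay in [0, min(cap, base * 2^attempt)).
function backoffDelayMs(
  attempt: number,
  baseMs = 500,
  capMs = 30_000,
  random: () => number = Math.random
): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}

type BreakerState = 'closed' | 'open' | 'half-open';

// Hypothetical breaker: opens after N consecutive failures, allows one trial after a cooldown.
class CircuitBreaker {
  private state: BreakerState = 'closed';
  private failures = 0;
  private openedAt = 0;
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;

  constructor(failureThreshold = 5, resetTimeoutMs = 30_000) {
    this.failureThreshold = failureThreshold;
    this.resetTimeoutMs = resetTimeoutMs;
  }

  // False while open; after the cooldown, returns true once (the half-open trial).
  canRequest(now: number = Date.now()): boolean {
    if (this.state === 'open' && now - this.openedAt >= this.resetTimeoutMs) {
      this.state = 'half-open';
      return true;
    }
    return this.state === 'closed';
  }

  recordSuccess(): void {
    this.state = 'closed';
    this.failures = 0;
  }

  recordFailure(now: number = Date.now()): void {
    this.failures += 1;
    if (this.state === 'half-open' || this.failures >= this.failureThreshold) {
      this.state = 'open';
      this.openedAt = now;
    }
  }

  get current(): BreakerState {
    return this.state;
  }
}
```

The OpenAI Node SDK already retries some failures on its own (configurable through its maxRetries client option), so application-level retries should account for that to avoid multiplying attempts.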
7. Context Drift in Multi-Turn Sessions
Explanation: Sending isolated prompts without conversation history causes the model to lose context, generating disjointed or irrelevant responses.
Fix: Maintain a session-level message array. Append each new prompt, and each completed model response, to the history before the next API call. Implement context window trimming to stay within token limits.
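A session-level history with trimming might look like the following. Token counts use a rough four-characters-per-token heuristic, an assumption for illustration only; a real implementation would count with the model's tokenizer. `ConversationHistory` and `maxContextTokens` are hypothetical names, and the message shape matches the `{ role, content }` objects passed to `chat.completions.create`.

```typescript
type Role = 'system' | 'user' | 'assistant';

interface ChatMessage {
  role: Role;
  content: string;
}

// Rough heuristic (assumption): about four characters per token.
function estimateTokens(message: ChatMessage): number {
  return Math.ceil(message.content.length / 4);
}

// Hypothetical history that always keeps the system prompt and trims oldest turns first.
class ConversationHistory {
  private messages: ChatMessage[] = [];
  private readonly systemPrompt: ChatMessage;
  private readonly maxContextTokens: number;

  constructor(systemPrompt: string, maxContextTokens: number) {
    this.systemPrompt = { role: 'system', content: systemPrompt };
    this.maxContextTokens = maxContextTokens;
  }

  append(message: ChatMessage): void {
    this.messages.push(message);
  }

  // System prompt plus the newest contiguous messages that fit the budget.
  toRequestMessages(): ChatMessage[] {
    let budget = this.maxContextTokens - estimateTokens(this.systemPrompt);
    const kept: ChatMessage[] = [];
    for (const message of [...this.messages].reverse()) {
      const cost = estimateTokens(message);
      if (cost > budget) break; // stop at the first message that does not fit
      budget -= cost;
      kept.unshift(message);
    }
    return [this.systemPrompt, ...kept];
  }
}
```

Stopping at the first message that does not fit, rather than skipping it, keeps the retained turns contiguous so the model never sees a reply without the prompt that preceded it.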
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Startup MVP / Low Traffic (<500 concurrent) | In-memory asyncio/async queue + single gateway node | Zero infrastructure overhead, rapid iteration, sufficient for burst testing | Minimal ($0-$50/mo) |
| Mid-scale SaaS (500-5,000 concurrent) | Redis-backed queue (BullMQ/Celery) + horizontal gateway cluster | Persistence across restarts, distributed backpressure, proven reliability | Moderate ($100-$400/mo) |
| Enterprise Event Streaming (>10k concurrent) | Apache Kafka + dedicated worker pool + API gateway | High throughput, replay capability, multi-consumer routing, audit trails | High ($500-$2,000+/mo) |
| Local Development / Testing | asyncio.Queue or Node.js async queue | No external dependencies, instant feedback, mirrors production logic | Free |
Configuration Template
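```yaml
# docker-compose.yml
# Assumes Redis-backed ./gateway and ./worker images; the single-process
# gateway shown earlier does not read QUEUE_TYPE or REDIS_URL.
version: '3.8'
services:
  gateway:
    build: ./gateway
    ports:
      # A fixed "8080:8080" mapping conflicts with 2 replicas: give each replica
      # its own host port, or put a load balancer in front of the gateway.
      - "8080-8081:8080"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - QUEUE_TYPE=redis
      - REDIS_URL=redis://cache:6379
    depends_on:
      - cache
    deploy:
      replicas: 2
      resources:
        limits:
          memory: 512M
  worker:
    build: ./worker
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - QUEUE_TYPE=redis
      - REDIS_URL=redis://cache:6379
    depends_on:
      - cache
    deploy:
      replicas: 4
      resources:
        limits:
          memory: 1G
  cache:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
volumes:
  redis_data:
```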
Quick Start Guide
- Initialize the project: Run npm init -y and install dependencies: npm install ws openai dotenv. Because the gateway is written in TypeScript, also install the compiler and type definitions: npm install -D typescript @types/node @types/ws.
- Configure environment variables: Create a .env file with OPENAI_API_KEY=sk-your-key and PORT=8080.
- Deploy the gateway: Compile gateway.ts with tsc (targeting ES2020 or later) and execute node gateway.js, or run via Docker using the provided compose file.
- Test the stream: Connect via a WebSocket client, send {"type":"PROMPT","content":"Explain quantum computing"}, and observe an immediate ACK followed by TOKEN payloads and a COMPLETE signal.