st-response cycles, the response is never closed until the client disconnects or the server initiates termination.
import { createServer, IncomingMessage, ServerResponse } from 'node:http';
import { EventEmitter } from 'node:events';
/** Event categories a frame may carry; written verbatim into the SSE `event:` field. */
type TelemetryFrameKind = 'metric' | 'alert' | 'heartbeat';

/** One unit of data pushed to stream subscribers. */
interface TelemetryFrame {
  /** Identifier written into the SSE `id:` field and matched against `Last-Event-ID` on resume. */
  id: string;
  /** Category of the frame; becomes the SSE event name. */
  type: TelemetryFrameKind;
  /** Arbitrary JSON-serialisable body, emitted as the SSE `data:` field. */
  payload: Record<string, unknown>;
}
/**
 * Fan-out hub for Server-Sent Events.
 *
 * Keeps the set of open HTTP responses, a bounded history of recently sent
 * frames (used for `Last-Event-ID` replay), and a periodic heartbeat.
 */
class StreamEmitter extends EventEmitter {
  /** Maximum number of frames retained for replay. */
  private static readonly MAX_BUFFERED_FRAMES = 500;
  /** Heartbeat period in milliseconds. */
  private static readonly HEARTBEAT_INTERVAL_MS = 20000;

  private activeConnections: Set<ServerResponse> = new Set();
  private eventCursor: number = 0;
  /**
   * Most recent frames, oldest first. Readable from outside the class because
   * the request handler scans it to replay missed frames; only `broadcast`
   * should mutate it.
   */
  readonly buffer: TelemetryFrame[] = [];

  constructor() {
    super();
    // Emit heartbeat every 20s to prevent idle timeouts
    const heartbeat = setInterval(
      () =>
        this.broadcast({
          id: String(++this.eventCursor),
          type: 'heartbeat',
          payload: { ts: Date.now() }
        }),
      StreamEmitter.HEARTBEAT_INTERVAL_MS
    );
    // The heartbeat must not be the only thing keeping the process alive;
    // a listening server (or any other handle) still does that.
    heartbeat.unref();
  }

  /**
   * Serialises a frame into SSE wire format.
   *
   * The result ends with a blank line ("\n\n"): that blank line is what tells
   * the client to dispatch the event. A single trailing newline would make
   * consecutive frames merge into one never-dispatched event.
   *
   * @param frame Frame to serialise.
   * @returns The complete event block, terminator included.
   */
  static formatFrame(frame: TelemetryFrame): string {
    return (
      `id: ${frame.id}\n` +
      `event: ${frame.type}\n` +
      `data: ${JSON.stringify(frame.payload)}\n` +
      '\n'
    );
  }

  /**
   * Starts delivering broadcasts to `res` until its `close` event fires.
   *
   * @param res Open response whose SSE headers have already been written.
   */
  registerClient(res: ServerResponse): void {
    this.activeConnections.add(res);
    res.on('close', () => this.activeConnections.delete(res));
  }

  /**
   * Records a frame in the replay buffer and writes it to every open connection.
   *
   * @param frame Frame to deliver.
   */
  broadcast(frame: TelemetryFrame): void {
    this.buffer.push(frame);
    // Bounded buffer: drop the oldest frame once the cap is exceeded.
    if (this.buffer.length > StreamEmitter.MAX_BUFFERED_FRAMES) this.buffer.shift();

    const formatted = StreamEmitter.formatFrame(frame);
    for (const conn of this.activeConnections) {
      if (!conn.writableEnded) {
        conn.write(formatted);
      }
    }
  }
}
// Single hub shared by every streaming request handled by this process.
const emitter = new StreamEmitter();

/**
 * HTTP entry point for the telemetry stream.
 *
 * Anything other than /api/stream/telemetry gets a 404. A matching request is
 * answered with SSE headers, registered for live broadcasts, and — when the
 * client sent Last-Event-ID and that ID is still in the buffer — caught up
 * with every frame recorded after it.
 */
const server = createServer((req, res) => {
  // Compare the path only, so a query string does not turn into a 404.
  const pathname = (req.url ?? '').split('?', 1)[0];
  if (pathname !== '/api/stream/telemetry') {
    res.writeHead(404).end();
    return;
  }

  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache, no-transform',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no'
  });
  emitter.registerClient(res);

  // Handle resume via Last-Event-ID. Node types a header as string | string[],
  // so pick the first value instead of asserting the type away.
  const header = req.headers['last-event-id'];
  const lastId = Array.isArray(header) ? header[0] : header;
  if (!lastId) return;

  const resumeIndex = emitter.buffer.findIndex(f => f.id === lastId);
  if (resumeIndex === -1) return; // ID has aged out of the buffer: nothing to replay

  for (const frame of emitter.buffer.slice(resumeIndex + 1)) {
    // Each event must end with a blank line ("\n\n"); a single newline would
    // make the client merge consecutive frames instead of dispatching them.
    res.write(
      `id: ${frame.id}\n` +
        `event: ${frame.type}\n` +
        `data: ${JSON.stringify(frame.payload)}\n` +
        '\n'
    );
  }
});

server.listen(8080, () => console.log('Stream emitter active on :8080'));
Architecture Rationale:
- Bounded buffer prevents memory leaks during high-throughput periods.
- X-Accel-Buffering: no signals Nginx-compatible proxies to disable response buffering.
- Resume logic uses a linear scan for simplicity; production systems should index by event ID or use a time-series cursor.
- Heartbeat frames use the heartbeat event type to avoid triggering business logic handlers.
Step 2: Client-Side Consumer with State Reconciliation
The native EventSource API handles connection lifecycle, but production applications require explicit state management, error boundaries, and cleanup routines.
/**
 * Thin wrapper around EventSource that forwards parsed `metric` and `alert`
 * events to a callback and gives up after too many consecutive failures.
 */
class TelemetryStreamClient {
  private source: EventSource | null = null;
  /** ID of the most recent event that carried one. */
  private lastReceivedId: string | null = null;
  /** Errors seen since the last successful open. */
  private reconnectAttempts: number = 0;
  private maxRetries: number = 5;
  private onMessageCallback: (data: unknown) => void;

  /**
   * @param endpoint URL of the SSE endpoint; the connection is opened immediately.
   * @param callback Receives the JSON-decoded `data` of every metric/alert event.
   */
  constructor(endpoint: string, callback: (data: unknown) => void) {
    this.onMessageCallback = callback;
    this.connect(endpoint);
  }

  /** Opens the EventSource and wires up event, open and error handlers. */
  private connect(url: string): void {
    const source = new EventSource(url);
    this.source = source;

    // A successful (re)connection restores the full retry budget. Without this
    // reset the counter only ever grows, so five unrelated blips spread over
    // hours would permanently close an otherwise healthy stream.
    source.onopen = () => {
      this.reconnectAttempts = 0;
    };

    source.addEventListener('metric', (evt: MessageEvent) => {
      this.handleIncoming(evt);
    });
    source.addEventListener('alert', (evt: MessageEvent) => {
      this.handleIncoming(evt);
    });

    source.onerror = () => {
      this.reconnectAttempts++;
      if (this.reconnectAttempts > this.maxRetries) {
        source.close();
        console.error('Stream exhausted retry budget');
        return;
      }
      // Native auto-reconnect handles the delay; we only track attempts
    };
  }

  /**
   * Records the event ID, decodes the payload and hands it to the callback.
   * Only JSON decoding is guarded: an exception thrown by the callback is a
   * consumer bug and is allowed to surface instead of being reported as a
   * malformed payload.
   */
  private handleIncoming(evt: MessageEvent): void {
    if (evt.lastEventId) {
      this.lastReceivedId = evt.lastEventId;
    }
    let parsed: unknown;
    try {
      parsed = JSON.parse(evt.data);
    } catch {
      console.warn('Malformed stream payload');
      return;
    }
    this.onMessageCallback(parsed);
  }

  /** Closes the connection and drops the reference; safe to call repeatedly. */
  destroy(): void {
    this.source?.close();
    this.source = null;
  }
}
export default TelemetryStreamClient;
Step 3: Authenticated Streaming via Fetch + ReadableStream
The native EventSource API cannot send custom request headers, so a bearer token in an Authorization header is not an option (cookie-based authentication still works). The standard workaround replaces EventSource with a fetch call and a manual stream parser.
/**
 * Opens an SSE stream with a bearer token and feeds every JSON `data` line to
 * `onChunk`.
 *
 * Each `data` line is decoded and delivered on its own; the other SSE fields
 * (`id`, `event`, comments) are ignored.
 *
 * @param endpoint URL of the event stream.
 * @param token Bearer token sent in the Authorization header.
 * @param onChunk Called with the decoded JSON value of each `data` line.
 * @returns A function that cancels the stream.
 * @throws Error if the response is not OK or has no body.
 */
async function createAuthenticatedStream(
  endpoint: string,
  token: string,
  onChunk: (data: unknown) => void
): Promise<() => void> {
  const response = await fetch(endpoint, {
    headers: {
      'Authorization': `Bearer ${token}`,
      'Accept': 'text/event-stream'
    }
  });
  if (!response.ok || !response.body) {
    throw new Error('Stream initialization failed');
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  // Decodes one complete line of the stream and forwards it if it is a data field.
  const handleLine = (rawLine: string): void => {
    // Lines may be terminated by CRLF; drop the CR left over after splitting on LF.
    const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine;
    if (!line.startsWith('data:')) return;

    // The space after the colon is optional: "data:{...}" is as valid as "data: {...}".
    const afterColon = line.slice(5);
    const text = afterColon.startsWith(' ') ? afterColon.slice(1) : afterColon;

    let parsed: unknown;
    try {
      parsed = JSON.parse(text);
    } catch {
      console.warn('Invalid JSON in stream chunk');
      return;
    }
    try {
      onChunk(parsed);
    } catch (err: unknown) {
      // A failing consumer must not end the stream, nor be reported as bad JSON.
      console.error('Stream chunk handler failed:', err);
    }
  };

  const readLoop = async (): Promise<void> => {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      // The last piece is an incomplete line; keep it until more bytes arrive.
      buffer = lines.pop() ?? '';
      for (const line of lines) {
        handleLine(line);
      }
    }
  };

  // The loop runs in the background; a read failure (e.g. a dropped network
  // connection) must not surface as an unhandled promise rejection.
  readLoop().catch((err: unknown) => {
    console.error('Stream read failed:', err);
  });

  return () => {
    void reader.cancel().catch(() => undefined);
  };
}
Why this approach? It preserves standard HTTP authentication flows while sacrificing native auto-reconnect. Production implementations typically wrap this in a retry loop with exponential backoff, or leverage libraries like @microsoft/fetch-event-source that abstract the parsing and reconnection logic.
Pitfall Guide
1. Missing Double Newline Termination
Explanation: The SSE specification requires each event to end with two newline characters (\n\n). Omitting the second newline causes the browser to buffer the event indefinitely, waiting for the terminator.
Fix: Always append \n\n after the final data: line. When using template literals, ensure the string ends with \n\n or explicitly push an empty line.
2. Reverse Proxy Buffering
Explanation: Nginx, Apache, and Cloudflare apply response buffering by default. Streamed writes accumulate in memory until the buffer fills or the connection closes, defeating real-time delivery.
Fix: Disable buffering at the proxy level (proxy_buffering off;) and send X-Accel-Buffering: no from the application. For Cloudflare, route SSE endpoints through a dedicated worker or disable caching rules for the stream path.
3. The Six-Connection Ceiling
Explanation: HTTP/1.1 restricts browsers to six concurrent connections per origin. Opening multiple EventSource instances across tabs or components exhausts this limit, causing subsequent requests to queue silently.
Fix: Migrate to HTTP/2 or HTTP/3, which multiplex streams over a single TCP connection. Alternatively, implement a SharedWorker or BroadcastChannel coordinator where one tab maintains the connection and distributes events to siblings.
4. Silent Idle Timeouts
Explanation: Load balancers close connections that carry no data for longer than their idle timeout (60s by default on AWS ALB, 30s on GCP). The browser interprets this as a network failure and reconnects, causing state drift.
Fix: Emit comment lines (:keepalive\n\n) or heartbeat events every 15-25 seconds. Comment lines are ignored by the client parser but keep the TCP connection active.
5. No Custom Headers on EventSource
Explanation: new EventSource(url) does not accept a headers option. Attempting to pass authentication tokens via headers fails silently or throws in strict environments.
Fix: Use cookie-based authentication with withCredentials: true, or switch to the fetch + ReadableStream pattern. Never embed tokens in query strings for production systems due to log exposure.
6. Orphaned Event IDs
Explanation: Sending id: values without server-side state tracking breaks resume functionality. If the server restarts or the buffer clears, clients reconnect with stale IDs and receive duplicate or missing events.
Fix: Maintain a monotonic event counter or timestamp-based cursor. Implement idempotent event processing on the client, and validate Last-Event-ID against a bounded retention window.
7. Blocking the Event Loop
Explanation: Synchronous operations inside the stream handler (database queries, heavy JSON parsing, CPU-bound transformations) stall the Node.js event loop, delaying heartbeats and causing proxy timeouts.
Fix: Offload heavy processing to worker threads or message queues. Keep the stream emitter focused solely on I/O forwarding. Use streaming JSON parsers for large payloads.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Unidirectional feed (logs, alerts, AI tokens) | Native SSE (EventSource) | Zero reconnection code, native resume, standard HTTP routing | Lowest infra overhead |
| Bidirectional chat or collaborative editing | WebSockets | Requires client-to-server messaging and binary frame support | Higher connection state management |
| Legacy proxy environment (HTTP/1.1 only) | Long Polling or SSE + SharedWorker | Avoids 6-connection limit; polling works through strict firewalls | Increased request volume or worker complexity |
| Token-based auth required | fetch + ReadableStream | Bypasses EventSource header limitation; maintains streaming | Custom parser and retry logic required |
Configuration Template
# Nginx reverse proxy configuration for SSE endpoints
# Applies to every request whose path starts with /api/stream/.
location /api/stream/ {
# Forward to the upstream named backend_cluster (presumably declared in an upstream block elsewhere — confirm).
proxy_pass http://backend_cluster;
# Talk HTTP/1.1 to the upstream and send it an empty Connection header.
# NOTE(review): this is the usual pairing for keeping upstream connections open — confirm it matches the upstream setup.
proxy_http_version 1.1;
proxy_set_header Connection '';
# Pass streamed writes through as they arrive instead of accumulating them in the proxy.
proxy_buffering off;
# Do not cache responses for the stream path.
proxy_cache off;
# Long read/send timeouts so a stream that is open for hours is not cut by the proxy.
proxy_read_timeout 24h;
proxy_send_timeout 24h;
# Adds the X-Accel-Buffering: no header to the response sent to the client.
add_header X-Accel-Buffering no;
}
// Node.js server hardening snippet
import { createServer } from 'node:http';
// Request handler skeleton for a hardened SSE endpoint: registers a drain
// listener, adjusts the response timeout, and sends the streaming headers.
// The actual stream logic is left as a placeholder.
const server = createServer((req, res) => {
// Apply backpressure awareness: hook the 'drain' event so writes can be resumed
// after being paused (the handler body is a placeholder)
res.on('drain', () => { /* resume writes if paused */ });
// Set the response timeout to 0.
// NOTE(review): per the Node.js docs a timeout of 0 disables the socket idle timeout,
// so Node itself will not cut a quiet stream. This does not change a load balancer's
// own idle timeout — heartbeats are still needed for that. Confirm against your setup.
res.setTimeout(0);
// Stream headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no'
});
// ... stream logic
});
// Prevent uncaught stream errors from crashing the process.
// ECONNRESET is dropped silently; every other error is logged and the process
// keeps running.
process.on('uncaughtException', (err: Error) => {
  // `code` is not part of the plain Error type; Node declares it on
  // NodeJS.ErrnoException, so read it through that type to compile under strict.
  const { code } = err as NodeJS.ErrnoException;
  if (code === 'ECONNRESET') return; // Expected client disconnect
  console.error('Unhandled stream error:', err);
});
Quick Start Guide
- Initialize the emitter: Create a Node.js HTTP server that responds to /api/stream/telemetry with text/event-stream headers and maintains a Set of active ServerResponse objects.
- Implement frame broadcasting: Write a method that formats events with id:, event:, and data: fields, terminated by \n\n, and iterates over active connections to push payloads.
- Add client consumption: Instantiate EventSource in your frontend, attach addEventListener handlers for specific event types, and implement a cleanup function that calls .close() on component unmount.
- Validate delivery: Run curl -N -H "Accept: text/event-stream" http://localhost:8080/api/stream/telemetry to verify raw frame output, then check browser DevTools → Network → EventStream tab for parsed events and reconnection behavior.