PERFORM pg_notify(
NEW.routing_key,
json_build_object(
'event_id', NEW.id,
'routing_key', NEW.routing_key,
'emitted_at', NEW.published_at
)::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_broadcast_outbox
AFTER INSERT ON event_outbox
FOR EACH ROW
EXECUTE FUNCTION broadcast_outbox_event();
```
**Architecture Rationale**: The trigger executes synchronously within the `INSERT`, and PostgreSQL delivers the notification only if the enclosing transaction commits. This couples the notification to the transaction lifecycle, so a rolled-back insert never produces an orphaned event. The payload sent via `pg_notify` contains only metadata (ID, routing key, timestamp). The full payload remains in the `event_outbox` table, so listeners can fetch complete data on demand while the notification stays well under the 8,000-byte payload limit.
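To make the metadata-only payload concrete, the sketch below validates the JSON text a listener would receive on the other end. The `OutboxNotification` type and the `parseOutboxNotification` helper are hypothetical names introduced for illustration; only the three field names come from the `json_build_object` call in the trigger.

```typescript
// Hypothetical shape of the metadata built by the trigger's json_build_object call.
interface OutboxNotification {
  event_id: number;
  routing_key: string;
  emitted_at: string;
}

// Parses and validates a NOTIFY payload. Returns null instead of throwing,
// so a malformed message cannot crash the listener.
function parseOutboxNotification(raw: string | undefined): OutboxNotification | null {
  if (!raw) return null;
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof parsed !== 'object' || parsed === null) return null;
  const candidate = parsed as Record<string, unknown>;
  if (
    typeof candidate.event_id !== 'number' ||
    typeof candidate.routing_key !== 'string' ||
    typeof candidate.emitted_at !== 'string'
  ) {
    return null;
  }
  return {
    event_id: candidate.event_id,
    routing_key: candidate.routing_key,
    emitted_at: candidate.emitted_at
  };
}
```

Validating at the boundary keeps downstream consumers from having to defend against partial or malformed metadata.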
### Step 2: Dedicated Listener Implementation
Connection poolers like PgBouncer are commonly run in transaction or statement mode. Both modes return connections to the pool between transactions or statements, which silently drops `LISTEN` state. The listener must establish a direct TCP connection to PostgreSQL, bypassing the pooler entirely.
```typescript
import { Pool, PoolClient, Notification } from 'pg';
import { EventEmitter } from 'events';

interface ListenerConfig {
  connectionString: string;
  routingKeys: string[];
  pollIntervalMs: number;
  maxReconnectAttempts: number;
}

export class PgEventSubscriber extends EventEmitter {
  private pool: Pool | null = null;
  private client: PoolClient | null = null;
  private isRunning = false;
  private readonly baseBackoffMs = 1000;
  private readonly maxBackoffMs = 30000;

  constructor(private config: ListenerConfig) {
    super();
  }

  async start(): Promise<void> {
    this.isRunning = true;
    await this.establishDirectConnection();
    await this.subscribeToChannels();
    this.listenLoop();
  }

  private async establishDirectConnection(): Promise<void> {
    // Bypass the pooler: the connection string must point at PostgreSQL itself
    this.pool = new Pool({
      connectionString: this.config.connectionString,
      max: 1,
      idleTimeoutMillis: 0,
      application_name: 'pg-event-listener'
    });
    this.client = await this.pool.connect();
    await this.client.query('SET statement_timeout = 0');
    await this.client.query('SET idle_in_transaction_session_timeout = 0');
  }

  private async subscribeToChannels(): Promise<void> {
    if (!this.client) return;
    for (const key of this.config.routingKeys) {
      // Quote the channel: keys such as "orders.status" are not valid bare identifiers
      await this.client.query(`LISTEN ${this.client.escapeIdentifier(key)}`);
    }
  }

  private listenLoop(): void {
    if (!this.client) return;
    this.client.on('notification', (msg: Notification) => {
      try {
        // msg.payload is optional in the driver's typings
        const payload = JSON.parse(msg.payload ?? '{}');
        this.emit('event', {
          routingKey: msg.channel,
          eventId: payload.event_id,
          metadata: payload
        });
      } catch (err) {
        this.emit('error', new Error(`Failed to parse notification: ${err}`));
      }
    });
    this.client.on('error', (err) => {
      void this.handleDisconnection(err);
    });
  }

  // Tears down the current client and its single-connection pool
  private async releaseConnection(): Promise<void> {
    if (this.client) {
      this.client.removeAllListeners('notification');
      this.client.removeAllListeners('error');
      this.client.on('error', () => undefined); // swallow late socket errors during teardown
      this.client.release(true); // true destroys the connection instead of recycling it
      this.client = null;
    }
    if (this.pool) {
      await this.pool.end().catch(() => undefined);
      this.pool = null;
    }
  }

  private async handleDisconnection(err: Error): Promise<void> {
    if (!this.isRunning) return;
    console.warn(`Listener disconnected: ${err.message}`);
    await this.releaseConnection();

    let attempts = 0;
    while (attempts < this.config.maxReconnectAttempts && this.isRunning) {
      attempts++;
      // Exponential backoff: 1s, 2s, 4s, ... capped at 30s
      const delayMs = Math.min(this.baseBackoffMs * 2 ** (attempts - 1), this.maxBackoffMs);
      await new Promise((resolve) => setTimeout(resolve, delayMs));
      try {
        await this.establishDirectConnection();
        await this.subscribeToChannels();
        this.listenLoop();
        await this.recoverMissedEvents();
        return;
      } catch (reconnectErr) {
        console.error(`Reconnect attempt ${attempts} failed`);
        await this.releaseConnection();
      }
    }
    if (this.isRunning) {
      this.emit('error', new Error('Max reconnect attempts reached'));
    }
  }

  private async recoverMissedEvents(): Promise<void> {
    if (!this.client) return;
    // Replay unacknowledged events from a fixed 60-second fallback window.
    // Track a real last-seen cursor if outages can last longer than that.
    const lastSeen = new Date(Date.now() - 60000);
    const res = await this.client.query(
      `SELECT id, routing_key, payload FROM event_outbox
       WHERE acknowledged_at IS NULL AND published_at > $1
       ORDER BY id ASC LIMIT 500`,
      [lastSeen]
    );
    for (const row of res.rows) {
      this.emit('event', {
        routingKey: row.routing_key,
        eventId: row.id,
        metadata: row.payload,
        recovered: true
      });
    }
  }

  async stop(): Promise<void> {
    this.isRunning = false;
    await this.releaseConnection();
  }
}
```
**Architecture Rationale**:
- `max: 1` and `idleTimeoutMillis: 0` enforce a single, persistent connection.
- `SET statement_timeout = 0` keeps an inherited role-level or database-level statement timeout from cancelling the listener's queries. An idle session is not subject to `statement_timeout`; on PostgreSQL 14 and later, check `idle_session_timeout` as well.
- The `notification` event handler parses metadata only. Full payload retrieval happens downstream, keeping the notification channel lightweight.
- Reconnection logic implements exponential backoff and triggers outbox polling to recover events missed during downtime. This converts best-effort delivery into at-least-once semantics.
### Step 3: Payload Strategy and Downstream Processing
Notifications carry ~100 bytes of metadata. The consumer fetches the full payload from `event_outbox` using the `event_id`. This pattern respects the 7,999-byte hard limit while enabling rich event structures.
```typescript
subscriber.on('event', async (evt) => {
  if (evt.recovered) {
    // Idempotency check required for recovered events
    const alreadyProcessed = await checkIdempotencyStore(evt.eventId);
    if (alreadyProcessed) return;
  }
  const fullPayload = await fetchPayloadFromOutbox(evt.eventId);
  await processBusinessLogic(fullPayload);
  await markAsAcknowledged(evt.eventId);
});
```
**Architecture Rationale**: Separating notification metadata from payload storage solves three problems at once: it respects the 7,999-byte payload limit, enables event replay after outages, and provides a natural idempotency boundary. The `acknowledged_at` column acts as a processing cursor.
## Pitfall Guide
### 1. Connection Pooler Interference
**Explanation**: PgBouncer in transaction or statement mode returns connections to the pool between queries. `LISTEN` state is session-bound and vanishes when the connection is recycled. The listener appears to work initially but silently drops events after the first transaction completes.

**Fix**: Connect directly to PostgreSQL using a dedicated pool with `max: 1`. Set `application_name` to distinguish listener traffic from transactional queries. Never route listener connections through PgBouncer in transaction or statement mode.
### 2. Payload Overflow
**Explanation**: PostgreSQL enforces a hard 7,999-byte limit on `NOTIFY` payloads. Exceeding it raises a server error; when `pg_notify` runs inside a trigger, that error also aborts the enclosing `INSERT`. Developers often embed full JSON objects, which works in testing and then fails in production once a payload grows past the limit.

**Fix**: Send only event metadata (ID, type, timestamp) via `NOTIFY`. Store the complete payload in a durable table. Fetch full data downstream using the event ID.
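One way to catch an oversized payload before it reaches the database is a size guard in application code. The sketch below is illustrative only: `buildNotifyPayload`, `EventMetadata`, and `MAX_NOTIFY_PAYLOAD_BYTES` are hypothetical names, and the guard assumes the default PostgreSQL build, where the payload must be shorter than 8,000 bytes.

```typescript
// Assumes the default PostgreSQL limit: payloads must be shorter than 8000 bytes.
const MAX_NOTIFY_PAYLOAD_BYTES = 7999;

// Hypothetical metadata shape mirroring the fields sent by the trigger.
interface EventMetadata {
  eventId: number;
  routingKey: string;
  emittedAt: string;
}

// Serializes metadata and fails fast if it would exceed the payload limit.
function buildNotifyPayload(meta: EventMetadata): string {
  const json = JSON.stringify({
    event_id: meta.eventId,
    routing_key: meta.routingKey,
    emitted_at: meta.emittedAt
  });
  // Measure bytes, not characters: multi-byte UTF-8 characters count more than once.
  const size = new TextEncoder().encode(json).length;
  if (size > MAX_NOTIFY_PAYLOAD_BYTES) {
    throw new RangeError(`NOTIFY payload is ${size} bytes; limit is ${MAX_NOTIFY_PAYLOAD_BYTES}`);
  }
  return json;
}
```

A guard like this is most useful when application code calls `pg_notify` directly; with the trigger shown earlier, the metadata is small by construction.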
### 3. Silent Message Loss During Deploys
**Explanation**: Container restarts, database failovers, or network partitions disconnect listeners. Without persistence, any notification fired during the downtime is permanently lost. Teams assume the database will buffer events; it does not.

**Fix**: Implement the outbox pattern. Write events to a table first, then trigger `pg_notify`. On reconnection, poll the outbox for unacknowledged records within a recovery window.
### 4. Blocking the Listener Thread
**Explanation**: Performing synchronous I/O, heavy computation, or blocking database queries inside the `notification` event handler stalls the event loop. Subsequent notifications queue up, increasing latency and risking connection timeouts.

**Fix**: Offload processing to a worker queue or async task pool. The listener should only parse metadata, emit events, and return immediately. Use backpressure controls to prevent worker saturation.
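The offloading idea can be sketched with a small bounded queue. This is a minimal in-memory illustration, not a production worker pool; `BoundedWorkQueue` and its parameters are hypothetical names. The listener calls `offer` and returns immediately, and a `false` result signals that backpressure should be applied.

```typescript
type Task = () => Promise<void>;

// A bounded in-memory queue: callers enqueue and return immediately,
// while at most `concurrency` tasks run at any one time.
class BoundedWorkQueue {
  private pending: Task[] = [];
  private active = 0;

  constructor(private concurrency: number, private maxPending: number) {}

  // Returns false when the queue is full so the caller can apply backpressure
  // (for example, by leaving the event unacknowledged in the outbox).
  offer(task: Task): boolean {
    if (this.pending.length >= this.maxPending) return false;
    this.pending.push(task);
    this.drain();
    return true;
  }

  // Number of tasks that are waiting or currently running.
  get size(): number {
    return this.pending.length + this.active;
  }

  private drain(): void {
    while (this.active < this.concurrency && this.pending.length > 0) {
      const task = this.pending.shift()!;
      this.active++;
      task()
        .catch((err) => console.error('Task failed:', err))
        .then(() => {
          // Free the slot and start the next waiting task, if any.
          this.active--;
          this.drain();
        });
    }
  }
}
```

Because the outbox keeps unacknowledged rows, an event rejected by a full queue is not lost; it can be picked up again by a later recovery poll.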
### 5. Channel Sprawl and Routing Inefficiency
**Explanation**: Creating a separate `LISTEN` channel for every entity (e.g., `order_123`, `user_456`) exhausts database resources and complicates listener management. PostgreSQL handles channels efficiently up to ~20 distinct routing keys. Beyond that, memory overhead and subscription management degrade.

**Fix**: Use hierarchical routing keys (`orders.status`, `users.profile`, `payments.receipt`). Implement client-side filtering or a lightweight dispatcher service that routes notifications to specific consumers based on metadata.
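Client-side filtering on hierarchical keys can be as simple as the dispatcher sketched below. `RoutingDispatcher` and its `prefix.*` wildcard convention are assumptions made for this example, not something PostgreSQL or the `pg` driver provides.

```typescript
type Handler = (routingKey: string, eventId: number) => void;

// Routes events by hierarchical key. A pattern is either an exact key
// ("orders.status") or a prefix wildcard ("orders.*").
class RoutingDispatcher {
  private routes: Array<{ pattern: string; handler: Handler }> = [];

  register(pattern: string, handler: Handler): void {
    this.routes.push({ pattern, handler });
  }

  // Invokes every matching handler and returns how many matched.
  dispatch(routingKey: string, eventId: number): number {
    let matched = 0;
    for (const { pattern, handler } of this.routes) {
      if (RoutingDispatcher.matches(pattern, routingKey)) {
        handler(routingKey, eventId);
        matched++;
      }
    }
    return matched;
  }

  private static matches(pattern: string, key: string): boolean {
    if (pattern.endsWith('.*')) {
      // "orders.*" matches "orders.status" but not "orders" or "ordersx.status".
      return key.startsWith(pattern.slice(0, -1));
    }
    return pattern === key;
  }
}
```

With this approach the database only needs a handful of coarse channels, and per-entity routing happens in the application.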
### 6. Assuming Exactly-Once Delivery
**Explanation**: `LISTEN`/`NOTIFY` provides at-most-once delivery natively. Network retries, listener restarts, and outbox polling introduce duplicate events. Teams building financial or inventory systems often assume deduplication happens automatically.

**Fix**: Design consumers for idempotency. Use event IDs as unique keys in downstream systems. Implement idempotency tokens or deduplication tables. Never rely on the transport layer for exactly-once semantics.
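The deduplication idea is sketched below with an in-memory set standing in for a deduplication table. `IdempotencyGuard` and `handleOnce` are hypothetical names, and the processing callback is synchronous only to keep the example short; a real consumer would claim the ID in a durable store with a unique constraint.

```typescript
// In-memory stand-in for a deduplication table keyed by event ID.
class IdempotencyGuard {
  private processed = new Set<number>();

  // Returns true the first time an ID is claimed, false for duplicates.
  claim(eventId: number): boolean {
    if (this.processed.has(eventId)) return false;
    this.processed.add(eventId);
    return true;
  }

  // Releases a claim so a failed event can be retried later.
  release(eventId: number): void {
    this.processed.delete(eventId);
  }
}

// Runs `process` at most once per event ID; a failure releases the claim.
function handleOnce(
  guard: IdempotencyGuard,
  eventId: number,
  process: (id: number) => void
): 'processed' | 'duplicate' {
  if (!guard.claim(eventId)) return 'duplicate';
  try {
    process(eventId);
    return 'processed';
  } catch (err) {
    guard.release(eventId);
    throw err;
  }
}
```

Claiming before processing and releasing on failure means a crashed handler does not permanently block its event from being retried.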
### 7. Ignoring Reconnection Backoff
**Explanation**: Immediate reconnection attempts after a database restart or network partition create connection storms. The database rejects rapid retries, and the listener enters a failure loop.

**Fix**: Implement exponential backoff with jitter. Start at 1 second, double on each failure, cap at 30 seconds. Reset backoff only after a successful subscription and a clean notification cycle.
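A minimal sketch of that schedule follows. The `backoffDelayMs` helper is a hypothetical name, and the jitter scheme (half of each delay fixed, half random) is one common choice assumed here rather than something the text prescribes.

```typescript
// Exponential backoff with jitter. The ceiling doubles per attempt
// (1s, 2s, 4s, ...) up to the cap; half of it is fixed and half is random,
// so reconnecting listeners spread out without ever retrying immediately.
function backoffDelayMs(
  attempt: number, // 1-based attempt counter
  baseMs = 1000,
  capMs = 30000,
  random: () => number = Math.random // injectable for deterministic tests
): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** (attempt - 1));
  const half = ceiling / 2;
  return Math.floor(half + random() * half);
}
```

A reconnect loop would call `backoffDelayMs(attempt)` before each retry and reset `attempt` to 1 once a subscription succeeds.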
## Production Bundle
### Action Checklist
### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| <5K events/sec, single service | PostgreSQL LISTEN/NOTIFY + Outbox | Zero infra overhead, sub-8ms latency, sufficient reliability | $0 additional |
| 5K–10K events/sec, multi-service | PostgreSQL LISTEN/NOTIFY + Outbox + Dispatcher | Database handles load; dispatcher manages routing complexity | $0–50/mo (compute) |
| >10K events/sec, strict ordering | Redis Pub/Sub or Kafka | PostgreSQL CPU overhead exceeds 15%, latency degrades | $50–200/mo |
| Pattern-based subscriptions, >20 channels | Redis Pub/Sub or Kafka | Channel management becomes inefficient in PG | $50–200/mo |
| Message replay, audit compliance | Kafka or PostgreSQL Outbox with retention | Native NOTIFY lacks history; outbox provides replay window | $0–100/mo (storage) |
### Configuration Template
```sql
-- Outbox Schema
CREATE TABLE event_outbox (
  id BIGSERIAL PRIMARY KEY,
  routing_key TEXT NOT NULL,
  payload JSONB NOT NULL,
  published_at TIMESTAMPTZ DEFAULT now(),
  acknowledged_at TIMESTAMPTZ
);

CREATE INDEX idx_outbox_unacked ON event_outbox(acknowledged_at, published_at)
  WHERE acknowledged_at IS NULL;

-- Broadcast Trigger
CREATE OR REPLACE FUNCTION broadcast_outbox_event()
RETURNS TRIGGER AS $$
BEGIN
  PERFORM pg_notify(
    NEW.routing_key,
    json_build_object(
      'event_id', NEW.id,
      'routing_key', NEW.routing_key,
      'emitted_at', NEW.published_at
    )::text
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_broadcast_outbox
AFTER INSERT ON event_outbox
FOR EACH ROW
EXECUTE FUNCTION broadcast_outbox_event();
```
```typescript
// Listener Configuration
const listenerConfig = {
  connectionString: 'postgresql://app_user:password@db-host:5432/app_db?application_name=pg-event-listener',
  routingKeys: ['orders.status', 'users.profile', 'payments.receipt'],
  pollIntervalMs: 2000,
  maxReconnectAttempts: 10
};

const subscriber = new PgEventSubscriber(listenerConfig);
subscriber.start();
```
### Quick Start Guide
- **Create the outbox table and trigger**: Run the provided SQL schema against your PostgreSQL instance. Verify the trigger fires by running `LISTEN` on a routing key in one `psql` session and inserting a test row from another.
- **Configure direct listener connection**: Set up your TypeScript/Node.js environment with the `pg` driver. Instantiate the subscriber with a direct connection string, bypassing any connection poolers.
- **Attach event handlers**: Register listeners for `event` and `error` events. Implement downstream processing with idempotency checks and outbox acknowledgment logic.
- **Deploy and monitor**: Start the listener service. Monitor `pg_stat_activity` for idle listener connections. Track `event_outbox` acknowledgment lag to detect processing bottlenecks.
- **Validate recovery**: Restart the listener process mid-workload. Verify that missed events are recovered via outbox polling and that duplicates are handled idempotently.
Database-native pub/sub eliminates an entire class of infrastructure decisions for workloads under 10K events per second. By coupling a dedicated listener with an outbox table, you gain at-least-once delivery, sub-8ms latency, and zero operational overhead. Graduate to external brokers only when metrics demand it, not when architectural habits dictate it.