number;
};
}
// events/user.ts
/** Payload carried by the `user.created` event. */
export interface UserCreatedPayload {
  /** Id of the created user (UserService takes it from `INSERT ... RETURNING id`). */
  userId: string;
  /** Email address of the created user. */
  email: string;
  /** Creation time as an ISO-8601 string (UserService uses `new Date().toISOString()`). */
  createdAt: string;
}
/**
 * Events emitted by the user domain; currently only `user.created`.
 * `EventEnvelope` is declared outside this view — presumably it pairs the
 * event-type literal with its payload and metadata (TODO confirm).
 */
export type UserEvents = EventEnvelope<'user.created', UserCreatedPayload>;
#### 2. The Outbox Pattern Implementation
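The dual-write problem (writing to the database and publishing to the broker as two separate operations) creates consistency risks: if the process crashes, or the broker is unreachable, between the two writes, one side records a change the other never sees. The Outbox pattern solves this by writing the event to a table within the same transaction as the business data. A separate process polls the outbox and publishes events.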
```typescript
// outbox/repository.ts
import { Pool } from 'pg';
/** Event to be written to the outbox by `OutboxRepository.saveEvent`. */
export interface OutboxEventInput {
  /** Identifier of the aggregate the event belongs to; stored in `aggregate_id`. */
  aggregateId: string;
  /** Event type name such as `'user.created'`; stored in `event_type`. */
  eventType: string;
  /** Event body; serialized with `JSON.stringify` into the `payload` column. */
  payload: unknown;
}

/** An unpublished outbox row, as selected by `OutboxRepository.getPendingEvents`. */
export interface OutboxRow {
  id: string;
  aggregate_id: string;
  event_type: string;
  /** JSON payload as decoded by the pg driver. */
  payload: unknown;
}

/**
 * Data access for the `event_outbox` table.
 *
 * Writes happen on the caller's transaction client (`saveEvent`); reads and
 * status updates go through the pool.
 */
export class OutboxRepository {
  constructor(private pool: Pool) {}

  /**
   * Inserts one event row into `event_outbox`.
   *
   * Call it with the client that holds the caller's open transaction (after
   * BEGIN, before COMMIT) so the event commits or rolls back together with the
   * business data.
   *
   * @param transactionClient client with an open transaction (e.g. a pg PoolClient)
   * @param event aggregate id, event type and payload to store
   */
  async saveEvent(
    transactionClient: Pick<Pool, 'query'>,
    event: OutboxEventInput
  ): Promise<void> {
    await transactionClient.query(
      `INSERT INTO event_outbox (aggregate_id, event_type, payload, created_at)
VALUES ($1, $2, $3, NOW())`,
      [event.aggregateId, event.eventType, JSON.stringify(event.payload)]
    );
  }

  /**
   * Returns up to `limit` unpublished rows, oldest `created_at` first.
   *
   * NOTE(review): no rows are locked, so concurrent callers can receive the
   * same rows. In PostgreSQL `NOW()` is the transaction start time, so events
   * saved in one transaction share `created_at` and their relative order is
   * unspecified.
   *
   * @param limit maximum number of rows to return (passed to SQL `LIMIT`)
   */
  async getPendingEvents(limit: number): Promise<OutboxRow[]> {
    const res = await this.pool.query(
      `SELECT id, aggregate_id, event_type, payload
FROM event_outbox
WHERE published_at IS NULL
ORDER BY created_at ASC
LIMIT $1`,
      [limit]
    );
    return res.rows;
  }

  /**
   * Sets `published_at = NOW()` on the given outbox rows.
   *
   * @param ids outbox row ids (UUID strings); an empty list is a no-op
   */
  async markPublished(ids: readonly string[]): Promise<void> {
    // Nothing to update: skip the database round trip.
    if (ids.length === 0) return;
    await this.pool.query(
      `UPDATE event_outbox
SET published_at = NOW()
WHERE id = ANY($1::uuid[])`,
      [ids]
    );
  }
}
```
#### 3. Producer Service with Transactional Integrity
The producer writes the business data and its event to the outbox in a single database transaction.
```typescript
// services/user.service.ts
import { OutboxRepository } from '../outbox/repository';
import { UserCreatedPayload } from '../types';
/**
 * Creates users and records the matching `user.created` outbox event.
 *
 * `db` must expose `connect()` returning a client with `query` and `release` —
 * presumably a pg `Pool` (TODO confirm). `eventPublisher` is not used by this class.
 */
export class UserService {
  constructor(
    private db: any,
    private outbox: OutboxRepository,
    private eventPublisher: any
  ) {}

  /**
   * Inserts a user and writes its `user.created` event to the outbox in the
   * same transaction, so either both rows are committed or neither is.
   *
   * @param userData must provide `email` and `name`
   * @throws the original error from the transaction, after attempting ROLLBACK
   */
  async createUser(userData: any): Promise<void> {
    const client = await this.db.connect();
    // Set when ROLLBACK itself fails: the connection is then in an unknown
    // state and must not go back to the pool.
    let discardClient = false;
    try {
      await client.query('BEGIN');
      // 1. Business Logic
      const res = await client.query(
        'INSERT INTO users (email, name) VALUES ($1, $2) RETURNING id',
        [userData.email, userData.name]
      );
      const userId = res.rows[0].id;
      // 2. Save Event to Outbox (Same Transaction)
      const event: UserCreatedPayload = {
        userId,
        email: userData.email,
        createdAt: new Date().toISOString(),
      };
      await this.outbox.saveEvent(client, {
        aggregateId: userId,
        eventType: 'user.created',
        payload: event,
      });
      await client.query('COMMIT');
    } catch (err) {
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr) {
        // Log instead of throwing so the rollback failure does not mask the
        // root cause that is rethrown below.
        console.error('Rollback failed:', rollbackErr);
        discardClient = true;
      }
      throw err;
    } finally {
      // With a pg Pool, a truthy argument destroys the client instead of pooling it.
      client.release(discardClient);
    }
  }
}
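```
#### 4. Outbox Publisher
A background worker reads the outbox and publishes to the message broker. This gives at-least-once delivery: if the worker crashes after the broker acknowledges a batch but before the batch is marked as published, the batch is sent again on the next run. Combined with idempotent consumers (next section), this yields effectively-once processing.
```typescript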
// workers/outbox-publisher.ts
import { OutboxRepository } from '../outbox/repository';
import { KafkaProducer } from '../infrastructure/kafka';
/**
 * Background worker that drains the outbox to Kafka.
 *
 * Once per second it fetches up to 100 unpublished rows, publishes them as one
 * batch, then marks them published. Rows are marked only after `publishBatch`
 * resolves, so a failure between the two steps re-sends the batch on a later
 * tick (at-least-once; consumers must deduplicate).
 *
 * NOTE(review): the in-flight guard only serialises ticks within this
 * instance. `getPendingEvents` takes no row locks, so several publisher
 * instances on the same table can still publish a row more than once.
 */
export class OutboxPublisher {
  /** Polling timer handle while running; `undefined` when stopped. */
  private timer: ReturnType<typeof setInterval> | undefined;
  /** True while a tick is working; later ticks skip instead of overlapping it. */
  private inFlight = false;

  constructor(
    private outbox: OutboxRepository,
    private kafka: KafkaProducer
  ) {}

  /**
   * Starts polling once per second and returns immediately.
   * Calling it again while already running has no effect.
   */
  async run(): Promise<void> {
    if (this.timer !== undefined) return;
    this.timer = setInterval(() => {
      // tick() handles its own errors, so this promise never rejects.
      void this.tick();
    }, 1000);
  }

  /** Stops polling; a tick already in progress runs to completion. */
  stop(): void {
    if (this.timer === undefined) return;
    clearInterval(this.timer);
    this.timer = undefined;
  }

  /** Publishes one batch of pending events; failures are logged and retried next tick. */
  private async tick(): Promise<void> {
    // A slow broker or database can make a tick outlast the interval; without
    // this guard the next tick would fetch and publish the same rows again.
    if (this.inFlight) return;
    this.inFlight = true;
    try {
      // Inside the try so a failed query is logged, not an unhandled rejection.
      const events = await this.outbox.getPendingEvents(100);
      if (events.length === 0) return;
      // Batch publish for throughput
      await this.kafka.publishBatch(events);
      // Mark as published only after successful broker ack
      await this.outbox.markPublished(events.map((e) => e.id));
    } catch (error) {
      // Log error; events remain unpublished for next retry
      console.error('Outbox publish failed:', error);
    } finally {
      this.inFlight = false;
    }
  }
}
```
#### 5. Idempotent Consumer
Consumers must handle duplicate messages gracefully. Idempotency is enforced via a deduplication store keyed by `eventId`.
```typescript
// consumers/user.consumer.ts
import { KafkaConsumer } from '../infrastructure/kafka';
import { IdempotencyStore } from '../infrastructure/idempotency';
/**
 * Consumes `user.created` messages with duplicate suppression.
 *
 * For each message:
 * - If `eventId` was already processed, commit the offset and skip it.
 * - Otherwise run the business logic, record `eventId` as processed, then commit.
 * - Any failure before the commit is rethrown and the offset stays uncommitted,
 *   so the message is redelivered.
 *
 * NOTE(review): the check-then-mark is not atomic. Two concurrent deliveries
 * of one event can both pass `isProcessed`. A crash between
 * `processUserCreated` and `markProcessed` also reprocesses the event, so the
 * business logic should tolerate repeats.
 */
export class UserConsumer {
  constructor(
    private kafka: KafkaConsumer,
    private idempotency: IdempotencyStore
  ) {}

  /** Registers the `user.created` handler with the consumer. */
  async subscribe(): Promise<void> {
    await this.kafka.subscribe('user.created', async (message: any) => {
      const { eventId, payload } = message;

      // Without an id, the dedup store would presumably be keyed on
      // `undefined`. The first such message would mark it processed and every
      // later one would be silently skipped. Reject it instead so it can be
      // retried or routed to a DLQ.
      if (eventId == null || eventId === '') {
        throw new Error('user.created message has no eventId; cannot deduplicate');
      }

      // Idempotency Check
      if (await this.idempotency.isProcessed(eventId)) {
        // Duplicate: still commit. Otherwise, if nothing later is committed,
        // it comes back after a restart or rebalance.
        await this.kafka.commitOffset(message);
        return;
      }

      try {
        // Business Logic
        await this.processUserCreated(payload);
        // Mark as processed
        await this.idempotency.markProcessed(eventId);
        // Awaited so a failed commit is handled here rather than lost
        await this.kafka.commitOffset(message);
      } catch (error) {
        // Do not commit offset; message will be redelivered
        console.error('Consumer processing failed:', error);
        throw error;
      }
    });
  }

  /** Reaction to a newly created user; currently only logs the user id. */
  private async processUserCreated(payload: { userId: string }): Promise<void> {
    // Send welcome email, provision resources, etc.
    console.log(`Processing user created: ${payload.userId}`);
  }
}
```
#### Architecture Decisions
- Outbox over Distributed Transactions: Most brokers (including Kafka) cannot take part in a database transaction, so a single atomic write spanning both is not available. The Outbox pattern achieves the same atomicity with a local database transaction and avoids the overhead of two-phase commit.
- Batch Publishing: The outbox publisher batches events to reduce broker round-trips, significantly improving throughput.
- Idempotency Store: A Redis-backed store with TTL is recommended for idempotency checks, offering low latency and automatic expiration. Choose a TTL longer than the longest window in which a message can be redelivered; after the key expires, a redelivered event is processed again.
- Schema Registry: Enforce schema validation at the producer level to prevent breaking changes. Use versioned event types.
#### Pitfall Guide
- Treating Events as RPC Calls: Events represent facts about state changes (`UserCreated`), not commands (`CreateUser`). Using events to trigger synchronous actions in consumers recreates coupling and latency issues. Events should inform consumers of state changes so they can react independently.
- Ignoring Idempotency: Typical broker setups, like the outbox publisher above, deliver messages at least once. Network blips or consumer crashes cause redelivery. Without idempotency, duplicate processing leads to data corruption, such as double-charging users or duplicate records.
- Schema Evolution Neglect: Removing or renaming fields, or changing their types, breaks consumers. Implement a schema registry and versioning strategy. Use backward-compatible changes (adding optional fields) or maintain multiple schema versions during migration windows.
- Dead Letter Queue (DLQ) Blindness: Poison messages that crash consumers can stall partitions if not handled. Implement DLQ routing for messages exceeding retry limits. Monitor DLQ depth as a critical health metric; a growing DLQ indicates systemic issues.
- Ordering Assumptions: Events for different aggregates are unordered. Assuming global ordering limits scalability. Design consumers to handle out-of-order events or use partition keys to enforce ordering only for related aggregates.
- Synchronous Consumer Processing: Blocking the consumer thread with long-running tasks reduces throughput and causes rebalancing storms. Offload heavy processing to worker pools or use async I/O. Keep the consumer loop responsive.
- Missing Correlation IDs: Tracing across asynchronous boundaries is difficult without correlation IDs. Propagate `correlationId` and `causationId` in event metadata to reconstruct distributed traces in tools like Jaeger or Datadog.

Best Practices:
- Use partition keys to ensure ordering for specific aggregates.
- Implement exponential backoff with jitter for retry policies.
- Design consumers to be stateless where possible; store state in external databases.
- Regularly audit event schemas for drift.
### Production Bundle
#### Action Checklist
#### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High Throughput, Global Scale | Apache Kafka | Partitioning, replayability, ecosystem maturity | High (Infrastructure/OpEx) |
| Low Latency, Simple Routing | RabbitMQ | AMQP flexibility, low latency, easy setup | Medium |
| Serverless, Eventual Consistency | AWS SNS/SQS | Managed, scales to zero, integrates with Lambda | Low-Medium (Pay-per-use) |
| In-Memory, Development/Small Apps | Redis Streams | Low overhead, familiar tech, fast prototyping | Low |
| Strong Ordering Required | Kafka with Partition Keys | Guarantees order per partition | Medium |
#### Configuration Template
`kafkajs` producer and consumer configuration with retry settings.
```typescript
// infrastructure/kafka.config.ts
import { Kafka, logLevel, Partitioners } from 'kafkajs';
/**
 * Parses a comma-separated broker list such as `KAFKA_BROKERS`.
 *
 * Trims each entry and drops empty ones, so `""` or `"a:9092, "` do not
 * produce an empty broker address. Falls back to `localhost:9092` when
 * nothing usable remains.
 */
function parseBrokers(raw: string | undefined): string[] {
  const brokers = (raw ?? '')
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  return brokers.length > 0 ? brokers : ['localhost:9092'];
}

/** Shared kafkajs client; brokers come from `KAFKA_BROKERS` (comma-separated). */
export const kafka = new Kafka({
  clientId: 'backend-service',
  brokers: parseBrokers(process.env.KAFKA_BROKERS),
  logLevel: logLevel.WARN,
});

/** Options for `kafka.producer(...)`. */
export const producerConfig = {
  // The partitioners are a separate kafkajs export (`Partitioners`);
  // `Kafka.LegacyPartitioner` does not exist and evaluated to undefined.
  createPartitioner: Partitioners.LegacyPartitioner,
  retry: {
    retries: 5,
    initialRetryTime: 100,
    maxRetryTime: 5000,
  },
};

/**
 * Options for `kafka.consumer(...)`.
 *
 * @param groupId consumer group id
 */
export const consumerConfig = (groupId: string) => ({
  groupId,
  sessionTimeout: 30000,
  rebalanceTimeout: 60000,
  // Kept at one third of sessionTimeout.
  heartbeatInterval: 10000,
  maxBytesPerPartition: 1048576,
  retry: {
    retries: 3,
    initialRetryTime: 500,
    maxRetryTime: 5000,
  },
});
```
#### Quick Start Guide
- Start Broker: Run a local Kafka instance using Docker (command below), Docker Compose, or a managed service.
```bash
docker run --detach --name kafka --publish 9092:9092 apache/kafka:latest
```
- Initialize Project: Create a TypeScript project with the `kafkajs` and `pg` dependencies.
```bash
npm init -y && npm install kafkajs pg && npm install --save-dev typescript @types/node @types/pg
```
- Create Outbox Table: Execute the following SQL to create the `event_outbox` table, including its `published_at` column.
```sql
-- One row per event, written in the same transaction as the business data.
-- published_at stays NULL until the outbox publisher has sent the row.
CREATE TABLE event_outbox (
  -- gen_random_uuid() is built in from PostgreSQL 13; older versions need pgcrypto.
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  aggregate_id VARCHAR(255) NOT NULL,
  event_type VARCHAR(100) NOT NULL,
  payload JSONB NOT NULL,
  -- TIMESTAMPTZ: NOW() returns timestamptz; storing it in plain TIMESTAMP
  -- converts it to the session's local time zone and discards the zone.
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  published_at TIMESTAMPTZ
);
-- Matches the publisher query (WHERE published_at IS NULL ORDER BY created_at LIMIT n),
-- so pending rows are read in order from the index without a sort.
CREATE INDEX idx_outbox_unpublished ON event_outbox (created_at) WHERE published_at IS NULL;
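```
- Run Producer/Consumer: Implement the code snippets from the Core Solution, start the Outbox Publisher and Consumer, then trigger a user creation and verify that the event appears in the consumer logs. The `main.ts` below is a wiring sketch. It assumes `outboxRepo` and `idempotencyStore` have already been constructed. It also assumes the kafkajs producer and consumer are adapted to the `KafkaProducer`/`KafkaConsumer` interfaces the classes expect.
```typescript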
// main.ts
import { kafka, producerConfig, consumerConfig } from './infrastructure/kafka.config';
import { OutboxPublisher } from './workers/outbox-publisher';
import { UserConsumer } from './consumers/user.consumer';
/**
 * Connects the Kafka producer and consumer, then starts the outbox publisher
 * and the `user.created` consumer.
 *
 * NOTE(review): `outboxRepo` and `idempotencyStore` are not declared in this
 * snippet; construct them before this point (TODO).
 * NOTE(review): the raw kafkajs producer/consumer are passed where the
 * `KafkaProducer`/`KafkaConsumer` wrappers are expected. Confirm they expose
 * `publishBatch`, `subscribe(topic, handler)` and `commitOffset`.
 */
async function bootstrap(): Promise<void> {
  const producer = kafka.producer(producerConfig);
  await producer.connect();

  const consumer = kafka.consumer(consumerConfig('user-service-group'));
  await consumer.connect();

  const outboxPublisher = new OutboxPublisher(outboxRepo, producer);
  const userConsumer = new UserConsumer(consumer, idempotencyStore);

  await outboxPublisher.run();
  await userConsumer.subscribe();
  console.log('Event-driven backend running.');
}

bootstrap().catch((err: unknown) => {
  console.error(err);
  // Exit non-zero so a supervisor sees the failed start. An already-connected
  // producer may otherwise keep the process alive in a half-started state.
  process.exit(1);
});