s apply to Redis Streams, RabbitMQ, or cloud-native message queues.
Step 1: Domain Event Modeling
Events must represent facts that have already occurred, not commands. Model events using a strict schema with metadata for tracing, idempotency, and versioning.
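// types/domain-events.ts
export interface DomainEvent<T = unknown> {
  id: string;            // unique per event; consumers use it as the idempotency key
  type: string;          // past-tense fact, e.g. 'order.placed'
  version: number;       // payload schema version
  timestamp: string;     // ISO 8601 time the fact occurred
  correlationId: string; // shared by every event in one workflow, for tracing
  aggregateId: string;   // business entity the event belongs to; used as the partition key
  payload: T;
}

export interface OrderPlacedPayload {
  orderId: string;
  userId: string;
  totalAmount: number;
  currency: string;
  items: Array<{ sku: string; quantity: number; price: number }>;
}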
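A factory keeps producers from forgetting any envelope metadata. Below is a minimal sketch of one. It assumes Node.js's built-in randomUUID for IDs and a default schema version of 1. The helper name createDomainEvent is hypothetical, and the block repeats the two interfaces above so it runs on its own.

```typescript
import { randomUUID } from 'node:crypto';

interface DomainEvent<T = unknown> {
  id: string;
  type: string;
  version: number;
  timestamp: string;
  correlationId: string;
  aggregateId: string;
  payload: T;
}

interface OrderPlacedPayload {
  orderId: string;
  userId: string;
  totalAmount: number;
  currency: string;
  items: Array<{ sku: string; quantity: number; price: number }>;
}

// Hypothetical helper: builds a fully populated event envelope.
function createDomainEvent<T>(
  type: string,
  aggregateId: string,
  payload: T,
  options: { version?: number; correlationId?: string } = {},
): DomainEvent<T> {
  return {
    id: randomUUID(),                                    // new ID per event
    type,                                                // past tense, e.g. 'order.placed'
    version: options.version ?? 1,                       // assumed default schema version
    timestamp: new Date().toISOString(),                 // ISO 8601, UTC
    correlationId: options.correlationId ?? randomUUID(), // pass the caller's ID to keep one trace
    aggregateId,
    payload,
  };
}

const placed = createDomainEvent<OrderPlacedPayload>('order.placed', 'order-42', {
  orderId: 'order-42',
  userId: 'user-7',
  totalAmount: 59.98,
  currency: 'USD',
  items: [{ sku: 'SKU-1', quantity: 2, price: 29.99 }],
});
```

Follow-up events in the same workflow should pass the original correlationId through `options`, so that one trace spans every service.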
Step 2: Broker Configuration & Producer Setup
Producers must handle retries, schema validation, and idempotency keys. Use a schema registry or local validation to prevent payload drift.
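// producers/event-producer.ts
import { Kafka, Producer, logLevel } from 'kafkajs';
import type { DomainEvent } from '../types/domain-events';

const kafka = new Kafka({
  clientId: 'backend-service',
  // KAFKA_BROKERS may hold a comma-separated list, e.g. "kafka-1:9092,kafka-2:9092"
  brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
  logLevel: logLevel.WARN,
});

// Idempotent producer: the broker de-duplicates retried sends (kafkajs requires acks = -1, its default)
const producer: Producer = kafka.producer({
  idempotent: true,
  retry: { retries: 5, initialRetryTime: 200, maxRetryTime: 5000 },
});

// Connect once and reuse the connection instead of reconnecting on every publish
let connection: Promise<void> | null = null;
function ensureConnected(): Promise<void> {
  if (!connection) {
    connection = producer.connect().catch((error) => {
      connection = null; // let a later call retry the connection
      throw error;
    });
  }
  return connection;
}

export async function publishEvent<T>(event: DomainEvent<T>, topic: string): Promise<void> {
  await ensureConnected();
  try {
    await producer.send({
      topic,
      messages: [
        {
          key: event.aggregateId, // same aggregate -> same partition -> ordered delivery
          value: JSON.stringify(event),
          headers: {
            'correlation-id': event.correlationId,
            'event-version': String(event.version),
          },
        },
      ],
    });
  } catch (error) {
    // Implement DLQ routing or circuit breaker fallback here
    console.error(`Failed to publish event ${event.id}:`, error);
    throw error;
  }
}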
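Step 2 calls for local validation when no schema registry is available. Below is a dependency-free sketch of such a check, meant to run before publishEvent. It assumes the OrderPlacedPayload shape from Step 1. The names validateOrderPlaced and isOrderPlacedPayload are hypothetical. A schema library, or Avro with a registry, would typically replace this hand-written check in production.

```typescript
interface OrderPlacedPayload {
  orderId: string;
  userId: string;
  totalAmount: number;
  currency: string;
  items: Array<{ sku: string; quantity: number; price: number }>;
}

// Hypothetical validator: returns a list of problems; an empty list means the payload can be published.
function validateOrderPlaced(input: unknown): string[] {
  if (typeof input !== 'object' || input === null) return ['payload must be an object'];
  const { orderId, userId, currency, totalAmount, items } = input as Record<string, unknown>;
  const errors: string[] = [];
  for (const [name, value] of Object.entries({ orderId, userId, currency })) {
    if (typeof value !== 'string' || value === '') errors.push(`${name} must be a non-empty string`);
  }
  if (typeof totalAmount !== 'number' || !Number.isFinite(totalAmount) || totalAmount < 0) {
    errors.push('totalAmount must be a non-negative number');
  }
  if (!Array.isArray(items) || items.length === 0) {
    errors.push('items must be a non-empty array');
  } else {
    items.forEach((item: unknown, i: number) => {
      const { sku, quantity, price } = (typeof item === 'object' && item !== null ? item : {}) as Record<string, unknown>;
      if (typeof sku !== 'string' || sku === '') errors.push(`items[${i}].sku must be a non-empty string`);
      if (typeof quantity !== 'number' || !Number.isInteger(quantity) || quantity <= 0) {
        errors.push(`items[${i}].quantity must be a positive integer`);
      }
      if (typeof price !== 'number' || price < 0) errors.push(`items[${i}].price must be a non-negative number`);
    });
  }
  return errors;
}

// Type guard built on the validator, e.g. to reject a payload before calling publishEvent.
function isOrderPlacedPayload(input: unknown): input is OrderPlacedPayload {
  return validateOrderPlaced(input).length === 0;
}
```

The validator returns every error rather than stopping at the first one. That makes producer logs, and any DLQ headers built from them, more useful when several fields have drifted at once.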
Step 3: Consumer Implementation with Idempotency & DLQ
Consumers must be idempotent by design. Use a deterministic idempotency key derived from event metadata. Implement dead-letter queue routing for poison messages.
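// consumers/order-consumer.ts
import { Kafka, Consumer, Producer } from 'kafkajs';
import { Redis } from 'ioredis';
import type { DomainEvent, OrderPlacedPayload } from '../types/domain-events';

const kafka = new Kafka({
  clientId: 'order-processor',
  brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
});
const consumer: Consumer = kafka.consumer({ groupId: 'order-processing-group' });
// kafkajs consumers cannot publish, so DLQ routing needs its own producer
const dlqProducer: Producer = kafka.producer();
const redis = new Redis(process.env.REDIS_URL || 'redis://localhost:6379');

const SOURCE_TOPIC = 'order-events';
const DLQ_TOPIC = 'order-events.dlq';
const MAX_ATTEMPTS = 3;
// TTL must cover the window in which duplicates can arrive (at least topic retention if offsets may be reset)
const IDEMPOTENCY_TTL_SECONDS = 86400;

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

export async function startConsumer(): Promise<void> {
  await Promise.all([consumer.connect(), dlqProducer.connect()]);
  await consumer.subscribe({ topic: SOURCE_TOPIC, fromBeginning: false });
  await consumer.run({
    eachMessage: async ({ message, partition, heartbeat }) => {
      if (!message.value) return;
      let event: DomainEvent<OrderPlacedPayload> | undefined;
      let lastError: unknown;
      try {
        event = JSON.parse(message.value.toString());
      } catch (error) {
        lastError = error; // malformed JSON: retrying cannot help
      }
      if (event) {
        const idempotencyKey = `idemp:${event.id}`;
        // Skip events already processed (e.g. redelivered after a rebalance)
        if (await redis.exists(idempotencyKey)) return;
        for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
          try {
            await processOrderEvent(event);
            // Mark as processed only after the side effects succeed
            await redis.set(idempotencyKey, '1', 'EX', IDEMPOTENCY_TTL_SECONDS);
            return;
          } catch (error) {
            lastError = error;
            if (attempt < MAX_ATTEMPTS) {
              await sleep(200 * 2 ** (attempt - 1)); // exponential backoff: 200 ms, 400 ms
              await heartbeat(); // keep the group session alive while backing off
            }
          }
        }
      }
      // Poison message: park it in the DLQ so the partition keeps moving
      await dlqProducer.send({
        topic: DLQ_TOPIC,
        messages: [
          {
            key: message.key,
            value: message.value,
            headers: { 'original-topic': SOURCE_TOPIC, error: String(lastError) },
          },
        ],
      });
      console.error(`Message ${SOURCE_TOPIC}[${partition}]@${message.offset} routed to DLQ:`, lastError);
    },
  });
}

async function processOrderEvent(event: DomainEvent<OrderPlacedPayload>): Promise<void> {
  // Business logic: inventory reservation, payment initiation, notification trigger.
  // Must be side-effect safe and idempotent: it can run more than once for the same event.
}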
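The Step 3 consumer checks the key with EXISTS and only later writes it with SET. Two overlapping deliveries of the same event, for example during a rebalance, can therefore both pass the check. A common alternative is to claim the key atomically and release it if processing fails.

The sketch below models that contract with an in-memory map, so it runs without Redis. InMemoryIdempotencyStore and handleOnce are hypothetical names. In production, `claim` would map to a single ioredis call such as redis.set(key, '1', 'EX', ttl, 'NX'), which resolves to 'OK' when the key was set and null when it already existed.

```typescript
// In-memory stand-in for Redis "SET key value NX EX ttl" (single process only).
class InMemoryIdempotencyStore {
  private readonly expiresAt = new Map<string, number>();

  constructor(private readonly now: () => number = Date.now) {}

  // Claims a key; returns false if an unexpired claim already exists.
  claim(key: string, ttlSeconds: number): boolean {
    const current = this.expiresAt.get(key);
    if (current !== undefined && current > this.now()) return false;
    this.expiresAt.set(key, this.now() + ttlSeconds * 1000);
    return true;
  }

  release(key: string): void {
    this.expiresAt.delete(key);
  }
}

// Runs the handler at most once per TTL window; a failure releases the claim
// so that a retry or a DLQ replay can process the event again.
async function handleOnce(
  store: InMemoryIdempotencyStore,
  eventId: string,
  handler: () => Promise<void>,
  ttlSeconds = 86400,
): Promise<'processed' | 'skipped'> {
  const key = `idemp:${eventId}`;
  if (!store.claim(key, ttlSeconds)) return 'skipped';
  try {
    await handler();
    return 'processed';
  } catch (error) {
    store.release(key);
    throw error;
  }
}
```

Claiming first closes the double-processing window but introduces the opposite risk. If the process crashes after claiming and before finishing, redeliveries are skipped until the TTL expires. One mitigation is to claim with a short TTL and extend it once processing succeeds.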
Step 4: Distributed Transaction Handling (Saga Pattern)
Asynchronous workflows require compensating transactions. Implement the choreography-based Saga pattern where each service publishes success or failure events.
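// sagas/order-saga.ts
import { randomUUID } from 'node:crypto';
import { publishEvent } from '../producers/event-producer';
import type { OrderPlacedPayload } from '../types/domain-events';
// Choreography: this service only announces a fact; downstream services react to it
export async function executeOrderSaga(order: OrderPlacedPayload): Promise<void> {
  // Step 1: Announce the order; the inventory service reserves stock and emits inventory.reserved
  await publishEvent<OrderPlacedPayload>({
    id: randomUUID(), type: 'order.placed', version: 1, timestamp: new Date().toISOString(),
    correlationId: randomUUID(), // carried by every event in this saga for tracing
    aggregateId: order.orderId, payload: order,
  }, 'order-events');
  // Step 2: Process payment (payment service listens to inventory.reserved)
  // Step 3: Confirm shipment (shipping service listens to payment.success)
  // Failure at any step triggers compensation events: payment.refunded, inventory.released
}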
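The choreography in Step 4 can be simulated as a table: each row pairs the event a service listens for with the event it publishes next. This is a sketch for reasoning about the flow, not an implementation of any service.

The names order.placed, inventory.reserved, payment.success, payment.refunded, and inventory.released follow the text. The failure events payment.failed and shipment.failed, the success event shipment.confirmed, and the helper trace are hypothetical.

```typescript
interface Reaction {
  service: string;
  on: string;        // event the service consumes
  publishes: string; // event it emits in response
}

// Hypothetical choreography table: no central coordinator, only reactions.
const reactions: Reaction[] = [
  { service: 'inventory', on: 'order.placed', publishes: 'inventory.reserved' },
  { service: 'payment', on: 'inventory.reserved', publishes: 'payment.success' },
  { service: 'shipping', on: 'payment.success', publishes: 'shipment.confirmed' },
  // Compensations: each service undoes its own work when a later step fails
  { service: 'inventory', on: 'payment.failed', publishes: 'inventory.released' },
  { service: 'payment', on: 'shipment.failed', publishes: 'payment.refunded' },
  { service: 'inventory', on: 'payment.refunded', publishes: 'inventory.released' },
];

// Follows the chain of reactions from a starting event; the hop limit guards against cycles.
function trace(start: string, table: Reaction[] = reactions): string[] {
  const path = [start];
  let current = start;
  for (let hops = 0; hops < table.length; hops++) {
    const next = table.find((r) => r.on === current);
    if (!next) break;
    path.push(next.publishes);
    current = next.publishes;
  }
  return path;
}
```

Tracing shipment.failed yields payment.refunded and then inventory.released. Compensations therefore unwind in the reverse order of the forward steps, without any service knowing the whole workflow.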
Architecture Decisions & Rationale
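- Broker Choice: Kafka provides partitioned log storage, consumer groups, and exactly-once semantics for Kafka-to-Kafka read-process-write flows (idempotent producers plus transactions). Side effects outside Kafka, such as database writes or emails, still require consumer-side idempotency. Redis Streams offer lower operational overhead for simpler use cases but lack native partitioning, and because streams live in memory they are poorly suited to long-term retention.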
- Idempotency Strategy: Externalized state (Redis, DynamoDB, or PostgreSQL with upsert) ensures safe retries without duplicating side effects. TTL-based cleanup prevents state bloat.
- Schema Evolution: Events are versioned and backward-compatible. New fields are optional; removed fields are deprecated with a migration window. Breaking changes require dual-topic routing during transition.
- Partitioning Key: Using aggregateId as the message key guarantees ordering within a single business entity, as long as the partition count stays fixed, while allowing parallel processing across aggregates.
- Error Handling: DLQ routing isolates poison messages. Alerting on DLQ depth triggers immediate investigation without blocking the main pipeline.
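Kafka's default partitioner hashes the message key (murmur2) modulo the partition count. That is why keying by aggregateId keeps one entity's events in one ordered partition. The sketch below shows the property with a simpler FNV-1a hash standing in for murmur2, so it does not reproduce Kafka's actual partition numbers. partitionFor and routeToPartitions are hypothetical helpers.

```typescript
// FNV-1a (32-bit): a simple deterministic hash used here in place of Kafka's murmur2.
function fnv1a(key: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return hash >>> 0; // reinterpret as unsigned 32-bit
}

// Same key + same partition count -> same partition, every time.
function partitionFor(key: string, partitionCount: number): number {
  return fnv1a(key) % partitionCount;
}

interface KeyedEvent {
  aggregateId: string;
  sequence: number;
}

// Appends events to per-partition logs in publish order, mimicking a partitioned topic.
function routeToPartitions(events: KeyedEvent[], partitionCount: number): KeyedEvent[][] {
  const logs: KeyedEvent[][] = Array.from({ length: partitionCount }, () => []);
  for (const event of events) {
    logs[partitionFor(event.aggregateId, partitionCount)].push(event);
  }
  return logs;
}
```

One trade-off of key-based routing is that a single very busy aggregate cannot be spread across partitions. Its throughput is bounded by whichever consumer owns its partition.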
Pitfall Guide
1. Assuming Exactly-Once Delivery
Message brokers typically default to at-least-once delivery. Network retries, consumer rebalances, and broker failovers can all redeliver a message, so duplicates are a normal condition rather than an edge case. Implement idempotency at the consumer level, not the broker level.
2. Using Events as Synchronous RPC
Publishing an event and immediately polling for a response defeats the purpose of asynchronous decoupling. If you need a synchronous answer, use gRPC or REST. Events are for state propagation, not request-response.
3. Ignoring Schema Evolution
Tight coupling through shared TypeScript interfaces creates deployment bottlenecks. When service A changes a payload, service B breaks. Use versioned payloads, optional fields, and contract testing to isolate schema changes.
4. Missing Dead-Letter Queues & Retry Policies
Without DLQ routing, a poison message is retried indefinitely and blocks its partition, stalling every message queued behind it. Implement exponential backoff, max retry thresholds, and automatic DLQ migration. Monitor DLQ depth as a primary health metric.
5. Lack of Distributed Tracing
Asynchronous workflows obscure request boundaries. Correlation IDs must propagate through headers, logs, and metrics. Without end-to-end tracing, debugging latency spikes or failed sagas turns into guesswork across disconnected service logs.
6. Ignoring Backpressure & Consumer Lag
Producers can outpace consumers during traffic surges. Monitor lag metrics per partition. Implement auto-scaling based on lag thresholds, not CPU/memory. Over-provisioning consumers without addressing processing bottlenecks wastes resources.
7. Over-Partitioning or Under-Partitioning Topics
Partition count sets the maximum parallelism of a consumer group and affects rebalance overhead. Too few partitions cap throughput, because consumers beyond the partition count receive no assignments and sit idle. Too many increase broker memory usage and rebalance latency. Start with 6-12 partitions per topic as a rule of thumb and scale based on measured throughput.
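Pitfalls 6 and 7 interact: lag tells you when to add consumers, and partition count caps how many of them can do useful work. Below is a sketch of a lag-based scaling rule. It assumes per-partition lag is the partition's high-watermark offset minus the group's committed offset. In kafkajs, the admin client's fetchTopicOffsets and fetchOffsets methods are one source for these values, which arrive as strings. desiredConsumers and the lagPerConsumerTarget threshold are hypothetical.

```typescript
interface PartitionOffsets {
  partition: number;
  highWatermark: number;   // offset the next produced message will receive
  committedOffset: number; // offset the consumer group will read next
}

// Lag = messages written but not yet consumed, summed across partitions.
function totalLag(offsets: PartitionOffsets[]): number {
  return offsets.reduce((sum, o) => sum + Math.max(0, o.highWatermark - o.committedOffset), 0);
}

// Hypothetical rule: one consumer per `lagPerConsumerTarget` messages of lag,
// bounded below by `minConsumers` and above by the partition count.
function desiredConsumers(
  offsets: PartitionOffsets[],
  lagPerConsumerTarget: number,
  minConsumers = 1,
): number {
  const wanted = Math.ceil(totalLag(offsets) / lagPerConsumerTarget);
  return Math.min(offsets.length, Math.max(minConsumers, wanted));
}
```

Take 6 partitions, each 1,000 messages behind, with a target of 500 messages per consumer. The rule wants 12 consumers but returns 6. Beyond that point, only more partitions or faster handlers reduce lag.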
Production Best Practices:
- Enforce idempotency keys on every consumer
- Implement circuit breakers for downstream sync calls within async handlers
- Use structured logging with correlation IDs for observability
- Run chaos tests simulating broker partitions and consumer crashes
- Maintain a schema registry or strict local validation pipeline
- Treat DLQs as production-critical alerts, not logging destinations
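The circuit-breaker practice can be sketched for a synchronous call made inside an async handler. After a run of failures the breaker fails fast for a cool-down period, then lets a trial call through. The class name, thresholds, and injected clock are illustrative assumptions. Libraries such as opossum implement the same idea with more features.

```typescript
type BreakerState = 'closed' | 'open' | 'half-open';

class CircuitBreaker {
  private state: BreakerState = 'closed';
  private failures = 0;
  private openedAt = 0;

  constructor(
    private readonly failureThreshold = 5,
    private readonly resetTimeoutMs = 30_000,
    private readonly now: () => number = Date.now,
  ) {}

  get currentState(): BreakerState {
    // An open circuit becomes half-open once the reset timeout has elapsed
    if (this.state === 'open' && this.now() - this.openedAt >= this.resetTimeoutMs) {
      this.state = 'half-open';
    }
    return this.state;
  }

  async call<T>(fn: () => Promise<T>): Promise<T> {
    if (this.currentState === 'open') {
      throw new Error('Circuit open: failing fast');
    }
    try {
      const result = await fn();
      this.failures = 0;
      this.state = 'closed'; // a success, including a half-open trial, closes the circuit
      return result;
    } catch (error) {
      this.failures++;
      // A failed trial reopens immediately; otherwise open once the threshold is reached
      if (this.state === 'half-open' || this.failures >= this.failureThreshold) {
        this.state = 'open';
        this.openedAt = this.now();
      }
      throw error;
    }
  }
}
```

This simplified version lets every concurrent caller through while half-open. When the breaker is open, the handler should let the message fail into the retry/DLQ path rather than hold its partition waiting on a dead dependency.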
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Client-facing API with strict SLA | Synchronous REST/gRPC | Predictable latency, simple error handling, direct client feedback | Low infrastructure, higher compute during peaks |
| Internal workflow spanning 3+ services | Event-Driven Saga | Decouples failure domains, enables independent scaling, improves resilience | Moderate broker cost, lower compute waste |
| Real-time analytics or stream processing | Event-Driven + Stream Processor | High throughput, ordered processing per key, native windowing | Higher storage cost, optimized compute scaling |
| Bursty, unpredictable workloads | Serverless/Edge Functions | Zero idle cost, automatic scaling, pay-per-execution | Unpredictable at scale, cold start latency |
Configuration Template
# docker-compose.yml
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertised for clients on the host; other containers cannot reach the broker via localhost:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  redis:
    image: redis:7.2-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
// config/kafka-config.ts
import { KafkaConfig, ConsumerConfig, logLevel } from 'kafkajs';
export const kafkaConfig: KafkaConfig = {
  clientId: 'production-backend',
  brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
  logLevel: logLevel.WARN,
  retry: {
    retries: 5,
    initialRetryTime: 200,
    maxRetryTime: 5000,
    factor: 0.2, // randomization factor (jitter) applied to each retry delay
  },
  connectionTimeout: 10000,
  authenticationTimeout: 10000,
  reauthenticationThreshold: 10000,
};
export const consumerConfig: ConsumerConfig = {
  groupId: 'order-processing-group',
  sessionTimeout: 30000,
  rebalanceTimeout: 60000,
  heartbeatInterval: 10000, // keep at or below one third of sessionTimeout
  maxBytesPerPartition: 1048576, // 1 MiB
  minBytes: 1,
  maxWaitTimeInMs: 500,
  readUncommitted: false, // read_committed: only committed transactional messages (the kafkajs default)
};
Quick Start Guide
- Initialize the stack: Run docker compose up -d to start Kafka, Zookeeper, and Redis locally. Verify connectivity, for example by connecting a kafkajs admin client and calling admin.listTopics().
- Create topics: Execute kafka-topics --create --topic order-events --partitions 6 --replication-factor 1 --bootstrap-server localhost:9092, prefixed with docker compose exec kafka because the CLI ships inside the Confluent image. Repeat for the DLQ and compensation topics.
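- Deploy consumer: Call startConsumer() from a small entry script and run it with ts-node. consumers/order-consumer.ts only exports the function, so running that file directly starts nothing. Monitor console output for partition assignment and message consumption, and verify that Redis idempotency keys are created.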
- Publish test event: Use a producer script or kafka-console-producer to send a valid JSON event. Confirm the consumer processes it and creates the idempotency key. Then send the same event again, with the same id, to verify the duplicate is skipped.
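The test message has to match the DomainEvent envelope from Step 1. Below is a sketch that prints key/value lines for kafka-console-producer when it is started with --property parse.key=true and --property 'key.separator=|'. The separator choice, the sample values, and the helper name testEventLine are illustrative assumptions.

```typescript
import { randomUUID } from 'node:crypto';

// Builds one "key|json" line; the key is the aggregateId so it lands on the right partition.
function testEventLine(orderId: string, eventId: string = randomUUID()): string {
  const event = {
    id: eventId,
    type: 'order.placed',
    version: 1,
    timestamp: new Date().toISOString(),
    correlationId: randomUUID(),
    aggregateId: orderId,
    payload: {
      orderId,
      userId: 'user-1',
      totalAmount: 20,
      currency: 'USD',
      items: [{ sku: 'SKU-1', quantity: 2, price: 10 }],
    },
  };
  return `${orderId}|${JSON.stringify(event)}`;
}

// Two lines with the same event id: the consumer should process the first and skip the second.
const duplicateId = randomUUID();
console.log(testEventLine('order-1', duplicateId));
console.log(testEventLine('order-1', duplicateId));
```

Assuming the script is saved as make-test-event.ts (a hypothetical name), its output can be piped in with `npx ts-node make-test-event.ts | docker compose exec -T kafka kafka-console-producer --bootstrap-server localhost:9092 --topic order-events --property parse.key=true --property 'key.separator=|'`. The -T flag disables TTY allocation so stdin can be piped.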