oducers from consumers. This provides backpressure handling and persistence.
2. Fan-out Strategy: Implement Fan-out at Consumption. Producers emit domain events; the notification service resolves recipients and preferences. This prevents producers from knowing notification topology.
3. Deduplication: Implement idempotency at the event level to prevent duplicate sends during retries or producer re-publishes.
4. Channel Isolation: Isolate channel adapters. A failure in SMS delivery must not block Email or Push notifications.
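To make fan-out at consumption concrete, here is a minimal sketch in which the notification service, not the producer, decides which deliveries an event expands into. The names `IncomingEvent`, `PreferenceStore`, and `resolveDeliveries` are hypothetical, and the in-memory `Map` stands in for whatever preference storage is actually used.

```typescript
// Illustrative types; none of these names come from a real library.
type Channel = 'email' | 'sms' | 'push' | 'in_app';

interface IncomingEvent {
  id: string;
  recipients: string[]; // user IDs named by the producer
  channels: Channel[];  // channels the event is eligible for
}

interface Delivery {
  eventId: string;
  userId: string;
  channel: Channel;
}

// Assumption: maps a user ID to the channels that user has opted into.
type PreferenceStore = Map<string, Channel[]>;

// Expand one event into per-user, per-channel deliveries,
// keeping only channels the user has opted into.
function resolveDeliveries(event: IncomingEvent, preferences: PreferenceStore): Delivery[] {
  const deliveries: Delivery[] = [];
  for (const userId of event.recipients) {
    const optedIn = preferences.get(userId) ?? [];
    for (const channel of event.channels) {
      if (optedIn.includes(channel)) {
        deliveries.push({ eventId: event.id, userId, channel });
      }
    }
  }
  return deliveries;
}
```

Because the expansion happens inside the consumer, a preference change takes effect without any producer redeploy.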
Step-by-Step Implementation
1. Define Event Schema
Standardize the event payload to include metadata for routing, deduplication, and priority.
// events/notification-event.ts
export interface NotificationEvent {
  id: string; // Unique ID for deduplication
  type: 'transactional' | 'marketing' | 'alert';
  priority: 'critical' | 'high' | 'normal' | 'low';
  payload: Record<string, any>;
  recipients: string[]; // User IDs or external identifiers
  channels: ('email' | 'sms' | 'push' | 'in_app')[];
  createdAt: string; // ISO timestamp
  deduplicationKey?: string; // Optional key for content-based dedup
}
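A TypeScript interface is erased at runtime, so a consumer cannot rely on it to reject malformed messages. The sketch below shows one way to check a parsed message against the schema above before processing; `isNotificationEvent` is a hypothetical helper, and a schema-validation library could serve the same purpose.

```typescript
const EVENT_TYPES: readonly string[] = ['transactional', 'marketing', 'alert'];
const PRIORITIES: readonly string[] = ['critical', 'high', 'normal', 'low'];
const CHANNELS: readonly string[] = ['email', 'sms', 'push', 'in_app'];

function isStringArray(value: unknown): value is string[] {
  return Array.isArray(value) && value.every((item) => typeof item === 'string');
}

// Returns true only if every required field is present and well-typed.
function isNotificationEvent(value: unknown): boolean {
  if (typeof value !== 'object' || value === null) return false;
  const { id, type, priority, payload, recipients, channels, createdAt, deduplicationKey } =
    value as Record<string, unknown>;

  if (typeof id !== 'string' || id.length === 0) return false;
  if (typeof type !== 'string' || !EVENT_TYPES.includes(type)) return false;
  if (typeof priority !== 'string' || !PRIORITIES.includes(priority)) return false;
  if (typeof payload !== 'object' || payload === null) return false;
  if (!isStringArray(recipients)) return false;
  if (!isStringArray(channels) || !channels.every((c) => CHANNELS.includes(c))) return false;
  if (typeof createdAt !== 'string' || Number.isNaN(Date.parse(createdAt))) return false;
  if (deduplicationKey !== undefined && typeof deduplicationKey !== 'string') return false;
  return true;
}
```

Messages that fail the check can be logged and routed to a dead-letter topic instead of crashing the worker.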
2. Implement the Consumer Worker
The worker consumes events, resolves user preferences, checks deduplication, and dispatches to channel adapters.
// workers/notification-worker.ts
import { Consumer, EachMessagePayload } from 'kafkajs';
import { RedisClient } from './redis-client';
import { ChannelDispatcher } from './channel-dispatcher';
import { NotificationEvent } from '../events/notification-event';
import { Logger } from './logger';

export class NotificationWorker {
  constructor(
    private consumer: Consumer,
    private redis: RedisClient,
    private dispatcher: ChannelDispatcher,
    private logger: Logger
  ) {}

  async start() {
    await this.consumer.subscribe({ topic: 'notifications.incoming' });
    await this.consumer.run({
      eachMessage: async ({ message }: EachMessagePayload) => {
        // Skip empty and unparseable messages so one bad payload cannot stall the partition
        if (!message.value) return;
        let event: NotificationEvent;
        try {
          event = JSON.parse(message.value.toString()) as NotificationEvent;
        } catch {
          this.logger.error('Skipping message with invalid JSON payload');
          return;
        }
        await this.processEvent(event);
      },
    });
  }

  private async processEvent(event: NotificationEvent) {
    // 1. Deduplication check
    // SET with NX succeeds only for the first writer; a falsy reply means the key already exists.
    // The option-object form follows node-redis v4. With ioredis the equivalent call is
    // set(key, '1', 'EX', 86400, 'NX').
    // Note: the key is claimed before dispatch, so a crash after this point suppresses redelivery until the TTL expires.
    const dedupKey = event.deduplicationKey || event.id;
    const firstSeen = await this.redis.set(
      `dedup:${dedupKey}`,
      '1',
      { NX: true, EX: 86400 } // 24h TTL
    );
    if (!firstSeen) {
      this.logger.info(`Duplicate event skipped: ${event.id}`);
      return;
    }

    // 2. Priority routing
    // Critical and high-priority events are dispatched immediately; a 5-minute batch window
    // would exceed their latency targets
    if (event.priority === 'critical' || event.priority === 'high') {
      await this.dispatcher.dispatchImmediate(event);
    } else {
      // Normal and low-priority events can be batched
      await this.dispatcher.enqueueForBatching(event);
    }
  }
}
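The deduplication step depends on "set if absent, with expiry" semantics: the first claim of a key succeeds and later claims fail until the TTL runs out. The sketch below reproduces that behavior with a hypothetical in-memory `InMemoryDedupStore` so the logic can be followed and tested without a Redis instance. It is only an illustration; a shared store such as Redis is still needed when several workers run in parallel.

```typescript
// Hypothetical in-memory stand-in for a "SET key value NX EX ttl" check.
class InMemoryDedupStore {
  private expiries = new Map<string, number>();

  // `now` is injectable so tests can control the clock.
  constructor(private now: () => number = Date.now) {}

  // Returns true if the key was claimed (first sighting),
  // false if an unexpired claim already exists.
  claim(key: string, ttlSeconds: number): boolean {
    const current = this.now();
    const expiresAt = this.expiries.get(key);
    if (expiresAt !== undefined && expiresAt > current) {
      return false;
    }
    this.expiries.set(key, current + ttlSeconds * 1000);
    return true;
  }
}
```

A worker would process the event only when `claim` returns `true` and skip it otherwise.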
3. Channel Adapter with Retry and Isolation
Each channel must implement a retry policy with exponential backoff and circuit breaking.
// adapters/base-channel-adapter.ts
// Logger path assumed from the worker's './logger' import
import { Logger } from '../workers/logger';

export abstract class BaseChannelAdapter {
  // The logger is injected so that dispatch() can report exhausted retries
  constructor(protected logger: Logger) {}

  protected abstract send(recipient: string, payload: any): Promise<void>;

  async dispatch(recipient: string, payload: any, maxRetries = 3): Promise<void> {
    let attempt = 0;
    while (attempt < maxRetries) {
      try {
        await this.send(recipient, payload);
        return; // Success
      } catch (error: any) {
        attempt++;
        if (attempt >= maxRetries) {
          // Move to DLQ or alerting
          this.logger.error(`Failed after ${maxRetries} attempts: ${error.message}`);
          throw error;
        }
        // Exponential backoff: 2^attempt * 100ms (200ms, 400ms, ...)
        const delay = Math.pow(2, attempt) * 100;
        await new Promise(res => setTimeout(res, delay));
      }
    }
  }
}

// adapters/sms-adapter.ts
import { BaseChannelAdapter } from './base-channel-adapter';

export class SmsAdapter extends BaseChannelAdapter {
  protected async send(recipient: string, payload: any): Promise<void> {
    // Provider call with rate limit handling
    // e.g., Twilio client.messages.create({ to, from, body })
  }
}
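The adapter above covers retries but not the circuit breaking mentioned in the step description. The following is a minimal sketch of a breaker an adapter could consult before calling its provider. The class name, thresholds, and method names are assumptions for illustration, and the half-open state is simplified: it lets requests through until the next recorded success or failure.

```typescript
type BreakerState = 'closed' | 'open' | 'half_open';

// Hypothetical circuit breaker; not taken from any specific library.
class CircuitBreaker {
  private failures = 0;
  private openedAt = 0;
  private state: BreakerState = 'closed';

  constructor(
    private failureThreshold: number, // consecutive failures before opening
    private cooldownMs: number,       // how long to stay open before probing
    private now: () => number = Date.now,
  ) {}

  // Call before each provider request; false means "fail fast".
  canRequest(): boolean {
    if (this.state === 'open' && this.now() - this.openedAt >= this.cooldownMs) {
      this.state = 'half_open'; // cooldown elapsed: allow a probe
    }
    return this.state !== 'open';
  }

  recordSuccess(): void {
    this.failures = 0;
    this.state = 'closed';
  }

  recordFailure(): void {
    this.failures++;
    // A failed probe, or too many consecutive failures, opens the circuit.
    if (this.state === 'half_open' || this.failures >= this.failureThreshold) {
      this.state = 'open';
      this.openedAt = this.now();
    }
  }

  currentState(): BreakerState {
    return this.state;
  }
}
```

An adapter would check `canRequest()` before `send`, then call `recordSuccess()` or `recordFailure()` depending on the outcome. Keeping one breaker per channel preserves the isolation between channels.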
4. Smart Batching Service
For low-priority events, aggregate payloads and send digests.
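// services/batching-service.ts
import { CronJob } from 'cron';
// Import paths assumed from the worker's local imports
import { RedisClient } from '../workers/redis-client';
import { ChannelDispatcher } from '../workers/channel-dispatcher';

export class BatchingService {
  constructor(
    private redis: RedisClient,
    private dispatcher: ChannelDispatcher
  ) {}

  // Events are aggregated in a Redis list; a cron job drains it every 5 minutes
  start() {
    return new CronJob('*/5 * * * *', () => void this.processBatch(), null, true);
  }

  // groupByRecipient and createDigest are helper methods omitted from this listing
  async processBatch() {
    const batch = await this.redis.lrange('batch:low_priority', 0, -1);
    if (batch.length === 0) return;
    // Group by recipient
    const grouped = this.groupByRecipient(batch);
    for (const [recipient, events] of Object.entries(grouped)) {
      // Create digest payload
      const digest = this.createDigest(events);
      await this.dispatcher.sendDigest(recipient, digest);
    }
    // Remove only the entries that were read, so events pushed during processing are not lost
    // (assumes producers append to the tail with RPUSH)
    await this.redis.ltrim('batch:low_priority', batch.length, -1);
  }
}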
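The batching service calls `groupByRecipient` and `createDigest` without showing them. One possible shape for these helpers is sketched below, assuming each list entry is a JSON-encoded event with `id`, `recipients`, and `payload` fields. The `BatchedEvent` and `Digest` types are hypothetical.

```typescript
// Assumed shape of each JSON entry stored in the batch list.
interface BatchedEvent {
  id: string;
  recipients: string[];
  payload: Record<string, unknown>;
}

// Hypothetical digest shape: a count plus the collected payloads.
interface Digest {
  count: number;
  eventIds: string[];
  items: Record<string, unknown>[];
}

// Parse raw list entries and bucket the events under each recipient.
function groupByRecipient(rawEntries: string[]): Record<string, BatchedEvent[]> {
  const grouped: Record<string, BatchedEvent[]> = {};
  for (const raw of rawEntries) {
    const event = JSON.parse(raw) as BatchedEvent;
    for (const recipient of event.recipients) {
      if (!grouped[recipient]) grouped[recipient] = [];
      grouped[recipient].push(event);
    }
  }
  return grouped;
}

// Collapse a recipient's events into a single digest payload.
function createDigest(events: BatchedEvent[]): Digest {
  return {
    count: events.length,
    eventIds: events.map((event) => event.id),
    items: events.map((event) => event.payload),
  };
}
```

An event addressed to several recipients appears in each recipient's bucket, so every user receives a digest containing only their own events.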
Rationale
- Redis Deduplication: Provides O(1) lookup speed and atomic operations. The 24h TTL balances memory usage with protection against late-arriving duplicates.
- Priority Separation: Critical alerts (e.g., security breaches) bypass batching to ensure minimal latency. Marketing and activity digests use batching to reduce costs.
- Circuit Breaking: Channel adapters must integrate with circuit breakers to fail fast when a provider is down, so that retries and pending requests do not pile up and starve healthy channels.
Pitfall Guide
1. The Synchronous Block
Mistake: Calling notification providers directly inside API controllers.
Impact: API latency spikes; thread exhaustion during provider outages.
Fix: Always use an async queue. Return 202 Accepted immediately after publishing the event.
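As a framework-independent sketch of this fix, the handler below publishes the event and returns 202 without waiting for any provider. `handleNotify`, `NotifyDeps`, and the response shape are hypothetical; `publish` stands in for a broker producer and `newId` for an ID generator such as a UUID function.

```typescript
interface HttpResult {
  status: number;
  body: Record<string, unknown>;
}

// Hypothetical dependencies, injected so the handler stays framework-agnostic.
interface NotifyDeps {
  publish: (event: Record<string, unknown>) => Promise<void>; // e.g. a broker producer
  newId: () => string;                                        // e.g. a UUID generator
}

async function handleNotify(body: unknown, deps: NotifyDeps): Promise<HttpResult> {
  if (typeof body !== 'object' || body === null || Array.isArray(body)) {
    return { status: 400, body: { error: 'Request body must be a JSON object' } };
  }
  // Stamp the event with an ID and timestamp before it enters the queue.
  const event = {
    ...(body as Record<string, unknown>),
    id: deps.newId(),
    createdAt: new Date().toISOString(),
  };
  try {
    await deps.publish(event); // waits for the enqueue only, never for delivery
  } catch {
    return { status: 503, body: { error: 'Broker unavailable' } };
  }
  return { status: 202, body: { eventId: event.id } };
}
```

The only awaited operation is the enqueue, so a slow or failing SMS or email provider cannot add latency to the API response.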
2. Ignoring Channel-Specific Rate Limits
Mistake: Treating all channels equally and blasting messages without respecting provider limits (e.g., SMS carrier throttling).
Impact: Account suspension by providers; message drops.
Fix: Implement per-channel rate limiters using token buckets or sliding windows in Redis.
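The token-bucket idea can be sketched in a few lines. This version is in-memory and therefore limits a single process only; a multi-worker deployment would keep the counters in Redis as the fix suggests. The `TokenBucket` class and its parameters are illustrative assumptions.

```typescript
// Hypothetical single-process token bucket.
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private capacity: number,        // maximum burst size
    private refillPerSecond: number, // sustained rate
    private now: () => number = Date.now,
  ) {
    this.tokens = capacity;
    this.lastRefill = now();
  }

  // Returns true and consumes tokens if the send is allowed right now.
  tryRemove(count = 1): boolean {
    const current = this.now();
    const elapsedSeconds = (current - this.lastRefill) / 1000;
    // Refill in proportion to elapsed time, capped at capacity.
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSeconds * this.refillPerSecond);
    this.lastRefill = current;
    if (this.tokens < count) return false;
    this.tokens -= count;
    return true;
  }
}
```

One bucket per channel, sized to that provider's published limit, lets a burst on one channel throttle without affecting the others.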
3. Lack of Idempotency
Mistake: Retrying sends without checking if the message was already delivered.
Impact: Users receive duplicate messages, causing spam complaints and trust erosion.
Fix: Enforce unique message IDs and check delivery status before retrying.
4. Schema Drift in Events
Mistake: Producers change event payloads without versioning or notifying consumers.
Impact: Notification workers crash or send malformed messages.
Fix: Implement schema registry or strict validation. Version events (event.v1, event.v2) and support multiple versions in consumers.
5. No Delivery State Tracking
Mistake: Assuming "sent" means "delivered."
Impact: Inability to debug delivery failures; poor user experience.
Fix: Store delivery state (pending, sent, delivered, failed, bounced). Ingest webhooks from providers to update status.
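Provider webhooks can arrive late or out of order, so status updates should never move a delivery backwards. The sketch below encodes that rule with a rank per state. The ranking itself is an assumption for illustration: `pending` comes before `sent`, and `delivered`, `failed`, and `bounced` are all treated as terminal.

```typescript
type DeliveryState = 'pending' | 'sent' | 'delivered' | 'failed' | 'bounced';

// Assumed ordering: higher rank means further along; terminal states share the top rank.
const STATE_RANK: Record<DeliveryState, number> = {
  pending: 0,
  sent: 1,
  delivered: 2,
  failed: 2,
  bounced: 2,
};

// Apply an incoming status update, ignoring stale or conflicting ones.
function nextState(current: DeliveryState, incoming: DeliveryState): DeliveryState {
  return STATE_RANK[incoming] > STATE_RANK[current] ? incoming : current;
}
```

With this rule a late `sent` webhook cannot overwrite `delivered`, and the first terminal state recorded wins.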
6. The Cascade Failure
Mistake: A failure in one channel adapter blocks the processing of other channels.
Impact: Email outage prevents Push notifications from being sent.
Fix: Isolate channel dispatching. Use Promise.allSettled or independent workers per channel.
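A short sketch of the `Promise.allSettled` approach: every channel is attempted, and a rejection on one is reported without cancelling the rest. `SendFn`, `ChannelResult`, and `dispatchToChannels` are hypothetical names; the senders stand in for real channel adapters.

```typescript
type Channel = 'email' | 'sms' | 'push' | 'in_app';
type SendFn = (recipient: string, message: string) => Promise<void>;

interface ChannelResult {
  channel: Channel;
  ok: boolean;
  error?: string;
}

async function dispatchToChannels(
  senders: Partial<Record<Channel, SendFn>>,
  channels: Channel[],
  recipient: string,
  message: string,
): Promise<ChannelResult[]> {
  // allSettled waits for every attempt and never rejects as a whole.
  const settled = await Promise.allSettled(
    channels.map(async (channel) => {
      const send = senders[channel];
      if (!send) throw new Error(`No adapter registered for ${channel}`);
      await send(recipient, message);
    }),
  );
  // Results come back in the same order as the input channels.
  return settled.map((result, index): ChannelResult => {
    if (result.status === 'fulfilled') {
      return { channel: channels[index], ok: true };
    }
    const reason = result.reason instanceof Error ? result.reason.message : String(result.reason);
    return { channel: channels[index], ok: false, error: reason };
  });
}
```

The per-channel results can then feed delivery tracking or a retry queue for only the channels that failed.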
7. Over-Batching Critical Alerts
Mistake: Putting transactional alerts into a digest batch.
Impact: Users miss time-sensitive information (e.g., password reset, payment confirmation).
Fix: Strictly separate transactional/urgent events from marketing/low-priority events. Never batch critical alerts.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Startup MVP (<10k users) | Async Queue + Simple Workers | Low complexity; solves latency; easy to maintain. | Low |
| High-Frequency Alerts | Async + Priority Queues | Ensures critical alerts bypass congestion. | Medium |
| Marketing / Digests | Async + Smart Batching | Reduces API calls and provider costs significantly. | High Savings |
| Multi-Channel Enterprise | Fan-out + Channel Isolation | Prevents cross-channel failures; allows granular scaling. | High |
| Strict Compliance | Async + Delivery Tracking | Audit trails and delivery guarantees required. | Medium |
Configuration Template
Copy this TypeScript configuration to define retry policies, batching rules, and channel mappings.
// config/notification-config.ts
export const NotificationConfig = {
  broker: {
    topic: 'notifications.incoming',
    groupId: 'notification-service-v1',
    maxRetries: 3,
    retryBackoffMs: 100,
  },
  deduplication: {
    ttlSeconds: 86400,
    prefix: 'notif:dedup:',
  },
  channels: {
    email: {
      enabled: true,
      rateLimit: 100, // requests per second
      batchSize: 100, // for bulk email providers
    },
    sms: {
      enabled: true,
      rateLimit: 20,
      maxMessageLength: 160,
    },
    push: {
      enabled: true,
      rateLimit: 500,
    },
    in_app: {
      enabled: true,
      // In-app is usually a DB write, no rate limit needed
    },
  },
  batching: {
    enabled: true,
    cronSchedule: '*/5 * * * *', // Every 5 minutes
    maxBatchSize: 50,
    channels: ['email', 'in_app'], // Only batch these channels
    priorities: ['low', 'normal'], // Only batch these priorities
  },
  priorities: {
    critical: { maxLatencyMs: 500, batchingAllowed: false },
    high: { maxLatencyMs: 2000, batchingAllowed: false },
    normal: { maxLatencyMs: 5000, batchingAllowed: true },
    low: { maxLatencyMs: 60000, batchingAllowed: true },
  },
};
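The batching and priority rules in this template can be combined into a single routing decision. The sketch below shows one way to do that. `shouldBatch` is a hypothetical helper, and the rule objects are passed in as arguments so the function stays self-contained.

```typescript
type Priority = 'critical' | 'high' | 'normal' | 'low';
type Channel = 'email' | 'sms' | 'push' | 'in_app';

interface BatchingRules {
  enabled: boolean;
  channels: Channel[];
  priorities: Priority[];
}

interface PriorityRule {
  maxLatencyMs: number;
  batchingAllowed: boolean;
}

// An event is batched only if every rule agrees: batching is on, the priority
// permits it, and both the priority and the channel are in the batch lists.
function shouldBatch(
  priority: Priority,
  channel: Channel,
  batching: BatchingRules,
  priorityRules: Record<Priority, PriorityRule>,
): boolean {
  return (
    batching.enabled &&
    priorityRules[priority].batchingAllowed &&
    batching.priorities.includes(priority) &&
    batching.channels.includes(channel)
  );
}
```

With the values from the template, a low-priority email is batched, while a low-priority SMS and any critical event are dispatched immediately.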
Quick Start Guide
- Initialize Broker:
docker run -d --name kafka -p 9092:9092 apache/kafka:latest
- Install Dependencies:
npm install kafkajs ioredis winston cron
- Run the Worker:
# Start the consumer service
npm run start:worker
- Send Test Event:
# Publish a test event to the broker
curl -X POST http://localhost:3000/api/notify \
-H "Content-Type: application/json" \
-d '{"id":"evt_test_1","type":"alert","priority":"critical","recipients":["user_1"],"channels":["push"],"payload":{"message":"Test"},"createdAt":"2024-01-01T00:00:00Z"}'
- Verify:
Check worker logs for "Duplicate event skipped" or "Dispatched to push". Monitor Redis for dedup keys.
Scaling notification systems is an exercise in decoupling and resilience. By treating notifications as a data distribution problem rather than a simple API call, you gain control over latency, cost, and reliability. Implement the async patterns, enforce idempotency, and isolate channels to build a system that scales with your user base.