ormalized object and are responsible for mapping fields to their specific format requirements.
Technical Implementation
The following TypeScript implementation demonstrates the core distribution engine with retry logic, idempotency, and adapter abstraction.
1. Domain Models and Schema
// models.ts
/**
 * A piece of content in its channel-agnostic form, before any adapter maps
 * it to a channel-specific payload.
 */
export interface ContentItem {
  /** Identifier of the content item. */
  id: string;

  /** Headline text. */
  title: string;

  /** Main body text. */
  body: string;

  /** Link associated with the content item. */
  url: string;

  /** Free-form string key/value pairs attached to the item. */
  metadata: Record<string, string>;

  /** Publication timestamp. */
  publishedAt: Date;
}
/**
 * One unit of work: deliver a single content item to a single channel.
 */
export interface DistributionTask {
  /** Identifier of this task. */
  taskId: string;

  /** Identifier of the content item being distributed. */
  contentId: string;

  /** Identifier of the target channel. */
  channelId: string;

  /** The content to deliver. */
  payload: ContentItem;

  /** Number of delivery attempts made so far. */
  attempts: number;

  /** Upper bound on retries before the task is treated as permanently failed. */
  maxRetries: number;

  /** Key used to detect duplicate executions of the same logical task. */
  idempotencyKey: string;

  /** Current lifecycle state of the task. */
  status: 'pending' | 'processing' | 'success' | 'failed';
}
// schema-validation.ts
import { z } from 'zod';
/**
 * Runtime validation schema for incoming content.
 */
export const ContentSchema = z.object({
  // Must be a UUID string.
  id: z.string().uuid(),

  // Non-empty, at most 280 characters (constraint example).
  title: z.string().min(1).max(280),

  // Any string is accepted for the body.
  body: z.string(),

  // Must parse as a URL.
  url: z.string().url(),

  // Optional map of string values.
  metadata: z.record(z.string()).optional(),

  // Must already be a Date instance.
  publishedAt: z.date(),
});
/** Shape of a content object after it has passed `ContentSchema` validation. */
export type ValidatedContent = z.output<typeof ContentSchema>;
2. Channel Adapter Interface
// adapters/channel-adapter.interface.ts
/**
 * Contract every distribution channel implements.
 */
export interface ChannelAdapter {
  /** Name identifying the channel this adapter serves. */
  readonly channelName: string;

  /**
   * Delivers validated content to the channel.
   *
   * @param content - Content that has already passed validation.
   * @returns The outcome of the delivery attempt.
   */
  distribute(content: ValidatedContent): Promise<ChannelResult>;

  /**
   * Reports whether a request may be sent to the channel right now.
   *
   * @returns `true` when sending is allowed, `false` when it should be held back.
   */
  validateRateLimit(): Promise<boolean>;
}
/**
 * Outcome of a single delivery attempt made by a channel adapter.
 */
export interface ChannelResult {
  /** Whether the delivery succeeded. */
  success: boolean;

  /** Identifier assigned by the remote channel, when one is available. */
  externalId?: string;

  /** Description of what went wrong, when the delivery failed. */
  error?: string;

  /** When the channel's rate limit is expected to reset, if known. */
  rateLimitReset?: Date;
}
3. Example Adapter: Twitter/X Integration
// adapters/twitter-adapter.ts
import { ChannelAdapter, ChannelResult, ValidatedContent } from './channel-adapter.interface';
import { TwitterApi } from 'twitter-api-v2';
/**
 * Channel adapter that publishes content as a tweet via `twitter-api-v2`.
 */
export class TwitterAdapter implements ChannelAdapter {
  readonly channelName = 'twitter';

  /** Fallback wait used when the API error carries no reset timestamp. */
  private static readonly FALLBACK_RATE_LIMIT_WINDOW_MS = 15 * 60 * 1000;

  private readonly client: TwitterApi;

  /**
   * @param apiKey - Application (consumer) key.
   * @param apiSecret - Application (consumer) secret.
   * @param accessToken - Optional OAuth 1.0a user access token. Posting a tweet
   *   requires user context, so supply this together with `accessSecret`.
   * @param accessSecret - Optional OAuth 1.0a user access secret.
   */
  constructor(apiKey: string, apiSecret: string, accessToken?: string, accessSecret?: string) {
    // Only switch to user context when both halves of the token pair are present;
    // otherwise keep the original consumer-only construction.
    this.client =
      accessToken && accessSecret
        ? new TwitterApi({ appKey: apiKey, appSecret: apiSecret, accessToken, accessSecret })
        : new TwitterApi({ appKey: apiKey, appSecret: apiSecret });
  }

  /**
   * Posts "<title> <url>" as a tweet.
   *
   * Never throws for API failures: they are reported through the returned
   * result so the caller can decide whether to retry.
   *
   * @param content - Validated content to publish.
   * @returns `success: true` with the tweet id, or `success: false` with an
   *   error message and, for HTTP 429, the time at which to retry.
   */
  async distribute(content: ValidatedContent): Promise<ChannelResult> {
    const tweetText = `${content.title} ${content.url}`;
    try {
      const response = await this.client.v2.tweet(tweetText);
      return { success: true, externalId: response.data.id };
    } catch (error: unknown) {
      if (TwitterAdapter.readStatusCode(error) === 429) {
        return {
          success: false,
          error: 'Rate limit exceeded',
          rateLimitReset: TwitterAdapter.resolveRateLimitReset(error),
        };
      }
      return { success: false, error: error instanceof Error ? error.message : String(error) };
    }
  }

  /**
   * Placeholder rate-limit gate; always allows the request.
   *
   * @returns `true` unconditionally until a real check (API query or local
   *   token bucket) is implemented.
   */
  async validateRateLimit(): Promise<boolean> {
    // Implement rate limit check via API or local token bucket
    return true;
  }

  /** Extracts a numeric `code` property (the HTTP status on API errors), if any. */
  private static readStatusCode(error: unknown): number | undefined {
    if (typeof error !== 'object' || error === null) {
      return undefined;
    }
    const code = (error as { code?: unknown }).code;
    return typeof code === 'number' ? code : undefined;
  }

  /**
   * Determines when the rate limit resets. Uses `rateLimit.reset` (epoch
   * seconds) when the error carries it, otherwise falls back to a fixed window.
   */
  private static resolveRateLimitReset(error: unknown): Date {
    const resetSeconds = (error as { rateLimit?: { reset?: unknown } } | null)?.rateLimit?.reset;
    if (typeof resetSeconds === 'number' && Number.isFinite(resetSeconds)) {
      return new Date(resetSeconds * 1000);
    }
    return new Date(Date.now() + TwitterAdapter.FALLBACK_RATE_LIMIT_WINDOW_MS);
  }
}
4. Distribution Engine with Retry and Idempotency
// engine/distribution-engine.ts
import { DistributionTask, ValidatedContent } from '../models';
import { ChannelAdapter } from '../adapters/channel-adapter.interface';
import { Redis } from 'ioredis';
import { createHash } from 'crypto';
/** Result produced by an adapter, derived from the adapter contract itself. */
type DistributionOutcome = Awaited<ReturnType<ChannelAdapter['distribute']>>;

/**
 * Runs distribution tasks against the registered channel adapters, guarding
 * each task with a Redis lock and a completion marker so that the same
 * logical task is not delivered twice.
 *
 * NOTE(review): `recordSuccess`, `requeueTask` and `recordPermanentFailure`
 * are called below but are not defined in this listing; they must be provided
 * (persistence / queue integration) for the class to compile.
 */
export class DistributionEngine {
  /** Lifetime of the in-flight lock, in seconds. */
  private static readonly LOCK_TTL_SECONDS = 300;

  /** How long a completion marker is retained, in seconds (tune to the queue's redelivery window). */
  private static readonly COMPLETED_TTL_SECONDS = 7 * 24 * 60 * 60;

  private readonly redis: Redis;
  private readonly adapters: Map<string, ChannelAdapter>;

  /**
   * @param redisUrl - Connection URL passed to the Redis client.
   * @param adapters - Adapters to register, keyed by their `channelName`.
   */
  constructor(redisUrl: string, adapters: ChannelAdapter[]) {
    this.redis = new Redis(redisUrl);
    this.adapters = new Map(adapters.map(a => [a.channelName, a]));
  }

  /**
   * Executes one distribution task.
   *
   * @param task - The task to run; `task.channelId` selects the adapter.
   * @param content - Validated content handed to the adapter.
   * @throws Error when no adapter is registered for `task.channelId`.
   */
  async execute(task: DistributionTask, content: ValidatedContent): Promise<void> {
    const adapter = this.adapters.get(task.channelId);
    if (!adapter) {
      throw new Error(`Unknown channel: ${task.channelId}`);
    }

    // Fall back to the deterministic content/channel hash when the task carries an empty key.
    const idempotencyKey = task.idempotencyKey || this.generateIdempotencyKey(task);
    const lockKey = `dist:lock:${idempotencyKey}`;
    const doneKey = `dist:done:${idempotencyKey}`;

    // In-flight guard: only one worker may process a given key at a time.
    const acquired = await this.redis.set(
      lockKey,
      '1',
      'EX',
      DistributionEngine.LOCK_TTL_SECONDS,
      'NX',
    );
    if (!acquired) {
      console.log(`Task ${task.taskId} already processing or completed. Skipping.`);
      return;
    }

    try {
      // Completion guard: the lock is released when processing ends, so a
      // separate marker is needed to recognise work that already succeeded.
      // Checked while holding the lock to avoid racing a worker that is finishing.
      if ((await this.redis.exists(doneKey)) > 0) {
        console.log(`Task ${task.taskId} already processing or completed. Skipping.`);
        return;
      }

      const result = await this.attemptDistribution(adapter, content);
      if (!result.success) {
        await this.handleFailure(task, result.error ?? 'Unknown error', result.rateLimitReset);
        return;
      }

      // Mark completion before any further bookkeeping so that a later failure
      // cannot cause the already-delivered content to be sent again.
      await this.redis.set(
        doneKey,
        result.externalId ?? '1',
        'EX',
        DistributionEngine.COMPLETED_TTL_SECONDS,
      );

      try {
        await this.recordSuccess(task, result.externalId);
      } catch (error: unknown) {
        // The remote delivery succeeded; retrying would duplicate it, so only report.
        const reason = error instanceof Error ? error.message : String(error);
        console.error(`Task ${task.taskId} was distributed but recording success failed: ${reason}`);
      }
    } finally {
      await this.redis.del(lockKey);
    }
  }

  /**
   * Runs the rate-limit gate and the adapter call, converting thrown errors
   * into a failed outcome so the caller has a single failure path.
   */
  private async attemptDistribution(
    adapter: ChannelAdapter,
    content: ValidatedContent,
  ): Promise<DistributionOutcome> {
    try {
      if (!(await adapter.validateRateLimit())) {
        return { success: false, error: 'Rate limit exceeded' };
      }
      return await adapter.distribute(content);
    } catch (error: unknown) {
      return { success: false, error: error instanceof Error ? error.message : String(error) };
    }
  }

  /**
   * Re-queues the task with exponential backoff while retries remain,
   * otherwise records a permanent failure.
   *
   * @param task - The task that failed.
   * @param error - Human-readable failure reason.
   * @param retryNotBefore - Optional earliest retry time reported by the
   *   channel (e.g. a rate-limit reset); the delay is never shorter than this.
   */
  private async handleFailure(
    task: DistributionTask,
    error: string,
    retryNotBefore?: Date,
  ): Promise<void> {
    if (task.attempts < task.maxRetries) {
      const backoffMs = 2 ** task.attempts * 1000; // Exponential backoff
      const channelWaitMs = retryNotBefore ? retryNotBefore.getTime() - Date.now() : 0;
      const delay = Math.max(backoffMs, channelWaitMs);
      console.warn(`Task ${task.taskId} failed. Retrying in ${delay}ms. Error: ${error}`);
      // Re-queue task with incremented attempts
      await this.requeueTask({ ...task, attempts: task.attempts + 1 }, delay);
    } else {
      console.error(`Task ${task.taskId} permanently failed after ${task.attempts} attempts.`);
      await this.recordPermanentFailure(task, error);
      // Trigger alerting mechanism here
    }
  }

  /**
   * Derives a deterministic key from the content and channel identifiers.
   *
   * @returns Hex-encoded SHA-256 of `"<contentId>:<channelId>"`.
   */
  private generateIdempotencyKey(task: DistributionTask): string {
    return createHash('sha256')
      .update(`${task.contentId}:${task.channelId}`)
      .digest('hex');
  }
}
5. Webhook Receiver for External Triggers
For channels that support webhooks (e.g., notifying a CMS when a post is updated), implement a secure receiver:
// receivers/webhook-receiver.ts
import { Request, Response } from 'express';
import crypto from 'crypto';
/**
 * Express middleware holder that authenticates incoming webhooks with an
 * HMAC-SHA256 signature carried in the `x-webhook-signature` header.
 *
 * When registering the middleware, keep `this` bound, e.g.
 * `app.use((req, res, next) => receiver.verifySignature(req, res, next))`.
 */
export class WebhookReceiver {
  private readonly secret: string;

  /**
   * @param secret - Shared secret used as the HMAC key.
   */
  constructor(secret: string) {
    this.secret = secret;
  }

  /**
   * Verifies the request signature and either calls `next()` or responds 401.
   *
   * The HMAC is computed over `req.rawBody` when the application has captured
   * it (for example through the `verify` option of `express.json`), because
   * re-serialising the parsed body does not necessarily reproduce the bytes
   * the sender signed. Without `rawBody` it falls back to
   * `JSON.stringify(req.body)`.
   *
   * @param req - Incoming request.
   * @param res - Response used to reject invalid requests.
   * @param next - Continuation invoked when the signature is valid.
   */
  verifySignature(req: Request, res: Response, next: (err?: unknown) => void) {
    const signature = req.headers['x-webhook-signature'];
    if (typeof signature !== 'string' || signature.length === 0) {
      return res.status(401).send('Invalid signature');
    }

    const rawBody = (req as Request & { rawBody?: Buffer | string }).rawBody;
    // JSON.stringify yields undefined for an undefined body; sign the empty string instead of throwing.
    const payload = rawBody ?? JSON.stringify(req.body) ?? '';
    const expected = crypto
      .createHmac('sha256', this.secret)
      .update(payload)
      .digest('hex');

    const received = Buffer.from(signature, 'utf8');
    const computed = Buffer.from(expected, 'utf8');
    // timingSafeEqual requires equal lengths, so check that first; the
    // comparison itself runs in constant time to avoid leaking the digest.
    if (received.length !== computed.length || !crypto.timingSafeEqual(received, computed)) {
      return res.status(401).send('Invalid signature');
    }
    next();
  }
}
Pitfall Guide
Production distribution systems fail due to predictable anti-patterns. Avoid these critical mistakes:
- Ignoring Rate Limit Headers:
  - Mistake: Treating `429 Too Many Requests` as a generic error.
  - Impact: Repeated violations can lead to IP bans or account suspension.
  - Fix: Parse the `Retry-After` or `X-RateLimit-Reset` headers and dynamically adjust the worker queue delay. Implement a token bucket algorithm locally to preemptively throttle requests.
- Missing Idempotency:
  - Mistake: Retrying a failed distribution without checking whether the previous attempt succeeded on the remote side.
  - Impact: Duplicate posts, spamming audiences, and API errors from the provider.
  - Fix: Generate a deterministic idempotency key based on content ID and channel ID. Store execution state in a persistent store before and after API calls.
- Synchronous Distribution:
  - Mistake: Calling distribution APIs directly within the request/response cycle of content creation.
  - Impact: High latency for content creators, timeout errors, and cascading failures if a channel is down.
  - Fix: Always use an asynchronous queue. The content API should return `202 Accepted` immediately after queuing the distribution task.
- Schema Drift:
  - Mistake: Assuming content structure remains constant.
  - Impact: Malformed payloads rejected by channels, silent failures.
  - Fix: Enforce strict schema validation at the ingestion point. Use a schema registry to version content models and handle migrations gracefully.
- Hardcoded Secrets:
  - Mistake: Embedding API keys in code, or committing environment files (e.g., `.env`) that contain them to VCS.
  - Impact: Security breaches, credential leakage.
  - Fix: Use a secrets manager (e.g., AWS Secrets Manager, HashiCorp Vault) and inject credentials at runtime via short-lived tokens.
- Lack of Circuit Breakers:
  - Mistake: Continuing to send requests to a channel that is consistently failing.
  - Impact: Wasted resources, increased latency, and potential blacklisting.
  - Fix: Implement a circuit breaker pattern. If the error rate exceeds a threshold (e.g., 50% over 1 minute), open the circuit and stop sending requests for a cooldown period.
- Attribution Blindness:
  - Mistake: Distributing content without unique tracking parameters per channel and campaign.
  - Impact: Inability to measure channel performance, wasted marketing spend.
  - Fix: Automate UTM parameter injection based on channel, content type, and campaign metadata. Ensure deep links include device-specific routing parameters.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Real-time notifications (e.g., Slack) | Direct API Call with Fallback | Low latency required; failure is acceptable for non-critical updates. | Low compute, higher risk of user impact. |
| High-volume social posting | Event Queue + Batch Processing | Handles spikes, respects rate limits, ensures idempotency. | Moderate infra cost, high reliability. |
| Enterprise integration (e.g., ERP) | Synchronous Webhook with Retry | Requires immediate acknowledgment and transactional integrity. | Higher integration complexity. |
| Content syndication to partners | Signed Webhook + Validation | Partners need push notifications; security is paramount. | Low bandwidth, high trust setup. |
| Legacy system integration | Polling Adapter | Legacy APIs may not support webhooks or events. | Higher latency, increased polling cost. |
Configuration Template
Use this template to configure distribution channels and policies:
# distribution.config.yaml
# Declares the queueing settings for the engine and one entry per channel.
version: "1.0"

engine:
  queue: "sqs:distribution-tasks"
  dead_letter_queue: "sqs:distribution-dlq"
  max_concurrency: 50

channels:
  - id: "twitter"
    type: "adapter"
    adapter: "TwitterAdapter"
    rate_limit:
      requests_per_minute: 15
      burst_size: 5
    retry_policy:
      max_attempts: 3
      backoff_base_ms: 1000
      backoff_max_ms: 30000
    attribution:
      utm_source: "twitter"
      utm_medium: "social"

  - id: "email_digest"
    type: "batch"
    adapter: "EmailAdapter"
    batch_size: 100
    schedule: "0 9 * * *" # Daily at 9 AM
    retry_policy:
      max_attempts: 5
      backoff_base_ms: 5000

  - id: "partner_webhook"
    type: "webhook"
    url: "https://partner.example.com/api/content"
    # Name of the environment variable that holds the signing secret.
    secret_env: "PARTNER_WEBHOOK_SECRET"
    retry_policy:
      max_attempts: 10
      backoff_base_ms: 2000
Quick Start Guide
- Initialize Project:
# Create the project directory and a default package.json.
mkdir content-distro && cd content-distro
npm init -y
# Runtime dependencies; twitter-api-v2 is imported by the Twitter adapter.
npm install ioredis express zod twitter-api-v2
# Build-time tooling and type definitions.
npm install --save-dev typescript ts-node @types/node @types/express
npx tsc --init
- Create Configuration:
  Copy the `distribution.config.yaml` template to your project root and update channel credentials and URLs.
- Run Infrastructure:
  Start Redis and a message queue (e.g., a local SQS emulator or RabbitMQ):
# Start Redis in the background, exposing its default port on the host.
docker run --detach --publish 6379:6379 redis
# Add queue setup as needed
- Deploy Worker:
  Build and run the distribution worker:
# `npm init -y` does not define a "build" script, so invoke the compiler directly.
npx tsc --outDir dist
node dist/worker.js
- Verify:
  Publish a test event to the queue and monitor logs for successful distribution, idempotency checks, and metric emission. While a task is in flight, check Redis for its `dist:lock:*` key to confirm the idempotency lock is active; the key is removed once processing ends.