-shopify-hmac-sha256']);
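    const enrichedEvent = {
      eventId: crypto.randomUUID(),
      topic: headers['x-shopify-topic'],
      shopDomain: headers['x-shopify-shop-domain'],
      timestamp: new Date().toISOString(),
      rawPayload: payload,
      // topic + resource ID alone is identical for every update to the same resource,
      // so later genuine updates would be discarded as duplicates. Prefer the per-event
      // webhook ID header (assumed stable across redeliveries; check Shopify's docs).
      idempotencyKey:
        headers['x-shopify-webhook-id'] ?? `${headers['x-shopify-topic']}:${(payload as any).id}`
    };
    await this.router.enqueue(enrichedEvent);
  }

  // NOTE: Shopify computes the HMAC over the raw request body bytes. Re-serializing a
  // parsed payload may not reproduce them; pass the raw body here when it is available.
  private verifySignature(payload: unknown, hmac: string): void {
    const secret = process.env.SHOPIFY_WEBHOOK_SECRET;
    if (!secret || !hmac) throw new Error('Invalid webhook signature');
    const digest = createHmac('sha256', secret).update(JSON.stringify(payload)).digest();
    const received = Buffer.from(hmac, 'base64');
    // Constant-time comparison; requires `timingSafeEqual` imported from 'crypto'
    // alongside `createHmac`.
    if (received.length !== digest.length || !timingSafeEqual(received, digest)) {
      throw new Error('Invalid webhook signature');
    }
  }
}
```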
**Rationale:** Signature verification prevents spoofed requests. Enrichment attaches a deterministic idempotency key before the event enters the pipeline. Because the handler only verifies, enriches, and enqueues, it can acknowledge quickly (the target here is under 200 ms) regardless of downstream health, provided the queue itself is reachable.
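Shopify signs the raw request body, so verification is most reliable when it runs on the exact bytes received rather than on a re-serialized object. The following is a minimal sketch of that variant, not the handler's actual implementation: `isValidShopifyHmac` is a hypothetical helper, and it assumes your HTTP framework can hand you the unparsed body as a `Buffer`.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Hypothetical helper. `rawBody` must be the exact bytes received,
// captured before any JSON body parser runs.
export function isValidShopifyHmac(
  rawBody: Buffer,
  hmacHeader: string,
  secret: string
): boolean {
  const expected = createHmac('sha256', secret).update(rawBody).digest();
  const received = Buffer.from(hmacHeader, 'base64');
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return received.length === expected.length && timingSafeEqual(received, expected);
}
```

Returning a boolean instead of throwing lets the HTTP layer decide how to respond to a bad signature, typically with a 401 and no enqueue.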
### Step 2: The Outbox Transaction
State mutation and external notification must occur atomically. Writing to your canonical database and queuing an outbound message in separate steps creates dual-write vulnerabilities. If the queue write fails, your database is updated but the external system never receives the change.
```typescript
import { Pool } from 'pg';
import { OutboxRepository } from './outbox-repo';

export class InventoryMutator {
  constructor(private readonly db: Pool, private readonly outbox: OutboxRepository) {}

  async applyDelta(event: any): Promise<void> {
    const client = await this.db.connect();
    try {
      await client.query('BEGIN');
      const result = await client.query(
        `UPDATE warehouse_stock
            SET quantity = $1, version = version + 1
          WHERE sku = $2 AND location_code = $3 AND version = $4`,
        [event.newQuantity, event.sku, event.location, event.expectedVersion]
      );
      // Zero rows means the optimistic version check failed (or the row is missing).
      // Abort so no outbox event is recorded for a change that never happened.
      if (result.rowCount === 0) {
        throw new Error(`Version conflict for SKU ${event.sku} at ${event.location}`);
      }
      await this.outbox.insert(client, {
        aggregateId: event.sku,
        eventType: 'inventory.updated',
        payload: JSON.stringify(event),
        status: 'pending',
        createdAt: new Date()
      });
      await client.query('COMMIT');
    } catch (err) {
      await client.query('ROLLBACK');
      throw err;
    } finally {
      client.release();
    }
  }
}
```
**Rationale:** Wrapping the state update and outbox insert in a single database transaction guarantees consistency. If the transaction commits, the event is guaranteed to exist in the outbox table. A background worker polls this table, eliminating the need for external message brokers during the critical path.
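The polling worker mentioned above could look roughly like the sketch below. The `OutboxRow` and `OutboxStore` shapes and the `relayOnce` function are assumptions made for illustration; the article does not define the repository API. With Postgres, `claimPending` would typically select pending rows ordered by `created_at` using `FOR UPDATE SKIP LOCKED` so that parallel workers do not claim the same rows.

```typescript
// Hypothetical shapes; the article does not define the outbox row or store API.
export interface OutboxRow {
  id: number;
  eventType: string;
  payload: string;
  status: 'pending' | 'completed';
  retryCount: number;
}

export interface OutboxStore {
  claimPending(limit: number): Promise<OutboxRow[]>;
  markCompleted(id: number): Promise<void>;
  recordFailure(id: number): Promise<void>;
}

// Runs one polling pass and returns how many rows were delivered.
export async function relayOnce(
  store: OutboxStore,
  publish: (row: OutboxRow) => Promise<void>,
  batchSize = 50
): Promise<number> {
  const rows = await store.claimPending(batchSize);
  let delivered = 0;
  for (const row of rows) {
    try {
      await publish(row);
      await store.markCompleted(row.id);
      delivered++;
    } catch {
      // Leave the row pending so a later pass retries it.
      await store.recordFailure(row.id);
    }
  }
  return delivered;
}
```

If `publish` succeeds but `markCompleted` fails, the row is delivered again on the next pass. The relay is therefore at-least-once, which is why downstream consumers need to be idempotent.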
### Step 3: Circuit-Breaker Protected Delivery
Outbound workers must isolate failures. A failing ERP should not block marketplace updates or inventory syncs. Circuit breakers monitor error rates and halt traffic to degraded dependencies, allowing them to recover.
```typescript
export class CircuitGuard {
  private failures: number = 0;
  private lastFailureTime: number = 0;
  private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';

  constructor(
    private readonly threshold: number = 5,
    private readonly resetTimeoutMs: number = 60000
  ) {}

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'OPEN') {
      if (Date.now() - this.lastFailureTime > this.resetTimeoutMs) {
        this.state = 'HALF_OPEN';
      } else {
        throw new Error('Circuit open: dependency unavailable');
      }
    }
    try {
      const result = await fn();
      if (this.state === 'HALF_OPEN') this.state = 'CLOSED';
      this.failures = 0;
      return result;
    } catch (err) {
      this.failures++;
      this.lastFailureTime = Date.now();
      if (this.failures >= this.threshold) this.state = 'OPEN';
      throw err;
    }
  }
}
```
**Rationale:** The three-state model helps prevent cascading failures. CLOSED allows normal traffic. OPEN fails fast without waiting for timeouts. HALF_OPEN lets requests through again once the reset timeout has elapsed: a success closes the circuit and a failure reopens it. This protects your connection pool and prevents rate limit exhaustion on third-party APIs.
### Step 4: Idempotent Consumer & DLQ Routing
Workers must safely handle duplicate deliveries. Shopify's retry mechanism guarantees at-least-once delivery. Your consumer must detect and discard duplicates without side effects.
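```typescript
import { Redis } from 'ioredis';
import { DeadLetterQueue } from './dlq-handler';
import { InventoryMutator } from './inventory-mutator'; // path assumed; adjust to your layout

export class AsyncProcessor {
  constructor(
    private readonly cache: Redis,
    private readonly dlq: DeadLetterQueue,
    private readonly mutator: InventoryMutator
  ) {}

  async process(event: any): Promise<void> {
    const lockKey = `idem:${event.idempotencyKey}`;
    // Claim the key atomically (SET ... NX). A separate GET followed by SET would let
    // two workers both pass the check. The short TTL frees the key if this worker crashes.
    const claimed = await this.cache.set(lockKey, 'processing', 'EX', 300, 'NX');
    if (claimed !== 'OK') return; // Duplicate or already in flight: safe discard
    try {
      await this.mutator.applyDelta(event);
      await this.cache.set(lockKey, 'done', 'EX', 604800); // 7-day TTL
    } catch (err) {
      await this.cache.del(lockKey); // Release so a replay from the DLQ is not discarded
      await this.dlq.push(event, err);
      throw err;
    }
  }
}
```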
**Rationale:** Redis provides O(1) deduplication checks. The 7-day TTL comfortably outlasts Shopify's webhook retry window. Failed events route to a dead letter queue instead of blocking the main pipeline. Engineers can inspect, fix root causes, and replay events deterministically.
## Pitfall Guide
### 1. Synchronous Webhook Handling
**Explanation:** Performing database writes or external API calls inside the HTTP handler blocks the response. Shopify retries unacknowledged webhooks, creating duplicate processing cascades.
**Fix:** Decouple ingestion from processing. Validate, enrich, queue, and return 200 OK immediately. Delegate business logic to background workers.
### 2. Unbounded Retry Loops
**Explanation:** Retrying failed requests without exponential backoff or jitter creates thundering herds. Every retry amplifies load on a struggling dependency, accelerating failure.
**Fix:** Implement capped exponential backoff with randomized jitter. Add maximum retry limits. Route exhausted events to a DLQ for manual inspection.
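As an illustration of that fix, here is a minimal sketch of capped exponential backoff with "full jitter" (each delay is drawn uniformly between zero and the current ceiling). The function names, the 200 ms base, and the 30 s cap are assumptions chosen for the example; the injectable `random` and `sleep` parameters exist only to make the behavior testable.

```typescript
// Delay before retry number `attempt` (0-based): uniform in [0, min(cap, base * 2^attempt)).
export function backoffDelayMs(
  attempt: number,
  baseMs = 200,
  capMs = 30000,
  random: () => number = Math.random
): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}

// Calls `fn` up to maxRetries + 1 times, then rethrows the last error.
export async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  maxRetries = 3,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms))
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (attempt >= maxRetries) throw err; // exhausted: the caller routes the event to the DLQ
      await sleep(backoffDelayMs(attempt));
    }
  }
}
```

The cap bounds the worst-case wait, and the jitter spreads retries from many workers so they do not hit a recovering dependency at the same instant.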
### 3. Missing Idempotency Controls
**Explanation:** Assuming webhooks arrive exactly once leads to double-charges, inventory overwrites, and order duplication. Platform retries are guaranteed under network instability.
**Fix:** Generate deterministic idempotency keys from event metadata. Store processed keys in a fast lookup store (Redis, dedup table). Skip execution if the key exists.
### 4. Shared Connection Pools Across Domains
**Explanation:** Routing inventory updates, order pushes, and customer syncs through a single database or HTTP connection pool causes resource contention. A slow ERP query starves inventory workers.
**Fix:** Implement bulkheading. Allocate separate connection pools and worker threads per domain. Use queue partitioning to isolate traffic classes.
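One lightweight way to approximate bulkheading inside a single process is a concurrency limiter per domain. The `Bulkhead` class below is a hypothetical sketch, not part of the article's pipeline: each instance caps how many tasks of one traffic class run at once, so a slow ERP cannot occupy every slot.

```typescript
export class Bulkhead {
  private active = 0;
  private readonly waiters: Array<() => void> = [];

  constructor(private readonly maxConcurrent: number) {}

  async run<T>(task: () => Promise<T>): Promise<T> {
    if (this.active >= this.maxConcurrent) {
      // At capacity: wait until a finishing task hands over its slot.
      await new Promise<void>((resolve) => this.waiters.push(resolve));
    } else {
      this.active++;
    }
    try {
      return await task();
    } finally {
      const next = this.waiters.shift();
      if (next) next(); // transfer the slot directly to the next waiter
      else this.active--;
    }
  }
}

// One limiter per domain (limits are illustrative).
export const erpBulkhead = new Bulkhead(5);
export const inventoryBulkhead = new Bulkhead(20);
```

Separate database pools per domain follow the same idea at the connection level: size each pool independently so exhaustion in one does not spill into the others.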
### 5. Silent Schema Drift
**Explanation:** Shopify and third-party APIs evolve. Payload structures change without breaking existing contracts immediately. Your transformer silently drops new fields or crashes on unexpected types.
**Fix:** Enforce strict schema validation at ingestion. Version your internal event schema. Route unparseable payloads to a DLQ with schema mismatch metadata. Monitor for field addition/removal rates.
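A hand-rolled validator is enough to illustrate the idea; in practice a schema library would likely do this job. The sketch below assumes a hypothetical version 1 internal schema built from the fields `applyDelta` reads (`sku`, `location`, `newQuantity`, `expectedVersion`). It returns the mismatch reasons for DLQ metadata and reports unknown fields separately so drift can be monitored without rejecting the event.

```typescript
// Hypothetical internal schema, version 1.
export interface InventoryEventV1 {
  schemaVersion: 1;
  sku: string;
  location: string;
  newQuantity: number;
  expectedVersion: number;
}

export type ParseResult =
  | { ok: true; event: InventoryEventV1; unknownFields: string[] }
  | { ok: false; reasons: string[] };

const KNOWN_FIELDS = new Set(['sku', 'location', 'newQuantity', 'expectedVersion']);

export function parseInventoryEvent(input: unknown): ParseResult {
  if (typeof input !== 'object' || input === null || Array.isArray(input)) {
    return { ok: false, reasons: ['payload: expected a JSON object'] };
  }
  const raw = input as Record<string, unknown>;
  const { sku, location, newQuantity, expectedVersion } = raw;
  const reasons: string[] = [];
  if (typeof sku !== 'string' || sku === '') reasons.push('sku: expected non-empty string');
  if (typeof location !== 'string' || location === '') reasons.push('location: expected non-empty string');
  if (typeof newQuantity !== 'number' || !Number.isInteger(newQuantity) || newQuantity < 0) {
    reasons.push('newQuantity: expected non-negative integer');
  }
  if (typeof expectedVersion !== 'number' || !Number.isInteger(expectedVersion)) {
    reasons.push('expectedVersion: expected integer');
  }
  if (reasons.length > 0) return { ok: false, reasons };
  return {
    ok: true,
    event: {
      schemaVersion: 1,
      sku: sku as string,
      location: location as string,
      newQuantity: newQuantity as number,
      expectedVersion: expectedVersion as number,
    },
    // Fields the schema does not know about: count these to detect drift early.
    unknownFields: Object.keys(raw).filter((key) => !KNOWN_FIELDS.has(key)),
  };
}
```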
### 6. Ignoring Backpressure Signals
**Explanation:** When consumers lag behind producers, queues grow indefinitely. Memory exhaustion follows. The system appears healthy until it OOMs.
**Fix:** Monitor queue depth and consumer lag. Implement producer throttling when lag exceeds thresholds. Scale consumer instances horizontally or partition queues by key.
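The core of producer throttling is a queue that refuses work past a high-water mark instead of growing without bound. The in-memory `BoundedQueue` below is a hypothetical sketch of that signal only; a real broker exposes the same idea through queue length limits and consumer lag metrics.

```typescript
export class BoundedQueue<T> {
  private readonly items: T[] = [];

  constructor(private readonly highWaterMark: number) {}

  // Returns false when full so the producer can slow down, shed load, or reject the request.
  tryEnqueue(item: T): boolean {
    if (this.items.length >= this.highWaterMark) return false;
    this.items.push(item);
    return true;
  }

  dequeue(): T | undefined {
    return this.items.shift();
  }

  get depth(): number {
    return this.items.length;
  }
}
```

A producer that receives `false` has an explicit decision to make, which is the point: the overload becomes visible at the edge rather than surfacing later as memory exhaustion.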
### 7. Trace Context Loss in Async Boundaries
**Explanation:** Synchronous requests carry trace IDs naturally. Async pipelines break context propagation. Engineers cannot reconstruct event lifecycles during incidents.
**Fix:** Embed trace IDs in every event payload. Propagate them through queue messages, database outbox rows, and outbound API headers. Use OpenTelemetry context injection at pipeline boundaries.
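To make the propagation concrete without depending on a tracing SDK, the sketch below carries a W3C Trace Context `traceparent` string on each event (version, 32-hex trace ID, 16-hex span ID, flags). The helper names are hypothetical, and a production setup would more likely use OpenTelemetry's propagators instead of hand-built strings.

```typescript
import { randomBytes } from 'node:crypto';

const TRACEPARENT = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

// Starts a new trace with the "sampled" flag set.
export function newTraceparent(): string {
  return `00-${randomBytes(16).toString('hex')}-${randomBytes(8).toString('hex')}-01`;
}

// Keeps the trace ID and flags, and issues a fresh span ID for the next hop.
export function childTraceparent(parent: string): string {
  const match = TRACEPARENT.exec(parent);
  if (!match) return newTraceparent(); // malformed or missing: start a new trace
  return `00-${match[1]}-${randomBytes(8).toString('hex')}-${match[3]}`;
}

// Attaches trace context to an event before it crosses an async boundary.
export function withTrace<T extends object>(
  event: T,
  parent?: string
): T & { traceparent: string } {
  return { ...event, traceparent: parent ? childTraceparent(parent) : newTraceparent() };
}
```

Storing the same string in the outbox row and forwarding it as the `traceparent` header on outbound calls keeps one trace ID across the webhook, the queue, the database, and the external API.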
## Production Bundle
### Action Checklist
### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Low volume, single ERP sync | Managed connector (Celigo, Pipe17) | Zero infrastructure overhead, built-in retry logic | $200-$500/mo subscription |
| Mid volume, custom business rules | Hybrid: connector + custom outbox workers | Leverages existing reliability, adds custom logic where needed | Infrastructure + partial dev cost |
| High volume, multi-channel | Custom async pipeline with DLQ & circuit breakers | Full control over throughput, isolation, and recovery | Higher dev cost, lower incident cost |
| Regulated industry (PCI/HIPAA) | Custom pipeline with append-only audit logs | Compliance requires immutable event trails and replay capability | Audit storage + compliance overhead |
### Configuration Template
```yaml
# docker-compose.yml for local integration pipeline
services:
  webhook-ingestor:
    build: ./ingestor
    environment:
      - REDIS_URL=redis://cache:6379
      - DB_POOL_SIZE=10
      - ACK_TIMEOUT_MS=150
    depends_on: [cache, broker]
  async-worker:
    build: ./worker
    environment:
      - DB_POOL_SIZE=20
      - CIRCUIT_BREAKER_THRESHOLD=5
      - CIRCUIT_BREAKER_RESET_MS=60000
      - MAX_RETRIES=3
      - DLQ_ENABLED=true
    depends_on: [db, cache, broker]
  cache:
    image: redis:7-alpine
    command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
  broker:
    image: rabbitmq:3-management
    environment:
      RABBITMQ_DEFAULT_USER: dev
      RABBITMQ_DEFAULT_PASS: dev
    ports: ["15672:15672"]
  db:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: integration_core
      POSTGRES_USER: dev
      POSTGRES_PASSWORD: dev
    volumes: ["pgdata:/var/lib/postgresql/data"]
volumes:
  pgdata:
```
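On the worker side, these environment variables need to be parsed and validated once at startup. The following is a minimal sketch under the assumption that the worker reads exactly the variables shown in the template; `WorkerConfig` and `loadWorkerConfig` are hypothetical names, and the defaults mirror the template values.

```typescript
export interface WorkerConfig {
  dbPoolSize: number;
  circuitBreakerThreshold: number;
  circuitBreakerResetMs: number;
  maxRetries: number;
  dlqEnabled: boolean;
}

type Env = Record<string, string | undefined>;

// Reads a positive integer, falling back to a default when the variable is unset.
function positiveInt(env: Env, name: string, fallback: number): number {
  const raw = env[name];
  if (raw === undefined || raw === '') return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value) || value <= 0) {
    throw new Error(`${name} must be a positive integer, got "${raw}"`);
  }
  return value;
}

export function loadWorkerConfig(env: Env): WorkerConfig {
  return {
    dbPoolSize: positiveInt(env, 'DB_POOL_SIZE', 20),
    circuitBreakerThreshold: positiveInt(env, 'CIRCUIT_BREAKER_THRESHOLD', 5),
    circuitBreakerResetMs: positiveInt(env, 'CIRCUIT_BREAKER_RESET_MS', 60000),
    maxRetries: positiveInt(env, 'MAX_RETRIES', 3),
    dlqEnabled: (env['DLQ_ENABLED'] ?? 'true') === 'true',
  };
}
```

Calling `loadWorkerConfig(process.env)` at startup makes a typo such as `MAX_RETRIES=three` fail immediately instead of surfacing later as `NaN` in retry logic.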
### Quick Start Guide
- Initialize the ingestion layer: Deploy the webhook receiver with signature verification and immediate queue publishing. Configure your platform to point webhooks to this endpoint.
- Seed the outbox table: Run the migration script to create `outbox_events` with columns for aggregate_id, event_type, payload, status, retry_count, and created_at. Add an index on status and created_at.
- Launch the async worker: Start the background processor with Redis for idempotency checks and the circuit breaker configured. Set the outbox table polling interval (default: 500ms).
- Validate with replay: Send a test webhook. Verify the `200 OK` response returns in under 200 ms. Check the outbox table for the pending row. Confirm the worker processes it, updates state, and marks the row completed.
- Enable observability: Attach trace IDs to all events. Configure alerts for DLQ depth >50, queue lag >30s, and outbound error rate >2%. Verify end-to-end trace reconstruction in your logging platform.
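The alert thresholds from the last step can be expressed as a small pure function, which is handy for unit-testing alert rules before wiring them into a monitoring platform. The `PipelineMetrics` shape and the alert names are assumptions for this sketch; only the three thresholds come from the step above.

```typescript
export interface PipelineMetrics {
  dlqDepth: number;          // events currently in the dead letter queue
  queueLagSeconds: number;   // age of the oldest unprocessed event
  outboundErrorRate: number; // fraction of failed outbound calls, 0 to 1
}

// Returns the names of all alerts that should fire for the given snapshot.
export function firingAlerts(metrics: PipelineMetrics): string[] {
  const alerts: string[] = [];
  if (metrics.dlqDepth > 50) alerts.push('dlq_depth');
  if (metrics.queueLagSeconds > 30) alerts.push('queue_lag');
  if (metrics.outboundErrorRate > 0.02) alerts.push('outbound_error_rate');
  return alerts;
}
```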