gency: this.classifyUrgency(raw['propertyName'] as string),
confidence: 0.95,
firedAt: new Date(timestamp * 1000).toISOString(),
receivedAt: new Date().toISOString(),
version: 1,
};
}
/**
 * Checks that a raw webhook payload carries the three required fields.
 *
 * @param raw - Untrusted payload; may be any value, including `null`,
 *   `undefined` or a primitive.
 * @returns `true` only when `raw` is a non-null object whose `objectId`,
 *   `propertyName` and `occurredAt` are all truthy.
 */
validate(raw: unknown): boolean {
  // Guard first: indexing into null/undefined would throw a TypeError
  // instead of reporting the payload as invalid.
  if (typeof raw !== 'object' || raw === null) return false;
  const obj = raw as Record<string, unknown>;
  return Boolean(obj['objectId'] && obj['propertyName'] && obj['occurredAt']);
}
/**
 * Maps the name of a changed property to an urgency level.
 *
 * @param property - Property name taken from the incoming payload.
 * @returns `'high'` for `dealstage`, `closedate` and
 *   `hs_deal_stage_probability`; `'standard'` for anything else.
 */
private classifyUrgency(property: string): GtmEvent['urgency'] {
  switch (property) {
    case 'dealstage':
    case 'closedate':
    case 'hs_deal_stage_probability':
      return 'high';
    default:
      return 'standard';
  }
}
}
**Rationale:** TypeScript interfaces enforce schema contracts at compile time, preventing silent payload drift. Versioning the canonical event allows backward-compatible schema evolution without breaking downstream consumers. Adapters stay stateless and focused solely on translation, making them trivial to unit test and replace.
### Layer 2: Event Backbone & Consumer Isolation
The message broker sits between every layer. Producers publish normalized events to typed topics. Consumers subscribe, process, and acknowledge. The broker handles delivery guarantees, retry logic, and dead-letter routing. No layer holds direct references to another.
```typescript
import { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';
/**
 * Thin wrapper around a single SQS queue used as the pipeline's event bus.
 *
 * `publish` wraps a `GtmEvent` in an envelope (`id`, `topic`, `priority`,
 * `payload`, `publishedAt`) and sends it; `consume` long-polls the same queue
 * and hands matching payloads to a handler.
 */
export class EventBus {
  /** SQS accepts between 1 and 10 messages per ReceiveMessage call. */
  private static readonly MAX_RECEIVE_BATCH = 10;

  private readonly client: SQSClient;
  private readonly queueUrl: string;

  /**
   * @param region - AWS region used to construct the SQS client.
   * @param queueUrl - URL of the queue used for both publishing and consuming.
   */
  constructor(region: string, queueUrl: string) {
    this.client = new SQSClient({ region });
    this.queueUrl = queueUrl;
  }

  /**
   * Sends one event to the queue.
   *
   * The envelope topic is always derived from the event as
   * `gtm.${event.eventType}`; `priority` is stored in the envelope only.
   *
   * @param event - Normalized event to publish.
   * @param priority - Free-form priority label stored in the envelope.
   * @throws Whatever the SQS client throws when the send fails.
   */
  async publish(event: GtmEvent, priority: string = 'standard'): Promise<void> {
    const message = {
      id: event.eventId,
      topic: `gtm.${event.eventType}`,
      priority,
      payload: event,
      publishedAt: new Date().toISOString(),
    };
    await this.client.send(
      new SendMessageCommand({
        QueueUrl: this.queueUrl,
        MessageBody: JSON.stringify(message),
        MessageGroupId: event.accountId, // Ensures ordering per account
      })
    );
  }

  /**
   * Polls the queue forever and dispatches messages whose envelope topic
   * equals `topic`.
   *
   * A message is deleted only after `handler` resolves. If parsing, the
   * handler, or the delete fails, the error is logged and the message is left
   * on the queue. Messages for other topics are neither handled nor deleted
   * here.
   *
   * @param topic - Envelope topic to match exactly (e.g. `gtm.<eventType>`).
   * @param handler - Invoked with the envelope payload of each matching message.
   * @param batchSize - Requested messages per poll; clamped to the 1–10 range
   *   SQS accepts so an out-of-range value cannot fail every receive call.
   * @returns Never resolves under normal operation; rejects if a receive call
   *   itself throws.
   */
  async consume(topic: string, handler: (event: GtmEvent) => Promise<void>, batchSize: number = 5): Promise<void> {
    // `Math.floor(x) || 1` also maps NaN and fractional values below 1 to 1.
    const maxMessages = Math.min(EventBus.MAX_RECEIVE_BATCH, Math.max(1, Math.floor(batchSize) || 1));

    while (true) {
      const response = await this.client.send(
        new ReceiveMessageCommand({
          QueueUrl: this.queueUrl,
          MaxNumberOfMessages: maxMessages,
          WaitTimeSeconds: 10, // Long polling
        })
      );

      for (const msg of response.Messages ?? []) {
        try {
          // Without a body there is nothing to parse, and without a receipt
          // handle the message could never be acknowledged after handling.
          if (msg.Body === undefined || msg.ReceiptHandle === undefined) {
            throw new Error('SQS message is missing Body or ReceiptHandle');
          }
          const parsed = JSON.parse(msg.Body) as { topic?: string; payload: GtmEvent };
          if (parsed.topic === topic) {
            await handler(parsed.payload);
            await this.client.send(
              new DeleteMessageCommand({ QueueUrl: this.queueUrl, ReceiptHandle: msg.ReceiptHandle })
            );
          }
        } catch (error) {
          // Message remains visible for retry; DLQ handles permanent failures
          console.error(`Consumer failed for ${msg.MessageId}:`, error);
        }
      }
    }
  }
}
```
**Rationale:** SQS provides at-least-once delivery, visibility timeouts, and native dead-letter queue (DLQ) support. Grouping by accountId preserves ordering for account-specific workflows while allowing parallel processing across accounts. Long polling reduces empty response costs. The consumer loop isolates failures: a single malformed event doesn't crash the worker, and visibility timeouts allow automatic retries before routing to the DLQ.
### Layer 3: Idempotent Context Enrichment
Enrichment pulls raw events, attaches external context, and republishes enriched events to a new topic. It never triggers actions or makes routing decisions. The layer must be idempotent: running the same event twice yields identical output, which is critical when external APIs timeout or retry.
/**
 * Attaches CRM and intent context to an event and republishes the result.
 *
 * The incoming event is never modified: enrichment is written to a copy whose
 * `payload` is also copied, so a caller that retries with the same object
 * sees the same input.
 */
export class ContextEnricher {
  constructor(
    private readonly crmClient: CrmApiClient,
    private readonly intentProvider: IntentDataProvider,
    private readonly eventBus: EventBus
  ) {}

  /**
   * Enriches one event and publishes the enriched copy.
   *
   * - With a `dealId`, adds `payload.dealContext` built from the CRM deal.
   * - With an `accountId`, adds `payload.accountContext` built from the CRM
   *   account and the last 30 days of intent signals.
   * - Urgency is then recalculated from the deal context.
   *
   * @param rawEvent - Event to enrich; left untouched.
   * @throws Whatever the CRM client, intent provider or event bus throws.
   */
  async process(rawEvent: GtmEvent): Promise<void> {
    // Copy `payload` as well: a plain `{ ...rawEvent }` would share the nested
    // object and leak the enrichment back into the caller's event.
    const enriched = { ...rawEvent, payload: { ...rawEvent.payload } };

    if (rawEvent.dealId) {
      const deal = await this.crmClient.fetchDeal(rawEvent.dealId);
      enriched.payload['dealContext'] = {
        stage: deal.stage,
        daysInStage: deal.daysInStage,
        lastActivityDaysAgo: deal.lastActivityDaysAgo,
        closeDate: deal.closeDate,
        openTasks: deal.tasks.map(t => t.description),
        competitors: deal.competitors,
      };
    }

    const { accountId } = rawEvent;
    if (accountId) {
      // The two lookups are independent, so run them concurrently.
      const [account, intent] = await Promise.all([
        this.crmClient.fetchAccount(accountId),
        this.intentProvider.getRecentSignals(accountId, 30),
      ]);
      enriched.payload['accountContext'] = {
        employeeCount: account.employeeCount,
        industry: account.industry,
        recentIntent: intent,
      };
    }

    enriched.urgency = this.recalculateUrgency(enriched);
    // NOTE(review): the second argument of EventBus.publish is the envelope
    // priority, so 'enriched' ends up as the priority while the topic is still
    // derived from eventType. Confirm whether a dedicated topic was intended.
    await this.eventBus.publish(enriched, 'enriched');
  }

  /**
   * Escalates to `'immediate'` when the deal closes in fewer than 21 days and
   * has been idle for more than 10 days; otherwise keeps the current urgency.
   */
  private recalculateUrgency(event: GtmEvent): GtmEvent['urgency'] {
    const ctx = event.payload['dealContext'] as
      | { closeDate?: string; lastActivityDaysAgo?: number }
      | undefined;
    if (!ctx) return event.urgency;

    const daysToClose = this.daysUntil(ctx.closeDate);
    const idleDays = ctx.lastActivityDaysAgo ?? 0;
    // Compare against null explicitly: a truthiness check would treat a deal
    // due today (0 or -0 days) as having no close date at all.
    if (daysToClose !== null && daysToClose < 21 && idleDays > 10) {
      return 'immediate';
    }
    return event.urgency;
  }

  /**
   * Whole days from now until `dateStr`, rounded up.
   *
   * @returns `null` when no date is given; `NaN` when the date cannot be
   *   parsed (which fails every numeric comparison downstream).
   */
  private daysUntil(dateStr?: string): number | null {
    if (!dateStr) return null;
    const diff = new Date(dateStr).getTime() - Date.now();
    return Math.ceil(diff / (1000 * 60 * 60 * 24));
  }
}
Rationale: Enrichment stays stateless and focused on data assembly. External API calls are wrapped with timeouts and circuit breakers in production. Idempotency is guaranteed by using deterministic enrichment logic and avoiding side effects. Republishing to a separate topic (gtm.enriched) allows the decision layer to consume only context-ready events, preventing race conditions.
### Layer 4: Rules-First Decision Routing
The decision layer consumes enriched events and produces action plans. It uses a two-pass strategy: deterministic rules handle 80–90% of cases instantly and cheaply. Ambiguous or complex scenarios fall back to an LLM router. The layer never executes actions; it publishes decisions to the queue.
/**
 * Structured decision produced by the decision layer and consumed by
 * ActionExecutor.
 */
export interface ActionPlan {
/**
 * Action discriminator. ActionExecutor dispatches on 'draft_followup',
 * 'notify_ae' and 'update_crm_field'; RulesEngine additionally emits 'none'
 * as the placeholder plan of an inconclusive result.
 */
actionType: string;
/** Priority label; RulesEngine emits 'immediate', 'high' and 'low'. */
priority: string;
/** Identifier passed as the first argument to the external client call. */
targetId: string;
/** Action-specific arguments passed as the second argument to the client call. */
parameters: Record<string, unknown>;
/** Optional free-text explanation; not set by RulesEngine — presumably filled in by the LLM router (TODO confirm). */
reasoning?: string;
}
/**
 * Two-pass router: deterministic rules first, LLM client only when the rules
 * cannot decide.
 */
export class DecisionRouter {
  constructor(
    private rulesEngine: RulesEngine,
    private llmRouter: LlmDecisionClient
  ) {}

  /**
   * Produces an action plan for an enriched event.
   *
   * @param enrichedEvent - Event to evaluate.
   * @returns The rules engine's plan when it reports `conclusive`; otherwise
   *   the plan generated by the LLM client.
   */
  async route(enrichedEvent: GtmEvent): Promise<ActionPlan> {
    const { conclusive, plan } = this.rulesEngine.evaluate(enrichedEvent);
    return conclusive ? plan : this.llmRouter.generatePlan(enrichedEvent);
  }
}
/**
 * Deterministic first-pass rules over an event's `payload.dealContext`.
 *
 * Rules are evaluated in order and the first match wins:
 * 1. Stalled deal: idle for more than 10 days and closing in fewer than 21.
 * 2. Competitive threat: at least one competitor while in 'negotiation'.
 */
export class RulesEngine {
  /**
   * Evaluates the rules against one enriched event.
   *
   * @param event - Event whose `payload.dealContext` (if any) is inspected.
   * @returns `conclusive: true` with the matching plan, or `conclusive: false`
   *   with a placeholder 'none' plan when no deal context exists or no rule
   *   matches.
   */
  evaluate(event: GtmEvent): { conclusive: boolean; plan: ActionPlan } {
    type DealContext = {
      stage?: string;
      closeDate?: string;
      lastActivityDaysAgo?: number;
      competitors?: unknown[];
      ownerId?: string;
    };
    const deal = event.payload['dealContext'] as DealContext | undefined;
    if (!deal) return this.inconclusive(event);

    const idleDays = deal.lastActivityDaysAgo ?? 0;
    const daysToClose = this.daysUntil(deal.closeDate);
    const competitors = deal.competitors ?? [];

    // Compare against null explicitly: a truthiness check would skip deals due
    // today (0 or -0 days) while still matching overdue ones.
    if (idleDays > 10 && daysToClose !== null && daysToClose < 21) {
      return {
        conclusive: true,
        plan: {
          actionType: 'draft_followup',
          priority: 'immediate',
          targetId: event.contactId ?? event.accountId,
          parameters: { template: 'stalled_deal_reactivation', context: deal },
        },
      };
    }

    if (competitors.length > 0 && deal.stage === 'negotiation') {
      return {
        conclusive: true,
        plan: {
          actionType: 'notify_ae',
          priority: 'high',
          // The deal context is not guaranteed to carry an owner; fall back to
          // the account so targetId is never undefined.
          targetId: deal.ownerId ?? event.accountId,
          parameters: { competitors, stage: deal.stage, alert: 'competitive_threat' },
        },
      };
    }

    return this.inconclusive(event);
  }

  /** Placeholder result telling the router that the rules could not decide. */
  private inconclusive(event: GtmEvent): { conclusive: boolean; plan: ActionPlan } {
    return {
      conclusive: false,
      plan: { actionType: 'none', priority: 'low', targetId: event.accountId, parameters: {} },
    };
  }

  /**
   * Whole days from now until `dateStr`, rounded up.
   *
   * @returns `null` when no date is given; `NaN` when the date cannot be
   *   parsed (which fails every numeric comparison in `evaluate`).
   */
  private daysUntil(dateStr?: string): number | null {
    if (!dateStr) return null;
    return Math.ceil((new Date(dateStr).getTime() - Date.now()) / (1000 * 60 * 60 * 24));
  }
}
Rationale: Rules execute in milliseconds, cost nothing, and provide deterministic audit trails. LLM fallback handles edge cases where business logic is too nuanced for static conditions (e.g., interpreting call transcript sentiment combined with deal velocity). The separation prevents LLM costs from scaling linearly with event volume. Decision outputs remain structured, enabling the execution layer to route actions without parsing natural language.
### Layer 5: Idempotent Action Execution
The execution layer consumes action plans and dispatches them to external systems (email platforms, CRM APIs, Slack, task managers). Each action includes an idempotency key derived from the event ID and action type. Retries are safe. Failures route to the DLQ with full context.
/**
 * Dispatches an action plan to the matching external client.
 */
export class ActionExecutor {
  constructor(
    private emailService: EmailProvider,
    private crmClient: CrmApiClient,
    private slackClient: SlackNotifier
  ) {}

  /**
   * Runs a single plan.
   *
   * Every client call receives `{ idempotencyKey }` where the key is
   * `${eventId}:${plan.actionType}`.
   *
   * @param plan - Plan to execute; `actionType` selects the client.
   * @param eventId - Id of the originating event, used in the idempotency key.
   * @throws Error when `plan.actionType` is not one of 'draft_followup',
   *   'notify_ae' or 'update_crm_field'; also whatever the client throws.
   */
  async execute(plan: ActionPlan, eventId: string): Promise<void> {
    const options = { idempotencyKey: `${eventId}:${plan.actionType}` };
    const { targetId, parameters } = plan;

    // A Map (not a plain object) so inherited keys such as 'constructor'
    // can never be mistaken for a registered action.
    const dispatch = new Map<string, () => unknown>([
      ['draft_followup', () => this.emailService.sendDraft(targetId, parameters, options)],
      ['notify_ae', () => this.slackClient.postMessage(targetId, parameters, options)],
      ['update_crm_field', () => this.crmClient.updateField(targetId, parameters, options)],
    ]);

    const run = dispatch.get(plan.actionType);
    if (run === undefined) {
      throw new Error(`Unknown action type: ${plan.actionType}`);
    }
    await run();
  }
}
Rationale: Execution stays isolated from decision logic. Idempotency keys prevent duplicate outreach when retries occur. External API calls use exponential backoff and circuit breakers. The layer logs every execution attempt with correlation IDs, enabling full audit trails for compliance and debugging.
Pitfall Guide
1. Schema Drift at the Ingestion Boundary
Explanation: Source systems change webhook formats, rename properties, or drop fields. If adapters don't validate or version payloads, downstream layers receive malformed data, causing silent failures or incorrect enrichment.
Fix: Implement strict JSON schema validation at the adapter boundary. Version the canonical event schema (version: 1). Route unparseable payloads to a dedicated schema_violations topic for monitoring and adapter updates.
2. Synchronous Enrichment Blocking the Pipeline
Explanation: Calling external APIs directly within the ingestion or decision layer creates blocking I/O. If an enrichment vendor times out, the entire consumer thread stalls, backing up the queue and delaying all downstream actions.
Fix: Run enrichment in isolated workers with configurable timeouts. Cache frequent lookups (account firmographics, deal stages) with short TTLs. Use async republishing so the decision layer never waits for enrichment to complete.
3. LLM as Primary Decision Maker
Explanation: Routing every event through an LLM increases latency, costs, and unpredictability. LLMs struggle with deterministic business rules and can hallucinate action parameters.
Fix: Enforce a rules-first architecture. Use LLMs only for ambiguous cases where rules return conclusive: false. Set confidence thresholds and fallback rules. Log LLM decisions separately for audit and prompt refinement.
4. Missing Correlation IDs
Explanation: Without tracing identifiers, debugging async pipelines requires manual log searching across multiple services. Engineers cannot reconstruct the lifecycle of a single event.
Fix: Generate a correlationId at ingestion and propagate it through every publish/subscribe cycle. Include it in all logs, metrics, and DLQ messages. Use OpenTelemetry or similar tracing frameworks to visualize event flow.
5. Ignoring Dead-Letter Queues
Explanation: Messages that fail repeatedly accumulate in the main queue, blocking healthy consumers and masking systemic failures. Without DLQ routing, failed events disappear into retry loops.
Fix: Configure visibility timeouts and maximum receive counts. Route exhausted messages to a DLQ with full payload and error context. Set up alerts on DLQ depth and implement replay jobs for fixed adapters.
6. Non-Idempotent Action Execution
Explanation: Retrying a failed email send or CRM update without idempotency controls creates duplicate outreach, inflated metrics, and customer frustration.
Fix: Generate deterministic idempotency keys (eventId:actionType). Store executed keys in a fast lookup store (Redis/DynamoDB). Skip execution if the key exists. Log skipped duplicates for monitoring.
7. Over-Enriching Before Decision
Explanation: Pulling excessive context (full call transcripts, historical intent, all open tasks) increases latency and API costs. The decision layer only needs specific fields to route actions.
Fix: Define minimal enrichment contracts. Request only fields required by rules/LLM prompts. Use field-level filtering in CRM API calls. Cache aggregated context rather than raw payloads.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High-volume behavioral tracking (10k+ events/hr) | Rules-only routing + batch enrichment | LLM latency/cost scales poorly; rules handle pattern matching efficiently | Low (compute + queue only) |
| Low-volume high-value CRM updates (deal stage changes) | Rules + LLM fallback for edge cases | Requires nuanced context; LLM handles ambiguity without bloating rule sets | Medium (LLM calls on ~10-15% of events) |
| Ambiguous intent signals (partial transcripts, weak firmographics) | LLM-first with confidence threshold | Rules lack sufficient signal; LLM synthesizes context into actionable plans | High (LLM per event, but offset by higher conversion) |
| Compliance-heavy regions (GDPR, CCPA) | Rules-only + explicit consent checks | LLMs cannot guarantee deterministic compliance; rules enforce audit trails | Low (audit logging + rule evaluation) |
Configuration Template
// pipeline.config.ts
/**
 * Central tuning knobs for the pipeline, grouped by layer.
 *
 * Broker URLs and region come from the environment; everything else is a
 * static default.
 */
export const PipelineConfig = {
  broker: {
    region: process.env.AWS_REGION || 'us-east-1',
    // Empty string when unset; callers must treat '' as "not configured".
    queueUrl: process.env.SQS_QUEUE_URL || '',
    dlqUrl: process.env.SQS_DLQ_URL || '',
    visibilityTimeoutSeconds: 30,
    maxReceiveCount: 3,
    waitTimeSeconds: 10,
  },
  enrichment: {
    timeoutMs: 5000,
    retryAttempts: 2,
    cacheTtlSeconds: 300,
    // Must include every field the urgency/decision rules read. The stalled-deal
    // rule depends on `lastActivityDaysAgo`; filtering it out would make the
    // rule silently never fire.
    allowedContextFields: [
      'stage',
      'daysInStage',
      'lastActivityDaysAgo',
      'closeDate',
      'competitors',
      'employeeCount',
      'industry',
    ],
  },
  decision: {
    rulesEngine: {
      enabled: true,
      timeoutMs: 100,
    },
    llmFallback: {
      enabled: true,
      model: 'gpt-4o-mini',
      maxTokens: 256,
      temperature: 0.2,
      confidenceThreshold: 0.75,
      maxDailyCalls: 5000,
    },
  },
  execution: {
    idempotencyStore: 'redis',
    retryPolicy: {
      maxAttempts: 3,
      baseDelayMs: 1000,
      maxDelayMs: 10000,
    },
    circuitBreaker: {
      failureThreshold: 5,
      resetTimeoutMs: 30000,
    },
  },
  observability: {
    correlationIdHeader: 'X-Correlation-Id',
    metricsPrefix: 'gtm.pipeline',
    dlqAlertThreshold: 50,
    schemaViolationAlertThreshold: 10,
  },
};
Quick Start Guide
- Provision the message backbone: Create an SQS queue with a dead-letter queue attached. Configure visibility timeout (30s), maximum receives (3), and long polling (10s). Set environment variables for queue URLs.
- Deploy the ingestion adapter: Run the HubSpot adapter (or your source adapter) as a lightweight HTTP service. Validate incoming webhooks, normalize to `GtmEvent`, and publish to the broker. Test with a mock webhook payload.
- Launch the enrichment worker: Start the `ContextEnricher` consumer. Configure CRM and intent API credentials. Verify it republishes enriched events to the gtm.enriched topic. Monitor cache hit rates and timeout logs.
- Activate the decision router: Deploy the `DecisionRouter` with rules engine enabled. Add a mock LLM fallback client for testing. Send an enriched event and verify action plans are published with correct priority and parameters.
- Connect the execution layer: Start the `ActionExecutor` consumer. Configure email, Slack, and CRM clients. Verify idempotency keys prevent duplicate sends. Trigger a test event and confirm end-to-end flow completes within 2–5 seconds.