ring): Record<string, unknown> {
try {
const parsed = JSON.parse(raw);
const requiredFields = ['id', 'line_items', 'total_price'];
for (const field of requiredFields) {
if (parsed[field] === undefined || parsed[field] === null) {
throw new MappingFault(
Critical field missing in payload: ${field},
'HARVEST',
field
);
}
}
return parsed;
} catch (err) {
if (err instanceof SyntaxError) {
throw new MappingFault('Invalid JSON payload', 'HARVEST', 'body');
}
throw err;
}
}
/**
 * Normalizes loosely formatted fields of a harvested payload.
 *
 * - `email` is lower-cased and trimmed; anything that is not a string becomes `null`.
 * - `total_price` is normalized by `sanitizePrice`.
 * - `created_at` is converted to an ISO-8601 string.
 * - `country_code` is the upper-cased `shipping_address.country_code`, or `null`
 *   when the address or the code is absent or not a string.
 *
 * All other properties of `payload` are copied through unchanged.
 *
 * @param payload Parsed payload object.
 * @returns A new object; `payload` itself is not mutated.
 * @throws MappingFault (stage `STANDARDIZE`) when `created_at` is missing or
 *         unparseable, or when `sanitizePrice` rejects `total_price`.
 */
private standardize(payload: Record<string, unknown>): Record<string, unknown> {
  const rawEmail = payload.email;
  const email = typeof rawEmail === 'string' ? rawEmail.toLowerCase().trim() : null;

  const totalPrice = this.sanitizePrice(payload.total_price);

  // Only strings and numbers are handed to Date: `new Date(null)` would silently
  // yield 1970-01-01, and an invalid date makes toISOString() throw a bare
  // RangeError that carries no stage/field context.
  const rawCreatedAt = payload.created_at;
  const createdAt =
    typeof rawCreatedAt === 'string' || typeof rawCreatedAt === 'number'
      ? new Date(rawCreatedAt)
      : null;
  if (createdAt === null || Number.isNaN(createdAt.getTime())) {
    throw new MappingFault('Invalid or missing created_at timestamp', 'STANDARDIZE', 'created_at');
  }

  const address = payload.shipping_address;
  const rawCountry =
    typeof address === 'object' && address !== null
      ? (address as Record<string, unknown>).country_code
      : undefined;

  return {
    ...payload,
    email,
    total_price: totalPrice,
    created_at: createdAt.toISOString(),
    country_code: typeof rawCountry === 'string' ? rawCountry.toUpperCase() : null,
  };
}
/**
 * Converts a price given as a number or a decimal string into a string with
 * exactly two fraction digits.
 *
 * Strings must be plain decimal notation (optional sign, digits, optional
 * fraction); surrounding whitespace is ignored. Values such as "12abc" or
 * "1,234.50" are rejected rather than being truncated to their numeric prefix.
 *
 * @param value Raw price value from the payload.
 * @returns The price formatted with `toFixed(2)`.
 * @throws MappingFault (stage `STANDARDIZE`, field `total_price`) when the value
 *         is not a finite number, is a malformed string, or has another type.
 */
private sanitizePrice(value: unknown): string {
  if (typeof value === 'number') {
    // NaN / Infinity would otherwise be emitted as the strings "NaN" / "Infinity".
    if (!Number.isFinite(value)) {
      throw new MappingFault('Invalid price format', 'STANDARDIZE', 'total_price');
    }
    return value.toFixed(2);
  }

  if (typeof value === 'string') {
    const text = value.trim();
    // Validate the whole string: parseFloat stops at the first invalid
    // character and would accept a numeric prefix of garbage input.
    const isPlainDecimal = /^[+-]?(?:\d+\.?\d*|\.\d+)$/.test(text);
    const amount = isPlainDecimal ? Number(text) : NaN;
    if (!Number.isFinite(amount)) {
      throw new MappingFault('Invalid price format', 'STANDARDIZE', 'total_price');
    }
    return amount.toFixed(2);
  }

  throw new MappingFault('Price field type mismatch', 'STANDARDIZE', 'total_price');
}
/**
 * TRANSLATE stage placeholder.
 *
 * Currently a pass-through: the payload is returned as-is and `context` is not
 * consulted. Entity-specific mappers are meant to be plugged in here.
 *
 * @param payload Standardized payload.
 * @param context Shop context (unused by this placeholder).
 * @returns The same payload object that was passed in.
 */
private translate(payload: Record<string, unknown>, context: ShopContext): Record<string, unknown> {
  const passthrough = payload;
  return passthrough;
}
/**
 * AUDIT stage placeholder: wraps the payload with stage metadata.
 *
 * No schema validation is performed here yet; the payload is passed through
 * untouched inside the wrapper.
 *
 * @param payload Translated payload.
 * @param shopId Shop identifier to stamp into the metadata. Defaults to
 *        `'shop-123'`, the value that was previously hard-coded, so existing
 *        single-argument callers behave exactly as before.
 * @returns `{ data, metadata }` with `metadata.stage` set to `'AUDIT'`.
 */
private audit(payload: Record<string, unknown>, shopId = 'shop-123'): ValidatedPayload {
  return { data: payload, metadata: { stage: 'AUDIT', shopId } };
}
}
**Rationale:** Separating delivery from transformation is critical. If the target system is down, the mapping logic should not be blamed. By returning a validated payload, the pipeline allows the dispatcher to handle retries and backpressure independently.
#### 2. Typed Error Taxonomy
Generic exceptions obscure the source of failures. A robust mapping layer uses a custom error class that carries structured context. This enables automated alerting and precise log filtering.
```typescript
/** Name of the pipeline stage in which a mapping failure was raised. */
type MappingStage = 'HARVEST' | 'STANDARDIZE' | 'TRANSLATE' | 'AUDIT' | 'DISPATCH';

/**
 * Error raised by the mapping pipeline, carrying the stage and the field that
 * failed so handlers can filter and aggregate without parsing the message.
 */
class MappingFault extends Error {
  /**
   * @param message Human-readable description of the failure.
   * @param stage Pipeline stage that raised the fault.
   * @param field Name of the offending field.
   */
  constructor(
    message: string,
    public stage: MappingStage,
    public field: string
  ) {
    super(message);
    // When compiled to ES5, `super(...)` on a built-in returns a plain Error and
    // the subclass prototype is lost, so `instanceof MappingFault` would be
    // false. Restoring the prototype is a no-op on ES2015+ targets.
    Object.setPrototypeOf(this, new.target.prototype);
    this.name = 'MappingFault';
  }
}
```
**Rationale:** When an alert fires at 3 AM, the on-call engineer needs the stage and field immediately. This structure allows dashboards to aggregate errors by stage, revealing systemic issues (e.g., a spike in `TRANSLATE` errors indicates a mapping rule change is needed, while `HARVEST` errors suggest a payload format change).
#### 3. Generic Identifier Registry
Cross-system ID resolution is a common failure point. Creating a dedicated table for every integration pair leads to schema sprawl and inconsistent validation. A single, generic registry table simplifies gap detection and bulk auditing.
-- Generic cross-system identifier registry: each row maps one source reference
-- to one target reference for a given tenant and entity type.
CREATE TABLE id_registry (
-- Surrogate key (auto-incrementing).
registry_id SERIAL PRIMARY KEY,
tenant_id VARCHAR(255) NOT NULL,
entity_type VARCHAR(100) NOT NULL,
-- Source side of the mapping.
source_system VARCHAR(100) NOT NULL,
source_ref VARCHAR(255) NOT NULL,
-- Target side of the mapping.
target_system VARCHAR(100) NOT NULL,
target_ref VARCHAR(255) NOT NULL,
-- New rows are active unless stated otherwise.
is_active BOOLEAN DEFAULT TRUE,
-- Nullable; no default is set.
last_verified TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW(),
-- At most one row per (tenant, entity, source system, source ref, target system).
-- NOTE(review): is_active is not part of this constraint, so an inactive row
-- also blocks inserting a replacement for the same key — confirm this is intended.
UNIQUE (tenant_id, entity_type, source_system, source_ref, target_system)
);
-- Partial index containing only rows with is_active = TRUE, keyed on the lookup columns.
-- NOTE(review): the column list matches the table's UNIQUE constraint; confirm
-- the additional index is worth its write cost.
CREATE INDEX idx_registry_lookup
ON id_registry (tenant_id, entity_type, source_system, source_ref, target_system)
WHERE is_active = TRUE;
**Rationale:** The unique constraint prevents duplicate mappings. The partial index on `is_active` optimizes lookups for current records. This design supports multi-tenant scenarios and allows operations teams to run queries like "Find all unmapped SKUs for tenant X" against a single table.
#### 4. Cached Resolver with Miss Alerting
Identifier resolution should be cached to reduce database load, but stale caches can cause data corruption. The resolver must use a short TTL and raise an alert whenever a registry lookup finds no active mapping, so that registry gaps are detected.
/**
 * Resolves the target-system reference mapped to a source-system reference.
 *
 * Looks in the cache first, then in `id_registry` (active rows only). A
 * registry hit is cached for one hour. A registry miss sends a `MAPPING_GAP`
 * alert and yields `null`.
 *
 * @param tenant Tenant identifier.
 * @param entity Entity type of the reference.
 * @param srcSys Source system name.
 * @param srcRef Reference in the source system.
 * @param tgtSys Target system name.
 * @returns The mapped target reference, or `null` when no active mapping exists.
 */
async function resolveTargetId(
  tenant: string,
  entity: string,
  srcSys: string,
  srcRef: string,
  tgtSys: string
): Promise<string | null> {
  const cacheKey = `reg:${tenant}:${entity}:${srcSys}:${srcRef}:${tgtSys}`;

  // Compare against null/undefined instead of truthiness so that a cached
  // empty-string reference still counts as a hit.
  const cached = await redisClient.get(cacheKey);
  if (cached !== null && cached !== undefined) return cached;

  const result = await db.query(
    `SELECT target_ref FROM id_registry
WHERE tenant_id = $1 AND entity_type = $2
AND source_system = $3 AND source_ref = $4
AND target_system = $5 AND is_active = TRUE`,
    [tenant, entity, srcSys, srcRef, tgtSys]
  );

  if (result.rows.length === 0) {
    // Alert delivery is best-effort: a failing alert channel must not turn the
    // "no mapping" answer into an exception for the caller.
    try {
      await alertingService.send({
        type: 'MAPPING_GAP',
        details: { tenant, entity, srcSys, srcRef, tgtSys }
      });
    } catch (alertErr) {
      console.error('Failed to send MAPPING_GAP alert', alertErr);
    }
    return null;
  }

  const targetRef = result.rows[0].target_ref;
  // Cache with 1-hour TTL
  await redisClient.set(cacheKey, targetRef, { EX: 3600 });
  return targetRef;
}
**Rationale:** A one-hour TTL balances performance with freshness. Invalidating the cache on registry updates is essential. The miss alerting mechanism ensures that unmapped references are detected immediately, rather than causing silent failures downstream.
#### 5. Per-Field Null Policies
Null handling must be granular. Applying a global null strategy fails when dealing with polymorphic data (e.g., digital products vs. physical goods). Define policies per field to enforce business rules accurately.
/**
 * Null-handling strategies that can be applied field by field.
 *
 * - `strict`: the value must be present, otherwise a `MappingFault` is thrown.
 * - `fallback`: substitutes a default for `null`/`undefined`.
 * - `nullable`: collapses `undefined` to `null`.
 * - `conditional`: reads a dotted path out of a nested payload, yielding the
 *   default when any step along the path, or the final value, is `null`/`undefined`.
 */
const NULL_POLICIES = {
  strict(value: unknown, fieldName: string) {
    if (value !== null && value !== undefined) {
      return value;
    }
    throw new MappingFault(`Required field is null: ${fieldName}`, 'STANDARDIZE', fieldName);
  },

  fallback(value: unknown, defaultValue: unknown) {
    return value ?? defaultValue;
  },

  nullable(value: unknown) {
    return value ?? null;
  },

  conditional(payload: Record<string, unknown>, path: string, defaultValue: unknown = null) {
    let node: unknown = payload;
    for (const segment of path.split('.')) {
      // Stop as soon as the parent of the next segment is missing.
      if (node == null) {
        return defaultValue;
      }
      node = (node as any)[segment];
    }
    return node == null ? defaultValue : node;
  },
};
// Usage
// Reads payload.shipping_address.city; yields 'N/A' when the address or the city is null/undefined.
const city = NULL_POLICIES.conditional(payload, 'shipping_address.city', 'N/A');
// Returns total_price unchanged, or throws a MappingFault when it is null/undefined.
const price = NULL_POLICIES.strict(payload.total_price, 'total_price');
**Rationale:** The `conditional` policy safely traverses nested objects without throwing on intermediate nulls. The `strict` policy enforces critical data requirements. This approach prevents the "digital product" edge case where a missing shipping address would otherwise crash a global null check.
#### 6. Schema Version Registry
Shopify API versions introduce breaking changes. Hardcoding field accessors leads to widespread breakage. A version registry centralizes schema knowledge and isolates changes.
/**
 * Per-API-version description of where marketing consent lives on an order and
 * what shape it has.
 */
const API_SCHEMA_REGISTRY = {
  '2023-07': {
    marketing: { field: 'accepts_marketing', type: 'boolean' }
  },
  '2024-01': {
    marketing: { field: 'email_marketing_consent', type: 'object' }
  }
};

/**
 * Reads the marketing-consent state from an order using the accessor
 * registered for the given API version.
 *
 * @param order Order payload.
 * @param version API version key, e.g. `'2024-01'`. Versions not present in the
 *        registry fall back to the `'2023-07'` schema.
 * @returns For boolean schemas, `'subscribed'` or `'unsubscribed'`. For object
 *          schemas, the object's `state` string, or `'unsubscribed'` when the
 *          field or its `state` is missing or not a string.
 */
function extractMarketingConsent(order: Record<string, unknown>, version: string): string {
  const versions: Record<string, { marketing: { field: string; type: string } } | undefined> =
    API_SCHEMA_REGISTRY;
  // Own-property check: a version string such as "constructor" or "__proto__"
  // must not resolve to something inherited from Object.prototype.
  const registered = Object.prototype.hasOwnProperty.call(API_SCHEMA_REGISTRY, version)
    ? versions[version]
    : undefined;
  const schema = registered ?? API_SCHEMA_REGISTRY['2023-07'];

  const { field, type } = schema.marketing;
  const raw = order[field];

  if (type === 'boolean') {
    return raw ? 'subscribed' : 'unsubscribed';
  }
  if (type === 'object') {
    const state =
      typeof raw === 'object' && raw !== null ? (raw as { state?: unknown }).state : undefined;
    // Only a string state honours the declared return type.
    return typeof state === 'string' ? state : 'unsubscribed';
  }
  return 'unsubscribed';
}
**Rationale:** When Shopify deprecates `accepts_marketing`, only the registry and the accessor function need updates. This prevents a codebase-wide search-and-replace and reduces the risk of missing edge cases.
#### 7. O(1) Metafield Resolution
Metafields are often stored as arrays. Iterating through this array for every lookup results in O(n) complexity, which degrades performance at scale. Indexing metafields on construction enables constant-time resolution.
/** A single metafield entry as delivered in the payload's metafield array. */
interface Metafield {
  namespace: string;
  key: string;
  /** Value in its serialized string form. */
  value: string;
  /** Type tag that decides how `value` is decoded. */
  type: string;
}

/**
 * Index over a metafield array keyed by `"<namespace>.<key>"`, so each lookup
 * is a single `Map` access instead of an array scan.
 */
class MetafieldIndex {
  private lookup: Map<string, Metafield>;

  /**
   * @param metafields Metafields to index. When two entries share the same
   *        namespace and key, the later one wins.
   */
  constructor(metafields: Metafield[]) {
    this.lookup = new Map<string, Metafield>();
    for (const mf of metafields) {
      this.lookup.set(`${mf.namespace}.${mf.key}`, mf);
    }
  }

  /**
   * Returns the decoded value of a metafield.
   *
   * @param namespace Metafield namespace.
   * @param key Metafield key.
   * @param defaultValue Returned when no such metafield is indexed.
   * @returns A number for integer/decimal types, a boolean for `boolean`, the
   *          parsed value for JSON types, and the raw string for anything else.
   * @throws SyntaxError when a JSON-typed metafield holds malformed JSON.
   */
  resolve(namespace: string, key: string, defaultValue: unknown = null): unknown {
    const mf = this.lookup.get(`${namespace}.${key}`);
    if (!mf) return defaultValue;

    switch (mf.type) {
      // `number_integer` is the current Shopify type name; `integer` is the
      // legacy one. Both decode the same way.
      case 'integer':
      case 'number_integer':
        return parseInt(mf.value, 10);
      case 'number_decimal':
        return parseFloat(mf.value);
      case 'boolean':
        return mf.value === 'true';
      // `json_string` is the legacy name for `json`.
      case 'json':
      case 'json_string':
        return JSON.parse(mf.value);
      default:
        return mf.value;
    }
  }
}
**Rationale:** Indexing on construction shifts the cost to initialization, making lookups O(1). For products with dozens of metafields processed in high-volume batches, this optimization is measurable and prevents CPU bottlenecks.
Pitfall Guide
1. The Global Null Assumption
- Explanation: Applying a single null-handling rule across all fields. This fails when data structures vary by product type or order state.
- Fix: Implement per-field null policies. Use conditional traversal for nested fields and strict checks for critical data.
2. Monolithic Transformation Functions
- Explanation: Embedding extraction, normalization, and mapping logic in a single function. This obscures failure points and makes testing difficult.
- Fix: Adopt a pipeline architecture with distinct stages. Each stage should have a single responsibility and independent error handling.
3. Siloed Identifier Tables
- Explanation: Creating separate tables for each integration pair (e.g., `shopify_erp_customers`, `shopify_erp_products`). This complicates gap detection and audit processes.
- Fix: Use a generic identifier registry with a unified schema. This simplifies queries for unmapped references and supports multi-tenant isolation.
4. Unversioned Accessors
- Explanation: Directly accessing fields like `order.accepts_marketing` without version awareness. This causes breakage when APIs evolve.
- Fix: Maintain a schema version registry. Access fields through version-aware functions that abstract underlying changes.
5. Linear Metafield Scans
- Explanation: Iterating through the metafield array for every lookup. This results in O(n) complexity and performance degradation.
- Fix: Index metafields on construction using a Map. This enables O(1) lookups regardless of metafield count.
6. Cache Staleness
- Explanation: Caching identifier resolutions indefinitely or without invalidation. This leads to stale data and routing errors.
- Fix: Implement short TTLs (e.g., 1 hour) and invalidate cache entries on registry updates. Monitor cache miss rates to detect gaps.
7. Ignoring Type Coercion Risks
- Explanation: Blindly casting strings to numbers without validation. This can truncate prices or introduce NaN values.
- Fix: Validate types before coercion. Use safe parsing functions that throw descriptive errors on invalid formats.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Simple Field Rename | Direct Mapping | Low complexity; minimal risk. | Low |
| SKU to ERP Code | Lookup / Cross-Reference | Dynamic resolution required; handles gaps. | Medium |
| Price Formatting | Computed Field | Logic-based transformation; ensures precision. | Low |
| Address Splitting | Split Mapping | Structural expansion; requires validation. | Low |
| Enum Remapping | Value Transformation | Prevents routing errors; needs exhaustive mapping. | Medium |
| Aggregated Metrics | Aggregation Mapping | Compresses data; requires loss analysis. | High |
Configuration Template
// pipeline.config.ts

/**
 * Declarative settings for the mapping pipeline, grouped per stage, plus the
 * identifier-registry cache settings.
 */
export const MAPPING_CONFIG = {
  stages: {
    // Fields that must be present in the raw payload.
    harvest: { requiredFields: ['id', 'line_items', 'total_price'] },

    // Normalization rules.
    standardize: {
      dateFormats: ['ISO8601'],
      pricePrecision: 2,
    },

    // Field-level null policies (keyed by dotted path) and the schema version to use.
    translate: {
      nullPolicies: {
        'shipping_address.city': 'conditional',
        'total_price': 'strict',
      },
      versionRegistry: '2024-01',
    },

    // Validation settings.
    audit: {
      driftThreshold: 0.01, // 1% warning rate
      schemaPath: './schemas/order.schema.json',
    },
  },

  registry: {
    cacheTTL: 3600, // 1 hour
    missAlerting: true,
  },
};
Quick Start Guide
- Define Schema: Create a JSON schema for expected payloads, including required fields and types.
- Initialize Pipeline: Instantiate `DataIngestionPipeline` with the schema and configuration.
- Add Error Handler: Wrap pipeline execution in a try-catch block that logs `MappingFault` details and triggers alerts.
- Deploy Monitor: Enable runtime drift detection to alert on schema changes or unexpected values.
- Validate: Run integration tests using production-derived fixtures to verify mapping accuracy.