: string;
version: number;
timestamp: string;
metadata: Record<string, unknown>;
}
/**
 * Event recorded when an order is created.
 *
 * Extends BaseEvent and narrows `eventType` to the literal 'OrderCreated',
 * which is the value the projection code in this file switches on.
 */
interface OrderCreated extends BaseEvent {
// Literal discriminant for this event type.
eventType: 'OrderCreated';
// Event-specific data describing the new order.
payload: {
orderId: string;
customerId: string;
// Line items of the order; each entry carries a SKU, a quantity and a price.
items: Array<{ sku: string; quantity: number; price: number }>;
// NOTE(review): MigrationFacade fills this with the sum of price * quantity
// over `items`; other producers are not visible here — confirm they agree.
totalAmount: number;
};
}
/**
 * Union of the order event types handled in this file. Consumers branch on
 * each member's `eventType` value (see the switch in OrderProjection.handle).
 */
type DomainEvent = OrderCreated | OrderItemAdded | OrderPaid | OrderCancelled;
Versioning strategy: Keep `version` as the stream position. Use `eventType` for routing. Never mutate published events. Introduce `schemaVersion` in metadata only when structural changes require projection adapters.
### Step 2: Implement Append-Only Event Store Interface
The store must enforce stream ordering, optimistic concurrency, and atomic batch writes.
```typescript
/**
 * Contract for an event store addressed by stream ID.
 */
interface EventStore {
/**
 * Appends `events` to the stream identified by `streamId`.
 *
 * `expectedVersion` is the stream version the caller believes is current.
 * The InMemoryEventStore implementation in this file rejects the write with
 * a ConcurrencyError when it does not match the stored version.
 */
append(streamId: string, expectedVersion: number, events: DomainEvent[]): Promise<void>;
/**
 * Reads events from a stream. `fromVersion` is optional; the
 * InMemoryEventStore implementation returns only events whose `version` is
 * strictly greater than it and defaults it to 0.
 */
read(streamId: string, fromVersion?: number): Promise<DomainEvent[]>;
/**
 * Resolves to the current version of the stream. In InMemoryEventStore this
 * is the number of events stored for the stream (0 for an unknown stream).
 */
getStreamVersion(streamId: string): Promise<number>;
}
/**
 * Map-backed EventStore that keeps every stream as an in-process array.
 *
 * The version of a stream is simply the number of events it holds, and each
 * stored event is stamped with its 1-based position in the stream.
 */
class InMemoryEventStore implements EventStore {
  private streams = new Map<string, DomainEvent[]>();

  /**
   * Appends a batch of events to a stream under optimistic concurrency.
   *
   * @param streamId - Stream to append to; created implicitly on first write.
   * @param expectedVersion - Version the caller last observed for the stream.
   * @param events - Events to store; `version`, `eventId` and `timestamp` on
   *   the stored copies are always overwritten by the store.
   * @throws ConcurrencyError when the stream's version differs from
   *   `expectedVersion`; nothing is written in that case.
   */
  async append(streamId: string, expectedVersion: number, events: DomainEvent[]): Promise<void> {
    const existing = this.streams.get(streamId) ?? [];
    const actualVersion = existing.length;
    if (actualVersion !== expectedVersion) {
      throw new ConcurrencyError(`Expected version ${expectedVersion}, got ${actualVersion}`);
    }

    // Build the new stream on a copy so the Map is updated in one step.
    const next = existing.slice();
    let position = expectedVersion;
    for (const incoming of events) {
      position += 1;
      next.push({
        ...incoming,
        version: position,
        eventId: crypto.randomUUID(),
        timestamp: new Date().toISOString(),
      });
    }
    this.streams.set(streamId, next);
  }

  /**
   * Returns the events of a stream whose version is strictly greater than
   * `fromVersion`; an unknown stream yields an empty array.
   */
  async read(streamId: string, fromVersion = 0): Promise<DomainEvent[]> {
    const stream = this.streams.get(streamId);
    if (stream === undefined) {
      return [];
    }
    return stream.filter((stored) => stored.version > fromVersion);
  }

  /** Returns the number of events stored for the stream (0 when unknown). */
  async getStreamVersion(streamId: string): Promise<number> {
    return this.streams.get(streamId)?.length ?? 0;
  }
}
```

Architecture decision: Use optimistic concurrency control (OCC) via stream versioning. Pessimistic locks create contention under high write throughput. OCC fails fast, enabling retry logic at the command layer without blocking.
### Step 3: Build Projection Pipeline
Projections transform events into read-optimized state. They must be idempotent, deterministic, and replayable.
/**
 * Folds domain events into a state of type T, one event at a time.
 */
interface Projection<T> {
/** Returns the state that results from applying `event` to `currentState`. */
handle(event: DomainEvent, currentState: T): T;
}
/**
 * Builds the read-side view of an order by folding order events into an
 * OrderState. Every branch returns a new object; the incoming state is never
 * mutated.
 */
class OrderProjection implements Projection<OrderState> {
  /**
   * Applies a single event to the current order state.
   *
   * @param event - Event to apply; dispatch is on `event.eventType`.
   * @param state - State before the event. It is ignored for 'OrderCreated',
   *   which builds the state from the event payload alone.
   * @returns The next state, or `state` itself for unrecognised event types.
   */
  handle(event: DomainEvent, state: OrderState): OrderState {
    if (event.eventType === 'OrderCreated') {
      const { orderId, items, totalAmount } = event.payload;
      return {
        orderId,
        status: 'CREATED',
        items,
        totalAmount,
        updatedAt: event.timestamp,
      };
    }

    if (event.eventType === 'OrderItemAdded') {
      const added = event.payload.item;
      const lineTotal = added.price * added.quantity;
      return {
        ...state,
        items: [...state.items, added],
        totalAmount: state.totalAmount + lineTotal,
        updatedAt: event.timestamp,
      };
    }

    if (event.eventType === 'OrderPaid') {
      return { ...state, status: 'PAID', updatedAt: event.timestamp };
    }

    if (event.eventType === 'OrderCancelled') {
      return { ...state, status: 'CANCELLED', updatedAt: event.timestamp };
    }

    // Unknown event types leave the state untouched.
    return state;
  }
}
Projection execution strategy: Run projections in a dedicated worker process. Use a cursor table to track last processed event ID. Implement exponential backoff on failures. Never block command handlers on projection completion.
### Step 4: Introduce Snapshotting for High-Frequency Aggregates
Rehydrating aggregates with 10k+ events causes latency spikes. Snapshots capture state at intervals.
/**
 * Persistence contract for aggregate state snapshots, keyed by stream ID.
 */
interface SnapshotStore {
/** Stores `state` for the stream together with the stream version it reflects. */
save(streamId: string, version: number, state: unknown): Promise<void>;
/**
 * Resolves to a stored `{ version, state }` pair for the stream, or null when
 * none is available.
 * NOTE(review): presumably the most recently saved snapshot — confirm per implementation.
 */
load(streamId: string): Promise<{ version: number; state: unknown } | null>;
}
/**
 * A projection that may supply the state to start folding from when no
 * snapshot exists. `init` is optional so that any plain Projection (which only
 * declares `handle`) remains assignable.
 */
interface InitializableProjection<T> extends Projection<T> {
  init?(): T;
}

/**
 * Rebuilds aggregate state from the latest snapshot plus the events recorded
 * after it, and writes a fresh snapshot once enough events have been replayed.
 */
class AggregateRehydrator {
  /**
   * @param eventStore - Source of the events to replay.
   * @param snapshotStore - Where snapshots are loaded from and saved to.
   * @param projection - Reducer applied to every replayed event; its optional
   *   `init()` provides the starting state when there is no snapshot.
   * @param snapshotInterval - Minimum number of replayed events that triggers
   *   saving a new snapshot (default 100).
   */
  constructor(
    private readonly eventStore: EventStore,
    private readonly snapshotStore: SnapshotStore,
    private readonly projection: InitializableProjection<unknown>,
    private readonly snapshotInterval = 100
  ) {}

  /**
   * Rehydrates the state of one stream.
   *
   * @param streamId - Stream whose state should be rebuilt.
   * @returns The state after applying every event newer than the snapshot.
   */
  async rehydrate(streamId: string): Promise<unknown> {
    const snapshot = await this.snapshotStore.load(streamId);

    // The Projection contract only guarantees `handle`, so `init` is called
    // only when present; otherwise replay starts from `undefined` and the
    // first event is expected to establish the state.
    let state: unknown = snapshot?.state ?? this.projection.init?.();
    const fromVersion = snapshot?.version ?? 0;

    const events = await this.eventStore.read(streamId, fromVersion);
    for (const event of events) {
      state = this.projection.handle(event, state);
    }

    // Snapshot only after a long enough replay to limit write amplification.
    if (events.length >= this.snapshotInterval) {
      await this.snapshotStore.save(streamId, fromVersion + events.length, state);
    }
    return state;
  }
}
Architecture decision: Snapshots are write-optimized, not read-optimized. Store them in the same durable medium as events. Trigger snapshot creation after projection rebuilds, not on every command. This prevents snapshot drift and reduces write amplification.
### Step 5: Deploy Dual-Write Strangler Migration
Route commands through a facade that writes to both legacy state and event store. Shift read paths gradually.
/**
 * Command facade used during the strangler migration: every create-order
 * command is written both to the legacy repository and to the event store.
 *
 * NOTE(review): the two writes are started together and are not atomic, so
 * one can succeed while the other fails — confirm that reconciliation covers
 * this case.
 */
class MigrationFacade {
  constructor(
    private legacyRepo: LegacyOrderRepository,
    private eventStore: EventStore,
    private projection: OrderProjection
  ) {}

  /**
   * Handles a create-order command by dual-writing it.
   *
   * The stream ID is `order-<orderId>`. The current stream version is read
   * first and passed as the expected version for the append, and the event's
   * total is computed as the sum of price * quantity over the command's items.
   *
   * @param command - The order to create.
   * @returns A promise that settles once both writes have resolved, or rejects
   *   as soon as either of them rejects.
   */
  async execute(command: CreateOrderCommand): Promise<void> {
    const streamId = `order-${command.orderId}`;
    const currentVersion = await this.eventStore.getStreamVersion(streamId);

    const { orderId, customerId, items } = command;
    let totalAmount = 0;
    for (const line of items) {
      totalAmount += line.price * line.quantity;
    }

    // eventId and timestamp are left blank here; the in-memory store in this
    // file assigns both when it appends the event.
    const created: OrderCreated = {
      streamId,
      eventType: 'OrderCreated',
      version: currentVersion + 1,
      payload: { orderId, customerId, items, totalAmount },
      metadata: { source: 'migration_facade' },
      eventId: '',
      timestamp: '',
    };

    const legacyWrite = this.legacyRepo.save(command);
    const eventWrite = this.eventStore.append(streamId, currentVersion, [created]);
    await Promise.all([legacyWrite, eventWrite]);
  }
}
Migration path:
- Enable dual-write.
- Run projections in parallel.
- Validate projection state against legacy state using checksum reconciliation.
- Flip read traffic to projections.
- Decommission legacy write path.
Pitfall Guide
- **Treating Events as Database Rows.** Events are not CRUD operations. Inserting events into relational tables with `ON CONFLICT` updates breaks immutability and corrupts stream ordering. Use append-only storage with stream versioning. Validate the expected version before writing.
- **Ignoring Event Schema Evolution.** Adding fields to events is safe. Removing or renaming fields breaks projections. Implement schema versioning in metadata, not payloads. Use projection adapters for structural changes. Never mutate published events.
- **Missing Idempotency in Projections.** Event delivery guarantees are at-least-once. Projections must produce identical state regardless of execution count. Use deterministic event ordering, idempotent database upserts, and deduplication via event ID tracking.
- **Skipping Snapshotting.** Aggregates with 500+ events cause rehydration latency above 200 ms. Snapshots reduce replay scope. Store snapshots at fixed intervals, not on every command. Validate snapshot consistency against the event stream during reconciliation.
- **Overcomplicating Sagas and Orchestration.** Synchronous event chains create distributed transactions. Use choreography with compensating events. Implement timeout handlers for uncompleted workflows. Avoid long-running state machines in projections.
- **Assuming Eventual Consistency is Free.** Read-your-writes guarantees require causal tracking or sticky sessions. Implement correlation IDs, versioned reads, and fallback to synchronous queries during migration. Document consistency SLAs explicitly.
- **Poor Stream Naming Conventions.** Flat or inconsistent stream IDs fragment queries. Use hierarchical naming: `tenant:entity:id`. Reserve prefixes for system events (`_system`, `_audit`). Enforce naming via schema validation at write time.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Greenfield microservice | Pure Event Sourcing | No legacy debt; projections can be designed from day one | +22% initial dev, -40% long-term compliance |
| Legacy monolith with audit requirements | Hybrid Strangler Migration | Preserves availability while decoupling state from storage | +8% infra, -35% audit engineering |
| Regulated financial workflow | Event Sourcing + CQRS | Immutable audit trail required; read/write separation prevents query contention | +35% infra, -60% reconciliation cost |
| High-frequency IoT telemetry | Event Streaming + Materialized Views | Append-only store handles burst writes; projections aggregate for analytics | +15% storage, -50% query latency |
Configuration Template
// event-sourcing.config.ts
/**
 * Configuration object for the event-sourcing setup described in this guide.
 * The code that consumes these values is not shown here, so the notes below
 * describe only what the literals themselves establish; anything further is
 * marked as an assumption.
 */
export const eventStoreConfig = {
// Taken from the EVENT_STORE_URI environment variable; falls back to the
// local TCP endpoint when that variable is undefined.
connectionString: process.env.EVENT_STORE_URI ?? 'tcp://localhost:1113',
// NOTE(review): presumably the retry count and the backoff delay in
// milliseconds for failed store operations — confirm against the consumer.
maxRetries: 3,
retryBackoffMs: 200,
concurrencyControl: 'optimistic_stream_version',
// Same value as the default snapshotInterval of AggregateRehydrator (100).
snapshotInterval: 100,
// Settings for the projection workers.
projection: {
workers: 2,
batchSize: 50,
cursorTable: 'projection_cursor',
deduplicationWindowMs: 30000,
failureStrategy: 'pause_and_alert',
},
// Settings for the reconciliation job.
reconciliation: {
enabled: true,
// NOTE(review): read as a standard five-field cron expression this is
// minute 0 of every sixth hour — confirm the scheduler's syntax.
schedule: '0 */6 * * *',
checksumAlgorithm: 'sha256',
toleranceDriftPercent: 0.0,
},
// Settings for the dual-write migration and the read cutover.
migration: {
dualWriteEnabled: true,
readCutoverStrategy: 'canary_percentage',
// NOTE(review): presumably percentages of read traffic per canary stage.
canarySteps: [10, 25, 50, 100],
// NOTE(review): presumably the limits beyond which the cutover is rolled back.
rollbackThreshold: {
projectionLagMs: 5000,
stateMismatchRate: 0.01,
},
},
};
Quick Start Guide
- Initialize Event Store: Run a local EventStoreDB or use the in-memory implementation. Configure connection strings and stream naming conventions.
- Deploy Projection Worker: Start the projection pipeline with cursor tracking. Verify idempotency by replaying a test event batch twice.
- Enable Dual-Write: Route a subset of commands through the migration facade. Validate checksum parity between legacy state and projections.
- Flip Read Traffic: Shift 10% of read queries to projections. Monitor lag and mismatch rates. Scale canary percentage after 24-hour stability.
- Decommission Legacy Write: Disable legacy persistence for migrated streams. Archive historical state tables. Activate snapshot reconciliation schedule.