teTimeOffset Timestamp) : IDomainEvent;
public record OrderShippedEvent(
Guid OrderId,
string TrackingNumber,
DateTimeOffset Timestamp) : IDomainEvent;
### 2. Aggregate with Event Sourcing
The aggregate applies events to build state and raises new events when commands are executed.
```csharp
public class OrderAggregate
{
public Guid Id { get; private set; }
public string CustomerId { get; private set; }
public OrderStatus Status { get; private set; }
public decimal TotalAmount { get; private set; }
public List<OrderItem> Items { get; private set; } = new();
public int Version { get; private set; }
private readonly Queue<IDomainEvent> _uncommittedEvents = new();
// Command: Create Order
public static OrderAggregate Create(Guid orderId, string customerId, List<OrderItem> items)
{
var order = new OrderAggregate { Id = orderId, CustomerId = customerId, Items = items };
var evt = new OrderCreatedEvent(orderId, customerId, items, DateTimeOffset.UtcNow);
order.Apply(evt);
return order;
}
// Command: Pay Order
public void Pay(string paymentMethod, decimal amount)
{
if (Status != OrderStatus.Pending) throw new DomainException("Order not payable.");
var evt = new OrderPaidEvent(Id, paymentMethod, amount, DateTimeOffset.UtcNow);
Apply(evt);
}
// Command: Ship Order
public void Ship(string trackingNumber)
{
if (Status != OrderStatus.Paid) throw new DomainException("Order not shippable.");
var evt = new OrderShippedEvent(Id, trackingNumber, DateTimeOffset.UtcNow);
Apply(evt);
}
// Apply event to state & queue for persistence
private void Apply(IDomainEvent evt)
{
switch (evt)
{
case OrderCreatedEvent e:
Status = OrderStatus.Pending;
TotalAmount = Items.Sum(i => i.Price * i.Quantity);
break;
case OrderPaidEvent e:
Status = OrderStatus.Paid;
break;
case OrderShippedEvent e:
Status = OrderStatus.Shipped;
break;
}
Version++;
_uncommittedEvents.Enqueue(evt);
}
public IDomainEvent[] GetUncommittedEvents() => _uncommittedEvents.ToArray();
public void ClearUncommittedEvents() => _uncommittedEvents.Clear();
}
3. Event Store Interface & Implementation
Production event stores handle concurrency, serialization, and persistence. This simplified version demonstrates the contract.
public interface IEventStore
{
Task SaveAsync(Guid aggregateId, int expectedVersion, IEnumerable<IDomainEvent> events, CancellationToken ct = default);
Task<IEnumerable<IDomainEvent>> LoadAsync(Guid aggregateId, CancellationToken ct = default);
}
public class InMemoryEventStore : IEventStore
{
private readonly Dictionary<Guid, List<IDomainEvent>> _store = new();
public Task SaveAsync(Guid aggregateId, int expectedVersion, IEnumerable<IDomainEvent> events, CancellationToken ct = default)
{
if (!_store.ContainsKey(aggregateId))
_store[aggregateId] = new List<IDomainEvent>();
var currentVersion = _store[aggregateId].Count;
if (currentVersion != expectedVersion)
throw new ConcurrencyException($"Expected version {expectedVersion}, but found {currentVersion}");
_store[aggregateId].AddRange(events);
return Task.CompletedTask;
}
public Task<IEnumerable<IDomainEvent>> LoadAsync(Guid aggregateId, CancellationToken ct = default)
{
return Task.FromResult(_store.TryGetValue(aggregateId, out var events) ? events.AsEnumerable() : Enumerable.Empty<IDomainEvent>());
}
}
4. Command Handler
Commands trigger state changes. The handler loads the aggregate, executes business logic, persists events, and publishes them.
public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, Guid>
{
private readonly IEventStore _eventStore;
private readonly IProjectionBus _projectionBus;
public CreateOrderCommandHandler(IEventStore eventStore, IProjectionBus projectionBus)
{
_eventStore = eventStore;
_projectionBus = projectionBus;
}
public async Task<Guid> Handle(CreateOrderCommand request, CancellationToken ct)
{
var order = OrderAggregate.Create(request.OrderId, request.CustomerId, request.Items);
var events = order.GetUncommittedEvents();
await _eventStore.SaveAsync(order.Id, 0, events, ct);
order.ClearUncommittedEvents();
// Project to read model
foreach (var evt in events)
await _projectionBus.PublishAsync(evt, ct);
return order.Id;
}
}
5. Query Handler & Projection
Queries read from an optimized read model. Projections listen to events and materialize the view.
public class GetOrderQueryHandler : IRequestHandler<GetOrderQuery, OrderReadModel>
{
private readonly IReadModelRepository _readRepo;
public GetOrderQueryHandler(IReadModelRepository readRepo) => _readRepo = readRepo;
public async Task<OrderReadModel> Handle(GetOrderQuery request, CancellationToken ct)
{
return await _readRepo.GetByOrderIdAsync(request.OrderId, ct);
}
}
// Projection example
public class OrderProjection : IEventHandler<OrderCreatedEvent>, IEventHandler<OrderPaidEvent>
{
private readonly IReadModelRepository _readRepo;
public OrderProjection(IReadModelRepository readRepo) => _readRepo = readRepo;
public async Task Handle(OrderCreatedEvent evt, CancellationToken ct)
{
var model = new OrderReadModel
{
OrderId = evt.OrderId,
CustomerId = evt.CustomerId,
Status = "Pending",
Total = evt.Items.Sum(i => i.Price * i.Quantity),
CreatedAt = evt.Timestamp
};
await _readRepo.UpsertAsync(model, ct);
}
public async Task Handle(OrderPaidEvent evt, CancellationToken ct)
{
var model = await _readRepo.GetByOrderIdAsync(evt.OrderId, ct);
model.Status = "Paid";
model.PaidAt = evt.Timestamp;
await _readRepo.UpsertAsync(model, ct);
}
}
Production Notes:
- Replace
InMemoryEventStore with a durable store (EventStoreDB, PostgreSQL pg_event, or Kafka).
- Use schema registries or explicit versioning for events.
- Implement idempotency keys on commands to prevent duplicates.
- Use background workers or outbox patterns for projection consistency.
Pitfall Guide
1. Over-Engineering for Simple CRUD Domains
Symptom: Teams introduce CQRS+ES into low-complexity domains with straightforward read/write ratios.
Impact: Unnecessary infrastructure, longer development cycles, and higher maintenance costs.
Mitigation: Apply the 80/20 rule. Use CQRS+ES only for complex, high-value domains. Keep CRUD for simple configuration or reference data.
2. Event Schema Evolution Blind Spots
Symptom: Adding/removing fields breaks projections or replay processes.
Impact: Silent data corruption or failed event handlers.
Mitigation: Implement explicit event versioning (OrderCreatedV1, OrderCreatedV2). Use schema migration strategies (forward-compatible defaults, backward-compatible readers). Never mutate existing event types; always create new versions.
3. Ignoring Eventual Consistency Realities
Symptom: Users expect immediate read-model updates after commands.
Impact: Frustration, support tickets, and workarounds that break the pattern.
Mitigation: Design UI/UX with optimistic updates or polling. Document consistency guarantees. Use correlation IDs to track command-to-projection latency. Implement retry/dead-letter queues for failed projections.
4. Missing Snapshotting for Large Aggregates
Symptom: Aggregates with 1000+ events cause slow replay and high memory usage.
Impact: Latency spikes during command processing; increased infrastructure costs.
Mitigation: Implement periodic snapshots (e.g., every 100 events). Store snapshot + delta events. Load snapshot, apply only newer events. Automate snapshot creation via background workers or event store hooks.
5. Neglecting Idempotency & Deduplication
Symptom: Network retries or message broker duplicates cause double-charging or duplicate state transitions.
Impact: Financial discrepancies, audit failures, and data integrity loss.
Mitigation: Assign unique idempotency keys to commands. Store processed command IDs in a deduplication table. Event stores naturally prevent duplicate appends, but command handlers must validate before execution.
6. Debugging & Observability Gaps
Symptom: Teams struggle to trace why a read model diverged or why a command failed.
Impact: Long MTTR, reliance on guesswork, and production incidents.
Mitigation: Instrument correlation IDs across commands, events, and projections. Log event payloads (redact PII). Use distributed tracing (OpenTelemetry). Build a temporal debugging UI that replays events for a given aggregate.
Production Bundle
β
Pre-Deployment Checklist
π Decision Matrix: When to Use CQRS + ES
| Criteria | Use CQRS + ES | Use CQRS Only | Stick to CRUD |
|---|
| Read/Write Ratio | > 3:1 or highly variable | 2:1 to 3:1 | 1:1 or balanced |
| Audit/Compliance | Mandatory (financial, healthcare) | Optional | Not required |
| Domain Complexity | High (many state transitions, rules) | Medium | Low |
| Team Size & Expertise | β₯ 5 devs, DDD/ES experience | 3-5 devs, CQRS experience | 1-3 devs, standard stack |
| Latency Tolerance | Accepts eventual consistency | Strict consistency needed | Real-time consistency |
| Budget/Infrastructure | Can support event store + projections | Moderate infra | Minimal infra |
βοΈ Configuration Template (appsettings.json)
{
"EventSourcing": {
"Store": {
"Provider": "EventStoreDB",
"ConnectionString": "esdb://localhost:2113?tls=false",
"MaxAppendSize": 1000,
"ConcurrencyCheck": true
},
"Serialization": {
"Format": "SystemTextJson",
"TypeNameHandling": "None",
"CamelCase": true
},
"Snapshots": {
"Enabled": true,
"Threshold": 100,
"Storage": "Redis",
"TtlHours": 72
},
"Projections": {
"Mode": "BackgroundWorker",
"BatchSize": 50,
"RetryPolicy": {
"MaxRetries": 5,
"BackoffMs": 1000,
"DeadLetterQueue": true
}
},
"Idempotency": {
"Enabled": true,
"TtlMinutes": 60,
"Storage": "PostgreSQL"
}
},
"ReadModel": {
"Database": "ReadDb",
"Cache": {
"Provider": "Redis",
"TtlSeconds": 300,
"InvalidationPattern": "order:*"
}
}
}
π Quick Start: 5-Step Implementation Guide
-
Initialize Project Structure
dotnet new webapi -n OrderSystem
dotnet add package MediatR
dotnet add package Microsoft.EntityFrameworkCore
mkdir Commands Queries Events Projections ReadModels
-
Define Events & Aggregate
Create immutable event records. Implement the aggregate with Apply() methods and uncommitted event queue. Ensure all state changes happen exclusively through event application.
-
Wire Command & Query Handlers
Use MediatR to route commands to handlers. Load aggregate β execute business logic β persist events β publish to projection bus. Query handlers read directly from the optimized read model.
-
Implement Projection Pipeline
Create a background worker or hosted service that subscribes to published events. Apply transformations to update the read database. Add retry logic and correlation tracking.
-
Deploy & Observe
Start with a local EventStoreDB or PostgreSQL event table. Configure the projection worker. Add OpenTelemetry instrumentation. Validate with:
curl -X POST /api/orders -d '{"orderId":"...","customerId":"...","items":[...]}'
curl -X GET /api/orders/{id}
# Check projection lag via /metrics endpoint
Closing Thoughts
CQRS and Event Sourcing are not silver bullets; they are architectural levers that amplify domain complexity when applied correctly. The patterns shine in systems where auditability, temporal analysis, and independent scaling matter more than simplicity. Success depends on disciplined event design, explicit consistency contracts, and production-grade operational practices.
Use the decision matrix to justify adoption, implement the core patterns with versioning and idempotency baked in, and deploy using the production bundle. When executed with intention, CQRS + Event Sourcing transforms chaotic state management into a transparent, auditable, and highly scalable system that evolves gracefully with your business.