Event Sourcing CQRS Grundlagen
Event Sourcing & CQRS: Events als Wahrheit
Event Sourcing speichert alle Zustandsänderungen als Events. CQRS trennt Lese- von Schreiboperationen. Lernen Sie diese mächtigen Patterns.
Traditionell vs. Event Sourcing
# Traditionell: Aktuellen Zustand speichern
┌─────────────────────────────┐
│ accounts │
│ id | balance | updated_at │
│ 1 | 1000 | 2024-01-15 │
└─────────────────────────────┘
→ Historie verloren!
# Event Sourcing: Alle Änderungen als Events
┌─────────────────────────────────────────┐
│ events │
│ id | type | data │
│ 1 | AccountOpened | {balance: 0} │
│ 2 | MoneyDeposited | {amount: 500} │
│ 3 | MoneyDeposited | {amount: 700} │
│ 4 | MoneyWithdrawn | {amount: 200} │
└─────────────────────────────────────────┘
→ Balance = 0 + 500 + 700 - 200 = 1000
→ Komplette Historie!
Event Sourcing Konzepte
// Event = Etwas das passiert IST (Vergangenheit!)
interface DomainEvent {
eventId: string;
aggregateId: string;
timestamp: Date;
version: number;
}
class AccountOpened implements DomainEvent {
constructor(
public eventId: string,
public aggregateId: string,
public timestamp: Date,
public version: number,
public ownerId: string,
public initialBalance: number
) {}
}
class MoneyDeposited implements DomainEvent {
constructor(
public eventId: string,
public aggregateId: string,
public timestamp: Date,
public version: number,
public amount: number,
public description: string
) {}
}
class MoneyWithdrawn implements DomainEvent {
// ...
}
Aggregate mit Event Sourcing
class Account {
private id: string;
private balance: number = 0;
private events: DomainEvent[] = [];
private version: number = 0;
// Private constructor - Reconstruction via events
private constructor() {}
// Factory für neue Accounts
static open(id: string, ownerId: string): Account {
const account = new Account();
account.apply(new AccountOpened(
uuid(),
id,
new Date(),
1,
ownerId,
0
));
return account;
}
// Reconstruction aus Event-Historie
static fromHistory(events: DomainEvent[]): Account {
const account = new Account();
events.forEach(event => account.applyEvent(event));
account.events = []; // Keine neuen Events
return account;
}
deposit(amount: number, description: string): void {
if (amount <= 0) throw new Error('Invalid amount');
this.apply(new MoneyDeposited(
uuid(),
this.id,
new Date(),
this.version + 1,
amount,
description
));
}
withdraw(amount: number, description: string): void {
if (amount <= 0) throw new Error('Invalid amount');
if (amount > this.balance) throw new Error('Insufficient funds');
this.apply(new MoneyWithdrawn(
uuid(),
this.id,
new Date(),
this.version + 1,
amount,
description
));
}
private apply(event: DomainEvent): void {
this.applyEvent(event);
this.events.push(event);
}
private applyEvent(event: DomainEvent): void {
if (event instanceof AccountOpened) {
this.id = event.aggregateId;
this.balance = event.initialBalance;
} else if (event instanceof MoneyDeposited) {
this.balance += event.amount;
} else if (event instanceof MoneyWithdrawn) {
this.balance -= event.amount;
}
this.version = event.version;
}
getUncommittedEvents(): DomainEvent[] {
return [...this.events];
}
clearEvents(): void {
this.events = [];
}
}
Event Store
interface EventStore {
saveEvents(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
getEvents(aggregateId: string): Promise<DomainEvent[]>;
}
class PostgresEventStore implements EventStore {
async saveEvents(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
await this.db.transaction(async (tx) => {
// Optimistic Locking
const currentVersion = await tx.query(
'SELECT MAX(version) FROM events WHERE aggregate_id = $1',
[aggregateId]
);
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError('Aggregate was modified');
}
for (const event of events) {
await tx.query(
'INSERT INTO events (event_id, aggregate_id, event_type, data, version, timestamp) VALUES ($1, $2, $3, $4, $5, $6)',
[event.eventId, aggregateId, event.constructor.name, JSON.stringify(event), event.version, event.timestamp]
);
}
});
}
async getEvents(aggregateId: string): Promise<DomainEvent[]> {
const rows = await this.db.query(
'SELECT * FROM events WHERE aggregate_id = $1 ORDER BY version',
[aggregateId]
);
return rows.map(row => this.deserialize(row));
}
}
CQRS: Command Query Responsibility Segregation
/*
Traditionell:
┌─────────────────┐
│ Application │
│ ┌─────────┐ │
│ │ Model │ │ ← Read & Write
│ └─────────┘ │
│ ┌─────────┐ │
│ │ DB │ │
│ └─────────┘ │
└─────────────────┘
CQRS:
┌─────────────────────────────────┐
│ Application │
│ ┌───────────┐ ┌────────────┐ │
│ │ Commands │ │ Queries │ │
│ │ (Write) │ │ (Read) │ │
│ └─────┬─────┘ └─────┬──────┘ │
│ │ │ │
│ ┌─────▼─────┐ ┌─────▼──────┐ │
│ │ Write DB │ │ Read DB │ │
│ │ (Events) │ │ (Projections)│
│ └───────────┘ └────────────┘ │
└─────────────────────────────────┘
*/
// Command Handler
class DepositMoneyHandler {
constructor(private eventStore: EventStore) {}
async handle(command: DepositMoney): Promise<void> {
const events = await this.eventStore.getEvents(command.accountId);
const account = Account.fromHistory(events);
account.deposit(command.amount, command.description);
await this.eventStore.saveEvents(
command.accountId,
account.getUncommittedEvents(),
account.version
);
}
}
// Read Model (Projection)
class AccountBalanceProjection {
async handle(event: DomainEvent): Promise<void> {
if (event instanceof AccountOpened) {
await this.db.query(
'INSERT INTO account_balances (id, balance) VALUES ($1, $2)',
[event.aggregateId, event.initialBalance]
);
} else if (event instanceof MoneyDeposited) {
await this.db.query(
'UPDATE account_balances SET balance = balance + $1 WHERE id = $2',
[event.amount, event.aggregateId]
);
}
}
}
// Query Handler - direkt aus optimiertem Read Model
class GetAccountBalanceHandler {
async handle(query: GetAccountBalance): Promise<number> {
const result = await this.db.query(
'SELECT balance FROM account_balances WHERE id = $1',
[query.accountId]
);
return result.balance;
}
}
Wann Event Sourcing?
✅ Gut für:
- Audit-Trail erforderlich (Finanzen, Medizin)
- Zeitreisen/Debugging ("Was war am 15.1.?")
- Komplexe Domain mit vielen Zustandsübergängen
- Event-Driven Architekturen
❌ Overkill für:
- Einfache CRUD-Anwendungen
- Kleine Teams ohne ES-Erfahrung
- Wenn Eventual Consistency problematisch ist