Message Queues RabbitMQ Grundlagen
Message Queues: Asynchrone Kommunikation
Message Queues entkoppeln Systeme und ermöglichen asynchrone Verarbeitung. Lernen Sie die Grundlagen mit RabbitMQ.
Warum Message Queues?
Synchron (problematisch):
User → API → Email Service → Response
↑
Wartet 2s auf Email-Versand!
Asynchron (besser):
User → API → Queue → Response (sofort!)
↓
Email Worker → Email Service (im Hintergrund)
Konzepte
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Producer │───>│ Exchange │───>│ Queue │───>│ Consumer │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ Producer: Sendet Nachrichten Exchange: Routet Nachrichten zu Queues Queue: Speichert Nachrichten Consumer: Verarbeitet Nachrichten
RabbitMQ Setup
# Docker docker run -d --name rabbitmq \ -p 5672:5672 \ -p 15672:15672 \ rabbitmq:management # Management UI: http://localhost:15672 # Default: guest / guest # Node.js npm install amqplib
Einfaches Beispiel
// producer.js
const amqp = require('amqplib');
async function send(message) {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'tasks';
await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
persistent: true
});
console.log('Sent:', message);
await channel.close();
await connection.close();
}
send({ task: 'send_email', to: 'user@example.com' });
// consumer.js
const amqp = require('amqplib');
async function consume() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'tasks';
await channel.assertQueue(queue, { durable: true });
// Nur 1 Nachricht gleichzeitig pro Consumer
channel.prefetch(1);
console.log('Waiting for messages...');
channel.consume(queue, async (msg) => {
const content = JSON.parse(msg.content.toString());
console.log('Received:', content);
try {
await processTask(content);
channel.ack(msg); // Erfolgreich verarbeitet
} catch (error) {
channel.nack(msg, false, true); // Zurück in Queue
}
});
}
async function processTask(task) {
// Verarbeitung...
await new Promise(r => setTimeout(r, 1000));
console.log('Task completed:', task);
}
consume();
Exchange-Typen
# Direct Exchange: Exaktes Routing
Exchange ─── routing_key="email" ───> Email Queue
└── routing_key="sms" ─────> SMS Queue
# Fanout Exchange: Broadcast an alle
Exchange ─┬─> Queue 1
├─> Queue 2
└─> Queue 3
# Topic Exchange: Pattern-Matching
Exchange ─── "user.created" ────> User Events Queue
└── "order.*" ─────────> Order Events Queue
└── "#" ───────────────> All Events Queue
# Headers Exchange: Header-basiertes Routing
Pub/Sub Pattern
// publisher.js
async function publish(event, data) {
const channel = await getChannel();
const exchange = 'events';
await channel.assertExchange(exchange, 'topic', { durable: true });
channel.publish(
exchange,
event, // z.B. 'user.created'
Buffer.from(JSON.stringify(data))
);
}
publish('user.created', { userId: 123, email: 'user@example.com' });
publish('order.completed', { orderId: 456, total: 99.99 });
// subscriber.js
async function subscribe(pattern, handler) {
const channel = await getChannel();
const exchange = 'events';
await channel.assertExchange(exchange, 'topic', { durable: true });
// Exklusive Queue für diesen Consumer
const q = await channel.assertQueue('', { exclusive: true });
await channel.bindQueue(q.queue, exchange, pattern);
channel.consume(q.queue, (msg) => {
const event = msg.fields.routingKey;
const data = JSON.parse(msg.content.toString());
handler(event, data);
channel.ack(msg);
});
}
// Alle User-Events
subscribe('user.*', (event, data) => {
console.log('User event:', event, data);
});
// Nur order.completed
subscribe('order.completed', (event, data) => {
console.log('Order completed:', data);
});
Dead Letter Queue
// Für fehlgeschlagene Nachrichten
async function setupWithDLQ() {
const channel = await getChannel();
// Dead Letter Queue
await channel.assertQueue('tasks.dlq', { durable: true });
// Hauptqueue mit DLQ
await channel.assertQueue('tasks', {
durable: true,
arguments: {
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': 'tasks.dlq',
'x-message-ttl': 60000 // 60s TTL
}
});
}
// Nach X Fehlversuchen landet Nachricht in DLQ
channel.consume('tasks', (msg) => {
const retries = msg.properties.headers['x-retry-count'] || 0;
try {
processMessage(msg);
channel.ack(msg);
} catch (error) {
if (retries < 3) {
// Retry
channel.publish('', 'tasks', msg.content, {
headers: { 'x-retry-count': retries + 1 }
});
channel.ack(msg);
} else {
// An DLQ
channel.nack(msg, false, false);
}
}
});
Best Practices
✅ Do:
- Nachrichten immer ACK/NACK
- Idempotente Consumer (gleiche Nachricht = gleiches Ergebnis)
- Dead Letter Queues für Fehler
- Persistent Messages für wichtige Daten
💡 Tipp:
Überwachen Sie Ihre Queue-Längen und Consumer mit dem Enjyn Status Monitor.