Hallo Welt
Hallo Welt
Original Lingva Deutsch
Übersetzung wird vorbereitet...
Dieser Vorgang kann bis zu 60 Sekunden dauern.
Diese Seite wird erstmalig übersetzt und dann für alle Besucher gespeichert.
0%
DE Zurück zu Deutsch
Übersetzung durch Lingva Translate

234 Dokumentationen verfügbar

Wissensdatenbank

Message Queues RabbitMQ Grundlagen

Zuletzt aktualisiert: 20.01.2026 um 10:03 Uhr

Message Queues: Asynchrone Kommunikation

Message Queues entkoppeln Systeme und ermöglichen asynchrone Verarbeitung. Lernen Sie die Grundlagen mit RabbitMQ.

Warum Message Queues?

Synchron (problematisch):
User → API → Email Service → Response
            ↑
         Wartet 2s auf Email-Versand!

Asynchron (besser):
User → API → Queue → Response (sofort!)
              ↓
         Email Worker → Email Service (im Hintergrund)

Konzepte

┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│ Producer │───>│ Exchange │───>│  Queue   │───>│ Consumer │
└──────────┘    └──────────┘    └──────────┘    └──────────┘

Producer:  Sendet Nachrichten
Exchange:  Routet Nachrichten zu Queues
Queue:     Speichert Nachrichten
Consumer:  Verarbeitet Nachrichten

RabbitMQ Setup

# Docker
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:management

# Management UI: http://localhost:15672
# Default: guest / guest

# Node.js
npm install amqplib

Einfaches Beispiel

// producer.js
const amqp = require('amqplib');

async function send(message) {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const queue = 'tasks';
    await channel.assertQueue(queue, { durable: true });

    channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
        persistent: true
    });

    console.log('Sent:', message);
    await channel.close();
    await connection.close();
}

send({ task: 'send_email', to: 'user@example.com' });
// consumer.js
const amqp = require('amqplib');

async function consume() {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const queue = 'tasks';
    await channel.assertQueue(queue, { durable: true });

    // Nur 1 Nachricht gleichzeitig pro Consumer
    channel.prefetch(1);

    console.log('Waiting for messages...');

    channel.consume(queue, async (msg) => {
        const content = JSON.parse(msg.content.toString());
        console.log('Received:', content);

        try {
            await processTask(content);
            channel.ack(msg);  // Erfolgreich verarbeitet
        } catch (error) {
            channel.nack(msg, false, true);  // Zurück in Queue
        }
    });
}

async function processTask(task) {
    // Verarbeitung...
    await new Promise(r => setTimeout(r, 1000));
    console.log('Task completed:', task);
}

consume();

Exchange-Typen

# Direct Exchange: Exaktes Routing
Exchange ─── routing_key="email" ───> Email Queue
         └── routing_key="sms" ─────> SMS Queue

# Fanout Exchange: Broadcast an alle
Exchange ─┬─> Queue 1
          ├─> Queue 2
          └─> Queue 3

# Topic Exchange: Pattern-Matching
Exchange ─── "user.created" ────> User Events Queue
         └── "order.*" ─────────> Order Events Queue
         └── "#" ───────────────> All Events Queue

# Headers Exchange: Header-basiertes Routing

Pub/Sub Pattern

// publisher.js
async function publish(event, data) {
    const channel = await getChannel();

    const exchange = 'events';
    await channel.assertExchange(exchange, 'topic', { durable: true });

    channel.publish(
        exchange,
        event,  // z.B. 'user.created'
        Buffer.from(JSON.stringify(data))
    );
}

publish('user.created', { userId: 123, email: 'user@example.com' });
publish('order.completed', { orderId: 456, total: 99.99 });
// subscriber.js
async function subscribe(pattern, handler) {
    const channel = await getChannel();

    const exchange = 'events';
    await channel.assertExchange(exchange, 'topic', { durable: true });

    // Exklusive Queue für diesen Consumer
    const q = await channel.assertQueue('', { exclusive: true });

    await channel.bindQueue(q.queue, exchange, pattern);

    channel.consume(q.queue, (msg) => {
        const event = msg.fields.routingKey;
        const data = JSON.parse(msg.content.toString());
        handler(event, data);
        channel.ack(msg);
    });
}

// Alle User-Events
subscribe('user.*', (event, data) => {
    console.log('User event:', event, data);
});

// Nur order.completed
subscribe('order.completed', (event, data) => {
    console.log('Order completed:', data);
});

Dead Letter Queue

// Für fehlgeschlagene Nachrichten
async function setupWithDLQ() {
    const channel = await getChannel();

    // Dead Letter Queue
    await channel.assertQueue('tasks.dlq', { durable: true });

    // Hauptqueue mit DLQ
    await channel.assertQueue('tasks', {
        durable: true,
        arguments: {
            'x-dead-letter-exchange': '',
            'x-dead-letter-routing-key': 'tasks.dlq',
            'x-message-ttl': 60000  // 60s TTL
        }
    });
}

// Nach X Fehlversuchen landet Nachricht in DLQ
channel.consume('tasks', (msg) => {
    const retries = msg.properties.headers['x-retry-count'] || 0;

    try {
        processMessage(msg);
        channel.ack(msg);
    } catch (error) {
        if (retries < 3) {
            // Retry
            channel.publish('', 'tasks', msg.content, {
                headers: { 'x-retry-count': retries + 1 }
            });
            channel.ack(msg);
        } else {
            // An DLQ
            channel.nack(msg, false, false);
        }
    }
});

Best Practices

✅ Do:
  • Nachrichten immer ACK/NACK
  • Idempotente Consumer (gleiche Nachricht = gleiches Ergebnis)
  • Dead Letter Queues für Fehler
  • Persistent Messages für wichtige Daten
💡 Tipp: Überwachen Sie Ihre Queue-Längen und Consumer mit dem Enjyn Status Monitor.

Weitere Informationen