Message Queues: Asynchrone Kommunikation | Enjyn Gruppe
Hallo Welt
Hallo Welt
Original Lingva Deutsch
Übersetzung wird vorbereitet...
Dieser Vorgang kann bis zu 60 Sekunden dauern.
Diese Seite wird erstmalig übersetzt und dann für alle Besucher gespeichert.
0%
DE Zurück zu Deutsch
Übersetzung durch Lingva Translate

235 Dokumentationen verfügbar

Wissensdatenbank

Message Queues RabbitMQ Grundlagen

Zuletzt aktualisiert: 20.01.2026 um 10:03 Uhr

Message Queues: Asynchrone Kommunikation

Message Queues entkoppeln Systeme und ermöglichen asynchrone Verarbeitung. Lernen Sie die Grundlagen mit RabbitMQ.

Warum Message Queues?

Synchron (problematisch):
User → API → Email Service → Response
            ↑
         Wartet 2s auf Email-Versand!

Asynchron (besser):
User → API → Queue → Response (sofort!)
              ↓
         Email Worker → Email Service (im Hintergrund)

Konzepte

┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│ Producer │───>│ Exchange │───>│  Queue   │───>│ Consumer │
└──────────┘    └──────────┘    └──────────┘    └──────────┘

Producer:  Sendet Nachrichten
Exchange:  Routet Nachrichten zu Queues
Queue:     Speichert Nachrichten
Consumer:  Verarbeitet Nachrichten

RabbitMQ Setup

# Docker
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:management

# Management UI: http://localhost:15672
# Default: guest / guest

# Node.js
npm install amqplib

Einfaches Beispiel

// producer.js
const amqp = require('amqplib');

async function send(message) {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const queue = 'tasks';
    await channel.assertQueue(queue, { durable: true });

    channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
        persistent: true
    });

    console.log('Sent:', message);
    await channel.close();
    await connection.close();
}

send({ task: 'send_email', to: 'user@example.com' });
// consumer.js
const amqp = require('amqplib');

async function consume() {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const queue = 'tasks';
    await channel.assertQueue(queue, { durable: true });

    // Nur 1 Nachricht gleichzeitig pro Consumer
    channel.prefetch(1);

    console.log('Waiting for messages...');

    channel.consume(queue, async (msg) => {
        const content = JSON.parse(msg.content.toString());
        console.log('Received:', content);

        try {
            await processTask(content);
            channel.ack(msg);  // Erfolgreich verarbeitet
        } catch (error) {
            channel.nack(msg, false, true);  // Zurück in Queue
        }
    });
}

async function processTask(task) {
    // Verarbeitung...
    await new Promise(r => setTimeout(r, 1000));
    console.log('Task completed:', task);
}

consume();

Exchange-Typen

# Direct Exchange: Exaktes Routing
Exchange ─── routing_key="email" ───> Email Queue
         └── routing_key="sms" ─────> SMS Queue

# Fanout Exchange: Broadcast an alle
Exchange ─┬─> Queue 1
          ├─> Queue 2
          └─> Queue 3

# Topic Exchange: Pattern-Matching
Exchange ─── "user.created" ────> User Events Queue
         └── "order.*" ─────────> Order Events Queue
         └── "#" ───────────────> All Events Queue

# Headers Exchange: Header-basiertes Routing

Pub/Sub Pattern

// publisher.js
async function publish(event, data) {
    const channel = await getChannel();

    const exchange = 'events';
    await channel.assertExchange(exchange, 'topic', { durable: true });

    channel.publish(
        exchange,
        event,  // z.B. 'user.created'
        Buffer.from(JSON.stringify(data))
    );
}

publish('user.created', { userId: 123, email: 'user@example.com' });
publish('order.completed', { orderId: 456, total: 99.99 });
// subscriber.js
async function subscribe(pattern, handler) {
    const channel = await getChannel();

    const exchange = 'events';
    await channel.assertExchange(exchange, 'topic', { durable: true });

    // Exklusive Queue für diesen Consumer
    const q = await channel.assertQueue('', { exclusive: true });

    await channel.bindQueue(q.queue, exchange, pattern);

    channel.consume(q.queue, (msg) => {
        const event = msg.fields.routingKey;
        const data = JSON.parse(msg.content.toString());
        handler(event, data);
        channel.ack(msg);
    });
}

// Alle User-Events
subscribe('user.*', (event, data) => {
    console.log('User event:', event, data);
});

// Nur order.completed
subscribe('order.completed', (event, data) => {
    console.log('Order completed:', data);
});

Dead Letter Queue

// Für fehlgeschlagene Nachrichten
async function setupWithDLQ() {
    const channel = await getChannel();

    // Dead Letter Queue
    await channel.assertQueue('tasks.dlq', { durable: true });

    // Hauptqueue mit DLQ
    await channel.assertQueue('tasks', {
        durable: true,
        arguments: {
            'x-dead-letter-exchange': '',
            'x-dead-letter-routing-key': 'tasks.dlq',
            'x-message-ttl': 60000  // 60s TTL
        }
    });
}

// Nach X Fehlversuchen landet Nachricht in DLQ
channel.consume('tasks', (msg) => {
    const retries = msg.properties.headers['x-retry-count'] || 0;

    try {
        processMessage(msg);
        channel.ack(msg);
    } catch (error) {
        if (retries < 3) {
            // Retry
            channel.publish('', 'tasks', msg.content, {
                headers: { 'x-retry-count': retries + 1 }
            });
            channel.ack(msg);
        } else {
            // An DLQ
            channel.nack(msg, false, false);
        }
    }
});

Best Practices

✅ Do:
  • Nachrichten immer ACK/NACK
  • Idempotente Consumer (gleiche Nachricht = gleiches Ergebnis)
  • Dead Letter Queues für Fehler
  • Persistent Messages für wichtige Daten
💡 Tipp: Überwachen Sie Ihre Queue-Längen und Consumer mit dem Enjyn Status Monitor.

Weitere Informationen

Enjix Beta

Enjyn AI Agent

Hallo 👋 Ich bin Enjix — wie kann ich dir helfen?
120