Log Aggregation with the ELK Stack
Centralized logging is essential for debugging and monitoring distributed systems. Learn how to configure the ELK Stack (Elasticsearch, Logstash, Kibana).
ELK Stack Architecture
┌─────────────────────────────────────────────────────────────┐
│                          ELK STACK                          │
├─────────────────────────────────────────────────────────────┤
│  SOURCES           TRANSPORT                 STORAGE   UI   │
│                                                              │
│  App 1 Logs ──┐                                              │
│               │                                              │
│  App 2 Logs ──┼──► FILEBEAT ──► LOGSTASH ──► ELASTICSEARCH   │
│               │    (Shipper)    (Parser)     (Storage)       │
│  System Logs ─┘                                    │         │
│                                                    ▼         │
│                                                 KIBANA       │
│                                                  (UI)        │
└─────────────────────────────────────────────────────────────┘
Components
| Component | Role | Alternative |
|---|---|---|
| Elasticsearch | Storage and search | OpenSearch |
| Logstash | Parsing, transformation | Fluentd, Vector |
| Kibana | Visualization, dashboards | Grafana |
| Filebeat | Log shipping (lightweight) | Fluent Bit |
Docker Compose Setup
# docker-compose.yml
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - elasticsearch-data:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"

  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    volumes:
      - ./logstash/pipeline:/usr/share/logstash/pipeline
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml
    ports:
      - "5044:5044"   # Beats input
      - "5000:5000"   # TCP input
    depends_on:
      - elasticsearch

  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch

  filebeat:
    image: docker.elastic.co/beats/filebeat:8.11.0
    user: root
    volumes:
      - ./filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
      - /var/log:/var/log:ro
    depends_on:
      - logstash

volumes:
  elasticsearch-data:
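Once the files are in place, docker compose up -d starts all four containers. Below is a minimal reachability check, a sketch assuming the host ports published in the compose file above and Node 18+ for the built-in fetch:

// check-stack.js - verify that Elasticsearch and Kibana answer on their published ports
async function check(name, url) {
  try {
    const res = await fetch(url);
    console.log(`${name}: HTTP ${res.status}`);
  } catch (err) {
    console.error(`${name}: not reachable (${err.message})`);
  }
}

(async () => {
  await check('Elasticsearch', 'http://localhost:9200/_cluster/health');
  await check('Kibana', 'http://localhost:5601/api/status');
})();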
Filebeat Configuration
# filebeat/filebeat.yml
filebeat.inputs:
  # Application Logs
  - type: log
    enabled: true
    paths:
      - /var/log/myapp/*.log
    fields:
      app: myapp
      environment: production
    multiline:
      pattern: '^\d{4}-\d{2}-\d{2}'
      negate: true
      match: after

  # Nginx Access Logs
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log
    fields:
      type: nginx-access

  # Docker Container Logs
  - type: container
    paths:
      - '/var/lib/docker/containers/*/*.log'
    processors:
      - add_docker_metadata:
          host: "unix:///var/run/docker.sock"

output.logstash:
  hosts: ["logstash:5044"]

# Or ship directly to Elasticsearch instead:
# output.elasticsearch:
#   hosts: ["elasticsearch:9200"]
#   index: "filebeat-%{+yyyy.MM.dd}"
Logstash Pipeline
# logstash/pipeline/main.conf
input {
  beats {
    port => 5044
  }
  tcp {
    port => 5000
    codec => json
  }
}

filter {
  # Parse Nginx access logs
  if [fields][type] == "nginx-access" {
    grok {
      match => { "message" => '%{IPORHOST:client_ip} - %{USER:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATH:path}(?:%{URIQUERY:query})? HTTP/%{NUMBER:http_version}" %{NUMBER:status:int} %{NUMBER:bytes:int} "%{DATA:referrer}" "%{DATA:user_agent}"' }
    }
    date {
      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    geoip {
      source => "client_ip"
      target => "geoip"
    }
    useragent {
      source => "user_agent"
      target => "ua"
    }
  }

  # Parse application JSON logs
  if [fields][app] == "myapp" {
    json {
      source => "message"
      target => "parsed"
    }
    mutate {
      add_field => {
        "level" => "%{[parsed][level]}"
        "logger" => "%{[parsed][logger]}"
      }
    }
  }

  # Redact sensitive data
  mutate {
    gsub => [
      "message", "password=\S+", "password=[REDACTED]",
      "message", "api_key=\S+", "api_key=[REDACTED]"
    ]
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logs-%{[fields][app]}-%{+YYYY.MM.dd}"
  }
  # Debug: also print events to stdout
  # stdout { codec => rubydebug }
}
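The tcp input on port 5000 with the json codec accepts JSON events over a plain socket, which is handy for quick tests. Here is a minimal sketch in Node; the host and port come from the pipeline above, and localhost assumes you connect through the port published by the compose file:

// send-log.js - push one JSON event into the Logstash TCP input
const net = require('net');

const event = {
  '@timestamp': new Date().toISOString(),
  level: 'INFO',
  logger: 'demo',
  message: 'hello from the tcp input'
};

const socket = net.createConnection({ host: 'localhost', port: 5000 }, () => {
  // one JSON document per connection keeps the json codec happy
  socket.write(JSON.stringify(event) + '\n');
  socket.end();
});

socket.on('error', (err) => console.error('Logstash not reachable:', err.message));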
Structured Logs (JSON)
// Instead of unstructured logs:
// [2024-01-15 10:30:00] ERROR: User 123 failed to login - invalid password
// Structured JSON logs:
{
  "timestamp": "2024-01-15T10:30:00.000Z",
  "level": "ERROR",
  "logger": "auth.service",
  "message": "Login failed",
  "context": {
    "user_id": 123,
    "reason": "invalid_password",
    "ip": "192.168.1.100",
    "request_id": "abc-123-xyz"
  }
}
// PHP example with Monolog
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
use Monolog\Formatter\JsonFormatter;
$logger = new Logger('app');
$handler = new StreamHandler('/var/log/myapp/app.log');
$handler->setFormatter(new JsonFormatter());
$logger->pushHandler($handler);
$logger->error('Login failed', [
    'user_id' => $userId,
    'reason' => 'invalid_password',
    'ip' => $_SERVER['REMOTE_ADDR'],
    'request_id' => $requestId
]);
// Node.js example with Winston
const winston = require('winston');
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.json()
  ),
  defaultMeta: {
    service: 'user-service',
    environment: process.env.NODE_ENV
  },
  transports: [
    new winston.transports.File({ filename: '/var/log/myapp/app.log' })
  ]
});

logger.error('Login failed', {
  userId: 123,
  reason: 'invalid_password',
  ip: req.ip,
  requestId: req.headers['x-request-id']
});
Kibana Queries (KQL)
# Kibana Query Language (KQL)

# Find all errors
level: "ERROR"

# A specific user
context.user_id: 123

# Combination
level: "ERROR" and context.user_id: 123

# Wildcard
message: *timeout*

# Range (status codes)
status >= 400 and status < 500

# Time range (in Discover)
@timestamp >= "2024-01-15" and @timestamp < "2024-01-16"

# Negation
NOT level: "DEBUG"

# Nested fields
context.request.method: "POST"

# Regex (Lucene syntax)
message: /.*exception.*/i
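KQL is what goes into the Kibana search bar; when querying Elasticsearch programmatically, the same filter is expressed as Query DSL. Below is a hedged sketch of the level: "ERROR" and context.user_id: 123 query against the REST API; the logs-* index pattern matches the Logstash output above, and the .keyword subfield assumes the default dynamic mapping:

// query-errors.js - Query DSL equivalent of: level: "ERROR" and context.user_id: 123
(async () => {
  const body = {
    query: {
      bool: {
        filter: [
          { term: { 'level.keyword': 'ERROR' } },   // .keyword assumes default dynamic mapping
          { term: { 'context.user_id': 123 } }
        ]
      }
    },
    sort: [{ '@timestamp': 'desc' }],
    size: 20
  };

  const res = await fetch('http://localhost:9200/logs-*/_search', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body)
  });
  const data = await res.json();
  console.log(data.hits.hits);
})();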
Index Lifecycle Management
# PUT _ilm/policy/logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_age": "1d",
            "max_size": "50gb"
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          },
          "set_priority": {
            "priority": 50
          }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "set_priority": {
            "priority": 0
          }
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
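The policy only takes effect once it is attached to indices, typically through an index template. Here is a minimal sketch that registers such a template over the REST API; the name logs-template is an assumption, logs-* matches the Logstash index pattern above, and note that the rollover action additionally needs a write alias or a data stream:

// apply-ilm.js - attach logs-policy to future logs-* indices via an index template
(async () => {
  const template = {
    index_patterns: ['logs-*'],
    template: {
      settings: {
        'index.lifecycle.name': 'logs-policy'
      }
    }
  };

  const res = await fetch('http://localhost:9200/_index_template/logs-template', {
    method: 'PUT',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(template)
  });
  console.log(await res.json());
})();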
Using Log Levels Correctly
LOG LEVELS (by severity)
┌─────────┬────────────────────────────────────────────────────┐
│ FATAL   │ System cannot continue, immediate action required  │
├─────────┼────────────────────────────────────────────────────┤
│ ERROR   │ Error, but the system keeps running                │
├─────────┼────────────────────────────────────────────────────┤
│ WARN    │ Unexpected situation, not an error                 │
├─────────┼────────────────────────────────────────────────────┤
│ INFO    │ Important business events                          │
├─────────┼────────────────────────────────────────────────────┤
│ DEBUG   │ Detailed developer information                     │
├─────────┼────────────────────────────────────────────────────┤
│ TRACE   │ Very detailed, for debugging only                  │
└─────────┴────────────────────────────────────────────────────┘
PRODUCTION: INFO and above
DEBUGGING: DEBUG and above
NEVER: TRACE in production (too much log volume)
// Examples
logger.info('Order placed', { orderId, userId, amount }); // ✅
logger.debug('SQL query executed', { query, params, duration }); // ✅
logger.error('Payment failed', { orderId, error: err.message }); // ✅
logger.debug(JSON.stringify(largeObject)); // ❌ Too much data
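To follow the production/debugging split above without touching code, the level can come from the environment. A minimal sketch with Winston; the LOG_LEVEL variable name is an assumption:

const winston = require('winston');

// INFO and above in production, DEBUG and above while debugging,
// switched per deployment via an environment variable
const logger = winston.createLogger({
  level: process.env.LOG_LEVEL || 'info',
  format: winston.format.json(),
  transports: [new winston.transports.Console()]
});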
Request Tracing
// Every request gets a unique ID
// Express middleware
const { v4: uuid } = require('uuid');

app.use((req, res, next) => {
  req.requestId = req.headers['x-request-id'] || uuid();
  res.setHeader('x-request-id', req.requestId);

  // Logger with request context
  req.logger = logger.child({ requestId: req.requestId });
  next();
});

// In route handlers
app.get('/api/users/:id', async (req, res) => {
  req.logger.info('Fetching user', { userId: req.params.id });
  try {
    const user = await userService.findById(req.params.id);
    req.logger.info('User found', { userId: user.id });
    res.json(user);
  } catch (err) {
    req.logger.error('Failed to fetch user', {
      userId: req.params.id,
      error: err.message
    });
    res.status(500).json({ error: 'Internal error' });
  }
});

// All logs with the same requestId belong together
// → makes debugging a request's flow straightforward
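To keep the trace intact across service boundaries, the same ID should be forwarded on outgoing calls. A minimal sketch, assuming Node 18+ fetch and a placeholder downstream URL:

// Forward the incoming request ID so downstream logs carry the same requestId
async function fetchOrders(req) {
  const res = await fetch('http://order-service/api/orders', {   // placeholder URL
    headers: { 'x-request-id': req.requestId }
  });
  return res.json();
}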
💡 Best Practices:
1. Always use structured (JSON) logs
2. Propagate request IDs for tracing
3. NEVER log sensitive data
4. Use log levels consistently
5. Use index lifecycle management to control storage growth
More Information
- 📊 Infrastructure Monitoring
- 🔍 Full-Text Search
- 📈 Enjyn Status Monitor - monitoring for your services