ata, and batches records.
2. Transport: Apache Kafka acts as a durable, partitioned buffer. Fluent Bit pushes to Kafka using async producers with configurable linger, batch size, and retry policies. Kafka decouples ingestion from storage, enabling independent scaling.
3. Storage: OpenSearch (or an Elasticsearch-compatible engine) consumes from Kafka via connectors. Index templates enforce schema, and lifecycle policies (Index State Management in OpenSearch, ILM in Elasticsearch) move indices through hot → warm → cold → delete tiers based on age and size.
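4. Consumption: Grafana provides dashboards; OpenSearch Dashboards enables ad-hoc search; threshold-based alerting comes from the OpenSearch Alerting plugin or from Prometheus rules routed through Alertmanager. Repeated queries are served from OpenSearch's node query cache and shard request cache, and aggregations run on keyword fields backed by doc_values rather than on fielddata.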
Component Configurations
1. Fluent Bit (Collection)
[SERVICE]
    Flush         5
    Daemon        Off
    Log_Level     info
    Parsers_File  parsers.conf

[INPUT]
    Name              tail
    Path              /var/log/containers/*.log
    Parser            json
    Tag               kube.*
    Refresh_Interval  5
    Mem_Buf_Limit     50MB
    Skip_Long_Lines   On

[FILTER]
    Name             kubernetes
    Match            kube.*
    Kube_URL         https://kubernetes.default.svc:443
    Kube_Tag_Prefix  kube.var.log.containers.
    Merge_Log        On
    Keep_Log         Off
    Buffer_Size      0

[OUTPUT]
    Name           kafka
    Match          *
    Brokers        kafka-1:9092,kafka-2:9092,kafka-3:9092
    Topics         logs-ingestion
    Timestamp_Key  @timestamp
    Retry_Limit    5
    # Producer tuning is passed through to librdkafka with the rdkafka. prefix
    rdkafka.queue.buffering.max.messages  10000
    rdkafka.batch.size                    1048576
    rdkafka.linger.ms                     1000
    rdkafka.compression.codec             lz4
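Key design choices: Mem_Buf_Limit caps the tail input's in-memory buffer. When the cap is reached, Fluent Bit pauses that input instead of growing until it is OOM-killed during spikes. LZ4 compression shrinks the network payload sent to Kafka at low CPU cost. The linger and batch-size settings trade latency for throughput: a 1 s linger lets the producer send fewer, larger batches.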
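The linger/batch trade-off reduces to a simple size-or-time rule. The Python sketch below is illustrative only. The Batcher class is hypothetical and does not reflect how librdkafka is implemented. It models one rule: a batch is sent when it reaches a byte limit, or when its oldest record has waited the linger interval.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Batcher:
    """Hypothetical size-or-linger batcher (not librdkafka's implementation)."""

    max_bytes: int  # flush once buffered payload reaches this many bytes
    linger_ms: int  # flush once the oldest buffered record is this old
    _buf: List[bytes] = field(default_factory=list, init=False)
    _size: int = field(default=0, init=False)
    _first_ms: Optional[int] = field(default=None, init=False)

    def add(self, record: bytes, now_ms: int) -> Optional[List[bytes]]:
        """Buffer a record; return a batch if the size or linger limit was hit."""
        if self._first_ms is None:
            self._first_ms = now_ms
        self._buf.append(record)
        self._size += len(record)
        if self._size >= self.max_bytes:
            return self._flush()
        return self.poll(now_ms)

    def poll(self, now_ms: int) -> Optional[List[bytes]]:
        """Called periodically; flushes if the oldest record has lingered long enough."""
        if self._first_ms is not None and now_ms - self._first_ms >= self.linger_ms:
            return self._flush()
        return None

    def _flush(self) -> List[bytes]:
        batch = self._buf
        self._buf, self._size, self._first_ms = [], 0, None
        return batch
```

With a small byte limit, a burst is flushed as soon as it fills the batch. A trickle of records instead waits up to linger_ms, which is the latency cost of a larger linger.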
2. Kafka (Transport)
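# docker-compose snippet for a Kafka broker (Bitnami image, KRaft mode).
# KRaft also needs node.id, process.roles, controller.quorum.voters,
# controller.listener.names and listener.security.protocol.map (omitted here).
environment:
  KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
  KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
  KAFKA_CFG_LOG_RETENTION_HOURS: 72
  KAFKA_CFG_LOG_SEGMENT_BYTES: 1073741824             # 1 GiB segments
  KAFKA_CFG_LOG_RETENTION_CHECK_INTERVAL_MS: 300000   # check every 5 minutes
  KAFKA_CFG_NUM_PARTITIONS: 12
  KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: 3
  KAFKA_CFG_MIN_INSYNC_REPLICAS: 2                    # maps to min.insync.replicas
  KAFKA_CFG_LOG_CLEANER_ENABLE: "true"                # Compose rejects unquoted booleans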
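Key design choices: 12 partitions let up to 12 consumers in one consumer group read in parallel. With a replication factor of 3 and producers using acks=all, min.insync.replicas=2 keeps every acknowledged write on at least two brokers while tolerating one broker failure. 72-hour retention provides a replay window if the storage pipeline fails.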
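The durability and retention numbers above lend themselves to quick arithmetic. The Python helpers below are a sketch with hypothetical names. The compression ratio is an assumed input, not a measured figure. The sizing also ignores segment granularity: Kafka deletes whole segments, so actual usage can overshoot the window by up to one segment per partition.

```python
def tolerated_broker_failures(replication_factor: int, min_insync_replicas: int) -> int:
    """Replicas that can be lost before acks=all writes to a partition are rejected."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("need 1 <= min.insync.replicas <= replication factor")
    return replication_factor - min_insync_replicas


def retention_disk_gb(ingest_mb_per_s: float, retention_hours: float,
                      replication_factor: int, compression_ratio: float) -> float:
    """Cluster-wide disk needed to hold the retention window.

    compression_ratio is an assumption (3.0 means 3:1); measure it on real traffic.
    """
    raw_gb = ingest_mb_per_s * 3600 * retention_hours / 1024
    return raw_gb * replication_factor / compression_ratio
```

For example, 10 MB/s held for 72 hours is 2,531.25 GB of raw data. With replication factor 3 and an assumed 3:1 compression ratio, that comes to about 2.5 TB across the cluster. tolerated_broker_failures(3, 2) returns 1.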
3. OpenSearch Index Template & ILM Policy
PUT _index_template/logs_v1
{
"index_patterns": ["logs-*"],
"template": {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"index.lifecycle.name": "logs-ilm-policy",
"index.lifecycle.rollover_alias": "logs-current"
},
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"host": { "type": "keyword" },
"service": { "type": "keyword" },
"level": { "type": "keyword" },
"message": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
"trace_id": { "type": "keyword" },
"k8s": {
"properties": {
"namespace": { "type": "keyword" },
"pod": { "type": "keyword" },
"container": { "type": "keyword" }
}
}
}
}
}
}
PUT _ilm/policy/logs-ilm-policy
{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": { "max_size": "50gb", "max_age": "1d" },
"set_priority": { "priority": 100 }
}
},
"warm": {
"min_age": "3d",
"actions": {
"shrink": { "number_of_shards": 1 },
"forcemerge": { "max_num_segments": 1 },
"set_priority": { "priority": 50 }
}
},
"cold": {
"min_age": "14d",
"actions": {
"searchable_snapshot": { "snapshot_repository": "s3-cold" },
"set_priority": { "priority": 0 }
}
},
"delete": {
"min_age": "90d",
"actions": { "delete": {} }
}
}
}
}
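Key design choices: rollover caps index size. Because max_size counts all primary shards, 50 GB across 3 primaries is roughly 17 GB per shard. forcemerge in the warm phase reduces segment count for faster queries. searchable_snapshot in the cold phase moves data to S3 while keeping it queryable, cutting storage costs by ~70%.

Note that the _ilm API and the index.lifecycle.* settings are Elasticsearch ILM syntax. OpenSearch implements the same lifecycle through its Index State Management plugin (_plugins/_ism/policies). ISM uses states and transitions instead of phases and reads the rollover alias from plugins.index_state_management.rollover_alias.

The k8s.* mapping also assumes an upstream rename step. Fluent Bit's kubernetes filter emits kubernetes.namespace_name, kubernetes.pod_name, and kubernetes.container_name.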
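A quick way to reason about the policy is to map an index's age to the phase it should have reached. The Python sketch below copies the min_age thresholds from logs-ilm-policy. It assumes ages are measured from rollover, which is how ILM computes min_age for rolled-over indices. The function name is hypothetical. Real transitions happen on a periodic lifecycle check, so an index can sit briefly past a threshold.

```python
from datetime import timedelta

# min_age thresholds copied from logs-ilm-policy, checked from the oldest phase down.
PHASES = [
    ("delete", timedelta(days=90)),
    ("cold", timedelta(days=14)),
    ("warm", timedelta(days=3)),
    ("hot", timedelta(0)),
]


def expected_phase(age_since_rollover: timedelta) -> str:
    """Phase an index should have reached, assuming age is measured from rollover."""
    for name, min_age in PHASES:
        if age_since_rollover >= min_age:
            return name
    raise ValueError("age must be non-negative")
```

For example, an index rolled over 20 days ago should already be a searchable snapshot in the cold tier.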
4. Data Flow Validation
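# Send one test record. The tail-based config above does not read stdin, so use the stdin input plugin
echo '{"@timestamp":"2024-06-15T10:00:00Z","host":"node-1","service":"auth","level":"info","message":"User login successful","trace_id":"abc123"}' | fluent-bit -i stdin -o kafka -p brokers=localhost:9092 -p topics=logs-ingestion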
# Verify Kafka topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic logs-ingestion --from-beginning --max-messages 1
# Query OpenSearch
curl -X GET "localhost:9200/logs-*/_search" -H 'Content-Type: application/json' -d'
{
"query": { "match": { "service": "auth" } },
"sort": [{ "@timestamp": "desc" }],
"size": 5
}'
Pitfall Guide
1. Blind Ingestion Without Sampling
Problem: Shipping every debug line from high-throughput services inflates storage costs and degrades query performance.
Mitigation: Implement dynamic sampling at the collector level. Drop DEBUG logs in production, sample high-cardinality events, and use grep/lua filters in Fluent Bit to drop known noise patterns before they enter Kafka.
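The sampling policy can be expressed as a small decision function. In Fluent Bit it would have to be ported to a Lua filter; here it is sketched in Python. keep_record and ALWAYS_KEEP are hypothetical names. Hashing on trace_id is one possible choice: it keeps or drops all lines of a trace together, so sampled traces stay complete.

```python
import hashlib

ALWAYS_KEEP = {"warn", "warning", "error", "fatal"}


def keep_record(record: dict, sample_rate: float) -> bool:
    """Drop DEBUG, keep WARN and above, and hash-sample everything else.

    The sampling key is trace_id (falling back to message), so every line of
    one trace gets the same keep/drop decision.
    """
    level = str(record.get("level", "")).lower()
    if level == "debug":
        return False
    if level in ALWAYS_KEEP:
        return True
    key = str(record.get("trace_id") or record.get("message", ""))
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big")  # roughly uniform over [0, 2**64)
    return bucket < sample_rate * 2**64
```

With sample_rate=0.1, roughly one in ten INFO traces is shipped, while every WARN and ERROR line still reaches Kafka.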
2. Ignoring Backpressure & Buffer Overflow
Problem: When storage lags, collectors fill memory buffers and crash, causing data loss or application throttling.
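Mitigation: Set Mem_Buf_Limit and storage.type filesystem on the Fluent Bit input (with storage.path in [SERVICE]) so buffered chunks spill to disk instead of being lost. On the Kafka output, tune the librdkafka settings rdkafka.queue.buffering.max.messages and rdkafka.retry.backoff.ms. Alert or auto-scale on Fluent Bit's fluentbit_output_dropped_records_total and fluentbit_input_bytes_total metrics.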
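Backpressure means the buffer pushes back on its input instead of growing without bound. The Python class below is a hypothetical sketch, only loosely modeled on Mem_Buf_Limit. When the byte limit is reached, the input is paused and offers are refused. Input resumes once the output drains. For a tail input, resuming just means continuing from its file offset. Data can still be lost if the file rotates away while paused, which is why filesystem storage is recommended above.

```python
from collections import deque
from typing import Deque, List


class BoundedBuffer:
    """Hypothetical byte-limited buffer that pauses its input instead of growing."""

    def __init__(self, limit_bytes: int) -> None:
        self.limit_bytes = limit_bytes
        self.used = 0
        self.paused = False
        self._chunks: Deque[bytes] = deque()

    def offer(self, chunk: bytes) -> bool:
        """Buffer a chunk, or pause and return False if it would exceed the limit."""
        if self.paused or self.used + len(chunk) > self.limit_bytes:
            self.paused = True  # the caller must retry this chunk later
            return False
        self._chunks.append(chunk)
        self.used += len(chunk)
        return True

    def drain(self, max_chunks: int) -> List[bytes]:
        """Hand up to max_chunks to the output; resume input once there is headroom."""
        out: List[bytes] = []
        while self._chunks and len(out) < max_chunks:
            chunk = self._chunks.popleft()
            self.used -= len(chunk)
            out.append(chunk)
        if self.used < self.limit_bytes:
            self.paused = False
        return out
```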
3. Poor Shard & Partition Strategy
Problem: Too many small shards waste cluster resources; too few large shards cause hotspots and slow queries.
Mitigation: Align shard count with data volume and query concurrency, aiming for primary shards of roughly 10-50 GB each. Leverage the ILM shrink action to reduce shard count in the warm phase. Avoid time-based indices without rollover; use alias-based rollover instead.
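Shard sizing is a ceiling division. The Python helpers below are a sketch; the 30 GB default target is an assumption chosen from the middle of the range above. The per-shard figure reflects that the rollover max_size counts all primary shards together.

```python
import math


def primary_shards(index_size_gb: float, target_shard_gb: float = 30.0) -> int:
    """Fewest primary shards that keep each shard at or below target_shard_gb."""
    if index_size_gb <= 0 or target_shard_gb <= 0:
        raise ValueError("sizes must be positive")
    return math.ceil(index_size_gb / target_shard_gb)


def per_shard_gb(index_size_gb: float, shards: int) -> float:
    """Average primary shard size, e.g. for an index rolled over at max_size."""
    return index_size_gb / shards
```

Applied to the template above, a 50 GB rollover spread over 3 primaries gives shards of about 16.7 GB. Shrinking to 1 shard in the warm phase yields a single 50 GB shard, still within the suggested range.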
4. PII Leakage & Compliance Gaps
Problem: Logs accidentally contain emails, tokens, or personal data, violating GDPR/HIPAA and exposing legal risk.
Mitigation: Deploy a pre-ingestion PII redaction filter using regex or ML-based detection. Mask sensitive fields in Fluent Bit or Kafka Streams. Maintain an audit log of data access, and enforce field-level security and field masking through OpenSearch Security roles.
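A regex-based redaction pass is the simplest form of the filter described above. The Python sketch below uses illustrative patterns (email, bearer token, US SSN format) that are assumptions, not a complete PII rule set. It does not cover ML-based detection. The same logic could run in a Fluent Bit Lua filter or a Kafka Streams processor.

```python
import re
from typing import Dict, Iterable

# Illustrative patterns only; production rule sets need broader, tested coverage.
PII_PATTERNS = [
    (re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"), "[EMAIL]"),
    (re.compile(r"(?i)\bbearer\s+[A-Za-z0-9._~+/-]+=*"), "Bearer [TOKEN]"),
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[SSN]"),
]


def redact(text: str) -> str:
    """Apply each pattern in order, replacing matches with a placeholder."""
    for pattern, placeholder in PII_PATTERNS:
        text = pattern.sub(placeholder, text)
    return text


def redact_record(record: Dict, fields: Iterable[str] = ("message",)) -> Dict:
    """Return a copy of record with the named string fields redacted."""
    out = dict(record)
    for name in fields:
        if isinstance(out.get(name), str):
            out[name] = redact(out[name])
    return out
```

For example, redact("login by alice@example.com") returns "login by [EMAIL]".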
5. Single Point of Failure in Transport
Problem: A single Kafka broker or a centralized Fluent Bit aggregator becomes a bottleneck; if it fails, the entire pipeline halts.
Mitigation: Deploy Kafka as a 3-node cluster with replication.factor=3. Use Fluent Bit DaemonSet mode (not centralized) to distribute collection. Implement dead-letter queues (DLQ) in Kafka for failed records, and monitor consumer lag with Prometheus + Alertmanager.
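The dead-letter pattern boils down to bounded retries, then parking the record with its error. In Kafka this is typically a separate topic; Kafka Connect sink connectors expose errors.deadletterqueue.topic.name for this purpose. The Python sketch below uses an in-memory list as a stand-in for that topic. deliver_with_dlq and DeliveryReport are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterable, List


@dataclass
class DeliveryReport:
    delivered: List[Dict] = field(default_factory=list)
    dead_letters: List[Dict] = field(default_factory=list)


def deliver_with_dlq(records: Iterable[Dict], sink: Callable[[Dict], None],
                     max_attempts: int = 3) -> DeliveryReport:
    """Retry each record up to max_attempts; park persistent failures in a DLQ list.

    The dead_letters list stands in for a real dead-letter topic.
    """
    report = DeliveryReport()
    for record in records:
        last_error = None
        for _ in range(max_attempts):
            try:
                sink(record)
            except Exception as exc:  # real code would catch narrower error types
                last_error = exc
                continue
            report.delivered.append(record)
            break
        else:  # loop finished without break: every attempt failed
            report.dead_letters.append(
                {"record": record, "error": repr(last_error), "attempts": max_attempts}
            )
    return report
```

Transient failures succeed on a later attempt. Records that fail every attempt, such as documents with mapping conflicts, end up in dead_letters with the last error attached for later replay.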
6. Neglecting Schema Evolution
Problem: Application updates change log structure, causing index mapping conflicts or query failures.
Mitigation: Set dynamic: false in OpenSearch mappings so unknown fields stay in _source but are not indexed or added to the mapping (dynamic: strict rejects such documents instead). Maintain a schema registry or versioned index templates. Use ignore_malformed and coerce settings where appropriate. Version indices (logs-v1, logs-v2) and route traffic via aliases during migration.
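Before rolling out a new template version, it helps to diff the mappings for fields whose type changed, since those are what cause mapping conflicts. The Python sketch below flattens the properties objects of two template versions and reports conflicts. The function names and the latency_ms field in the example are hypothetical, and multi-fields (the "fields" key) are ignored.

```python
from typing import Dict, Tuple


def flatten_mapping(properties: Dict, prefix: str = "") -> Dict[str, str]:
    """Flatten a mappings "properties" object into {dotted.path: type}.

    Multi-fields (the "fields" key) are ignored in this sketch.
    """
    flat: Dict[str, str] = {}
    for name, spec in properties.items():
        path = prefix + name
        if "properties" in spec:
            flat.update(flatten_mapping(spec["properties"], path + "."))
        else:
            flat[path] = spec.get("type", "object")
    return flat


def type_conflicts(old: Dict, new: Dict) -> Dict[str, Tuple[str, str]]:
    """Fields present in both versions whose type changed, as (old_type, new_type)."""
    a, b = flatten_mapping(old), flatten_mapping(new)
    return {path: (a[path], b[path]) for path in a.keys() & b.keys() if a[path] != b[path]}
```

Running this in CI against the mappings.properties of logs_v1 and its successor would flag, for example, a k8s.pod field changing from keyword to text before any document is rejected.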
Production Bundle
Checklist
| Phase | Action Item | Verification |
|---|---|---|
| Pre-Deployment | Size CPU/memory for Fluent Bit based on log volume | kubectl top pods -n logging |
| | Configure network policies for Kafka/OpenSearch | kubectl get netpol |
| | Set up TLS/mTLS for all pipeline components | openssl s_client -connect kafka:9092 |
| | Define retention & compliance boundaries | Documented ILM policy + PII redaction rules |
| Runtime | Monitor consumer lag & output drops | Prometheus: kafka_consumer_group_lag |
| | Validate index rollover & ILM transitions | GET logs-*/_ilm/explain (OpenSearch ISM: GET _plugins/_ism/explain/logs-*) |
| | Test failover by killing a Kafka broker | Pipeline continues; no data loss |
| | Audit query performance & cache hit ratio | GET _nodes/stats/indices/query_cache |
| Post-Deployment | Run load test with synthetic log generator | End-to-end P95 latency < 2s |
| | Document runbook for pipeline recovery | Stored in wiki/Confluence |
| | Schedule quarterly cost & storage review | Compare cold-tier savings vs. baseline |
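The P95 latency check needs two pieces: per-record latency and a percentile. The Python sketch below uses the nearest-rank definition of P95. It assumes the load test records when each event became searchable (a hypothetical indexed_at timestamp, e.g. from polling OpenSearch). That value is compared against the event's @timestamp.

```python
from datetime import datetime
from typing import Sequence


def p95(samples: Sequence[float]) -> float:
    """Nearest-rank 95th percentile."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = (95 * len(ordered) + 99) // 100  # ceil(0.95 * n) using integer math
    return ordered[rank - 1]


def _parse_utc(ts: str) -> datetime:
    # fromisoformat accepts a trailing "Z" only from Python 3.11, so normalize it.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def latency_s(event_ts: str, indexed_at: str) -> float:
    """Seconds between an event's @timestamp and the (hypothetical) indexed_at time."""
    return (_parse_utc(indexed_at) - _parse_utc(event_ts)).total_seconds()
```

The checklist item then becomes p95([latency_s(e, i) for e, i in samples]) < 2.0.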
Decision Matrix
| Criterion | Fluent Bit + Kafka + OpenSearch | Fluentd + Kafka + ELK | Vector + Pulsar + Loki | Splunk Cloud |
|---|---|---|---|---|
| Scale | ★★★★★ (partitioned, async) | ★★★★☆ (Ruby-based, higher overhead) | ★★★★★ (Rust, low footprint) | ★★★☆☆ (vendor limits) |
| Cost | ★★★★☆ (open source, self-hosted) | ★★★☆☆ (higher resource usage) | ★★★★★ (efficient, label-based) | ★☆☆☆☆ (license-heavy) |
| Complexity | ★★★☆☆ (requires Kafka/OS ops) | ★★★☆☆ (similar complexity) | ★★★★☆ (simpler storage model) | ★★★★★ (managed) |
| Query Flexibility | ★★★★★ (full-text, aggregations) | ★★★★★ (identical to OS) | ★★★☆☆ (LogQL, limited joins) | ★★★★☆ (SPL, mature) |
| Cloud-Native Fit | ★★★★★ (K8s, eBPF, sidecar ready) | ★★★★☆ (DaemonSet supported) | ★★★★★ (eBPF, vector-native) | ★★☆☆☆ (agent-heavy) |
| Best For | Enterprise, compliance, high throughput | Legacy ELK migrations | Cost-sensitive, Kubernetes-native | Non-technical teams, budget |

More ★ is more favorable: for Cost it means cheaper, and for Complexity it means simpler to operate.
Config Template (Consolidated)
# fluent-bit-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: logging
data:
  fluent-bit.conf: |
    [SERVICE]
        Flush         5
        Log_Level     info
        Parsers_File  parsers.conf

    [INPUT]
        Name              tail
        Path              /var/log/containers/*.log
        Parser            json
        Tag               kube.*
        Refresh_Interval  5
        Mem_Buf_Limit     50MB
        Skip_Long_Lines   On

    [FILTER]
        Name       kubernetes
        Match      kube.*
        Kube_URL   https://kubernetes.default.svc:443
        Merge_Log  On
        Keep_Log   Off

    [OUTPUT]
        Name           kafka
        Match          *
        Brokers        kafka.logging.svc:9092
        Topics         logs-ingestion
        Timestamp_Key  @timestamp
        Retry_Limit    5
        rdkafka.queue.buffering.max.messages  10000
        rdkafka.batch.size                    1048576
        rdkafka.linger.ms                     1000
        rdkafka.compression.codec             lz4
Quick Start
- Deploy Kafka: Use Strimzi Operator or Helm chart. Create a 3-broker cluster with 12 partitions and replication.factor=3. Verify connectivity: kafka-broker-api-versions.sh --bootstrap-server localhost:9092.
- Deploy OpenSearch: Use the official Helm chart. Apply the index template and ILM policy via curl or API. Ensure JVM heap is set to 50% of node memory (max 31GB).
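- Install Fluent Bit: Deploy it as a DaemonSet in the logging namespace. Mount the host's /var/log read-only, since the entries in /var/log/containers are symlinks into /var/log/pods and mounting /var/log/containers alone leaves them dangling. Also mount /var/lib/kubelet/pods read-only. Apply the ConfigMap above.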
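- Generate Test Traffic: Run the command below. The outer single quotes stop your local shell from expanding $(date), so the container stamps each line as it is written:
kubectl run log-generator --image=busybox --command -- /bin/sh -c 'while true; do echo "{\"@timestamp\":\"$(date -u +%Y-%m-%dT%H:%M:%SZ)\",\"service\":\"test\",\"level\":\"info\",\"message\":\"heartbeat\"}"; sleep 1; done'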
- Validate Pipeline:
- Check Kafka:
kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic logs-ingestion --from-beginning --max-messages 1
- Query OpenSearch:
GET logs-*/_search { "query": { "match": { "service": "test" } } }
- Monitor:
kubectl top pods -n logging, kubectl logs -l app=fluent-bit -n logging
Once validated, enable ILM transitions, configure Grafana dashboards, and set up Alertmanager rules for consumer lag > 1000 or output drop rate > 0.1%. The pipeline is now production-ready.
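In production these thresholds would be PromQL alert rules. The Python sketch below only shows the arithmetic behind them. Lag is the sum over partitions of the log-end offset minus the committed offset. The drop rate is assumed to be dropped / (dropped + delivered) records. Both function names are hypothetical. Partitions with no committed offset are treated as lagging from offset 0, a simplification; real tools use the log-start offset.

```python
from typing import Dict


def total_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> int:
    """Sum over partitions of (log-end offset - committed offset)."""
    return sum(max(end - committed.get(p, 0), 0) for p, end in end_offsets.items())


def should_alert(lag: int, dropped: int, delivered: int,
                 max_lag: int = 1000, max_drop_rate: float = 0.001) -> bool:
    """True if lag exceeds max_lag or the drop rate exceeds max_drop_rate (0.1%)."""
    attempted = dropped + delivered
    drop_rate = dropped / attempted if attempted else 0.0
    return lag > max_lag or drop_rate > max_drop_rate
```

For example, partitions at end offsets {0: 500, 1: 700} with commits {0: 200, 1: 100} give a lag of 900, below the 1000 threshold. Two drops out of 1,000 records (0.2%) would still fire the alert.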
Log aggregation is no longer a utility task; it is a foundational data engineering discipline. By decoupling collection, transport, storage, and consumption, enforcing schema discipline, and automating lifecycle management, organizations transform logs from a cost center into a strategic observability asset. The architecture outlined here balances performance, resilience, and cost, providing a repeatable blueprint for modern infrastructure at scale.