Designing a Production-Grade Kafka + ELK Logging Pipeline on K8s - Part 1

Hello everyone!

In my first blog post, I explored how I integrated Kafka into the SIEM (ELK Stack) to streamline and scale log ingestion. The setup worked well using Docker Compose, allowing me to quickly spin up the ELK Stack and Kafka brokers to demonstrate a reliable log processing pipeline.

As the project grew, I began to notice a key limitation of Docker Compose — it doesn’t scale efficiently for distributed or production-grade workloads. Managing replicas, ensuring high availability, handling node failures, and dynamically scheduling containers all become challenging as the environment expands.

This led me to take the next logical step: migrating the entire architecture to Kubernetes (K8s), a powerful container orchestration platform that automates deployment, scaling, and management of containerized applications. Our goal is to deploy a robust log ingestion pipeline inside Kubernetes and integrate various log sources — whether network logs, application logs, or logs from monolithic or microservice applications.

Objective

In this Part 1 post, I’ll walk you through:

  • Deploying the different KELK resources on Minikube

  • Validating and testing the deployment by ingesting messages into Kafka and visualizing them in Grafana

Let’s get started!

Deployment

You need a running K8s control node (I used Minikube in this blog post).

Let’s start by creating a namespace:

kubectl create namespace kelk

Namespaces in Kubernetes help isolate resources logically. By creating a dedicated kelk namespace, we ensure that kelk resources do not interfere with other applications running in the cluster.
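As an optional convenience (not required for the rest of this post), you can make kelk the default namespace for your current kubectl context, so you don’t have to pass -n kelk on every command:

```shell
# Optional: set "kelk" as the default namespace for the current context,
# so subsequent kubectl commands don't need the -n kelk flag.
kubectl config set-context --current --namespace=kelk

# Verify the namespace exists and the context change took effect.
kubectl get namespace kelk
kubectl config view --minify | grep namespace:
```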

Kafka deployment

In newer versions, Apache Kafka no longer depends on ZooKeeper to manage cluster metadata or handle controller elections. Instead, it introduces a new operating mode called KRaft (Kafka Raft Metadata mode), which allows Kafka to operate independently of ZooKeeper, simplifying the architecture.
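As a side note, every KRaft cluster is identified by a unique cluster ID. The official apache/kafka image takes care of storage formatting on startup, but if you ever need to generate and format an ID manually (for example when debugging), the Kafka distribution ships a helper script. The sketch below assumes the /opt/kafka layout used by the apache/kafka image; the cluster ID value is a placeholder:

```shell
# Generate a random cluster ID (prints a base64-encoded UUID).
/opt/kafka/bin/kafka-storage.sh random-uuid

# Format the log directory with that ID (placeholder values, adjust paths
# and config to your deployment).
/opt/kafka/bin/kafka-storage.sh format \
  -t <cluster_id> \
  -c /opt/kafka/config/kraft/server.properties
```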

  • Set Up kafka Persistent Storage
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kafka-pvc
  namespace: kelk
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi

Kafka stores logs and metadata on disk. A PersistentVolumeClaim (PVC) ensures that Kafka data persists even if the Pod is restarted, avoiding data loss.

  • Kafka Service
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  namespace: kelk
spec:
  selector:
    app: kafka
  ports:
    - name: kafka
      protocol: TCP
      port: 9092
      targetPort: 9092
    - name: controller
      protocol: TCP
      port: 9093
      targetPort: 9093

The service is optional for a single-node setup, but it becomes essential in a multi-node cluster for broker-to-broker and client communication. Note that the stable per-pod DNS names used below (e.g. kafka-0.kafka-service.kelk.svc.cluster.local) rely on the Service governing the StatefulSet; Kubernetes only creates these per-pod records for headless Services (clusterIP: None), so you may need to add that field if name resolution fails.
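To sanity-check that the service name resolves inside the cluster, you can run a throwaway pod. This is a quick diagnostic, assuming a busybox image is pullable in your cluster:

```shell
# Spin up a temporary pod and resolve the Kafka service name;
# the pod is deleted automatically when the command exits (--rm).
kubectl run dns-test --rm -it --restart=Never \
  --image=busybox:1.36 -n kelk \
  -- nslookup kafka-service.kelk.svc.cluster.local
```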

  • Deploy Kafka as a StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kelk
spec:
  serviceName: "kafka-service"
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: apache/kafka:latest
          ports:
            - containerPort: 9092
              name: kafka
            - containerPort: 9093
              name: controller
          volumeMounts:
            - name: kafka-storage
              mountPath: /var/lib/kafka/data
          env:
            - name: KAFKA_PROCESS_ROLES
              value: "broker,controller"
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: "CONTROLLER"
            - name: KAFKA_LISTENERS
              value: "PLAINTEXT://:9092,CONTROLLER://:9093"
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: "1@kafka-0.kafka-service.kelk.svc.cluster.local:9093"
            - name: KAFKA_NODE_ID
              value: "1"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "PLAINTEXT://kafka-0.kafka-service.kelk.svc.cluster.local:9092"
            - name: KAFKA_LOG_DIRS
              value: "/var/lib/kafka/data"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "PLAINTEXT"
      volumes:
        - name: kafka-storage
          persistentVolumeClaim:
            claimName: kafka-pvc

All of the YAML manifests above can be placed in a single file and applied with:

k apply -f <file_name>.yaml
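After applying the manifests, it’s worth confirming the broker actually came up. One way to do this, assuming the resource names used above:

```shell
# Wait for the StatefulSet to finish rolling out.
kubectl rollout status statefulset/kafka -n kelk

# Tail the broker logs and look for the KRaft startup messages.
kubectl logs kafka-0 -n kelk --tail=50
```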

Elasticsearch Deployment

Elasticsearch serves as the database where Logstash writes logs ingested from Kafka topics.

  • Set Up ES Persistent Storage
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: es-pvc
  namespace: kelk
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi

  • Set Up ES Service
apiVersion: v1
kind: Service
metadata:
  name: es-service
  namespace: kelk
spec:
  selector:
    app: es
  ports:
    - name: http
      protocol: TCP
      port: 9200
      targetPort: 9200
    - name: transport
      protocol: TCP
      port: 9300
      targetPort: 9300

This service is what allows Grafana (or Kibana, or any other client) to access Elasticsearch indices.

  • Deploy Elasticsearch as a StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: es
  namespace: kelk
spec:
  serviceName: "es-service"
  replicas: 1
  selector:
    matchLabels:
      app: es
  template:
    metadata:
      labels:
        app: es
    spec:
      containers:
        - name: es
          image: docker.elastic.co/elasticsearch/elasticsearch:8.10.2
          ports:
            - containerPort: 9200
              name: http
            - containerPort: 9300
              name: transport
          volumeMounts:
            - name: es-storage
              mountPath: /usr/share/elasticsearch/data
          env:
            - name: cluster.name
              value: "kafka-cluster"
            - name: bootstrap.memory_lock
              value: "true"
            - name: network.host
              value: "0.0.0.0"
            - name: discovery.type
              value: "single-node"
            - name: xpack.security.enabled
              value: "false"
            - name: ES_JAVA_OPTS
              value: "-Xms512m -Xmx512m"
      volumes:
        - name: es-storage
          persistentVolumeClaim:
            claimName: es-pvc
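Once the StatefulSet is applied and the pod is Running, you can hit the Elasticsearch HTTP API through the service from inside the cluster. A quick check, assuming the curlimages/curl image is available:

```shell
# Query the cluster info endpoint through the es-service ClusterIP;
# a healthy node returns a JSON document with the cluster name and version.
kubectl run curl-test --rm -it --restart=Never \
  --image=curlimages/curl -n kelk \
  -- curl -s http://es-service.kelk.svc.cluster.local:9200
```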

Logstash Deployment

Logstash is a very powerful tool: it can pull and collect data from numerous sources, apply operations such as filtering or enrichment, and then forward the logs to many outputs. For our use case, we’ll create a pipeline configuration instructing Logstash to read from a specific Kafka topic and output it to Elasticsearch, and for this we’ll use K8s ConfigMaps:

logstash-kafka.conf :

input {
  kafka {
    bootstrap_servers => "kafka-0.kafka-service.kelk.svc.cluster.local:9092"
    topics => ["logs"]
  }
}

output {
  elasticsearch {
    hosts => ["http://es-0.es-service.kelk.svc.cluster.local:9200"]
    index => "logs"
    workers => 1
  }
}

logstash.yml :

http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://es-0.es-service.kelk.svc.cluster.local:9200" ]

  • Create ConfigMaps

kubectl create configmap logstash-config --from-file=logstash-kafka.conf -n kelk
kubectl create configmap logstash-yaml --from-file=logstash.yml -n kelk

  • Set Up Logstash Persistent Storage
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: lgstch-pvc
  namespace: kelk
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi

  • Deploy Logstash as a StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: logstash-service
  namespace: kelk
spec:
  replicas: 1
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      labels:
        app: logstash
    spec:
      containers:
        - name: logstash
          image: docker.elastic.co/logstash/logstash:8.10.2
          volumeMounts:
            - name: logstash-config-volume
              mountPath: /usr/share/logstash/pipeline/logstash.conf
              subPath: logstash-kafka.conf
            - name: logstash-yaml-volume
              mountPath: /usr/share/logstash/config/logstash.yml
              subPath: logstash.yml
      volumes:
        - name: logstash-config-volume
          configMap:
            name: logstash-config
        - name: logstash-yaml-volume
          configMap:
            name: logstash-yaml
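Logstash misconfigurations usually show up at startup, so after applying the manifest it’s worth tailing the pod logs. The pod name below is assumed from the StatefulSet above:

```shell
# Watch Logstash start up; a healthy pipeline logs a "Pipeline started"
# message once it connects to Kafka and Elasticsearch.
kubectl logs -f logstash-service-0 -n kelk
```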

Deployment Validation

Check all resources:

$ k get all -n kelk
NAME                     READY   STATUS    RESTARTS      AGE
pod/es-0                 1/1     Running   2 (13h ago)   18h
pod/kafka-0              1/1     Running   2 (13h ago)   20h
pod/logstash-service-0   1/1     Running   1 (13h ago)   16h

NAME                    TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)             AGE
service/es-service      ClusterIP   10.107.125.4   <none>        9200/TCP,9300/TCP   18h
service/kafka-service   ClusterIP   10.101.25.94   <none>        9092/TCP,9093/TCP   21h

NAME                                READY   AGE
statefulset.apps/es                 1/1     18h
statefulset.apps/kafka              1/1     20h
statefulset.apps/logstash-service   1/1     16h

Validating ConfigMaps and PVCs

$ k get pvc -n kelk
NAME        STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   VOLUMEATTRIBUTESCLASS   AGE
es-pvc      Bound    pvc-a35c2dba-2236-4128-ad47-9df39596f9a5   5Gi        RWO            standard       <unset>                 18h
kafka-pvc   Bound    pvc-e85e8b6b-59bc-4d68-bf99-62ef7e8e6ee6   5Gi        RWO            standard       <unset>                 20h
$ k get configmap -n kelk
NAME               DATA   AGE
kube-root-ca.crt   1      21h
logstash-config    1      17h
logstash-yaml      1      16h

Testing the Pipeline

Before starting to ingest and visualize data, let’s port-forward the ES service. This can be done with a simple command, or made permanent by changing the service type to NodePort.

Port Forward ES

k port-forward pod/es-0 --address 0.0.0.0 9200:9200 -n kelk
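With the port-forward running, you can verify connectivity from your host before wiring up Grafana:

```shell
# Basic reachability check against the forwarded port.
curl -s http://localhost:9200

# Once Logstash has shipped data, the "logs" index should appear here.
curl -s 'http://localhost:9200/_cat/indices?v'
```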

Then, in Grafana, create a new Elasticsearch data source connection using your host IP address (localhost will work).

(Screenshot: Grafana Elasticsearch data source configuration)

Producing kafka messages

First, access the pod:

k exec -it pod/kafka-0 -n kelk -- /bin/bash

kafka-0:/$ /opt/kafka/bin/kafka-console-producer.sh --topic logs --bootstrap-server localhost:9092
>Hi! This is a test           
>
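You can also verify that the message landed in the topic independently of the rest of the pipeline, using the console consumer from a second shell in the same pod:

```shell
# Read the topic from the beginning; the test message should appear.
/opt/kafka/bin/kafka-console-consumer.sh \
  --topic logs \
  --bootstrap-server localhost:9092 \
  --from-beginning
```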

Let’s check Grafana to confirm the message was ingested correctly (go to Explore and choose the logs index):

(Screenshot: the test message shown in Grafana Explore)

Conclusion

You now have a fully functional KELK log ingestion pipeline on k8s using Kafka, Elasticsearch, and Logstash. This setup can be extended for multiple topics, additional log sources, and scaled across nodes when moving from Minikube to a production cluster.

Stay tuned for Part 2, where we will integrate Filebeat as a DaemonSet to collect pod and container logs, and filter or enrich our log data.

This post is licensed under CC BY 4.0 by the author.