Using `kcat` with `kubectl` for Kafka Diagnostics in Kubernetes
When diagnosing issues with Kafka running in a Kubernetes cluster, it can be useful to run diagnostic tools directly within the cluster. One such tool is kcat (formerly known as kafkacat), a versatile command-line utility to produce and consume Kafka messages. In this article, we’ll explain how to run kcat as a temporary pod in a Kubernetes cluster using the kubectl command. The kcat repo is here: https://github.com/edenhill/kcat
The Command:
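Assembled from the flag-by-flag breakdown that follows, the full command looks like this:

```shell
# Launch a throwaway kcat pod in the fx-fx namespace, list cluster metadata,
# and join the "test2" consumer group on the "minio" topic.
# Each flag is explained below.
kubectl run -it -n fx-fx kcat --image=edenhill/kcat:1.7.1 --rm -- \
  -L -b fx-kafka.core -G test2 minio
```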
Breaking it Down:
- `kubectl run`: The primary command to run a container in a Kubernetes cluster. It creates a new pod with the specified container and runs it.
- `-it`: These flags stand for "interactive" and "tty". They allow you to interact with the container (in this case, kcat) directly from the terminal.
- `-n fx-fx`: Specifies the namespace in which the pod will be created. Here, the namespace is `fx-fx`.
- `kcat`: The name of the pod that will be created.
- `--image=edenhill/kcat:1.7.1`: Specifies the Docker image to use for the pod. In this case, we're using the `kcat` image, version `1.7.1`, from the `edenhill` repository.
- `--rm`: Ensures that the pod is automatically deleted once it terminates. This is useful for temporary diagnostic tasks, as it cleans up resources after the task completes.
- `--`: This delimiter separates the `kubectl run` parameters from the arguments that will be passed to the `kcat` tool.
- `-L`: A kcat flag that lists all topics, partitions, and brokers in the cluster.
- `-b fx-kafka.core`: The `-b` flag specifies the broker (or brokers) to connect to. Here, kcat will connect to the broker at `fx-kafka.core`.
- `-G test2 minio`: The `-G` flag specifies a consumer group. In this case, kcat will join the consumer group `test2` and consume messages from the `minio` topic.
By running the above command, you’ll be able to interactively use kcat within your Kubernetes cluster to diagnose issues with your Kafka setup. This approach is particularly useful because it allows you to run diagnostic tools without needing to install them on your local machine or modify existing pods in the cluster.
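A common follow-up diagnostic is to peek at the most recent messages on a topic. The sketch below reuses the broker and topic names from the example above; `-C` (consumer mode), `-o -5` (start five messages from the end of the partition), and `-e` (exit once the end is reached) are standard kcat options:

```shell
# Consume the last 5 messages from the "minio" topic, then exit.
kubectl run -it -n fx-fx kcat --image=edenhill/kcat:1.7.1 --rm -- \
  -C -b fx-kafka.core -t minio -o -5 -e
```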
More examples (your mileage may vary)
The following samples are adapted from the kcat README for use with kubectl. Not all of them worked as expected in my testing, though that may be because I run a non-standard Kafka setup on a k3s cluster.
- Balanced KafkaConsumer for Multiple Topics:

  ```shell
  kubectl run -it -n fx-fx kcat --image=edenhill/kcat:1.7.1 --rm -- -b mybroker -G minio ferris.events
  ```

- Produce Messages from System Logs with Compression (note the plain `-i` instead of `-it` on both commands: allocating a TTY would interfere with the pipe):

  ```shell
  kubectl exec [YOUR_LOG_POD_NAME] -- tail -f /var/log/syslog | \
    kubectl run -i -n fx-fx kcat --image=edenhill/kcat:1.7.1 --rm -- -b mybroker -t syslog -z snappy
  ```

- Read Messages from Kafka 'syslog' Topic:

  ```shell
  kubectl run -it -n fx-fx kcat --image=edenhill/kcat:1.7.1 --rm -- -b mybroker -t syslog
  ```

- Produce Messages from File: First, you'd need to copy the file to the pod or have it available via a shared volume. This requires the pod to be running already, i.e. a long-lived `kcat-pod` rather than the throwaway pods used elsewhere:

  ```shell
  kubectl cp myfile1.bin kcat-pod:/tmp/
  kubectl exec -it kcat-pod -- kcat -P -b mybroker -t filedrop -p 0 /tmp/myfile1.bin
  ```

- Output Consumed Messages in JSON Envelope:

  ```shell
  kubectl run -it -n fx-fx kcat --image=edenhill/kcat:1.7.1 --rm -- -b mybroker -t syslog -J
  ```

- Decode Avro Message and Extract "age" Field:

  ```shell
  kubectl run -i -n fx-fx kcat --image=edenhill/kcat:1.7.1 --rm -- \
    -b mybroker -t ledger -s value=avro -r http://schema-registry-url:8080 | jq .payload.age
  ```

- Output Consumed Messages According to Format String:

  ```shell
  kubectl run -it -n fx-fx kcat --image=edenhill/kcat:1.7.1 --rm -- \
    -b mybroker -t syslog -f 'Topic %t[%p], offset: %o, key: %k, payload: %S bytes: %s\n'
  ```

- Metadata Listing:

  ```shell
  kubectl run -it -n fx-fx kcat --image=edenhill/kcat:1.7.1 --rm -- -L -b mybroker
  ```

- JSON Metadata Listing:

  ```shell
  kubectl run -i -n fx-fx kcat --image=edenhill/kcat:1.7.1 --rm -- -b mybroker -L -J | jq .
  ```
- Consume Messages Between Two Timestamps:
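The code block for this last example did not survive. The kcat README's version uses `-o s@<start>` and `-o e@<end>` to bound consumption by timestamp, so the kubectl adaptation would look roughly like this (the timestamps are placeholder values in epoch milliseconds):

```shell
# Consume "syslog" messages between two epoch-millisecond timestamps, then exit.
kubectl run -it -n fx-fx kcat --image=edenhill/kcat:1.7.1 --rm -- \
  -C -b mybroker -t syslog -o s@1568276612443 -o e@1568276617901
```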
Remember, when using kubectl with kcat in this manner, you’re creating temporary pods to run these commands. Ensure you have the necessary permissions and resources in your Kubernetes cluster to execute these commands.
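If a `kubectl run` invocation is rejected, a quick way to check the permission side is `kubectl auth can-i`; the `fx-fx` namespace here matches the examples above:

```shell
# Check whether your current context is allowed to create pods in fx-fx.
# Prints "yes" or "no".
kubectl auth can-i create pods -n fx-fx
```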