In the previous post we went through using StatefulSets to deploy Kafka and Zookeeper on GKE. One problem was that we used an effectively "random" image. Confluent provides a set of images for deploying Kafka, Zookeeper, and more that are continually updated and supported, so we'd like to move to those.
We're basically going to be combining the already-working object definitions from the last post with the quickstart and other reference documentation from Confluent. Let's reuse the service name and other fairly inconsequential items.
The HOSTNAME for a StatefulSet pod is suffixed with a number (zk-0, zk-1, ...), so we'll grab that in our command and set the ZOOKEEPER_SERVER_ID by chopping off the zk- prefix and adding one, since Zookeeper server ids start at 1. Hopefully in the future we'll be able to use the downward API with metadata.ordinal or similar, since overriding command is pretty hacky.
image: confluentinc/cp-zookeeper:3.3.1
command:
- "bash"
- "-c"
- "ZOOKEEPER_SERVER_ID=$((${HOSTNAME:3}+1)) && /etc/confluent/docker/run"
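To see what that substring-plus-one trick does, here's a quick sketch you can run in any bash shell (the hostname here is made up for illustration):

# Hypothetical hostname, as the StatefulSet controller would assign it.
HOSTNAME=zk-2
# ${HOSTNAME:3} drops the first three characters ("zk-"), leaving "2";
# the arithmetic expansion then adds 1 to get a 1-based server id.
echo $((${HOSTNAME:3}+1))   # prints 3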
We also set up the ZOOKEEPER_SERVERS config with the fully qualified host:port:port entries for the three zk pods, used for leader election and quorum. Remember that we're using a headless service so that Zookeeper can handle its own routing.
data:
  servers: "zk-0.zk-svc.default.svc.cluster.local:2888:3888;zk-1.zk-svc.default.svc.cluster.local:2888:3888;zk-2.zk-svc.default.svc.cluster.local:2888:3888"
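As a sanity check, each of those per-pod names should resolve individually once the pods are up. Something like this from any pod in the cluster (assuming the default namespace as above, and an image that ships nslookup) will confirm the headless service is doing its job:

nslookup zk-0.zk-svc.default.svc.cluster.local
nslookup zk-1.zk-svc.default.svc.cluster.local
nslookup zk-2.zk-svc.default.svc.cluster.local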
That, plus some env var changes, gets us the Zookeeper YAML.
---
apiVersion: v1
kind: Service
metadata:
  name: zk-svc
  labels:
    app: zk-svc
spec:
  ports:
  - port: 2888
    name: server
  - port: 3888
    name: leader-election
  clusterIP: None
  selector:
    app: zk
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: zk-cm
data:
  tick: "2000"
  servers: "zk-0.zk-svc.default.svc.cluster.local:2888:3888;zk-1.zk-svc.default.svc.cluster.local:2888:3888;zk-2.zk-svc.default.svc.cluster.local:2888:3888"
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: zk-pdb
spec:
  selector:
    matchLabels:
      app: zk
  minAvailable: 2
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: zk
spec:
  serviceName: zk-svc
  replicas: 3
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: zk
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: "app"
                operator: In
                values:
                - zk
            topologyKey: "kubernetes.io/hostname"
      containers:
      - name: k8szk
        imagePullPolicy: Always
        image: confluentinc/cp-zookeeper:3.3.1
        command:
        - "bash"
        - "-c"
        - "ZOOKEEPER_SERVER_ID=$((${HOSTNAME:3}+1)) && /etc/confluent/docker/run"
        resources:
          requests:
            memory: "2Gi"
            cpu: "500m"
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        env:
        - name: ZOOKEEPER_TICK_TIME
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: tick
        - name: ZOOKEEPER_SYNC_LIMIT
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: tick
        - name: ZOOKEEPER_SERVERS
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: servers
        - name: ZOOKEEPER_CLIENT_PORT
          value: "2181"
        # SERVER_ID is required but not used as this value
        - name: ZOOKEEPER_SERVER_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/zookeeper
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi
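Assuming you save the manifest above as zookeeper.yaml (the filename is our choice, nothing special), bringing up the ensemble and checking on it looks roughly like this:

kubectl apply -f zookeeper.yaml
# Watch until zk-0, zk-1, and zk-2 all report Running.
kubectl get pods -l app=zk -w
# Zookeeper four-letter-word health check; assumes nc is in the image.
kubectl exec zk-0 -- bash -c "echo ruok | nc localhost 2181"   # expect "imok"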
We'll modify the Kafka ConfigMap so the brokers connect to Zookeeper at the three per-pod client addresses.
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-cm
data:
  advertised.listeners: PLAINTEXT://kafka-0.kafka-svc.default.svc.cluster.local:9093
  connect: zk-0.zk-svc.default.svc.cluster.local:2181,zk-1.zk-svc.default.svc.cluster.local:2181,zk-2.zk-svc.default.svc.cluster.local:2181
We'll also modify the command to set KAFKA_ADVERTISED_LISTENERS and KAFKA_BROKER_ID. Both pull the prefix off the HOSTNAME to leave us with the pod's number (${HOSTNAME##*-}). KAFKA_ADVERTISED_LISTENERS needs to be set to this pod's host and port, while KAFKA_BROKER_ID needs to be a unique number; the per-pod values set in the command take precedence over the placeholder env var sourced from the ConfigMap. Luckily for us, Kubernetes is already allocating unique pod names with numeric suffixes (because StatefulSet). These look like kafka-0, kafka-1, etc.
image: confluentinc/cp-kafka:3.3.1
---
command:
- sh
- -c
- "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-${HOSTNAME##*-}.kafka-svc.default.svc.cluster.local:9093 KAFKA_BROKER_ID=${HOSTNAME##*-} /etc/confluent/docker/run"
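If that parameter expansion looks opaque, here's what it evaluates to, again with a made-up hostname:

# Hypothetical hostname of the third broker pod.
HOSTNAME=kafka-2
# ${HOSTNAME##*-} strips the longest prefix ending in "-", leaving the ordinal.
echo ${HOSTNAME##*-}   # prints 2

Unlike the Zookeeper server ids, broker ids may start at 0, so there's no +1 here.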
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka
spec:
  ports:
  - port: 9093
    name: server
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
spec:
  selector:
    matchLabels:
      app: kafka
  minAvailable: 2
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-cm
data:
  advertised.listeners: PLAINTEXT://kafka-0.kafka-svc.default.svc.cluster.local:9093 # PLAINTEXT://:9093
  connect: zk-0.zk-svc.default.svc.cluster.local:2181,zk-1.zk-svc.default.svc.cluster.local:2181,zk-2.zk-svc.default.svc.cluster.local:2181
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-svc
  replicas: 3
  template:
    metadata:
      labels:
        app: kafka
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: "app"
                operator: In
                values:
                - kafka
            topologyKey: "kubernetes.io/hostname"
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 1
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: "app"
                  operator: In
                  values:
                  - zk
              topologyKey: "kubernetes.io/hostname"
      terminationGracePeriodSeconds: 300
      containers:
      - name: k8skafka
        imagePullPolicy: Always
        image: confluentinc/cp-kafka:3.3.1
        resources:
          requests:
            memory: "1Gi"
            cpu: 500m
        ports:
        - containerPort: 9093
          name: server
        command:
        - sh
        - -c
        - "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-${HOSTNAME##*-}.kafka-svc.default.svc.cluster.local:9093 KAFKA_BROKER_ID=${HOSTNAME##*-} /etc/confluent/docker/run"
        env:
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        - name: KAFKA_ADVERTISED_LISTENERS
          valueFrom:
            configMapKeyRef:
              name: kafka-cm
              key: advertised.listeners
        - name: KAFKA_ZOOKEEPER_CONNECT
          valueFrom:
            configMapKeyRef:
              name: kafka-cm
              key: connect
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/kafka
        # readinessProbe:
        #   exec:
        #     command:
        #     - sh
        #     - -c
        #     - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9093"
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi
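As before, save this as e.g. kafka.yaml and apply it; the brokers will only settle once they can reach the Zookeeper quorum:

kubectl apply -f kafka.yaml
# With the default OrderedReady policy, the pods come up one at a time.
kubectl get pods -l app=kafka -w
# A broker logs a "started" line once it has registered with Zookeeper.
kubectl logs kafka-0 | grep -i started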
We can now play with Kafka. First we'll exec into kafka-0.
kubectl exec -it kafka-0 bash
Notably, we use a shell in kafka-0 to do this because the headless services are only resolvable in-cluster. First, we set an environment variable to store the list of Kafka brokers. Then we use kafka-topics and the pre-existing $KAFKA_ZOOKEEPER_CONNECT to create a topic. We can then list all topics according to Zookeeper, produce a few messages, and consume them back.
> export KAFKA_BROKERS=kafka-0.kafka-svc.default.svc.cluster.local:9093,kafka-1.kafka-svc.default.svc.cluster.local:9093,kafka-2.kafka-svc.default.svc.cluster.local:9093
> kafka-topics --create \
    --zookeeper $KAFKA_ZOOKEEPER_CONNECT \
    --replication-factor 3 \
    --partitions 3 \
    --topic test
Created topic "test".
> kafka-topics --list --zookeeper $KAFKA_ZOOKEEPER_CONNECT
__consumer_offsets
test
> kafka-console-producer --broker-list $KAFKA_BROKERS --topic test
This is a message
This is another message
> kafka-console-consumer \
    --bootstrap-server $KAFKA_BROKERS \
    --topic test \
    --from-beginning
This is a message
This is another message
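When you're done experimenting, note that deleting the StatefulSets does not delete the disks. A rough cleanup, assuming the filenames from above, looks like:

kubectl delete -f kafka.yaml -f zookeeper.yaml
# volumeClaimTemplates intentionally leave their PVCs behind,
# so remove them explicitly to release the persistent disks.
kubectl delete pvc datadir-kafka-0 datadir-kafka-1 datadir-kafka-2
kubectl delete pvc datadir-zk-0 datadir-zk-1 datadir-zk-2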