StatefulSet is the workload API object used to manage stateful applications; StatefulSets are beta as of Kubernetes 1.8.
Kafka and Zookeeper are two of the motivating examples for StatefulSets in Kubernetes. Since they're stateful applications, we'll need disks to store their data on, and GKE can allocate disks and compute for brokers even as we scale the service up.
First we need the right Google Container Engine cluster. Since this is fairly basic and meant for development, we'll use a three-node GKE cluster. That lets us run a 3-node Zookeeper ensemble and 3 Kafka brokers, with one Zookeeper server and one Kafka broker on each node. We'll set the machine type to n1-standard-2, which has enough memory and CPU to handle everything's requirements.
gcloud container clusters create test-cluster \
  --machine-type "n1-standard-2" \
  --cluster-version "1.8.3-gke.0"
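Once the cluster is up, it's worth a quick sanity check that kubectl is pointed at it and that all three nodes are Ready (if kubectl isn't configured for the new cluster yet, gcloud container clusters get-credentials test-cluster will set that up):

gcloud container clusters list
kubectl get nodes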
We'll use the Zookeeper and Kafka example configs to start. First, the Zookeeper config:
➜ kubectl apply -f 10-zookeeper.yml
service "zk-svc" created
configmap "zk-cm" created
poddisruptionbudget "zk-pdb" created
statefulset "zk" created
➜ kubectl apply -f 20-kafka-brokers.yml
service "kafka-svc" created
poddisruptionbudget "kafka-pdb" created
statefulset "kafka" created
Note that if you spin them up too fast sequentially, Kafka will Error, then CrashLoopBackOff, until it can connect to Zookeeper.
NAME      READY     STATUS              RESTARTS   AGE
kafka-0   0/1       Error               1          46s
zk-0      1/1       Running             0          1m
zk-1      1/1       Running             0          1m
zk-2      0/1       ContainerCreating   0          25s
A healthy cluster:
NAME      READY     STATUS    RESTARTS   AGE
kafka-0   1/1       Running   3          2m
kafka-1   1/1       Running   0          1m
kafka-2   1/1       Running   0          47s
zk-0      1/1       Running   0          3m
zk-1      1/1       Running   0          3m
zk-2      1/1       Running   0          2m
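If you'd rather watch the pods settle than re-run kubectl get pods, you can stream status changes instead (Ctrl-C to stop watching):

kubectl get pods -w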
We can use the following command to interactively choose a Kafka container to exec into.
kubectl get pods --no-headers | fzf | awk '{print $1}' | xargs -o -I % kubectl exec -it % bash
and then ls to see that we're in.
kafka@kafka-0:/$ ls
KEYS  boot  etc   lib    media  opt   root  sbin  sys  usr
bin   dev   home  lib64  mnt    proc  run   srv   tmp  var
kafka@kafka-0:/$
We'll need to create a new topic (so run this in the container we just exec'd into). The most interesting piece of this is that we're pointing to the Zookeeper nodes using the cluster addresses (such as zk-0.zk-svc.default.svc.cluster.local:2181). This breaks down into the stateful node identifier (zk-0), the service name (zk-svc), and the network/namespace defaults (as well as the port).
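Schematically, each address follows the same pattern (the namespace is default here because that's where we applied the manifests):

<pod-name>.<service-name>.<namespace>.svc.cluster.local:<port>
 zk-0     . zk-svc       . default   .svc.cluster.local:2181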
kafka-topics.sh --create \
  --topic test \
  --zookeeper zk-0.zk-svc.default.svc.cluster.local:2181,zk-1.zk-svc.default.svc.cluster.local:2181,zk-2.zk-svc.default.svc.cluster.local:2181 \
  --partitions 3 \
  --replication-factor 2
and run a simple console consumer using the kafka-console-consumer.sh script.
kafka-console-consumer.sh --topic test --bootstrap-server localhost:9093
Then exec into the same container again (in a second shell, so the consumer keeps running) and run the producer so we can send messages to the consumer.
> kafka-console-producer.sh --topic test --broker-list localhost:9093
hello
I like kafka
goodbye
Now we can check out the ISRs and partitions:
> kafka-topics.sh --describe --topic test \
  --zookeeper zk-0.zk-svc.default.svc.cluster.local:2181,zk-1.zk-svc.default.svc.cluster.local:2181,zk-2.zk-svc.default.svc.cluster.local:2181
Topic:test  PartitionCount:3  ReplicationFactor:2  Configs:
  Topic: test  Partition: 0  Leader: 2  Replicas: 2,0  Isr: 2,0
  Topic: test  Partition: 1  Leader: 0  Replicas: 0,1  Isr: 0,1
  Topic: test  Partition: 2  Leader: 1  Replicas: 1,2  Isr: 1,2
For Zookeeper we created the following objects, and for the Kafka brokers we created very similar ones (the full yaml files are included at the end of this post). Both look pretty similar: each uses a StatefulSet, Service, and PodDisruptionBudget. Zookeeper also uses a ConfigMap to hold its configuration rather than a baked-in config file.
The Zookeeper Service is the first object we see in the yaml file.
apiVersion: v1
kind: Service
metadata:
  name: zk-svc
  labels:
    app: zk-svc
spec:
  ports:
  - port: 2888
    name: server
  - port: 3888
    name: leader-election
  clusterIP: None
  selector:
    app: zk
This block says we're creating a Service with a name of zk-svc (remember the URLs we had to use to access Zookeeper earlier). Setting clusterIP: None makes this a Headless Service. Since Zookeeper and Kafka handle their own load balancing, a headless service lets us opt out of Kubernetes' load balancing and service discovery and let Zookeeper/Kafka handle it on their own. Also notice that we've exposed the appropriate ports for Zookeeper leader election and server access.
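One way to see the difference: a normal Service resolves to a single virtual IP, while a headless Service resolves straight to the individual pod IPs. From inside one of the containers we exec'd into (assuming nslookup or dig is available in the image), a lookup like this should return all three Zookeeper pod addresses rather than one cluster IP:

nslookup zk-svc.default.svc.cluster.local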
Next we define a ConfigMap, which contains configuration options for Zookeeper.
apiVersion: v1
kind: ConfigMap
metadata:
  name: zk-cm
data:
  jvm.heap: "1G"
  tick: "2000"
  init: "10"
  sync: "5"
  client.cnxns: "60"
  snap.retain: "3"
  purge.interval: "0"
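These keys get pulled into the Zookeeper container as environment variables via configMapKeyRef in the StatefulSet spec (this is the pattern used in the full yaml at the end of the post), for example:

env:
- name: ZK_HEAP_SIZE
  valueFrom:
    configMapKeyRef:
      name: zk-cm
      key: jvm.heap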
Next we create a PodDisruptionBudget that ensures a minimum of 2 Zookeeper pods will be available at any given time during voluntary disruptions, like upgrades and node drains. It's important to note that this does not cover nodes crashing on their own.
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: zk-pdb
spec:
  selector:
    matchLabels:
      app: zk
  minAvailable: 2
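To see the budget in action, you can check how many disruptions are currently allowed; kubectl drain and other voluntary evictions will refuse to take the ensemble below the minimum:

kubectl get pdb zk-pdb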
Ah yes, the central dish to our exploration, the StatefulSet.
We set a pod anti-affinity rule so that each Zookeeper pod is scheduled onto a node that doesn't already have another Zookeeper pod.
affinity:
  podAntiAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
    - labelSelector:
        matchExpressions:
        - key: "app"
          operator: In
          values:
          - zk
      topologyKey: "kubernetes.io/hostname"
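A quick way to verify the anti-affinity did its job is to list the Zookeeper pods with their node assignments; each should land on a different node:

kubectl get pods -l app=zk -o wide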
The container specification is fairly readable if you're familiar with running containers. There's the start command, env vars, and resourcing. We then get to the readiness and liveness probes, which use a custom shell script to ask the Zookeeper server if it's ok (using ruok).
readinessProbe:
  exec:
    command:
    - "zkOk.sh"
  initialDelaySeconds: 10
  timeoutSeconds: 5
livenessProbe:
  exec:
    command:
    - "zkOk.sh"
  initialDelaySeconds: 10
  timeoutSeconds: 5
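The zkOk.sh script is essentially a wrapper around Zookeeper's ruok four-letter command; you can ask a server directly and expect imok back (assuming nc is available in the image, which is what the script itself relies on):

kubectl exec zk-0 -- sh -c 'echo ruok | nc localhost 2181'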
The volume mounts allocate a volume named datadir and claim 10Gi of storage. This is a generic way to "claim" storage, and on GKE it translates to a gcePersistentDisk.
volumeMounts:
- name: datadir
  mountPath: /var/lib/zookeeper
volumeClaimTemplates:
- metadata:
    name: datadir
  spec:
    accessModes: ["ReadWriteOnce"]
    resources:
      requests:
        storage: 10Gi
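Because the claims come from a volumeClaimTemplate, each replica gets its own PersistentVolumeClaim named after the template and the pod (datadir-zk-0, datadir-zk-1, datadir-zk-2), each backed by a dynamically provisioned persistent disk on GKE. You can inspect them with:

kubectl get pvc
kubectl get pv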
The Kafka yaml file has basically the same components so I won't go over it here.
Congrats, you're running Kafka on GKE. This should be good enough for any testing you'd want to run. In the next post in this series we'll go over how to use the Confluent Platform instead of the containers specified in these yaml files.
10-zookeeper.yml:

---
apiVersion: v1
kind: Service
metadata:
  name: zk-svc
  labels:
    app: zk-svc
spec:
  ports:
  - port: 2888
    name: server
  - port: 3888
    name: leader-election
  clusterIP: None
  selector:
    app: zk
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: zk-cm
data:
  jvm.heap: "1G"
  tick: "2000"
  init: "10"
  sync: "5"
  client.cnxns: "60"
  snap.retain: "3"
  purge.interval: "0"
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: zk-pdb
spec:
  selector:
    matchLabels:
      app: zk
  minAvailable: 2
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: zk
spec:
  serviceName: zk-svc
  replicas: 3
  template:
    metadata:
      labels:
        app: zk
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: "app"
                operator: In
                values:
                - zk
            topologyKey: "kubernetes.io/hostname"
      containers:
      - name: k8szk
        imagePullPolicy: Always
        image: gcr.io/google_samples/k8szk:v2
        resources:
          requests:
            memory: "2Gi"
            cpu: "500m"
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        env:
        - name: ZK_REPLICAS
          value: "3"
        - name: ZK_HEAP_SIZE
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: jvm.heap
        - name: ZK_TICK_TIME
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: tick
        - name: ZK_INIT_LIMIT
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: init
        - name: ZK_SYNC_LIMIT
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: sync
        - name: ZK_MAX_CLIENT_CNXNS
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: client.cnxns
        - name: ZK_SNAP_RETAIN_COUNT
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: snap.retain
        - name: ZK_PURGE_INTERVAL
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: purge.interval
        - name: ZK_CLIENT_PORT
          value: "2181"
        - name: ZK_SERVER_PORT
          value: "2888"
        - name: ZK_ELECTION_PORT
          value: "3888"
        command:
        - sh
        - -c
        - zkGenConfig.sh && zkServer.sh start-foreground
        readinessProbe:
          exec:
            command:
            - "zkOk.sh"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        livenessProbe:
          exec:
            command:
            - "zkOk.sh"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/zookeeper
      securityContext:
        runAsUser: 1000
        fsGroup: 1000
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi
20-kafka-brokers.yml:

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka
spec:
  ports:
  - port: 9093
    name: server
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
spec:
  selector:
    matchLabels:
      app: kafka
  minAvailable: 2
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-svc
  replicas: 3
  template:
    metadata:
      labels:
        app: kafka
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: "app"
                operator: In
                values:
                - kafka
            topologyKey: "kubernetes.io/hostname"
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 1
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: "app"
                  operator: In
                  values:
                  - zk
              topologyKey: "kubernetes.io/hostname"
      terminationGracePeriodSeconds: 300
      containers:
      - name: k8skafka
        imagePullPolicy: Always
        image: gcr.io/google_samples/k8skafka:v1
        resources:
          requests:
            memory: "1Gi"
            cpu: 500m
        ports:
        - containerPort: 9093
          name: server
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9093 \
          --override zookeeper.connect=zk-0.zk-svc.default.svc.cluster.local:2181,zk-1.zk-svc.default.svc.cluster.local:2181,zk-2.zk-svc.default.svc.cluster.local:2181 \
          --override log.dir=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=false \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override inter.broker.protocol.version=0.10.2-IV0 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/kafka
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9093"
      securityContext:
        runAsUser: 1000
        fsGroup: 1000
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi