To start with: I am fairly new to Kubernetes, so I might be missing some fundamentals.
I have a working containerized app that is orchestrated with docker-compose (and works fine), and I am rewriting it to deploy to Kubernetes. I converted it to K8s .yaml files via Kompose and modified them to some degree. I am struggling to set up a connection between a Python app and Kafka, which run in separate pods. The Python app constantly raises a NoBrokersAvailable() error no matter what I try, so it clearly cannot connect to a broker. What am I missing? As far as I can tell, I have defined proper listeners and a network policy. I am running it locally on Minikube with a local Docker image registry.
The Python app connects to the following address:
KafkaProducer(bootstrap_servers='kafka-service.default.svc.cluster.local:9092')
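A plain TCP probe run from inside the producer pod can help separate a name-resolution or routing problem from a Kafka-level one. The following is only a minimal sketch using the Python standard library; the helper name can_reach is hypothetical, and a successful probe does not prove that the advertised listeners are correct.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the name and tries each returned address.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures (socket.gaierror), refused connections, and timeouts.
        return False


# Example call, using the address passed to KafkaProducer above:
# can_reach("kafka-service.default.svc.cluster.local", 9092)
```

If the probe returns False from the producer pod, the problem is likely below Kafka itself (DNS, the Service, or a network policy) rather than in the listener configuration.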
kafka-deployment.yaml (the Dockerfile image is based on confluentinc/cp-kafka:6.2.0 with a topics setup script added to it):
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
    kompose.version: 1.27.0 (b0ed6a2c9)
  creationTimestamp: null
  labels:
    io.kompose.service: kafka
  name: kafka-app
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: kafka
  strategy: {}
  template:
    metadata:
      annotations:
        kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
        kompose.version: 1.27.0 (b0ed6a2c9)
      creationTimestamp: null
      labels:
        io.kompose.network/pipeline-network: "true"
        io.kompose.service: kafka
    spec:
      containers:
        - env:
            - name: KAFKA_LISTENERS
              value: "LISTENER_INTERNAL://0.0.0.0:29092,LISTENER_EXTERNAL://0.0.0.0:9092"
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "LISTENER_INTERNAL://localhost:29092,LISTENER_EXTERNAL://kafka-service.default.svc.cluster.local:9092"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "LISTENER_EXTERNAL:PLAINTEXT,LISTENER_INTERNAL:PLAINTEXT"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "LISTENER_INTERNAL"
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
              value: "0"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper:2181
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          image: finnhub-streaming-data-pipeline-kafka:latest
          imagePullPolicy: Never
          lifecycle:
            postStart:
              exec:
                command: ["/bin/sh","-c","/kafka-setup-k8s.sh"]
          name: kafka-app
          ports:
            - containerPort: 9092
            - containerPort: 29092
          resources: {}
      restartPolicy: Always
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
spec:
  selector:
    app: kafka
  ports:
    - protocol: TCP
      name: firstport
      port: 9092
      targetPort: 9092
    - protocol: TCP
      name: secondport
      port: 29092
      targetPort: 29092
finnhub-producer.yaml (aka my Python app deployment):
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
    kompose.version: 1.27.0 (b0ed6a2c9)
  creationTimestamp: null
  labels:
    io.kompose.service: finnhubproducer
  name: finnhubproducer
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: finnhubproducer
  strategy: {}
  template:
    metadata:
      annotations:
        kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
        kompose.version: 1.27.0 (b0ed6a2c9)
      creationTimestamp: null
      labels:
        io.kompose.network/pipeline-network: "true"
        io.kompose.service: finnhubproducer
    spec:
      containers:
        - env:
            - name: KAFKA_PORT
              value: "9092"
            - name: KAFKA_SERVER
              value: kafka-service.default.svc.cluster.local
            - name: KAFKA_TOPIC_NAME
              value: market
          image: docker.io/library/finnhub-streaming-data-pipeline-finnhubproducer:latest
          imagePullPolicy: Never
          name: finnhubproducer
          ports:
            - containerPort: 8001
          resources: {}
      restartPolicy: Always
status: {}
---
apiVersion: v1
kind: Service
metadata:
  annotations:
    kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
    kompose.version: 1.27.0 (b0ed6a2c9)
  creationTimestamp: null
  labels:
    io.kompose.service: finnhubproducer
  name: finnhubproducer
spec:
  ports:
    - name: "8001"
      port: 8001
      targetPort: 8001
  selector:
    io.kompose.service: finnhubproducer
status:
  loadBalancer: {}
pipeline-network-networkpolicy.yaml:
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  creationTimestamp: null
  name: pipeline-network
spec:
  ingress:
    - from:
        - podSelector:
            matchLabels:
              io.kompose.network/pipeline-network: "true"
  podSelector:
    matchLabels:
      io.kompose.network/pipeline-network: "true"
EDIT:
Dockerfile for Kafka image:
FROM confluentinc/cp-kafka:6.2.0
COPY ./scripts/kafka-setup-k8s.sh /kafka-setup-k8s.sh
kafka-setup-k8s.sh:
# blocks until kafka is reachable
kafka-topics --bootstrap-server localhost:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server localhost:29092 --create --if-not-exists --topic market --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server localhost:29092 --list
2 Answers
I managed to make it work by deleting the services from the deployment files and running kubectl expose deployment kafka-app. The issue comes from the Kompose labeling.

Your Service's selector is app: kafka, whereas the Deployment is named kafka-app and its pods are labeled io.kompose.service: kafka, so the Service does not select them and they aren't connected.

I suggest you use Strimzi (or Confluent for Kubernetes if you want to use their images) rather than converting your existing Docker Compose file with Kompose, as it rarely gets network policies correct. In fact, you can probably remove the network labels and the network policy completely, as it isn't really necessary within the same namespace.
Regarding your Python app, you shouldn't need to define the Kafka host and port separately; use a single variable, KAFKA_BOOTSTRAP_SERVERS, which can accept multiple brokers, including their ports.
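As a rough sketch of the single-variable approach, assuming the app uses the kafka-python client (which is where NoBrokersAvailable comes from): the helper names bootstrap_servers_from_env and make_producer and the localhost:9092 default are hypothetical and only for illustration.

```python
import os


def bootstrap_servers_from_env(env=os.environ, default="localhost:9092"):
    """Split a comma-separated KAFKA_BOOTSTRAP_SERVERS value into host:port strings."""
    raw = env.get("KAFKA_BOOTSTRAP_SERVERS", default)
    return [item.strip() for item in raw.split(",") if item.strip()]


def make_producer():
    # Imported lazily so the parsing helper works without kafka-python installed.
    from kafka import KafkaProducer

    # bootstrap_servers accepts a single 'host:port' string or a list of them.
    return KafkaProducer(bootstrap_servers=bootstrap_servers_from_env())
```

In the Deployment, the KAFKA_SERVER and KAFKA_PORT entries would then be replaced by one KAFKA_BOOTSTRAP_SERVERS entry such as kafka-service.default.svc.cluster.local:9092.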