
To start with: I'm somewhat of a newbie to Kubernetes, so I might omit some fundamentals.

I have a working containerized app orchestrated with docker-compose, and I am rewriting it to deploy into Kubernetes. I converted it to K8s .yaml files via Kompose and modified them to some degree. I am struggling to set up a connection between a Python app and Kafka, which run in separate pods. The Python app constantly raises a NoBrokersAvailable() error no matter what I try, so it clearly cannot reach a broker. What am I missing? I have defined what I believe are proper listeners and a network policy. I am running everything locally on Minikube with a local Docker image registry.

The Python app connects to the following address:
KafkaProducer(bootstrap_servers='kafka-service.default.svc.cluster.local:9092')

kafka-deployment.yaml (the Dockerfile image is based on confluentinc/cp-kafka:6.2.0 with a topics setup script added to it):

apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
    kompose.version: 1.27.0 (b0ed6a2c9)
  creationTimestamp: null
  labels:
    io.kompose.service: kafka
  name: kafka-app
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: kafka
  strategy: {}
  template:
    metadata:
      annotations:
        kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
        kompose.version: 1.27.0 (b0ed6a2c9)
      creationTimestamp: null
      labels:
        io.kompose.network/pipeline-network: "true"
        io.kompose.service: kafka
    spec:
      containers:
        - env:
            - name: KAFKA_LISTENERS
              value: "LISTENER_INTERNAL://0.0.0.0:29092,LISTENER_EXTERNAL://0.0.0.0:9092"
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "LISTENER_INTERNAL://localhost:29092,LISTENER_EXTERNAL://kafka-service.default.svc.cluster.local:9092"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "LISTENER_EXTERNAL:PLAINTEXT,LISTENER_INTERNAL:PLAINTEXT"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "LISTENER_INTERNAL"
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
              value: "0"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper:2181
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          image: finnhub-streaming-data-pipeline-kafka:latest
          imagePullPolicy: Never
          lifecycle:
            postStart:
              exec: 
                command: ["/bin/sh","-c","/kafka-setup-k8s.sh"]
          name: kafka-app
          ports:
            - containerPort: 9092
            - containerPort: 29092
          resources: {}
      restartPolicy: Always

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
spec:
  selector:
    app: kafka
  ports:
    - protocol: TCP
      name: firstport
      port: 9092
      targetPort: 9092
    - protocol: TCP
      name: secondport
      port: 29092
      targetPort: 29092

finnhub-producer.yaml (aka my Python app deployment):

apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
    kompose.version: 1.27.0 (b0ed6a2c9)
  creationTimestamp: null
  labels:
    io.kompose.service: finnhubproducer
  name: finnhubproducer
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: finnhubproducer
  strategy: {}
  template:
    metadata:
      annotations:
        kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
        kompose.version: 1.27.0 (b0ed6a2c9)
      creationTimestamp: null
      labels:
        io.kompose.network/pipeline-network: "true"
        io.kompose.service: finnhubproducer
    spec:
      containers:
        - env:
            - name: KAFKA_PORT
              value: "9092"
            - name: KAFKA_SERVER
              value: kafka-service.default.svc.cluster.local
            - name: KAFKA_TOPIC_NAME
              value: market
          image: docker.io/library/finnhub-streaming-data-pipeline-finnhubproducer:latest
          imagePullPolicy: Never
          name: finnhubproducer
          ports:
            - containerPort: 8001
          resources: {}
      restartPolicy: Always
status: {}

---
apiVersion: v1
kind: Service
metadata:
  annotations:
    kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
    kompose.version: 1.27.0 (b0ed6a2c9)
  creationTimestamp: null
  labels:
    io.kompose.service: finnhubproducer
  name: finnhubproducer
spec:
  ports:
    - name: "8001"
      port: 8001
      targetPort: 8001
  selector:
    io.kompose.service: finnhubproducer
status:
  loadBalancer: {}

pipeline-network-networkpolicy.yaml:

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  creationTimestamp: null
  name: pipeline-network
spec:
  ingress:
    - from:
        - podSelector:
            matchLabels:
              io.kompose.network/pipeline-network: "true"
  podSelector:
    matchLabels:
      io.kompose.network/pipeline-network: "true"

EDIT:
Dockerfile for Kafka image:

FROM confluentinc/cp-kafka:6.2.0

COPY ./scripts/kafka-setup-k8s.sh /kafka-setup-k8s.sh

kafka-setup-k8s.sh:


# blocks until kafka is reachable
kafka-topics --bootstrap-server localhost:29092 --list

echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server localhost:29092 --create --if-not-exists --topic market --replication-factor 1 --partitions 1

echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server localhost:29092 --list

2 Answers


  1. Chosen as BEST ANSWER

    I managed to make it work by deleting the Services from the deployment files and running kubectl expose deployment kafka-app instead. The issue comes from the Kompose-generated labels.


  2. Your Service's selector is app: kafka, but the Deployment's pods are labeled io.kompose.service: kafka (the Deployment itself is named kafka-app), so the Service matches no pods and has no endpoints.
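    For reference, a corrected Service along these lines (keeping the Kompose-generated pod labels) just needs its selector to match those labels:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
spec:
  selector:
    io.kompose.service: kafka  # must match the pod template labels, not the Deployment name
  ports:
    - protocol: TCP
      name: firstport
      port: 9092
      targetPort: 9092
    - protocol: TCP
      name: secondport
      port: 29092
      targetPort: 29092
```

    You can confirm the Service actually selects the pod with kubectl get endpoints kafka-service: an empty endpoints list means the selector matches nothing.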

    I suggest you use Strimzi (or Confluent for Kubernetes if you want to use their images) rather than converting your existing Docker Compose file with Kompose, which rarely gets network policies correct. In fact, you can probably remove the network labels and the network policy entirely, as they aren't really necessary within the same namespace.
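    As an illustration only (the cluster name below is a placeholder, and the exact schema depends on your Strimzi version), a single-node cluster can be declared with one custom resource once the Strimzi operator is installed:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster  # placeholder name
spec:
  kafka:
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
```

    The operator then creates a bootstrap Service (my-cluster-kafka-bootstrap for the cluster above) for in-cluster clients, so you don't maintain listeners and advertised listeners by hand.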

    Regarding your Python app, you shouldn't need to define the Kafka host and port separately; use one variable such as KAFKA_BOOTSTRAP_SERVERS, which can accept multiple brokers, including their ports.
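    A minimal sketch of that idea (KAFKA_BOOTSTRAP_SERVERS is an assumed variable name; the producer line is commented out because it needs a live broker):

```python
import os

# One variable instead of separate KAFKA_SERVER / KAFKA_PORT;
# kafka-python's bootstrap_servers accepts a list of host:port strings.
bootstrap = os.environ.get(
    "KAFKA_BOOTSTRAP_SERVERS",
    "kafka-service.default.svc.cluster.local:9092",
)
servers = [s.strip() for s in bootstrap.split(",")]

# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers=servers)
```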
