
Hi, I have been stuck on a problem for two weeks. I am trying to load a CSV file into a Kafka topic using a CSV source connector. First I tried on Windows 11 with first.row.as.header:true; the source connector was created, but its state remained FAILED because it could not infer a schema from the input files. Then I tried Ubuntu in a VM, and it failed for the same reason. Now I have removed the header row and am trying to use my own schema, and I am getting the following error.

root@vineet-virtual-machine:/home/vineet/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero# curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config  -d @csv-connector.json
HTTP/1.1 500 Internal Server Error
Date: Tue, 14 Nov 2023 10:39:25 GMT
Content-Type: application/json
Content-Length: 299
Server: Jetty(9.4.40.v20210413)

{"error_code":500,"message":"Cannot deserialize instance of `java.lang.String` out of START_OBJECT tokenn at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 447] (through reference chain: java.util.LinkedHashMap["key.schema"])"}root@vineet-virtual-machine:/home/vineet/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero# 

This is my docker-compose.yml:

root@vineet-virtual-machine:/home/vineet/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero# cat docker-compose.yml 
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.2.0
    container_name: broker
    depends_on:
      - zookeeper
    ports:
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    # An important note about accessing Kafka from clients on other machines:
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 9092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup.
    # If the latter is true, you will need to change the value 'localhost' in
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use broker:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    #
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - broker
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:29092

  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.2.0
    container_name: kafka-connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
    #  ---------------
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    # If you want to use the Confluent Hub installer to d/l component, but make them available
    # when running this offline, spin up the stack once and then run : 
    #   docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
    volumes:
      - $PWD/data:/data
    # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
        confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.1.3
        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:2.0.0
        confluent-hub install --no-prompt jcustenborder/kafka-connect-spooldir:2.0.60
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

  ksqldb:
    # *-----------------------------*
    # To connect to ksqlDB CLI
    #   docker exec --interactive --tty ksqldb ksql http://localhost:8088
    # *-----------------------------*
    image: confluentinc/ksqldb-server:0.21.0
    container_name: ksqldb
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_STREAMS_PRODUCER_MAX_BLOCK_MS: 9223372036854775807
      KSQL_KSQL_CONNECT_URL: http://kafka-connect:8083
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'

  control-center:
    image: confluentinc/cp-enterprise-control-center:6.2.0
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT_CLUSTER: 'kafka-connect:8083'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_KSQL_KSQLDB_URL: "http://ksqldb:8088"
      # The advertised URL needs to be the URL on which the browser
      #  can access the KSQL server (e.g. http://localhost:8088/info)
      CONTROL_CENTER_KSQL_KSQLDB_ADVERTISED_URL: "http://localhost:8088"
      # -v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v
      # Useful settings for development/laptop use - modify as needed for Prod
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_COMMAND_TOPIC_REPLICATION: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_REPLICATION: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_INTERNAL_TOPICS_REPLICATION: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 1
      CONTROL_CENTER_STREAMS_CACHE_MAX_BYTES_BUFFERING: 104857600
    command:
      - bash
      - -c 
      - |
        echo "Waiting two minutes for Kafka brokers to start and 
               necessary topics to be available"
        sleep 120  
        /etc/confluent/docker/run

# Other systems
  mysql:
    # *-----------------------------*
    # To connect to the DB:
    #   docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
    # *-----------------------------*
    image: mysql:8.0
    container_name: mysql
    ports:
      - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
    volumes:
     - ${PWD}/data/mysql:/docker-entrypoint-initdb.d
     - ${PWD}/data:/data

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.10.1
    container_name: elasticsearch
    hostname: elasticsearch
    ports:
      - 9200:9200
    environment:
      xpack.security.enabled: "false"
      ES_JAVA_OPTS: "-Xms1g -Xmx1g"
      discovery.type: "single-node"

  kibana:
    image: docker.elastic.co/kibana/kibana:7.10.1
    container_name: kibana
    hostname: kibana
    depends_on:
      - elasticsearch
    ports:
      - 5601:5601
    environment:
      xpack.security.enabled: "false"
      discovery.type: "single-node"
    command:
      - bash
      - -c
      - |
        /usr/local/bin/kibana-docker &
        echo "Waiting for Kibana to be ready ⏳"
        while [ $$(curl -H 'kbn-xsrf: true' -s -o /dev/null -w %{http_code} http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) -ne 200 ] ; do
          echo -e "\t" $$(date) " Kibana saved objects request response: " $$(curl -H 'kbn-xsrf: true' -o /dev/null -w %{http_code} -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) $$(curl -H 'kbn-xsrf: true' -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) " (waiting for 200)"
          sleep 5
        done

        echo -e "\t" $$(date) " Kibana saved objects request response: " $$(curl -H 'kbn-xsrf: true' -o /dev/null -w %{http_code} -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) $$(curl -H 'kbn-xsrf: true' -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*)

        echo -e "\n--\n+> Pre-creating index pattern"
        curl -s -XPOST 'http://localhost:5601/api/saved_objects/index-pattern/mysql-debezium-asgard.demo.orders' \
          -H 'kbn-xsrf: nevergonnagiveyouup' \
          -H 'Content-Type: application/json' \
          -d '{"attributes":{"title":"mysql-debezium-asgard.demo.orders","timeFieldName":"CREATE_TS"}}'

        echo -e "\n--\n+> Setting the index pattern as default"
        curl -s -XPOST 'http://localhost:5601/api/kibana/settings' \
          -H 'kbn-xsrf: nevergonnagiveyouup' \
          -H 'content-type: application/json' \
          -d '{"changes":{"defaultIndex":"mysql-debezium-asgard.demo.orders"}}'

        echo -e "\n--\n+> Opt out of Kibana telemetry"
        curl 'http://localhost:5601/api/telemetry/v2/optIn' \
            -H 'kbn-xsrf: nevergonnagiveyouup' \
            -H 'content-type: application/json' \
            -H 'accept: application/json' \
            --data-binary '{"enabled":false}' \
            --compressed

        sleep infinity

  neo4j:
    image: neo4j:4.2.3
    container_name: neo4j
    ports:
    - "7474:7474"
    - "7687:7687"
    environment:
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G
      NEO4J_ACCEPT_LICENSE_AGREEMENT: 'yes'

  kafkacat:
    image: edenhill/kafkacat:1.6.0
    container_name: kafkacat
    entrypoint: 
      - /bin/sh 
      - -c 
      - |
        apk add jq; 
        while [ 1 -eq 1 ];do sleep 60;done

This is my source connector config:

vineet@vineet-virtual-machine:~/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero$ cat csv-connector.json 
{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "inventory_spooldir_01",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\.csv",
        "schema.generation.key.name":"book",
    "schema.generation.value.name":"books",
    "schema.generation.enabled" : "false",
    "key.schema": {
                "name": "com.github.jcustenborder.kafka.connect.model.Key",
                "type": "STRUCT",
                "isOptional": false,
                "fieldSchemas": {
                  "GUID": {
                    "type": "STRING",
                    "isOptional": false
                    }
                  }
              },
         "value.schema": {
                "name": "com.github.jcustenborder.kafka.connect.model.Value",
                "type": "STRUCT",
                "isOptional": false,
                "fieldSchemas": {
                  "ID": {
                    "type": "STRING",
                    "isOptional": true
                   },
                  "NAME": {
                    "type": "STRING",
                    "isOptional": true
                   },
                  "AUTHOR": {
                    "type": "STRING",
                    "isOptional": true
                  },
                  "PRICE": {
                        "type": "STRING",
                        "isOptional": true
                  },
                  "STOCK": {
                        "type": "STRING",
                        "isOptional": true
                  },
                  "PUBLICATION": {
                        "type": "STRING",
                        "isOptional": true
                  },
                  "GUID": {
                        "type": "STRING",
                        "isOptional": false
                  }

                }
             }
        }

These are the first few lines of the 1 million rows. I generated the file locally using a Node.js script.

vineet@vineet-virtual-machine:~/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero$ head ./data/unprocessed/testdata.csv 
1,Ricky,Lawrence,$2641.54,8394,Tmh,11ae5913-3efe-4526-a66c-9c0e91b704bc
2,Devin,Thomas,$9588.26,5173,Evergreen,6a1e4f8f-e69d-40ae-8ee1-81ea58805f3a
3,Ernest,Henry,$3938.58,1297,Evergreen,dcadba97-81a4-4e17-8f1c-5a8a4f01b777
4,Susie,King,$8660.66,4209,Bhawan,0e3b4283-bcc4-498c-8293-c861625fc045
5,Lina,Jones,$670.80,1613,Bhawan,86edcbab-b599-40e2-b564-340ebe820243
6,Troy,McCoy,$132.60,2956,Tmh,a8a4c6ef-3dc9-4978-b766-97ee7b4444af
7,Ola,White,$2066.71,5027,Tmh,64d3be45-b2f0-47ad-b1c0-3dfc0892d6bc
8,David,Watkins,$3660.59,5276,Bharati,6f738ca5-d0ee-46b9-8aea-41ab3a2ab9a2
9,Effie,Kennedy,$8158.57,9929,Tmh,c5caec0d-0fd5-403f-aa90-759eab8ff45f
10,Elva,Harris,$8996.29,8765,Arihant,0ea64116-6b27-4586-ae04-1580165ab703

Also, I have given 777 permissions to testdata.csv. Thanks.

2 Answers


  1. Chosen as BEST ANSWER

    Adding these two lines to my --data JSON solved my problem:

        "key.serializer":"org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer":"io.confluent.kafka.serializers.KafkaAvroSerializer",
    

    Actually, I had already used these two properties on my kafka-connect container in the Docker Compose file that I shared above in my question:

          CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
          CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    



  2. key.schema and value.schema must each be passed as an in-line, escaped JSON string.

    Kafka Connect configuration values can only be strings, numbers, or booleans, not nested objects.
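
    For example, the key.schema from the question could be flattened into a single escaped string like this (a sketch based on the schema shown above; value.schema is escaped the same way):

        "key.schema": "{\"name\":\"com.github.jcustenborder.kafka.connect.model.Key\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"GUID\":{\"type\":\"STRING\",\"isOptional\":false}}}"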
