Hi, I have been stuck on a problem for two weeks: I am trying to load a CSV file into a Kafka topic using the SpoolDir CSV source connector. First I tried on Windows 11 with first.row.as.header:true; the source connector was created, but its state remained FAILED because it could not infer a schema from the input files. Then I tried Ubuntu in a VM, and it failed for the same reason. Now I have removed the header row and am trying to supply my own schema, and I am getting the following error.
root@vineet-virtual-machine:/home/vineet/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero# curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config -d @csv-connector.json
HTTP/1.1 500 Internal Server Error
Date: Tue, 14 Nov 2023 10:39:25 GMT
Content-Type: application/json
Content-Length: 299
Server: Jetty(9.4.40.v20210413)
{"error_code":500,"message":"Cannot deserialize instance of `java.lang.String` out of START_OBJECT tokenn at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 447] (through reference chain: java.util.LinkedHashMap["key.schema"])"}root@vineet-virtual-machine:/home/vineet/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero#
This is my docker-compose file:
root@vineet-virtual-machine:/home/vineet/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero# cat docker-compose.yml
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.2.0
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
      # An important note about accessing Kafka from clients on other machines:
      # -----------------------------------------------------------------------
      #
      # The config used here exposes port 9092 for _external_ connections to the broker
      # i.e. those from _outside_ the docker network. This could be from the host machine
      # running docker, or maybe further afield if you've got a more complicated setup.
      # If the latter is true, you will need to change the value 'localhost' in
      # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
      # remote clients
      #
      # For connections _internal_ to the docker network, such as from other services
      # and components, use broker:29092.
      #
      # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
      # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
      #
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - broker
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:29092

  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.2.0
    container_name: kafka-connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      # ---------------
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    # If you want to use the Confluent Hub installer to d/l component, but make them available
    # when running this offline, spin up the stack once and then run:
    #   docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
    volumes:
      - $PWD/data:/data
    # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
        confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.1.3
        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:2.0.0
        confluent-hub install --no-prompt jcustenborder/kafka-connect-spooldir:2.0.60
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

  ksqldb:
    # *-----------------------------*
    # To connect to ksqlDB CLI
    #   docker exec --interactive --tty ksqldb ksql http://localhost:8088
    # *-----------------------------*
    image: confluentinc/ksqldb-server:0.21.0
    container_name: ksqldb
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_STREAMS_PRODUCER_MAX_BLOCK_MS: 9223372036854775807
      KSQL_KSQL_CONNECT_URL: http://kafka-connect:8083
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'

  control-center:
    image: confluentinc/cp-enterprise-control-center:6.2.0
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT_CLUSTER: 'kafka-connect:8083'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_KSQL_KSQLDB_URL: "http://ksqldb:8088"
      # The advertised URL needs to be the URL on which the browser
      # can access the KSQL server (e.g. http://localhost:8088/info)
      CONTROL_CENTER_KSQL_KSQLDB_ADVERTISED_URL: "http://localhost:8088"
      # -v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v
      # Useful settings for development/laptop use - modify as needed for Prod
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_COMMAND_TOPIC_REPLICATION: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_REPLICATION: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_INTERNAL_TOPICS_REPLICATION: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 1
      CONTROL_CENTER_STREAMS_CACHE_MAX_BYTES_BUFFERING: 104857600
    command:
      - bash
      - -c
      - |
        echo "Waiting two minutes for Kafka brokers to start and
        necessary topics to be available"
        sleep 120
        /etc/confluent/docker/run

  # Other systems
  mysql:
    # *-----------------------------*
    # To connect to the DB:
    #   docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
    # *-----------------------------*
    image: mysql:8.0
    container_name: mysql
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
    volumes:
      - ${PWD}/data/mysql:/docker-entrypoint-initdb.d
      - ${PWD}/data:/data

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.10.1
    container_name: elasticsearch
    hostname: elasticsearch
    ports:
      - 9200:9200
    environment:
      xpack.security.enabled: "false"
      ES_JAVA_OPTS: "-Xms1g -Xmx1g"
      discovery.type: "single-node"

  kibana:
    image: docker.elastic.co/kibana/kibana:7.10.1
    container_name: kibana
    hostname: kibana
    depends_on:
      - elasticsearch
    ports:
      - 5601:5601
    environment:
      xpack.security.enabled: "false"
      discovery.type: "single-node"
    command:
      - bash
      - -c
      - |
        /usr/local/bin/kibana-docker &
        echo "Waiting for Kibana to be ready ⏳"
        while [ $$(curl -H 'kbn-xsrf: true' -s -o /dev/null -w %{http_code} http://localhost:5601/api/saved_objects/_find?type=index-pattern\&search_fields=title\&search=*) -ne 200 ] ; do
          echo -e "\t" $$(date) " Kibana saved objects request response: " $$(curl -H 'kbn-xsrf: true' -o /dev/null -w %{http_code} -s http://localhost:5601/api/saved_objects/_find?type=index-pattern\&search_fields=title\&search=*) $$(curl -H 'kbn-xsrf: true' -s http://localhost:5601/api/saved_objects/_find?type=index-pattern\&search_fields=title\&search=*) " (waiting for 200)"
          sleep 5
        done
        echo -e "\t" $$(date) " Kibana saved objects request response: " $$(curl -H 'kbn-xsrf: true' -o /dev/null -w %{http_code} -s http://localhost:5601/api/saved_objects/_find?type=index-pattern\&search_fields=title\&search=*) $$(curl -H 'kbn-xsrf: true' -s http://localhost:5601/api/saved_objects/_find?type=index-pattern\&search_fields=title\&search=*)
        echo -e "\n--\n+> Pre-creating index pattern"
        curl -s -XPOST 'http://localhost:5601/api/saved_objects/index-pattern/mysql-debezium-asgard.demo.orders' \
          -H 'kbn-xsrf: nevergonnagiveyouup' \
          -H 'Content-Type: application/json' \
          -d '{"attributes":{"title":"mysql-debezium-asgard.demo.orders","timeFieldName":"CREATE_TS"}}'
        echo -e "\n--\n+> Setting the index pattern as default"
        curl -s -XPOST 'http://localhost:5601/api/kibana/settings' \
          -H 'kbn-xsrf: nevergonnagiveyouup' \
          -H 'content-type: application/json' \
          -d '{"changes":{"defaultIndex":"mysql-debezium-asgard.demo.orders"}}'
        echo -e "\n--\n+> Opt out of Kibana telemetry"
        curl 'http://localhost:5601/api/telemetry/v2/optIn' \
          -H 'kbn-xsrf: nevergonnagiveyouup' \
          -H 'content-type: application/json' \
          -H 'accept: application/json' \
          --data-binary '{"enabled":false}' \
          --compressed
        sleep infinity

  neo4j:
    image: neo4j:4.2.3
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G
      NEO4J_ACCEPT_LICENSE_AGREEMENT: 'yes'

  kafkacat:
    image: edenhill/kafkacat:1.6.0
    container_name: kafkacat
    entrypoint:
      - /bin/sh
      - -c
      - |
        apk add jq;
        while [ 1 -eq 1 ];do sleep 60;done
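In case it is useful: to double-check that the spooldir plugin has actually been loaded by the worker, I query the standard Kafka Connect REST endpoint for installed plugins (assuming jq is installed on the host; it just pretty-prints the plugin class names):

curl -s http://localhost:8083/connector-plugins | jq '.[].class'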
This is my source connector config:
vineet@vineet-virtual-machine:~/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero$ cat csv-connector.json
{
  "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
  "topic": "inventory_spooldir_01",
  "input.path": "/data/unprocessed",
  "finished.path": "/data/processed",
  "error.path": "/data/error",
  "input.file.pattern": ".*\\.csv",
  "schema.generation.key.name": "book",
  "schema.generation.value.name": "books",
  "schema.generation.enabled": "false",
  "key.schema": {
    "name": "com.github.jcustenborder.kafka.connect.model.Key",
    "type": "STRUCT",
    "isOptional": false,
    "fieldSchemas": {
      "GUID": {
        "type": "STRING",
        "isOptional": false
      }
    }
  },
  "value.schema": {
    "name": "com.github.jcustenborder.kafka.connect.model.Value",
    "type": "STRUCT",
    "isOptional": false,
    "fieldSchemas": {
      "ID": {
        "type": "STRING",
        "isOptional": true
      },
      "NAME": {
        "type": "STRING",
        "isOptional": true
      },
      "AUTHOR": {
        "type": "STRING",
        "isOptional": true
      },
      "PRICE": {
        "type": "STRING",
        "isOptional": true
      },
      "STOCK": {
        "type": "STRING",
        "isOptional": true
      },
      "PUBLICATION": {
        "type": "STRING",
        "isOptional": true
      },
      "GUID": {
        "type": "STRING",
        "isOptional": false
      }
    }
  }
}
These are the first few lines of the input file (about 1 million rows in total), which I generated locally with a Node.js script.
vineet@vineet-virtual-machine:~/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero$ head ./data/unprocessed/testdata.csv
1,Ricky,Lawrence,$2641.54,8394,Tmh,11ae5913-3efe-4526-a66c-9c0e91b704bc
2,Devin,Thomas,$9588.26,5173,Evergreen,6a1e4f8f-e69d-40ae-8ee1-81ea58805f3a
3,Ernest,Henry,$3938.58,1297,Evergreen,dcadba97-81a4-4e17-8f1c-5a8a4f01b777
4,Susie,King,$8660.66,4209,Bhawan,0e3b4283-bcc4-498c-8293-c861625fc045
5,Lina,Jones,$670.80,1613,Bhawan,86edcbab-b599-40e2-b564-340ebe820243
6,Troy,McCoy,$132.60,2956,Tmh,a8a4c6ef-3dc9-4978-b766-97ee7b4444af
7,Ola,White,$2066.71,5027,Tmh,64d3be45-b2f0-47ad-b1c0-3dfc0892d6bc
8,David,Watkins,$3660.59,5276,Bharati,6f738ca5-d0ee-46b9-8aea-41ab3a2ab9a2
9,Effie,Kennedy,$8158.57,9929,Tmh,c5caec0d-0fd5-403f-aa90-759eab8ff45f
10,Elva,Harris,$8996.29,8765,Arihant,0ea64116-6b27-4586-ae04-1580165ab703
Also, I have given 777 permissions to testdata.csv. Thanks.
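For completeness, this is roughly what my first, header-based attempt looked like, letting the connector generate the schema itself (a sketch from memory; csv.first.row.as.header and schema.generation.enabled are the relevant spooldir properties):

{
  "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
  "topic": "inventory_spooldir_01",
  "input.path": "/data/unprocessed",
  "finished.path": "/data/processed",
  "error.path": "/data/error",
  "input.file.pattern": ".*\\.csv",
  "schema.generation.enabled": "true",
  "csv.first.row.as.header": "true"
}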
Answer:
Supplying key.schema and value.schema as in-line, escaped JSON strings in my --data JSON solved my problem. I already had these two properties in the connector config shared above in my question, but as nested JSON objects rather than strings, which is exactly what the REST API's 500 error was complaining about.

Referenced from: "key.schema and value.schema must be an in-line, escaped JSON object. Kafka Connect values can only be strings/numbers/booleans, not objects."
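For illustration, built from the schemas in my question above, the two properties end up as single-line escaped JSON strings, along these lines:

"key.schema": "{\"name\":\"com.github.jcustenborder.kafka.connect.model.Key\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"GUID\":{\"type\":\"STRING\",\"isOptional\":false}}}",
"value.schema": "{\"name\":\"com.github.jcustenborder.kafka.connect.model.Value\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"ID\":{\"type\":\"STRING\",\"isOptional\":true},\"NAME\":{\"type\":\"STRING\",\"isOptional\":true},\"AUTHOR\":{\"type\":\"STRING\",\"isOptional\":true},\"PRICE\":{\"type\":\"STRING\",\"isOptional\":true},\"STOCK\":{\"type\":\"STRING\",\"isOptional\":true},\"PUBLICATION\":{\"type\":\"STRING\",\"isOptional\":true},\"GUID\":{\"type\":\"STRING\",\"isOptional\":false}}}"

Rather than escaping by hand, jq's tojson filter can produce the escaped string from a pretty-printed schema (key-schema.json here is a hypothetical file containing just the schema object):

jq 'tojson' key-schema.json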