I am trying to set up the AWS MSK connector to DynamoDB, but I can't understand how to specify the DynamoDB key.
connector.class=io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector
table.name.format=test_sink_table_2
confluent.topic.bootstrap.servers="someserver"
tasks.max=1
topics=spk-enriched-stream
aws.dynamodb.endpoint=https://dynamodb.eu-west-3.amazonaws.com
confluent.topic.security.protocol=SASL_SSL
confluent.topic.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
confluent.topic.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
aws.dynamodb.region=eu-west-3
confluent.topic.sasl.mechanism=AWS_MSK_IAM
My application sends messages to Kafka in TSV format.
Currently I am getting this error:
[Worker-0e5124fe718e9e914] Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: The provided key element does not match the schema (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: N789DKVS6F25MUQJV9U356DG7BVV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)
So how can I configure the connector to load the data into DynamoDB? How can I derive the key from the value.converter?
2 Answers
The only option we were able to find was to convert the messages from TSV to JSON and then set up the DynamoDB primary-key settings.
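For illustration, here is a minimal sketch of that conversion on the producer side. The column names (user_id, event_time, payload) are hypothetical stand-ins; the real column layout and producer wiring depend on your application:

import json

def tsv_to_json(line, columns=("user_id", "event_time", "payload")):
    # Hypothetical column names -- replace with the real layout of your topic.
    # If you enable schemas on JsonConverter downstream, you would also need to
    # wrap the result in its {"schema": ..., "payload": ...} envelope.
    values = line.rstrip("\n").split("\t")
    return json.dumps(dict(zip(columns, values)))

print(tsv_to_json("42\t2023-01-01T00:00:00Z\thello"))
# -> {"user_id": "42", "event_time": "2023-01-01T00:00:00Z", "payload": "hello"}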
When you use ByteArrayConverter or StringConverter, you will be unable to access fields within the data because there is no structure to them. The example documentation uses AvroConverter, which can access fields because Avro is structured.

TSV is not a recommended format in Kafka because you have no easy way to parse out "column 1" to use as any field/ID (you also don't know whether you have any empty rows).
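If you do rewrite the topic as JSON, the sink would then use a structured converter instead of ByteArrayConverter. A minimal sketch of the converter settings, assuming each message carries the schema/payload envelope that JsonConverter expects when schemas are enabled:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

With schemas.enable=true, each message must look like {"schema": {...}, "payload": {...}}; producing Avro with Schema Registry and AvroConverter avoids that envelope entirely.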
If you want to use fields from the data to insert into the database, you should review your settings for aws.dynamodb.pk.hash and aws.dynamodb.pk.sort. As documented, the hash key defaults to the record's partition and the sort key to its offset; however, using value fields instead will also require you to change how the data is written to the topic.

https://docs.confluent.io/kafka-connectors/aws-dynamodb/current/configuration_options.html#dynamodb-parameters
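For example, once the values are structured, the key settings might look like the sketch below; user_id and event_time are hypothetical field names standing in for whatever your records actually contain, and the value.<field-name> addressing is described in the configuration reference linked above:

aws.dynamodb.pk.hash=value.user_id
aws.dynamodb.pk.sort=value.event_time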