
I am trying to set up the AWS MSK connector to DynamoDB, but I can't understand how to specify the DynamoDB key.

connector.class=io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector
table.name.format=test_sink_table_2
confluent.topic.bootstrap.servers="someserver"
tasks.max=1
topics=spk-enriched-stream
aws.dynamodb.endpoint=https://dynamodb.eu-west-3.amazonaws.com
confluent.topic.security.protocol=SASL_SSL
confluent.topic.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
confluent.topic.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
aws.dynamodb.region=eu-west-3
confluent.topic.sasl.mechanism=AWS_MSK_IAM

My application sends messages to Kafka in TSV format.
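
For illustration, a record value might look like this (tab-separated; the field names and values here are invented for this example):

    u-123	1683000000	click	/home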

Currently I am getting this error:

[Worker-0e5124fe718e9e914] Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: The provided key element does not match the schema (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: N789DKVS6F25MUQJV9U356DG7BVV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)

So how can I configure the connector to load the data into DynamoDB? How can I extract the key from the value.converter?

2 Answers


  1. Chosen as BEST ANSWER

    The only option we were able to find was to convert the messages from TSV to JSON and then point the DynamoDB primary-key settings at fields in the JSON, roughly as sketched below.
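
    A minimal sketch of that setup, assuming the topic now carries schemaless JSON values with a hypothetical user_id field (exact converter requirements may vary by connector version, so check the docs):

        value.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter.schemas.enable=false
        # take the DynamoDB hash key from a field inside the record value
        aws.dynamodb.pk.hash=value.user_id
        # keep the default sort key (the Kafka offset)
        aws.dynamodb.pk.sort=offset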


  2. When you use ByteArrayConverter or StringConverter, you cannot access fields within the data because the records have no structure. The example documentation uses AvroConverter, which can access fields because Avro is structured. TSV is not a recommended format in Kafka because you have no easy way to parse out "column 1" to use as a field/ID (you also don't know whether you have any empty rows).

    If you want to use fields from the data as the key, you should review your settings for aws.dynamodb.pk.hash and aws.dynamodb.pk.sort. As documented, the hash key defaults to the Kafka partition and the sort key defaults to the offset; keying on record fields instead will also require you to change how data is written to the topic (see the sketch below).

    https://docs.confluent.io/kafka-connectors/aws-dynamodb/current/configuration_options.html#dynamodb-parameters
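
    For reference, a sketch of those settings: the defaults per the linked docs, plus a field-based alternative with hypothetical field names:

        # defaults: key on Kafka coordinates
        aws.dynamodb.pk.hash=partition
        aws.dynamodb.pk.sort=offset

        # alternative: key on fields inside the record value; this requires a
        # structured converter (e.g. Avro or JSON), not ByteArrayConverter
        # aws.dynamodb.pk.hash=value.user_id
        # aws.dynamodb.pk.sort=value.event_time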
