I have created a Firehose delivery stream that uses dynamic partitioning with inline parsing. It is Direct PUT because I am using a Lambda function to get data from an event source that doesn't connect directly to Firehose; the Lambda then makes put_record_batch calls to send the data to Firehose. The JSON objects that I am sending to Firehose look like this:
{
"customer_id": "123123123",
...
}
According to the documentation here, when configuring Firehose I should be able to specify the following to partition based on customer_id:
- Set Dynamic Partitioning to enabled
- Set Multi-Record DeAggregation to enabled
- Set Inline Parsing for JSON to enabled
- Create a dynamic partitioning key with the name customer_id and a jq expression of .customer_id
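For a record like the one above, the inline-parsing step behaves roughly like the following Python sketch. This is only an illustration: the real extraction is done by Firehose's jq engine, and the event_type field is an invented example value.

```python
import json

# A raw record as Firehose receives it (event_type is a made-up field)
record = b'{"customer_id": "123123123", "event_type": "purchase"}'

# Python equivalent of applying the jq expression .customer_id
partition_key = json.loads(record)["customer_id"]

# Firehose substitutes the extracted key into the S3 prefix, e.g. via
# !{partitionKeyFromQuery:customer_id} in the bucket prefix template
s3_prefix = f"customer_id={partition_key}/"
```

If extraction succeeds, objects for this record land under a prefix like customer_id=123123123/.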
Then, if I send a put_record_batch request like the following:
import base64
import json
import boto3

client = boto3.client('firehose')

payload1 = {"customer_id": "123123"}
encoded_data1 = base64.b64encode(json.dumps(payload1).encode('UTF-8'))
payload2 = {"customer_id": "323232"}
encoded_data2 = base64.b64encode(json.dumps(payload2).encode('UTF-8'))
response = client.put_record_batch(
DeliveryStreamName='string',
Records=[
{
'Data': encoded_data1
},
{
'Data': encoded_data2
},
]
)
I should then have the data partitioned by "customer_id" when it is delivered to S3. Instead, I get the following error:
{
"deliveryStreamARN": "<arn>",
"destination": "<s3 bucket>",
"deliveryStreamVersionId": 4,
"message": "Metadata extraction failed because of invalid record.",
"errorCode": "DynamicPartitioning.MetadataExtractionFailed"
}
and the error record that gets stored in S3 says:
"errorCode":"DynamicPartitioning.MetadataExtractionFailed","errorMessage":"Non JSON record provided"
What am I doing wrong? I clearly sent JSON to Firehose.
2 Answers
The problem is that put_record_batch concatenates all the JSON records. Check the Multi record deaggregation option in the Firehose configuration:
Data deaggregation is the process of parsing through the records in a delivery stream and separating them based either on valid JSON or on a specified delimiter. If your data is aggregated, dynamic partitioning can be applied only after deaggregation is performed. So if you enable dynamic partitioning for aggregated data, you must also enable multi record deaggregation.
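As a rough sketch of what deaggregation does, assuming newline-delimited aggregation (Firehose also supports splitting on concatenated JSON objects):

```python
import json

# Two records aggregated into one blob with a newline delimiter
aggregated = b'{"customer_id": "123123"}\n{"customer_id": "323232"}'

# Deaggregation splits the blob back into individual records, after
# which dynamic partitioning can parse each record separately
records = [json.loads(line) for line in aggregated.split(b"\n")]
```

Without this step, the whole blob is treated as a single record, and the inline JSON parser cannot extract a partition key from it.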
The value you put in the 'Data' key should not be base64 encoded. The AWS documentation says the Data blob is base64 encoded automatically when the request is serialized, so encoding it yourself double-encodes the record: Firehose decodes once and sees a base64 string instead of JSON, which is why it reports "Non JSON record provided".