
I have created a Firehose delivery stream that uses dynamic partitioning with inline parsing. It is Direct PUT because I am using a Lambda function to pull data from an event source that doesn't connect directly to Firehose; the Lambda then calls put_record_batch to send the data to Firehose. The JSON objects I am sending to Firehose look like this:

{
  "customer_id": "123123123",
  ...
}

According to the documentation here, when configuring Firehose I should be able to partition based on customer_id just by specifying the following (a rough boto3 sketch of this configuration follows the list):

  1. Set Dynamic Partitioning to enabled
  2. Set Multi-Record DeAggregation to enabled
  3. Set Inline Parsing for JSON to enabled
  4. Create a dynamic partitioning key named customer_id with the jq expression .customer_id
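
For reference, this is roughly what that configuration looks like when created through boto3 instead of the console. This is only a sketch: the stream name, role ARN, and bucket ARN are placeholders, and the buffering values are simply the minimum that dynamic partitioning accepts.

import boto3

firehose = boto3.client('firehose')

firehose.create_delivery_stream(
    DeliveryStreamName='my-delivery-stream',   # placeholder
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::111122223333:role/firehose-role',   # placeholder
        'BucketARN': 'arn:aws:s3:::my-bucket',                        # placeholder
        # partitionKeyFromQuery refers to the key defined in MetadataExtractionQuery
        'Prefix': 'data/customer_id=!{partitionKeyFromQuery:customer_id}/',
        'ErrorOutputPrefix': 'errors/!{firehose:error-output-type}/',
        # dynamic partitioning requires a buffer size of at least 64 MB
        'BufferingHints': {'SizeInMBs': 64, 'IntervalInSeconds': 60},
        'DynamicPartitioningConfiguration': {'Enabled': True},
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [
                # multi-record deaggregation: split concatenated JSON objects
                {
                    'Type': 'RecordDeAggregation',
                    'Parameters': [
                        {'ParameterName': 'SubRecordType', 'ParameterValue': 'JSON'},
                    ],
                },
                # inline parsing: pull customer_id out of each record with jq
                {
                    'Type': 'MetadataExtraction',
                    'Parameters': [
                        {'ParameterName': 'MetadataExtractionQuery',
                         'ParameterValue': '{customer_id:.customer_id}'},
                        {'ParameterName': 'JsonParsingEngine',
                         'ParameterValue': 'JQ-1.6'},
                    ],
                },
            ],
        },
    },
)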

Then, if I send a put_record_batch request like the following:

import base64
import json

import boto3

client = boto3.client('firehose')

payload1 = {"customer_id": "123123"}
encoded_data1 = base64.b64encode(json.dumps(payload1).encode('UTF-8'))

payload2 = {"customer_id": "323232"}
encoded_data2 = base64.b64encode(json.dumps(payload2).encode('UTF-8'))

response = client.put_record_batch(
    DeliveryStreamName='string',
    Records=[
        {
            'Data': encoded_data1
        },
        {
            'Data': encoded_data2
        },
    ]
)

the data should be partitioned by customer_id when it is delivered to S3. Instead, I get the following error:

{
    "deliveryStreamARN": "<arn>",
    "destination": "<s3 bucket>",
    "deliveryStreamVersionId": 4,
    "message": "Metadata extraction failed because of invalid record.",
    "errorCode": "DynamicPartitioning.MetadataExtractionFailed"
}

and the error record that gets stored in S3 says

"errorCode":"DynamicPartitioning.MetadataExtractionFailed","errorMessage":"Non JSON record provided"

What am I doing wrong? I clearly sent JSON to Firehose.

2 Answers


  1. The problem is that put_record_batch concatenates all of the JSON records into a single blob. Check this option in the Firehose configuration:

    Multi record deaggregation

    Data deaggregation is the process of parsing through the records in a delivery stream and separating them based either on valid JSON or on a specified delimiter. If your data is aggregated, dynamic partitioning can be applied only after deaggregation is performed, so if you enable dynamic partitioning on aggregated data, you must also enable multi-record deaggregation.
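
    A related safeguard on the producer side is to make each record its own newline-terminated JSON document, so the records in a batch stay individually parseable. A minimal sketch, assuming the stream name is a placeholder and boto3 credentials are already configured:

    import json
    import boto3

    firehose = boto3.client('firehose')

    payloads = [{"customer_id": "123123"}, {"customer_id": "323232"}]

    # One JSON document per record, newline terminated, so concatenated
    # records remain individually parseable by the inline JSON parser.
    firehose.put_record_batch(
        DeliveryStreamName='my-delivery-stream',   # placeholder
        Records=[{'Data': json.dumps(p) + '\n'} for p in payloads],
    )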

  2. The data you put under the 'Data' key should not be base64 encoded. Just do this:

    response = client.put_record_batch(
        DeliveryStreamName=stream_name,
        Records=[
            {
                'Data': json.dumps(payload1)
            },
            {
                'Data': json.dumps(payload2)
            },
        ]
    )
    

    The AWS documentation says that the Data blob is base64 encoded automatically when the request is serialized, so you should not encode it yourself.
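
    One additional note: put_record_batch is not all-or-nothing, so it is worth checking FailedPutCount in the response and re-sending only the rejected records. A rough sketch using the same client and stream_name as above (backoff omitted):

    records = [{'Data': json.dumps(payload1)}, {'Data': json.dumps(payload2)}]
    response = client.put_record_batch(
        DeliveryStreamName=stream_name,
        Records=records,
    )

    # RequestResponses is positionally aligned with Records; entries for
    # failed records contain ErrorCode/ErrorMessage instead of RecordId.
    if response['FailedPutCount'] > 0:
        failed = [
            record
            for record, result in zip(records, response['RequestResponses'])
            if 'ErrorCode' in result
        ]
        client.put_record_batch(DeliveryStreamName=stream_name, Records=failed)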
