Actual message read from Kafka:
{
"body": {
"metadata": {
"id": "bce16e11"
},
"eventDetails": {
"eventID": "c5f615f1",
"customerId": "123456789",
"Name": "NEW"
}
}
}
I want to add the Kafka offset and partition to the message as follows. Please note that the Kafka offset and partition come from NiFi attributes; they are not part of the message and are not hard-coded values.
{
"body" : {
"metadata" : {
"id" : "bce16e11",
"kafkaOffset" : 4537732,
"kafkaPartition" : 4
},
"eventDetails" : {
"eventID" : "c5f615f1",
"customerId" : "123456789",
"Name" : "NEW"
}
}
}
2 Answers
You can use a JoltTransformJSON processor with a shift spec that uses # wildcards to inject fixed values, or directly use a modify spec (which also supports @ references) to set the values.
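As a rough illustration of the modify variant with fixed values, a spec along these lines should add the two fields under body.metadata. This is only a sketch: the numbers are copied from the desired output in the question, and it is not the spec originally posted with this answer.

```json
[
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "body": {
        "metadata": {
          "kafkaOffset": 4537732,
          "kafkaPartition": 4
        }
      }
    }
  }
]
```

The existing id key is left untouched, since a modify operation only writes the keys listed in the spec.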
Edit: Based on your last edit, you don't need fixed values but attributes coming from the Kafka side, so you can use a specification that references the attributes attrOffset and attrPartition through NiFi Expression Language.
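A minimal sketch of such an attribute-based spec is shown here, assuming the flow file carries attributes named attrOffset and attrPartition and that JoltTransformJSON evaluates Expression Language in the spec before applying it. The first operation writes the attribute values as strings; the second converts them to numbers so the result matches the desired output.

```json
[
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "body": {
        "metadata": {
          "kafkaOffset": "${attrOffset}",
          "kafkaPartition": "${attrPartition}"
        }
      }
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "body": {
        "metadata": {
          "kafkaOffset": "=toLong",
          "kafkaPartition": "=toInteger"
        }
      }
    }
  }
]
```

If the attributes are the ones written by the ConsumeKafka processors themselves, they are typically named kafka.offset and kafka.partition, so the references would become ${kafka.offset} and ${kafka.partition}; check the attribute names on your own flow files before relying on this.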
If you’re using NiFi 1.x, then I’d recommend using the ConsumeKafkaRecord_2_6 processor with the Output Strategy set to Use Wrapper. This way each consumed record (= Kafka message) will also carry additional information, including the offset.
Here is an example of what the output would look like if consuming a single Kafka message and using the JSON Record Writer:
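The snippet originally posted here is not reproduced; the following is an illustrative sketch of the wrapper layout as I understand it from the processor documentation, where each record has key, value, headers and metadata fields. The topic name, key, headers and timestamp are placeholders made up for the illustration; the value, partition and offset are taken from the question.

```json
[
  {
    "key": null,
    "value": {
      "body": {
        "metadata": {
          "id": "bce16e11"
        },
        "eventDetails": {
          "eventID": "c5f615f1",
          "customerId": "123456789",
          "Name": "NEW"
        }
      }
    },
    "headers": {},
    "metadata": {
      "topic": "my-topic",
      "partition": 4,
      "offset": 4537732,
      "timestamp": 0
    }
  }
]
```

With this layout the offset and partition sit next to the message rather than inside body.metadata, so a follow-up transform would still be needed to move them to the exact location asked for in the question.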
Note: using Record-based processors in NiFi is highly recommended for higher throughput and better performance.
See https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.27.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html (around the end) for more details.
If you’re using NiFi 2, then this would be the ConsumeKafka processor, which offers the same configuration option.