
Actual message read from Kafka:

{
  "body": {
    "metadata": {
      "id": "bce16e11"
    },
    "eventDetails": {
      "eventID": "c5f615f1",
      "customerId": "123456789",
      "Name": "NEW"
    }
  }
}

I want to add the Kafka offset and partition to the message as shown below. Please note that the offset and partition come from NiFi flowfile attributes; they are not part of the message and are not hard-coded values.

{
  "body" : {
    "metadata" : {
      "id" : "bce16e11",
      "kafkaOffset" : 4537732,
      "kafkaPartition" : 4
    },
    "eventDetails" : {
      "eventID" : "c5f615f1",
      "customerId" : "123456789",
      "Name" : "NEW"
    }
  }
}

2 Answers


  1. You can use the JoltTransformJSON processor along with a shift spec that uses the # wildcard to inject fixed values, such that:

    [
      {
        "operation": "shift",
        "spec": {
          "body": {
            "metadata": {
              "*": "&2.&1.&",   // matches all of the existing elements within the "metadata" object
                                // &1 resolves to the "metadata" key
                                // &2 resolves to the "body" key
              "#4537732": "&2.&1.kafkaOffset",
              "#4": "&2.&1.kafkaPartition"
            },
            "*": "&1.&"   // matches the elements other than the "metadata" object
                          // &1 resolves to the "body" key
          }
        }
      },
      {
        "operation": "modify-overwrite-beta",
        "spec": {
          "body": {
            "metadata": {
              "*": "=toInteger" // "#"-injected values arrive as strings; cast them back to integers
            }
          }
        }
      }
    ]
    

    or directly assign the fixed values with a modify spec, such that:

    [
      {
        "operation": "modify-overwrite-beta",
        "spec": {
          "body": {
            "metadata": {
              "kafkaOffset": 4537732,
              "kafkaPartition": 4
            }
          }
        }
      }
    ]
    

    Edit: Since, per your latest edit, you need values coming from NiFi attributes rather than fixed values, you can use a specification such as:

    [
      {
        "operation": "modify-overwrite-beta",
        "spec": {
          "body": {
            "metadata": {
              "kafkaOffset": "${attrOffset}",
              "kafkaPartition": "${attrPartition}"
            }
          }
        }
      }
    ]
    

    for the attributes attrOffset and attrPartition.
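    If the messages are consumed with NiFi's ConsumeKafka processors, the offset and partition are already written to the flowfile as the kafka.offset and kafka.partition attributes, so the spec can reference them directly. One caveat: Expression Language substitution injects the values as strings, so you can chain a second modify operation to cast them back to integers. A sketch along those lines (the attribute names assume the standard ConsumeKafka processors):

    [
      {
        "operation": "modify-overwrite-beta",
        "spec": {
          "body": {
            "metadata": {
              "kafkaOffset": "${kafka.offset}",
              "kafkaPartition": "${kafka.partition}"
            }
          }
        }
      },
      {
        "operation": "modify-overwrite-beta",
        "spec": {
          "body": {
            "metadata": {
              "kafkaOffset": "=toInteger",
              "kafkaPartition": "=toInteger"
            }
          }
        }
      }
    ]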

  2. If you’re using NiFi 1.x, then I’d recommend using the ConsumeKafkaRecord_2_6 processor with the Output Strategy set to "Use Wrapper". This way, every consumed record (i.e. Kafka message) will also contain a set of metadata, including the offset and partition.

    Here is an example of what the output would look like if consuming a single Kafka message and using the JSON Record Writer:

    [
      {
        "key": {
          "name": "Acme",
          "number": "AC1234"
        },
        "value": {
          "address": "1234 First Street",
          "zip": "12345",
          "account": {
            "name": "Acme",
            "number": "AC1234"
          }
        },
        "headers": {
          "attributeA": "valueA",
          "attributeB": "valueB"
        },
        "metadata": {
          "topic": "accounts",
          "partition": 0,
          "offset": 0,
          "timestamp": 0
        }
      }
    ]
    

    Note: using record-based processors in NiFi is highly recommended for higher throughput and better performance.

    See https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.27.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html (around the end) for more details.

    If you’re using NiFi 2, then this would be the ConsumeKafka processor, and you have the same option in its configuration.
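    With the wrapper output, you don't even need attributes: a JoltTransformJSON shift spec can fold the wrapper's metadata fields into the message body itself. A sketch against the wrapper structure above, assuming the original JSON (with its "body" element) is the Kafka message value:

    [
      {
        "operation": "shift",
        "spec": {
          "*": {
            "value": {
              "*": "[&2].&"   // keep the original message content as-is
            },
            "metadata": {
              "offset": "[&2].body.metadata.kafkaOffset",
              "partition": "[&2].body.metadata.kafkaPartition"
            }
          }
        }
      }
    ]

    Since the offset and partition come from the record itself here, they are already integers and no cast is needed.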
