
I’m consuming a Kafka topic published by another team (so I have very limited influence over the message format). The message has a field that holds an ARRAY of STRUCTS (an array of objects), but if the array has only one value then it just holds that STRUCT (no array, just an object). I’m trying to transform the message using Confluent KSQL. Unfortunately, I cannot figure out how to do this.

For example:

{ "field": {...} }                   <-- STRUCT (single element)
{ "field": [ {...}, {...} ] }        <-- ARRAY (multiple elements)
{ "field": [ {...}, {...}, {...} ] } <-- ARRAY (multiple elements)

If I configure the field in my message schema as a STRUCT then all messages with multiple values error. If I configure the field in my message schema as an ARRAY then all messages with a single value error. I could create two streams and merge them, but then my error log will be polluted with irrelevant errors.

I’ve tried capturing this field as a STRING/VARCHAR, which works, and I can split the messages into two streams. From there I can parse the single-value messages and extract the data I need, but I cannot figure out how to parse the multi-value messages. None of the KSQL JSON functions seem to allow parsing a JSON array out of a JSON string. I can use EXTRACTJSONFIELD() to extract a particular element of the array, but not all of the elements.
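For example, this pulls out only the first element; the stream name, JSON path, and column names here are just illustrative:

```sql
-- EXTRACTJSONFIELD can only address one fixed JSON path at a time,
-- so every array element would need its own hard-coded index.
SELECT EXTRACTJSONFIELD(field_str, '$.field[0].name') AS first_name
FROM `my-string-stream`
EMIT CHANGES;
```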

Am I missing something? Is there any way to handle this reasonably?

2 Answers


  1. Chosen as BEST ANSWER

    I finally solved this in a roundabout way...

First, I created an initial stream reading the transaction as raw bytes using the KAFKA format instead of the JSON format. This lets me put a conditional filter on the data so I can fork the stream into a version for the single (STRUCT) variation and a version for the multiple (ARRAY) variation.

    The initial stream looks like:

    CREATE OR REPLACE STREAM `my-topic-stream` (
      id STRING KEY,
      data BYTES
    )
    WITH (
      KAFKA_TOPIC='my-topic',
      VALUE_FORMAT='KAFKA'
    );
    

Forking that stream looks like this (the second stream, for the multiple variation, is the same query but filtering for IS NOT NULL):

    CREATE OR REPLACE STREAM `my-single-stream`
    WITH (
      kafka_topic='my-single-topic'
    ) AS
    SELECT *
    FROM `my-topic-stream` 
    WHERE JSON_ARRAY_LENGTH(EXTRACTJSONFIELD(FROM_BYTES(data, 'utf8'), '$.field')) IS NULL;
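The second fork, for the ARRAY variation, is symmetric; a sketch using the same names as above (the target topic name is an assumption):

```sql
-- JSON_ARRAY_LENGTH returns NULL when '$.field' is not a JSON array,
-- so IS NOT NULL selects the multiple (ARRAY) variation.
CREATE OR REPLACE STREAM `my-multiple-stream`
WITH (
  kafka_topic='my-multiple-topic'
) AS
SELECT *
FROM `my-topic-stream`
WHERE JSON_ARRAY_LENGTH(EXTRACTJSONFIELD(FROM_BYTES(data, 'utf8'), '$.field')) IS NOT NULL;
```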
    

At this point I can create a schema for both variations, explode field on the ARRAY side, and merge the two streams back together. I don't know whether this can be refined to be more efficient, but it successfully processes the transactions the way I wanted.
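A sketch of that last step; the inner struct fields and topic names are assumptions, not the real schema:

```sql
-- Re-read each forked topic as JSON, with the shape that variation actually has.
CREATE STREAM `my-single-typed` (
  id STRING KEY,
  field STRUCT<name STRING>
) WITH (KAFKA_TOPIC='my-single-topic', VALUE_FORMAT='JSON');

CREATE STREAM `my-multiple-typed` (
  id STRING KEY,
  field ARRAY<STRUCT<name STRING>>
) WITH (KAFKA_TOPIC='my-multiple-topic', VALUE_FORMAT='JSON');

-- EXPLODE flattens the array to one row per element; the single variation
-- is already one element, so it is inserted as-is.
CREATE STREAM `my-merged` AS
  SELECT id, EXPLODE(field) AS element
  FROM `my-multiple-typed`;

INSERT INTO `my-merged`
  SELECT id, field AS element
  FROM `my-single-typed`;
```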


  2. In my experience, this is one use-case where KSQL just doesn’t work. You would need to use Kafka Streams or a plain consumer to deserialize the event as a generic JSON type, then check object.get("field").isArray() or isObject(), and handle accordingly.

Even if you used a UDF in KSQL, the STREAM definition would still need to declare ahead of time whether the field is ARRAY<...> or STRUCT<...>.
