I’m consuming a Kafka topic published by another team (so I have very limited influence over the message format). The message has a field that holds an ARRAY of STRUCTS (an array of objects), but if the array has only one value then it just holds that STRUCT (no array, just an object). I’m trying to transform the message using Confluent KSQL. Unfortunately, I cannot figure out how to do this.
For example:
{ "field": {...} } <-- STRUCT (single element)
{ "field": [ {...}, {...} ] } <-- ARRAY (multiple elements)
{ "field": [ {...}, {...}, {...} ] <-- ARRAY (multiple elements)
If I declare the field in my message schema as a STRUCT, then every message with multiple values fails to deserialize. If I declare it as an ARRAY, then every message with a single value fails. I could create two streams and merge them, but then my error log would be polluted with irrelevant deserialization errors.
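To illustrate the dilemma, here are the two competing declarations (a sketch; the topic name and the `id` sub-field are placeholders for the real schema):

```sql
-- Deserializes only the single-object messages; array messages error out.
CREATE STREAM events_single (field STRUCT<id VARCHAR>)
  WITH (KAFKA_TOPIC='events', VALUE_FORMAT='JSON');

-- Deserializes only the array messages; single-object messages error out.
CREATE STREAM events_multi (field ARRAY<STRUCT<id VARCHAR>>)
  WITH (KAFKA_TOPIC='events', VALUE_FORMAT='JSON');
```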
I’ve tried capturing this field as a STRING/VARCHAR, which works and lets me split the messages into two streams. From there I can parse the single-value messages and extract the data I need, but I cannot figure out how to parse the multi-value messages: none of the KSQL JSON functions seem to be able to parse a JSON array out of a JSON string. I can use EXTRACTJSONFIELD() to extract a particular element of the array, but not all of the elements.
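For example (assuming the raw payload lives in a VARCHAR column `data` of a stream `raw_stream`, with an illustrative `id` sub-field):

```sql
-- Extracts the id of the first element only;
-- nothing here returns all of the elements at once.
SELECT EXTRACTJSONFIELD(data, '$.field[0].id') AS first_id
FROM raw_stream
EMIT CHANGES;
```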
Am I missing something? Is there any way to handle this reasonably?
2 Answers
I finally solved this in a roundabout way...
First, I created an initial stream that reads the transaction as raw bytes using the KAFKA format instead of the JSON format. This lets me put a conditional filter on the data, so I can fork the stream into a version for the single (STRUCT) variation and a version for the multiple (ARRAY) variation.
The initial stream looks something like this (the topic and column names below are illustrative):
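```sql
-- Capture the entire message value as one VARCHAR column, so no JSON
-- deserialization (and therefore no deserialization error) happens here.
CREATE STREAM transaction_raw (data VARCHAR)
  WITH (KAFKA_TOPIC='transactions', VALUE_FORMAT='KAFKA');
```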
Forking that stream looks something like this, with one stream filtering for the single (STRUCT) variation and a second for the multiple (ARRAY) variation, each using an `EXTRACTJSONFIELD(...) IS NOT NULL` condition:
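(Still a sketch: `id` stands in for any sub-field that is always present in the elements.)

```sql
-- Single (STRUCT) variation: a child lookup on the object succeeds.
CREATE STREAM transaction_single
  WITH (KAFKA_TOPIC='transaction_single', VALUE_FORMAT='KAFKA') AS
  SELECT data
  FROM transaction_raw
  WHERE EXTRACTJSONFIELD(data, '$.field.id') IS NOT NULL;

-- Multiple (ARRAY) variation: indexing into the array succeeds.
CREATE STREAM transaction_multi
  WITH (KAFKA_TOPIC='transaction_multi', VALUE_FORMAT='KAFKA') AS
  SELECT data
  FROM transaction_raw
  WHERE EXTRACTJSONFIELD(data, '$.field[0].id') IS NOT NULL;
```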
At this point I can create a JSON schema for each variation, explode `field` in the ARRAY version, and merge the two streams back together, as sketched below. I don't know if this can be refined to be more efficient, but it successfully processes the transactions as I wanted.
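A sketch of those last steps, reusing the illustrative names and the placeholder `id` schema from above:

```sql
-- Re-read each forked topic with a proper JSON schema.
CREATE STREAM transaction_single_json (field STRUCT<id VARCHAR>)
  WITH (KAFKA_TOPIC='transaction_single', VALUE_FORMAT='JSON');

CREATE STREAM transaction_multi_json (field ARRAY<STRUCT<id VARCHAR>>)
  WITH (KAFKA_TOPIC='transaction_multi', VALUE_FORMAT='JSON');

-- Flatten the ARRAY variation to one element per row.
CREATE STREAM transaction_multi_exploded AS
  SELECT EXPLODE(field) AS field
  FROM transaction_multi_json;

-- Merge both variations into one output stream.
CREATE STREAM transaction_merged AS
  SELECT field->id AS id
  FROM transaction_single_json;

INSERT INTO transaction_merged
  SELECT field->id AS id
  FROM transaction_multi_exploded;
```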
object.get("field").isArray()
orisObject()
, and handle accordingly.Even if you used a UDF in KSQL, the STREAM definition would be required to know ahead of time if you have
field ARRAY<?>
orfield STRUCT<...>
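A minimal sketch of that check using Jackson, as you might run it inside a Kafka Streams processor or a plain consumer loop (the class and method names here are hypothetical):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.List;

public class FieldNormalizer {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Returns the elements of "field" as a list, whether the payload
    // holds a single object or an array of objects.
    static List<JsonNode> normalizeField(String json) throws Exception {
        JsonNode field = MAPPER.readTree(json).get("field");
        List<JsonNode> elements = new ArrayList<>();
        if (field == null) {
            return elements;               // "field" missing entirely
        }
        if (field.isArray()) {
            field.forEach(elements::add);  // ARRAY variation
        } else if (field.isObject()) {
            elements.add(field);           // STRUCT variation
        }
        return elements;
    }
}
```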