
I have a stream of events that I need to match against a KTable / changelog topic, but the matching is done by pattern matching on a property of the KTable entries. So I cannot join the streams based on a key, since I don't know yet which entry matches.

example:

KTable X:

{
  [abc]: {id: 'abc', prop: 'some pattern'},
  [efg]: {id: 'efg', prop: 'another pattern'}
}

stream A:

{ id: 'xyz', match: 'some pattern'}

So stream A should forward something like {match: 'abc'}.

So I basically need to iterate over the KTable entries and find the matching entry by pattern matching on this property.

Would it be viable to create a global state store based on the KTable and then access it from the Processor API and iterate over the entries?
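
To illustrate what I have in mind, a minimal sketch (topic and store names are made up, String Serdes are assumed everywhere, and matches() is a hypothetical pattern matcher):

// Materialize the table topic as a global store, so every instance
// holds the full data set, backed by RocksDB.
builder.globalTable("table-topic",
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("pattern-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

builder.<String, String>stream("stream-topic")
        .transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
            private ReadOnlyKeyValueStore<String, String> store;

            @Override
            public void init(ProcessorContext context) {
                // Global stores are accessible from any processor by name.
                store = (ReadOnlyKeyValueStore<String, String>) context.getStateStore("pattern-store");
            }

            @Override
            public String transform(String key, String value) {
                // Full scan over all table entries, O(table size) per event.
                try (KeyValueIterator<String, String> iter = store.all()) {
                    while (iter.hasNext()) {
                        KeyValue<String, String> entry = iter.next();
                        if (matches(entry.value, value)) { // hypothetical matcher
                            return entry.key; // forward the id of the matching entry
                        }
                    }
                }
                return null; // no pattern matched
            }

            @Override
            public void close() { }
        })
        .filter((k, v) -> v != null)
        .to("matches-topic");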

I could also aggregate all the entries of the KTable into one collection and then join on a 'fake' key, but that also seems rather hacky.

Or am I just forcing something that isn't really a streams use case, and should I rather put it into a Redis cache with the normal consumer API? That also feels awkward, since I'd rather have it backed by RocksDB.

Edit: I guess this is somewhat related to this question.

2 Answers


  1. A GlobalKTable won't work: a stream-globalTable join lets you extract a non-key join attribute from the stream record, but the lookup into the table is still based on the table key.
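
    For illustration, this is the shape of a stream-globalTable join: the KeyValueMapper (second argument) maps each stream record to the table key used for the lookup, so it cannot scan over the table values. A sketch, where extractTableKey() is hypothetical:

    stream.join(globalTable,
            (streamKey, streamValue) -> extractTableKey(streamValue), // used as the table lookup key
            (streamValue, tableValue) -> tableValue);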

    However, you could read the table input topic as a KStream, extract the join attribute, set it as the key, and do an aggregation that returns a collection (i.e., a List, Set, etc.). This way, you can do a stream-table join on the key, followed by a flatMapValues() (or flatMap()) that splits the join result into multiple records (depending on how many records are in the table-side collection).

    As long as your join attribute does not have too many duplicates in the table input topic, and thus the value-side collection in the table does not grow too large, this should work fine. You will need to provide a custom value Serde to (de)serialize the collection data.
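
    A minimal sketch of this approach (topic names, extractPattern(), and extractMatchAttribute() are made up; assumes default String Serdes and Kafka 2.8+ for the built-in list Serde):

    // Custom value Serde for the collection (built in since Kafka 2.8, KIP-466).
    final Serde<List<String>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.String());

    // Read the table input topic as a stream, re-key it by the join
    // attribute, and collect all entries per attribute into a list.
    final KTable<String, List<String>> byPattern = builder
            .<String, String>stream(TABLE_INPUT_TOPIC)
            .selectKey((k, v) -> extractPattern(v))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .aggregate(
                    ArrayList::new,
                    (pattern, value, list) -> { list.add(value); return list; },
                    Materialized.with(Serdes.String(), listSerde));

    // Key the stream by the same attribute, join, and split the result.
    builder.<String, String>stream(STREAM_TOPIC)
            .selectKey((k, v) -> extractMatchAttribute(v))
            .join(byPattern, (event, candidates) -> candidates)
            .flatMapValues(list -> list) // one output record per matching table entry
            .to(OUTPUT_TOPIC);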

  2. Normally I would map the table data so that I get the join key I need. We recently had a similar case, where we had to join a stream with the corresponding data in a KTable. In our case, the stream key was the first part of the table key, so we could group by that first key part and aggregate the results into a list. In the end, it looked something like this:

    // Re-key the table by the first part of its key and collect all
    // values that share it into a list.
    final KTable<String, ArrayList<String>> theTable = builder
            .table(TABLE_TOPIC, Consumed.with(keySerde, Serdes.String()))
            .groupBy((k, v) -> new KeyValue<>(k.getFirstKeyPart(), v))
            .aggregate(
                    ArrayList::new,
                    // adder: called when a record is added or updated for the key
                    (key, value, list) -> {
                        list.add(value);
                        return list;
                    },
                    // subtractor: called to retract the old value on update/delete
                    (key, value, list) -> {
                        list.remove(value);
                        return list;
                    },
                    Materialized.with(Serdes.String(), stringListSerde));
    
    final KStream<String, String> theStream = builder.stream(STREAM_TOPIC);
    
    // The stream key matches the re-keyed table, so a regular
    // stream-table join works; flatMapValues() then splits the
    // joined list into individual records.
    theStream
            .join(theTable, (streamEvent, tableEventList) -> tableEventList)
            .flatMapValues(value -> value)
            .map(this::doStuff)
            .to(TARGET_TOPIC);
    
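    For the stringListSerde, one option (assuming Kafka 2.8 or newer, and typing the table value as List<String> rather than ArrayList<String>) is the built-in list Serde; otherwise you can implement the Serializer/Deserializer pair yourself:

    final Serde<List<String>> stringListSerde =
            Serdes.ListSerde(ArrayList.class, Serdes.String());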

    I am not sure if this is also possible for you; maybe you can map the table data in some way to get a key you can join on.

    I know this does not completely fit your case, but I hope it is of some help anyway. Maybe you can clarify a bit how the matching would look in your case.
