I have a scenario where a topic_source receives all messages generated by another application. These JSON messages might be duplicated, so I need to deduplicate them based on a "window" size: for every 10 seconds, if there are duplicates on topic_source, I send the deduplicated messages (based on message_id) to topic_target.
For this I am using a KStream: reading from topic_source, grouping by message_id, aggregating with count, and for each entry sending one message to topic_target.
Something like below:
final KStream<String, Output> msgs = builder.stream("topic_source",
        Consumed.with(Serdes.String(), new JsonSerde<>(Output.class)));

final KTable<Windowed<String>, Long> counts = msgs
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        .count();

counts.toStream()
        .map((key, value) -> KeyValue.pair(
                key.key(),
                new Output(key.key(), value, key.window().start())))
        .to("topic_target", Produced.with(Serdes.String(), new JsonSerde<>(Output.class)));
This works fine on my local machine (standalone Eclipse IDE on Windows) when tested.
But when I deploy the service/application on Kubernetes pods and test it, I find topic_target receives as many messages as topic_source (no deduplication happens).
I think the topic_source messages are being processed on different pods, where the aggregation across pods does not result in a single group-by (message_id) set, i.e. each pod sends its own deduplicated messages (grouped by the same message_id) to topic_target, and the accumulated result contains duplicates.
Is there any way to solve this issue on a Kubernetes cluster?
i.e. is there any way for all pods together to group by on one set, and send one distinct/deduplicated message set to topic_target?
To achieve this, what features of Kubernetes/Docker should I use? Is there any design mechanism/pattern I should follow?
Any advice highly appreciated.
2 Answers
There are two things that jump to mind:
Have you set the application.id properly for all of the pods? If the application.id is different across pods, each of them will process all of the messages once. If it's the same, the messages will be split between the pods. Who processes which messages depends on your partition assignment: even with multiple pods, Kafka Streams will allocate the same partitions to the same pods. So pod 1 will have partition 1 of input_topic, and partition 1 of whatever other topics your application is consuming.
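To illustrate the first point, a minimal configuration sketch; the application id value and broker address below are placeholders, and the key point is simply that every pod must be started with the same application.id:

```java
import java.util.Properties;

public class StreamsProps {
    public static Properties build() {
        Properties p = new Properties();
        // Must be IDENTICAL on every pod: pods sharing the same
        // application.id join one consumer group, so the input-topic
        // partitions are split between them instead of each pod
        // independently reading (and "deduplicating") everything.
        p.put("application.id", "dedup-service"); // placeholder name
        p.put("bootstrap.servers", "kafka:9092"); // placeholder address
        return p;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("application.id"));
    }
}
```

With this in place, deduplication by message_id works per partition, which is sufficient as long as message_id is the record key (duplicates then always land on the same partition, hence the same pod).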
Given the specificity of your needs (even though it is possible to implement them using standard operators), I'd probably implement this with the Processor API. It requires an extra changelog topic, versus the repartition topic you'd need for grouping by key.
The processor code would look something like the example below; I added a few comments for clarity.
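A minimal sketch of the dedup logic, assuming deduplication by message_id within a fixed time window. It is kept free of Kafka dependencies for clarity: in a real Processor, the in-memory map would be a persistent KeyValueStore<String, Long>, and evictOlderThan would be invoked from a punctuator scheduled via context.schedule(...).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: forward a record only the first time its
// message_id is seen within the window; drop duplicates.
public class WindowedDeduper {
    private final long windowMs;
    private final Map<String, Long> seen = new HashMap<>(); // stand-in for a KeyValueStore

    public WindowedDeduper(long windowMs) {
        this.windowMs = windowMs;
    }

    // Returns true if this message_id has not been seen within the
    // window, i.e. the record should be forwarded to topic_target.
    public boolean firstSeen(String messageId, long timestampMs) {
        Long last = seen.get(messageId);
        if (last != null && timestampMs - last < windowMs) {
            return false; // duplicate inside the window: drop it
        }
        seen.put(messageId, timestampMs); // remember (or refresh) the id
        return true;
    }

    // Punctuator-style cleanup: drop entries older than the window so
    // the store does not grow without bound.
    public void evictOlderThan(long nowMs) {
        seen.values().removeIf(ts -> nowMs - ts >= windowMs);
    }

    public int storeSize() {
        return seen.size();
    }

    public static void main(String[] args) {
        WindowedDeduper d = new WindowedDeduper(10_000);
        System.out.println(d.firstSeen("m1", 0));      // true  (first occurrence)
        System.out.println(d.firstSeen("m1", 5_000));  // false (duplicate in window)
        System.out.println(d.firstSeen("m1", 20_000)); // true  (window has passed)
        d.evictOlderThan(40_000);
        System.out.println(d.storeSize());             // 0
    }
}
```

Because Kafka Streams routes all records with the same key to the same partition (and thus the same pod), this per-instance store is enough, provided message_id is the record key.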
Please be mindful that cleanup of the store rests with you, otherwise at some point you'll run out of disk space. Since you mentioned that your operation is for analytics, I'd probably use a punctuator to routinely clean up everything that is appropriately "old".
To wire the processor into the topology, use the process method (in older versions of Kafka Streams, transform).