
I have a scenario where a topic_source holds all the messages generated by another application. These JSON messages might be duplicated, so I need to deduplicate them based on a "window" size: say, for every 10 seconds, if there are any duplicates in topic_source, I will send the deduplicated (based on message_id) messages to topic_target.

For this I am using a KStream: reading from topic_source, I group by message_id using the "count" aggregation, and for each entry I send one message to topic_target.

Something like below:

final KStream<String, Output> msgs =
    builder.stream("topic_source", Consumed.with(Serdes.String(), new JsonSerde<>(Output.class)));

final KTable<Windowed<String>, Long> counts = msgs
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
    .count();

counts.toStream()
    .map((key, value) -> KeyValue.pair(
        key.key(),
        new Output(key.key(), value, key.window().start())))
    .to("topic_target", Produced.with(Serdes.String(), new JsonSerde<>(Output.class)));

This works fine when tested on my local machine (standalone Eclipse IDE on Windows).

But when I deploy the service/application on Kubernetes pods and test it, I find that topic_target receives as many messages as topic_source (no deduplication is happening).

I think the topic_source messages are being processed on different pods, and the aggregation across pods does not result in a single group-by (message_id) set, i.e. each pod (grouping by the same message_id) sends its own deduplicated messages to topic_target, and the accumulated result contains duplicates.

Is there any way to solve this issue on a Kubernetes cluster?
i.e. is there any way for all pods together to group by on one set, and send one distinct/deduplicated set of messages to topic_target?

To achieve this, what features of Kubernetes/Docker should I use? Is there any design mechanism/pattern I should follow?

Any advice is highly appreciated.

2 Answers


  1. There are two things that jump to mind:

    • It looks like you’ll be creating a "Tumbling Window", which means you have non-overlapping windows every 10 seconds. So if I send two messages one second apart (e.g. at seconds 9.5 and 10.5), they will land in different windows and both will be sent.
    • Did you set the application.id properly for all of the pods? If the application.id is different across pods, each of them will process all of the messages once. If it’s the same, then the messages will be split between the pods (see the configuration sketch below).
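
    For reference, a minimal sketch of that configuration, assuming plain Kafka Streams setup; the id value and bootstrap address here are placeholders, not taken from the question:

    Properties props = new Properties();
    // Every pod of the deployment must use the same application.id so that
    // Kafka Streams treats them as one consumer group and splits the input
    // partitions between them, instead of each pod reading everything.
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedup-service");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();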
  2. Who processes which messages depends on your partition assignment. Even if you have multiple pods, Kafka Streams will allocate the same partitions to the same pod. So pod 1 will have partition 1 of input_topic, and partition 1 of whatever other topic your application is consuming.

    Given the specificity of your needs – which would be possible to implement using standard operators – I’d probably implement this with the Processor API. It requires an extra changelog topic, versus the repartition topic you’d need for grouping by key.

    The processor code would look something like the below:

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class DeduplicationTimedProcessor<Key, Value> implements Processor<Key, Value, Key, Value> {
    
        private final String storeName;
        private final long deduplicationOffset;
        private ProcessorContext<Key, Value> context;
        private KeyValueStore<Key, TimestampedValue<Value>> deduplicationStore;
        
        @Data
        @NoArgsConstructor
        @AllArgsConstructor
        public static class TimestampedValue<Value> {
            private long timestamp;
            private Value value;
        }
    
        // Store needed for deduplication - means one changelog topic
        public DeduplicationTimedProcessor(String storeName, long deduplicationOffset) {
            this.storeName = storeName;
            this.deduplicationOffset = deduplicationOffset;
        }
    
        @Override
        public void init(ProcessorContext<Key, Value> context) {
            Processor.super.init(context);
            this.context = context;
            this.deduplicationStore = context.getStateStore(storeName);
        }
    
        @Override
        public void process(Record<Key, Value> record) {
    
            var key = record.key();
            var value = record.value();
            var timestamp = context.currentSystemTimeMs(); // Uses System.currentTimeMillis() by default, but easier to mock in tests
            
            var previousValue = deduplicationStore.get(key);
            // New value - no deduplication - store + forward
            if(previousValue == null) {
                deduplicationStore.put(key, new TimestampedValue<>(timestamp, value));
                context.forward(new Record<>(key, value, timestamp));
                return;
            }
            
            // previous value exists - check if duplicate && in window
            if(previousValue.getValue().equals(value) && timestamp - previousValue.getTimestamp() < deduplicationOffset) {
                // skip this message as duplicate within window
                return;
            }
    
            deduplicationStore.put(key, new TimestampedValue<>(timestamp, value));
            context.forward(new Record<>(key, value, timestamp));
        }
    }

    Added a few comments for clarity in there.

    Please be mindful that cleanup of the store rests with you, otherwise at some point you’ll run out of disk space. Since you mentioned that your operation is for analytics, I’d probably utilize a punctuator to routinely clean up everything that is appropriately "old"; a sketch follows below.
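
    As an illustration only, here is one way such a cleanup could look, sketched as an alternative init for the processor above. The one-minute interval and the use of deduplicationOffset as the "old" threshold are assumptions, and it needs additional imports (java.time.Duration, PunctuationType, KeyValue, KeyValueIterator):

    @Override
    public void init(ProcessorContext<Key, Value> context) {
        this.context = context;
        this.deduplicationStore = context.getStateStore(storeName);

        // Periodically purge entries older than the deduplication window so the
        // store (and its changelog topic) does not grow without bound.
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, now -> {
            try (KeyValueIterator<Key, TimestampedValue<Value>> it = deduplicationStore.all()) {
                while (it.hasNext()) {
                    KeyValue<Key, TimestampedValue<Value>> entry = it.next();
                    if (now - entry.value.getTimestamp() > deduplicationOffset) {
                        deduplicationStore.delete(entry.key);
                    }
                }
            }
        });
    }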

    To use the processor, wire it in with the process method (in older versions of Kafka Streams, transform).
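
    A hedged sketch of that wiring, assuming Kafka Streams 3.3+ (where process accepts the new Processor API) and reusing the JsonSerde from the question; the store name "dedup-store" and the 10-second offset are placeholders:

    StreamsBuilder builder = new StreamsBuilder();

    // State store backing the processor - this is the extra changelog topic mentioned above.
    // Note: the serde for TimestampedValue may need explicit type information for the inner value.
    var dedupStore = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("dedup-store"),
            Serdes.String(),
            new JsonSerde<>(DeduplicationTimedProcessor.TimestampedValue.class));
    builder.addStateStore(dedupStore);

    builder.stream("topic_source", Consumed.with(Serdes.String(), new JsonSerde<>(Output.class)))
           .process(() -> new DeduplicationTimedProcessor<String, Output>("dedup-store",
                        Duration.ofSeconds(10).toMillis()),
                    "dedup-store")
           .to("topic_target", Produced.with(Serdes.String(), new JsonSerde<>(Output.class)));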
