
In our application, we have enabled exactly-once semantics in both the Producer and the Consumer.

The Producer is a Python component. We have enabled:

  • idempotence
  • transactions (a new transactional.id is used every time we send messages)

The Consumer is a Spring Boot application. We have enabled:

  • read_committed isolation level
  • manual acknowledgement of messages

We have a multi-partition Kafka topic (let's say 3 partitions) on Confluent Cloud.

Our application design is as follows:

  • multiple Producer app instances
  • for performance, many Consumer app instances (currently around 24)


Problem:

We noticed that sometimes the same Kafka message is consumed more than once by the Consumer. We detected this using the consumer code below: we keep the ID (with offset) of the previously consumed Kafka message in Redis and compare it against each newly consumed message.

Consumer code:

    @KafkaListener(topics = "${datalake.datasetevents.topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                       @Header(KafkaHeaders.OFFSET) String offset,
                       @Payload InputEvent inputEvent, Acknowledgment ack) {
        Event event = new Event();
        event.setCorrId(inputEvent.getCorrId());
        event.setQn(inputEvent.getQn());
        event.setCreatedTs(new Date());
        event.setEventTs(inputEvent.getEventTs());
        event.setMeta(inputEvent.getMeta() != null ? inputEvent.getMeta() : new HashMap<>());
        event.setType(inputEvent.getType());
        event.setUlid(key);

        // detect message duplications: keep the last seen offset per key in Redis for 30 seconds
        try {
            String eventRedisKey = "tg_e_d_" + key.toLowerCase();
            String redisVal = offset;
            String tmp = redisTemplateString.opsForValue().get(eventRedisKey);
            if (tmp != null) {
                dlkLogging.error("kafka_event_dup", "Event consumed more than once ulid:" + event.getUlid()
                        + " redis offset: " + tmp + " event offset:" + offset);
                redisTemplateString.delete(eventRedisKey);
            }
            redisTemplateString.opsForValue().set(eventRedisKey, redisVal, 30, TimeUnit.SECONDS);
        } catch (Exception e) {
            dlkLogging.error("kafka_consumer_redis", "Redis error at kafka consumer", e);
        }

        // process the message and ack
        try {
            eventService.saveEvent(persistEvent, event);
            ack.acknowledge();
        } catch (Exception ee) {
            // Refer: https://stackoverflow.com/questions/62413270/kafka-what-is-the-point-of-using-acknowledgment-nack-if-i-can-simply-not-ack
            ack.nack(1);
            dlkLogging.error("event_sink_error", "Error sinking kafka event. Will retry", ee);
        }
    }

Behavior:
We notice that the "kafka_event_dup" error is logged several times per day.

Error Message: Event consumed more than once ulid:01G77G8KNTSM2Q01SB1MK60BTH redis offset: 659238 event offset: 659238

Question:
Why does the consumer read the same message even though we have configured exactly-once on both the Producer and the Consumer?

Update: After reading several SO posts, it seems we still need to implement deduplication logic on the Consumer side even though exactly-once is configured?

Additional Info:

Consumer configuration:

    public DefaultKafkaConsumerFactory kafkaDatasetEventConsumerFactory(KafkaProperties properties) {

        Map<String, Object> props = properties.buildConsumerProperties();
        props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, CustomJsonDeserializer.class.getName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.fr.det.datalake.eventdriven.model.kafka.InputEvent");
        return new DefaultKafkaConsumerFactory(props);
    }

Producer code (Python):

    def __get_producer(self):
        conf = {
            'bootstrap.servers': self.server,
            'enable.idempotence': True,
            'acks': 'all',
            'retry.backoff.ms': self.sleep_seconds * 100
        }
        if self.sasl_mechanism:
            conf['sasl.mechanisms'] = self.sasl_mechanism
        if self.security_protocol:
            conf['security.protocol'] = self.security_protocol
        if self.sasl_username:
            conf['sasl.username'] = self.sasl_username
        if self.sasl_password:
            conf['sasl.password'] = self.sasl_password
        if self.transaction_prefix:
            # a new transactional.id is generated for every producer instance
            conf['transactional.id'] = self.__get_transaction_id()

        producer = Producer(conf)
        return producer


    @_retry_on_error
    def send_messages(self, messages, *args, **kwargs):
        ts = time.time()
        producer = kwargs.get('producer', None)
        if producer is not None:
            for message in messages:
                key = message.get('key', str(ulid.from_timestamp(ts)))
                value = message.get('value', None)
                topic = message.get('topic', self.topic)
                producer.produce(topic=topic,
                                 value=value,
                                 key=key,
                                 on_delivery=self.acked)
            # commit the whole batch as one transaction (30 s timeout)
            producer.commit_transaction(30)


    def _retry_on_error(func):
        def inner(self, messages, *args, **kwargs):

            attempts = 0

            while True:
                attempts += 1
                sleep_time = attempts * self.sleep_seconds

                try:
                    # a fresh producer (and transactional.id) is created for every attempt
                    producer = self.__get_producer()
                    self.logger.info(f"Producer: {producer}, Attempt: {attempts}")
                    producer.init_transactions(30)
                    producer.begin_transaction()
                    res = func(self, messages, *args, producer=producer, **kwargs)
                    return res
                except KafkaException as e:
                    if attempts <= self.retry_count:
                        if e.args[0].txn_requires_abort():
                            producer.abort_transaction(30)

                        time.sleep(sleep_time)
                        continue

                    self.logger.error(str(e), exc_info=True)
                    break

        return inner

2 Answers


  1. I can see you have set ENABLE_AUTO_COMMIT_CONFIG to false, which means you have a manual commit process in place. If the offsets of the messages you have read are not committed promptly, you will end up processing duplicate messages.

    Kindly refer to section 4.6 of https://www.baeldung.com/kafka-exactly-once.
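
    For illustration, here is a minimal sketch (assuming spring-kafka, the InputEvent type and the consumer factory shown in the question; the bean and parameter names are just illustrative) of a listener container factory that commits each offset synchronously as soon as Acknowledgment.acknowledge() is called, which keeps the re-delivery window as small as possible:

        import org.springframework.context.annotation.Bean;
        import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
        import org.springframework.kafka.core.ConsumerFactory;
        import org.springframework.kafka.listener.ContainerProperties;

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, InputEvent> kafkaListenerContainerFactory(
                ConsumerFactory<String, InputEvent> consumerFactory) {   // e.g. the factory from the question
            ConcurrentKafkaListenerContainerFactory<String, InputEvent> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            // commit the offset right away when the listener calls acknowledge(),
            // instead of waiting until the end of the poll loop
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            factory.getContainerProperties().setSyncCommits(true);
            return factory;
        }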

    Also, with processing.guarantee: exactly_once you do not need to set the following parameters explicitly (a minimal sketch follows after the list):

    • isolation.level=read_committed
    • enable.idempotence=true
    • max.in.flight.requests.per.connection=5
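
    A minimal Kafka Streams sketch (Java; application id, bootstrap servers and topic names are placeholders) showing that the single processing.guarantee setting is enough, since the Streams client derives the idempotence, isolation-level and in-flight settings from it:

        import java.util.Properties;
        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;

        public class ExactlyOnceStreamsDemo {                                         // hypothetical demo class
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-demo");    // placeholder
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                // this single setting switches the embedded producers to idempotent,
                // transactional mode and the embedded consumers to read_committed
                props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

                // trivial pass-through topology, only here to make the sketch complete
                StreamsBuilder builder = new StreamsBuilder();
                builder.stream("input-topic").to("output-topic");                       // placeholder topics
                new KafkaStreams(builder.build(), props).start();
            }
        }
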
  2. Kafka exactly-once is essentially a Kafka Streams feature, although it can be used with regular consumers and producers as well.

    Exactly-once can only be achieved in a context where your applications interact only with Kafka: there is no XA or other kind of distributed transaction across technologies that would let a Kafka consumer interact with some other storage (like Redis) in an exactly-once manner.

    In a distributed world, we have to acknowledge that this is not desirable, since it introduces locking, contention, and performance that degrades exponentially under load. If we don't need to be in a distributed world, then we don't need Kafka and many things become easier.

    Transactions in Kafka are meant to be used within one application that is only interacting with Kafka: they let you guarantee that the app will 1) read from some topic partitions, 2) write some result to some other topic partitions and 3) commit the read offsets related to 1), or do none of those things. If several apps are put back-to-back and interact through Kafka in this manner, then you can achieve exactly-once, if you're very careful. If your consumer also needs to 4) interact with Redis or 5) interact with some other storage or perform a side effect somewhere (like sending an email), then there is in general no way to perform steps 1-5 atomically as part of a distributed application. You can achieve this kind of thing with other storage technologies (yes, Kafka is essentially a storage), but then they cannot be distributed, and neither can your application. That's essentially what the CAP theorem tells us. A sketch of the read-process-write pattern from steps 1-3 follows below.
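
    To make steps 1) to 3) concrete, here is a hedged sketch of that read-process-write loop using the plain Java clients; bootstrap servers, topic names, group id, the transactional.id and the toUpperCase "processing" step are placeholders, and error/abort handling is left out:

        import java.time.Duration;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.*;
        import org.apache.kafka.clients.producer.*;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.serialization.StringDeserializer;
        import org.apache.kafka.common.serialization.StringSerializer;

        public class ReadProcessWriteLoop {                                           // hypothetical demo class
            public static void main(String[] args) {
                Properties c = new Properties();
                c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // placeholder
                c.put(ConsumerConfig.GROUP_ID_CONFIG, "etl-group");                     // placeholder
                c.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
                c.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
                c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

                Properties p = new Properties();
                p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // placeholder
                p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "etl-app-1");             // stable id per instance
                p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c);
                     KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
                    consumer.subscribe(List.of("input-topic"));                         // placeholder topic
                    producer.initTransactions();

                    while (true) {
                        // 1) read from some topic partitions
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                        if (records.isEmpty()) continue;

                        producer.beginTransaction();
                        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                        for (ConsumerRecord<String, String> rec : records) {
                            // 2) write some result to some other topic partitions
                            producer.send(new ProducerRecord<>("output-topic", rec.key(),
                                    rec.value().toUpperCase()));
                            offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                                    new OffsetAndMetadata(rec.offset() + 1));
                        }
                        // 3) commit the read offsets in the same transaction: all or nothing
                        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                        producer.commitTransaction();
                    }
                }
            }
        }

    Note that the transactional.id in this sketch is kept stable for a given application instance; that is what lets the broker fence "zombie" producers after a restart, which a brand-new id per send would not provide.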

    That's also why exactly-once is essentially a Kafka Streams thing: Kafka Streams is just a smart wrapper around the Kafka consumer/producer clients that lets you build applications that interact only with Kafka.

    You can also achieve exactly-once stream processing with other data-processing frameworks, like Spark Streaming or Flink.

    In practice it's often much simpler not to bother with transactions and just de-duplicate in the consumer. You have the guarantee that at most one consumer of the consumer group is connected to each partition at any point in time, so duplicates will always show up in the same instance of your app (until it re-scales), and, depending on your config, the duplication should typically only happen within one single Kafka consumer buffer, so you don't need to store much state in your consumer to de-duplicate. If you use some kind of event-id that can only increase, for example (which is essentially what the Kafka offset is, by the way, and it's no coincidence), then you just need to keep, in the state of each instance of your app, the maximum event-id per partition that you have successfully processed.
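
    As a rough illustration of that last idea, here is a hedged sketch (plain Java; the class and method names are made up for this example) of a per-partition de-duplicator that drops any record whose offset is not greater than the highest offset already processed for its partition:

        import java.util.HashMap;
        import java.util.Map;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.common.TopicPartition;

        // Tracks, per partition, the highest offset that this instance has processed.
        // Since only one consumer of the group owns a partition at any point in time,
        // this in-memory state is enough to filter re-deliveries within the instance.
        public class PerPartitionDeduplicator {                    // hypothetical helper
            private final Map<TopicPartition, Long> maxProcessedOffset = new HashMap<>();

            // Returns true if the record is new (and remembers it); false if it is a duplicate.
            public boolean markIfNew(ConsumerRecord<?, ?> record) {
                TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                Long seen = maxProcessedOffset.get(tp);
                if (seen != null && record.offset() <= seen) {
                    return false;                                  // already processed, skip it
                }
                maxProcessedOffset.put(tp, record.offset());
                return true;
            }

            // Forget revoked partitions on a rebalance: they may move to another instance.
            public void onPartitionsRevoked(Iterable<TopicPartition> revoked) {
                revoked.forEach(maxProcessedOffset::remove);
            }
        }

    The consuming code would call markIfNew(record) before processing and clear revoked partitions from a ConsumerRebalanceListener; whether you keep this state in memory or in something like Redis depends on how long the duplication window can be.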
