skip to Main Content

I’m using two microservices in order to communicate with each other using Kafka. On one microservice there is the listener, and on the other microservice, the sender. The problem is that whenever producer sends a message, consumer will get 20 duplicate messages, and i want the consumer to receive only one.

Microservice A:

@Service
public class LocationProducer {

private static final String MAINLOCATION = "mainlocation";

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;

public void sendMessage(String message){

    this.kafkaTemplate.send(MAINLOCATION,message);
}



kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Microservice B:

@Service
public class LocationConsumer {

@Autowired
LocationService locationService;

@KafkaListener(topics = "mainlocation", groupId = "group_id")
public void consume(String location){
    locationService.saveLocation(new Location(1,location));
}




kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Docker compose code:

zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPIC: "mainlocation:1:3"

Aditional information:

  1. When doing a debug in intelij, if i put a breakpoint on:

    this.kafkaTemplate.send(MAINLOCATION,message);

    This method will be executed twice

  2. On the consumer side, after it receives the message it tries to save into a mongoDB and it fails with an exception ( might be relevant for this i don’t know )

  3. I even tried setting autocommit to true, configured max.poll seconds and the problem is the same.

2

Answers


  1. Chosen as BEST ANSWER

    Problem Solved :

    @Bean
    DefaultErrorHandler eh() {
        return new DefaultErrorHandler((rec, ex) -> {
            System.out.println(ex.getMessage());
        }, new FixedBackOff(0L, 0L));
    }
    

    By this, we are defining a Error Handler for Kafka that tells it to stop retrying after getting errors/exceptions.


  2. A couple of pointers that might help.

    • You’re setting the consumers to earliest. I’m not familiar with that particular docker image, but if you’re not pruning the volumes between test attempts, it’s possible that when you start your application it’ll fetch the records from your previous attempt – that might be causing the listener logic to be executed twice. If you’re triggering sending manually, you might set this to latest.

    • You should check from the producer side if there are any errors that might be triggering retrials. You might want to check the topic using Kafka’s console tools to see how many records are in fact in the topic after the producer is triggered.

    • You mention the execution fails. By default, Spring Kafka will retry 10 times if record consumption fails. You should either make your service idempotent or disable retries if that’s a problem. You can also specify fatal exceptions that should not be retried.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search