
I have a Kafka topic that receives messages like this:

Kafka key: serverId
value: server host name

I only need the latest value for each serverId, so the topic has a very small segment size and log compaction is enabled. This is what I do:

@KafkaListener(topics = SERVERS_KAFKA_TOPIC, id = "#{T(java.util.UUID).randomUUID().toString()}",
  properties = {
    "spring.json.key.default.type=java.lang.String",
    "spring.json.value.default.type=java.lang.String"
  })
  public void registerServer(
    @Payload(required = false) String serverHostName
    ) {

Basically, I create a new consumer group per listener. My assumption is that with a shared consumer group, each listener would continue where the last one left off, so the state across a replica set would not be what I expect.

Of course, this makes the list of consumer groups very cluttered. I would rather not add another shared service such as Redis to hold the data, and I can't really use Redis anyway, because what I build from each message is a gRPC client. Can I configure the listener to always start from the beginning and ignore the other consumers in the group?
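For context, the per-replica state I am trying to rebuild can be sketched in plain Java, independent of Kafka. This is only an illustration: the class name ServerRegistry and its methods are hypothetical, the stored value would be a gRPC client rather than a String in the real service, and a null payload is assumed to be a tombstone (which is why the listener uses required = false). How the key reaches the listener is not shown above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory view of the compacted topic: serverId -> latest host name. */
class ServerRegistry {

    private final Map<String, String> hostsByServerId = new ConcurrentHashMap<>();

    /**
     * Apply one record in offset order. Later records for the same key
     * overwrite earlier ones; a null value is assumed to be a tombstone.
     */
    public void apply(String serverId, String hostName) {
        if (hostName == null) {
            hostsByServerId.remove(serverId);
        } else {
            hostsByServerId.put(serverId, hostName);
        }
    }

    /** Immutable copy of the current state. */
    public Map<String, String> snapshot() {
        return Map.copyOf(hostsByServerId);
    }

    public static void main(String[] args) {
        ServerRegistry registry = new ServerRegistry();
        registry.apply("server-1", "host-a");
        registry.apply("server-2", "host-b");
        registry.apply("server-1", "host-c"); // newer value wins
        registry.apply("server-2", null);     // tombstone removes the entry
        System.out.println(registry.snapshot()); // prints {server-1=host-c}
    }
}
```

Every replica has to replay the whole topic from the first offset to end up with the same map, which is why resuming from a committed group offset does not work for me.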

2 Answers


  1. Chosen as BEST ANSWER

    The seekToBeginning() method provided by AbstractConsumerSeekAware did not seek to the beginning when I tried it. However, this code did work:

    /**
     * Reset to the beginning
     *
     * <p>{@inheritDoc}
     */
    @Override
    public void onPartitionsAssigned(
            @NotNull Map<TopicPartition, Long> assignments,
            @NotNull ConsumerSeekCallback callback) {
    
        assignments.keySet().stream()
                .filter(topicPartition -> SERVERS_KAFKA_TOPIC.equals(topicPartition.topic()))
                .forEach(
                        topicPartition ->
                                callback.seekToBeginning(
                                        topicPartition.topic(), topicPartition.partition()));
    }
    

    I also needed this for one specific topic rather than for all topics, so this approach filters the assigned partitions down to that topic.


  2. Make your listener bean extend AbstractConsumerSeekAware and in onPartitionsAssigned, call seekToBeginning().

        /**
         * Seek all assigned partitions to the beginning.
         * @since 2.6
         */
        public void seekToBeginning() {
    

    https://docs.spring.io/spring-kafka/docs/current/reference/html/

    How to implement ConsumerSeekAware in Spring-kafka?

    Is there any way to get the eldest available offset for a kafka topic
