
I have a Kafka topic that receives messages like this:

Kafka key: serverId
value: server host name

I only need the latest value for each serverId, so the topic uses a very small segment size and has log compaction enabled. This is what I do:

@KafkaListener(topics = SERVERS_KAFKA_TOPIC, id = "#{T(java.util.UUID).randomUUID().toString()}",
  properties = {
    // ...
  })
public void registerServer(
    @Payload(required = false) String serverHostName
    ) {
  // ...
}

Basically, I create a new consumer group per listener. My assumption is that if the listeners shared one consumer group, a new listener would continue where the last one left off, so the state across a replica set would not be what I expect.

Of course, this makes the list of consumer groups very cluttered. Short of adding a shared service such as Redis to hold the data (which would not really work anyway, because what I build from each message is a gRPC client, and that cannot be stored in Redis), can I configure the listener to always start from the beginning and ignore the other consumers in the group?
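To make the goal concrete, here is a minimal plain-Java sketch (no Kafka involved) of the state each replica is trying to build: replaying the records in log order and keeping only the latest host name per serverId. The class name `ServerRegistryReplay` and the sample keys and host names are hypothetical. Treating a null value as a delete is an assumption based on the optional payload in the listener above; in a compacted topic, a null value is a tombstone for its key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: rebuilds the latest-value-per-key view from a replayed log. */
final class ServerRegistryReplay {

    /**
     * Applies each (serverId, hostName) record in order.
     * A null value is treated as a tombstone and removes the key (assumption).
     */
    static Map<String, String> replay(List<Map.Entry<String, String>> records) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : records) {
            if (record.getValue() == null) {
                latest.remove(record.getKey());
            } else {
                latest.put(record.getKey(), record.getValue());
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        // Sample records are made up for illustration.
        List<Map.Entry<String, String>> log = new ArrayList<>();
        log.add(new SimpleEntry<String, String>("server-1", "host-a"));
        log.add(new SimpleEntry<String, String>("server-2", "host-b"));
        log.add(new SimpleEntry<String, String>("server-1", "host-c")); // newer value wins
        log.add(new SimpleEntry<String, String>("server-2", null));     // tombstone
        System.out.println(replay(log));
    }
}
```

A replica that starts reading partway through the log would miss earlier keys that were never updated again, which is why every instance needs to read from the beginning.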



  1. Chosen as BEST ANSWER

    The seekToBeginning() method provided by AbstractConsumerSeekAware did not seek to the beginning when I tried it. However, this code did work:

    /**
     * Reset to the beginning
     * <p>{@inheritDoc}
     */
    @Override
    public void onPartitionsAssigned(
            @NotNull Map<TopicPartition, Long> assignments,
            @NotNull ConsumerSeekCallback callback) {
        // Seek only the partitions that belong to the servers topic.
        assignments.keySet().stream()
                .filter(topicPartition -> SERVERS_KAFKA_TOPIC.equals(topicPartition.topic()))
                .forEach(
                        topicPartition ->
                                callback.seekToBeginning(
                                        topicPartition.topic(), topicPartition.partition()));
    }

    Also, I needed this for one specific topic rather than for all topics, so the approach above also limits the seek to that topic.

  2. Make your listener bean extend AbstractConsumerSeekAware and in onPartitionsAssigned, call seekToBeginning().

        /**
         * Seek all assigned partitions to the beginning.
         * @since 2.6
         */
        public void seekToBeginning() {
            // ...
        }

    How to implement ConsumerSeekAware in Spring-kafka?

    Is there any way to get the eldest available offset for a kafka topic
