
Now that the Go Kafka library (sarama) provides consumer group support for Kafka 0.10 without any external library, how can I get the offset of the message currently being processed by a consumer group at any given time?

Previously I used kazoo-go (https://github.com/wvanbergen/kazoo-go) to get my consumer group's message offset, as it was stored in ZooKeeper. Now I use sarama-cluster (https://github.com/bsm/sarama-cluster), and I am not sure which API to use to get my consumer group's message offset.

4 Answers


  1. I am also working with Sarama and Kafka to get the offset of a topic.

    You can get the newest offset of a partition with the following code.

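        package main

        import (
            "fmt"

            "github.com/Shopify/sarama"
        )

        func main() {
            // A nil config makes sarama fall back to its defaults.
            client, err := sarama.NewClient([]string{"localhost:9092"}, nil)
            if err != nil {
                panic(err)
            }
            defer client.Close()

            // OffsetNewest asks for the offset the next produced message will get
            // on partition 0. This is not the consumer group's committed offset.
            lastOffset, err := client.GetOffset("topic-test", 0, sarama.OffsetNewest)
            if err != nil {
                panic(err)
            }
            fmt.Println("Newest offset:", lastOffset)
        }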
    

    Let me know if this is the answer you are looking for and if it is helpful.
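The value returned by GetOffset with sarama.OffsetNewest is the partition's newest offset, which is not the same thing as the offset a consumer group has reached. If you have both numbers, the gap between them is the group's lag on that partition. A minimal sketch, assuming both offsets were fetched elsewhere; consumerLag is a hypothetical helper and not part of sarama:

```go
package main

// consumerLag returns how many messages a consumer group still has to read on
// one partition. logEnd is the partition's newest offset (the offset the next
// produced message will get) and committed is the group's next offset to read.
// ok is false when committed is negative, meaning nothing was committed yet.
func consumerLag(logEnd, committed int64) (lag int64, ok bool) {
	if committed < 0 {
		return 0, false
	}
	if committed > logEnd {
		// Possible when the two values were fetched at slightly different times.
		return 0, true
	}
	return logEnd - committed, true
}
```

For example, a newest offset of 120 and a group offset of 100 would give a lag of 20 messages.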

  2. Under the hood, the consumerGroupSession struct uses a PartitionOffsetManager to get the next offset:

        if pom := s.offsets.findPOM(topic, partition); pom != nil {
            offset, _ = pom.NextOffset()
        }
    

    Here is the documentation of pom.NextOffset().

    When a consumerGroupSession constructs a consumerGroupClaim struct via the newConsumerGroupClaim() method, it passes the offset returned by pom.NextOffset() as the offset argument. You can access it later via claim.InitialOffset(). Once you have started consuming messages, you can use message.Offset of the message currently being processed.

    Unfortunately, consumerGroupSession.offsets.findPOM() can’t be accessed from the ConsumerGroupHandler.ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) method, because it receives the session as a ConsumerGroupSession interface, not as the consumerGroupSession struct. The offsets field is therefore private and not accessible.

    Thus we can’t really access the NextOffset() method, which does precisely what the OP wants.
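Since message.Offset is available for every consumed message, one workaround is to record it yourself and read it back whenever you need the current position. A rough sketch using only the standard library; offsetTracker, record and current are hypothetical names, not sarama API, and the mutex is there because ConsumeClaim runs in a separate goroutine per claimed partition:

```go
package main

import "sync"

// partitionKey identifies one partition of one topic.
type partitionKey struct {
	topic     string
	partition int32
}

// offsetTracker remembers the offset of the last message seen per partition.
// It is safe for concurrent use by several ConsumeClaim goroutines.
type offsetTracker struct {
	mu      sync.RWMutex
	offsets map[partitionKey]int64
}

// newOffsetTracker returns an empty tracker ready for use.
func newOffsetTracker() *offsetTracker {
	return &offsetTracker{offsets: make(map[partitionKey]int64)}
}

// record stores the offset of the message currently being processed.
func (t *offsetTracker) record(topic string, partition int32, offset int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.offsets[partitionKey{topic, partition}] = offset
}

// current returns the last recorded offset for the partition, and false if
// no message from that partition has been recorded yet.
func (t *offsetTracker) current(topic string, partition int32) (int64, bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	offset, ok := t.offsets[partitionKey{topic, partition}]
	return offset, ok
}
```

Inside ConsumeClaim you would call record(msg.Topic, msg.Partition, msg.Offset) for each msg read from claim.Messages(), and call current from wherever the position is needed.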

  3. Here’s some sample code to get the consumer group offset (i.e. the offset where the consumer group will start):

        package main
        
        import (
            "context"
            "log"
            "strings"
        
            "github.com/Shopify/sarama"
        )
        
        func main() {
            groupName := "testgrp"
            topic := "topic_name"
            offset, e := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
            if e != nil {
                log.Fatal(e)
            }
            log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
        }
        
        type gcInfo struct {
            offset int64
        }
        
        func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
            return nil
        }
        
        func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
            return nil
        }
        
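        // ConsumeClaim runs once per claimed partition, so with several partitions
        // only one of their initial offsets ends up in g.offset.
        func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
            g.offset = claim.InitialOffset()
            return nil
        }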
        
        func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
            config := sarama.NewConfig()
            config.Consumer.Offsets.AutoCommit.Enable = false // we're not going to update the consumer group offsets
            client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
            if err != nil {
                return 0, err
            }
            defer client.Close() // leave the group and release connections when done
            info := gcInfo{}
            if err := client.Consume(ctx, []string{topic}, &info); err != nil {
                return 0, err
            }
            return info.offset, nil
        }
    
  4. I’ve just been doing work on this myself. As @boris-burkov mentioned, you don’t have access to the findPOM method. However, you can create a POM yourself and call NextOffset() to get the consumer group’s actual offset:

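    // clientName is used as the consumer group ID; cl.Client is an existing sarama.Client.
    offsetManager, _ := sarama.NewOffsetManagerFromClient(clientName, cl.Client)
    offsetPartitionManager, _ := offsetManager.ManagePartition("test-topic", 0)
    // NextOffset returns the next offset to consume together with the commit metadata.
    nextOffset, _ := offsetPartitionManager.NextOffset()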
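One thing to watch for: when the group has not committed anything for the partition yet, NextOffset() returns the configured initial offset, which is a negative sentinel rather than a real position. A small sketch of how the value could be interpreted; describeNextOffset is a hypothetical helper, and the two constants simply mirror the values of sarama.OffsetNewest and sarama.OffsetOldest so the block needs no imports beyond the standard library:

```go
package main

import "strconv"

const (
	offsetNewest int64 = -1 // same value as sarama.OffsetNewest
	offsetOldest int64 = -2 // same value as sarama.OffsetOldest
)

// describeNextOffset turns the value returned by NextOffset() into a
// human-readable description, separating real offsets from the sentinels
// that indicate no offset has been committed yet.
func describeNextOffset(next int64) string {
	switch {
	case next == offsetNewest:
		return "no committed offset; the group will start at the newest message"
	case next == offsetOldest:
		return "no committed offset; the group will start at the oldest message"
	case next >= 0:
		return "next offset to consume: " + strconv.FormatInt(next, 10)
	default:
		return "unexpected offset value: " + strconv.FormatInt(next, 10)
	}
}
```

Checking for the sentinels first avoids reporting -1 or -2 as if it were the group's position.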
    