skip to Main Content

I am using Kafka 10.0 and https://github.com/Shopify/sarama.
I am trying to get the offset of the latest message that a consumer processed.

To do so I’ve found the method NewOffsetManagerFromClient(group string, client Client) which require the group name.

How do I get consumer group name?

offsets := make(map[int32]int64)

config := sarama.NewConfig()
config.Consumer.Offsets.CommitInterval = 200 * time.Millisecond
config.Version = sarama.V0_10_0_0

// config.Consumer.Offsets.Initial = sarama.OffsetNewest
cli, _ := sarama.NewClient(kafkaHost, config)
defer cli.Close()

offsetManager, _ := sarama.NewOffsetManagerFromClient(group, cli)
for _, partition := range partitions {
    partitionOffsetManager, _ := offsetManager.ManagePartition(topic, partition)
    offset, _ := partitionOffsetManager.NextOffset()

    offsets[partition] = offset
}
return offsets

I created a consumer with

consumer := sarama.NewConsumer(connections, config)

but I do not know how to create a consumer group and get its group name.

2

Answers


  1. You are attempting to create your own offset manager to find current offsets:

    offsetManager, _ := sarama.NewOffsetManagerFromClient(group, cli)
    

    Similarly, the consumer that was consuming your topic’s messages would have to use the same offset manager and they would have used a specific group id. Use that group id.

    Login or Signup to reply.
  2. I think you can use any string as groupId. Please look at the example from sarama GoDoc

    // Start a new consumer group
    group, err := NewConsumerGroupFromClient("my-group", client)
    if err != nil {
        panic(err)
    }
    defer func() { _ = group.Close() }()
    

    Maybe you can give it any string. And you should make sure the other consumers can get the same groupId for joining the group.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search