I am using Kafka 0.10.0 and https://github.com/Shopify/sarama.
I am trying to get the offset of the latest message that a consumer processed.
To do so, I’ve found the function NewOffsetManagerFromClient(group string, client Client), which requires the group name.
How do I get the consumer group name?
offsets := make(map[int32]int64)
config := sarama.NewConfig()
config.Consumer.Offsets.CommitInterval = 200 * time.Millisecond
config.Version = sarama.V0_10_0_0
// config.Consumer.Offsets.Initial = sarama.OffsetNewest
cli, _ := sarama.NewClient(kafkaHost, config)
defer cli.Close()
offsetManager, _ := sarama.NewOffsetManagerFromClient(group, cli)
for _, partition := range partitions {
    partitionOffsetManager, _ := offsetManager.ManagePartition(topic, partition)
    offset, _ := partitionOffsetManager.NextOffset()
    offsets[partition] = offset
}
return offsets
I created a consumer with
consumer, _ := sarama.NewConsumer(connections, config)
but I do not know how to create a consumer group and get its group name.
2 Answers
You are creating your own offset manager to find the current offsets. The consumer that was consuming your topic’s messages would likewise have had to use an offset manager, and it would have been created with a specific group ID. Use that same group ID.
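To see why the group ID has to match, here is a toy in-memory model of committed offsets. It is only a sketch and not sarama's API: offsetStore, commit, fetch, and the group and topic names are all hypothetical, and returning -1 for "nothing committed" is a convention chosen for this example. The point it illustrates is that committed offsets are keyed by group, topic, and partition, so a lookup under a different group name finds nothing.

```go
package main

import "fmt"

// key identifies one committed offset. The consumer group is part of the key,
// which is why the group name used for reading must match the one used for committing.
type key struct {
	group     string
	topic     string
	partition int32
}

// offsetStore is a hypothetical stand-in for the broker-side offset storage.
type offsetStore struct {
	committed map[key]int64
}

func newOffsetStore() *offsetStore {
	return &offsetStore{committed: make(map[key]int64)}
}

// commit records an offset for the given group, topic, and partition.
func (s *offsetStore) commit(group, topic string, partition int32, offset int64) {
	s.committed[key{group, topic, partition}] = offset
}

// fetch returns the committed offset, or -1 if this group never committed one
// for the given topic and partition.
func (s *offsetStore) fetch(group, topic string, partition int32) int64 {
	if off, ok := s.committed[key{group, topic, partition}]; ok {
		return off
	}
	return -1
}

func main() {
	store := newOffsetStore()

	// The consuming application commits under its own group name.
	store.commit("my-group", "my-topic", 0, 42)

	fmt.Println(store.fetch("my-group", "my-topic", 0))    // same group: prints 42
	fmt.Println(store.fetch("other-group", "my-topic", 0)) // different group: prints -1
}
```

In the question's code, this corresponds to passing NewOffsetManagerFromClient the same group string that the consuming application used when it committed its offsets.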
I think you can use any string as the groupId. Please look at the example in the sarama GoDoc.
You can probably give it any string, but you should make sure the other consumers use the same groupId so that they join the same group.