Now that Golang Kafka library (sarama) is providing consumer group capability without any external library help with kafka 10. How can I get the current message offset being processed by a consumer group at any given time ?
Previously I used kazoo-go (https://github.com/wvanbergen/kazoo-go) to get my consumer group message offset as it is stored in Zookeeper. Now I use sarama-cluster (https://github.com/bsm/sarama-cluster), I am not sure which API to use to get my consumer group message offset.
4
Answers
I am also working with Sarama and Kafka to get offset of a topic.
You can get offset with following code.
Let me know if this is the answer you are looking for and if it is helpful.
Under the hood the
consumerGroupSession
struct is usingPartitionOffsetManager
to get next offset:Here is the documentation of pom.NextOffset().
When a
consumerGroupSession
constructs aconsumerGroupClaim
struct vianewConsumerGroupClaim()
method, it passes offset, returned bypom.NextOffset()
, asoffset
argument. You can access it later viaclaim.InitialOffset()
. After you started consuming messages, you can usemessage.Offset
of the currently processed message.Unfortunately,
consumerGroupSession.offsets.findPOM()
can’t be accessed fromConsumerGroupHandler.ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim)
method, because it receives session as aConsumerGroupSession
interface, not asconsumerGroupSession
struct. So the offsets variable is private and not accessible.Thus we can’t really access
NextOffset()
method, which does precisely what the OP wants.Here’s a sample code to get the consumer group offset (i.e. the offset where the consumer group will start):
I’ve just been doing work on this myself. As @boris-burkov mentioned you don’t have access to the getPOM method, however, you can create a POM yourself and called NextOffset() to get the current consumer’s actual offset: