I am trying to write a Kafka consumer that collects messages for a set amount of time and then manually commits the offsets for the batch it has collected. However, I could not find a method or API in Shopify's sarama for committing a single message or a batch of messages. Please help.

2 Answers


  1. You could use the Interval parameter of the AutoCommit field in the consumer's Offsets config:

    // Offsets specifies configuration for how and when to commit consumed
    // offsets. This currently requires the manual use of an OffsetManager
    // but will eventually be automated.
    Offsets struct {
        // Deprecated: CommitInterval exists for historical compatibility
        // and should not be used. Please use Consumer.Offsets.AutoCommit
        CommitInterval time.Duration
    
        // AutoCommit specifies configuration for commit messages automatically.
        AutoCommit struct {
            // Whether or not to auto-commit updated offsets back to the broker.
            // (default enabled).
            Enable bool
    
            // How frequently to commit updated offsets. Ineffective unless
            // auto-commit is enabled (default 1s)
            Interval time.Duration
        }
    

    For example:

    // init (custom) config, enable errors and notifications
    config := cluster.NewConfig()
    ...
    
    // Auto-commit marked offsets every two minutes
    config.Consumer.Offsets.AutoCommit.Interval = 2 * time.Minute
    
  2. With auto-commit you don't have full control over when the commit happens. It is periodic and runs behind the scenes for you.
    If that is not acceptable, you can call ConsumerGroupSession.MarkOffset(topic string, partition int32, offset int64, metadata string) whenever you want (for example, after a specific amount of time), passing the offset that follows the last message of the batch you consumed. Note that MarkOffset only marks the offset: it is sent to the broker on the next auto-commit, or right away if you call ConsumerGroupSession.Commit().
