In production, we have a microservice with two instances that consume from the same topic with the same group id (and different client ids).
As a result, if we have 5 partitions, the first instance takes three and the other one takes two.
But when we deploy to production:
- We shut down the first instance.
- Then the first instance comes back up and running.
- Then we shut down and redeploy the second instance.
- Finally, the second instance comes back up and running.
This process makes Kafka rebalance the partitions each time an instance is shut down.
The problem is that after that we process a unique message from a Kafka partition twice (I checked that there are no duplicates in the Kafka partitions; the message itself is unique).
I think that during rebalancing, while processing is still in flight, an instance does not manage to commit some messages, and the other instance processes them again. Note that we have set:
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false,
The result is that a unique message from Kafka gets processed twice.
How can we resolve this issue?
Also, when we read messages from Kafka we store them in a database and only then commit the offsets back to Kafka.
I am curious how we can resolve this issue.
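For reference, a simplified sketch of the flow described above with the Confluent.Kafka .NET client (broker address, topic name and the database call are placeholders):

```csharp
using System;
using Confluent.Kafka;

class ConsumeLoop
{
    public static void Run()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",        // placeholder
            GroupId = "my-consumer-group",              // same group id on both instances
            ClientId = "instance-1",                    // different client id per instance
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false
        };

        using var consumer = new ConsumerBuilder<string, string>(config).Build();
        consumer.Subscribe("my-topic");                 // placeholder topic

        while (true)
        {
            var result = consumer.Consume(TimeSpan.FromSeconds(1));
            if (result == null) continue;

            SaveToDatabase(result.Message.Value);       // store the message in the database
            consumer.Commit(result);                    // then commit the offset back to Kafka
        }
    }

    static void SaveToDatabase(string value) { /* placeholder for the real database write */ }
}
```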
If I use transactional consume, could I resolve this?
And what happens if two instances begin transactions with the same messages?
For example:
Instance A begins a transaction at timestamp t1 with ids 1, 2, 3, 4, 5.
Instance B begins a transaction at timestamp t2 (a few ms after t1) with ids 4, 5, 6, 7, 8, 9.
What happens to the messages with the overlapping ids 4 and 5?
Also, I wondered whether it would be a good choice to handle this myself, for example using a distributed Redis cache and checking the ids.
2 Answers
At-least-once processing is expected if you're handling offset commits on your own and only committing after you've successfully processed each record. Specifically, if your consumer rebalances after it has polled and processed a record but before it has committed, it will seek back to the last committed offset after the rebalance and consume that record again.
Yes, transactions can help, but only within one consumer session. If you rebalance to an entirely new instance without committing, the whole transaction will be consumed again. You need to combine this logic with your own database transaction handling.
The way to work around this is either to commit more frequently or to centrally store the values that have been processed in a highly available datastore. But there's no reason to add Redis when you already have a database, unless you expect Redis to be faster. (Looking up every event in this datastore may cause higher consumer lag and network IO, further increasing the likelihood that consumers will rebalance, so you'll need to increase the poll timeout configuration.)
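For example, with the Confluent.Kafka .NET client the relevant setting is MaxPollIntervalMs; a minimal sketch, where the values are illustrative rather than recommendations:

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",   // placeholder
    GroupId = "my-consumer-group",
    EnableAutoCommit = false,
    // Allow more time between Consume calls when each record also requires a datastore lookup;
    // exceeding this interval makes the broker evict the consumer and triggers a rebalance.
    MaxPollIntervalMs = 600000             // librdkafka default is 300000 (5 minutes)
};
```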
You have two options:
Deal with the duplicate reads by making your processing idempotent, e.g. check in your database whether you've already stored the data and drop the record if so.
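As a sketch of that idempotent write, assuming PostgreSQL with Npgsql and a made-up processed_messages table that has a unique message_id column:

```csharp
using Confluent.Kafka;
using Npgsql;

static class IdempotentSink
{
    public static void Process(NpgsqlConnection db,
                               IConsumer<string, string> consumer,
                               ConsumeResult<string, string> result)
    {
        // The unique constraint on message_id turns a duplicate insert into a no-op,
        // so reprocessing the same record after a rebalance cannot create a second row.
        using var cmd = new NpgsqlCommand(
            "INSERT INTO processed_messages (message_id, payload) VALUES (@id, @payload) " +
            "ON CONFLICT (message_id) DO NOTHING", db);
        cmd.Parameters.AddWithValue("id", result.Message.Key);
        cmd.Parameters.AddWithValue("payload", result.Message.Value);
        cmd.ExecuteNonQuery();

        consumer.Commit(result);  // commit the offset only after the idempotent write succeeds
    }
}
```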
Handle the offset commits yourself. There's a good explanation in the Storing Offsets Outside Kafka section of the KafkaConsumer class of the Java client. Basically, it allows you to atomically transact both the offset and the actual data in your database. Note that you'll need to handle the rebalance events yourself; there's an example of how to do that here: ConsumerRebalanceListener.
It seems that you’re using the .NET client, so the equivalent would be to use ConsumerBuilder#SetPartitionsRevokedHandler.
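A rough sketch of what that could look like, where LoadOffsetFromDb, SaveToDbInOneTransaction and FlushPendingWork are hypothetical placeholders for your real database code: the assigned handler seeks each partition to the offset stored in the database, and the payload plus the record's offset are written in a single database transaction.

```csharp
using System;
using System.Linq;
using Confluent.Kafka;

class OffsetsInDatabaseConsumer
{
    public static void Run()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",   // placeholder
            GroupId = "my-consumer-group",
            EnableAutoCommit = false
        };

        using var consumer = new ConsumerBuilder<string, string>(config)
            // On (re)assignment, start each partition from the offset stored in the database.
            .SetPartitionsAssignedHandler((c, partitions) =>
                partitions.Select(tp => new TopicPartitionOffset(tp, LoadOffsetFromDb(tp))))
            // On revocation, finish or flush in-flight work before the partition moves away.
            .SetPartitionsRevokedHandler((c, partitions) => FlushPendingWork())
            .Build();

        consumer.Subscribe("my-topic");            // placeholder topic

        while (true)
        {
            var result = consumer.Consume(TimeSpan.FromSeconds(1));
            if (result == null) continue;

            // Write the payload and this record's offset in one database transaction, so the
            // stored data and the stored offset never get out of sync; on restart you would
            // seek to the stored offset + 1.
            SaveToDbInOneTransaction(result.Message.Value, result.TopicPartitionOffset);
        }
    }

    // Hypothetical helpers; return Offset.Beginning when nothing has been stored yet.
    static Offset LoadOffsetFromDb(TopicPartition tp) => Offset.Beginning;
    static void FlushPendingWork() { }
    static void SaveToDbInOneTransaction(string payload, TopicPartitionOffset offset) { }
}
```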