
I’m building a Go application that reads messages from a Kafka topic and writes them to a PostgreSQL database.

I’ve set up a loop that reads messages from Kafka using a kafka.Reader and inserts them into the database using a sql.DB. If there’s an error reading a message or inserting it into the database, I log the error and continue to the next message.

However, I’m not sure how to handle errors that occur when committing the Kafka message after inserting the data into the PostgreSQL database. Specifically, if the manual commit causes an error, what should I do? Should I retry the commit operation? Should I log the error and continue to the next message? What’s the best practice for handling these types of errors?

for {
    kafkaMessage, err := kafkaReader.ReadMessage(context.Background())
    if err != nil {
        fmt.Printf("Failed to read message from Kafka: %s\n", err)
        continue
    }

    _, err = db.Exec("INSERT INTO mytable (payload) VALUES ($1)", kafkaMessage.Value)
    if err != nil {
        fmt.Printf("Failed to insert payload into database: %s\n", err)
        continue
    }

    // What should I do if the commit operation fails?
    err = kafkaReader.CommitMessages(context.Background(), kafkaMessage)
    if err != nil {
        // What's the best practice for handling this error?
    }
}

2 Answers


  1. Generally speaking, business logic should rely on consistency at the final layer of your data. In your case, I assume the data persists in the DB and that is the only thing that matters, so you should design a consistency model around your DB: identify the data's properties and design a business flow that guarantees eventual consistency at the DB.

    For your question, it really doesn’t matter how you handle the error: something unexpected happened at Kafka, and whether you ignore it or not, Kafka will continue to the next message if the commit (actually) succeeded, or stay at the current offset if the commit (actually) failed. This is so-called at-least-once delivery, so your business logic should handle duplicate messages properly.

    If you want exactly-once delivery, that’s much trickier, and I don’t suggest having business logic rely on exactly-once delivery. For example, what happens if the DB inserted the record but the network failed while the response was being transmitted back to your application? Will you assume the network was lost after the DB commit or before it? Kafka can guarantee exactly-once delivery once your request has arrived at the Kafka server, but outside the Kafka server’s scope it can’t help you much.
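    One common way to make duplicate deliveries harmless is to make processing idempotent by keying each message on its (partition, offset), which uniquely identifies it within a topic. The sketch below illustrates the idea with an in-memory map; the `processor` type and its methods are made up for illustration only. In practice you would enforce the same idea in Postgres itself, e.g. with a unique constraint on (topic, partition, offset) and `INSERT ... ON CONFLICT DO NOTHING`, so the deduplication survives restarts.

```go
package main

import "fmt"

// msgKey uniquely identifies a Kafka message within a topic.
type msgKey struct {
	Partition int
	Offset    int64
}

// processor is a hypothetical consumer that remembers which messages it
// has already handled, so a redelivery after a failed commit is a no-op.
type processor struct {
	seen map[msgKey]bool
}

func newProcessor() *processor {
	return &processor{seen: make(map[msgKey]bool)}
}

// process returns true if the message was handled now, or false if it is
// a duplicate redelivery that was already handled earlier.
func (p *processor) process(partition int, offset int64, payload string) bool {
	k := msgKey{partition, offset}
	if p.seen[k] {
		return false // duplicate caused by at-least-once delivery
	}
	p.seen[k] = true
	// ... insert payload into the database here ...
	return true
}

func main() {
	p := newProcessor()
	fmt.Println(p.process(0, 42, "hello")) // first delivery: handled
	fmt.Println(p.process(0, 42, "hello")) // redelivery after failed commit: skipped
}
```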

  2. When facing an error, just continue to the next iteration of the for loop.

    If the commit of the Kafka message fails for any reason, Kafka will return the same message again on the next reader.ReadMessage(ctx) call.

    But to make sure your code does not keep futilely retrying the same failing job, exhausting resources and flooding the logs with the same error message, use a simple sleep after each error, or, if really needed, add circuit-breaker logic around your function.

    if err != nil {
       log.Errorf("...", ...)
       time.Sleep(5 * time.Second)
       continue
    }
    