
I want to control the throughput of a JDBC source Kafka connector.

I have a lot of data stored in a PostgreSQL table and I want to ingest it into a Kafka topic. However, I would like to avoid a huge "peak" in the ingestion.

My config looks like:

{
    "name": "my-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "topic.prefix": "my-topic",
        "connection.url": "jdbc:postgresql://localhost:5432/my-db",
        "connection.user": "user",
        "connection.password": "password",
        "mode": "timestamp",
        "timestamp.column.name": "time",
        "poll.interval.ms": "10000",
        "batch.max.rows": "100",
        "query": "SELECT * FROM my-table",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}

I guess I would need to play with these parameters:

  • poll.interval.ms
  • batch.max.rows

I don’t understand how they impact the throughput. With these values, ingestion goes really fast.

How can I configure it properly to slow it down?

Edit: the idea is similar to KIP-731 and its proposal to limit the record rate.

2 Answers


  1. Chosen as BEST ANSWER

    I finally managed to pass a custom query in the connector configuration to LIMIT the number of messages returned by the query:

    {
      "name": "jdbc-source",
      "tasks.max": "1",
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:postgresql://localhost:5432/my-db",
      "connection.user": "<user>",
      "connection.password": "<password>",
      "topic.prefix": "my-topic",
      "mode": "timestamp",
      "timestamp.column.name": "time",
      "query": " SELECT * FROM (SELECT * FROM public.table WHERE "time" > ? AND "time" < ? ORDER BY "time" ASC ) t LIMIT 500 -- ",
      "batch.max.rows": "500",
      "poll.interval.ms": "1000",
      "value.converter.schemas.enable": "false",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
    

    The important part is the query parameter with:

    SELECT * FROM (SELECT * FROM public.table WHERE "time" > ? AND "time" < ? ORDER BY "time" ASC ) t LIMIT 500 --
    

    Note the -- at the end of the query: it comments out whatever the connector appends to the query, so you control the final statement yourself.
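
    For illustration, here is roughly what the final statement looks like once the connector appends its own filter and the trailing -- turns that appended part into a comment (a sketch; the exact text the connector generates may differ):

    SELECT * FROM (SELECT * FROM public.table WHERE "time" > ? AND "time" < ? ORDER BY "time" ASC) t LIMIT 500 -- WHERE "time" > ? AND "time" < ? ORDER BY "time" ASC

    The two ? placeholders inside the subquery should still receive the timestamp bounds the connector binds, so incremental loading keeps working.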

    The other important parameter is poll.interval.ms set to "1000".

    With this setup I process around 500 msg/s. You can increase the number after LIMIT in the SQL query to raise the throughput.
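
    Roughly: LIMIT 500 rows per query × one query per second (poll.interval.ms = 1000) ≈ 500 msg/s, assuming there is always enough backlog to fill the LIMIT.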

    Note: I just noticed the query.suffix connector option, which may help to pass a LIMIT to the SQL query. I didn't try it. Check the docs.
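
    If it works as documented, a fragment like this (untested, values illustrative) might replace the comment trick, keeping the rest of the config unchanged:

    "query": "SELECT * FROM public.table",
    "query.suffix": "LIMIT 500",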


  2. You currently have batch.max.rows=100, which is the default:

    https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html#connector

    Once 100 rows are included in the batch, the connector sends the batch to the Kafka topic. If you want to increase throughput, you should try increasing this value.
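
    For example, a fragment like this in the connector config (value illustrative) lets the connector batch more rows per poll:

    "batch.max.rows": "1000",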
