I want to control the throughput of a JDBC source Kafka connector.
I have a lot of data stored in a PostgreSQL table and I want to ingest it into a Kafka topic. However, I would like to avoid a huge "peak" in the ingestion.
My config looks like:
{
  "name": "my-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "topic.prefix": "my-topic",
    "connection.url": "jdbc:postgresql://localhost:5432/my-db",
    "connection.user": "user",
    "connection.password": "password",
    "mode": "timestamp",
    "timestamp.column.name": "time",
    "poll.interval.ms": "10000",
    "batch.max.rows": "100",
    "query": "SELECT * FROM my-table",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
I guess I would need to play with these parameters:
poll.interval.ms
batch.max.rows
I don't understand how they impact the throughput; with the values above, the ingestion goes really fast.
How can I configure it properly to slow it down?
Edit: the idea is similar to KIP-731 and its proposal to limit the record rate.
2 Answers
I finally managed to pass a custom query to the connector configuration that uses LIMIT to cap the number of rows returned per poll. The important part is the query parameter: note the -- at the end of the query, which comments out whatever the connector appends to the statement, so you control it yourself. The other important parameter is poll.interval.ms, set to "1000".
With this configuration I process around 500 msg/s. You can increase the number after LIMIT in the SQL query to raise the throughput.

Note: I just noticed the query.suffix connector parameter, which may help to pass a LIMIT to the SQL query. I didn't try it; check the documentation.
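Untested here, a hypothetical configuration using query.suffix instead of the commented-out query might look like the fragment below (the LIMIT value is again illustrative, and how the suffix interacts with the WHERE clause generated by timestamp mode would need checking):

"query": "SELECT * FROM my-table",
"query.suffix": "LIMIT 500",
"poll.interval.ms": "1000"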
You currently have batch.max.rows=100, which is the default: https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html#connector
Once 100 rows are included in the batch, the connector sends the batch to the Kafka topic. If you want to increase throughput, you should try increasing this value.
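As an illustration (the value 1000 is arbitrary), raising batch.max.rows lets each batch carry more rows before it is sent:

"batch.max.rows": "1000"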