
We have several collections in MongoDB, one set per tenant, and we want the Kafka connector to watch only specific collections.

Below is my mongosource.properties file, where I have added a pipeline filter to listen only to specific collections. It works:

pipeline=[{"$match":{"ns.coll":{"$in":["ecom-tesla-cms-instance","ca-tesla-cms-instance","ecom-tesla-cms-page","ca-tesla-cms-page"]}}}]

The number of collections will grow in the future, perhaps to 200 collections that have to be watched, so I wanted to know three things:

  1. Is there a performance impact when one connector listens to a huge number of collections?
  2. Is there any limit on the number of collections one connector can watch?
  3. What is the best practice: one connector listening to 100 collections, or 10 different connectors listening to 10 collections each?
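
For context, a regex-based match could avoid editing the filter each time a tenant is added. This is just a sketch and assumes the watched collections keep the "-tesla-cms-" naming pattern:

pipeline=[{"$match":{"ns.coll":{"$regex":"-tesla-cms-"}}}]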

2 Answers


  1. Best practice would say to run many connectors, where "many" depends on your ability to maintain the overhead of them all.

    Reason being: one connector creates a single point of failure (per task, but only one task should be assigned to any collection at a time, to prevent duplicates). If a Connect task fails with a non-retryable error, it will halt the connector's tasks completely and stop reading from all collections assigned to that connector.

    You could also try Debezium, which might have less resource usage than the Mongo Source Connector since it acts as a replica rather than querying the collection at an interval.
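
    For illustration, splitting the load might look like two independent source connectors, each owning a subset of the collections through its own pipeline. This is a minimal sketch; the connector names, topic prefixes, and the grouping of collections are assumptions, not something given in the question:

    {
      "name": "mongo-source-cms-instance",
      "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": "mongodb://<username>:<password>@ip:27017/?authSource=admin&replicaSet=xyz",
        "topic.prefix": "cms-instance",
        "pipeline": "[{\"$match\": {\"ns.coll\": {\"$in\": [\"ecom-tesla-cms-instance\", \"ca-tesla-cms-instance\"]}}}]"
      }
    }

    {
      "name": "mongo-source-cms-page",
      "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": "mongodb://<username>:<password>@ip:27017/?authSource=admin&replicaSet=xyz",
        "topic.prefix": "cms-page",
        "pipeline": "[{\"$match\": {\"ns.coll\": {\"$in\": [\"ecom-tesla-cms-page\", \"ca-tesla-cms-page\"]}}}]"
      }
    }

    With this split, if the first connector hits a non-retryable error, the second keeps streaming the page collections.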

  2. You can listen to multiple change streams from multiple MongoDB collections; you just need to provide a suitable regex for the collection names in the pipeline. You can also exclude a collection or collections by providing a regex for the names you don't want change streams from.

    "pipeline": "[{"$match":{"$and":[{"ns.db":{"$regex":/^database-name$/}},{"ns.coll":{"$regex":/^collection_.*/}}]}}]"  
    

    You can also exclude any given database you don't want change streams from by using $nin.

    "pipeline": "[{"$match":{"$and":[{"ns.db":{"$regex":/^database-name$/,"$nin":[/^any_database_name$/]}},{"ns.coll":{"$regex":/^collection_.*/}}]}}]"
    

    Coming to your questions:

    1. Is there some performance impact with one connector listening to a huge number of collections?

      • To my knowledge, no; it is not mentioned anywhere in the docs. You can listen to multiple MongoDB collections using a single connector.
    2. Is there any limit on the collections one connector can watch?

      • Again, to my knowledge there is no limit mentioned in the docs.
    3. What would be the best practice, to run one connector listening to 100 collections or 10 different connectors listening to 10 collections each?

      • From my point of view, creating N Kafka connectors, one per collection, would be an overhead. Make sure you provide fault tolerance using the recommended configurations, and don't rely on the connector's default configuration; see the snippet below.
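
    As a rough illustration of what such fault-tolerance settings might look like on the MongoDB source connector (verify the property names and values against the docs for your connector version; the dead-letter topic name here is just a placeholder), these can be merged into a config like the one below:

    "mongo.errors.tolerance": "all",
    "mongo.errors.log.enable": "true",
    "mongo.errors.deadletterqueue.topic.name": "mongo-source-dlq",
    "heartbeat.interval.ms": 10000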

    Here is a basic Kafka connector configuration.

    Mongo to Kafka source connector

    {
      "name": "mongo-to-kafka-connect",
      "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "publish.full.document.only": "true",
        "tasks.max": "3",
        "key.converter.schemas.enable": "false",
        "topic.creation.enable": "true",
        "poll.await.time.ms": 1000,
        "poll.max.batch.size": 100,
        "topic.prefix": "any prefix for topic name",
        "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
        "connection.uri": "mongodb://<username>:<password>@ip:27017,ip:27017,ip:27017,ip:27017/?authSource=admin&replicaSet=xyz&tls=true",
        "value.converter.schemas.enable": "false",
        "copy.existing": "true",
        "topic.creation.default.replication.factor": 3,
        "topic.creation.default.partitions": 3,
        "topic.creation.compacted.cleanup.policy": "compact",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "mongo.errors.log.enable": "true",
        "heartbeat.interval.ms": 10000,
        "pipeline": "[{"$match":{"$and":[{"ns.db":{"$regex":/^database-name$/}},{"ns.coll":{"$regex":/^collection_.*/}}]}}]"
      }
    }
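
    If Connect runs in distributed mode, a config in this shape can be registered through the Connect REST API. The host, port, and file name below are assumptions for illustration:

    curl -X POST -H "Content-Type: application/json" \
         --data @mongo-to-kafka-connect.json \
         http://localhost:8083/connectors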
    

    You can get more details from the official docs.
