
I have a MongoDB database with multiple collections (let’s say 3). I want to join them all so that I can do some aggregations. I was able to connect to a local database and a single collection using documentation found online, but I want to be able to read multiple collections.

from pyspark.sql import SparkSession
my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/mydb.collA") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/mydb.output") \
    .getOrCreate()
df = my_spark.read.format("mongo").load()
df.printSchema()

The above code lets me read collA into a DataFrame, but I want to also be able to read collB, collC, and so on.

2 Answers


  1. You can provide the source collection as part of the spark.read call:

    val df: DataFrame = spark.read
     .format("com.mongodb.spark.sql.DefaultSource")
     .option("spark.mongodb.input.uri", mongoUri)
     .option("spark.mongodb.input.database", dbName)
     .option("collection", collectionName)
     .load()
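
    A PySpark equivalent of the same read might look roughly like this (a sketch that reuses the option keys from the Scala snippet above and the my_spark session and "mongo" format alias from the question; the assumption is that the same reader options behave identically through the Python API):

    # Read a single collection by passing the connection details as reader options.
    df = my_spark.read.format("mongo") \
        .option("spark.mongodb.input.uri", "mongodb://127.0.0.1") \
        .option("spark.mongodb.input.database", "mydb") \
        .option("collection", "collB") \
        .load()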
    

    If the collections are schema-compatible, you can union them in the following way:

    val collections = Seq("collA", "collB", "collC")
    collections.map { collection =>
      spark.read
        .format("com.mongodb.spark.sql.DefaultSource")
        .option("spark.mongodb.input.uri", "mongodb://127.0.0.1")
        .option("spark.mongodb.input.database", "mydb")
        .option("collection", collection)
        .load()
    }.reduce(_ union _)
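
    If you are working from PySpark as in the question, the same pattern can be sketched like this (it reuses the session config from the question, so only the collection option changes per read; unionByName is a choice made here and assumes the collections share column names):

    from functools import reduce
    from pyspark.sql import DataFrame

    collections = ["collA", "collB", "collC"]

    # Read each collection into its own DataFrame, overriding only the
    # "collection" option and reusing the input URI from the session config.
    dfs = [
        my_spark.read.format("mongo").option("collection", name).load()
        for name in collections
    ]

    # Combine the schema-compatible DataFrames into a single DataFrame.
    combined = reduce(DataFrame.unionByName, dfs)
    combined.printSchema()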
    
    
  2. You can pass a list of URIs into the spark.mongodb.input.uri config.

    from pyspark.sql import SparkSession
    
    my_spark = SparkSession \
        .builder \
        .appName("myApp") \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/mydb.collA, mongodb://127.0.0.1/mydb.collB, mongodb://127.0.0.1/mydb.collC") \
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/mydb.output") \
        .getOrCreate()
    
    df_collA = my_spark.read.format("mongo").load()
    df_collB = my_spark.read.format("mongo").option("collection", "collB").load()
    df_collC = my_spark.read.format("mongo").option("collection", "collC").load()
    
    df_collA.printSchema()
    df_collB.printSchema()
    df_collC.printSchema()
    

    The first one is the default, but switching the collection option before the load lets you read the other collections as well. Source
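
    Once the separate DataFrames are loaded, the join and aggregation mentioned in the question are ordinary Spark operations. A minimal sketch, assuming a shared key field (here called "customer_id") and a numeric "amount" column, both hypothetical field names:

    from pyspark.sql import functions as F

    # Hypothetical join key; replace with the fields in your own schema.
    joined = df_collA \
        .join(df_collB, on="customer_id", how="inner") \
        .join(df_collC, on="customer_id", how="inner")

    # Example aggregation over the joined data.
    result = joined.groupBy("customer_id").agg(
        F.count("*").alias("row_count"),
        F.sum("amount").alias("total_amount"),
    )
    result.show()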
