
I am trying to read data from MongoDB in batch.
My compute cluster configuration is:

  • Spark version: 3.5.0
  • Scala version: 2.12

I have installed the MongoDB connector library (through Maven):

  • org.mongodb.spark:mongo-spark-connector_2.12:10.4.0

I have my schemas defined in a separate cell in the notebook. One of them looks like this:

    from pyspark.sql.types import StructType, StructField, StringType, ArrayType

    # Placeholder variable name; each schema ends up in the schemas dict used in the read loop below.
    schema = StructType([
    StructField("_id", StringType(), True),
    StructField("channelId", StringType(), True),
    StructField("channelInformation", StructType([
        StructField("channelDefinition", StructType([
            StructField("channelName", StringType(), True),
            StructField("subChannelName", StringType(), True)
        ]), True)
    ]), True),
    StructField("componentId", StringType(), True),
    StructField("connectorId", StringType(), True),
    StructField("createdAt", StringType(), True),
    StructField("customer", StructType([
        StructField("email", StringType(), True),
        StructField("displayName", StringType(), True)
    ]), True),
    StructField("displayFinancialStatus", StringType(), True),
    StructField("displayFulfillmentStatus", StringType(), True),
    StructField("disputes", ArrayType(StructType([
        StructField("id", StringType(), True),
        StructField("status", StringType(), True),
        StructField("initiatedAs", StringType(), True)
    ])), True),
    StructField("disputesInternal", ArrayType(StringType()), True),
    StructField("id", StringType(), True),
    StructField("lineItems", StructType([
        StructField("edges", ArrayType(StructType([
            StructField("node", StructType([
                StructField("variant", StructType([
                    StructField("sku", StringType(), True)
                ]), True)
            ]), True)
        ])), True)
    ]), True),
    StructField("name", StringType(), True),
    StructField("note", StringType(), True),
    StructField("riskLevel", StringType(), True),
    StructField("tags", ArrayType(StringType()), True),
    StructField("tenantId", StringType(), True),
    StructField("transactions", ArrayType(StructType([
        StructField("authorizationExpiresAt", StringType(), True)
    ])), True)
    ])
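
The read loop further down iterates over a schemas dict keyed by component ID. A minimal sketch of how that dict is put together (the "orders" key is only a placeholder for a real component ID):

    # Map each component ID to the schema of its Persistence_<componentId> collection.
    schemas = {
        "orders": schema,  # placeholder entry; one per component in practice
    }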

In another cell I have the code that reads data from MongoDB:


    # Define your MongoDB connection details
    connectionString = "mongodb+srv://user:xyz@connection_string/"
    database = tenant

    for component_id, schema in schemas.items():
        options = {
            "spark.mongodb.connection.uri": connectionString,
            "spark.mongodb.database": database,
            "spark.mongodb.collection": f"Persistence_{component_id}",
        }
        df = (spark.read.format("mongodb")
              .options(**options)
              .schema(schema)
              .load())
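
Note that the loop rebinds df on each iteration, so df.show() afterwards operates on whichever collection was read last. A variant that keeps one DataFrame per component (dfs is just a placeholder name):

    dfs = {}
    for component_id, schema in schemas.items():
        options = {
            "spark.mongodb.connection.uri": connectionString,
            "spark.mongodb.database": database,
            "spark.mongodb.collection": f"Persistence_{component_id}",
        }
        # Keep a handle per component so a later cell can pick the one it needs.
        dfs[component_id] = (spark.read.format("mongodb")
                             .options(**options)
                             .schema(schema)
                             .load())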

Now when I call df.show() in the next cell, it fails with the following error:

: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(Lscala/collection/immutable/Seq;Lorg/apache/spark/sql/catalyst/analysis/Analyzer;)Lorg/apache/spark/sql/catalyst/encoders/ExpressionEncoder;
    at com.mongodb.spark.sql.connector.schema.SchemaToExpressionEncoderFunction.apply(SchemaToExpressionEncoderFunction.java:97)
    at com.mongodb.spark.sql.connector.schema.RowToInternalRowFunction.<init>(RowToInternalRowFunction.java:41)
    at com.mongodb.spark.sql.connector.schema.BsonDocumentToRowConverter.<init>(BsonDocumentToRowConverter.java:111)
    at com.mongodb.spark.sql.connector.read.MongoBatch.<init>(MongoBatch.java:46)
    at com.mongodb.spark.sql.connector.read.MongoScan.toBatch(MongoScan.java:79)
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch$lzycompute(BatchScanExec.scala:54)
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch(BatchScanExec.scala:54)
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.$anonfun$inputPartitions$2(BatchScanExec.scala:72)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputPartitions$lzycompute(BatchScanExec.scala:72)
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputPartitions(BatchScanExec.scala:70)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar(DataSourceV2ScanExecBase.scala:172)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar$(DataSourceV2ScanExecBase.scala:168)
    at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.supportsColumnar(BatchScanExec.scala:44)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:184)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:78)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:78)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:119)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:106)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$5(QueryPlanner.scala:104)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$4(QueryPlanner.scala:101)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:119)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:106)
    at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:1112)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:488)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:471)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$5(QueryExecution.scala:613)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionPhase(SQLExecution.scala:143)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$4(QueryExecution.scala:613)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1177)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:612)
    at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:608)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1184)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:608)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhaseWithTracker$1(QueryExecution.scala:625)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:195)
    at org.apache.spark.sql.execution.QueryExecution.executePhaseWithTracker(QueryExecution.scala:625)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:475)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:471)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$_executedPlan$1(QueryExecution.scala:504)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1184)
    at org.apache.spark.sql.execution.QueryExecution._executedPlan$lzycompute(QueryExecution.scala:504)
    at org.apache.spark.sql.execution.QueryExecution._executedPlan(QueryExecution.scala:499)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:573)
    at com.databricks.sql.transaction.tahoe.metering.DeltaMetering$.reportUsage(DeltaMetering.scala:229)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$10(SQLExecution.scala:655)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:793)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:333)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1184)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:204)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:730)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4805)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3544)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3775)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:397)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:433)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
    at java.lang.Thread.run(Thread.java:750)
File <command-1524629330570308>, line 1
----> 1 df.show()
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48     logger.log_success(
     49         module_name, class_name, function_name, time.perf_counter() - start, signature
     50     )
     51     return res
File /databricks/spark/python/pyspark/sql/dataframe.py:1072, in DataFrame.show(self, n, truncate, vertical)
    983 def show(self, n: int = 20, truncate: Union[bool, int] = True, vertical: bool = False) -> None:
    984     """
    985     Prints the first ``n`` rows of the DataFrame to the console.
    986 
   (...)
   1070     name | This is a super l...
   1071     """
-> 1072     print(self._show_string(n, truncate, vertical))
File /databricks/spark/python/pyspark/sql/dataframe.py:1090, in DataFrame._show_string(self, n, truncate, vertical)
   1084     raise PySparkTypeError(
   1085         error_class="NOT_BOOL",
   1086         message_parameters={"arg_name": "vertical", "arg_type": type(vertical).__name__},
   1087     )
   1089 if isinstance(truncate, bool) and truncate:
-> 1090     return self._jdf.showString(n, 20, vertical)
   1091 else:
   1092     try:
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1349 command = proto.CALL_COMMAND_NAME +\
   1350     self.command_header +\
   1351     args_command +\
   1352     proto.END_COMMAND_PART
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356     answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:
   1359     if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:255, in capture_sql_exception.<locals>.deco(*a, **kw)
    252 from py4j.protocol import Py4JJavaError
    254 try:
--> 255     return f(*a, **kw)
    256 except Py4JJavaError as e:
    257     converted = convert_exception(e.java_exception)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

2 Answers


  1. Chosen as BEST ANSWER

    I installed mongo-spark-connector_2.12:10.3.0 on the compute cluster through Maven, and the code started working for me.

    Previously I tested with versions 2.12:10.4.0 and 2.13:10.4.0; neither of them worked.

    The possible reason is that Spark 3.5.0 is not compatible with version 10.4.0 of the connector.

    You can install the library from the Libraries tab in the cluster settings as follows:

    Select Maven as the library source and click Search Packages.

    Select Maven Central and type spark-mongodb in the search bar.

    Now select version 10.3.0, and make sure that your cluster's Scala version matches the connector's Scala version.
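
    To double-check the pairing before installing, you can print the Spark version from a notebook cell; the cluster's Scala version is listed with the Databricks runtime you selected. A small sketch:

    # The Spark version tells you which connector release to pair with
    # (here, Spark 3.5.0 worked with mongo-spark-connector_2.12:10.3.0).
    print(spark.version)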


  2. The error you are encountering, java.lang.NoSuchMethodError,
    indicates a version mismatch between the libraries you are using in your Spark environment.

    I have tried the packages below alongside the mongo-spark-connector, so first of all install them like this:

    !pip install --upgrade "pymongo[srv]" pyspark pyarrow
    

    Results:

    Downloading dnspython-2.6.1-py3-none-any.whl (307 kB)
       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0.0/307.7 kB ? eta -:--:--
       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 307.7/307.7 kB 37.8 MB/s eta 0:00:00
    Downloading pymongo-4.9.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.7 MB)
       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0.0/1.7 MB ? eta -:--:--
       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╺━━━━━━━━ 1.3/1.7 MB 38.8 MB/s eta 0:00:01
       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.7/1.7 MB 29.6 MB/s eta 0:00:00
    Installing collected packages: pyarrow, dnspython, pymongo
      Attempting uninstall: pyarrow
        Found existing installation: pyarrow 14.0.1
        Not uninstalling pyarrow at /databricks/python3/lib/python3.11/site-packages, outside environment /local_disk0/.ephemeral_nfs/envs/pythonEnv-f23a457c-5669-49a8-b25a-6bcbb71220ed
        Can't uninstall 'pyarrow'. No files were found to uninstall.
    Successfully installed dnspython-2.6.1 pyarrow-17.0.0 pymongo-4.9.1
    

    Then use the below:

    from pyspark.sql import SparkSession
    spark = (SparkSession.builder
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.4.0")
        .getOrCreate())
    


    Next, you can read from MongoDB using the code below:

    df = (spark.read.format("com.mongodb.spark.sql.DefaultSource")
          .option("spark.mongodb.input.uri", "mongodb:xxxx:<PORT>/<DB>.<COLLECTION>")
          .load())
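
    Note that com.mongodb.spark.sql.DefaultSource and spark.mongodb.input.uri belong to the older 3.x connector. With the 10.x connector installed, the short format name "mongodb" and the option keys from the question should work instead; a sketch with placeholder values:

    df = (spark.read.format("mongodb")
          .option("spark.mongodb.connection.uri", "mongodb+srv://<USER>:<PASSWORD>@<HOST>/")
          .option("spark.mongodb.database", "<DB>")
          .option("spark.mongodb.collection", "<COLLECTION>")
          .load())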
    

    Reference: How do I connect with spark to mongodb?
