I am trying to read data from MongoDB in batch mode on a Databricks cluster.
My compute cluster configuration is:
- Spark version: 3.5.0
- Scala version: 2.12
I have installed the MongoDB Spark connector library through Maven:
- org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
My schemas are defined in a separate cell in the notebook; one of them looks like this:
StructField("_id", StringType(), True),
StructField("channelId", StringType(), True),
StructField("channelInformation", StructType([
StructField("channelDefinition", StructType([
StructField("channelName", StringType(), True),
StructField("subChannelName", StringType(), True)
]), True)
]), True),
StructField("componentId", StringType(), True),
StructField("connectorId", StringType(), True),
StructField("createdAt", StringType(), True),
StructField("customer", StructType([
StructField("email", StringType(), True),
StructField("displayName", StringType(), True)
]), True),
StructField("displayFinancialStatus", StringType(), True),
StructField("displayFulfillmentStatus", StringType(), True),
StructField("disputes", ArrayType(StructType([
StructField("id", StringType(), True),
StructField("status", StringType(), True),
StructField("initiatedAs", StringType(), True)
])), True),
StructField("disputesInternal", ArrayType(StringType()), True),
StructField("id", StringType(), True),
StructField("lineItems", StructType([
StructField("edges", ArrayType(StructType([
StructField("node", StructType([
StructField("variant", StructType([
StructField("sku", StringType(), True)
]), True)
]), True)
])), True)
]), True),
StructField("name", StringType(), True),
StructField("note", StringType(), True),
StructField("riskLevel", StringType(), True),
StructField("tags", ArrayType(StringType()), True),
StructField("tenantId", StringType(), True),
StructField("transactions", ArrayType(StructType([
StructField("authorizationExpiresAt", StringType(), True)
])), True)
])
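The read loop in the next cell iterates over a schemas dict keyed by component id. A minimal sketch of how that dict might be assembled (the "orders" key is a placeholder, not something from the original notebook):

```python
# Hypothetical mapping from component id to its StructType.
# "orders" is a placeholder key; in practice there is one entry per component.
schemas = {
    "orders": schema,
}
```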
In another cell I have the code that reads data from MongoDB:
# Define your MongoDB connection details
connectionString = "mongodb+srv://user:xyz@connection_string/"
database = tenant

for component_id, schema in schemas.items():
    options = {
        "spark.mongodb.connection.uri": connectionString,
        "spark.mongodb.database": database,
        "spark.mongodb.collection": f"Persistence_{component_id}",
    }

    df = (spark.read.format("mongodb")
          .options(**options)
          .schema(schema)
          .load())
Now when I call df.show() in the next cell, it fails with the following error:
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(Lscala/collection/immutable/Seq;Lorg/apache/spark/sql/catalyst/analysis/Analyzer;)Lorg/apache/spark/sql/catalyst/encoders/ExpressionEncoder;
at com.mongodb.spark.sql.connector.schema.SchemaToExpressionEncoderFunction.apply(SchemaToExpressionEncoderFunction.java:97)
at com.mongodb.spark.sql.connector.schema.RowToInternalRowFunction.<init>(RowToInternalRowFunction.java:41)
at com.mongodb.spark.sql.connector.schema.BsonDocumentToRowConverter.<init>(BsonDocumentToRowConverter.java:111)
at com.mongodb.spark.sql.connector.read.MongoBatch.<init>(MongoBatch.java:46)
at com.mongodb.spark.sql.connector.read.MongoScan.toBatch(MongoScan.java:79)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch$lzycompute(BatchScanExec.scala:54)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch(BatchScanExec.scala:54)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.$anonfun$inputPartitions$2(BatchScanExec.scala:72)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputPartitions$lzycompute(BatchScanExec.scala:72)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputPartitions(BatchScanExec.scala:70)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar(DataSourceV2ScanExecBase.scala:172)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar$(DataSourceV2ScanExecBase.scala:168)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.supportsColumnar(BatchScanExec.scala:44)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:184)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:78)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:78)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:119)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:106)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$5(QueryPlanner.scala:104)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$4(QueryPlanner.scala:101)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:119)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:106)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:1112)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:488)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:471)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$5(QueryExecution.scala:613)
at org.apache.spark.sql.execution.SQLExecution$.withExecutionPhase(SQLExecution.scala:143)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$4(QueryExecution.scala:613)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1177)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:612)
at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:608)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1184)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:608)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhaseWithTracker$1(QueryExecution.scala:625)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:195)
at org.apache.spark.sql.execution.QueryExecution.executePhaseWithTracker(QueryExecution.scala:625)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:475)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:471)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$_executedPlan$1(QueryExecution.scala:504)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1184)
at org.apache.spark.sql.execution.QueryExecution._executedPlan$lzycompute(QueryExecution.scala:504)
at org.apache.spark.sql.execution.QueryExecution._executedPlan(QueryExecution.scala:499)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:573)
at com.databricks.sql.transaction.tahoe.metering.DeltaMetering$.reportUsage(DeltaMetering.scala:229)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$10(SQLExecution.scala:655)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:793)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:333)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1184)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:204)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:730)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4805)
at org.apache.spark.sql.Dataset.head(Dataset.scala:3544)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3775)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:397)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:433)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
at java.lang.Thread.run(Thread.java:750)
File <command-1524629330570308>, line 1
----> 1 df.show()
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
45 start = time.perf_counter()
46 try:
---> 47 res = func(*args, **kwargs)
48 logger.log_success(
49 module_name, class_name, function_name, time.perf_counter() - start, signature
50 )
51 return res
File /databricks/spark/python/pyspark/sql/dataframe.py:1072, in DataFrame.show(self, n, truncate, vertical)
983 def show(self, n: int = 20, truncate: Union[bool, int] = True, vertical: bool = False) -> None:
984 """
985 Prints the first ``n`` rows of the DataFrame to the console.
986
(...)
1070 name | This is a super l...
1071 """
-> 1072 print(self._show_string(n, truncate, vertical))
File /databricks/spark/python/pyspark/sql/dataframe.py:1090, in DataFrame._show_string(self, n, truncate, vertical)
1084 raise PySparkTypeError(
1085 error_class="NOT_BOOL",
1086 message_parameters={"arg_name": "vertical", "arg_type": type(vertical).__name__},
1087 )
1089 if isinstance(truncate, bool) and truncate:
-> 1090 return self._jdf.showString(n, 20, vertical)
1091 else:
1092 try:
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
1349 command = proto.CALL_COMMAND_NAME +\
1350     self.command_header +\
1351     args_command +\
1352     proto.END_COMMAND_PART
1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
1356 answer, self.gateway_client, self.target_id, self.name)
1358 for temp_arg in temp_args:
1359 if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:255, in capture_sql_exception.<locals>.deco(*a, **kw)
252 from py4j.protocol import Py4JJavaError
254 try:
--> 255 return f(*a, **kw)
256 except Py4JJavaError as e:
257 converted = convert_exception(e.java_exception)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:n{3}n".
332 format(target_id, ".", name, value))
2 Answers
I installed mongo-spark-connector version 2.12:10.3.0 on the compute cluster through Maven and the code started working for me.
Previously I tested versions 2.12:10.4.0 and 2.13:10.4.0, and neither of them worked.
The likely reason is that Spark 3.5.0 is not compatible with version 10.4.0 of the connector.
You can install the library from the Libraries tab in the cluster settings.
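A quick way to confirm the downgrade, reusing the connection details, loop variables, and schemas from the question (the component id below is a placeholder), might be:

```python
# Smoke test after installing org.mongodb.spark:mongo-spark-connector_2.12:10.3.0.
# The same read that previously failed while planning df.show() should now return rows.
component_id = "<component_id>"  # placeholder; use one of the keys in the schemas dict

test_df = (spark.read.format("mongodb")
           .option("spark.mongodb.connection.uri", connectionString)
           .option("spark.mongodb.database", database)
           .option("spark.mongodb.collection", f"Persistence_{component_id}")
           .schema(schemas[component_id])
           .load())

test_df.show(5)
```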
I have tried the mongo-spark-connector package as well: first install it on the cluster through the Libraries tab as a Maven dependency, then read from MongoDB with spark.read.format("mongodb"), as in the sketch below.
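As a rough sketch of that read step (not the answer's original code; the connection string, database, and collection names are placeholders, and the option keys mirror the ones used in the question):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder connection details; replace with your own.
uri = "mongodb+srv://<user>:<password>@<cluster>/"

df = (spark.read.format("mongodb")
      .option("spark.mongodb.connection.uri", uri)
      .option("spark.mongodb.database", "<database>")
      .option("spark.mongodb.collection", "<collection>")
      .load())

df.show()
```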
Reference: How do I connect with spark to mongodb?