I am using an Ubuntu 20.04.4 server with 2x NVIDIA A100 GPUs. Spark (3.3.0) works fine normally, but when I try to use the GPUs through RAPIDS, it just hangs without loading any data. I have tried loading the data as both CSV and Parquet files, and both fail. The way I am currently invoking the GPU is shown below, though I have tried many of the combinations I could find on the internet. I also used spark-submit
to submit jobs, which produced the same problems shown below. I would be grateful for any help in fixing these errors.
$ nvidia-smi
Mon Aug 8 17:00:05 2022
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 495.29.05 Driver Version: 495.29.05 CUDA Version: 11.5 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
| | | MIG M. |
|===============================+======================+======================|
| 0 NVIDIA A100-PCI... Off | 00000000:25:00.0 Off | 0 |
| N/A 26C P0 35W / 250W | 0MiB / 40536MiB | 0% Default |
| | | Disabled |
+-------------------------------+----------------------+----------------------+
| 1 NVIDIA A100-PCI... Off | 00000000:E1:00.0 Off | 0 |
| N/A 24C P0 35W / 250W | 0MiB / 40536MiB | 33% Default |
| | | Disabled |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes: |
| GPU GI CI PID Type Process name GPU Memory |
| ID ID Usage |
|=============================================================================|
| No running processes found |
+-----------------------------------------------------------------------------+
The errors I get are as follows:
$ echo $SPARK_RAPIDS_PLUGIN_JAR
/home/softy/soft/rapids-4-spark/rapids-4-spark_2.12-22.06.0-cuda11.jar
(base) softy@genome:~/spark/jclust-3.3.0-gpu$ spark-shell \
> --conf spark.executor.resource.gpu.amount=1 \
> --conf spark.task.resource.gpu.amount=1 \
> --conf spark.executor.resource.gpu.discoveryScript=/home/softy/soft/spark-3.3.0-scala2.12/examples/src/main/scripts/getGpusResources.sh \
> --num-executors 1 \
> --conf spark.executor.cores=10 \
> --conf spark.rapids.sql.concurrentGpuTasks=1 \
> --conf spark.sql.files.maxPartitionBytes=512m \
> --conf spark.sql.shuffle.partitions=10 \
> --conf spark.rapids.sql.explain=ALL \
> --driver-memory=200g \
> --conf spark.local.dir=/tmp \
> --conf spark.rpc.message.maxSize=2047 \
> --conf spark.plugins=com.nvidia.spark.SQLPlugin \
> --jars ${SPARK_RAPIDS_PLUGIN_JAR}
22/08/08 17:27:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/08 17:27:22 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
22/08/08 17:27:22 WARN ResourceUtils: The configuration of cores (exec = 10 task = 1, runnable tasks = 10) will result in wasted resources due to resource gpu limiting the number of runnable tasks per executor to: 1. Please adjust your configuration.
22/08/08 17:27:23 WARN RapidsPluginUtils: RAPIDS Accelerator 22.06.0 using cudf 22.06.0.
22/08/08 17:27:23 WARN RapidsPluginUtils: RAPIDS Accelerator is enabled, to disable GPU support set `spark.rapids.sql.enabled` to false.
22/08/08 17:27:23 WARN ResourceUtils: The configuration of cores (exec = 256 task = 1, runnable tasks = 256) will result in wasted resources due to resource gpu limiting the number of runnable tasks per executor to: 1. Please adjust your configuration.
22/08/08 17:27:30 WARN RapidsConf: CUDA runtime/driver does not support the ASYNC allocator, falling back to ARENA
Spark context Web UI available at http://genome:4040
Spark context available as 'sc' (master = local[*], app id = local-1659959843286).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 17.0.3-internal)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.read.format("csv").option("delimiter", "\t").option("header", "true").csv("t.csv").show(4)
22/08/08 17:27:36 WARN GpuOverrides:
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
@Partitioning <SinglePartition$> could run on GPU
*Exec <FilterExec> will run on GPU
*Expression <GreaterThan> (length(trim(value#0, None)) > 0) will run on GPU
*Expression <Length> length(trim(value#0, None)) will run on GPU
*Expression <StringTrim> trim(value#0, None) will run on GPU
!Exec <FileSourceScanExec> cannot run on GPU because unsupported file format: org.apache.spark.sql.execution.datasources.text.TextFileFormat
22/08/08 17:27:39 WARN Signaling: Cancelling all active jobs, this can take a while. Press Ctrl+C again to exit now.
org.apache.spark.SparkException: Job 0 cancelled as part of cancellation of all jobs
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:2554)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$doCancelAllJobs$2(DAGScheduler.scala:1067)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.doCancelAllJobs(DAGScheduler.scala:1066)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2825)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:112)
at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:65)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:62)
at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:210)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:207)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:411)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:537)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:443)
... 47 elided
scala>
scala> val columns = Seq("Name", "X1", "X2", "X3", "X4")
columns: Seq[String] = List(Name, X1, X2, X3, X4)
scala> val data = Seq(("id1", "1", "2", "3", "4"),("id2", "2", "2", "1", "8"),("id3", "1", "2", "5", "8"))
data: Seq[(String, String, String, String, String)] = List((id1,1,2,3,4), (id2,2,2,1,8), (id3,1,2,5,8))
scala> val rdd = spark.sparkContext.parallelize(data)
rdd: org.apache.spark.rdd.RDD[(String, String, String, String, String)] = ParallelCollectionRDD[6] at parallelize at <console>:23
scala> spark.createDataFrame(rdd).toDF(columns:_*).show()
22/08/08 17:28:04 WARN GpuOverrides:
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
@Partitioning <SinglePartition$> could run on GPU
*Exec <ProjectExec> will run on GPU
*Expression <Alias> _1#21 AS Name#46 will run on GPU
*Expression <Alias> _2#22 AS X1#47 will run on GPU
*Expression <Alias> _3#23 AS X2#48 will run on GPU
*Expression <Alias> _4#24 AS X3#49 will run on GPU
*Expression <Alias> _5#25 AS X4#50 will run on GPU
! <SerializeFromObjectExec> cannot run on GPU because not all expressions can be replaced; GPU does not currently support the operator class org.apache.spark.sql.execution.SerializeFromObjectExec
@Expression <Alias> staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._1, true, false, true) AS _1#21 could run on GPU
! <StaticInvoke> staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._1, true, false, true) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
! <Invoke> knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._1 cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.Invoke
!Expression <KnownNotNull> knownnotnull(assertnotnull(input[0, scala.Tuple5, true])) cannot run on GPU because input expression AssertNotNull assertnotnull(input[0, scala.Tuple5, true]) (ObjectType(class scala.Tuple5) is not supported); expression KnownNotNull knownnotnull(assertnotnull(input[0, scala.Tuple5, true])) produces an unsupported type ObjectType(class scala.Tuple5)
! <AssertNotNull> assertnotnull(input[0, scala.Tuple5, true]) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
! <BoundReference> input[0, scala.Tuple5, true] cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.BoundReference
@Expression <Alias> staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._2, true, false, true) AS _2#22 could run on GPU
! <StaticInvoke> staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._2, true, false, true) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
! <Invoke> knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._2 cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.Invoke
!Expression <KnownNotNull> knownnotnull(assertnotnull(input[0, scala.Tuple5, true])) cannot run on GPU because input expression AssertNotNull assertnotnull(input[0, scala.Tuple5, true]) (ObjectType(class scala.Tuple5) is not supported); expression KnownNotNull knownnotnull(assertnotnull(input[0, scala.Tuple5, true])) produces an unsupported type ObjectType(class scala.Tuple5)
! <AssertNotNull> assertnotnull(input[0, scala.Tuple5, true]) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
! <BoundReference> input[0, scala.Tuple5, true] cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.BoundReference
@Expression <Alias> staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._3, true, false, true) AS _3#23 could run on GPU
! <StaticInvoke> staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._3, true, false, true) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
! <Invoke> knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._3 cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.Invoke
!Expression <KnownNotNull> knownnotnull(assertnotnull(input[0, scala.Tuple5, true])) cannot run on GPU because input expression AssertNotNull assertnotnull(input[0, scala.Tuple5, true]) (ObjectType(class scala.Tuple5) is not supported); expression KnownNotNull knownnotnull(assertnotnull(input[0, scala.Tuple5, true])) produces an unsupported type ObjectType(class scala.Tuple5)
! <AssertNotNull> assertnotnull(input[0, scala.Tuple5, true]) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
! <BoundReference> input[0, scala.Tuple5, true] cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.BoundReference
@Expression <Alias> staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._4, true, false, true) AS _4#24 could run on GPU
! <StaticInvoke> staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._4, true, false, true) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
! <Invoke> knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._4 cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.Invoke
!Expression <KnownNotNull> knownnotnull(assertnotnull(input[0, scala.Tuple5, true])) cannot run on GPU because input expression AssertNotNull assertnotnull(input[0, scala.Tuple5, true]) (ObjectType(class scala.Tuple5) is not supported); expression KnownNotNull knownnotnull(assertnotnull(input[0, scala.Tuple5, true])) produces an unsupported type ObjectType(class scala.Tuple5)
! <AssertNotNull> assertnotnull(input[0, scala.Tuple5, true]) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
! <BoundReference> input[0, scala.Tuple5, true] cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.BoundReference
@Expression <Alias> staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._5, true, false, true) AS _5#25 could run on GPU
! <StaticInvoke> staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._5, true, false, true) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
! <Invoke> knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._5 cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.Invoke
!Expression <KnownNotNull> knownnotnull(assertnotnull(input[0, scala.Tuple5, true])) cannot run on GPU because input expression AssertNotNull assertnotnull(input[0, scala.Tuple5, true]) (ObjectType(class scala.Tuple5) is not supported); expression KnownNotNull knownnotnull(assertnotnull(input[0, scala.Tuple5, true])) produces an unsupported type ObjectType(class scala.Tuple5)
! <AssertNotNull> assertnotnull(input[0, scala.Tuple5, true]) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
! <BoundReference> input[0, scala.Tuple5, true] cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.BoundReference
! <ExternalRDDScanExec> cannot run on GPU because not all expressions can be replaced; GPU does not currently support the operator class org.apache.spark.sql.execution.ExternalRDDScanExec
!Expression <AttributeReference> obj#20 cannot run on GPU because expression AttributeReference obj#20 produces an unsupported type ObjectType(class scala.Tuple5)
[Stage 1:> (0 + 0) / 1]
2 Answers
Most of the time a hang means that Spark could not allocate all of the resources needed to fulfill the resource request. Here you are running in local mode (local[*]), which means Spark will try to allocate one task per CPU thread on your machine. But you have launched Spark with spark.executor.resource.gpu.amount=1 and spark.task.resource.gpu.amount=1, which asks Spark to give each executor 1 GPU and each task 1 GPU. Assuming your machine has more than one core, Spark is now stuck: it wants to allocate X tasks, which would need X GPUs to run, but there is only 1 GPU available. Spark could be a lot better about throwing errors in these deadlock/misconfiguration cases.
When running in local mode you can only use 1 GPU, so the simplest way to launch is to just remove all of the resource requests.
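A minimal launch along those lines might look like this (a sketch only; the flags and the SPARK_RAPIDS_PLUGIN_JAR variable are taken from your own invocation above):

spark-shell \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --conf spark.rapids.sql.concurrentGpuTasks=1 \
  --conf spark.rapids.sql.explain=ALL \
  --jars ${SPARK_RAPIDS_PLUGIN_JAR}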
That should let you run.
@Quiescent
I would suggest you consider creating a Spark cluster. For example, it could be a Spark standalone cluster, a Spark on YARN cluster, or even a Spark on Kubernetes cluster.
Starting with a Spark standalone cluster is probably easiest in the beginning; you just need to start the Spark master and worker processes as daemons first, along the lines of the sketch below.
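A hedged sketch of that, assuming $SPARK_HOME points at your Spark 3.3.0 install and that the master and worker run on the same host (genome), with the discovery script path reused from your question:

# advertise the worker's GPUs, e.g. via $SPARK_HOME/conf/spark-env.sh
export SPARK_WORKER_OPTS="-Dspark.worker.resource.gpu.amount=2 \
  -Dspark.worker.resource.gpu.discoveryScript=/home/softy/soft/spark-3.3.0-scala2.12/examples/src/main/scripts/getGpusResources.sh"

# start the master (web UI on port 8080, RPC endpoint spark://genome:7077 by default)
$SPARK_HOME/sbin/start-master.sh

# start a worker and register it with the master
$SPARK_HOME/sbin/start-worker.sh spark://genome:7077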
Make sure the Spark Master UI shows the correct CPU, memory, and GPU resources.
Then submit your Spark job (whether via spark-shell, spark-sql, or spark-submit) against the Spark standalone cluster, for example:
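For instance, reusing the configs from the question (the master URL assumes the standalone setup sketched above):

spark-shell \
  --master spark://genome:7077 \
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.task.resource.gpu.amount=1 \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --jars ${SPARK_RAPIDS_PLUGIN_JAR}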