As far as I know, Spark executors run many tasks at the same time so that data is processed in parallel. Here is my question: when connecting to an external data store, say MySQL, how many tasks are used to do the read? In other words, are multiple tasks created at the same time, each reading all of the data, or is the data read by a single task and then distributed to the cluster in some other way? And when writing data to MySQL, how many connections are opened?
Here is the code I use to read data from MySQL:
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}

// Reads a table over JDBC, passing numPartitions as the only partitioning option.
def jdbc(sqlContext: SQLContext, url: String, driver: String, dbtable: String,
         user: String, password: String, numPartitions: Int): DataFrame = {
  sqlContext.read.format("jdbc").options(Map(
    "url" -> url,
    "driver" -> driver,
    "dbtable" -> s"(SELECT * FROM $dbtable) $dbtable",
    "user" -> user,
    "password" -> password,
    "numPartitions" -> numPartitions.toString
  )).load()
}
// Reads a table over JDBC with no partitioning options and registers it as a temp view.
def mysqlToDF(sparkSession: SparkSession, jdbc: JdbcInfo, table: String): DataFrame = {
  val dF1 = sparkSession.read.format("jdbc")
    .option("url", jdbc.jdbcUrl)
    .option("user", jdbc.user)
    .option("password", jdbc.passwd)
    .option("driver", jdbc.jdbcDriver)
    .option("dbtable", table)
    .load()
  // dF1.show(3)
  dF1.createOrReplaceTempView(table)
  dF1
}
}
2 Answers
Here is a good article that answers your question:
https://freecontent.manning.com/what-happens-behind-the-scenes-with-spark/
In simple terms: the workers split the read into several parts, and each worker reads only a part of your input data. The number of tasks depends on your resources and your data volume. Writing follows the same principle: Spark writes the data to a distributed storage system such as HDFS, where the data is stored in a distributed way, with each worker writing its data to some HDFS storage node.
By default, data from a JDBC source is loaded by a single thread, so you will have one task processed by one executor. That is the behavior you can expect from your second function, mysqlToDF.
In the first function, jdbc, you are closer to a parallel read, but some parameters are still missing. numPartitions alone is not enough: Spark also needs a numeric or date column (partitionColumn) together with lower and upper bounds (lowerBound, upperBound) to be able to read in parallel. It then executes numPartitions queries, each returning a partial result.
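To make the parallel read concrete, here is a minimal sketch. The option names (partitionColumn, lowerBound, upperBound, numPartitions) are the documented JDBC options; the object and method names, and the example column "id", are assumptions made up for illustration. partitionPredicates is only a simplified approximation of how the bounds are turned into one WHERE clause per partition, not Spark's actual source.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PartitionedJdbcRead {

  // Simplified approximation (an assumption, not Spark's real code) of how the
  // bounds are split into one predicate per partition. The bounds only set the
  // stride: the first and last predicates are open-ended, so rows outside
  // [lowerBound, upperBound] are still read.
  def partitionPredicates(column: String, lowerBound: Long, upperBound: Long,
                          numPartitions: Int): Seq[String] = {
    require(numPartitions >= 2, "with a single partition there is nothing to split")
    require(upperBound - lowerBound >= numPartitions, "range too small to split")
    val stride = upperBound / numPartitions - lowerBound / numPartitions
    (0 until numPartitions).map { i =>
      val from = lowerBound + i * stride
      val to = from + stride
      if (i == 0) s"$column < $to OR $column IS NULL"
      else if (i == numPartitions - 1) s"$column >= $from"
      else s"$column >= $from AND $column < $to"
    }
  }

  // Parallel JDBC read: Spark issues numPartitions queries, one per task.
  // The "driver" option is left out here; add it if your setup needs it.
  def readPartitioned(spark: SparkSession, url: String, table: String,
                      user: String, password: String, partitionColumn: String,
                      lowerBound: Long, upperBound: Long,
                      numPartitions: Int): DataFrame =
    spark.read.format("jdbc")
      .option("url", url)
      .option("dbtable", table)
      .option("user", user)
      .option("password", password)
      .option("partitionColumn", partitionColumn)
      .option("lowerBound", lowerBound.toString)
      .option("upperBound", upperBound.toString)
      .option("numPartitions", numPartitions.toString)
      .load()
}
```

For example, partitionPredicates("id", 0L, 100L, 4) yields "id < 25 OR id IS NULL", "id >= 25 AND id < 50", "id >= 50 AND id < 75" and "id >= 75". After a read you can confirm the degree of parallelism with df.rdd.getNumPartitions.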
The Spark JDBC documentation describes these options (partitionColumn, lowerBound, upperBound and numPartitions).
Regarding writes:
As stated in the documentation, this also depends on numPartitions: if the number of partitions being written is higher than numPartitions, Spark reduces it to that limit by calling coalesce(numPartitions) before writing. Keep in mind that coalesce can produce skewed partitions, so it is sometimes better to call repartition(numPartitions) explicitly to distribute the data evenly before the write.
If you don't set numPartitions, the number of parallel connections on write can be as high as the number of tasks active at a given moment, so be aware that with too much parallelism and no upper bound you may overload the database server.
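A write that caps the number of connections could look roughly like the sketch below. The object name, method names and the maxConnections parameter are hypothetical; the append save mode is also just an assumption, so pick whichever mode fits your table.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object BoundedJdbcWrite {

  // Redistribute rows evenly over at most maxConnections partitions.
  // repartition shuffles the data, which avoids the skew coalesce can cause.
  def capPartitions(df: DataFrame, maxConnections: Int): DataFrame =
    df.repartition(maxConnections)

  // Each partition is written by one task over its own JDBC connection, so
  // the partition count is an upper bound on concurrent connections.
  def write(df: DataFrame, url: String, table: String, user: String,
            password: String, maxConnections: Int): Unit =
    capPartitions(df, maxConnections)
      .write
      .format("jdbc")
      .option("url", url)
      .option("dbtable", table)
      .option("user", user)
      .option("password", password)
      // Safety net: Spark coalesces down to this limit if it is exceeded.
      .option("numPartitions", maxConnections.toString)
      .mode(SaveMode.Append) // assumption: append to an existing table
      .save()
}
```

Repartitioning costs a shuffle, so for data that is already evenly distributed it may be cheaper to set only numPartitions and let Spark coalesce.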