This has been driving me a little crazy, so any help is greatly appreciated.
I have a list of DataFrames, df_list, containing around 500 small DataFrames (I ingested CSVs and wrote them out as Parquet, then read each one back as a Spark DataFrame and appended it to the list), and I'm trying to merge them like this:
from functools import reduce

merged_df = reduce(lambda left, right: left.unionByName(right, allowMissingColumns=True), df_list)
The problem is that this only uses a single core and is bottlenecked. I don't understand why my other cores aren't being used (I'm watching htop).
I also tried this:
df_list = [df.repartition(16) for df in df_list]  # adjust the number based on your cluster size
merged_df = df_list[0]
for df in df_list[1:]:
    merged_df = merged_df.unionByName(df, allowMissingColumns=True)
No luck. How can I get more of my cores working? How can I get Spark to distribute this merge across multiple cores/executors? Does this question even make sense, and if not, what am I fundamentally misunderstanding here?
I have a 128 GB memory, 32-core EC2 instance.
Below is the environment: basically I have a Spark master and one worker (with 5 executors, each with 4 cores and 16 GB RAM), all running on a single Ubuntu machine. I'm running this from a Jupyter notebook and initializing my Spark client as follows:
spark = (
    SparkSession
    .builder
    .master("spark://<removed by me>:9077")
    .appName("spark_experiment_mark_21")
    .config("spark.executor.memory", "16g")
    .config("spark.dynamicAllocation.initialExecutors", 5)
    .config("spark.executor.cores", 4)
    .config("spark.driver.memory", "30g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "10g")
    .getOrCreate()
)
I believe most of the values are taking effect (I can see them in the Spark configuration). I also set the following environment variables:
SPARK_WORKER_CORES=20
SPARK_WORKER_MEMORY=80g
and the following Spark properties (perhaps redundantly):
spark.dynamicAllocation.initialExecutors 5
spark.executor.memory 16g
spark.executor.cores 4
spark.memory.offHeap.enabled true
spark.memory.offHeap.size 20g
The spark.memory.offHeap.size I set in the SparkSession object is overriding the one in the properties file.
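As a sanity check (a small sketch using the standard spark.conf.get API; the keys below are just the ones from this setup), the effective runtime values can be read straight off the live session:

# Print the effective runtime value of each property of interest.
for key in [
    "spark.executor.memory",
    "spark.executor.cores",
    "spark.memory.offHeap.size",
]:
    print(key, "=", spark.conf.get(key))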
2 Answers
Merry Xmas. That said, the question does not quite make sense as posed, and although I get your point, the reality is:
reduce is serial, and unionByName is also a serial operation in terms of (py)Spark. You would need to roll your own code/logic, somehow.
Spark distributes tasks at the partition level, but the way you’ve implemented the merging process may inherently limit parallelism.
You can do the following (see the sketch after this list):
- Repartition before merging.
- Use unionByName in a parallel-friendly way.
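Here is a minimal sketch of both ideas, assuming df_list is the list of DataFrames from the question; tree_union is a hypothetical helper for illustration, not a Spark API:

# 1) Repartition before merging, so each input already has several
#    partitions that executors can work on in parallel.
df_list = [df.repartition(16) for df in df_list]

# 2) "Parallel-friendly" union: pair the DataFrames up and union them as a
#    balanced tree, so the logical plan has depth ~log2(n) instead of the
#    ~500-deep chain a linear reduce produces.
def tree_union(dfs):
    while len(dfs) > 1:
        nxt = [
            dfs[i].unionByName(dfs[i + 1], allowMissingColumns=True)
            for i in range(0, len(dfs) - 1, 2)
        ]
        if len(dfs) % 2 == 1:  # carry the odd DataFrame into the next round
            nxt.append(dfs[-1])
        dfs = nxt
    return dfs[0]

merged_df = tree_union(df_list)

# Unions are lazy; parallelism only becomes visible once an action runs.
merged_df.count()

Note that the repartition count (16 here, copied from the question) is an assumption to tune for your data size and core count.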