
This has been driving me a little crazy, so any help is greatly appreciated.

I have a list of around 500 small DataFrames, df_list (I ingested CSVs, wrote them out as Parquet files, then read each one back as a Spark DataFrame and appended it to the list).
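
For reference, the list is built roughly like this (paths are illustrative):

from pathlib import Path

# Location of the Parquet files written from the CSVs (illustrative path)
parquet_paths = sorted(str(p) for p in Path("/data/parquet").glob("*.parquet"))

# Read each file back as its own Spark DataFrame and collect them in a list
df_list = [spark.read.parquet(path) for path in parquet_paths]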

I’m trying to merge them like this:

from functools import reduce

merged_df = reduce(lambda left, right: left.unionByName(right, allowMissingColumns=True), df_list)

The problem is that this only uses a single core and is bottlenecked there. I don’t understand why my other cores aren’t being used (I’m watching htop).

I also tried this:

df_list = [df.repartition(16) for df in df_list]  # Adjust the number based on your cluster size
merged_df = df_list[0]
for df in df_list[1:]:
    merged_df = merged_df.unionByName(df, allowMissingColumns=True)

No luck. How can I get more cores working? How can I get Spark to distribute this merge across multiple cores/executors? Does this question even make sense, and if not, what am I fundamentally misunderstanding here?

I have a 128 GB memory, 32-core EC2 instance.

Below is the environment. Basically I have a Spark master and one worker (with 5 executors, each with 4 cores and 16 GB RAM) running on a single Ubuntu machine. I’m running this from a Jupyter notebook and initializing my Spark session like this:

spark = (
    SparkSession
    .builder
    .master("spark://<removed by me>:9077")
    .appName("spark_experiment_mark_21")
    .config("spark.executor.memory", "16g")
    .config("spark.dynamicAllocation.initialExecutors", 5)
    .config("spark.executor.cores", 4)
    .config("spark.driver.memory", "30g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size","10g")
    .getOrCreate()
)

I believe most of the values are taking effect; I can see them in the Spark configuration. I also set the following environment variables:

SPARK_WORKER_CORES=20
SPARK_WORKER_MEMORY=80g

and the following Spark properties (perhaps redundantly):

spark.dynamicAllocation.initialExecutors        5
spark.executor.memory                           16g
spark.executor.cores                            4
spark.memory.offHeap.enabled                    true
spark.memory.offHeap.size                       20g

The spark.memory.offHeap.size I set in the SparkSession builder is overriding the one in the properties file.

2 Answers


  1. Merry Xmas. That said, the question does not make sense as asked, and although I get your point, the reality is:

    1. reduce is serial.
    2. unionByName is also a serial operation in terms of (py)Spark: it is a lazy transformation, so your loop is just building up the logical plan on the driver one DataFrame at a time, which is single-threaded work.

    You would need to roll your own code/logic, somehow.
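
    One concrete alternative (my sketch, not spelled out in this answer): since the inputs are already Parquet files on disk, skip the ~500 per-file DataFrames and let Spark read them all in one distributed job. The glob path below is an assumption about your layout; mergeSchema plays roughly the role of allowMissingColumns=True by reconciling differing schemas across files.

      # Minimal sketch: one distributed read instead of ~500 unions.
      # "mergeSchema" unions the Parquet schemas; columns missing from a
      # given file come back as nulls. Adjust the path to your layout.
      merged_df = (
          spark.read
          .option("mergeSchema", "true")
          .parquet("/data/parquet/*.parquet")
      )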

  2. Spark distributes tasks at the partition level, but the way you’ve implemented the merging process may inherently limit parallelism.

    You can:

    1. Repartition before merging (as you already tried), so each DataFrame has more than one partition for the executors to work on.

    2. Use unionByName in a parallel-friendly way, reducing the list pairwise so the plan tree stays shallow:

      def parallel_union(dataframes):
          # Repeatedly union adjacent pairs until one DataFrame remains,
          # so the logical plan tree has depth O(log n) instead of O(n)
          while len(dataframes) > 1:
              dataframes = [
                  dataframes[i].unionByName(dataframes[i + 1], allowMissingColumns=True)
                  for i in range(0, len(dataframes) - 1, 2)
              ] + ([dataframes[-1]] if len(dataframes) % 2 == 1 else [])
          return dataframes[0]

      merged_df = parallel_union(df_list)
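
    Either way the unions themselves are lazy; the point of the pairwise scheme is that it keeps the logical plan balanced and only O(log n) deep instead of a ~500-node chain, which cuts down the single-threaded plan analysis the driver performs before any executor cores get involved.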
      