
I am using PySpark to join two tables with about 100k rows each (so this is not a skewed join). The join takes more than 30 minutes, sometimes over an hour, which makes me think something is wrong here. The code is just a regular join:

a = b.join(c, b.id == c.id, "inner").drop(c.id)

I have searched and tried a lot, including:

  • using more cores in the cluster
  • using larger instances for each node
  • setting spark.sql.adaptive.enabled=true
  • repartitioning, etc.

None of these worked.
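For reference, this is roughly how I applied the last two attempts (the partition count of 200 is just an illustrative value):

# enable adaptive query execution
spark.conf.set("spark.sql.adaptive.enabled", "true")

# repartition both sides on the join key before joining
b = b.repartition(200, "id")
c = c.repartition(200, "id")
a = b.join(c, b.id == c.id, "inner").drop(c.id)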

My question is: does it matter that both tables (pyspark.sql.DataFrame objects) were produced by a UDF? That is the only difference from typical usage.

I used the following udf logic to prepare the tables:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

def func(row):
    # split the raw comma-separated string once
    parts = row.split(",")
    id = parts[0]
    # everything between the first and last fields becomes f1
    f1, f2 = ", ".join(parts[1:-1]), int(parts[-1])
    return (int(id), f1, f2)

func_udf = udf(func,
               StructType([
                   StructField("id", IntegerType(), True),
                   StructField("f1", StringType(), True),
                   StructField("f2", IntegerType(), True)
               ]))

df = df.withColumn("Result", func_udf(col("data")))
df = df.drop(df.data).select("Result.*")

df is the table used for join.

Any troubleshooting ideas are appreciated. Thank you.

P.S. Table b has 3 columns and table c has 6, so they are not wide. Also, if I shrink the size to 10k rows, the join works as expected.

2 Answers


  1. Run a.explain() and check the logical plan for the b join c step.
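
    For example (assuming Spark 3.x, where explain() accepts a mode string):

    # print the formatted physical plan for the join
    a.explain("formatted")

    # things to look for in the plan:
    # - a SortMergeJoin or BroadcastHashJoin on the id keys is expected
    # - a BroadcastNestedLoopJoin or CartesianProduct would mean the join
    #   is not being executed as an equi-join, which would explain the slowness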

  2. If none of the changes you already tried helped, try replacing the UDF with native Spark API functions. A Python UDF serializes and deserializes every row between the JVM and the Python runtime, and that overhead hurts performance. In fact, your UDF logic can be rewritten as below:

    from pyspark.sql import functions as func
    from pyspark.sql.types import IntegerType

    df = df.withColumn(
        "data_split",
        func.split(func.col("data"), ",")
    ).withColumn(
        "result",
        func.struct(
            # first element -> id
            func.col("data_split").getItem(0).cast(IntegerType()).alias("id"),
            # everything between the first and last elements -> f1
            func.concat_ws(", ", func.slice(func.col("data_split"), 2, func.size(func.col("data_split")) - 2)).alias("f1"),
            # last element -> f2; element_at supports a negative index,
            # while getItem(-1) would return null for arrays
            func.element_at(func.col("data_split"), -1).cast(IntegerType()).alias("f2"),
        )
    ).select(
        "result.*"
    )
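
    A quick sanity check that the rewrite matches the UDF version's schema (column names follow the question):

    df.printSchema()  # expect: id int, f1 string, f2 int
    df.show(5, truncate=False)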
    