I am using PySpark to join two tables with about 100k rows each (so this is not a skewed join). The join takes longer than 30 minutes, sometimes over an hour, which makes me think something is wrong. The code is just a regular join:
a = b.join(c, b.id == c.id, "inner").drop(c.id)
I have searched and tried a lot, including:
- using more cores in the cluster
- using larger instances for each node
- setting spark.sql.adaptive.enabled=true
- repartitioning, etc.
None of these worked.
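For context, the AQE setting and the repartitioning were applied roughly like this (a sketch; the partition count of 200 is just an example value):

# Enable adaptive query execution
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Repartition both sides on the join key
b = b.repartition(200, "id")
c = c.repartition(200, "id")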
My question is: does it matter that both tables (pyspark.sql.DataFrame objects) were produced by a UDF? This is the only difference compared with my usual workflow. I used the following UDF logic to prepare the tables:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

def func(row):
    # "data" is a comma-separated string: id, one or more middle fields, then f2
    parts = row.split(",")
    id = int(parts[0])
    f1 = ", ".join(parts[1:-1])
    f2 = int(parts[-1])
    return (id, f1, f2)

func_udf = udf(func, StructType([
    StructField("id", IntegerType(), True),
    StructField("f1", StringType(), True),
    StructField("f2", IntegerType(), True),
]))

df = df.withColumn("Result", func_udf(col("data")))
df = df.drop(df.data).select("Result.*")
df is the table used for the join.
Any troubleshooting ideas are appreciated. Thank you.
P.S. Table b has 3 columns and table c has 6 columns, so they are not wide. Also, if I shrink each table to 10k rows, the join works as expected.
2 Answers
Check the logical plan of b join c.
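For example, assuming the joined DataFrame is a as in the question, explain(True) prints the parsed, analyzed, and optimized logical plans together with the physical plan:

a = b.join(c, b.id == c.id, "inner").drop(c.id)

# Prints the parsed/analyzed/optimized logical plans and the physical plan
a.explain(True)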
If none of the changes you tried helped, you can try using the built-in Spark API instead of a UDF. A Python UDF forces data serialization and deserialization between the Python runtime and the JVM, which hurts performance. In fact, your UDF logic can be rewritten as below:
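Here is a minimal sketch of the same transformation with built-in SQL functions (split, element_at, slice, size, array_join, all available since Spark 2.4), so the work stays in the JVM where Catalyst can optimize it:

from pyspark.sql.functions import expr

df = (
    df.withColumn("parts", expr("split(data, ',')"))                                 # array of comma-separated fields
      .withColumn("id", expr("cast(element_at(parts, 1) as int)"))                   # first field as int
      .withColumn("f1", expr("array_join(slice(parts, 2, size(parts) - 2), ', ')"))  # middle fields re-joined with ", "
      .withColumn("f2", expr("cast(element_at(parts, -1) as int)"))                  # last field as int
      .drop("data", "parts")
)

Apply the same rewrite to both b and c before the join.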