I am trying to read data from RDS Postgres via PySpark 3.3 and AWS Glue 5.0 using the command below.
df = (
    self.config.spark_details.spark.read.format("jdbc")
    .option(
        "url",
        f"jdbc:postgresql://{self.postgres_host}:{self.postgres_port}/{self.postgres_database}",
    )
    .option("driver", "org.postgresql.Driver")
    .option("user", self.postgres_username)
    .option("password", self.postgres_password)
    .option("query", query)
    .load()
)
Now, I want to write this data to S3. For that, I tried the snippet below:
final_df.write.partitionBy("year","month","day").mode("append").parquet(s3_path)
But this gave me an executor heartbeat error. Later, after debugging, I learned that this could be because the data was not repartitioned, so I added repartition(10000) while writing the data. This seemed to work, but the job didn't finish even after 5 hours; there was no error, so I had to stop the job.
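In other words, the write at that point looked roughly like this (the exact call may have differed slightly):

final_df.repartition(10000).write.partitionBy("year", "month", "day").mode("append").parquet(s3_path)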
After debugging further, I discovered that when data is loaded from the DB, it is loaded in a single partition. So, no matter how many executors I add, they will be of no use.
There are no transformations; I am just supposed to read and write the data in a partitioned way. The data size would be less than 100 GB. I am using Glue 5.0 with 3 workers (12 DPUs) of the G.4X worker type.
The same code worked for 38M records but caused problems for 59M records. I also went through the driver and executor logs, and I see something like below:
I am failing to understand: since there is just one partition in which the data is loaded, how come I can see multiple partitions being processed in the logs? What am I missing here? Any hints would be appreciated. Even now, the job has been running for 2 hours but hasn't finished yet.
TIA
2 Answers
The JDBC reader makes Spark read everything in a single query and put it into a single partition, because Spark can't split the query across executors: it doesn't know how to distribute the data returned by your query.
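You can verify this on the DataFrame from the question by checking the partition count right after the read (a quick check, not shown in the original post):

# With a plain "query"-based JDBC read, this typically prints 1.
print(df.rdd.getNumPartitions())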
You need to set the partitioning options in the JDBC reader:
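A minimal sketch of Spark's standard JDBC partitioning options, assuming a table read and an integer id column (the connection variables, table name, column, bounds, and partition count below are placeholders):

df = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("driver", "org.postgresql.Driver")
    .option("user", postgres_username)
    .option("password", postgres_password)
    .option("dbtable", "my_schema.my_table")  # table (or aliased subquery) to read
    .option("partitionColumn", "id")          # numeric, date, or timestamp column to split on
    .option("lowerBound", "1")                # bounds define the partition stride,
    .option("upperBound", "59000000")         # they do not filter rows
    .option("numPartitions", "16")            # number of parallel JDBC reads
    .load()
)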
Now Spark knows how to split the read across the executors. If you don't have a column to partition on, you can create one by using ROW_NUMBER() over the table, for example.

Edit:
Since you are passing a query parameter to the DataFrameReader API rather than dbtable, the config above will not work, because Spark will treat your query as a standalone query. There are 2 solutions if you want to maintain a custom query: repartition the DataFrame after reading it with df.repartition (a sketch follows), or move the query into the dbtable option, as described below.
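A minimal sketch of the first option, keeping the query-based reader from the question (the partition count is a placeholder; note that the JDBC read itself still runs through a single connection, only the write is spread across executors):

# Option 1 (sketch): keep the "query" option and repartition before writing.
df = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("driver", "org.postgresql.Driver")
    .option("user", postgres_username)
    .option("password", postgres_password)
    .option("query", query)
    .load()
)

(
    df.repartition(16)  # placeholder partition count; tune it to the data size
    .write.partitionBy("year", "month", "day")
    .mode("append")
    .parquet(s3_path)
)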
When we attempt to configure both the query and partitionColumn options simultaneously, Spark throws the following error:

Options query and partitionColumn cannot be specified together. Please define the query using the dbtable option instead and make sure to qualify the partition columns using the supplied subquery alias to resolve any ambiguity. Example: ...

This error gives us a crucial hint: the dbtable option can be used to define a query that generates the column required for data partitioning. By embedding the query we want to use directly into the dbtable option, Spark can process the data in parallel across multiple partitions.

Another example:
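A minimal sketch of such a configuration, assuming the custom query is wrapped in an aliased subquery and partitioned on a ROW_NUMBER() column (all connection variables, names, and bounds are placeholders):

# Sketch: embed the custom query into "dbtable" as an aliased subquery and
# partition on a column generated inside it. Names and bounds are placeholders.
df = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("driver", "org.postgresql.Driver")
    .option("user", postgres_username)
    .option("password", postgres_password)
    .option(
        "dbtable",
        "(SELECT t.*, ROW_NUMBER() OVER (ORDER BY t.id) AS row_num "
        "FROM my_schema.my_table AS t) AS subq",
    )
    .option("partitionColumn", "row_num")  # column created inside the subquery
    .option("lowerBound", "1")
    .option("upperBound", "59000000")
    .option("numPartitions", "16")         # 16 concurrent JDBC reads
    .load()
)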
This configuration works perfectly fine, enabling Spark to read the data concurrently using 16 partitions. Then, you can write while partitioning by whichever criteria you require.