
I am trying to read data from RDS Postgres with PySpark 3.3 on AWS Glue 5.0, using the command below.

df = (
    self.config.spark_details.spark.read.format("jdbc")
    .option(
        "url",
        f"jdbc:postgresql://{self.postgres_host}:{self.postgres_port}/{self.postgres_database}",
    )
    .option("driver", "org.postgresql.Driver")
    .option("user", self.postgres_username)
    .option("password", self.postgres_password)
    .option("query", query)
    .load()
)

Now, I want to write this data to S3. For that, I tried the snippet below:

final_df.write.partitionBy("year","month","day").mode("append").parquet(s3_path)

But this gave me an executor heartbeat error. After some debugging, I learned that this could be because the data was not repartitioned, so I added repartition(10000) when writing the data. That seemed to work, but the job didn't finish even after 5 hours. There was no error, so I had to stop the job.
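That is, roughly:

# repartition(10000) before the write, as described above
final_df.repartition(10000).write.partitionBy("year", "month", "day").mode("append").parquet(s3_path)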

After debugging further, I discovered that when the data is loaded from the DB, it lands in a single partition. So no matter how many executors I add, they are of no use.
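For reference, this is easy to confirm right after the read:

# prints 1 for a plain JDBC read that only uses the "query" option
print(df.rdd.getNumPartitions())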

There are no transformations; I just need to read the data and write it out in a partitioned way. The data size is less than 100 GB. I am using Glue 5.0 with 3 workers (12 DPUs) of the G.4X worker type.

The same code worked for 38M records but caused problems for 59M records. I also went through the driver and executor logs and saw entries like the ones below:

[screenshots of driver and executor log entries]

What I am failing to understand is this: since the data is loaded into just one partition, how can I see multiple partitions being processed in the logs?

What am I missing here? Any hints would be appreciated. Even now the job has been running for 2 hrs but hasn’t finished yet.
TIA

2 Answers


  1. The JDBC reader makes Spark read everything in a single query and put it into a single partition, because Spark cannot run the same query across all executors: it doesn't know how to distribute the data your query returns.

    You need to set the following JDBC options on the reader:

    .option("partitionColumn", "id")   # the column Spark will split the reads on
    .option("lowerBound", "1")         # the lowest value of that column
    .option("upperBound", "1000")      # the highest value of that column
    .option("numPartitions", "10")     # the number of partitions you want
    

    Now Spark knows how to split your query across the executors. If you don't have a column to partition on, you can create one, for example with ROW_NUMBER() over the table.
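    For example, a minimal sketch (table name, ordering column, bounds, and connection variables are placeholders; note it uses dbtable, since these options can't be combined with query, as the error in the other answer shows):

    df = (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("driver", "org.postgresql.Driver")
        .option("user", user)
        .option("password", password)
        # wrap the table/query so it exposes a numeric column to split on
        .option("dbtable", "(SELECT t.*, ROW_NUMBER() OVER (ORDER BY some_column) AS rn FROM my_table t) AS subq")
        .option("partitionColumn", "rn")
        .option("lowerBound", "1")
        .option("upperBound", "59000000")   # roughly the total row count
        .option("numPartitions", "32")
        .load()
    )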

    Edit:
    Since you are passing a query parameter to the DataFrameReader API, the config above will not work, because Spark will treat it as a standalone query.
    There are two workarounds if you want to keep a custom query parameter:

    1. Load your query as-is and repartition the data afterwards with df.repartition.
    2. Create a list of multiple queries, splitting the data yourself using your own criteria, then generate a list of DataFrames and union them all (see the sketch below).
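
    A minimal sketch of the second approach, assuming the query result exposes an integer id column to range over (chunk size, bounds, and connection variables are placeholders):

    from functools import reduce

    chunk = 10_000_000
    # in practice, derive the bounds from MIN(id)/MAX(id)
    queries = [
        f"SELECT * FROM ({query}) q WHERE q.id > {lo} AND q.id <= {lo + chunk}"
        for lo in range(0, 60_000_000, chunk)
    ]

    # each per-chunk read lands in its own partition, so the union has one partition per query
    dfs = [
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("driver", "org.postgresql.Driver")
        .option("user", user)
        .option("password", password)
        .option("query", q)
        .load()
        for q in queries
    ]
    final_df = reduce(lambda a, b: a.unionByName(b), dfs)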
  2. When we attempt to configure both the query and partitionColumn options simultaneously, Spark throws the following error:

    Options query and partitionColumn cannot be specified together. Please define the query using the dbtable option instead and make sure to qualify the partition columns using the supplied subquery alias to resolve any ambiguity. Example:

    spark.read.format("jdbc")
         .option("url", jdbcUrl)
         .option("dbtable", "(select c1, c2 from t1) as subq")
         .option("partitionColumn", "c1")
         .option("lowerBound", "1")
         .option("upperBound", "100")
         .option("numPartitions", "3")
         .load()
    

    This error gives us a crucial hint: the dbtable option can be used to wrap a query that generates the column required for partitioning. By embedding the query directly in the dbtable option as an aliased subquery, Spark can read the data in parallel across multiple partitions.
    Another example:

    spark.read.format("jdbc")
         .option("url", jdbcUrl)
         .option("dbtable", "(select date, cast(day_of_month as int) from dates) as subq")
         .option("partitionColumn", "day_of_month")
         .option("lowerBound", "1")
         .option("upperBound", "31")
         .option("numPartitions", "16")
         .load()
    

    This configuration works perfectly fine, enabling Spark to read the data concurrently using 16 partitions. Then, you can write while partitioning by whichever criteria you require.
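
    For the write step, the same pattern from the question applies, e.g.:

    df.write.partitionBy("year", "month", "day").mode("append").parquet(s3_path)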
