
I am trying to run a query to read data from a Redshift table through an AWS Glue job. When it reads from the table, it creates a DataFrame with only 1 partition, and the read takes a long time (the query against the Redshift table is complex, with multiple joins).

        df = (spark.read
            .format("jdbc")
            .option("url", url)
            .option("query", query)
            .option("numPartitions", 10)
            .option("fetchsize", "10000")
            .option("batchsize", "10000")
            .load())

I set numPartitions to 10, but it still reads the whole dataset into one partition. I am not sure why this option has no effect on the number of partitions created during the read.

Can anyone help me understand why option("numPartitions", 10) is not effective, and how to implement parallelism when reading data from Redshift to improve JDBC read performance?

Also, is passing a complex query to spark.read.jdbc the best approach for reading data from Redshift?

Really appreciate any help on this.

================================================
I tried the suggestion given by Bogdan, but it still does not give any improvement.


It always runs with 2 executors (1 for the driver and 1 worker node). How do we actually improve read parallelism? I tried different options as follows and I don't see any improvement. I am looking to read with 10 to 20 executors, and any suggestions are greatly appreciated.

    DF_options = {
        "connectionName": connection,
        "url": url,
        "sampleQuery": query,
        "user": user,
        "password": pwd,
        "redshiftTmpDir": "s3://bucket/temp/",
        "hashfield": "location",
        "hashexpression": "id",
        "hashpartitions": "5"
    }

    DF = glueContext.create_dynamic_frame_from_options(
        connection_type="redshift",
        connection_options=DF_options,
        transformation_ctx="DF")
                                                              

Thanks,
Bab

2 Answers


  1. This worked very well for me on Redshift queries:

    print(f"Executing SQL query = {sql_query}")
    
    # Script generated for node Amazon Redshift
    AmazonRedshift_node1 = glueContext.create_dynamic_frame.from_options(
        connection_type="redshift",
        connection_options={
            "sampleQuery": sql_query ,
            "redshiftTmpDir": "s3://aws-glue-assets-***/temporary/",
            "useConnectionProperties": "true",
            "connectionName": "your-connection-name",
            "aws_iam_user": "arn:aws:iam::123456789:role/yourGlueRole"
        },
        transformation_ctx="AmazonRedshift_node1",)
    
    print("Size of DynamicFrame: {}".format(AmazonRedshift_node1.toDF().count()))
    
  2. To read a JDBC source in parallel you need to define partitionColumn, lowerBound, and upperBound (check the documentation). Spark uses this configuration to split the original query into numPartitions queries.

    The query will be split roughly as shown below, and Spark can send the separate queries in parallel if you have enough executors/cores/slots (see the sketch after the SQL example).

    -- Original query:
    SELECT *
    FROM my_table
    
    -- Queries with:
    --  partitionColumn = 'my_date'
    --  lowerBound = '2021-01-01'
    --  upperBound = '2021-01-02'
    --  numPartitions = 3
    
    SELECT *
    FROM my_table
    WHERE my_date < '2021-01-01'
    
    SELECT *
    FROM my_table
    WHERE my_date >= '2021-01-01'
      AND my_date < '2021-01-02'
    
    SELECT *
    FROM my_table
    WHERE my_date >= '2021-01-02'
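
    As a rough sketch of how those options fit together (the url, credentials, table name, partition column, and bounds below are placeholders, not values taken from the question), the same split can be requested from Spark like this:

    # Hypothetical example: ask Spark to issue 3 parallel queries split on my_date.
    # Note: the "query" option cannot be combined with partitionColumn, so a
    # complex query has to be wrapped as a subquery in "dbtable" instead.
    df = (spark.read
        .format("jdbc")
        .option("url", url)                       # e.g. jdbc:redshift://host:5439/db
        .option("dbtable", "(SELECT * FROM my_table) AS src")
        .option("user", user)
        .option("password", pwd)
        .option("partitionColumn", "my_date")     # numeric, date, or timestamp column
        .option("lowerBound", "2021-01-01")
        .option("upperBound", "2021-01-02")
        .option("numPartitions", 3)
        .option("fetchsize", "10000")
        .load())

    Rows outside the bounds are not filtered out; they simply land in the first and last partitions, so bounds that roughly cover the data range give the most even split.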
    