
I am using Athena to query an S3 bucket, with Athena partition projection enabled.

Here is my bucket structure: bucket-name/node=[A-Z0-9]{4}/date={yyyy}-{MM}-{dd}/{uuid}.parquet

e.g. bucket-name/node=NODE/date=2023-10-10/123.parquet

Glue table schema:

[
  {
    "Name": "data",
    "Type": "string"
  },
  {
    "Name": "effective_time_start",
    "Type": "bigint"
  },
  {
    "Name": "node",
    "Type": "string",
    "PartitionKey": "Partition (0)"
  },
  {
    "Name": "date",
    "Type": "date",
    "PartitionKey": "Partition (1)"
  }
]

Glue table properties:

has_encrypted_data: false
projection.date.type: date
projection.date.format: yyyy-MM-dd
projection.date.range: 2023-01-01,2024-12-31
projection.node.type: injected
classification: parquet
projection.enabled: true

Glue table details:

Input format: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
Output format: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Serialization lib: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
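
For reference, a table with this schema and these properties could be created with DDL along these lines (a sketch; the table and bucket names are the placeholders used in this post):

CREATE EXTERNAL TABLE my_table (
  data string,
  effective_time_start bigint
)
PARTITIONED BY (node string, `date` date)
STORED AS PARQUET
LOCATION 's3://bucket-name/'
TBLPROPERTIES (
  'projection.enabled' = 'true',
  'projection.node.type' = 'injected',
  'projection.date.type' = 'date',
  'projection.date.format' = 'yyyy-MM-dd',
  'projection.date.range' = '2023-01-01,2024-12-31'
);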

And I have the following files in S3:

bucket-name/node=NOD1/date=2023-10-10/FILE1.parquet
bucket-name/node=NOD1/date=2023-10-10/FILE2.parquet
bucket-name/node=NOD1/date=2023-10-11/FILE3.parquet
bucket-name/node=NOD1/date=2023-10-11/FILE4.parquet
bucket-name/node=NOD1/date=2023-10-12/FILE5.parquet
...

With this setup:
First, I use node, date, and effective_time_start in the query:

SELECT "$path" AS s3url
FROM "my_table"
WHERE node = 'NOD1'
    AND date >= DATE('2023-10-10')
    AND date <= DATE('2023-10-11')
    AND effective_time_start >= 1696910400000

Explain Analyze result:

Query Plan
Queued: 871.91us, Analysis: 140.94ms, Planning: 97.68ms, Execution: 401.73ms
Fragment 1 [SOURCE]
    CPU: 60.47ms, Scheduled: 849.85ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 18 rows (1.42kB), Data Scanned: 348B; per task: avg.: 18.00 std.dev.: 0.00, Output: 18 rows (1.90kB)
    Output layout: [$path]
    Output partitioning: SINGLE []
    ScanFilterProject[table = awsdatacatalog:db:my_table, filterPredicate = ("effective_time_start" >= 1696910400000), projectLocality = LOCAL]
        Layout: [$path:varchar]
        CPU: 60.00ms (100.00%), Scheduled: 849.00ms (100.00%), Blocked: 0.00ns (?%), Output: 18 rows (1.90kB)
        Input avg.: 3.00 rows, Input std.dev.: 0.00%
        effective_time_start := effective_time_start:bigint:REGULAR
        $path := $path:string:SYNTHESIZED
        date:date:PARTITION_KEY
            :: [[2023-10-10], [2023-10-11], [2023-10-12], [2023-10-13], [2023-10-14], [2023-10-15]]
        node:string:PARTITION_KEY
            :: [[NOD1]]
        Input: 18 rows (1.42kB), Filtered: 0.00%, Physical input: 348B, Physical input time: 827990000.00ns

From the S3 logs, I see that:

  • 1 LIST request per date partition, so 2 requests in total — this is expected and normal
  • 4 GET requests per file in the queried interval, so 16 requests in total — I don't understand why it requests the same file 4 times

Then, I try querying with only node and date:

SELECT "$path" AS s3url
FROM "my_table"
WHERE node = 'NOD1'
    AND date >= DATE('2023-10-10')
    AND date <= DATE('2023-10-11')

Explain Analyze result:

Query Plan
Queued: 516.34us, Analysis: 157.75ms, Planning: 87.13ms, Execution: 327.21ms
Fragment 1 [SOURCE]
    CPU: 43.43ms, Scheduled: 589.93ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 18 rows (1.27kB), Data Scanned: 0B; per task: avg.: 18.00 std.dev.: 0.00, Output: 18 rows (1.27kB)
    Output layout: [$path]
    Output partitioning: SINGLE []
    TableScan[table = awsdatacatalog:db:my_table]
        Layout: [$path:varchar]
        CPU: 43.00ms (100.00%), Scheduled: 589.00ms (100.00%), Blocked: 0.00ns (?%), Output: 18 rows (1.27kB)
        Input avg.: 3.00 rows, Input std.dev.: 0.00%
        $path := $path:string:SYNTHESIZED
        node:string:PARTITION_KEY
            :: [[NOD1]]
        date:date:PARTITION_KEY
            :: [[2023-10-10], [2023-10-11], [2023-10-12], [2023-10-13], [2023-10-14], [2023-10-15]]
        Input: 18 rows (1.27kB), Physical input time: 571970000.00ns

From the S3 logs, I see that:

  • 1 LIST request per date partition, so 2 requests in total
  • 1 GET request per file in the queried interval, so 4 requests in total

So, my question is: why does Athena make additional calls to the same file when I use a field that is not part of the partitioning?

2 Answers


  1. Chosen as BEST ANSWER

    Apparently, this is how Parquet works. Athena makes separate ranged calls to read the footer and the column data, plus separate calls for each chunk of a large file (not the table partitions; the file's own internal partitioning, i.e. its row groups). A sketch of this access pattern is below.

    I have checked the S3 logs and can see that each GET request reads a different number of bytes, which matches this ranged-read pattern.

    Also, I set up the same table for CSV files and ran the same queries, and saw that Athena does only one GET per file in that case.
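
    To make that concrete, here is a minimal sketch of the read pattern (using boto3; the bucket and key are hypothetical placeholders matching the layout above):

    import boto3

    # Hypothetical placeholders matching the layout in the question.
    BUCKET = "bucket-name"
    KEY = "node=NOD1/date=2023-10-10/FILE1.parquet"

    s3 = boto3.client("s3")

    def ranged_get(byte_range):
        # Each call with a Range header is logged as a separate S3 GET.
        resp = s3.get_object(Bucket=BUCKET, Key=KEY, Range=byte_range)
        return resp["Body"].read()

    # GET 1: the last 8 bytes of the file -- a 4-byte little-endian
    # footer length followed by the "PAR1" magic bytes.
    tail = ranged_get("bytes=-8")
    assert tail[4:] == b"PAR1", "not a Parquet file"
    footer_len = int.from_bytes(tail[:4], "little")

    # GET 2: the Thrift-encoded footer metadata that sits just before
    # those 8 bytes (column chunk and row group offsets live here).
    size = s3.head_object(Bucket=BUCKET, Key=KEY)["ContentLength"]
    footer = ranged_get(f"bytes={size - 8 - footer_len}-{size - 9}")

    # GETs 3..n: one ranged GET per column chunk / row group that the
    # query actually needs, at the offsets parsed from the footer.

    Each of those ranged reads shows up as its own GET in the S3 access logs, which is how one small file can account for 4 requests.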


  2. Without seeing the execution plans (which you can get by running EXPLAIN on the query), I can only give you an educated guess.

    Athena (or rather Trino, which runs underneath) runs on a cluster and tries to distribute the work across that cluster to give you a faster response. Depending on the size of the Parquet files, the query planner could have decided to assign the row groups of each file to different nodes, which then perform the filtering operation (effective_time_start >= 1696910400000) in parallel and aggregate the results at the end. This would result in the cluster nodes reading the same files multiple times. You can check the row-group layout of your files with the sketch below.
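
    If you want to check whether your files actually contain multiple row groups, here is a quick sketch using pyarrow (the file name is a placeholder for a local copy of one of the files above):

    import pyarrow.parquet as pq

    # Placeholder: a local copy of one of the Parquet files in question.
    pf = pq.ParquetFile("FILE1.parquet")

    meta = pf.metadata
    print(f"row groups: {meta.num_row_groups}")
    for i in range(meta.num_row_groups):
        rg = meta.row_group(i)
        # Each row group is independently readable, so the planner can
        # assign different row groups of the same file to different workers.
        print(f"  row group {i}: {rg.num_rows} rows, {rg.total_byte_size} bytes")

    If each file holds only a single small row group, the extra GETs are more likely the footer/column reads described in the accepted answer than parallel row-group splits.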
