I am using Athena to query an S3 bucket, with Athena partition projection enabled.
Here is my bucket structure: bucket-name/node=[A-Z0-9]{4}/date={yyyy}-{MM}-{dd}/{uuid}.parquet
e.g.: bucket-name/node=NODE/date=2023-10-10/123.parquet
Glue table schema:
[
{
"Name": "data",
"Type": "string"
},
{
"Name": "effective_time_start",
"Type": "bigint"
},
{
"Name": "node",
"Type": "string",
"PartitionKey": "Partition (0)"
},
{
"Name": "date",
"Type": "date",
"PartitionKey": "Partition (1)"
}
]
Glue table properties:
has_encrypted_data: false
projection.date.type: date
projection.date.format: yyyy-MM-dd
projection.date.range: 2023-01-01,2024-12-31
projection.node.type: injected
classification: parquet
projection.enabled: true
Glue table details:
Input format: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
Output format: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Serialization lib: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
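For context, with this configuration Athena never calls GetPartitions on Glue; it enumerates the date partitions in memory from projection.date.range and takes node from the injected value in the WHERE clause. A minimal stdlib sketch of that enumeration (the function names are illustrative, not an AWS API):

```python
from datetime import datetime, timedelta

# Sketch: enumerate partition values the way projection.date.range
# (format yyyy-MM-dd) would, instead of listing partitions from Glue.
def projected_dates(lo: str, hi: str, fmt: str = "%Y-%m-%d"):
    d = datetime.strptime(lo, fmt).date()
    end = datetime.strptime(hi, fmt).date()
    while d <= end:
        yield d.strftime(fmt)
        d += timedelta(days=1)

# A date predicate then prunes this in-memory list, so only the
# matching prefixes are LISTed in S3:
queried = [d for d in projected_dates("2023-10-10", "2023-10-11")]
prefixes = [f"bucket-name/node=NOD1/date={d}/" for d in queried]
```

This is why only the prefixes inside the queried interval get LIST requests, which matches the S3 logs below.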
And I have the following files in S3:
bucket-name/node=NOD1/date=2023-10-10/FILE1.parquet
bucket-name/node=NOD1/date=2023-10-10/FILE2.parquet
bucket-name/node=NOD1/date=2023-10-11/FILE3.parquet
bucket-name/node=NOD1/date=2023-10-11/FILE4.parquet
bucket-name/node=NOD1/date=2023-10-12/FILE5.parquet
...
With this setup:
First, I query using node, date, and effective_time_start:
SELECT "$path" AS s3url
FROM "my_table"
WHERE node = 'NOD1'
AND date >= DATE('2023-10-10')
AND date <= DATE('2023-10-11')
AND effective_time_start >= 1696910400000
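As a side note, the effective_time_start literal is an epoch-millisecond timestamp stored in a regular (non-partition) bigint column, so this predicate cannot prune partitions; Athena has to open the files to evaluate it. A quick stdlib check of which UTC instant it denotes:

```python
from datetime import datetime, timezone

# effective_time_start is epoch milliseconds (bigint), so the predicate
# keeps rows at or after this UTC instant:
ts = datetime.fromtimestamp(1696910400000 / 1000, tz=timezone.utc)
print(ts.isoformat())  # 2023-10-10T04:00:00+00:00
```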
Explain Analyze result:
Query Plan
Queued: 871.91us, Analysis: 140.94ms, Planning: 97.68ms, Execution: 401.73ms
Fragment 1 [SOURCE]
CPU: 60.47ms, Scheduled: 849.85ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 18 rows (1.42kB), Data Scanned: 348B; per task: avg.: 18.00 std.dev.: 0.00, Output: 18 rows (1.90kB)
Output layout: [$path]
Output partitioning: SINGLE []
ScanFilterProject[table = awsdatacatalog:db:my_table, filterPredicate = ("effective_time_start" >= 1696910400000), projectLocality = LOCAL]
Layout: [$path:varchar]
CPU: 60.00ms (100.00%), Scheduled: 849.00ms (100.00%), Blocked: 0.00ns (?%), Output: 18 rows (1.90kB)
Input avg.: 3.00 rows, Input std.dev.: 0.00%
effective_time_start := effective_time_start:bigint:REGULAR
$path := $path:string:SYNTHESIZED
date:date:PARTITION_KEY
:: [[2023-10-10], [2023-10-11], [2023-10-12], [2023-10-13], [2023-10-14], [2023-10-15]]
node:string:PARTITION_KEY
:: [[NOD1]]
Input: 18 rows (1.42kB), Filtered: 0.00%, Physical input: 348B, Physical input time: 827990000.00ns
From S3 logs, I see that:
- 1 LIST request per date partition, so 2 requests in total — this is expected and normal
- 4 GET requests per file in the queried interval, so 16 requests in total — I don't understand why it requests the same file 4 times
Then I query with only node and date:
SELECT "$path" AS s3url
FROM "my_table"
WHERE node = 'NOD1'
AND date >= DATE('2023-10-10')
AND date <= DATE('2023-10-11')
Explain analyze result:
Query Plan
Queued: 516.34us, Analysis: 157.75ms, Planning: 87.13ms, Execution: 327.21ms
Fragment 1 [SOURCE]
CPU: 43.43ms, Scheduled: 589.93ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 18 rows (1.27kB), Data Scanned: 0B; per task: avg.: 18.00 std.dev.: 0.00, Output: 18 rows (1.27kB)
Output layout: [$path]
Output partitioning: SINGLE []
TableScan[table = awsdatacatalog:db:my_table]
Layout: [$path:varchar]
CPU: 43.00ms (100.00%), Scheduled: 589.00ms (100.00%), Blocked: 0.00ns (?%), Output: 18 rows (1.27kB)
Input avg.: 3.00 rows, Input std.dev.: 0.00%
$path := $path:string:SYNTHESIZED
node:string:PARTITION_KEY
:: [[NOD1]]
date:date:PARTITION_KEY
:: [[2023-10-10], [2023-10-11], [2023-10-12], [2023-10-13], [2023-10-14], [2023-10-15]]
Input: 18 rows (1.27kB), Physical input time: 571970000.00ns
From S3 logs, I see that:
- 1 LIST request per date partition, so 2 requests in total
- 1 GET request per file in the queried interval, so 4 requests in total
So, my question is: why does Athena make additional calls to the same file when I filter on a field that is not part of the partitioning?
2 Answers
Apparently, this is how Parquet works. Athena makes separate calls to read the footer and the column data, plus separate calls for each row group when the file is large (not a table partition, but a partition within the file itself, i.e. a row group).
I checked the S3 logs and saw that each GET request reads a different byte range.
I also set up the same table over CSV files, ran the same queries, and saw that Athena makes only one GET per file.
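Those varying byte ranges line up with the Parquet file layout: a reader first fetches the 8-byte tail (a 4-byte little-endian footer length followed by the PAR1 magic), then the footer metadata, then only the column chunks it needs, each as a separate ranged request. A minimal stdlib sketch of locating the footer, using a fabricated byte string rather than a real Parquet file:

```python
import struct

# Fabricated stand-in for a Parquet file layout:
#   [column-chunk data][footer][4-byte footer length][b"PAR1"]
data = b"\x00" * 100   # pretend column-chunk bytes
footer = b"\x01" * 20  # pretend Thrift-encoded footer metadata
blob = data + footer + struct.pack("<I", len(footer)) + b"PAR1"

# Ranged GET #1: the last 8 bytes of the file
tail = blob[-8:]
assert tail[4:] == b"PAR1"          # magic confirms a Parquet file
footer_len = struct.unpack("<I", tail[:4])[0]

# Ranged GET #2: the footer itself; further GETs would fetch the
# column chunks whose offsets the footer describes
footer_start = len(blob) - 8 - footer_len
fetched_footer = blob[footer_start:len(blob) - 8]
```

A CSV file has no such self-describing structure, so a reader can stream it with a single GET, which matches the CSV observation above.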
Without seeing the execution plans (which you can get by running EXPLAIN on the query), I can only give you an educated guess.
Athena (Trino under the hood) runs on a cluster and tries to distribute the work across it in order to give you a faster response. Depending on the size of the Parquet files, the query planner could have decided to assign the row groups of each file to different nodes, which then perform the filtering operation (effective_time_start >= 1696910400000) in parallel and aggregate the results at the end. This would result in the cluster nodes reading the same files multiple times.
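Under that guess, every worker assigned a piece of a file issues its own footer read plus its own column-chunk reads, so the per-file GET count grows with the number of workers touching the file. A toy model of that effect (the round-robin split is illustrative only, not Trino's actual scheduler):

```python
# Toy model: row groups of a file are round-robined across workers;
# each worker touching the file does its own footer GET, plus one
# ranged GET per row group it was assigned. Numbers are illustrative.
def estimated_gets(row_groups_per_file: int, workers: int) -> int:
    assignments = [rg % workers for rg in range(row_groups_per_file)]
    workers_touching_file = set(assignments)
    footer_gets = len(workers_touching_file)  # footer re-read per worker
    chunk_gets = row_groups_per_file          # one GET per row group here
    return footer_gets + chunk_gets

# One file with 2 row groups split across 2 workers -> 4 GETs,
# consistent with the 4 GETs per file seen in the S3 logs above.
print(estimated_gets(2, 2))  # 4
```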