I have a parquet data lake on S3 that I wish to query.
To optimize performance, I aim to compact my files periodically using the script below:
import boto3
import datetime
import math
import sys
import time  # used to pause between Athena status polls

from awsglue.utils import getResolvedOptions

args = getResolvedOptions(
    sys.argv, ["bucket_output", "bucket_queries", "db_name", "table_prefix", "compacted_file_size_mb"]
)
bucket_output = args["bucket_output"]
bucket_queries = args["bucket_queries"]
db_name = args["db_name"]
table_prefix = args["table_prefix"]
compacted_file_size_mb = int(args["compacted_file_size_mb"])
table_suffix_temp = "_temporarytable"
# general function for running athena queries
def run_athena_query(query, database):
    client = boto3.client("athena")
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={
            "OutputLocation": f"s3://{bucket_queries}/"  # Replace with your Athena query results location
        },
    )
    return response["QueryExecutionId"]
# general function for awaiting result of athena query
def wait_for_query_completion(query_execution_id):
    client = boto3.client("athena")
    while True:
        response = client.get_query_execution(QueryExecutionId=query_execution_id)
        status = response["QueryExecution"]["Status"]["State"]
        if status in ["SUCCEEDED", "FAILED", "CANCELLED"]:
            break
        time.sleep(2)  # pause between polls to avoid hammering the Athena API
    return status
# function for running athena CTAS query to compact parquet tables
# The CTAS creates a temporary compacted table in a new s3 location with suffix table_suffix_temp
def compact_table(device, table_excl_prefix, database, bucketed_by_column, bucket_count):
    query = f"""
    CREATE TABLE {database}.{table_prefix}{table_excl_prefix}{table_suffix_temp}
    WITH (
        format = 'PARQUET',
        parquet_compression = 'SNAPPY',
        bucketed_by = ARRAY['{bucketed_by_column}'],
        bucket_count = {bucket_count},
        external_location = 's3://{bucket_output}/{device}/{table_excl_prefix}{table_suffix_temp}/'
    ) AS
    SELECT * FROM {database}.{table_prefix}{table_excl_prefix};
    """
    query_execution_id = run_athena_query(query, database)
    status = wait_for_query_completion(query_execution_id)
    if status != "SUCCEEDED":
        return status
    return "SUCCEEDED"
# move the CTAS compacted S3 objects into the original S3 table path
def move_s3_temp_table(bucket, device, table_excl_prefix):
    for obj in bucket.objects.filter(Prefix=f"{device}/{table_excl_prefix}{table_suffix_temp}/"):
        new_key = obj.key.replace(table_suffix_temp, "") + ".parquet"
        copy_source = {"Bucket": bucket_output, "Key": obj.key}
        bucket.copy(copy_source, new_key)
        obj.delete()
    print(
        f"Success: Moved all s3 objects from {device}/{table_excl_prefix}{table_suffix_temp} to {device}/{table_excl_prefix}"
    )
# delete s3 objects in the original table folder once CTAS query is done (if object is part of CTAS)
def delete_s3_table(bucket, device, table_excl_prefix, ctas_start_time):
    table_path = bucket.objects.filter(Prefix=f"{device}/{table_excl_prefix}/")
    for obj in table_path:
        if obj.last_modified < ctas_start_time:
            obj.delete()
    print(f"Success: Deleted all s3 objects created before {ctas_start_time} in {device}/{table_excl_prefix}")
# get glue metadata for use in CTAS query
def get_table_meta(glue, database, device, table_excl_prefix):
    response = glue.get_table(DatabaseName=database, Name=f"{table_prefix}{table_excl_prefix}")
    # Extract the name of the second column
    second_column = response["Table"]["StorageDescriptor"]["Columns"][1]["Name"]
    size_in_bytes = int(response["Table"]["Parameters"]["sizeKey"])
    size_in_mb = size_in_bytes / 1000000
    return second_column, size_in_mb


def table_exists_in_glue(glue, database, table_excl_prefix):
    try:
        glue.get_table(DatabaseName=database, Name=table_excl_prefix)
        return True
    except glue.exceptions.EntityNotFoundException:
        return False
def main():
    # initialize s3 (boto3) and glue
    s3_client = boto3.client("s3")
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(bucket_output)
    glue = boto3.client("glue")

    # list device folders
    response = s3_client.list_objects_v2(Bucket=bucket_output, Delimiter="/")
    devices = [cp["Prefix"].rstrip("/") for cp in response["CommonPrefixes"]]
    print(f"\nThe s3 bucket {bucket_output} contains the following devices: ", devices)

    for device in devices:
        # list all tables in device folder
        table_response = s3_client.list_objects_v2(Bucket=bucket_output, Prefix=device + "/", Delimiter="/")
        tables_excl_prefix = [
            cp["Prefix"].split(device + "/")[-1].rstrip("/") for cp in table_response["CommonPrefixes"]
        ]
        for table_excl_prefix in tables_excl_prefix:
            print(f"\nNow processing table {table_prefix}{table_excl_prefix}")

            # skip tables that are not mapped by glue crawler
            if not table_exists_in_glue(glue, db_name, f"{table_prefix}{table_excl_prefix}"):
                print(
                    f"Warning: Table {table_prefix}{table_excl_prefix} does not exist in Glue Data Catalog. Skipping"
                )
                continue

            # extract meta data from the table for use in CTAS query
            second_column, size_in_mb = get_table_meta(glue, db_name, device, table_excl_prefix)
            bucketed_by_column = second_column
            bucket_count = math.ceil(size_in_mb / compacted_file_size_mb)
            print(f"Calculating bucket_count as math.ceil({size_in_mb} / {compacted_file_size_mb}) = {bucket_count}")

            # get current time, then run CTAS compaction query
            ctas_start_time = datetime.datetime.now(datetime.timezone.utc)
            status = compact_table(device, table_excl_prefix, db_name, bucketed_by_column, bucket_count)
            print("CTAS query status: ", status)

            if status == "SUCCEEDED":
                # copy CTAS output from temp S3 path to original table path, delete all pre query S3 objects and delete the temp CTAS table
                move_s3_temp_table(bucket, device, table_excl_prefix)
                delete_s3_table(bucket, device, table_excl_prefix, ctas_start_time)
                glue.delete_table(DatabaseName=db_name, Name=f"{table_prefix}{table_excl_prefix}{table_suffix_temp}")
            else:
                # if CTAS query fails, no further action is taken for this table
                print(f"Error compacting {device}/{table_excl_prefix}")


main()
This works as intended and compacts the parquet files, though the compacted files differ in format and some other properties compared to my original parquet files.
However, I am observing unexpected behavior.
When I run the query below on the original data, I only scan 12.9 MB of the total ~250 MB, because I am only extracting about 5 minutes of data from a single column.
However, if I run the same query after compacting the data with the script above, 162.8 MB is scanned, as if all the data is being read to produce the same result. Why is this?
SELECT
AVG(a1) as AVG_a1
FROM
tbl_11111111_a_allfiles
WHERE
t BETWEEN TIMESTAMP '2000-01-09 17:58:17' AND TIMESTAMP '2000-01-09 18:02:24'
Note that my data is non-partitioned in both cases (before/after compaction). Note also that before compaction I had 5 x 54.3 MB files of my ‘original’ parquet structure, and after I have 14 x 15.8 MB files. The actual query results have not changed between the two query runs. I have also verified that the behavior persists after re-running my AWS Glue crawler.
Note also that in the CTAS query I am bucketing on one of my signal columns (could e.g. be vehiclespeed), whereas my filtering uses timestamps. I guess the issue here is that I am "losing" the ability to prune row groups because my compacted data is no longer optimally grouped/sorted by the timestamp column. But is there a way to chunk the data into compacted files while maintaining the timestamp column as my primary filter?
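For what it's worth, one way to check this row-group hypothesis would be to dump the per-row-group min/max statistics of the timestamp column from one of the compacted files with pyarrow. This is only a rough local sketch: the file name is a placeholder for one of the compacted parquet files, and t is the timestamp column from the query above.

import pyarrow.parquet as pq

# Placeholder: one of the compacted parquet files, downloaded locally
pf = pq.ParquetFile("compacted_sample.parquet")
meta = pf.metadata
t_idx = pf.schema_arrow.get_field_index("t")  # index of the timestamp column

# If the min/max ranges of 't' overlap heavily across row groups (and files),
# the engine cannot prune them for a timestamp filter and ends up reading most of the data.
for rg in range(meta.num_row_groups):
    stats = meta.row_group(rg).column(t_idx).statistics
    if stats is not None and stats.has_min_max:
        print(f"row group {rg}: t in [{stats.min}, {stats.max}]")
    else:
        print(f"row group {rg}: no min/max statistics for 't'")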
2 Answers
We ended up not using AWS Athena CTAS compaction, as it does not seem to be a good fit for our type of use case (time series data), where the ideal solution would instead be to create row groups of data sorted by time. We instead optimized our structure via date-style partition projections, as described here; a rough sketch of such a setup is shown below.
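For illustration, a date-style partition projection table can look roughly like the following. This is only a sketch: the bucket, paths, column types and date range are placeholders rather than our actual values, and it assumes the S3 keys are laid out as year/month/day prefixes.

CREATE EXTERNAL TABLE tbl_11111111_a_allfiles (
    t timestamp,
    a1 double
)
PARTITIONED BY (dt string)
STORED AS PARQUET
LOCATION 's3://my-bucket/11111111/a_allfiles/'
TBLPROPERTIES (
    'projection.enabled' = 'true',
    'projection.dt.type' = 'date',
    'projection.dt.format' = 'yyyy/MM/dd',
    'projection.dt.range' = '2000/01/01,NOW',
    'storage.location.template' = 's3://my-bucket/11111111/a_allfiles/${dt}/'
)

Queries then add a dt predicate (e.g. dt = '2000/01/09') alongside the timestamp filter, so Athena only lists and reads the objects under the matching date prefixes.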
Bucketing, as you mentioned, will organize the files based on your bucketed_by column. If this is something like vehicle speed, then potentially many of your resulting files satisfy your timestamp filter of BETWEEN TIMESTAMP '2000-01-09 17:58:17' AND TIMESTAMP '2000-01-09 18:02:24'. This causes more MB to be scanned.

I assume you have a good reason for bucketing by vehicle speed and want to keep bucketing on this column. In that case, one option is to additionally partition based on dates, as sketched below. Otherwise, not bucketing at all, as you have already observed, improves the MB scanned. Because Parquet is column-oriented, it is optimized for column filters, even if everything is grouped in one file.
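As a sketch of that suggestion, the CTAS from the question could partition by a date column derived from the timestamp, instead of (or in addition to) bucketing. Table and bucket names below are placeholders; date_format and partitioned_by are standard Athena features, and the partition column has to come last in the SELECT list.

CREATE TABLE my_db.tbl_11111111_a_allfiles_compacted
WITH (
    format = 'PARQUET',
    parquet_compression = 'SNAPPY',
    external_location = 's3://my-bucket/11111111/a_allfiles_compacted/',
    partitioned_by = ARRAY['dt']
) AS
SELECT
    *,
    date_format(t, '%Y-%m-%d') AS dt
FROM my_db.tbl_11111111_a_allfiles

A query that filters on t can then also filter on dt (e.g. dt = '2000-01-09'), so only the files under the matching partition are scanned; bucketed_by and bucket_count can still be added on top if the vehicle speed bucketing is worth keeping.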