
I have a parquet datalake on S3 that I wish to query from.

To optimize query performance, I aim to compact my files periodically using the script below:

import boto3
import datetime
import math
import time
from awsglue.utils import getResolvedOptions
import sys

args = getResolvedOptions(
    sys.argv, ["bucket_output", "bucket_queries", "db_name", "table_prefix", "compacted_file_size_mb"]
)

bucket_output = args["bucket_output"]
bucket_queries = args["bucket_queries"]
db_name = args["db_name"]
table_prefix = args["table_prefix"]
compacted_file_size_mb = int(args["compacted_file_size_mb"])

table_suffix_temp = "_temporarytable"


# general function for running athena queries
def run_athena_query(query, database):
    client = boto3.client("athena")
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={
            "OutputLocation": f"s3://{bucket_queries}/"  # Replace with your Athena query results location
        },
    )
    return response["QueryExecutionId"]


# general function for awaiting result of athena query
def wait_for_query_completion(query_execution_id):
    client = boto3.client("athena")
    while True:
        response = client.get_query_execution(QueryExecutionId=query_execution_id)
        status = response["QueryExecution"]["Status"]["State"]
        if status in ["SUCCEEDED", "FAILED", "CANCELLED"]:
            break
        time.sleep(1)  # pause between polls to avoid hammering the Athena API
    return status


# function for running athena CTAS query to compact parquet tables
# The CTAS creates a temporary compacted table in a new s3 location with suffix table_suffix_temp
def compact_table(device, table_excl_prefix, database, bucketed_by_column, bucket_count):
    query = f"""
    CREATE TABLE {database}.{table_prefix}{table_excl_prefix}{table_suffix_temp}
    WITH (
        format = 'PARQUET',
        parquet_compression = 'SNAPPY',
        bucketed_by = ARRAY['{bucketed_by_column}'], 
        bucket_count = {bucket_count},
        external_location = 's3://{bucket_output}/{device}/{table_excl_prefix}{table_suffix_temp}/'
    ) AS
    SELECT * FROM {database}.{table_prefix}{table_excl_prefix};
    """

    query_execution_id = run_athena_query(query, database)
    status = wait_for_query_completion(query_execution_id)
    if status != "SUCCEEDED":
        return status

    return "SUCCEEDED"


# move the CTAS compacted S3 objects into the original S3 table path
def move_s3_temp_table(bucket, device, table_excl_prefix):
    for obj in bucket.objects.filter(Prefix=f"{device}/{table_excl_prefix}{table_suffix_temp}/"):
        new_key = obj.key.replace(table_suffix_temp, "") + ".parquet"
        copy_source = {"Bucket": bucket_output, "Key": obj.key}
        bucket.copy(copy_source, new_key)
        obj.delete()

    print(
        f"Success: Moved all s3 objects from {device}/{table_excl_prefix}{table_suffix_temp} to {device}/{table_excl_prefix}"
    )


# delete s3 objects in the original table folder once CTAS query is done (if object is part of CTAS)
def delete_s3_table(bucket, device, table_excl_prefix, ctas_start_time):
    table_path = bucket.objects.filter(Prefix=f"{device}/{table_excl_prefix}/")
    for obj in table_path:
        if obj.last_modified < ctas_start_time:
            obj.delete()
    print(f"Success: Deleted all s3 objects created before {ctas_start_time} in {device}/{table_excl_prefix}")


# get glue metadata for use in CTAS query
def get_table_meta(glue, database, device, table_excl_prefix):
    response = glue.get_table(DatabaseName=database, Name=f"{table_prefix}{table_excl_prefix}")

    # Extract the name of the second column
    second_column = response["Table"]["StorageDescriptor"]["Columns"][1]["Name"]
    size_in_bytes = int(response["Table"]["Parameters"]["sizeKey"])
    size_in_mb = size_in_bytes / 1000000

    return second_column, size_in_mb


def table_exists_in_glue(glue, database, table_name):
    try:
        glue.get_table(DatabaseName=database, Name=table_name)
        return True
    except glue.exceptions.EntityNotFoundException:
        return False


def main():
    # initialize s3 (boto3) and glue
    s3_client = boto3.client("s3")
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(bucket_output)
    glue = boto3.client("glue")

    # list device folders
    response = s3_client.list_objects_v2(Bucket=bucket_output, Delimiter="/")
    devices = [cp["Prefix"].rstrip("/") for cp in response["CommonPrefixes"]]
    print(f"nThe s3 bucket {bucket_output} contains the following devices: ", devices)

    for device in devices:
        # list all tables in device folder
        table_response = s3_client.list_objects_v2(Bucket=bucket_output, Prefix=device + "/", Delimiter="/")
        tables_excl_prefix = [
            cp["Prefix"].split(device + "/")[-1].rstrip("/") for cp in table_response["CommonPrefixes"]
        ]

        for table_excl_prefix in tables_excl_prefix:
            print(f"nNow processing table {table_prefix}{table_excl_prefix}")

            # skip tables that are not mapped by glue crawler
            if not table_exists_in_glue(glue, db_name, f"{table_prefix}{table_excl_prefix}"):
                print(
                    f"Warning: Table {table_prefix}{table_excl_prefix} does not exist in Glue Data Catalog. Skipping"
                )
                continue

            # extract meta data from the table for use in CTAS query
            second_column, size_in_mb = get_table_meta(glue, db_name, device, table_excl_prefix)
            bucketed_by_column = second_column
            
            bucket_count = math.ceil(size_in_mb / compacted_file_size_mb)
            print(f"Calculating bucket_count as math.ceil({size_in_mb} / {compacted_file_size_mb}) = {bucket_count}")

            # get current time, then run CTAS compaction query
            ctas_start_time = datetime.datetime.now(datetime.timezone.utc)
            status = compact_table(device, table_excl_prefix, db_name, bucketed_by_column, bucket_count)
            print("CTAS query status: ", status)

            if status == "SUCCEEDED":
                # copy CTAS output from temp S3 path to original table path, delete all pre query S3 objects and delete the temp CTAS table
                move_s3_temp_table(bucket, device, table_excl_prefix)
                delete_s3_table(bucket, device, table_excl_prefix, ctas_start_time)
                glue.delete_table(DatabaseName=db_name, Name=f"{table_prefix}{table_excl_prefix}{table_suffix_temp}")
            else:
                # if CTAS query fails, no further action is taken for this table
                print(f"Error compacting {device}/{table_excl_prefix}")


main()

This works as intended and compacts the parquet files, though it also changes the format and some other properties compared to my original parquet files.

However, I am observing an unexpected behavior.

When I run the query below on the original data, only 12.9 MB of the total ~250 MB are scanned. This is because I am only extracting about 5 minutes of data from one column.

However, if I run the same query after compacting the data using the script above, 162.8 MB are scanned, as if all the data is being read to produce the same result. Why is this?

SELECT
    AVG(a1) AS AVG_a1
FROM
    tbl_11111111_a_allfiles
WHERE
    t BETWEEN TIMESTAMP '2000-01-09 17:58:17' AND TIMESTAMP '2000-01-09 18:02:24'
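
For reference, the scanned data volume can also be read programmatically from the query statistics. A minimal sketch with boto3, assuming query_execution_id holds the ID returned by start_query_execution (as in the script above):

import boto3

athena = boto3.client("athena")

# query_execution_id is assumed to come from a previous start_query_execution call
response = athena.get_query_execution(QueryExecutionId=query_execution_id)
data_scanned_bytes = response["QueryExecution"]["Statistics"]["DataScannedInBytes"]
print(f"Data scanned: {data_scanned_bytes / 1_000_000:.1f} MB")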

Note that my data is non-partitioned in both cases (before/after compaction). Note also that before compaction I had 5 x 54.3 MB files of my ‘original’ parquet structure, and after I have 14 x 15.8 MB files. The actual query results have not changed between the two query runs. I have also verified that the behavior persists after re-running my AWS Glue crawler.

Note also that I am bucketing based on one of my signal columns (could e.g. be vehiclespeed) in the CTAS query, whereas my filtering uses timestamps. I guess the issue here is that I am "losing" the ability to prune row groups because my compacted data is no longer optimally grouped/sorted by the timestamp column. But is there a way to chunk the data into compacted files while maintaining the timestamp column as my primary filter?
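
One way to check this hypothesis is to inspect the per-row-group statistics in the Parquet footers of the compacted files. A minimal sketch using pyarrow, assuming a compacted file has been downloaded locally as compacted.parquet, the schema is flat, and the timestamp column is named t; if every row group's t range spans most of the time window, Athena cannot prune row groups for a narrow timestamp filter:

import pyarrow.parquet as pq

# Minimal sketch (assumptions): a compacted output file downloaded locally as
# "compacted.parquet", a flat schema, and a timestamp column named "t".
pf = pq.ParquetFile("compacted.parquet")
t_index = pf.schema_arrow.get_field_index("t")

for i in range(pf.metadata.num_row_groups):
    stats = pf.metadata.row_group(i).column(t_index).statistics
    if stats is not None:
        print(f"row group {i}: t min={stats.min}, max={stats.max}")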

2 Answers


  1. Chosen as BEST ANSWER

    We ended up not using AWS Athena CTAS compaction, as it does not seem to be a good fit for our use case (time series data), where the ideal solution would instead be to create row groups of data sorted by time. We instead optimized our structure via date-style partition projection, as described here.
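
    For illustration, a hedged sketch of what registering such a table can look like, reusing the run_athena_query and wait_for_query_completion helpers from the question's script. The database, table, bucket, column list, date range and S3 layout below are placeholders and have to match how the Parquet objects are actually laid out:

# Hedged sketch: an external table using Athena partition projection on a date column.
# All names and ranges are placeholders; the objects must live under dt=YYYY-MM-DD/ prefixes.
ddl = """
CREATE EXTERNAL TABLE my_db.tbl_11111111_a_allfiles_projected (
    t  timestamp,
    a1 double
)
PARTITIONED BY (dt string)
STORED AS PARQUET
LOCATION 's3://my-bucket/11111111/a_allfiles/'
TBLPROPERTIES (
    'projection.enabled'        = 'true',
    'projection.dt.type'        = 'date',
    'projection.dt.range'       = '2000-01-01,NOW',
    'projection.dt.format'      = 'yyyy-MM-dd',
    'storage.location.template' = 's3://my-bucket/11111111/a_allfiles/dt=${dt}/'
)
"""

query_execution_id = run_athena_query(ddl, "my_db")
print(wait_for_query_completion(query_execution_id))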


  2. Bucketing, as you mentioned, will organize the files based on your bucketed-by column. If this is something like vehicle speed, then potentially many of your resulting files satisfy your timestamp filter of BETWEEN TIMESTAMP '2000-01-09 17:58:17' AND TIMESTAMP '2000-01-09 18:02:24'. This causes more MB to be scanned.

    I assume you have a good reason for bucketing by vehicle speed and want to keep bucketing on this column. In that case, one option is to additionally partition the table based on dates, as sketched below.
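
    A hedged sketch of what a date-partitioned CTAS could look like, assuming the timestamp column is called t as in the question and that there is no existing dt column. The table, database and bucket names are placeholders, the derived partition column has to be the last column in the SELECT, and Athena documents a limit of 100 partitions per CTAS query:

# Hedged sketch: compact into date partitions instead of bucketing by a signal column.
# Placeholder names; the derived partition column dt must come last in the SELECT.
ctas = """
CREATE TABLE my_db.tbl_11111111_a_allfiles_compacted
WITH (
    format = 'PARQUET',
    parquet_compression = 'SNAPPY',
    partitioned_by = ARRAY['dt'],
    external_location = 's3://my-bucket/11111111/a_allfiles_compacted/'
) AS
SELECT *, date_format(t, '%Y-%m-%d') AS dt
FROM my_db.tbl_11111111_a_allfiles
"""

# The helpers from the question's script could be reused to run it, e.g.:
# query_execution_id = run_athena_query(ctas, "my_db")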

    Otherwise, not bucketing at all already improves the MB scanned, as you have observed. Because Parquet is column-oriented, it is optimized for column filters even if everything is grouped into one file.
