
I’m working with a Delta table in Databricks that is partitioned by a Year column, and I have a timestamp column in the table. I want to optimize my queries to fetch data within a specific timestamp range (e.g., between 21-01-2019 and 04-12-2019).

df_filtered = df.filter((df.timestamp >= "2019-01-21") & (df.timestamp <= "2019-12-04"))

I understand that partitioning can help with performance, but I’m not sure how to leverage it effectively when querying based on a timestamp range. Even though I’m not directly querying the Year partition column, I’d like to take advantage of partition pruning to only read the relevant partitions.

How can I ensure that the Year partition column is correlated with the timestamp data effectively so that partition pruning works optimally for timestamp range queries?

2 Answers


  1. The documentation for Delta table batch reads and writes mentions:

    Delta Lake may be able to generate partition filters for a query whenever a partition column is defined by one of the following expressions:

    • YEAR(col) and the type of col is TIMESTAMP.
    • (…)

    It means that if you have a table defined as:

    CREATE TABLE a_table (
        timestamp TIMESTAMP,
        year INT GENERATED ALWAYS AS (YEAR(timestamp)),
        [other columns]
    ) PARTITIONED BY (year);
    

    then Databricks will be able to analyze the filter conditions on the timestamp column and derive the proper partition filter for year. With your example:

    df = spark.table("a_table")
    df_filtered = df.filter((df.timestamp >= "2019-01-21") & (df.timestamp <= "2019-12-04"))
    

    It will add a partition filter year = 2019.
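
    If it helps, here is a minimal sketch of the same setup done through the Delta Lake Python API instead of SQL, plus a way to check that the derived partition filter actually appears in the query plan. This assumes the delta.tables builder API is available on the cluster; the explain() check is my addition, not part of the documentation quoted above.

    from delta.tables import DeltaTable
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Equivalent of the CREATE TABLE above: "year" is a generated column
    # computed from "timestamp", and the table is partitioned by it.
    (DeltaTable.createIfNotExists(spark)
        .tableName("a_table")
        .addColumn("timestamp", "TIMESTAMP")
        .addColumn("year", "INT", generatedAlwaysAs="YEAR(timestamp)")
        .partitionedBy("year")
        .execute())

    df = spark.table("a_table")
    df_filtered = df.filter((df.timestamp >= "2019-01-21") & (df.timestamp <= "2019-12-04"))

    # The scan node of the physical plan should list a PartitionFilters entry on
    # "year" derived from the timestamp predicates, confirming partition pruning.
    df_filtered.explain(True)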

  2. To optimize timestamp range queries in Databricks against a Delta table partitioned by a Year column, the Year partition column needs to stay correlated with the timestamp data so that partition pruning can take effect.

    Here is how you can do it with PySpark.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, to_date, year

    spark = SparkSession.builder.appName("DeltaTableExample").getOrCreate()

    data = [("2020-01-15 08:00:00", "Event 1"),
            ("2020-02-20 12:30:00", "Event 2"),
            ("2021-03-10 14:45:00", "Event 3"),
            ("2019-11-05 09:15:00", "Event 4")]
    columns = ["timestamp", "event_name"]
    df = spark.createDataFrame(data, columns)

    # Convert the timestamp strings to a date column and derive a year partitioning column
    df = df.withColumn("date", to_date(col("timestamp")))
    df = df.withColumn("year", year(col("date")))

    delta_path = "/FileStore/tables/dileep_delta_table"
    df.write.partitionBy("year").format("delta").mode("overwrite").save(delta_path)

    start_date = "2019-01-21"
    end_date = "2020-12-04"
    start_year = int(start_date.split("-")[0])
    end_year = int(end_date.split("-")[0])

    # Read the Delta table and filter the data based on the timestamp range and year
    delta_df = spark.read.format("delta").load(delta_path)
    df_filtered = delta_df.filter(
        (col("date") >= start_date) & (col("date") <= end_date) &
        (col("year") >= start_year) & (col("year") <= end_year)
    )
    df_filtered.show()
    


    In the above code, the timestamp column is converted to a date column and a year column is created for partitioning.

    The DataFrame is written as a Delta table partitioned by the "year" column.
    The Delta table is then read back and filtered to the specified timestamp range, with explicit year bounds included so that only the matching year partitions are scanned.
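
    As a quick sanity check, a short sketch continuing the snippet above (it reuses the delta_df and df_filtered variables defined there): because year here is a regular column rather than a generated one, pruning relies on the explicit year predicates, and the plan can be inspected to confirm they are applied as partition filters.

    # Inspect the physical plan: the Delta scan should show the year bounds under
    # PartitionFilters, while the date predicates remain ordinary data filters.
    df_filtered.explain(True)

    # Optionally list the partition values present in the table to see which
    # year partitions the range query would touch.
    delta_df.select("year").distinct().orderBy("year").show()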
