
I want to access Parquet files on an Azure data lake, and only retrieve some rows.

Here is a reproducible example, using a public dataset:

import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem

abfs_public = AzureBlobFileSystem(
    account_name="azureopendatastorage")

dataset_public = ds.dataset('az://nyctlc/yellow/puYear=2010/puMonth=1/part-00000-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426339-18.c000.snappy.parquet', filesystem=abfs_public)

The processing time is the same for collecting 5 rows compared to collecting the full dataset. Is there a way to achieve slice pushdown using Pyarrow?

Here are my tests:

dataset_public.to_table()
# 5min 30s

dataset_public.head(5)
# 5min 11s

dataset_public.scanner().head(5)
# 5min 43s

I’m not sure if there is a difference between .head() and .scanner().head()

2 Answers


  1. It took more than 5 minutes (6:57) for me to load the public dataset and slice it down to 5 rows; see below:

    import pyarrow.dataset as ds
    from adlfs import AzureBlobFileSystem

    abfs_public = AzureBlobFileSystem(
        account_name="azureopendatastorage")

    dataset_public = ds.dataset('az://nyctlc/yellow/puYear=2010/puMonth=1/part-00000-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426339-18.c000.snappy.parquet', filesystem=abfs_public)

    # Scan the whole file into memory, then slice off the first 5 rows
    scanner = dataset_public.scanner()
    table = scanner.to_table()
    subset_table = table.slice(0, 5)
    print(subset_table)

    Output:

    (screenshots of the notebook output and cell timing omitted)

    It appears that gathering 5 rows takes the same amount of time as gathering the entire dataset. Because the pyarrow.dataset module does not push the slice down to the reader here, the full file is loaded into memory before the rows are sliced off.

    As a workaround, you can use PySpark, which returned the result faster; see below:

    Code:

    # Azure storage access info
    blob_account_name = "azureopendatastorage"
    blob_container_name = "nyctlc"
    blob_relative_path = "yellow"
    blob_sas_token = "r"
    
    # Allow SPARK to read from Blob remotely
    wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
    spark.conf.set(
      'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
      blob_sas_token)
    print('Remote blob path: ' + wasbs_path)
    
    # SPARK read parquet, note that it won't load any data yet by now
    df = spark.read.parquet(wasbs_path)
    print('Register the DataFrame as a SQL temporary view: source')
    df.createOrReplaceTempView('source')
    
    # Display top 10 rows
    print('Displaying top 10 rows: ')
    display(spark.sql('SELECT * FROM source LIMIT 10'))
    

    Output:

    (screenshot of the top 10 rows omitted)

    Reference:

    azure-docs/dataset-taxi-yellow.md at main · MicrosoftDocs/azure-docs · GitHub

  2. With a few tweaks, I think I got what you were looking for. First, let’s look at the original code you posted:

    import pyarrow.dataset as ds
    from adlfs import AzureBlobFileSystem
    
    abfs_public = AzureBlobFileSystem(
        account_name="azureopendatastorage")
    
    dataset_public = ds.dataset('az://nyctlc/yellow/puYear=2010/puMonth=1/part-00000-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426339-18.c000.snappy.parquet', filesystem=abfs_public)
    

    Looking at the path you provided, you're pointing it at a single file instead of the whole dataset. Adding a couple of tweaks:

    import pyarrow.dataset as ds
    from adlfs import AzureBlobFileSystem
    
    abfs_public = AzureBlobFileSystem(account_name="azureopendatastorage")
    dataset_public = ds.dataset('nyctlc/yellow/', filesystem=abfs_public, partitioning='hive')
    

    Now, using dataset_public.head(5) I get:

    (screenshot of the head(5) output in VS Code Jupyter omitted)

    Since I didn’t give it a sort order, it just grabbed the first 5 rows it could get from whatever file happened to be the first fragment (most likely).

    In your original code example, the path you gave was using puYear=2010 and puMonth=1, so we can use those. Because we told it to use hive partitioning, we can confirm that it picked up that the dataset is partitioned on these values:

    print(dataset_public.partitioning.schema)
    # prints:
    # puYear: int32
    # puMonth: int32
    # -- schema metadata --
    # org.apache.spark.sql.parquet.row.metadata: '{"type":"struct","fields":[{"' + 1456
    

    And if we get the first 5 rows using those fields as a filter:

    (screenshot of the filtered head(5) call and its timing omitted)
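
    That call would look roughly like this (a sketch, not the exact code from the screenshot; it assumes the dataset_public object defined above):

    # Filter on the hive partition columns so only matching fragments are read,
    # then take the first 5 rows.
    filtered = dataset_public.head(
        5,
        filter=(ds.field('puYear') == 2010) & (ds.field('puMonth') == 1),
    )
    print(filtered)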

    So that took 1min and 31s. But we can do better than that!

    batch_size = 5

    W00t! 1.12s

    See, the default batch_size is pretty large (I forget what it is offhand), but if you only want to grab a small number of rows, you can adjust the batch_size, fragment readahead, etc. to better fit your use case.
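
    For instance, something along these lines (a sketch built on the filtered call above; the exact timings will differ):

    # Shrink batch_size and fragment readahead so head() can stop after
    # decoding only a tiny slice of the first matching fragment.
    small = dataset_public.head(
        5,
        filter=(ds.field('puYear') == 2010) & (ds.field('puMonth') == 1),
        batch_size=5,
        fragment_readahead=1,
    )
    print(small)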

    If you look at the base API documentation for the head() method, it has **kwargs that says "see scanner() method for full parameter description". And if you go to the scanner() method, it points you here: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_dataset where you can see all of the available parameters to the method. Like getting only a subset of columns (very efficient because Parquet):

    (screenshot of the column-projection call and its output omitted)
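
    A sketch of that kind of projection (column names are pulled from the dataset's own schema here rather than hard-coded, since the exact columns in the screenshot aren't reproduced):

    # Read only a few columns; Parquet is columnar, so unselected columns
    # are never fetched or decoded.
    cols = dataset_public.schema.names[:3]
    subset = dataset_public.head(
        5,
        columns=cols,
        filter=(ds.field('puYear') == 2010) & (ds.field('puMonth') == 1),
        batch_size=5,
    )
    print(subset)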

    I hope this helps you understand better how to leverage the dataset APIs and the rough edges / tricks to improving performance.
