I want to access Parquet files on an Azure data lake, and only retrieve some rows.
Here is a reproducible example, using a public dataset:
import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem
abfs_public = AzureBlobFileSystem(account_name="azureopendatastorage")
dataset_public = ds.dataset('az://nyctlc/yellow/puYear=2010/puMonth=1/part-00000-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426339-18.c000.snappy.parquet', filesystem=abfs_public)
The processing time is the same for collecting 5 rows as for collecting the full dataset. Is there a way to achieve slice pushdown using PyArrow?
Here are my tests:
dataset_public.to_table()
# 5min 30s
dataset_public.head(5)
# 5min 11s
dataset_public.scanner().head(5)
# 5min 43s
I’m not sure if there is a difference between .head() and .scanner().head().
Related pages:
- Apache Arrow website: https://arrow.apache.org/docs/python/parquet.html#reading-from-cloud-storage
- ADLFS Github page: https://github.com/fsspec/adlfs
2 Answers
It took more than 5 minutes (6:57) for me to load the public dataset as well when attempting slice pushdown; see the sketch below.
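For context, the measurement can be reproduced along these lines (a minimal sketch using the question's file path; the timing harness is mine, not the original output):

import time

import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem

abfs_public = AzureBlobFileSystem(account_name="azureopendatastorage")
dataset_public = ds.dataset(
    "az://nyctlc/yellow/puYear=2010/puMonth=1/part-00000-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426339-18.c000.snappy.parquet",
    filesystem=abfs_public,
)

start = time.perf_counter()
table = dataset_public.head(5)  # takes roughly as long as reading the whole file here
print(table.num_rows, "rows in", round(time.perf_counter() - start), "s")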
It appears that gathering 5 rows of data takes the same amount of time as gathering the entire dataset. Because the pyarrow.dataset module does not include a slice pushdown method, the full dataset is first loaded into memory before any rows are filtered.
As a workaround, you can make use of PySpark, which processed the result faster; refer to the sketch below.
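The original notebook code isn't reproduced above; a sketch along the lines of the Microsoft open-datasets documentation cited below (the wasbs path and SAS configuration follow that doc; the SparkSession setup and show(5) call are illustrative):

from pyspark.sql import SparkSession

# Assumes a Spark environment with the hadoop-azure connector available
# (e.g. Synapse or Databricks, where `spark` usually already exists).
spark = SparkSession.builder.getOrCreate()

blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"
blob_sas_token = r""  # the container is public, so no token is needed

wasbs_path = "wasbs://%s@%s.blob.core.windows.net/%s" % (
    blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set(
    "fs.azure.sas.%s.%s.blob.core.windows.net" % (blob_container_name, blob_account_name),
    blob_sas_token,
)

df = spark.read.parquet(wasbs_path)
df.show(5)  # Spark can stop after producing the first 5 rows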
Reference: azure-docs/dataset-taxi-yellow.md at main · MicrosoftDocs/azure-docs · GitHub
With a few tweaks, I think I got what you were looking for. Looking at the path in the original code you posted, you’re pointing it at a single file instead of the whole dataset. Adding a couple of tweaks:
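The tweaked snippet itself isn't shown above; based on that description, it presumably looked something like this (the dataset root path is inferred from the question's file path):

import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem

abfs_public = AzureBlobFileSystem(account_name="azureopendatastorage")

# Point at the dataset root rather than a single file, and tell PyArrow that
# the puYear=.../puMonth=... directory layout is Hive-style partitioning.
dataset_public = ds.dataset(
    "az://nyctlc/yellow/",
    filesystem=abfs_public,
    partitioning="hive",
)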
Now, using dataset_public.head(5), I get 5 rows back. Since I didn’t give it a sort order, it just grabbed the first 5 rows it could get from whatever file happened to be the first fragment (most likely).
In your original code example, the path you gave was using puYear=2010 and puMonth=1, so we can use those. Because we told it to use hive partitioning, we can confirm that it picked up that the dataset is partitioned on those values, and then get the first 5 rows using those fields as a filter (sketch below).
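The original snippets for those two steps aren't shown; here is a minimal sketch of them, assuming the dataset_public defined above (the schema check and the ds.field filter expression are standard dataset API, but this exact code is a reconstruction):

import pyarrow.dataset as ds

# With hive partitioning, the partition columns (puYear, puMonth) show up in
# the dataset schema alongside the columns stored in the Parquet files.
print(dataset_public.schema)

# Filter on the partition fields so that only the matching
# puYear=2010/puMonth=1 files are scanned at all.
table = dataset_public.head(
    5,
    filter=(ds.field("puYear") == 2010) & (ds.field("puMonth") == 1),
)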
So that took 1min and 31s. But we can do better than that!
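The exact tuned call isn't reproduced here; what follows is an illustrative sketch of the adjustment described in the next paragraph, not necessarily the parameters behind the 1.12 s figure (it assumes the imports and dataset_public from the snippets above):

# Shrink the batch size and readahead so head(5) doesn't fetch and decode far
# more data than the handful of rows we actually want (values are illustrative).
table = dataset_public.head(
    5,
    filter=(ds.field("puYear") == 2010) & (ds.field("puMonth") == 1),
    batch_size=64,          # the default is much larger (on the order of 100k rows)
    batch_readahead=1,      # don't queue up extra batches
    fragment_readahead=1,   # don't prefetch extra files
)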
W00t! 1.12s
See, the default batch_size is pretty large (I forget what it is offhand right now). But if you only want to grab a small number of rows, you can adjust the batch_size, fragment readahead size, etc., to better fit your use case.
If you look at the base API documentation for the head() method, it has **kwargs that says "see scanner() method for full parameter description". And if you go to the scanner() method, it points you here: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_dataset where you can see all of the available parameters to the method, like getting only a subset of columns (very efficient, because Parquet is a columnar format); there's a sketch of that at the end of this answer.
I hope this helps you understand better how to leverage the dataset APIs and the rough edges / tricks to improving performance.
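For example, a column-projection sketch, assuming the same dataset_public and filter as above (the column names come from the NYC yellow taxi schema and may need checking against dataset_public.schema):

# Only materialize a few columns; Parquet is columnar, so columns that aren't
# selected never have to be downloaded or decoded.
table = dataset_public.head(
    5,
    columns=["tpepPickupDateTime", "tripDistance", "totalAmount"],
    filter=(ds.field("puYear") == 2010) & (ds.field("puMonth") == 1),
)
print(table)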