Using TensorFlow 2.9, Python 3.7.
My goal is to feed a tf.data.Dataset into a TensorFlow model while keeping memory consumption low.
I have Parquet files in S3 that I want to read and turn into a tf.data.Dataset for training a deep learning model in Keras.
First, I create a generator that iterates through the Parquet files:
import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

def gen(parquet_paths, batch_size, x_features, label):
    for pq_path in parquet_paths:
        parquet_file = pq.ParquetDataset(pq_path, filesystem=fs)
        df = parquet_file.read().to_pandas()
        df = df[x_features + [label]].astype('float32')
        len_df = len(df)
        print(len_df)
        curr_row = 0  # reset for each file
        while curr_row < len_df:
            batch_df = df.iloc[curr_row:curr_row + batch_size, :]
            X = batch_df[x_features].values
            y = batch_df[label].values
            curr_row += batch_size
            yield (X, y)
Then the generator is wrapped with tf.data.Dataset.from_generator:
def wrapper(parquet_paths, batch_size, x_features, label):
    return tf.data.Dataset.from_generator(
        lambda: gen(parquet_paths, batch_size, x_features, label),
        (tf.float32, tf.float32))

train_ds = wrapper(train_pq_path, 512, x_features, label)
valid_ds = wrapper(val_pq_path, 99999, x_features, label)

model.fit(train_ds, validation_data=valid_ds, epochs=10)
I want to optimize my data pipeline and add shuffling. What are some improvements I could make from here?
Try # 1:
- Downloading the Parquet files to the local machine and then iterating through them improves speed; however, I am also constrained by disk space, so this is not a scalable approach.
Try # 2:
- The TensorFlow documentation says that creating a tf.data.Dataset with .from_generator() has limited portability and scalability, since the generator must run in the same Python process that created it and is subject to the Python GIL. I have tried to get rid of it, but given the memory constraints I cannot find another way to create a tf.data.Dataset.
Try # 3:
- Leveraging tf.data.Dataset.interleave() to parallelize data extraction, since I assume network I/O is also one of the bottlenecks.
Here is the changed code:
def read_pq_file(parquet_path):
    pq_file = pq.ParquetDataset(parquet_path)
    df = pq_file.read().to_pandas()
    X = df[x_features].astype('float32').values
    y = df[label].astype('float32').values
    yield (X, y)

def read_and_preprocess(parquet_path):
    return tf.data.Dataset.from_generator(
        lambda: read_pq_file(parquet_path), (tf.float32, tf.float32))

# train_pq_path = [.... list of parquet file paths ..]
train_dataset_paths = tf.data.Dataset.from_tensor_slices(train_pq_path)
train_dataset = train_dataset_paths.interleave(
    read_and_preprocess, num_parallel_calls=2, cycle_length=2,
    deterministic=False).batch(batch_size)

model.fit(train_dataset)
However, this produces the following error: InvalidArgumentError: TypeError: not a path-like object
2 Answers
It seems the error occurs because of a type mismatch inside interleave: the mapping function receives each file path as a tf.Tensor rather than a plain Python string, so pyarrow cannot treat it as a path.
To resolve this, you could use the tf.data.Dataset.list_files function to create a dataset of file paths, and then use the interleave function to load and preprocess the data from those paths.
Here’s a possible solution that includes shuffling:
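A rough sketch of what that could look like is below. The load_data name matches the description that follows; the S3 glob pattern, buffer sizes, and parallelism values are placeholders, and list_files on an s3:// pattern assumes TensorFlow can enumerate S3 objects (e.g. via tensorflow-io). Otherwise, from_tensor_slices over your existing list of paths works the same way.

import tensorflow as tf
import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

def read_one_file(path):
    # `path` arrives as bytes when routed through from_generator(..., args=...)
    if isinstance(path, bytes):
        path = path.decode('utf-8')
    df = pq.ParquetDataset(path, filesystem=fs).read().to_pandas()
    X = df[x_features].astype('float32').values
    y = df[label].astype('float32').values
    yield X, y

def load_data(path):
    # Wrap a single file in its own small dataset so interleave can mix files
    return tf.data.Dataset.from_generator(
        read_one_file,
        args=(path,),
        output_signature=(
            tf.TensorSpec(shape=(None, len(x_features)), dtype=tf.float32),
            tf.TensorSpec(shape=(None,), dtype=tf.float32)))

file_paths = tf.data.Dataset.list_files('s3://my-bucket/train/*.parquet', shuffle=True)
train_ds = (file_paths
            .interleave(lambda x: load_data(x),
                        cycle_length=2, num_parallel_calls=2, deterministic=False)
            .unbatch()                     # per-file arrays -> individual rows
            .shuffle(buffer_size=10_000)   # tune to available memory
            .batch(512)
            .prefetch(tf.data.AUTOTUNE))

model.fit(train_ds, epochs=10)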
This code will:
Create a tf.data.Dataset of file paths.
Use interleave to load and preprocess the data from those paths. The lambda x: load_data(x) function will be applied to each element of the dataset (i.e., each file path).
Shuffle the data to add randomness, which is generally good for training deep learning models.
Batch the data into the specified batch size.
Remember to tune the shuffle buffer size and interleave parameters (cycle_length, num_parallel_calls) based on your system’s capabilities and the specifics of your data to optimize performance.
Also, the output_signature argument in the from_generator function specifies the structure, dtypes, and shapes of the dataset elements, which is necessary if the shapes of the elements are not fully defined. It’s a good practice to specify this to avoid potential errors or inefficiencies.
Make sure to replace len(x_features) with the actual number of features in your data.
The error message you're seeing is most likely because TensorFlow's tf.data.Dataset.from_tensor_slices and interleave cannot handle S3 paths directly. When reading Parquet files from S3, you need to use s3fs, as you did in your original generator.
You also tried to read the entire parquet file at once in your read_pq_file function. Instead, you should read in chunks to avoid running out of memory.
Here’s how you can modify your code:
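A minimal sketch of that approach, assuming pyarrow's ParquetFile.iter_batches is available (pyarrow 3.0 or newer); the gen_chunks/make_dataset names, chunk size, and shuffle buffer below are placeholders to tune:

import tensorflow as tf
import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

def gen_chunks(parquet_paths, chunk_size, x_features, label):
    # Stream each file in row chunks instead of loading it whole,
    # so peak memory stays around one chunk at a time.
    for path in parquet_paths:
        with fs.open(path, 'rb') as f:
            pq_file = pq.ParquetFile(f)
            for record_batch in pq_file.iter_batches(batch_size=chunk_size,
                                                     columns=x_features + [label]):
                chunk_df = record_batch.to_pandas().astype('float32')
                yield chunk_df[x_features].values, chunk_df[label].values

def make_dataset(parquet_paths, chunk_size, batch_size, x_features, label):
    ds = tf.data.Dataset.from_generator(
        lambda: gen_chunks(parquet_paths, chunk_size, x_features, label),
        output_signature=(
            tf.TensorSpec(shape=(None, len(x_features)), dtype=tf.float32),
            tf.TensorSpec(shape=(None,), dtype=tf.float32)))
    return (ds.unbatch()
              .shuffle(buffer_size=20_000)   # tune to available memory
              .batch(batch_size)
              .prefetch(tf.data.AUTOTUNE))

train_ds = make_dataset(train_pq_path, chunk_size=10_000, batch_size=512,
                        x_features=x_features, label=label)
model.fit(train_ds, epochs=10)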
In the code above, output_signature is used to specify the output shapes and types of the from_generator method. Also, shuffle is added to randomly shuffle the elements of the dataset for better training performance, and prefetch is used to overlap the preprocessing and model execution of a training step, which can reduce training time.
Please replace x_features and label with your actual feature columns and label columns.
Remember to adjust the batch_size and shuffle buffer size according to your available memory and dataset size. For large datasets, it’s not necessary to have a shuffle buffer size equal to the full dataset size; a size in the tens of thousands can be sufficient.