
Using TensorFlow 2.9, Python 3.7.

My goal is to feed a tf.data.Dataset into a TensorFlow model while keeping memory consumption low.

There are Parquet files in S3 that I want to use to create a tf.data.Dataset for training a deep learning model in Keras.
First I create a generator that iterates through the Parquet files:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

def gen(parquet_paths, batch_size, x_features, label):
    for pq_path in parquet_paths:
        parquet_file = pq.ParquetDataset(pq_path, filesystem=fs)
        df = parquet_file.read().to_pandas()
        df = df[x_features + [label]].astype('float32')
        len_df = len(df)

        print(len_df)

        # reset the row pointer for each new file
        curr_row = 0
        while curr_row < len_df:
            batch_df = df.iloc[curr_row:curr_row + batch_size, :]
            X = batch_df[x_features].values
            y = batch_df[label].values
            curr_row += batch_size
            yield (X, y)

Then the generator is wrapped with tf.data.Dataset.from_generator:

def wrapper(parquet_paths, batch_size, x_features, label):
    return tf.data.Dataset.from_generator(
        lambda: gen(parquet_paths, batch_size, x_features, label),
        (tf.float32, tf.float32))


train_ds = wrapper(train_pq_path, 512, x_features, label)
valid_ds = wrapper(val_pq_path, 99999, x_features, label)

model.fit(train_ds, validation_data=valid_ds, epochs=10)

I want to optimize the pipeline and add shuffle functionality.
What are some improvements I could make from here?

Try # 1:

  • Downloading the Parquet files to the local machine and then iterating through them to yield batches improves speed; however, I am also constrained by the amount of disk storage, so this method won't scale.

Try # 2:

  • The TensorFlow documentation says that creating a tf.data.Dataset with .from_generator() has limited portability and scalability, since it must run in the same Python process that created the generator and is subject to the Python GIL. I've tried to get rid of it, but with the memory constraints I cannot find a way to create a tf.data.Dataset other than from a generator.

Try # 3:
Leveraging tf.data.Dataset.interleave() to parallelize data extraction, since I assume network I/O is also one of the bottlenecks.
Here is the changed code:

def read_pq_file(parquet_path):
    pq_file = pq.ParquetDataset(parquet_path)
    df = pq_file.read().to_pandas()
    X = df[x_features].astype('float32').values
    y = df[label].astype('float32').values
    yield (X, y)

def read_and_preprocess(parquet_path):
    return tf.data.Dataset.from_generator(
        lambda: read_pq_file(parquet_path), (tf.float32, tf.float32))

# train_pq_path = [.... list of parquet file paths ..]
train_dataset_paths = tf.data.Dataset.from_tensor_slices(train_pq_path)
train_dataset = train_dataset_paths.interleave(read_and_preprocess, num_parallel_calls=2, cycle_length=2, deterministic=False).batch(batch_size)

model.fit(train_dataset)

However this outputs the following error: InvalidArgumentError: TypeError: not a path-like object

2 Answers


  1. It seems like the error is occurring because the interleave function is expecting a tf.data.Dataset object but is getting a string (file path) instead.

    To resolve this, you could use the tf.data.Dataset.list_files function to create a dataset of file paths, and then use the interleave function to load and preprocess the data from those paths.
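
    For illustration, list_files takes a glob pattern and can shuffle the matched paths itself (whether it can enumerate s3:// patterns directly depends on your TensorFlow / tensorflow-io setup; the bucket and prefix below are placeholders):

    # Hypothetical pattern; adjust to your bucket and prefix.
    file_paths = tf.data.Dataset.list_files(
        's3://my-bucket/train/*.parquet', shuffle=True, seed=42)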

    Here’s a possible solution that includes shuffling:

    def gen(parquet_path, x_features, label):
        parquet_file = pq.ParquetFile(parquet_path, filesystem=fs)
        df = parquet_file.read().to_pandas()
        df = df[x_features + [label]].astype('float32')
        X = df[x_features].values
        y = df[label].values
        # from_generator expects an iterator, so yield rather than return
        yield (X, y)
    
    def load_data(parquet_path):
        return tf.data.Dataset.from_generator(
            lambda: gen(parquet_path, x_features, label),
            output_signature=(
                tf.TensorSpec(shape=(None, len(x_features)), dtype=tf.float32),
                tf.TensorSpec(shape=(None,), dtype=tf.float32)))
    
    def wrapper(parquet_paths, batch_size, x_features, label):
        # Create a dataset of file paths
        file_paths = tf.data.Dataset.from_tensor_slices(parquet_paths)
        # Use interleave to load and preprocess data
        dataset = file_paths.interleave(
            lambda x: load_data(x),
            cycle_length=2,
            num_parallel_calls=tf.data.AUTOTUNE,
            deterministic=False)  # adds stochasticity to the order of elements
        dataset = dataset.shuffle(1000)  # tune this buffer size as needed
        dataset = dataset.batch(batch_size)
        return dataset
    
    train_ds = wrapper(train_pq_path, 512, x_features, label)
    valid_ds = wrapper(val_pq_path, 99999, x_features, label)
    
    model.fit(train_ds, validation_data=valid_ds, epochs=10)
    

    This code will:

    • Create a tf.data.Dataset of file paths.
    • Use interleave to load and preprocess the data from those paths. The lambda x: load_data(x) function will be applied to each element of the dataset (i.e., each file path).
    • Shuffle the data to add randomness, which is generally good for training deep learning models (a file-level shuffle sketch follows right after this list).
    • Batch the data into the specified batch size.

    Remember to tune the shuffle buffer size and interleave parameters (cycle_length, num_parallel_calls) based on your system's capabilities and the specifics of your data to optimize performance.
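
    If you want additional randomness on top of the element-level shuffle, one option (a sketch, assuming parquet_paths is a plain Python list as above) is to also shuffle the file paths so each epoch visits the files in a different order:

    file_paths = tf.data.Dataset.from_tensor_slices(parquet_paths)
    # Re-shuffle the file order at the start of every epoch.
    file_paths = file_paths.shuffle(
        buffer_size=len(parquet_paths), reshuffle_each_iteration=True)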

    Also, the output_signature argument in the from_generator function specifies the structure, dtypes, and shapes of the dataset elements, which is necessary if the shapes of the elements are not fully defined. It’s a good practice to specify this to avoid potential errors or inefficiencies.

    Make sure to replace len(x_features) with the actual number of features in your data.
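
    One caveat: inside interleave, load_data receives the file path as a tf.string tensor rather than a Python string, so pq.ParquetFile can still raise the original "not a path-like object" error. A minimal sketch of one way around this, using from_generator's args parameter (which evaluates the tensor and passes it to the generator as NumPy bytes), with x_features, label, and fs defined as above:

    def gen_from_path(parquet_path):
        # args= delivers the path as bytes, so decode it before use.
        if isinstance(parquet_path, bytes):
            parquet_path = parquet_path.decode('utf-8')
        df = pq.ParquetDataset(parquet_path, filesystem=fs).read().to_pandas()
        df = df[x_features + [label]].astype('float32')
        yield (df[x_features].values, df[label].values)

    def load_data(parquet_path):
        return tf.data.Dataset.from_generator(
            gen_from_path,
            args=(parquet_path,),  # passed to the generator as a NumPy value, not a Tensor
            output_signature=(
                tf.TensorSpec(shape=(None, len(x_features)), dtype=tf.float32),
                tf.TensorSpec(shape=(None,), dtype=tf.float32)))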

  2. The error message you’re seeing is likely due to the fact that TensorFlow’s tf.data.Dataset.from_tensor_slices and interleave methods cannot handle S3 paths directly. When you’re reading parquet files from S3, you need to use s3fs as you did in your original generator method.

    You also tried to read the entire parquet file at once in your read_pq_file function. Instead, you should read in chunks to avoid running out of memory.

    Here’s how you can modify your code:

    import tensorflow as tf
    import pyarrow.parquet as pq
    import pandas as pd
    import s3fs
    
    fs = s3fs.S3FileSystem()
    
    # Define your features and label
    x_features = [...]
    label = '...'
    
    def read_pq_file(parquet_path, batch_size):
        parquet_file = pq.ParquetDataset(parquet_path, filesystem=fs)
        df = parquet_file.read().to_pandas()
        df = df[x_features+[label]].astype('float32')
        len_df = len(df)
    
        curr_row = 0
        while curr_row < len_df:
            batch_df = df.iloc[curr_row:curr_row+batch_size, :]
            X = batch_df[x_features].values
            y = batch_df[label].values
            curr_row += batch_size
            yield (X, y)
    
    def read_and_preprocess(parquet_path, batch_size):
        return tf.data.Dataset.from_generator(
            lambda: read_pq_file(parquet_path, batch_size), 
            output_signature=(
                tf.TensorSpec(shape=(None, len(x_features)), dtype=tf.float32),
                tf.TensorSpec(shape=(None,), dtype=tf.float32)
            ))
    
    # train_pq_path = [.... list of parquet file paths ..]
    train_dataset_paths = tf.data.Dataset.from_tensor_slices(train_pq_path)
    
    batch_size = 512
    train_dataset = train_dataset_paths.interleave(
        lambda x: read_and_preprocess(x, batch_size), 
        num_parallel_calls=tf.data.experimental.AUTOTUNE, 
        cycle_length=2, 
        deterministic=False)
    
    train_dataset = train_dataset.shuffle(10000).prefetch(tf.data.experimental.AUTOTUNE)
    
    model.fit(train_dataset)
    

    In the code above, output_signature is used to specify the output shapes and types of the from_generator method. Also, shuffle is added to randomly shuffle the elements of the dataset for better training performance, and prefetch is used to overlap the preprocessing and model execution of a training step, which can reduce training time.

    Please replace x_features and label with your actual feature columns and label column.

    Remember to adjust the batch_size and shuffle buffer size according to your available memory and dataset size. For large datasets, it’s not necessary to have a shuffle buffer size equal to the full dataset size; a size in the tens of thousands can be sufficient.
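
    Note that read_pq_file already yields 512-row chunks, so shuffle(10000) as written buffers up to 10,000 of those chunks (on the order of 5 million rows). If row-level shuffling with a smaller, more predictable footprint is the goal, one option (a sketch using the same variables as above) is to unbatch before shuffling and re-batch afterwards:

    train_dataset = (train_dataset
                     .unbatch()            # back to individual rows
                     .shuffle(10_000)      # buffer now holds rows, not whole batches
                     .batch(batch_size)
                     .prefetch(tf.data.AUTOTUNE))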
