
My question follows up on my previous one, Error of using parallelizing data processing by "sentence_transformers" on 2 GPUs from Jupyter notebook. I have tried a new approach because the solution proposed there gave me an error.
 
I would like to use sentence-transformers (https://www.sbert.net/) to encode some English sentences. To improve efficiency, I am trying to run it on 2 T4 GPUs from a Jupyter notebook on GCP (Debian Linux, Python 3.8).

I am using dask.dataframe's map_partitions() (https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html) to process the data on multiple GPUs.
 
My code:

import cudf
import dask_cudf
 
from dask.distributed import Client, wait, get_worker, get_client
from dask_cuda import LocalCUDACluster
 
cluster = LocalCUDACluster(
    CUDA_VISIBLE_DEVICES="0,1",
    n_workers=2,
    threads_per_worker=4,
    memory_limit="15GB",
    device_memory_limit="24GB",
    rmm_pool_size="2GB",
    rmm_maximum_pool_size="15GB",
)
 
client = Client(cluster)
 
client.run(cudf.set_allocator, "managed") # Uses managed memory instead of "default"
 
import dask.dataframe as dd
import pandas as pd
from dask.multiprocessing import get
import random
 
df = pd.DataFrame({
    'col_1': ["This is sentence " + str(x) for x in random.sample(range(10**7), 10**7)],
    'col_2': ["That is another sentence " + str(x) for x in random.sample(range(10**7), 10**7)],
})
 
cudf_df = cudf.DataFrame.from_pandas(df)
 
dask_df = dask_cudf.from_cudf(cudf_df, npartitions=8)
 
from sentence_transformers import SentenceTransformer
import numpy as np
 
sbert_model = SentenceTransformer('all-MiniLM-L6-v2')
 
def test_f_str(df, args):
    col1, col2, chunks = args
    for col in [col1, col2]:
        emb = sbert_model.encode(sentences=list(df[col].to_dask_dataframe()), batch_size=1024, show_progress_bar=True)
        semb = np.array([str(x) for x in emb])
        emb_array = dask.array.from_array(semb, chunks=chunks)
        df[col+'_emb'] = emb_array
    return df
 
 
type(dask_df), dask_df.npartitions
(dask_cudf.core.DataFrame, 8)
 
chunks = dask_df.map_partitions(lambda x: len(x)).compute().to_numpy()
print(chunks, type(chunks))
 
[1250000 1250000 1250000 1250000 1250000 1250000 1250000 1250000] <class 'numpy.ndarray'>
 
new_dask_df = dask_df.map_partitions(test_f_str,
                                     args=('col_1', 'col_2', chunks),
                                     meta={'col_1':'object',
                                           'col_2':'object',
                                           'col_1_emb':'object',
                                           'col_2_emb':'object'})
 
 
new_dask_df.dtypes
 
col_1        object
col_2        object
col_1_emb    object
col_2_emb    object
dtype: object
 
new_dask_df.head()  # error: Exception: 'AttributeError("'Series' object has no attribute 'to_dask_dataframe'")'

 
 
The more detailed error output:

 
../distributed/client.py:3106: UserWarning: Sending large graph of size 163.19 MiB.
This may cause some slowdown. Consider scattering data ahead of time and using futures.
  warnings.warn(

2023-06-21 22:43:41,581 - distributed.worker - WARNING - Compute Failed
Key:       ('test_f_str-61ab17ba9c61c486f2f10e2f92b28b02', 0)
Function:  subgraph_callable-82156164-6500-4cda-a935-2f8f328a
args:      (                            col_1                             col_2
0        This is sentence 6819330  That is another sentence 8466591
1        This is sentence 7294963  That is another sentence 5338403
2        This is sentence 7280211  That is another sentence 8222981
3        This is sentence 2673618  That is another sentence 4661579
4        This is sentence 6749945  That is another sentence 1511266
...                           ...                               ...
1249995  This is sentence 6905835  That is another sentence 9818177
1249996   This is sentence 933624  That is another sentence 6111931
1249997  This is sentence 6352113  That is another sentence 7685898
1249998  This is sentence 4950656  That is another sentence 4090789
1249999  This is sentence 3460942  That is another sentence 6747583
[1250000 rows x 2 columns], 'from_cudf-f7c58ae2f4acb6c92d7d79480cb40f46')
kwargs:    {}
Exception: 'AttributeError("'Series' object has no attribute 'to_dask_dataframe'")'
 

 
But when I try the following directly on the dask_cudf dataframe, it works fine.
 
  

temp = dask_df['col_1'].to_dask_dataframe() # no error
list(temp)[:10]
  
['This is sentence 6819330',
 'This is sentence 7294963',
 'This is sentence 7280211',
 'This is sentence 2673618',
 'This is sentence 6749945',
 'This is sentence 2843628',
 'This is sentence 8009669',
 'This is sentence 391329',
 'This is sentence 7333531',
 'This is sentence 6114318']
 
type(temp), type(dask_df['col_1'])
(dask.dataframe.core.Series, dask_cudf.core.Series)

 
 
Could anybody let me know why I get this error? It seems that dask_cudf.core.Series can access to_dask_dataframe just fine outside of map_partitions.
 
Thanks.

2 Answers


  1. Your errors likely stem from mixing distributed arrays into a function that is designed to operate on a single partition.

    I would like to use sentence-transformer ( https://www.sbert.net/) to encode some English sentences.

    If this is the goal, you can take an alternative approach. When you use map_partitions, the function you pass should be designed to accept a pandas (or cuDF) dataframe as input and generally stay in "single CPU/GPU" land.
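
    As a minimal sketch of that pattern (a toy transformation with made-up names, using the dask_df from the question, not the model code):

    def add_len(partition_df):
        # partition_df arrives here as a regular pandas/cuDF DataFrame, not a Dask collection
        partition_df['col_1_len'] = partition_df['col_1'].str.len()
        return partition_df

    lengths_df = dask_df.map_partitions(add_len)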

    The fuller example below illustrates what you're trying to do, using a RAPIDS 23.06 conda environment (with sentence-transformers also installed).

    First, I set up the cluster and create the fake data (using your example text data).

    from dask_cuda import LocalCUDACluster
    from distributed import Client, wait
    import dask.dataframe as dd
    from sentence_transformers import SentenceTransformer
    import random
    import pandas as pd
    
    cluster = LocalCUDACluster()
    client = Client(cluster)
    
    N = 10**6
    
    df = pd.DataFrame({
        'col_1': ["This is sentence " + str(x) for x in random.sample(range(10**7), N)],
        'col_2': ["That is another sentence " + str(x) for x in random.sample(range(10**7), N)]
    })
    
    ddf = dd.from_pandas(df, 2).persist() # one partition per GPU
    wait(ddf)
    
    ddf.head()
                          col_1                              col_2
    0  This is sentence 4395507   That is another sentence 3719353
    1  This is sentence 6044606   That is another sentence 2577824
    2  This is sentence 4141944   That is another sentence 1725753
    3  This is sentence 3598899   That is another sentence 7337551
    4  This is sentence 7166317   That is another sentence 6147354
    

    Then, I define a function to encode the text into embeddings. Notice that everything in this function is independent of any Dask machinery.

    def encode(df):
        model = SentenceTransformer('all-MiniLM-L6-v2')
        embeddings = model.encode(df.col_1.tolist(), batch_size=512)
        return embeddings
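
    If you also want the embeddings for both columns attached back onto each partition, closer to what the original test_f_str attempted, a hedged variant of the same idea might look like this (the meta dict and the stringified embeddings mirror the question's code; treat it as a sketch rather than a tested implementation):

    def encode_both(df):
        # df is a plain pandas DataFrame here (one partition)
        model = SentenceTransformer('all-MiniLM-L6-v2')
        for col in ['col_1', 'col_2']:
            emb = model.encode(df[col].tolist(), batch_size=512)
            # one stringified embedding per row, as in the original code
            df[col + '_emb'] = [str(x) for x in emb]
        return df

    new_ddf = ddf.map_partitions(
        encode_both,
        meta={'col_1': 'object', 'col_2': 'object',
              'col_1_emb': 'object', 'col_2_emb': 'object'},
    )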
    

    Finally, I use the single-column encode function to encode the text. My machine has two GPUs, so I also use the JupyterLab %%timeit magic to illustrate the performance gain of using 1 vs. 2 GPUs in this way.

    %%timeit -n1 -r2
    
    embeddings = encode(df)
    51.6 s ± 1.7 s per loop (mean ± std. dev. of 2 runs, 1 loop each)
    
    %%timeit -n1 -r2
    
    # compute brings the distributed array on the workers to the client process as a numpy array
    # which takes some extra time but is probably what you want
    # if the data isn't massive and you plan to manipulate it in the notebook
    embeddings = ddf.map_partitions(encode).compute()
    35 s ± 1.37 s per loop (mean ± std. dev. of 2 runs, 1 loop each)
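
    As a side note, if the embeddings are large and you don't need them in the notebook right away, a hedged alternative is to persist the mapped result on the workers (the same persist/wait pattern used for ddf above) and only bring it back or write it out later:

    # keep the per-partition results on the workers instead of pulling them to the client
    embeddings = ddf.map_partitions(encode).persist()
    wait(embeddings)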
    

    I don’t get perfect scaling for a variety of reasons (including the call to compute, overhead from Dask, etc.), but I get quite a nice improvement. If I wanted to further optimize this, I could use something like Triton Inference Server, PyTriton, or any other model serving tool that can use multiple GPUs.

  2. I would add a short summary to @nick-becker's answer (and it could be folded into that answer if people like).

    In Dask, the high-level APIs (array and dataframe) let you build a computation that is partitioned and run in parallel. The things then done to each partition should normally never interact with dask.array or dask.dataframe themselves, only with numpy/cupy/pandas (or cuDF). The offending line is probably

    emb_array = dask.array.from_array(semb, chunks=chunks)
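
    Inside a map_partitions function the partition is already a regular in-memory dataframe on one worker, so a hedged minimal fix (keeping the question's variable names, untested here) is to drop the dask.array wrapper and assign the plain numpy result directly:

    # stay in plain numpy/cuDF land inside the partition function
    semb = np.array([str(x) for x in emb])
    df[col + '_emb'] = semb  # direct assignment; no dask.array, no chunks needed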
    