My question is related to my previous one, Error of using parallelizing data processing by "sentence_transformers" on 2 GPUs from Jupyter notebook. I have tried a new solution because I got an error with the one proposed there.
I would like to use sentence-transformers to encode some English sentences. To improve efficiency, I am trying to run it on 2 T4 GPUs from a Jupyter notebook on GCP (Linux Debian, Python 3.8).
I am using dask.dataframe "map_partitions()" to process the data on multiple GPUs.
My code:
import cudf
import dask_cudf
from dask.distributed import Client, wait, get_worker, get_client
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0,1", n_workers=2, threads_per_worker=4, memory_limit="15GB", device_memory_limit="24GB", rmm_pool_size="2GB", rmm_maximum_pool_size="15GB")
client = Client(cluster)
client.run(cudf.set_allocator, "managed")  # Uses managed memory instead of "default"
import dask.dataframe as dd
import pandas as pd
from dask.multiprocessing import get
import random
df = pd.DataFrame({'col_1': ["This is sentence " + str(x) for x in random.sample(range(10**7), 10**7)], 'col_2': ["That is another sentence " + str(x) for x in random.sample(range(10**7), 10**7)]})
cudf_df = cudf.DataFrame.from_pandas(df)
dask_df = dask_cudf.from_cudf(cudf_df, npartitions=8)
from sentence_transformers import SentenceTransformer
import numpy as np
sbert_model = SentenceTransformer('all-MiniLM-L6-v2')
def test_f_str(df, args):
    col1, col2, chunks = args
    for col in [col1, col2]:
        emb = sbert_model.encode(sentences=list(df[col].to_dask_dataframe()), batch_size=1024, show_progress_bar=True)
        semb = np.array([str(x) for x in emb])
        emb_array = dask.array.from_array(semb, chunks=chunks)
        df[col + '_emb'] = emb_array
    return df
type(dask_df), dask_df.npartitions
(dask_cudf.core.DataFrame, 8)
chunks = dask_df.map_partitions(lambda x: len(x)).compute().to_numpy()
print(chunks, type(chunks))
[1250000 1250000 1250000 1250000 1250000 1250000 1250000 1250000] <class 'numpy.ndarray'>
new_dask_df = dask_df.map_partitions(test_f_str,
args=('col_1', 'col_2', chunks),
col_1 object
col_2 object
col_1_emb object
col_2_emb object
dtype: object
new_dask_df.head()  # error: Exception: 'AttributeError("'Series' object has no attribute 'to_dask_dataframe'")'
More detailed errors:
.. /distributed/ UserWarning: Sending large graph of size 163.19 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
2023-06-21 22:43:41,581 - distributed.worker - WARNING - Compute Failed
Key: ('test_f_str-61ab17ba9c61c486f2f10e2f92b28b02', 0)
Function: subgraph_callable-82156164-6500-4cda-a935-2f8f328a
args: ( col_1 col_2
0 This is sentence 6819330 That is another sentence 8466591
1 This is sentence 7294963 That is another sentence 5338403
2 This is sentence 7280211 That is another sentence 8222981
3 This is sentence 2673618 That is another sentence 4661579
4 This is sentence 6749945 That is another sentence 1511266
... ... ...
1249995 This is sentence 6905835 That is another sentence 9818177
1249996 This is sentence 933624 That is another sentence 6111931
1249997 This is sentence 6352113 That is another sentence 7685898
1249998 This is sentence 4950656 That is another sentence 4090789
1249999 This is sentence 3460942 That is another sentence 6747583
[1250000 rows x 2 columns], 'from_cudf-f7c58ae2f4acb6c92d7d79480cb40f46')
kwargs: {}
Exception: 'AttributeError("'Series' object has no attribute 'to_dask_dataframe'")'
However, the following example works without an error:
temp = dask_df['col_1'].to_dask_dataframe() # no error
['This is sentence 6819330',
'This is sentence 7294963',
'This is sentence 7280211',
'This is sentence 2673618',
'This is sentence 6749945',
'This is sentence 2843628',
'This is sentence 8009669',
'This is sentence 391329',
'This is sentence 7333531',
'This is sentence 6114318']
type(temp), type(dask_df['col_1'])
(dask.dataframe.core.Series, dask_cudf.core.Series)
Could anybody let me know why I get this error? It seems that a "dask_cudf.core.Series" can call "to_dask_dataframe" without any problem.
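Your errors likely stem from mixing distributed collections into a function that is designed to operate on a single partition. Inside "map_partitions()", the df your function receives is one concrete cudf.DataFrame partition. As a result, df[col] is a plain cudf Series and not the "dask_cudf.core.Series" you get from dask_df['col_1'] outside the function. Only the Dask collection has "to_dask_dataframe". That is why the same call works in your second example but fails inside test_f_str.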
If your goal is to encode the text on multiple GPUs, you can take a simpler approach. When you use "map_partitions", the function you pass should accept a pandas (or cuDF) DataFrame as input and generally stay in "single CPU/GPU" land. The following code illustrates what you're trying to do using a RAPIDS 23.06 conda environment (with sentence-transformers also installed).
First, I set up the cluster and create the fake data (using your example text data).
Then, I define a function to encode the text into embeddings. Notice that everything in this function is independent from any Dask machinery.
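Here is a rough sketch of that idea, which is not necessarily the function used in this answer. The question's test_f_str can be reduced to something that only touches the partition it receives: no to_dask_dataframe, no dask.array, and no precomputed chunks. The names encode_columns and fake_encode are hypothetical. fake_encode stands in for sbert_model.encode so the example runs without a GPU or a model download. The cudf branch (to_pandas() and building a cudf list column) is an assumption that has not been tested here.

```python
import numpy as np
import pandas as pd


def encode_columns(part, cols, encode, batch_size=1024):
    """Return a copy of one partition with an embedding column per text column.

    `part` is a pandas.DataFrame (a cudf.DataFrame under dask_cudf);
    `encode` is any callable shaped like SentenceTransformer.encode.
    """
    part = part.copy()
    for col in cols:
        series = part[col]
        # cudf data lives on the GPU, so move it to host before making a list;
        # pandas Series have no to_pandas() and are used directly.
        host = series.to_pandas() if hasattr(series, "to_pandas") else series
        emb = np.asarray(encode(host.tolist(), batch_size=batch_size,
                                show_progress_bar=False))
        # One list of floats per row; type(series) keeps pandas vs. cudf.
        part[col + "_emb"] = type(series)(emb.tolist(), index=series.index)
    return part


def fake_encode(texts, batch_size=32, show_progress_bar=False):
    """Hypothetical stand-in for sbert_model.encode: a 2-d vector per text."""
    return np.array([[float(len(t)), 1.0] for t in texts])


df = pd.DataFrame({"col_1": ["a", "bb"], "col_2": ["ccc", "d"]})
out = encode_columns(df, ["col_1", "col_2"], fake_encode)
print(out)
```

With the real model, encode would be sbert_model.encode. Storing one list of floats per row mirrors the question's one-column-per-text-column layout. An alternative is to expand each embedding dimension into its own float column, which is easier to turn into an array later.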
Finally, I use the function to encode the text. My machine has two GPUs, so I also use a JupyterLab timing magic to illustrate the performance gain of using 1 vs. 2 GPUs in this way. I don't get perfect scaling for a variety of reasons (including the call to …, overhead from Dask, etc.), but I get quite a nice improvement. If I wanted to optimize this further, I could use something like Triton Inference Server, PyTriton, or any other model serving tool that can use multiple GPUs.

I would add to @nick-becker's answer a short summary (and this could be added into that answer if people like).
In Dask, you have high-level APIs (array and dataframe) that let you express a computation that is partitioned and run in parallel. The operations that are then applied to each partition should normally never interact with dask.array or dask.dataframe, only with numpy/cupy/pandas (or cudf). The offending line is probably one of the lines in test_f_str that call back into Dask.