I have a dask architecture implemented with five docker containers: a client, a scheduler, and three workers. I also have a large dask dataframe stored in parquet format in a docker volume. The dataframe was created with 3 partitions, so there are 3 files (one file per partition).
I need to run a function on the dataframe with map_partitions, where each worker will take one partition to process.
My attempt:
def my_function(dfx):
    return dfx['abc'] = dfx['def'] + 1

df = dd.read_parquet(... path to parquet file)
client = Client('127.0.0.1:8786')

with joblib.parallel_backend('dask'):
    df = df.map_partitions(my_function)
Is this the correct approach? How do I tell dask to use the client variable in the with statement so the functions run on the workers? Do I need df.compute() to start the execution?
Note: removing the 'with' statement, this works fine when the Dask client is run from Jupyter. The problem is when the Dask client runs inside Docker, because Dask then creates the workers in the web application instead of in the Docker containers.
UPDATE
docker compose file:
version: '3'
services:
  web:
    image: img-python-01
    container_name: cont_flask
    volumes:
      - c:/visualcode-py:/code
      - c:/conf:/conf
      - vol_dask_data:/data
      - vol_dask_model:/model
    ports:
      - "5000:5000"
    working_dir: /code
    environment:
      - app.config=/conf/py.app.json
      - common.config=/conf/py.common.json
      - CUDA_VISIBLE_DEVICES=''
    entrypoint:
      - gunicorn
    command:
      - -t 7200
      - -b 0.0.0.0:5000
      - --reload
      - app.frontend.app:app
  scheduler:
    image: img-python-01
    container_name: cont_scheduler
    ports:
      - "8787:8787"
      - "8786:8786"
    entrypoint:
      - dask-scheduler
  worker:
    image: img-python-01
    depends_on:
      - scheduler
    environment:
      - app.config=/conf/py.app.json
      - common.config=/conf/py.common.json
      - PYTHONPATH=/code
      - MODEL_PATH=/model/rfc_model.pkl
      - PREPROCESSING_PATH=/model/data_columns.pkl
      - SCHEDULER_ADDRESS=scheduler
      - SCHEDULER_PORT=8786
      - CUDA_VISIBLE_DEVICES=''
    working_dir: /code
    volumes:
      - c:/visualcode-py:/code
      - c:/conf:/conf
      - c:/winfiles:/winfiles
      - vol_dask_data:/data
      - vol_dask_model:/model
    entrypoint:
      - dask-worker
    command:
      - scheduler:8786
volumes:
  vol_dask_data:
    name: vol_dask_data
  vol_dask_model:
    name: vol_dask_model
It starts with docker-compose up -d --scale worker=4, and the Flask/gunicorn application runs in the web service.
Note: this configuration works fine when I run client.submit(); the workers run in the containers.
UPDATE 2
This is the code that works with the current docker compose file:
futures1 = client.submit(process_loans, exec_id, 1, dataset, w1)
futures2 = client.submit(process_loans, exec_id, 2, dataset, w2)
worker_responses = client.gather([futures1, futures2])
I see in the Dask dashboard that the function process_loans is running on the worker containers.

2 Answers
The Python snippet does not appear to use the dask API efficiently. It might be that your actual function is a bit more complex, so map_partitions cannot be avoided, but let's take a look at the simple case first. One way to re-write the above (for this basic case) is to explicitly assign new column values:
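A minimal sketch of that approach (the parquet path is a placeholder):

import dask.dataframe as dd

df = dd.read_parquet('/data/my_parquet_dir')  # placeholder path
df['abc'] = df['def'] + 1                     # lazy column assignment; nothing is computed yet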
Or to use the .assign method:
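For example, a sketch along the same lines:

df = df.assign(abc=df['def'] + 1)  # returns a new lazy dataframe with the extra column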
In terms of the other questions:

- If a client is created outside the context, joblib will use the existing client.
- For restricting computations to one worker per partition, the easiest option is to assign each worker a specific unit of a resource, say foo, and require that each computation uses up one foo of the available resources; see the docs on resources (a sketch of the mechanism follows this list).
- Whether .compute is necessary depends on what is happening downstream. If the data can fit into available memory and it is efficient to have the data in memory, then .compute should be executed. Otherwise, it is likely to be more efficient to delay any actual computations until the final step. For example, if the end result of this code is saving the updated data to another set of parquet files, then there is no need to issue .compute, since dask will trigger the computations when .to_parquet is executed.
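A rough sketch of the resources mechanism (the resource name foo is arbitrary, and the submit call mirrors the process_loans example from the question):

# worker side (shell): each worker advertises one unit of a resource named "foo"
#   dask-worker scheduler:8786 --resources "foo=1"

# client side: each task declares that it consumes one "foo", so the scheduler
# runs at most one such task per worker at a time
future = client.submit(process_loans, exec_id, 1, dataset, w1, resources={'foo': 1})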
It seems to me that there is some confusion here between the various pieces of the system.

First, let me point out that the function as given produces a syntax error, since an assignment cannot be used as a return expression. Maybe you meant something like the sketch below (this was mentioned in another answer).
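A sketch of the likely correction:

def my_function(dfx):
    dfx['abc'] = dfx['def'] + 1  # assign the new column on this partition
    return dfx                   # return the modified partition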
Secondly, why is joblib involved at all here? You do not seem to be submitting work to joblib anywhere; it is not being used at all. All you need are the dask API calls and your client.
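A sketch of that plain-dask version, assuming the scheduler address from the compose file and a placeholder parquet path:

import dask.dataframe as dd
from dask.distributed import Client

client = Client('scheduler:8786')             # or '127.0.0.1:8786' when connecting from the host

df = dd.read_parquet('/data/my_parquet_dir')  # placeholder path
df2 = df.map_partitions(my_function)          # lazy: this only builds the task graph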
and then do whatever it is that you wanted to do with df2. This does not yet begin any execution; it only builds a graph of the operations to be carried out. If you wanted to resolve the whole resultant dataset into client memory (this is probably not what you wanted!), you could do
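something like this sketch (the name out is illustrative):

out = df2.compute()  # pulls the entire result into client memory as a pandas dataframe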
and this will use your distributed scheduler automatically.
You could also be more explicit
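with client.compute, sketched here:

f = client.compute(df2)  # returns a single Future; the work proceeds on the cluster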
which returns a future that you can either wait on (f.result() and other functions in distributed) or allow to proceed in the background.
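For instance, a sketch of the common follow-ups (progress and wait both come from dask.distributed):

from dask.distributed import progress, wait

progress(f)          # show a progress bar while the cluster works
wait(f)              # block until the computation has finished
result = f.result()  # fetch the concrete pandas result locally, if needed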