skip to Main Content

I have a dask architecture implemented with five docker containers: a client, a scheduler, and three workers. I also have a large dask dataframe stored in parquet format in a docker volume. The dataframe was created with 3 partitions, so there are 3 files (one file per partition).

I need to run a function on the dataframe with map_partitions, where each worker will take one partition to process.

My attempt:

def my_function(dfx):
    return dfx['abc'] = dfx['def'] + 1

df = dd.read_parquet(... path to parquet file)

client = Client('127.0.0.1:8786')

with joblib.parallel_backend('dask'):
      df = df.map_partitions(my_function) 
      

Is this the correct approach? how to tell dask to use the client variable in the with statement so the functions run on the workers? Do I need df.compute() to start the execution?

Note: removing the ‘with’ statement, this works fine if the dask client is run on Jupyter. Problem is when the Dask client is run on Docker as Dask creates the workers in the web application instead of the Docker containers.

UPDATE

docker compose file:

version: '3'

services:   

  web:
    image: img-python-01
    container_name: cont_flask
    volumes:
      - c:/visualcode-py:/code
      - c:/conf:/conf
      - vol_dask_data:/data
      - vol_dask_model:/model
    ports:
      - "5000:5000"
    working_dir: /code
    environment:
      - app.config=/conf/py.app.json
      - common.config=/conf/py.common.json 
      - CUDA_VISIBLE_DEVICES=''
    entrypoint:
      - gunicorn
    command:
      - -t 7200
      - -b 0.0.0.0:5000
      - --reload
      - app.frontend.app:app
      
      
  scheduler:
    image: img-python-01
    container_name: cont_scheduler
    ports:
      - "8787:8787"
      - "8786:8786"
    entrypoint:
      - dask-scheduler

  worker:
    image: img-python-01
    depends_on:
      - scheduler
    environment:
      - app.config=/conf/py.app.json
      - common.config=/conf/py.common.json 
      - PYTHONPATH=/code
      - MODEL_PATH=/model/rfc_model.pkl
      - PREPROCESSING_PATH=/model/data_columns.pkl
      - SCHEDULER_ADDRESS=scheduler
      - SCHEDULER_PORT=8786
      - CUDA_VISIBLE_DEVICES=''
    working_dir: /code
    volumes:
      - c:/visualcode-py:/code
      - c:/conf:/conf
      - c:/winfiles:/winfiles
      - vol_dask_data:/data
      - vol_dask_model:/model
    entrypoint:
      - dask-worker
    command:
      - scheduler:8786
    
volumes:
  vol_dask_data:
     name: vol_dask_data
  vol_dask_model:
     name: vol_dask_model

Starts with docker-compose up -d --scale worker=4, the flask/gunicorn application runs on web.

Note: this configuration works fine when I run a client.submit(), workers run on containers.

UPDATE 2

This is the code that works with current docker compose file:

futures1 = client.submit(process_loans, exec_id, 1, dataset, w1)
futures2 = client.submit(process_loans, exec_id, 2, dataset, w2)

worker_responses = client.gather([futures1, futures2])

I see in the Dask dashboard that the function process_loans is running on the worker containers

2

Answers


  1. The python snippet does not appear to use the dask API efficiently. It might be that your actual function is a bit more complex, so map_partitions cannot be avoided, but let’s take a look at the simple case first:

    def my_function(dfx):
        # return dfx['abc'] = dfx['def'] + 1
        # the above returns the result of assignment
        # we need to separate the assignment and return statements
        dfx['abc'] = dfx['def'] + 1
        return dfx
    
    df = dd.read_parquet(... path to parquet file)
    
    client = Client('127.0.0.1:8786')
    
    with joblib.parallel_backend('dask'):
          df = df.map_partitions(my_function) 
    

    Another way to re-write the above (for this basic case) is to explicitly assign new column values:

    df = dd.read_parquet(... path to parquet file)
    df['abc'] = df['def'] + 1
    

    Or to use the .assign method:

    df = (
        dd.read_parquet(path_to_parquet_file)
        .assign(abc=lambda df: df['def'] + 1)
    )
    

    In terms of the other questions:

    • if a client is created outside the context, joblib will use the existing client;

    • for restricting computations to one worker per partition, the easiest one is to assign each worker a specific unit of resource foo and require that each computation uses up one foo of the available resources, see docs on resources;

    • whether .compute is necessary depends on what is happening downstream. If the data can fit into available memory and it is efficient to have data in memory, then .compute should be executed. Otherwise, it is likely to be more efficient to delay any actual computations until the final step. For example, if the end result of this code is saving of the updated data to another set of parquet files, then there is no need to issue .compute since dask will trigger computations when .to_parquet is executed.

    Login or Signup to reply.
  2. It seems to me that there is some confusion here between the various pieces of the system.

    First let me point out that the function as given produces a syntax error. Maybe you meant

    def my_function(dfx): 
        dfx['abc'] = dfx['def'] + 1
        return dfx
    

    (this was mentioned in another answer)

    Secondly, why is joblib involved at all here? You do not seem to be submitting work to joblib anywhere, it is not being used at all. All you need are dask API calls and your client.

    df2 = df.map_partitions(my_function)
    

    and then do whatever it is that you wanted to do with df2. This dos not yet begin any execution, it makes a graph of operations to be carried out.

    If you wanted to resolve the whole resultant dataset into client memory (this is probably not what you wanted!), you could do

    out = df2.compute()
    

    and this will use your distributed scheduler automatically.
    You could also be more explicit

    f = client.compute(df2)
    

    which returns a future that you can either wait on (f.result() and other functions in distributed) or allow to proceed in the background.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search