
Ensuring a group of tasks are scheduled together. #4485

Closed
trivialfis opened this issue Feb 5, 2021 · 9 comments · Fixed by #4503

Comments

trivialfis commented Feb 5, 2021

Hi all,

This is a feature request for applications that depend on an MPI-like communication framework, such as XGBoost. I will use XGBoost as an example in the following description.

Motivation

During training, XGBoost runs one process per worker and needs to perform an allreduce across all processes. When training one model at a time that's fine; we just submit a task for each worker:

def train(...):
    futures = []
    # workers are obtained from input data
    for i, worker in enumerate(workers):
        f = client.submit(train_on_worker, ..., workers=[worker])
        futures.append(f)

    results = client.gather(futures)
    return results

# User side:
output = train(...)

This works, as every train_on_worker task will get a chance to run eventually. But if users launch multiple training sessions simultaneously, XGBoost might hang. For example, suppose there are 2 available workers and each call to train uses both of them:

# user side:
output_0 = client.submit(train, Xy_0)
output_1 = client.submit(train, Xy_1)
# Notice that the sub-tasks launched inside the two calls to `train` might interleave due to parallel execution.

distributed might schedule 1 worker for each call to train. Due to the allreduce call, the scheduled worker will wait for the unscheduled worker to synchronize. And since both calls to train will be waiting (remember that we only have 2 workers), the remaining tasks might never be scheduled, resulting in a hang.

Feature request

Inside the train function, where we submit a task for each worker, is there a way to ensure that the tasks submitted there are scheduled as a whole?

madsbk (Contributor) commented Feb 5, 2021

Maybe explicit-comms in dask-cuda could be useful here. Using CommsContext.run() you can schedule a task on each worker, and those tasks are guaranteed to run in parallel.
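
A rough sketch of what that could look like is below. Note that the import path, the CommsContext constructor, and the run() signature are assumptions about dask-cuda's explicit-comms module rather than a verified API; check the dask-cuda documentation before relying on them.

# Sketch only: names and signatures here are assumptions, not the verified
# dask-cuda API.
from dask.distributed import Client
from dask_cuda.explicit_comms.comms import CommsContext  # assumed import path

async def train_on_worker(session_state, *args):
    # Coroutine executed on every worker participating in the communicator.
    ...

client = Client("scheduler-address:8786")
comms = CommsContext(client)          # sets up a communicator across the workers
results = comms.run(train_on_worker)  # one task per worker, started together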

quasiben (Member) commented Feb 5, 2021

Relaxing the worker pinning on the submit might help here. I believe this can be done with allow_other_workers: https://distributed.dask.org/en/latest/locality.html#user-control .
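
For reference, that would mean loosening the workers= restriction in the submit call from the original example; allow_other_workers is an existing keyword argument of Client.submit documented at the link above:

# Prefer the pinned worker, but let the scheduler fall back to other workers
# when it is unavailable.
f = client.submit(train_on_worker, ..., workers=[worker], allow_other_workers=True)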

trivialfis (Author) commented Feb 5, 2021

Thanks for the reply @madsbk @quasiben. I'm looking into the CommsContext class; it seems quite complicated, so it might take some time to see whether I can adapt it to this use case. I'm not sure allow_other_workers is the right way to go, since relaxing worker pinning will cause data copies. Also, in the example there is no other available worker even if I don't pin any worker. Let me look into it a bit more.

madsbk mentioned this issue Feb 10, 2021

madsbk (Contributor) commented Feb 10, 2021

@trivialfis I am working on a general solution to this problem: #4503

jrbourbeau (Member) commented:

I'm probably just missing something here, but @madsbk why is a MultiLock needed instead of our existing Lock, which could be acquired inside train to coordinate when we submit new tasks in train?

madsbk (Contributor) commented Feb 11, 2021

> I'm probably just missing something here, but @madsbk why is a MultiLock needed instead of our existing Lock, which could be acquired inside train to coordinate when we submit new tasks in train?

The thing is, we want to wait until we can get exclusive access to all workers that have input data. In the following example, train() only submits jobs once all of its workers are ready. Notice that the set of workers will differ between calls to train(), and we want multiple calls on non-overlapping sets of workers to run in parallel.

def train(...):
    futures = []
    # workers are obtained from input data
    with MultiLock(lock_names=workers):  # blocks until the lock for every worker is held
        for i, worker in enumerate(workers):
            f = client.submit(train_on_worker, ..., workers=[worker])
            futures.append(f)
        results = client.gather(futures)
        return results

In principle this could be implemented using the Lock and Variable extensions, but it would be very inefficient. MultiLock is a generalization of Lock -- the semantics of Lock and MultiLock are the same when len(lock_names) == 1.
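
For comparison, here is a rough sketch of one way the per-worker Lock emulation could be approximated (without using Variable); acquire_all_or_nothing is a hypothetical helper, not part of distributed, and the back-off-and-retry loop is exactly what makes this approach inefficient compared to MultiLock:

from distributed import Lock

def acquire_all_or_nothing(workers, timeout=1):
    # Hypothetical emulation of MultiLock semantics with per-worker Locks:
    # either hold the lock for every worker or hold none, backing off and
    # retrying otherwise.
    while True:
        acquired = []
        for w in workers:
            lock = Lock(w)
            if lock.acquire(timeout=timeout):
                acquired.append(lock)
            else:
                # Could not take them all -- release what we hold and retry.
                for held in acquired:
                    held.release()
                break
        else:
            return acquired  # caller must release every lock when done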

madsbk (Contributor) commented Feb 11, 2021

Check out rapidsai/dask-cuda#523 for a use case.

trivialfis (Author) commented:

Thanks @madsbk. So if I'm not mistaken, this is similar to creating a single lock with all of the used workers as the name?

madsbk (Contributor) commented Feb 12, 2021

> Thanks @madsbk. So if I'm not mistaken, this is similar to creating a single lock with all of the used workers as the name?

Not exactly, because different worker sets that overlap will also block each other.
It is the same as doing Lock(w) for w in workers, with the condition that no lock is acquired until all of the locks can be acquired.
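
A small illustration of that behaviour, reusing the lock_names keyword from the example above (the parameter name may differ in the merged implementation):

from distributed import MultiLock

# Session A holds the locks for workers w1 and w2.
with MultiLock(lock_names=["w1", "w2"]):
    ...
    # A concurrent MultiLock(lock_names=["w2", "w3"]) blocks until session A
    # releases, because the two sets overlap on w2, while
    # MultiLock(lock_names=["w3", "w4"]) can be acquired right away.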
