
[dask] Search for open ports only once per IP #3768

Closed
jameslamb opened this issue Jan 15, 2021 · 2 comments

Comments

@jameslamb
Collaborator

Summary

In LightGBM distributed training, each worker needs a list of all the other workers' IPs and the ports they listen on, so that the workers can communicate with each other.

#3766 updated lightgbm.dask to search for open ports on each worker when creating this list, instead of just assuming a fixed range of ports would be available.

This works well, but it's a blocking operation that has to be done sequentially, so it slows down training.

The following pseudocode illustrates the process:

worker_to_port = {}
ports_seen_so_far = set()

for worker_address in worker_addresses:
    # pin the search to this specific worker and block until it returns a port
    port = client.submit(
        _find_an_open_port,
        ports_seen_so_far,
        workers=[worker_address]
    ).result()
    worker_to_port[worker_address] = port
    ports_seen_so_far.add(port)
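
For context, the search on each worker can be as simple as asking the OS for a free port by binding to port 0. A minimal sketch of such a helper (the actual _find_an_open_port added in #3766 may differ in its details):

import socket

def _find_an_open_port(ports_to_skip):
    # keep asking the OS for a free port until we get one that
    # no other worker process on this machine has claimed yet
    while True:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("", 0))
            port = s.getsockname()[1]
        if port not in ports_to_skip:
            return port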

This is done sequentially because multiple Dask worker processes can live on the same IP address. So if you use a LocalCluster with 3 workers, for example, all 3 of those workers will be on your local machine. Or if you use a multi-machine cluster with nprocs > 1, multiple worker processes will run on each physical machine in the cluster.
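
For example, on a three-worker LocalCluster every worker address shares the local IP and differs only by port (the ports below are illustrative):

from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=3)
client = Client(cluster)

# prints something like:
# ['tcp://127.0.0.1:36525', 'tcp://127.0.0.1:39837', 'tcp://127.0.0.1:41613']
print(list(client.scheduler_info()["workers"].keys()))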

As a result of this change, the time complexity of that "find open ports" step is O(num_worker_processes). If instead the search were only done once per IP address, it could be safely parallelized across machines, and the time complexity would be more like O(nprocs), the number of worker processes per machine.

To close this issue, change lightgbm.dask._find_ports_for_workers()

def _find_ports_for_workers(client: Client, worker_addresses: Iterable[str], local_listen_port: int) -> Dict[str, int]:

so that it instead dispatches a function to each worker machine that returns a list of as many open ports as there are Dask workers on that machine.
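
A rough sketch of what that dispatch could look like. Everything here is hypothetical (the helper name _find_n_open_ports, the grouping logic, and using the first address on each host as the dispatch target), not the final implementation:

import socket
from collections import defaultdict
from urllib.parse import urlparse

def _find_n_open_ports(n):
    # hold n sockets open at the same time so the OS hands out
    # n distinct free ports, then release them all at once
    sockets = []
    for _ in range(n):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(("", 0))
        sockets.append(s)
    ports = [s.getsockname()[1] for s in sockets]
    for s in sockets:
        s.close()
    return ports

# group worker processes by the machine they run on
workers_by_host = defaultdict(list)
for address in worker_addresses:
    workers_by_host[urlparse(address).hostname].append(address)

# one task per machine, all submitted before any result is awaited,
# so the searches run in parallel; pure=False stops Dask from
# deduplicating tasks that happen to have identical arguments
port_futures = {
    host: client.submit(_find_n_open_ports, len(addresses),
                        workers=[addresses[0]], pure=False)
    for host, addresses in workers_by_host.items()
}
worker_to_port = {
    address: port
    for host, addresses in workers_by_host.items()
    for address, port in zip(addresses, port_futures[host].result())
}

Holding all n sockets open simultaneously is what guarantees the ports found on one machine are distinct from each other; a loop that binds and closes one socket at a time could get the same port back twice.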

Motivation

This optimization would reduce the overhead introduced by using Dask for distributed training, which should make training faster.

References

This could be done following something like the code @ffineis provided in #3766 (comment).

@jameslamb
Collaborator Author

Adding this to #2302 and closing this issue, per our practice for managing feature requests. Anyone is welcome to contribute this feature! Please leave a comment here if you're interested in contributing it.

@jameslamb
Collaborator Author

This was fixed by #3823.
