Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dask] use random ports in network setup #3823

Merged
merged 11 commits into from
Feb 24, 2021

Conversation

jmoralez
Copy link
Collaborator

This is intended to solve the problem that @jameslamb defined in #3768 by running socket.bind('', 0) with client.run, which finds a random port for each worker. I've included a small test to check that the found ports are indeed different for a LocalCluster. Happy to receive any feedback.

@jmoralez jmoralez requested a review from jameslamb as a code owner January 23, 2021 07:20
@ghost
Copy link

ghost commented Jan 23, 2021

CLA assistant check
All CLA requirements met.

Copy link
Collaborator

@jameslamb jameslamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution and interest in LightGBM @jmoralez !

It does seem like this produces the desired behavior on LocalCluster, but I'm concerned that this might be disruptive for users on multi-machine clusters, such as those that can be created with dask-cloudprovider. With the approach proposed in this PR, you lose the ability to control the range of ports that are used by lightgbm, which I think would be problematic for people working in environments that are limited by firewall rules. What do you think?

@jmoralez
Copy link
Collaborator Author

Thank you for your feedback, @jameslamb! I don't think there will be a problem by using a different type of cluster, since client.run executes the function in every worker, so the found ports will be open in that (possibly remote) worker, I will try this out tomorrow using coiled. I understand your point about loosing control of the range of ports found, however in a remote setting the client only communicates to the scheduler and I believe the ports chosen to run each worker are defined in the same way as I proposed (I haven't been able to confirm this), so I think the scheduler should be able to access the ports found for lightgbm without a problem. I'm by no means an expert in this matter and could be wrong, so I'm happy to get your thoughts on this and discuss it further.

@jameslamb
Copy link
Collaborator

I understand your point about loosing control of the range of ports found, however in a remote setting the client only communicates to the scheduler and I believe the ports chosen to run each worker are defined in the same way as I proposed (I haven't been able to confirm this), so I think the scheduler should be able to access the ports found for lightgbm without a problem

Ah! Let me add more information on this.

By default Dask does find ports for that worker-to-scheduler communication randomly, but it exposes options to control that more tightly: https://docs.dask.org/en/latest/setup/cli.html#handling-ports.

The ports for LightGBM aren't related to Dask directly. LightGBM has its own distributed learning framework written in C++ (https://github.com/microsoft/LightGBM/tree/master/src/network). This Dask package just wraps that framework. In the Dask package here, we set up one long-running Dask task per worker, and that task is running a LightGBM worker. The LightGBM workers talk to each other directly over TCP sockets (by default), and Dask has no knowledge of that communication.

I will try this out tomorrow using coiled

Thanks very much! If it's easier, I have a testing setup that uses dask-cloudprovider to run a cluster on AWS Fargate: https://github.com/jameslamb/lightgbm-dask-testing.

When I was working on #3766, I found that there were several fixes I tried which worked well on LocalCluster that didn't hold up in a multi-machine setting.

@jameslamb jameslamb added the fix label Jan 25, 2021
@jmoralez
Copy link
Collaborator Author

Hi @jameslamb. I've successfully ran the test_training_does_not_fail_on_port_conflicts test on a coiled cluster with 4 machines and 2 cpus each, however I'm not sure how to share that with you. I have a software environment called jmoralez/lightgbm-test there if you want to try it out (however you'll still need the local version which is the part I'm not sure about). I had to set sample_size=10_000 because I was getting the Connecting to rank x failed error. Do you have any suggestions for sharing the tests or how to proceed?

@jmoralez
Copy link
Collaborator Author

Hi James. I've set up the version of my branch here. I think this random port assignment could be a fallback when specifying local_listen_port=0 and otherwise use the current implementation, what do you think?

As a side note I've noticed that with few samples I get Connecting to rank x failed like crazy. For example, by running the notebook as is I got stuck at the second fit and had to do a client.restart, however with sample_size=10_000 it runs the 5 fits in 4 seconds. I believe this is related to #3797 and I experienced this a couple of months ago as well as the TCP fail with dask-lightgbm and made me drop it. I'll dig deeper into this.

@jameslamb
Copy link
Collaborator

As a side note I've noticed that with few samples I get Connecting to rank x failed like crazy. For example, by running the notebook as is I got stuck at the second fit and had to do a client.restart, however with sample_size=10_000 it runs the 5 fits in 4 seconds

Can you please open a separate issue for this so we can discuss there? I have some theories.


I've set up the version of my branch here. I think this random port assignment could be a fallback when specifying local_listen_port=0 and otherwise use the current implementation, what do you think?

really cool, thank you! I clicked into the notebook and ran it. I really really appreciate the effort you've put into this PR and into setting up this nice test harness in Coiled. I can see that the 5 consecutive runs worked!

image

I even tried changing the test and running 50 consecutive trainings back to back. No problems :).

image

I can see that this cluster's workers are on different IPs, so I think this is a great test that this PR will work in other multi-machine settings.

image

I like the idea of possibly using both this implementation and the current one, but I'd rather switch the order. I think it makes sense to have your proposed (faster) approach be the default behavior, and for the current behavior to only be used if people opt in to it because they're working in an environment with networking constraints like firewall rules that only permit specific port ranges. I think users will be ok with paying a small performance penalty to comply with their security setup, and that anyone NOT constrained in that way should get the faster method by default.

However, I don't think we should support setting local_listen_port to 0 or any other special value. I want the result of model.get_params() to return valid LightGBM parameters that you could take and use in other non-Dask LightGBM APIs, and 0 is not a valid value for local_listen_port.https://lightgbm.readthedocs.io/en/latest/Parameters.html#local_listen_port.

Ok sorry, that was a lot of information. Given all that, here's my proposal

  • if local_listen_port is missing from parameters, use your approach from this PR
  • if local_listen_port is set to 12400 (LightGBM's default), use your approach from this PR
  • if local_listen_port is set to a value other than 12400, use the existing approach where we specifically search 1000 ports from local_listen_port to local_listen_port + 999.

What do you think?

@jmoralez
Copy link
Collaborator Author

Hi James. That sounds good, I'll work on that and update this PR. Thank you!

@jmoralez
Copy link
Collaborator Author

jmoralez commented Feb 3, 2021

Hi James. Since local_listen_port is set to 12400 in case it is None then if its 12400 I search for a random open port and otherwise use the current implementation. Looking forward for your feedback.

Copy link
Collaborator

@jameslamb jameslamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Really like the changes. I agree with the decision you made about params["local_listen_port"] == 12400 inside _train(), since by that point that parameter is guaranteed to exist (

params = _choose_param_value(
main_param_name="local_listen_port",
params=params,
default_value=12400
)
).

I left some new suggestions. Once you've addressed those, I'll test this manually on a FargateCluster with dask-cloudprovider, and ask @StrikerRUS for a review.

tests/python_package_test/test_dask.py Outdated Show resolved Hide resolved
python-package/lightgbm/dask.py Outdated Show resolved Hide resolved
python-package/lightgbm/dask.py Outdated Show resolved Hide resolved
@jmoralez
Copy link
Collaborator Author

jmoralez commented Feb 3, 2021

Thank you for your feedback, James. I've included your comments.

Copy link
Collaborator

@jameslamb jameslamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just left one more style thing, otherwise looks ok to me! I'll pull this and test on AWS.


@StrikerRUS whenever you have time, could you give your opinion? I can give you the relevant context so you don't have to re-read all the discussion here and in #3768.

In #3766, I added a function _find_open_port() that the Dask package uses to build the machines list. But as I noted in #3766 (comment), the approach adds some overhead. It currently runs that check sequentially. So for a 3-worker cluster, it will go "find port on w1 then find port on w2 then find port on w3". This means there is O(n_workers) overhead introduced.

This PR proposes running a function to find a random port on every worker simultaneously, so in theory the overhead of setting up machines becomes O(1). This is faster, but it also means the ports used for machines are totally unrelated to local_listen_port.

In #3823 (comment) I told @jmoralez that in my opinion, this faster behavior should be the default, but that we need to still give people who are limited by firewall rules the ability to specify a local_listen_port and know that only that port and those near it will be searched.

So we settled on the current state of this PR, which is:

  1. if local_listen_port is missing from parameters, use the faster approach
  2. if local_listen_port is set to 12400 (LightGBM's default), use the faster approach
  3. if local_listen_port is set to a value other than 12400, use the _find_open_port() approach where we specifically search 1000 ports from local_listen_port to local_listen_port + 999.

python-package/lightgbm/dask.py Outdated Show resolved Hide resolved
@jameslamb
Copy link
Collaborator

Sorry for the inconvenience, but can you update this PR and #3897 to master? I just merged a PR that changes the Dask interface to move client from .fit() and .predict() into the estimator constructors (#3883).

Copy link
Collaborator

@StrikerRUS StrikerRUS left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jmoralez Thanks a lot for the contribution!
@jameslamb Many thanks for the detailed overview of the problem behind this PR!

I'm worrying about only one thing. After this PR port 12400 is treated as "not set". This is not true from the user point of view. Imagine the following scenario. One reads LightGBM docs and finds that port for distributed training can be set via local_listen_port parameter and its default value is 12400. User doesn't have any preferences for ports and decides to leave default value (or more critical problem: harcodes 12400 in params). The user goes to firewall setting and opens 12400 port with the hope that everything will work. But it won't work because actually random, not 12400, port will be used inside LightGBM.

Things are getting worse due to that there is no dedicated port param in Dask wrapper that can be set to say None and take priority over local_listen_port in kwargs. I have only one solution for this inconsistency for now:
document in Dask docs that actually only for Dask port 12400 === random.
Somehow like the following with warning directive:

image

python-package/lightgbm/dask.py Outdated Show resolved Hide resolved
@jameslamb
Copy link
Collaborator

Thanks @StrikerRUS . @jmoralez , don't make any changes yet. I need to go re-read the code in network again to be sure I understand the implications of the choice we make here. Will report back soon.

@jameslamb
Copy link
Collaborator

I haven't forgotten about this PR. I did a lot of research this weekend, reading through LightGBM's network/ source code to understand exactly how the params local_listen_port and machine_list are used. I'll have a recommendation to share soon.

@jameslamb
Copy link
Collaborator

Ok @jmoralez we just merged #3994, so this PR should now be unblocked.

Could you please update this to the latest master and then replace my use _find_ports_for_workers() with your implementation here?

worker_address_to_port = _find_ports_for_workers(

@jmoralez
Copy link
Collaborator Author

Thank you, James! Should I remove _find_ports_for_workers, _find_open_port and modify test_find_open_port_works?

@jameslamb
Copy link
Collaborator

Thank you, James! Should I remove _find_ports_for_workers, _find_open_port and modify test_find_open_port_works?

yes please

@jameslamb jameslamb self-requested a review February 24, 2021 04:08
@jameslamb jameslamb changed the title [dask] use random ports [dask] use random ports in network setup Feb 24, 2021
Copy link
Collaborator

@jameslamb jameslamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent work! This is a nice change that simplifies the code AND makes it faster.

The tests look good to me as well. Thanks very much!

@@ -343,6 +335,19 @@ def test_classifier_pred_contrib(output, centers, client):
client.close(timeout=CLIENT_CLOSE_TIMEOUT)


def test_find_random_open_port(client):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

excellent test! I really like this

@jameslamb jameslamb merged commit 0e57657 into microsoft:master Feb 24, 2021
@jmoralez jmoralez deleted the dask-ports branch February 24, 2021 04:42
@jmoralez
Copy link
Collaborator Author

Thanks a lot for all your help with this, James!

@github-actions
Copy link

This pull request has been automatically locked since there has not been any recent activity since it was closed. To start a new related discussion, open a new issue at https://github.com/microsoft/LightGBM/issues including a reference to this.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Aug 23, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants