From bbf8dc175d9fb3af7593b503580e56cb07eac1d0 Mon Sep 17 00:00:00 2001
From: fis
Date: Thu, 12 Mar 2020 21:40:57 +0800
Subject: [PATCH 1/2] Add tracker info to training api.

---
 demo/dask/cpu_training.py      |  2 ++
 doc/parameter.rst              |  2 +-
 doc/tutorials/saving_model.rst |  4 ++--
 python-package/xgboost/dask.py | 44 +++++++++++++++++++++++++++-------
 tests/python/test_with_dask.py | 24 +++++++++++++++++++
 5 files changed, 64 insertions(+), 12 deletions(-)

diff --git a/demo/dask/cpu_training.py b/demo/dask/cpu_training.py
index b86958e8f351..6c531d4d513d 100644
--- a/demo/dask/cpu_training.py
+++ b/demo/dask/cpu_training.py
@@ -3,10 +3,12 @@
 from dask.distributed import Client
 from dask.distributed import LocalCluster
 from dask import array as da
+import logging


 def main(client):
     # generate some random data for demonstration
+    logging.basicConfig(level=logging.INFO)
     m = 100000
     n = 100
     X = da.random.random(size=(m, n), chunks=100)
diff --git a/doc/parameter.rst b/doc/parameter.rst
index fcd20cfc39ea..bc2a3e621667 100644
--- a/doc/parameter.rst
+++ b/doc/parameter.rst
@@ -226,7 +226,7 @@ Parameters for Tree Booster
   See tutorial for more information

 Additional parameters for `hist` and 'gpu_hist' tree method
-================================================
+===========================================================

 * ``single_precision_histogram``, [default=``false``]

diff --git a/doc/tutorials/saving_model.rst b/doc/tutorials/saving_model.rst
index 44a85cb7cc30..dd9341d4f33d 100644
--- a/doc/tutorials/saving_model.rst
+++ b/doc/tutorials/saving_model.rst
@@ -121,7 +121,7 @@ or in R:
 Will print out something similar to (not actual output as it's too long for
 demonstration):

-.. code-block:: json
+.. code-block:: js

   {
     "Learner": {
@@ -201,7 +201,7 @@ Difference between saving model and dumping model
 XGBoost has a function called ``dump_model`` in the Booster object, which
 lets you export the model in a readable format like ``text``, ``json`` or
 ``dot`` (graphviz). The primary use case for it is model interpretation or
 visualization, and the output is not supposed to be
-loaded back to XGBoost. The JSON version has a `schema
+loaded back to XGBoost. The JSON version has a `Schema
 `_. See next section for more info.

diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py
index 5e7e8624fdbf..f7c40ff28519 100644
--- a/python-package/xgboost/dask.py
+++ b/python-package/xgboost/dask.py
@@ -47,10 +47,14 @@
 LOGGER = logging.getLogger('[xgboost.dask]')


-def _start_tracker(host, n_workers):
+def _start_tracker(host, port, n_workers):
     """Start Rabit tracker """
     env = {'DMLC_NUM_WORKER': n_workers}
-    rabit_context = RabitTracker(hostIP=host, nslave=n_workers)
+    if port:
+        rabit_context = RabitTracker(hostIP=host, port=port, port_end=port+1,
+                                     nslave=n_workers)
+    else:
+        rabit_context = RabitTracker(hostIP=host, nslave=n_workers)
     env.update(rabit_context.slave_envs())

     rabit_context.start(n_workers)
@@ -356,13 +360,21 @@ def get_worker_data_shape(self, worker):
             cols = c
         return (rows, cols)

-
-def _get_rabit_args(worker_map, client):
+from distributed import Client
+def _get_rabit_args(worker_map, client: Client, host_ip=None, port=None):
     '''Get rabit context arguments from data distribution in DaskDMatrix.'''
-    host = distributed_comm.get_address_host(client.scheduler.address)
+    msg = 'Please provide both IP and port'
+    assert (host_ip and port) or (host_ip is None and port is None), msg

-    env = client.run_on_scheduler(_start_tracker, host.strip('/:'),
-                                  len(worker_map))
+    if host_ip:
+        LOGGER.info('Running tracker on: %s, %s', host_ip, str(port))
+        env = client.run_on_scheduler(_start_tracker, host_ip, port,
+                                      len(worker_map))
+    else:
+        host = distributed_comm.get_address_host(client.scheduler.address)
+        LOGGER.info('Running tracker on: %s', host.strip('/:'))
+        env = client.run_on_scheduler(_start_tracker, host.strip('/:'), port,
+                                      len(worker_map))
     rabit_args = [('%s=%s' % item).encode() for item in env.items()]
     return rabit_args

@@ -373,7 +385,8 @@
 # evaluation history is instead returned.


-def train(client, params, dtrain, *args, evals=(), **kwargs):
+def train(client, params, dtrain, *args, evals=(), tracker_ip=None,
+          tracker_port=None, **kwargs):
     '''Train XGBoost model.

     .. versionadded:: 1.0.0
@@ -383,6 +396,19 @@
     client: dask.distributed.Client
         Specify the dask client used for training. Use default client
         returned from dask if it's set to None.
+
+    tracker_ip:
+        Address of the rabit tracker that runs on the dask scheduler.
+        Defaults to `client.scheduler.address` if unspecified.
+
+        .. versionadded:: 1.2.0
+
+    tracker_port:
+        Port for the tracker. An available port is searched for
+        automatically if unspecified.
+
+        .. versionadded:: 1.2.0
+
     \\*\\*kwargs:
         Other parameters are the same as `xgboost.train` except for
         `evals_result`, which is returned as part of function return value
@@ -410,7 +436,7 @@

     workers = list(_get_client_workers(client).keys())

-    rabit_args = _get_rabit_args(workers, client)
+    rabit_args = _get_rabit_args(workers, client, tracker_ip, tracker_port)

     def dispatched_train(worker_addr):
         '''Perform training on a single worker.'''
diff --git a/tests/python/test_with_dask.py b/tests/python/test_with_dask.py
index 744da439348c..e7249ffec2cb 100644
--- a/tests/python/test_with_dask.py
+++ b/tests/python/test_with_dask.py
@@ -15,6 +15,7 @@
     import dask.dataframe as dd
     import dask.array as da
     from xgboost.dask import DaskDMatrix
+    from dask.distributed import comm
 except ImportError:
     LocalCluster = None
     Client = None
@@ -286,3 +287,26 @@ def test_empty_dmatrix_approx():
         with Client(cluster) as client:
             parameters = {'tree_method': 'approx'}
             run_empty_dmatrix(client, parameters)
+
+
+def test_explicit_rabit_tracker():
+    with LocalCluster() as cluster:
+        with Client(cluster) as client:
+            X, y = generate_array()
+            host = comm.get_address_host(client.scheduler.address)
+            port = 9091
+            dtrain = xgb.dask.DaskDMatrix(client, X, y)
+
+            out = xgb.dask.train(client, {'tree_method': 'hist'}, dtrain,
+                                 tracker_ip=host, tracker_port=port)
+            prediction = xgb.dask.predict(client, out, dtrain)
+            assert prediction.shape[0] == kRows
+
+            assert isinstance(prediction, da.Array)
+            prediction = prediction.compute()
+
+            booster = out['booster']
+            single_node_predt = booster.predict(
+                xgb.DMatrix(X.compute())
+            )
+            np.testing.assert_allclose(prediction, single_node_predt)

From 7f5bb0ebb0078c68cbdd6d95ed2faa885aeb09b1 Mon Sep 17 00:00:00 2001
From: fis
Date: Fri, 3 Jul 2020 00:36:11 +0800
Subject: [PATCH 2/2] Cleanup.

---
 python-package/xgboost/dask.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py
index f7c40ff28519..65cfa85896d3 100644
--- a/python-package/xgboost/dask.py
+++ b/python-package/xgboost/dask.py
@@ -34,6 +34,11 @@
 from .sklearn import XGBModel, XGBRegressorBase, XGBClassifierBase
 from .sklearn import xgboost_model_doc

+try:
+    from distributed import Client
+except ImportError:
+    Client = None
+
 # Current status is considered as initial support, many features are
 # not properly supported yet.
 #
@@ -360,7 +365,7 @@ def get_worker_data_shape(self, worker):
             cols = c
         return (rows, cols)

-from distributed import Client
+
 def _get_rabit_args(worker_map, client: Client, host_ip=None, port=None):
     '''Get rabit context arguments from data distribution in DaskDMatrix.'''
     msg = 'Please provide both IP and port'
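
For reviewers: below is a minimal end-to-end sketch of how the new
`tracker_ip` / `tracker_port` keyword arguments are used, mirroring
`test_explicit_rabit_tracker` above. The synthetic data shapes (from
demo/dask/cpu_training.py) and the explicit port 9091 (from the test) are
illustrative choices, not part of the API:

    import xgboost as xgb
    from dask import array as da
    from dask.distributed import Client, LocalCluster, comm

    def main(client):
        # Synthetic data, as in demo/dask/cpu_training.py.
        X = da.random.random(size=(100000, 100), chunks=100)
        y = da.random.random(size=(100000,), chunks=100)
        dtrain = xgb.dask.DaskDMatrix(client, X, y)

        # Pin the rabit tracker to the scheduler host and an explicit port
        # instead of letting RabitTracker search for a free one.
        host = comm.get_address_host(client.scheduler.address)
        output = xgb.dask.train(client, {'tree_method': 'hist'}, dtrain,
                                tracker_ip=host, tracker_port=9091)
        return output['booster'], output['history']

    if __name__ == '__main__':
        with LocalCluster() as cluster:
            with Client(cluster) as client:
                booster, history = main(client)

Leaving both arguments unset preserves the old behaviour: the tracker binds
to the scheduler address and searches for an available port on its own.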