From b39a1617af2eff0b618f4aa93a2d45ac7f2db963 Mon Sep 17 00:00:00 2001 From: ffineis Date: Fri, 1 Jan 2021 16:20:00 -0600 Subject: [PATCH 01/16] ranker support wip --- python-package/lightgbm/dask.py | 82 +++++++++++---- tests/python_package_test/test_dask.py | 140 ++++++++++++++++++++++++- 2 files changed, 197 insertions(+), 25 deletions(-) diff --git a/python-package/lightgbm/dask.py b/python-package/lightgbm/dask.py index 66ab83591cdb..b5ec32266adc 100644 --- a/python-package/lightgbm/dask.py +++ b/python-package/lightgbm/dask.py @@ -16,7 +16,7 @@ from dask.distributed import default_client, get_worker, wait from .basic import _LIB, _safe_call -from .sklearn import LGBMClassifier, LGBMRegressor +from .sklearn import LGBMClassifier, LGBMRegressor, LGBMRanker import scipy.sparse as ss @@ -68,15 +68,22 @@ def _train_part(params, model_factory, list_of_parts, worker_addresses, return_m network_params = _build_network_params(worker_addresses, get_worker().address, local_listen_port, time_out) params.update(network_params) + is_ranker = model_factory.__qualname__ == 'LGBMRanker' + # Concatenate many parts into one - parts = tuple(zip(*list_of_parts)) - data = _concat(parts[0]) - label = _concat(parts[1]) - weight = _concat(parts[2]) if len(parts) == 3 else None + data = _concat([d['X'] for d in list_of_parts]) + label = _concat([d['y'] for d in list_of_parts]) + weight = _concat([d['weight'] for d in list_of_parts]) if 'weight' in list_of_parts[0] else None try: model = model_factory(**params) - model.fit(data, label, sample_weight=weight, **kwargs) + + if is_ranker: + group = _concat([d['group'] for d in list_of_parts]) + model.fit(data, y=label, sample_weight=weight, group=group, **kwargs) + else: + model.fit(data, label, sample_weight=weight, **kwargs) + finally: _safe_call(_LIB.LGBM_NetworkFree()) @@ -91,7 +98,7 @@ def _split_to_parts(data, is_matrix): return parts -def _train(client, data, label, params, model_factory, weight=None, **kwargs): +def _train(client, data, label, params, model_factory, sample_weight=None, group=None, **kwargs): """Inner train routine. Parameters @@ -102,20 +109,30 @@ def _train(client, data, label, params, model_factory, weight=None, **kwargs): y : dask array of shape = [n_samples] The target values (class labels in classification, real numbers in regression). params : dict - model_factory : lightgbm.LGBMClassifier or lightgbm.LGBMRegressor class + model_factory : lightgbm.LGBMClassifier, lightgbm.LGBMRegressor, or lightgbm.LGBMRanker class sample_weight : array-like of shape = [n_samples] or None, optional (default=None) - Weights of training data. + Weights of training data. + group : array-like + Group/query data, used for ranking task. sum(group) = n_samples. """ - # Split arrays/dataframes into parts. Arrange parts into tuples to enforce co-locality + # Split arrays/dataframes into parts. Arrange parts into dicts to enforce co-locality data_parts = _split_to_parts(data, is_matrix=True) label_parts = _split_to_parts(label, is_matrix=False) - if weight is None: - parts = list(map(delayed, zip(data_parts, label_parts))) - else: - weight_parts = _split_to_parts(weight, is_matrix=False) - parts = list(map(delayed, zip(data_parts, label_parts, weight_parts))) + parts = [{'X': x, 'y': y} for (x, y) in zip(data_parts, label_parts)] + + # append weight, group vectors to part dicts when needed. 
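+    # note: keeping the X, y, and any weight/group parts for the same rows in
+    # a single dict means dask schedules them as one unit, so _train_part can
+    # reassemble a complete local dataset on each worker.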
+ if sample_weight is not None: + weight_parts = _split_to_parts(sample_weight, is_matrix=False) + for i, d in enumerate(parts): + d.update({'weight': weight_parts[i]}) + + if group is not None: + group_parts = _split_to_parts(group, is_matrix=False) + for i, d in enumerate(parts): + d.update({'group': group_parts[i]}) # Start computation in the background + parts = list(map(delayed, parts)) parts = client.compute(parts) wait(parts) @@ -179,7 +196,7 @@ def _predict(model, data, proba=False, dtype=np.float32, **kwargs): Parameters ---------- - model : + model : local lightgbm.LGBM[Classifier/Regressor/Ranker] data : dask array of shape = [n_samples, n_features] Input feature matrix. proba : bool @@ -202,13 +219,13 @@ def _predict(model, data, proba=False, dtype=np.float32, **kwargs): class _LGBMModel: - def _fit(self, model_factory, X, y=None, sample_weight=None, client=None, **kwargs): + def _fit(self, model_factory, X, y=None, sample_weight=None, group=None, client=None, **kwargs): """Docstring is inherited from the LGBMModel.""" if client is None: client = default_client() params = self.get_params(True) - model = _train(client, X, y, params, model_factory, sample_weight, **kwargs) + model = _train(client, X, y, params, model_factory, sample_weight, group, **kwargs) self.set_params(**model.get_params()) self._copy_extra_params(model, self) @@ -233,8 +250,8 @@ class DaskLGBMClassifier(_LGBMModel, LGBMClassifier): """Distributed version of lightgbm.LGBMClassifier.""" def fit(self, X, y=None, sample_weight=None, client=None, **kwargs): - """Docstring is inherited from the LGBMModel.""" - return self._fit(LGBMClassifier, X, y, sample_weight, client, **kwargs) + """Docstring is inherited from the lightgbm.LGBMClassifier.fit.""" + return self._fit(LGBMClassifier, X=X, y=y, sample_weight=sample_weight, client=client, **kwargs) fit.__doc__ = LGBMClassifier.fit.__doc__ def predict(self, X, **kwargs): @@ -262,7 +279,7 @@ class DaskLGBMRegressor(_LGBMModel, LGBMRegressor): def fit(self, X, y=None, sample_weight=None, client=None, **kwargs): """Docstring is inherited from the lightgbm.LGBMRegressor.fit.""" - return self._fit(LGBMRegressor, X, y, sample_weight, client, **kwargs) + return self._fit(LGBMRegressor, X=X, y=y, sample_weight=sample_weight, client=client, **kwargs) fit.__doc__ = LGBMRegressor.fit.__doc__ def predict(self, X, **kwargs): @@ -278,3 +295,26 @@ def to_local(self): model : lightgbm.LGBMRegressor """ return self._to_local(LGBMRegressor) + + +class DaskLGBMRanker(_LGBMModel, LGBMRanker): + """Docstring is inherited from the lightgbm.LGBMRanker.""" + + def fit(self, X, y=None, sample_weight=None, group=None, client=None, **kwargs): + """Docstring is inherited from the lightgbm.LGBMRanker.fit.""" + return self._fit(LGBMRanker, X=X, y=y, sample_weight=sample_weight, group=group, client=client, **kwargs) + fit.__doc__ = LGBMRanker.fit.__doc__ + + def predict(self, X, **kwargs): + """Docstring is inherited from the lightgbm.LGBMRanker.predict.""" + return _predict(self.to_local(), X, **kwargs) + predict.__doc__ = LGBMRanker.predict.__doc__ + + def to_local(self): + """Create regular version of lightgbm.LGBMRanker from the distributed version. 
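+
+        The returned model is a plain (non-Dask) estimator, so it can make
+        predictions without a cluster.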
+ + Returns + ------- + model : lightgbm.LGBMRanker + """ + return self._to_local(LGBMRanker) diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index d512cfcbda63..8132b24ca10b 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -1,5 +1,6 @@ # coding: utf-8 import os +import itertools import sys import pytest @@ -10,11 +11,12 @@ import dask.dataframe as dd import numpy as np import pandas as pd +from scipy.stats import pearsonr, spearmanr import scipy.sparse from dask.array.utils import assert_eq -from dask_ml.metrics import accuracy_score, r2_score from distributed.utils_test import client, cluster_fixture, gen_cluster, loop from sklearn.datasets import make_blobs, make_regression +from sklearn.utils import check_random_state import lightgbm import lightgbm.dask as dlgbm @@ -36,6 +38,101 @@ def listen_port(): listen_port.port = 13000 +def r2_score_from_arrays(dy_true, dy_pred): + numerator = ((dy_true - dy_pred) ** 2).sum(axis=0, dtype="f8") + denominator = ((dy_true - dy_pred.mean(axis=0)) ** 2).sum(axis=0, dtype="f8") + return (1 - numerator / denominator).compute() + + +def _make_ranking(n_samples=100, n_features=20, n_informative=5, gmax=4, avg_gs=10, random_state=0): + """Generate a learning-to-rank dataset - feature vectors grouped together with + integer-valued graded relevance scores. Replace this with a sklearn.datasets function + if ranking objective becomes supported.""" + rnd_generator = check_random_state(random_state) + + y_vec, group_vec = np.empty((0,), dtype=int), np.empty((0,), dtype=int) + gid = 0 + + # build target, group ID vectors. + relvalues = range(gmax + 1) + while len(y_vec) < n_samples: + gsize = rnd_generator.poisson(avg_gs) + if not gsize: + continue + + rel = rnd_generator.choice(relvalues, size=gsize, replace=True) + y_vec = np.append(y_vec, rel) + group_vec = np.append(group_vec, [gid] * gsize) + gid += 1 + + y_vec, group_vec = y_vec[0:n_samples], group_vec[0:n_samples] + + # build feature data, X. Transform first few into informative features. + n_informative = max(min(n_features, n_informative), 0) + x_grid = np.linspace(0, stop=1, num=gmax + 2) + X = np.random.uniform(size=(n_samples, n_features)) + + # make first n_informative features values bucketed according to relevance scores. + def bucket_fn(z): + return np.random.uniform(x_grid[z], high=x_grid[z + 1]) + + for j in range(n_informative): + bias, coef = rnd_generator.normal(size=2) + X[:, j] = bias + coef * np.apply_along_axis(bucket_fn, axis=0, arr=y_vec) + + return X, y_vec, group_vec + + +def _create_ranking_data(n_samples=100, output='array', chunk_size=10): + X, y, g = _make_ranking(n_samples=n_samples, random_state=42) + rnd = np.random.RandomState(42) + w = rnd.rand(X.shape[0]) * 0.01 + g_rle = [sum([1 for _ in grp]) for _, grp in itertools.groupby(g)] + + if output == 'dataframe': + + # add target, weight, and group to DataFrame so that partitions abide by group boundaries. + X_df = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(X.shape[1])]) + X_df = X_df.assign(y=y, g=g, w=w) + + # set_index ensures partitions are based on group id. See https://bit.ly/3pAWyNw. + X_df.set_index('g', inplace=True) + dX = dd.from_pandas(X_df, chunksize=chunk_size) + + # separate target, weight from features. 
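+        # note: group sizes are recovered below from the dataframe index, so
+        # each partition can compute its own group vector independently.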
+ dy = dX['y'] + dw = dX['w'] + dX = dX.drop(columns=['y', 'w']) + dg = dX.index.to_series() + + # encode group identifiers into run-length encoding, the format LightGBMRanker is expecting + # so that within each partition, sum(g) = n_samples. + dg = dg.map_partitions(lambda p: p.groupby('g', sort=False).apply(lambda z: z.shape[0])) + + elif output == 'array': + + # ranking arrays: one chunk per group. Each chunk must include all columns. + p = X.shape[1] + dX, dy, dw, dg = list(), list(), list(), list() + for g_idx, rhs in enumerate(np.cumsum(g_rle)): + lhs = rhs - g_rle[g_idx] + dX.append(da.from_array(X[lhs:rhs, :], chunks=(rhs-lhs, p))) + dy.append(da.from_array(y[lhs:rhs])) + dw.append(da.from_array(w[lhs:rhs])) + dg.append(da.from_array(g[lhs:rhs])) + + dX = da.concatenate(dX, axis=0) + dy = da.concatenate(dy, axis=0) + dw = da.concatenate(dw, axis=0) + dg = da.concatenate(dg, axis=0) + assert np.array_equal(np.array(dX.chunks[0]), g_rle) + + else: + raise ValueError('ranking data creation only supported for Dask arrays and dataframes') + + return X, y, w, g_rle, dX, dy, dw, dg + + def _create_data(objective, n_samples=100, centers=2, output='array', chunk_size=50): if objective == 'classification': X, y = make_blobs(n_samples=n_samples, centers=centers, random_state=42) @@ -74,7 +171,8 @@ def test_classifier(output, centers, client, listen_port): dask_classifier = dlgbm.DaskLGBMClassifier(time_out=5, local_listen_port=listen_port) dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw, client=client) p1 = dask_classifier.predict(dX) - s1 = accuracy_score(dy, p1) + # s1 = accuracy_score(dy, p1) + s1 = da.average(dy == p1).compute() p1 = p1.compute() local_classifier = lightgbm.LGBMClassifier() @@ -130,7 +228,7 @@ def test_regressor(output, client, listen_port): dask_regressor = dask_regressor.fit(dX, dy, client=client, sample_weight=dw) p1 = dask_regressor.predict(dX) if output != 'dataframe': - s1 = r2_score(dy, p1) + s1 = r2_score_from_arrays(dy, p1) p1 = p1.compute() local_regressor = lightgbm.LGBMRegressor(seed=42) @@ -174,7 +272,7 @@ def test_regressor_local_predict(client, listen_port): dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw, client=client) p1 = dask_regressor.predict(dX) p2 = dask_regressor.to_local().predict(X) - s1 = r2_score(dy, p1) + s1 = r2_score_from_arrays(dy, p1) p1 = p1.compute() s2 = dask_regressor.to_local().score(X, y) @@ -183,6 +281,40 @@ def test_regressor_local_predict(client, listen_port): assert_eq(s1, s2) +@pytest.mark.parametrize('output', ['array', 'dataframe']) +def test_ranker(output, client, listen_port): + X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output) + + rnk_dask = dlgbm.LGBMRanker(time_out=5, local_listen_port=listen_port, seed=42) + rnk_dask = rnk_dask.fit(dX, dy, sample_weight=dw, group=dg, client=client) + rnkvec_dask = rnk_dask.predict(dX, client=client) + rnkvec_dask = rnkvec_dask.compute() + + rnk_local = lightgbm.LGBMRanker(seed=42) + rnk_local.fit(X, y, sample_weight=w, group=g) + rnkvec_local = rnk_local.predict(X) + + # distributed ranker should do a pretty good job of ranking + assert spearmanr(rnkvec_dask, y).correlation > 0.95 + + # distributed scores should give virtually same ranking as local model. 
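+    # (pearson compares the raw predicted scores; spearman above compares the
+    # induced ordering against the true relevance labels)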
+ assert pearsonr(rnkvec_dask, rnkvec_local)[0] > 0.98 + + +@pytest.mark.parametrize('output', ['array', 'dataframe']) +def test_ranker_local_predict(output, client, listen_port): + X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output) + + a = dlgbm.LGBMRanker(local_listen_port=listen_port, seed=42) + a = a.fit(dX, dy, group=dg, client=client) + rnk1 = a.predict(dX) + rnk1 = rnk1.compute() + rnk2 = a.to_local().predict(X) + + # distributed and to-local scores should be the same. + assert_eq(rnk1, rnk2) + + def test_build_network_params(): workers_ips = [ 'tcp://192.168.0.1:34545', From 19dbce6e6c085e072e7f00e4eda8c93618c37e12 Mon Sep 17 00:00:00 2001 From: ffineis Date: Sat, 2 Jan 2021 16:48:00 -0600 Subject: [PATCH 02/16] fix ranker tests --- tests/python_package_test/test_dask.py | 59 +++++++++++++++----------- 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index 8132b24ca10b..2263e8f183b6 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -1,4 +1,9 @@ # coding: utf-8 +"""Tests for lightgbm.dask module + +An easy way to run these tests is from the (python) docker container. +> python -m pytest /LightGBM/tests/python_package_test/test_dask.py +""" import os import itertools import sys @@ -38,9 +43,10 @@ def listen_port(): listen_port.port = 13000 -def r2_score_from_arrays(dy_true, dy_pred): - numerator = ((dy_true - dy_pred) ** 2).sum(axis=0, dtype="f8") - denominator = ((dy_true - dy_pred.mean(axis=0)) ** 2).sum(axis=0, dtype="f8") +def r2_score(dy_true, dy_pred): + """Helper function taken from dask_ml.metrics: computes coefficient of determination.""" + numerator = ((dy_true - dy_pred) ** 2).sum(axis=0) + denominator = ((dy_true - dy_pred.mean(axis=0)) ** 2).sum(axis=0) return (1 - numerator / denominator).compute() @@ -87,12 +93,13 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=10): X, y, g = _make_ranking(n_samples=n_samples, random_state=42) rnd = np.random.RandomState(42) w = rnd.rand(X.shape[0]) * 0.01 - g_rle = [sum([1 for _ in grp]) for _, grp in itertools.groupby(g)] + g_rle = np.array([sum([1 for _ in grp]) for _, grp in itertools.groupby(g)]) if output == 'dataframe': # add target, weight, and group to DataFrame so that partitions abide by group boundaries. X_df = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(X.shape[1])]) + X = X_df.copy() X_df = X_df.assign(y=y, g=g, w=w) # set_index ensures partitions are based on group id. See https://bit.ly/3pAWyNw. 
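         # (with the group id as a sorted index, dask avoids splitting rows
         # that share an index value across partition boundaries)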
@@ -116,10 +123,10 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=10): dX, dy, dw, dg = list(), list(), list(), list() for g_idx, rhs in enumerate(np.cumsum(g_rle)): lhs = rhs - g_rle[g_idx] - dX.append(da.from_array(X[lhs:rhs, :], chunks=(rhs-lhs, p))) + dX.append(da.from_array(X[lhs:rhs, :], chunks=(rhs - lhs, p))) dy.append(da.from_array(y[lhs:rhs])) dw.append(da.from_array(w[lhs:rhs])) - dg.append(da.from_array(g[lhs:rhs])) + dg.append(da.from_array(np.array([g_rle[g_idx]]))) dX = da.concatenate(dX, axis=0) dy = da.concatenate(dy, axis=0) @@ -130,6 +137,9 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=10): else: raise ValueError('ranking data creation only supported for Dask arrays and dataframes') + # verify sum(g) == #data + assert dg.sum().compute() == np.sum(g_rle) + return X, y, w, g_rle, dX, dy, dw, dg @@ -171,7 +181,6 @@ def test_classifier(output, centers, client, listen_port): dask_classifier = dlgbm.DaskLGBMClassifier(time_out=5, local_listen_port=listen_port) dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw, client=client) p1 = dask_classifier.predict(dX) - # s1 = accuracy_score(dy, p1) s1 = da.average(dy == p1).compute() p1 = p1.compute() @@ -179,8 +188,7 @@ def test_classifier(output, centers, client, listen_port): local_classifier.fit(X, y, sample_weight=w) p2 = local_classifier.predict(X) s2 = local_classifier.score(X, y) - - assert_eq(s1, s2) + assert np.isclose(s1, s2) assert_eq(p1, p2) assert_eq(y, p1) @@ -228,7 +236,7 @@ def test_regressor(output, client, listen_port): dask_regressor = dask_regressor.fit(dX, dy, client=client, sample_weight=dw) p1 = dask_regressor.predict(dX) if output != 'dataframe': - s1 = r2_score_from_arrays(dy, p1) + s1 = r2_score(dy, p1) p1 = p1.compute() local_regressor = lightgbm.LGBMRegressor(seed=42) @@ -272,47 +280,48 @@ def test_regressor_local_predict(client, listen_port): dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw, client=client) p1 = dask_regressor.predict(dX) p2 = dask_regressor.to_local().predict(X) - s1 = r2_score_from_arrays(dy, p1) + s1 = r2_score(dy, p1) p1 = p1.compute() s2 = dask_regressor.to_local().score(X, y) # Predictions and scores should be the same assert_eq(p1, p2) - assert_eq(s1, s2) + assert np.isclose(s1, s2, rtol=1e-4) @pytest.mark.parametrize('output', ['array', 'dataframe']) def test_ranker(output, client, listen_port): X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output) - rnk_dask = dlgbm.LGBMRanker(time_out=5, local_listen_port=listen_port, seed=42) - rnk_dask = rnk_dask.fit(dX, dy, sample_weight=dw, group=dg, client=client) - rnkvec_dask = rnk_dask.predict(dX, client=client) + dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, seed=42, min_child_samples=1) + dask_ranker = dask_ranker.fit(dX, dy, sample_weight=dw, group=dg, client=client) + rnkvec_dask = dask_ranker.predict(dX) rnkvec_dask = rnkvec_dask.compute() - rnk_local = lightgbm.LGBMRanker(seed=42) - rnk_local.fit(X, y, sample_weight=w, group=g) - rnkvec_local = rnk_local.predict(X) + local_ranker = lightgbm.LGBMRanker(seed=42, min_child_samples=1) + local_ranker.fit(X, y, sample_weight=w, group=g) + rnkvec_local = local_ranker.predict(X) # distributed ranker should do a pretty good job of ranking assert spearmanr(rnkvec_dask, y).correlation > 0.95 # distributed scores should give virtually same ranking as local model. 
- assert pearsonr(rnkvec_dask, rnkvec_local)[0] > 0.98 + assert pearsonr(rnkvec_dask, rnkvec_local)[0] > 0.95 @pytest.mark.parametrize('output', ['array', 'dataframe']) def test_ranker_local_predict(output, client, listen_port): X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output) - a = dlgbm.LGBMRanker(local_listen_port=listen_port, seed=42) - a = a.fit(dX, dy, group=dg, client=client) - rnk1 = a.predict(dX) - rnk1 = rnk1.compute() - rnk2 = a.to_local().predict(X) + dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, seed=42, min_child_samples=1) + dask_ranker = dask_ranker.fit(dX, dy, group=dg, client=client) + rnkvec_dask = dask_ranker.predict(dX) + rnkvec_dask = rnkvec_dask.compute() + + rnkvec_local = dask_ranker.to_local().predict(X) # distributed and to-local scores should be the same. - assert_eq(rnk1, rnk2) + assert_eq(rnkvec_dask, rnkvec_local) def test_build_network_params(): From 42149a0da05045c25788f294920065ebe0b7d6a8 Mon Sep 17 00:00:00 2001 From: ffineis Date: Sun, 3 Jan 2021 23:15:37 -0600 Subject: [PATCH 03/16] fix _make_ranking rnd gen bug, add sleep to help w stoch binding port failed exceptions --- tests/python_package_test/test_dask.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index 2263e8f183b6..d9d3bd174de7 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -6,6 +6,7 @@ """ import os import itertools +import time import sys import pytest @@ -16,7 +17,7 @@ import dask.dataframe as dd import numpy as np import pandas as pd -from scipy.stats import pearsonr, spearmanr +from scipy.stats import spearmanr import scipy.sparse from dask.array.utils import assert_eq from distributed.utils_test import client, cluster_fixture, gen_cluster, loop @@ -33,7 +34,6 @@ pytest.mark.skipif(os.getenv("TASK", "") == "mpi", reason="Fails to run with MPI interface") ] - @pytest.fixture() def listen_port(): listen_port.port += 10 @@ -76,11 +76,11 @@ def _make_ranking(n_samples=100, n_features=20, n_informative=5, gmax=4, avg_gs= # build feature data, X. Transform first few into informative features. n_informative = max(min(n_features, n_informative), 0) x_grid = np.linspace(0, stop=1, num=gmax + 2) - X = np.random.uniform(size=(n_samples, n_features)) + X = rnd_generator.uniform(size=(n_samples, n_features)) # make first n_informative features values bucketed according to relevance scores. 
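     # (x_grid splits [0, 1] into gmax + 1 intervals; a document with relevance
     # z draws its informative feature values from the z-th interval)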
def bucket_fn(z): - return np.random.uniform(x_grid[z], high=x_grid[z + 1]) + return rnd_generator.uniform(x_grid[z], high=x_grid[z + 1]) for j in range(n_informative): bias, coef = rnd_generator.normal(size=2) @@ -293,7 +293,8 @@ def test_regressor_local_predict(client, listen_port): def test_ranker(output, client, listen_port): X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output) - dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, seed=42, min_child_samples=1) + time.sleep(10) + dask_ranker = dlgbm.DaskLGBMRanker(local_listen_port=listen_port, seed=42, min_child_samples=1) dask_ranker = dask_ranker.fit(dX, dy, sample_weight=dw, group=dg, client=client) rnkvec_dask = dask_ranker.predict(dX) rnkvec_dask = rnkvec_dask.compute() @@ -302,18 +303,19 @@ def test_ranker(output, client, listen_port): local_ranker.fit(X, y, sample_weight=w, group=g) rnkvec_local = local_ranker.predict(X) - # distributed ranker should do a pretty good job of ranking - assert spearmanr(rnkvec_dask, y).correlation > 0.95 + # distributed ranker should do a pretty good job of ranking. Correlation affected by group size. + assert spearmanr(rnkvec_dask, y).correlation > 0.9 - # distributed scores should give virtually same ranking as local model. - assert pearsonr(rnkvec_dask, rnkvec_local)[0] > 0.95 + # distributed scores should give similar ranking to local model. + assert spearmanr(rnkvec_dask, rnkvec_local).correlation > 0.9 @pytest.mark.parametrize('output', ['array', 'dataframe']) def test_ranker_local_predict(output, client, listen_port): X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output) - dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, seed=42, min_child_samples=1) + time.sleep(10) + dask_ranker = dlgbm.DaskLGBMRanker(local_listen_port=listen_port, seed=42, min_child_samples=1) dask_ranker = dask_ranker.fit(dX, dy, group=dg, client=client) rnkvec_dask = dask_ranker.predict(dX) rnkvec_dask = rnkvec_dask.compute() From 607c857042c530f7fa30a8dfc85bf9ab8ea9dfab Mon Sep 17 00:00:00 2001 From: ffineis Date: Wed, 6 Jan 2021 23:06:01 -0600 Subject: [PATCH 04/16] add wait_for_workers to prevent Binding port exception --- .ci/test.sh | 2 +- python-package/lightgbm/dask.py | 6 +++--- tests/python_package_test/test_dask.py | 30 +++++++++++++++----------- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/.ci/test.sh b/.ci/test.sh index a32be65a93ed..7fa8c36b772e 100755 --- a/.ci/test.sh +++ b/.ci/test.sh @@ -52,7 +52,7 @@ if [[ $TRAVIS == "true" ]] && [[ $TASK == "lint" ]]; then "r-lintr>=2.0" pip install --user cpplint echo "Linting Python code" - pycodestyle --ignore=E501,W503 --exclude=./compute,./eigen,./.nuget,./external_libs . || exit -1 + pycodestyle --ignore=E402,E501,W503 --exclude=./compute,./eigen,./.nuget,./external_libs . || exit -1 pydocstyle --convention=numpy --add-ignore=D105 --match-dir="^(?!^compute|^eigen|external_libs|test|example).*" --match="(?!^test_|setup).*\.py" . 
|| exit -1 echo "Linting R code" Rscript ${BUILD_DIRECTORY}/.ci/lint_r_code.R ${BUILD_DIRECTORY} || exit -1 diff --git a/python-package/lightgbm/dask.py b/python-package/lightgbm/dask.py index b5ec32266adc..ccace02405f3 100644 --- a/python-package/lightgbm/dask.py +++ b/python-package/lightgbm/dask.py @@ -82,7 +82,7 @@ def _train_part(params, model_factory, list_of_parts, worker_addresses, return_m group = _concat([d['group'] for d in list_of_parts]) model.fit(data, y=label, sample_weight=weight, group=group, **kwargs) else: - model.fit(data, label, sample_weight=weight, **kwargs) + model.fit(data, y=label, sample_weight=weight, **kwargs) finally: _safe_call(_LIB.LGBM_NetworkFree()) @@ -124,12 +124,12 @@ def _train(client, data, label, params, model_factory, sample_weight=None, group if sample_weight is not None: weight_parts = _split_to_parts(sample_weight, is_matrix=False) for i, d in enumerate(parts): - d.update({'weight': weight_parts[i]}) + parts[i] = {**d, 'weight': weight_parts[i]} if group is not None: group_parts = _split_to_parts(group, is_matrix=False) for i, d in enumerate(parts): - d.update({'group': group_parts[i]}) + parts[i] = {**d, 'group': group_parts[i]} # Start computation in the background parts = list(map(delayed, parts)) diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index d9d3bd174de7..734d5ea4f387 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -34,6 +34,7 @@ pytest.mark.skipif(os.getenv("TASK", "") == "mpi", reason="Fails to run with MPI interface") ] + @pytest.fixture() def listen_port(): listen_port.port += 10 @@ -50,10 +51,10 @@ def r2_score(dy_true, dy_pred): return (1 - numerator / denominator).compute() -def _make_ranking(n_samples=100, n_features=20, n_informative=5, gmax=4, avg_gs=10, random_state=0): +def _make_ranking(n_samples=100, n_features=20, n_informative=5, gmax=1, random_gs=True, avg_gs=10, random_state=0): """Generate a learning-to-rank dataset - feature vectors grouped together with integer-valued graded relevance scores. Replace this with a sklearn.datasets function - if ranking objective becomes supported.""" + if ranking objective becomes supported in sklearn.datasets module.""" rnd_generator = check_random_state(random_state) y_vec, group_vec = np.empty((0,), dtype=int), np.empty((0,), dtype=int) @@ -62,7 +63,7 @@ def _make_ranking(n_samples=100, n_features=20, n_informative=5, gmax=4, avg_gs= # build target, group ID vectors. relvalues = range(gmax + 1) while len(y_vec) < n_samples: - gsize = rnd_generator.poisson(avg_gs) + gsize = avg_gs if not random_gs else rnd_generator.poisson(avg_gs) if not gsize: continue @@ -132,14 +133,10 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=10): dy = da.concatenate(dy, axis=0) dw = da.concatenate(dw, axis=0) dg = da.concatenate(dg, axis=0) - assert np.array_equal(np.array(dX.chunks[0]), g_rle) else: raise ValueError('ranking data creation only supported for Dask arrays and dataframes') - # verify sum(g) == #data - assert dg.sum().compute() == np.sum(g_rle) - return X, y, w, g_rle, dX, dy, dw, dg @@ -293,8 +290,11 @@ def test_regressor_local_predict(client, listen_port): def test_ranker(output, client, listen_port): X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output) + # Avoid lightgbm.basic.LightGBMError: Binding port 13xxx failed exceptions. 
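+    # note: waiting for all workers (plus a short sleep) lowers the chance
+    # that training begins before the cluster is fully up.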
+ client.wait_for_workers() time.sleep(10) - dask_ranker = dlgbm.DaskLGBMRanker(local_listen_port=listen_port, seed=42, min_child_samples=1) + + dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, seed=42, min_child_samples=1) dask_ranker = dask_ranker.fit(dX, dy, sample_weight=dw, group=dg, client=client) rnkvec_dask = dask_ranker.predict(dX) rnkvec_dask = rnkvec_dask.compute() @@ -303,19 +303,23 @@ def test_ranker(output, client, listen_port): local_ranker.fit(X, y, sample_weight=w, group=g) rnkvec_local = local_ranker.predict(X) - # distributed ranker should do a pretty good job of ranking. Correlation affected by group size. - assert spearmanr(rnkvec_dask, y).correlation > 0.9 + # distributed ranker should be able to rank decently well. + dcor = spearmanr(rnkvec_dask, y).correlation + assert dcor > 0.6 - # distributed scores should give similar ranking to local model. - assert spearmanr(rnkvec_dask, rnkvec_local).correlation > 0.9 + # relative difference between distributed ranker and local ranker spearman corr should be small. + lcor = spearmanr(rnkvec_local, y).correlation + assert np.abs(dcor - lcor) / lcor < 0.01 @pytest.mark.parametrize('output', ['array', 'dataframe']) def test_ranker_local_predict(output, client, listen_port): X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output) + client.wait_for_workers() time.sleep(10) - dask_ranker = dlgbm.DaskLGBMRanker(local_listen_port=listen_port, seed=42, min_child_samples=1) + + dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, seed=42, min_child_samples=1) dask_ranker = dask_ranker.fit(dX, dy, group=dg, client=client) rnkvec_dask = dask_ranker.predict(dX) rnkvec_dask = rnkvec_dask.compute() From a33a4de561cbd3aa1e8917e429cee02e6949d1df Mon Sep 17 00:00:00 2001 From: ffineis Date: Mon, 11 Jan 2021 14:58:53 -0500 Subject: [PATCH 05/16] another attempt to stabilize test_dask.py --- tests/python_package_test/test_dask.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index 734d5ea4f387..b6aad125a2cb 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -6,12 +6,12 @@ """ import os import itertools -import time import sys +import time import pytest -if not sys.platform.startswith("linux"): - pytest.skip("lightgbm.dask is currently supported in Linux environments", allow_module_level=True) +if not sys.platform.startswith('linux'): + pytest.skip('lightgbm.dask is currently supported in Linux environments', allow_module_level=True) import dask.array as da import dask.dataframe as dd @@ -31,7 +31,7 @@ data_centers = [[[-4, -4], [4, 4]], [[-4, -4], [4, 4], [-4, 4]]] pytestmark = [ - pytest.mark.skipif(os.getenv("TASK", "") == "mpi", reason="Fails to run with MPI interface") + pytest.mark.skipif(os.getenv('TASK', '') == 'mpi', reason='Fails to run with MPI interface') ] @@ -51,7 +51,7 @@ def r2_score(dy_true, dy_pred): return (1 - numerator / denominator).compute() -def _make_ranking(n_samples=100, n_features=20, n_informative=5, gmax=1, random_gs=True, avg_gs=10, random_state=0): +def _make_ranking(n_samples=100, n_features=20, n_informative=5, gmax=1, random_gs=False, avg_gs=10, random_state=0): """Generate a learning-to-rank dataset - feature vectors grouped together with integer-valued graded relevance scores. 
Replace this with a sklearn.datasets function if ranking objective becomes supported in sklearn.datasets module.""" @@ -90,7 +90,7 @@ def bucket_fn(z): return X, y_vec, group_vec -def _create_ranking_data(n_samples=100, output='array', chunk_size=10): +def _create_ranking_data(n_samples=100, output='array', chunk_size=50): X, y, g = _make_ranking(n_samples=n_samples, random_state=42) rnd = np.random.RandomState(42) w = rnd.rand(X.shape[0]) * 0.01 @@ -291,7 +291,6 @@ def test_ranker(output, client, listen_port): X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output) # Avoid lightgbm.basic.LightGBMError: Binding port 13xxx failed exceptions. - client.wait_for_workers() time.sleep(10) dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, seed=42, min_child_samples=1) @@ -309,14 +308,13 @@ def test_ranker(output, client, listen_port): # relative difference between distributed ranker and local ranker spearman corr should be small. lcor = spearmanr(rnkvec_local, y).correlation - assert np.abs(dcor - lcor) / lcor < 0.01 + assert np.abs(dcor - lcor) / lcor < 0.03 @pytest.mark.parametrize('output', ['array', 'dataframe']) def test_ranker_local_predict(output, client, listen_port): X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output) - client.wait_for_workers() time.sleep(10) dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, seed=42, min_child_samples=1) From d6e520945413cdb60463370363b265415bf29166 Mon Sep 17 00:00:00 2001 From: ffineis Date: Sat, 16 Jan 2021 16:08:59 -0600 Subject: [PATCH 06/16] requested changes: docstrings, dask_ml, tuples for list_of_parts --- .ci/test.sh | 2 +- python-package/lightgbm/dask.py | 40 ++++++++++++++------------ tests/python_package_test/test_dask.py | 2 +- 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/.ci/test.sh b/.ci/test.sh index 2b994171734c..447d6b13bc8c 100755 --- a/.ci/test.sh +++ b/.ci/test.sh @@ -50,7 +50,7 @@ if [[ $TASK == "lint" ]]; then "r-lintr>=2.0" pip install --user cpplint echo "Linting Python code" - pycodestyle --ignore=E402,E501,W503 --exclude=./compute,./eigen,./.nuget,./external_libs . || exit -1 + pycodestyle --ignore=E501,W503 --exclude=./compute,./eigen,./.nuget,./external_libs . || exit -1 pydocstyle --convention=numpy --add-ignore=D105 --match-dir="^(?!^compute|^eigen|external_libs|test|example).*" --match="(?!^test_|setup).*\.py" . 
|| exit -1 echo "Linting R code" Rscript ${BUILD_DIRECTORY}/.ci/lint_r_code.R ${BUILD_DIRECTORY} || exit -1 diff --git a/python-package/lightgbm/dask.py b/python-package/lightgbm/dask.py index 22fe7743d7d2..80c223b94f7b 100644 --- a/python-package/lightgbm/dask.py +++ b/python-package/lightgbm/dask.py @@ -135,17 +135,19 @@ def _train_part(params, model_factory, list_of_parts, worker_address_to_port, re is_ranker = model_factory.__qualname__ == 'LGBMRanker' # Concatenate many parts into one - data = _concat([d['X'] for d in list_of_parts]) - label = _concat([d['y'] for d in list_of_parts]) - weight = _concat([d['weight'] for d in list_of_parts]) if 'weight' in list_of_parts[0] else None + parts = tuple(zip(*list_of_parts)) + data = _concat(parts[0]) + label = _concat(parts[1]) try: model = model_factory(**params) if is_ranker: - group = _concat([d['group'] for d in list_of_parts]) + group = _concat(parts[-1]) + weight = _concat(parts[2]) if len(parts) == 4 else None model.fit(data, y=label, sample_weight=weight, group=group, **kwargs) else: + weight = _concat(parts[2]) if len(parts) == 3 else None model.fit(data, y=label, sample_weight=weight, **kwargs) finally: @@ -176,24 +178,26 @@ def _train(client, data, label, params, model_factory, sample_weight=None, group model_factory : lightgbm.LGBMClassifier, lightgbm.LGBMRegressor, or lightgbm.LGBMRanker class sample_weight : array-like of shape = [n_samples] or None, optional (default=None) Weights of training data. - group : array-like - Group/query data, used for ranking task. sum(group) = n_samples. + group : array-like where sum(group) = [n_samples] or None for non-ranking objectives (default=None) + Group/query data, only used for ranking task. sum(group) = n_samples. For example, + if you have a 100-record dataset with `group = [10, 20, 40, 10, 10]`, that means that you have + 5 groups, where the first 10 records are in the first group, records 11-30 are the second group, etc. """ # Split arrays/dataframes into parts. Arrange parts into dicts to enforce co-locality data_parts = _split_to_parts(data, is_matrix=True) label_parts = _split_to_parts(label, is_matrix=False) - parts = [{'X': x, 'y': y} for (x, y) in zip(data_parts, label_parts)] - - # append weight, group vectors to part dicts when needed. 
- if sample_weight is not None: - weight_parts = _split_to_parts(sample_weight, is_matrix=False) - for i, d in enumerate(parts): - parts[i] = {**d, 'weight': weight_parts[i]} - - if group is not None: - group_parts = _split_to_parts(group, is_matrix=False) - for i, d in enumerate(parts): - parts[i] = {**d, 'group': group_parts[i]} + weight_parts = _split_to_parts(sample_weight, is_matrix=False) if sample_weight is not None else None + group_parts = _split_to_parts(group, is_matrix=False) if group is not None else None + + # choose between four options of (sample_weight, group) being (un)specified + if weight_parts is None and group_parts is None: + parts = zip(data_parts, label_parts) + elif weight_parts is not None and group_parts is None: + parts = zip(data_parts, label_parts, weight_parts) + elif weight_parts is None and group_parts is not None: + parts = zip(data_parts, label_parts, group_parts) + else: + parts = zip(data_parts, label_parts, weight_parts, group_parts) # Start computation in the background parts = list(map(delayed, parts)) diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index 89205b968e9f..03d153ec1cc2 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -365,7 +365,7 @@ def test_ranker(output, client, listen_port, group): # difference between distributed ranker and local ranker spearman corr should be small. lcor = spearmanr(rnkvec_local, y).correlation - assert np.abs(dcor - lcor) < 0.003 + assert np.abs(dcor - lcor) < 0.03 @pytest.mark.parametrize('output', ['array', 'dataframe']) From 1d6054b1274f08d32b26536db7a22077d2387949 Mon Sep 17 00:00:00 2001 From: ffineis Date: Sat, 16 Jan 2021 16:30:14 -0600 Subject: [PATCH 07/16] fix lint bug, add group param to test_ranker_local_predict --- tests/python_package_test/test_dask.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index 03d153ec1cc2..cf816b263929 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -46,8 +46,8 @@ def listen_port(): listen_port.port = 13000 -def _make_ranking(n_samples=100, n_features=20, n_informative=5, gmax=2 - , group=None, random_gs=False, avg_gs=10, random_state=0): +def _make_ranking(n_samples=100, n_features=20, n_informative=5, gmax=2, + group=None, random_gs=False, avg_gs=10, random_state=0): """Generate a learning-to-rank dataset - feature vectors grouped together with integer-valued graded relevance scores. Replace this with a sklearn.datasets function if ranking objective becomes supported in sklearn.datasets module. 
@@ -341,8 +341,8 @@ def test_regressor_local_predict(client, listen_port): s2 = dask_regressor.to_local().score(X, y) # Predictions and scores should be the same + assert_eq(s1, s2) assert_eq(p1, p2) - assert np.isclose(s1, s2, rtol=1e-4) @pytest.mark.parametrize('output', ['array', 'dataframe']) @@ -371,7 +371,7 @@ def test_ranker(output, client, listen_port, group): @pytest.mark.parametrize('output', ['array', 'dataframe']) @pytest.mark.parametrize('group', [None, group_sizes]) def test_ranker_local_predict(output, client, listen_port, group): - X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output) + X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output, group=group) dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, seed=42, min_child_samples=1) dask_ranker = dask_ranker.fit(dX, dy, group=dg, client=client) From 23216925e1a0bc706e68933ccfc6a76daa801967 Mon Sep 17 00:00:00 2001 From: ffineis Date: Mon, 18 Jan 2021 22:26:27 -0600 Subject: [PATCH 08/16] decorator to skip tests with errors on fixture teardown --- tests/python_package_test/test_dask.py | 41 +++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index cf816b263929..e85b9cc5cebc 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -4,8 +4,9 @@ An easy way to run these tests is from the (python) docker container. Also see lightgbm-dask-testing repo: https://github.com/jameslamb/lightgbm-dask-testing """ -import os +import functools import itertools +import os import socket import sys @@ -13,6 +14,7 @@ if not sys.platform.startswith('linux'): pytest.skip('lightgbm.dask is currently supported in Linux environments', allow_module_level=True) +from asyncio import TimeoutError import dask.array as da import dask.dataframe as dd import numpy as np @@ -37,6 +39,26 @@ ] +def handle_fixture_timeout_errors(f): + """ + Decorator to avoid asyncio.exceptions.TimeoutErrors triggered on teardown of dask.utils_test + client fixture (do_disconnect). See GitHub issue: https://github.com/dask/dask-ml/issues/611 + Inform pytest to skip the tests instead. 
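+    Only TimeoutErrors whose message mentions a cancelled task are skipped;
+    any other error is re-raised unchanged.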
+ """ + @functools.wraps(f) + def wrapper(*args, **kwargs): + try: + return f(*args, **kwargs) + except TimeoutError as e: + if 'Task cancelled' in str(e): + msg = 'Ignoring do_disconnect error in fixture teardown' + str(e) + pytest.skip(msg) + else: + raise(e) + + return wrapper + + @pytest.fixture() def listen_port(): listen_port.port += 10 @@ -211,6 +233,7 @@ def _create_data(objective, n_samples=100, centers=2, output='array', chunk_size @pytest.mark.parametrize('output', data_output) @pytest.mark.parametrize('centers', data_centers) +@handle_fixture_timeout_errors def test_classifier(output, centers, client, listen_port): X, y, w, dX, dy, dw = _create_data('classification', output=output, centers=centers) @@ -231,6 +254,7 @@ def test_classifier(output, centers, client, listen_port): assert_eq(y, p2) +@handle_fixture_timeout_errors def test_training_does_not_fail_on_port_conflicts(client): _, _, _, dX, dy, dw = _create_data('classification', output='array') @@ -253,6 +277,7 @@ def test_training_does_not_fail_on_port_conflicts(client): @pytest.mark.parametrize('output', data_output) @pytest.mark.parametrize('centers', data_centers) +@handle_fixture_timeout_errors def test_classifier_proba(output, centers, client, listen_port): X, y, w, dX, dy, dw = _create_data('classification', output=output, centers=centers) @@ -268,6 +293,7 @@ def test_classifier_proba(output, centers, client, listen_port): assert_eq(p1, p2, atol=0.3) +@handle_fixture_timeout_errors def test_classifier_local_predict(client, listen_port): X, y, w, dX, dy, dw = _create_data('classification', output='array') @@ -285,6 +311,7 @@ def test_classifier_local_predict(client, listen_port): @pytest.mark.parametrize('output', data_output) +@handle_fixture_timeout_errors def test_regressor(output, client, listen_port): X, y, w, dX, dy, dw = _create_data('regression', output=output) @@ -311,6 +338,7 @@ def test_regressor(output, client, listen_port): @pytest.mark.parametrize('output', data_output) @pytest.mark.parametrize('alpha', [.1, .5, .9]) +@handle_fixture_timeout_errors def test_regressor_quantile(output, client, listen_port, alpha): X, y, w, dX, dy, dw = _create_data('regression', output=output) @@ -329,6 +357,7 @@ def test_regressor_quantile(output, client, listen_port, alpha): np.testing.assert_allclose(q2, alpha, atol=0.2) +@handle_fixture_timeout_errors def test_regressor_local_predict(client, listen_port): X, y, w, dX, dy, dw = _create_data('regression', output='array') @@ -347,15 +376,17 @@ def test_regressor_local_predict(client, listen_port): @pytest.mark.parametrize('output', ['array', 'dataframe']) @pytest.mark.parametrize('group', [None, group_sizes]) +@handle_fixture_timeout_errors def test_ranker(output, client, listen_port, group): X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output, group=group) - dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, seed=42, min_child_samples=1) + dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, + n_estimators=10, num_leaves=10, seed=42, min_child_samples=1) dask_ranker = dask_ranker.fit(dX, dy, sample_weight=dw, group=dg, client=client) rnkvec_dask = dask_ranker.predict(dX) rnkvec_dask = rnkvec_dask.compute() - local_ranker = lightgbm.LGBMRanker(seed=42, min_child_samples=1) + local_ranker = lightgbm.LGBMRanker(n_estimators=10, num_leaves=10, seed=42, min_child_samples=1) local_ranker.fit(X, y, sample_weight=w, group=g) rnkvec_local = local_ranker.predict(X) @@ -370,10 +401,12 @@ def test_ranker(output, 
client, listen_port, group): @pytest.mark.parametrize('output', ['array', 'dataframe']) @pytest.mark.parametrize('group', [None, group_sizes]) +@handle_fixture_timeout_errors def test_ranker_local_predict(output, client, listen_port, group): X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output, group=group) - dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, seed=42, min_child_samples=1) + dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, + n_estimators=10, num_leaves=10, seed=42, min_child_samples=1) dask_ranker = dask_ranker.fit(dX, dy, group=dg, client=client) rnkvec_dask = dask_ranker.predict(dX) rnkvec_dask = rnkvec_dask.compute() From d472c8237f127447a098df87d8321ef018b744ea Mon Sep 17 00:00:00 2001 From: ffineis Date: Tue, 19 Jan 2021 18:52:09 -0600 Subject: [PATCH 09/16] remove gpu ranker tests, reduce make_ranking data complexity --- tests/python_package_test/test_dask.py | 31 +++++++++++++------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index 39a9a78e1535..b53b31f48db2 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -14,7 +14,7 @@ if not sys.platform.startswith('linux'): pytest.skip('lightgbm.dask is currently supported in Linux environments', allow_module_level=True) -from asyncio import TimeoutError +from asyncio import TimeoutError, CancelledError import dask.array as da import dask.dataframe as dd import numpy as np @@ -49,8 +49,8 @@ def handle_fixture_timeout_errors(f): def wrapper(*args, **kwargs): try: return f(*args, **kwargs) - except TimeoutError as e: - msg = 'Ignoring Timeout Error in fixture teardown: ' + str(e) + except (TimeoutError, CancelledError) as e: + msg = 'Ignoring error in fixture teardown: ' + str(e) pytest.skip(msg) return wrapper @@ -134,16 +134,11 @@ def _make_ranking(n_samples=100, n_features=20, n_informative=5, gmax=2, # build feature data, X. Transform first few into informative features. n_informative = max(min(n_features, n_informative), 0) - x_grid = np.linspace(0, stop=1, num=gmax + 2) X = rnd_generator.uniform(size=(n_samples, n_features)) - # make first n_informative features values bucketed according to relevance scores. - def bucket_fn(z): - return rnd_generator.uniform(x_grid[z], high=x_grid[z + 1]) - for j in range(n_informative): bias, coef = rnd_generator.normal(size=2) - X[:, j] = bias + coef * np.apply_along_axis(bucket_fn, axis=0, arr=y_vec) + X[:, j] = bias + coef * y_vec return X, y_vec, group_id_vec @@ -413,25 +408,29 @@ def test_regressor_local_predict(client, listen_port): @pytest.mark.parametrize('group', [None, group_sizes]) @handle_fixture_timeout_errors def test_ranker(output, client, listen_port, group): + + if os.getenv('TASK', '') == 'gpu': + pytest.skip('Ranker fails to run with GPU interface') + X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output, group=group) + # -- use many trees + leaves to overfit, help ensure that dask data-parallel strategy matches that of + # -- serial learner. See https://github.com/microsoft/LightGBM/issues/3292#issuecomment-671288210. 
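+    # -- (with enough capacity both learners nearly interpolate the training
+    # -- data, so their predicted orderings should agree closely)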
dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, - n_estimators=10, num_leaves=10, seed=42, min_child_samples=1) + n_estimators=50, num_leaves=20, seed=42, min_child_samples=1) dask_ranker = dask_ranker.fit(dX, dy, sample_weight=dw, group=dg, client=client) rnkvec_dask = dask_ranker.predict(dX) rnkvec_dask = rnkvec_dask.compute() - local_ranker = lightgbm.LGBMRanker(n_estimators=10, num_leaves=10, seed=42, min_child_samples=1) + local_ranker = lightgbm.LGBMRanker(n_estimators=50, num_leaves=20, seed=42, min_child_samples=1) local_ranker.fit(X, y, sample_weight=w, group=g) rnkvec_local = local_ranker.predict(X) - # distributed ranker should be able to rank decently well. + # distributed ranker should be able to rank decently well and should + # have high rank correlation with scores from serial ranker. dcor = spearmanr(rnkvec_dask, y).correlation assert dcor > 0.6 - - # difference between distributed ranker and local ranker spearman corr should be small. - lcor = spearmanr(rnkvec_local, y).correlation - assert np.abs(dcor - lcor) < 0.03 + assert spearmanr(rnkvec_dask, rnkvec_local).correlation > 0.9 @pytest.mark.parametrize('output', ['array', 'dataframe']) From d9efb5b82711e410ed5fbd5d60efb8a1ffb0e339 Mon Sep 17 00:00:00 2001 From: ffineis Date: Tue, 19 Jan 2021 23:37:56 -0600 Subject: [PATCH 10/16] another attempt to silence client, decorator does not silence fixture errors --- tests/python_package_test/test_dask.py | 50 ++++++++++++-------------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index 6149cd6a6804..09965d92692d 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -4,7 +4,6 @@ An easy way to run these tests is from the (python) docker container. Also see lightgbm-dask-testing repo: https://github.com/jameslamb/lightgbm-dask-testing """ -import functools import itertools import os import socket @@ -14,7 +13,6 @@ if not sys.platform.startswith('linux'): pytest.skip('lightgbm.dask is currently supported in Linux environments', allow_module_level=True) -from asyncio import TimeoutError, CancelledError import dask.array as da import dask.dataframe as dd import numpy as np @@ -39,23 +37,6 @@ ] -def handle_fixture_timeout_errors(f): - """ - Decorator to avoid asyncio.exceptions.TimeoutErrors triggered on teardown of dask.utils_test - client fixture (do_disconnect). See GitHub issue: https://github.com/dask/dask-ml/issues/611 - Inform pytest to skip the tests instead. 
- """ - @functools.wraps(f) - def wrapper(*args, **kwargs): - try: - return f(*args, **kwargs) - except (TimeoutError, CancelledError) as e: - msg = 'Ignoring error in fixture teardown: ' + str(e) - pytest.skip(msg) - - return wrapper - - @pytest.fixture() def listen_port(): listen_port.port += 10 @@ -225,7 +206,6 @@ def _create_data(objective, n_samples=100, centers=2, output='array', chunk_size @pytest.mark.parametrize('output', data_output) @pytest.mark.parametrize('centers', data_centers) -@handle_fixture_timeout_errors def test_classifier(output, centers, client, listen_port): X, y, w, dX, dy, dw = _create_data('classification', output=output, centers=centers) @@ -253,8 +233,9 @@ def test_classifier(output, centers, client, listen_port): assert_eq(y, p2) assert_eq(p1_proba, p2_proba, atol=0.3) + client.close() + -@handle_fixture_timeout_errors def test_training_does_not_fail_on_port_conflicts(client): _, _, _, dX, dy, dw = _create_data('classification', output='array') @@ -276,10 +257,11 @@ def test_training_does_not_fail_on_port_conflicts(client): ) assert dask_classifier.booster_ + client.close() + @pytest.mark.parametrize('output', data_output) @pytest.mark.parametrize('centers', data_centers) -@handle_fixture_timeout_errors def test_classifier_proba(output, centers, client, listen_port): X, y, w, dX, dy, dw = _create_data('classification', output=output, centers=centers) @@ -294,8 +276,9 @@ def test_classifier_proba(output, centers, client, listen_port): assert_eq(p1, p2, atol=0.3) + client.close() + -@handle_fixture_timeout_errors def test_classifier_local_predict(client, listen_port): X, y, w, dX, dy, dw = _create_data('classification', output='array') @@ -316,9 +299,10 @@ def test_classifier_local_predict(client, listen_port): assert_eq(y, p1) assert_eq(y, p2) + client.close() + @pytest.mark.parametrize('output', data_output) -@handle_fixture_timeout_errors def test_regressor(output, client, listen_port): X, y, w, dX, dy, dw = _create_data('regression', output=output) @@ -347,10 +331,11 @@ def test_regressor(output, client, listen_port): assert_eq(y, p1, rtol=1., atol=100.) assert_eq(y, p2, rtol=1., atol=50.) 
+ client.close() + @pytest.mark.parametrize('output', data_output) @pytest.mark.parametrize('alpha', [.1, .5, .9]) -@handle_fixture_timeout_errors def test_regressor_quantile(output, client, listen_port, alpha): X, y, w, dX, dy, dw = _create_data('regression', output=output) @@ -381,8 +366,9 @@ def test_regressor_quantile(output, client, listen_port, alpha): np.testing.assert_allclose(q1, alpha, atol=0.2) np.testing.assert_allclose(q2, alpha, atol=0.2) + client.close() + -@handle_fixture_timeout_errors def test_regressor_local_predict(client, listen_port): X, y, _, dX, dy, dw = _create_data('regression', output='array') @@ -403,10 +389,11 @@ def test_regressor_local_predict(client, listen_port): assert_eq(s1, s2) assert_eq(p1, p2) + client.close() + @pytest.mark.parametrize('output', ['array', 'dataframe']) @pytest.mark.parametrize('group', [None, group_sizes]) -@handle_fixture_timeout_errors def test_ranker(output, client, listen_port, group): if os.getenv('TASK', '') == 'gpu': @@ -432,11 +419,16 @@ def test_ranker(output, client, listen_port, group): assert dcor > 0.6 assert spearmanr(rnkvec_dask, rnkvec_local).correlation > 0.9 + client.close() + @pytest.mark.parametrize('output', ['array', 'dataframe']) @pytest.mark.parametrize('group', [None, group_sizes]) -@handle_fixture_timeout_errors def test_ranker_local_predict(output, client, listen_port, group): + + if os.getenv('TASK', '') == 'gpu': + pytest.skip('Ranker fails to run with GPU interface') + X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output, group=group) dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, @@ -449,6 +441,8 @@ def test_ranker_local_predict(output, client, listen_port, group): # distributed and to-local scores should be the same. assert_eq(rnkvec_dask, rnkvec_local) + client.close() + def test_find_open_port_works(): worker_ip = '127.0.0.1' From d1d4abf2f1472e9653b62d5afc62986bff6e14bc Mon Sep 17 00:00:00 2001 From: ffineis Date: Wed, 20 Jan 2021 13:19:43 -0600 Subject: [PATCH 11/16] address requested changes on 1/20/20 --- python-package/lightgbm/dask.py | 2 +- tests/python_package_test/test_dask.py | 21 +-------------------- 2 files changed, 2 insertions(+), 21 deletions(-) diff --git a/python-package/lightgbm/dask.py b/python-package/lightgbm/dask.py index c149656e05d1..f086140818e4 100644 --- a/python-package/lightgbm/dask.py +++ b/python-package/lightgbm/dask.py @@ -183,7 +183,7 @@ def _train(client, data, label, params, model_factory, sample_weight=None, group if you have a 100-record dataset with `group = [10, 20, 40, 10, 10]`, that means that you have 5 groups, where the first 10 records are in the first group, records 11-30 are the second group, etc. """ - # Split arrays/dataframes into parts. Arrange parts into dicts to enforce co-locality + # Split arrays/dataframes into parts. 
Arrange parts into tuples to enforce co-locality data_parts = _split_to_parts(data, is_matrix=True) label_parts = _split_to_parts(label, is_matrix=False) weight_parts = _split_to_parts(sample_weight, is_matrix=False) if sample_weight is not None else None diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index 09965d92692d..b35b42a1e833 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -260,25 +260,6 @@ def test_training_does_not_fail_on_port_conflicts(client): client.close() -@pytest.mark.parametrize('output', data_output) -@pytest.mark.parametrize('centers', data_centers) -def test_classifier_proba(output, centers, client, listen_port): - X, y, w, dX, dy, dw = _create_data('classification', output=output, centers=centers) - - dask_classifier = dlgbm.DaskLGBMClassifier(time_out=5, local_listen_port=listen_port) - dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw, client=client) - p1 = dask_classifier.predict_proba(dX) - p1 = p1.compute() - - local_classifier = lightgbm.LGBMClassifier() - local_classifier.fit(X, y, sample_weight=w) - p2 = local_classifier.predict_proba(X) - - assert_eq(p1, p2, atol=0.3) - - client.close() - - def test_classifier_local_predict(client, listen_port): X, y, w, dX, dy, dw = _create_data('classification', output='array') @@ -386,8 +367,8 @@ def test_regressor_local_predict(client, listen_port): s2 = dask_regressor.to_local().score(X, y) # Predictions and scores should be the same - assert_eq(s1, s2) assert_eq(p1, p2) + assert_eq(s1, s2) client.close() From 56bb2c9e1da297178c57076795b50f75f037ff49 Mon Sep 17 00:00:00 2001 From: ffineis Date: Wed, 20 Jan 2021 15:46:17 -0600 Subject: [PATCH 12/16] skip test_dask for all GPU tasks --- tests/python_package_test/test_dask.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index 901a158ea2a1..27f4552b4972 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -33,7 +33,8 @@ group_sizes = [5, 5, 5, 10, 10, 10, 20, 20, 20, 50, 50] pytestmark = [ - pytest.mark.skipif(os.getenv('TASK', '') == 'mpi', reason='Fails to run with MPI interface') + pytest.mark.skipif(os.getenv('TASK', '') == 'mpi', reason='Fails to run with MPI interface'), + pytest.mark.skipif(os.getenv('TASK', '') == 'gpu', reason='Fails to run with GPU interface') ] @@ -380,9 +381,6 @@ def test_regressor_local_predict(client, listen_port): @pytest.mark.parametrize('group', [None, group_sizes]) def test_ranker(output, client, listen_port, group): - if os.getenv('TASK', '') == 'gpu': - pytest.skip('Ranker fails to run with GPU interface') - X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output, group=group) # -- use many trees + leaves to overfit, help ensure that dask data-parallel strategy matches that of @@ -410,9 +408,6 @@ def test_ranker(output, client, listen_port, group): @pytest.mark.parametrize('group', [None, group_sizes]) def test_ranker_local_predict(output, client, listen_port, group): - if os.getenv('TASK', '') == 'gpu': - pytest.skip('Ranker fails to run with GPU interface') - X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output, group=group) dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, tree_learner='data', From fc985bf1faee680d9850017880bf8404551634ae Mon Sep 17 00:00:00 2001 From: ffineis Date: Thu, 21 Jan 2021 19:20:41 -0600 Subject: 
From fc985bf1faee680d9850017880bf8404551634ae Mon Sep 17 00:00:00 2001
From: ffineis
Date: Thu, 21 Jan 2021 19:20:41 -0600
Subject: [PATCH 13/16] address changes requested on 1/21/21

---
 python-package/lightgbm/dask.py        | 23 +++++++-----
 tests/python_package_test/test_dask.py | 48 ++++++++++++--------------
 2 files changed, 37 insertions(+), 34 deletions(-)

diff --git a/python-package/lightgbm/dask.py b/python-package/lightgbm/dask.py
index d3d2e9f9f197..0714f0c36b7c 100644
--- a/python-package/lightgbm/dask.py
+++ b/python-package/lightgbm/dask.py
@@ -179,10 +179,11 @@ def _train(client, data, label, params, model_factory, sample_weight=None, group
     model_factory : lightgbm.LGBMClassifier, lightgbm.LGBMRegressor, or lightgbm.LGBMRanker class
     sample_weight : array-like of shape = [n_samples] or None, optional (default=None)
         Weights of training data.
-    group : array-like where sum(group) = [n_samples] or None for non-ranking objectives (default=None)
-        Group/query data, only used for ranking task. sum(group) = n_samples. For example,
-        if you have a 100-record dataset with `group = [10, 20, 40, 10, 10]`, that means that you have
-        5 groups, where the first 10 records are in the first group, records 11-30 are the second group, etc.
+    group : array-like or None, optional (default=None)
+        Group/query data.
+        Only used in the learning-to-rank task.
+        sum(group) = n_samples.
+        For example, if you have a 100-document dataset with ``group = [10, 20, 40, 10, 10, 10]``, that means that you have 6 groups, where the first 10 records are in the first group, records 11-30 are in the second group, etc.
     """
     params = deepcopy(params)
@@ -302,13 +303,13 @@ def _predict(model, data, proba=False, dtype=np.float32, **kwargs):
 
     Parameters
     ----------
-    model : local lightgbm.LGBM[Classifier/Regressor/Ranker]
+    model : lightgbm.LGBMClassifier, lightgbm.LGBMRegressor, or lightgbm.LGBMRanker class
     data : dask array of shape = [n_samples, n_features]
         Input feature matrix.
     proba : bool
-        Should method return results of predict_proba (proba == True) or predict (proba == False)
+        Should method return results of predict_proba (proba == True) or predict (proba == False).
     dtype : np.dtype
-        Dtype of the output
+        Dtype of the output.
     kwargs : other parameters passed to predict or predict_proba method
     """
     if isinstance(data, dd._Frame):
@@ -331,7 +332,8 @@ def _fit(self, model_factory, X, y=None, sample_weight=None, group=None, client=
         client = default_client()
 
     params = self.get_params(True)
-    model = _train(client, X, y, params, model_factory, sample_weight, group, **kwargs)
+    model = _train(client, data=X, label=y, params=params, model_factory=model_factory,
+                   sample_weight=sample_weight, group=group, **kwargs)
 
     self.set_params(**model.get_params())
     self._copy_extra_params(model, self)
@@ -406,8 +408,11 @@ def to_local(self):
 class DaskLGBMRanker(_LGBMModel, LGBMRanker):
     """Docstring is inherited from the lightgbm.LGBMRanker."""
 
-    def fit(self, X, y=None, sample_weight=None, group=None, client=None, **kwargs):
+    def fit(self, X, y=None, sample_weight=None, init_score=None, group=None, client=None, **kwargs):
         """Docstring is inherited from the lightgbm.LGBMRanker.fit."""
+        if init_score is not None:
+            raise RuntimeError('init_score is not currently supported in lightgbm.dask')
+
         return self._fit(LGBMRanker, X=X, y=y, sample_weight=sample_weight, group=group, client=client, **kwargs)
     fit.__doc__ = LGBMRanker.fit.__doc__
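The rewritten group docstring above is the contract the whole ranker path depends on, so a short self-contained illustration of the invariant may help (sizes borrowed from the docstring example; the boundary arithmetic is illustrative, not code from the patch):

    n_samples = 100
    group = [10, 20, 40, 10, 10, 10]   # six query groups
    assert sum(group) == n_samples     # sizes must cover every row exactly once

    # rows 0-9 belong to query 0, rows 10-29 to query 1, rows 30-69 to query 2, ...
    boundaries = [0]
    for size in group:
        boundaries.append(boundaries[-1] + size)
    query_spans = list(zip(boundaries[:-1], boundaries[1:]))
    print(query_spans[:3])  # [(0, 10), (10, 30), (30, 70)]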
diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py
index 27f4552b4972..18fb47a2f07b 100644
--- a/tests/python_package_test/test_dask.py
+++ b/tests/python_package_test/test_dask.py
@@ -1,9 +1,6 @@
 # coding: utf-8
-"""Tests for lightgbm.dask module
+"""Tests for lightgbm.dask module"""
 
-An easy way to run these tests is from the (python) docker container.
-Also see lightgbm-dask-testing repo: https://github.com/jameslamb/lightgbm-dask-testing
-"""
 import itertools
 import os
 import socket
@@ -55,33 +52,33 @@ def _make_ranking(n_samples=100, n_features=20, n_informative=5, gmax=2,
 
     Parameters
     ----------
-    n_samples: int (default=100)
-        Total number of documents (records) in the dataset
-    n_features : int (default=20)
-        Total number of features in the dataset
-    n_informative : int (default=5)
-        Number of features that are "informative" for ranking, as they are bias + beta * unif(min=y, max=y+1),
+    n_samples : int, optional (default=100)
+        Total number of documents (records) in the dataset.
+    n_features : int, optional (default=20)
+        Total number of features in the dataset.
+    n_informative : int, optional (default=5)
+        Number of features that are "informative" for ranking, as they are bias + beta * y
         where bias and beta are standard normal variates. If this is greater than n_features, the dataset will
         have n_features features, all of which will be informative.
     group : array-like, optional (default=None)
         1-d array or list of group sizes. When `group` is specified, this overrides n_samples, random_gs, and
         avg_gs by simply creating groups with sizes group[0], ..., group[-1].
-    gmax : int (default=2)
+    gmax : int, optional (default=2)
         Maximum graded relevance value for creating relevance/target vector. If you set this to 2, for example,
         all documents in a group will have relevance scores of either 0, 1, or 2.
-    random_gs : bool (default=False)
+    random_gs : bool, optional (default=False)
         True will make group sizes ~ Poisson(avg_gs), False will make group sizes == avg_gs.
-    avg_gs : int (default=10)
-        Average number of documents (records) in each group
+    avg_gs : int, optional (default=10)
+        Average number of documents (records) in each group.
 
     Returns
     ----------
     X : 2-d np.ndarray of shape = [n_samples (or np.sum(group)), n_features]
        Input feature matrix for ranking objective.
     y : 1-d np.array of shape = [n_samples (or np.sum(group))]
        Integer-graded relevance scores.
     group_ids: 1-d np.array of shape = [n_samples (or np.sum(group))]
        Array of group ids, each value indicates to which group each record belongs.
     """
     rnd_generator = check_random_state(random_state)
@@ -112,7 +109,7 @@ def _make_ranking(n_samples=100, n_features=20, n_informative=5, gmax=2,
         group_id_vec = np.append(group_id_vec, [gid] * gsize)
         gid += 1
 
-    y_vec, group_id_vec = y_vec[0:n_samples], group_id_vec[0:n_samples]
+    y_vec, group_id_vec = y_vec[:n_samples], group_id_vec[:n_samples]
 
     # build feature data, X. Transform first few into informative features.
     n_informative = max(min(n_features, n_informative), 0)
@@ -129,7 +126,7 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs)
     X, y, g = _make_ranking(n_samples=n_samples, random_state=42, **kwargs)
     rnd = np.random.RandomState(42)
     w = rnd.rand(X.shape[0]) * 0.01
-    g_rle = np.array([sum([1 for _ in grp]) for _, grp in itertools.groupby(g)])
+    g_rle = np.array([len(list(grp)) for _, grp in itertools.groupby(g)])
 
     if output == 'dataframe':
 
@@ -138,7 +135,8 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs)
         X = X_df.copy()
         X_df = X_df.assign(y=y, g=g, w=w)
 
-        # set_index ensures partitions are based on group id. See https://bit.ly/3pAWyNw.
+        # set_index ensures partitions are based on group id.
+        # See https://stackoverflow.com/questions/49532824/dask-dataframe-split-partitions-based-on-a-column-or-function.
         X_df.set_index('g', inplace=True)
         dX = dd.from_pandas(X_df, chunksize=chunk_size)
 
@@ -156,7 +154,7 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs)
 
     # ranking arrays: one chunk per group. Each chunk must include all columns.
     p = X.shape[1]
-    dX, dy, dw, dg = list(), list(), list(), list()
+    dX, dy, dw, dg = [], [], [], []
     for g_idx, rhs in enumerate(np.cumsum(g_rle)):
         lhs = rhs - g_rle[g_idx]
         dX.append(da.from_array(X[lhs:rhs, :], chunks=(rhs - lhs, p)))
@@ -170,7 +168,7 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs)
         dg = da.concatenate(dg, axis=0)
 
     else:
-        raise ValueError('ranking data creation only supported for Dask arrays and dataframes')
+        raise ValueError('Ranking data creation only supported for Dask arrays and dataframes')
 
     return X, y, w, g_rle, dX, dy, dw, dg
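The g_rle change above swaps a roundabout sum([1 for _ in grp]) for len(list(grp)); both run-length encode a sorted vector of per-row group ids into per-group sizes. A tiny worked example with toy values:

    import itertools

    import numpy as np

    g = np.array([0, 0, 1, 1, 1, 2])  # per-row group ids, already contiguous
    g_rle = np.array([len(list(grp)) for _, grp in itertools.groupby(g)])
    print(g_rle)                      # [2 3 1]
    assert g_rle.sum() == len(g)      # sizes always sum back to the row count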
@@ -383,8 +381,8 @@ def test_ranker(output, client, listen_port, group):
 
     X, y, w, g, dX, dy, dw, dg = _create_ranking_data(output=output, group=group)
 
-    # -- use many trees + leaves to overfit, help ensure that dask data-parallel strategy matches that of
-    # -- serial learner. See https://github.com/microsoft/LightGBM/issues/3292#issuecomment-671288210.
+    # use many trees + leaves to overfit, help ensure that dask data-parallel strategy matches that of
+    # serial learner. See https://github.com/microsoft/LightGBM/issues/3292#issuecomment-671288210.
     dask_ranker = dlgbm.DaskLGBMRanker(time_out=5, local_listen_port=listen_port, tree_learner_type='data_parallel',
                                        n_estimators=50, num_leaves=20, seed=42, min_child_samples=1)
     dask_ranker = dask_ranker.fit(dX, dy, sample_weight=dw, group=dg, client=client)

From 8c33f56f94350643faa29b6540971e4c0c8d1ae9 Mon Sep 17 00:00:00 2001
From: Frank Fineis
Date: Thu, 21 Jan 2021 22:40:53 -0600
Subject: [PATCH 14/16] issubclass instead of __qualname__

Co-authored-by: Nikita Titov
---
 python-package/lightgbm/dask.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python-package/lightgbm/dask.py b/python-package/lightgbm/dask.py
index 0714f0c36b7c..94947603a44a 100644
--- a/python-package/lightgbm/dask.py
+++ b/python-package/lightgbm/dask.py
@@ -133,7 +133,7 @@ def _train_part(params, model_factory, list_of_parts, worker_address_to_port, re
     }
     params.update(network_params)
 
-    is_ranker = model_factory.__qualname__ == 'LGBMRanker'
+    is_ranker = issubclass(model_factory, LGBMRanker)
 
     # Concatenate many parts into one
     parts = tuple(zip(*list_of_parts))
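PATCH 14's one-liner is worth spelling out: matching on __qualname__ compares a string and therefore misses subclasses, while issubclass respects the class hierarchy. A small sketch (the CustomRanker subclass is hypothetical, purely to show the difference):

    from lightgbm.sklearn import LGBMRanker

    class CustomRanker(LGBMRanker):
        """A user subclass that a name comparison would misclassify."""

    print(CustomRanker.__qualname__ == 'LGBMRanker')  # False: the name check breaks
    print(issubclass(CustomRanker, LGBMRanker))       # True: the subclass check holds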
From 872c578eeff3dabc30fc06dc5fea26e9c8c94827 Mon Sep 17 00:00:00 2001
From: Frank Fineis
Date: Thu, 21 Jan 2021 22:42:49 -0600
Subject: [PATCH 15/16] parity in group docstr with sklearn

Co-authored-by: Nikita Titov
---
 python-package/lightgbm/dask.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python-package/lightgbm/dask.py b/python-package/lightgbm/dask.py
index 94947603a44a..4251af78f8bc 100644
--- a/python-package/lightgbm/dask.py
+++ b/python-package/lightgbm/dask.py
@@ -183,7 +183,8 @@ def _train(client, data, label, params, model_factory, sample_weight=None, group
         Group/query data.
         Only used in the learning-to-rank task.
         sum(group) = n_samples.
-        For example, if you have a 100-document dataset with ``group = [10, 20, 40, 10, 10, 10]``, that means that you have 6 groups, where the first 10 records are in the first group, records 11-30 are in the second group, etc.
+        For example, if you have a 100-document dataset with ``group = [10, 20, 40, 10, 10, 10]``, that means that you have 6 groups,
+        where the first 10 records are in the first group, records 11-30 are in the second group, records 31-70 are in the third group, etc.
     """
     params = deepcopy(params)

From 53988ecbc5066ea09ccbb8216ec88ae760e14bb4 Mon Sep 17 00:00:00 2001
From: Frank Fineis
Date: Thu, 21 Jan 2021 22:43:39 -0600
Subject: [PATCH 16/16] _make_ranking docstr cleanup

Co-authored-by: Nikita Titov
---
 tests/python_package_test/test_dask.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py
index 18fb47a2f07b..960e1a56da63 100644
--- a/tests/python_package_test/test_dask.py
+++ b/tests/python_package_test/test_dask.py
@@ -72,12 +72,12 @@ def _make_ranking(n_samples=100, n_features=20, n_informative=5, gmax=2,
         Average number of documents (records) in each group.
 
     Returns
-    ----------
+    -------
     X : 2-d np.ndarray of shape = [n_samples (or np.sum(group)), n_features]
         Input feature matrix for ranking objective.
     y : 1-d np.array of shape = [n_samples (or np.sum(group))]
         Integer-graded relevance scores.
-    group_ids: 1-d np.array of shape = [n_samples (or np.sum(group))]
+    group_ids : 1-d np.array of shape = [n_samples (or np.sum(group))]
         Array of group ids, each value indicates to which group each record belongs.
     """
     rnd_generator = check_random_state(random_state)
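Taken together, these patches make DaskLGBMRanker mirror the single-machine ranker that the tests above compare against. A self-contained sketch of that local baseline (synthetic data; the parameter values echo the test settings, not a recommendation):

    import numpy as np

    from lightgbm import LGBMRanker

    rng = np.random.RandomState(42)
    X = rng.rand(100, 5)                      # 100 documents, 5 features
    y = rng.randint(0, 3, size=100)           # graded relevance in {0, 1, 2}
    group = [10, 20, 40, 10, 10, 10]          # query sizes, sum == 100

    ranker = LGBMRanker(n_estimators=50, num_leaves=20, min_child_samples=1)
    ranker.fit(X, y, group=group)
    rnkvec = ranker.predict(X)                # one relevance score per document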