Commit: Specify shape in prediction contrib and interaction. (#6614)

trivialfis authored Jan 25, 2021
1 parent 8942c98 commit 4bf23c2

Showing 4 changed files with 160 additions and 92 deletions.
13 changes: 10 additions & 3 deletions doc/tutorials/dask.rst
@@ -95,14 +95,21 @@ For prediction, pass the ``output`` returned by ``train`` into ``xgb.dask.predict``:

 .. code-block:: python

     prediction = xgb.dask.predict(client, output, dtrain)
-    # Or equivalently, pass ``output['booster']``:
-    prediction = xgb.dask.predict(client, output['booster'], dtrain)
+
+Or equivalently, pass ``output['booster']``:
+
+.. code-block:: python
+
+    prediction = xgb.dask.predict(client, output['booster'], dtrain)
+
+Eliminating the construction of ``DaskDMatrix`` is also possible; this can make the
+computation a bit faster when meta information like ``base_margin`` is not needed:
+
+.. code-block:: python
+
+    prediction = xgb.dask.predict(client, output, X)
+    # Use inplace version.
+    prediction = xgb.dask.inplace_predict(client, output, X)

-Here ``prediction`` is a dask ``Array`` object containing predictions from model.
+Here ``prediction`` is a dask ``Array`` object containing predictions from the model if
+the input is a ``DaskDMatrix`` or ``da.Array``. For a ``dd.DataFrame`` input, the return
+value is a ``dd.Series``.

 Alternatively, XGBoost also implements the Scikit-Learn interface with ``DaskXGBClassifier``
 and ``DaskXGBRegressor``. See ``xgboost/demo/dask`` for more examples.
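As a minimal end-to-end sketch of the three prediction paths documented above; the local cluster, synthetic data, and ``tree_method`` choice are illustrative assumptions, not part of the commit:

import dask.array as da
import xgboost as xgb
from distributed import Client, LocalCluster

with LocalCluster(n_workers=2) as cluster, Client(cluster) as client:
    X = da.random.random((1000, 10), chunks=(100, 10))
    y = da.random.random(1000, chunks=100)
    dtrain = xgb.dask.DaskDMatrix(client, X, y)
    output = xgb.dask.train(client, {"tree_method": "hist"}, dtrain, num_boost_round=4)

    p1 = xgb.dask.predict(client, output, dtrain)        # DaskDMatrix input
    p2 = xgb.dask.predict(client, output["booster"], X)  # da.Array input, no DaskDMatrix
    p3 = xgb.dask.inplace_predict(client, output, X)     # in-place, also skips DaskDMatrix
    print(p1.compute()[:3], p2.compute()[:3], p3.compute()[:3])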
4 changes: 2 additions & 2 deletions python-package/xgboost/core.py
@@ -190,7 +190,7 @@ def _check_call(ret):
         raise XGBoostError(py_str(_LIB.XGBGetLastError()))


-def ctypes2numpy(cptr, length, dtype):
+def ctypes2numpy(cptr, length, dtype) -> np.ndarray:
     """Convert a ctypes pointer array to a numpy array."""
     NUMPY_TO_CTYPES_MAPPING = {
         np.float32: ctypes.c_float,
@@ -1553,7 +1553,7 @@ def predict(self,
                                           ctypes.byref(preds)))
         preds = ctypes2numpy(preds, length.value, np.float32)
         if pred_leaf:
-            preds = preds.astype(np.int32)
+            preds = preds.astype(np.int32, copy=False)
             nrow = data.num_row()
             if preds.size != nrow and preds.size % nrow == 0:
                 chunk_size = int(preds.size / nrow)
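For context on the ``copy=False`` change: ``ndarray.astype`` only skips the allocation when no conversion is needed, so this is a cheap guard rather than a behavior change. A standalone NumPy sketch, independent of the commit:

import numpy as np

a = np.arange(4, dtype=np.float32)

# Same dtype requested: with copy=False the original array is returned as-is.
print(a.astype(np.float32, copy=False) is a)  # True

# Different dtype requested: a converted copy is unavoidable.
b = a.astype(np.int32, copy=False)
print(b is a, b.dtype)                        # False int32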
115 changes: 85 additions & 30 deletions python-package/xgboost/dask.py
@@ -964,22 +964,21 @@ async def _predict_async(
     pred_contribs: bool,
     approx_contribs: bool,
     pred_interactions: bool,
-    validate_features: bool
+    validate_features: bool,
 ) -> _DaskCollection:
     if isinstance(model, Booster):
         booster = model
     elif isinstance(model, dict):
-        booster = model['booster']
+        booster = model["booster"]
     else:
         raise TypeError(_expect([Booster, dict], type(model)))
     if not isinstance(data, (DaskDMatrix, da.Array, dd.DataFrame)):
-        raise TypeError(_expect([DaskDMatrix, da.Array, dd.DataFrame],
-                                type(data)))
+        raise TypeError(_expect([DaskDMatrix, da.Array, dd.DataFrame], type(data)))

     def mapped_predict(partition: Any, is_df: bool) -> Any:
         worker = distributed.get_worker()
         with config.config_context(**global_config):
-            booster.set_param({'nthread': worker.nthreads})
+            booster.set_param({"nthread": worker.nthreads})
             m = DMatrix(data=partition, missing=missing, nthread=worker.nthreads)
             predt = booster.predict(
                 data=m,
@@ -988,15 +987,16 @@ def mapped_predict(partition: Any, is_df: bool) -> Any:
                 pred_contribs=pred_contribs,
                 approx_contribs=approx_contribs,
                 pred_interactions=pred_interactions,
-                validate_features=validate_features
+                validate_features=validate_features,
             )
             if is_df:
-                if lazy_isinstance(partition, 'cudf', 'core.dataframe.DataFrame'):
+                if lazy_isinstance(partition, "cudf", "core.dataframe.DataFrame"):
                     import cudf
-                    predt = cudf.DataFrame(predt, columns=['prediction'])
+                    predt = cudf.DataFrame(predt, columns=["prediction"])
                 else:
-                    predt = DataFrame(predt, columns=['prediction'])
+                    predt = DataFrame(predt, columns=["prediction"])
             return predt

+    # Predict on dask collection directly.
     if isinstance(data, (da.Array, dd.DataFrame)):
         return await _direct_predict_impl(client, data, mapped_predict)
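The direct path above hands ``mapped_predict`` to ``_direct_predict_impl``, which maps it over the collection's partitions. A toy sketch of the same map-over-blocks pattern; the scoring function here is a stand-in, not XGBoost's:

import dask.array as da
import numpy as np

def score_block(block: np.ndarray) -> np.ndarray:
    # Stand-in for per-partition prediction: one float32 score per row.
    return block.sum(axis=1).astype(np.float32)

X = da.random.random((8, 4), chunks=(4, 4))
pred = X.map_blocks(score_block, drop_axis=1, dtype=np.float32)
print(pred.compute().shape)  # (8,)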
@@ -1010,16 +1010,16 @@ def mapped_predict(partition: Any, is_df: bool) -> Any:
     meta_names = data.meta_names

     def dispatched_predict(
-            worker_id: int, list_of_orders: List[int], list_of_parts: _DataParts
-    ) -> List[Tuple[Tuple["dask.delayed.Delayed", int], int]]:
-        '''Perform prediction on each worker.'''
-        LOGGER.debug('Predicting on %d', worker_id)
+        worker_id: int, list_of_orders: List[int], list_of_parts: _DataParts
+    ) -> List[Tuple[List[Union["dask.delayed.Delayed", int]], int]]:
+        """Perform prediction on each worker."""
+        LOGGER.debug("Predicting on %d", worker_id)
         with config.config_context(**global_config):
             worker = distributed.get_worker()
             list_of_parts = _get_worker_parts_ordered(meta_names, list_of_parts)
             predictions = []

-            booster.set_param({'nthread': worker.nthreads})
+            booster.set_param({"nthread": worker.nthreads})
             for i, parts in enumerate(list_of_parts):
                 (data, _, _, base_margin, _, _, _) = parts
                 order = list_of_orders[i]
@@ -1029,7 +1029,7 @@ def dispatched_predict(
                     feature_names=feature_names,
                     feature_types=feature_types,
                     missing=missing,
-                    nthread=worker.nthreads
+                    nthread=worker.nthreads,
                 )
                 predt = booster.predict(
                     data=local_part,
@@ -1038,19 +1038,51 @@ def dispatched_predict(
                     pred_contribs=pred_contribs,
                     approx_contribs=approx_contribs,
                     pred_interactions=pred_interactions,
-                    validate_features=validate_features
+                    validate_features=validate_features,
                 )
-                columns = 1 if len(predt.shape) == 1 else predt.shape[1]
-                ret = ((dask.delayed(predt), columns), order)  # pylint: disable=no-member
+                if pred_contribs and predt.size != local_part.num_row():
+                    assert len(predt.shape) in (2, 3)
+                    if len(predt.shape) == 2:
+                        groups = 1
+                        columns = predt.shape[1]
+                    else:
+                        groups = predt.shape[1]
+                        columns = predt.shape[2]
+                    # pylint: disable=no-member
+                    ret = (
+                        [dask.delayed(predt), groups, columns],
+                        order,
+                    )
+                elif pred_interactions and predt.size != local_part.num_row():
+                    assert len(predt.shape) in (3, 4)
+                    if len(predt.shape) == 3:
+                        groups = 1
+                        columns = predt.shape[1]
+                    else:
+                        groups = predt.shape[1]
+                        columns = predt.shape[2]
+                    # pylint: disable=no-member
+                    ret = (
+                        [dask.delayed(predt), groups, columns],
+                        order,
+                    )
+                else:
+                    assert len(predt.shape) == 1 or len(predt.shape) == 2
+                    columns = 1 if len(predt.shape) == 1 else predt.shape[1]
+                    # pylint: disable=no-member
+                    ret = (
+                        [dask.delayed(predt), columns],
+                        order,
+                    )
                 predictions.append(ret)

             return predictions

     def dispatched_get_shape(
         worker_id: int, list_of_orders: List[int], list_of_parts: _DataParts
     ) -> List[Tuple[int, int]]:
-        '''Get shape of data in each worker.'''
-        LOGGER.debug('Get shape on %d', worker_id)
+        """Get shape of data in each worker."""
+        LOGGER.debug("Get shape on %d", worker_id)
         list_of_parts = _get_worker_parts_ordered(meta_names, list_of_parts)
         shapes = []
         for i, parts in enumerate(list_of_parts):
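The ``(groups, columns)`` bookkeeping above mirrors the shapes a single-node ``Booster`` already returns for SHAP outputs. A small sketch of those shapes; the random data and training parameters are illustrative assumptions:

import numpy as np
import xgboost as xgb

X = np.random.rand(32, 5)
y = np.random.randint(0, 3, size=32)  # 3 classes -> groups == 3
dtrain = xgb.DMatrix(X, label=y)
booster = xgb.train({"objective": "multi:softprob", "num_class": 3}, dtrain, num_boost_round=2)

contribs = booster.predict(dtrain, pred_contribs=True)
interactions = booster.predict(dtrain, pred_interactions=True)
print(contribs.shape)      # (32, 3, 6): (rows, groups, features + bias)
print(interactions.shape)  # (32, 3, 6, 6): one extra (features + bias) axis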
@@ -1061,18 +1093,22 @@ def dispatched_get_shape(
     async def map_function(
         func: Callable[[int, List[int], _DataParts], Any]
     ) -> List[Any]:
-        '''Run function for each part of the data.'''
+        """Run function for each part of the data."""
         futures = []
         workers_address = list(worker_map.keys())
         for wid, worker_addr in enumerate(workers_address):
             worker_addr = workers_address[wid]
             list_of_parts = worker_map[worker_addr]
             list_of_orders = [partition_order[part.key] for part in list_of_parts]

-            f = client.submit(func, worker_id=wid,
-                              list_of_orders=list_of_orders,
-                              list_of_parts=list_of_parts,
-                              pure=True, workers=[worker_addr])
+            f = client.submit(
+                func,
+                worker_id=wid,
+                list_of_orders=list_of_orders,
+                list_of_parts=list_of_parts,
+                pure=True,
+                workers=[worker_addr],
+            )
             assert isinstance(f, distributed.client.Future)
             futures.append(f)
         # Get delayed objects
@@ -1091,10 +1127,24 @@ async def map_function(
     # See https://docs.dask.org/en/latest/array-creation.html
     arrays = []
     for i, shape in enumerate(shapes):
-        arrays.append(da.from_delayed(
-            results[i][0], shape=(shape[0],)
-            if results[i][1] == 1 else (shape[0], results[i][1]),
-            dtype=numpy.float32))
+        if pred_contribs:
+            out_shape = (
+                (shape[0], results[i][2])
+                if results[i][1] == 1
+                else (shape[0], results[i][1], results[i][2])
+            )
+        elif pred_interactions:
+            out_shape = (
+                (shape[0], results[i][2], results[i][2])
+                if results[i][1] == 1
+                else (shape[0], results[i][1], results[i][2])
+            )
+        else:
+            out_shape = (shape[0],) if results[i][1] == 1 else (shape[0], results[i][1])
+        arrays.append(
+            da.from_delayed(results[i][0], shape=out_shape, dtype=numpy.float32)
+        )

     predictions = await da.concatenate(arrays, axis=0)
     return predictions
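The loop above declares an explicit shape for every delayed chunk so the final concatenation has known dimensions. The underlying dask pattern, as a standalone sketch with a hypothetical ``make_part`` producer:

import dask
import dask.array as da
import numpy as np

@dask.delayed
def make_part(n: int) -> np.ndarray:
    # Stand-in for a per-worker prediction result of n rows.
    return np.full((n, 3), float(n), dtype=np.float32)

chunks = [da.from_delayed(make_part(n), shape=(n, 3), dtype=np.float32) for n in (2, 4)]
stacked = da.concatenate(chunks, axis=0)
print(stacked.shape)  # (6, 3)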

@@ -1115,7 +1165,9 @@ def predict(
     .. note::

         Only default prediction mode is supported right now.

+    Using ``inplace_predict`` might be faster when meta information like
+    ``base_margin`` is not needed. For other parameters, please see
+    ``Booster.predict``.
+
     .. versionadded:: 1.0.0
@@ -1136,6 +1188,9 @@ def predict(
     Returns
     -------
     prediction: dask.array.Array/dask.dataframe.Series
+        When the input data is ``dask.array.Array`` or ``DaskDMatrix``, the return value
+        is an array; when the input data is ``dask.dataframe.DataFrame``, the return
+        value is ``dask.dataframe.Series``.
     '''
     _assert_dask_support()
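To round off the documented return-type rule, a sketch showing ``dd.DataFrame`` input yielding a ``dd.Series`` while array input yields a dask ``Array``; the cluster setup and synthetic frame are assumptions for illustration:

import dask.dataframe as dd
import pandas as pd
import xgboost as xgb
from distributed import Client, LocalCluster

with LocalCluster(n_workers=2) as cluster, Client(cluster) as client:
    pdf = pd.DataFrame({"f0": range(100), "f1": range(100, 200)})
    X = dd.from_pandas(pdf, npartitions=4)
    y = dd.from_pandas(pd.Series(range(100), name="label"), npartitions=4)
    output = xgb.dask.train(
        client, {"tree_method": "hist"}, xgb.dask.DaskDMatrix(client, X, y)
    )
    print(type(xgb.dask.predict(client, output, X)))  # dask.dataframe Series
    print(type(xgb.dask.predict(client, output, X.to_dask_array(lengths=True))))  # dask Array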