Implemented proper work with multiple threads #1361

Merged · 17 commits · Sep 12, 2023
Changes from 11 commits
1 change: 0 additions & 1 deletion deselected_tests.yaml
@@ -728,7 +728,6 @@ gpu:
   - neighbors/tests/test_neighbors.py::test_neigh_predictions_algorithm_agnosticity[float64-KNeighborsRegressor-50-500-l2-1000-5-100]
   - neighbors/tests/test_neighbors.py::test_neigh_predictions_algorithm_agnosticity[float64-KNeighborsRegressor-100-1000-l2-1000-5-100]
   # failing due to numeric/code error
-  - ensemble/tests/test_bagging.py::test_parallel_classification
[Inline review comment, Contributor] 👍
   - linear_model/tests/test_common.py::test_balance_property[42-False-LogisticRegressionCV]
   - sklearn/manifold/tests/test_t_sne.py::test_n_iter_without_progress
   - model_selection/tests/test_search.py::test_searchcv_raise_warning_with_non_finite_score[RandomizedSearchCV-specialized_params1-False]
20 changes: 20 additions & 0 deletions sklearnex/dispatcher.py
@@ -48,11 +48,23 @@ def get_patch_map():
    import sklearn.neighbors as neighbors_module
    import sklearn.svm as svm_module

    if sklearn_check_version("1.2.1"):
        import sklearn.utils.parallel as parallel_module
    else:
        import sklearn.utils.fixes as parallel_module

    # Classes and functions for patching
    from ._config import config_context as config_context_sklearnex
    from ._config import get_config as get_config_sklearnex
    from ._config import set_config as set_config_sklearnex

    if sklearn_check_version("1.2.1"):
        from .utils.parallel import _FuncWrapper as _FuncWrapper_sklearnex
    else:
        from .utils.parallel import _FuncWrapperOld as _FuncWrapper_sklearnex

    from .cluster import DBSCAN as DBSCAN_sklearnex

    from .neighbors import KNeighborsClassifier as KNeighborsClassifier_sklearnex
    from .neighbors import KNeighborsRegressor as KNeighborsRegressor_sklearnex
    from .neighbors import LocalOutlierFactor as LocalOutlierFactor_sklearnex
@@ -226,6 +238,14 @@ def get_patch_map():
mapping["config_context"] = [
[(base_module, "config_context", config_context_sklearnex), None]
]

# Necessary for proper work with multiple threads
mapping["parallel.get_config"] = [
[(parallel_module, "get_config", get_config_sklearnex), None]
]
mapping["_funcwrapper"] = [
[(parallel_module, "_FuncWrapper", _FuncWrapper_sklearnex), None]
]
return mapping
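
A minimal sketch of what these new patch-map entries buy (illustrative only, not code from the PR): once `patch_sklearn()` installs the `"_funcwrapper"` mapping, stock metaestimators that dispatch work through sklearn's `Parallel`/`delayed` re-apply the caller's sklearnex configuration inside every joblib worker.

# Illustrative sketch, not part of the diff: the patched _FuncWrapper lets
# settings such as target_offload survive the hop into joblib workers.
from sklearnex import patch_sklearn

patch_sklearn()  # installs the patch map above, including "_funcwrapper"

from sklearnex import config_context
from sklearn.datasets import make_classification
from sklearn.ensemble import BaggingClassifier
from sklearn.svm import SVC

X, y = make_classification(random_state=42)

# Each worker thread re-enters this configuration before fitting its
# sub-estimator instead of silently falling back to the defaults.
with config_context(target_offload="auto"):
    BaggingClassifier(SVC(), n_jobs=2).fit(X, y)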


43 changes: 17 additions & 26 deletions sklearnex/svm/_common.py
@@ -100,39 +100,30 @@ def _compute_balanced_class_weight(self, y):
         return recip_freq[le.transform(classes)]
 
     def _fit_proba(self, X, y, sample_weight=None, queue=None):
-        from .._config import config_context, get_config
-
         params = self.get_params()
         params["probability"] = False
         params["decision_function_shape"] = "ovr"
         clf_base = self.__class__(**params)
 
-        # We use stock metaestimators below, so the only way
-        # to pass a queue is using config_context.
-        cfg = get_config()
-        cfg["target_offload"] = queue
-        with config_context(**cfg):
-            try:
-                n_splits = 5
-                n_jobs = n_splits if queue is None or queue.sycl_device.is_cpu else 1
-                cv = StratifiedKFold(
-                    n_splits=n_splits, shuffle=True, random_state=self.random_state
-                )
-                if sklearn_check_version("0.24"):
-                    self.clf_prob = CalibratedClassifierCV(
-                        clf_base, ensemble=False, cv=cv, method="sigmoid", n_jobs=n_jobs
-                    )
-                else:
-                    self.clf_prob = CalibratedClassifierCV(
-                        clf_base, cv=cv, method="sigmoid"
-                    )
-                self.clf_prob.fit(X, y, sample_weight)
-            except ValueError:
-                clf_base = clf_base.fit(X, y, sample_weight)
-                self.clf_prob = CalibratedClassifierCV(
-                    clf_base, cv="prefit", method="sigmoid"
-                )
-                self.clf_prob.fit(X, y, sample_weight)
+        try:
+            n_splits = 5
+            n_jobs = n_splits if queue is None or queue.sycl_device.is_cpu else 1
+            cv = StratifiedKFold(
+                n_splits=n_splits, shuffle=True, random_state=self.random_state
+            )
+            if sklearn_check_version("0.24"):
+                self.clf_prob = CalibratedClassifierCV(
+                    clf_base, ensemble=False, cv=cv, method="sigmoid", n_jobs=n_jobs
+                )
+            else:
+                self.clf_prob = CalibratedClassifierCV(clf_base, cv=cv, method="sigmoid")
+            self.clf_prob.fit(X, y, sample_weight)
+        except ValueError:
+            clf_base = clf_base.fit(X, y, sample_weight)
+            self.clf_prob = CalibratedClassifierCV(
+                clf_base, cv="prefit", method="sigmoid"
+            )
+            self.clf_prob.fit(X, y, sample_weight)
 
     def _save_attributes(self):
         self.support_vectors_ = self._onedal_estimator.support_vectors_
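The deleted `config_context(**cfg)` block is exactly what the patched `_FuncWrapper` makes redundant: the configuration active around the call now reaches the joblib workers spawned by `CalibratedClassifierCV` on its own. A rough sketch of the resulting usage pattern (hypothetical, not code from the diff):

# Hypothetical sketch: with the patched wrapper, the caller's context
# already propagates to CalibratedClassifierCV's n_jobs workers, so no
# manual config_context plumbing is needed inside _fit_proba.
from sklearnex import patch_sklearn

patch_sklearn()

from sklearnex import config_context
from sklearn.calibration import CalibratedClassifierCV
from sklearn.datasets import make_classification
from sklearn.svm import SVC

X, y = make_classification(random_state=42)
with config_context(target_offload="auto"):
    CalibratedClassifierCV(SVC(), cv=5, n_jobs=5).fit(X, y)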
2 changes: 1 addition & 1 deletion sklearnex/tests/test_memory_usage.py
@@ -90,7 +90,7 @@
     estimator, name = listing[0][0][2], listing[0][0][1]
     if not isinstance(estimator, types.FunctionType):
         if name not in ban_list:
-            if isinstance(estimator(), BaseEstimator):
+            if issubclass(estimator, BaseEstimator):
                 if hasattr(estimator, "fit"):
                     output_list.append(estimator)
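
A quick illustration of the fix (not from the diff): `issubclass` inspects the class object itself, whereas the old `isinstance(estimator(), ...)` check had to instantiate the estimator first, which is wasteful and breaks for classes whose constructors require arguments.

# Illustrative only: the new check avoids constructing an instance.
from sklearn.base import BaseEstimator
from sklearn.svm import SVC

assert issubclass(SVC, BaseEstimator)    # new check: no instantiation
assert isinstance(SVC(), BaseEstimator)  # old check: builds a throwaway object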

@@ -168,7 +168,7 @@
     elif isinstance(x, pd.core.frame.DataFrame):
         x_train, x_test = x.iloc[train_index], x.iloc[test_index]
         y_train, y_test = y.iloc[train_index], y.iloc[test_index]
-    # TODO: add parameters for all estimators to prevent
+
     # fallback to stock scikit-learn with default parameters
     alg = estimator()
     alg.fit(x_train, y_train)

[codefactor.io / CodeFactor notice on sklearnex/tests/test_memory_usage.py#L171: unresolved comment '# TODO: add parameters for all estimators to prevent' (C100)]
50 changes: 50 additions & 0 deletions sklearnex/tests/test_parallel.py
@@ -0,0 +1,50 @@
# ==============================================================================
# Copyright 2023 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import pytest

from sklearnex import config_context, patch_sklearn

patch_sklearn()

from sklearn.datasets import make_classification
from sklearn.ensemble import BaggingClassifier
from sklearn.svm import SVC

try:
    import dpctl

    dpctl_is_available = True
    gpu_is_available = dpctl.has_gpu_devices()
except (ImportError, ModuleNotFoundError):
    dpctl_is_available = False


@pytest.mark.skipif(
    not dpctl_is_available or gpu_is_available,
    reason="GPU device should not be available for this test "
    "to see raised 'SyclQueueCreationError'. "
    "'dpctl' module is required for test.",
)
def test_config_context_in_parallel():
    x, y = make_classification(random_state=42)
    try:
        with config_context(target_offload="gpu"):
            BaggingClassifier(SVC(), n_jobs=2).fit(x, y)
        raise ValueError(
            "'SyclQueueCreationError' wasn't raised for non-existing 'gpu' device"
        )
    except dpctl._sycl_queue.SyclQueueCreationError:
        pass

[Inline review thread on the `with config_context(target_offload="gpu"):` line]

Contributor suggested:
-        with config_context(target_offload="gpu"):
+        with config_context(target_offload="gpu", allow_fallback_to_host=False):

Contributor (@ethanglaser, Sep 7, 2023): Actually maybe this modification isn't necessary, but I am a bit confused by the test - when would dpctl be available but no GPU? Thanks for adding the test.

Contributor: dpctl is not only for GPU devices. For example, CI has dpctl installed without a GPU on the instances used by the Azure pipelines.
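
As the thread above notes, an installed dpctl does not imply a usable GPU; CPU-only CI hosts are exactly the environment this test targets. A small probe (illustrative, not part of the PR) makes the distinction visible:

# Illustrative probe: dpctl enumerates devices even on CPU-only hosts,
# so GPU availability must be checked explicitly at runtime.
import dpctl

print(dpctl.has_gpu_devices())                # False on a CPU-only machine
print([d.name for d in dpctl.get_devices()])  # CPU devices are still listed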
59 changes: 59 additions & 0 deletions sklearnex/utils/parallel.py
@@ -0,0 +1,59 @@
# ===============================================================================
# Copyright 2023 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

import warnings
from functools import update_wrapper

from .._config import config_context, get_config


class _FuncWrapper:
    """Load the global configuration before calling the function."""

    def __init__(self, function):
        self.function = function
        update_wrapper(self, self.function)

    def with_config(self, config):
        self.config = config
        return self

    def __call__(self, *args, **kwargs):
        config = getattr(self, "config", None)
        if config is None:
            warnings.warn(
                "`sklearn.utils.parallel.delayed` should be used with "
                "`sklearn.utils.parallel.Parallel` to make it possible to propagate "
                "the scikit-learn configuration of the current thread to the "
                "joblib workers.",
                UserWarning,
            )
            config = {}
        with config_context(**config):
            return self.function(*args, **kwargs)


class _FuncWrapperOld:
    """Load the global configuration before calling the function."""

    def __init__(self, function):
        self.function = function
        self.config = get_config()
        update_wrapper(self, self.function)

    def __call__(self, *args, **kwargs):
        with config_context(**self.config):
            return self.function(*args, **kwargs)
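
To see the wrapper's contract in isolation, here is a minimal sketch (assumed usage that mirrors how sklearn >= 1.2.1 drives it, not code from the PR): the dispatching thread captures its configuration once via `with_config`, and the call re-enters that configuration wherever it later executes, e.g. in a joblib worker.

# Minimal sketch of the wrapper contract (assumed usage, not from the PR).
from sklearnex._config import config_context, get_config
from sklearnex.utils.parallel import _FuncWrapper


def current_offload():
    return get_config().get("target_offload")


# Capture the caller's configuration, as Parallel does per dispatch.
with config_context(target_offload="gpu"):
    wrapped = _FuncWrapper(current_offload).with_config(get_config())

# Outside the context (think: inside a worker thread), the call still
# runs under the captured configuration.
assert wrapped() == "gpu"
assert current_offload() != "gpu"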