add pyarrow data engine
tvdboom committed Aug 15, 2023
1 parent eb248ea commit 77cad79
Showing 217 changed files with 4,974 additions and 1,852 deletions.
105 changes: 63 additions & 42 deletions atom/api.py
@@ -227,28 +227,35 @@ class ATOMClassifier(BaseTransformer, ATOM):
     e.g. `device="gpu"` to use the GPU. Read more in the
     [user guide][accelerating-pipelines].
-    engine: str, default="sklearn"
-        Execution engine to use for the estimators. Refer to the
-        [user guide][accelerating-pipelines] for an explanation
-        regarding every choice. Choose from:
-
-        - "sklearn" (only if device="cpu")
-        - "sklearnex"
-        - "cuml" (only if device="gpu")
+    engine: dict or None, default=None
+        Execution engine to use for [data][data-acceleration] and
+        [models][model-acceleration]. The value should be a dictionary
+        with keys `data` and/or `models`, with their corresponding
+        choice as values. If None, the default options are selected.
+        Choose from:
+
+        - "data":
+            - "numpy" (default)
+            - "pyarrow"
+            - "modin"
+        - "models":
+            - "sklearn" (default)
+            - "sklearnex"
+            - "cuml"
     backend: str, default="loky"
-        Parallelization backend. Choose from:
+        Parallelization backend. Read more in the
+        [user guide][parallel-execution]. Choose from:
         - "loky": Single-node, process-based parallelism.
         - "multiprocessing": Legacy single-node, process-based
-            parallelism. Less robust than 'loky'.
+            parallelism. Less robust than `loky`.
         - "threading": Single-node, thread-based parallelism.
         - "ray": Multi-node, process-based parallelism.
+            Selecting the ray backend also parallelizes the data using
+            [modin][], a multi-threading, drop-in replacement for pandas,
+            that uses Ray as backend.
     verbose: int, default=0
         Verbosity level of the class. Choose from:
@@ -320,7 +327,7 @@ def __init__(
     holdout_size: SCALAR | None = None,
     n_jobs: INT = 1,
     device: str = "cpu",
-    engine: str = "sklearn",
+    engine: dict | None = None,
     backend: str = "loky",
     verbose: INT = 0,
     warnings: bool | str = False,
@@ -446,28 +453,35 @@ class ATOMForecaster(BaseTransformer, ATOM):
     e.g. `device="gpu"` to use the GPU. Read more in the
     [user guide][accelerating-pipelines].
-    engine: str, default="sklearn"
-        Execution engine to use for the estimators. Refer to the
-        [user guide][accelerating-pipelines] for an explanation
-        regarding every choice. Choose from:
-
-        - "sklearn" (only if device="cpu")
-        - "sklearnex"
-        - "cuml" (only if device="gpu")
+    engine: dict or None, default=None
+        Execution engine to use for [data][data-acceleration] and
+        [models][model-acceleration]. The value should be a dictionary
+        with keys `data` and/or `models`, with their corresponding
+        choice as values. If None, the default options are selected.
+        Choose from:
+
+        - "data":
+            - "numpy" (default)
+            - "pyarrow"
+            - "modin"
+        - "models":
+            - "sklearn" (default)
+            - "sklearnex"
+            - "cuml"
     backend: str, default="loky"
-        Parallelization backend. Choose from:
+        Parallelization backend. Read more in the
+        [user guide][parallel-execution]. Choose from:
         - "loky": Single-node, process-based parallelism.
         - "multiprocessing": Legacy single-node, process-based
-            parallelism. Less robust than 'loky'.
+            parallelism. Less robust than `loky`.
         - "threading": Single-node, thread-based parallelism.
        - "ray": Multi-node, process-based parallelism.
+            Selecting the ray backend also parallelizes the data using
+            [modin][], a multi-threading, drop-in replacement for pandas,
+            that uses Ray as backend.
     verbose: int, default=0
         Verbosity level of the class. Choose from:
@@ -532,7 +546,7 @@ def __init__(
     holdout_size: SCALAR | None = None,
     n_jobs: INT = 1,
     device: str = "cpu",
-    engine: str = "sklearn",
+    engine: dict | None = None,
     backend: str = "loky",
     verbose: INT = 0,
     warnings: bool | str = False,
@@ -668,28 +682,35 @@ class ATOMRegressor(BaseTransformer, ATOM):
     e.g. `device="gpu"` to use the GPU. Read more in the
     [user guide][accelerating-pipelines].
-    engine: str, default="sklearn"
-        Execution engine to use for the estimators. Refer to the
-        [user guide][accelerating-pipelines] for an explanation
-        regarding every choice. Choose from:
-
-        - "sklearn" (only if device="cpu")
-        - "sklearnex"
-        - "cuml" (only if device="gpu")
+    engine: dict or None, default=None
+        Execution engine to use for [data][data-acceleration] and
+        [models][model-acceleration]. The value should be a dictionary
+        with keys `data` and/or `models`, with their corresponding
+        choice as values. If None, the default options are selected.
+        Choose from:
+
+        - "data":
+            - "numpy" (default)
+            - "pyarrow"
+            - "modin"
+        - "models":
+            - "sklearn" (default)
+            - "sklearnex"
+            - "cuml"
     backend: str, default="loky"
-        Parallelization backend. Choose from:
+        Parallelization backend. Read more in the
+        [user guide][parallel-execution]. Choose from:
         - "loky": Single-node, process-based parallelism.
         - "multiprocessing": Legacy single-node, process-based
-            parallelism. Less robust than 'loky'.
+            parallelism. Less robust than `loky`.
         - "threading": Single-node, thread-based parallelism.
        - "ray": Multi-node, process-based parallelism.
+            Selecting the ray backend also parallelizes the data using
+            [modin][], a multi-threading, drop-in replacement for pandas,
+            that uses Ray as backend.
     verbose: int, default=0
         Verbosity level of the class. Choose from:
@@ -760,7 +781,7 @@ def __init__(
     holdout_size: SCALAR | None = None,
     n_jobs: INT = 1,
     device: str = "cpu",
-    engine: str = "sklearn",
+    engine: dict | None = None,
     backend: str = "loky",
     verbose: INT = 0,
     warnings: bool | str = False,
22 changes: 15 additions & 7 deletions atom/atom.py
@@ -14,7 +14,7 @@
 from copy import deepcopy
 from platform import machine, platform, python_build, python_version
 from typing import Callable
-
+from pyarrow.lib import ArrowInvalid
 import dill as pickle
 import numpy as np
 import pandas as pd
@@ -120,8 +120,10 @@ def __init__(
     )
     if "gpu" in self.device.lower():
         self.log("GPU training enabled.", 1)
-    if self.engine != "sklearn":
-        self.log(f"Execution engine: {self.engine}.", 1)
+    if (data := self.engine.get("data")) != "numpy":
+        self.log(f"Data execution engine: {data}.", 1)
+    if (models := self.engine.get("models")) != "sklearn":
+        self.log(f"Models execution engine: {models}.", 1)
     if self.backend == "ray" or self.n_jobs > 1:
         self.log(f"Parallelization backend: {self.backend}", 1)
     if self.experiment:
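The new logging code uses the walrus operator to bind and test each engine choice in a single expression. A standalone sketch of the pattern (the `engine` dict here is illustrative, not ATOM's internal state):

```python
# Illustrative engine dict; in ATOM this would be self.engine.
engine = {"data": "pyarrow", "models": "sklearn"}

messages = []
# Bind the looked-up value and compare it to the default in one expression;
# only non-default choices produce a log message.
if (data := engine.get("data")) != "numpy":
    messages.append(f"Data execution engine: {data}.")
if (models := engine.get("models")) != "sklearn":
    messages.append(f"Models execution engine: {models}.")

print(messages)
# ['Data execution engine: pyarrow.']
```

Note that `dict.get` returns `None` for a missing key, and `None != "sklearn"` is true, so this pattern assumes the dict has already been filled with defaults before the comparison.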
@@ -262,7 +264,7 @@ def n_categorical(self) -> int:
     return len(self.categorical)
 
 @property
-def outliers(self) -> pd.series | None:
+def outliers(self) -> SERIES | None:
     """Columns in training set with amount of outlier values."""
     if not is_sparse(self.X):
         z_scores = self.train.select_dtypes(include=["number"]).apply(stats.zscore)
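The `outliers` property builds on z-scores of the numeric columns. A minimal numpy sketch of the underlying idea, computing the same statistic as `scipy.stats.zscore` by hand; the threshold of 3 is the conventional choice and an assumption here, not confirmed by this diff:

```python
import numpy as np

rng = np.random.default_rng(0)
# Mostly standard-normal data with two planted extreme values.
values = np.append(rng.normal(size=1000), [15.0, -12.0])

# Equivalent to scipy.stats.zscore with default ddof=0.
z_scores = (values - values.mean()) / values.std()

# Count values more than 3 standard deviations from the mean.
n_outliers = int((np.abs(z_scores) > 3).sum())
print(n_outliers)  # at least the two planted outliers
```

ATOM applies this per numeric column of the training set and skips the computation entirely for sparse data.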
@@ -905,9 +907,15 @@ def stats(self, _vb: INT = -2, /):
     else:
         nans = self.nans.sum()
         n_categorical = self.n_categorical
-        outliers = self.outliers.sum()
-        try:
-            # Can fail for unhashable columns (e.g. multilabel with lists)
+        try:  # Fails for pyarrow dtypes
+            outliers = self.outliers.sum()
+        except ArrowInvalid:
+            outliers = None
+            self.log(
+                "Unable to calculate the number of outlier values. "
+                "Incompatible operation with the pyarrow data engine.", 3
+            )
+        try:  # Can fail for unhashable columns (e.g. multilabel with lists)
             duplicates = self.dataset.duplicated().sum()
         except TypeError:
             duplicates = None
38 changes: 25 additions & 13 deletions atom/basemodel.py
@@ -107,22 +107,34 @@ class BaseModel(BaseTransformer, BaseTracker, HTPlot, PredictionPlot, ShapPlot):
     e.g. `device="gpu"` to use the GPU. Read more in the
     [user guide][accelerating-pipelines].
-    engine: str, default="sklearn"
-        Execution engine to use for the estimators. Refer to the
-        [user guide][accelerating-pipelines] for an explanation
-        regarding every choice. Choose from:
-
-        - "sklearn" (only if device="cpu")
-        - "sklearnex"
-        - "cuml" (only if device="gpu")
+    engine: dict or None, default=None
+        Execution engine to use for [data][data-acceleration] and
+        [models][model-acceleration]. The value should be a dictionary
+        with keys `data` and/or `models`, with their corresponding
+        choice as values. If None, the default options are selected.
+        Choose from:
+
+        - "data":
+            - "numpy" (default)
+            - "pyarrow"
+            - "modin"
+        - "models":
+            - "sklearn" (default)
+            - "sklearnex"
+            - "cuml"
     backend: str, default="loky"
-        Parallelization backend. Choose from:
-
-        - "loky"
-        - "multiprocessing"
-        - "threading"
-        - "ray"
+        Parallelization backend. Read more in the
+        [user guide][parallel-execution]. Choose from:
+
+        - "loky": Single-node, process-based parallelism.
+        - "multiprocessing": Legacy single-node, process-based
+            parallelism. Less robust than `loky`.
+        - "threading": Single-node, thread-based parallelism.
+        - "ray": Multi-node, process-based parallelism.
     verbose: int, default=0
         Verbosity level of the class. Choose from:
@@ -164,7 +176,7 @@ def __init__(
     metric: ClassMap | None = None,
     n_jobs: INT = 1,
     device: str = "cpu",
-    engine: str = "sklearn",
+    engine: dict | None = None,
     backend: str = "loky",
     verbose: INT = 0,
     warnings: bool | str = False,
2 changes: 1 addition & 1 deletion atom/baserunner.py
@@ -687,7 +687,7 @@ def get_sample_weight(self, dataset: str = "train") -> SERIES:
     )
 
     weights = compute_sample_weight("balanced", y=getattr(self, dataset.lower()))
-    return bk.Series(weights, name="sample_weight").round(3)
+    return pd.Series(weights, name="sample_weight").round(3)
 
 @composed(crash, method_to_log)
 def merge(self, other: Any, /, suffix: str = "2"):
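The returned series wraps scikit-learn's `compute_sample_weight`. A standalone sketch of what the changed line computes, on a small illustrative target:

```python
import pandas as pd
from sklearn.utils.class_weight import compute_sample_weight

y = [0, 0, 0, 1]  # imbalanced target: class 1 appears once

# "balanced" weights each sample by n_samples / (n_classes * class_count).
weights = compute_sample_weight("balanced", y=y)

# Same wrapping as the new return statement.
series = pd.Series(weights, name="sample_weight").round(3)
print(series.tolist())
# [0.667, 0.667, 0.667, 2.0]
```

Samples of the rare class receive proportionally larger weights, so downstream estimators that accept `sample_weight` treat the classes as balanced.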