diff --git a/CHANGELOG.md b/CHANGELOG.md
index 03b86938e..3ef9f7d72 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,7 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## Unreleased
 ### Added
 - Notebook `forecast_interpretation.ipynb` with forecast decomposition ([#1220](https://github.com/tinkoff-ai/etna/pull/1220))
--
+- Class `Tune` for hyperparameter optimization within existing pipeline ([#1200](https://github.com/tinkoff-ai/etna/pull/1200))
 -
 -
 ### Changed
diff --git a/etna/auto/__init__.py b/etna/auto/__init__.py
index b0fe30c9f..4ec66790f 100644
--- a/etna/auto/__init__.py
+++ b/etna/auto/__init__.py
@@ -1,2 +1,3 @@
 from etna.auto.auto import Auto
+from etna.auto.auto import Tune
 from etna.auto.pool import Pool
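For reference, a minimal usage sketch of the `Tune` class introduced by this patch; the toy dataset, model choice, and trial count below are illustrative placeholders rather than part of the change:

```python
import pandas as pd

from etna.auto import Tune
from etna.datasets import TSDataset
from etna.metrics import MAE
from etna.models import NaiveModel
from etna.pipeline import Pipeline

# Toy single-segment dataset, only to make the sketch self-contained.
df = TSDataset.to_dataset(
    pd.DataFrame(
        {
            "timestamp": pd.date_range("2021-01-01", periods=60),
            "segment": "segment_a",
            "target": list(range(60)),
        }
    )
)
ts = TSDataset(df, freq="D")

# Tune optimizes hyperparameters of an existing pipeline instead of selecting one from a pool.
pipeline = Pipeline(NaiveModel(), horizon=7)
tune = Tune(pipeline=pipeline, target_metric=MAE(), horizon=7, metric_aggregation="median")
best_pipeline = tune.fit(ts=ts, n_trials=5)

print(tune.summary())   # one row per trial with aggregated metrics
print(tune.top_k(k=3))  # top pipelines according to the target metric
```

`runner`, `storage`, and a custom optuna `sampler` (`TPESampler` by default) can also be passed to the constructor, as the class definition further below shows.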
diff --git a/etna/auto/auto.py b/etna/auto/auto.py
index c7286fc3b..4cf30e3d8 100644
--- a/etna/auto/auto.py
+++ b/etna/auto/auto.py
@@ -7,8 +7,17 @@
 import pandas as pd
 from hydra_slayer import get_from_params
+from optuna.distributions import CategoricalDistribution
+from optuna.distributions import DiscreteUniformDistribution
+from optuna.distributions import IntLogUniformDistribution
+from optuna.distributions import IntUniformDistribution
+from optuna.distributions import LogUniformDistribution
+from optuna.distributions import UniformDistribution
+from optuna.samplers import BaseSampler
+from optuna.samplers import TPESampler
 from optuna.storages import BaseStorage
 from optuna.storages import RDBStorage
+from optuna.trial import FrozenTrial
 from optuna.trial import Trial
 from typing_extensions import Protocol


@@ -26,7 +35,7 @@
 from etna.metrics import Sign
 from etna.metrics.utils import MetricAggregationStatistics
 from etna.metrics.utils import aggregate_metrics_df
-from etna.pipeline import Pipeline
+from etna.pipeline.base import BasePipeline


 class _Callback(Protocol):
@@ -35,7 +44,7 @@ def __call__(self, metrics_df: pd.DataFrame, forecast_df: pd.DataFrame, fold_inf


 class _Initializer(Protocol):
-    def __call__(self, pipeline: Pipeline) -> None:
+    def __call__(self, pipeline: BasePipeline) -> None:
         ...


@@ -51,7 +60,7 @@ def fit(
         initializer: Optional[_Initializer] = None,
         callback: Optional[_Callback] = None,
         **optuna_kwargs,
-    ) -> Pipeline:
+    ) -> BasePipeline:
         """
         Start automatic pipeline selection.

@@ -82,47 +91,19 @@ def summary(self) -> pd.DataFrame:
         pass

     @abstractmethod
-    def top_k(self, k: int = 5) -> List[Pipeline]:
-        """
-        Get top k pipelines.
-
-        Parameters
-        ----------
-        k:
-            number of pipelines to return
-        """
+    def _summary(self, study: List[FrozenTrial]) -> List[dict]:
+        """Get information from trial summary."""
         pass

-    @staticmethod
     @abstractmethod
-    def objective(
-        ts: TSDataset,
-        target_metric: Metric,
-        metric_aggregation: MetricAggregationStatistics,
-        metrics: List[Metric],
-        backtest_params: dict,
-        initializer: Optional[_Initializer] = None,
-        callback: Optional[_Callback] = None,
-    ) -> Callable[[Trial], float]:
+    def top_k(self, k: int = 5) -> List[BasePipeline]:
         """
-        Optuna objective wrapper.
+        Get top k pipelines.

         Parameters
         ----------
-        ts:
-            tsdataset to fit on
-        target_metric:
-            metric to optimize
-        metric_aggregation:
-            aggregation method for per-segment metrics
-        metrics:
-            list of metrics to compute
-        backtest_params:
-            custom parameters for backtest instead of default backtest parameters
-        initializer:
-            is called before each pipeline backtest, can be used to initialize loggers
-        callback:
-            is called after each pipeline backtest, can be used to log extra metrics
+        k:
+            number of pipelines to return
         """
         pass

@@ -193,14 +174,10 @@ def summary(self) -> pd.DataFrame:

         study = self._optuna.study.get_trials()

-        study_params = [
-            {**trial.user_attrs, "pipeline": get_from_params(**trial.user_attrs["pipeline"]), "state": trial.state}
-            for trial in study
-        ]
-
+        study_params = self._summary(study=study)
         return pd.DataFrame(study_params)

-    def top_k(self, k: int = 5) -> List[Pipeline]:
+    def top_k(self, k: int = 5) -> List[BasePipeline]:
         """
         Get top k pipelines.

@@ -227,7 +204,7 @@ def __init__(
         metric_aggregation: MetricAggregationStatistics = "mean",
         backtest_params: Optional[dict] = None,
         experiment_folder: Optional[str] = None,
-        pool: Union[Pool, List[Pipeline]] = Pool.default,
+        pool: Union[Pool, List[BasePipeline]] = Pool.default,
         runner: Optional[AbstractRunner] = None,
         storage: Optional[BaseStorage] = None,
         metrics: Optional[List[Metric]] = None,
@@ -276,7 +253,7 @@ def fit(
         initializer: Optional[_Initializer] = None,
         callback: Optional[_Callback] = None,
         **optuna_kwargs,
-    ) -> Pipeline:
+    ) -> BasePipeline:
         """
         Start automatic pipeline selection.

@@ -316,6 +293,14 @@ def fit(

         return get_from_params(**self._optuna.study.best_trial.user_attrs["pipeline"])

+    def _summary(self, study: List[FrozenTrial]) -> List[dict]:
+        """Get information from trial summary."""
+        study_params = [
+            {**trial.user_attrs, "pipeline": get_from_params(**trial.user_attrs["pipeline"]), "state": trial.state}
+            for trial in study
+        ]
+        return study_params
+
     @staticmethod
     def objective(
         ts: TSDataset,
@@ -345,6 +330,11 @@ def objective(
             is called before each pipeline backtest, can be used to initialize loggers
         callback:
             is called after each pipeline backtest, can be used to log extra metrics
+
+        Returns
+        -------
+        objective:
+            function that runs specified trial and returns its evaluated score
         """

         def _objective(trial: Trial) -> float:
@@ -353,7 +343,7 @@ def _objective(trial: Trial) -> float:
             pipeline_config.update(trial.relative_params)
             pipeline_config.update(trial.params)

-            pipeline: Pipeline = get_from_params(**pipeline_config)
+            pipeline: BasePipeline = get_from_params(**pipeline_config)

             if initializer is not None:
                 initializer(pipeline=pipeline)
@@ -387,3 +377,215 @@ def _init_optuna(self):
             sampler=ConfigSampler(configs=pool_),
         )
         return optuna
+
+
+class Tune(AutoBase):
+    """Automatic tuning of custom pipeline."""
+
+    def __init__(
+        self,
+        pipeline: BasePipeline,
+        target_metric: Metric,
+        horizon: int,
+        metric_aggregation: MetricAggregationStatistics = "mean",
+        backtest_params: Optional[dict] = None,
+        experiment_folder: Optional[str] = None,
+        runner: Optional[AbstractRunner] = None,
+        storage: Optional[BaseStorage] = None,
+        metrics: Optional[List[Metric]] = None,
+        sampler: Optional[BaseSampler] = None,
+    ):
+        """
+        Initialize Tune class.
+
+        Parameters
+        ----------
+        pipeline:
+            pipeline to optimize
+        target_metric:
+            metric to optimize
+        horizon:
+            horizon to forecast for
+        metric_aggregation:
+            aggregation method for per-segment metrics
+        backtest_params:
+            custom parameters for backtest instead of default backtest parameters
+        experiment_folder:
+            folder to store experiment results and name for optuna study
+        runner:
+            runner to use for distributed training
+        storage:
+            optuna storage to use
+        metrics:
+            list of metrics to compute
+        sampler:
+            optuna sampler to use, by default ``TPESampler`` is used
+        """
+        super().__init__(
+            target_metric=target_metric,
+            horizon=horizon,
+            metric_aggregation=metric_aggregation,
+            backtest_params=backtest_params,
+            experiment_folder=experiment_folder,
+            runner=runner,
+            storage=storage,
+            metrics=metrics,
+        )
+        self.pipeline = pipeline
+        if sampler is None:
+            self.sampler: BaseSampler = TPESampler()
+        else:
+            self.sampler = sampler
+
+    def fit(
+        self,
+        ts: TSDataset,
+        timeout: Optional[int] = None,
+        n_trials: Optional[int] = None,
+        initializer: Optional[_Initializer] = None,
+        callback: Optional[_Callback] = None,
+        **optuna_kwargs,
+    ) -> BasePipeline:
+        """
+        Start automatic pipeline tuning.
+
+        Parameters
+        ----------
+        ts:
+            tsdataset to fit on
+        timeout:
+            timeout for optuna. N.B. this is timeout for each worker
+        n_trials:
+            number of trials for optuna. N.B. this is number of trials for each worker
+        initializer:
+            is called before each pipeline backtest, can be used to initialize loggers
+        callback:
+            is called after each pipeline backtest, can be used to log extra metrics
+        optuna_kwargs:
+            additional kwargs for optuna :py:meth:`optuna.study.Study.optimize`
+        """
+        if self._optuna is None:
+            self._optuna = self._init_optuna()
+
+        self._optuna.tune(
+            objective=self.objective(
+                ts=ts,
+                pipeline=self.pipeline,
+                target_metric=self.target_metric,
+                metric_aggregation=self.metric_aggregation,
+                metrics=self.metrics,
+                backtest_params=self.backtest_params,
+                initializer=initializer,
+                callback=callback,
+            ),
+            runner=self.runner,
+            n_trials=n_trials,
+            timeout=timeout,
+            **optuna_kwargs,
+        )
+
+        return get_from_params(**self._optuna.study.best_trial.params)
+
+    def _summary(self, study: List[FrozenTrial]) -> List[dict]:
+        """Get information from trial summary."""
+        study_params = [
+            {**trial.user_attrs, "pipeline": get_from_params(**trial.user_attrs), "state": trial.state}
+            for trial in study
+        ]
+        return study_params
+
+    @staticmethod
+    def objective(
+        ts: TSDataset,
+        pipeline: BasePipeline,
+        target_metric: Metric,
+        metric_aggregation: MetricAggregationStatistics,
+        metrics: List[Metric],
+        backtest_params: dict,
+        initializer: Optional[_Initializer] = None,
+        callback: Optional[_Callback] = None,
+    ) -> Callable[[Trial], float]:
+        """
+        Optuna objective wrapper.
+
+        Parameters
+        ----------
+        ts:
+            tsdataset to fit on
+        pipeline:
+            pipeline to tune
+        target_metric:
+            metric to optimize
+        metric_aggregation:
+            aggregation method for per-segment metrics
+        metrics:
+            list of metrics to compute
+        backtest_params:
+            custom parameters for backtest instead of default backtest parameters
+        initializer:
+            is called before each pipeline backtest, can be used to initialize loggers
+        callback:
+            is called after each pipeline backtest, can be used to log extra metrics
+
+        Returns
+        -------
+        objective:
+            function that runs specified trial and returns its evaluated score
+        """
+        dict_of_distrs = {
+            UniformDistribution: lambda x: ("suggest_uniform", {"low": x.low, "high": x.high}),
+            LogUniformDistribution: lambda x: ("suggest_loguniform", {"low": x.low, "high": x.high}),
+            DiscreteUniformDistribution: lambda x: (
+                "suggest_discrete_uniform",
+                {"low": x.low, "high": x.high, "q": x.q},
+            ),
+            IntUniformDistribution: lambda x: ("suggest_int", {"low": x.low, "high": x.high, "step": x.step}),
+            IntLogUniformDistribution: lambda x: (
+                "suggest_int",
+                {"low": x.low, "high": x.high, "step": x.step},
+            ),
+            CategoricalDistribution: lambda x: ("suggest_categorical", {"choices": x.choices}),
+        }
+
+        def _objective(trial: Trial) -> float:
+
+            params_to_tune = pipeline.params_to_tune()
+
+            # using received optuna.distribution objects to call corresponding trial.suggest_xxx
+            params_suggested = {}
+            for param_name, param_distr in params_to_tune.items():
+                method_name, method_kwargs = dict_of_distrs[type(param_distr)](param_distr)
+                method = getattr(trial, method_name)
+                params_suggested[param_name] = method(param_name, **method_kwargs)
+
+            # create pipeline instance with the parameters to try
+            pipeline_trial_params: BasePipeline = pipeline.set_params(**params_suggested)
+
+            if initializer is not None:
+                initializer(pipeline=pipeline_trial_params)
+
+            metrics_df, forecast_df, fold_info_df = pipeline_trial_params.backtest(
+                ts, metrics=metrics, **backtest_params
+            )
+
+            if callback is not None:
+                callback(metrics_df=metrics_df, forecast_df=forecast_df, fold_info_df=fold_info_df)
+
+            aggregated_metrics = aggregate_metrics_df(metrics_df)
+
+            for metric in aggregated_metrics:
+                trial.set_user_attr(metric, aggregated_metrics[metric])
+
+            return aggregated_metrics[f"{target_metric.name}_{metric_aggregation}"]
+
+        return _objective
+
+    def _init_optuna(self):
+        """Initialize optuna."""
+        # sampler receives no hyperparameters here and optimizes only the hyperparameters suggested in objective
+        optuna = Optuna(
+            direction="maximize" if self.target_metric.greater_is_better else "minimize",
+            study_name=self.experiment_folder,
+            storage=self.storage,
+            sampler=self.sampler,
+        )
+        return optuna
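To make the `dict_of_distrs` mapping above concrete, here is a small sketch of the contract that `Tune.objective` relies on; the parameter names and bounds are hypothetical, not taken from the patch:

```python
from optuna.distributions import IntUniformDistribution, UniformDistribution

# A search space in the shape expected from `pipeline.params_to_tune()`:
# keys are dotted parameter paths understood by `set_params`,
# values are optuna distributions.
params_to_tune = {
    # converted to trial.suggest_uniform("model.smoothing_level", low=0.1, high=1.0)
    "model.smoothing_level": UniformDistribution(0.1, 1.0),
    # converted to trial.suggest_int("model.lag", low=1, high=5, step=1)
    "model.lag": IntUniformDistribution(1, 5),
}
```

Inside `_objective` the suggested values are applied with `pipeline.set_params(**params_suggested)` and the resulting pipeline is backtested; the aggregated target metric becomes the trial score.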
diff --git a/etna/core/mixins.py b/etna/core/mixins.py
index 02c4f4986..4bae58291 100644
--- a/etna/core/mixins.py
+++ b/etna/core/mixins.py
@@ -11,11 +11,14 @@
 from typing import Dict
 from typing import Sequence
 from typing import Tuple
+from typing import TypeVar
 from typing import cast

 import hydra_slayer
 from sklearn.base import BaseEstimator

+TMixin = TypeVar("TMixin", bound="BaseMixin")
+

 class BaseMixin:
     """Base mixin for etna classes."""
@@ -134,7 +137,7 @@ def _update_nested_structure(cls, structure: Any, keys: Sequence[str], value: An
         return new_structure

-    def set_params(self, **params: dict) -> "BaseMixin":
+    def set_params(self: TMixin, **params: dict) -> TMixin:
         """Return new object instance with modified parameters.

         Method also allows to change parameters of nested objects within the current object.
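The `TMixin` TypeVar above is purely a typing change: `set_params` keeps returning a new instance, but the result is now typed as the class it was called on rather than `BaseMixin`. A brief sketch of what this enables (parameter values are arbitrary):

```python
from etna.models import NaiveModel
from etna.pipeline import Pipeline

pipeline = Pipeline(NaiveModel(lag=1), horizon=7)

# With the TypeVar-based signature the result is typed as Pipeline, not BaseMixin,
# so pipeline-specific attributes and methods type-check without a cast.
new_pipeline: Pipeline = pipeline.set_params(**{"model.lag": 3})
assert new_pipeline.model.lag == 3
```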
diff --git a/tests/test_auto/conftest.py b/tests/test_auto/conftest.py
new file mode 100644
index 000000000..105204111
--- /dev/null
+++ b/tests/test_auto/conftest.py
@@ -0,0 +1,27 @@
+from os import unlink
+
+import pytest
+from optuna.storages import RDBStorage
+from typing_extensions import Literal
+from typing_extensions import NamedTuple
+
+from etna.models import NaiveModel
+from etna.pipeline import Pipeline
+
+
+@pytest.fixture()
+def optuna_storage():
+    yield RDBStorage("sqlite:///test.db")
+    unlink("test.db")
+
+
+@pytest.fixture()
+def trials():
+    class Trial(NamedTuple):
+        user_attrs: dict
+        state: Literal["COMPLETE", "RUNNING", "PENDING"] = "COMPLETE"
+
+    return [
+        Trial(user_attrs={"pipeline": pipeline.to_dict(), "SMAPE_median": i})
+        for i, pipeline in enumerate((Pipeline(NaiveModel(j), horizon=7) for j in range(10)))
+    ]
diff --git a/tests/test_auto/test_auto.py b/tests/test_auto/test_auto.py
index 4061aec55..369ddaa7a 100644
--- a/tests/test_auto/test_auto.py
+++ b/tests/test_auto/test_auto.py
@@ -1,14 +1,11 @@
-from os import unlink
+from functools import partial
 from unittest.mock import MagicMock
 from unittest.mock import patch

 import pytest
-from optuna.storages import RDBStorage
 from typing_extensions import Literal
-from typing_extensions import NamedTuple

 from etna.auto import Auto
-from etna.auto.auto import AutoBase
 from etna.auto.auto import _Callback
 from etna.auto.auto import _Initializer
 from etna.metrics import MAE
@@ -16,24 +13,6 @@
 from etna.pipeline import Pipeline


-@pytest.fixture()
-def optuna_storage():
-    yield RDBStorage("sqlite:///test.db")
-    unlink("test.db")
-
-
-@pytest.fixture()
-def trials():
-    class Trial(NamedTuple):
-        user_attrs: dict
-        state: Literal["COMPLETE", "RUNNING", "PENDING"] = "COMPLETE"
-
-    return [
-        Trial(user_attrs={"pipeline": pipeline.to_dict(), "SMAPE_median": i})
-        for i, pipeline in enumerate((Pipeline(NaiveModel(j), horizon=7) for j in range(10)))
-    ]
-
-
 def test_objective(
     example_tsds,
     target_metric=MAE(),
@@ -125,7 +104,9 @@ def test_summary(
     auto=MagicMock(),
 ):
     auto._optuna.study.get_trials.return_value = trials
-    df_summary = AutoBase.summary(self=auto)
+    auto._summary = partial(Auto._summary, self=auto)  # essential for summary
+    df_summary = Auto.summary(self=auto)
+
     assert len(df_summary) == len(trials)
     assert list(df_summary["SMAPE_median"].values) == [trial.user_attrs["SMAPE_median"] for trial in trials]

@@ -136,13 +117,16 @@ def test_top_k(
     k,
     auto=MagicMock(),
 ):
-    auto._optuna.study.get_trials.return_value = trials
     auto.target_metric.name = "SMAPE"
     auto.metric_aggregation = "median"
     auto.target_metric.greater_is_better = False
-    df_summary = AutoBase.summary(self=auto)
+    auto._optuna.study.get_trials.return_value = trials
+    auto._summary = partial(Auto._summary, self=auto)
+    df_summary = Auto.summary(self=auto)
+
     auto.summary = MagicMock(return_value=df_summary)
-    top_k = AutoBase.top_k(auto, k=k)
+
+    top_k = Auto.top_k(auto, k=k)
     assert len(top_k) == k
     assert [pipeline.model.lag for pipeline in top_k] == [i for i in range(k)]  # noqa C416
diff --git a/tests/test_auto/test_tune.py b/tests/test_auto/test_tune.py
new file mode 100644
index 000000000..320b0985f
--- /dev/null
+++ b/tests/test_auto/test_tune.py
@@ -0,0 +1,212 @@
+from functools import partial
+from unittest.mock import MagicMock
+from unittest.mock import patch
+
+import pytest
+from optuna.distributions import CategoricalDistribution
+from optuna.distributions import DiscreteUniformDistribution
+from optuna.distributions import IntLogUniformDistribution
+from optuna.distributions import IntUniformDistribution
+from optuna.distributions import LogUniformDistribution
+from optuna.distributions import UniformDistribution
+from typing_extensions import Literal
+
+from etna.auto import Tune
+from etna.auto.auto import _Callback
+from etna.auto.auto import _Initializer
+from etna.metrics import MAE
+from etna.models import NaiveModel
+from etna.models import SimpleExpSmoothingModel
+from etna.pipeline import AutoRegressivePipeline
+from etna.pipeline import Pipeline
+from etna.pipeline.hierarchical_pipeline import HierarchicalPipeline
+from etna.reconciliation import BottomUpReconciliator
+from etna.transforms import AddConstTransform
+from etna.transforms import DateFlagsTransform
+
+
+def test_objective(
+    example_tsds,
+    target_metric=MAE(),
+    metric_aggregation: Literal["mean"] = "mean",
+    metrics=[MAE()],
+    backtest_params={},
+    initializer=MagicMock(spec=_Initializer),
+    callback=MagicMock(spec=_Callback),
+    pipeline=Pipeline(NaiveModel()),
+):
+    trial = MagicMock()
+    _objective = Tune.objective(
+        ts=example_tsds,
+        pipeline=pipeline,
+        target_metric=target_metric,
+        metric_aggregation=metric_aggregation,
+        metrics=metrics,
+        backtest_params=backtest_params,
+        initializer=initializer,
+        callback=callback,
+    )
+    aggregated_metric = _objective(trial)
+    assert isinstance(aggregated_metric, float)
+
+    initializer.assert_called_once()
+    callback.assert_called_once()
+
+
+def test_fit(
+    ts=MagicMock(),
+    tune=MagicMock(),
+    timeout=4,
+    n_trials=2,
+    initializer=MagicMock(),
+    callback=MagicMock(),
+):
+    Tune.fit(
+        self=tune,
+        ts=ts,
+        timeout=timeout,
+        n_trials=n_trials,
+        initializer=initializer,
+        callback=callback,
+    )
+
+    tune._optuna.tune.assert_called_with(
+        objective=tune.objective.return_value, runner=tune.runner, n_trials=n_trials, timeout=timeout
+    )
+
+
+@patch("optuna.samplers.TPESampler", return_value=MagicMock())
+@patch("etna.auto.auto.Optuna", return_value=MagicMock())
+def test_init_optuna(
+    optuna_mock,
+    sampler_mock,
+    auto=MagicMock(),
+):
+    auto.configure_mock(sampler=sampler_mock)
+    Tune._init_optuna(self=auto)
+
+    optuna_mock.assert_called_once_with(
+        direction="maximize", study_name=auto.experiment_folder, storage=auto.storage, sampler=sampler_mock
+    )
+
+
+@pytest.mark.parametrize(
+    "params, model",
+    [
+        ({"model.smoothing_level": UniformDistribution(0.1, 1)}, SimpleExpSmoothingModel()),
+        ({"model.smoothing_level": LogUniformDistribution(0.1, 1)}, SimpleExpSmoothingModel()),
+        ({"model.smoothing_level": DiscreteUniformDistribution(0.1, 1, 0.1)}, SimpleExpSmoothingModel()),
+        ({"model.lag": IntUniformDistribution(1, 5)}, NaiveModel()),
+        ({"model.lag": IntLogUniformDistribution(1, 5)}, NaiveModel()),
+        ({"model.lag": CategoricalDistribution((1, 2, 3))}, NaiveModel()),
+        ({"model.smoothing_level": UniformDistribution(1, 5)}, SimpleExpSmoothingModel()),
+    ],
+)
+def test_can_handle_distribution_type(example_tsds, optuna_storage, params, model):
+    with patch.object(Pipeline, "params_to_tune", return_value=params):
+        pipeline = Pipeline(model, horizon=7)
+        tune = Tune(pipeline, MAE(), metric_aggregation="median", horizon=7, storage=optuna_storage)
+        tune.fit(ts=example_tsds, n_trials=2)
+
+
+def test_can_handle_transforms(example_tsds, optuna_storage):
+    params = {"transforms.0.value": IntUniformDistribution(0, 17), "transforms.1.value": IntUniformDistribution(0, 17)}
+    with patch.object(Pipeline, "params_to_tune", return_value=params):
+        pipeline = Pipeline(
+            NaiveModel(),
+            [AddConstTransform(in_column="target", value=8), AddConstTransform(in_column="target", value=4)],
+            horizon=7,
+        )
+        tune = Tune(pipeline, MAE(), metric_aggregation="median", horizon=7, storage=optuna_storage)
+        tune.fit(ts=example_tsds, n_trials=2)
+
+
+def test_summary(
+    trials,
+    tune=MagicMock(),
+):
+    tune._optuna.study.get_trials.return_value = trials
+    tune._summary = partial(Tune._summary, self=tune)  # essential for summary
+    df_summary = Tune.summary(self=tune)
+
+    assert len(df_summary) == len(trials)
+    assert list(df_summary["SMAPE_median"].values) == [trial.user_attrs["SMAPE_median"] for trial in trials]
+
+
+@pytest.mark.parametrize("k", [1, 2, 3])
+def test_top_k(
+    trials,
+    k,
+    tune=MagicMock(),
+):
+    tune.target_metric.name = "SMAPE"
+    tune.metric_aggregation = "median"
+    tune.target_metric.greater_is_better = False
+
+    tune._optuna.study.get_trials.return_value = trials
+    tune._summary = partial(Tune._summary, self=tune)
+    df_summary = Tune.summary(self=tune)
+
+    tune.summary = MagicMock(return_value=df_summary)
+
+    top_k = Tune.top_k(tune, k=k)
+    assert len(top_k) == k
+    assert [pipeline["pipeline"].model.lag for pipeline in top_k] == [i for i in range(k)]  # noqa C416
+
+
+@pytest.mark.parametrize(
+    "pipeline",
+    [
+        (Pipeline(NaiveModel(1), horizon=7)),
+        (AutoRegressivePipeline(model=NaiveModel(1), horizon=7, transforms=[])),
+        (AutoRegressivePipeline(model=NaiveModel(1), horizon=7, transforms=[DateFlagsTransform()])),
+    ],
+)
+def test_tune_run(example_tsds, optuna_storage, pipeline):
+    tune = Tune(
+        pipeline,
+        MAE(),
+        metric_aggregation="median",
+        horizon=7,
+        storage=optuna_storage,
+    )
+    tune.fit(ts=example_tsds, n_trials=2)
+
+    assert len(tune._optuna.study.trials) == 2
+    assert len(tune.summary()) == 2
+    assert len(tune.top_k()) == 2
+    assert len(tune.top_k(k=1)) == 1
+
+
+@pytest.mark.parametrize(
+    "pipeline",
+    [
+        (
+            HierarchicalPipeline(
+                reconciliator=BottomUpReconciliator(target_level="total", source_level="market"),
+                model=NaiveModel(1),
+                transforms=[],
+                horizon=1,
+            )
+        ),
+    ],
+)
+def test_tune_hierarchical_run(
+    market_level_constant_hierarchical_ts,
+    optuna_storage,
+    pipeline,
+):
+    tune = Tune(
+        pipeline,
+        MAE(),
+        metric_aggregation="median",
+        horizon=7,
+        backtest_params={"n_folds": 2},
+        storage=optuna_storage,
+    )
+    tune.fit(ts=market_level_constant_hierarchical_ts, n_trials=2)
+
+    assert len(tune._optuna.study.trials) == 2
+    assert len(tune.summary()) == 2
+    assert len(tune.top_k()) == 2
+    assert len(tune.top_k(k=1)) == 1
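The tests above inject a search space by patching `Pipeline.params_to_tune`; outside of tests, one possible way to give `Tune` a hand-written search space is to override that method on a pipeline subclass. A hedged sketch, assuming the base `params_to_tune` can simply be overridden; `MyPipeline` and its bounds are made up for illustration:

```python
from optuna.distributions import IntUniformDistribution

from etna.models import NaiveModel
from etna.pipeline import Pipeline


class MyPipeline(Pipeline):
    """Pipeline with a hand-written search space for Tune."""

    def params_to_tune(self):
        # Keys are dotted paths understood by set_params; nested transform
        # parameters can be addressed as "transforms.<index>.<param>",
        # as in test_can_handle_transforms above.
        return {"model.lag": IntUniformDistribution(1, 14)}
```

Such a pipeline could then be passed to `Tune` exactly as in `test_tune_run` above.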