From aac0fe12f1d104f9c2f82a0a108be5f7447510da Mon Sep 17 00:00:00 2001 From: Maxim Zherelo <60392282+brsnw250@users.noreply.github.com> Date: Thu, 27 Jul 2023 13:16:12 +0300 Subject: [PATCH] [BUG] CLI forecast command fails with pipeline ensembles (#1331) * added tests * handle ensemble horizons * added tests * updated changelog --- CHANGELOG.md | 2 +- etna/commands/forecast_command.py | 12 ++++-- tests/test_commands/conftest.py | 36 ++++++++++++++++ tests/test_commands/test_backtest.py | 24 +++++++---- tests/test_commands/test_forecast.py | 64 +++++++++++++++++++++------- 5 files changed, 110 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 96cbbf56c..a82ca4a06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Remove upper limitation on version of numba ([#1321](https://github.com/tinkoff-ai/etna/pull/1321)) ### Fixed -- +- Pipeline ensembles fail in `etna forecast` CLI ([#1331](https://github.com/tinkoff-ai/etna/pull/1331)) - - Fix performance of `DeepARModel` and `TFTModel` ([#1322](https://github.com/tinkoff-ai/etna/pull/1322)) - `mrmr` feature selection working with categoricals ([#1311](https://github.com/tinkoff-ai/etna/pull/1311)) diff --git a/etna/commands/forecast_command.py b/etna/commands/forecast_command.py index d1b863ed3..f22294e4b 100644 --- a/etna/commands/forecast_command.py +++ b/etna/commands/forecast_command.py @@ -44,6 +44,14 @@ def compute_horizon(horizon: int, forecast_params: Dict[str, Any], tsdataset: TS return horizon +def update_horizon(pipeline_configs: Dict[str, Any], forecast_params: Dict[str, Any], tsdataset: TSDataset): + """Update the ``horizon`` parameter in the pipeline config if ``start_timestamp`` is set.""" + for config in pipeline_configs.get("pipelines", [pipeline_configs]): + horizon: int = config["horizon"] # type: ignore + horizon = compute_horizon(horizon=horizon, forecast_params=forecast_params, tsdataset=tsdataset) + config["horizon"] = horizon # type: ignore + + def filter_forecast(forecast_ts: TSDataset, forecast_params: Dict[str, Any]) -> TSDataset: """Filter out forecasts before `start_timestamp` if `start_timestamp` presented in `forecast_params`..""" if "start_timestamp" in forecast_params: @@ -122,9 +130,7 @@ def forecast( tsdataset = TSDataset(df=df_timeseries, freq=freq, df_exog=df_exog, known_future=k_f) - horizon: int = pipeline_configs["horizon"] # type: ignore - horizon = compute_horizon(horizon=horizon, forecast_params=forecast_params, tsdataset=tsdataset) - pipeline_configs["horizon"] = horizon # type: ignore + update_horizon(pipeline_configs=pipeline_configs, forecast_params=forecast_params, tsdataset=tsdataset) pipeline_args = remove_params(params=pipeline_configs, to_remove=ADDITIONAL_PIPELINE_PARAMETERS) pipeline: Pipeline = hydra_slayer.get_from_params(**pipeline_args) diff --git a/tests/test_commands/conftest.py b/tests/test_commands/conftest.py index e1c48f13f..0663ab8a8 100644 --- a/tests/test_commands/conftest.py +++ b/tests/test_commands/conftest.py @@ -50,6 +50,42 @@ def base_pipeline_with_context_size_yaml_path(): tmp.close() +@pytest.fixture +def base_ensemble_yaml_path(): + tmp = NamedTemporaryFile("w") + tmp.write( + """ + _target_: etna.ensembles.VotingEnsemble + pipelines: + - _target_: etna.pipeline.Pipeline + horizon: 4 + model: + _target_: etna.models.SeasonalMovingAverageModel + seasonality: 4 + window: 1 + transforms: [] + - _target_: etna.pipeline.Pipeline + horizon: 4 + model: + _target_: etna.models.SeasonalMovingAverageModel + seasonality: 7 + window: 2 + transforms: [] + - _target_: etna.pipeline.Pipeline + horizon: 4 + model: + _target_: etna.models.SeasonalMovingAverageModel + seasonality: 7 + window: 7 + transforms: [] + context_size: 49 + """ + ) + tmp.flush() + yield Path(tmp.name) + tmp.close() + + @pytest.fixture def elementary_linear_model_pipeline(): tmp = NamedTemporaryFile("w") diff --git a/tests/test_commands/test_backtest.py b/tests/test_commands/test_backtest.py index 0c253bc6e..8ab83e2b3 100644 --- a/tests/test_commands/test_backtest.py +++ b/tests/test_commands/test_backtest.py @@ -67,14 +67,16 @@ def backtest_with_stride_yaml_path(): tmp.close() -def test_dummy_run(base_pipeline_yaml_path, base_backtest_yaml_path, base_timeseries_path): +@pytest.mark.parametrize("pipeline_path_name", ("base_pipeline_yaml_path", "base_ensemble_yaml_path")) +def test_dummy_run(pipeline_path_name, base_backtest_yaml_path, base_timeseries_path, request): tmp_output = TemporaryDirectory() tmp_output_path = Path(tmp_output.name) + pipeline_path = request.getfixturevalue(pipeline_path_name) run( [ "etna", "backtest", - str(base_pipeline_yaml_path), + str(pipeline_path), str(base_backtest_yaml_path), str(base_timeseries_path), "D", @@ -85,16 +87,18 @@ def test_dummy_run(base_pipeline_yaml_path, base_backtest_yaml_path, base_timese assert Path.exists(tmp_output_path / file_name) +@pytest.mark.parametrize("pipeline_path_name", ("base_pipeline_yaml_path", "base_ensemble_yaml_path")) def test_dummy_run_with_exog( - base_pipeline_yaml_path, base_backtest_yaml_path, base_timeseries_path, base_timeseries_exog_path + pipeline_path_name, base_backtest_yaml_path, base_timeseries_path, base_timeseries_exog_path, request ): tmp_output = TemporaryDirectory() tmp_output_path = Path(tmp_output.name) + pipeline_path = request.getfixturevalue(pipeline_path_name) run( [ "etna", "backtest", - str(base_pipeline_yaml_path), + str(pipeline_path), str(base_backtest_yaml_path), str(base_timeseries_path), "D", @@ -126,16 +130,18 @@ def test_forecast_format(base_pipeline_yaml_path, base_backtest_yaml_path, base_ @pytest.mark.parametrize( - "backtest_config_path_name,expected", + "pipeline_path_name,backtest_config_path_name,expected", ( - ("backtest_with_folds_estimation_yaml_path", 24), - ("backtest_with_stride_yaml_path", 1), + ("base_pipeline_with_context_size_yaml_path", "backtest_with_folds_estimation_yaml_path", 24), + ("base_ensemble_yaml_path", "backtest_with_folds_estimation_yaml_path", 12), + ("base_pipeline_with_context_size_yaml_path", "backtest_with_stride_yaml_path", 1), ), ) def test_backtest_estimate_n_folds( - base_pipeline_with_context_size_yaml_path, backtest_config_path_name, base_timeseries_path, expected, request + pipeline_path_name, backtest_config_path_name, base_timeseries_path, expected, request ): backtest_config_path = request.getfixturevalue(backtest_config_path_name) + pipeline_path = request.getfixturevalue(pipeline_path_name) tmp_output = TemporaryDirectory() tmp_output_path = Path(tmp_output.name) @@ -143,7 +149,7 @@ def test_backtest_estimate_n_folds( [ "etna", "backtest", - str(base_pipeline_with_context_size_yaml_path), + str(pipeline_path), str(backtest_config_path), str(base_timeseries_path), "D", diff --git a/tests/test_commands/test_forecast.py b/tests/test_commands/test_forecast.py index cf0ddaa1e..c95ec7b6e 100644 --- a/tests/test_commands/test_forecast.py +++ b/tests/test_commands/test_forecast.py @@ -2,12 +2,17 @@ from subprocess import run from tempfile import NamedTemporaryFile +import hydra_slayer import numpy as np import pandas as pd import pytest +from omegaconf import OmegaConf +from etna.commands.forecast_command import ADDITIONAL_PIPELINE_PARAMETERS from etna.commands.forecast_command import compute_horizon from etna.commands.forecast_command import filter_forecast +from etna.commands.forecast_command import update_horizon +from etna.commands.utils import remove_params from etna.datasets import TSDataset @@ -59,14 +64,16 @@ def base_forecast_with_folds_estimation_omegaconf_path(): tmp.close() -def test_dummy_run_with_exog(base_pipeline_yaml_path, base_timeseries_path, base_timeseries_exog_path): +@pytest.mark.parametrize("pipeline_path_name", ("base_pipeline_yaml_path", "base_ensemble_yaml_path")) +def test_dummy_run_with_exog(pipeline_path_name, base_timeseries_path, base_timeseries_exog_path, request): tmp_output = NamedTemporaryFile("w") tmp_output_path = Path(tmp_output.name) + pipeline_path = request.getfixturevalue(pipeline_path_name) run( [ "etna", "forecast", - str(base_pipeline_yaml_path), + str(pipeline_path), str(base_timeseries_path), "D", str(tmp_output_path), @@ -103,16 +110,18 @@ def test_dummy_run(base_pipeline_yaml_path, base_timeseries_path): assert len(df_output) == 2 * 4 +@pytest.mark.parametrize("pipeline_path_name", ("base_pipeline_yaml_path", "base_ensemble_yaml_path")) def test_run_with_predictive_intervals( - base_pipeline_yaml_path, base_timeseries_path, base_timeseries_exog_path, base_forecast_omegaconf_path + pipeline_path_name, base_timeseries_path, base_timeseries_exog_path, base_forecast_omegaconf_path, request ): tmp_output = NamedTemporaryFile("w") tmp_output_path = Path(tmp_output.name) + pipeline_path = request.getfixturevalue(pipeline_path_name) run( [ "etna", "forecast", - str(base_pipeline_yaml_path), + str(pipeline_path), str(base_timeseries_path), "D", str(tmp_output_path), @@ -213,24 +222,45 @@ def test_filter_forecast(forecast_params, expected, example_tsds): @pytest.mark.parametrize( - "model_pipeline", - [ - "elementary_linear_model_pipeline", - "elementary_boosting_model_pipeline", - ], + "forecast_params,pipeline_path_name,expected", + ( + ({"start_timestamp": "2020-04-10"}, "base_pipeline_with_context_size_yaml_path", 4), + ({"start_timestamp": "2020-04-12"}, "base_pipeline_with_context_size_yaml_path", 6), + ({"start_timestamp": "2020-04-11"}, "base_ensemble_yaml_path", 5), + ), +) +def test_update_horizon(pipeline_path_name, forecast_params, example_tsds, expected, request): + pipeline_path = request.getfixturevalue(pipeline_path_name) + pipeline_conf = OmegaConf.to_object(OmegaConf.load(pipeline_path)) + + update_horizon(pipeline_configs=pipeline_conf, forecast_params=forecast_params, tsdataset=example_tsds) + + pipeline_conf = remove_params(params=pipeline_conf, to_remove=ADDITIONAL_PIPELINE_PARAMETERS) + pipeline = hydra_slayer.get_from_params(**pipeline_conf) + + assert pipeline.horizon == expected + + +@pytest.mark.parametrize( + "pipeline_path_name", + ("base_pipeline_with_context_size_yaml_path", "base_ensemble_yaml_path"), ) def test_forecast_start_timestamp( - model_pipeline, base_timeseries_path, base_timeseries_exog_path, start_timestamp_forecast_omegaconf_path, request + pipeline_path_name, + base_timeseries_path, + base_timeseries_exog_path, + start_timestamp_forecast_omegaconf_path, + request, ): tmp_output = NamedTemporaryFile("w") tmp_output_path = Path(tmp_output.name) - model_pipeline = request.getfixturevalue(model_pipeline) + pipeline_path = request.getfixturevalue(pipeline_path_name) run( [ "etna", "forecast", - str(model_pipeline), + str(pipeline_path), str(base_timeseries_path), "D", str(tmp_output_path), @@ -240,24 +270,28 @@ def test_forecast_start_timestamp( ) df_output = pd.read_csv(tmp_output_path) - assert len(df_output) == 3 * 2 # 3 predictions for 2 segments + assert len(df_output) == 4 * 2 # 4 predictions for 2 segments assert df_output["timestamp"].min() == "2021-09-10" # start_timestamp assert not np.any(df_output.isna().values) +@pytest.mark.parametrize("pipeline_path_name", ("base_pipeline_with_context_size_yaml_path", "base_ensemble_yaml_path")) def test_forecast_estimate_n_folds( - base_pipeline_with_context_size_yaml_path, + pipeline_path_name, base_forecast_with_folds_estimation_omegaconf_path, base_timeseries_path, base_timeseries_exog_path, + request, ): tmp_output = NamedTemporaryFile("w") tmp_output_path = Path(tmp_output.name) + pipeline_path = request.getfixturevalue(pipeline_path_name) + run( [ "etna", "forecast", - str(base_pipeline_with_context_size_yaml_path), + str(pipeline_path), str(base_timeseries_path), "D", str(tmp_output_path),