-
Notifications
You must be signed in to change notification settings - Fork 80
Add refit
parameter into backtest
#1159
Merged
Merged
Changes from 12 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
649a52b
Add parallel processing in backtest, add tests on fold grouping
b5187f5
Add tests for refit
3105872
Update changelog
fb3df20
Fix problem with info_df in custom refit
0eb8026
Improve docs for refit
5a978a9
Move typing
0535b5a
Make small test fixes
20f2f4d
Fix parameterize tests
ad8be24
Add error in documentation of backtest
c7c74a4
Add logging for learning, fix parallel forecasting
2691d56
Fix name of job_type
1c1df34
Fix tests for logger
8e21e0a
Fix docstring
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
import math | ||
from abc import abstractmethod | ||
from copy import deepcopy | ||
from enum import Enum | ||
|
@@ -15,6 +16,7 @@ | |
from joblib import Parallel | ||
from joblib import delayed | ||
from scipy.stats import norm | ||
from typing_extensions import TypedDict | ||
|
||
from etna.core import AbstractSaveable | ||
from etna.core import BaseMixin | ||
|
@@ -210,6 +212,7 @@ def backtest( | |
mode: str = "expand", | ||
aggregate_metrics: bool = False, | ||
n_jobs: int = 1, | ||
refit: Union[bool, int] = True, | ||
joblib_params: Optional[Dict[str, Any]] = None, | ||
forecast_params: Optional[Dict[str, Any]] = None, | ||
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: | ||
|
@@ -229,6 +232,15 @@ def backtest( | |
If True aggregate metrics above folds, return raw metrics otherwise | ||
n_jobs: | ||
Number of jobs to run in parallel | ||
refit: | ||
Determines how often pipeline should be retrained during iteration over folds. | ||
|
||
* If ``True``: pipeline is retrained on each fold. | ||
|
||
* If ``False``: pipeline is trained only on the first fold, and on future folds trained pipeline is used. | ||
|
||
* If ``value: int``: pipeline is trained every ``value`` folds starting from the first. | ||
|
||
joblib_params: | ||
Additional parameters for :py:class:`joblib.Parallel` | ||
forecast_params: | ||
|
@@ -241,6 +253,15 @@ def backtest( | |
""" | ||
|
||
|
||
class FoldParallelGroup(TypedDict):
    """Group of backtest folds that share one fitted pipeline.

    During backtest with ``refit`` the pipeline is trained once on the fold
    described by ``train_mask`` and then reused to forecast every fold listed
    in ``forecast_fold_numbers``.
    """

    # Index of the fold whose train part is used to fit the pipeline.
    train_fold_number: int
    # Mask describing the training fold.
    train_mask: FoldMask
    # Indices of the folds forecasted with the pipeline fitted on the train fold.
    forecast_fold_numbers: List[int]
    # Masks of the folds to forecast; aligned with ``forecast_fold_numbers``.
    forecast_masks: List[FoldMask]
|
||
|
||
class BasePipeline(AbstractPipeline, BaseMixin): | ||
"""Base class for pipeline.""" | ||
|
||
|
@@ -522,21 +543,39 @@ def _compute_metrics(metrics: List[Metric], y_true: TSDataset, y_pred: TSDataset | |
metrics_values[metric.name] = metric(y_true=y_true, y_pred=y_pred) # type: ignore | ||
return metrics_values | ||
|
||
def _run_fold( | ||
def _fit_backtest_pipeline(
    self,
    ts: TSDataset,
    fold_number: int,
) -> "BasePipeline":
    """Train a copy of this pipeline on ``ts`` for one backtest fold.

    The run is logged as a ``training`` experiment under the fold number,
    and the fitted copy is returned so the caller can reuse it on later folds.
    """
    tslogger.start_experiment(job_type="training", group=str(fold_number))
    # Fit a deep copy so backtest never mutates the user's pipeline instance.
    fitted_copy = deepcopy(self)
    fitted_copy.fit(ts=ts)
    tslogger.finish_experiment()
    return fitted_copy
|
||
def _forecast_backtest_pipeline(
    self, pipeline: "BasePipeline", ts: TSDataset, fold_number: int, forecast_params: Dict[str, Any]
) -> TSDataset:
    """Produce the forecast for one backtest fold using an already fitted ``pipeline``.

    The run is logged as a ``forecasting`` experiment under the fold number.
    """
    tslogger.start_experiment(job_type="forecasting", group=str(fold_number))
    result = pipeline.forecast(ts=ts, **forecast_params)
    tslogger.finish_experiment()
    return result
|
||
def _process_fold_forecast( | ||
self, | ||
forecast: TSDataset, | ||
train: TSDataset, | ||
test: TSDataset, | ||
fold_number: int, | ||
mask: FoldMask, | ||
metrics: List[Metric], | ||
forecast_params: Dict[str, Any], | ||
) -> Dict[str, Any]: | ||
"""Run fit-forecast pipeline of model for one fold.""" | ||
"""Process forecast made for a fold.""" | ||
tslogger.start_experiment(job_type="crossval", group=str(fold_number)) | ||
|
||
pipeline = deepcopy(self) | ||
pipeline.fit(ts=train) | ||
forecast = pipeline.forecast(**forecast_params) | ||
fold: Dict[str, Any] = {} | ||
for stage_name, stage_df in zip(("train", "test"), (train, test)): | ||
fold[f"{stage_name}_timerange"] = {} | ||
|
@@ -620,6 +659,108 @@ def _prepare_fold_masks(self, ts: TSDataset, masks: Union[int, List[FoldMask]], | |
mask.validate_on_dataset(ts=ts, horizon=self.horizon) | ||
return masks | ||
|
||
@staticmethod | ||
def _make_backtest_fold_groups(masks: List[FoldMask], refit: Union[bool, int]) -> List[FoldParallelGroup]: | ||
"""Make groups of folds for backtest.""" | ||
if not refit: | ||
refit = len(masks) | ||
|
||
grouped_folds = [] | ||
num_groups = math.ceil(len(masks) / refit) | ||
for group_id in range(num_groups): | ||
train_fold_number = group_id * refit | ||
forecast_fold_numbers = [train_fold_number + i for i in range(refit) if train_fold_number + i < len(masks)] | ||
alex-hse-repository marked this conversation as resolved.
Show resolved
Hide resolved
|
||
cur_group: FoldParallelGroup = { | ||
"train_fold_number": train_fold_number, | ||
"train_mask": masks[train_fold_number], | ||
"forecast_fold_numbers": forecast_fold_numbers, | ||
"forecast_masks": [masks[i] for i in forecast_fold_numbers], | ||
} | ||
grouped_folds.append(cur_group) | ||
|
||
return grouped_folds | ||
|
||
def _run_all_folds(
    self,
    masks: List[FoldMask],
    ts: TSDataset,
    metrics: List[Metric],
    n_jobs: int,
    refit: Union[bool, int],
    joblib_params: Dict[str, Any],
    forecast_params: Dict[str, Any],
) -> Dict[int, Any]:
    """Run pipeline on all folds.

    Folds are grouped by ``refit`` (one training per group), then three
    parallel passes are made over the groups: fit one pipeline per group,
    forecast every fold with its group's pipeline, and compute per-fold
    results (metrics/timeranges) from those forecasts.

    Returns a dict mapping fold number -> per-fold result dict.
    """
    fold_groups = self._make_backtest_fold_groups(masks=masks, refit=refit)

    with Parallel(n_jobs=n_jobs, **joblib_params) as parallel:
        # fitting: one training dataset per group (its first fold's train part)
        fit_masks = [group["train_mask"] for group in fold_groups]
        fit_datasets = (
            train for train, _ in self._generate_folds_datasets(ts=ts, masks=fit_masks, horizon=self.horizon)
        )
        pipelines = parallel(
            delayed(self._fit_backtest_pipeline)(ts=fit_ts, fold_number=fold_groups[group_idx]["train_fold_number"])
            for group_idx, fit_ts in enumerate(fit_datasets)
        )

        # forecasting: each fold is forecasted with the pipeline fitted for its group
        forecast_masks = [group["forecast_masks"] for group in fold_groups]
        # nested generators: outer iterates groups, inner yields train datasets per fold
        forecast_datasets = (
            (
                train
                for train, _ in self._generate_folds_datasets(
                    ts=ts, masks=group_forecast_masks, horizon=self.horizon
                )
            )
            for group_forecast_masks in forecast_masks
        )
        forecasts_flat = parallel(
            delayed(self._forecast_backtest_pipeline)(
                ts=forecast_ts,
                pipeline=pipelines[group_idx],
                fold_number=fold_groups[group_idx]["forecast_fold_numbers"][idx],
                forecast_params=forecast_params,
            )
            for group_idx, group_forecast_datasets in enumerate(forecast_datasets)
            for idx, forecast_ts in enumerate(group_forecast_datasets)
        )

        # processing forecasts: regenerate train/test datasets (generators above are spent)
        fold_process_train_datasets = (
            train for train, _ in self._generate_folds_datasets(ts=ts, masks=fit_masks, horizon=self.horizon)
        )
        fold_process_test_datasets = (
            (
                test
                for _, test in self._generate_folds_datasets(
                    ts=ts, masks=group_forecast_masks, horizon=self.horizon
                )
            )
            for group_forecast_masks in forecast_masks
        )
        # NOTE(review): flat index ``group_idx * refit + idx`` assumes every group
        # before the last holds exactly ``refit`` folds; holds for the grouping
        # produced by _make_backtest_fold_groups (refit=False yields one group,
        # so group_idx is always 0 and the False multiplier is harmless).
        fold_results_flat = parallel(
            delayed(self._process_fold_forecast)(
                forecast=forecasts_flat[group_idx * refit + idx],
                train=train,
                test=test,
                fold_number=fold_groups[group_idx]["forecast_fold_numbers"][idx],
                mask=fold_groups[group_idx]["forecast_masks"][idx],
                metrics=metrics,
            )
            for group_idx, (train, group_fold_process_test_datasets) in enumerate(
                zip(fold_process_train_datasets, fold_process_test_datasets)
            )
            for idx, test in enumerate(group_fold_process_test_datasets)
        )

    # Re-key flat results by global fold number for the caller.
    results = {
        fold_number: fold_results_flat[group_idx * refit + idx]
        for group_idx in range(len(fold_groups))
        for idx, fold_number in enumerate(fold_groups[group_idx]["forecast_fold_numbers"])
    }
    return results
|
||
def backtest( | ||
self, | ||
ts: TSDataset, | ||
|
@@ -628,11 +769,14 @@ def backtest( | |
mode: str = "expand", | ||
aggregate_metrics: bool = False, | ||
n_jobs: int = 1, | ||
refit: Union[bool, int] = True, | ||
joblib_params: Optional[Dict[str, Any]] = None, | ||
forecast_params: Optional[Dict[str, Any]] = None, | ||
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: | ||
"""Run backtest with the pipeline. | ||
|
||
If ``refit != True`` and some component of the pipeline doesn't support forecasting with gap, this component will raise an exception. | ||
|
||
Parameters | ||
---------- | ||
ts: | ||
|
@@ -647,6 +791,14 @@ def backtest( | |
If True aggregate metrics above folds, return raw metrics otherwise | ||
n_jobs: | ||
Number of jobs to run in parallel | ||
refit: | ||
Determines how often pipeline should be retrained during iteration over folds. | ||
|
||
* If ``True``: pipeline is retrained on each fold. | ||
|
||
* If ``False``: pipeline is trained only on the first fold, and on future folds trained pipeline is used. | ||
|
||
* If ``value: int``: pipeline is trained every ``value`` folds starting from the first. | ||
joblib_params: | ||
Additional parameters for :py:class:`joblib.Parallel` | ||
forecast_params: | ||
|
@@ -656,6 +808,11 @@ def backtest( | |
------- | ||
metrics_df, forecast_df, fold_info_df: Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame] | ||
Metrics dataframe, forecast dataframe and dataframe with information about folds | ||
|
||
Raises | ||
------ | ||
Exception: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure that this is really informative) |
||
|
||
""" | ||
if joblib_params is None: | ||
joblib_params = dict(verbose=11, backend="multiprocessing", mmap_mode="c") | ||
|
@@ -666,21 +823,15 @@ def backtest( | |
self._init_backtest() | ||
self._validate_backtest_metrics(metrics=metrics) | ||
masks = self._prepare_fold_masks(ts=ts, masks=n_folds, mode=mode) | ||
|
||
folds = Parallel(n_jobs=n_jobs, **joblib_params)( | ||
delayed(self._run_fold)( | ||
train=train, | ||
test=test, | ||
fold_number=fold_number, | ||
mask=masks[fold_number], | ||
metrics=metrics, | ||
forecast_params=forecast_params, | ||
) | ||
for fold_number, (train, test) in enumerate( | ||
self._generate_folds_datasets(ts=ts, masks=masks, horizon=self.horizon) | ||
) | ||
self._folds = self._run_all_folds( | ||
masks=masks, | ||
ts=ts, | ||
metrics=metrics, | ||
n_jobs=n_jobs, | ||
refit=refit, | ||
joblib_params=joblib_params, | ||
forecast_params=forecast_params, | ||
) | ||
self._folds = {i: fold for i, fold in enumerate(folds)} | ||
|
||
metrics_df = self._get_backtest_metrics(aggregate_metrics=aggregate_metrics) | ||
forecast_df = self._get_backtest_forecasts() | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe "pipeline is trained only on the first fold". I guess it is obvious that it will be used on the other folds
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean without the rest part))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will be fixed on a couple of minutes