From ede9e7a63bd541484cf861aaf527f18eb7eca45b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Morales?= Date: Thu, 19 Sep 2024 00:15:47 +0000 Subject: [PATCH] fix: chunk series in parallel forecast --- nbs/src/core/core.ipynb | 182 ++++++++++++-------------------- python/statsforecast/_modidx.py | 1 - python/statsforecast/core.py | 172 +++++++++++------------------- 3 files changed, 132 insertions(+), 223 deletions(-) diff --git a/nbs/src/core/core.ipynb b/nbs/src/core/core.ipynb index 3160dba6..f9b75611 100644 --- a/nbs/src/core/core.ipynb +++ b/nbs/src/core/core.ipynb @@ -91,7 +91,6 @@ "import reprlib\n", "import time\n", "import warnings\n", - "from collections import defaultdict\n", "from concurrent.futures import ProcessPoolExecutor, as_completed\n", "from pathlib import Path\n", "from typing import Any, Dict, List, Optional, Union\n", @@ -100,7 +99,7 @@ "import pandas as pd\n", "import utilsforecast.processing as ufp\n", "from fugue.execution.factory import make_execution_engine, try_get_context_execution_engine\n", - "from threadpoolctl import ThreadpoolController, threadpool_limits\n", + "from threadpoolctl import ThreadpoolController\n", "from tqdm.auto import tqdm\n", "from triad import conditional_dispatcher\n", "from utilsforecast.compat import DataFrame, pl_DataFrame, pl_Series\n", @@ -124,39 +123,7 @@ " datefmt='%Y-%m-%d %H:%M:%S',\n", " )\n", "logger = logging.getLogger(__name__)\n", - "\n", - "_controller = ThreadpoolController()\n", - "\n", - "@_controller.wrap(limits=1)\n", - "def _forecast_serie(h, y, X, X_future, models, fallback_model, level, fitted):\n", - " forecast_res = {}\n", - " fitted_res = {}\n", - " times = {}\n", - " for model in models:\n", - " start = time.perf_counter()\n", - " model_kwargs = dict(h=h, y=y, X=X, X_future=X_future, fitted=fitted)\n", - " if \"level\" in inspect.signature(model.forecast).parameters and level:\n", - " model_kwargs[\"level\"] = level\n", - " try:\n", - " model_res = 
model.forecast(**model_kwargs)\n", - " except Exception as e:\n", - " if fallback_model is None:\n", - " raise e\n", - " model_res = fallback_model.forecast(**model_kwargs)\n", - " model_name = repr(model)\n", - " times[model_name] = time.perf_counter() - start\n", - " for k, v in model_res.items():\n", - " if k == \"mean\":\n", - " forecast_res[model_name] = v\n", - " elif k.startswith((\"lo\", \"hi\")):\n", - " col_name = f\"{model_name}-{k}\"\n", - " forecast_res[col_name] = v\n", - " elif k == \"fitted\":\n", - " fitted_res[model_name] = v\n", - " elif k.startswith((\"fitted-lo\", \"fitted-hi\")):\n", - " col_name = f'{model_name}-{k.replace(\"fitted-\", \"\")}'\n", - " fitted_res[col_name] = v\n", - " return forecast_res, fitted_res, times" + "_controller = ThreadpoolController()" ] }, { @@ -471,19 +438,19 @@ " def split_fm(self, fm, n_chunks):\n", " return [fm[idxs] for idxs in np.array_split(range(self.n_groups), n_chunks) if idxs.size]\n", "\n", - "\n", + " @_controller.wrap(limits=1)\n", " def _single_threaded_fit(self, models, fallback_model=None):\n", - " with threadpool_limits(limits=1):\n", - " return self.fit(models=models, fallback_model=fallback_model)\n", + " return self.fit(models=models, fallback_model=fallback_model)\n", "\n", + " @_controller.wrap(limits=1)\n", " def _single_threaded_predict(self, fm, h, X=None, level=tuple()):\n", - " with threadpool_limits(limits=1):\n", - " return self.predict(fm=fm, h=h, X=X, level=level)\n", + " return self.predict(fm=fm, h=h, X=X, level=level)\n", "\n", + " @_controller.wrap(limits=1)\n", " def _single_threaded_fit_predict(self, models, h, X=None, level=tuple()):\n", - " with threadpool_limits(limits=1):\n", - " return self.fit_predict(models=models, h=h, X=X, level=level)\n", + " return self.fit_predict(models=models, h=h, X=X, level=level)\n", "\n", + " @_controller.wrap(limits=1)\n", " def _single_threaded_forecast(\n", " self,\n", " models,\n", @@ -495,18 +462,18 @@ " verbose=False,\n", " 
target_col='y',\n", " ):\n", - " with threadpool_limits(limits=1):\n", - " return self.forecast(\n", - " models=models,\n", - " h=h,\n", - " fallback_model=fallback_model,\n", - " fitted=fitted,\n", - " X=X,\n", - " level=level,\n", - " verbose=verbose,\n", - " target_col=target_col,\n", - " )\n", - " \n", + " return self.forecast(\n", + " models=models,\n", + " h=h,\n", + " fallback_model=fallback_model,\n", + " fitted=fitted,\n", + " X=X,\n", + " level=level,\n", + " verbose=verbose,\n", + " target_col=target_col,\n", + " )\n", + "\n", + " @_controller.wrap(limits=1)\n", " def _single_threaded_cross_validation(\n", " self,\n", " models,\n", @@ -521,20 +488,19 @@ " verbose=False,\n", " target_col='y',\n", " ):\n", - " with threadpool_limits(limits=1):\n", - " return self.cross_validation(\n", - " models=models,\n", - " h=h,\n", - " test_size=test_size,\n", - " fallback_model=fallback_model,\n", - " step_size=step_size,\n", - " input_size=input_size,\n", - " fitted=fitted,\n", - " level=level,\n", - " refit=refit,\n", - " verbose=verbose,\n", - " target_col=target_col,\n", - " )" + " return self.cross_validation(\n", + " models=models,\n", + " h=h,\n", + " test_size=test_size,\n", + " fallback_model=fallback_model,\n", + " step_size=step_size,\n", + " input_size=input_size,\n", + " fitted=fitted,\n", + " level=level,\n", + " refit=refit,\n", + " verbose=verbose,\n", + " target_col=target_col,\n", + " )" ] }, { @@ -1685,12 +1651,14 @@ " fm = np.vstack([f.get() for f in futures])\n", " return fm \n", " \n", - " def _get_gas_Xs(self, X):\n", - " gas = self.ga.split(self.n_jobs)\n", + " def _get_gas_Xs(self, X, tasks_per_job=1):\n", + " n_chunks = min(tasks_per_job * self.n_jobs, self.ga.n_groups)\n", + " gas = self.ga.split(n_chunks)\n", " if X is not None:\n", - " Xs = X.split(self.n_jobs)\n", + " Xs = X.split(n_chunks)\n", " else:\n", " from itertools import repeat\n", + "\n", " Xs = repeat(None)\n", " return gas, Xs\n", " \n", @@ -1735,57 +1703,47 @@ " return fm, 
fcsts, cols\n", "\n", " def _forecast_parallel(self, h, fitted, X, level, target_col):\n", - " n_series = self.ga.n_groups\n", - " forecast_res = defaultdict(lambda: np.empty(n_series * h, dtype=self.ga.data.dtype))\n", - " fitted_res = defaultdict(\n", - " lambda: np.empty(self.ga.data.shape[0], dtype=self.ga.data.dtype)\n", - " )\n", - " fitted_res[target_col] = self.ga.data[:, 0]\n", - " future2pos = {}\n", - " times = {repr(m): 0.0 for m in self.models}\n", + " gas, Xs = self._get_gas_Xs(X=X, tasks_per_job=100)\n", + " results = [None] * len(gas)\n", " with ProcessPoolExecutor(self.n_jobs) as executor:\n", - " for i, serie in enumerate(self.ga):\n", - " y_train = serie[:, 0]\n", - " X_train = serie[:, 1:] if serie.shape[1] > 1 else None\n", - " if X is None:\n", - " X_future = None\n", - " else:\n", - " X_future = X[i]\n", - " future = executor.submit(\n", - " _forecast_serie,\n", + " future2pos = {\n", + " executor.submit(\n", + " ga.forecast,\n", " h=h,\n", - " y=y_train,\n", - " X=X_train,\n", - " X_future=X_future,\n", " models=self.models,\n", " fallback_model=self.fallback_model,\n", " fitted=fitted,\n", + " X=X,\n", " level=level,\n", - " )\n", - " future2pos[future] = i\n", + " verbose=False,\n", + " target_col=target_col,\n", + " ): i\n", + " for i, (ga, X) in enumerate(zip(gas, Xs))\n", + " }\n", " iterable = tqdm(\n", - " as_completed(future2pos), disable=not self.verbose, total=n_series, desc=\"Forecast\"\n", - " ) \n", + " as_completed(future2pos),\n", + " disable=not self.verbose,\n", + " total=len(future2pos),\n", + " desc=\"Forecast\",\n", + " bar_format=\"{l_bar}{bar}| {n_fmt}/{total_fmt} [Elapsed: {elapsed}{postfix}]\",\n", + " )\n", " for future in iterable:\n", " i = future2pos[future]\n", - " fcst_idxs = slice(i * h, (i + 1) * h)\n", - " serie_idxs = slice(self.ga.indptr[i], self.ga.indptr[i + 1])\n", - " serie_fcst, serie_fitted, serie_times = future.result()\n", - " for k, v in serie_fcst.items():\n", - " forecast_res[k][fcst_idxs] = 
v\n", - " for k, v in serie_fitted.items():\n", - " fitted_res[k][serie_idxs] = v\n", - " for model_name, model_time in serie_times.items():\n", - " times[model_name] += model_time\n", - " return {\n", - " 'cols': list(forecast_res.keys()),\n", - " 'forecasts': np.hstack([v[:, None] for v in forecast_res.values()]),\n", - " 'fitted': {\n", - " 'cols': list(fitted_res.keys()),\n", - " 'values': np.hstack([v[:, None] for v in fitted_res.values()]),\n", + " results[i] = future.result()\n", + " result = {\n", + " 'cols': results[0]['cols'],\n", + " 'forecasts': np.vstack([r['forecasts'] for r in results]),\n", + " 'times': {\n", + " m: sum(r['times'][m] for r in results)\n", + " for m in [repr(m) for m in self.models]\n", " },\n", - " 'times': times,\n", - " } \n", + " }\n", + " if fitted:\n", + " result['fitted'] = {\n", + " 'cols': results[0]['fitted']['cols'],\n", + " 'values': np.vstack([r['fitted']['values'] for r in results]),\n", + " }\n", + " return result\n", "\n", " def _cross_validation_parallel(self, h, test_size, step_size, input_size, fitted, level, refit, target_col):\n", " #create elements for each core\n", diff --git a/python/statsforecast/_modidx.py b/python/statsforecast/_modidx.py index cfcac72a..aa82dc94 100644 --- a/python/statsforecast/_modidx.py +++ b/python/statsforecast/_modidx.py @@ -185,7 +185,6 @@ 'statsforecast/core.py'), 'statsforecast.core._StatsForecast.save': ( 'src/core/core.html#_statsforecast.save', 'statsforecast/core.py'), - 'statsforecast.core._forecast_serie': ('src/core/core.html#_forecast_serie', 'statsforecast/core.py'), 'statsforecast.core._get_n_jobs': ('src/core/core.html#_get_n_jobs', 'statsforecast/core.py'), 'statsforecast.core._id_as_idx': ('src/core/core.html#_id_as_idx', 'statsforecast/core.py'), 'statsforecast.core._maybe_warn_sort_df': ( 'src/core/core.html#_maybe_warn_sort_df', diff --git a/python/statsforecast/core.py b/python/statsforecast/core.py index 726589fc..66180ce5 100644 ---
a/python/statsforecast/core.py +++ b/python/statsforecast/core.py @@ -14,7 +14,6 @@ import reprlib import time import warnings -from collections import defaultdict from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path from typing import Any, Dict, List, Optional, Union @@ -26,7 +25,7 @@ make_execution_engine, try_get_context_execution_engine, ) -from threadpoolctl import ThreadpoolController, threadpool_limits +from threadpoolctl import ThreadpoolController from tqdm.auto import tqdm from triad import conditional_dispatcher from utilsforecast.compat import DataFrame, pl_DataFrame, pl_Series @@ -42,41 +41,8 @@ datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger(__name__) - _controller = ThreadpoolController() - -@_controller.wrap(limits=1) -def _forecast_serie(h, y, X, X_future, models, fallback_model, level, fitted): - forecast_res = {} - fitted_res = {} - times = {} - for model in models: - start = time.perf_counter() - model_kwargs = dict(h=h, y=y, X=X, X_future=X_future, fitted=fitted) - if "level" in inspect.signature(model.forecast).parameters and level: - model_kwargs["level"] = level - try: - model_res = model.forecast(**model_kwargs) - except Exception as e: - if fallback_model is None: - raise e - model_res = fallback_model.forecast(**model_kwargs) - model_name = repr(model) - times[model_name] = time.perf_counter() - start - for k, v in model_res.items(): - if k == "mean": - forecast_res[model_name] = v - elif k.startswith(("lo", "hi")): - col_name = f"{model_name}-{k}" - forecast_res[col_name] = v - elif k == "fitted": - fitted_res[model_name] = v - elif k.startswith(("fitted-lo", "fitted-hi")): - col_name = f'{model_name}-{k.replace("fitted-", "")}' - fitted_res[col_name] = v - return forecast_res, fitted_res, times - # %% ../../nbs/src/core/core.ipynb 10 class GroupedArray(BaseGroupedArray): @@ -433,18 +399,19 @@ def split_fm(self, fm, n_chunks): if idxs.size ] + @_controller.wrap(limits=1) def 
_single_threaded_fit(self, models, fallback_model=None): - with threadpool_limits(limits=1): - return self.fit(models=models, fallback_model=fallback_model) + return self.fit(models=models, fallback_model=fallback_model) + @_controller.wrap(limits=1) def _single_threaded_predict(self, fm, h, X=None, level=tuple()): - with threadpool_limits(limits=1): - return self.predict(fm=fm, h=h, X=X, level=level) + return self.predict(fm=fm, h=h, X=X, level=level) + @_controller.wrap(limits=1) def _single_threaded_fit_predict(self, models, h, X=None, level=tuple()): - with threadpool_limits(limits=1): - return self.fit_predict(models=models, h=h, X=X, level=level) + return self.fit_predict(models=models, h=h, X=X, level=level) + @_controller.wrap(limits=1) def _single_threaded_forecast( self, models, @@ -456,18 +423,18 @@ def _single_threaded_forecast( verbose=False, target_col="y", ): - with threadpool_limits(limits=1): - return self.forecast( - models=models, - h=h, - fallback_model=fallback_model, - fitted=fitted, - X=X, - level=level, - verbose=verbose, - target_col=target_col, - ) + return self.forecast( + models=models, + h=h, + fallback_model=fallback_model, + fitted=fitted, + X=X, + level=level, + verbose=verbose, + target_col=target_col, + ) + @_controller.wrap(limits=1) def _single_threaded_cross_validation( self, models, @@ -482,20 +449,19 @@ def _single_threaded_cross_validation( verbose=False, target_col="y", ): - with threadpool_limits(limits=1): - return self.cross_validation( - models=models, - h=h, - test_size=test_size, - fallback_model=fallback_model, - step_size=step_size, - input_size=input_size, - fitted=fitted, - level=level, - refit=refit, - verbose=verbose, - target_col=target_col, - ) + return self.cross_validation( + models=models, + h=h, + test_size=test_size, + fallback_model=fallback_model, + step_size=step_size, + input_size=input_size, + fitted=fitted, + level=level, + refit=refit, + verbose=verbose, + target_col=target_col, + ) # %% 
../../nbs/src/core/core.ipynb 24 def _get_n_jobs(n_groups, n_jobs): @@ -1238,10 +1204,11 @@ def _fit_parallel(self): fm = np.vstack([f.get() for f in futures]) return fm - def _get_gas_Xs(self, X): - gas = self.ga.split(self.n_jobs) + def _get_gas_Xs(self, X, tasks_per_job=1): + n_chunks = min(tasks_per_job * self.n_jobs, self.ga.n_groups) + gas = self.ga.split(n_chunks) if X is not None: - Xs = X.split(self.n_jobs) + Xs = X.split(n_chunks) else: from itertools import repeat @@ -1289,62 +1256,47 @@ def _fit_predict_parallel(self, h, X, level): return fm, fcsts, cols def _forecast_parallel(self, h, fitted, X, level, target_col): - n_series = self.ga.n_groups - forecast_res = defaultdict( - lambda: np.empty(n_series * h, dtype=self.ga.data.dtype) - ) - fitted_res = defaultdict( - lambda: np.empty(self.ga.data.shape[0], dtype=self.ga.data.dtype) - ) - fitted_res[target_col] = self.ga.data[:, 0] - future2pos = {} - times = {repr(m): 0.0 for m in self.models} + gas, Xs = self._get_gas_Xs(X=X, tasks_per_job=100) + results = [None] * len(gas) with ProcessPoolExecutor(self.n_jobs) as executor: - for i, serie in enumerate(self.ga): - y_train = serie[:, 0] - X_train = serie[:, 1:] if serie.shape[1] > 1 else None - if X is None: - X_future = None - else: - X_future = X[i] - future = executor.submit( - _forecast_serie, + future2pos = { + executor.submit( + ga.forecast, h=h, - y=y_train, - X=X_train, - X_future=X_future, models=self.models, fallback_model=self.fallback_model, fitted=fitted, + X=X, level=level, - ) - future2pos[future] = i + verbose=False, + target_col=target_col, + ): i + for i, (ga, X) in enumerate(zip(gas, Xs)) + } iterable = tqdm( as_completed(future2pos), disable=not self.verbose, - total=n_series, + total=len(future2pos), desc="Forecast", + bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [Elapsed: {elapsed}{postfix}]", ) for future in iterable: i = future2pos[future] - fcst_idxs = slice(i * h, (i + 1) * h) - serie_idxs = slice(self.ga.indptr[i], 
self.ga.indptr[i + 1]) - serie_fcst, serie_fitted, serie_times = future.result() - for k, v in serie_fcst.items(): - forecast_res[k][fcst_idxs] = v - for k, v in serie_fitted.items(): - fitted_res[k][serie_idxs] = v - for model_name, model_time in serie_times.items(): - times[model_name] += model_time - return { - "cols": list(forecast_res.keys()), - "forecasts": np.hstack([v[:, None] for v in forecast_res.values()]), - "fitted": { - "cols": list(fitted_res.keys()), - "values": np.hstack([v[:, None] for v in fitted_res.values()]), + results[i] = future.result() + result = { + "cols": results[0]["cols"], + "forecasts": np.vstack([r["forecasts"] for r in results]), + "times": { + m: sum(r["times"][m] for r in results) + for m in [repr(m) for m in self.models] }, - "times": times, } + if fitted: + result["fitted"] = { + "cols": results[0]["fitted"]["cols"], + "values": np.vstack([r["fitted"]["values"] for r in results]), + } + return result def _cross_validation_parallel( self, h, test_size, step_size, input_size, fitted, level, refit, target_col