Skip to content
This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Commit

Permalink
Adds m5 forecasting kaggle example (#280)
Browse files Browse the repository at this point in the history
I pretty much verbatim copied the code from [this notebook](https://www.kaggle.com/code/ratan123/m5-forecasting-lightgbm-with-timeseries-splits).
It does have a few odd quirks in it -- so if I understood what was going on I removed it, else I left it in. E.g. `test_2` is not used... It is however known to be a good example of a beginner notebook that shows basic approaches to
modeling the problem.

This is an example that shows:

1. loading data
2. creating a base set.
3. adding more features.
4. creating a training set.
5. fitting model & predicting with it.
6. saving the result.

There are a few other ways one could write some of this code.
But as a first pass this seems fine. E.g. we could split up the model
training function a bit more. Or we could use the parameterize decorator.

# Tweaks I made

1. added doc strings to help people get the feel for that.
2. refactored some functions to make it appear more "production like", showing
better reuse/smaller functions, and better naming. E.g. the fit and predict functions
 should be themselves nodes in the DAG, that a function can itself call out to.

# Notes

No unit tests have been added, or data quality checks -- however for real production
purposes we would expect one to add them.
  • Loading branch information
skrawcz authored Jan 24, 2023
1 parent c75d515 commit dc677ae
Show file tree
Hide file tree
Showing 8 changed files with 840 additions and 0 deletions.
21 changes: 21 additions & 0 deletions examples/model_examples/time-series/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Kaggle M5 Forecasting
Source - https://www.kaggle.com/c/m5-forecasting-accuracy.

This is an example of how one might use Hamilton using the M5 Forecasting Kaggle
challenge as an example. The code here is based on Ratan Rohith's notebook
that can be [found here](https://www.kaggle.com/code/ratan123/m5-forecasting-lightgbm-with-timeseries-splits).
The notebook is a beginner level notebook that does some feature engineering, and uses LightGBM to fit
a model over folds that then predicts the future. The notebook is a good starting point for anyone want to know
the basics of an approach to time series forecasting.

# Set up
1. Set up the python virtual environment and activate it, then install the required python dependencies.
`pip install -r requirements.txt`.
2. [Download the data](https://www.kaggle.com/competitions/m5-forecasting-accuracy/data) --
you will need to log in to Kaggle to download the data.
3. Decompress the data into the same folder as the code.
4. Run `run.py`. `python run.py`.

# Notes
Here's what this code executes:
![kaggle_submission_df](kaggle_submission_df.dot.png)
319 changes: 319 additions & 0 deletions examples/model_examples/time-series/data_loaders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
import gc
import logging
import os
from typing import Dict

import pandas as pd
import utils

from hamilton.function_modifiers import config, extract_columns, extract_fields

logger = logging.getLogger(__name__)


def _load_csv(
path: str, name: str, verbose: bool = True, reduce_memory: bool = True
) -> pd.DataFrame:
"""Helper function to load csv files.
Given that loading from parquet is faster than csv, this function will also save a parquet version of the csv file.
:param path:
:param name:
:param verbose:
:param reduce_memory:
:return:
"""
parquet_path = path.replace(".csv", ".parquet")
if os.path.exists(parquet_path):
logger.info("Loading from parquet.")
data_frame = pd.read_parquet(parquet_path)
else:
data_frame = pd.read_csv(path)
data_frame.to_parquet(parquet_path)
if reduce_memory:
data_frame = utils.reduce_mem_usage(data_frame, name, verbose=verbose)
if verbose:
logger.info(
"{} has {} rows and {} columns".format(name, data_frame.shape[0], data_frame.shape[1])
)
return data_frame


def calendar(
calendar_path: str = "/kaggle/input/m5-forecasting-accuracy/calendar.csv",
) -> pd.DataFrame:
"""Loads the calendar data.
Performs some normalization on it.
:param calendar_path:
:return:
"""
_calendar = _load_csv(calendar_path, "calendar", True, True)
# drop some calendar features -- TBD whether this is the best place
_calendar.drop(["weekday", "wday", "month", "year"], inplace=True, axis=1)
nan_features = ["event_name_1", "event_type_1", "event_name_2", "event_type_2"]
for feature in nan_features:
_calendar[feature].fillna("unknown", inplace=True)
_calendar["date"] = pd.to_datetime(_calendar["date"])
return _calendar


def sell_prices(
sell_prices_path: str = "/kaggle/input/m5-forecasting-accuracy/sell_prices.csv",
) -> pd.DataFrame:
"""Loads the sell prices data.
:param sell_prices_path:
:return:
"""
_sell_prices = _load_csv(sell_prices_path, "sell_prices", True, True)
return _sell_prices


def sales_train_validation(
sales_train_validation_path: str = "/kaggle/input/m5-forecasting-accuracy/sales_train_validation.csv",
) -> pd.DataFrame:
"""Loads the sales train validation data.
Makes the data into a shape we want to work with.
:param sales_train_validation_path:
:return:
"""
_sales_train_validation = _load_csv(
sales_train_validation_path, "sales_train_validation", True, reduce_memory=False
)
_sales_train_validation = pd.melt(
_sales_train_validation,
id_vars=["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"],
var_name="day",
value_name="demand",
)
_sales_train_validation = utils.reduce_mem_usage(
_sales_train_validation, "sales_train_validation", verbose=True
)
logger.info(
"Melted sales train validation has {} rows and {} columns".format(
_sales_train_validation.shape[0], _sales_train_validation.shape[1]
)
)
_sales_train_validation["part"] = "train"
return _sales_train_validation


@extract_fields(
{"test1_base": pd.DataFrame, "test2_base": pd.DataFrame, "submission": pd.DataFrame}
)
def submission_loader(
submission_path: str = "/kaggle/input/m5-forecasting-accuracy/sample_submission.csv",
) -> Dict[str, pd.DataFrame]:
"""Loads the submission data.
The notebook I got this from did some weird splitting with test1 and test 2.
I'm not sure why they did that, but I replicated that logic here anyway.
:param submission_path:
:return: dict of dataframes
"""
submission = _load_csv(submission_path, "submission", True, reduce_memory=False)
# seperate test dataframes
test1_rows = [row for row in submission["id"] if "validation" in row]
test2_rows = [row for row in submission["id"] if "evaluation" in row]
test1 = submission[submission["id"].isin(test1_rows)]
test2 = submission[submission["id"].isin(test2_rows)]

# change column names
test1.columns = [
"id",
"d_1914",
"d_1915",
"d_1916",
"d_1917",
"d_1918",
"d_1919",
"d_1920",
"d_1921",
"d_1922",
"d_1923",
"d_1924",
"d_1925",
"d_1926",
"d_1927",
"d_1928",
"d_1929",
"d_1930",
"d_1931",
"d_1932",
"d_1933",
"d_1934",
"d_1935",
"d_1936",
"d_1937",
"d_1938",
"d_1939",
"d_1940",
"d_1941",
]
test2.columns = [
"id",
"d_1942",
"d_1943",
"d_1944",
"d_1945",
"d_1946",
"d_1947",
"d_1948",
"d_1949",
"d_1950",
"d_1951",
"d_1952",
"d_1953",
"d_1954",
"d_1955",
"d_1956",
"d_1957",
"d_1958",
"d_1959",
"d_1960",
"d_1961",
"d_1962",
"d_1963",
"d_1964",
"d_1965",
"d_1966",
"d_1967",
"d_1968",
"d_1969",
]

return {"test1_base": test1, "test2_base": test2, "submission": submission}


def product(sales_train_validation: pd.DataFrame) -> pd.DataFrame:
"""Creates a "product" table from the sales train validation data.
:param sales_train_validation:
:return:
"""
# get product table
_product = sales_train_validation[
["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"]
].drop_duplicates()
return _product


def _merge_test_set_with_product(
base_df: pd.DataFrame, product: pd.DataFrame, part_value: str
) -> pd.DataFrame:
"""Helper function to merge the test set with the product table.
:param base_df:
:param product:
:param part_value:
:return:
"""
merged_df = base_df.merge(product, how="left", on="id")
merged_df = pd.melt(
merged_df,
id_vars=["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"],
var_name="day",
value_name="demand",
)
merged_df["part"] = part_value
return merged_df


@config.when(load_test2="True")
def base_dataset__full(
test1_base: pd.DataFrame,
test2_base: pd.DataFrame,
product: pd.DataFrame,
sales_train_validation: pd.DataFrame,
) -> pd.DataFrame:
"""Creates the base dataset we're building from.
This is only invoked if you're wanting test2 to be used as well.
:param test1_base:
:param test2_base:
:param product:
:param sales_train_validation:
:return:
"""
# create test1 and test2
test1 = _merge_test_set_with_product(test1_base, product, "test1")
test2 = _merge_test_set_with_product(test2_base, product, "test2")
data = pd.concat([sales_train_validation, test1, test2], axis=0)
return data


@config.when(load_test2="False")
def base_dataset__test1_only(
test1_base: pd.DataFrame, product: pd.DataFrame, sales_train_validation: pd.DataFrame
) -> pd.DataFrame:
"""Creates the base dataset we're building from.
This is the default one.
:param test1_base:
:param product:
:param sales_train_validation:
:return:
"""
# create test1 only
test1 = _merge_test_set_with_product(test1_base, product, "test1")
data = pd.concat([sales_train_validation, test1], axis=0)
return data


@extract_columns(
*[
"id",
"item_id",
"dept_id",
"cat_id",
"store_id",
"state_id",
"demand",
"part",
"date",
"wm_yr_wk",
"event_name_1",
"event_type_1",
"event_name_2",
"event_type_2",
"snap_CA",
"snap_TX",
"snap_WI",
"sell_price",
]
)
def base_training_data(
base_dataset: pd.DataFrame,
calendar: pd.DataFrame,
sell_prices: pd.DataFrame,
num_rows_to_skip: int = 55000000,
) -> pd.DataFrame:
"""Creates the base training data set.
:param base_dataset:
:param calendar:
:param sell_prices:
:param num_rows_to_skip: how many rows to skip. If zero, or negative, use all rows.
:return:
"""
subset = base_dataset.loc[num_rows_to_skip:]
# notebook crash with the entire dataset (maybee use tensorflow, dask, pyspark xD)
subset = pd.merge(subset, calendar, how="left", left_on=["day"], right_on=["d"])
subset.drop(["d", "day"], inplace=True, axis=1)
# get the sell price data (this feature should be very important)
subset = subset.merge(sell_prices, on=["store_id", "item_id", "wm_yr_wk"], how="left")
subset.index = subset.id
logger.info(
"Our final dataset to train has {} rows and {} columns".format(
subset.shape[0], subset.shape[1]
)
)
gc.collect()
return subset
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit dc677ae

Please sign in to comment.