This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Adds m5 forecasting kaggle example #280

Merged Jan 24, 2023 (5 commits)

21 changes: 21 additions & 0 deletions examples/model_examples/time-series/README.md
@@ -0,0 +1,21 @@
# Kaggle M5 Forecasting
Source - https://www.kaggle.com/c/m5-forecasting-accuracy.

This example shows how one might use Hamilton, with the M5 Forecasting Kaggle
challenge as the use case. The code here is based on Ratan Rohith's notebook,
which can be [found here](https://www.kaggle.com/code/ratan123/m5-forecasting-lightgbm-with-timeseries-splits).
That beginner-level notebook does some feature engineering and uses LightGBM to fit
a model over folds that then predicts the future. It is a good starting point for anyone wanting to learn
the basics of an approach to time series forecasting.

# Set up
1. Set up a python virtual environment, activate it, and install the required python dependencies:
`pip install -r requirements.txt`.
2. [Download the data](https://www.kaggle.com/competitions/m5-forecasting-accuracy/data) --
you will need to log in to Kaggle to download it.
3. Decompress the data into the same folder as the code.
4. Run the example: `python run.py`. A sketch of what such a driver script might look like follows this list.
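
For reference, here is a minimal sketch of what such a driver script might look like. The actual
`run.py` is not shown in this excerpt, and the requested columns and local file paths below are
illustrative assumptions:

```python
import logging

import data_loaders

from hamilton import driver

logging.basicConfig(level=logging.INFO)

# `load_test2` selects which base_dataset implementation Hamilton resolves --
# see the @config.when functions in data_loaders.py.
dr = driver.Driver({"load_test2": "False"}, data_loaders)

# Override the default /kaggle/input paths to point at the locally decompressed
# files, and request a few of the columns that @extract_columns exposes as nodes.
df = dr.execute(
    ["id", "demand", "sell_price"],
    inputs={
        "calendar_path": "calendar.csv",
        "sell_prices_path": "sell_prices.csv",
        "sales_train_validation_path": "sales_train_validation.csv",
        "submission_path": "sample_submission.csv",
    },
)
print(df.head())
```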

# Notes
Here's what this code executes:
![kaggle_submission_df](kaggle_submission_df.dot.png)
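
The image above can be regenerated with Hamilton's graphviz integration. A minimal sketch, assuming
a `driver.Driver` instance `dr` built over all of the example's modules (so the `kaggle_submission_df`
node exists) and graphviz installed:

```python
# Render the execution path that produces the final submission dataframe.
dr.visualize_execution(
    final_vars=["kaggle_submission_df"],
    output_file_path="kaggle_submission_df.dot",
    render_kwargs={"format": "png"},
)
```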
319 changes: 319 additions & 0 deletions examples/model_examples/time-series/data_loaders.py
@@ -0,0 +1,319 @@
import gc
import logging
import os
from typing import Dict

import pandas as pd
import utils

from hamilton.function_modifiers import config, extract_columns, extract_fields

logger = logging.getLogger(__name__)
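
# Note: the `utils` module is part of this example but not shown in this diff;
# `reduce_mem_usage` is a common Kaggle helper that typically downcasts numeric
# columns to smaller dtypes to shrink the dataframe's memory footprint.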


def _load_csv(
    path: str, name: str, verbose: bool = True, reduce_memory: bool = True
) -> pd.DataFrame:
    """Helper function to load csv files.

    Since loading from parquet is faster than from csv, this function also saves a parquet
    version of the csv file and loads from it on subsequent calls.

    :param path: path to the csv file.
    :param name: name of the dataset, used for logging.
    :param verbose: whether to log details about the loaded data.
    :param reduce_memory: whether to downcast dtypes to reduce memory usage.
    :return: the loaded dataframe.
    """
    parquet_path = path.replace(".csv", ".parquet")
    if os.path.exists(parquet_path):
        logger.info("Loading from parquet.")
        data_frame = pd.read_parquet(parquet_path)
    else:
        data_frame = pd.read_csv(path)
        data_frame.to_parquet(parquet_path)
    if reduce_memory:
        data_frame = utils.reduce_mem_usage(data_frame, name, verbose=verbose)
    if verbose:
        logger.info(
            "{} has {} rows and {} columns".format(name, data_frame.shape[0], data_frame.shape[1])
        )
    return data_frame


def calendar(
    calendar_path: str = "/kaggle/input/m5-forecasting-accuracy/calendar.csv",
) -> pd.DataFrame:
    """Loads the calendar data.

    Performs some normalization on it.

    :param calendar_path: path to the calendar csv file.
    :return: the normalized calendar dataframe.
    """
    _calendar = _load_csv(calendar_path, "calendar", True, True)
    # drop some calendar features -- TBD whether this is the best place
    _calendar.drop(["weekday", "wday", "month", "year"], inplace=True, axis=1)
    nan_features = ["event_name_1", "event_type_1", "event_name_2", "event_type_2"]
    for feature in nan_features:
        _calendar[feature] = _calendar[feature].fillna("unknown")
    _calendar["date"] = pd.to_datetime(_calendar["date"])
    return _calendar


def sell_prices(
    sell_prices_path: str = "/kaggle/input/m5-forecasting-accuracy/sell_prices.csv",
) -> pd.DataFrame:
    """Loads the sell prices data.

    :param sell_prices_path: path to the sell prices csv file.
    :return: the sell prices dataframe.
    """
    _sell_prices = _load_csv(sell_prices_path, "sell_prices", True, True)
    return _sell_prices


def sales_train_validation(
    sales_train_validation_path: str = "/kaggle/input/m5-forecasting-accuracy/sales_train_validation.csv",
) -> pd.DataFrame:
    """Loads the sales train validation data.

    Melts the wide per-day columns into the long format we want to work with.

    :param sales_train_validation_path: path to the sales train validation csv file.
    :return: the melted sales train validation dataframe.
    """
    _sales_train_validation = _load_csv(
        sales_train_validation_path, "sales_train_validation", True, reduce_memory=False
    )
    _sales_train_validation = pd.melt(
        _sales_train_validation,
        id_vars=["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"],
        var_name="day",
        value_name="demand",
    )
    _sales_train_validation = utils.reduce_mem_usage(
        _sales_train_validation, "sales_train_validation", verbose=True
    )
    logger.info(
        "Melted sales train validation has {} rows and {} columns".format(
            _sales_train_validation.shape[0], _sales_train_validation.shape[1]
        )
    )
    _sales_train_validation["part"] = "train"
    return _sales_train_validation


@extract_fields(
    {"test1_base": pd.DataFrame, "test2_base": pd.DataFrame, "submission": pd.DataFrame}
)
def submission_loader(
    submission_path: str = "/kaggle/input/m5-forecasting-accuracy/sample_submission.csv",
) -> Dict[str, pd.DataFrame]:
    """Loads the submission data.

    The notebook I got this from did some weird splitting into test1 and test2.
    I'm not sure why they did that, but I replicated that logic here anyway.

    :param submission_path: path to the sample submission csv file.
    :return: dict of dataframes.
    """
    submission = _load_csv(submission_path, "submission", True, reduce_memory=False)
    # separate test dataframes
    test1_rows = [row for row in submission["id"] if "validation" in row]
    test2_rows = [row for row in submission["id"] if "evaluation" in row]
    test1 = submission[submission["id"].isin(test1_rows)].copy()
    test2 = submission[submission["id"].isin(test2_rows)].copy()

    # change column names: map the 28 forecast columns to the day ids they correspond to
    test1.columns = ["id"] + [f"d_{day}" for day in range(1914, 1942)]
    test2.columns = ["id"] + [f"d_{day}" for day in range(1942, 1970)]

    return {"test1_base": test1, "test2_base": test2, "submission": submission}


def product(sales_train_validation: pd.DataFrame) -> pd.DataFrame:
    """Creates a "product" table from the sales train validation data.

    :param sales_train_validation: the melted sales train validation dataframe.
    :return: a table with one row per unique product id.
    """
    # get product table
    _product = sales_train_validation[
        ["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"]
    ].drop_duplicates()
    return _product


def _merge_test_set_with_product(
    base_df: pd.DataFrame, product: pd.DataFrame, part_value: str
) -> pd.DataFrame:
    """Helper function to merge the test set with the product table.

    :param base_df: the test set dataframe to merge and melt.
    :param product: the product table.
    :param part_value: the value to set the "part" column to.
    :return: the merged, melted dataframe.
    """
    merged_df = base_df.merge(product, how="left", on="id")
    merged_df = pd.melt(
        merged_df,
        id_vars=["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"],
        var_name="day",
        value_name="demand",
    )
    merged_df["part"] = part_value
    return merged_df


@config.when(load_test2="True")
def base_dataset__full(
    test1_base: pd.DataFrame,
    test2_base: pd.DataFrame,
    product: pd.DataFrame,
    sales_train_validation: pd.DataFrame,
) -> pd.DataFrame:
    """Creates the base dataset we're building from.

    This is only invoked if you want test2 to be used as well.

    :param test1_base: the renamed validation-part submission dataframe.
    :param test2_base: the renamed evaluation-part submission dataframe.
    :param product: the product table.
    :param sales_train_validation: the melted sales train validation dataframe.
    :return: the concatenated base dataset.
    """
    # create test1 and test2
    test1 = _merge_test_set_with_product(test1_base, product, "test1")
    test2 = _merge_test_set_with_product(test2_base, product, "test2")
    data = pd.concat([sales_train_validation, test1, test2], axis=0)
    return data


@config.when(load_test2="False")
def base_dataset__test1_only(
    test1_base: pd.DataFrame, product: pd.DataFrame, sales_train_validation: pd.DataFrame
) -> pd.DataFrame:
    """Creates the base dataset we're building from.

    This is the default one.

    :param test1_base: the renamed validation-part submission dataframe.
    :param product: the product table.
    :param sales_train_validation: the melted sales train validation dataframe.
    :return: the concatenated base dataset.
    """
    # create test1 only
    test1 = _merge_test_set_with_product(test1_base, product, "test1")
    data = pd.concat([sales_train_validation, test1], axis=0)
    return data
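
# Note: Hamilton strips the __full / __test1_only suffixes, so both functions above define the same
# `base_dataset` node; which implementation runs is chosen by the `load_test2` value in the driver config.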


@extract_columns(
    *[
        "id",
        "item_id",
        "dept_id",
        "cat_id",
        "store_id",
        "state_id",
        "demand",
        "part",
        "date",
        "wm_yr_wk",
        "event_name_1",
        "event_type_1",
        "event_name_2",
        "event_type_2",
        "snap_CA",
        "snap_TX",
        "snap_WI",
        "sell_price",
    ]
)
def base_training_data(
    base_dataset: pd.DataFrame,
    calendar: pd.DataFrame,
    sell_prices: pd.DataFrame,
    num_rows_to_skip: int = 55000000,
) -> pd.DataFrame:
    """Creates the base training data set.

    :param base_dataset: the combined train + test dataset.
    :param calendar: the normalized calendar dataframe.
    :param sell_prices: the sell prices dataframe.
    :param num_rows_to_skip: how many rows to skip. If zero, or negative, use all rows.
    :return: the merged dataframe to train on.
    """
    subset = base_dataset.loc[num_rows_to_skip:]
    # the notebook crashes with the entire dataset (maybe use tensorflow, dask, or pyspark)
    subset = pd.merge(subset, calendar, how="left", left_on=["day"], right_on=["d"])
    subset.drop(["d", "day"], inplace=True, axis=1)
    # get the sell price data (this feature should be very important)
    subset = subset.merge(sell_prices, on=["store_id", "item_id", "wm_yr_wk"], how="left")
    subset.index = subset.id
    logger.info(
        "Our final dataset to train has {} rows and {} columns".format(
            subset.shape[0], subset.shape[1]
        )
    )
    gc.collect()
    return subset
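
# Note: @extract_columns above also exposes each listed column of the returned dataframe as its own
# node, so feature-engineering functions elsewhere can request e.g. `demand` or `sell_price` directly.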