examples/ibis and Ibis plugin #725

Merged: 13 commits, Mar 5, 2024
Empty file added examples/ibis/README.md
Empty file.
61 changes: 61 additions & 0 deletions examples/ibis/column_dataflow.py
@@ -0,0 +1,61 @@
from typing import Optional

import ibis
import ibis.expr.types as ir

from hamilton.function_modifiers import extract_columns
from hamilton.plugins import ibis_extensions # noqa: F401


# extract specific columns from the table
@extract_columns("son", "pet", "month_of_absence")
def raw_table(raw_data_path: str) -> ir.Table:
    """Load the CSV found at `raw_data_path` into a Table expression
    and format column names to snake case.
    """
    return ibis.read_csv(sources=raw_data_path, table_name="absenteism").rename("snake_case")


# accesses a single column from `raw_table`
def has_children(son: ir.Column) -> ir.BooleanColumn:
    """True if someone has any children"""
    return ibis.ifelse(son > 0, True, False)


# narrows the return type from `ir.Column` to `ir.BooleanColumn`
def has_pet(pet: ir.Column) -> ir.BooleanColumn:
    """True if someone has any pets"""
    return ibis.ifelse(pet > 0, True, False).cast(bool)


# typing and docstring provide business context to features
def is_summer_brazil(month_of_absence: ir.Column) -> ir.BooleanColumn:
    """True if it is summer in Brazil during this month.

    People in the northern hemisphere are likely to take vacations
    to warm places when it's cold locally.
    """
    return month_of_absence.isin([1, 2, 12])


def feature_table(
    raw_table: ir.Table,
    has_children: ir.BooleanColumn,
    has_pet: ir.BooleanColumn,
    is_summer_brazil: ir.BooleanColumn,
) -> ir.Table:
    """Join the computed feature columns onto `raw_table`"""
    return raw_table.mutate(
        has_children=has_children,
        has_pet=has_pet,
        is_summer_brazil=is_summer_brazil,
    )


def feature_set(
    feature_table: ir.Table,
    feature_selection: list[str],
    condition: Optional[ibis.common.deferred.Deferred] = None,
) -> ir.Table:
    """Select feature columns and filter rows"""
    return feature_table[feature_selection].filter(condition)
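For orientation, a minimal usage sketch for this module (assumptions: the Absenteeism CSV path from the neighboring data_quality example, and a `condition` passed as an Ibis deferred expression; run.py later in this diff shows the full Driver setup):

# sketch only: build a Driver from column_dataflow alone and materialize the feature set
import ibis
from hamilton import driver

import column_dataflow

dr = driver.Builder().with_modules(column_dataflow).build()
res = dr.execute(
    ["feature_set"],
    inputs=dict(
        raw_data_path="../data_quality/simple/Absenteeism_at_work.csv",
        feature_selection=["has_children", "has_pet", "is_summer_brazil"],
        condition=ibis._.has_pet,  # any deferred boolean expression over the selected columns
    ),
)
print(res["feature_set"].execute())  # Ibis executes the expression and returns a pandas DataFrame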
Binary file added examples/ibis/columns.png
Binary file added examples/ibis/cross_validation.png
Binary file added examples/ibis/ibis_feature_set.png
172 changes: 172 additions & 0 deletions examples/ibis/model_training.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import ibis
import ibis.expr.types as ir
import ibisml
import pandas as pd
from sklearn.base import BaseEstimator, clone
from sklearn.ensemble import HistGradientBoostingRegressor, RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import KFold

from hamilton.function_modifiers import config, extract_fields
from hamilton.htypes import Collect, Parallelizable


@config.when(model="linear")
def base_model__linear() -> BaseEstimator:
    """Use linear regression"""
    return LinearRegression()


@config.when(model="random_forest")
def base_model__random_forest() -> BaseEstimator:
    """Use random forest regression"""
    return RandomForestRegressor()


@config.when(model="boosting")
def base_model__boosting() -> BaseEstimator:
    """Use gradient boosting regression"""
    return HistGradientBoostingRegressor()


def preprocessing_recipe() -> ibisml.Recipe:
    """Recipe to preprocess data for fitting and inference.

    We drop the temporary `idx` column generated to
    create cross-validation splits.
    """
    return ibisml.Recipe(
        ibisml.Drop(["idx"]),
        ibisml.ImputeMean(ibisml.numeric()),
        ibisml.ScaleStandard(ibisml.numeric()),
        ibisml.OneHotEncode(ibisml.nominal()),
    )


def data_split(
    feature_set: ir.Table,
    n_splits: int = 3,
) -> Parallelizable[tuple]:
    """Generate indices to create train/validation splits n times"""
    folds = KFold(n_splits=n_splits)
    idx = list(range(feature_set.count().execute()))
    for train_idx, val_idx in folds.split(idx):
        yield train_idx, val_idx


@extract_fields(
    dict(
        X_train=pd.DataFrame,
        X_val=pd.DataFrame,
        y_train=pd.DataFrame,
        y_val=pd.DataFrame,
    )
)
def prepare_data(
    feature_set: ir.Table,
    label: str,
    data_split: tuple,
    preprocessing_recipe: ibisml.Recipe,
) -> dict:
    """Split data and apply the preprocessing recipe"""
    train_idx, val_idx = data_split
    # add temporary idx column for train/val splits
    feature_set = feature_set.mutate(idx=ibis.row_number())
    train_set = feature_set.filter(ibis._.idx.isin(train_idx))
    val_set = feature_set.filter(ibis._.idx.isin(val_idx))

    transform = preprocessing_recipe.fit(train_set, outcomes=[label])

    train = transform(train_set)
    df_train = train.to_pandas()
    X_train = df_train[train.features]
    y_train = df_train[train.outcomes]

    df_test = transform(val_set).to_pandas()
    X_val = df_test[train.features]
    y_val = df_test[train.outcomes]

    return dict(
        X_train=X_train,
        y_train=y_train,
        X_val=X_val,
        y_val=y_val,
    )


def cross_validation_fold(
    X_train: pd.DataFrame,
    X_val: pd.DataFrame,
    y_train: pd.DataFrame,
    y_val: pd.DataFrame,
    base_model: BaseEstimator,
    data_split: tuple,
) -> dict:
    """Train the model on the training split and make predictions on the validation split"""
    train_idx, val_idx = data_split
    model = clone(base_model)

    model.fit(X_train, y_train)

    y_val_pred = model.predict(X_val)
    score = mean_squared_error(y_val, y_val_pred)

    return dict(id=val_idx, y_true=y_val, y_pred=y_val_pred, score=score)


@extract_fields(
    dict(
        cross_validation_scores=list[float],
        cross_validation_preds=list[dict],
    )
)
def cross_validation_fold_collection(cross_validation_fold: Collect[dict]) -> dict:
    """Collect results from cross-validation folds; separate predictions and
    performance scores into two variables
    """
    scores, preds = [], []
    for fold in cross_validation_fold:
        scores.append(fold.pop("score"))
        preds.append(fold)
    return dict(
        cross_validation_scores=scores,
        cross_validation_preds=preds,
    )


def prediction_table(cross_validation_preds: list[dict]) -> ir.Table:
    """Create a table with cross-validation predictions for future reference"""
    return ibis.memtable(cross_validation_preds)


def store_predictions(prediction_table: ir.Table) -> bool:
    """Store the cross-validation predictions table somewhere.

    Currently only returns True.
    """
    return True


@extract_fields(
    dict(
        full_model=BaseEstimator,
        fitted_recipe=ibisml.RecipeTransform,
    )
)
def train_full_model(
    feature_set: ir.Table,
    label: str,
    preprocessing_recipe: ibisml.Recipe,
    base_model: BaseEstimator,
) -> dict:
    """Train a model on the full dataset to use for inference."""
    transform = preprocessing_recipe.fit(feature_set, outcomes=[label])

    data = transform(feature_set)
    df = data.to_pandas()
    X = df[data.features]
    y = df[data.outcomes]

    base_model.fit(X, y)
    return dict(
        full_model=base_model,
        fitted_recipe=transform,
    )
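A sketch of how the `full_model` and `fitted_recipe` outputs could be reused for inference; `predict_new_data` and `new_table` are hypothetical names, and the table is assumed to carry the same columns as `feature_set`:

import ibis.expr.types as ir
import ibisml
from sklearn.base import BaseEstimator


def predict_new_data(
    new_table: ir.Table,  # hypothetical: same schema as `feature_set`
    full_model: BaseEstimator,
    fitted_recipe: ibisml.RecipeTransform,
):
    """Apply the fitted preprocessing recipe, then score with the trained model."""
    data = fitted_recipe(new_table)  # the same transform object returned by `train_full_model`
    df = data.to_pandas()
    return full_model.predict(df[data.features])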
5 changes: 5 additions & 0 deletions examples/ibis/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ibisml
ibis-framework[duckdb,examples]
scikit-learn
sf-hamilton[visualization]
tqdm
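These dependencies can be installed with `pip install -r requirements.txt` from the examples/ibis directory.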
83 changes: 83 additions & 0 deletions examples/ibis/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
def view_expression(expression, **kwargs):
    """View an Ibis expression.

    See the graphviz reference for `.render()` kwargs:
    https://graphviz.readthedocs.io/en/stable/api.html#graphviz.Graph.render
    """
    import ibis.expr.visualize as viz

    dot = viz.to_graph(expression)
    dot.render(**kwargs)
    return dot


def main(level: str, model: str):
    dataflow_components = []
    config = {}
    final_vars = ["feature_set"]

    if level == "column":
        import column_dataflow

        dataflow_components.append(column_dataflow)
    elif level == "table":
        import table_dataflow

        dataflow_components.append(table_dataflow)
    else:
        raise ValueError("`level` must be in ['column', 'table']")

    if model:
        import model_training

        dataflow_components.append(model_training)
        config["model"] = model
        final_vars.extend(["full_model", "fitted_recipe", "cross_validation_scores"])

    # build the Driver from modules
    dr = (
        driver.Builder()
        .with_modules(*dataflow_components)
        .with_config(config)
        .with_adapters(ProgressBar())
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_remote_executor(SynchronousLocalTaskExecutor())
        .build()
    )

    inputs = dict(
        raw_data_path="../data_quality/simple/Absenteeism_at_work.csv",
        feature_selection=[
            "has_children",
            "has_pet",
            "is_summer_brazil",
            "service_time",
            "seasons",
            "disciplinary_failure",
            "absenteeism_time_in_hours",
        ],
        label="absenteeism_time_in_hours",
    )
    dr.visualize_execution(
        final_vars=final_vars, inputs=inputs, output_file_path="cross_validation.png"
    )

    res = dr.execute(final_vars, inputs=inputs)
    view_expression(res["feature_set"], filename="ibis_feature_set", format="png")

    print(res.keys())


if __name__ == "__main__":
    import argparse

    from hamilton import driver
    from hamilton.execution.executors import SynchronousLocalTaskExecutor
    from hamilton.plugins.h_tqdm import ProgressBar

    parser = argparse.ArgumentParser()
    parser.add_argument("--level", choices=["column", "table"])
    parser.add_argument("--model", choices=["linear", "random_forest", "boosting"])
    args = parser.parse_args()

    main(level=args.level, model=args.model)
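Example invocations, mapping directly to the argparse choices above: `python run.py --level column --model linear` runs the column-level dataflow plus cross-validated linear regression, while `python run.py --level table` builds only the table-level feature set.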
48 changes: 48 additions & 0 deletions examples/ibis/table_dataflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import Optional

import ibis
import ibis.expr.types as ir

from hamilton.function_modifiers import check_output # noqa: F401
from hamilton.plugins import ibis_extensions # noqa: F401


def raw_table(raw_data_path: str) -> ir.Table:
    """Load the CSV found at `raw_data_path` into a Table expression
    and format column names to snake case.
    """
    return ibis.read_csv(sources=raw_data_path, table_name="absenteism").rename("snake_case")


def feature_table(raw_table: ir.Table) -> ir.Table:
    """Add the feature columns `has_children`, `has_pet`,
    and `is_summer_brazil` to `raw_table`
    """
    return raw_table.mutate(
        has_children=ibis.ifelse(ibis._.son > 0, True, False),
        has_pet=ibis.ifelse(ibis._.pet > 0, True, False),
        is_summer_brazil=ibis._.month_of_absence.isin([1, 2, 12]),
    )


@check_output(
    schema=ibis.schema(
        {
            "has_children": "int",
            "has_pet": "bool",
            "is_summer_brazil": "bool",
            "service_time": "int",
            "seasons": "int",
            "disciplinary_failure": "int",
            "absenteeism_time_in_hours": "int",
        }
    ),
    importance="fail",
)
def feature_set(
    feature_table: ir.Table,
    feature_selection: list[str],
    condition: Optional[ibis.common.deferred.Deferred] = None,
) -> ir.Table:
    """Select feature columns and filter rows"""
    return feature_table[feature_selection].filter(condition)
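The optional `condition` input takes an Ibis deferred expression, so callers can filter the feature set without touching this module. A hypothetical example (the filtered column must appear in `feature_selection`):

import ibis

# keep only rows without a disciplinary failure; pass this as the `condition` input,
# e.g. dr.execute(["feature_set"], inputs={**inputs, "condition": condition})
condition = ibis._.disciplinary_failure == 0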
Binary file added examples/ibis/tables.png