Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples/ibis and Ibis plugin #725

Merged
merged 13 commits into from
Mar 5, 2024
Empty file added examples/ibis/README.md
Empty file.
89 changes: 89 additions & 0 deletions examples/ibis/column_dataflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from typing import Optional

import ibis
import ibis.expr.types as ir

from hamilton.function_modifiers import extract_columns
from hamilton.plugins import ibis_extensions # noqa: F401


# extract specific columns from the table
@extract_columns("son", "pet", "month_of_absence")
def raw_table(raw_data_path: str) -> ir.Table:
    """Read the CSV at `raw_data_path` as an Ibis table expression.

    The table is registered under the name "absenteism" and its columns
    are renamed to snake_case. The `extract_columns` decorator makes the
    `son`, `pet` and `month_of_absence` columns available on their own.
    """
    table = ibis.read_csv(sources=raw_data_path, table_name="absenteism")
    return table.rename("snake_case")


# accesses a single column from `raw_table`
def has_children(son: ir.Column) -> ir.BooleanColumn:
    """Flag rows where `son` is greater than zero, i.e. the person has children."""
    is_parent = son > 0
    return ibis.ifelse(is_parent, True, False)


# narrows the return type from `ir.Column` to `ir.BooleanColumn`
def has_pet(pet: ir.Column) -> ir.BooleanColumn:
    """Flag rows where `pet` is greater than zero, i.e. the person has pets.

    The result is explicitly cast to a boolean column.
    """
    owns_pet = ibis.ifelse(pet > 0, True, False)
    return owns_pet.cast(bool)


# typing and docstring provides business context to features
def is_summer_brazil(month_of_absence: ir.Column) -> ir.BooleanColumn:
    """Flag absences that fall in the Brazilian summer months.

    People in the northern hemisphere are likely to take vacations
    to warm places when it's cold locally.
    """
    # December, January and February
    summer_months = [1, 2, 12]
    return month_of_absence.isin(summer_months)


def feature_table(
    raw_table: ir.Table,
    has_children: ir.BooleanColumn,
    has_pet: ir.BooleanColumn,
    is_summer_brazil: ir.BooleanColumn,
) -> ir.Table:
    """Attach the computed feature columns to `raw_table`.

    Returns `raw_table` with the columns `has_children`, `has_pet` and
    `is_summer_brazil` added (or replaced if they already exist).
    """
    computed_features = {
        "has_children": has_children,
        "has_pet": has_pet,
        "is_summer_brazil": is_summer_brazil,
    }
    return raw_table.mutate(**computed_features)


def feature_set(
    feature_table: ir.Table,
    feature_selection: list[str],
    condition: Optional[ibis.common.deferred.Deferred] = None,
) -> ir.Table:
    """Select feature columns and optionally filter rows.

    :param feature_table: table holding raw and computed columns.
    :param feature_selection: names of the columns to keep.
    :param condition: row predicate applied after the column selection;
        when `None` (the default) every row is kept.
    :return: table restricted to `feature_selection`, filtered by
        `condition` if one was given.
    """
    selected = feature_table[feature_selection]
    # `None` means "no filter"; passing it to `.filter()` is not a valid predicate
    if condition is None:
        return selected
    return selected.filter(condition)


if __name__ == "__main__":
    import __main__

    from hamilton import driver

    # columns kept in the final feature set
    selected_columns = [
        "id",
        "has_children",
        "has_pet",
        "is_summer_brazil",
        "service_time",
        "seasons",
        "disciplinary_failure",
        "absenteeism_time_in_hours",
    ]
    inputs = {
        "raw_data_path": "../data_quality/simple/Absenteeism_at_work.csv",
        "feature_selection": selected_columns,
        "condition": ibis.ifelse(ibis._.has_pet == 1, True, False),
    }

    # build the dataflow from the functions defined in this script
    dr = driver.Builder().with_modules(__main__).build()
    res = dr.execute(["feature_set"], inputs=inputs)

    # materialize the table expression and show a preview
    df = res["feature_set"].to_pandas()
    print(df.head())
    print(df.shape)
Binary file added examples/ibis/columns.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added examples/ibis/cross_validation.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added examples/ibis/full_dataflow.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
205 changes: 205 additions & 0 deletions examples/ibis/model_training.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
import ibis
import ibis.expr.types as ir
import ibisml
import pandas as pd
from sklearn.base import BaseEstimator, clone
from sklearn.ensemble import HistGradientBoostingRegressor, RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import KFold

from hamilton.function_modifiers import config, extract_fields
from hamilton.htypes import Collect, Parallelizable


@config.when(model="linear")
def base_model__linear() -> BaseEstimator:
    """Provide an unfitted linear regression when the config sets model="linear"."""
    estimator = LinearRegression()
    return estimator


@config.when(model="random_forest")
def base_model__random_forest() -> BaseEstimator:
    """Provide an unfitted random forest regressor when the config sets model="random_forest"."""
    estimator = RandomForestRegressor()
    return estimator


@config.when(model="boosting")
def base_model__boosting() -> BaseEstimator:
    """Provide an unfitted histogram gradient boosting regressor when the config sets model="boosting"."""
    estimator = HistGradientBoostingRegressor()
    return estimator


def preprocessing_recipe() -> ibisml.Recipe:
    """Build the unfitted preprocessing recipe.

    Steps, in order: drop the `id` column, mean-impute numeric columns,
    standard-scale numeric columns, one-hot encode nominal columns.
    """
    steps = [
        ibisml.Drop(["id"]),
        ibisml.ImputeMean(ibisml.numeric()),
        ibisml.ScaleStandard(ibisml.numeric()),
        ibisml.OneHotEncode(ibisml.nominal()),
    ]
    return ibisml.Recipe(*steps)


def data_split(
    feature_set: ir.Table,
    n_splits: int = 3,
) -> Parallelizable[tuple]:
    """Yield one `(train_idx, val_idx)` pair per K-fold split.

    The `id` column is materialized to pandas only to give `KFold` the
    number of rows; the yielded arrays are positional indices.
    """
    ids = feature_set.id.to_pandas()
    splitter = KFold(n_splits=n_splits)
    yield from splitter.split(ids)


@extract_fields(
    dict(
        X_train=pd.DataFrame,
        X_val=pd.DataFrame,
        y_train=pd.DataFrame,
        y_val=pd.DataFrame,
    )
)
def prepare_data(
    feature_set: ir.Table,
    label: str,
    data_split: tuple,
    preprocessing_recipe: ibisml.Recipe,
) -> dict:
    """Build the train/validation feature and label frames for one fold.

    The preprocessing recipe is fitted on the training rows only and then
    applied to both the training and the validation rows.

    :param feature_set: table with the selected features and the label.
    :param label: name of the outcome column.
    :param data_split: `(train_idx, val_idx)` positional row indices.
    :param preprocessing_recipe: unfitted recipe to fit on the training rows.
    :return: dict with `X_train`, `y_train`, `X_val` and `y_val` dataframes.
    """
    train_idx, val_idx = data_split

    # The split holds 0-based row positions, not values of the `id` column,
    # so rows are matched on a temporary positional column that is dropped
    # again before preprocessing.
    # NOTE(review): assumes the backend returns rows in the same order here
    # as when `data_split` materialized the table - confirm for your backend.
    position = "_row_position"
    indexed = feature_set.mutate(**{position: ibis.row_number()})

    train_rows = [int(i) for i in train_idx]
    val_rows = [int(i) for i in val_idx]
    train_set = indexed.filter(indexed[position].isin(train_rows)).drop(position)
    val_set = indexed.filter(indexed[position].isin(val_rows)).drop(position)

    # fit on the training fold only to avoid leaking validation statistics
    transform = preprocessing_recipe.fit(train_set, outcomes=[label])

    train = transform(train_set)
    df_train = train.to_pandas()
    X_train = df_train[train.features]
    y_train = df_train[train.outcomes]

    # reuse the training feature/outcome names so both frames share columns
    df_val = transform(val_set).to_pandas()
    X_val = df_val[train.features]
    y_val = df_val[train.outcomes]

    return dict(
        X_train=X_train,
        y_train=y_train,
        X_val=X_val,
        y_val=y_val,
    )


def cross_validation_fold(
    X_train: pd.DataFrame,
    X_val: pd.DataFrame,
    y_train: pd.DataFrame,
    y_val: pd.DataFrame,
    base_model: BaseEstimator,
    data_split: tuple,
) -> dict:
    """Fit a fresh copy of `base_model` on one fold and score it on the validation data.

    :return: dict with the validation indices (`id`), the true labels
        (`y_true`), the predictions (`y_pred`) and the mean squared
        error (`score`).
    """
    _, val_idx = data_split

    # clone so the shared `base_model` is never fitted in place
    fold_model = clone(base_model)
    fold_model.fit(X_train, y_train)

    predictions = fold_model.predict(X_val)
    fold_score = mean_squared_error(y_val, predictions)

    return {
        "id": val_idx,
        "y_true": y_val,
        "y_pred": predictions,
        "score": fold_score,
    }


@extract_fields(
    dict(
        cross_validation_scores=list[float],
        cross_validation_preds=list[dict],
    )
)
def cross_validation_fold_collection(cross_validation_fold: Collect[dict]) -> dict:
    """Gather the per-fold results into a list of scores and a list of predictions.

    :param cross_validation_fold: one result dict per fold, each holding a
        `score` entry next to the prediction entries.
    :return: dict with `cross_validation_scores` (the `score` of every fold)
        and `cross_validation_preds` (every fold's dict without `score`).
    """
    scores, preds = [], []
    for fold in cross_validation_fold:
        scores.append(fold["score"])
        # build a new dict instead of popping, so the upstream node's
        # result is not mutated
        preds.append({key: value for key, value in fold.items() if key != "score"})
    return dict(
        cross_validation_scores=scores,
        cross_validation_preds=preds,
    )


def prediction_table(cross_validation_preds: list[dict]) -> ir.Table:
    """Wrap the collected per-fold predictions in an in-memory Ibis table."""
    table = ibis.memtable(cross_validation_preds)
    return table


def store_predictions(prediction_table: ir.Table) -> bool:
    """Placeholder for persisting predictions.

    Nothing is written; `prediction_table` is unused and the function
    always returns `True`.
    """
    stored = True
    return stored


@extract_fields(
    dict(
        full_model=BaseEstimator,
        fitted_recipe=ibisml.RecipeTransform,
    )
)
def train_full_model(
    feature_set: ir.Table,
    label: str,
    preprocessing_recipe: ibisml.Recipe,
    base_model: BaseEstimator,
) -> dict:
    """Fit the preprocessing recipe and a model on the whole feature set.

    :param feature_set: table with the selected features and the label.
    :param label: name of the outcome column.
    :param preprocessing_recipe: unfitted recipe, fitted here on all rows.
    :param base_model: unfitted estimator used as a template.
    :return: dict with `full_model` (the fitted estimator) and
        `fitted_recipe` (the fitted preprocessing transform).
    """
    transform = preprocessing_recipe.fit(feature_set, outcomes=[label])

    data = transform(feature_set)
    df = data.to_pandas()
    X = df[data.features]
    y = df[data.outcomes]

    # fit a clone, as `cross_validation_fold` does, so the shared
    # `base_model` node is not fitted in place
    model = clone(base_model)
    model.fit(X, y)
    return dict(
        full_model=model,
        fitted_recipe=transform,
    )


if __name__ == "__main__":
    import model_training
    import table_dataflow

    from hamilton import driver
    from hamilton.execution.executors import SynchronousLocalTaskExecutor

    # dynamic execution is required for the Parallelizable/Collect nodes
    dr = (
        driver.Builder()
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_modules(model_training, table_dataflow)
        .with_remote_executor(SynchronousLocalTaskExecutor())
        .with_config(dict(model="linear"))
        .build()
    )

    inputs = dict(
        raw_data_path="../data_quality/simple/Absenteeism_at_work.csv",
        feature_selection=[
            "id",
            "has_children",
            "has_pet",
            "is_summer_brazil",
            "service_time",
            "seasons",
            "disciplinary_failure",
            "absenteeism_time_in_hours",
        ],
        condition=ibis.ifelse(ibis._.has_pet == 1, True, False),
        label="absenteeism_time_in_hours",
    )

    # render the execution graph for cross-validation and full training
    dr.visualize_execution(
        final_vars=[
            "cross_validation_scores",
            "cross_validation_preds",
            "full_model",
            "fitted_recipe",
        ],
        output_file_path="cross_validation.png",
        inputs=inputs,
    )

    final_vars = ["cross_validation_scores"]
    res = dr.execute(final_vars, inputs=inputs)

    print(res["cross_validation_scores"])
3 changes: 3 additions & 0 deletions examples/ibis/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ibis-framework[duckdb,examples]
scikit-learn
sf-hamilton[visualization]
69 changes: 69 additions & 0 deletions examples/ibis/table_dataflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from typing import Optional

import ibis
import ibis.expr.types as ir

from hamilton.function_modifiers import check_output # noqa: F401
from hamilton.plugins import ibis_extensions # noqa: F401


def raw_table(raw_data_path: str) -> ir.Table:
    """Read the CSV at `raw_data_path` as an Ibis table expression.

    The table is registered under the name "absenteism" and its column
    names are converted to snake_case.
    """
    table = ibis.read_csv(sources=raw_data_path, table_name="absenteism")
    return table.rename("snake_case")


# @check_output(
# schema=ibis.schema(
# [("has_children", "int"), ("has_pet", "bool")]
# )
# )
def feature_table(raw_table: ir.Table) -> ir.Table:
    """Extend `raw_table` with three boolean feature columns.

    - `has_children`: `son` is greater than zero
    - `has_pet`: `pet` is greater than zero
    - `is_summer_brazil`: `month_of_absence` is 1, 2 or 12
    """
    # deferred expressions, resolved against `raw_table` inside `mutate`
    has_children_expr = ibis.ifelse(ibis._.son > 0, True, False)
    has_pet_expr = ibis.ifelse(ibis._.pet > 0, True, False)
    is_summer_expr = ibis._.month_of_absence.isin([1, 2, 12])

    return raw_table.mutate(
        has_children=has_children_expr,
        has_pet=has_pet_expr,
        is_summer_brazil=is_summer_expr,
    )


def feature_set(
    feature_table: ir.Table,
    feature_selection: list[str],
    condition: Optional[ibis.common.deferred.Deferred] = None,
) -> ir.Table:
    """Select feature columns and optionally filter rows.

    :param feature_table: table holding raw and computed columns.
    :param feature_selection: names of the columns to keep.
    :param condition: row predicate applied after the column selection;
        when `None` (the default) every row is kept.
    :return: table restricted to `feature_selection`, filtered by
        `condition` if one was given.
    """
    selected = feature_table[feature_selection]
    # `None` means "no filter"; passing it to `.filter()` is not a valid predicate
    if condition is None:
        return selected
    return selected.filter(condition)


if __name__ == "__main__":
    import __main__

    from hamilton import driver

    # build the dataflow from the functions defined in this script
    dr = driver.Builder().with_modules(__main__).build()

    inputs = dict(
        raw_data_path="../data_quality/simple/Absenteeism_at_work.csv",
        feature_selection=[
            "id",
            "has_children",
            "has_pet",
            "is_summer_brazil",
            "service_time",
            "seasons",
            "disciplinary_failure",
            "absenteeism_time_in_hours",
        ],
        condition=ibis.ifelse(ibis._.has_pet == 1, True, False),
    )
    dr.display_all_functions("schema.png", show_schema=True)

    res = dr.execute(["feature_set"], inputs=inputs)

    # materialize the table expression and show a preview
    df = res["feature_set"].to_pandas()
    print(df.head())
    print(df.shape)
Binary file added examples/ibis/tables.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading