
split content of examples/ibis into examples/ibisml #741

Merged · 4 commits · Mar 5, 2024
38 changes: 38 additions & 0 deletions examples/ibis/README.md
@@ -0,0 +1,38 @@
# Ibis + Hamilton

[Ibis](https://ibis-project.org/) is a portable dataframe library for writing procedural data transformations in Python and executing them directly on various SQL backends (DuckDB, Snowflake, Postgres, Flink; see the [full list](https://ibis-project.org/support_matrix)). Hamilton provides a declarative way to define testable, modular, self-documenting dataflows that encode lineage and metadata.

In this example, we'll show how to get started creating feature transformations and training a machine learning model. You'll learn the basics of Ibis and IbisML and how they integrate with Hamilton.
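
To give a quick taste of the integration (a minimal sketch distilled from the `table_dataflow.py` module in this example; column names come from the absenteeism dataset), a Hamilton dataflow is just a module of plain Python functions: each function defines a node, each parameter names an upstream node, and the function bodies build lazy Ibis expressions that only execute on the backend when results are requested.

```python
import ibis
import ibis.expr.types as ir


def raw_table(raw_data_path: str) -> ir.Table:
    """Load the CSV into a lazy Ibis table expression; nothing runs yet."""
    return ibis.read_csv(sources=raw_data_path).rename("snake_case")


def feature_table(raw_table: ir.Table) -> ir.Table:
    """Hamilton injects `raw_table` by parameter name; Ibis builds the expression."""
    return raw_table.mutate(has_pet=ibis._.pet > 0)
```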


![column-level feature engineering](./columns.png)


# Running the example
Follow these steps to get the example working:
1. create and activate a virtual environment

```script
python -m venv venv && . venv/bin/activate
```
2. install requirements

```script
pip install -r requirements.txt
```

3. execute the Hamilton feature engineering dataflow at the table or column level

```script
python run.py --level [table, column]
```

# Files
- `table_dataflow.py` and `column_dataflow.py` implement the same Ibis feature engineering dataflow, but at different levels of granularity (see the sketch after this list).
- `tables.png` and `columns.png` were generated by Hamilton directly from the code.
- `ibis_feature_set.png` was generated by Ibis. It describes the atomic data transformations executed by the expression.
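
For a rough sense of the contrast (a hedged sketch; `has_pet` below is illustrative, not the literal contents of `column_dataflow.py`): at the table level one node returns a whole `ir.Table`, while at the column level each feature is its own node returning an `ir.Column`, which is what produces the per-column lineage visualized in `columns.png`.

```python
import ibis.expr.types as ir


# Column-level granularity (illustrative): each feature becomes its own
# Hamilton node, so the DAG tracks lineage for every individual column.
def has_pet(raw_table: ir.Table) -> ir.Column:
    return (raw_table.pet > 0).name("has_pet")
```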


# Resources
- [Learn more about Hamilton + Ibis](https://hamilton.dagworks.io/en/latest/integrations/ibis/)
- [Ibis documentation](https://ibis-project.org/)
Binary file removed examples/ibis/cross_validation.png
Binary file not shown.
2 changes: 0 additions & 2 deletions examples/ibis/requirements.txt
@@ -1,5 +1,3 @@
ibiml
ibis-framework[duckdb,examples]
scikit-learn
sf-hamilton[visualization]
tdqm
49 changes: 13 additions & 36 deletions examples/ibis/run.py
@@ -1,3 +1,7 @@
from hamilton import driver
from hamilton.plugins.h_tqdm import ProgressBar


def view_expression(expression, **kwargs):
"""View an Ibis expression

@@ -11,39 +15,20 @@ def view_expression(expression, **kwargs):
return dot


def main(level: str, model: str):
dataflow_components = []
config = {}
final_vars = ["feature_set"]

def main(level: str):
if level == "column":
import column_dataflow

dataflow_components.append(column_dataflow)
feature_dataflow = column_dataflow
elif level == "table":
import table_dataflow

dataflow_components.append(table_dataflow)
feature_dataflow = table_dataflow
else:
raise ValueError("`level` must be in ['column', 'table']")

if model:
import model_training

dataflow_components.append(model_training)
config["model"] = model
final_vars.extend(["full_model", "fitted_recipe", "cross_validation_scores"])

# build the Driver from modules
dr = (
driver.Builder()
.with_modules(*dataflow_components)
.with_config(config)
.with_adapters(ProgressBar())
.enable_dynamic_execution(allow_experimental_mode=True)
.with_remote_executor(SynchronousLocalTaskExecutor())
.build()
)
dr = driver.Builder().with_modules(feature_dataflow).with_adapters(ProgressBar()).build()

inputs = dict(
raw_data_path="../data_quality/simple/Absenteeism_at_work.csv",
@@ -56,28 +41,20 @@ def main(level: str, model: str):
"disciplinary_failure",
"absenteeism_time_in_hours",
],
label="absenteeism_time_in_hours",
)
dr.visualize_execution(
final_vars=final_vars, inputs=inputs, output_file_path="cross_validation.png"
)

res = dr.execute(final_vars, inputs=inputs)
res = dr.execute(["feature_set"], inputs=inputs)
view_expression(res["feature_set"], filename="ibis_feature_set", format="png")

print(res.keys())
print("Dataflow result keys: ", list(res.keys()))


if __name__ == "__main__":
import argparse

from hamilton import driver
from hamilton.execution.executors import SynchronousLocalTaskExecutor
from hamilton.plugins.h_tqdm import ProgressBar

parser = argparse.ArgumentParser()
parser.add_argument("--level", choices=["column", "table"])
parser.add_argument("--model", choices=["linear", "random_forest", "boosting"])
parser.add_argument("--level", choices=["column", "table"], default="table")
args = parser.parse_args()

main(level=args.level, model=args.model)
print(f"Running dataflow at {args.level} level")
main(level=args.level)
17 changes: 0 additions & 17 deletions examples/ibis/table_dataflow.py
@@ -3,9 +3,6 @@
import ibis
import ibis.expr.types as ir

from hamilton.function_modifiers import check_output # noqa: F401
from hamilton.plugins import ibis_extensions # noqa: F401


def raw_table(raw_data_path: str) -> ir.Table:
"""Load CSV from `raw_data_path` into a Table expression
@@ -25,20 +22,6 @@ def feature_table(raw_table: ir.Table) -> ir.Table:
)


@check_output(
schema=ibis.schema(
{
"has_children": "bool",
"has_pet": "bool",
"is_summer_brazil": "bool",
"service_time": "int",
"seasons": "int",
"disciplinary_failure": "int",
"absenteeism_time_in_hours": "int",
}
),
importance="fail",
)
def feature_set(
feature_table: ir.Table,
feature_selection: list[str],
1 change: 1 addition & 0 deletions examples/ibisml/README.md
@@ -0,0 +1 @@
# IbisML + Hamilton [WIP]
Binary file added examples/ibisml/cross_validation.png
examples/ibisml/model_training.py
@@ -50,8 +50,11 @@ def data_split(
"""Generate indices to create train/validation splits n times"""
folds = KFold(n_splits=n_splits)
idx = list(range(feature_set.count().execute()))
feature_set = feature_set.mutate(idx=ibis.row_number())
for train_idx, val_idx in folds.split(idx):
yield train_idx, val_idx
train_set = feature_set.filter(ibis._.idx.isin(train_idx))
val_set = feature_set.filter(ibis._.idx.isin(val_idx))
yield train_set, val_set


@extract_fields(
@@ -69,22 +72,18 @@ def prepare_data(
preprocessing_recipe: ibisml.Recipe,
) -> dict:
"""Split data and apply preprocessing recipe"""
train_idx, val_idx = data_split
train_set, val_set = data_split
# add temporary idx column for train/val splits
feature_set = feature_set.mutate(idx=ibis.row_number())
train_set = feature_set.filter(ibis._.idx.isin(train_idx))
val_set = feature_set.filter(ibis._.idx.isin(val_idx))

transform = preprocessing_recipe.fit(train_set, outcomes=[label])

train = transform(train_set)
df_train = train.to_pandas()
X_train = df_train[train.features]
y_train = df_train[train.outcomes]
y_train = df_train[train.outcomes].to_numpy().reshape(-1)

df_test = transform(val_set).to_pandas()
X_val = df_test[train.features]
y_val = df_test[train.outcomes]
y_val = df_test[train.outcomes].to_numpy().reshape(-1)

return dict(
X_train=X_train,
@@ -103,15 +102,14 @@ def cross_validation_fold(
data_split: tuple,
) -> dict:
"""Train model and make predictions on validation"""
train_idx, val_idx = data_split
model = clone(base_model)

model.fit(X_train, y_train)

y_val_pred = model.predict(X_val)
score = mean_squared_error(y_val, y_val_pred)

return dict(id=val_idx, y_true=y_val, y_pred=y_val_pred, score=score)
return dict(y_true=y_val, y_pred=y_val_pred, score=score)


@extract_fields(
@@ -163,7 +161,7 @@ def train_full_model(
data = transform(feature_set)
df = data.to_pandas()
X = df[data.features]
y = df[data.outcomes]
y = df[data.outcomes].to_numpy().reshape(-1)

base_model.fit(X, y)
return dict(
5 changes: 5 additions & 0 deletions examples/ibisml/requirements.txt
@@ -0,0 +1,5 @@
ibis-framework[duckdb,examples]
ibisml
scikit-learn
sf-hamilton[visualization]
tqdm
66 changes: 66 additions & 0 deletions examples/ibisml/run.py
@@ -0,0 +1,66 @@
from hamilton import driver
from hamilton.execution.executors import SynchronousLocalTaskExecutor
from hamilton.plugins.h_tqdm import ProgressBar


def view_expression(expression, **kwargs):
"""View an Ibis expression

see graphviz reference for `.render()` kwargs
ref: https://graphviz.readthedocs.io/en/stable/api.html#graphviz.Graph.render
"""
import ibis.expr.visualize as viz

dot = viz.to_graph(expression)
dot.render(**kwargs)
return dot


def main(model: str):
import model_training
import table_dataflow

config = {"model": model}
final_vars = ["full_model", "fitted_recipe", "cross_validation_scores"]

# build the Driver from modules
dr = (
driver.Builder()
.with_modules(table_dataflow, model_training)
.with_config(config)
.with_adapters(ProgressBar())
.enable_dynamic_execution(allow_experimental_mode=True)
.with_remote_executor(SynchronousLocalTaskExecutor())
.build()
)

inputs = dict(
raw_data_path="../data_quality/simple/Absenteeism_at_work.csv",
feature_selection=[
"has_children",
"has_pet",
"is_summer_brazil",
"service_time",
"seasons",
"disciplinary_failure",
"absenteeism_time_in_hours",
],
label="absenteeism_time_in_hours",
)
dr.visualize_execution(
final_vars=final_vars, inputs=inputs, output_file_path="cross_validation.png"
)

res = dr.execute(final_vars, inputs=inputs)

print("Dataflow result keys: ", list(res.keys()))


if __name__ == "__main__":
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--model", choices=["linear", "random_forest", "boosting"])
args = parser.parse_args()

main(model=args.model)
31 changes: 31 additions & 0 deletions examples/ibisml/table_dataflow.py
@@ -0,0 +1,31 @@
from typing import Optional

import ibis
import ibis.expr.types as ir


def raw_table(raw_data_path: str) -> ir.Table:
"""Load CSV from `raw_data_path` into a Table expression
and format column names to snakecase
"""
return ibis.read_csv(sources=raw_data_path, table_name="absenteism").rename("snake_case")


def feature_table(raw_table: ir.Table) -> ir.Table:
"""Add to `raw_table` the feature columns `has_children`
`has_pet`, and `is_summer_brazil`
"""
return raw_table.mutate(
has_children=(ibis.ifelse(ibis._.son > 0, True, False)),
has_pet=ibis.ifelse(ibis._.pet > 0, True, False),
is_summer_brazil=ibis._.month_of_absence.isin([1, 2, 12]),
)


def feature_set(
feature_table: ir.Table,
feature_selection: list[str],
condition: Optional[ibis.common.deferred.Deferred] = None,
) -> ir.Table:
"""Select feature columns and filter rows"""
return feature_table[feature_selection].filter(condition)