diff --git a/examples/ibis/README.md b/examples/ibis/README.md index e69de29bb..75aeed32a 100644 --- a/examples/ibis/README.md +++ b/examples/ibis/README.md @@ -0,0 +1,38 @@ +# Ibis + Hamilton + +[Ibis](https://ibis-project.org/) is a portable dataframe library to write procedural data transformations in Python and be able to execute them directly on various SQL backends (DuckDB, Snowflake, Postgres, Flink, see [full list](https://ibis-project.org/support_matrix)). Hamilton provides a declarative way to define testable, modular, self-documenting dataflows, that encode lineage and metadata. + +In this example, we'll show how to get started with creating feature transformations and training a machine learning model. You'll learn about the basics of Ibis and IbisML and how they integrate with Hamilton. + + +![column-level feature engineering](./columns.png) + + +# Running the example
Follow these steps to get the example working: +1. create and activate virtual environment + + ```script + python -m venv venv && . venv/bin/activate + ``` +2. install requirements + + ```script + pip install -r requirements.txt + ``` + +3. execute the Hamilton feature engineering dataflow at the table or column level + + ```script + python run.py --level [table, column] + ``` + +# Files
- `table_dataflow.py` and `column_dataflow.py` include the same Ibis feature engineering dataflow, but with different levels of granularity +- `tables.png` and `columns.png` were generated by Hamilton directly from the code. +- `ibis_feature_set.png` was generated by Ibis. It describes the atomic data transformations executed by the expression. 
+ + +# Resources +- [Learn more about Hamilton + Ibis](https://hamilton.dagworks.io/en/latest/integrations/ibis/) +- [Ibis documentation](https://ibis-project.org/) diff --git a/examples/ibis/cross_validation.png b/examples/ibis/cross_validation.png deleted file mode 100644 index a92c9f364..000000000 Binary files a/examples/ibis/cross_validation.png and /dev/null differ diff --git a/examples/ibis/requirements.txt b/examples/ibis/requirements.txt index 30e38e573..54412d8cb 100644 --- a/examples/ibis/requirements.txt +++ b/examples/ibis/requirements.txt @@ -1,5 +1,3 @@ -ibiml ibis-framework[duckdb,examples] -scikit-learn sf-hamilton[visualization] tdqm diff --git a/examples/ibis/run.py b/examples/ibis/run.py index de71bcf9b..44f65c8bc 100644 --- a/examples/ibis/run.py +++ b/examples/ibis/run.py @@ -1,3 +1,7 @@ +from hamilton import driver +from hamilton.plugins.h_tqdm import ProgressBar + + def view_expression(expression, **kwargs): """View an Ibis expression @@ -11,39 +15,20 @@ def view_expression(expression, **kwargs): return dot -def main(level: str, model: str): - dataflow_components = [] - config = {} - final_vars = ["feature_set"] - +def main(level: str): if level == "column": import column_dataflow - dataflow_components.append(column_dataflow) + feature_dataflow = column_dataflow elif level == "table": import table_dataflow - dataflow_components.append(table_dataflow) + feature_dataflow = table_dataflow else: raise ValueError("`level` must be in ['column', 'table']") - if model: - import model_training - - dataflow_components.append(model_training) - config["model"] = model - final_vars.extend(["full_model", "fitted_recipe", "cross_validation_scores"]) - # build the Driver from modules - dr = ( - driver.Builder() - .with_modules(*dataflow_components) - .with_config(config) - .with_adapters(ProgressBar()) - .enable_dynamic_execution(allow_experimental_mode=True) - .with_remote_executor(SynchronousLocalTaskExecutor()) - .build() - ) + dr = 
driver.Builder().with_modules(feature_dataflow).with_adapters(ProgressBar()).build() inputs = dict( raw_data_path="../data_quality/simple/Absenteeism_at_work.csv", @@ -56,28 +41,20 @@ def main(level: str, model: str): "disciplinary_failure", "absenteeism_time_in_hours", ], - label="absenteeism_time_in_hours", - ) - dr.visualize_execution( - final_vars=final_vars, inputs=inputs, output_file_path="cross_validation.png" ) - res = dr.execute(final_vars, inputs=inputs) + res = dr.execute(["feature_set"], inputs=inputs) view_expression(res["feature_set"], filename="ibis_feature_set", format="png") - print(res.keys()) + print("Dataflow result keys: ", list(res.keys())) if __name__ == "__main__": import argparse - from hamilton import driver - from hamilton.execution.executors import SynchronousLocalTaskExecutor - from hamilton.plugins.h_tqdm import ProgressBar - parser = argparse.ArgumentParser() - parser.add_argument("--level", choices=["column", "table"]) - parser.add_argument("--model", choices=["linear", "random_forest", "boosting"]) + parser.add_argument("--level", choices=["column", "table"], default="table") args = parser.parse_args() - main(level=args.level, model=args.model) + print(f"Running dataflow at {args.level} level") + main(level=args.level) diff --git a/examples/ibis/table_dataflow.py b/examples/ibis/table_dataflow.py index e10d0c9de..a75d8ade1 100644 --- a/examples/ibis/table_dataflow.py +++ b/examples/ibis/table_dataflow.py @@ -3,9 +3,6 @@ import ibis import ibis.expr.types as ir -from hamilton.function_modifiers import check_output # noqa: F401 -from hamilton.plugins import ibis_extensions # noqa: F401 - def raw_table(raw_data_path: str) -> ir.Table: """Load CSV from `raw_data_path` into a Table expression @@ -25,20 +22,6 @@ def feature_table(raw_table: ir.Table) -> ir.Table: ) -@check_output( - schema=ibis.schema( - { - "has_children": "bool", - "has_pet": "bool", - "is_summer_brazil": "bool", - "service_time": "int", - "seasons": "int", - 
"disciplinary_failure": "int", - "absenteeism_time_in_hours": "int", - } - ), - importance="fail", -) def feature_set( feature_table: ir.Table, feature_selection: list[str], diff --git a/examples/ibisml/README.md b/examples/ibisml/README.md new file mode 100644 index 000000000..d6279e8e8 --- /dev/null +++ b/examples/ibisml/README.md @@ -0,0 +1 @@ +# IbisML + Hamilton [WIP] diff --git a/examples/ibisml/cross_validation.png b/examples/ibisml/cross_validation.png new file mode 100644 index 000000000..d44453340 Binary files /dev/null and b/examples/ibisml/cross_validation.png differ diff --git a/examples/ibis/model_training.py b/examples/ibisml/model_training.py similarity index 91% rename from examples/ibis/model_training.py rename to examples/ibisml/model_training.py index 5580bce5d..2beb019f3 100644 --- a/examples/ibis/model_training.py +++ b/examples/ibisml/model_training.py @@ -50,8 +50,11 @@ def data_split( """Generate indices to create train/validation splits n times""" folds = KFold(n_splits=n_splits) idx = list(range(feature_set.count().execute())) + feature_set = feature_set.mutate(idx=ibis.row_number()) for train_idx, val_idx in folds.split(idx): - yield train_idx, val_idx + train_set = feature_set.filter(ibis._.idx.isin(train_idx)) + val_set = feature_set.filter(ibis._.idx.isin(val_idx)) + yield train_set, val_set @extract_fields( @@ -69,22 +72,18 @@ def prepare_data( preprocessing_recipe: ibisml.Recipe, ) -> dict: """Split data and apply preprocessing recipe""" - train_idx, val_idx = data_split + train_set, val_set = data_split # add temporary idx column for train/val splits - feature_set = feature_set.mutate(idx=ibis.row_number()) - train_set = feature_set.filter(ibis._.idx.isin(train_idx)) - val_set = feature_set.filter(ibis._.idx.isin(val_idx)) - transform = preprocessing_recipe.fit(train_set, outcomes=[label]) train = transform(train_set) df_train = train.to_pandas() X_train = df_train[train.features] - y_train = df_train[train.outcomes] + y_train = 
df_train[train.outcomes].to_numpy().reshape(-1) df_test = transform(val_set).to_pandas() X_val = df_test[train.features] - y_val = df_test[train.outcomes] + y_val = df_test[train.outcomes].to_numpy().reshape(-1) return dict( X_train=X_train, @@ -103,7 +102,6 @@ def cross_validation_fold( data_split: tuple, ) -> dict: """Train model and make predictions on validation""" - train_idx, val_idx = data_split model = clone(base_model) model.fit(X_train, y_train) @@ -111,7 +109,7 @@ y_val_pred = model.predict(X_val) score = mean_squared_error(y_val, y_val_pred) - return dict(id=val_idx, y_true=y_val, y_pred=y_val_pred, score=score) + return dict(y_true=y_val, y_pred=y_val_pred, score=score) @extract_fields( @@ -163,7 +161,7 @@ def train_full_model( data = transform(feature_set) df = data.to_pandas() X = df[data.features] - y = df[data.outcomes] + y = df[data.outcomes].to_numpy().reshape(-1) base_model.fit(X, y) return dict( diff --git a/examples/ibisml/requirements.txt b/examples/ibisml/requirements.txt new file mode 100644 index 000000000..532313658 --- /dev/null +++ b/examples/ibisml/requirements.txt @@ -0,0 +1,5 @@ +ibis-framework[duckdb,examples] +ibisml +scikit-learn +sf-hamilton[visualization] +tqdm diff --git a/examples/ibisml/run.py b/examples/ibisml/run.py new file mode 100644 index 000000000..6822c6775 --- /dev/null +++ b/examples/ibisml/run.py @@ -0,0 +1,66 @@ +from hamilton import driver +from hamilton.execution.executors import SynchronousLocalTaskExecutor +from hamilton.plugins.h_tqdm import ProgressBar + + +def view_expression(expression, **kwargs): + """View an Ibis expression + + see graphviz reference for `.render()` kwargs + ref: https://graphviz.readthedocs.io/en/stable/api.html#graphviz.Graph.render + """ + import ibis.expr.visualize as viz + + dot = viz.to_graph(expression) + dot.render(**kwargs) + return dot + + +def main(model: str): + import model_training + import table_dataflow + + config = {"model": model} + final_vars 
= ["full_model", "fitted_recipe", "cross_validation_scores"] + + # build the Driver from modules + dr = ( + driver.Builder() + .with_modules(table_dataflow, model_training) + .with_config(config) + .with_adapters(ProgressBar()) + .enable_dynamic_execution(allow_experimental_mode=True) + .with_remote_executor(SynchronousLocalTaskExecutor()) + .build() + ) + + inputs = dict( + raw_data_path="../data_quality/simple/Absenteeism_at_work.csv", + feature_selection=[ + "has_children", + "has_pet", + "is_summer_brazil", + "service_time", + "seasons", + "disciplinary_failure", + "absenteeism_time_in_hours", + ], + label="absenteeism_time_in_hours", + ) + dr.visualize_execution( + final_vars=final_vars, inputs=inputs, output_file_path="cross_validation.png" + ) + + res = dr.execute(final_vars, inputs=inputs) + + print("Dataflow result keys: ", list(res.keys())) + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("--model", choices=["linear", "random_forest", "boosting"]) + args = parser.parse_args() + + main(model=args.model) diff --git a/examples/ibisml/table_dataflow.py b/examples/ibisml/table_dataflow.py new file mode 100644 index 000000000..a75d8ade1 --- /dev/null +++ b/examples/ibisml/table_dataflow.py @@ -0,0 +1,31 @@ +from typing import Optional + +import ibis +import ibis.expr.types as ir + + +def raw_table(raw_data_path: str) -> ir.Table: + """Load CSV from `raw_data_path` into a Table expression + and format column names to snakecase + """ + return ibis.read_csv(sources=raw_data_path, table_name="absenteism").rename("snake_case") + + +def feature_table(raw_table: ir.Table) -> ir.Table: + """Add to `raw_table` the feature columns `has_children` + `has_pet`, and `is_summer_brazil` + """ + return raw_table.mutate( + has_children=(ibis.ifelse(ibis._.son > 0, True, False)), + has_pet=ibis.ifelse(ibis._.pet > 0, True, False), + is_summer_brazil=ibis._.month_of_absence.isin([1, 2, 12]), + ) + + +def feature_set( + 
feature_table: ir.Table, + feature_selection: list[str], + condition: Optional[ibis.common.deferred.Deferred] = None, +) -> ir.Table: + """Select feature columns and filter rows""" + return feature_table[feature_selection].filter(condition)