Commit: This shows how you can construct a pipeline from components and then use subdag to parameterize it for reuse.

Showing 8 changed files with 441 additions and 0 deletions.
@@ -0,0 +1,34 @@

# Modular pipeline example

In this example we show how you can compose a pipeline from multiple modules.
This is a common pattern in Hamilton: you define a module that encapsulates
a set of "assets" and then use that module in a parameterized manner.

The use case here is that:

1. We have common data/feature engineering code.
2. We have a training step that creates a model.
3. We have an inference step that, given a model and a dataset, predicts the outcome on that dataset.

With these three things we want to create a single pipeline that:

1. Trains a model and predicts on the training set.
2. Uses that trained model to predict on a separate dataset.

We do this by creating our base components:

1. A module that contains the common data/feature engineering code.
2. A module that trains a model.
3. A module that predicts on a dataset.

We can then create two pipelines that use these modules in different ways:

1. For training and predicting on the training set we use all three modules.
2. For predicting on a separate dataset we use only the feature engineering module and the prediction module.
3. We wire the two together so that the trained model gets used in the prediction step for the separate dataset.

By using `@subdag` we namespace each reuse of the modules, which is how we can
reuse the same functions in different pipelines.

See:

![single_pipeline](my_dag.png)
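To build intuition for the namespacing idea, here is a minimal plain-Python sketch (an illustration only, not Hamilton's actual implementation): the same functions are registered under two prefixes, so two pipelines built from one shared module never collide.

```python
# Sketch: how @subdag-style namespacing lets one set of functions serve
# two pipelines without name collisions. Illustration only.

def raw_data(path: str) -> list[str]:
    return [f"row from {path}"]

def transformed_data(raw_data: list[str]) -> list[str]:
    return [r.upper() for r in raw_data]

shared_module = {"raw_data": raw_data, "transformed_data": transformed_data}

def namespace(module: dict, prefix: str) -> dict:
    """Re-key every node under a prefix, as @subdag does per pipeline."""
    return {f"{prefix}.{name}": fn for name, fn in module.items()}

dag = {}
dag.update(namespace(shared_module, "trained_pipeline"))
dag.update(namespace(shared_module, "predicted_data"))

# Both pipelines reuse the same functions under distinct node names.
print(sorted(dag))
```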
@@ -0,0 +1,9 @@

import pandas as pd


def raw_data(path: str) -> pd.DataFrame:
    return pd.read_csv(path)


def transformed_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    return raw_data.dropna()
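A quick standalone check of what these two functions do (they are redefined inline here so the snippet runs without importing the module; the in-memory CSV is made up for illustration):

```python
import io
import pandas as pd

# Inline copies of the feature-engineering functions above.
def raw_data(path) -> pd.DataFrame:  # any pandas-readable source works
    return pd.read_csv(path)

def transformed_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    return raw_data.dropna()

# A tiny CSV with one incomplete row; dropna() removes it.
csv_file = io.StringIO("feature,target\n1.0,0\n,1\n2.0,1\n")
df = transformed_data(raw_data(csv_file))
print(len(df))  # 2 complete rows remain
```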
@@ -0,0 +1,7 @@

from typing import Any

import pandas as pd


def predicted_data(transformed_data: pd.DataFrame, fit_model: Any) -> pd.DataFrame:
    return fit_model.predict(transformed_data)
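Because `fit_model` is typed `Any`, anything exposing a `predict` method works here. A hedged stand-in (the `ConstantModel` class is invented for this sketch):

```python
import pandas as pd

# Inline copy of the inference function above.
def predicted_data(transformed_data: pd.DataFrame, fit_model):
    return fit_model.predict(transformed_data)

class ConstantModel:
    """Stand-in for a fitted sklearn-style model: predicts 1 for every row."""
    def predict(self, X: pd.DataFrame):
        return pd.Series([1] * len(X), index=X.index)

preds = predicted_data(pd.DataFrame({"feature": [0.1, 0.2, 0.3]}), ConstantModel())
print(list(preds))  # [1, 1, 1]
```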
(Large diff not rendered by default.)
@@ -0,0 +1,37 @@

from typing import Any

import features
import inference
import pandas as pd
import train

from hamilton.function_modifiers import extract_fields, source, subdag


@extract_fields({"fit_model": Any, "training_prediction": pd.DataFrame})
@subdag(
    features,
    train,
    inference,
    inputs={
        "path": source("path"),
        "model_params": source("model_params"),
    },
    config={
        "model": source("model"),  # not strictly required but allows us to remap.
    },
)
def trained_pipeline(fit_model: Any, predicted_data: pd.DataFrame) -> dict:
    return {"fit_model": fit_model, "training_prediction": predicted_data}


@subdag(
    features,
    inference,
    inputs={
        "path": source("predict_path"),
        "fit_model": source("fit_model"),
    },
)
def predicted_data(predicted_data: pd.DataFrame) -> pd.DataFrame:
    return predicted_data
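A rough illustration (plain Python, not Hamilton internals) of what `@extract_fields` does here: the dict returned by `trained_pipeline` is split into separately addressable values, which is how `fit_model` can later be wired into the second subdag.

```python
# Sketch of @extract_fields: one function returns a dict, and each
# declared key becomes its own value downstream code can depend on.
def trained_pipeline() -> dict:
    # Placeholder values standing in for a real model and predictions.
    return {"fit_model": "a-fitted-model", "training_prediction": [0, 1, 1]}

def extract_fields(fn, fields):
    """Split fn's dict result into one value per declared field."""
    result = fn()
    return {name: result[name] for name in fields}

nodes = extract_fields(trained_pipeline, ["fit_model", "training_prediction"])
# "fit_model" is now a standalone value the second subdag can consume.
print(nodes["fit_model"])
```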
@@ -0,0 +1,18 @@

import pipeline

from hamilton import driver


def run():
    dr = (
        driver.Builder()
        .with_config({"model": "RandomForest", "model_params": {"n_estimators": 100}})
        .with_modules(pipeline)
        .build()
    )
    dr.display_all_functions("./my_dag.png")
    # dr.execute(["trained_pipeline", "predicted_data"])


if __name__ == "__main__":
    run()
@@ -0,0 +1,32 @@

from typing import Any

import pandas as pd

from hamilton.function_modifiers import config


@config.when(model="RandomForest")
def base_model__rf(model_params: dict) -> Any:
    from sklearn.ensemble import RandomForestClassifier

    return RandomForestClassifier(**model_params)


@config.when(model="LogisticRegression")
def base_model__lr(model_params: dict) -> Any:
    from sklearn.linear_model import LogisticRegression

    return LogisticRegression(**model_params)


@config.when(model="XGBoost")
def base_model__xgb(model_params: dict) -> Any:
    from xgboost import XGBClassifier

    return XGBClassifier(**model_params)


def fit_model(transformed_data: pd.DataFrame, base_model: Any) -> Any:
    """Fit a model to transformed data."""
    base_model.fit(transformed_data.drop("target", axis=1), transformed_data["target"])
    return base_model
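The `@config.when` dispatch above can be pictured as a config value selecting one of several constructors. A minimal sketch, assuming a stub classifier in place of the real sklearn/xgboost models (the `StubClassifier` class and `MODELS` dict are invented for illustration):

```python
import pandas as pd

class StubClassifier:
    """Minimal sklearn-style classifier standing in for the real models."""
    def __init__(self, **params):
        self.params = params
        self.fitted = False

    def fit(self, X, y):
        self.fitted = True
        return self

# Plain-dict analogue of @config.when: the "model" config value selects
# which base_model implementation gets built.
MODELS = {"RandomForest": StubClassifier, "LogisticRegression": StubClassifier}

def base_model(cfg: dict) -> StubClassifier:
    return MODELS[cfg["model"]](**cfg["model_params"])

def fit_model(transformed_data: pd.DataFrame, base_model) -> StubClassifier:
    # Same shape as the real fit_model: features vs. "target" column.
    base_model.fit(transformed_data.drop("target", axis=1), transformed_data["target"])
    return base_model

data = pd.DataFrame({"feature": [0.1, 0.9], "target": [0, 1]})
model = fit_model(data, base_model({"model": "RandomForest", "model_params": {"n_estimators": 100}}))
print(model.fitted)  # True
```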