Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples/ and docs/code-comparison Kedro #896

Merged
merged 11 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/code-comparisons/_kedro_snippets/hamilton_assemble.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# run.py
from hamilton import driver
import dataflow # module containing node definitions

# Build the `Driver` step by step: start a `Builder`, register the module
# that holds the node definitions, then finalize it.
dr = (
    driver.Builder()
    .with_modules(dataflow)
    .build()
)
33 changes: 33 additions & 0 deletions docs/code-comparisons/_kedro_snippets/hamilton_definition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# dataflow.py
import pandas as pd

def _is_true(x: pd.Series) -> pd.Series:
return x == "t"

def companies_preprocessed(companies: pd.DataFrame) -> pd.DataFrame:
    """Companies with the `iata_approved` column converted to booleans.

    The existing `iata_approved` column is replaced by the result of
    `_is_true` applied to it; every other column is carried over as is.

    A new DataFrame is returned and `companies` itself is left untouched.
    Writing the converted column back onto the argument would alter the
    caller's object, so running the dataflow a second time with the same
    input would compare already-converted booleans against "t".
    """
    return companies.assign(
        iata_approved=_is_true(companies["iata_approved"]),
    )

def shuttles_preprocessed(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Shuttles with `d_check_complete` and `moon_clearance_complete`
    converted to booleans.

    Both columns already exist on `shuttles`; each is replaced by the result
    of `_is_true` applied to it. Every other column is carried over as is.

    A new DataFrame is returned and `shuttles` itself is left untouched, so
    the caller's object keeps its original values and the function gives the
    same result when called again with the same input.
    """
    return shuttles.assign(
        d_check_complete=_is_true(shuttles["d_check_complete"]),
        moon_clearance_complete=_is_true(shuttles["moon_clearance_complete"]),
    )

def model_input_table(
    shuttles_preprocessed: pd.DataFrame,
    companies_preprocessed: pd.DataFrame,
) -> pd.DataFrame:
    """Shuttles joined to their companies, with incomplete rows removed.

    The shuttles' own `id` column is dropped first, then shuttles are merged
    with companies on `company_id` == companies' `id`, and finally every row
    containing a missing value is discarded.
    """
    return (
        shuttles_preprocessed.drop(columns="id")
        .merge(companies_preprocessed, left_on="company_id", right_on="id")
        .dropna()
    )
13 changes: 13 additions & 0 deletions docs/code-comparisons/_kedro_snippets/hamilton_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# run.py
import pandas as pd
from hamilton import driver
import dataflow

# same `Driver` construction as in Step 2
dr = driver.Builder().with_modules(dataflow).build()

# load the two tables that the dataflow functions take as parameters
inputs = {
    "companies": pd.read_parquet("path/to/companies.parquet"),
    "shuttles": pd.read_parquet("path/to/shuttles.parquet"),
}

# request only the final node by name
results = dr.execute(["model_input_table"], inputs=inputs)
# results is a dict {"model_input_table": VALUE}
34 changes: 34 additions & 0 deletions docs/code-comparisons/_kedro_snippets/kedro_assemble.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# pipeline.py
from kedro.pipeline import Pipeline, node, pipeline
from nodes import (
create_model_input_table,
preprocess_companies,
preprocess_shuttles
)

def create_pipeline(**kwargs) -> Pipeline:
    """Assemble the preprocessing and table-building nodes into a pipeline.

    Each node binds a function from `nodes` to named inputs and outputs; the
    output names of the two preprocessing nodes are the input names of the
    final node. `kwargs` is accepted but not used here.
    """
    companies_node = node(
        func=preprocess_companies,
        inputs="companies",
        outputs="preprocessed_companies",
        name="preprocess_companies_node",
    )
    shuttles_node = node(
        func=preprocess_shuttles,
        inputs="shuttles",
        outputs="preprocessed_shuttles",
        name="preprocess_shuttles_node",
    )
    model_input_node = node(
        func=create_model_input_table,
        inputs=["preprocessed_shuttles", "preprocessed_companies"],
        outputs="model_input_table",
        name="create_model_input_table_node",
    )
    return pipeline([companies_node, shuttles_node, model_input_node])
31 changes: 31 additions & 0 deletions docs/code-comparisons/_kedro_snippets/kedro_definition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# nodes.py
import pandas as pd

def _is_true(x: pd.Series) -> pd.Series:
return x == "t"

def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
    """Convert the `iata_approved` column of `companies` to booleans.

    The converted column is written back onto the DataFrame that was passed
    in, and that same DataFrame is returned.
    """
    approved_flags = _is_true(companies["iata_approved"])
    companies["iata_approved"] = approved_flags
    return companies

def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Convert the two completion-flag columns of `shuttles` to booleans.

    `d_check_complete` and `moon_clearance_complete` are each overwritten, in
    that order, on the DataFrame that was passed in; that same DataFrame is
    returned.
    """
    for flag_column in ("d_check_complete", "moon_clearance_complete"):
        shuttles[flag_column] = _is_true(shuttles[flag_column])
    return shuttles

def create_model_input_table(
    shuttles: pd.DataFrame, companies: pd.DataFrame,
) -> pd.DataFrame:
    """Join shuttles to companies and keep only fully populated rows.

    The shuttles' own `id` column is removed before the join so that the
    merge key is `company_id` on the left and `id` on the right. Rows with
    any missing value are dropped from the joined table.
    """
    shuttles_without_id = shuttles.drop(columns="id")
    joined = pd.merge(
        shuttles_without_id,
        companies,
        left_on="company_id",
        right_on="id",
    )
    return joined.dropna()
16 changes: 16 additions & 0 deletions docs/code-comparisons/_kedro_snippets/kedro_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# run.py
import pandas as pd
from kedro.io import DataCatalog
from kedro.runner import SequentialRunner
from pipeline import create_pipeline
# ^ from Step 2

# `to_nodes` selects by node *name*. pipeline.py registers the final node as
# "create_model_input_table_node", so that exact name must be used here.
pipeline = create_pipeline().to_nodes("create_model_input_table_node")

# hand the two input tables to the catalog under the names the pipeline's
# nodes declare as inputs
catalog = DataCatalog(
    feed_dict=dict(
        companies=pd.read_parquet("path/to/companies.parquet"),
        shuttles=pd.read_parquet("path/to/shuttles.parquet"),
    ),
)
SequentialRunner().run(pipeline, catalog)
# doesn't return a value in-memory
1 change: 1 addition & 0 deletions docs/code-comparisons/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Code Comparisons
This section showcases what Hamilton code looks like in comparison to other popular libraries and frameworks.

.. toctree::
kedro
dagster
langchain
airflow
Loading