Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use better kind API #129

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions hooli-data-ingest/hooli_data_ingest/assets/sling.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ def get_group_name(self, stream_definition):

def get_tags(self, stream_definition):
# derive the storage kind from the target set in the replication_config
storage_kind = self.replication_config.get("target", "DUCKDB")
if storage_kind.startswith("SNOWFLAKE"):
storage_kind = "SNOWFLAKE"
return {**build_kind_tag(storage_kind)}
kinds = self.replication_config.get("target", "DUCKDB")
if kinds.startswith("SNOWFLAKE"):
kinds = "SNOWFLAKE"
return {**build_kind_tag(kinds)}


@sling_assets(
Expand Down
15 changes: 3 additions & 12 deletions hooli_basics/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
from sklearn.linear_model import LinearRegression as Regression

@asset(
tags={
**build_kind_tag("Kubernetes"),
**build_kind_tag("S3"),
},
kinds={"Kubernetes", "S3"},
)
def country_stats() -> DataFrame:
df = read_html("https://tinyurl.com/mry64ebh", flavor='html5lib')[0]
Expand All @@ -40,21 +37,15 @@ def check_country_stats(country_stats):
return AssetCheckResult(passed=True)

@asset(
tags={
**build_kind_tag("Kubernetes"),
**build_kind_tag("S3"),
},
kinds={"Kubernetes", "S3"}
)
def change_model(country_stats: DataFrame) -> Regression:
data = country_stats.dropna(subset=["pop_change"])
dummies = get_dummies(data[["continent"]])
return Regression().fit(dummies, data["pop_change"])

@asset(
tags={
**build_kind_tag("Kubernetes"),
**build_kind_tag("S3"),
},
kinds={"Kubernetes", "S3"}
)
def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataFrame:
result = country_stats.groupby("continent").sum()
Expand Down
5 changes: 1 addition & 4 deletions hooli_batch_enrichment/dagster_batch_enrichment/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ class experimentConfig(Config):
)

@asset(
tags={
**build_kind_tag("Kubernetes"),
**build_kind_tag("S3"),
},
kinds={"Kubernetes", "S3"},
)
def raw_data(
context: OpExecutionContext,
Expand Down
32 changes: 8 additions & 24 deletions hooli_data_eng/assets/forecasting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
from databricks.sdk.service import jobs
from pydantic import Field

from hooli_data_eng.utils.storage_kind_helpers import get_storage_kind
from hooli_data_eng.utils.kind_helpers import get_kind


# dynamically determine storage_kind based on environment
storage_kind = get_storage_kind()
storage_kind = get_kind()


def model_func(x, a, b):
Expand Down Expand Up @@ -63,10 +63,7 @@ class modelHyperParams(Config):
@asset(
ins={"weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"])},
io_manager_key="model_io_manager",
tags={
**build_kind_tag("scikitlearn"),
**build_kind_tag("s3"),
},
kinds={"scikitlearn", "S3"}
)
def order_forecast_model(
context, weekly_order_summary: pd.DataFrame, config: modelHyperParams
Expand Down Expand Up @@ -102,9 +99,8 @@ def order_forecast_model(
partitions_def=MonthlyPartitionsDefinition(start_date="2022-01-01"),
tags={
"core_kpis":"",
**build_kind_tag("scikitlearn"),
**build_kind_tag(storage_kind),
},
kinds={"scikitlearn", storage_kind}
)
def model_stats_by_month(
context,
Expand Down Expand Up @@ -142,10 +138,7 @@ def model_stats_by_month(
"order_forecast_model": AssetIn(),
},
key_prefix=["FORECASTING"],
tags={
**build_kind_tag("pandas"),
**build_kind_tag(storage_kind),
},
kinds={"pandas", storage_kind}
)
def predicted_orders(
weekly_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float]
Expand All @@ -172,10 +165,7 @@ def predicted_orders(
key_prefix=["FORECASTING"],
required_resource_keys={"step_launcher", "pyspark"},
metadata={"resource_constrained_at": 50},
tags={
**build_kind_tag("pyspark"),
**build_kind_tag("databricks"),
},
kinds={"pyspark", "databricks"},
)
def big_orders(context, predicted_orders: pd.DataFrame):
"""Days where predicted orders surpass our current carrying capacity"""
Expand Down Expand Up @@ -204,10 +194,7 @@ def big_orders(context, predicted_orders: pd.DataFrame):
# or use that upstream Snowflake table, it is used here for illustrative purposes
@asset(
deps=[predicted_orders],
tags={
**build_kind_tag("pyspark"),
**build_kind_tag("databricks"),
},
kinds={"pyspark", "databricks"},
)
def databricks_asset(
context: AssetExecutionContext,
Expand Down Expand Up @@ -255,10 +242,7 @@ def databricks_asset(
# or use that upstream Snowflake table, it is used here for illustrative purposes
@asset(
deps=[predicted_orders],
tags={
**build_kind_tag("kubernetes"),
**build_kind_tag("S3"),
},
kinds={"kubernetes", "S3"},
)
def k8s_pod_asset(
context: AssetExecutionContext,
Expand Down
25 changes: 8 additions & 17 deletions hooli_data_eng/assets/marketing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@
AssetKey,
define_asset_job,
ScheduleDefinition,
AssetSelection
)
from dagster._core.definitions.tags import build_kind_tag
AssetSelection,
EnvVar,)
from dagster_snowflake import SnowflakeResource
from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks
import pandas as pd
from hooli_data_eng.utils.storage_kind_helpers import get_storage_kind
from hooli_data_eng.utils.kind_helpers import get_kind


# dynamically determine storage_kind based on environment
storage_kind = get_storage_kind()
storage_kind = get_kind()


# These assets take data from a SQL table managed by
Expand All @@ -33,10 +33,7 @@
automation_condition=AutomationCondition.on_cron('0 0 1-31/2 * *'),
owners=["team:programmers", "lopp@dagsterlabs.com"],
ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])},
tags={
**build_kind_tag("pandas"),
**build_kind_tag(storage_kind),
},
kinds={"pandas", storage_kind},
)
def avg_orders(
context: AssetExecutionContext, company_perf: pd.DataFrame
Expand All @@ -61,10 +58,7 @@ def check_avg_orders(context, avg_orders: pd.DataFrame):
key_prefix="MARKETING",
owners=["team:programmers"],
ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])},
tags={
**build_kind_tag("pandas"),
**build_kind_tag(storage_kind),
},
kinds={"pandas", storage_kind},
)
def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame:
"""Computes min order KPI"""
Expand All @@ -83,10 +77,7 @@ def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame:
io_manager_key="model_io_manager",
key_prefix="MARKETING",
ins={"sku_stats": AssetIn(key_prefix=["ANALYTICS"])},
tags={
**build_kind_tag("hex"),
**build_kind_tag("s3"),
},
kinds={"hex", "s3"},
)
def key_product_deepdive(context, sku_stats):
"""Creates a file for a BI tool based on the current quarters top product, represented as a dynamic partition"""
Expand Down
14 changes: 4 additions & 10 deletions hooli_data_eng/assets/raw_data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import pandas as pd

from hooli_data_eng.resources.api import RawDataAPI
from hooli_data_eng.utils.storage_kind_helpers import get_storage_kind
from hooli_data_eng.utils.kind_helpers import get_kind


# dynamically determine storage_kind based on environment
storage_kind = get_storage_kind()
storage_kind = get_kind()


daily_partitions = DailyPartitionsDefinition(
Expand All @@ -43,10 +43,7 @@ def _daily_partition_seq(start, end):
partitions_def=daily_partitions,
metadata={"partition_expr": "created_at"},
backfill_policy=BackfillPolicy.single_run(),
tags={"core_kpis":"",
**build_kind_tag("api"),
**build_kind_tag(storage_kind),
},
kinds={"api", storage_kind},
)
def users(context, api: RawDataAPI) -> pd.DataFrame:
"""A table containing all users data"""
Expand Down Expand Up @@ -92,10 +89,7 @@ def check_users(context, users: pd.DataFrame):
jitter=Jitter.FULL
),
backfill_policy=BackfillPolicy.single_run(),
tags={
**build_kind_tag("api"),
**build_kind_tag(storage_kind),
},
kinds={"api", storage_kind},
)
def orders(context, api: RawDataAPI) -> pd.DataFrame:
"""A table containing all orders that have been placed"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os

def get_storage_kind() -> str:
def get_kind() -> str:
"""
Determine the storage kind based on the environment.

Expand Down
Loading