Skip to content

Commit

Permalink
Add a way to create discoverable jobs in the asset files (#2153)
Browse files Browse the repository at this point in the history
  • Loading branch information
ravenac95 committed Sep 17, 2024
1 parent 65f2f9f commit 7cf3413
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 76 deletions.
46 changes: 31 additions & 15 deletions warehouse/oso_dagster/assets/ossd.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
from typing import cast, Optional, Dict
import typing as t

import arrow
import polars as pl
from dagster import (
multi_asset,
Output,
AssetOut,
AssetExecutionContext,
JsonMetadataValue,
Config,
AssetIn,
AssetOut,
AssetsDefinition,
AssetSelection,
Config,
JsonMetadataValue,
Output,
define_asset_job,
multi_asset,
)
from ossdirectory import fetch_data
from ossdirectory.fetch import OSSDirectory
import polars as pl
import arrow

from oso_dagster.dlt_sources.github_repos import (
oss_directory_github_repositories_resource,
)
from oso_dagster.factories import dlt_factory
from oso_dagster.factories.common import AssetFactoryResponse
from oso_dagster.factories.jobs import discoverable_jobs
from oso_dagster.utils import secret_ref_arg
from ossdirectory import fetch_data
from ossdirectory.fetch import OSSDirectory

common_tags: Dict[str, str] = {
common_tags: t.Dict[str, str] = {
"opensource.observer/environment": "production",
"opensource.observer/group": "ossd",
"opensource.observer/type": "source",
Expand All @@ -35,7 +39,7 @@ class OSSDirectoryConfig(Config):
force_write: bool = False


def oss_directory_to_dataframe(output: str, data: Optional[OSSDirectory] = None):
def oss_directory_to_dataframe(output: str, data: t.Optional[OSSDirectory] = None):
if not data:
data = fetch_data()
assert data.meta is not None
Expand Down Expand Up @@ -97,8 +101,8 @@ def projects_and_collections(
"repo_meta", {}
)
if repo_meta:
repo_meta = cast(JsonMetadataValue, repo_meta)
repo_meta_dict = cast(dict, repo_meta.data)
repo_meta = t.cast(JsonMetadataValue, repo_meta)
repo_meta_dict = t.cast(dict, repo_meta.data)
context.log.debug(
{
"message": "repo_meta",
Expand Down Expand Up @@ -141,3 +145,15 @@ def repositories(
gh_token: str = secret_ref_arg(group_name="ossd", key="github_token"),
):
yield oss_directory_github_repositories_resource(projects_df, gh_token)


@discoverable_jobs(dependencies=[repositories])
def ossd_jobs(dependencies: t.List[AssetFactoryResponse]):
repositories = t.cast(AssetsDefinition, list(dependencies[0].assets)[0])
return [
define_asset_job(
name="oss_directory_sync",
selection=AssetSelection.assets(projects_and_collections)
| AssetSelection.assets(repositories),
)
]
69 changes: 41 additions & 28 deletions warehouse/oso_dagster/factories/common.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
import logging
import inspect
from typing import List, Iterable, Union, Callable, Any, Dict, Optional, cast
import logging
import typing as t
from dataclasses import dataclass, field

from dagster import (
SensorDefinition,
AssetChecksDefinition,
AssetsDefinition,
JobDefinition,
AssetChecksDefinition,
SensorDefinition,
SourceAsset,
)
from dagster._core.definitions.asset_dep import CoercibleToAssetDep
from dagster._core.definitions.asset_key import CoercibleToAssetKeyPrefix

# This import is fragile but it can't be helped for the current typing.
# Continuous deployment will have to save us here.
from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition
from dagster._core.definitions.asset_dep import CoercibleToAssetDep
from dagster._core.definitions.asset_key import CoercibleToAssetKeyPrefix
from dagster._core.definitions.unresolved_asset_job_definition import (
UnresolvedAssetJobDefinition,
)

from oso_dagster import constants

type GenericAsset = Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]
type NonCacheableAssetsDefinition = Union[AssetsDefinition, SourceAsset]
type AssetList = Iterable[GenericAsset]
type AssetDeps = Iterable[CoercibleToAssetDep]
type GenericAsset = t.Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]
type NonCacheableAssetsDefinition = t.Union[AssetsDefinition, SourceAsset]
type AssetList = t.Iterable[GenericAsset]
type AssetDeps = t.Iterable[CoercibleToAssetDep]
type AssetKeyPrefixParam = CoercibleToAssetKeyPrefix
type FactoryJobDefinition = JobDefinition | UnresolvedAssetJobDefinition

logger = logging.getLogger(__name__)

Expand All @@ -42,11 +42,9 @@ def sync(self):
@dataclass
class AssetFactoryResponse:
assets: AssetList
sensors: List[SensorDefinition] = field(default_factory=lambda: [])
jobs: List[JobDefinition | UnresolvedAssetJobDefinition] = field(
default_factory=lambda: []
)
checks: List[AssetChecksDefinition] = field(default_factory=lambda: [])
sensors: t.List[SensorDefinition] = field(default_factory=lambda: [])
jobs: t.List[FactoryJobDefinition] = field(default_factory=lambda: [])
checks: t.List[AssetChecksDefinition] = field(default_factory=lambda: [])

def __add__(self, other: "AssetFactoryResponse") -> "AssetFactoryResponse":
return AssetFactoryResponse(
Expand All @@ -57,13 +55,13 @@ def __add__(self, other: "AssetFactoryResponse") -> "AssetFactoryResponse":
)

def filter_assets(
self, f: Callable[[NonCacheableAssetsDefinition], bool]
) -> Iterable[NonCacheableAssetsDefinition]:
self, f: t.Callable[[NonCacheableAssetsDefinition], bool]
) -> t.Iterable[NonCacheableAssetsDefinition]:
"""Due to limitations of docs on CacheableAssetsDefinitions, we filter
out any CacheableAssetsDefinitions as they cannot be compared against
for filtering"""
no_cacheable_assets = cast(
List[NonCacheableAssetsDefinition],
no_cacheable_assets = t.cast(
t.List[NonCacheableAssetsDefinition],
filter(lambda a: not isinstance(a, CacheableAssetsDefinition), self.assets),
)
return filter(f, no_cacheable_assets)
Expand All @@ -75,11 +73,11 @@ def filter_assets_by_name(self, name: str):

def find_job_by_name(
self, name: str
) -> Optional[Union[JobDefinition, UnresolvedAssetJobDefinition]]:
) -> t.Optional[t.Union[JobDefinition, UnresolvedAssetJobDefinition]]:
return next((job for job in self.jobs if job.name == name), None)


type EarlyResourcesAssetDecoratedFunction[**P] = Callable[
type EarlyResourcesAssetDecoratedFunction[**P] = t.Callable[
P, AssetFactoryResponse | AssetsDefinition
]

Expand All @@ -92,17 +90,22 @@ class EarlyResourcesAssetFactory:
def __init__(
self,
f: EarlyResourcesAssetDecoratedFunction,
caller: Optional[inspect.FrameInfo] = None,
additional_annotations: Optional[Dict[str, Any]] = None,
caller: t.Optional[inspect.FrameInfo] = None,
additional_annotations: t.Optional[t.Dict[str, t.Any]] = None,
dependencies: t.Optional[t.List["EarlyResourcesAssetFactory"]] = None,
):
self._f = f
self._caller = caller
self.additional_annotations = additional_annotations or {}
self._dependencies = dependencies or []

def __call__(self, **early_resources) -> AssetFactoryResponse:
def __call__(
self, dependencies: t.List[AssetFactoryResponse], **early_resources
) -> AssetFactoryResponse:
annotations = self._f.__annotations__.copy()
annotations.update(self.additional_annotations)
args: Dict[str, Any] = dict()
early_resources["dependencies"] = dependencies
args: t.Dict[str, t.Any] = dict()
for key, value in annotations.items():
if key not in early_resources:
raise Exception(
Expand Down Expand Up @@ -132,15 +135,25 @@ def __call__(self, **early_resources) -> AssetFactoryResponse:
else:
raise Exception("Invalid early resource factory")

@property
def dependencies(self):
return self._dependencies[:]


def early_resources_asset_factory(
*, caller_depth: int = 1, additional_annotations: Optional[Dict[str, Any]] = None
*,
caller_depth: int = 1,
additional_annotations: t.Optional[t.Dict[str, t.Any]] = None,
dependencies: t.Optional[t.List[EarlyResourcesAssetFactory]] = None,
):
caller = inspect.stack()[caller_depth]

def _decorator(f: EarlyResourcesAssetDecoratedFunction):
return EarlyResourcesAssetFactory(
f, caller=caller, additional_annotations=additional_annotations
f,
caller=caller,
additional_annotations=additional_annotations,
dependencies=dependencies,
)

return _decorator
45 changes: 24 additions & 21 deletions warehouse/oso_dagster/factories/dlt.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,46 @@
from typing import (
List,
Dict,
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Mapping,
MutableMapping,
Optional,
Callable,
Iterator,
Iterable,
cast,
Union,
Type,
Union,
cast,
)

from uuid import uuid4

import dlt as dltlib
from dagster import (
PartitionsDefinition,
asset,
AssetIn,
AssetExecutionContext,
MaterializeResult,
Config,
AssetIn,
AssetMaterialization,
Config,
MaterializeResult,
PartitionsDefinition,
asset,
define_asset_job,
)
import dlt as dltlib
from dlt.sources import DltResource
from dagster_embedded_elt.dlt import DagsterDltResource
from dlt.common.destination import Destination
from dlt.common.libs.pydantic import pydantic_to_table_schema_columns
from dagster_embedded_elt.dlt import DagsterDltResource
from pydantic import Field, BaseModel
from dlt.sources import DltResource
from pydantic import BaseModel, Field

from .. import constants
from ..utils import SecretResolver, resolve_secrets_for_func
from .common import (
AssetDeps,
AssetFactoryResponse,
AssetKeyPrefixParam,
EarlyResourcesAssetFactory,
early_resources_asset_factory,
AssetFactoryResponse,
)
from ..utils import SecretResolver, resolve_secrets_for_func
from .sql import PrefixedDltTranslator
from .. import constants


class DltAssetConfig(Config):
Expand Down Expand Up @@ -91,7 +92,9 @@ def dlt_factory(
key_prefix_str = "_".join(key_prefix)
dataset_name = dataset_name or key_prefix_str

def _decorator(f: Callable[..., Iterator[DltResource]]):
def _decorator(
f: Callable[..., Iterator[DltResource]]
) -> EarlyResourcesAssetFactory:
asset_name = name or f.__name__

@early_resources_asset_factory(caller_depth=2)
Expand Down
32 changes: 32 additions & 0 deletions warehouse/oso_dagster/factories/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import typing as t

from oso_dagster.factories.common import (
AssetFactoryResponse,
EarlyResourcesAssetFactory,
FactoryJobDefinition,
early_resources_asset_factory,
)


def discoverable_jobs(dependencies: t.Optional[t.List[t.Any]] = None):
"""A decorator for defining a set of automatically loaded jobs.
This does this by generating an AssetFactoryResponse with the jobs
configured. This is useful if you need to create jobs that span multiple
assets that aren't all created from a single factory"""
dependencies = dependencies or []
for dep in dependencies:
assert isinstance(dep, EarlyResourcesAssetFactory)

def _decorated(f: t.Callable[..., t.Iterable[FactoryJobDefinition]]):
@early_resources_asset_factory(caller_depth=2, dependencies=dependencies)
def _jobs(dependencies: t.List[AssetFactoryResponse]):
if dependencies:
jobs = list(f(dependencies=dependencies))
else:
jobs = list(f())
return AssetFactoryResponse(assets=[], jobs=jobs)

return _jobs

return _decorated
Loading

0 comments on commit 7cf3413

Please sign in to comment.