diff --git a/warehouse/oso_dagster/assets/ossd.py b/warehouse/oso_dagster/assets/ossd.py index 9d5c3e07..e9b4e28f 100644 --- a/warehouse/oso_dagster/assets/ossd.py +++ b/warehouse/oso_dagster/assets/ossd.py @@ -1,26 +1,30 @@ -from typing import cast, Optional, Dict +import typing as t +import arrow +import polars as pl from dagster import ( - multi_asset, - Output, - AssetOut, AssetExecutionContext, - JsonMetadataValue, - Config, AssetIn, + AssetOut, + AssetsDefinition, + AssetSelection, + Config, + JsonMetadataValue, + Output, + define_asset_job, + multi_asset, ) -from ossdirectory import fetch_data -from ossdirectory.fetch import OSSDirectory -import polars as pl -import arrow - from oso_dagster.dlt_sources.github_repos import ( oss_directory_github_repositories_resource, ) from oso_dagster.factories import dlt_factory +from oso_dagster.factories.common import AssetFactoryResponse +from oso_dagster.factories.jobs import discoverable_jobs from oso_dagster.utils import secret_ref_arg +from ossdirectory import fetch_data +from ossdirectory.fetch import OSSDirectory -common_tags: Dict[str, str] = { +common_tags: t.Dict[str, str] = { "opensource.observer/environment": "production", "opensource.observer/group": "ossd", "opensource.observer/type": "source", @@ -35,7 +39,7 @@ class OSSDirectoryConfig(Config): force_write: bool = False -def oss_directory_to_dataframe(output: str, data: Optional[OSSDirectory] = None): +def oss_directory_to_dataframe(output: str, data: t.Optional[OSSDirectory] = None): if not data: data = fetch_data() assert data.meta is not None @@ -97,8 +101,8 @@ def projects_and_collections( "repo_meta", {} ) if repo_meta: - repo_meta = cast(JsonMetadataValue, repo_meta) - repo_meta_dict = cast(dict, repo_meta.data) + repo_meta = t.cast(JsonMetadataValue, repo_meta) + repo_meta_dict = t.cast(dict, repo_meta.data) context.log.debug( { "message": "repo_meta", @@ -141,3 +145,15 @@ def repositories( gh_token: str = secret_ref_arg(group_name="ossd", key="github_token"), ): yield oss_directory_github_repositories_resource(projects_df, gh_token) + + +@discoverable_jobs(dependencies=[repositories]) +def ossd_jobs(dependencies: t.List[AssetFactoryResponse]): + repositories = t.cast(AssetsDefinition, list(dependencies[0].assets)[0]) + return [ + define_asset_job( + name="oss_directory_sync", + selection=AssetSelection.assets(projects_and_collections) + | AssetSelection.assets(repositories), + ) + ] diff --git a/warehouse/oso_dagster/factories/common.py b/warehouse/oso_dagster/factories/common.py index e3cafe88..d460812b 100644 --- a/warehouse/oso_dagster/factories/common.py +++ b/warehouse/oso_dagster/factories/common.py @@ -1,32 +1,32 @@ -import logging import inspect -from typing import List, Iterable, Union, Callable, Any, Dict, Optional, cast +import logging +import typing as t from dataclasses import dataclass, field from dagster import ( - SensorDefinition, + AssetChecksDefinition, AssetsDefinition, JobDefinition, - AssetChecksDefinition, + SensorDefinition, SourceAsset, ) +from dagster._core.definitions.asset_dep import CoercibleToAssetDep +from dagster._core.definitions.asset_key import CoercibleToAssetKeyPrefix # This import is fragile but it can't be helped for the current typing. # Continuous deployment will have to save us here. from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition -from dagster._core.definitions.asset_dep import CoercibleToAssetDep -from dagster._core.definitions.asset_key import CoercibleToAssetKeyPrefix from dagster._core.definitions.unresolved_asset_job_definition import ( UnresolvedAssetJobDefinition, ) - from oso_dagster import constants -type GenericAsset = Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition] -type NonCacheableAssetsDefinition = Union[AssetsDefinition, SourceAsset] -type AssetList = Iterable[GenericAsset] -type AssetDeps = Iterable[CoercibleToAssetDep] +type GenericAsset = t.Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition] +type NonCacheableAssetsDefinition = t.Union[AssetsDefinition, SourceAsset] +type AssetList = t.Iterable[GenericAsset] +type AssetDeps = t.Iterable[CoercibleToAssetDep] type AssetKeyPrefixParam = CoercibleToAssetKeyPrefix +type FactoryJobDefinition = JobDefinition | UnresolvedAssetJobDefinition logger = logging.getLogger(__name__) @@ -42,11 +42,9 @@ def sync(self): @dataclass class AssetFactoryResponse: assets: AssetList - sensors: List[SensorDefinition] = field(default_factory=lambda: []) - jobs: List[JobDefinition | UnresolvedAssetJobDefinition] = field( - default_factory=lambda: [] - ) - checks: List[AssetChecksDefinition] = field(default_factory=lambda: []) + sensors: t.List[SensorDefinition] = field(default_factory=lambda: []) + jobs: t.List[FactoryJobDefinition] = field(default_factory=lambda: []) + checks: t.List[AssetChecksDefinition] = field(default_factory=lambda: []) def __add__(self, other: "AssetFactoryResponse") -> "AssetFactoryResponse": return AssetFactoryResponse( @@ -57,13 +55,13 @@ def __add__(self, other: "AssetFactoryResponse") -> "AssetFactoryResponse": ) def filter_assets( - self, f: Callable[[NonCacheableAssetsDefinition], bool] - ) -> Iterable[NonCacheableAssetsDefinition]: + self, f: t.Callable[[NonCacheableAssetsDefinition], bool] + ) -> t.Iterable[NonCacheableAssetsDefinition]: """Due to limitations of docs on CacheableAssetsDefinitions, we filter out any CacheableAssetsDefinitions as they cannot be compared against for filtering""" - no_cacheable_assets = cast( - List[NonCacheableAssetsDefinition], + no_cacheable_assets = t.cast( + t.List[NonCacheableAssetsDefinition], filter(lambda a: not isinstance(a, CacheableAssetsDefinition), self.assets), ) return filter(f, no_cacheable_assets) @@ -75,11 +73,11 @@ def filter_assets_by_name(self, name: str): def find_job_by_name( self, name: str - ) -> Optional[Union[JobDefinition, UnresolvedAssetJobDefinition]]: + ) -> t.Optional[t.Union[JobDefinition, UnresolvedAssetJobDefinition]]: return next((job for job in self.jobs if job.name == name), None) -type EarlyResourcesAssetDecoratedFunction[**P] = Callable[ +type EarlyResourcesAssetDecoratedFunction[**P] = t.Callable[ P, AssetFactoryResponse | AssetsDefinition ] @@ -92,17 +90,22 @@ class EarlyResourcesAssetFactory: def __init__( self, f: EarlyResourcesAssetDecoratedFunction, - caller: Optional[inspect.FrameInfo] = None, - additional_annotations: Optional[Dict[str, Any]] = None, + caller: t.Optional[inspect.FrameInfo] = None, + additional_annotations: t.Optional[t.Dict[str, t.Any]] = None, + dependencies: t.Optional[t.List["EarlyResourcesAssetFactory"]] = None, ): self._f = f self._caller = caller self.additional_annotations = additional_annotations or {} + self._dependencies = dependencies or [] - def __call__(self, **early_resources) -> AssetFactoryResponse: + def __call__( + self, dependencies: t.List[AssetFactoryResponse], **early_resources + ) -> AssetFactoryResponse: annotations = self._f.__annotations__.copy() annotations.update(self.additional_annotations) - args: Dict[str, Any] = dict() + early_resources["dependencies"] = dependencies + args: t.Dict[str, t.Any] = dict() for key, value in annotations.items(): if key not in early_resources: raise Exception( @@ -132,15 +135,25 @@ def __call__(self, **early_resources) -> AssetFactoryResponse: else: raise Exception("Invalid early resource factory") + @property + def dependencies(self): + return self._dependencies[:] + def early_resources_asset_factory( - *, caller_depth: int = 1, additional_annotations: Optional[Dict[str, Any]] = None + *, + caller_depth: int = 1, + additional_annotations: t.Optional[t.Dict[str, t.Any]] = None, + dependencies: t.Optional[t.List[EarlyResourcesAssetFactory]] = None, ): caller = inspect.stack()[caller_depth] def _decorator(f: EarlyResourcesAssetDecoratedFunction): return EarlyResourcesAssetFactory( - f, caller=caller, additional_annotations=additional_annotations + f, + caller=caller, + additional_annotations=additional_annotations, + dependencies=dependencies, ) return _decorator diff --git a/warehouse/oso_dagster/factories/dlt.py b/warehouse/oso_dagster/factories/dlt.py index 8a536019..c892b7b7 100644 --- a/warehouse/oso_dagster/factories/dlt.py +++ b/warehouse/oso_dagster/factories/dlt.py @@ -1,45 +1,46 @@ from typing import ( - List, - Dict, Any, + Callable, + Dict, + Iterable, + Iterator, + List, Mapping, MutableMapping, Optional, - Callable, - Iterator, - Iterable, - cast, - Union, Type, + Union, + cast, ) - from uuid import uuid4 + +import dlt as dltlib from dagster import ( - PartitionsDefinition, - asset, - AssetIn, AssetExecutionContext, - MaterializeResult, - Config, + AssetIn, AssetMaterialization, + Config, + MaterializeResult, + PartitionsDefinition, + asset, define_asset_job, ) -import dlt as dltlib -from dlt.sources import DltResource +from dagster_embedded_elt.dlt import DagsterDltResource from dlt.common.destination import Destination from dlt.common.libs.pydantic import pydantic_to_table_schema_columns -from dagster_embedded_elt.dlt import DagsterDltResource -from pydantic import Field, BaseModel +from dlt.sources import DltResource +from pydantic import BaseModel, Field +from .. import constants +from ..utils import SecretResolver, resolve_secrets_for_func from .common import ( AssetDeps, + AssetFactoryResponse, AssetKeyPrefixParam, + EarlyResourcesAssetFactory, early_resources_asset_factory, - AssetFactoryResponse, ) -from ..utils import SecretResolver, resolve_secrets_for_func from .sql import PrefixedDltTranslator -from .. import constants class DltAssetConfig(Config): @@ -91,7 +92,9 @@ def dlt_factory( key_prefix_str = "_".join(key_prefix) dataset_name = dataset_name or key_prefix_str - def _decorator(f: Callable[..., Iterator[DltResource]]): + def _decorator( + f: Callable[..., Iterator[DltResource]] + ) -> EarlyResourcesAssetFactory: asset_name = name or f.__name__ @early_resources_asset_factory(caller_depth=2) diff --git a/warehouse/oso_dagster/factories/jobs.py b/warehouse/oso_dagster/factories/jobs.py new file mode 100644 index 00000000..84c4874d --- /dev/null +++ b/warehouse/oso_dagster/factories/jobs.py @@ -0,0 +1,32 @@ +import typing as t + +from oso_dagster.factories.common import ( + AssetFactoryResponse, + EarlyResourcesAssetFactory, + FactoryJobDefinition, + early_resources_asset_factory, +) + + +def discoverable_jobs(dependencies: t.Optional[t.List[t.Any]] = None): + """A decorator for defining a set of automatically loaded jobs. + + This does this by generating an AssetFactoryResponse with the jobs + configured. This is useful if you need to create jobs that span multiple + assets that aren't all created from a single factory""" + dependencies = dependencies or [] + for dep in dependencies: + assert isinstance(dep, EarlyResourcesAssetFactory) + + def _decorated(f: t.Callable[..., t.Iterable[FactoryJobDefinition]]): + @early_resources_asset_factory(caller_depth=2, dependencies=dependencies) + def _jobs(dependencies: t.List[AssetFactoryResponse]): + if dependencies: + jobs = list(f(dependencies=dependencies)) + else: + jobs = list(f()) + return AssetFactoryResponse(assets=[], jobs=jobs) + + return _jobs + + return _decorated diff --git a/warehouse/oso_dagster/factories/loader.py b/warehouse/oso_dagster/factories/loader.py index 5982ea95..155f7fa9 100644 --- a/warehouse/oso_dagster/factories/loader.py +++ b/warehouse/oso_dagster/factories/loader.py @@ -1,43 +1,83 @@ -from typing import List, Dict, Any -from types import ModuleType import importlib import pkgutil +import typing as t +from graphlib import TopologicalSorter +from types import ModuleType -from dagster import ( - load_assets_from_modules, -) +from dagster import load_assets_from_modules from .common import AssetFactoryResponse, EarlyResourcesAssetFactory +class EarlyResourcesAssetFactoryDAG: + def __init__(self): + self._graph: t.Dict[ + EarlyResourcesAssetFactory, t.Set[EarlyResourcesAssetFactory] + ] = {} + self._sorted: ( + t.List[ + t.Tuple[EarlyResourcesAssetFactory, t.Set[EarlyResourcesAssetFactory]] + ] + | None + ) = None + + def add(self, resource_factory: EarlyResourcesAssetFactory): + self._graph[resource_factory] = set(resource_factory.dependencies) + self._sorted = None + + def sorted(self): + if not self._sorted: + sorter = TopologicalSorter(self._graph) + sorted = sorter.static_order() + self._sorted = [] + for factory in sorted: + self._sorted.append((factory, self._graph[factory])) + + return self._sorted + + def load_all_assets_from_package( - package: ModuleType, early_resources: Dict[str, Any] + package: ModuleType, early_resources: t.Dict[str, t.Any] ) -> AssetFactoryResponse: """Loads all assets and factories from a given package and any submodules it may have""" package_path = package.__path__ - modules: List[ModuleType] = [] + modules: t.List[ModuleType] = [] + early_resources_dag: EarlyResourcesAssetFactoryDAG = EarlyResourcesAssetFactoryDAG() for module_info in pkgutil.walk_packages(package_path, package.__name__ + "."): module_name = module_info.name module = importlib.import_module(module_name) modules.append(module) - factories = load_assets_factories_from_modules(modules, early_resources) + factories = load_assets_factories_from_modules(modules, early_resources_dag) + + resolved_factories: t.Dict[EarlyResourcesAssetFactory, AssetFactoryResponse] = {} + + # Resolve all early factories in topological order + for early_factory, deps in early_resources_dag.sorted(): + resolved_deps = [resolved_factories[factory] for factory in deps] + + resp = early_factory(dependencies=resolved_deps, **early_resources) + + resolved_factories[early_factory] = resp + factories = factories + resp + asset_defs = load_assets_from_modules(modules) return factories + AssetFactoryResponse(asset_defs) def load_assets_factories_from_modules( - modules: List[ModuleType], - early_resources: Dict[str, Any], + modules: t.List[ModuleType], + dag: EarlyResourcesAssetFactoryDAG, ) -> AssetFactoryResponse: all = AssetFactoryResponse([]) for module in modules: module_dict = module.__dict__.copy() for _, obj in module_dict.items(): if isinstance(obj, EarlyResourcesAssetFactory): - resp = obj(**early_resources) - all = all + resp + # resp = obj(**early_resources) + # all = all + resp + dag.add(obj) elif isinstance(obj, AssetFactoryResponse): all = all + obj return all