Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default source nodes rendering #1107

Merged
merged 34 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
9d4c2cf
Add default source nodes rendering
arojasb3 Jul 19, 2024
d925d40
Update ResourceType reference with constant
arojasb3 Jul 19, 2024
070b4d5
Add MixIn Operator + freshness template rendering
arojasb3 Jul 21, 2024
021a834
Add render_source_nodes to settings.py
arojasb3 Jul 22, 2024
56a77ed
update jaffle shop source with tests
arojasb3 Jul 22, 2024
6c208c1
Add source node rendering param to render config
arojasb3 Jul 25, 2024
8ad040c
Update existing tests with new source rendering
arojasb3 Jul 26, 2024
395ecc6
Update project hash in tests
arojasb3 Jul 26, 2024
e9e3793
Merge branch 'main' into add-source-nodes
arojasb3 Jul 29, 2024
4fcb9a2
update extra context after merge
arojasb3 Jul 29, 2024
760d3f8
update test hash after merge
arojasb3 Jul 29, 2024
9c42e0d
Merge branch 'main' into add-source-nodes
arojasb3 Jul 30, 2024
91ae0f4
adding quotes in ls command + source test
arojasb3 Aug 2, 2024
734666a
Merge branch 'main' into add-source-nodes
arojasb3 Aug 2, 2024
d8983f8
Update freshness param in source tests
arojasb3 Aug 2, 2024
203c534
update dbt ls command for 1.5.4
arojasb3 Aug 6, 2024
d788c04
Merge branch 'main' into add-source-nodes
arojasb3 Aug 6, 2024
6465a6f
Merge branch 'main' into add-source-nodes
arojasb3 Aug 9, 2024
093b6e5
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Aug 9, 2024
41546f0
Change default source rendering to None
arojasb3 Aug 9, 2024
5a062e2
Merge branch 'add-source-nodes' of github.com:arojasb3/astronomer-cos…
arojasb3 Aug 9, 2024
4866f61
Update project hash
arojasb3 Aug 9, 2024
ce6da81
Add suggestions + workflow env variables
arojasb3 Aug 13, 2024
dab5229
Run ci on add-source-nodes branch
pankajastro Aug 13, 2024
d6ad582
Run ci on add-source-nodes branch
pankajastro Aug 13, 2024
1dd4b82
intentionally fail the sql job
pankajastro Aug 13, 2024
1c6ae65
Revert env changes
pankajastro Aug 13, 2024
59574fc
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Aug 13, 2024
ff65e5b
Merge branch 'main' into add-source-nodes
pankajastro Aug 13, 2024
ad0e0c2
Add tests
pankajastro Aug 13, 2024
b0b4020
Add test
pankajastro Aug 13, 2024
7fd92ab
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Aug 13, 2024
96492c8
Revert source render param
pankajastro Aug 13, 2024
bacd19d
Add no cover
pankajastro Aug 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ jobs:
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432
SOURCE_RENDERING_BEHAVIOR: all

- name: Upload coverage to Github
uses: actions/upload-artifact@v2
Expand Down Expand Up @@ -234,6 +235,7 @@ jobs:
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432
SOURCE_RENDERING_BEHAVIOR: all

- name: Upload coverage to Github
uses: actions/upload-artifact@v2
Expand Down Expand Up @@ -377,6 +379,7 @@ jobs:
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432
SOURCE_RENDERING_BEHAVIOR: all

- name: Upload coverage to Github
uses: actions/upload-artifact@v2
Expand Down
34 changes: 32 additions & 2 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
TESTABLE_DBT_RESOURCES,
DbtResourceType,
ExecutionMode,
SourceRenderingBehavior,
TestBehavior,
TestIndirectSelection,
)
Expand Down Expand Up @@ -127,7 +128,11 @@


def create_task_metadata(
node: DbtNode, execution_mode: ExecutionMode, args: dict[str, Any], use_task_group: bool = False
node: DbtNode,
execution_mode: ExecutionMode,
args: dict[str, Any],
use_task_group: bool = False,
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE,
) -> TaskMetadata | None:
"""
Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node.
Expand All @@ -145,6 +150,7 @@
DbtResourceType.SNAPSHOT: "DbtSnapshot",
DbtResourceType.SEED: "DbtSeed",
DbtResourceType.TEST: "DbtTest",
DbtResourceType.SOURCE: "DbtSource",
}
args = {**args, **{"models": node.resource_name}}

Expand All @@ -154,6 +160,23 @@
task_id = f"{node.name}_run"
if use_task_group is True:
task_id = "run"
elif node.resource_type == DbtResourceType.SOURCE:
if (source_rendering_behavior == SourceRenderingBehavior.NONE) or (
source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS
and node.has_freshness is False
and node.has_test is False
):
return None
# TODO: https://github.com/astronomer/astronomer-cosmos
# pragma: no cover
task_id = f"{node.name}_source"
args["select"] = f"source:{node.resource_name}"
args.pop("models")
if use_task_group is True:
task_id = node.resource_type.value
Comment on lines +175 to +176
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep only this and remove the lines 182-183, that contain the same?

            if use_task_group is True:
                task_id = node.resource_type.value

if node.has_freshness is False and source_rendering_behavior == SourceRenderingBehavior.ALL:

Check warning on line 177 in cosmos/airflow/graph.py

View check run for this annotation

Codecov / codecov/patch

cosmos/airflow/graph.py#L172-L177

Added lines #L172 - L177 were not covered by tests
# render sources without freshness as empty operators
return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator")

Check warning on line 179 in cosmos/airflow/graph.py

View check run for this annotation

Codecov / codecov/patch

cosmos/airflow/graph.py#L179

Added line #L179 was not covered by tests
else:
task_id = f"{node.name}_{node.resource_type.value}"
if use_task_group is True:
Expand Down Expand Up @@ -185,6 +208,7 @@
execution_mode: ExecutionMode,
task_args: dict[str, Any],
test_behavior: TestBehavior,
source_rendering_behavior: SourceRenderingBehavior,
test_indirect_selection: TestIndirectSelection,
on_warning_callback: Callable[..., Any] | None,
**kwargs: Any,
Expand All @@ -198,7 +222,11 @@
)

task_meta = create_task_metadata(
node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group
node=node,
execution_mode=execution_mode,
args=task_args,
use_task_group=use_task_group,
source_rendering_behavior=source_rendering_behavior,
)

# In most cases, we'll map one DBT node to one Airflow task
Expand Down Expand Up @@ -260,6 +288,7 @@
"""
node_converters = render_config.node_converters or {}
test_behavior = render_config.test_behavior
source_rendering_behavior = render_config.source_rendering_behavior
tasks_map = {}
task_or_group: TaskGroup | BaseOperator

Expand All @@ -278,6 +307,7 @@
execution_mode=execution_mode,
task_args=task_args,
test_behavior=test_behavior,
source_rendering_behavior=source_rendering_behavior,
test_indirect_selection=test_indirect_selection,
on_warning_callback=on_warning_callback,
node=node,
Expand Down
3 changes: 3 additions & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
ExecutionMode,
InvocationMode,
LoadMode,
SourceRenderingBehavior,
TestBehavior,
TestIndirectSelection,
)
Expand Down Expand Up @@ -59,6 +60,7 @@ class RenderConfig:
:param dbt_project_path: Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``.
:param dbt_ls_path: Configures the location of an output of ``dbt ls``. Required when using ``load_method=LoadMode.DBT_LS_FILE``.
:param enable_mock_profile: Allows to enable/disable mocking profile. Enabled by default. Mock profiles are useful for parsing Cosmos DAGs in the CI, but should be disabled to benefit from partial parsing (since Cosmos 1.4).
:param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6).
"""

emit_datasets: bool = True
Expand All @@ -75,6 +77,7 @@ class RenderConfig:
dbt_ls_path: Path | None = None
project_path: Path | None = field(init=False)
enable_mock_profile: bool = True
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE
airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
Expand Down
10 changes: 10 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ class TestIndirectSelection(Enum):
EMPTY = "empty"


class SourceRenderingBehavior(Enum):
"""
Modes to configure the source rendering behavior.
"""

NONE = "none"
ALL = "all"
WITH_TESTS_OR_FRESHNESS = "with_tests_or_freshness"


class DbtResourceType(aenum.Enum): # type: ignore
"""
Type of dbt node.
Expand Down
2 changes: 1 addition & 1 deletion cosmos/core/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def get_airflow_task(task: Task, dag: DAG, task_group: "TaskGroup | None" = None
dag=dag,
task_group=task_group,
owner=task_owner,
extra_context=task.extra_context,
**({} if class_name == "EmptyOperator" else {"extra_context": task.extra_context}),
**task.arguments,
)

Expand Down
59 changes: 56 additions & 3 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from functools import cached_property
from pathlib import Path
from subprocess import PIPE, Popen
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Optional

from airflow.models import Variable

Expand All @@ -33,6 +33,7 @@
DbtResourceType,
ExecutionMode,
LoadMode,
SourceRenderingBehavior,
)
from cosmos.dbt.parser.project import LegacyDbtProject
from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path, has_non_empty_dependencies_file
Expand All @@ -53,7 +54,7 @@ class CosmosLoadDbtException(Exception):
@dataclass
class DbtNode:
"""
Metadata related to a dbt node (e.g. model, seed, snapshot).
Metadata related to a dbt node (e.g. model, seed, snapshot, source).
"""

unique_id: str
Expand All @@ -62,6 +63,7 @@ class DbtNode:
file_path: Path
tags: list[str] = field(default_factory=lambda: [])
config: dict[str, Any] = field(default_factory=lambda: {})
has_freshness: bool = False
has_test: bool = False

@property
Expand Down Expand Up @@ -104,6 +106,30 @@ def context_dict(self) -> dict[str, Any]:
}


def is_freshness_effective(freshness: Optional[dict[str, Any]]) -> bool:
"""Function to find if a source has null freshness. Scenarios where freshness
looks like:
"freshness": {
"warn_after": {
"count": null,
"period": null
},
"error_after": {
"count": null,
"period": null
},
"filter": null
}
should be considered as null, this function ensures that."""
if freshness is None:
return False
for _, value in freshness.items():
if isinstance(value, dict):
if any(subvalue is not None for subvalue in value.values()):
return True
return False


def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str:
"""Run a command in a subprocess, returning the stdout."""
logger.info("Running command: `%s`", " ".join(command))
Expand Down Expand Up @@ -147,6 +173,11 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str,
file_path=project_path / node_dict["original_file_path"],
tags=node_dict.get("tags", []),
config=node_dict.get("config", {}),
has_freshness=(
is_freshness_effective(node_dict.get("freshness"))
if DbtResourceType(node_dict["resource_type"]) == DbtResourceType.SOURCE
else False
),
)
nodes[node.unique_id] = node
logger.debug("Parsed dbt resource `%s` of type `%s`", node.unique_id, node.resource_type)
Expand Down Expand Up @@ -353,7 +384,24 @@ def run_dbt_ls(
self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]
) -> dict[str, DbtNode]:
"""Runs dbt ls command and returns the parsed nodes."""
ls_command = [dbt_cmd, "ls", "--output", "json"]
if self.render_config.source_rendering_behavior != SourceRenderingBehavior.NONE:
ls_command = [
dbt_cmd,
"ls",
"--output",
"json",
"--output-keys",
"name",
"unique_id",
"resource_type",
"depends_on",
"original_file_path",
"tags",
"config",
"freshness",
]
else:
ls_command = [dbt_cmd, "ls", "--output", "json"]

ls_args = self.dbt_ls_args
ls_command.extend(self.local_flags)
Expand Down Expand Up @@ -636,6 +684,11 @@ def load_from_dbt_manifest(self) -> None:
file_path=self.execution_config.project_path / Path(node_dict["original_file_path"]),
tags=node_dict["tags"],
config=node_dict["config"],
has_freshness=(
is_freshness_effective(node_dict.get("freshness"))
if DbtResourceType(node_dict["resource_type"]) == DbtResourceType.SOURCE
else False
),
)

nodes[node.unique_id] = node
Expand Down
7 changes: 7 additions & 0 deletions cosmos/operators/aws_eks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
DbtRunOperationKubernetesOperator,
DbtSeedKubernetesOperator,
DbtSnapshotKubernetesOperator,
DbtSourceKubernetesOperator,
DbtTestKubernetesOperator,
)

Expand Down Expand Up @@ -101,6 +102,12 @@ class DbtSnapshotAwsEksOperator(DbtAwsEksBaseOperator, DbtSnapshotKubernetesOper
"""


class DbtSourceAzureContainerInstanceOperator(DbtAwsEksBaseOperator, DbtSourceKubernetesOperator):
"""
Executes a dbt source freshness command.
"""


class DbtRunAwsEksOperator(DbtAwsEksBaseOperator, DbtRunKubernetesOperator):
"""
Executes a dbt core run command.
Expand Down
7 changes: 7 additions & 0 deletions cosmos/operators/azure_container_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
DbtRunOperationMixin,
DbtSeedMixin,
DbtSnapshotMixin,
DbtSourceMixin,
DbtTestMixin,
)

Expand Down Expand Up @@ -102,6 +103,12 @@ class DbtSnapshotAzureContainerInstanceOperator(DbtSnapshotMixin, DbtAzureContai
"""


class DbtSourceAzureContainerInstanceOperator(DbtSourceMixin, DbtAzureContainerInstanceBaseOperator):
"""
Executes a dbt source freshness command.
"""


class DbtRunAzureContainerInstanceOperator(DbtRunMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore
"""
Executes a dbt core run command.
Expand Down
9 changes: 9 additions & 0 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,15 @@ class DbtSnapshotMixin:
ui_color = "#964B00"


class DbtSourceMixin:
"""
Executes a dbt source freshness command.
"""

base_cmd = ["source", "freshness"]
ui_color = "#34CCEB"


class DbtRunMixin:
"""
Mixin for dbt run command.
Expand Down
7 changes: 7 additions & 0 deletions cosmos/operators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
DbtRunOperationMixin,
DbtSeedMixin,
DbtSnapshotMixin,
DbtSourceMixin,
DbtTestMixin,
)

Expand Down Expand Up @@ -94,6 +95,12 @@ class DbtSnapshotDockerOperator(DbtSnapshotMixin, DbtDockerBaseOperator):
"""


class DbtSourceDockerOperator(DbtSourceMixin, DbtDockerBaseOperator):
"""
Executes a dbt source freshness command.
"""


class DbtRunDockerOperator(DbtRunMixin, DbtDockerBaseOperator):
"""
Executes a dbt core run command.
Expand Down
7 changes: 7 additions & 0 deletions cosmos/operators/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
DbtRunOperationMixin,
DbtSeedMixin,
DbtSnapshotMixin,
DbtSourceMixin,
DbtTestMixin,
)

Expand Down Expand Up @@ -125,6 +126,12 @@ class DbtSnapshotKubernetesOperator(DbtSnapshotMixin, DbtKubernetesBaseOperator)
"""


class DbtSourceKubernetesOperator(DbtSourceMixin, DbtKubernetesBaseOperator):
"""
Executes a dbt source freshness command.
"""


class DbtRunKubernetesOperator(DbtRunMixin, DbtKubernetesBaseOperator):
"""
Executes a dbt core run command.
Expand Down
Loading
Loading