Create and run accurate SQL statements when using ExecutionMode.AIRFLOW_ASYNC #1474

Merged

merged 42 commits into main from monkeypatch-bq-adapter on Feb 5, 2025

Commits (42)
e657823
Monkeypatch BigQuery adapter for retrieving SQL for async execution
pankajkoti Jan 21, 2025
8b7b45d
Update cosmos/operators/local.py
pankajkoti Jan 21, 2025
e3ea847
Update cosmos/operators/local.py
pankajkoti Jan 21, 2025
8563a8c
Address @tatiana's review feedback
pankajkoti Jan 23, 2025
94eada9
Refactor run_command method to reduce complexity
pankajkoti Jan 24, 2025
92314e8
Resolve type-check errors with respect to updated method signatures
pankajkoti Jan 24, 2025
859f3ad
Fix tests args
pankajkoti Jan 24, 2025
379d997
Test async dag
pankajkoti Jan 24, 2025
7a85d27
Merge branch 'main' into monkeypatch-bq-adapter
pankajkoti Jan 27, 2025
152b936
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Jan 27, 2025
c11f614
Update cosmos/operators/airflow_async.py
pankajkoti Jan 27, 2025
685757d
Update cosmos/operators/airflow_async.py
pankajkoti Jan 27, 2025
d327b6d
Update cosmos/operators/airflow_async.py
pankajkoti Jan 27, 2025
31161bf
Moment of glory
pankajkoti Jan 28, 2025
5ea5217
Moment of glory 2
pankajkoti Jan 29, 2025
dd595f7
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Jan 29, 2025
f6e17a5
push the progress
pankajkoti Jan 30, 2025
e4fc114
Merge branch 'main' into monkeypatch-bq-adapter
pankajkoti Jan 30, 2025
93e7a8c
Stop another call to BaseOperator init
pankajkoti Jan 30, 2025
9fc5112
Fix import
pankajkoti Jan 30, 2025
55acacc
Try changing inheritance order to see if MRO helps
pankajkoti Jan 30, 2025
57ed5a8
Remove compile task test
pankajkoti Jan 31, 2025
df45cbd
Fix tests
pankajkoti Feb 1, 2025
e848fec
Correct asserts for a poorly written test
pankajkoti Feb 1, 2025
fdc1668
Fix a bunch of type-check errors
pankajkoti Feb 1, 2025
9325ac7
fix type check
pankajkoti Feb 2, 2025
b21dabb
Ignore a test for the time being
pankajkoti Feb 2, 2025
d9f4abc
Import annotations from future
pankajkoti Feb 2, 2025
a6209b1
Set log property for abstract class
pankajkoti Feb 2, 2025
b5488ea
Add tests & minor refactorings
pankajkoti Feb 3, 2025
8751ff7
Fix DAG args and mark a test integration due to adapter dependency
pankajkoti Feb 3, 2025
4f59ed1
Remove unused code in airflow_async.py
pankajkoti Feb 3, 2025
d0f9b23
Add some tests
pankajkoti Feb 3, 2025
563f9d9
Merge branch 'main' into monkeypatch-bq-adapter
pankajkoti Feb 3, 2025
089e65e
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Feb 3, 2025
494a641
Add dataset alias support, uncomment commented tests, add few explana…
pankajkoti Feb 3, 2025
f76c954
Update Changelog for 1.9.0a5
pankajkoti Feb 3, 2025
df9ff3f
Merge branch 'main' into monkeypatch-bq-adapter
pankajkoti Feb 3, 2025
5385460
Merge branch 'main' into monkeypatch-bq-adapter
pankajkoti Feb 5, 2025
efba945
Update CHANGELOG.rst
pankajkoti Feb 5, 2025
ffb97f8
Update CHANGELOG.rst
pankajkoti Feb 5, 2025
08f1e85
Update CHANGELOG.rst
pankajkoti Feb 5, 2025
6 changes: 5 additions & 1 deletion CHANGELOG.rst
@@ -1,7 +1,7 @@
Changelog
=========

1.9.0a4 (2025-01-29)
1.9.0a5 (2025-02-03)
--------------------

Breaking changes
@@ -18,6 +18,7 @@ Features
* Allow users to opt-out of ``dbtRunner`` during DAG parsing with ``InvocationMode.SUBPROCESS`` by @tatiana in #1495. Check out the `documentation <https://astronomer.github.io/astronomer-cosmos/configuration/render-config.html#how-to-run-dbt-ls-invocation-mode>`_.
* Add structure to support multiple db for async operator execution by @pankajastro in #1483
* Support overriding the ``profile_config`` per dbt node or folder using config by @tatiana in #1492. More information `here <https://astronomer.github.io/astronomer-cosmos/profiles/#profile-customise-per-node>`_.
* Use dbt to generate the full SQL and support different materializations for BQ for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1474

Bug Fixes

@@ -27,9 +28,12 @@ Enhancement

* Fix OpenLineage deprecation warning by @CorsettiS in #1449
* Move ``DbtRunner`` related functions into ``dbt/runner.py`` module by @tatiana in #1480
* Add ``on_warning_callback`` to ``DbtSourceKubernetesOperator`` and refactor previous operators by @LuigiCerone in #1501


Others

* Ignore dbt package tests when running Cosmos tests by @tatiana in #1502
* GitHub Actions Dependabot: #1487
* Pre-commit updates: #1473, #1493

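To illustrate the feature this changelog entry describes, below is a minimal sketch of a DAG opting into the async execution mode. It is not part of this diff; the project path, profiles file, and schedule are placeholder assumptions, and ExecutionMode.AIRFLOW_ASYNC additionally requires remote_target_path to be configured.

# Hypothetical usage sketch -- paths and names are placeholders.
from datetime import datetime

from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig

example_async_dag = DbtDag(
    dag_id="example_bq_async",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    project_config=ProjectConfig("/path/to/dbt/project"),
    profile_config=ProfileConfig(
        profile_name="cosmos_profile",
        target_name="cosmos_target",
        profiles_yml_filepath="/path/to/profiles.yml",
    ),
    execution_config=ExecutionConfig(execution_mode=ExecutionMode.AIRFLOW_ASYNC),
)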
2 changes: 1 addition & 1 deletion cosmos/__init__.py
@@ -6,7 +6,7 @@
Contains dags, task groups, and operators.
"""

__version__ = "1.9.0a4"
__version__ = "1.9.0a5"


from cosmos.airflow.dag import DbtDag
28 changes: 0 additions & 28 deletions cosmos/airflow/graph.py
@@ -11,7 +11,6 @@

from cosmos.config import RenderConfig
from cosmos.constants import (
    DBT_COMPILE_TASK_ID,
    DEFAULT_DBT_RESOURCES,
    SUPPORTED_BUILD_RESOURCES,
    TESTABLE_DBT_RESOURCES,
@@ -392,32 +391,6 @@ def generate_task_or_group(
    return task_or_group


def _add_dbt_compile_task(
    nodes: dict[str, DbtNode],
    dag: DAG,
    execution_mode: ExecutionMode,
    task_args: dict[str, Any],
    tasks_map: dict[str, Any],
    task_group: TaskGroup | None,
) -> None:
    if execution_mode != ExecutionMode.AIRFLOW_ASYNC:
        return

    compile_task_metadata = TaskMetadata(
        id=DBT_COMPILE_TASK_ID,
        operator_class="cosmos.operators.airflow_async.DbtCompileAirflowAsyncOperator",
        arguments=task_args,
        extra_context={"dbt_dag_task_group_identifier": _get_dbt_dag_task_group_identifier(dag, task_group)},
    )
    compile_airflow_task = create_airflow_task(compile_task_metadata, dag, task_group=task_group)

    for task_id, task in tasks_map.items():
        if not task.upstream_list:
            compile_airflow_task >> task

    tasks_map[DBT_COMPILE_TASK_ID] = compile_airflow_task


def _get_dbt_dag_task_group_identifier(dag: DAG, task_group: TaskGroup | None) -> str:
    dag_id = dag.dag_id
    task_group_id = task_group.group_id if task_group else None
@@ -588,7 +561,6 @@ def build_airflow_graph(
        tasks_map[node_id] = test_task

    create_airflow_task_dependencies(nodes, tasks_map)
    _add_dbt_compile_task(nodes, dag, execution_mode, task_args, tasks_map, task_group)
    return tasks_map


1 change: 1 addition & 0 deletions cosmos/constants.py
@@ -6,6 +6,7 @@
import aenum
from packaging.version import Version

BIGQUERY_PROFILE_TYPE = "bigquery"
DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
DEFAULT_DBT_TARGET_NAME = "cosmos_target"
18 changes: 18 additions & 0 deletions cosmos/dbt_adapters/__init__.py
@@ -0,0 +1,18 @@
from __future__ import annotations

from typing import Any

from cosmos.constants import BIGQUERY_PROFILE_TYPE
from cosmos.dbt_adapters.bigquery import _associate_bigquery_async_op_args, _mock_bigquery_adapter

PROFILE_TYPE_MOCK_ADAPTER_CALLABLE_MAP = {
    BIGQUERY_PROFILE_TYPE: _mock_bigquery_adapter,
}

PROFILE_TYPE_ASSOCIATE_ARGS_CALLABLE_MAP = {
    BIGQUERY_PROFILE_TYPE: _associate_bigquery_async_op_args,
}


def associate_async_operator_args(async_operator_obj: Any, profile_type: str, **kwargs: Any) -> Any:
    return PROFILE_TYPE_ASSOCIATE_ARGS_CALLABLE_MAP[profile_type](async_operator_obj, **kwargs)
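A brief sketch of how these dispatch maps might be exercised (hypothetical usage, not part of this diff; the task_id and SQL are illustrative):

# Hypothetical usage of the profile-type dispatch added above.
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

from cosmos.dbt_adapters import associate_async_operator_args

op = BigQueryInsertJobOperator(task_id="run_sql", configuration={})
# Looks up the BigQuery handler and attaches the SQL as a standard-SQL query job.
associate_async_operator_args(op, "bigquery", sql="SELECT 1")
print(op.configuration)  # {"query": {"query": "SELECT 1", "useLegacySql": False}}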
33 changes: 33 additions & 0 deletions cosmos/dbt_adapters/bigquery.py
@@ -0,0 +1,33 @@
from __future__ import annotations

from typing import Any

from cosmos.exceptions import CosmosValueError


def _mock_bigquery_adapter() -> None:
    from typing import Optional, Tuple

    import agate
    from dbt.adapters.bigquery.connections import BigQueryAdapterResponse, BigQueryConnectionManager
    from dbt_common.clients.agate_helper import empty_table

    def execute(  # type: ignore[no-untyped-def]
        self, sql, auto_begin=False, fetch=None, limit: Optional[int] = None
    ) -> Tuple[BigQueryAdapterResponse, agate.Table]:
        return BigQueryAdapterResponse("mock_bigquery_adapter_response"), empty_table()

    BigQueryConnectionManager.execute = execute


def _associate_bigquery_async_op_args(async_op_obj: Any, **kwargs: Any) -> Any:
    sql = kwargs.get("sql")
    if not sql:
        raise CosmosValueError("Keyword argument 'sql' is required for BigQuery Async operator")
    async_op_obj.configuration = {
        "query": {
            "query": sql,
            "useLegacySql": False,
        }
    }
    return async_op_obj
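The monkeypatch above lets dbt believe it executed against BigQuery while no job is ever submitted, so a dbt run can be used purely to render the full SQL. A minimal sketch of the effect (hypothetical, assuming dbt-bigquery is installed):

# Hypothetical demonstration of the mocked adapter; not part of this diff.
from dbt.adapters.bigquery.connections import BigQueryConnectionManager

from cosmos.dbt_adapters.bigquery import _mock_bigquery_adapter

_mock_bigquery_adapter()

# execute() now short-circuits: it returns a canned response and an empty table
# instead of submitting a BigQuery job (self=None is for illustration only).
response, table = BigQueryConnectionManager.execute(None, "SELECT 1")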
31 changes: 19 additions & 12 deletions cosmos/operators/_asynchronous/base.py
@@ -1,9 +1,8 @@
from __future__ import annotations

import importlib
import logging
from abc import ABCMeta
from typing import Any, Sequence

from airflow.utils.context import Context
from typing import Any

from cosmos.airflow.graph import _snake_case_to_camelcase
from cosmos.config import ProfileConfig
@@ -36,11 +35,16 @@ def _create_async_operator_class(profile_type: str, dbt_class: str) -> Any:
        return DbtRunLocalOperator


class DbtRunAirflowAsyncFactoryOperator(DbtRunLocalOperator, metaclass=ABCMeta): # type: ignore[misc]
class DbtRunAirflowAsyncFactoryOperator(DbtRunLocalOperator): # type: ignore[misc]

    template_fields: Sequence[str] = DbtRunLocalOperator.template_fields + ("project_dir",)  # type: ignore[operator]

    def __init__(self, project_dir: str, profile_config: ProfileConfig, **kwargs: Any):
    def __init__(
        self,
        project_dir: str,
        profile_config: ProfileConfig,
        extra_context: dict[str, object] | None = None,
        dbt_kwargs: dict[str, object] | None = None,
        **kwargs: Any,
    ) -> None:
        self.project_dir = project_dir
        self.profile_config = profile_config

@@ -51,7 +55,13 @@ def __init__(self, project_dir: str, profile_config: ProfileConfig, **kwargs: An
        # When using composition instead of inheritance to initialize the async class and run its execute method,
        # Airflow throws a `DuplicateTaskIdFound` error.
        DbtRunAirflowAsyncFactoryOperator.__bases__ = (async_operator_class,)
        super().__init__(project_dir=project_dir, profile_config=profile_config, **kwargs)
        super().__init__(
            project_dir=project_dir,
            profile_config=profile_config,
            extra_context=extra_context,
            dbt_kwargs=dbt_kwargs,
            **kwargs,
        )

    def create_async_operator(self) -> Any:

@@ -60,6 +70,3 @@ def create_async_operator(self) -> Any:
        async_class_operator = _create_async_operator_class(profile_type, "DbtRun")

        return async_class_operator

    def execute(self, context: Context) -> None:
        super().execute(context)
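The __bases__ reassignment in __init__ above rewires the factory operator's parent class at instantiation time, which sidesteps the DuplicateTaskIdFound error that composition would trigger. A self-contained sketch of the same technique, with illustrative class names:

# Standalone illustration of swapping a class's base at runtime; names are hypothetical.
class SyncBase:
    def run(self) -> str:
        return "sync"


class AsyncBase:
    def run(self) -> str:
        return "async"


class FactoryOperator(SyncBase):
    def __init__(self, use_async: bool) -> None:
        if use_async:
            # Rewire the inheritance chain before delegating to the parent initializer,
            # mirroring DbtRunAirflowAsyncFactoryOperator.__bases__ = (async_operator_class,).
            # Note: like the operator above, this mutates the class for all future instances.
            FactoryOperator.__bases__ = (AsyncBase,)
        super().__init__()


print(FactoryOperator(use_async=True).run())  # prints "async"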
100 changes: 32 additions & 68 deletions cosmos/operators/_asynchronous/bigquery.py
@@ -1,22 +1,23 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Any, Sequence
from typing import Any, Sequence

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
import airflow
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.context import Context
from packaging.version import Version

from cosmos import settings
from cosmos.config import ProfileConfig
from cosmos.exceptions import CosmosValueError
from cosmos.settings import remote_target_path, remote_target_path_conn_id
from cosmos.dataset import get_dataset_alias_name
from cosmos.operators.local import AbstractDbtLocalBase

AIRFLOW_VERSION = Version(airflow.__version__)

class DbtRunAirflowAsyncBigqueryOperator(BigQueryInsertJobOperator):  # type: ignore[misc]

class DbtRunAirflowAsyncBigqueryOperator(BigQueryInsertJobOperator, AbstractDbtLocalBase):  # type: ignore[misc]

    template_fields: Sequence[str] = (
        "full_refresh",
        "gcp_project",
        "dataset",
        "location",
@@ -27,6 +28,7 @@ def __init__(
        project_dir: str,
        profile_config: ProfileConfig,
        extra_context: dict[str, Any] | None = None,
        dbt_kwargs: dict[str, Any] | None = None,
        **kwargs: Any,
    ):
        self.project_dir = project_dir
@@ -36,73 +38,35 @@ def __init__(
        self.gcp_project = profile["project"]
        self.dataset = profile["dataset"]
        self.extra_context = extra_context or {}
        self.full_refresh = None
        if "full_refresh" in kwargs:
            self.full_refresh = kwargs.pop("full_refresh")
        self.configuration: dict[str, Any] = {}
        self.dbt_kwargs = dbt_kwargs or {}
        task_id = self.dbt_kwargs.pop("task_id")
        AbstractDbtLocalBase.__init__(
            self, task_id=task_id, project_dir=project_dir, profile_config=profile_config, **self.dbt_kwargs
        )
        if kwargs.get("emit_datasets", True) and settings.enable_dataset_alias and AIRFLOW_VERSION >= Version("2.10"):
            from airflow.datasets import DatasetAlias

            # ignoring the type because older versions of Airflow raise the following error in mypy
            # error: Incompatible types in assignment (expression has type "list[DatasetAlias]", target has type "str")
            dag_id = kwargs.get("dag")
            task_group_id = kwargs.get("task_group")
            kwargs["outlets"] = [
                DatasetAlias(name=get_dataset_alias_name(dag_id, task_group_id, self.task_id))
            ]  # type: ignore
        super().__init__(
            gcp_conn_id=self.gcp_conn_id,
            configuration=self.configuration,
            deferrable=True,
            **kwargs,
        )
        self.async_context = extra_context or {}
        self.async_context["profile_type"] = self.profile_config.get_profile_type()
        self.async_context["async_operator"] = BigQueryInsertJobOperator

    def get_remote_sql(self) -> str:
        if not settings.AIRFLOW_IO_AVAILABLE:
            raise CosmosValueError(f"Cosmos async support is only available starting in Airflow 2.8 or later.")
        from airflow.io.path import ObjectStoragePath

        file_path = self.extra_context["dbt_node_config"]["file_path"]  # type: ignore
        dbt_dag_task_group_identifier = self.extra_context["dbt_dag_task_group_identifier"]

        remote_target_path_str = str(remote_target_path).rstrip("/")

        if TYPE_CHECKING:  # pragma: no cover
            assert self.project_dir is not None

        project_dir_parent = str(Path(self.project_dir).parent)
        relative_file_path = str(file_path).replace(project_dir_parent, "").lstrip("/")
        remote_model_path = f"{remote_target_path_str}/{dbt_dag_task_group_identifier}/compiled/{relative_file_path}"

        object_storage_path = ObjectStoragePath(remote_model_path, conn_id=remote_target_path_conn_id)
        with object_storage_path.open() as fp:  # type: ignore
            return fp.read()  # type: ignore

    def drop_table_sql(self) -> None:
        model_name = self.extra_context["dbt_node_config"]["resource_name"]  # type: ignore
        sql = f"DROP TABLE IF EXISTS {self.gcp_project}.{self.dataset}.{model_name};"

        hook = BigQueryHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )
        self.configuration = {
            "query": {
                "query": sql,
                "useLegacySql": False,
            }
        }
        hook.insert_job(configuration=self.configuration, location=self.location, project_id=self.gcp_project)

    def execute(self, context: Context) -> Any | None:
    @property
    def base_cmd(self) -> list[str]:
        return ["run"]

        if not self.full_refresh:
            raise CosmosValueError("The async execution only supported for full_refresh")
        else:
            # It may be surprising to some, but the dbt-core --full-refresh argument fully drops the table before populating it
            # https://github.com/dbt-labs/dbt-core/blob/5e9f1b515f37dfe6cdae1ab1aa7d190b92490e24/core/dbt/context/base.py#L662-L666
            # https://docs.getdbt.com/reference/resource-configs/full_refresh#recommendation
            # We're emulating this behaviour here
            # The compiled SQL has several limitations here, but these will be addressed in the PR: https://github.com/astronomer/astronomer-cosmos/pull/1474.
            self.drop_table_sql()
            sql = self.get_remote_sql()
            model_name = self.extra_context["dbt_node_config"]["resource_name"]  # type: ignore
            # prefix explicit create command to create table
            sql = f"CREATE TABLE {self.gcp_project}.{self.dataset}.{model_name} AS {sql}"
            self.configuration = {
                "query": {
                    "query": sql,
                    "useLegacySql": False,
                }
            }
            return super().execute(context)

    def execute(self, context: Context, **kwargs: Any) -> None:
        self.build_and_run_cmd(context=context, run_as_async=True, async_context=self.async_context)
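For readers tracing the new control flow: execute() now delegates to build_and_run_cmd with run_as_async=True and the async_context built in __init__. A rough sketch of the expected flow follows (an assumption pieced together from this PR, not the actual implementation; the helper name is illustrative):

# Hypothetical outline of the async run path; not part of this diff.
from cosmos.dbt_adapters import PROFILE_TYPE_MOCK_ADAPTER_CALLABLE_MAP, associate_async_operator_args


def async_run_flow(operator, full_sql: str) -> None:
    profile_type = operator.async_context["profile_type"]  # e.g. "bigquery"
    # 1. Patch the adapter so `dbt run` renders SQL without reaching the warehouse.
    PROFILE_TYPE_MOCK_ADAPTER_CALLABLE_MAP[profile_type]()
    # 2. dbt runs base_cmd (["run"]) against the mocked adapter, producing the
    #    fully rendered SQL per model; full_sql stands in for that output here.
    # 3. The SQL is attached to the deferrable BigQueryInsertJobOperator, which
    #    submits the query job and defers until it completes.
    associate_async_operator_args(operator, profile_type, sql=full_sql)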
1 change: 1 addition & 0 deletions cosmos/operators/_asynchronous/databricks.py
@@ -1,4 +1,5 @@
# TODO: Implement it
from __future__ import annotations

from typing import Any
