Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task timeout support. #1317

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from typing import Any, Callable, Union

from datetime import timedelta

from airflow.models import BaseOperator
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup
Expand Down Expand Up @@ -135,6 +137,8 @@ def create_task_metadata(
dbt_dag_task_group_identifier: str,
use_task_group: bool = False,
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE,
model_timeout: bool = False,
model_sla: bool = False,
) -> TaskMetadata | None:
"""
Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node.
Expand Down Expand Up @@ -166,6 +170,11 @@ def create_task_metadata(
task_id = f"{node.name}_run"
if use_task_group is True:
task_id = "run"
if model_timeout and "model_timeout" in node.config.keys():
logger.error(f'model_timeout: {node.config["model_timeout"]} in values')
args["execution_timeout"] = timedelta(seconds=int(node.config["model_timeout"]))
if model_sla and "model_sla" in node.config.keys():
args["sla"] = timedelta(seconds=int(node.config["model_sla"]))
t0momi219 marked this conversation as resolved.
Show resolved Hide resolved
elif node.resource_type == DbtResourceType.SOURCE:
if (source_rendering_behavior == SourceRenderingBehavior.NONE) or (
source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS
Expand Down Expand Up @@ -217,6 +226,8 @@ def generate_task_or_group(
source_rendering_behavior: SourceRenderingBehavior,
test_indirect_selection: TestIndirectSelection,
on_warning_callback: Callable[..., Any] | None,
model_timeout: bool,
model_sla: bool,
**kwargs: Any,
) -> BaseOperator | TaskGroup | None:
task_or_group: BaseOperator | TaskGroup | None = None
Expand All @@ -234,6 +245,8 @@ def generate_task_or_group(
dbt_dag_task_group_identifier=_get_dbt_dag_task_group_identifier(dag, task_group),
use_task_group=use_task_group,
source_rendering_behavior=source_rendering_behavior,
model_timeout=model_timeout,
model_sla=model_sla
)

# In most cases, we'll map one DBT node to one Airflow task
Expand Down Expand Up @@ -335,6 +348,8 @@ def build_airflow_graph(
node_converters = render_config.node_converters or {}
test_behavior = render_config.test_behavior
source_rendering_behavior = render_config.source_rendering_behavior
model_timeout = render_config.model_timeout
model_sla = render_config.model_sla
tasks_map = {}
task_or_group: TaskGroup | BaseOperator

Expand All @@ -356,6 +371,8 @@ def build_airflow_graph(
source_rendering_behavior=source_rendering_behavior,
test_indirect_selection=test_indirect_selection,
on_warning_callback=on_warning_callback,
model_timeout=model_timeout,
model_sla=model_sla,
node=node,
)
if task_or_group is not None:
Expand Down
2 changes: 2 additions & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class RenderConfig:
enable_mock_profile: bool = True
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE
airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list)
model_timeout: bool = False
model_sla: bool = False
t0momi219 marked this conversation as resolved.
Show resolved Hide resolved

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
if self.env_vars:
Expand Down
Loading