Skip to content

Commit

Permalink
If k8s is not installed, add if condition to exclude DAG(local_kubern…
Browse files Browse the repository at this point in the history
…etes_executor) (#22556)

GitOrigin-RevId: 27d72d75c5b9d332810dc63ab4624816d56ac4dc
  • Loading branch information
subkanthi authored and Cloud Composer Team committed Sep 12, 2024
1 parent 0e581fa commit 6af81b7
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 25 deletions.
1 change: 1 addition & 0 deletions airflow/example_dags/example_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
)
k8s = None


if k8s:
with DAG(
dag_id='example_kubernetes_executor',
Expand Down
52 changes: 27 additions & 25 deletions airflow/example_dags/example_local_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,34 @@
except ImportError:
log.warning("Could not import DAGs in example_local_kubernetes_executor.py", exc_info=True)
log.warning("Install Kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
k8s = None

with DAG(
dag_id='example_local_kubernetes_executor',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example3'],
) as dag:
# You can use annotations on your kubernetes pods!
start_task_executor_config = {
"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
}
if k8s:
with DAG(
dag_id='example_local_kubernetes_executor',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example3'],
) as dag:
# You can use annotations on your kubernetes pods!
start_task_executor_config = {
"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
}

@task(
executor_config=start_task_executor_config,
queue='kubernetes',
task_id='task_with_kubernetes_executor',
)
def task_with_template():
print_stuff()
@task(
executor_config=start_task_executor_config,
queue='kubernetes',
task_id='task_with_kubernetes_executor',
)
def task_with_template():
print_stuff()

@task(task_id='task_with_local_executor')
def task_with_local(ds=None, **kwargs):
"""Print the Airflow context and ds variable from the context."""
print(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
@task(task_id='task_with_local_executor')
def task_with_local(ds=None, **kwargs):
"""Print the Airflow context and ds variable from the context."""
print(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'

task_with_local() >> task_with_template()
task_with_local() >> task_with_template()

0 comments on commit 6af81b7

Please sign in to comment.