Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor unneeded 'continue' jumps in api #33842

Merged
merged 1 commit into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions airflow/api/common/delete_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session =
count = 0

for model in get_sqla_model_classes():
if hasattr(model, "dag_id"):
if keep_records_in_log and model.__name__ == "Log":
continue
if hasattr(model, "dag_id") and (not keep_records_in_log or model.__name__ != "Log"):
count += session.execute(
delete(model)
.where(model.dag_id.in_(dags_to_delete))
Expand Down
26 changes: 12 additions & 14 deletions airflow/api/common/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,15 @@ def _create_dagruns(
}

for info in infos:
if info.logical_date in dag_runs:
continue
dag_runs[info.logical_date] = dag.create_dagrun(
execution_date=info.logical_date,
data_interval=info.data_interval,
start_date=timezone.utcnow(),
external_trigger=False,
state=state,
run_type=run_type,
)
if info.logical_date not in dag_runs:
dag_runs[info.logical_date] = dag.create_dagrun(
execution_date=info.logical_date,
data_interval=info.data_interval,
start_date=timezone.utcnow(),
external_trigger=False,
state=state,
run_type=run_type,
)
return dag_runs.values()


Expand Down Expand Up @@ -493,10 +492,9 @@ def set_dag_run_state_to_failed(

tasks = []
for task in dag.tasks:
if task.task_id not in task_ids_of_running_tis:
continue
task.dag = dag
tasks.append(task)
if task.task_id in task_ids_of_running_tis:
task.dag = dag
tasks.append(task)

# Mark non-finished tasks as SKIPPED.
tis = session.scalars(
Expand Down