Skip to content

Commit

Permalink
Prevent scheduler crash when serialized dag is missing (apache#19113)
Browse files Browse the repository at this point in the history
Scheduler._send_dag_callbacks_to_processor calls dag_run.get_dag, which
raises an exception when the serialized DAG is missing. This PR changes the callers to use dagbag.get_dag and changes
the arguments of Scheduler._send_dag_callbacks_to_processor to accept dag instead of dag_run.

(cherry picked from commit 5dc375a)
  • Loading branch information
ephraimbuddy authored and jedcunningham committed Oct 25, 2021
1 parent 6bc1524 commit 953ac92
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
14 changes: 8 additions & 6 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,12 @@ def _do_scheduling(self, session) -> int:

# Send the callbacks after we commit to ensure the context is up to date when it gets run
for dag_run, callback_to_run in callback_tuples:
self._send_dag_callbacks_to_processor(dag_run, callback_to_run)
dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
if not dag:
self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue

self._send_dag_callbacks_to_processor(dag, callback_to_run)

# Without this, the session has an invalid view of the DB
session.expunge_all()
Expand Down Expand Up @@ -992,7 +997,7 @@ def _schedule_dag_run(
)

# Send SLA & DAG Success/Failure Callbacks to be executed
self._send_dag_callbacks_to_processor(dag_run, callback_to_execute)
self._send_dag_callbacks_to_processor(dag, callback_to_execute)

return 0

Expand Down Expand Up @@ -1032,13 +1037,10 @@ def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session=None):
# Verify integrity also takes care of session.flush
dag_run.verify_integrity(session=session)

def _send_dag_callbacks_to_processor(
self, dag_run: DagRun, callback: Optional[DagCallbackRequest] = None
):
def _send_dag_callbacks_to_processor(self, dag: DAG, callback: Optional[DagCallbackRequest] = None):
if not self.processor_agent:
raise ValueError("Processor agent is not started.")

dag = dag_run.get_dag()
self._send_sla_callbacks_to_processor(dag)
if callback:
self.processor_agent.send_callback_to_execute(callback)
Expand Down
4 changes: 1 addition & 3 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1402,7 +1402,6 @@ def test_dagrun_callbacks_are_not_added_when_callbacks_are_not_defined(self, sta
self.scheduler_job._send_dag_callbacks_to_processor.assert_called_once()
call_args = self.scheduler_job._send_dag_callbacks_to_processor.call_args[0]
assert call_args[0].dag_id == dr.dag_id
assert call_args[0].execution_date == dr.execution_date
assert call_args[1] is None

session.rollback()
Expand Down Expand Up @@ -1433,11 +1432,10 @@ def test_dagrun_callbacks_are_added_when_callbacks_are_defined(self, state, msg,
with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
self.scheduler_job._do_scheduling(session)

# Verify Callback is not set (i.e is None) when no callbacks are set on DAG
# Verify Callback is set (i.e. is not None) when callbacks are set on DAG
self.scheduler_job._send_dag_callbacks_to_processor.assert_called_once()
call_args = self.scheduler_job._send_dag_callbacks_to_processor.call_args[0]
assert call_args[0].dag_id == dr.dag_id
assert call_args[0].execution_date == dr.execution_date
assert call_args[1] is not None
assert call_args[1].msg == msg
session.rollback()
Expand Down

0 comments on commit 953ac92

Please sign in to comment.