Fix calculation of health check threshold for SchedulerJob #31277

Merged 1 commit on May 15, 2023
3 changes: 2 additions & 1 deletion airflow/config_templates/config.yml
@@ -2294,7 +2294,8 @@ scheduler:
 description: |
   If the last scheduler heartbeat happened more than scheduler_health_check_threshold
   ago (in seconds), scheduler is considered unhealthy.
-  This is used by the health check in the "/health" endpoint
+  This is used by the health check in the "/health" endpoint and in `airflow jobs check` CLI
+  for SchedulerJob.
 version_added: 1.10.2
 type: string
 example: ~
3 changes: 2 additions & 1 deletion airflow/config_templates/default_airflow.cfg
@@ -1183,7 +1183,8 @@ pool_metrics_interval = 5.0

 # If the last scheduler heartbeat happened more than scheduler_health_check_threshold
 # ago (in seconds), scheduler is considered unhealthy.
-# This is used by the health check in the "/health" endpoint
+# This is used by the health check in the "/health" endpoint and in `airflow jobs check` CLI
+# for SchedulerJob.
 scheduler_health_check_threshold = 30

 # When you start a scheduler, airflow starts a tiny web server
7 changes: 5 additions & 2 deletions airflow/jobs/job.py
@@ -130,10 +130,13 @@ def is_alive(self, grace_multiplier=2.1):
         :param grace_multiplier: multiplier of heartrate to require heart beat
             within
         """
+        if self.job_type == "SchedulerJob":
+            health_check_threshold: int = conf.getint("scheduler", "scheduler_health_check_threshold")
Comment on lines +133 to +134
Member:

We don't need to hard-code this; we can create an internal method on this base class that returns health_check_threshold and override it in the SchedulerJob class.

@potiuk (Member Author), May 14, 2023:

There is no SchedulerJob class any more.

@potiuk (Member Author), May 14, 2023:

That was the gist of the changes in #30302 and #30255: get rid of the polymorphism we had and keep a single "Job", which is no longer a "BaseJob". As a next step (and I might do it) we can move the threshold calculation to SchedulerJobRunner, but the easiest and safest fix for now is to put a specific exception for the scheduler in the "is_alive" method.

@potiuk (Member Author), May 14, 2023:

BTW, there is also a follow-up from @jedcunningham to move Job to models (#31194), where it actually belongs, and we could do it as part of that move; the whole decoupling is what made such a move possible.

@ashb (Member), May 14, 2023:

This is a tiny change in behaviour.

Previously, if you called SchedulerJob.is_alive(2.2), it would use the multiplier instead of the configured threshold.

It is probably okay to ignore the difference, but should it be called out in a news fragment?
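
The difference described in this comment can be sketched with two small functions. This is a simplified model rather than the Airflow code: both function names and the explicit `configured_threshold` argument are hypothetical, and the "before" branch follows the `is_alive` method that this PR removes from `scheduler_job_runner.py`.

```python
from __future__ import annotations


def scheduler_threshold_before(
    heartrate: float, configured_threshold: int, grace_multiplier: float | None = None
) -> float:
    """Threshold used for the scheduler before this PR (simplified model)."""
    if grace_multiplier is not None:
        # An explicit multiplier took precedence over the configured value.
        return heartrate * grace_multiplier
    return configured_threshold


def scheduler_threshold_after(
    heartrate: float, configured_threshold: int, grace_multiplier: float = 2.1
) -> float:
    """Threshold used when job_type == "SchedulerJob" after this PR (simplified model)."""
    # The multiplier is ignored for the scheduler; only the configured value counts.
    return configured_threshold
```

With a heartrate of 5 seconds and the default threshold of 30, a call that passes a multiplier of 2.2 used to get a threshold of roughly 11 seconds and now gets 30.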

Member Author:

But yeah, it might actually be good to spell out that you should now use airflow jobs check instead of the SchedulerJob model, as we know people DID use the queries (in Matthew's chart, for one). Let me add it.

Member Author:

I've added a description. I will have limited availability this afternoon to follow up, but I am perfectly fine with someone refining the message in the newsfragment if they would like to describe it better (feel free to add changes and merge without me doing so :) )

Member Author:

@ashb - I think it makes little sense to mention the multiplier as a change. It's better to say: if you've used it in the past, don't do it at all.

@potiuk (Member Author), May 14, 2023:

cc: @kaxil - I also added another fixup where I went a bit further. I implemented it so that SchedulerJobRunner overrides the threshold (what I discussed above), so it is no longer "hard-coded" in Job but moved to the runner, and Job is not aware of the scheduler job at all. Maybe that is a better idea?

(I also removed grace_multiplier as a parameter altogether and converted it to a constant.)

@potiuk (Member Author), May 14, 2023:

I moved it back. I realized that we cannot do it: the check should be based on the job_type, not on something set externally by the runner. The job check retrieves the job from the DB, and the runner is not involved. So the "is_alive" check really must apply a different threshold based on the type, not one set by the runner.

+        else:
+            health_check_threshold: int = self.heartrate * grace_multiplier
         return (
             self.state == State.RUNNING
-            and (timezone.utcnow() - self.latest_heartbeat).total_seconds()
-            < self.heartrate * grace_multiplier
+            and (timezone.utcnow() - self.latest_heartbeat).total_seconds() < health_check_threshold
         )

     @provide_session
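
The change to `is_alive` above can be modelled outside Airflow as a minimal, self-contained sketch. Everything here is an assumption made for illustration: `JobRecord` is a hypothetical stand-in for a `Job` row loaded from the database, the constant replaces the `conf.getint("scheduler", "scheduler_health_check_threshold")` lookup, the `"running"` string stands in for `State.RUNNING`, and the `now` parameter exists only to make the check deterministic.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# Assumption: stands in for the configured scheduler_health_check_threshold,
# which defaults to 30 in default_airflow.cfg.
SCHEDULER_HEALTH_CHECK_THRESHOLD = 30


@dataclass
class JobRecord:
    """Hypothetical stand-in for a Job row loaded from the database."""

    job_type: str
    heartrate: float
    state: str
    latest_heartbeat: datetime

    def is_alive(self, grace_multiplier: float = 2.1, now: datetime | None = None) -> bool:
        # The threshold depends only on data stored with the job itself,
        # so the check works without any runner object being involved.
        if self.job_type == "SchedulerJob":
            threshold: float = SCHEDULER_HEALTH_CHECK_THRESHOLD
        else:
            threshold = self.heartrate * grace_multiplier
        now = now or datetime.now(timezone.utc)
        age = (now - self.latest_heartbeat).total_seconds()
        return self.state == "running" and age < threshold


if __name__ == "__main__":
    now = datetime(2023, 5, 15, 12, 0, tzinfo=timezone.utc)
    scheduler = JobRecord("SchedulerJob", 10, "running", now - timedelta(seconds=21))
    print(scheduler.is_alive(now=now))
```

Keying the threshold on `job_type` matches the reasoning in the thread: a job read back from the database carries its type but has no runner attached.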
21 changes: 0 additions & 21 deletions airflow/jobs/scheduler_job_runner.py
@@ -264,27 +264,6 @@ def _debug_dump(self, signum: int, frame: FrameType | None) -> None:
         self.job.executor.debug_dump()
         self.log.info("-" * 80)

-    def is_alive(self, grace_multiplier: float | None = None) -> bool:
Member Author:

This is not used any more. During the refactor I missed that this "special" case for SchedulerJob is no longer called.

-        """
-        Whether the SchedulerJob is alive.
-
-        We define alive as in a state of running and a heartbeat within the
-        threshold defined in the ``scheduler_health_check_threshold`` config
-        setting.
-
-        ``grace_multiplier`` is accepted for compatibility with the parent class.
-
-        """
-        if grace_multiplier is not None:
-            # Accept the same behaviour as superclass
-            return self.job.is_alive(grace_multiplier=grace_multiplier)
-        scheduler_health_check_threshold: int = conf.getint("scheduler", "scheduler_health_check_threshold")
-        return (
-            self.job.state == State.RUNNING
-            and (timezone.utcnow() - self.job.latest_heartbeat).total_seconds()
-            < scheduler_health_check_threshold
-        )
-
     def __get_concurrency_maps(self, states: Iterable[TaskInstanceState], session: Session) -> ConcurrencyMap:
         """
         Get the concurrency maps.
8 changes: 8 additions & 0 deletions newsfragments/31277.significant.rst
@@ -0,0 +1,8 @@
+Clarifications of the external Health Check mechanism and using ``Job`` classes.
+
+In the past SchedulerJob and other ``*Job`` classes are known to have been used to perform
+external health checks for Airflow components. Those are, however, Airflow DB ORM related classes.
+The DB models and database structure of Airflow are considered as internal implementation detail, following
+`public interface <https://airflow.apache.org/docs/apache-airflow/stable/public-airflow-interface.html>`_).
+Therefore, they should not be used for external health checks. Instead, you should use the
+``airflow jobs check`` CLI command (introduced in Airflow 2.1) for that purpose.
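
As a rough illustration of the recommendation in this newsfragment, an external health check could shell out to the CLI instead of querying the job tables. The sketch below is an assumption-laden example, not part of the PR: both helper names are hypothetical, and it assumes the `airflow` executable is on `PATH` and that a zero exit status means an alive job was found.

```python
from __future__ import annotations

import subprocess


def build_jobs_check_command(job_type: str = "SchedulerJob", hostname: str | None = None) -> list[str]:
    """Build the argv for `airflow jobs check` (hypothetical helper)."""
    command = ["airflow", "jobs", "check", "--job-type", job_type]
    if hostname is not None:
        # Restrict the check to jobs running on one host.
        command += ["--hostname", hostname]
    return command


def scheduler_is_healthy(hostname: str | None = None) -> bool:
    """Return True when `airflow jobs check` exits with status 0."""
    # Assumption: `airflow` is installed and can reach the metadata database.
    result = subprocess.run(build_jobs_check_command(hostname=hostname), check=False)
    return result.returncode == 0
```

Keeping the command construction separate from the subprocess call makes the argument list easy to inspect without a running Airflow installation.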
23 changes: 23 additions & 0 deletions tests/jobs/test_base_job.py
@@ -148,6 +148,29 @@ def test_is_alive(self):
         job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10)
         assert job.is_alive() is False, "Completed jobs even with recent heartbeat should not be alive"

+    def test_is_alive_scheduler(self):
+        job = Job(heartrate=10, state=State.RUNNING, job_type="SchedulerJob")
+        assert job.is_alive() is True
+
+        job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=20)
+        assert job.is_alive() is True
+
+        # default health-check grace period for scheduler job is not heartrate*2.1 but 30 seconds
+        job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=21)
+        assert job.is_alive() is True
+
+        job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=31)
+        assert job.is_alive() is False
+
+        # test because .seconds was used before instead of total_seconds
+        # internal repr of datetime is (days, seconds)
+        job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(days=1)
+        assert job.is_alive() is False
+
+        job.state = State.SUCCESS
+        job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10)
+        assert job.is_alive() is False, "Completed jobs even with recent heartbeat should not be alive"
+
     @patch("airflow.jobs.job.create_session")
     def test_heartbeat_failed(self, mock_create_session):
         when = timezone.utcnow() - datetime.timedelta(seconds=60)
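
The test comment about `.seconds` versus `total_seconds` refers to standard `datetime.timedelta` behaviour, which a short standalone snippet can show. The variable names and the threshold of 30 are chosen here for illustration only.

```python
from datetime import timedelta

one_day = timedelta(days=1)

# timedelta stores (days, seconds, microseconds); `.seconds` is only the
# seconds component and never includes whole days.
seconds_component = one_day.seconds        # 0
elapsed_seconds = one_day.total_seconds()  # 86400.0

threshold = 30
# Comparing the component makes a day-old heartbeat look fresh.
looks_alive_with_seconds = seconds_component < threshold
# Comparing the full duration gives the intended answer.
looks_alive_with_total = elapsed_seconds < threshold
```

This is why the test sets a heartbeat exactly one day in the past: the seconds component is zero there, so a check based on `.seconds` would wrongly report the job as alive.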