RQ: implement reliable timeout #4305
Closed
Changes from 8 commits
Commits
ed925d5
enforce hard limits on non-responsive work horses by workers
859fe2a
move differences from Worker to helper methods to help make the speci…
d120100
move HardLimitingWorker to redash/tasks
86b9075
Merge branch 'master' into hard-time-limit
1fa6abf
move schedule.py to /tasks
1251b9b
explain the motivation for HardLimitingWorker
4ae624b
pleasing CodeClimate
9cfd453
pleasing CodeClimate
768f0f6
avoid star imports
@@ -0,0 +1,84 @@
import errno
import os
from rq import Worker, get_current_job
from rq.utils import utcnow
from rq.timeouts import UnixSignalDeathPenalty, HorseMonitorTimeoutException
from rq.job import JobStatus


class HardLimitingWorker(Worker):
    """
    RQ's work horses enforce time limits by setting a timed alarm and stopping jobs
    when they reach their time limits. However, the work horse may be entirely blocked
    and may not respond to the alarm interrupt. Since respecting timeouts is critical
    in Redash (if we don't respect them, workers may be infinitely stuck and as a result,
    service may be denied for other queries), we enforce two time limits:
    1. A soft time limit, enforced by the work horse
    2. A hard time limit, enforced by the parent worker

    The HardLimitingWorker class changes the default monitoring behavior of the default
    RQ Worker by checking if the work horse is still busy with the job, even after
    it should have timed out (+ a grace period of 15s). If it does, it kills the work horse.
    """
    grace_period = 15

    def soft_limit_exceeded(self, job):
        seconds_under_monitor = (utcnow() - self.monitor_started).seconds
Small consideration. The timedelta docs say that `.seconds` is:
Suggested change
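To make the reviewer's point concrete: `timedelta.seconds` holds only the seconds component of an interval (0 to 86399) and ignores whole days, while `total_seconds()` returns the full duration. The sketch below uses only the standard library; the interval of one day plus five seconds is an illustrative value, not something taken from this PR.

```python
from datetime import timedelta

# Hypothetical elapsed time: a work horse monitored for just over one day.
elapsed = timedelta(days=1, seconds=5)

seconds_component = elapsed.seconds      # seconds part only; whole days are dropped
full_duration = elapsed.total_seconds()  # the entire interval, as a float

print(seconds_component)  # 5
print(full_duration)      # 86405.0
```

For jobs whose timeout plus grace period stays well under a day, the two values agree, so the difference only shows up for very long-running jobs. Comparing against `total_seconds()` is one way to avoid the wrap-around; whether that matches the elided suggestion is an assumption.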
        return seconds_under_monitor > job.timeout + self.grace_period

    def enforce_hard_limit(self, job):
        self.log.warning('Job %s exceeded timeout of %ds (+%ds grace period) but work horse did not terminate it. '
                         'Killing the work horse.', job.id, job.timeout, self.grace_period)
        self.kill_horse()

    def monitor_work_horse(self, job):
        """The worker will monitor the work horse and make sure that it
        either executes successfully or the status of the job is set to
        failed
        """
        self.monitor_started = utcnow()
        while True:
            try:
                with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
                    retpid, ret_val = os.waitpid(self._horse_pid, 0)
                break
            except HorseMonitorTimeoutException:
                # Horse has not exited yet and is still running.
                # Send a heartbeat to keep the worker alive.
                self.heartbeat(self.job_monitoring_interval + 5)

                if self.soft_limit_exceeded(job):
                    self.enforce_hard_limit(job)
            except OSError as e:
                # In case we encountered an OSError due to EINTR (which is
                # caused by a SIGINT or SIGTERM signal during
                # os.waitpid()), we simply ignore it and enter the next
                # iteration of the loop, waiting for the child to end. In
                # any other case, this is some other unexpected OS error,
                # which we don't want to catch, so we re-raise those ones.
                if e.errno != errno.EINTR:
                    raise
                # Send a heartbeat to keep the worker alive.
                self.heartbeat()

        if ret_val == os.EX_OK:  # The process exited normally.
            return
        job_status = job.get_status()
        if job_status is None:  # Job completed and its ttl has expired
            return
        if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:

            if not job.ended_at:
                job.ended_at = utcnow()

            # Unhandled failure: move the job to the failed queue
            self.log.warning((
                'Moving job to FailedJobRegistry '
                '(work-horse terminated unexpectedly; waitpid returned {})'
            ).format(ret_val))

            self.handle_job_failure(
                job,
                exc_string="Work-horse process was terminated unexpectedly "
                           "(waitpid returned %s)" % ret_val
            )
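The parent-side hard limit described in the class docstring can be tried out without RQ or Redis. The following is a rough sketch only: it swaps RQ's fork, `os.waitpid`, and `UnixSignalDeathPenalty` for the standard library's `subprocess.Popen.wait(timeout=...)`, and the function name `run_with_hard_limit` and its parameters are hypothetical, not part of this PR or of RQ.

```python
import subprocess
import sys
import time


def run_with_hard_limit(cmd, timeout, grace_period, poll_interval):
    """Run cmd and kill it if it is still alive after timeout + grace_period seconds.

    Returns a (returncode, killed_by_parent) tuple.
    """
    started = time.monotonic()
    child = subprocess.Popen(cmd)
    killed = False
    while True:
        try:
            # Wait for at most one monitoring interval, then check the limits.
            returncode = child.wait(timeout=poll_interval)
            break
        except subprocess.TimeoutExpired:
            # The child is still running; a real worker would send a heartbeat here.
            elapsed = time.monotonic() - started
            if not killed and elapsed > timeout + grace_period:
                child.kill()  # hard limit: the parent stops the child itself
                killed = True
    return returncode, killed


# A child that would sleep far past its (illustrative) limit.
sleeper = [sys.executable, "-c", "import time; time.sleep(30)"]
returncode, killed = run_with_hard_limit(
    sleeper, timeout=0.2, grace_period=0.1, poll_interval=0.05
)
```

As in `monitor_work_horse`, the loop keeps waiting after the kill so that the child's exit status is still collected. The elapsed time here is measured with `time.monotonic()` as a plain float, which sidesteps the `timedelta.seconds` question raised in the review.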
Nit: please don't do `*` imports.
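One common reason behind this kind of nit is that star imports can silently overwrite names. The sketch below illustrates that with two standard-library modules, `math` and `cmath`, which both export `sqrt`; it simulates a module namespace with `exec` and is not taken from the Redash code.

```python
import cmath
import math

# Simulate two star imports landing in the same module namespace.
namespace = {}
exec("from math import *", namespace)
sqrt_after_first = namespace["sqrt"]

exec("from cmath import *", namespace)
sqrt_after_second = namespace["sqrt"]

# The second star import replaced the first sqrt without any warning.
print(sqrt_after_first is math.sqrt)    # True
print(sqrt_after_second is cmath.sqrt)  # True
```

Naming each import explicitly, as the diff does with `from rq.timeouts import UnixSignalDeathPenalty, HorseMonitorTimeoutException`, keeps the origin of every name visible to readers and linters.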