diff --git a/redash/cli/rq.py b/redash/cli/rq.py
index c103d6707d..b9a450eb44 100644
--- a/redash/cli/rq.py
+++ b/redash/cli/rq.py
@@ -5,11 +5,11 @@
 from click import argument
 from flask.cli import AppGroup
-from rq import Connection, Worker
+from rq import Connection
 from sqlalchemy.orm import configure_mappers
 
 from redash import rq_redis_connection
-from redash.schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions
+from redash.tasks import HardLimitingWorker as Worker, rq_scheduler, schedule_periodic_jobs, periodic_job_definitions
 
 manager = AppGroup(help="RQ management commands.")
 
 
diff --git a/redash/tasks/__init__.py b/redash/tasks/__init__.py
index 6f0d590514..e51b55b52c 100644
--- a/redash/tasks/__init__.py
+++ b/redash/tasks/__init__.py
@@ -3,3 +3,5 @@
                       refresh_schemas, cleanup_query_results, empty_schedules)
 from .alerts import check_alerts_for_query
 from .failure_report import send_aggregated_errors
+from .hard_limiting_worker import HardLimitingWorker
+from .schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions
diff --git a/redash/tasks/hard_limiting_worker.py b/redash/tasks/hard_limiting_worker.py
new file mode 100644
index 0000000000..c4bb30166b
--- /dev/null
+++ b/redash/tasks/hard_limiting_worker.py
@@ -0,0 +1,84 @@
+import errno
+import os
+from rq import Worker, get_current_job
+from rq.utils import utcnow
+from rq.timeouts import UnixSignalDeathPenalty, HorseMonitorTimeoutException
+from rq.job import JobStatus
+
+
+class HardLimitingWorker(Worker):
+    """
+    RQ's work horses enforce time limits by setting a timed alarm and stopping jobs
+    when they reach their time limits. However, the work horse may be entirely blocked
+    and may not respond to the alarm interrupt. Since respecting timeouts is critical
+    in Redash (if we don't respect them, workers may be infinitely stuck and as a result,
+    service may be denied for other queries), we enforce two time limits:
+    1. A soft time limit, enforced by the work horse
+    2. A hard time limit, enforced by the parent worker
+
+    The HardLimitingWorker class changes the default monitoring behavior of the default
+    RQ Worker by checking if the work horse is still busy with the job, even after
+    it should have timed out (+ a grace period of 15s). If it does, it kills the work horse.
+    """
+    grace_period = 15
+
+    def soft_limit_exceeded(self, job):
+        seconds_under_monitor = (utcnow() - self.monitor_started).seconds
+        return seconds_under_monitor > job.timeout + self.grace_period
+
+    def enforce_hard_limit(self, job):
+        self.log.warning('Job %s exceeded timeout of %ds (+%ds grace period) but work horse did not terminate it. '
+                         'Killing the work horse.', job.id, job.timeout, self.grace_period)
+        self.kill_horse()
+
+    def monitor_work_horse(self, job):
+        """The worker will monitor the work horse and make sure that it
+        either executes successfully or the status of the job is set to
+        failed
+        """
+        self.monitor_started = utcnow()
+        while True:
+            try:
+                with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
+                    retpid, ret_val = os.waitpid(self._horse_pid, 0)
+                break
+            except HorseMonitorTimeoutException:
+                # Horse has not exited yet and is still running.
+                # Send a heartbeat to keep the worker alive.
+                self.heartbeat(self.job_monitoring_interval + 5)
+
+                if self.soft_limit_exceeded(job):
+                    self.enforce_hard_limit(job)
+            except OSError as e:
+                # In case we encountered an OSError due to EINTR (which is
+                # caused by a SIGINT or SIGTERM signal during
+                # os.waitpid()), we simply ignore it and enter the next
+                # iteration of the loop, waiting for the child to end. In
+                # any other case, this is some other unexpected OS error,
+                # which we don't want to catch, so we re-raise those ones.
+                if e.errno != errno.EINTR:
+                    raise
+                # Send a heartbeat to keep the worker alive.
+                self.heartbeat()
+
+        if ret_val == os.EX_OK:  # The process exited normally.
+            return
+        job_status = job.get_status()
+        if job_status is None:  # Job completed and its ttl has expired
+            return
+        if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
+
+            if not job.ended_at:
+                job.ended_at = utcnow()
+
+            # Unhandled failure: move the job to the failed queue
+            self.log.warning((
+                'Moving job to FailedJobRegistry '
+                '(work-horse terminated unexpectedly; waitpid returned {})'
+            ).format(ret_val))
+
+            self.handle_job_failure(
+                job,
+                exc_string="Work-horse process was terminated unexpectedly "
+                           "(waitpid returned %s)" % ret_val
+            )
diff --git a/redash/schedule.py b/redash/tasks/schedule.py
similarity index 99%
rename from redash/schedule.py
rename to redash/tasks/schedule.py
index 8a577e4c39..b1a527cf42 100644
--- a/redash/schedule.py
+++ b/redash/tasks/schedule.py
@@ -21,6 +21,7 @@
                          queue_name="periodic",
                          interval=5)
 
+
 def job_id(kwargs):
     metadata = kwargs.copy()
     metadata['func'] = metadata['func'].__name__
diff --git a/tests/test_schedule.py b/tests/tasks/test_schedule.py
similarity index 92%
rename from tests/test_schedule.py
rename to tests/tasks/test_schedule.py
index 13a38b11ac..5a8b1d19b5 100644
--- a/tests/test_schedule.py
+++ b/tests/tasks/test_schedule.py
@@ -1,7 +1,7 @@
 from unittest import TestCase
 from mock import patch, ANY
 
-from redash.schedule import rq_scheduler, schedule_periodic_jobs
+from redash.tasks.schedule import rq_scheduler, schedule_periodic_jobs
 
 class TestSchedule(TestCase):
     def setUp(self):
@@ -26,7 +26,7 @@
         def foo():
             pass
 
         schedule_periodic_jobs([{"func": foo, "interval": 60}])
-        with patch('redash.schedule.rq_scheduler.schedule') as schedule:
+        with patch('redash.tasks.rq_scheduler.schedule') as schedule:
             schedule_periodic_jobs([{"func": foo, "interval": 60}])
             schedule.assert_not_called()
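Note (not part of the diff): a minimal usage sketch of how the new worker class can be started, mirroring the change to redash/cli/rq.py above. The queue names, Redis URL, and monitoring interval below are illustrative assumptions, not values taken from this change.

# Usage sketch only -- queue names, Redis URL, and interval are assumed, not from the diff.
from redis import Redis
from rq import Connection

from redash.tasks import HardLimitingWorker as Worker

redis_connection = Redis.from_url("redis://localhost:6379/0")  # assumed Redis URL

with Connection(redis_connection):
    # job_monitoring_interval controls how often monitor_work_horse() wakes up to
    # heartbeat and check the soft limit; the hard limit triggers once a job runs
    # past its timeout plus the 15-second grace period defined on the class.
    worker = Worker(["queries", "periodic"], job_monitoring_interval=5)
    worker.work()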