From ed925d56f837d40aeece32e5e501fe89ed48a739 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Wed, 13 Nov 2019 22:59:59 +0200 Subject: [PATCH 1/8] enforce hard limits on non-responsive work horses by workers --- redash/cli/rq.py | 3 ++- redash/worker.py | 67 ++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/redash/cli/rq.py b/redash/cli/rq.py index c103d6707d..eb917c8d39 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -5,10 +5,11 @@ from click import argument from flask.cli import AppGroup -from rq import Connection, Worker +from rq import Connection from sqlalchemy.orm import configure_mappers from redash import rq_redis_connection +from redash.worker import HardTimeLimitingWorker as Worker from redash.schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions manager = AppGroup(help="RQ management commands.") diff --git a/redash/worker.py b/redash/worker.py index 18da7de7a2..99431f0ecf 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -1,4 +1,5 @@ - +import errno +import os from datetime import timedelta from functools import partial @@ -9,8 +10,11 @@ from celery.signals import worker_process_init from celery.utils.log import get_logger -from rq import get_current_job +from rq import Worker, get_current_job +from rq.utils import utcnow from rq.decorators import job as rq_job +from rq.timeouts import UnixSignalDeathPenalty, HorseMonitorTimeoutException +from rq.job import JobStatus from redash import create_app, extensions, settings, redis_connection, rq_redis_connection from redash.metrics import celery as celery_metrics # noqa @@ -93,3 +97,62 @@ def add_periodic_tasks(sender, **kwargs): for params in extensions.periodic_tasks.values(): # Add it to Celery's periodic task registry, too. sender.add_periodic_task(**params) + + +class HardTimeLimitingWorker(Worker): + grace_period = 15 + + def monitor_work_horse(self, job): + """The worker will monitor the work horse and make sure that it + either executes successfully or the status of the job is set to + failed + """ + monitor_started = utcnow() + while True: + try: + with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException): + retpid, ret_val = os.waitpid(self._horse_pid, 0) + break + except HorseMonitorTimeoutException: + # Horse has not exited yet and is still running. + # Send a heartbeat to keep the worker alive. + self.heartbeat(self.job_monitoring_interval + 5) + + seconds_under_monitor = (utcnow() - monitor_started).seconds + if seconds_under_monitor > job.timeout + self.grace_period: + self.log.warning('Job %s exceeded timeout of %ds (+%ds grace period) but work horse did not terminate it. ' + 'Killing the work horse.', job.id, job.timeout, self.grace_period) + self.kill_horse() + except OSError as e: + # In case we encountered an OSError due to EINTR (which is + # caused by a SIGINT or SIGTERM signal during + # os.waitpid()), we simply ignore it and enter the next + # iteration of the loop, waiting for the child to end. In + # any other case, this is some other unexpected OS error, + # which we don't want to catch, so we re-raise those ones. + if e.errno != errno.EINTR: + raise + # Send a heartbeat to keep the worker alive. + self.heartbeat() + + if ret_val == os.EX_OK: # The process exited normally. + return + job_status = job.get_status() + if job_status is None: # Job completed and its ttl has expired + return + if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: + + if not job.ended_at: + job.ended_at = utcnow() + + # Unhandled failure: move the job to the failed queue + self.log.warning(( + 'Moving job to FailedJobRegistry ' + '(work-horse terminated unexpectedly; waitpid returned {})' + ).format(ret_val)) + + self.handle_job_failure( + job, + exc_string="Work-horse process was terminated unexpectedly " + "(waitpid returned %s)" % ret_val + ) From 859fe2ab776140cbd765cf920b6d1f3a2d9d027e Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Wed, 13 Nov 2019 23:30:01 +0200 Subject: [PATCH 2/8] move differences from Worker to helper methods to help make the specialization clearer --- redash/worker.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/redash/worker.py b/redash/worker.py index 99431f0ecf..d7a103218e 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -102,12 +102,21 @@ def add_periodic_tasks(sender, **kwargs): class HardTimeLimitingWorker(Worker): grace_period = 15 + def soft_limit_exceeded(self, job): + seconds_under_monitor = (utcnow() - self.monitor_started).seconds + return seconds_under_monitor > job.timeout + self.grace_period + + def enforce_hard_limit(self, job): + self.log.warning('Job %s exceeded timeout of %ds (+%ds grace period) but work horse did not terminate it. ' + 'Killing the work horse.', job.id, job.timeout, self.grace_period) + self.kill_horse() + def monitor_work_horse(self, job): """The worker will monitor the work horse and make sure that it either executes successfully or the status of the job is set to failed """ - monitor_started = utcnow() + self.monitor_started = utcnow() while True: try: with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException): @@ -118,11 +127,8 @@ def monitor_work_horse(self, job): # Send a heartbeat to keep the worker alive. self.heartbeat(self.job_monitoring_interval + 5) - seconds_under_monitor = (utcnow() - monitor_started).seconds - if seconds_under_monitor > job.timeout + self.grace_period: - self.log.warning('Job %s exceeded timeout of %ds (+%ds grace period) but work horse did not terminate it. ' - 'Killing the work horse.', job.id, job.timeout, self.grace_period) - self.kill_horse() + if self.soft_limit_exceeded(job): + self.enforce_hard_limit(job) except OSError as e: # In case we encountered an OSError due to EINTR (which is # caused by a SIGINT or SIGTERM signal during From d12010095caff3dc884b2491294dc65fd98b16c9 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 14 Nov 2019 20:06:27 +0000 Subject: [PATCH 3/8] move HardLimitingWorker to redash/tasks --- redash/cli/rq.py | 2 +- redash/tasks/__init__.py | 1 + redash/tasks/hard_limiting_worker.py | 70 +++++++++++++++++++++++++++ redash/worker.py | 72 +--------------------------- 4 files changed, 73 insertions(+), 72 deletions(-) create mode 100644 redash/tasks/hard_limiting_worker.py diff --git a/redash/cli/rq.py b/redash/cli/rq.py index eb917c8d39..7ad2759e09 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -9,7 +9,7 @@ from sqlalchemy.orm import configure_mappers from redash import rq_redis_connection -from redash.worker import HardTimeLimitingWorker as Worker +from redash.tasks import HardLimitingWorker as Worker from redash.schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions manager = AppGroup(help="RQ management commands.") diff --git a/redash/tasks/__init__.py b/redash/tasks/__init__.py index 6f0d590514..cf195e64ea 100644 --- a/redash/tasks/__init__.py +++ b/redash/tasks/__init__.py @@ -3,3 +3,4 @@ refresh_schemas, cleanup_query_results, empty_schedules) from .alerts import check_alerts_for_query from .failure_report import send_aggregated_errors +from .hard_limiting_worker import HardLimitingWorker \ No newline at end of file diff --git a/redash/tasks/hard_limiting_worker.py b/redash/tasks/hard_limiting_worker.py new file mode 100644 index 0000000000..d95a5e045b --- /dev/null +++ b/redash/tasks/hard_limiting_worker.py @@ -0,0 +1,70 @@ +import errno +import os +from rq import Worker, get_current_job +from rq.utils import utcnow +from rq.timeouts import UnixSignalDeathPenalty, HorseMonitorTimeoutException +from rq.job import JobStatus + +class HardLimitingWorker(Worker): + grace_period = 15 + + def soft_limit_exceeded(self, job): + seconds_under_monitor = (utcnow() - self.monitor_started).seconds + return seconds_under_monitor > job.timeout + self.grace_period + + def enforce_hard_limit(self, job): + self.log.warning('Job %s exceeded timeout of %ds (+%ds grace period) but work horse did not terminate it. ' + 'Killing the work horse.', job.id, job.timeout, self.grace_period) + self.kill_horse() + + def monitor_work_horse(self, job): + """The worker will monitor the work horse and make sure that it + either executes successfully or the status of the job is set to + failed + """ + self.monitor_started = utcnow() + while True: + try: + with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException): + retpid, ret_val = os.waitpid(self._horse_pid, 0) + break + except HorseMonitorTimeoutException: + # Horse has not exited yet and is still running. + # Send a heartbeat to keep the worker alive. + self.heartbeat(self.job_monitoring_interval + 5) + + if self.soft_limit_exceeded(job): + self.enforce_hard_limit(job) + except OSError as e: + # In case we encountered an OSError due to EINTR (which is + # caused by a SIGINT or SIGTERM signal during + # os.waitpid()), we simply ignore it and enter the next + # iteration of the loop, waiting for the child to end. In + # any other case, this is some other unexpected OS error, + # which we don't want to catch, so we re-raise those ones. + if e.errno != errno.EINTR: + raise + # Send a heartbeat to keep the worker alive. + self.heartbeat() + + if ret_val == os.EX_OK: # The process exited normally. + return + job_status = job.get_status() + if job_status is None: # Job completed and its ttl has expired + return + if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: + + if not job.ended_at: + job.ended_at = utcnow() + + # Unhandled failure: move the job to the failed queue + self.log.warning(( + 'Moving job to FailedJobRegistry ' + '(work-horse terminated unexpectedly; waitpid returned {})' + ).format(ret_val)) + + self.handle_job_failure( + job, + exc_string="Work-horse process was terminated unexpectedly " + "(waitpid returned %s)" % ret_val + ) diff --git a/redash/worker.py b/redash/worker.py index d7a103218e..13ec1b18e9 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -1,5 +1,3 @@ -import errno -import os from datetime import timedelta from functools import partial @@ -10,11 +8,8 @@ from celery.signals import worker_process_init from celery.utils.log import get_logger -from rq import Worker, get_current_job -from rq.utils import utcnow +from rq import get_current_job from rq.decorators import job as rq_job -from rq.timeouts import UnixSignalDeathPenalty, HorseMonitorTimeoutException -from rq.job import JobStatus from redash import create_app, extensions, settings, redis_connection, rq_redis_connection from redash.metrics import celery as celery_metrics # noqa @@ -97,68 +92,3 @@ def add_periodic_tasks(sender, **kwargs): for params in extensions.periodic_tasks.values(): # Add it to Celery's periodic task registry, too. sender.add_periodic_task(**params) - - -class HardTimeLimitingWorker(Worker): - grace_period = 15 - - def soft_limit_exceeded(self, job): - seconds_under_monitor = (utcnow() - self.monitor_started).seconds - return seconds_under_monitor > job.timeout + self.grace_period - - def enforce_hard_limit(self, job): - self.log.warning('Job %s exceeded timeout of %ds (+%ds grace period) but work horse did not terminate it. ' - 'Killing the work horse.', job.id, job.timeout, self.grace_period) - self.kill_horse() - - def monitor_work_horse(self, job): - """The worker will monitor the work horse and make sure that it - either executes successfully or the status of the job is set to - failed - """ - self.monitor_started = utcnow() - while True: - try: - with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException): - retpid, ret_val = os.waitpid(self._horse_pid, 0) - break - except HorseMonitorTimeoutException: - # Horse has not exited yet and is still running. - # Send a heartbeat to keep the worker alive. - self.heartbeat(self.job_monitoring_interval + 5) - - if self.soft_limit_exceeded(job): - self.enforce_hard_limit(job) - except OSError as e: - # In case we encountered an OSError due to EINTR (which is - # caused by a SIGINT or SIGTERM signal during - # os.waitpid()), we simply ignore it and enter the next - # iteration of the loop, waiting for the child to end. In - # any other case, this is some other unexpected OS error, - # which we don't want to catch, so we re-raise those ones. - if e.errno != errno.EINTR: - raise - # Send a heartbeat to keep the worker alive. - self.heartbeat() - - if ret_val == os.EX_OK: # The process exited normally. - return - job_status = job.get_status() - if job_status is None: # Job completed and its ttl has expired - return - if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: - - if not job.ended_at: - job.ended_at = utcnow() - - # Unhandled failure: move the job to the failed queue - self.log.warning(( - 'Moving job to FailedJobRegistry ' - '(work-horse terminated unexpectedly; waitpid returned {})' - ).format(ret_val)) - - self.handle_job_failure( - job, - exc_string="Work-horse process was terminated unexpectedly " - "(waitpid returned %s)" % ret_val - ) From 1fa6abf219483cf364d762f35b4b31450fa56d59 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sat, 16 Nov 2019 21:22:56 +0000 Subject: [PATCH 4/8] move schedule.py to /tasks --- redash/cli/rq.py | 3 +-- redash/tasks/__init__.py | 3 ++- redash/{ => tasks}/schedule.py | 0 tests/{ => tasks}/test_schedule.py | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) rename redash/{ => tasks}/schedule.py (100%) rename tests/{ => tasks}/test_schedule.py (92%) diff --git a/redash/cli/rq.py b/redash/cli/rq.py index 7ad2759e09..b9a450eb44 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -9,8 +9,7 @@ from sqlalchemy.orm import configure_mappers from redash import rq_redis_connection -from redash.tasks import HardLimitingWorker as Worker -from redash.schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions +from redash.tasks import HardLimitingWorker as Worker, rq_scheduler, schedule_periodic_jobs, periodic_job_definitions manager = AppGroup(help="RQ management commands.") diff --git a/redash/tasks/__init__.py b/redash/tasks/__init__.py index cf195e64ea..f622eb481b 100644 --- a/redash/tasks/__init__.py +++ b/redash/tasks/__init__.py @@ -3,4 +3,5 @@ refresh_schemas, cleanup_query_results, empty_schedules) from .alerts import check_alerts_for_query from .failure_report import send_aggregated_errors -from .hard_limiting_worker import HardLimitingWorker \ No newline at end of file +from .hard_limiting_worker import * +from .schedule import * \ No newline at end of file diff --git a/redash/schedule.py b/redash/tasks/schedule.py similarity index 100% rename from redash/schedule.py rename to redash/tasks/schedule.py diff --git a/tests/test_schedule.py b/tests/tasks/test_schedule.py similarity index 92% rename from tests/test_schedule.py rename to tests/tasks/test_schedule.py index 13a38b11ac..5a8b1d19b5 100644 --- a/tests/test_schedule.py +++ b/tests/tasks/test_schedule.py @@ -1,7 +1,7 @@ from unittest import TestCase from mock import patch, ANY -from redash.schedule import rq_scheduler, schedule_periodic_jobs +from redash.tasks.schedule import rq_scheduler, schedule_periodic_jobs class TestSchedule(TestCase): def setUp(self): @@ -26,7 +26,7 @@ def foo(): pass schedule_periodic_jobs([{"func": foo, "interval": 60}]) - with patch('redash.schedule.rq_scheduler.schedule') as schedule: + with patch('redash.tasks.rq_scheduler.schedule') as schedule: schedule_periodic_jobs([{"func": foo, "interval": 60}]) schedule.assert_not_called() From 1251b9b24ea23e2bd4233daa28b352e5bed0e073 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sat, 16 Nov 2019 21:30:19 +0000 Subject: [PATCH 5/8] explain the motivation for HardLimitingWorker --- redash/tasks/hard_limiting_worker.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/redash/tasks/hard_limiting_worker.py b/redash/tasks/hard_limiting_worker.py index d95a5e045b..f85957e18b 100644 --- a/redash/tasks/hard_limiting_worker.py +++ b/redash/tasks/hard_limiting_worker.py @@ -5,6 +5,19 @@ from rq.timeouts import UnixSignalDeathPenalty, HorseMonitorTimeoutException from rq.job import JobStatus +""" +RQ's work horses enforce time limits by setting a timed alarm and stopping jobs +when they reach their time limits. However, the work horse may be entirely blocked +and may not respond to the alarm interrupt. Since respecting timeouts is critical +in Redash (if we don't respect them, workers may be infinitely stuck and as a result, +service may be denied for other queries), we enforce two time limits: +1. A soft time limit, enforced by the work horse +2. A hard time limit, enforced by the parent worker + +The HardLimitingWorker class changes the default monitoring behavior of the default +RQ Worker by checking if the work horse is still busy with the job, even after +it should have timed out (+ a grace period of 15s). If it does, it kills the work horse. +""" class HardLimitingWorker(Worker): grace_period = 15 From 4ae624b2b3f7a76b7138390d98aaafec4be5792a Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 17 Nov 2019 09:04:53 +0000 Subject: [PATCH 6/8] pleasing CodeClimate --- redash/tasks/__init__.py | 2 +- redash/tasks/hard_limiting_worker.py | 26 +++++++++++++------------- redash/tasks/schedule.py | 1 + 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/redash/tasks/__init__.py b/redash/tasks/__init__.py index f622eb481b..b75d01caad 100644 --- a/redash/tasks/__init__.py +++ b/redash/tasks/__init__.py @@ -4,4 +4,4 @@ from .alerts import check_alerts_for_query from .failure_report import send_aggregated_errors from .hard_limiting_worker import * -from .schedule import * \ No newline at end of file +from .schedule import * diff --git a/redash/tasks/hard_limiting_worker.py b/redash/tasks/hard_limiting_worker.py index f85957e18b..56e0788981 100644 --- a/redash/tasks/hard_limiting_worker.py +++ b/redash/tasks/hard_limiting_worker.py @@ -5,20 +5,20 @@ from rq.timeouts import UnixSignalDeathPenalty, HorseMonitorTimeoutException from rq.job import JobStatus -""" -RQ's work horses enforce time limits by setting a timed alarm and stopping jobs -when they reach their time limits. However, the work horse may be entirely blocked -and may not respond to the alarm interrupt. Since respecting timeouts is critical -in Redash (if we don't respect them, workers may be infinitely stuck and as a result, -service may be denied for other queries), we enforce two time limits: -1. A soft time limit, enforced by the work horse -2. A hard time limit, enforced by the parent worker - -The HardLimitingWorker class changes the default monitoring behavior of the default -RQ Worker by checking if the work horse is still busy with the job, even after -it should have timed out (+ a grace period of 15s). If it does, it kills the work horse. -""" class HardLimitingWorker(Worker): + """ + RQ's work horses enforce time limits by setting a timed alarm and stopping jobs + when they reach their time limits. However, the work horse may be entirely blocked + and may not respond to the alarm interrupt. Since respecting timeouts is critical + in Redash (if we don't respect them, workers may be infinitely stuck and as a result, + service may be denied for other queries), we enforce two time limits: + 1. A soft time limit, enforced by the work horse + 2. A hard time limit, enforced by the parent worker + + The HardLimitingWorker class changes the default monitoring behavior of the default + RQ Worker by checking if the work horse is still busy with the job, even after + it should have timed out (+ a grace period of 15s). If it does, it kills the work horse. + """ grace_period = 15 def soft_limit_exceeded(self, job): diff --git a/redash/tasks/schedule.py b/redash/tasks/schedule.py index 8a577e4c39..b1a527cf42 100644 --- a/redash/tasks/schedule.py +++ b/redash/tasks/schedule.py @@ -21,6 +21,7 @@ queue_name="periodic", interval=5) + def job_id(kwargs): metadata = kwargs.copy() metadata['func'] = metadata['func'].__name__ From 9cfd453a1f7138ff5c1427a5a15b0fa5c8848b85 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 17 Nov 2019 09:18:26 +0000 Subject: [PATCH 7/8] pleasing CodeClimate --- redash/tasks/hard_limiting_worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/redash/tasks/hard_limiting_worker.py b/redash/tasks/hard_limiting_worker.py index 56e0788981..c4bb30166b 100644 --- a/redash/tasks/hard_limiting_worker.py +++ b/redash/tasks/hard_limiting_worker.py @@ -5,6 +5,7 @@ from rq.timeouts import UnixSignalDeathPenalty, HorseMonitorTimeoutException from rq.job import JobStatus + class HardLimitingWorker(Worker): """ RQ's work horses enforce time limits by setting a timed alarm and stopping jobs @@ -14,7 +15,7 @@ class HardLimitingWorker(Worker): service may be denied for other queries), we enforce two time limits: 1. A soft time limit, enforced by the work horse 2. A hard time limit, enforced by the parent worker - + The HardLimitingWorker class changes the default monitoring behavior of the default RQ Worker by checking if the work horse is still busy with the job, even after it should have timed out (+ a grace period of 15s). If it does, it kills the work horse. From 768f0f610e467ba7b321815ff4dfcff4af6b448b Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Tue, 26 Nov 2019 13:10:56 +0000 Subject: [PATCH 8/8] avoid star imports --- redash/tasks/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redash/tasks/__init__.py b/redash/tasks/__init__.py index b75d01caad..e51b55b52c 100644 --- a/redash/tasks/__init__.py +++ b/redash/tasks/__init__.py @@ -3,5 +3,5 @@ refresh_schemas, cleanup_query_results, empty_schedules) from .alerts import check_alerts_for_query from .failure_report import send_aggregated_errors -from .hard_limiting_worker import * -from .schedule import * +from .hard_limiting_worker import HardLimitingWorker +from .schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions