From 824977b41a1812b5e2722050a37b53d62606ba8b Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 22 Aug 2019 00:11:10 +0300 Subject: [PATCH 01/43] add rq and an rq_worker service --- bin/docker-entrypoint | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/bin/docker-entrypoint b/bin/docker-entrypoint index d88266fa2d..361a8dd597 100755 --- a/bin/docker-entrypoint +++ b/bin/docker-entrypoint @@ -28,6 +28,18 @@ worker() { exec /app/manage.py rq worker $QUEUES } +rq_worker() { + echo "Starting RQ worker..." + + exec /app/manage.py rq worker $QUEUES +} + +dev_rq_worker() { + echo "Starting dev RQ worker..." + + exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq worker $QUEUES +} + dev_worker() { echo "Starting dev RQ worker..." From 7a59a1a424adf132a50132b401f4b7d9c3fabbf0 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 22 Aug 2019 00:12:13 +0300 Subject: [PATCH 02/43] add rq_scheduler and an rq_scheduler service --- bin/docker-entrypoint | 12 ++++++++++++ redash/cli/rq.py | 2 ++ 2 files changed, 14 insertions(+) diff --git a/bin/docker-entrypoint b/bin/docker-entrypoint index 361a8dd597..b3460a2c3f 100755 --- a/bin/docker-entrypoint +++ b/bin/docker-entrypoint @@ -28,6 +28,18 @@ worker() { exec /app/manage.py rq worker $QUEUES } +rq_scheduler() { + echo "Starting RQ scheduler..." + + exec /app/manage.py rq scheduler +} + +dev_rq_scheduler() { + echo "Starting dev RQ scheduler..." + + exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq scheduler +} + rq_worker() { echo "Starting RQ worker..." diff --git a/redash/cli/rq.py b/redash/cli/rq.py index 357f7cfd2d..1d11bc06ba 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -6,6 +6,8 @@ from click import argument from flask.cli import AppGroup from rq import Connection, Worker +from rq_scheduler import Scheduler + from redash import rq_redis_connection from redash.schedule import rq_scheduler, schedule_periodic_jobs From 4a7c7b7a4301cca35aecf02a2e9d7516c42a26b5 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 22 Aug 2019 00:12:36 +0300 Subject: [PATCH 03/43] move beat schedule to periodic_jobs queue --- redash/cli/rq.py | 2 + redash/tasks/queries/execution.py | 129 ++++++++++++++++++++++++++++++ redash/worker.py | 19 +++++ 3 files changed, 150 insertions(+) diff --git a/redash/cli/rq.py b/redash/cli/rq.py index 1d11bc06ba..1749b969c4 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -8,6 +8,8 @@ from rq import Connection, Worker from rq_scheduler import Scheduler +SECONDS = 1 +MINUTES = 60 * SECONDS from redash import rq_redis_connection from redash.schedule import rq_scheduler, schedule_periodic_jobs diff --git a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py index eead672319..4d7935265e 100644 --- a/redash/tasks/queries/execution.py +++ b/redash/tasks/queries/execution.py @@ -160,6 +160,135 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query return job +def empty_schedules(): + logger.info("Deleting schedules of past scheduled queries...") + + queries = models.Query.past_scheduled_queries() + for query in queries: + query.schedule = None + models.db.session.commit() + + logger.info("Deleted %d schedules.", len(queries)) + + +def refresh_queries(): + logger.info("Refreshing queries...") + + outdated_queries_count = 0 + query_ids = [] + + with statsd_client.timer('manager.outdated_queries_lookup'): + for query in models.Query.outdated_queries(): + if 
settings.FEATURE_DISABLE_REFRESH_QUERIES: + logging.info("Disabled refresh queries.") + elif query.org.is_disabled: + logging.debug("Skipping refresh of %s because org is disabled.", query.id) + elif query.data_source is None: + logging.debug("Skipping refresh of %s because the datasource is none.", query.id) + elif query.data_source.paused: + logging.debug("Skipping refresh of %s because datasource - %s is paused (%s).", + query.id, query.data_source.name, query.data_source.pause_reason) + else: + query_text = query.query_text + + parameters = {p['name']: p.get('value') for p in query.parameters} + if any(parameters): + try: + query_text = query.parameterized.apply(parameters).query + except InvalidParameterError as e: + error = u"Skipping refresh of {} because of invalid parameters: {}".format(query.id, e.message) + track_failure(query, error) + continue + except QueryDetachedFromDataSourceError as e: + error = ("Skipping refresh of {} because a related dropdown " + "query ({}) is unattached to any datasource.").format(query.id, e.query_id) + track_failure(query, error) + continue + + enqueue_query(query_text, query.data_source, query.user_id, + scheduled_query=query, + metadata={'Query ID': query.id, 'Username': 'Scheduled'}) + + query_ids.append(query.id) + outdated_queries_count += 1 + + statsd_client.gauge('manager.outdated_queries', outdated_queries_count) + + logger.info("Done refreshing queries. Found %d outdated queries: %s" % (outdated_queries_count, query_ids)) + + status = redis_connection.hgetall('redash:status') + now = time.time() + + redis_connection.hmset('redash:status', { + 'outdated_queries_count': outdated_queries_count, + 'last_refresh_at': now, + 'query_ids': json_dumps(query_ids) + }) + + statsd_client.gauge('manager.seconds_since_refresh', now - float(status.get('last_refresh_at', now))) + + +@celery.task(name="redash.tasks.cleanup_query_results") +def cleanup_query_results(): + """ + Job to cleanup unused query results -- such that no query links to them anymore, and older than + settings.QUERY_RESULTS_MAX_AGE (a week by default, so it's less likely to be open in someone's browser and be used). + + Each time the job deletes only settings.QUERY_RESULTS_CLEANUP_COUNT (100 by default) query results so it won't choke + the database in case of many such results. 
+ """ + + logging.info("Running query results clean up (removing maximum of %d unused results, that are %d days old or more)", + settings.QUERY_RESULTS_CLEANUP_COUNT, settings.QUERY_RESULTS_CLEANUP_MAX_AGE) + + unused_query_results = models.QueryResult.unused(settings.QUERY_RESULTS_CLEANUP_MAX_AGE).limit(settings.QUERY_RESULTS_CLEANUP_COUNT) + deleted_count = models.QueryResult.query.filter( + models.QueryResult.id.in_(unused_query_results.subquery()) + ).delete(synchronize_session=False) + models.db.session.commit() + logger.info("Deleted %d unused query results.", deleted_count) + + +@celery.task(name="redash.tasks.refresh_schema", time_limit=90, soft_time_limit=60) +def refresh_schema(data_source_id): + ds = models.DataSource.get_by_id(data_source_id) + logger.info(u"task=refresh_schema state=start ds_id=%s", ds.id) + start_time = time.time() + try: + ds.get_schema(refresh=True) + logger.info(u"task=refresh_schema state=finished ds_id=%s runtime=%.2f", ds.id, time.time() - start_time) + statsd_client.incr('refresh_schema.success') + except SoftTimeLimitExceeded: + logger.info(u"task=refresh_schema state=timeout ds_id=%s runtime=%.2f", ds.id, time.time() - start_time) + statsd_client.incr('refresh_schema.timeout') + except Exception: + logger.warning(u"Failed refreshing schema for the data source: %s", ds.name, exc_info=1) + statsd_client.incr('refresh_schema.error') + logger.info(u"task=refresh_schema state=failed ds_id=%s runtime=%.2f", ds.id, time.time() - start_time) + + +def refresh_schemas(): + """ + Refreshes the data sources schemas. + """ + blacklist = [int(ds_id) for ds_id in redis_connection.smembers('data_sources:schema:blacklist') if ds_id] + global_start_time = time.time() + + logger.info(u"task=refresh_schemas state=start") + + for ds in models.DataSource.query: + if ds.paused: + logger.info(u"task=refresh_schema state=skip ds_id=%s reason=paused(%s)", ds.id, ds.pause_reason) + elif ds.id in blacklist: + logger.info(u"task=refresh_schema state=skip ds_id=%s reason=blacklist", ds.id) + elif ds.org.is_disabled: + logger.info(u"task=refresh_schema state=skip ds_id=%s reason=org_disabled", ds.id) + else: + refresh_schema.apply_async(args=(ds.id,), queue=settings.SCHEMAS_REFRESH_QUEUE) + + logger.info(u"task=refresh_schemas state=finish total_runtime=%.2f", time.time() - global_start_time) + + def signal_handler(*args): raise InterruptException diff --git a/redash/worker.py b/redash/worker.py index 18da7de7a2..607250a250 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -49,6 +49,25 @@ def get_job_logger(name): redis_backend_use_ssl=settings.CELERY_SSL_CONFIG, include='redash.tasks') +# The internal periodic Celery tasks to automatically schedule. +celery_schedule = { +} + +if settings.VERSION_CHECK: + celery_schedule['version_check'] = { + 'task': 'redash.tasks.version_check', + # We need to schedule the version check to run at a random hour/minute, to spread the requests from all users + # evenly. 
+ 'schedule': crontab(minute=randint(0, 59), hour=randint(0, 23)) + } + +if settings.QUERY_RESULTS_CLEANUP_ENABLED: + celery_schedule['cleanup_query_results'] = { + 'task': 'redash.tasks.cleanup_query_results', + 'schedule': timedelta(minutes=5) + } + +celery_schedule.update(settings.dynamic_settings.custom_tasks()) celery.conf.update(result_backend=settings.CELERY_RESULT_BACKEND, timezone='UTC', From 965ad8aeb4d083c43fc02ecf5522e2dc0e310a54 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Tue, 27 Aug 2019 23:20:10 +0300 Subject: [PATCH 04/43] move version checks to RQ --- redash/worker.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/redash/worker.py b/redash/worker.py index 607250a250..3ce1cc2b5d 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -53,14 +53,6 @@ def get_job_logger(name): celery_schedule = { } -if settings.VERSION_CHECK: - celery_schedule['version_check'] = { - 'task': 'redash.tasks.version_check', - # We need to schedule the version check to run at a random hour/minute, to spread the requests from all users - # evenly. - 'schedule': crontab(minute=randint(0, 59), hour=randint(0, 23)) - } - if settings.QUERY_RESULTS_CLEANUP_ENABLED: celery_schedule['cleanup_query_results'] = { 'task': 'redash.tasks.cleanup_query_results', From 13c28910f00ce1131916877a599510b390506684 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Tue, 27 Aug 2019 23:24:27 +0300 Subject: [PATCH 05/43] move query result cleanup to RQ --- redash/tasks/queries/execution.py | 1 - redash/worker.py | 6 ------ 2 files changed, 7 deletions(-) diff --git a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py index 4d7935265e..a91b2a55ba 100644 --- a/redash/tasks/queries/execution.py +++ b/redash/tasks/queries/execution.py @@ -228,7 +228,6 @@ def refresh_queries(): statsd_client.gauge('manager.seconds_since_refresh', now - float(status.get('last_refresh_at', now))) -@celery.task(name="redash.tasks.cleanup_query_results") def cleanup_query_results(): """ Job to cleanup unused query results -- such that no query links to them anymore, and older than diff --git a/redash/worker.py b/redash/worker.py index 3ce1cc2b5d..a0201a8ccb 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -53,12 +53,6 @@ def get_job_logger(name): celery_schedule = { } -if settings.QUERY_RESULTS_CLEANUP_ENABLED: - celery_schedule['cleanup_query_results'] = { - 'task': 'redash.tasks.cleanup_query_results', - 'schedule': timedelta(minutes=5) - } - celery_schedule.update(settings.dynamic_settings.custom_tasks()) celery.conf.update(result_backend=settings.CELERY_RESULT_BACKEND, From 2b070f311d366a2c3d301945bb07b12930c88ac1 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Wed, 28 Aug 2019 00:07:29 +0300 Subject: [PATCH 06/43] move custom tasks to RQ --- redash/worker.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/redash/worker.py b/redash/worker.py index a0201a8ccb..83ef34eeff 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -53,8 +53,6 @@ def get_job_logger(name): celery_schedule = { } -celery_schedule.update(settings.dynamic_settings.custom_tasks()) - celery.conf.update(result_backend=settings.CELERY_RESULT_BACKEND, timezone='UTC', result_expires=settings.CELERY_RESULT_EXPIRES, From 6479c1fb7843f9f2775839805e339ab255ee35e0 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 29 Aug 2019 14:51:32 +0300 Subject: [PATCH 07/43] do actual schema refreshes in rq --- redash/tasks/queries/execution.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git 
a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py index a91b2a55ba..c8f7bee2d9 100644 --- a/redash/tasks/queries/execution.py +++ b/redash/tasks/queries/execution.py @@ -2,6 +2,7 @@ import signal import time import redis +from rq import Queue from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded from celery.result import AsyncResult from celery.utils.log import get_task_logger @@ -248,7 +249,6 @@ def cleanup_query_results(): logger.info("Deleted %d unused query results.", deleted_count) -@celery.task(name="redash.tasks.refresh_schema", time_limit=90, soft_time_limit=60) def refresh_schema(data_source_id): ds = models.DataSource.get_by_id(data_source_id) logger.info(u"task=refresh_schema state=start ds_id=%s", ds.id) @@ -283,7 +283,8 @@ def refresh_schemas(): elif ds.org.is_disabled: logger.info(u"task=refresh_schema state=skip ds_id=%s reason=org_disabled", ds.id) else: - refresh_schema.apply_async(args=(ds.id,), queue=settings.SCHEMAS_REFRESH_QUEUE) + q = Queue(settings.SCHEMAS_REFRESH_QUEUE, connection=redis_connection) + q.enqueue(refresh_schema, ds.id) logger.info(u"task=refresh_schemas state=finish total_runtime=%.2f", time.time() - global_start_time) From c3688337dca349f937e3b469708a621f32f56aae Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 29 Aug 2019 15:06:00 +0300 Subject: [PATCH 08/43] move send_email to rq --- redash/authentication/account.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/redash/authentication/account.py b/redash/authentication/account.py index c20b60aab2..e3fac79165 100644 --- a/redash/authentication/account.py +++ b/redash/authentication/account.py @@ -1,7 +1,8 @@ import logging from flask import render_template -from redash import settings +from rq import Queue +from redash import settings, redis_connection from redash.tasks import send_mail from redash.utils import base_url # noinspection PyUnresolvedReferences @@ -50,7 +51,8 @@ def send_verify_email(user, org): text_content = render_template('emails/verify.txt', **context) subject = "{}, please verify your email address".format(user.name) - send_mail.delay([user.email], subject, html_content, text_content) + q = Queue(connection=redis_connection) + q.enqueue(send_mail, [user.email], subject, html_content, text_content) def send_invite_email(inviter, invited, invite_url, org): @@ -59,7 +61,8 @@ def send_invite_email(inviter, invited, invite_url, org): text_content = render_template('emails/invite.txt', **context) subject = "{} invited you to join Redash".format(inviter.name) - send_mail.delay([invited.email], subject, html_content, text_content) + q = Queue(connection=redis_connection) + q.enqueue(send_mail, [invited.email], subject, html_content, text_content) def send_password_reset_email(user): @@ -69,7 +72,8 @@ def send_password_reset_email(user): text_content = render_template('emails/reset.txt', **context) subject = "Reset your password" - send_mail.delay([user.email], subject, html_content, text_content) + q = Queue(connection=redis_connection) + q.enqueue(send_mail, [user.email], subject, html_content, text_content) return reset_link @@ -78,4 +82,5 @@ def send_user_disabled_email(user): text_content = render_template('emails/reset_disabled.txt', user=user) subject = "Your Redash account is disabled" - send_mail.delay([user.email], subject, html_content, text_content) + q = Queue(connection=redis_connection) + q.enqueue(send_mail, [user.email], subject, html_content, text_content) From 
af94ea947161ff7864aadcabafac95f6ecec19c7 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 29 Aug 2019 15:32:18 +0300 Subject: [PATCH 09/43] DRY up enqueues --- redash/__init__.py | 5 +++++ redash/authentication/account.py | 15 +++++---------- redash/tasks/queries/execution.py | 4 +--- 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/redash/__init__.py b/redash/__init__.py index 87ac101c34..afaa0eee24 100644 --- a/redash/__init__.py +++ b/redash/__init__.py @@ -4,6 +4,7 @@ import sys import redis +from rq import Queue from flask_mail import Mail from flask_limiter import Limiter from flask_limiter.util import get_ipaddr @@ -45,5 +46,9 @@ def setup_logging(): statsd_client = StatsClient(host=settings.STATSD_HOST, port=settings.STATSD_PORT, prefix=settings.STATSD_PREFIX) limiter = Limiter(key_func=get_ipaddr, storage_uri=settings.LIMITER_STORAGE) +def enqueue(queue, func, *args, **kwargs): + q = Queue(queue, connection=redis_connection) + return q.enqueue(func, *args, **kwargs) + import_query_runners(settings.QUERY_RUNNERS) import_destinations(settings.DESTINATIONS) diff --git a/redash/authentication/account.py b/redash/authentication/account.py index e3fac79165..22d431a498 100644 --- a/redash/authentication/account.py +++ b/redash/authentication/account.py @@ -1,8 +1,7 @@ import logging from flask import render_template -from rq import Queue -from redash import settings, redis_connection +from redash import settings, redis_connection, enqueue from redash.tasks import send_mail from redash.utils import base_url # noinspection PyUnresolvedReferences @@ -51,8 +50,7 @@ def send_verify_email(user, org): text_content = render_template('emails/verify.txt', **context) subject = "{}, please verify your email address".format(user.name) - q = Queue(connection=redis_connection) - q.enqueue(send_mail, [user.email], subject, html_content, text_content) + enqueue('default', send_mail, [user.email], subject, html_content, text_content) def send_invite_email(inviter, invited, invite_url, org): @@ -61,8 +59,7 @@ def send_invite_email(inviter, invited, invite_url, org): text_content = render_template('emails/invite.txt', **context) subject = "{} invited you to join Redash".format(inviter.name) - q = Queue(connection=redis_connection) - q.enqueue(send_mail, [invited.email], subject, html_content, text_content) + enqueue('default', send_mail, [invited.email], subject, html_content, text_content) def send_password_reset_email(user): @@ -72,8 +69,7 @@ def send_password_reset_email(user): text_content = render_template('emails/reset.txt', **context) subject = "Reset your password" - q = Queue(connection=redis_connection) - q.enqueue(send_mail, [user.email], subject, html_content, text_content) + enqueue('default', send_mail, [user.email], subject, html_content, text_content) return reset_link @@ -82,5 +78,4 @@ def send_user_disabled_email(user): text_content = render_template('emails/reset_disabled.txt', user=user) subject = "Your Redash account is disabled" - q = Queue(connection=redis_connection) - q.enqueue(send_mail, [user.email], subject, html_content, text_content) + enqueue('default', send_mail, [user.email], subject, html_content, text_content) diff --git a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py index c8f7bee2d9..76227848c0 100644 --- a/redash/tasks/queries/execution.py +++ b/redash/tasks/queries/execution.py @@ -2,7 +2,6 @@ import signal import time import redis -from rq import Queue from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded 
from celery.result import AsyncResult from celery.utils.log import get_task_logger @@ -283,8 +282,7 @@ def refresh_schemas(): elif ds.org.is_disabled: logger.info(u"task=refresh_schema state=skip ds_id=%s reason=org_disabled", ds.id) else: - q = Queue(settings.SCHEMAS_REFRESH_QUEUE, connection=redis_connection) - q.enqueue(refresh_schema, ds.id) + enqueue(settings.SCHEMAS_REFRESH_QUEUE, refresh_schema, ds.id) logger.info(u"task=refresh_schemas state=finish total_runtime=%.2f", time.time() - global_start_time) From d9f6d31399b3ec6ba2483d4e3a7b3c95e017c7dc Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 29 Aug 2019 22:38:06 +0300 Subject: [PATCH 10/43] ditch enqueue and use a partially applied decorator --- redash/__init__.py | 4 ---- redash/authentication/account.py | 10 +++++----- redash/tasks/queries/execution.py | 4 ++-- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/redash/__init__.py b/redash/__init__.py index afaa0eee24..db737c39af 100644 --- a/redash/__init__.py +++ b/redash/__init__.py @@ -46,9 +46,5 @@ def setup_logging(): statsd_client = StatsClient(host=settings.STATSD_HOST, port=settings.STATSD_PORT, prefix=settings.STATSD_PREFIX) limiter = Limiter(key_func=get_ipaddr, storage_uri=settings.LIMITER_STORAGE) -def enqueue(queue, func, *args, **kwargs): - q = Queue(queue, connection=redis_connection) - return q.enqueue(func, *args, **kwargs) - import_query_runners(settings.QUERY_RUNNERS) import_destinations(settings.DESTINATIONS) diff --git a/redash/authentication/account.py b/redash/authentication/account.py index 22d431a498..d18a589acf 100644 --- a/redash/authentication/account.py +++ b/redash/authentication/account.py @@ -1,7 +1,7 @@ import logging from flask import render_template -from redash import settings, redis_connection, enqueue +from redash import settings, redis_connection from redash.tasks import send_mail from redash.utils import base_url # noinspection PyUnresolvedReferences @@ -50,7 +50,7 @@ def send_verify_email(user, org): text_content = render_template('emails/verify.txt', **context) subject = "{}, please verify your email address".format(user.name) - enqueue('default', send_mail, [user.email], subject, html_content, text_content) + send_mail.delay([user.email], subject, html_content, text_content) def send_invite_email(inviter, invited, invite_url, org): @@ -59,7 +59,7 @@ def send_invite_email(inviter, invited, invite_url, org): text_content = render_template('emails/invite.txt', **context) subject = "{} invited you to join Redash".format(inviter.name) - enqueue('default', send_mail, [invited.email], subject, html_content, text_content) + send_mail.delay([invited.email], subject, html_content, text_content) def send_password_reset_email(user): @@ -69,7 +69,7 @@ def send_password_reset_email(user): text_content = render_template('emails/reset.txt', **context) subject = "Reset your password" - enqueue('default', send_mail, [user.email], subject, html_content, text_content) + send_mail.delay([user.email], subject, html_content, text_content) return reset_link @@ -78,4 +78,4 @@ def send_user_disabled_email(user): text_content = render_template('emails/reset_disabled.txt', user=user) subject = "Your Redash account is disabled" - enqueue('default', send_mail, [user.email], subject, html_content, text_content) + send_mail.delay([user.email], subject, html_content, text_content) diff --git a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py index 76227848c0..f127b72515 100644 --- a/redash/tasks/queries/execution.py +++
b/redash/tasks/queries/execution.py @@ -247,7 +247,7 @@ def cleanup_query_results(): models.db.session.commit() logger.info("Deleted %d unused query results.", deleted_count) - +@job(settings.SCHEMAS_REFRESH_QUEUE) def refresh_schema(data_source_id): ds = models.DataSource.get_by_id(data_source_id) logger.info(u"task=refresh_schema state=start ds_id=%s", ds.id) @@ -282,7 +282,7 @@ def refresh_schemas(): elif ds.org.is_disabled: logger.info(u"task=refresh_schema state=skip ds_id=%s reason=org_disabled", ds.id) else: - enqueue(settings.SCHEMAS_REFRESH_QUEUE, refresh_schema, ds.id) + refresh_schema.delay(ds.id) logger.info(u"task=refresh_schemas state=finish total_runtime=%.2f", time.time() - global_start_time) From 825798373560629224fe3da38d70b5c775a12f5d Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 29 Aug 2019 23:50:51 +0300 Subject: [PATCH 11/43] =?UTF-8?q?=F0=9F=91=8B=20beat?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- redash/worker.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/redash/worker.py b/redash/worker.py index 83ef34eeff..18da7de7a2 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -49,9 +49,6 @@ def get_job_logger(name): redis_backend_use_ssl=settings.CELERY_SSL_CONFIG, include='redash.tasks') -# The internal periodic Celery tasks to automatically schedule. -celery_schedule = { -} celery.conf.update(result_backend=settings.CELERY_RESULT_BACKEND, timezone='UTC', From c906b36ea2cf7afd8159df0e842251f9d23331ea Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 29 Aug 2019 23:57:03 +0300 Subject: [PATCH 12/43] rename rq_scheduler to plain scheduler, now that there's no Celery scheduler entrypoint --- bin/docker-entrypoint | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/docker-entrypoint b/bin/docker-entrypoint index b3460a2c3f..8f99684b64 100755 --- a/bin/docker-entrypoint +++ b/bin/docker-entrypoint @@ -34,7 +34,7 @@ rq_scheduler() { exec /app/manage.py rq scheduler } -dev_rq_scheduler() { +dev_scheduler() { echo "Starting dev RQ scheduler..." exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq scheduler From 6b72ff606ae8e56ba734cb6a5960c8e21eefa962 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 1 Sep 2019 14:01:22 +0300 Subject: [PATCH 13/43] add logging context to rq jobs (while keeping execute_query context via get_task_logger for now) --- redash/tasks/queries/execution.py | 7 ++++--- redash/worker.py | 21 +++++++++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py index f127b72515..9cd7c71c5c 100644 --- a/redash/tasks/queries/execution.py +++ b/redash/tasks/queries/execution.py @@ -14,7 +14,7 @@ from redash.utils import gen_query_hash, json_dumps, utcnow from redash.worker import celery -logger = get_task_logger(__name__) +logger = get_job_logger(__name__) TIMEOUT_MESSAGE = "Query exceeded Redash query execution time limit." 
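For reference, the partially applied decorator that patch 10 swaps in is built on RQ's stock job decorator. A minimal sketch of the idea, assuming a reachable local Redis; the queue name and connection here are illustrative, not Redash's exact wiring:

    # Sketch of the partially applied decorator pattern from patch 10.
    from functools import partial

    from redis import Redis
    from rq.decorators import job as rq_job

    # Bake the connection in once so call sites only have to name a queue.
    job = partial(rq_job, connection=Redis())

    @job('schemas')
    def refresh_schema(data_source_id):
        print("refreshing schema for data source %s" % data_source_id)

    # .delay() enqueues onto the "schemas" queue instead of running inline.
    refresh_schema.delay(42)

The gain over the earlier enqueue helper is that the target queue becomes part of the function's declaration rather than something every call site repeats.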
@@ -316,6 +316,7 @@ def _resolve_user(user_id, is_api_key, query_id): class QueryExecutor(object): def __init__(self, task, query, data_source_id, user_id, is_api_key, metadata, scheduled_query): + self.logger = get_task_logger(__name__) self.task = task self.query = query self.data_source_id = data_source_id @@ -335,7 +336,7 @@ def run(self): signal.signal(signal.SIGINT, signal_handler) started_at = time.time() - logger.debug("Executing query:\n%s", self.query) + self.logger.debug("Executing query:\n%s", self.query) self._log_progress('executing_query') query_runner = self.data_source.query_runner @@ -407,7 +408,7 @@ def _log_progress(self, state): self.metadata.get('Username', 'unknown')) def _load_data_source(self): - logger.info("task=execute_query state=load_ds ds_id=%d", self.data_source_id) + self.logger.info("task=execute_query state=load_ds ds_id=%d", self.data_source_id) return models.DataSource.query.get(self.data_source_id) diff --git a/redash/worker.py b/redash/worker.py index 18da7de7a2..c8d482980d 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -43,6 +43,27 @@ def get_job_logger(name): return logger +class CurrentJobFilter(logging.Filter): + def filter(self, record): + current_job = get_current_job() + + record.job_id = current_job.id if current_job else '' + record.job_description = current_job.description if current_job else '' + + return True + +def get_job_logger(name): + logger = logging.getLogger('rq.job.' + name) + + handler = logging.StreamHandler() + handler.formatter = logging.Formatter(settings.RQ_WORKER_JOB_LOG_FORMAT) + handler.addFilter(CurrentJobFilter()) + + logger.addHandler(handler) + logger.propagate = False + + return logger + celery = Celery('redash', broker=settings.CELERY_BROKER, broker_use_ssl=settings.CELERY_SSL_CONFIG, From 6073a41e23c55b5bc5fd784128c846600494b778 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Mon, 2 Sep 2019 00:30:00 +0300 Subject: [PATCH 14/43] move schedule to its own module --- redash/cli/rq.py | 1 - redash/scheduler.py | 48 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 redash/scheduler.py diff --git a/redash/cli/rq.py b/redash/cli/rq.py index 1749b969c4..c9a96d9001 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -6,7 +6,6 @@ from click import argument from flask.cli import AppGroup from rq import Connection, Worker -from rq_scheduler import Scheduler SECONDS = 1 MINUTES = 60 * SECONDS diff --git a/redash/scheduler.py b/redash/scheduler.py new file mode 100644 index 0000000000..9a48c51611 --- /dev/null +++ b/redash/scheduler.py @@ -0,0 +1,48 @@ +from __future__ import absolute_import +from datetime import datetime, timedelta +from functools import partial +from random import randint + +from rq_scheduler import Scheduler + +from redash import settings, redis_connection +from redash.tasks import (sync_user_details, refresh_queries, + empty_schedules, refresh_schemas, + cleanup_query_results, + version_check, send_aggregated_errors) + +rq_scheduler = Scheduler(connection=redis_connection, + queue_name="periodic", + interval=5) + + +def _schedule(func, **kwargs): + interval = kwargs['interval'] + if isinstance(interval, timedelta): + interval = interval.seconds + + kwargs['interval'] = interval + kwargs['result_ttl'] = kwargs.get('result_ttl', interval) + + rq_scheduler.schedule(scheduled_time=datetime.utcnow(), func=func, **kwargs) + + +def schedule_periodic_jobs(): + _schedule(refresh_queries, interval=30) + _schedule(empty_schedules, 
interval=timedelta(minutes=60)) + _schedule(refresh_schemas, interval=timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)) + _schedule(sync_user_details, timeout=60, ttl=45, interval=timedelta(minutes=1)) + _schedule(send_aggregated_errors, interval=timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)) + + for (func, kwargs) in settings.dynamic_settings.custom_tasks().iteritems(): + _schedule(func, **kwargs) + + if settings.QUERY_RESULTS_CLEANUP_ENABLED: + _schedule(cleanup_query_results, interval=timedelta(minutes=5)) + + if settings.VERSION_CHECK: + # We need to schedule the version check to run at a random hour/minute, to spread the requests from all users evenly. + rq_scheduler.cron('{minute} {hour} * * *'.format( + minute=randint(0, 59), + hour=randint(0, 23)), + func=version_check) From a627080b8fda2d1c2b522cb84c883960238f56b0 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Mon, 2 Sep 2019 00:30:24 +0300 Subject: [PATCH 15/43] cancel previously scheduled periodic jobs. not sure this is a good idea. --- redash/scheduler.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/redash/scheduler.py b/redash/scheduler.py index 9a48c51611..5d7f894754 100644 --- a/redash/scheduler.py +++ b/redash/scheduler.py @@ -17,6 +17,10 @@ def _schedule(func, **kwargs): + previously_scheduled_jobs = filter(lambda job: job.func == func, rq_scheduler.get_jobs()) + for job in previously_scheduled_jobs: + rq_scheduler.cancel(job) + interval = kwargs['interval'] if isinstance(interval, timedelta): interval = interval.seconds From 15819205dc4309345fcbeec2141378b9107e6923 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Mon, 2 Sep 2019 00:33:48 +0300 Subject: [PATCH 16/43] rename redash.scheduler to redash.schedule --- redash/schedule.py | 40 ++++++++++++++++------------------ redash/scheduler.py | 52 --------------------------------------------- 2 files changed, 18 insertions(+), 74 deletions(-) delete mode 100644 redash/scheduler.py diff --git a/redash/schedule.py b/redash/schedule.py index 0a6b75fa0a..5d7f894754 100644 --- a/redash/schedule.py +++ b/redash/schedule.py @@ -5,52 +5,48 @@ from rq_scheduler import Scheduler -from redash import settings, rq_redis_connection +from redash import settings, redis_connection from redash.tasks import (sync_user_details, refresh_queries, empty_schedules, refresh_schemas, cleanup_query_results, version_check, send_aggregated_errors) -rq_scheduler = Scheduler(connection=rq_redis_connection, +rq_scheduler = Scheduler(connection=redis_connection, queue_name="periodic", interval=5) -def schedule(**kwargs): +def _schedule(func, **kwargs): + previously_scheduled_jobs = filter(lambda job: job.func == func, rq_scheduler.get_jobs()) + for job in previously_scheduled_jobs: + rq_scheduler.cancel(job) + interval = kwargs['interval'] if isinstance(interval, timedelta): interval = interval.seconds kwargs['interval'] = interval - kwargs['result_ttl'] = kwargs.get('result_ttl', interval * 2) + kwargs['result_ttl'] = kwargs.get('result_ttl', interval) - rq_scheduler.schedule(scheduled_time=datetime.utcnow(), **kwargs) + rq_scheduler.schedule(scheduled_time=datetime.utcnow(), func=func, **kwargs) def schedule_periodic_jobs(): - for job in rq_scheduler.get_jobs(): - job.delete() + _schedule(refresh_queries, interval=30) + _schedule(empty_schedules, interval=timedelta(minutes=60)) + _schedule(refresh_schemas, interval=timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)) + _schedule(sync_user_details, timeout=60, ttl=45, interval=timedelta(minutes=1)) + 
_schedule(send_aggregated_errors, interval=timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)) - jobs = [ - {"func": refresh_queries, "interval": 30, "result_ttl": 600}, - {"func": empty_schedules, "interval": timedelta(minutes=60)}, - {"func": refresh_schemas, "interval": timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)}, - {"func": sync_user_details, "timeout": 60, "ttl": 45, "interval": timedelta(minutes=1)}, - {"func": send_aggregated_errors, "interval": timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)} - ] + for (func, kwargs) in settings.dynamic_settings.custom_tasks().iteritems(): + _schedule(func, **kwargs) if settings.QUERY_RESULTS_CLEANUP_ENABLED: - jobs.append({"func": cleanup_query_results, "interval": timedelta(minutes=5)}) + _schedule(cleanup_query_results, interval=timedelta(minutes=5)) if settings.VERSION_CHECK: - # We schedule the version check to run at a random time in order to spread the requests from all users evenly. + # We need to schedule the version check to run at a random hour/minute, to spread the requests from all users evenly. rq_scheduler.cron('{minute} {hour} * * *'.format( minute=randint(0, 59), hour=randint(0, 23)), func=version_check) - - # Add your own custom periodic jobs in your dynamic_settings module. - jobs.extend(settings.dynamic_settings.periodic_jobs() or []) - - for job in jobs: - schedule(**job) diff --git a/redash/scheduler.py b/redash/scheduler.py deleted file mode 100644 index 5d7f894754..0000000000 --- a/redash/scheduler.py +++ /dev/null @@ -1,52 +0,0 @@ -from __future__ import absolute_import -from datetime import datetime, timedelta -from functools import partial -from random import randint - -from rq_scheduler import Scheduler - -from redash import settings, redis_connection -from redash.tasks import (sync_user_details, refresh_queries, - empty_schedules, refresh_schemas, - cleanup_query_results, - version_check, send_aggregated_errors) - -rq_scheduler = Scheduler(connection=redis_connection, - queue_name="periodic", - interval=5) - - -def _schedule(func, **kwargs): - previously_scheduled_jobs = filter(lambda job: job.func == func, rq_scheduler.get_jobs()) - for job in previously_scheduled_jobs: - rq_scheduler.cancel(job) - - interval = kwargs['interval'] - if isinstance(interval, timedelta): - interval = interval.seconds - - kwargs['interval'] = interval - kwargs['result_ttl'] = kwargs.get('result_ttl', interval) - - rq_scheduler.schedule(scheduled_time=datetime.utcnow(), func=func, **kwargs) - - -def schedule_periodic_jobs(): - _schedule(refresh_queries, interval=30) - _schedule(empty_schedules, interval=timedelta(minutes=60)) - _schedule(refresh_schemas, interval=timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)) - _schedule(sync_user_details, timeout=60, ttl=45, interval=timedelta(minutes=1)) - _schedule(send_aggregated_errors, interval=timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)) - - for (func, kwargs) in settings.dynamic_settings.custom_tasks().iteritems(): - _schedule(func, **kwargs) - - if settings.QUERY_RESULTS_CLEANUP_ENABLED: - _schedule(cleanup_query_results, interval=timedelta(minutes=5)) - - if settings.VERSION_CHECK: - # We need to schedule the version check to run at a random hour/minute, to spread the requests from all users evenly. 
- rq_scheduler.cron('{minute} {hour} * * *'.format( - minute=randint(0, 59), - hour=randint(0, 23)), - func=version_check) From 67ff9e1fff5a1798bc7220a3ae4e03964fc56099 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Mon, 2 Sep 2019 12:26:45 +0300 Subject: [PATCH 17/43] allow custom dynamic jobs to be added declaratively --- redash/schedule.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/redash/schedule.py b/redash/schedule.py index 5d7f894754..2375bbc204 100644 --- a/redash/schedule.py +++ b/redash/schedule.py @@ -16,7 +16,7 @@ interval=5) -def _schedule(func, **kwargs): +def schedule(func, **kwargs): previously_scheduled_jobs = filter(lambda job: job.func == func, rq_scheduler.get_jobs()) for job in previously_scheduled_jobs: rq_scheduler.cancel(job) @@ -32,17 +32,14 @@ def schedule_periodic_jobs(): - _schedule(refresh_queries, interval=30) - _schedule(empty_schedules, interval=timedelta(minutes=60)) - _schedule(refresh_schemas, interval=timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)) - _schedule(sync_user_details, timeout=60, ttl=45, interval=timedelta(minutes=1)) - _schedule(send_aggregated_errors, interval=timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)) - - for (func, kwargs) in settings.dynamic_settings.custom_tasks().iteritems(): - _schedule(func, **kwargs) + schedule(refresh_queries, interval=30) + schedule(empty_schedules, interval=timedelta(minutes=60)) + schedule(refresh_schemas, interval=timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)) + schedule(sync_user_details, timeout=60, ttl=45, interval=timedelta(minutes=1)) + schedule(send_aggregated_errors, interval=timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)) if settings.QUERY_RESULTS_CLEANUP_ENABLED: - _schedule(cleanup_query_results, interval=timedelta(minutes=5)) + schedule(cleanup_query_results, interval=timedelta(minutes=5)) if settings.VERSION_CHECK: # We need to schedule the version check to run at a random hour/minute, to spread the requests from all users evenly. From 4116fe52aa47498ada8dc23a1e6defe382409755 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Tue, 3 Sep 2019 10:27:25 +0300 Subject: [PATCH 18/43] pleasing the CodeClimate overlords --- redash/schedule.py | 2 +- redash/tasks/queries/execution.py | 1 + redash/worker.py | 3 +++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/redash/schedule.py b/redash/schedule.py index 2375bbc204..369d38a66a 100644 --- a/redash/schedule.py +++ b/redash/schedule.py @@ -42,7 +42,7 @@ def schedule_periodic_jobs(): schedule(cleanup_query_results, interval=timedelta(minutes=5)) if settings.VERSION_CHECK: - # We need to schedule the version check to run at a random hour/minute, to spread the requests from all users evenly. + # We schedule the version check to run at a random time in order to spread the requests from all users evenly.
rq_scheduler.cron('{minute} {hour} * * *'.format( minute=randint(0, 59), hour=randint(0, 23)), diff --git a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py index 9cd7c71c5c..eb0ab72cd3 100644 --- a/redash/tasks/queries/execution.py +++ b/redash/tasks/queries/execution.py @@ -247,6 +247,7 @@ def cleanup_query_results(): models.db.session.commit() logger.info("Deleted %d unused query results.", deleted_count) + @job(settings.SCHEMAS_REFRESH_QUEUE) def refresh_schema(data_source_id): ds = models.DataSource.get_by_id(data_source_id) diff --git a/redash/worker.py b/redash/worker.py index c8d482980d..c7af457819 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -43,6 +43,7 @@ def get_job_logger(name): return logger + class CurrentJobFilter(logging.Filter): def filter(self, record): current_job = get_current_job() @@ -52,6 +53,7 @@ def filter(self, record): return True + def get_job_logger(name): logger = logging.getLogger('rq.job.' + name) @@ -64,6 +66,7 @@ def get_job_logger(name): return logger + celery = Celery('redash', broker=settings.CELERY_BROKER, broker_use_ssl=settings.CELERY_SSL_CONFIG, From a5adbf31718e0a49f3a0ed3db126dbef27027481 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Tue, 3 Sep 2019 12:13:54 +0300 Subject: [PATCH 19/43] adjust cypress docker-compose.yml to include rq changes --- .circleci/docker-compose.cypress.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/docker-compose.cypress.yml b/.circleci/docker-compose.cypress.yml index ffa4ca6960..dd3a79d5ec 100644 --- a/.circleci/docker-compose.cypress.yml +++ b/.circleci/docker-compose.cypress.yml @@ -16,7 +16,7 @@ services: REDASH_RATELIMIT_ENABLED: "false" scheduler: build: ../ - command: scheduler + command: worker depends_on: - server environment: From 97a3dacb927e5a2b57fabe4d9c9e4b52eaaf9309 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Tue, 3 Sep 2019 12:20:46 +0300 Subject: [PATCH 20/43] DRY up Cypress docker-compose --- .circleci/docker-compose.cypress.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/docker-compose.cypress.yml b/.circleci/docker-compose.cypress.yml index dd3a79d5ec..3fcd666bdf 100644 --- a/.circleci/docker-compose.cypress.yml +++ b/.circleci/docker-compose.cypress.yml @@ -1,4 +1,4 @@ -version: '3' +version: '3.2' services: server: build: ../ From 9d507383ffc6c19a7f01a85c21f2ffe436246d07 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Wed, 4 Sep 2019 10:41:55 +0300 Subject: [PATCH 21/43] an odd attempt at watching docker-compose logs when running with Cypress --- client/cypress/cypress.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/cypress/cypress.js b/client/cypress/cypress.js index 1410228b5c..de46306bb3 100644 --- a/client/cypress/cypress.js +++ b/client/cypress/cypress.js @@ -23,8 +23,8 @@ function startServer() { console.log('Starting the server...'); execSync('docker-compose -p cypress build --build-arg skip_ds_deps=true', { stdio: 'inherit' }); - execSync('docker-compose -p cypress up -d', { stdio: 'inherit' }); - execSync('docker-compose -p cypress run server create_db', { stdio: 'inherit' }); + execSync('docker-compose -p cypress up &', { stdio: 'inherit' }); + execSync('sleep 1 && docker-compose -p cypress run server create_db', { stdio: 'inherit' }); } function stopServer() { From 3c4d25c644824651d580bee1b6cc55e60e5ef483 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Wed, 4 Sep 2019 10:59:03 +0300 Subject: [PATCH 22/43] Revert "an odd attempt at watching 
docker-compose logs when running with Cypress" This reverts commit 016bd1a93e3efa84a9f27d0f2acb972ce1957bcd. --- client/cypress/cypress.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/cypress/cypress.js b/client/cypress/cypress.js index de46306bb3..1410228b5c 100644 --- a/client/cypress/cypress.js +++ b/client/cypress/cypress.js @@ -23,8 +23,8 @@ function startServer() { console.log('Starting the server...'); execSync('docker-compose -p cypress build --build-arg skip_ds_deps=true', { stdio: 'inherit' }); - execSync('docker-compose -p cypress up &', { stdio: 'inherit' }); - execSync('sleep 1 && docker-compose -p cypress run server create_db', { stdio: 'inherit' }); + execSync('docker-compose -p cypress up -d', { stdio: 'inherit' }); + execSync('docker-compose -p cypress run server create_db', { stdio: 'inherit' }); } function stopServer() { From f714dd67f828348773dd53918fd015c6e3daf07f Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Wed, 4 Sep 2019 10:59:32 +0300 Subject: [PATCH 23/43] show docker-compose logs at Cypress shutdown --- client/cypress/cypress.js | 1 + 1 file changed, 1 insertion(+) diff --git a/client/cypress/cypress.js b/client/cypress/cypress.js index 1410228b5c..e34a41ae00 100644 --- a/client/cypress/cypress.js +++ b/client/cypress/cypress.js @@ -28,6 +28,7 @@ function startServer() { } function stopServer() { + execSync('docker-compose -p cypress logs', { stdio: 'inherit' }); console.log('Stopping the server...'); execSync('docker-compose -p cypress down', { stdio: 'inherit' }); } From 6c40c4e2ef8095ab99b8cc5abb93fff726283e99 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 8 Sep 2019 13:05:56 +0300 Subject: [PATCH 24/43] Revert "DRY up Cypress docker-compose" This reverts commit 43abac7084c207ab9e39192ac79d520448c2c527. 
--- .circleci/docker-compose.cypress.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/docker-compose.cypress.yml b/.circleci/docker-compose.cypress.yml index 3fcd666bdf..dd3a79d5ec 100644 --- a/.circleci/docker-compose.cypress.yml +++ b/.circleci/docker-compose.cypress.yml @@ -1,4 +1,4 @@ -version: '3.2' +version: '3' services: server: build: ../ From 77df227bcf49f4f24556129ddc25c2eb9b1cbe0e Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 8 Sep 2019 13:17:49 +0300 Subject: [PATCH 25/43] minimal version for binding is 3.2 --- .circleci/docker-compose.cypress.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/docker-compose.cypress.yml b/.circleci/docker-compose.cypress.yml index dd3a79d5ec..3fcd666bdf 100644 --- a/.circleci/docker-compose.cypress.yml +++ b/.circleci/docker-compose.cypress.yml @@ -1,4 +1,4 @@ -version: '3' +version: '3.2' services: server: build: ../ From 0540472c6dd69226b29f6958cfdd4f81df74f14f Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 8 Sep 2019 13:41:18 +0300 Subject: [PATCH 26/43] remove unnecessary code reloads on cypress --- .circleci/docker-compose.cypress.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/docker-compose.cypress.yml b/.circleci/docker-compose.cypress.yml index 3fcd666bdf..dd3a79d5ec 100644 --- a/.circleci/docker-compose.cypress.yml +++ b/.circleci/docker-compose.cypress.yml @@ -1,4 +1,4 @@ -version: '3.2' +version: '3' services: server: build: ../ From ec5afb4ee611704ff5a85d97752ccb9d545558d7 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Mon, 9 Sep 2019 09:37:26 +0300 Subject: [PATCH 27/43] SCHEMAS_REFRESH_QUEUE is no longer a required setting --- redash/tasks/queries/execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py index eb0ab72cd3..f446fa8312 100644 --- a/redash/tasks/queries/execution.py +++ b/redash/tasks/queries/execution.py @@ -248,7 +248,7 @@ def cleanup_query_results(): logger.info("Deleted %d unused query results.", deleted_count) -@job(settings.SCHEMAS_REFRESH_QUEUE) +@job('schemas') def refresh_schema(data_source_id): ds = models.DataSource.get_by_id(data_source_id) logger.info(u"task=refresh_schema state=start ds_id=%s", ds.id) From 814110f5ecfeb88eea24acc4f1dcf783d639977b Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Mon, 9 Sep 2019 09:49:54 +0300 Subject: [PATCH 28/43] split tasks/queries.py to execution.py and maintenance.py --- redash/tasks/queries/execution.py | 135 +----------------------- 1 file changed, 3 insertions(+), 132 deletions(-) diff --git a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py index f446fa8312..eead672319 100644 --- a/redash/tasks/queries/execution.py +++ b/redash/tasks/queries/execution.py @@ -14,7 +14,7 @@ from redash.utils import gen_query_hash, json_dumps, utcnow from redash.worker import celery -logger = get_job_logger(__name__) +logger = get_task_logger(__name__) TIMEOUT_MESSAGE = "Query exceeded Redash query execution time limit."
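The removal hunk that follows is the bulk of patch 28: everything except query execution leaves execution.py. A sketch of the resulting package layout, inferred from this series rather than shown in the hunk (the real __init__.py may re-export more names):

    # redash/tasks/queries/__init__.py -- inferred layout after the split.
    # execution.py keeps the query-running path; the periodic maintenance
    # jobs move to a sibling maintenance.py.
    from .execution import enqueue_query, execute_query
    from .maintenance import (cleanup_query_results, empty_schedules,
                              refresh_queries, refresh_schema, refresh_schemas)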
@@ -160,134 +160,6 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query return job -def empty_schedules(): - logger.info("Deleting schedules of past scheduled queries...") - - queries = models.Query.past_scheduled_queries() - for query in queries: - query.schedule = None - models.db.session.commit() - - logger.info("Deleted %d schedules.", len(queries)) - - -def refresh_queries(): - logger.info("Refreshing queries...") - - outdated_queries_count = 0 - query_ids = [] - - with statsd_client.timer('manager.outdated_queries_lookup'): - for query in models.Query.outdated_queries(): - if settings.FEATURE_DISABLE_REFRESH_QUERIES: - logging.info("Disabled refresh queries.") - elif query.org.is_disabled: - logging.debug("Skipping refresh of %s because org is disabled.", query.id) - elif query.data_source is None: - logging.debug("Skipping refresh of %s because the datasource is none.", query.id) - elif query.data_source.paused: - logging.debug("Skipping refresh of %s because datasource - %s is paused (%s).", - query.id, query.data_source.name, query.data_source.pause_reason) - else: - query_text = query.query_text - - parameters = {p['name']: p.get('value') for p in query.parameters} - if any(parameters): - try: - query_text = query.parameterized.apply(parameters).query - except InvalidParameterError as e: - error = u"Skipping refresh of {} because of invalid parameters: {}".format(query.id, e.message) - track_failure(query, error) - continue - except QueryDetachedFromDataSourceError as e: - error = ("Skipping refresh of {} because a related dropdown " - "query ({}) is unattached to any datasource.").format(query.id, e.query_id) - track_failure(query, error) - continue - - enqueue_query(query_text, query.data_source, query.user_id, - scheduled_query=query, - metadata={'Query ID': query.id, 'Username': 'Scheduled'}) - - query_ids.append(query.id) - outdated_queries_count += 1 - - statsd_client.gauge('manager.outdated_queries', outdated_queries_count) - - logger.info("Done refreshing queries. Found %d outdated queries: %s" % (outdated_queries_count, query_ids)) - - status = redis_connection.hgetall('redash:status') - now = time.time() - - redis_connection.hmset('redash:status', { - 'outdated_queries_count': outdated_queries_count, - 'last_refresh_at': now, - 'query_ids': json_dumps(query_ids) - }) - - statsd_client.gauge('manager.seconds_since_refresh', now - float(status.get('last_refresh_at', now))) - - -def cleanup_query_results(): - """ - Job to cleanup unused query results -- such that no query links to them anymore, and older than - settings.QUERY_RESULTS_MAX_AGE (a week by default, so it's less likely to be open in someone's browser and be used). - - Each time the job deletes only settings.QUERY_RESULTS_CLEANUP_COUNT (100 by default) query results so it won't choke - the database in case of many such results. 
- """ - - logging.info("Running query results clean up (removing maximum of %d unused results, that are %d days old or more)", - settings.QUERY_RESULTS_CLEANUP_COUNT, settings.QUERY_RESULTS_CLEANUP_MAX_AGE) - - unused_query_results = models.QueryResult.unused(settings.QUERY_RESULTS_CLEANUP_MAX_AGE).limit(settings.QUERY_RESULTS_CLEANUP_COUNT) - deleted_count = models.QueryResult.query.filter( - models.QueryResult.id.in_(unused_query_results.subquery()) - ).delete(synchronize_session=False) - models.db.session.commit() - logger.info("Deleted %d unused query results.", deleted_count) - - -@job('schemas') -def refresh_schema(data_source_id): - ds = models.DataSource.get_by_id(data_source_id) - logger.info(u"task=refresh_schema state=start ds_id=%s", ds.id) - start_time = time.time() - try: - ds.get_schema(refresh=True) - logger.info(u"task=refresh_schema state=finished ds_id=%s runtime=%.2f", ds.id, time.time() - start_time) - statsd_client.incr('refresh_schema.success') - except SoftTimeLimitExceeded: - logger.info(u"task=refresh_schema state=timeout ds_id=%s runtime=%.2f", ds.id, time.time() - start_time) - statsd_client.incr('refresh_schema.timeout') - except Exception: - logger.warning(u"Failed refreshing schema for the data source: %s", ds.name, exc_info=1) - statsd_client.incr('refresh_schema.error') - logger.info(u"task=refresh_schema state=failed ds_id=%s runtime=%.2f", ds.id, time.time() - start_time) - - -def refresh_schemas(): - """ - Refreshes the data sources schemas. - """ - blacklist = [int(ds_id) for ds_id in redis_connection.smembers('data_sources:schema:blacklist') if ds_id] - global_start_time = time.time() - - logger.info(u"task=refresh_schemas state=start") - - for ds in models.DataSource.query: - if ds.paused: - logger.info(u"task=refresh_schema state=skip ds_id=%s reason=paused(%s)", ds.id, ds.pause_reason) - elif ds.id in blacklist: - logger.info(u"task=refresh_schema state=skip ds_id=%s reason=blacklist", ds.id) - elif ds.org.is_disabled: - logger.info(u"task=refresh_schema state=skip ds_id=%s reason=org_disabled", ds.id) - else: - refresh_schema.delay(ds.id) - - logger.info(u"task=refresh_schemas state=finish total_runtime=%.2f", time.time() - global_start_time) - - def signal_handler(*args): raise InterruptException @@ -317,7 +189,6 @@ def _resolve_user(user_id, is_api_key, query_id): class QueryExecutor(object): def __init__(self, task, query, data_source_id, user_id, is_api_key, metadata, scheduled_query): - self.logger = get_task_logger(__name__) self.task = task self.query = query self.data_source_id = data_source_id @@ -337,7 +208,7 @@ def run(self): signal.signal(signal.SIGINT, signal_handler) started_at = time.time() - self.logger.debug("Executing query:\n%s", self.query) + logger.debug("Executing query:\n%s", self.query) self._log_progress('executing_query') query_runner = self.data_source.query_runner @@ -409,7 +280,7 @@ def _log_progress(self, state): self.metadata.get('Username', 'unknown')) def _load_data_source(self): - self.logger.info("task=execute_query state=load_ds ds_id=%d", self.data_source_id) + logger.info("task=execute_query state=load_ds ds_id=%d", self.data_source_id) return models.DataSource.query.get(self.data_source_id) From f7fa7dc3a8f83bd542303af2bd78d67664717baa Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Mon, 9 Sep 2019 10:29:27 +0300 Subject: [PATCH 29/43] rename worker to celery_worker and rq_worker to worker --- bin/docker-entrypoint | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/docker-entrypoint 
b/bin/docker-entrypoint index 8f99684b64..09114c1d0d 100755 --- a/bin/docker-entrypoint +++ b/bin/docker-entrypoint @@ -46,7 +46,7 @@ rq_worker() { exec /app/manage.py rq worker $QUEUES } -dev_rq_worker() { +dev_worker() { echo "Starting dev RQ worker..." exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq worker $QUEUES From 47c5287ed8ca1d4b03c9a72c4df1780dc11e8b8a Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 12 Sep 2019 10:04:58 +0300 Subject: [PATCH 30/43] delete all existing periodic jobs before scheduling them --- redash/schedule.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/redash/schedule.py b/redash/schedule.py index 369d38a66a..c94ea5d275 100644 --- a/redash/schedule.py +++ b/redash/schedule.py @@ -16,11 +16,7 @@ interval=5) -def schedule(func, **kwargs): - previously_scheduled_jobs = filter(lambda job: job.func == func, rq_scheduler.get_jobs()) - for job in previously_scheduled_jobs: - rq_scheduler.cancel(job) - +def schedule(**kwargs): interval = kwargs['interval'] if isinstance(interval, timedelta): interval = interval.seconds @@ -28,18 +24,21 @@ def schedule(func, **kwargs): kwargs['interval'] = interval kwargs['result_ttl'] = kwargs.get('result_ttl', interval) - rq_scheduler.schedule(scheduled_time=datetime.utcnow(), func=func, **kwargs) + rq_scheduler.schedule(scheduled_time=datetime.utcnow(), **kwargs) def schedule_periodic_jobs(): - schedule(refresh_queries, interval=30) - schedule(empty_schedules, interval=timedelta(minutes=60)) - schedule(refresh_schemas, interval=timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)) - schedule(sync_user_details, timeout=60, ttl=45, interval=timedelta(minutes=1)) - schedule(send_aggregated_errors, interval=timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)) + for job in rq_scheduler.get_jobs(): + job.delete() + + schedule(func=refresh_queries, interval=30) + schedule(func=empty_schedules, interval=timedelta(minutes=60)) + schedule(func=refresh_schemas, interval=timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)) + schedule(func=sync_user_details, timeout=60, ttl=45, interval=timedelta(minutes=1)) + schedule(func=send_aggregated_errors, interval=timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)) if settings.QUERY_RESULTS_CLEANUP_ENABLED: - schedule(cleanup_query_results, interval=timedelta(minutes=5)) + schedule(func=cleanup_query_results, interval=timedelta(minutes=5)) if settings.VERSION_CHECK: # We schedule the version check to run at a random time in order to spread the requests from all users evenly. 
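Patch 30's wipe-then-reschedule idiom treats the rq-scheduler registry as wholly owned by the scheduler process, which is what makes rescheduling idempotent across restarts. A condensed, runnable sketch of the same idiom, assuming a local Redis; the demo job stands in for Redash's tasks:

    # Condensed sketch of the wipe-then-reschedule idiom from patch 30.
    from datetime import datetime

    from redis import Redis
    from rq_scheduler import Scheduler

    # Same constructor arguments the patch uses; the connection is illustrative.
    scheduler = Scheduler(connection=Redis(), queue_name="periodic", interval=5)

    def tick():
        print("tick")

    # Delete every previously registered periodic job so stale schedules
    # never survive a deploy, then register the current set from scratch.
    for job in scheduler.get_jobs():
        job.delete()

    scheduler.schedule(scheduled_time=datetime.utcnow(), func=tick,
                       interval=30, result_ttl=60)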
From c3d43c7af141780191f1f71feeab90976bbaef4d Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 22 Sep 2019 11:04:55 +0300 Subject: [PATCH 31/43] remove some unrequired requires --- client/cypress/cypress.js | 1 - redash/__init__.py | 1 - redash/authentication/account.py | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/client/cypress/cypress.js b/client/cypress/cypress.js index e34a41ae00..1410228b5c 100644 --- a/client/cypress/cypress.js +++ b/client/cypress/cypress.js @@ -28,7 +28,6 @@ function startServer() { } function stopServer() { - execSync('docker-compose -p cypress logs', { stdio: 'inherit' }); console.log('Stopping the server...'); execSync('docker-compose -p cypress down', { stdio: 'inherit' }); } diff --git a/redash/__init__.py b/redash/__init__.py index db737c39af..87ac101c34 100644 --- a/redash/__init__.py +++ b/redash/__init__.py @@ -4,7 +4,6 @@ import sys import redis -from rq import Queue from flask_mail import Mail from flask_limiter import Limiter from flask_limiter.util import get_ipaddr diff --git a/redash/authentication/account.py b/redash/authentication/account.py index d18a589acf..c20b60aab2 100644 --- a/redash/authentication/account.py +++ b/redash/authentication/account.py @@ -1,7 +1,7 @@ import logging from flask import render_template -from redash import settings, redis_connection +from redash import settings from redash.tasks import send_mail from redash.utils import base_url # noinspection PyUnresolvedReferences From a16e392c230f7b578189d27297880af11f959567 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 22 Sep 2019 12:14:14 +0300 Subject: [PATCH 32/43] move schedule example to redash.schedule --- redash/schedule.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/redash/schedule.py b/redash/schedule.py index c94ea5d275..b32dce8f19 100644 --- a/redash/schedule.py +++ b/redash/schedule.py @@ -17,6 +17,14 @@ def schedule(**kwargs): + """Schedule a job to run periodically. + A good place to provide any custom jobs you'd like to run periodically is your `dynamic_settings` module, for example: + + from datetime import timedelta + from redash.schedule import schedule + + schedule(func=my_func, interval=timedelta(minutes=5)) + """ interval = kwargs['interval'] if isinstance(interval, timedelta): interval = interval.seconds From 2aae335ec5bb685e7670559a71e77bac021c3f89 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 22 Sep 2019 13:24:15 +0300 Subject: [PATCH 33/43] pleasing the CodeClimate overlords --- redash/schedule.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/redash/schedule.py b/redash/schedule.py index b32dce8f19..1d0dc5fa78 100644 --- a/redash/schedule.py +++ b/redash/schedule.py @@ -18,11 +18,11 @@ def schedule(**kwargs): """Schedule a job to run periodically. 
- A good place to provide any custom jobs you'd like to run periodically is your `dynamic_settings` module, for example: - + A good place to provide custom jobs you'd like to run periodically is your `dynamic_settings` module, for example: + from datetime import timedelta from redash.schedule import schedule - + schedule(func=my_func, interval=timedelta(minutes=5)) """ interval = kwargs['interval'] From 0be56a5db4d6cb2729e3c169d96d7f5e8443511c Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 6 Oct 2019 10:57:39 +0300 Subject: [PATCH 34/43] revert to calling a function in dynamic settings to allow periodic jobs to be scheduled after the app has been loaded --- redash/schedule.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/redash/schedule.py b/redash/schedule.py index 1d0dc5fa78..2058fc88ea 100644 --- a/redash/schedule.py +++ b/redash/schedule.py @@ -17,14 +17,6 @@ def schedule(**kwargs): - """Schedule a job to run periodically. - A good place to provide custom jobs you'd like to run periodically is your `dynamic_settings` module, for example: - - from datetime import timedelta - from redash.schedule import schedule - - schedule(func=my_func, interval=timedelta(minutes=5)) - """ interval = kwargs['interval'] if isinstance(interval, timedelta): interval = interval.seconds @@ -54,3 +46,6 @@ def schedule_periodic_jobs(): minute=randint(0, 59), hour=randint(0, 23)), func=version_check) + + # Add your own custom periodic jobs in your dynamic_settings module. + settings.dynamic_settings.schedule_periodic_jobs(schedule) From 5c499c84b4367c5ee6a6e206695d28596602a963 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 10 Oct 2019 12:37:17 +0300 Subject: [PATCH 35/43] set the result_ttl to double the interval to keep job results from expiring and periodic jobs from failing to reschedule --- redash/schedule.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redash/schedule.py b/redash/schedule.py index 2058fc88ea..d500548f9d 100644 --- a/redash/schedule.py +++ b/redash/schedule.py @@ -22,7 +22,7 @@ def schedule(**kwargs): interval = interval.seconds kwargs['interval'] = interval - kwargs['result_ttl'] = kwargs.get('result_ttl', interval) + kwargs['result_ttl'] = kwargs.get('result_ttl', interval * 2) rq_scheduler.schedule(scheduled_time=datetime.utcnow(), **kwargs) From 9f5a68e6a5b0f8e0e3b896bd345cebb3f3577630 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 10 Oct 2019 15:49:07 +0300 Subject: [PATCH 36/43] a naive way to launch multiple workers in one container --- bin/docker-entrypoint | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/bin/docker-entrypoint b/bin/docker-entrypoint index 09114c1d0d..e7795c8394 100755 --- a/bin/docker-entrypoint +++ b/bin/docker-entrypoint @@ -23,9 +23,15 @@ dev_scheduler() { } worker() { - echo "Starting RQ worker..." + WORKERS_COUNT=${WORKERS_COUNT:-2} + echo "Starting $WORKERS_COUNT RQ workers for queues $QUEUES..."
- exec /app/manage.py rq worker $QUEUES + for i in `seq 1 $WORKERS_COUNT`; + do + exec /app/manage.py rq worker $QUEUES & + done + + sleep infinity } rq_scheduler() { From 1e44681c56c514ff40bd964a7900bba037c0902d Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 10 Oct 2019 15:49:24 +0300 Subject: [PATCH 37/43] updated and less brittle healthcheck --- redash/cli/rq.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/redash/cli/rq.py b/redash/cli/rq.py index c9a96d9001..eab365489c 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -39,8 +39,31 @@ def healthcheck(): all_workers = Worker.all() local_workers = [w for w in all_workers if w.hostname == hostname] + row_format = "{:>10}" * (len(local_workers) + 1) + + print("Local worker PIDs:") + local_worker_pids = set([w.pid for w in local_workers]) + print(row_format.format("", *local_worker_pids)) + + print("Time since seen:") heartbeats = [w.last_heartbeat for w in local_workers] time_since_seen = [datetime.datetime.utcnow() - hb for hb in heartbeats] - active = [t.seconds < 60 for t in time_since_seen] + print(row_format.format("", *[t.seconds for t in time_since_seen])) + seen_lately = [t.seconds < 60 for t in time_since_seen] + + print("State:") + states = [w.state for w in local_workers] + print(row_format.format("", *states)) + busy = [s == "busy" for s in states] + + print("Jobs in queues:") + jobs_in_queues = [sum([len(q.jobs) for q in w.queues]) for w in local_workers] + print(row_format.format("", *jobs_in_queues)) + has_nothing_to_do = [j == 0 for j in jobs_in_queues] + + print("Healthy:") + # a healthy worker is either busy, has been seen lately or has nothing to do + healthy = [any(w) for w in zip(busy, seen_lately, has_nothing_to_do)] + print(row_format.format("", *healthy)) - sys.exit(int(not all(active))) + sys.exit(int(not all(healthy))) From 7479493cb76c7311d9653967e1794b75d2d14405 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Tue, 15 Oct 2019 21:27:17 +0300 Subject: [PATCH 38/43] describe custom jobs and don't actually schedule them --- redash/schedule.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/redash/schedule.py b/redash/schedule.py index d500548f9d..43610a955b 100644 --- a/redash/schedule.py +++ b/redash/schedule.py @@ -31,14 +31,16 @@ def schedule_periodic_jobs(): for job in rq_scheduler.get_jobs(): job.delete() - schedule(func=refresh_queries, interval=30) - schedule(func=empty_schedules, interval=timedelta(minutes=60)) - schedule(func=refresh_schemas, interval=timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)) - schedule(func=sync_user_details, timeout=60, ttl=45, interval=timedelta(minutes=1)) - schedule(func=send_aggregated_errors, interval=timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)) + jobs = [ + {"func": refresh_queries, "interval": 30}, + {"func": empty_schedules, "interval": timedelta(minutes=60)}, + {"func": refresh_schemas, "interval": timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)}, + {"func": sync_user_details, "timeout": 60, "ttl": 45, "interval": timedelta(minutes=1)}, + {"func": send_aggregated_errors, "interval": timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)} + ] if settings.QUERY_RESULTS_CLEANUP_ENABLED: - schedule(func=cleanup_query_results, interval=timedelta(minutes=5)) + jobs.append({"func": cleanup_query_results, "interval": timedelta(minutes=5)}) if settings.VERSION_CHECK: # We schedule the version check to run at a random time in order to spread the requests from
all users evenly. @@ -48,4 +50,7 @@ def schedule_periodic_jobs(): func=version_check) # Add your own custom periodic jobs in your dynamic_settings module. - settings.dynamic_settings.schedule_periodic_jobs(schedule) + jobs.extend(settings.dynamic_settings.periodic_jobs() or []) + + for job in jobs: + schedule(**job) From 91a2effd3757c9aaa81c1f3ff19ec60ebee5395d Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 27 Oct 2019 14:06:14 +0200 Subject: [PATCH 39/43] launch multiple workers with Honcho --- bin/docker-entrypoint | 7 ++++--- redash/cli/rq.py | 16 +++++++++++++++- requirements.txt | 1 + 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/bin/docker-entrypoint b/bin/docker-entrypoint index e7795c8394..3b24110ea6 100755 --- a/bin/docker-entrypoint +++ b/bin/docker-entrypoint @@ -49,13 +49,14 @@ dev_scheduler() { rq_worker() { echo "Starting RQ worker..." - exec /app/manage.py rq worker $QUEUES + exec /app/manage.py rq workers $WORKERS_COUNT $QUEUES } dev_worker() { - echo "Starting dev RQ worker..." + WORKERS_COUNT=${WORKERS_COUNT:-2} + echo "Starting $WORKERS_COUNT dev RQ workers for queues: $QUEUES..." - exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq worker $QUEUES + exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq workers $WORKERS_COUNT $QUEUES } dev_worker() { diff --git a/redash/cli/rq.py b/redash/cli/rq.py index eab365489c..249aed531c 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -3,6 +3,7 @@ import sys import datetime +from honcho.manager import Manager from click import argument from flask.cli import AppGroup from rq import Connection, Worker @@ -22,6 +23,19 @@ def scheduler(): rq_scheduler.run() +@manager.command() +@argument('workers_count') +@argument('queues', nargs=-1) +def workers(workers_count=1, queues='default'): + m = Manager() + for i in range(int(workers_count)): + cmd = './manage.py rq worker {}'.format(" ".join(queues)) + m.add_process('worker #{}'.format(i+1), cmd) + + m.loop() + sys.exit(m.returncode) + + @manager.command() @argument('queues', nargs=-1) def worker(queues='default'): @@ -38,7 +52,7 @@ def healthcheck(): with Connection(rq_redis_connection): all_workers = Worker.all() - local_workers = [w for w in all_workers if w.hostname == hostname] + local_workers = [w for w in all_workers if w.hostname.decode() == hostname] row_format ="{:>10}" * (len(local_workers) + 1) print("Local worker PIDs:") diff --git a/requirements.txt b/requirements.txt index 771c2e5c60..3fa3b0d1d9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -59,6 +59,7 @@ chromelogger==0.4.3 pypd==1.1.0 disposable-email-domains>=0.0.52 gevent==1.4.0 +honcho==1.0.1 # Install the dependencies of the bin/bundle-extensions script here. 
# It has its own requirements file to simplify the frontend client build process -r requirements_bundles.txt From d0e3721215d1d3ad5e42e903fc4afa27eb2c9bed Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 27 Oct 2019 22:21:27 +0200 Subject: [PATCH 40/43] fix my faulty rebase --- .circleci/docker-compose.cypress.yml | 2 +- bin/docker-entrypoint | 29 ---------------------------- redash/cli/rq.py | 5 +---- redash/schedule.py | 6 +++--- redash/worker.py | 24 ----------------------- 5 files changed, 5 insertions(+), 61 deletions(-) diff --git a/.circleci/docker-compose.cypress.yml b/.circleci/docker-compose.cypress.yml index dd3a79d5ec..ffa4ca6960 100644 --- a/.circleci/docker-compose.cypress.yml +++ b/.circleci/docker-compose.cypress.yml @@ -16,7 +16,7 @@ services: REDASH_RATELIMIT_ENABLED: "false" scheduler: build: ../ - command: worker + command: scheduler depends_on: - server environment: diff --git a/bin/docker-entrypoint b/bin/docker-entrypoint index 3b24110ea6..b9d1dc9025 100755 --- a/bin/docker-entrypoint +++ b/bin/docker-entrypoint @@ -26,29 +26,6 @@ worker() { WORKERS_COUNT=${WORKERS_COUNT:-2} echo "Starting $WORKERS_COUNT RQ workers for queues $QUEUES..." - for i in `seq 1 $WORKERS_COUNT`; - do - exec /app/manage.py rq worker $QUEUES & - done - - sleep infinity -} - -rq_scheduler() { - echo "Starting RQ scheduler..." - - exec /app/manage.py rq scheduler -} - -dev_scheduler() { - echo "Starting dev RQ scheduler..." - - exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq scheduler -} - -rq_worker() { - echo "Starting RQ worker..." - exec /app/manage.py rq workers $WORKERS_COUNT $QUEUES } @@ -59,12 +36,6 @@ dev_worker() { exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq workers $WORKERS_COUNT $QUEUES } -dev_worker() { - echo "Starting dev RQ worker..." 
- - exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq worker $QUEUES -} - dev_celery_worker() { WORKERS_COUNT=${WORKERS_COUNT:-2} QUEUES=${QUEUES:-queries,scheduled_queries} diff --git a/redash/cli/rq.py b/redash/cli/rq.py index 249aed531c..d7ec7dcd2e 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -8,9 +8,6 @@ from flask.cli import AppGroup from rq import Connection, Worker -SECONDS = 1 -MINUTES = 60 * SECONDS - from redash import rq_redis_connection from redash.schedule import rq_scheduler, schedule_periodic_jobs @@ -31,7 +28,7 @@ def workers(workers_count=1, queues='default'): for i in range(int(workers_count)): cmd = './manage.py rq worker {}'.format(" ".join(queues)) m.add_process('worker #{}'.format(i+1), cmd) - + m.loop() sys.exit(m.returncode) diff --git a/redash/schedule.py b/redash/schedule.py index 43610a955b..0a6b75fa0a 100644 --- a/redash/schedule.py +++ b/redash/schedule.py @@ -5,13 +5,13 @@ from rq_scheduler import Scheduler -from redash import settings, redis_connection +from redash import settings, rq_redis_connection from redash.tasks import (sync_user_details, refresh_queries, empty_schedules, refresh_schemas, cleanup_query_results, version_check, send_aggregated_errors) -rq_scheduler = Scheduler(connection=redis_connection, +rq_scheduler = Scheduler(connection=rq_redis_connection, queue_name="periodic", interval=5) @@ -32,7 +32,7 @@ def schedule_periodic_jobs(): job.delete() jobs = [ - {"func": refresh_queries, "interval": 30}, + {"func": refresh_queries, "interval": 30, "result_ttl": 600}, {"func": empty_schedules, "interval": timedelta(minutes=60)}, {"func": refresh_schemas, "interval": timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)}, {"func": sync_user_details, "timeout": 60, "ttl": 45, "interval": timedelta(minutes=1)}, diff --git a/redash/worker.py b/redash/worker.py index c7af457819..18da7de7a2 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -20,30 +20,6 @@ job = partial(rq_job, connection=rq_redis_connection) -class CurrentJobFilter(logging.Filter): - def filter(self, record): - current_job = get_current_job() - - record.job_id = current_job.id if current_job else '' - record.job_description = current_job.description if current_job else '' - - return True - - -def get_job_logger(name): - logger = logging.getLogger('rq.job.' 
+ name) - - handler = logging.StreamHandler() - handler.formatter = logging.Formatter(settings.RQ_WORKER_JOB_LOG_FORMAT) - handler.addFilter(CurrentJobFilter()) - - logger.addHandler(handler) - logger.propagate = False - - return logger - - - class CurrentJobFilter(logging.Filter): def filter(self, record): current_job = get_current_job() From 770eaa241c69a43e74fbf7ff675e0493e660bba4 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 27 Oct 2019 22:38:37 +0200 Subject: [PATCH 41/43] restart all workers when a worker dies --- redash/cli/rq.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/redash/cli/rq.py b/redash/cli/rq.py index d7ec7dcd2e..4d7abdc72e 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -24,13 +24,13 @@ def scheduler(): @argument('workers_count') @argument('queues', nargs=-1) def workers(workers_count=1, queues='default'): - m = Manager() - for i in range(int(workers_count)): - cmd = './manage.py rq worker {}'.format(" ".join(queues)) - m.add_process('worker #{}'.format(i+1), cmd) + while True: + m = Manager() + for i in range(int(workers_count)): + cmd = './manage.py rq worker {}'.format(" ".join(queues)) + m.add_process('worker #{}'.format(i+1), cmd) - m.loop() - sys.exit(m.returncode) + m.loop() @manager.command() From 306e258095aa8ab1baf035710e30ecf259ab5380 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 27 Oct 2019 22:50:52 +0200 Subject: [PATCH 42/43] Revert "restart all workers when a worker dies" This reverts commit 32c989ef840b96fd32177a4ff67bc0c6105761b8. --- redash/cli/rq.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/redash/cli/rq.py b/redash/cli/rq.py index 4d7abdc72e..d7ec7dcd2e 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -24,13 +24,13 @@ def scheduler(): @argument('workers_count') @argument('queues', nargs=-1) def workers(workers_count=1, queues='default'): - while True: - m = Manager() - for i in range(int(workers_count)): - cmd = './manage.py rq worker {}'.format(" ".join(queues)) - m.add_process('worker #{}'.format(i+1), cmd) + m = Manager() + for i in range(int(workers_count)): + cmd = './manage.py rq worker {}'.format(" ".join(queues)) + m.add_process('worker #{}'.format(i+1), cmd) - m.loop() + m.loop() + sys.exit(m.returncode) @manager.command() From d5c54116d7ad29a243a507d58e0e9cb50d9b4e5f Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Tue, 29 Oct 2019 15:33:16 +0200 Subject: [PATCH 43/43] optimize work horse initialization by configuring mappers on the worker process --- redash/cli/rq.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/redash/cli/rq.py b/redash/cli/rq.py index d7ec7dcd2e..4fd9215003 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -7,6 +7,7 @@ from click import argument from flask.cli import AppGroup from rq import Connection, Worker +from sqlalchemy.orm import configure_mappers from redash import rq_redis_connection from redash.schedule import rq_scheduler, schedule_periodic_jobs @@ -36,6 +37,11 @@ def workers(workers_count=1, queues='default'): @manager.command() @argument('queues', nargs=-1) def worker(queues='default'): + # Configure any SQLAlchemy mappers loaded until now so that the mapping configuration + # will already be available to the forked work horses and they won't need + # to spend valuable time re-doing that on every fork. + configure_mappers() + if not queues: queues = ('default',) with Connection(rq_redis_connection):
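The pre-fork optimization in patch 43 can be shown in isolation. A minimal sketch, assuming a toy declarative model rather than Redash's real models; only the configure_mappers() call itself comes from the patch.

from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import configure_mappers

Base = declarative_base()


class Example(Base):
    __tablename__ = "examples"
    id = Column(Integer, primary_key=True)


# SQLAlchemy defers mapper configuration until a mapped class is first used.
# Forcing it once in the long-lived worker process means each fork()ed work
# horse inherits already-configured mappers via copy-on-write instead of
# paying the configuration cost at the start of every job.
configure_mappers()

Since RQ forks a fresh work horse per job, doing this once at worker startup amortizes the cost across every job the worker will ever run.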