diff --git a/redash/schedule.py b/redash/schedule.py
index 0a6b75fa0a..3f6c7e4035 100644
--- a/redash/schedule.py
+++ b/redash/schedule.py
@@ -1,4 +1,5 @@
 from __future__ import absolute_import
+import logging
 from datetime import datetime, timedelta
 from functools import partial
 from random import randint
@@ -8,9 +9,11 @@
 from redash import settings, rq_redis_connection
 from redash.tasks import (sync_user_details, refresh_queries,
                           empty_schedules, refresh_schemas,
-                          cleanup_query_results,
+                          cleanup_query_results, purge_failed_jobs,
                           version_check, send_aggregated_errors)
 
+logger = logging.getLogger(__name__)
+
 rq_scheduler = Scheduler(connection=rq_redis_connection,
                          queue_name="periodic",
                          interval=5)
@@ -36,6 +39,7 @@ def schedule_periodic_jobs():
         {"func": empty_schedules, "interval": timedelta(minutes=60)},
         {"func": refresh_schemas, "interval": timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)},
         {"func": sync_user_details, "timeout": 60, "ttl": 45, "interval": timedelta(minutes=1)},
+        {"func": purge_failed_jobs, "interval": timedelta(days=1)},
         {"func": send_aggregated_errors, "interval": timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)}
     ]
 
@@ -53,4 +57,5 @@ def schedule_periodic_jobs():
     jobs.extend(settings.dynamic_settings.periodic_jobs() or [])
 
     for job in jobs:
+        logger.info("Scheduling %s with interval %s.", job['func'].__name__, job.get('interval'))
         schedule(**job)
diff --git a/redash/settings/__init__.py b/redash/settings/__init__.py
index a5de3b64f6..57ecba499b 100644
--- a/redash/settings/__init__.py
+++ b/redash/settings/__init__.py
@@ -197,6 +197,7 @@
 STATIC_ASSETS_PATH = fix_assets_path(os.environ.get("REDASH_STATIC_ASSETS_PATH",
                                                     "../client/dist/"))
 JOB_EXPIRY_TIME = int(os.environ.get("REDASH_JOB_EXPIRY_TIME", 3600 * 12))
+JOB_DEFAULT_FAILURE_TTL = int(os.environ.get("REDASH_JOB_DEFAULT_FAILURE_TTL", 7 * 24 * 60 * 60))
 
 LOG_LEVEL = os.environ.get("REDASH_LOG_LEVEL", "INFO")
 LOG_STDOUT = parse_boolean(os.environ.get('REDASH_LOG_STDOUT', 'false'))
diff --git a/redash/tasks/__init__.py b/redash/tasks/__init__.py
index 613111f4f7..6f0d590514 100644
--- a/redash/tasks/__init__.py
+++ b/redash/tasks/__init__.py
@@ -1,4 +1,4 @@
-from .general import record_event, version_check, send_mail, sync_user_details
+from .general import record_event, version_check, send_mail, sync_user_details, purge_failed_jobs
 from .queries import (QueryTask, enqueue_query, execute_query, refresh_queries,
                       refresh_schemas, cleanup_query_results, empty_schedules)
 from .alerts import check_alerts_for_query
diff --git a/redash/tasks/general.py b/redash/tasks/general.py
index fcac675f5c..bfe2a1065d 100644
--- a/redash/tasks/general.py
+++ b/redash/tasks/general.py
@@ -1,7 +1,11 @@
 import requests
+from datetime import datetime
 from flask_mail import Message
-from redash import mail, models, settings
+from rq import Connection, Queue
+from rq.registry import FailedJobRegistry
+from rq.job import Job
+from redash import mail, models, settings, rq_redis_connection
 from redash.models import users
 from redash.version_check import run_version_check
 from redash.worker import job, get_job_logger
 
@@ -60,3 +64,22 @@ def send_mail(to, subject, html, text):
 
 def sync_user_details():
     users.sync_last_active_at()
+
+
+def purge_failed_jobs():
+    # Scheduled daily (see redash/schedule.py): delete failed jobs whose age
+    # since completion exceeds JOB_DEFAULT_FAILURE_TTL, across every RQ queue.
+    with Connection(rq_redis_connection):
+        for queue in Queue.all():
+            failed_job_ids = FailedJobRegistry(queue=queue).get_job_ids()
+            failed_jobs = Job.fetch_many(failed_job_ids, rq_redis_connection)
+            # Use total_seconds(), not .seconds: timedelta.seconds is only the
+            # 0-86399 seconds component and ignores whole days, so it can never
+            # exceed a TTL of a week and no job would ever be purged.
+            stale_jobs = [job for job in failed_jobs if (datetime.utcnow() - job.ended_at).total_seconds() > settings.JOB_DEFAULT_FAILURE_TTL]
+
+            for job in stale_jobs:
+                job.delete()
+
+            if stale_jobs:
+                logger.info('Purged %d old failed jobs from the %s queue.', len(stale_jobs), queue.name)