RQ: periodically clear failed jobs (#4306)
* add some logging to scheduler

* clean failed RQ job data from Redis

* move stale job purging to tasks/general.py

* provide better documentation on why we don't reject keys in FailedJobRegistry at the moment

* pleasing the CodeClimate overlords

* simplified cleanup by deleting both job data and registry entry

* use FailedJobRegistry as source of truth for purging

* remove redundant key deletion

* Update redash/settings/__init__.py

Co-Authored-By: Arik Fraimovich <arik@arikfr.com>
Omer Lachish and arikfr authored Nov 7, 2019
1 parent 6f791a0 commit a33d11d
Showing 4 changed files with 27 additions and 3 deletions.
7 changes: 6 additions & 1 deletion redash/schedule.py
@@ -1,4 +1,5 @@
 from __future__ import absolute_import
+import logging
 from datetime import datetime, timedelta
 from functools import partial
 from random import randint
@@ -8,9 +9,11 @@
 from redash import settings, rq_redis_connection
 from redash.tasks import (sync_user_details, refresh_queries,
                           empty_schedules, refresh_schemas,
-                          cleanup_query_results,
+                          cleanup_query_results, purge_failed_jobs,
                           version_check, send_aggregated_errors)
 
+logger = logging.getLogger(__name__)
+
 rq_scheduler = Scheduler(connection=rq_redis_connection,
                          queue_name="periodic",
                          interval=5)
@@ -36,6 +39,7 @@ def schedule_periodic_jobs():
         {"func": empty_schedules, "interval": timedelta(minutes=60)},
         {"func": refresh_schemas, "interval": timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)},
         {"func": sync_user_details, "timeout": 60, "ttl": 45, "interval": timedelta(minutes=1)},
+        {"func": purge_failed_jobs, "interval": timedelta(days=1)},
         {"func": send_aggregated_errors, "interval": timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)}
     ]
 
@@ -53,4 +57,5 @@ def schedule_periodic_jobs():
     jobs.extend(settings.dynamic_settings.periodic_jobs() or [])
 
     for job in jobs:
+        logger.info("Scheduling %s with interval %s.", job['func'].__name__, job.get('interval'))
         schedule(**job)
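
Each dict in the jobs list is handed to the module's schedule() helper, which registers it with rq-scheduler. A minimal sketch of what that hand-off plausibly looks like (the helper body here is an assumption, not Redash's exact code; only the Scheduler construction is taken from the diff, and the local Redis is an assumption):

from datetime import datetime, timedelta

from redis import Redis
from rq_scheduler import Scheduler

# Mirrors the Scheduler built in the diff above.
rq_scheduler = Scheduler(connection=Redis(), queue_name="periodic", interval=5)

def schedule(**job):
    # rq-scheduler takes the interval in seconds, not as a timedelta.
    interval = job.pop("interval")
    if isinstance(interval, timedelta):
        interval = int(interval.total_seconds())
    rq_scheduler.schedule(
        scheduled_time=datetime.utcnow(),  # first run as soon as possible
        func=job.pop("func"),
        interval=interval,   # then re-run every `interval` seconds
        repeat=None,         # None means repeat indefinitely
        **job,               # remaining keys, e.g. timeout/ttl
    )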
1 change: 1 addition & 0 deletions redash/settings/__init__.py
@@ -197,6 +197,7 @@
 STATIC_ASSETS_PATH = fix_assets_path(os.environ.get("REDASH_STATIC_ASSETS_PATH", "../client/dist/"))
 
 JOB_EXPIRY_TIME = int(os.environ.get("REDASH_JOB_EXPIRY_TIME", 3600 * 12))
+JOB_DEFAULT_FAILURE_TTL = int(os.environ.get("REDASH_JOB_DEFAULT_FAILURE_TTL", 7 * 24 * 60 * 60))
 
 LOG_LEVEL = os.environ.get("REDASH_LOG_LEVEL", "INFO")
 LOG_STDOUT = parse_boolean(os.environ.get('REDASH_LOG_STDOUT', 'false'))
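
The default of 7 * 24 * 60 * 60 is 604,800 seconds, i.e. failed-job data is kept for one week before the purge task below considers it stale. Like the other settings in this file, it is read from the environment, so a deployment can shorten it without touching code. A small illustration of how the value resolves (the one-day override is just an example):

import os

# No override: the default keeps failed jobs around for a week.
assert int(os.environ.get("REDASH_JOB_DEFAULT_FAILURE_TTL", 7 * 24 * 60 * 60)) == 604800

# With e.g. REDASH_JOB_DEFAULT_FAILURE_TTL=86400 set in the environment,
# failed jobs would instead become eligible for purging after one day.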
2 changes: 1 addition & 1 deletion redash/tasks/__init__.py
@@ -1,4 +1,4 @@
-from .general import record_event, version_check, send_mail, sync_user_details
+from .general import record_event, version_check, send_mail, sync_user_details, purge_failed_jobs
 from .queries import (QueryTask, enqueue_query, execute_query, refresh_queries,
                       refresh_schemas, cleanup_query_results, empty_schedules)
 from .alerts import check_alerts_for_query
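
Re-exporting the task from the package root is what lets the scheduler import it alongside the other periodic tasks:

# As redash/schedule.py does above; a plain illustration of the new import path.
from redash.tasks import purge_failed_jobs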
20 changes: 19 additions & 1 deletion redash/tasks/general.py
@@ -1,7 +1,11 @@
 import requests
+from datetime import datetime
 
 from flask_mail import Message
-from redash import mail, models, settings
+from rq import Connection, Queue
+from rq.registry import FailedJobRegistry
+from rq.job import Job
+from redash import mail, models, settings, rq_redis_connection
 from redash.models import users
 from redash.version_check import run_version_check
 from redash.worker import job, get_job_logger
@@ -60,3 +64,17 @@ def send_mail(to, subject, html, text):
 
 def sync_user_details():
     users.sync_last_active_at()
+
+
+def purge_failed_jobs():
+    with Connection(rq_redis_connection):
+        for queue in Queue.all():
+            failed_job_ids = FailedJobRegistry(queue=queue).get_job_ids()
+            failed_jobs = Job.fetch_many(failed_job_ids, rq_redis_connection)
+            stale_jobs = [job for job in failed_jobs if (datetime.utcnow() - job.ended_at).seconds > settings.JOB_DEFAULT_FAILURE_TTL]
+
+            for job in stale_jobs:
+                job.delete()
+
+            if stale_jobs:
+                logger.info('Purged %d old failed jobs from the %s queue.', len(stale_jobs), queue.name)
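
The task walks every queue's FailedJobRegistry, fetches the job hashes behind the registered ids, and deletes jobs that finished longer ago than the TTL; per the commit message, job.delete() removes both the job data and the registry entry. One subtlety in the age check: timedelta.seconds is only the sub-day remainder (0 to 86399), whereas total_seconds() is the full elapsed duration, so the latter is the safer comparison against a TTL measured in seconds. A standalone sketch of the same pass using total_seconds(), runnable against a local Redis (the connection and the hard-coded TTL are assumptions for the example):

from datetime import datetime

from redis import Redis
from rq import Connection, Queue
from rq.job import Job
from rq.registry import FailedJobRegistry

FAILURE_TTL = 7 * 24 * 60 * 60  # seconds; mirrors JOB_DEFAULT_FAILURE_TTL

def purge_failed_jobs(connection):
    with Connection(connection):
        for queue in Queue.all():
            registry = FailedJobRegistry(queue=queue)
            failed_jobs = Job.fetch_many(registry.get_job_ids(), connection)
            stale_jobs = [
                job for job in failed_jobs
                if job is not None  # fetch_many yields None for ids whose data already expired
                and job.ended_at is not None
                and (datetime.utcnow() - job.ended_at).total_seconds() > FAILURE_TTL
            ]
            for job in stale_jobs:
                job.delete()  # drops the job hash and its registry entry
            if stale_jobs:
                print("Purged %d old failed jobs from the %s queue." % (len(stale_jobs), queue.name))

purge_failed_jobs(Redis())  # assumes Redis on localhost:6379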
