From 43b35b6fb40ebae981c082b2e34eb8320432caeb Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Wed, 23 Oct 2019 12:31:32 +0300 Subject: [PATCH] Monitor general RQ queues (default, periodic and schemas) (#4256) * track general RQ queues (default, periodic and schemas) * get all active RQ queues * call get_celery_queues in another place --- redash/monitor.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/redash/monitor.py b/redash/monitor.py index b2fd10323e..7c4f4e5a80 100644 --- a/redash/monitor.py +++ b/redash/monitor.py @@ -26,7 +26,7 @@ def get_object_counts(): return status -def get_queues(): +def get_celery_queues(): queue_names = db.session.query(DataSource.queue_name).distinct() scheduled_queue_names = db.session.query(DataSource.scheduled_queue_name).distinct() query = db.session.execute(union_all(queue_names, scheduled_queue_names)) @@ -35,14 +35,8 @@ def get_queues(): def get_queues_status(): - queues = {} - - for queue in get_queues(): - queues[queue] = { - 'size': redis_connection.llen(queue) - } - - return queues + return dict({queue: {'size': redis_connection.llen(queue)} for queue in get_celery_queues()}.items() + + {queue.name: {'size': len(queue)} for queue in Queue.all(connection=redis_connection)}.items()) def get_db_sizes(): @@ -134,7 +128,7 @@ def celery_tasks(): tasks = parse_tasks(celery.control.inspect().active(), 'active') tasks += parse_tasks(celery.control.inspect().reserved(), 'reserved') - for queue_name in get_queues(): + for queue_name in get_celery_queues(): tasks += get_waiting_in_queue(queue_name) return tasks