From 735984147ac9e2f385e9037fa04e2ee0ba79bef5 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 17 Oct 2019 12:37:19 +0300 Subject: [PATCH 1/3] track general RQ queues (default, periodic and schemas) --- redash/monitor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/redash/monitor.py b/redash/monitor.py index f0d8abfa1b..003f852689 100644 --- a/redash/monitor.py +++ b/redash/monitor.py @@ -31,7 +31,8 @@ def get_queues(): scheduled_queue_names = db.session.query(DataSource.scheduled_queue_name).distinct() query = db.session.execute(union_all(queue_names, scheduled_queue_names)) - return ['celery'] + [row[0] for row in query] + general_rq_queues = map(lambda q: 'rq:queue:' + q, ['default', 'periodic', 'schemas']) + return ['celery'] + general_rq_queues + [row[0] for row in query] def get_queues_status(): From 54ba43d9ee0f3a47fe217f81758485254a3aa492 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Tue, 22 Oct 2019 14:38:03 +0300 Subject: [PATCH 2/3] get all active RQ queues --- redash/monitor.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/redash/monitor.py b/redash/monitor.py index 003f852689..b74fe1d403 100644 --- a/redash/monitor.py +++ b/redash/monitor.py @@ -26,24 +26,17 @@ def get_object_counts(): return status -def get_queues(): +def get_celery_queues(): queue_names = db.session.query(DataSource.queue_name).distinct() scheduled_queue_names = db.session.query(DataSource.scheduled_queue_name).distinct() query = db.session.execute(union_all(queue_names, scheduled_queue_names)) - general_rq_queues = map(lambda q: 'rq:queue:' + q, ['default', 'periodic', 'schemas']) - return ['celery'] + general_rq_queues + [row[0] for row in query] + return ['celery'] + [row[0] for row in query] def get_queues_status(): - queues = {} - - for queue in get_queues(): - queues[queue] = { - 'size': redis_connection.llen(queue) - } - - return queues + return dict({queue: {'size': redis_connection.llen(queue)} for queue in get_celery_queues()}.items() + + {queue.name: {'size': len(queue)} for queue in Queue.all(connection=redis_connection)}.items()) def get_db_sizes(): From 9959542052b6941e5e68cac1d815ea6c369b9537 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Wed, 23 Oct 2019 11:19:59 +0300 Subject: [PATCH 3/3] call get_celery_queues in another place --- redash/monitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redash/monitor.py b/redash/monitor.py index b74fe1d403..684eb4103a 100644 --- a/redash/monitor.py +++ b/redash/monitor.py @@ -128,7 +128,7 @@ def celery_tasks(): tasks = parse_tasks(celery.control.inspect().active(), 'active') tasks += parse_tasks(celery.control.inspect().reserved(), 'reserved') - for queue_name in get_queues(): + for queue_name in get_celery_queues(): tasks += get_waiting_in_queue(queue_name) return tasks