Skip to content

Commit

Permalink
Monitor general RQ queues (default, periodic and schemas) (#4256)
Browse files Browse the repository at this point in the history
* track general RQ queues (default, periodic and schemas)

* get all active RQ queues

* call get_celery_queues in another place
  • Loading branch information
Omer Lachish authored and arikfr committed Oct 23, 2019
1 parent f0f85ec commit 43b35b6
Showing 1 changed file with 4 additions and 10 deletions.
14 changes: 4 additions & 10 deletions redash/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def get_object_counts():
return status


def get_queues():
def get_celery_queues():
queue_names = db.session.query(DataSource.queue_name).distinct()
scheduled_queue_names = db.session.query(DataSource.scheduled_queue_name).distinct()
query = db.session.execute(union_all(queue_names, scheduled_queue_names))
Expand All @@ -35,14 +35,8 @@ def get_queues():


def get_queues_status():
queues = {}

for queue in get_queues():
queues[queue] = {
'size': redis_connection.llen(queue)
}

return queues
return dict({queue: {'size': redis_connection.llen(queue)} for queue in get_celery_queues()}.items() +
{queue.name: {'size': len(queue)} for queue in Queue.all(connection=redis_connection)}.items())


def get_db_sizes():
Expand Down Expand Up @@ -134,7 +128,7 @@ def celery_tasks():
tasks = parse_tasks(celery.control.inspect().active(), 'active')
tasks += parse_tasks(celery.control.inspect().reserved(), 'reserved')

for queue_name in get_queues():
for queue_name in get_celery_queues():
tasks += get_waiting_in_queue(queue_name)

return tasks
Expand Down

0 comments on commit 43b35b6

Please sign in to comment.