diff --git a/bin/docker-entrypoint b/bin/docker-entrypoint index d88266fa2d..b9d1dc9025 100755 --- a/bin/docker-entrypoint +++ b/bin/docker-entrypoint @@ -23,15 +23,17 @@ dev_scheduler() { } worker() { - echo "Starting RQ worker..." + WORKERS_COUNT=${WORKERS_COUNT:-2} + echo "Starting $WORKERS_COUNT RQ workers for queues $QUEUES..." - exec /app/manage.py rq worker $QUEUES + exec /app/manage.py rq workers $WORKERS_COUNT $QUEUES } dev_worker() { - echo "Starting dev RQ worker..." + WORKERS_COUNT=${WORKERS_COUNT:-2} + echo "Starting $WORKERS_COUNT dev RQ workers for queues: $QUEUES..." - exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq worker $QUEUES + exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq workers $WORKERS_COUNT $QUEUES } dev_celery_worker() { diff --git a/redash/cli/rq.py b/redash/cli/rq.py index 357f7cfd2d..4fd9215003 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -3,9 +3,11 @@ import sys import datetime +from honcho.manager import Manager from click import argument from flask.cli import AppGroup from rq import Connection, Worker +from sqlalchemy.orm import configure_mappers from redash import rq_redis_connection from redash.schedule import rq_scheduler, schedule_periodic_jobs @@ -19,9 +21,27 @@ def scheduler(): rq_scheduler.run() +@manager.command() +@argument('workers_count') +@argument('queues', nargs=-1) +def workers(workers_count=1, queues='default'): + m = Manager() + for i in range(int(workers_count)): + cmd = './manage.py rq worker {}'.format(" ".join(queues)) + m.add_process('worker #{}'.format(i+1), cmd) + + m.loop() + sys.exit(m.returncode) + + @manager.command() @argument('queues', nargs=-1) def worker(queues='default'): + # Configure any SQLAlchemy mappers loaded until now so that the mapping configuration + # will already be available to the forked work horses and they won't need + # to spend valuable time re-doing that on every fork. 
+ configure_mappers() + if not queues: queues = ('default',) with Connection(rq_redis_connection): @@ -35,9 +55,32 @@ def healthcheck(): with Connection(rq_redis_connection): all_workers = Worker.all() - local_workers = [w for w in all_workers if w.hostname == hostname] + local_workers = [w for w in all_workers if w.hostname.decode() == hostname] + row_format = "{:>10}" * (len(local_workers) + 1) + + print("Local worker PIDs:") + local_worker_pids = set([w.pid for w in local_workers]) + print(row_format.format("", *local_worker_pids)) + + print("Time since seen:") heartbeats = [w.last_heartbeat for w in local_workers] time_since_seen = [datetime.datetime.utcnow() - hb for hb in heartbeats] - active = [t.seconds < 60 for t in time_since_seen] + print(row_format.format("", *[t.seconds for t in time_since_seen])) + seen_lately = [t.seconds < 60 for t in time_since_seen] + + print("State:") + states = [w.state for w in local_workers] + print(row_format.format("", *states)) + busy = [s == "busy" for s in states] + + print("Jobs in queues:") + jobs_in_queues = [sum([len(q.jobs) for q in w.queues]) for w in local_workers] + print(row_format.format("", *jobs_in_queues)) + has_nothing_to_do = [j == 0 for j in jobs_in_queues] + + print("Healthy:") + # a healthy worker is either busy, has been seen lately or has nothing to do + healthy = [any(w) for w in zip(busy, seen_lately, has_nothing_to_do)] + print(row_format.format("", *healthy)) - sys.exit(int(not all(active))) + sys.exit(int(not all(healthy))) diff --git a/requirements.txt b/requirements.txt index 771c2e5c60..3fa3b0d1d9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -59,6 +59,7 @@ chromelogger==0.4.3 pypd==1.1.0 disposable-email-domains>=0.0.52 gevent==1.4.0 +honcho==1.0.1 # Install the dependencies of the bin/bundle-extensions script here. # It has its own requirements file to simplify the frontend client build process -r requirements_bundles.txt