Multiprocess RQ workers #4233

Closed · wants to merge 43 commits
Changes from all commits (43 commits)
824977b  add rq and an rq_worker service (Aug 21, 2019)
7a59a1a  add rq_scheduler and an rq_scheduler service (Aug 21, 2019)
4a7c7b7  move beat schedule to periodic_jobs queue (Aug 21, 2019)
965ad8a  move version checks to RQ (Aug 27, 2019)
13c2891  move query result cleanup to RQ (Aug 27, 2019)
2b070f3  move custom tasks to RQ (Aug 27, 2019)
6479c1f  do actual schema refreshes in rq (Aug 29, 2019)
c368833  move send_email to rq (Aug 29, 2019)
af94ea9  DRY up enqueues (Aug 29, 2019)
d9f6d31  ditch and use a partially applied decorator (Aug 29, 2019)
8257983  👋 beat (Aug 29, 2019)
c906b36  rename rq_scheduler to plain scheduler, now that there's no Celery sc… (Aug 29, 2019)
6b72ff6  add logging context to rq jobs (while keeping execute_query context v… (Sep 1, 2019)
6073a41  move schedule to its own module (Sep 1, 2019)
a627080  cancel previously scheduled periodic jobs. not sure this is a good idea. (Sep 1, 2019)
1581920  rename redash.scheduler to redash.schedule (Sep 1, 2019)
67ff9e1  allow custom dynamic jobs to be added decleratively (Sep 2, 2019)
4116fe5  pleasing the CodeClimate overlords (Sep 3, 2019)
a5adbf3  adjust cypress docker-compose.yml to include rq changes (Sep 3, 2019)
97a3dac  DRY up Cypress docker-compose (Sep 3, 2019)
9d50738  an odd attempt at watching docker-compose logs when running with Cypress (Sep 4, 2019)
3c4d25c  Revert "an odd attempt at watching docker-compose logs when running w… (Sep 4, 2019)
f714dd6  show docker-compose logs at Cypress shutdown (Sep 4, 2019)
6c40c4e  Revert "DRY up Cypress docker-compose" (Sep 8, 2019)
77df227  minimal version for binding is 3.2 (Sep 8, 2019)
0540472  remove unneccesary code reloads on cypress (Sep 8, 2019)
ec5afb4  SCHEMAS_REFRESH_QUEUE is no longer a required setting (Sep 9, 2019)
814110f  split tasks/queries.py to execution.py and maintenance.py (Sep 9, 2019)
f7fa7dc  rename worker to celery_worker and rq_worker to worker (Sep 9, 2019)
47c5287  delete all existing periodic jobs before scheduling them (Sep 12, 2019)
c3d43c7  remove some unrequired requires (Sep 22, 2019)
a16e392  move schedule example to redash.schedule (Sep 22, 2019)
2aae335  pleasing the CodeClimate overlords (Sep 22, 2019)
0be56a5  revert to calling a function in dynamic settings to allow periodic jo… (Oct 6, 2019)
5c499c8  set the timeout_ttl to double the interval to avoid job results from … (Oct 10, 2019)
9f5a68e  a naive way to launch multiple workers in one container (Oct 10, 2019)
1e44681  updated and less brittle healthcheck (Oct 10, 2019)
7479493  describe custom jobs and don't actually schedule them (Oct 15, 2019)
91a2eff  launch multiple workers with Honcho (Oct 27, 2019)
d0e3721  fix my faulty rebase (Oct 27, 2019)
770eaa2  restart all workers when a worker dies (Oct 27, 2019)
306e258  Revert "restart all workers when a worker dies" (Oct 27, 2019)
d5c5411  optimize work horse initialization by configuration mappers on the wo… (Oct 29, 2019)
10 changes: 6 additions & 4 deletions bin/docker-entrypoint
@@ -23,15 +23,17 @@ dev_scheduler() {
 }
 
 worker() {
-  echo "Starting RQ worker..."
+  WORKERS_COUNT=${WORKERS_COUNT:-2}
+  echo "Starting $WORKERS_COUNT RQ workers for queues $QUEUES..."
 
-  exec /app/manage.py rq worker $QUEUES
+  exec /app/manage.py rq workers $WORKERS_COUNT $QUEUES
 }
 
 dev_worker() {
-  echo "Starting dev RQ worker..."
+  WORKERS_COUNT=${WORKERS_COUNT:-2}
+  echo "Starting $WORKERS_COUNT dev RQ workers for queues: $QUEUES..."
 
-  exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq worker $QUEUES
+  exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq workers $WORKERS_COUNT $QUEUES
 }
 
 dev_celery_worker() {
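With this change the entrypoint starts a pool of RQ workers rather than a single one: WORKERS_COUNT defaults to 2 when unset, and both it and QUEUES are handed to the new "rq workers" subcommand, so running the container with, say, WORKERS_COUNT=4 would launch four worker processes for the configured queues.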
49 changes: 46 additions & 3 deletions redash/cli/rq.py
@@ -3,9 +3,11 @@
 import sys
 import datetime
 
+from honcho.manager import Manager
 from click import argument
 from flask.cli import AppGroup
 from rq import Connection, Worker
+from sqlalchemy.orm import configure_mappers
 
 from redash import rq_redis_connection
 from redash.schedule import rq_scheduler, schedule_periodic_jobs
@@ -19,9 +21,27 @@ def scheduler():
     rq_scheduler.run()
 
 
+@manager.command()
+@argument('workers_count')
+@argument('queues', nargs=-1)
+def workers(workers_count=1, queues='default'):
+    m = Manager()
+    for i in range(int(workers_count)):
+        cmd = './manage.py rq worker {}'.format(" ".join(queues))
+        m.add_process('worker #{}'.format(i+1), cmd)
+
+    m.loop()
+    sys.exit(m.returncode)
+
+
 @manager.command()
 @argument('queues', nargs=-1)
 def worker(queues='default'):
+    # Configure any SQLAlchemy mappers loaded until now so that the mapping configuration
+    # will already be available to the forked work horses and they won't need
+    # to spend valuable time re-doing that on every fork.
+    configure_mappers()

Review thread on the configure_mappers() line above:

Member: We should probably move this to its own PR, so it's not forgotten in case we abandon this one.

Contributor (Author): 🙊#4314

Member: Oops :-)

+
     if not queues:
         queues = ('default',)
     with Connection(rq_redis_connection):
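The configure_mappers() call in the hunk above is a pre-fork optimization: RQ runs each job in a forked "work horse" process, and SQLAlchemy configures its mappers lazily on first use, so doing it once in the parent lets every fork inherit an already-configured registry. A minimal standalone sketch of that idea follows; the model, table name, and explicit os.fork() are illustrative, not Redash code.

import os

from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import configure_mappers

Base = declarative_base()


class Job(Base):
    # Illustrative model only; any mapped class would do.
    __tablename__ = "jobs"
    id = Column(Integer, primary_key=True)


# Resolve all mapper configuration once in the parent process...
configure_mappers()

pid = os.fork()  # Unix-only, mirroring RQ's work horse fork
if pid == 0:
    # ...so the child starts with configured mappers and does not pay
    # that cost again on its first use of the models.
    print("work horse ready:", Job.__mapper__.local_table.name)
    os._exit(0)
else:
    os.waitpid(pid, 0)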
@@ -35,9 +55,32 @@ def healthcheck():
     with Connection(rq_redis_connection):
         all_workers = Worker.all()
 
-        local_workers = [w for w in all_workers if w.hostname == hostname]
+        local_workers = [w for w in all_workers if w.hostname.decode() == hostname]
+        row_format ="{:>10}" * (len(local_workers) + 1)
+
+        print("Local worker PIDs:")
+        local_worker_pids = set([w.pid for w in local_workers])
+        print(row_format.format("", *local_worker_pids))
+
+        print("Time since seen:")
         heartbeats = [w.last_heartbeat for w in local_workers]
         time_since_seen = [datetime.datetime.utcnow() - hb for hb in heartbeats]
-        active = [t.seconds < 60 for t in time_since_seen]
+        print(row_format.format("", *[t.seconds for t in time_since_seen]))
+        seen_lately = [t.seconds < 60 for t in time_since_seen]
+
+        print("State:")
+        states = [w.state for w in local_workers]
+        print(row_format.format("", *states))
+        busy = [s == "busy" for s in states]
+
+        print("Jobs in queues:")
+        jobs_in_queues = [sum([len(q.jobs) for q in w.queues]) for w in local_workers]
+        print(row_format.format("", *jobs_in_queues))
+        has_nothing_to_do = [j == 0 for j in jobs_in_queues]
+
+        print("Healty:")
+        # a healthy worker is either busy, has been seen lately or has nothing to do
+        healthy = [any(w) for w in zip(busy, seen_lately, has_nothing_to_do)]
+        print(row_format.format("", *healthy))
 
-    sys.exit(int(not all(active)))
+    sys.exit(int(not all(healthy)))
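Distilled from the hunk above, the per-worker health rule is simply "busy, or seen within the last minute, or nothing queued to do". A small sketch with illustrative inputs; the real command reads these values from rq's Worker objects.

def is_healthy(state, seconds_since_heartbeat, jobs_in_queues):
    busy = state == "busy"
    seen_lately = seconds_since_heartbeat < 60
    has_nothing_to_do = jobs_in_queues == 0
    # a healthy worker is either busy, has been seen lately or has nothing to do
    return busy or seen_lately or has_nothing_to_do


assert is_healthy("busy", 300, 5)        # busy, so a stale heartbeat is tolerated
assert is_healthy("idle", 10, 5)         # idle but heard from recently
assert not is_healthy("idle", 300, 5)    # idle, stale, and with queued work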
1 change: 1 addition & 0 deletions requirements.txt
@@ -59,6 +59,7 @@ chromelogger==0.4.3
 pypd==1.1.0
 disposable-email-domains>=0.0.52
 gevent==1.4.0
+honcho==1.0.1
 # Install the dependencies of the bin/bundle-extensions script here.
 # It has its own requirements file to simplify the frontend client build process
 -r requirements_bundles.txt
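The honcho pin above exists for the new workers command: honcho's Manager supervises several identical rq worker subprocesses inside one container and exposes their combined return code. A minimal standalone sketch of that pattern follows; the command string and process names are placeholders, not the actual Redash invocation.

import sys

from honcho.manager import Manager


def run_workers(count, command):
    m = Manager()
    for i in range(count):
        # A unique name per process lets Honcho prefix each one's log output.
        m.add_process("worker_{}".format(i + 1), command)
    m.loop()  # blocks until every child process has exited
    sys.exit(m.returncode)


if __name__ == "__main__":
    run_workers(2, "python -c \"print('hello from a worker')\"")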