Skip to content

Commit

Permalink
config change, auto worker improvements, exhaustive issue worker
Browse files Browse the repository at this point in the history
The config for workers now has a new format that replaces the old 'Controller' section. Also, the GitHub worker now goes through all issues regardless of whether they have already been inserted, because this is necessary to see all issue events and comments, even on past issues. The starting and killing of worker processes when they are started automatically is now much cleaner.
  • Loading branch information
gabe-heim committed Jul 17, 2019
1 parent 6b3efb0 commit 16f3e16
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 146 deletions.
174 changes: 122 additions & 52 deletions augur/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import atexit
import click
import subprocess
# import psutil

class AugurGunicornApp(gunicorn.app.base.BaseApplication):
"""
Expand Down Expand Up @@ -62,68 +61,133 @@ def get_process_id(name):
app.schedule_updates()
master = None

broker_switch = app.read_config('Controller', 'broker', 'AUGUR_BROKER', 0)
housekeeper_switch = app.read_config('Controller', 'housekeeper', 'AUGUR_HOUSEKEEPER', 0)
github_worker_switch = app.read_config('Controller', 'github_worker', 'AUGUR_GH_WORKER', 0)
# broker_switch = app.read_config('Controller', 'broker', 'AUGUR_BROKER', 0)
# housekeeper_switch = app.read_config('Controller', 'housekeeper', 'AUGUR_HOUSEKEEPER', 0)
github_worker_switch = app.read_config('Workers', 'github_worker', 'AUGUR_GH_WORKER', {"switch": 0})['switch']
repo_info_worker_switch = app.read_config('Workers', 'repo_info_worker', 'AUGUR_INFO_WORKER', {"switch": 0})['switch']
insight_worker_switch = app.read_config('Workers', 'insight_worker', 'AUGUR_INSIGHT_WORKER', {"switch": 0})['switch']
controller = {
'broker': broker_switch,
'housekeeper': housekeeper_switch,
# 'broker': broker_switch,
# 'housekeeper': housekeeper_switch,
'github_worker': github_worker_switch,
'repo_info_worker': repo_info_worker_switch,
'insight_worker': insight_worker_switch
}

logger.info("Controller specs ('1' for components that are set to automatically boot): {}".format(str(controller)))

# logger.info("Worker specs ('1' for components that are set to automatically boot): {}".format(str(controller)))
manager = None
broker = None
housekeeper = None

if controller['broker'] == 1:
logger.info("Booting broker and its manager")
manager = mp.Manager()
broker = manager.dict()
# if controller['broker'] == 1:
logger.info("Booting broker and its manager")
manager = mp.Manager()
broker = manager.dict()
# broker['worker_pids'] = manager.list()

if controller['housekeeper'] == 1:
logger.info("Booting housekeeper")
jobs = app.read_config('Housekeeper', 'jobs', 'AUGUR_JOBS', [])
housekeeper = Housekeeper(
jobs,
broker,
broker_port=app.read_config('Server', 'port', 'AUGUR_PORT', '5000'),
user=app.read_config('Database', 'user', 'AUGUR_DB_USER', 'root'),
password=app.read_config('Database', 'password', 'AUGUR_DB_PASS', 'password'),
host=app.read_config('Database', 'host', 'AUGUR_DB_HOST', '127.0.0.1'),
port=app.read_config('Database', 'port', 'AUGUR_DB_PORT', '3306'),
dbname=app.read_config('Database', 'database', 'AUGUR_DB_NAME', 'msr14')
)

if controller['github_worker'] == 1:
# if controller['housekeeper'] == 1:
logger.info("Booting housekeeper")
jobs = app.read_config('Housekeeper', 'jobs', 'AUGUR_JOBS', [])
housekeeper = Housekeeper(
jobs,
broker,
broker_port=app.read_config('Server', 'port', 'AUGUR_PORT', '5000'),
user=app.read_config('Database', 'user', 'AUGUR_DB_USER', 'root'),
password=app.read_config('Database', 'password', 'AUGUR_DB_PASS', 'password'),
host=app.read_config('Database', 'host', 'AUGUR_DB_HOST', '127.0.0.1'),
port=app.read_config('Database', 'port', 'AUGUR_DB_PORT', '3306'),
dbname=app.read_config('Database', 'database', 'AUGUR_DB_NAME', 'msr14')
)

worker_pids = []
process = subprocess.Popen(['ps', '-a'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, notused = process.communicate()
# data = [(int(p), c) for p, c in [x.rstrip('\n').split(' ', 1) \
# for x in os.popen('ps h -eo pid:1,command')]]
# for p in psutil.process_iter():
# logger.info(p)
# if 'nginx' in p.name() or 'nginx' in ' '.join(p.cmdline()):
# p.terminate()
# p.wait()
try:
for line in stdout.splitlines():
# for line in data:
logger.info(line)
pid, cmdline = line.split(' ', 1)
logger.info("HERE {}".format(type(cmdline)))
if 'github_worker' in cmdline and github_worker_switch == 1:
# logger.info(cmdline)
logger.info("Killing: {}".format(line))
if 'repo_info_worker' in cmdline and repo_info_worker_switch == 1:
# logger.info(cmdline)
logger.info("Killing: {}".format(line))
if 'insight_worker' in cmdline and insight_worker_switch == 1:
# logger.info(cmdline)
logger.info("Killing: {}".format(line))
except:
pass

if github_worker_switch == 1:
gh_pids = get_process_id("/bin/sh -c cd workers/augur_worker_github && github_worker")
logger.info(gh_pids)
worker_pids += gh_pids
if len(gh_pids) > 0:
worker_pids.append(gh_pids[0] + 1)
gh_pids.append(gh_pids[0] + 1)
logger.info("Found github worker pids: {}".format(gh_pids))
for pid in gh_pids:
try:
os.kill(pid, 9)
except:
logger.info("Worker process {} already killed".format(pid))

# proc_iter = psutil.process_iter(attrs=["pid", "name", "cmdline"])
# past_worker = any("/bin/sh -c cd workers/augur_worker_github && github_worker" in p.info["cmdline"] for p in proc_iter)
# logger.info(str(past_worker))

logger.info("Booting github worker")
logger.info("abt to call method")
up = mp.Process(target=worker_start, args=(), daemon=True)
up.start()
github_worker = mp.Process(target=github_worker_start, args=(), daemon=True)
github_worker.start()
time.sleep(2.5)

if controller['repo_info_worker'] == 1:
ri_pids = get_process_id("/bin/sh -c cd workers/gh_repo_info_worker && repo_info_worker")
worker_pids += ri_pids
if len(ri_pids) > 0:
worker_pids.append(ri_pids[0] + 1)
ri_pids.append(ri_pids[0] + 1)
logger.info("Found repo info worker pids: {}".format(ri_pids))
for pid in ri_pids:
try:
os.kill(pid, 9)
except:
logger.info("Worker process {} already killed".format(pid))

logger.info("Booting repo_info worker")
repo_info_worker = mp.Process(target=repo_info_worker_start, args=(), daemon=True)
repo_info_worker.start()
time.sleep(2.5)

if controller['insight_worker'] == 1:
insight_pids = get_process_id("/bin/sh -c cd workers/insight_worker && insight_worker")
worker_pids += insight_pids
if len(insight_pids) > 0:
worker_pids.append(insight_pids[0] + 1)
insight_pids.append(insight_pids[0] + 1)
logger.info("Found repo info worker pids: {}".format(insight_pids))
for pid in insight_pids:
try:
os.kill(pid, 9)
except:
logger.info("Worker process {} already killed".format(pid))

logger.info("Booting insight worker")
insight_worker = mp.Process(target=insight_worker_start, args=(), daemon=True)
insight_worker.start()
time.sleep(2.5)


@atexit.register
def exit():
# try:
# for pid in broker['worker_pids']:
# os.kill(pid, 9)
# except:
# logger.info("Worker process {} already killed".format(pid))
try:
for pid in worker_pids:
os.kill(pid, 9)
except:
logger.info("Worker process {} already killed".format(pid))
if master is not None:
master.halt()
logger.info("Shutting down app updates...")
Expand All @@ -134,17 +198,15 @@ def exit():
if housekeeper is not None:
housekeeper.shutdown_updates()

if controller['github_worker'] == 1:
for pid in gh_pids:
try:
os.kill(pid, 9)
logger.info("killed worker process {}".format(pid))
except:
logger.info("Worker process {} already killed".format(pid))
# logger.info("You had the github_worker startup enabled, please allow ~30 seconds for its processes to shutdown before running 'make dev' again")
if github_worker_switch == 1:
logger.info("Shutting down github worker...")

up.terminate()
github_worker.terminate()
if repo_info_worker_switch == 1:
logger.info("Shutting down github worker...")
repo_info_worker.terminate()
if insight_worker_switch == 1:
logger.info("Shutting down github worker...")
insight_worker.terminate()
# if hasattr(manager, "shutdown"):
# wait for the spawner and the worker threads to go down
#
Expand Down Expand Up @@ -173,6 +235,14 @@ def exit():
logger.info('Starting server...')
master = Arbiter(AugurGunicornApp(options, manager=manager, broker=broker, housekeeper=housekeeper)).run()

def worker_start():
def github_worker_start():
    """Launch the GitHub worker in a child shell.

    Logs a boot message, then spawns ``github_worker`` from inside the
    ``workers/augur_worker_github`` directory via the shell. The call does
    not wait for the child to finish and returns ``None``.
    """
    # The command is handed to the shell verbatim (shell=True).
    # NOTE(review): the startup code appears to locate leftover worker
    # processes by this exact command line -- confirm before rewording it.
    command = "cd workers/augur_worker_github && github_worker"
    logger.info("Booting github worker")
    subprocess.Popen(command, shell=True)

def repo_info_worker_start():
    """Launch the repo-info worker in a child shell.

    Logs a boot message, then spawns ``repo_info_worker`` from inside the
    ``workers/gh_repo_info_worker`` directory via the shell. The call does
    not wait for the child to finish and returns ``None``.
    """
    # The command is handed to the shell verbatim (shell=True).
    # NOTE(review): the startup code appears to locate leftover worker
    # processes by this exact command line -- confirm before rewording it.
    command = "cd workers/gh_repo_info_worker && repo_info_worker"
    logger.info("Booting repo_info worker")
    subprocess.Popen(command, shell=True)

def insight_worker_start():
    """Launch the insight worker in a child shell.

    Logs a boot message, then spawns ``insight_worker`` from inside the
    ``workers/insight_worker`` directory via the shell. The call does not
    wait for the child to finish and returns ``None``.
    """
    # The command is handed to the shell verbatim (shell=True).
    # NOTE(review): the startup code appears to locate leftover worker
    # processes by this exact command line -- confirm before rewording it.
    command = "cd workers/insight_worker && insight_worker"
    logger.info("Booting insight worker")
    subprocess.Popen(command, shell=True)
59 changes: 30 additions & 29 deletions augur/routes/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,20 @@ def worker():
givens = server.broker[worker['id']]['given']
user_queue = server.broker[worker['id']]['user_queue']
maintain_queue = server.broker[worker['id']]['maintain_queue']

if server.broker[worker]['status'] != 'Disconnected':
try:
new_task = user_queue.pop(0)
logging.info("Worker {}'s user queue is not empty, preparing to send it the next 'user' task: {}".format(worker['id'], new_task['given']['git_url']))
logging.info("Remaining length of user queue: {}".format(str(len(user_queue))))
requests.post(server.broker[worker['id']]['location'] + '/AUGWOP/task', json=new_task)
except:
try:
new_task = user_queue.pop(0)
logging.info("Worker {}'s user queue is not empty, preparing to send it the next 'user' task: {}".format(worker['id'], new_task['given']['git_url']))
logging.info("Remaining length of user queue: {}".format(str(len(user_queue))))
new_task = maintain_queue.pop(0)
logging.info("Worker {}'s user queue is empty but the maintain queue is not, preparing to send it the next 'maintain' task: {}".format(worker['id'], new_task['given']['git_url']))
logging.info("Remaining length of maintain queue: {}".format(str(len(maintain_queue))))
requests.post(server.broker[worker['id']]['location'] + '/AUGWOP/task', json=new_task)
except:
try:
new_task = maintain_queue.pop(0)
logging.info("Worker {}'s user queue is empty but the maintain queue is not, preparing to send it the next 'maintain' task: {}".format(worker['id'], new_task['given']['git_url']))
logging.info("Remaining length of maintain queue: {}".format(str(len(maintain_queue))))
requests.post(server.broker[worker['id']]['location'] + '/AUGWOP/task', json=new_task)
except:
logging.info("Both queues are empty for worker {}".format(worker['id']))
server.broker[worker['id']]['status'] = 'Idle'
logging.info("Both queues are empty for worker {}".format(worker['id']))
server.broker[worker['id']]['status'] = 'Idle'

return Response(response=worker['id'],
status=200,
Expand All @@ -108,26 +106,29 @@ def sync_queue():
task = request.json
worker = task['worker_id']
logging.info("Message recieved that worker " + worker + " completed task: " + str(task))
models = server.broker[worker]['models']
givens = server.broker[worker]['given']
user_queue = server.broker[worker]['user_queue']
maintain_queue = server.broker[worker]['maintain_queue']
try:
models = server.broker[worker]['models']
givens = server.broker[worker]['given']
user_queue = server.broker[worker]['user_queue']
maintain_queue = server.broker[worker]['maintain_queue']

if server.broker[worker]['status'] != 'Disconnected':
try:
new_task = user_queue.pop(0)
logging.info("Worker {}'s user queue is not empty, preparing to send it the next 'user' task: {}".format(worker, new_task['given']['git_url']))
logging.info("Remaining length of user queue: {}".format(str(len(user_queue))))
requests.post(server.broker[worker]['location'] + '/AUGWOP/task', json=new_task)
except:
if server.broker[worker]['status'] != 'Disconnected':
try:
new_task = maintain_queue.pop(0)
logging.info("Worker {}'s user queue is empty but the maintain queue is not, preparing to send it the next 'maintain' task: {}".format(worker, new_task['given']['git_url']))
logging.info("Remaining length of maintain queue: {}".format(str(len(maintain_queue))))
new_task = user_queue.pop(0)
logging.info("Worker {}'s user queue is not empty, preparing to send it the next 'user' task: {}".format(worker, new_task['given']['git_url']))
logging.info("Remaining length of user queue: {}".format(str(len(user_queue))))
requests.post(server.broker[worker]['location'] + '/AUGWOP/task', json=new_task)
except:
logging.info("Both queues are empty for worker {}".format(worker))
server.broker[worker]['status'] = 'Idle'
try:
new_task = maintain_queue.pop(0)
logging.info("Worker {}'s user queue is empty but the maintain queue is not, preparing to send it the next 'maintain' task: {}".format(worker, new_task['given']['git_url']))
logging.info("Remaining length of maintain queue: {}".format(str(len(maintain_queue))))
requests.post(server.broker[worker]['location'] + '/AUGWOP/task', json=new_task)
except:
logging.info("Both queues are empty for worker {}".format(worker))
server.broker[worker]['status'] = 'Idle'
except:
logging.info("A past instance of the {} worker finished a previous leftover task.".format(worker))

return Response(response=task,
status=200,
Expand Down
25 changes: 14 additions & 11 deletions workers/augur_worker_github/augur_worker_github/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,20 @@ def main(augur_url, host, port):
#load credentials
credentials = read_config("Database", use_main_config=1)
server = read_config("Server", use_main_config=1)
worker_info = read_config("GitHubWorker", use_main_config=1)

worker_info = read_config("Workers", use_main_config=1)

worker_port = worker_info['port'] if 'port' in worker_info else port

while True:
try:
r = requests.get("http://localhost:{}".format(worker_port) + '/AUGWOP/task')
if r.status == 200:
worker_port += 1
except:
break
# while True:
# try:
# print("trying")
# r = requests.get("http://localhost:{}".format(worker_port) + '/AUGWOP/task', timeout=5)
# print(r.json)
# if r.status == 200:
# worker_port += 1
# except:
# break

config = {
"id": "com.augurlabs.core.github_worker.{}".format(worker_port),
Expand All @@ -70,7 +73,6 @@ def main(augur_url, host, port):
"port": credentials["port"],
"user": credentials["user"],
"database": credentials["database"],
"table": "repo_badging",
"endpoint": "https://bestpractices.coreinfrastructure.org/projects.json",
"display_name": "",
"description": "",
Expand All @@ -79,10 +81,11 @@ def main(augur_url, host, port):
}

#create instance of the worker

print("made it")
app.gh_worker = GitHubWorker(config) # declares the worker that will be running on this server with specified config

print("made it")
create_server(app, None)
print("made it")
logging.info("Starting Flask App with pid: " + str(os.getpid()) + "...")


Expand Down
Loading

0 comments on commit 16f3e16

Please sign in to comment.