diff --git a/augur/cli/run.py b/augur/cli/run.py index 73c86dd9cf..b2c903e336 100644 --- a/augur/cli/run.py +++ b/augur/cli/run.py @@ -16,7 +16,6 @@ import atexit import click import subprocess -# import psutil class AugurGunicornApp(gunicorn.app.base.BaseApplication): """ @@ -62,68 +61,133 @@ def get_process_id(name): app.schedule_updates() master = None - broker_switch = app.read_config('Controller', 'broker', 'AUGUR_BROKER', 0) - housekeeper_switch = app.read_config('Controller', 'housekeeper', 'AUGUR_HOUSEKEEPER', 0) - github_worker_switch = app.read_config('Controller', 'github_worker', 'AUGUR_GH_WORKER', 0) + # broker_switch = app.read_config('Controller', 'broker', 'AUGUR_BROKER', 0) + # housekeeper_switch = app.read_config('Controller', 'housekeeper', 'AUGUR_HOUSEKEEPER', 0) + github_worker_switch = app.read_config('Workers', 'github_worker', 'AUGUR_GH_WORKER', {"switch": 0})['switch'] + repo_info_worker_switch = app.read_config('Workers', 'repo_info_worker', 'AUGUR_INFO_WORKER', {"switch": 0})['switch'] + insight_worker_switch = app.read_config('Workers', 'insight_worker', 'AUGUR_INSIGHT_WORKER', {"switch": 0})['switch'] controller = { - 'broker': broker_switch, - 'housekeeper': housekeeper_switch, + # 'broker': broker_switch, + # 'housekeeper': housekeeper_switch, 'github_worker': github_worker_switch, + 'repo_info_worker': repo_info_worker_switch, + 'insight_worker': insight_worker_switch } - - logger.info("Controller specs ('1' for components that are set to automatically boot): {}".format(str(controller))) - + # logger.info("Worker specs ('1' for components that are set to automatically boot): {}".format(str(controller))) manager = None broker = None housekeeper = None - if controller['broker'] == 1: - logger.info("Booting broker and its manager") - manager = mp.Manager() - broker = manager.dict() + # if controller['broker'] == 1: + logger.info("Booting broker and its manager") + manager = mp.Manager() + broker = manager.dict() # broker['worker_pids'] = manager.list() - if controller['housekeeper'] == 1: - logger.info("Booting housekeeper") - jobs = app.read_config('Housekeeper', 'jobs', 'AUGUR_JOBS', []) - housekeeper = Housekeeper( - jobs, - broker, - broker_port=app.read_config('Server', 'port', 'AUGUR_PORT', '5000'), - user=app.read_config('Database', 'user', 'AUGUR_DB_USER', 'root'), - password=app.read_config('Database', 'password', 'AUGUR_DB_PASS', 'password'), - host=app.read_config('Database', 'host', 'AUGUR_DB_HOST', '127.0.0.1'), - port=app.read_config('Database', 'port', 'AUGUR_DB_PORT', '3306'), - dbname=app.read_config('Database', 'database', 'AUGUR_DB_NAME', 'msr14') - ) - - if controller['github_worker'] == 1: + # if controller['housekeeper'] == 1: + logger.info("Booting housekeeper") + jobs = app.read_config('Housekeeper', 'jobs', 'AUGUR_JOBS', []) + housekeeper = Housekeeper( + jobs, + broker, + broker_port=app.read_config('Server', 'port', 'AUGUR_PORT', '5000'), + user=app.read_config('Database', 'user', 'AUGUR_DB_USER', 'root'), + password=app.read_config('Database', 'password', 'AUGUR_DB_PASS', 'password'), + host=app.read_config('Database', 'host', 'AUGUR_DB_HOST', '127.0.0.1'), + port=app.read_config('Database', 'port', 'AUGUR_DB_PORT', '3306'), + dbname=app.read_config('Database', 'database', 'AUGUR_DB_NAME', 'msr14') + ) + + worker_pids = [] + process = subprocess.Popen(['ps', '-a'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, notused = process.communicate() + # data = [(int(p), c) for p, c in [x.rstrip('\n').split(' ', 1) \ + # for x in os.popen('ps h -eo pid:1,command')]] + # for p in psutil.process_iter(): + # logger.info(p) + # if 'nginx' in p.name() or 'nginx' in ' '.join(p.cmdline()): + # p.terminate() + # p.wait() + try: + for line in stdout.splitlines(): + # for line in data: + logger.info(line) + pid, cmdline = line.split(' ', 1) + logger.info("HERE {}".format(type(cmdline))) + if 'github_worker' in cmdline and github_worker_switch == 1: + # logger.info(cmdline) + logger.info("Killing: {}".format(line)) + if 'repo_info_worker' in cmdline and repo_info_worker_switch == 1: + # logger.info(cmdline) + logger.info("Killing: {}".format(line)) + if 'insight_worker' in cmdline and insight_worker_switch == 1: + # logger.info(cmdline) + logger.info("Killing: {}".format(line)) + except: + pass + + if github_worker_switch == 1: gh_pids = get_process_id("/bin/sh -c cd workers/augur_worker_github && github_worker") - logger.info(gh_pids) + worker_pids += gh_pids if len(gh_pids) > 0: + worker_pids.append(gh_pids[0] + 1) gh_pids.append(gh_pids[0] + 1) + logger.info("Found github worker pids: {}".format(gh_pids)) for pid in gh_pids: try: os.kill(pid, 9) except: logger.info("Worker process {} already killed".format(pid)) - # proc_iter = psutil.process_iter(attrs=["pid", "name", "cmdline"]) - # past_worker = any("/bin/sh -c cd workers/augur_worker_github && github_worker" in p.info["cmdline"] for p in proc_iter) - # logger.info(str(past_worker)) - logger.info("Booting github worker") - logger.info("abt to call method") - up = mp.Process(target=worker_start, args=(), daemon=True) - up.start() + github_worker = mp.Process(target=github_worker_start, args=(), daemon=True) + github_worker.start() + time.sleep(2.5) + + if controller['repo_info_worker'] == 1: + ri_pids = get_process_id("/bin/sh -c cd workers/gh_repo_info_worker && repo_info_worker") + worker_pids += ri_pids + if len(ri_pids) > 0: + worker_pids.append(ri_pids[0] + 1) + ri_pids.append(ri_pids[0] + 1) + logger.info("Found repo info worker pids: {}".format(ri_pids)) + for pid in ri_pids: + try: + os.kill(pid, 9) + except: + logger.info("Worker process {} already killed".format(pid)) + + logger.info("Booting repo_info worker") + repo_info_worker = mp.Process(target=repo_info_worker_start, args=(), daemon=True) + repo_info_worker.start() + time.sleep(2.5) + + if controller['insight_worker'] == 1: + insight_pids = get_process_id("/bin/sh -c cd workers/insight_worker && insight_worker") + worker_pids += insight_pids + if len(insight_pids) > 0: + worker_pids.append(insight_pids[0] + 1) + insight_pids.append(insight_pids[0] + 1) + logger.info("Found repo info worker pids: {}".format(insight_pids)) + for pid in insight_pids: + try: + os.kill(pid, 9) + except: + logger.info("Worker process {} already killed".format(pid)) + + logger.info("Booting insight worker") + insight_worker = mp.Process(target=insight_worker_start, args=(), daemon=True) + insight_worker.start() + time.sleep(2.5) + @atexit.register def exit(): - # try: - # for pid in broker['worker_pids']: - # os.kill(pid, 9) - # except: - # logger.info("Worker process {} already killed".format(pid)) + try: + for pid in worker_pids: + os.kill(pid, 9) + except: + logger.info("Worker process {} already killed".format(pid)) if master is not None: master.halt() logger.info("Shutting down app updates...") @@ -134,17 +198,15 @@ def exit(): if housekeeper is not None: housekeeper.shutdown_updates() - if controller['github_worker'] == 1: - for pid in gh_pids: - try: - os.kill(pid, 9) - logger.info("killed worker process {}".format(pid)) - except: - logger.info("Worker process {} already killed".format(pid)) - # logger.info("You had the github_worker startup enabled, please allow ~30 seconds for its processes to shutdown before running 'make dev' again") + if github_worker_switch == 1: logger.info("Shutting down github worker...") - - up.terminate() + github_worker.terminate() + if repo_info_worker_switch == 1: + logger.info("Shutting down github worker...") + repo_info_worker.terminate() + if insight_worker_switch == 1: + logger.info("Shutting down github worker...") + insight_worker.terminate() # if hasattr(manager, "shutdown"): # wait for the spawner and the worker threads to go down # @@ -173,6 +235,14 @@ def exit(): logger.info('Starting server...') master = Arbiter(AugurGunicornApp(options, manager=manager, broker=broker, housekeeper=housekeeper)).run() -def worker_start(): +def github_worker_start(): logger.info("Booting github worker") process = subprocess.Popen("cd workers/augur_worker_github && github_worker", shell=True) + +def repo_info_worker_start(): + logger.info("Booting repo_info worker") + process = subprocess.Popen("cd workers/gh_repo_info_worker && repo_info_worker", shell=True) + +def insight_worker_start(): + logger.info("Booting insight worker") + process = subprocess.Popen("cd workers/insight_worker && insight_worker", shell=True) diff --git a/augur/routes/broker.py b/augur/routes/broker.py index c52f59ec89..4ca3904864 100644 --- a/augur/routes/broker.py +++ b/augur/routes/broker.py @@ -82,22 +82,20 @@ def worker(): givens = server.broker[worker['id']]['given'] user_queue = server.broker[worker['id']]['user_queue'] maintain_queue = server.broker[worker['id']]['maintain_queue'] - - if server.broker[worker]['status'] != 'Disconnected': + try: + new_task = user_queue.pop(0) + logging.info("Worker {}'s user queue is not empty, preparing to send it the next 'user' task: {}".format(worker['id'], new_task['given']['git_url'])) + logging.info("Remaining length of user queue: {}".format(str(len(user_queue)))) + requests.post(server.broker[worker['id']]['location'] + '/AUGWOP/task', json=new_task) + except: try: - new_task = user_queue.pop(0) - logging.info("Worker {}'s user queue is not empty, preparing to send it the next 'user' task: {}".format(worker['id'], new_task['given']['git_url'])) - logging.info("Remaining length of user queue: {}".format(str(len(user_queue)))) + new_task = maintain_queue.pop(0) + logging.info("Worker {}'s user queue is empty but the maintain queue is not, preparing to send it the next 'maintain' task: {}".format(worker['id'], new_task['given']['git_url'])) + logging.info("Remaining length of maintain queue: {}".format(str(len(maintain_queue)))) requests.post(server.broker[worker['id']]['location'] + '/AUGWOP/task', json=new_task) except: - try: - new_task = maintain_queue.pop(0) - logging.info("Worker {}'s user queue is empty but the maintain queue is not, preparing to send it the next 'maintain' task: {}".format(worker['id'], new_task['given']['git_url'])) - logging.info("Remaining length of maintain queue: {}".format(str(len(maintain_queue)))) - requests.post(server.broker[worker['id']]['location'] + '/AUGWOP/task', json=new_task) - except: - logging.info("Both queues are empty for worker {}".format(worker['id'])) - server.broker[worker['id']]['status'] = 'Idle' + logging.info("Both queues are empty for worker {}".format(worker['id'])) + server.broker[worker['id']]['status'] = 'Idle' return Response(response=worker['id'], status=200, @@ -108,26 +106,29 @@ def sync_queue(): task = request.json worker = task['worker_id'] logging.info("Message recieved that worker " + worker + " completed task: " + str(task)) - models = server.broker[worker]['models'] - givens = server.broker[worker]['given'] - user_queue = server.broker[worker]['user_queue'] - maintain_queue = server.broker[worker]['maintain_queue'] + try: + models = server.broker[worker]['models'] + givens = server.broker[worker]['given'] + user_queue = server.broker[worker]['user_queue'] + maintain_queue = server.broker[worker]['maintain_queue'] - if server.broker[worker]['status'] != 'Disconnected': - try: - new_task = user_queue.pop(0) - logging.info("Worker {}'s user queue is not empty, preparing to send it the next 'user' task: {}".format(worker, new_task['given']['git_url'])) - logging.info("Remaining length of user queue: {}".format(str(len(user_queue)))) - requests.post(server.broker[worker]['location'] + '/AUGWOP/task', json=new_task) - except: + if server.broker[worker]['status'] != 'Disconnected': try: - new_task = maintain_queue.pop(0) - logging.info("Worker {}'s user queue is empty but the maintain queue is not, preparing to send it the next 'maintain' task: {}".format(worker, new_task['given']['git_url'])) - logging.info("Remaining length of maintain queue: {}".format(str(len(maintain_queue)))) + new_task = user_queue.pop(0) + logging.info("Worker {}'s user queue is not empty, preparing to send it the next 'user' task: {}".format(worker, new_task['given']['git_url'])) + logging.info("Remaining length of user queue: {}".format(str(len(user_queue)))) requests.post(server.broker[worker]['location'] + '/AUGWOP/task', json=new_task) except: - logging.info("Both queues are empty for worker {}".format(worker)) - server.broker[worker]['status'] = 'Idle' + try: + new_task = maintain_queue.pop(0) + logging.info("Worker {}'s user queue is empty but the maintain queue is not, preparing to send it the next 'maintain' task: {}".format(worker, new_task['given']['git_url'])) + logging.info("Remaining length of maintain queue: {}".format(str(len(maintain_queue)))) + requests.post(server.broker[worker]['location'] + '/AUGWOP/task', json=new_task) + except: + logging.info("Both queues are empty for worker {}".format(worker)) + server.broker[worker]['status'] = 'Idle' + except: + logging.info("A past instance of the {} worker finished a previous leftover task.".format(worker)) return Response(response=task, status=200, diff --git a/workers/augur_worker_github/augur_worker_github/runtime.py b/workers/augur_worker_github/augur_worker_github/runtime.py index 73d3f39aa1..d2bfbf4008 100644 --- a/workers/augur_worker_github/augur_worker_github/runtime.py +++ b/workers/augur_worker_github/augur_worker_github/runtime.py @@ -47,17 +47,20 @@ def main(augur_url, host, port): #load credentials credentials = read_config("Database", use_main_config=1) server = read_config("Server", use_main_config=1) - worker_info = read_config("GitHubWorker", use_main_config=1) + + worker_info = read_config("Workers", use_main_config=1) worker_port = worker_info['port'] if 'port' in worker_info else port - while True: - try: - r = requests.get("http://localhost:{}".format(worker_port) + '/AUGWOP/task') - if r.status == 200: - worker_port += 1 - except: - break + # while True: + # try: + # print("trying") + # r = requests.get("http://localhost:{}".format(worker_port) + '/AUGWOP/task', timeout=5) + # print(r.json) + # if r.status == 200: + # worker_port += 1 + # except: + # break config = { "id": "com.augurlabs.core.github_worker.{}".format(worker_port), @@ -70,7 +73,6 @@ def main(augur_url, host, port): "port": credentials["port"], "user": credentials["user"], "database": credentials["database"], - "table": "repo_badging", "endpoint": "https://bestpractices.coreinfrastructure.org/projects.json", "display_name": "", "description": "", @@ -79,10 +81,11 @@ def main(augur_url, host, port): } #create instance of the worker - + print("made it") app.gh_worker = GitHubWorker(config) # declares the worker that will be running on this server with specified config - + print("made it") create_server(app, None) + print("made it") logging.info("Starting Flask App with pid: " + str(os.getpid()) + "...") diff --git a/workers/augur_worker_github/augur_worker_github/worker.py b/workers/augur_worker_github/augur_worker_github/worker.py index e1899a535d..c9edf58314 100644 --- a/workers/augur_worker_github/augur_worker_github/worker.py +++ b/workers/augur_worker_github/augur_worker_github/worker.py @@ -115,47 +115,6 @@ def __init__(self, config, task=None): self.history_table = HelperBase.classes.gh_worker_history.__table__ self.job_table = HelperBase.classes.gh_worker_job.__table__ - - # # Query all repos and last repo id - # repoUrlSQL = s.sql.text(""" - # SELECT git_url, repo_id FROM repo ORDER BY repo_id ASC - # """) - - # rs = pd.read_sql(repoUrlSQL, self.db, params={}) - - # repoIdSQL = s.sql.text(""" - # SELECT since_id_str FROM gh_worker_job - # """) - - # job_df = pd.read_sql(repoIdSQL, self.helper_db, params={}) - - # last_id = int(job_df.iloc[0]['since_id_str']) - - # jobHistorySQL = s.sql.text(""" - # SELECT max(history_id) AS history_id, status FROM gh_worker_history - # GROUP BY status - # LIMIT 1 - # """) - - # history_df = pd.read_sql(jobHistorySQL, self.helper_db, params={}) - - # if history_df.iloc[0]['status'] == 'Stopped': - # self.history_id = int(history_df.iloc[0]['history_id']) - # self.finishing_task = True - # else: - # last_id += 1 - - # # Rearrange repos so the one after the last one that - # # was completed will be ran first - # before_repos = rs.loc[rs['repo_id'].astype(int) < last_id] - # after_repos = rs.loc[rs['repo_id'].astype(int) >= last_id] - - # reorganized_repos = after_repos.append(before_repos) - - # # Populate queue - # for index, row in reorganized_repos.iterrows(): - # self._maintain_queue.put(CollectorTask(message_type='TASK', entry_info=row)) - # Get max ids so we know where we are in our insertion and to have the current id when inserting FK's logging.info("Querying starting ids info...") maxIssueCntrbSQL = s.sql.text(""" @@ -181,8 +140,6 @@ def __init__(self, config, task=None): self.cntrb_id_inc = (cntrb_start + 1) self.msg_id_inc = (msg_start + 1) - # self.run() - requests.post('http://localhost:{}/api/unstable/workers'.format( self.config['broker_port']), json=specs) #hello message @@ -246,8 +203,8 @@ def run(self): if self._child is None: self._child = Process(target=self.collect, args=()) self._child.start() - requests.post("http://localhost:{}/api/unstable/add_pids".format( - self.config['broker_port']), json={'pids': [self._child.pid, os.getpid()]}) + # requests.post("http://localhost:{}/api/unstable/add_pids".format( + # self.config['broker_port']), json={'pids': [self._child.pid, os.getpid()]}) def collect(self): """ Function to process each entry in the worker's task queue @@ -551,7 +508,9 @@ def query_issues(self, entry_info): logging.info("No more pages with unknown issues, breaking from pagination.\n") break elif len(new_issues) != 0: - to_add = [obj for obj in new_issues if obj not in issues] + # to_add = [obj for obj in new_issues if obj not in issues] + # issues += to_add + to_add = [obj for obj in j if obj not in issues] issues += to_add i = i + 1 if self.finishing_task else i - 1 diff --git a/workers/gh_repo_info_worker/gh_repo_info_worker/worker.py b/workers/gh_repo_info_worker/gh_repo_info_worker/worker.py index 63dc0d5827..c273bf0bd3 100644 --- a/workers/gh_repo_info_worker/gh_repo_info_worker/worker.py +++ b/workers/gh_repo_info_worker/gh_repo_info_worker/worker.py @@ -340,6 +340,9 @@ def register_task_completion(self, repo_id, git_url): logging.info("Telling broker we completed task: " + str(task_completed) + "\n" + "This task inserted: " + str(self.results_counter) + " tuples.\n\n") - requests.post('http://localhost:{}/api/unstable/completed_task'.format( - self.config['broker_port']), json=task_completed) + try: + requests.post('http://localhost:{}/api/unstable/completed_task'.format( + self.config['broker_port']), json=task_completed) + except requests.exceptions.ConnectionError: + logging.info("Broker is booting and cannot accept the worker's message currently") self.results_counter = 0 diff --git a/workers/insight_worker/dist/insight_worker-0.1.0-py3.6.egg b/workers/insight_worker/dist/insight_worker-0.1.0-py3.6.egg index 14bba1ff11..e49a227d8e 100644 Binary files a/workers/insight_worker/dist/insight_worker-0.1.0-py3.6.egg and b/workers/insight_worker/dist/insight_worker-0.1.0-py3.6.egg differ diff --git a/workers/insight_worker/insight_worker/runtime.py b/workers/insight_worker/insight_worker/runtime.py index ac40785ec8..4272866c4b 100644 --- a/workers/insight_worker/insight_worker/runtime.py +++ b/workers/insight_worker/insight_worker/runtime.py @@ -40,7 +40,7 @@ def augwop_config(): @click.command() @click.option('--augur-url', default='http://localhost:5000/', help='Augur URL') @click.option('--host', default='localhost', help='Host') -@click.option('--port', default=51242, help='Port') +@click.option('--port', default=51252, help='Port') def main(augur_url, host, port): """ Declares singular worker and creates the server and flask app that it will be running on """ @@ -56,7 +56,7 @@ def main(augur_url, host, port): "password": credentials["password"], "port": credentials["port"], "user": credentials["user"], - "endpoint": "http://localhost:5000/api/unstable/metrics/status", + "endpoint": "http://localhost:{}/api/unstable/metrics/status".format(server['port']), "database": credentials["database"], "table": "insights", "display_name": "", diff --git a/workers/insight_worker/insight_worker/worker.py b/workers/insight_worker/insight_worker/worker.py index 28a9a68ca5..081ced1f4e 100644 --- a/workers/insight_worker/insight_worker/worker.py +++ b/workers/insight_worker/insight_worker/worker.py @@ -47,12 +47,12 @@ def __init__(self, config, task=None): logging.info("Worker initializing...\n") specs = { - "id": "com.augurlabs.core.badge_worker", - "location": "http://localhost:51235", + "id": "com.augurlabs.core.insight_worker", + "location": "http://localhost:51252", "qualifications": [ { "given": [["git_url"]], - "models":["badges"] + "models":["insights"] } ], "config": [self.config] diff --git a/workers/insight_worker/setup.py b/workers/insight_worker/setup.py index 0abfe7dd45..ef2b441c4a 100644 --- a/workers/insight_worker/setup.py +++ b/workers/insight_worker/setup.py @@ -31,7 +31,7 @@ def read(filename): entry_points={ 'console_scripts': [ - 'worker_start=insight_worker.runtime:main', + 'insight_worker=insight_worker.runtime:main', ], },