From 29e770547698cf42edf8c5c73bc0f3ea188f32c7 Mon Sep 17 00:00:00 2001 From: Parth Sharma Date: Wed, 31 Jul 2019 18:52:35 +0530 Subject: [PATCH 1/2] error handling & improved logging in repo_info_worker Signed-off-by: Parth Sharma --- .../repo_info_worker/runtime.py | 2 + .../repo_info_worker/worker.py | 97 +++++++++++++------ 2 files changed, 69 insertions(+), 30 deletions(-) diff --git a/workers/repo_info_worker/repo_info_worker/runtime.py b/workers/repo_info_worker/repo_info_worker/runtime.py index 2a094dc7ab..3431e619a0 100644 --- a/workers/repo_info_worker/repo_info_worker/runtime.py +++ b/workers/repo_info_worker/repo_info_worker/runtime.py @@ -47,6 +47,8 @@ def main(augur_url, host, port): #load credentials credentials = read_config("Database", use_main_config=1) server = read_config("Server", use_main_config=1) + worker_info = read_config("Workers", use_main_config=1)['repo_info_worker'] + worker_port = worker_info['port'] if 'port' in worker_info else port config = { diff --git a/workers/repo_info_worker/repo_info_worker/worker.py b/workers/repo_info_worker/repo_info_worker/worker.py index 2c1a1a6e08..5e32bb2d04 100644 --- a/workers/repo_info_worker/repo_info_worker/worker.py +++ b/workers/repo_info_worker/repo_info_worker/worker.py @@ -1,17 +1,21 @@ +import logging +import os +import sys +import time +from datetime import datetime from multiprocessing import Process, Queue from urllib.parse import urlparse -import requests + import pandas as pd +import requests import sqlalchemy as s -from sqlalchemy.ext.automap import automap_base from sqlalchemy import MetaData -import logging -import time -from datetime import datetime -import os -import sys +from sqlalchemy.ext.automap import automap_base -logging.basicConfig(filename='worker.log', level=logging.INFO, filemode='w') + +LOG_FORMAT = '%(levelname)s:[%(name)s]: %(message)s' +logging.basicConfig(filename='worker.log', level=logging.INFO, filemode='w', format=LOG_FORMAT) +logger = logging.getLogger('RepoInfoWorker') class CollectorTask: @@ -73,7 +77,7 @@ def __init__(self, config, task=None): self.config['user'], self.config['password'], self.config['host'], self.config['port'], self.config['database'] ) - logging.info("Making database connections...") + logger.info("Making database connections...") dbschema = 'augur_data' self.db = s.create_engine(self.DB_STR, poolclass = s.pool.NullPool, @@ -95,7 +99,7 @@ def __init__(self, config, task=None): self.repo_info_table = Base.classes.repo_info.__table__ - logging.info('Getting max repo_info_id...') + logger.info('Getting max repo_info_id...') max_repo_info_id_sql = s.sql.text(""" SELECT MAX(repo_info_id) AS repo_info_id FROM repo_info @@ -113,7 +117,7 @@ def __init__(self, config, task=None): requests.post('http://localhost:{}/api/unstable/workers'.format( self.config['broker_port']), json=specs) except requests.exceptions.ConnectionError: - logging.error('Cannot connect to the broker') + logger.error('Cannot connect to the broker. Quitting...') sys.exit('Cannot connect to the broker! Quitting...') @property @@ -153,7 +157,7 @@ def task(self, value): self.finishing_task = True except Exception as e: - logging.error("Error: {}, or that repo is not in our database: {}".format(str(e), str(value))) + logger.error(f"Error: {e}, or that repo is not in our database: {value}") self._task = CollectorTask('TASK', {"git_url": git_url, "repo_id": repo_id}) self.run() @@ -163,7 +167,7 @@ def cancel(self): self._task = None def run(self): - logging.info("Running...") + logger.info("Running...") if self._child is None: self._child = Process(target=self.collect, args=()) self._child.start() @@ -174,13 +178,12 @@ def collect(self, repos=None): while True: time.sleep(4.5) - logging.info("running") if not self._queue.empty(): message = self._queue.get() self.working_on = 'UPDATE' elif not self._maintain_queue.empty(): message = self._maintain_queue.get() - logging.info("Popped off message: {}".format(str(message.entry_info))) + logger.info(f"Popped off message: {message.entry_info}") self.working_on = "MAINTAIN" else: break @@ -192,8 +195,13 @@ def collect(self, repos=None): raise ValueError(f'{message.type} is not a recognized task type') if message.type == 'TASK': - self.query_repo_info(message.entry_info['repo_id'], - message.entry_info['git_url']) + try: + self.query_repo_info(message.entry_info['repo_id'], + message.entry_info['git_url']) + except Exception: + logger.exception(f'Worker ran into an error for task {message.entry_info}') + self.register_task_failure(message.entry_info['repo_id'], + message.entry_info['git_url']) # if repos == None: @@ -260,16 +268,25 @@ def query_repo_info(self, repo_id, git_url): } """ % (owner, repo) - logging.info(f'Hitting endpoint {url}') + logger.info(f'Hitting endpoint {url}') try: # r = requests.get(url, headers=self.headers) r = requests.post(url, json={'query': query}, headers=self.headers) self.update_rate_limit(r) - j = r.json()['data']['repository'] + + j = r.json() + if 'errors' in j: + logger.error(f"[GitHub API]: {j['errors'][0]['type']}: {j['errors'][0]['message']}") + self.register_task_failure(repo_id, git_url) + return + + j = j['data']['repository'] + except requests.exceptions.ConnectionError: + logger.error('Could not connect to api.github.com') except Exception as e: - logging.error('Caught Exception: ' + str(e)) + logger.error(f'Caught Exception: {e}') - logging.info(f'Inserting repo info for repo with id:{repo_id}, owner:{owner}, name:{repo}') + logger.info(f'Inserting repo info for repo with id:{repo_id}, owner:{owner}, name:{repo}') rep_inf = { 'repo_info_id': self.info_id_inc, @@ -303,10 +320,10 @@ def query_repo_info(self, repo_id, git_url): } result = self.db.execute(self.repo_info_table.insert().values(rep_inf)) - logging.info(f"Primary Key inserted into repo_info table: {result.inserted_primary_key}") + logger.info(f"Primary Key inserted into repo_info table: {result.inserted_primary_key}") self.results_counter += 1 - logging.info(f"Inserted info for {owner}/{repo}") + logger.info(f"Inserted info for {owner}/{repo}") self.info_id_inc += 1 @@ -317,15 +334,15 @@ def query_repo_info(self, repo_id, git_url): def update_rate_limit(self, response): try: self.rate_limit = int(response.headers['X-RateLimit-Remaining']) - logging.info("Recieved rate limit from headers\n") + logger.info("[Rate Limit]: Recieved rate limit from headers") except: self.rate_limit -= 1 - logging.info("Headers did not work, had to decrement\n") - logging.info("Updated rate limit, you have: " + str(self.rate_limit) + " requests remaining.\n") + logger.info("[Rate Limit]: Headers did not work, had to decrement") + logger.info(f"[Rate Limit]: Updated rate limit, you have: {self.rate_limit} requests remaining") if self.rate_limit <= 0: reset_time = response.headers['X-RateLimit-Reset'] time_diff = datetime.datetime.fromtimestamp(int(reset_time)) - datetime.datetime.now() - logging.info("Rate limit exceeded, waiting " + str(time_diff.total_seconds()) + " seconds.\n") + logger.info(f"[Rate Limit]: Rate limit exceeded, waiting {time_diff.total_seconds()} seconds") time.sleep(time_diff.total_seconds()) self.rate_limit = int(response.headers['X-RateLimit-Limit']) @@ -337,12 +354,32 @@ def register_task_completion(self, repo_id, git_url): 'git_url': git_url } - logging.info("Telling broker we completed task: " + str(task_completed) + "\n" + - "This task inserted: " + str(self.results_counter) + " tuples.\n\n") + logger.info(f"Telling broker we completed task: {task_completed}") + logger.info(f"This task inserted {self.results_counter} tuples\n") try: requests.post('http://localhost:{}/api/unstable/completed_task'.format( self.config['broker_port']), json=task_completed) except requests.exceptions.ConnectionError: - logging.info("Broker is booting and cannot accept the worker's message currently") + logger.info("Broker is booting and cannot accept the worker's message currently") self.results_counter = 0 + + def register_task_failure(self, repo_id, git_url): + task_failed = { + 'worker_id': self.config['id'], + 'job_type': self.working_on, + 'repo_id': repo_id, + 'git_url': git_url + } + + logger.error('Task failed') + logger.error('Informing broker about Task Failure') + logger.info(f'This task inserted {self.results_counter} tuples\n') + + try: + requests.post('http://localhost:{}/api/unstable/task_error'.format( + self.config['broker_port']), json=task_failed) + except requests.exceptions.ConnectionError: + logger.error('Could not send task failure message to the broker') + except Exception: + logger.exception('An error occured while informing broker about task failure') From 9cfe10ceb359f9c522cb99384d6d5125218c5464 Mon Sep 17 00:00:00 2001 From: Parth Sharma Date: Wed, 31 Jul 2019 20:08:55 +0530 Subject: [PATCH 2/2] add error handling and port configuration to pull_request_worker Signed-off-by: Parth Sharma --- .../pull_request_worker/pull_request_worker/runtime.py | 5 ++--- workers/pull_request_worker/pull_request_worker/worker.py | 8 +++++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/workers/pull_request_worker/pull_request_worker/runtime.py b/workers/pull_request_worker/pull_request_worker/runtime.py index ca36c1622f..765e55c5e7 100644 --- a/workers/pull_request_worker/pull_request_worker/runtime.py +++ b/workers/pull_request_worker/pull_request_worker/runtime.py @@ -56,10 +56,9 @@ def main(augur_url, host, port): #load credentials credentials = read_config("Database", use_main_config=1) server = read_config("Server", use_main_config=1) - # worker_info = read_config("PullRequestWorker", use_main_config=1) + worker_info = read_config("Workers", use_main_config=1)['pull_request_worker'] - # worker_port = worker_info['port'] if 'port' in worker_info else port - worker_port = port + worker_port = worker_info['port'] if 'port' in worker_info else port while True: try: diff --git a/workers/pull_request_worker/pull_request_worker/worker.py b/workers/pull_request_worker/pull_request_worker/worker.py index d551260180..41a7714ee7 100644 --- a/workers/pull_request_worker/pull_request_worker/worker.py +++ b/workers/pull_request_worker/pull_request_worker/worker.py @@ -288,9 +288,11 @@ def collect(self): raise ValueError(f'{message.type} is not a recognized task type') if message.type == 'TASK': - # try: - git_url = message.entry_info['task']['given']['git_url'] - self.query_pr({'git_url': git_url, 'repo_id': message.entry_info['repo_id']}) + try: + git_url = message.entry_info['task']['given']['git_url'] + self.query_pr({'git_url': git_url, 'repo_id': message.entry_info['repo_id']}) + except Exception: + logging.exception('Worker ran into an unknown error') # except Exception as e: # logging.error("Worker ran into an error for task: {}\n".format(message.entry_info['task'])) # logging.error("Error encountered: " + str(e) + "\n")