Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes & minor changes to Repo Info Worker & PR Worker #337

Merged
merged 2 commits into from
Jul 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions workers/pull_request_worker/pull_request_worker/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,9 @@ def main(augur_url, host, port):
#load credentials
credentials = read_config("Database", use_main_config=1)
server = read_config("Server", use_main_config=1)
# worker_info = read_config("PullRequestWorker", use_main_config=1)
worker_info = read_config("Workers", use_main_config=1)['pull_request_worker']

# worker_port = worker_info['port'] if 'port' in worker_info else port
worker_port = port
worker_port = worker_info['port'] if 'port' in worker_info else port

while True:
try:
Expand Down
8 changes: 5 additions & 3 deletions workers/pull_request_worker/pull_request_worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,11 @@ def collect(self):
raise ValueError(f'{message.type} is not a recognized task type')

if message.type == 'TASK':
# try:
git_url = message.entry_info['task']['given']['git_url']
self.query_pr({'git_url': git_url, 'repo_id': message.entry_info['repo_id']})
try:
git_url = message.entry_info['task']['given']['git_url']
self.query_pr({'git_url': git_url, 'repo_id': message.entry_info['repo_id']})
except Exception:
logging.exception('Worker ran into an unknown error')
# except Exception as e:
# logging.error("Worker ran into an error for task: {}\n".format(message.entry_info['task']))
# logging.error("Error encountered: " + str(e) + "\n")
Expand Down
2 changes: 2 additions & 0 deletions workers/repo_info_worker/repo_info_worker/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def main(augur_url, host, port):
#load credentials
credentials = read_config("Database", use_main_config=1)
server = read_config("Server", use_main_config=1)
worker_info = read_config("Workers", use_main_config=1)['repo_info_worker']

worker_port = worker_info['port'] if 'port' in worker_info else port

config = {
Expand Down
97 changes: 67 additions & 30 deletions workers/repo_info_worker/repo_info_worker/worker.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import logging
import os
import sys
import time
from datetime import datetime
from multiprocessing import Process, Queue
from urllib.parse import urlparse
import requests

import pandas as pd
import requests
import sqlalchemy as s
from sqlalchemy.ext.automap import automap_base
from sqlalchemy import MetaData
import logging
import time
from datetime import datetime
import os
import sys
from sqlalchemy.ext.automap import automap_base

logging.basicConfig(filename='worker.log', level=logging.INFO, filemode='w')

LOG_FORMAT = '%(levelname)s:[%(name)s]: %(message)s'
logging.basicConfig(filename='worker.log', level=logging.INFO, filemode='w', format=LOG_FORMAT)
logger = logging.getLogger('RepoInfoWorker')


class CollectorTask:
Expand Down Expand Up @@ -73,7 +77,7 @@ def __init__(self, config, task=None):
self.config['user'], self.config['password'], self.config['host'], self.config['port'], self.config['database']
)

logging.info("Making database connections...")
logger.info("Making database connections...")

dbschema = 'augur_data'
self.db = s.create_engine(self.DB_STR, poolclass = s.pool.NullPool,
Expand All @@ -95,7 +99,7 @@ def __init__(self, config, task=None):

self.repo_info_table = Base.classes.repo_info.__table__

logging.info('Getting max repo_info_id...')
logger.info('Getting max repo_info_id...')
max_repo_info_id_sql = s.sql.text("""
SELECT MAX(repo_info_id) AS repo_info_id
FROM repo_info
Expand All @@ -113,7 +117,7 @@ def __init__(self, config, task=None):
requests.post('http://localhost:{}/api/unstable/workers'.format(
self.config['broker_port']), json=specs)
except requests.exceptions.ConnectionError:
logging.error('Cannot connect to the broker')
logger.error('Cannot connect to the broker. Quitting...')
sys.exit('Cannot connect to the broker! Quitting...')

@property
Expand Down Expand Up @@ -153,7 +157,7 @@ def task(self, value):
self.finishing_task = True

except Exception as e:
logging.error("Error: {}, or that repo is not in our database: {}".format(str(e), str(value)))
logger.error(f"Error: {e}, or that repo is not in our database: {value}")

self._task = CollectorTask('TASK', {"git_url": git_url, "repo_id": repo_id})
self.run()
Expand All @@ -163,7 +167,7 @@ def cancel(self):
self._task = None

def run(self):
logging.info("Running...")
logger.info("Running...")
if self._child is None:
self._child = Process(target=self.collect, args=())
self._child.start()
Expand All @@ -174,13 +178,12 @@ def collect(self, repos=None):

while True:
time.sleep(4.5)
logging.info("running")
if not self._queue.empty():
message = self._queue.get()
self.working_on = 'UPDATE'
elif not self._maintain_queue.empty():
message = self._maintain_queue.get()
logging.info("Popped off message: {}".format(str(message.entry_info)))
logger.info(f"Popped off message: {message.entry_info}")
self.working_on = "MAINTAIN"
else:
break
Expand All @@ -192,8 +195,13 @@ def collect(self, repos=None):
raise ValueError(f'{message.type} is not a recognized task type')

if message.type == 'TASK':
self.query_repo_info(message.entry_info['repo_id'],
message.entry_info['git_url'])
try:
self.query_repo_info(message.entry_info['repo_id'],
message.entry_info['git_url'])
except Exception:
logger.exception(f'Worker ran into an error for task {message.entry_info}')
self.register_task_failure(message.entry_info['repo_id'],
message.entry_info['git_url'])


# if repos == None:
Expand Down Expand Up @@ -260,16 +268,25 @@ def query_repo_info(self, repo_id, git_url):
}
""" % (owner, repo)

logging.info(f'Hitting endpoint {url}')
logger.info(f'Hitting endpoint {url}')
try:
# r = requests.get(url, headers=self.headers)
r = requests.post(url, json={'query': query}, headers=self.headers)
self.update_rate_limit(r)
j = r.json()['data']['repository']

j = r.json()
if 'errors' in j:
logger.error(f"[GitHub API]: {j['errors'][0]['type']}: {j['errors'][0]['message']}")
self.register_task_failure(repo_id, git_url)
return

j = j['data']['repository']
except requests.exceptions.ConnectionError:
logger.error('Could not connect to api.github.com')
except Exception as e:
logging.error('Caught Exception: ' + str(e))
logger.error(f'Caught Exception: {e}')

logging.info(f'Inserting repo info for repo with id:{repo_id}, owner:{owner}, name:{repo}')
logger.info(f'Inserting repo info for repo with id:{repo_id}, owner:{owner}, name:{repo}')

rep_inf = {
'repo_info_id': self.info_id_inc,
Expand Down Expand Up @@ -303,10 +320,10 @@ def query_repo_info(self, repo_id, git_url):
}

result = self.db.execute(self.repo_info_table.insert().values(rep_inf))
logging.info(f"Primary Key inserted into repo_info table: {result.inserted_primary_key}")
logger.info(f"Primary Key inserted into repo_info table: {result.inserted_primary_key}")
self.results_counter += 1

logging.info(f"Inserted info for {owner}/{repo}")
logger.info(f"Inserted info for {owner}/{repo}")

self.info_id_inc += 1

Expand All @@ -317,15 +334,15 @@ def query_repo_info(self, repo_id, git_url):
def update_rate_limit(self, response):
    """Refresh ``self.rate_limit`` from an API response and wait if it is used up.

    The remaining-request count is read from the ``X-RateLimit-Remaining``
    response header. If that header cannot be read or parsed, the locally
    tracked count is decremented by one instead. When the count reaches zero
    (or below), the method sleeps until the time given by the
    ``X-RateLimit-Reset`` header and then resets the count to the value of
    ``X-RateLimit-Limit``.

    :param response: response object exposing a ``headers`` mapping
    :raises KeyError: if the quota is exhausted and the reset/limit headers
        are missing from ``response``
    """
    try:
        self.rate_limit = int(response.headers['X-RateLimit-Remaining'])
        logger.info("[Rate Limit]: Recieved rate limit from headers")
    except Exception:
        # Best-effort fallback: keep counting locally when the header is
        # absent or malformed. `Exception` (not a bare except) so that
        # KeyboardInterrupt / SystemExit still propagate.
        self.rate_limit -= 1
        logger.info("[Rate Limit]: Headers did not work, had to decrement")
    logger.info(f"[Rate Limit]: Updated rate limit, you have: {self.rate_limit} requests remaining")
    if self.rate_limit <= 0:
        reset_time = response.headers['X-RateLimit-Reset']
        # `datetime` is the class here (the file does `from datetime import datetime`),
        # so it must be used directly; `datetime.datetime` would raise AttributeError.
        time_diff = datetime.fromtimestamp(int(reset_time)) - datetime.now()
        # The reset moment may already be in the past; time.sleep() rejects
        # negative values, so clamp the wait at zero.
        wait_seconds = max(time_diff.total_seconds(), 0)
        logger.info(f"[Rate Limit]: Rate limit exceeded, waiting {wait_seconds} seconds")
        time.sleep(wait_seconds)
        self.rate_limit = int(response.headers['X-RateLimit-Limit'])

Expand All @@ -337,12 +354,32 @@ def register_task_completion(self, repo_id, git_url):
'git_url': git_url
}

logging.info("Telling broker we completed task: " + str(task_completed) + "\n" +
"This task inserted: " + str(self.results_counter) + " tuples.\n\n")
logger.info(f"Telling broker we completed task: {task_completed}")
logger.info(f"This task inserted {self.results_counter} tuples\n")

try:
requests.post('http://localhost:{}/api/unstable/completed_task'.format(
self.config['broker_port']), json=task_completed)
except requests.exceptions.ConnectionError:
logging.info("Broker is booting and cannot accept the worker's message currently")
logger.info("Broker is booting and cannot accept the worker's message currently")
self.results_counter = 0

def register_task_failure(self, repo_id, git_url):
    """Tell the broker that the current task failed.

    Builds a failure report from the worker id (``self.config['id']``), the
    job type currently being worked on (``self.working_on``) and the given
    repository identifiers, logs the failure together with the number of
    tuples inserted so far, and POSTs the report to the broker's
    ``task_error`` endpoint on localhost.

    Any error raised while contacting the broker is logged and swallowed, so
    this method never propagates a delivery failure to the caller.

    :param repo_id: id of the repository whose task failed
    :param git_url: git URL of that repository
    """
    failure_report = dict(
        worker_id=self.config['id'],
        job_type=self.working_on,
        repo_id=repo_id,
        git_url=git_url,
    )

    logger.error('Task failed')
    logger.error('Informing broker about Task Failure')
    logger.info(f'This task inserted {self.results_counter} tuples\n')

    try:
        # The endpoint is built inside the try so that a missing
        # 'broker_port' entry is logged by the generic handler below
        # rather than raised to the caller.
        endpoint = f"http://localhost:{self.config['broker_port']}/api/unstable/task_error"
        requests.post(endpoint, json=failure_report)
    except requests.exceptions.ConnectionError:
        logger.error('Could not send task failure message to the broker')
    except Exception:
        logger.exception('An error occured while informing broker about task failure')