From 94d3e59ff580c3ac289bf2a12c091e968b763aad Mon Sep 17 00:00:00 2001
From: Andrew Brain <61482022+ABrain7710@users.noreply.github.com>
Date: Fri, 29 Jul 2022 09:33:15 -0500
Subject: [PATCH] Make tasks throw errors when repo does not exist

Signed-off-by: Andrew Brain <61482022+ABrain7710@users.noreply.github.com>
---
 augur/application/cli/backend.py            | 44 +++++------
 augur/tasks/github/issue_tasks.py           | 35 +++++++-
 augur/tasks/github/util/github_paginator.py | 88 ++++++++++++++-----
 3 files changed, 120 insertions(+), 47 deletions(-)

diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py
index a56ca80e3d..f058e43c7d 100644
--- a/augur/application/cli/backend.py
+++ b/augur/application/cli/backend.py
@@ -47,38 +47,38 @@ def start(disable_collection):
 
     repos = session.query(Repo).all()
 
-    repo_task_list = [start_task.si(repo.repo_git) for repo in repos] + [process_contributors.si(),]
+    # repo_task_list = [start_task.si(repo.repo_git) for repo in repos] + [process_contributors.si(),]
 
-    repos_chain = group(repo_task_list)
+    # repos_chain = group(repo_task_list)
 
-    logger.info(repos_chain)
+    # logger.info(repos_chain)
 
-    celery_process = subprocess.Popen(['celery', '-A', 'augur.tasks.init.celery_app.celery_app', 'worker', '--loglevel=info'])
+    # celery_process = subprocess.Popen(['celery', '-A', 'augur.tasks.init.celery_app.celery_app', 'worker', '--loglevel=info'])
 
-    repos_chain.apply_async()
+    # repos_chain.apply_async()
 
-    # repos_to_collect = []
-    # repo_task_list = []
+    repos_to_collect = []
+    repo_task_list = []
 
-    # logger.info("Repos available for collection")
-    # print_repos(repos)
-    # while True:
-    #     try:
-    #         user_input = int(input("Please select a repo to collect: "))
+    logger.info("Repos available for collection")
+    print_repos(repos)
+    while True:
+        try:
+            user_input = int(input("Please select a repo to collect: "))
 
-    #         if user_input < 0 or user_input > len(repos)-1:
-    #             print(f"Invalid input please input an integer between 0 and {len(repos)-1}")
-    #             continue
+            if user_input < 0 or user_input > len(repos)-1:
+                print(f"Invalid input please input an integer between 0 and {len(repos)-1}")
+                continue
 
-    #         repo = repos[user_input]
-    #         break
+            repo = repos[user_input]
+            break
 
-    #     except (IndexError, ValueError):
-    #         print(f"Invalid input please input an integer between 0 and {len(repos)-1}")
+        except (IndexError, ValueError):
+            print(f"Invalid input please input an integer between 0 and {len(repos)-1}")
 
-    #     logger.info("Starting celery to work on tasks")
-    #     celery_process = subprocess.Popen(['celery', '-A', 'augur.tasks.init.celery_app.celery_app', 'worker', '--loglevel=info'])
-    #     start_task.s(repo.repo_git).apply_async()
+    logger.info("Starting celery to work on tasks")
+    celery_process = subprocess.Popen(['celery', '-A', 'augur.tasks.init.celery_app.celery_app', 'worker', '--loglevel=info'])
+    start_task.s(repo.repo_git).apply_async()
 
     # if len(repos) > 1:
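For context, both dispatch styles swapped above are standard Celery. A minimal sketch of the difference, assuming `start_task` and `process_contributors` are registered Celery tasks and `repos` is the list queried in backend.py (none of this sketch is part of the patch itself):

    from celery import group

    # Old path (now commented out): build an immutable signature (.si) per
    # repo and fan them all out at once as a single group.
    repo_task_list = [start_task.si(repo.repo_git) for repo in repos] + [process_contributors.si()]
    group(repo_task_list).apply_async()

    # New path: dispatch one signature (.s) for the single repo the user picked.
    start_task.s(repo.repo_git).apply_async()

`.si()` creates an immutable signature, so the task ignores any result handed to it by a preceding task; `.s()` is the mutable form.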
diff --git a/augur/tasks/github/issue_tasks.py b/augur/tasks/github/issue_tasks.py
index c4f390e6e7..b3cd254e44 100644
--- a/augur/tasks/github/issue_tasks.py
+++ b/augur/tasks/github/issue_tasks.py
@@ -43,6 +43,14 @@ def collect_issues(repo_git: str) -> None:
 
     for page_data, page in issues.iter_pages():
 
+        if page_data is None:
+            return
+
+        elif len(page_data) == 0:
+            logger.debug(f"{repo.capitalize()} Issues Page {page} contains no data...returning")
+            logger.info(f"{repo.capitalize()} Issues Page {page} of {num_pages}")
+            return
+
         logger.info(f"{repo.capitalize()} Issues Page {page} of {num_pages}")
 
         process_issues.s(page_data, f"{repo.capitalize()} Issues Page {page} Task", repo_id).apply_async()
@@ -196,9 +204,16 @@ def collect_pull_requests(repo_git: str) -> None:
 
     for page_data, page in prs.iter_pages():
 
-        logger.info(f"Prs Page {page} of {num_pages}")
+        if page_data is None:
+            return
 
-        process_pull_requests.s(page_data, f"Pr Page {page} Task").apply_async()
+        elif len(page_data) == 0:
+            logger.debug(f"{repo.capitalize()} Prs Page {page} contains no data...returning")
+            logger.info(f"{repo.capitalize()} Prs Page {page} of {num_pages}")
+            return
+
+
+        process_pull_requests.s(page_data, f"{repo.capitalize()} Pr Page {page} Task").apply_async()
 
 
 @celery.task
@@ -437,9 +452,15 @@ def collect_events(repo_git: str):
 
     for page_data, page in events.iter_pages():
 
-        logger.info(f"Events Page {page} of {num_pages}")
+        if page_data is None:
+            return
+
+        elif len(page_data) == 0:
+            logger.debug(f"{repo.capitalize()} Events Page {page} contains no data...returning")
+            logger.info(f"Events Page {page} of {num_pages}")
+            return
 
-        process_events.s(page_data, f"Events Page {page} Task").apply_async()
+        process_events.s(page_data, f"{repo.capitalize()} Events Page {page} Task").apply_async()
 
     logger.info("Completed events")
@@ -562,6 +583,13 @@ def collect_issue_and_pr_comments(repo_git: str) -> None:
 
     for page_data, page in messages.iter_pages():
 
+        if page_data is None:
+            return
+        elif len(page_data) == 0:
+            logger.debug(f"{repo.capitalize()} Messages Page {page} contains no data...returning")
+            logger.info(f"Github Messages Page {page} of {num_pages}")
+            return
+
         logger.info(f"Github Messages Page {page} of {num_pages}")
 
         process_messages.s(page_data, f"Github Messages Page {page} Task").apply_async()
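The same None/empty-page guard now appears in all four collection tasks above. A hypothetical consolidation of that guard (not in this patch; `page_has_data` is an invented name, and the usage assumes the variables in scope inside collect_issues):

    # Hypothetical helper consolidating the guard each collect task repeats.
    def page_has_data(page_data, page, num_pages, name, logger) -> bool:
        if page_data is None:
            # the paginator gave up (e.g. repo not found); stop collecting
            return False
        if len(page_data) == 0:
            logger.debug(f"{name} Page {page} of {num_pages} contains no data...returning")
            return False
        return True

    # Usage inside a collect task:
    for page_data, page in issues.iter_pages():
        if not page_has_data(page_data, page, num_pages, "Issues", logger):
            return
        process_issues.s(page_data, f"Issues Page {page} Task", repo_id).apply_async()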
diff --git a/augur/tasks/github/util/github_paginator.py b/augur/tasks/github/util/github_paginator.py
index 1fd82c4554..0a9d620fed 100644
--- a/augur/tasks/github/util/github_paginator.py
+++ b/augur/tasks/github/util/github_paginator.py
@@ -39,11 +39,11 @@ def process_dict_response(logger, response: httpx.Response, page_data: dict):
         return None
 
     if page_data['message'] == "Not Found":
-        logger.info(
+        logger.error(
             "Github repo was not found or does not exist for endpoint: "
-            f"{response.url}\n"
+            f"{response.url}"
         )
-        return "break"
+        return "Repo Not Found"
 
     if "You have exceeded a secondary rate limit. Please wait a few minutes before you try again" in page_data['message']:
         logger.info('\n\n\n\nSleeping for 100 seconds due to secondary rate limit issue.\n\n\n\n')
@@ -52,10 +52,16 @@ def process_dict_response(logger, response: httpx.Response, page_data: dict):
         return "decrease_attempts"
 
     if "API rate limit exceeded for user" in page_data['message']:
-        logger.info('\n\n\n\nSleeping for 100 seconds due to api rate limit being exceeded\n\n\n\n')
-        time.sleep(100)
+        key_reset_time = (
+            datetime.datetime.fromtimestamp(
+                int(response.headers["X-RateLimit-Reset"])
+            ) - datetime.datetime.now()
+        ).total_seconds()
+
+        logger.info(f"\n\n\nAPI rate limit exceeded. Sleeping until the key resets ({key_reset_time} seconds)")
+        time.sleep(key_reset_time)
 
-        return "decrease_attempts"
+        return "reset_attempts"
 
     if "You have triggered an abuse detection mechanism." in page_data['message']:
         #self.update_rate_limit(response, temporarily_disable=True,platform=platform)
@@ -63,7 +69,7 @@ def process_dict_response(logger, response: httpx.Response, page_data: dict):
         return "decrease_attempts"
 
     if page_data['message'] == "Bad credentials":
-        logger.info("\n\n\n\n\n\n\n POSSIBLY BAD TOKEN \n\n\n\n\n\n\n")
+        logger.error("\n\n\n\n\n\n\n Bad Token Detected \n\n\n\n\n\n\n")
         #self.update_rate_limit(response, bad_credentials=True, platform=platform)
         return "bad_credentials"
 
@@ -94,6 +100,7 @@ def __init__(self, url: str, key_manager: RandomKeyAuth, logger, from_datetime=N
         self.from_datetime = from_datetime
         self.to_datetime = to_datetime
 
+
     def __getitem__(self, index: int) -> dict:
 
         # get the page the item is on
@@ -116,6 +123,14 @@ def __getitem__(self, index: int) -> dict:
         except KeyError:
             raise IndexError
 
     def __len__(self) -> int:
+        """
+        This function is called when len() is called on the GithubPaginator class, for example:
+
+            issues = GithubPaginator(url, session.oauths, logger)
+            issue_len = len(issues)
+
+        will call this function to get the length.
+        """
 
         num_pages = self.get_num_pages()
@@ -141,10 +156,13 @@ def __iter__(self):
 
         data_list, response = self.retrieve_data(self.url)
 
-        if data_list is None:
+        # if either the data or response is None, then yield None and return
+        if response is None or data_list is None:
+            self.logger.debug("Response or data was None")
             yield None
-            return
+            return
 
+        # yield the first page's data
         for data in data_list:
             yield data
@@ -162,17 +180,23 @@ def __iter__(self):
 
     def iter_pages(self):
 
+        # retrieve the data for the given url
        data_list, response = self.retrieve_data(self.url)
 
+        # get the page number of the given url
         page_number = get_url_page_number(self.url)
 
-        if data_list is None:
+        # if either the data or response is None, then yield None and return
+        if response is None or data_list is None:
             yield None, page_number
-            return
+            return
 
+        # yield the first page of data and its page number
         yield data_list, page_number
 
         while 'next' in response.links.keys():
+
+            # get the next page's url from the last response's headers
             next_page = response.links['next']['url']
 
             # Here we don't need to pass in params with the page, or the default params because the url from the headers already has those values
             data_list, response = self.retrieve_data(next_page)
 
             page_number = get_url_page_number(next_page)
 
-            if data_list is None:
+            # if either the data or response is None, then return
+            if data_list is None or response is None:
                 return
 
+            # yield the page's data and its page number
             yield data_list, page_number
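With these changes, `iter_pages()` signals failure by yielding a single `(None, page_number)` pair instead of raising. A minimal consumption sketch, assuming `paginator` is a constructed `GithubPaginator` and `handle` is a hypothetical per-page callback (neither line is part of the patch):

    for page_data, page in paginator.iter_pages():
        if page_data is None:
            # retrieve_data() could not get a usable response: repo not
            # found, bad credentials, or too many failed attempts
            break
        handle(page_data, page)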
@@ -197,12 +223,12 @@ def hit_api(self, url: str, timeout, method='GET') -> httpx.Response:
                 method=method, url=url, auth=self.key_manager, timeout=timeout)
 
         except TimeoutError:
-            self.logger.info("Request timed out. Sleeping 10 seconds and trying again...\n")
-            time.sleep(10)
+            self.logger.info(f"Request timed out. Sleeping {round(timeout)} seconds and trying again...\n")
+            time.sleep(round(timeout))
             return None
         except httpx.TimeoutException:
-            self.logger.info("Request timed out. Sleeping 10 seconds and trying again...\n")
-            time.sleep(10)
+            self.logger.info(f"Request timed out. Sleeping {round(timeout)} seconds and trying again...\n")
+            time.sleep(round(timeout))
             return None
 
         return response
@@ -210,32 +236,51 @@ def hit_api(self, url: str, timeout, method='GET') -> httpx.Response:
     def retrieve_data(self, url: str):
 
         timeout = 5.0
+        timeout_count = 0
         num_attempts = 1
         while num_attempts <= 10:
 
             response = self.hit_api(url, timeout)
 
-            # increment attempts
             if response is None:
+                timeout_count += 1
+                if timeout_count == 10:
+                    self.logger.error(f"Request timed out 10 times for {url}")
+                    return None, None
+
+                timeout = timeout * 1.2
+                num_attempts += 1
                 continue
-            # update rate limit here
-
+
+            # try to get json from response
             try:
                 page_data = response.json()
             except:
                 page_data = json.loads(json.dumps(response.text))
 
+            # if the data is a list, then return it and the response
             if type(page_data) == list:
                 return page_data, response
 
+            # if the data is a dict, then call process_dict_response and handle the result
             elif type(page_data) == dict:
                 result = self.process_dict_response(response, page_data)
 
-                if result == "break":
-                    break
+                if result is None:
+                    self.logger.debug(f"Encountered new dict response from api on url: {url}. Response: {page_data}")
+                    return None, None
+
+                if result == "Repo Not Found":
+                    return None, None
+
+                # "continue" skips the attempt increment at the bottom of the loop, which is what we are going for
                 elif result == "decrease_attempts":
-                    num_attempts -= 1
+                    continue
+
+                elif result == "reset_attempts":
+                    num_attempts = 0
+                    continue
 
             elif type(page_data) == str:
                 result, data_loaded = self.process_str_response(response, page_data)
@@ -245,6 +290,7 @@ def retrieve_data(self, url: str):
 
             num_attempts += 1
 
+        self.logger.error("Unable to collect data in 10 attempts")
         return None, None
 
     async def __aiter__(self):
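The rate-limit branch above now sleeps until the key's window resets instead of a fixed 100 seconds. `X-RateLimit-Reset` is GitHub's standard rate-limit header: the UTC epoch second at which the current window resets. One caveat the patch does not guard against: if that instant is already in the past, the subtraction goes negative and `time.sleep()` raises ValueError. A defensive variant, as a standalone sketch rather than the patch's code:

    import datetime

    def seconds_until_reset(reset_epoch: int) -> float:
        # reset_epoch comes from int(response.headers["X-RateLimit-Reset"])
        reset_at = datetime.datetime.fromtimestamp(reset_epoch)
        remaining = (reset_at - datetime.datetime.now()).total_seconds()
        # clamp at zero so time.sleep() never receives a negative value
        return max(remaining, 0.0)

    # e.g. time.sleep(seconds_until_reset(int(response.headers["X-RateLimit-Reset"])))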