
Commit

Merge pull request #1932 from chaoss/augur-new-invalid-repo
Make tasks throw errors when repo does not exist
ABrain7710 authored Jul 29, 2022
2 parents 4e64578 + 94d3e59 commit 3f64c89
Showing 3 changed files with 120 additions and 47 deletions.
44 changes: 22 additions & 22 deletions augur/application/cli/backend.py
@@ -47,38 +47,38 @@ def start(disable_collection):

repos = session.query(Repo).all()

repo_task_list = [start_task.si(repo.repo_git) for repo in repos] + [process_contributors.si(),]
# repo_task_list = [start_task.si(repo.repo_git) for repo in repos] + [process_contributors.si(),]

repos_chain = group(repo_task_list)
# repos_chain = group(repo_task_list)

logger.info(repos_chain)
# logger.info(repos_chain)

celery_process = subprocess.Popen(['celery', '-A', 'augur.tasks.init.celery_app.celery_app', 'worker', '--loglevel=info'])
# celery_process = subprocess.Popen(['celery', '-A', 'augur.tasks.init.celery_app.celery_app', 'worker', '--loglevel=info'])

repos_chain.apply_async()
# repos_chain.apply_async()

# repos_to_collect = []
# repo_task_list = []
repos_to_collect = []
repo_task_list = []

# logger.info("Repos available for collection")
# print_repos(repos)
# while True:
# try:
# user_input = int(input("Please select a repo to collect: "))
logger.info("Repos available for collection")
print_repos(repos)
while True:
try:
user_input = int(input("Please select a repo to collect: "))

# if user_input < 0 or user_input > len(repos)-1:
# print(f"Invalid input please input an integer between 0 and {len(repos)-1}")
# continue
if user_input < 0 or user_input > len(repos)-1:
print(f"Invalid input please input an integer between 0 and {len(repos)-1}")
continue

# repo = repos[user_input]
# break
repo = repos[user_input]
break

# except (IndexError, ValueError):
# print(f"Invalid input please input an integer between 0 and {len(repos)-1}")
except (IndexError, ValueError):
print(f"Invalid input please input an integer between 0 and {len(repos)-1}")

# logger.info("Starting celery to work on tasks")
# celery_process = subprocess.Popen(['celery', '-A', 'augur.tasks.init.celery_app.celery_app', 'worker', '--loglevel=info'])
# start_task.s(repo.repo_git).apply_async()
logger.info("Starting celery to work on tasks")
celery_process = subprocess.Popen(['celery', '-A', 'augur.tasks.init.celery_app.celery_app', 'worker', '--loglevel=info'])
start_task.s(repo.repo_git).apply_async()


# if len(repos) > 1:
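A minimal standalone sketch (not part of this commit) of the two Celery dispatch styles the change above switches between: the bulk group dispatch that is now commented out, and the single-repo dispatch queued after the interactive prompt. The broker URL and task bodies below are placeholders.

from celery import Celery, group

app = Celery("augur_sketch", broker="redis://localhost:6379/0")  # placeholder broker

@app.task
def start_task(repo_git):
    return f"collecting {repo_git}"

@app.task
def process_contributors():
    return "processing contributors"

def dispatch_all(repo_gits):
    # Old path (now commented out): one immutable signature per repo plus a
    # trailing contributor task, submitted together as a group.
    tasks = [start_task.si(git) for git in repo_gits] + [process_contributors.si()]
    group(tasks).apply_async()

def dispatch_one(repo_git):
    # New path: queue only the repo selected at the prompt.
    start_task.s(repo_git).apply_async()
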
35 changes: 31 additions & 4 deletions augur/tasks/github/issue_tasks.py
@@ -43,6 +43,14 @@ def collect_issues(repo_git: str) -> None:

for page_data, page in issues.iter_pages():

if page_data == None:
return

elif len(page_data) == 0:
logger.debug(f"{repo.capitalize()} Issues Page {page} contains no data...returning")
logger.info(f"{repo.capitalize()} Issues Page {page} of {num_pages}")
return

logger.info(f"{repo.capitalize()} Issues Page {page} of {num_pages}")

process_issues.s(page_data, f"{repo.capitalize()} Issues Page {page} Task", repo_id).apply_async()
@@ -196,9 +204,16 @@ def collect_pull_requests(repo_git: str) -> None:

for page_data, page in prs.iter_pages():

logger.info(f"Prs Page {page} of {num_pages}")
if page_data == None:
return

process_pull_requests.s(page_data, f"Pr Page {page} Task").apply_async()
elif len(page_data) == 0:
logger.debug(f"{repo.capitalize()} Prs Page {page} contains no data...returning")
logger.info(f"{repo.capitalize()} Prs Page {page} of {num_pages}")
return


process_pull_requests.s(page_data, f"{repo.capitalize()} Pr Page {page} Task").apply_async()


@celery.task
@@ -437,9 +452,15 @@ def collect_events(repo_git: str):

for page_data, page in events.iter_pages():

logger.info(f"Events Page {page} of {num_pages}")
if page_data is None:
return

elif len(page_data) == 0:
logger.debug(f"{repo.capitalize()} Events Page {page} contains no data...returning")
logger.info(f"Events Page {page} of {num_pages}")
return

process_events.s(page_data, f"Events Page {page} Task").apply_async()
process_events.s(page_data, f"{repo.capitalize()} Events Page {page} Task").apply_async()

logger.info("Completed events")

@@ -562,6 +583,12 @@ def collect_issue_and_pr_comments(repo_git: str) -> None:

for page_data, page in messages.iter_pages():

if page_data is None:
return
elif len(page_data) == 0:
logger.debug(f"{repo.capitalize()} Messages Page {page} contains no data...returning")
logger.info(f"Github Messages Page {page} of {num_pages}")

logger.info(f"Github Messages Page {page} of {num_pages}")

process_messages.s(page_data, f"Github Messages Page {page} Task").apply_async()
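Each task above now applies the same guard. As a rough illustration (not part of the commit) of the consumption contract it assumes: iter_pages() yields (page_data, page) pairs, where page_data is None when the request failed (for example, the repo does not exist) and an empty list when a page has no data. The helper name and log wording below are illustrative.

def drain_pages(paginator, logger, label):
    # Sketch of the guard pattern added in collect_issues, collect_pull_requests,
    # collect_events and collect_issue_and_pr_comments: stop on a failed request,
    # stop on an empty page, otherwise hand the page off for processing.
    collected = []
    for page_data, page in paginator.iter_pages():
        if page_data is None:
            logger.error(f"{label}: page {page} returned no data (request failed); stopping")
            break
        if len(page_data) == 0:
            logger.debug(f"{label}: page {page} contains no data; stopping")
            break
        logger.info(f"{label}: processing page {page}")
        collected.extend(page_data)
    return collected
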
88 changes: 67 additions & 21 deletions augur/tasks/github/util/github_paginator.py
@@ -39,11 +39,11 @@ def process_dict_response(logger, response: httpx.Response, page_data: dict):
return None

if page_data['message'] == "Not Found":
logger.info(
logger.error(
"Github repo was not found or does not exist for endpoint: "
f"{response.url}\n"
f"{response.url}"
)
return "break"
return "Repo Not Found"

if "You have exceeded a secondary rate limit. Please wait a few minutes before you try again" in page_data['message']:
logger.info('\n\n\n\nSleeping for 100 seconds due to secondary rate limit issue.\n\n\n\n')
@@ -52,18 +52,24 @@ def process_dict_response(logger, response: httpx.Response, page_data: dict):
return "decrease_attempts"

if "API rate limit exceeded for user" in page_data['message']:
key_reset_time = (
datetime.datetime.fromtimestamp(
int(response.headers["X-RateLimit-Reset"])
) - datetime.datetime.now()
).total_seconds()
logger.info('\n\n\n\nSleeping for 100 seconds due to api rate limit being exceeded\n\n\n\n')
time.sleep(100)
logger.info(f"\n\n\nAPI rate limit exceeded. Sleeping until the key resets ({key_reset_time} seconds)")
time.sleep(key_reset_time)

return "decrease_attempts"
return "reset_attempts"

if "You have triggered an abuse detection mechanism." in page_data['message']:
#self.update_rate_limit(response, temporarily_disable=True,platform=platform)

return "decrease_attempts"

if page_data['message'] == "Bad credentials":
logger.info("\n\n\n\n\n\n\n POSSIBLY BAD TOKEN \n\n\n\n\n\n\n")
logger.error("\n\n\n\n\n\n\n Bad Token Detected \n\n\n\n\n\n\n")
#self.update_rate_limit(response, bad_credentials=True, platform=platform)
return "bad_credentials"

@@ -94,6 +100,7 @@ def __init__(self, url: str, key_manager: RandomKeyAuth, logger, from_datetime=N
self.from_datetime = from_datetime
self.to_datetime = to_datetime


def __getitem__(self, index: int) -> dict:

# get the page the item is on
@@ -116,6 +123,14 @@ def __getitem__(self, index: int) -> dict:
except KeyError:
raise IndexError

"""
This function is called when len() is called on the GithubPaginator class for example.
issues = GithubPaginator(url, session.oauths, logger)
issue_len = len(issues)
will call this function to get the length
"""
def __len__(self) -> int:

num_pages = self.get_num_pages()
@@ -141,10 +156,13 @@ def __iter__(self):

data_list, response = self.retrieve_data(self.url)

if data_list is None:
# if either the data or response is None then yield None and return
if response == None or data_list == None:
self.logger.debug("Response or data was none")
yield None
return
return

#yield the first page data
for data in data_list:
yield data

@@ -162,27 +180,35 @@ def __iter__(self):

def iter_pages(self):

# retrieves the data for the given url
data_list, response = self.retrieve_data(self.url)

# this retrieves the page for the given url
page_number = get_url_page_number(self.url)

if data_list is None:
# if either the data or response is None then yield None and return
if response is None or data_list is None:
yield None, page_number
return
return

# yields the first page of data and its page number
yield data_list, page_number

while 'next' in response.links.keys():

# gets the next page from the last responses header
next_page = response.links['next']['url']

# Here we don't need to pass in params with the page, or the default params because the url from the headers already has those values
data_list, response = self.retrieve_data(next_page)

page_number = get_url_page_number(next_page)

if data_list is None:
# if either the data or response is None then yield None and return
if data_list is None or response is None:
return

# yield the data from the page and its number
yield data_list, page_number


@@ -197,45 +223,64 @@ def hit_api(self, url: str, timeout, method='GET') -> httpx.Response:
method=method, url=url, auth=self.key_manager, timeout=timeout)

except TimeoutError:
self.logger.info("Request timed out. Sleeping 10 seconds and trying again...\n")
time.sleep(10)
self.logger.info(f"Request timed out. Sleeping {round(timeout)} seconds and trying again...\n")
time.sleep(round(timeout))
return None
except httpx.TimeoutException:
self.logger.info("Request timed out. Sleeping 10 seconds and trying again...\n")
time.sleep(10)
self.logger.info(f"Request timed out. Sleeping {round(timeout)} seconds and trying again...\n")
time.sleep(round(timeout))
return None

return response

def retrieve_data(self, url: str):

timeout = 5.0
timeout_count = 0
num_attempts = 1
while num_attempts <= 10:

response = self.hit_api(url, timeout)

# increment attempts
if response is None:
if timeout_count == 10:
self.logger.error(f"Request timed out 10 times for {url}")
return

timeout = timeout * 1.2
num_attempts += 1
continue
# update rate limit here


# try to get json from response
try:
page_data = response.json()
except:
page_data = json.loads(json.dumps(response.text))

# if the data is a list, then return it and the response
if type(page_data) == list:
return page_data, response

# if the data is a dict then call process_dict_response, and
elif type(page_data) == dict:
result = self.process_dict_response(response, page_data)

if result == "break":
break
if result is None:
self.logger.debug(f"Encountered new dict response from api on url: {url}. Response: {page_data}")
return None, None


if result == "Repo Not Found":
return None, None

# continue simply doesn't increase the attempts, which is what we are going for
elif result == "decrease_attempts":
num_attempts -= 1
continue


elif result == "reset_attempts":
num_attempts = 0
continue

elif type(page_data) == str:
result, data_loaded = self.process_str_response(response, page_data)
Expand All @@ -245,6 +290,7 @@ def retrieve_data(self, url: str):

num_attempts += 1

self.logger.error("Unable to collect data in 10 attempts")
return None, None

async def __aiter__(self):
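A short sketch of the new rate-limit branch in process_dict_response (the header name comes from the diff; the fallback value and helper name are illustrative): instead of sleeping a fixed 100 seconds, the wait is derived from GitHub's X-RateLimit-Reset header.

import datetime

def seconds_until_rate_limit_reset(headers, fallback=100.0):
    # X-RateLimit-Reset is a Unix timestamp marking when the token's quota resets.
    try:
        reset_at = datetime.datetime.fromtimestamp(int(headers["X-RateLimit-Reset"]))
    except (KeyError, ValueError):
        return fallback  # header missing or malformed; fall back to a fixed wait
    return max((reset_at - datetime.datetime.now()).total_seconds(), 0.0)

# Mirrors the "reset_attempts" branch added above, roughly:
# time.sleep(seconds_until_rate_limit_reset(response.headers))
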
