From f22613cdf5f86994d414ebd64911f4d9fff6d849 Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Mon, 19 Aug 2024 17:48:57 -0500 Subject: [PATCH 01/10] Set per page to 100 Signed-off-by: Andrew Brain --- augur/tasks/github/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/augur/tasks/github/events.py b/augur/tasks/github/events.py index da889d9326..2c894bae97 100644 --- a/augur/tasks/github/events.py +++ b/augur/tasks/github/events.py @@ -50,7 +50,7 @@ def collect_events(repo_git: str): def bulk_events_collection_endpoint_contains_all_data(key_auth, logger, owner, repo): - url = f"https://api.github.com/repos/{owner}/{repo}/issues/events" + url = f"https://api.github.com/repos/{owner}/{repo}/issues/events?per_page=100" github_data_access = GithubDataAccess(key_auth, logger) From 8f07cf4acf6f344e7f0fa996daf5bc4f04a12d2b Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Wed, 21 Aug 2024 18:41:20 -0500 Subject: [PATCH 02/10] start splitting out event collections into two classes --- augur/tasks/github/events.py | 197 +++++++++++++++++++++++++++++++++++ 1 file changed, 197 insertions(+) diff --git a/augur/tasks/github/events.py b/augur/tasks/github/events.py index 2c894bae97..3a3990a433 100644 --- a/augur/tasks/github/events.py +++ b/augur/tasks/github/events.py @@ -216,3 +216,200 @@ def process_github_event_contributors(logger, event, tool_source, tool_version, return event, event_cntrb +import abc +class NotMappableException(Exception): + pass + +class GithubEventCollection(abc.ABC): + + def __init__(self, logger): + self.logger = logger + self.tool_source = "Github events task" + self.tool_version = "2.0" + self.data_source = "Github API" + + @abc.abstractmethod + def process_events(self, events, repo_id): + pass + + @abc.abstractmethod + def collect_events(self, repo_git: str, key_auth): + pass + + def run(self, repo_git, key_auth): + repo_obj = get_repo_by_repo_git(repo_git) + repo_id = repo_obj.repo_id + + owner, repo = get_owner_repo(repo_git) + self.repo_identifier = f"{owner}/{repo}" + + events = [] + for event in self.collect_events(repo_git, key_auth): + events.append(event) + + # making this a decent size since process_events retrieves all the issues and prs each time + if len(events) >= 500: + self.process_events(events, repo_id) + events.clear() + + if events: + self.process_events(events, repo_id) + + def insert_issue_events(self, events): + issue_event_natural_keys = ["issue_id", "issue_event_src_id"] + bulk_insert_dicts(self.logger, events, IssueEvent, issue_event_natural_keys) + + def insert_pr_events(self, events): + pr_event_natural_keys = ["node_id"] + bulk_insert_dicts(self.logger, events, PullRequestEvent, pr_event_natural_keys) + + def insert_contributors(self, contributors): + bulk_insert_dicts(self.logger, contributors, Contributor, ["cntrb_id"]) + + +class BulkGithubEventCollection(GithubEventCollection): + + def __init__(self, logger): + + self.task_name = f"Bulk Github Event task" + self.repo_identifier = "" + + super().__init__(logger) + + def collect_events(self, repo_git: str, key_auth): + + owner, repo = get_owner_repo(repo_git) + + self.logger.debug(f"Collecting Github events for {owner}/{repo}") + + url = f"https://api.github.com/repos/{owner}/{repo}/issues/events" + + github_data_access = GithubDataAccess(key_auth, self.logger) + + return github_data_access.paginate_resource(url) + + def process_events(self, events, repo_id): + + issue_events = [] + pr_events = [] + not_mappable_events = [] + for event in events: + + try: + if 
self.__is_pr_event(event): + pr_events.append(event) + else: + issue_events.append(event) + except NotMappableException: + not_mappable_events.append(event) + + self.logger.warning(f"{self.repo_identifier} - {self.task_name}: Unable to map these github events to an issue or pr: {not_mappable_events}") + + self.__process_issue_events(issue_events, repo_id) + self.__process_pr_events(pr_events, repo_id) + + update_issue_closed_cntrbs_by_repo_id(repo_id) + + def __process_issue_events(self, issue_events, repo_id): + + issue_event_dicts = [] + contributors = [] + + issue_url_to_id_map = self.__get_map_from_issue_url_to_id(repo_id) + + for event in issue_events: + + event, contributor = process_github_event_contributors(self.logger, event, self.tool_source, self.tool_version, self.data_source) + + issue_url = event["issue"]["url"] + + try: + issue_id = issue_url_to_id_map[issue_url] + except KeyError: + self.logger.warning(f"{self.repo_identifier} - {self.task_name}: Could not find related issue. We were searching for: {issue_url}") + continue + + issue_event_dicts.append( + extract_issue_event_data(event, issue_id, platform_id, repo_id, + self.tool_source, self.tool_version, self.data_source) + ) + + if contributor: + contributors.append(contributor) + + contributors = remove_duplicate_dicts(contributors) + + self.insert_contributors(contributors) + + self.insert_issue_events(issue_event_dicts) + + def __process_pr_events(self, pr_events, repo_id): + + pr_event_dicts = [] + contributors = [] + + pr_url_to_id_map = self.__get_map_from_pr_url_to_id(repo_id) + + for event in pr_events: + + event, contributor = process_github_event_contributors(self.logger, event, self.tool_source, self.tool_version, self.data_source) + + pr_url = event["issue"]["pull_request"]["url"] + + try: + pull_request_id = pr_url_to_id_map[pr_url] + except KeyError: + self.logger.warning(f"{self.repo_identifier} - {self.task_name}: Could not find related pr. 
We were searching for: {pr_url}") + continue + + pr_event_dicts.append( + extract_pr_event_data(event, pull_request_id, platform_id, repo_id, + self.tool_source, self.tool_version, self.data_source) + ) + + if contributor: + contributors.append(contributor) + + contributors = remove_duplicate_dicts(contributors) + + self.insert_contributors(contributors) + + self.insert_pr_events(pr_event_dicts) + + def __get_map_from_pr_url_to_id(self, repo_id): + + pr_url_to_id_map = {} + prs = get_pull_requests_by_repo_id(repo_id) + for pr in prs: + pr_url_to_id_map[pr.pr_url] = pr.pull_request_id + + return pr_url_to_id_map + + def __get_map_from_issue_url_to_id(self, repo_id): + + issue_url_to_id_map = {} + issues = get_issues_by_repo_id(repo_id) + for issue in issues: + issue_url_to_id_map[issue.issue_url] = issue.issue_id + + return issue_url_to_id_map + + def __is_pr_event(self, event): + + if event["issue"] is None: + raise NotMappableException("Not mappable to pr or issue") + + return event["issue"].get('pull_request', None) != None + + +class ThoroughGithubEventCollection(GithubEventCollection): + + def __init__(self, logger): + super().__init__(logger) + + def collect_events(self, repo_git: str, key_auth): + pass + + def process_events(self, events, repo_id): + pass + From 1fd82669e3f132b03e756dd244480ac3967970d0 Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Sun, 25 Aug 2024 17:15:28 -0500 Subject: [PATCH 03/10] define thorough event collection --- augur/application/db/data_parse.py | 6 +- augur/tasks/github/events.py | 172 +++++++++++++++++++++++------ 2 files changed, 141 insertions(+), 37 deletions(-) diff --git a/augur/application/db/data_parse.py b/augur/application/db/data_parse.py index 1291276f59..219c09c0e7 100644 --- a/augur/application/db/data_parse.py +++ b/augur/application/db/data_parse.py @@ -283,7 +283,7 @@ def extract_pr_review_message_ref_data(comment: dict, augur_pr_review_id, github return pr_review_comment_message_ref -def extract_pr_event_data(event: dict, pr_id: int, platform_id: int, repo_id: int, tool_source: str, tool_version: str, data_source: str) -> dict: +def extract_pr_event_data(event: dict, pr_id: int, gh_src_id: int, platform_id: int, repo_id: int, tool_source: str, tool_version: str, data_source: str) -> dict: pr_event = { 'pull_request_id': pr_id, @@ -291,13 +291,13 @@ def extract_pr_event_data(event: dict, pr_id: int, platform_id: int, repo_id: in 'action': event['event'], 'action_commit_hash': None, 'created_at': event['created_at'], - 'issue_event_src_id': int(event['issue']["id"]), + 'issue_event_src_id': gh_src_id, 'node_id': event['node_id'], 'node_url': event['url'], 'tool_source': tool_source, 'tool_version': tool_version, 'data_source': data_source, - 'pr_platform_event_id': int(event['issue']["id"]), + 'pr_platform_event_id': gh_src_id, 'platform_id': platform_id, 'repo_id': repo_id } diff --git a/augur/tasks/github/events.py b/augur/tasks/github/events.py index 3a3990a433..8d0d0b4271 100644 --- a/augur/tasks/github/events.py +++ b/augur/tasks/github/events.py @@ -228,33 +228,6 @@ def __init__(self, logger): self.tool_version = "2.0" self.data_source = "Github API" - @abc.abstractmethod - def process_events(self, events, repo_id): - pass - - @abc.abstractmethod - def collect_events(self, repo_git: str, key_auth): - pass - - def run(self, repo_git, key_auth): - repo_obj = get_repo_by_repo_git(repo_git) - repo_id = repo_obj.repo_id - - owner, repo = get_owner_repo(repo_git) - self.repo_identifier = f"{owner}/{repo}" - - events = [] - for event in 
self.collect_events(repo_git, key_auth): - events.append(event) - - # making this a decent size since process_events retrieves all the issues and prs each time - if len(events) >= 500: - self.process_events(events, repo_id) - events.clear() - - if events: - self.process_events(events, repo_id) - def insert_issue_events(self, events): issue_event_natural_keys = ["issue_id", "issue_event_src_id"] bulk_insert_dicts(self.logger, events, IssueEvent, issue_event_natural_keys) @@ -266,6 +239,19 @@ def insert_pr_events(self, events): def insert_contributors(self, contributors): bulk_insert_dicts(self.logger, contributors, Contributor, ["cntrb_id"]) + def process_github_event_contributors(self, event): + + if event["actor"]: + + event_cntrb = extract_needed_contributor_data(event["actor"], self.tool_source, self.tool_version, self.data_source) + event["cntrb_id"] = event_cntrb["cntrb_id"] + + else: + event["cntrb_id"] = None + return event, None + + return event, event_cntrb + class BulkGithubEventCollection(GithubEventCollection): @@ -275,6 +261,26 @@ def __init__(self, logger): self.repo_identifier = "" super().__init__(logger) + + def collect(self, repo_git, key_auth): + + repo_obj = get_repo_by_repo_git(repo_git) + repo_id = repo_obj.repo_id + + owner, repo = get_owner_repo(repo_git) + self.repo_identifier = f"{owner}/{repo}" + + events = [] + for event in self.collect_events(repo_git, key_auth): + events.append(event) + + # making this a decent size since process_events retrieves all the issues and prs each time + if len(events) >= 500: + self.process_events(events, repo_id) + events.clear() + + if events: + self.process_events(events, repo_id) def collect_events(self, repo_git: str, key_auth): @@ -319,7 +325,7 @@ def __process_issue_events(self, issue_events, repo_id): for event in issue_events: - event, contributor = process_github_event_contributors(self.logger, event, self.tool_source, self.tool_version, self.data_source) + event, contributor = self.process_github_event_contributors(event) issue_url = event["issue"]["url"] @@ -352,7 +358,7 @@ def __process_pr_events(self, pr_events, repo_id): for event in pr_events: - event, contributor = process_github_event_contributors(self.logger, event, self.tool_source, self.tool_version, self.data_source) + event, contributor = self.process_github_event_contributors(event) pr_url = event["issue"]["pull_request"]["url"] @@ -363,7 +369,7 @@ def __process_pr_events(self, pr_events, repo_id): continue pr_event_dicts.append( - extract_pr_event_data(event, pull_request_id, platform_id, repo_id, + extract_pr_event_data(event, pull_request_id, int(event['issue']["id"]), platform_id, repo_id, self.tool_source, self.tool_version, self.data_source) ) @@ -407,9 +413,107 @@ class ThoroughGithubEventCollection(GithubEventCollection): def __init__(self, logger): super().__init__(logger) - def collect_events(self, repo_git: str, key_auth): - pass + def run(self, repo_git, key_auth): - def process_events(self, events, repo_id): - pass + repo_obj = get_repo_by_repo_git(repo_git) + repo_id = repo_obj.repo_id + + owner, repo = get_owner_repo(repo_git) + self.repo_identifier = f"{owner}/{repo}" + + self.collect_issue_events(owner, repo, repo_id, key_auth) + self.collect_pr_events(owner, repo, repo_id, key_auth) + + def collect_and_process_issue_events(self, owner, repo, repo_id, key_auth): + + # define logger for task + self.logger.debug(f"Collecting github events for {owner}/{repo}") + + engine = get_engine() + + with engine.connect() as connection: + + # TODO: Remove 
src id if it ends up not being needed
+            query = text(f"""
+                select issue_id as issue_id, gh_issue_number as issue_number, gh_issue_id as gh_src_id from issues WHERE repo_id={repo_id} order by created_at desc;
+            """)
+
+            issue_result = connection.execute(query).fetchall()
+
+        events = []
+        contributors = []
+        github_data_access = GithubDataAccess(key_auth, self.logger)
+        for db_issue in issue_result:
+            issue = dict(db_issue)
+
+            issue_number = issue["issue_number"]
+
+            event_url = f"https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}/events"
+
+            for event in github_data_access.paginate_resource(event_url):
+
+                event, contributor = self.process_github_event_contributors(event)
+
+                contributors.append(contributor)
+
+                events.append(
+                    extract_issue_event_data(event, issue["issue_id"], platform_id, repo_id,
+                        self.tool_source, self.tool_version, self.data_source)
+                )
+
+            if len(events) > 500:
+                self.insert_contributors(contributors)
+                self.insert_issue_events(events)
+                events.clear()
+
+        if events:
+            self.insert_contributors(contributors)
+            self.insert_issue_events(events)
+            events.clear()
+
+
+    def collect_and_process_pr_events(self, owner, repo, repo_id, key_auth):
+
+        # define logger for task
+        self.logger.debug(f"Collecting github events for {owner}/{repo}")
+
+        engine = get_engine()
+
+        with engine.connect() as connection:
+
+            query = text(f"""
+                select pull_request_id, pr_src_number as gh_pr_number, pr_src_id from pull_requests WHERE repo_id={repo_id} order by pr_created_at desc;
+            """)
+
+            pr_result = connection.execute(query).fetchall()
+
+        events = []
+        contributors = []
+        github_data_access = GithubDataAccess(key_auth, self.logger)
+        for db_pr in pr_result:
+            pr = dict(db_pr)
+
+            pr_number = pr["gh_pr_number"]
+
+            event_url = f"https://api.github.com/repos/{owner}/{repo}/issues/{pr_number}/events"
+
+            for event in github_data_access.paginate_resource(event_url):
+
+                event, contributor = self.process_github_event_contributors(event)
+
+                contributors.append(contributor)
+                events.append(
+                    extract_pr_event_data(event, pr["pull_request_id"], pr["pr_src_id"] , platform_id, repo_id,
+                        self.tool_source, self.tool_version, self.data_source)
+                )
+
+            if len(events) > 500:
+                self.insert_contributors(contributors)
+                self.insert_pr_events(events)
+                events.clear()
+
+        if events:
+            self.insert_contributors(contributors)
+            self.insert_pr_events(events)
+            events.clear()

From c15dd094b6a6079b184ec1b81606e60d4b09b243 Mon Sep 17 00:00:00 2001
From: Andrew Brain
Date: Sun, 25 Aug 2024 17:28:02 -0500
Subject: [PATCH 04/10] remove old implementation and clean up some things

---
 augur/tasks/github/events.py | 303 ++++++++---------------------------
 1 file changed, 64 insertions(+), 239 deletions(-)

diff --git a/augur/tasks/github/events.py b/augur/tasks/github/events.py
index 8d0d0b4271..f620d23ec9 100644
--- a/augur/tasks/github/events.py
+++ b/augur/tasks/github/events.py
@@ -2,6 +2,7 @@
 import traceback
 import sqlalchemy as s
 from sqlalchemy.sql import text
+from abc import ABC, abstractmethod
 
 from augur.tasks.init.celery_app import celery_app as celery
 from augur.tasks.init.celery_app import AugurCoreRepoCollectionTask
@@ -21,9 +22,6 @@ def collect_events(repo_git: str):
 
     logger = logging.getLogger(collect_events.__name__)
 
-    repo_obj = get_repo_by_repo_git(repo_git)
-    repo_id = repo_obj.repo_id
-
     owner, repo = get_owner_repo(repo_git)
 
     logger.debug(f"Collecting Github events for {owner}/{repo}")
@@ -31,22 +29,11 @@ def collect_events(repo_git: 
str): key_auth = GithubRandomKeyAuth(logger) if bulk_events_collection_endpoint_contains_all_data(key_auth, logger, owner, repo): - event_generator = bulk_collect_pr_and_issue_events(repo_git, logger, key_auth) + collection_strategy = BulkGithubEventCollection(logger) else: - event_generator = collect_pr_and_issues_events_by_number(repo_id, repo_git, logger, key_auth, f"{owner}/{repo}: Event task") - - events = [] - for event in event_generator: - events.append(event) - - # making this a decent size since process_events retrieves all the issues and prs each time - if len(events) >= 500: - process_events(events, f"{owner}/{repo}: Event task", repo_id, logger) - events.clear() - - if events: - process_events(events, f"{owner}/{repo}: Event task", repo_id, logger) + collection_strategy = ThoroughGithubEventCollection(logger) + collection_strategy.collect() def bulk_events_collection_endpoint_contains_all_data(key_auth, logger, owner, repo): @@ -61,189 +48,37 @@ def bulk_events_collection_endpoint_contains_all_data(key_auth, logger, owner, r return page_count != 300 - -def bulk_collect_pr_and_issue_events(repo_git: str, logger, key_auth): - - owner, repo = get_owner_repo(repo_git) - - logger.debug(f"Collecting Github events for {owner}/{repo}") - - url = f"https://api.github.com/repos/{owner}/{repo}/issues/events" - - github_data_access = GithubDataAccess(key_auth, logger) - - return github_data_access.paginate_resource(url) - - -def collect_pr_and_issues_events_by_number(repo_id, repo_git: str, logger, key_auth, task_name) -> None: - - owner, repo = get_owner_repo(repo_git) - - # define logger for task - logger.debug(f"Collecting github events for {owner}/{repo}") - - engine = get_engine() - - with engine.connect() as connection: - - query = text(f""" - (select pr_src_number as number from pull_requests WHERE repo_id={repo_id} order by pr_created_at desc) - UNION - (select gh_issues_number as number from issues WHERE repo_id={repo_id} order by created_at desc); - """) - - result = connection.execute(query).fetchall() - numbers = [x[0] for x in result] - - github_data_access = GithubDataAccess(key_auth, logger) - for number in numbers: - - event_url = f"https://api.github.com/repos/{owner}/{repo}/issues/{number}/events" - - yield from github_data_access.paginate_resource(event_url) - -def process_events(events, task_name, repo_id, logger): - - tool_source = "Github events task" - tool_version = "2.0" - data_source = "Github API" - - pr_event_dicts = [] - issue_event_dicts = [] - contributors = [] - - - # create mapping from issue url to issue id of current issues - issue_url_to_id_map = {} - issues = get_issues_by_repo_id(repo_id) - for issue in issues: - issue_url_to_id_map[issue.issue_url] = issue.issue_id - - # create mapping from pr url to pr id of current pull requests - pr_url_to_id_map = {} - prs = get_pull_requests_by_repo_id(repo_id) - for pr in prs: - pr_url_to_id_map[pr.pr_url] = pr.pull_request_id - - not_mapable_event_count = 0 - event_len = len(events) - for event in events: - - event, contributor = process_github_event_contributors(logger, event, tool_source, tool_version, data_source) - - # event_mapping_data is the pr or issue data needed to relate the event to an issue or pr - event_mapping_data = event["issue"] - - if event_mapping_data is None: - not_mapable_event_count += 1 - continue - - pull_request = event_mapping_data.get('pull_request', None) - if pull_request: - pr_url = pull_request["url"] - - try: - pull_request_id = pr_url_to_id_map[pr_url] - - # query = 
augur_db.session.query(PullRequest).filter(PullRequest.pr_url == pr_url) - # related_pr = execute_session_query(query, 'one') - except KeyError: - logger.warning(f"{task_name}: Could not find related pr. We were searching for: {pr_url}") - continue - - pr_event_dicts.append( - extract_pr_event_data(event, pull_request_id, platform_id, repo_id, - tool_source, tool_version, data_source) - ) - - else: - issue_url = event_mapping_data["url"] - - try: - issue_id = issue_url_to_id_map[issue_url] - # query = augur_db.session.query(Issue).filter(Issue.issue_url == issue_url) - # related_issue = execute_session_query(query, 'one') - except KeyError: - logger.warning(f"{task_name}: Could not find related issue. We were searching for: {issue_url}") - continue - - issue_event_dicts.append( - extract_issue_event_data(event, issue_id, platform_id, repo_id, - tool_source, tool_version, data_source) - ) - - # add contributor to list after porcessing the event, - # so if it fails processing for some reason the contributor is not inserted - # NOTE: contributor is none when there is no contributor data on the event - if contributor: - contributors.append(contributor) - - # remove contributors that were found in the data more than once - contributors = remove_duplicate_dicts(contributors) - - bulk_insert_dicts(logger, contributors, Contributor, ["cntrb_id"]) - - issue_events_len = len(issue_event_dicts) - pr_events_len = len(pr_event_dicts) - if event_len != (issue_events_len + pr_events_len): - - unassigned_events = event_len - issue_events_len - pr_events_len - - logger.error(f"{task_name}: {event_len} events were processed, but {pr_events_len} pr events were found and related to a pr, and {issue_events_len} issue events were found and related to an issue. {not_mapable_event_count} events were not related to a pr or issue due to the api returning insufficient data. For some reason {unassigned_events} events were not able to be processed even when the api returned sufficient data. 
This is usually because pull requests or issues have not been collected, and the events are skipped because they cannot be related to a pr or issue") - - logger.info(f"{task_name}: Inserting {len(pr_event_dicts)} pr events and {len(issue_event_dicts)} issue events") - - # TODO: Could replace this with "id" but it isn't stored on the table for some reason - pr_event_natural_keys = ["node_id"] - bulk_insert_dicts(logger, pr_event_dicts, PullRequestEvent, pr_event_natural_keys) - - issue_event_natural_keys = ["issue_id", "issue_event_src_id"] - bulk_insert_dicts(logger, issue_event_dicts, IssueEvent, issue_event_natural_keys) - - update_issue_closed_cntrbs_by_repo_id(repo_id) - -# TODO: Should we skip an event if there is no contributor to resolve it o -def process_github_event_contributors(logger, event, tool_source, tool_version, data_source): - - if event["actor"]: - - event_cntrb = extract_needed_contributor_data(event["actor"], tool_source, tool_version, data_source) - event["cntrb_id"] = event_cntrb["cntrb_id"] - - else: - event["cntrb_id"] = None - return event, None - - return event, event_cntrb - -import abc class NotMappableException(Exception): pass -class GithubEventCollection(abc.ABC): +class GithubEventCollection(ABC): def __init__(self, logger): - self.logger = logger - self.tool_source = "Github events task" - self.tool_version = "2.0" - self.data_source = "Github API" + self._logger = logger + self._tool_source = "Github events task" + self._tool_version = "2.0" + self._data_source = "Github API" - def insert_issue_events(self, events): + @abstractmethod + def collect(self, repo_git, key_auth): + pass + + def _insert_issue_events(self, events): issue_event_natural_keys = ["issue_id", "issue_event_src_id"] - bulk_insert_dicts(self.logger, events, IssueEvent, issue_event_natural_keys) + bulk_insert_dicts(self._logger, events, IssueEvent, issue_event_natural_keys) - def insert_pr_events(self, events): + def _insert_pr_events(self, events): pr_event_natural_keys = ["node_id"] - bulk_insert_dicts(self.logger, events, PullRequestEvent, pr_event_natural_keys) + bulk_insert_dicts(self._logger, events, PullRequestEvent, pr_event_natural_keys) - def insert_contributors(self, contributors): - bulk_insert_dicts(self.logger, contributors, Contributor, ["cntrb_id"]) + def _insert_contributors(self, contributors): + bulk_insert_dicts(self._logger, contributors, Contributor, ["cntrb_id"]) - def process_github_event_contributors(self, event): + def _process_github_event_contributors(self, event): if event["actor"]: - event_cntrb = extract_needed_contributor_data(event["actor"], self.tool_source, self.tool_version, self.data_source) + event_cntrb = extract_needed_contributor_data(event["actor"], self._tool_source, self._tool_version, self._data_source) event["cntrb_id"] = event_cntrb["cntrb_id"] else: @@ -252,7 +87,6 @@ def process_github_event_contributors(self, event): return event, event_cntrb - class BulkGithubEventCollection(GithubEventCollection): def __init__(self, logger): @@ -271,7 +105,7 @@ def collect(self, repo_git, key_auth): self.repo_identifier = f"{owner}/{repo}" events = [] - for event in self.collect_events(repo_git, key_auth): + for event in self._collect_events(repo_git, key_auth): events.append(event) # making this a decent size since process_events retrieves all the issues and prs each time @@ -280,21 +114,19 @@ def collect(self, repo_git, key_auth): events.clear() if events: - self.process_events(events, repo_id) + self._process_events(events, repo_id) - def 
collect_events(self, repo_git: str, key_auth): + def _collect_events(self, repo_git: str, key_auth): owner, repo = get_owner_repo(repo_git) - self.logger.debug(f"Collecting Github events for {owner}/{repo}") - url = f"https://api.github.com/repos/{owner}/{repo}/issues/events" - github_data_access = GithubDataAccess(key_auth, self.logger) + github_data_access = GithubDataAccess(key_auth, self._logger) return github_data_access.paginate_resource(url) - def process_events(self, events, repo_id): + def _process_events(self, events, repo_id): issue_events = [] pr_events = [] @@ -302,42 +134,42 @@ def process_events(self, events, repo_id): for event in events: try: - if self.__is_pr_event(event): + if self._is_pr_event(event): pr_events.append(event) else: issue_events.append(event) except NotMappableException: not_mappable_events.append(event) - self.logger.warning(f"{self.repo_identifier} - {self.task_name}: Unable to map these github events to an issue or pr: {not_mappable_events}") + self._logger.warning(f"{self.repo_identifier} - {self.task_name}: Unable to map these github events to an issue or pr: {not_mappable_events}") - self.__process_issue_events(issue_events, repo_id) - self.__process_pr_events(pr_events, repo_id) + self._process_issue_events(issue_events, repo_id) + self._process_pr_events(pr_events, repo_id) update_issue_closed_cntrbs_by_repo_id(repo_id) - def __process_issue_events(self, issue_events, repo_id): + def _process_issue_events(self, issue_events, repo_id): issue_event_dicts = [] contributors = [] - issue_url_to_id_map = self.__get_map_from_issue_url_to_id(repo_id) + issue_url_to_id_map = self._get_map_from_issue_url_to_id(repo_id) for event in issue_events: - event, contributor = self.process_github_event_contributors(event) + event, contributor = self._process_github_event_contributors(event) issue_url = event["issue"]["url"] try: issue_id = issue_url_to_id_map[issue_url] except KeyError: - self.logger.warning(f"{self.repo_identifier} - {self.task_name}: Could not find related issue. We were searching for: {issue_url}") + self._logger.warning(f"{self.repo_identifier} - {self.task_name}: Could not find related issue. We were searching for: {issue_url}") continue issue_event_dicts.append( extract_issue_event_data(event, issue_id, platform_id, repo_id, - self.tool_source, self.tool_version, self.data_source) + self._tool_source, self._tool_version, self._data_source) ) if contributor: @@ -345,32 +177,32 @@ def __process_issue_events(self, issue_events, repo_id): contributors = remove_duplicate_dicts(contributors) - self.insert_contributors(contributors) + self._insert_contributors(contributors) - self.insert_issue_events(issue_event_dicts) + self._insert_issue_events(issue_event_dicts) - def __process_pr_events(self, pr_events, repo_id): + def _process_pr_events(self, pr_events, repo_id): pr_event_dicts = [] contributors = [] - pr_url_to_id_map = self.__get_map_from_pr_url_to_id(repo_id) + pr_url_to_id_map = self._get_map_from_pr_url_to_id(repo_id) for event in pr_events: - event, contributor = self.process_github_event_contributors(event) + event, contributor = self._process_github_event_contributors(event) pr_url = event["issue"]["pull_request"]["url"] try: pull_request_id = pr_url_to_id_map[pr_url] except KeyError: - self.logger.warning(f"{self.repo_identifier} - {self.task_name}: Could not find related pr. We were searching for: {pr_url}") + self._logger.warning(f"{self.repo_identifier} - {self.task_name}: Could not find related pr. 
We were searching for: {pr_url}") continue pr_event_dicts.append( extract_pr_event_data(event, pull_request_id, int(event['issue']["id"]), platform_id, repo_id, - self.tool_source, self.tool_version, self.data_source) + self._tool_source, self._tool_version, self._data_source) ) if contributor: @@ -378,11 +210,11 @@ def __process_pr_events(self, pr_events, repo_id): contributors = remove_duplicate_dicts(contributors) - self.insert_contributors(contributors) + self._insert_contributors(contributors) - self.insert_pr_events(pr_event_dicts) + self._insert_pr_events(pr_event_dicts) - def __get_map_from_pr_url_to_id(self, repo_id): + def _get_map_from_pr_url_to_id(self, repo_id): pr_url_to_id_map = {} prs = get_pull_requests_by_repo_id(repo_id) @@ -391,7 +223,7 @@ def __get_map_from_pr_url_to_id(self, repo_id): return pr_url_to_id_map - def __get_map_from_issue_url_to_id(self, repo_id): + def _get_map_from_issue_url_to_id(self, repo_id): issue_url_to_id_map = {} issues = get_issues_by_repo_id(repo_id) @@ -400,20 +232,19 @@ def __get_map_from_issue_url_to_id(self, repo_id): return issue_url_to_id_map - def __is_pr_event(self, event): + def _is_pr_event(self, event): if event["issue"] is None: raise NotMappableException("Not mappable to pr or issue") return event["issue"].get('pull_request', None) != None - class ThoroughGithubEventCollection(GithubEventCollection): def __init__(self, logger): super().__init__(logger) - def run(self, repo_git, key_auth): + def collect(self, repo_git, key_auth): repo_obj = get_repo_by_repo_git(repo_git) repo_id = repo_obj.repo_id @@ -424,10 +255,7 @@ def run(self, repo_git, key_auth): self.collect_issue_events(owner, repo, repo_id, key_auth) self.collect_pr_events(owner, repo, repo_id, key_auth) - def collect_and_process_issue_events(self, owner, repo, repo_id, key_auth): - - # define logger for task - self.logger.debug(f"Collecting github events for {owner}/{repo}") + def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth): engine = get_engine() @@ -442,7 +270,7 @@ def collect_and_process_issue_events(self, owner, repo, repo_id, key_auth): events = [] contributors = [] - github_data_access = GithubDataAccess(key_auth, self.logger) + github_data_access = GithubDataAccess(key_auth, self._logger) for db_issue in issue_result: issue = dict(db_issue) @@ -452,30 +280,27 @@ def collect_and_process_issue_events(self, owner, repo, repo_id, key_auth): for event in github_data_access.paginate_resource(event_url): - event, contributor = self.process_github_event_contributors(event) + event, contributor = self._process_github_event_contributors(event) contributors.append(contributor) events.append( extract_issue_event_data(event, issue["issue_id"], platform_id, repo_id, - self.tool_source, self.tool_version, self.data_source) + self._tool_source, self._tool_version, self._data_source) ) if len(events) > 500: - self.insert_contributors(contributors) - self.insert_issue_events(events) + self._insert_contributors(contributors) + self._insert_issue_events(events) events.clear() if events: - self.insert_contributors(contributors) - self.insert_issue_events(events) + self._insert_contributors(contributors) + self._insert_issue_events(events) events.clear() - def collect_and_process_pr_events(self, owner, repo, repo_id, key_auth): - - # define logger for task - self.logger.debug(f"Collecting github events for {owner}/{repo}") + def _collect_and_process_pr_events(self, owner, repo, repo_id, key_auth): engine = get_engine() @@ -489,7 +314,7 @@ def 
collect_and_process_pr_events(self, owner, repo, repo_id, key_auth): events = [] contributors = [] - github_data_access = GithubDataAccess(key_auth, self.logger) + github_data_access = GithubDataAccess(key_auth, self._logger) for db_pr in pr_result: pr = dict(db_pr) @@ -499,21 +324,21 @@ def collect_and_process_pr_events(self, owner, repo, repo_id, key_auth): for event in github_data_access.paginate_resource(event_url): - event, contributor = self.process_github_event_contributors(event) + event, contributor = self._process_github_event_contributors(event) contributors.append(contributor) events.append( extract_pr_event_data(event, pr["pull_request_id"], pr["pr_src_id"] , platform_id, repo_id, - self.tool_source, self.tool_version, self.data_source) + self._tool_source, self._tool_version, self._data_source) ) if len(events) > 500: - self.insert_contributors(contributors) - self.insert_pr_events(events) + self._insert_contributors(contributors) + self._insert_pr_events(events) events.clear() if events: - self.insert_contributors(contributors) - self.insert_pr_events(events) + self._insert_contributors(contributors) + self._insert_pr_events(events) events.clear() From a3169b06699e7868f3e4e02767ceb7e905d15122 Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Mon, 26 Aug 2024 17:22:29 -0500 Subject: [PATCH 05/10] pass params --- augur/tasks/github/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/augur/tasks/github/events.py b/augur/tasks/github/events.py index f620d23ec9..4f142e4806 100644 --- a/augur/tasks/github/events.py +++ b/augur/tasks/github/events.py @@ -33,7 +33,7 @@ def collect_events(repo_git: str): else: collection_strategy = ThoroughGithubEventCollection(logger) - collection_strategy.collect() + collection_strategy.collect(repo_git, key_auth) def bulk_events_collection_endpoint_contains_all_data(key_auth, logger, owner, repo): From d7c093ac5fdc5666ccea0e2458f2bcfb3f22d4ec Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Mon, 26 Aug 2024 17:32:25 -0500 Subject: [PATCH 06/10] call correct method name --- augur/tasks/github/events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/augur/tasks/github/events.py b/augur/tasks/github/events.py index 4f142e4806..91cf7df32a 100644 --- a/augur/tasks/github/events.py +++ b/augur/tasks/github/events.py @@ -252,8 +252,8 @@ def collect(self, repo_git, key_auth): owner, repo = get_owner_repo(repo_git) self.repo_identifier = f"{owner}/{repo}" - self.collect_issue_events(owner, repo, repo_id, key_auth) - self.collect_pr_events(owner, repo, repo_id, key_auth) + self._collect_issue_events(owner, repo, repo_id, key_auth) + self._collect_pr_events(owner, repo, repo_id, key_auth) def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth): From 36ad632c201105bfd96ae9a20193c53761de8df1 Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Mon, 26 Aug 2024 17:42:43 -0500 Subject: [PATCH 07/10] other syntax fixes --- augur/tasks/github/events.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/augur/tasks/github/events.py b/augur/tasks/github/events.py index 91cf7df32a..67d770018d 100644 --- a/augur/tasks/github/events.py +++ b/augur/tasks/github/events.py @@ -110,7 +110,7 @@ def collect(self, repo_git, key_auth): # making this a decent size since process_events retrieves all the issues and prs each time if len(events) >= 500: - self.process_events(events, repo_id) + self._process_events(events, repo_id) events.clear() if events: @@ -252,8 +252,8 @@ def 
collect(self, repo_git, key_auth): owner, repo = get_owner_repo(repo_git) self.repo_identifier = f"{owner}/{repo}" - self._collect_issue_events(owner, repo, repo_id, key_auth) - self._collect_pr_events(owner, repo, repo_id, key_auth) + self._collect_and_process_issue_events(owner, repo, repo_id, key_auth) + self._collect_and_process_pr_events(owner, repo, repo_id, key_auth) def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth): @@ -272,7 +272,7 @@ def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth): contributors = [] github_data_access = GithubDataAccess(key_auth, self._logger) for db_issue in issue_result: - issue = dict(db_issue) + issue = db_issue._asdict() issue_number = issue["issue_number"] @@ -316,7 +316,7 @@ def _collect_and_process_pr_events(self, owner, repo, repo_id, key_auth): contributors = [] github_data_access = GithubDataAccess(key_auth, self._logger) for db_pr in pr_result: - pr = dict(db_pr) + pr = db_pr._asdict() pr_number = pr["gh_pr_number"] From 347a2e22c3209fcae6a59c6b2ab97d825d200abe Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Mon, 26 Aug 2024 17:48:05 -0500 Subject: [PATCH 08/10] only log not mappable events when there are some --- augur/tasks/github/events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/augur/tasks/github/events.py b/augur/tasks/github/events.py index 67d770018d..6bfb5a952f 100644 --- a/augur/tasks/github/events.py +++ b/augur/tasks/github/events.py @@ -141,7 +141,8 @@ def _process_events(self, events, repo_id): except NotMappableException: not_mappable_events.append(event) - self._logger.warning(f"{self.repo_identifier} - {self.task_name}: Unable to map these github events to an issue or pr: {not_mappable_events}") + if not_mappable_events: + self._logger.warning(f"{self.repo_identifier} - {self.task_name}: Unable to map these github events to an issue or pr: {not_mappable_events}") self._process_issue_events(issue_events, repo_id) self._process_pr_events(pr_events, repo_id) From b45b37f42abfaaa5e5d502cb2fb808dd9fbf869f Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Mon, 26 Aug 2024 17:54:02 -0500 Subject: [PATCH 09/10] only add non null contributors --- augur/tasks/github/events.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/augur/tasks/github/events.py b/augur/tasks/github/events.py index 6bfb5a952f..925a882098 100644 --- a/augur/tasks/github/events.py +++ b/augur/tasks/github/events.py @@ -283,7 +283,8 @@ def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth): event, contributor = self._process_github_event_contributors(event) - contributors.append(contributor) + if contributor: + contributors.append(contributor) events.append( extract_issue_event_data(event, issue["issue_id"], platform_id, repo_id, @@ -327,7 +328,8 @@ def _collect_and_process_pr_events(self, owner, repo, repo_id, key_auth): event, contributor = self._process_github_event_contributors(event) - contributors.append(contributor) + if contributor: + contributors.append(contributor) events.append( extract_pr_event_data(event, pr["pull_request_id"], pr["pr_src_id"] , platform_id, repo_id, From 2b5ce1aca7f83d4b17246a5ca5e9dc5b60e2e3a5 Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Tue, 27 Aug 2024 17:56:25 -0500 Subject: [PATCH 10/10] fix: add url not found catching --- augur/tasks/github/events.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/augur/tasks/github/events.py b/augur/tasks/github/events.py index 
70115145e1..00789a3423 100644 --- a/augur/tasks/github/events.py +++ b/augur/tasks/github/events.py @@ -7,7 +7,7 @@ from augur.tasks.init.celery_app import celery_app as celery from augur.tasks.init.celery_app import AugurCoreRepoCollectionTask from augur.application.db.data_parse import * -from augur.tasks.github.util.github_data_access import GithubDataAccess +from augur.tasks.github.util.github_data_access import GithubDataAccess, UrlNotFoundException from augur.tasks.github.util.github_random_key_auth import GithubRandomKeyAuth from augur.tasks.github.util.util import get_owner_repo from augur.tasks.util.worker_util import remove_duplicate_dicts @@ -280,17 +280,20 @@ def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth): event_url = f"https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}/events" - for event in github_data_access.paginate_resource(event_url): + try: + for event in github_data_access.paginate_resource(event_url): - event, contributor = self._process_github_event_contributors(event) + event, contributor = self._process_github_event_contributors(event) - if contributor: - contributors.append(contributor) + if contributor: + contributors.append(contributor) - events.append( - extract_issue_event_data(event, issue["issue_id"], platform_id, repo_id, - self._tool_source, self._tool_version, self._data_source) - ) + events.append( + extract_issue_event_data(event, issue["issue_id"], platform_id, repo_id, + self._tool_source, self._tool_version, self._data_source) + ) + except UrlNotFoundException as e: + self._logger.warning(f"{self.repo_identifier}: Url not found for {event_url}") if len(events) > 500: self._insert_contributors(contributors)
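
A note on the batching pattern both strategies share: events stream in from
GithubDataAccess.paginate_resource and are flushed in chunks of roughly 500,
so each processing pass (which refetches the repo's issue and PR URL maps, or
re-runs the bulk inserts) executes a bounded number of times. The helper below
is a minimal, self-contained sketch of that batch-and-flush rhythm;
flush_in_batches and its parameters are illustrative names, not code from
these patches.

    from typing import Callable, Iterable

    def flush_in_batches(event_stream: Iterable[dict],
                         flush: Callable[[list], None],
                         batch_size: int = 500) -> None:
        # Accumulate events from a potentially very long paginated stream and
        # hand them off in chunks, mirroring the len(events) >= 500 checks in
        # BulkGithubEventCollection.collect and the thorough per-number loops.
        batch = []
        for event in event_stream:
            batch.append(event)
            if len(batch) >= batch_size:
                flush(batch)
                batch.clear()
        if batch:
            # Flush whatever remains once the stream is exhausted.
            flush(batch)

    # Illustrative usage; the lambda stands in for a strategy's processing
    # step such as _process_events(events, repo_id).
    events = ({"event": "labeled", "id": i} for i in range(1200))
    flush_in_batches(events, lambda batch: print(f"processing {len(batch)} events"))

Keeping flush as a callable separates the batching from what each strategy
does with a full batch: the bulk strategy maps and inserts in _process_events,
while the thorough strategy writes straight to the insert helpers.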