Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement augur routine #1961

Merged
merged 3 commits into from
Aug 26, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 41 additions & 16 deletions augur/tasks/start_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def deserialize_task_set(dict_obj):
def deploy_dependent_task(*args,task_set,bind=True):
logger = logging.getLogger(deploy_dependent_task.__name__)

logger.info(f"Ids are {args}...")
logger.info(f"Ids are {args} ...")
for task_id in args:
prereq = AsyncResult(str(task_id))
print(prereq.status)
Expand All @@ -54,6 +54,8 @@ def deploy_dependent_task(*args,task_set,bind=True):

child = to_execute.apply_async(task_id=(deploy_dependent_task.request.id + "child"))

logger.info(f"Started child task with id {child.id} ...")

if bind:
with allow_join_result():
child.wait()#Return when the child does so that tasks that are dependent on this dependent task can know when it is complete.
Expand Down Expand Up @@ -156,7 +158,7 @@ def add_dependency_relationship(self,job=None,depends_on=None):

self.dependency_relationships[job].append(depends_on)

def _update_dependency_relationship_with_celery_id(self,celery_id: str,dependency_name: str):
def _update_dependency_relationship_with_celery_id(self,celery_ids: [str],dependency_name: str):
"""Once a task is run, it is assigned a UUID by Celery to represent the instance that is now running.
This replaces the dependency relationship to reflect a now-running task as what is actually being
waited on for dependencies. Now the id can be passed to a listener.
Expand All @@ -166,7 +168,8 @@ def _update_dependency_relationship_with_celery_id(self,celery_id: str,dependenc
#self.dependency_relationships[group_name] = [celery_id if item == name else item for item in self.dependency_relationships[group_name]]
for index,item in enumerate(self.dependency_relationships[group_name]):
if item == dependency_name:
self.dependency_relationships[group_name][index] = celery_id
del self.dependency_relationships[group_name][index]# = celery_id
self.dependency_relationships[group_name].extend(celery_ids)
break #break once dependency_name found. Should only occur once.


Expand All @@ -176,11 +179,16 @@ def start_data_collection(self):
#First, start all task groups that have no dependencies.
for name, collection_set in self.jobs_dict.items():
if not len(self.dependency_relationships[name]):
self.logger.info(f"Starting non dependant collection group {name}...")

self.started_jobs.append(name)
task_collection = collection_set.apply_async()

self._update_dependency_relationship_with_celery_id(task_collection.id,name)
self.logger.info(f"Starting non dependant collection group {name} with id {task_collection.id} ...")
try:
#test if group or not
task_collection.results
self._update_dependency_relationship_with_celery_id([subtask.id for subtask in task_collection.results],name)
except AttributeError:
self._update_dependency_relationship_with_celery_id([task_collection.id], name)

#Then try to go after tasks with dependencies.
#'loop while there are elements of the jobs dict that haven't been started'
Expand All @@ -189,12 +197,17 @@ def start_data_collection(self):
#Check that task group has no dependencies that haven't been started yet and that it has not already been started.
if not any(group_key in self.dependency_relationships[name] for group_key in list(self.jobs_dict.keys())) and not name in self.started_jobs:
self.started_jobs.append(name)
self.logger.info(f"Starting dependant collection group {name}...")

dependent_task_collection = deploy_dependent_task.si(*self.dependency_relationships[name],task_set=self.jobs_dict[name])
result = dependent_task_collection.apply_async()
self.logger.info(f"Starting dependant collection group {name} with id {result.id} ...")
print(result)

self._update_dependency_relationship_with_celery_id(result.id,name)
try:
#First try to update with sub ids if they exist.
result.results
self._update_dependency_relationship_with_celery_id([subtask.id for subtask in result.results],name)
except AttributeError:
self._update_dependency_relationship_with_celery_id([result.id],name)


#if dependency_cycle:
Expand All @@ -205,25 +218,37 @@ def start_task():

logger = logging.getLogger(start_task.__name__)

logger.info(f"Collecting data for git and github")
logger.info(f"Collecting data for git and github...")

with DatabaseSession(logger) as session:

repos = session.query(Repo).all()

task_list = []
#task_list = []
augur_main_routine = AugurTaskRoutine()

augur_main_routine['facade'] = facade_commits_model.si()

task_list += [facade_commits_model.si()]
issues_and_pr_list = [collect_issues.si(repo.repo_git) for repo in repos]
issues_and_pr_list.extend([collect_pull_requests.si(repo.repo_git) for repo in repos])

task_list += [create_github_task_chain(repo.repo_git) for repo in repos]
augur_main_routine['collect_issues_and_pull_requests'] = group(issues_and_pr_list)

task_list += [process_contributors.si()]
augur_main_routine['collect_events'] = group([collect_events.si(repo.repo_git) for repo in repos])
augur_main_routine['collect_issue_and_pr_comments'] = group([collect_issue_and_pr_comments.si(repo.repo_git) for repo in repos])

task_chain = group(task_list)
augur_main_routine['process_contributors'] = process_contributors.si()


result = task_chain.apply_async()
augur_main_routine.add_dependency_relationship(job='collect_events',depends_on='collect_issues_and_pull_requests')
augur_main_routine.add_dependency_relationship(job='collect_issue_and_pr_comments',depends_on='collect_issues_and_pull_requests')
augur_main_routine.add_dependency_relationship(job='process_contributors',depends_on='collect_events')
augur_main_routine.add_dependency_relationship(job='process_contributors',depends_on='collect_issue_and_pr_comments')

augur_main_routine.logger.info(augur_main_routine.dependency_relationships)
augur_main_routine.start_data_collection()
augur_main_routine.logger.info(augur_main_routine.dependency_relationships)
print('no cycle!')

# routine = AugurTaskRoutine()
# routine['start'] = chain(start_tasks_group,secondary_task_group)
Expand Down