diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py index 9e5cfbb175..52a56e248a 100644 --- a/augur/tasks/start_tasks.py +++ b/augur/tasks/start_tasks.py @@ -41,7 +41,7 @@ def deserialize_task_set(dict_obj): def deploy_dependent_task(*args,task_set,bind=True): logger = logging.getLogger(deploy_dependent_task.__name__) - logger.info(f"Ids are {args}...") + logger.info(f"Ids are {args} ...") for task_id in args: prereq = AsyncResult(str(task_id)) print(prereq.status) @@ -54,6 +54,8 @@ def deploy_dependent_task(*args,task_set,bind=True): child = to_execute.apply_async(task_id=(deploy_dependent_task.request.id + "child")) + logger.info(f"Started child task with id {child.id} ...") + if bind: with allow_join_result(): child.wait()#Return when the child does so that tasks that are dependent on this dependent task can know when it is complete. @@ -156,7 +158,7 @@ def add_dependency_relationship(self,job=None,depends_on=None): self.dependency_relationships[job].append(depends_on) - def _update_dependency_relationship_with_celery_id(self,celery_id: str,dependency_name: str): + def _update_dependency_relationship_with_celery_id(self,celery_ids: [str],dependency_name: str): """One a task is ran it is assigned a uuid by celery to represent the instance that is now running. This replaces the dependency relationship to reflect a now-running task as what is actually being waited on for dependencies. Now the id can be passed to a listener. @@ -166,7 +168,8 @@ def _update_dependency_relationship_with_celery_id(self,celery_id: str,dependenc #self.dependency_relationships[group_name] = [celery_id if item == name else item for item in self.dependency_relationships[group_name]] for index,item in enumerate(self.dependency_relationships[group_name]): if item == dependency_name: - self.dependency_relationships[group_name][index] = celery_id + del self.dependency_relationships[group_name][index]# = celery_id + self.dependency_relationships[group_name].extend(celery_ids) break #break once dependency_name found. Should only occur once. @@ -176,11 +179,16 @@ def start_data_collection(self): #First, start all task groups that have no dependencies. for name, collection_set in self.jobs_dict.items(): if not len(self.dependency_relationships[name]): - self.logger.info(f"Starting non dependant collection group {name}...") + self.started_jobs.append(name) task_collection = collection_set.apply_async() - - self._update_dependency_relationship_with_celery_id(task_collection.id,name) + self.logger.info(f"Starting non dependant collection group {name} with id {task_collection.id} ...") + try: + #test if group or not + task_collection.results + self._update_dependency_relationship_with_celery_id([subtask.id for subtask in task_collection.results],name) + except AttributeError: + self._update_dependency_relationship_with_celery_id([task_collection.id], name) #Then try to go after tasks with dependencies. #'loop while there are elements of the jobs dict that haven't been started' @@ -189,12 +197,17 @@ def start_data_collection(self): #Check that task group has no dependencies that haven't been started yet and that it has not already been started. if not any(group_key in self.dependency_relationships[name] for group_key in list(self.jobs_dict.keys())) and not name in self.started_jobs: self.started_jobs.append(name) - self.logger.info(f"Starting dependant collection group {name}...") + dependent_task_collection = deploy_dependent_task.si(*self.dependency_relationships[name],task_set=self.jobs_dict[name]) result = dependent_task_collection.apply_async() + self.logger.info(f"Starting dependant collection group {name} with id {result.id} ...") print(result) - - self._update_dependency_relationship_with_celery_id(result.id,name) + try: + #First try to update with sub ids if they exist. + result.results + self._update_dependency_relationship_with_celery_id([subtask.id for subtask in result.results],name) + except AttributeError: + self._update_dependency_relationship_with_celery_id([result.id],name) #if dependency_cycle: @@ -205,25 +218,37 @@ def start_task(): logger = logging.getLogger(start_task.__name__) - logger.info(f"Collecting data for git and github") + logger.info(f"Collecting data for git and github...") with DatabaseSession(logger) as session: repos = session.query(Repo).all() - task_list = [] + #task_list = [] + augur_main_routine = AugurTaskRoutine() + + augur_main_routine['facade'] = facade_commits_model.si() - task_list += [facade_commits_model.si()] + issues_and_pr_list = [collect_issues.si(repo.repo_git) for repo in repos] + issues_and_pr_list.extend([collect_pull_requests.si(repo.repo_git) for repo in repos]) - task_list += [create_github_task_chain(repo.repo_git) for repo in repos] + augur_main_routine['collect_issues_and_pull_requests'] = group(issues_and_pr_list) - task_list += [process_contributors.si()] + augur_main_routine['collect_events'] = group([collect_events.si(repo.repo_git) for repo in repos]) + augur_main_routine['collect_issue_and_pr_comments'] = group([collect_issue_and_pr_comments.si(repo.repo_git) for repo in repos]) - task_chain = group(task_list) + augur_main_routine['process_contributors'] = process_contributors.si() - result = task_chain.apply_async() + augur_main_routine.add_dependency_relationship(job='collect_events',depends_on='collect_issues_and_pull_requests') + augur_main_routine.add_dependency_relationship(job='collect_issue_and_pr_comments',depends_on='collect_issues_and_pull_requests') + augur_main_routine.add_dependency_relationship(job='process_contributors',depends_on='collect_events') + augur_main_routine.add_dependency_relationship(job='process_contributors',depends_on='collect_issue_and_pr_comments') + augur_main_routine.logger.info(augur_main_routine.dependency_relationships) + augur_main_routine.start_data_collection() + augur_main_routine.logger.info(augur_main_routine.dependency_relationships) + print('no cycle!') # routine = AugurTaskRoutine() # routine['start'] = chain(start_tasks_group,secondary_task_group)