From 56ee2d9a6f9533d8ce667e564ecf01d23906c87f Mon Sep 17 00:00:00 2001
From: Bo Peng
Date: Wed, 9 Jan 2019 16:40:30 -0600
Subject: [PATCH] Allow concurrent execution of sos_run(['A', 'B']) #1167

---
 src/sos/actions.py           | 46 +++++++++++++++++++++++-------------
 src/sos/step_executor.py     | 14 +++++++----
 src/sos/workflow_executor.py | 26 ++++++++++----------
 test/test_nested.py          | 26 ++++++++++++++++++++
 4 files changed, 79 insertions(+), 33 deletions(-)

diff --git a/src/sos/actions.py b/src/sos/actions.py
index 8c18d6933..dc2889af3 100644
--- a/src/sos/actions.py
+++ b/src/sos/actions.py
@@ -507,19 +507,27 @@ def sos_run(workflow=None, targets=None, shared=None, args=None, source=None, **
         raise RuntimeError(
             'Executing nested workflow (action sos_run) in tasks is not supported.')
 
+    if workflow is None or isinstance(workflow, str):
+        workflow = [workflow]
+    elif isinstance(workflow, Sequence):
+        workflow = list(workflow)
+    else:
+        raise ValueError('workflow has to be None, a workflow name, or a list of workflow names')
+
     if source is None:
         script = SoS_Script(env.sos_dict['__step_context__'].content,
                             env.sos_dict['__step_context__'].filename)
-        wf = script.workflow(workflow, use_default=not targets)
+        wfs = [script.workflow(wf, use_default=not targets) for wf in workflow]
     else:
         # reading workflow from another file
         script = SoS_Script(filename=source)
-        wf = script.workflow(workflow, use_default=not targets)
+        wfs = [script.workflow(wf, use_default=not targets) for wf in workflow]
     # if a workflow contains the current step or one of the previous ones, this
     # constitutes a recursive nested workflow and should not be allowed
-    if env.sos_dict['step_name'] in [f'{x.name}_{x.index}' for x in wf.sections]:
-        raise RuntimeError(
-            f'Nested workflow {workflow} contains the current step {env.sos_dict["step_name"]}')
+    for wf in wfs:
+        if env.sos_dict['step_name'] in [f'{x.name}_{x.index}' for x in wf.sections]:
+            raise RuntimeError(
+                f'Nested workflow {workflow} contains the current step {env.sos_dict["step_name"]}')
     # args can be specified either as a dictionary or as keyword arguments
     if args is None:
         args = kwargs
@@ -549,18 +557,24 @@ def sos_run(workflow=None, targets=None, shared=None, args=None, source=None, **
         # really send the workflow
         shared = {
             x: (env.sos_dict[x] if x in env.sos_dict else None) for x in shared}
-        env.__socket__.send_pyobj(['workflow', uuid.uuid4(), wf, targets, args, shared, env.config])
-        if env.sos_dict.get('__concurrent_subworkflow__', False):
-            return {}
-        res = env.__socket__.recv_pyobj()
-        if res is None:
-            sys.exit(0)
-        elif isinstance(res, Exception):
-            raise res
-        else:
-            env.sos_dict.quick_update(res['shared'])
-            return res
+        wf_ids = [uuid.uuid4() for _ in wfs]
+        env.__socket__.send_pyobj(['workflow', wf_ids, wfs, targets, args, shared, env.config])
+
+        if env.sos_dict.get('__concurrent_subworkflow__', False):
+            return {'pending_workflows': wf_ids}
+
+        res = {}
+        for _ in wfs:
+            wf_res = env.__socket__.recv_pyobj()
+            if wf_res is None:
+                sys.exit(0)
+            elif isinstance(wf_res, Exception):
+                raise wf_res
+            else:
+                res.update(wf_res)
+                env.sos_dict.quick_update(wf_res['shared'])
+        return res
     finally:
         # restore step_name in case the subworkflow re-defines it
         env.sos_dict.set('step_name', my_name)
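
The normalization at the top of sos_run is the heart of the change: the action now accepts None, a single workflow name, or a sequence of names. Note that the str check must precede the Sequence check, because a str is itself a Sequence and would otherwise be split into one-character "names". A minimal standalone sketch of the same logic (the helper name is ours, not part of the patch):

    from collections.abc import Sequence

    def as_workflow_list(workflow):
        # mirrors the normalization added to sos_run (hypothetical helper)
        if workflow is None or isinstance(workflow, str):
            # None selects the default workflow; a single name becomes a list of one
            return [workflow]
        elif isinstance(workflow, Sequence):  # list, tuple, ... but not str (checked above)
            return list(workflow)
        raise ValueError(
            'workflow has to be None, a workflow name, or a list of workflow names')

    assert as_workflow_list('A') == ['A']
    assert as_workflow_list(['A', 'B']) == ['A', 'B']
    assert as_workflow_list(('A', 'B')) == ['A', 'B']
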
diff --git a/src/sos/step_executor.py b/src/sos/step_executor.py
index 4179ae78e..8010de1f3 100755
--- a/src/sos/step_executor.py
+++ b/src/sos/step_executor.py
@@ -601,10 +601,12 @@ def log(self, stage=None, msg=None):
             env.logger.info(
                 f'output:   ``{short_repr(env.sos_dict["step_output"])}``')
 
-    def execute(self, stmt):
+    def execute(self, stmt, return_result=False):
         try:
             self.last_res = SoS_exec(
-                stmt, return_result=self.run_mode == 'interactive')
+                stmt, return_result=return_result or self.run_mode == 'interactive')
+            if return_result:
+                return self.last_res
         except (StopInputGroup, TerminateExecution, UnknownTarget,
                 RemovedTarget, UnavailableLock):
             raise
         except subprocess.CalledProcessError as e:
@@ -1055,7 +1057,7 @@ def run(self):
                                     verify_input()
                                 if env.sos_dict.get('__concurrent_subworkflow__', False):
                                     self._subworkflow_results.append(
-                                        self.execute(statement[1]))
+                                        self.execute(statement[1], return_result=True))
                                 else:
                                     self.execute(statement[1])
                             finally:
@@ -1096,7 +1098,7 @@ def run(self):
                             verify_input()
                         if env.sos_dict.get('__concurrent_subworkflow__', False):
                             self._subworkflow_results.append(
-                                self.execute(statement[1]))
+                                self.execute(statement[1], return_result=True))
                         else:
                             self.execute(statement[1])
                 if 'shared' in self.step.options:
@@ -1223,7 +1225,9 @@ def run(self):
         # endfor loop for each input group
         #
         if self._subworkflow_results:
-            for swf in self._subworkflow_results:
+            wf_ids = sum([x['pending_workflows'] for x in self._subworkflow_results], [])
+            for swf in wf_ids:
+                # note: we do not verify that returned results match these workflow ids
                 res = env.__socket__.recv_pyobj()
                 if res is None:
                     sys.exit(0)
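
The sum(..., []) idiom in the last hunk concatenates the pending_workflows lists returned by each concurrent sos_run call. It copies the accumulator list on every addition, so it is quadratic in the number of lists; for a handful of subworkflows this is harmless, but itertools.chain.from_iterable is the linear-time equivalent. A self-contained sketch with fabricated result dicts:

    from itertools import chain
    from uuid import uuid4

    # each concurrent sos_run(...) returns {'pending_workflows': [id, ...]}
    subworkflow_results = [
        {'pending_workflows': [uuid4(), uuid4()]},
        {'pending_workflows': [uuid4()]},
    ]

    # the patch's idiom: concatenates with repeated list copies
    wf_ids = sum([x['pending_workflows'] for x in subworkflow_results], [])

    # linear-time alternative with identical output
    wf_ids2 = list(chain.from_iterable(
        x['pending_workflows'] for x in subworkflow_results))
    assert wf_ids == wf_ids2
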
diff --git a/src/sos/workflow_executor.py b/src/sos/workflow_executor.py
index 193748f90..dffb82f6a 100755
--- a/src/sos/workflow_executor.py
+++ b/src/sos/workflow_executor.py
@@ -964,29 +964,30 @@ def i_am():
                         self.step_queue[step_id] = step_params
                         continue
                     elif res[0] == 'workflow':
-                        workflow_id, wf, targets, args, shared, config = res[1:]
+                        workflow_ids, wfs, targets, args, shared, config = res[1:]
                         # receive the real definition
                         env.logger.debug(
-                            f'{i_am()} receives workflow request {workflow_id}')
+                            f'{i_am()} receives workflow request {workflow_ids}')
                         # a workflow needs to be executed immediately because otherwise,
                         # if nested workflows occupy all workers, no real step could be
                         # executed; now we would like to find a worker to run them
                         if hasattr(runnable, '_pending_workflows'):
-                            runnable._pending_workflows.append(workflow_id)
+                            runnable._pending_workflows.extend(workflow_ids)
                         else:
-                            runnable._pending_workflows = [workflow_id]
+                            runnable._pending_workflows = workflow_ids
                         runnable._status = 'workflow_pending'
                         dag.save(env.config['output_dag'])
-                        wfrunnable = dummy_node()
-                        wfrunnable._node_id = workflow_id
-                        wfrunnable._status = 'workflow_running_pending'
-                        dag.save(env.config['output_dag'])
-                        wfrunnable._pending_workflows = [workflow_id]
-                        #
-                        manager.execute(wfrunnable, config=config, args=args,
-                                        spec=('workflow', workflow_id, wf, targets, args, shared, config))
+                        for wid, wf in zip(workflow_ids, wfs):
+                            wfrunnable = dummy_node()
+                            wfrunnable._node_id = wid
+                            wfrunnable._status = 'workflow_running_pending'
+                            dag.save(env.config['output_dag'])
+                            wfrunnable._pending_workflows = [wid]
+                            #
+                            manager.execute(wfrunnable, config=config, args=args,
+                                            spec=('workflow', wid, wf, targets, args, shared, config))
                         continue
                     else:
                         raise RuntimeError(
@@ -1049,6 +1050,7 @@ def i_am():
                     # else:
                     #     self.completed['__subworkflow_completed__'] += 1
                     for proc in manager.procs:
+                        # do not care about dummy (None) process slots
                         if proc is None:
                             continue
                         if proc.in_status('workflow_pending') and res['__workflow_id__'] in proc.step._pending_workflows:
diff --git a/test/test_nested.py b/test/test_nested.py
index 3026cc803..b202198bd 100644
--- a/test/test_nested.py
+++ b/test/test_nested.py
@@ -700,6 +700,32 @@ def testConcurrentSubWorkflow(self):
         self.assertTrue(time.time() - st < 30)
 
+    def testSoSMultiWorkflow(self):
+        '''Test multiple workflows in sos_run'''
+        script = SoS_Script('''
+[B]
+parameter: idx=2
+import time
+time.sleep(idx)
+
+[A]
+parameter: idx=2
+import time
+time.sleep(idx)
+
+[default]
+input: for_each=dict(i=range(4))
+sos_run(['A', 'B'], idx=i)
+''')
+        import time
+        st = time.time()
+        wf = script.workflow()
+        # with eight workers the eight subworkflows run concurrently, so the
+        # total runtime stays well below the serial sleep time
+        Base_Executor(wf, config={'max_procs': 8}).run()
+        self.assertTrue(time.time() - st < 20)
+
 
 if __name__ == '__main__':
     #suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestParser)
     # unittest.TextTestRunner(, suite).run()
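
The timing bound in the new test works because the sleeps alone would total 2 x (0 + 1 + 2 + 3) = 12 seconds, plus per-workflow startup cost, if the eight subworkflows ran back to back; executed concurrently on eight workers they finish in roughly the longest sleep of 3 seconds. A SoS-independent sketch of the same wall-clock assertion pattern:

    import time
    from concurrent.futures import ThreadPoolExecutor

    def fake_subworkflow(idx):
        time.sleep(idx)  # stands in for workflows A and B of the test

    st = time.time()
    with ThreadPoolExecutor(max_workers=8) as pool:
        # two "workflows" per input group i, mirroring sos_run(['A', 'B'], idx=i)
        list(pool.map(fake_subworkflow, [i for i in range(4) for _ in 'AB']))
    elapsed = time.time() - st
    # about 3 s when concurrent; at least 12 s if the calls ran serially
    assert elapsed < 12, f'expected concurrent execution, got {elapsed:.1f}s'
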