diff --git a/src/sos/actions.py b/src/sos/actions.py index cdbcef216..8c18d6933 100644 --- a/src/sos/actions.py +++ b/src/sos/actions.py @@ -546,11 +546,13 @@ def sos_run(workflow=None, targets=None, shared=None, args=None, source=None, ** if not hasattr(env, '__socket__'): raise RuntimeError('Running nested workflow without a master is not acceptable') # tell the master process to receive a workflow - env.__socket__.send_pyobj(f'workflow {uuid.uuid4()}') # really send the workflow shared = { x: (env.sos_dict[x] if x in env.sos_dict else None) for x in shared} - env.__socket__.send_pyobj((wf, targets, args, shared, env.config)) + env.__socket__.send_pyobj(['workflow', uuid.uuid4(), wf, targets, args, shared, env.config]) + if env.sos_dict.get('__concurrent_subworkflow__', False): + return {} + res = env.__socket__.recv_pyobj() if res is None: sys.exit(0) diff --git a/src/sos/step_executor.py b/src/sos/step_executor.py index 053b7450a..4179ae78e 100755 --- a/src/sos/step_executor.py +++ b/src/sos/step_executor.py @@ -273,6 +273,16 @@ def get_value_of_param(name, param_list, extra_dict={}): except Exception as e: return [eval(compile(ast.Expression(body=kwargs[0].value), filename='', mode="eval"), extra_dict)] +def is_sos_run_the_only_last_stmt(stmt): + tree = ast.parse(stmt) + return len(tree.body) >= 1 and \ + isinstance(tree.body[-1], ast.Expr) and \ + isinstance(tree.body[-1].value, ast.Call) and \ + hasattr(tree.body[-1].value.func, 'id') and \ + tree.body[-1].value.func.id == 'sos_run' and \ + len([x for x in ast.walk(tree) if isinstance(x, ast.Call) and hasattr(x.func, 'id') and x.func.id == 'sos_run']) == 1 + + class Base_Step_Executor: # This base class defines how steps are executed. The derived classes will reimplement # some function to behave differently in different modes. @@ -390,7 +400,7 @@ def process_output_group_with(self, group_with): var[vn] = vv elif isinstance(vv, (list, tuple)): if len(vv) != env.sos_dict["__num_groups__"]: - raise ValueError(f'Length of provided attributes ({len(vv)}) does not match number of input groups ({env.sos_dict["__num_groups__"]})') + raise ValueError(f'Length of provided attributes ({len(vv)}) does not match number of Substeps ({env.sos_dict["__num_groups__"]})') var[vn] = vv[env.sos_dict["_index"]] else: raise ValueError('Unacceptable variables {vv} for option group_with') @@ -854,6 +864,7 @@ def run(self): self.output_groups = [sos_targets([]) for x in self._substeps] # used to prevent overlapping output from substeps self._all_outputs = set() + self._subworkflow_results = [] if self.concurrent_substep: if len(self._substeps) <= 1 or self.run_mode == 'dryrun': @@ -862,11 +873,19 @@ def run(self): x for x in self.step.statements[input_statement_idx:] if x[0] != ':']) > 1: self.concurrent_substep = False env.logger.debug( - 'Input groups are executed sequentially because of existence of directives between statements.') + 'Substeps are executed sequentially because of existence of directives between statements.') elif any('sos_run' in x[1] for x in self.step.statements[input_statement_idx:]): self.concurrent_substep = False - env.logger.debug( - 'Input groups are executed sequentially because of existence of nested workflow.') + if 'shared' not in self.step.options and not self.step.task and \ + len([x for x in self.step.statements[input_statement_idx:] if x[0] == '!']) == 1 and \ + self.step.statements[-1][0] == '!' and \ + is_sos_run_the_only_last_stmt(self.step.statements[-1][1]): + env.sos_dict.set('__concurrent_subworkflow__', True) + env.logger.debug( + 'Running nested workflows concurrently.') + else: + env.logger.debug( + 'Substeps are executed sequentially because of existence of multiple nested workflow.') else: self.prepare_substep() @@ -1034,7 +1053,11 @@ def run(self): env.logger.trace(f'Execute substep {env.sos_dict["step_name"]} without signature') try: verify_input() - self.execute(statement[1]) + if env.sos_dict.get('__concurrent_subworkflow__', False): + self._subworkflow_results.append( + self.execute(statement[1])) + else: + self.execute(statement[1]) finally: if not self.step.task: # if no task, this step is __completed @@ -1071,7 +1094,11 @@ def run(self): sig.lock() try: verify_input() - self.execute(statement[1]) + if env.sos_dict.get('__concurrent_subworkflow__', False): + self._subworkflow_results.append( + self.execute(statement[1])) + else: + self.execute(statement[1]) if 'shared' in self.step.options: try: self.shared_vars[env.sos_dict['_index']].update({ @@ -1195,6 +1222,16 @@ def run(self): # # endfor loop for each input group # + if self._subworkflow_results: + for swf in self._subworkflow_results: + res = env.__socket__.recv_pyobj() + if res is None: + sys.exit(0) + elif isinstance(res, Exception): + raise res + env.sos_dict.pop('__concurrent_subworkflow__') + # otherwise there should be nothing interesting in subworkflow + # return value (shared is not handled) self.wait_for_results(all_submitted=True) for idx, res in enumerate(self.proc_results): @@ -1302,7 +1339,7 @@ def submit_tasks(self, tasks): else: # otherwise, use workflow default host = '__default__' - self.socket.send_pyobj(f'tasks {host} {" ".join(tasks)}') + self.socket.send_pyobj(['tasks', host] + tasks) def wait_for_tasks(self, tasks, all_submitted): if not tasks: diff --git a/src/sos/workflow_executor.py b/src/sos/workflow_executor.py index d4b548b1a..193748f90 100755 --- a/src/sos/workflow_executor.py +++ b/src/sos/workflow_executor.py @@ -927,18 +927,18 @@ def i_am(): res = proc.socket.recv_pyobj() # # if this is NOT a result, rather some request for task, step, workflow etc - if isinstance(res, str): - if res.startswith('task'): + if isinstance(res, list): + if res[0] == 'tasks': env.logger.debug( f'{i_am()} receives task request {res}') - host = res.split(' ')[1] + host = res[1] if host == '__default__': if 'default_queue' in env.config: host = env.config['default_queue'] else: host = 'localhost' try: - new_tasks = res.split(' ')[2:] + new_tasks = res[2:] runnable._host = Host(host) if hasattr(runnable, '_pending_tasks'): runnable._pending_tasks.extend(new_tasks) @@ -955,26 +955,27 @@ def i_am(): env.logger.error(e) proc.set_status('failed') continue - elif res.startswith('step'): + elif res[0] == 'step': # step sent from nested workflow - step_id = res.split(' ')[1] - step_params = proc.socket.recv_pyobj() + step_id = res[1] + step_params = res[2:] env.logger.debug( f'{i_am()} receives step request {step_id} with args {step_params[3]}') self.step_queue[step_id] = step_params continue - elif res.startswith('workflow'): - workflow_id = res.split(' ')[1] + elif res[0] == 'workflow': + workflow_id, wf, targets, args, shared, config = res[1:] # receive the real definition env.logger.debug( f'{i_am()} receives workflow request {workflow_id}') - # (wf, args, shared, config) - wf, targets, args, shared, config = proc.socket.recv_pyobj() # a workflow needs to be executed immediately because otherwise if all workflows # occupies all workers, no real step could be executed. # now we would like to find a worker and - runnable._pending_workflow = workflow_id + if hasattr(runnable, '_pending_workflows'): + runnable._pending_workflows.append(workflow_id) + else: + runnable._pending_workflows = [workflow_id] runnable._status = 'workflow_pending' dag.save(env.config['output_dag']) @@ -982,7 +983,7 @@ def i_am(): wfrunnable._node_id = workflow_id wfrunnable._status = 'workflow_running_pending' dag.save(env.config['output_dag']) - wfrunnable._pending_workflow = workflow_id + wfrunnable._pending_workflows = [workflow_id] # manager.execute(wfrunnable, config=config, args=args, spec=('workflow', workflow_id, wf, targets, args, shared, config)) @@ -990,6 +991,9 @@ def i_am(): else: raise RuntimeError( f'Unexpected value from step {short_repr(res)}') + elif isinstance(res, str): + raise RuntimeError( + f'Unexpected value from step {short_repr(res)}') # if we does get the result, we send the process to pool manager.mark_idle(idx) @@ -1020,12 +1024,12 @@ def i_am(): exec_error.append(runnable._node_id, res) # if this is a node for a running workflow, need to mark it as failed as well # for proc in procs: - if isinstance(runnable, dummy_node) and hasattr(runnable, '_pending_workflow'): + if isinstance(runnable, dummy_node) and hasattr(runnable, '_pending_workflows'): for proc in manager.procs: if proc is None: continue if proc.is_pending() and hasattr(proc.step, '_pending_workflow') \ - and proc.step._pending_workflow == runnable._pending_workflow: + and proc.step._pending_workflow in runnable._pending_workflows: proc.set_status('failed') dag.save(env.config['output_dag']) raise exec_error @@ -1047,9 +1051,11 @@ def i_am(): for proc in manager.procs: if proc is None: continue - if proc.in_status('workflow_pending') and proc.step._pending_workflow == res['__workflow_id__']: + if proc.in_status('workflow_pending') and res['__workflow_id__'] in proc.step._pending_workflows: + proc.step._pending_workflows.remove(res['__workflow_id__']) + if not proc.step._pending_workflows: + proc.set_status('running') proc.socket.send_pyobj(res) - proc.set_status('running') break dag.save(env.config['output_dag']) else: @@ -1263,37 +1269,39 @@ def i_am(): exec_error.append(runnable._node_id, res) # if this is a node for a running workflow, need to mark it as failed as well # for proc in procs: - if isinstance(runnable, dummy_node) and hasattr(runnable, '_pending_workflow'): + if isinstance(runnable, dummy_node) and hasattr(runnable, '_pending_workflows'): for proc in manager.procs: if proc is None: continue if proc.is_pending() and hasattr(proc.step, '_pending_workflow') \ - and proc.step._pending_workflow == runnable._pending_workflow: + and proc.step._pending_workflow in runnable._pending_workflows: proc.set_status('failed') dag.save(env.config['output_dag']) elif '__step_name__' in res: env.logger.debug(f'{i_am()} receive step result ') self.step_completed(res, dag, runnable) - elif '__workflow_id__' in res: - # result from a workflow - # the worker process has been returned to the pool, now we need to - # notify the step that is waiting for the result - env.logger.debug(f'{i_am()} receive workflow result') - # aggregate steps etc with subworkflows - for k, v in res['__completed__'].items(): - self.completed[k] += v - # if res['__completed__']['__step_completed__'] == 0: - # self.completed['__subworkflow_skipped__'] += 1 - # else: - # self.completed['__subworkflow_completed__'] += 1 - for proc in manager.procs: - if proc is None: - continue - if proc.in_status('workflow_pending') and proc.step._pending_workflow == res['__workflow_id__']: - proc.socket.send_pyobj(res) - proc.set_status('running') - break - dag.save(env.config['output_dag']) + # elif '__workflow_id__' in res: + # # result from a workflow + # # the worker process has been returned to the pool, now we need to + # # notify the step that is waiting for the result + # env.logger.warning(f'{i_am()} receive workflow result {res}') + # # aggregate steps etc with subworkflows + # for k, v in res['__completed__'].items(): + # self.completed[k] += v + # # if res['__completed__']['__step_completed__'] == 0: + # # self.completed['__subworkflow_skipped__'] += 1 + # # else: + # # self.completed['__subworkflow_completed__'] += 1 + # for proc in manager.procs: + # if proc is None: + # continue + # if proc.in_status('workflow_pending') and res['__workflow_id__'] in proc.step._pending_workflows: + # proc.socket.send_pyobj(res) + # proc.step._pending_workflows.remove(res['__workflow_id__']) + # if not proc.step._pending_workflows: + # proc.set_status('running') + # break + # dag.save(env.config['output_dag']) else: raise RuntimeError( f'Unrecognized response from a step: {res}') @@ -1327,12 +1335,11 @@ def i_am(): step_id = uuid.uuid4() env.logger.debug( f'{i_am()} send step {section.step_name()} to master with args {self.args} and context {runnable._context}') - parent_socket.send_pyobj(f'step {step_id}') socket = env.zmq_context.socket(zmq.PAIR) port = socket.bind_to_random_port('tcp://127.0.0.1') - parent_socket.send_pyobj((section, runnable._context, shared, self.args, - env.config, env.verbosity, port)) + parent_socket.send_pyobj(['step', step_id, section, runnable._context, shared, self.args, + env.config, env.verbosity, port]) # this is a real step manager.add_placeholder_worker(runnable, socket) diff --git a/test/test_nested.py b/test/test_nested.py index e22cd2efd..3026cc803 100644 --- a/test/test_nested.py +++ b/test/test_nested.py @@ -680,6 +680,25 @@ def testNestedFromAnotherFile(self): self.assertTrue(os.path.isfile('a.txt'), 'a.txt should have been created by nested workflow from another file') + def testConcurrentSubWorkflow(self): + '''Test concurrent subworkflow sos_run ''' + script = SoS_Script(''' +[A] +parameter: idx=0 +import time +time.sleep(5) + +[default] +input: for_each=dict(i=range(6)) +sos_run('A', idx=i) +''') + import time + st = time.time() + wf = script.workflow() + # this should be ok. + Base_Executor(wf, config={'max_procs': 8}).run() + self.assertTrue(time.time() - st < 30) + if __name__ == '__main__': #suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestParser)