Allow concurrent execution of sos_run(['A', 'B']) #1167
Bo Peng committed Jan 9, 2019
1 parent b30f63a commit 56ee2d9
Showing 4 changed files with 79 additions and 33 deletions.
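
This commit extends the sos_run action to accept a list of workflow names in addition to a single name, so that sub-workflows can be dispatched together. A minimal usage sketch of the new call pattern (the workflow names and bodies are illustrative, not taken from this commit):

[A]
print('this is workflow A')

[B]
print('this is workflow B')

[default]
# a list of names submits both sub-workflows at once; with concurrent
# sub-workflows enabled they run side by side rather than one after the other
sos_run(['A', 'B'])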
46 changes: 30 additions & 16 deletions src/sos/actions.py
@@ -507,19 +507,27 @@ def sos_run(workflow=None, targets=None, shared=None, args=None, source=None, **kwargs):
        raise RuntimeError(
            'Executing nested workflow (action sos_run) in tasks is not supported.')

    if isinstance(workflow, str):
        workflow = [workflow]
    elif isinstance(workflow, Sequence):
        workflow = list(workflow)
    elif workflow is not None:
        raise ValueError('workflow has to be None, a workflow name, or a list of workflow names')

    if source is None:
        script = SoS_Script(env.sos_dict['__step_context__'].content,
                            env.sos_dict['__step_context__'].filename)
        wf = script.workflow(workflow, use_default=not targets)
        wfs = [script.workflow(wf, use_default=not targets) for wf in workflow]
    else:
        # reading workflow from another file
        script = SoS_Script(filename=source)
        wf = script.workflow(workflow, use_default=not targets)
        wfs = [script.workflow(wf, use_default=not targets) for wf in workflow]
    # if a workflow contains the current step or one of the previous ones, this
    # constitutes a recursive nested workflow and is not allowed
    if env.sos_dict['step_name'] in [f'{x.name}_{x.index}' for x in wf.sections]:
        raise RuntimeError(
            f'Nested workflow {workflow} contains the current step {env.sos_dict["step_name"]}')
    for wf in wfs:
        if env.sos_dict['step_name'] in [f'{x.name}_{x.index}' for x in wf.sections]:
            raise RuntimeError(
                f'Nested workflow {workflow} contains the current step {env.sos_dict["step_name"]}')
    # args can be specified either as a dictionary or as keyword arguments
    if args is None:
        args = kwargs
@@ -549,18 +557,24 @@ def sos_run(workflow=None, targets=None, shared=None, args=None, source=None, **kwargs):
            # really send the workflow
            shared = {
                x: (env.sos_dict[x] if x in env.sos_dict else None) for x in shared}
            env.__socket__.send_pyobj(['workflow', uuid.uuid4(), wf, targets, args, shared, env.config])
            if env.sos_dict.get('__concurrent_subworkflow__', False):
                return {}

            res = env.__socket__.recv_pyobj()
            if res is None:
                sys.exit(0)
            elif isinstance(res, Exception):
                raise res
            else:
                env.sos_dict.quick_update(res['shared'])
            return res
            wf_ids = [uuid.uuid4() for wf in wfs]
            env.__socket__.send_pyobj(['workflow', wf_ids, wfs, targets, args, shared, env.config])

            if env.sos_dict.get('__concurrent_subworkflow__', False):
                return {'pending_workflows': wf_ids}

            res = {}
            for wf in wfs:
                wf_res = env.__socket__.recv_pyobj()
                if wf_res is None:
                    sys.exit(0)
                elif isinstance(wf_res, Exception):
                    raise wf_res
                else:
                    # merge the reply only after checking that it is not an error
                    res.update(wf_res)
                    env.sos_dict.quick_update(wf_res['shared'])
            return res
        finally:
            # restore step_name in case the subworkflow re-defines it
            env.sos_dict.set('step_name', my_name)
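
The block above replaces the old one-request/one-reply exchange with a fan-out/fan-in protocol: a single 'workflow' message carries the list of sub-workflows and matching UUIDs, and the caller then collects one reply per sub-workflow. A minimal sketch of that pattern with illustrative names (run_concurrently is not part of the SoS API):

import uuid

def run_concurrently(socket, wfs, targets, args, shared, config):
    # one request carries every workflow together with a matching list of IDs ...
    wf_ids = [uuid.uuid4() for _ in wfs]
    socket.send_pyobj(['workflow', wf_ids, wfs, targets, args, shared, config])
    # ... but each completed workflow is acknowledged by its own reply
    results = {}
    for _ in wfs:
        reply = socket.recv_pyobj()
        if isinstance(reply, Exception):
            raise reply
        results.update(reply)
    return results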
14 changes: 9 additions & 5 deletions src/sos/step_executor.py
@@ -601,10 +601,12 @@ def log(self, stage=None, msg=None):
            env.logger.info(
                f'output: ``{short_repr(env.sos_dict["step_output"])}``')

    def execute(self, stmt):
    def execute(self, stmt, return_result=False):
        try:
            self.last_res = SoS_exec(
                stmt, return_result=self.run_mode == 'interactive')
                stmt, return_result=return_result or self.run_mode == 'interactive')
            if return_result:
                return self.last_res
        except (StopInputGroup, TerminateExecution, UnknownTarget, RemovedTarget, UnavailableLock):
            raise
        except subprocess.CalledProcessError as e:
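
The new return_result flag exists so that run() (below) can capture the value of an executed statement: when sos_run is dispatched concurrently it returns a dict of pending workflow IDs that the step executor must hold on to. A sketch of the intended call pattern, with self standing for the executor above (illustrative, not a verbatim call site):

# with __concurrent_subworkflow__ set, sos_run returns right away with
# {'pending_workflows': [uuid, ...]}; return_result=True passes that dict
# back to the caller instead of discarding it
pending = self.execute("sos_run(['A', 'B'])", return_result=True)
self._subworkflow_results.append(pending)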
@@ -1055,7 +1057,7 @@ def run(self):
                        verify_input()
                        if env.sos_dict.get('__concurrent_subworkflow__', False):
                            self._subworkflow_results.append(
                                self.execute(statement[1]))
                                self.execute(statement[1], return_result=True))
                        else:
                            self.execute(statement[1])
                    finally:
@@ -1096,7 +1098,7 @@ def run(self):
                        verify_input()
                        if env.sos_dict.get('__concurrent_subworkflow__', False):
                            self._subworkflow_results.append(
                                self.execute(statement[1]))
                                self.execute(statement[1], return_result=True))
                        else:
                            self.execute(statement[1])
                        if 'shared' in self.step.options:
@@ -1223,7 +1225,9 @@ def run(self):
        # endfor loop for each input group
        #
        if self._subworkflow_results:
            for swf in self._subworkflow_results:
            wf_ids = sum([x['pending_workflows'] for x in self._subworkflow_results], [])
            for swf in wf_ids:
                # here we do not check whether the returned workflow ids match the requests
                res = env.__socket__.recv_pyobj()
                if res is None:
                    sys.exit(0)
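
Each element of _subworkflow_results is the {'pending_workflows': [...]} dict returned by a concurrent sos_run, so sum(..., []) flattens the per-call ID lists into one list before one reply is collected per ID. The flattening idiom in isolation (values are illustrative):

results = [{'pending_workflows': ['id1', 'id2']}, {'pending_workflows': ['id3']}]
wf_ids = sum([x['pending_workflows'] for x in results], [])  # [] is the start value
assert wf_ids == ['id1', 'id2', 'id3']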
26 changes: 14 additions & 12 deletions src/sos/workflow_executor.py
@@ -964,29 +964,30 @@ def i_am():
                self.step_queue[step_id] = step_params
                continue
            elif res[0] == 'workflow':
                workflow_id, wf, targets, args, shared, config = res[1:]
                workflow_ids, wfs, targets, args, shared, config = res[1:]
                # receive the real definition
                env.logger.debug(
                    f'{i_am()} receives workflow request {workflow_id}')
                    f'{i_am()} receives workflow request {workflow_ids}')
                # a workflow needs to be executed immediately because otherwise,
                # if all workflows occupy all workers, no real step could be executed

                # now we would like to find a worker
                if hasattr(runnable, '_pending_workflows'):
                    runnable._pending_workflows.append(workflow_id)
                    runnable._pending_workflows.extend(workflow_ids)
                else:
                    runnable._pending_workflows = [workflow_id]
                    runnable._pending_workflows = workflow_ids
                runnable._status = 'workflow_pending'
                dag.save(env.config['output_dag'])

                wfrunnable = dummy_node()
                wfrunnable._node_id = workflow_id
                wfrunnable._status = 'workflow_running_pending'
                dag.save(env.config['output_dag'])
                wfrunnable._pending_workflows = [workflow_id]
                #
                manager.execute(wfrunnable, config=config, args=args,
                                spec=('workflow', workflow_id, wf, targets, args, shared, config))
                for wid, wf in zip(workflow_ids, wfs):
                    wfrunnable = dummy_node()
                    wfrunnable._node_id = wid
                    wfrunnable._status = 'workflow_running_pending'
                    dag.save(env.config['output_dag'])
                    wfrunnable._pending_workflows = [wid]
                    #
                    manager.execute(wfrunnable, config=config, args=args,
                                    spec=('workflow', wid, wf, targets, args, shared, config))
                continue
            else:
                raise RuntimeError(
@@ -1049,6 +1050,7 @@ def i_am():
            # else:
            #     self.completed['__subworkflow_completed__'] += 1
            for proc in manager.procs:
                # do not care about dummy processes
                if proc is None:
                    continue
                if proc.in_status('workflow_pending') and res['__workflow_id__'] in proc.step._pending_workflows:
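
When a finished sub-workflow is reported, the master scans the worker processes and releases any step parked in workflow_pending once every one of its pending IDs has been accounted for. A simplified sketch of that bookkeeping (set_status is a hypothetical resume call, not the actual SoS API):

def on_workflow_done(procs, workflow_id):
    # workflow_id corresponds to res['__workflow_id__'] in the loop above
    for proc in procs:
        if proc is None:  # skip dummy placeholder slots
            continue
        pending = proc.step._pending_workflows
        if proc.in_status('workflow_pending') and workflow_id in pending:
            pending.remove(workflow_id)
            if not pending:
                proc.set_status('running')  # hypothetical: wake the step
            break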
26 changes: 26 additions & 0 deletions test/test_nested.py
@@ -700,6 +700,32 @@ def testConcurrentSubWorkflow(self):
        self.assertTrue(time.time() - st < 30)

    def testSoSMultiWorkflow(self):
        '''Test multiple workflows in sos_run'''
        script = SoS_Script('''
[B]
parameter: idx=2
import time
time.sleep(idx)

[A]
parameter: idx=2
import time
time.sleep(idx)

[default]
input: for_each=dict(i=range(4))
sos_run(['A', 'B'], idx=i)
''')
        import time
        st = time.time()
        wf = script.workflow()
        # execution should succeed and finish well within the time bound
        Base_Executor(wf, config={'max_procs': 8}).run()
        self.assertTrue(time.time() - st < 20)
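
The 20-second bound follows from simple arithmetic: substep i sleeps i seconds in both A and B, so running the pairs back to back would cost about 2*(0+1+2+3) = 12 seconds of sleep plus startup overhead, while dispatching A and B together cuts the sleep cost of each pair to max(i, i) = i, roughly 6 seconds in total. A quick check of those numbers (not part of the test):

serial_sleep = sum(2 * i for i in range(4))  # A then B per substep -> 12
concurrent_sleep = sum(i for i in range(4))  # A and B overlap -> 6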



if __name__ == '__main__':
    # suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestParser)
    # unittest.TextTestRunner(, suite).run()
