Allow concurrent subworkflows #1167
Bo Peng committed Jan 9, 2019
1 parent fb48ccb commit b30f63a
Showing 4 changed files with 116 additions and 51 deletions.
6 changes: 4 additions & 2 deletions src/sos/actions.py
@@ -546,11 +546,13 @@ def sos_run(workflow=None, targets=None, shared=None, args=None, source=None, **
if not hasattr(env, '__socket__'):
raise RuntimeError('Running nested workflow without a master is not acceptable')
# tell the master process to receive a workflow
env.__socket__.send_pyobj(f'workflow {uuid.uuid4()}')
# really send the workflow
shared = {
x: (env.sos_dict[x] if x in env.sos_dict else None) for x in shared}
env.__socket__.send_pyobj((wf, targets, args, shared, env.config))
env.__socket__.send_pyobj(['workflow', uuid.uuid4(), wf, targets, args, shared, env.config])
if env.sos_dict.get('__concurrent_subworkflow__', False):
return {}

res = env.__socket__.recv_pyobj()
if res is None:
sys.exit(0)
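With this change, sos_run sends the whole request as one list (`['workflow', uuid, wf, targets, args, shared, config]`) instead of a string header followed by a second payload message, and it returns an empty dict right away when `__concurrent_subworkflow__` is set, so the calling step no longer blocks on the reply. A minimal standalone sketch of the new message shape (placeholder values, not real SoS objects):

```python
import uuid
import zmq

# one in-process pair of sockets stands in for the worker/master connection
ctx = zmq.Context.instance()
master = ctx.socket(zmq.PAIR)
port = master.bind_to_random_port('tcp://127.0.0.1')
worker = ctx.socket(zmq.PAIR)
worker.connect(f'tcp://127.0.0.1:{port}')

# worker side: one atomic request, mirroring the new send_pyobj call in sos_run
wf, targets, args, shared, config = 'A', None, {}, {}, {}   # placeholder values
worker.send_pyobj(['workflow', uuid.uuid4(), wf, targets, args, shared, config])

# master side: dispatch on the first list element instead of parsing a string prefix
msg = master.recv_pyobj()
if msg[0] == 'workflow':
    workflow_id, wf, targets, args, shared, config = msg[1:]
    print('received workflow request', workflow_id)
```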
51 changes: 44 additions & 7 deletions src/sos/step_executor.py
@@ -273,6 +273,16 @@ def get_value_of_param(name, param_list, extra_dict={}):
except Exception as e:
return [eval(compile(ast.Expression(body=kwargs[0].value), filename='<string>', mode="eval"), extra_dict)]

def is_sos_run_the_only_last_stmt(stmt):
tree = ast.parse(stmt)
return len(tree.body) >= 1 and \
isinstance(tree.body[-1], ast.Expr) and \
isinstance(tree.body[-1].value, ast.Call) and \
hasattr(tree.body[-1].value.func, 'id') and \
tree.body[-1].value.func.id == 'sos_run' and \
len([x for x in ast.walk(tree) if isinstance(x, ast.Call) and hasattr(x.func, 'id') and x.func.id == 'sos_run']) == 1
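The new helper decides whether a step body is eligible for concurrent subworkflow execution: the last statement must be a bare sos_run(...) call, and no other sos_run call may appear in the block. A small self-contained illustration (the helper is reproduced from the diff above; the sample statement strings are made up):

```python
import ast

def is_sos_run_the_only_last_stmt(stmt):
    # reproduced from the helper above: the last statement must be a bare
    # sos_run(...) call and no other sos_run call may appear in the block
    tree = ast.parse(stmt)
    return len(tree.body) >= 1 and \
        isinstance(tree.body[-1], ast.Expr) and \
        isinstance(tree.body[-1].value, ast.Call) and \
        hasattr(tree.body[-1].value.func, 'id') and \
        tree.body[-1].value.func.id == 'sos_run' and \
        len([x for x in ast.walk(tree)
             if isinstance(x, ast.Call) and hasattr(x.func, 'id')
             and x.func.id == 'sos_run']) == 1

print(is_sos_run_the_only_last_stmt("print(i)\nsos_run('A', idx=i)"))  # True: eligible
print(is_sos_run_the_only_last_stmt("sos_run('A')\nprint(i)"))         # False: sos_run is not the last statement
print(is_sos_run_the_only_last_stmt("res = sos_run('A')"))             # False: result is assigned, not a bare call
```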


class Base_Step_Executor:
# This base class defines how steps are executed. The derived classes will reimplement
# some function to behave differently in different modes.
@@ -390,7 +400,7 @@ def process_output_group_with(self, group_with):
var[vn] = vv
elif isinstance(vv, (list, tuple)):
if len(vv) != env.sos_dict["__num_groups__"]:
raise ValueError(f'Length of provided attributes ({len(vv)}) does not match number of input groups ({env.sos_dict["__num_groups__"]})')
raise ValueError(f'Length of provided attributes ({len(vv)}) does not match number of Substeps ({env.sos_dict["__num_groups__"]})')
var[vn] = vv[env.sos_dict["_index"]]
else:
raise ValueError(f'Unacceptable variables {vv} for option group_with')
@@ -854,6 +864,7 @@ def run(self):
self.output_groups = [sos_targets([]) for x in self._substeps]
# used to prevent overlapping output from substeps
self._all_outputs = set()
self._subworkflow_results = []

if self.concurrent_substep:
if len(self._substeps) <= 1 or self.run_mode == 'dryrun':
@@ -862,11 +873,19 @@
x for x in self.step.statements[input_statement_idx:] if x[0] != ':']) > 1:
self.concurrent_substep = False
env.logger.debug(
'Input groups are executed sequentially because of existence of directives between statements.')
'Substeps are executed sequentially because of existence of directives between statements.')
elif any('sos_run' in x[1] for x in self.step.statements[input_statement_idx:]):
self.concurrent_substep = False
env.logger.debug(
'Input groups are executed sequentially because of existence of nested workflow.')
if 'shared' not in self.step.options and not self.step.task and \
len([x for x in self.step.statements[input_statement_idx:] if x[0] == '!']) == 1 and \
self.step.statements[-1][0] == '!' and \
is_sos_run_the_only_last_stmt(self.step.statements[-1][1]):
env.sos_dict.set('__concurrent_subworkflow__', True)
env.logger.debug(
'Running nested workflows concurrently.')
else:
env.logger.debug(
'Substeps are executed sequentially because of the existence of multiple nested workflows.')
else:
self.prepare_substep()

@@ -1034,7 +1053,11 @@ def run(self):
env.logger.trace(f'Execute substep {env.sos_dict["step_name"]} without signature')
try:
verify_input()
self.execute(statement[1])
if env.sos_dict.get('__concurrent_subworkflow__', False):
self._subworkflow_results.append(
self.execute(statement[1]))
else:
self.execute(statement[1])
finally:
if not self.step.task:
# if no task, this step is __completed
@@ -1071,7 +1094,11 @@ def run(self):
sig.lock()
try:
verify_input()
self.execute(statement[1])
if env.sos_dict.get('__concurrent_subworkflow__', False):
self._subworkflow_results.append(
self.execute(statement[1]))
else:
self.execute(statement[1])
if 'shared' in self.step.options:
try:
self.shared_vars[env.sos_dict['_index']].update({
@@ -1195,6 +1222,16 @@ def run(self):
#
# endfor loop for each input group
#
if self._subworkflow_results:
for swf in self._subworkflow_results:
res = env.__socket__.recv_pyobj()
if res is None:
sys.exit(0)
elif isinstance(res, Exception):
raise res
env.sos_dict.pop('__concurrent_subworkflow__')
# otherwise there should be nothing interesting in subworkflow
# return value (shared is not handled)

self.wait_for_results(all_submitted=True)
for idx, res in enumerate(self.proc_results):
@@ -1302,7 +1339,7 @@ def submit_tasks(self, tasks):
else:
# otherwise, use workflow default
host = '__default__'
self.socket.send_pyobj(f'tasks {host} {" ".join(tasks)}')
self.socket.send_pyobj(['tasks', host] + tasks)

def wait_for_tasks(self, tasks, all_submitted):
if not tasks:
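Taken together, the step executor now follows a fan-out/collect pattern: each substep submits its nested workflow (sos_run returns an empty dict immediately), the step accumulates the placeholders in self._subworkflow_results, and only after the last input group does it wait for one reply per subworkflow, re-raising any exception it receives. A rough standalone analogy of that pattern, using concurrent.futures rather than SoS workers:

```python
import time
from concurrent.futures import ProcessPoolExecutor

def run_subworkflow(idx):
    # stand-in for a nested workflow; the real ones are executed by SoS workers
    time.sleep(1)
    return idx

if __name__ == '__main__':
    subworkflow_results = []                 # plays the role of self._subworkflow_results
    with ProcessPoolExecutor(max_workers=6) as pool:
        # fan out: submit every subworkflow without waiting for it
        for i in range(6):
            subworkflow_results.append(pool.submit(run_subworkflow, i))
        # collect: one wait per submitted subworkflow, after the loop
        for fut in subworkflow_results:
            res = fut.result()               # raises if the subworkflow failed
```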
91 changes: 49 additions & 42 deletions src/sos/workflow_executor.py
@@ -927,18 +927,18 @@ def i_am():
res = proc.socket.recv_pyobj()
#
# if this is NOT a result, rather some request for task, step, workflow etc
if isinstance(res, str):
if res.startswith('task'):
if isinstance(res, list):
if res[0] == 'tasks':
env.logger.debug(
f'{i_am()} receives task request {res}')
host = res.split(' ')[1]
host = res[1]
if host == '__default__':
if 'default_queue' in env.config:
host = env.config['default_queue']
else:
host = 'localhost'
try:
new_tasks = res.split(' ')[2:]
new_tasks = res[2:]
runnable._host = Host(host)
if hasattr(runnable, '_pending_tasks'):
runnable._pending_tasks.extend(new_tasks)
@@ -955,41 +955,45 @@ def i_am():
env.logger.error(e)
proc.set_status('failed')
continue
elif res.startswith('step'):
elif res[0] == 'step':
# step sent from nested workflow
step_id = res.split(' ')[1]
step_params = proc.socket.recv_pyobj()
step_id = res[1]
step_params = res[2:]
env.logger.debug(
f'{i_am()} receives step request {step_id} with args {step_params[3]}')
self.step_queue[step_id] = step_params
continue
elif res.startswith('workflow'):
workflow_id = res.split(' ')[1]
elif res[0] == 'workflow':
workflow_id, wf, targets, args, shared, config = res[1:]
# receive the real definition
env.logger.debug(
f'{i_am()} receives workflow request {workflow_id}')
# (wf, args, shared, config)
wf, targets, args, shared, config = proc.socket.recv_pyobj()
# a workflow needs to be executed immediately because otherwise if all workflows
# occupy all workers, no real step could be executed.

# now we would like to find a worker and run the newly received workflow
runnable._pending_workflow = workflow_id
if hasattr(runnable, '_pending_workflows'):
runnable._pending_workflows.append(workflow_id)
else:
runnable._pending_workflows = [workflow_id]
runnable._status = 'workflow_pending'
dag.save(env.config['output_dag'])

wfrunnable = dummy_node()
wfrunnable._node_id = workflow_id
wfrunnable._status = 'workflow_running_pending'
dag.save(env.config['output_dag'])
wfrunnable._pending_workflow = workflow_id
wfrunnable._pending_workflows = [workflow_id]
#
manager.execute(wfrunnable, config=config, args=args,
spec=('workflow', workflow_id, wf, targets, args, shared, config))
continue
else:
raise RuntimeError(
f'Unexpected value from step {short_repr(res)}')
elif isinstance(res, str):
raise RuntimeError(
f'Unexpected value from step {short_repr(res)}')

# if we do get the result, we send the process back to the pool
manager.mark_idle(idx)
@@ -1020,12 +1024,12 @@ def i_am():
exec_error.append(runnable._node_id, res)
# if this is a node for a running workflow, need to mark it as failed as well
# for proc in procs:
if isinstance(runnable, dummy_node) and hasattr(runnable, '_pending_workflow'):
if isinstance(runnable, dummy_node) and hasattr(runnable, '_pending_workflows'):
for proc in manager.procs:
if proc is None:
continue
if proc.is_pending() and hasattr(proc.step, '_pending_workflow') \
and proc.step._pending_workflow == runnable._pending_workflow:
and proc.step._pending_workflow in runnable._pending_workflows:
proc.set_status('failed')
dag.save(env.config['output_dag'])
raise exec_error
@@ -1047,9 +1051,11 @@ def i_am():
for proc in manager.procs:
if proc is None:
continue
if proc.in_status('workflow_pending') and proc.step._pending_workflow == res['__workflow_id__']:
if proc.in_status('workflow_pending') and res['__workflow_id__'] in proc.step._pending_workflows:
proc.step._pending_workflows.remove(res['__workflow_id__'])
if not proc.step._pending_workflows:
proc.set_status('running')
proc.socket.send_pyobj(res)
proc.set_status('running')
break
dag.save(env.config['output_dag'])
else:
@@ -1263,37 +1269,39 @@ def i_am():
exec_error.append(runnable._node_id, res)
# if this is a node for a running workflow, need to mark it as failed as well
# for proc in procs:
if isinstance(runnable, dummy_node) and hasattr(runnable, '_pending_workflow'):
if isinstance(runnable, dummy_node) and hasattr(runnable, '_pending_workflows'):
for proc in manager.procs:
if proc is None:
continue
if proc.is_pending() and hasattr(proc.step, '_pending_workflow') \
and proc.step._pending_workflow == runnable._pending_workflow:
and proc.step._pending_workflow in runnable._pending_workflows:
proc.set_status('failed')
dag.save(env.config['output_dag'])
elif '__step_name__' in res:
env.logger.debug(f'{i_am()} receive step result ')
self.step_completed(res, dag, runnable)
elif '__workflow_id__' in res:
# result from a workflow
# the worker process has been returned to the pool, now we need to
# notify the step that is waiting for the result
env.logger.debug(f'{i_am()} receive workflow result')
# aggregate steps etc with subworkflows
for k, v in res['__completed__'].items():
self.completed[k] += v
# if res['__completed__']['__step_completed__'] == 0:
# self.completed['__subworkflow_skipped__'] += 1
# else:
# self.completed['__subworkflow_completed__'] += 1
for proc in manager.procs:
if proc is None:
continue
if proc.in_status('workflow_pending') and proc.step._pending_workflow == res['__workflow_id__']:
proc.socket.send_pyobj(res)
proc.set_status('running')
break
dag.save(env.config['output_dag'])
# elif '__workflow_id__' in res:
# # result from a workflow
# # the worker process has been returned to the pool, now we need to
# # notify the step that is waiting for the result
# env.logger.warning(f'{i_am()} receive workflow result {res}')
# # aggregate steps etc with subworkflows
# for k, v in res['__completed__'].items():
# self.completed[k] += v
# # if res['__completed__']['__step_completed__'] == 0:
# # self.completed['__subworkflow_skipped__'] += 1
# # else:
# # self.completed['__subworkflow_completed__'] += 1
# for proc in manager.procs:
# if proc is None:
# continue
# if proc.in_status('workflow_pending') and res['__workflow_id__'] in proc.step._pending_workflows:
# proc.socket.send_pyobj(res)
# proc.step._pending_workflows.remove(res['__workflow_id__'])
# if not proc.step._pending_workflows:
# proc.set_status('running')
# break
# dag.save(env.config['output_dag'])
else:
raise RuntimeError(
f'Unrecognized response from a step: {res}')
@@ -1327,12 +1335,11 @@ def i_am():
step_id = uuid.uuid4()
env.logger.debug(
f'{i_am()} send step {section.step_name()} to master with args {self.args} and context {runnable._context}')
parent_socket.send_pyobj(f'step {step_id}')

socket = env.zmq_context.socket(zmq.PAIR)
port = socket.bind_to_random_port('tcp://127.0.0.1')
parent_socket.send_pyobj((section, runnable._context, shared, self.args,
env.config, env.verbosity, port))
parent_socket.send_pyobj(['step', step_id, section, runnable._context, shared, self.args,
env.config, env.verbosity, port])
# this is a real step
manager.add_placeholder_worker(runnable, socket)

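On the master side, a step can now be waiting on several subworkflows at once, so the single _pending_workflow attribute is replaced by a _pending_workflows list and the step is switched back to 'running' only after the last pending workflow reports back. A minimal sketch of that bookkeeping, with made-up class and function names:

```python
# made-up stand-ins for the runnable/proc objects in workflow_executor.py
class StepNode:
    def __init__(self):
        self._pending_workflows = []
        self.status = 'running'

def on_workflow_request(step, workflow_id):
    # a nested sos_run request arrived from this step
    step._pending_workflows.append(workflow_id)
    step.status = 'workflow_pending'

def on_workflow_result(step, workflow_id):
    # one subworkflow finished; resume the step only when all of them have returned
    step._pending_workflows.remove(workflow_id)
    if not step._pending_workflows:
        step.status = 'running'

step = StepNode()
for wid in ('wf-1', 'wf-2'):
    on_workflow_request(step, wid)
on_workflow_result(step, 'wf-1')
assert step.status == 'workflow_pending'    # still waiting on wf-2
on_workflow_result(step, 'wf-2')
assert step.status == 'running'             # all subworkflows reported back
```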
19 changes: 19 additions & 0 deletions test/test_nested.py
@@ -680,6 +680,25 @@ def testNestedFromAnotherFile(self):
self.assertTrue(os.path.isfile('a.txt'),
'a.txt should have been created by nested workflow from another file')

def testConcurrentSubWorkflow(self):
'''Test concurrent execution of subworkflows submitted by sos_run'''
script = SoS_Script('''
[A]
parameter: idx=0
import time
time.sleep(5)
[default]
input: for_each=dict(i=range(6))
sos_run('A', idx=i)
''')
import time
st = time.time()
wf = script.workflow()
# six subworkflows sleeping 5 seconds each would need at least 30 seconds
# sequentially, so finishing in under 30 seconds shows they ran concurrently
Base_Executor(wf, config={'max_procs': 8}).run()
self.assertTrue(time.time() - st < 30)


if __name__ == '__main__':
#suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestParser)