From 74cf2f298fabd1dbf7b96271df87b0fdbd319fdb Mon Sep 17 00:00:00 2001 From: azawlocki Date: Fri, 7 May 2021 13:22:53 +0200 Subject: [PATCH 1/7] Make executor return results (as awaitable) to the worker function --- yapapi/executor/__init__.py | 75 +++++++++++++++++++++++++++++++------ yapapi/executor/ctx.py | 39 +++++++++++++++++-- 2 files changed, 100 insertions(+), 14 deletions(-) diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index 01f389f6a..b1569c83e 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -387,7 +387,10 @@ async def _find_offers(self, state: "Executor.SubmissionState") -> None: async def _submit( self, - worker: Callable[[WorkContext, AsyncIterator[Task[D, R]]], AsyncGenerator[Work, None]], + worker: Callable[ + [WorkContext, AsyncIterator[Task[D, R]]], + AsyncGenerator[Work, asyncio.Task] + ], data: Union[AsyncIterator[Task[D, R]], Iterable[Task[D, R]]], services: Set[asyncio.Task], workers: Set[asyncio.Task], @@ -555,7 +558,13 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> work_context, (Task.for_handle(handle, work_queue, emit) async for handle in consumer), ) - async for batch in command_generator: + + try: + batch: Optional[Work] = await command_generator.__anext__() + except StopAsyncIteration: + batch = None + + while batch: batch_deadline = ( datetime.now(timezone.utc) + batch.timeout if batch.timeout else None ) @@ -573,22 +582,66 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> await batch.prepare() cc = CommandContainer() batch.register(cc) + + import colors + + print(colors.yellow("[executor] Batch:"), cc.commands()) + remote = await act.send( cc.commands(), stream_output, deadline=batch_deadline ) cmds = cc.commands() emit(events.ScriptSent(agr_id=agreement.id, task_id=task_id, cmds=cmds)) - async for evt_ctx in remote: - evt = evt_ctx.event(agr_id=agreement.id, task_id=task_id, cmds=cmds) - emit(evt) - if isinstance(evt, events.CommandExecuted) and not evt.success: - raise CommandExecutionError(evt.command, evt.message) + async def get_batch_results(): + + print(colors.yellow("[get_batch_results] Waiting for results...")) - emit(events.GettingResults(agr_id=agreement.id, task_id=task_id)) - await batch.post() - emit(events.ScriptFinished(agr_id=agreement.id, task_id=task_id)) - await accept_payment_for_agreement(agreement.id, partial=True) + results = [] + async for evt_ctx in remote: + evt = evt_ctx.event( + agr_id=agreement.id, task_id=task_id, cmds=cmds + ) + emit(evt) + results.append(evt) + if isinstance(evt, events.CommandExecuted) and not evt.success: + raise CommandExecutionError(evt.command, evt.message) + + emit(events.GettingResults(agr_id=agreement.id, task_id=task_id)) + await batch.post() + emit(events.ScriptFinished(agr_id=agreement.id, task_id=task_id)) + await accept_payment_for_agreement(agreement.id, partial=True) + + print(colors.yellow("[get_batch_results] Got results")) + return results + + if batch.wait_for_results or batch.contains_init_step: + # Block until the results are available + print( + colors.yellow( + "[executor] Blocking until results are available..." + ) + ) + results = await get_batch_results() + print(colors.yellow("[executor] Returning results")) + + async def make_awaitable(): + return results + + batch = await command_generator.asend(make_awaitable()) + else: + # Schedule the coroutine in a separate asyncio task + print(colors.yellow("[executor] Scheduling getting the results...")) + loop = asyncio.get_event_loop() + results_task = loop.create_task(get_batch_results()) + print(colors.yellow("[executor] Returning results task")) + batch = await command_generator.asend(results_task) + + print(colors.yellow("[executor] After asend()")) + + except StopAsyncIteration: + print(colors.yellow("[executor] Got StopAsyncIteration")) + break except Exception: diff --git a/yapapi/executor/ctx.py b/yapapi/executor/ctx.py index 777210957..0813f1c72 100644 --- a/yapapi/executor/ctx.py +++ b/yapapi/executor/ctx.py @@ -50,6 +50,16 @@ def timeout(self) -> Optional[timedelta]: """Return the optional timeout set for execution of this work.""" return None + @property + def wait_for_results(self) -> bool: + """Return `True` iff yielding this work item should block until the results are returned.""" + return True + + @property + def contains_init_step(self) -> bool: + """Return `True` iff this work item contains the initialization step.""" + return False + class _InitStep(Work): def register(self, commands: CommandContainer): @@ -227,15 +237,35 @@ async def __on_json_download(on_download: Callable[[bytes], Awaitable], content: class Steps(Work): - def __init__(self, *steps: Work, timeout: Optional[timedelta] = None): + def __init__( + self, *steps: Work, timeout: Optional[timedelta] = None, wait_for_results: bool = True + ): + """Create a `Work` item consisting of a sequence of steps (subitems). + + :param steps: sequence of steps to be executed + :param timeout: timeout for waiting for the steps' results + :param wait_for_results: whether yielding this work item should block + until the results are available + """ self._steps: Tuple[Work, ...] = steps self._timeout: Optional[timedelta] = timeout + self._wait_for_results = wait_for_results @property def timeout(self) -> Optional[timedelta]: """Return the optional timeout set for execution of all steps.""" return self._timeout + @property + def wait_for_results(self) -> bool: + """Return `True` iff yielding these steps should block until the results are available.""" + return self._wait_for_results + + @property + def contains_init_step(self) -> bool: + """Return `True` iff the steps include an initialization step.""" + return any(isinstance(step, _InitStep) for step in self._steps) + async def prepare(self): """Execute the `prepare` hook for all the defined steps.""" for step in self._steps: @@ -378,7 +408,7 @@ def download_json( _ReceiveJson(self._storage, src_path, on_download, limit, self._emitter) ) - def commit(self, timeout: Optional[timedelta] = None) -> Work: + def commit(self, timeout: Optional[timedelta] = None, wait_for_results: bool = True) -> Work: """Creates a sequence of commands to be sent to provider. :return: Work object containing the sequence of commands @@ -386,7 +416,10 @@ def commit(self, timeout: Optional[timedelta] = None) -> Work: """ steps = self._pending_steps self._pending_steps = [] - return Steps(*steps, timeout=timeout) + return Steps(*steps, timeout=timeout, wait_for_results=wait_for_results) + + def commit_async(self) -> Work: + return self.commit(wait_for_results=False) class CaptureMode(enum.Enum): From 5131dcf4e0715b48c50a592d5cd431a6dc91158e Mon Sep 17 00:00:00 2001 From: azawlocki Date: Fri, 7 May 2021 13:23:33 +0200 Subject: [PATCH 2/7] Add variant of blender example that awaits batch results --- examples/blender/blender-async-results.py | 174 ++++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100755 examples/blender/blender-async-results.py diff --git a/examples/blender/blender-async-results.py b/examples/blender/blender-async-results.py new file mode 100755 index 000000000..a5e32b754 --- /dev/null +++ b/examples/blender/blender-async-results.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +import asyncio +from datetime import datetime, timedelta +import pathlib +import sys + +from yapapi import ( + Executor, + NoPaymentAccountError, + Task, + __version__ as yapapi_version, + WorkContext, + windows_event_loop_fix, +) +from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa +from yapapi.package import vm +from yapapi.rest.activity import BatchTimeoutError + +examples_dir = pathlib.Path(__file__).resolve().parent.parent +sys.path.append(str(examples_dir)) + +from utils import ( + build_parser, + TEXT_COLOR_CYAN, + TEXT_COLOR_DEFAULT, + TEXT_COLOR_RED, + TEXT_COLOR_YELLOW, +) + + +async def main(subnet_tag, driver=None, network=None): + package = await vm.repo( + image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae", + min_mem_gib=0.5, + min_storage_gib=2.0, + ) + + async def worker(ctx: WorkContext, tasks): + script_dir = pathlib.Path(__file__).resolve().parent + scene_path = str(script_dir / "cubes.blend") + ctx.send_file(scene_path, "/golem/resource/scene.blend") + async for task in tasks: + frame = task.data + crops = [{"outfilebasename": "out", "borders_x": [0.0, 1.0], "borders_y": [0.0, 1.0]}] + ctx.send_json( + "/golem/work/params.json", + { + "scene_file": "/golem/resource/scene.blend", + "resolution": (400, 300), + "use_compositing": False, + "crops": crops, + "samples": 100, + "frames": [frame], + "output_format": "PNG", + "RESOURCES_DIR": "/golem/resources", + "WORK_DIR": "/golem/work", + "OUTPUT_DIR": "/golem/output", + }, + ) + ctx.run("/golem/entrypoints/run-blender.sh") + output_file = f"output_{frame}.png" + ctx.download_file(f"/golem/output/out{frame:04d}.png", output_file) + + future_results = yield ctx.commit(wait_for_results=False) + # equivalent to: + # future_results = yield ctx.commit_async() + # print("[Worker]: got future_results: ", future_results) + print("Maybe doing some work before looking at the results...") + await asyncio.sleep(1) + results = await future_results + print(f"Got results for {len(results)} commands") + task.accept_result(result=output_file) + + # Iterator over the frame indices that we want to render + frames: range = range(0, 60, 10) + # Worst-case overhead, in minutes, for initialization (negotiation, file transfer etc.) + # TODO: make this dynamic, e.g. depending on the size of files to transfer + init_overhead = 3 + # Providers will not accept work if the timeout is outside of the [5 min, 30min] range. + # We increase the lower bound to 6 min to account for the time needed for our demand to + # reach the providers. + min_timeout, max_timeout = 6, 30 + + timeout = timedelta(minutes=max(min(init_overhead + len(frames) * 2, max_timeout), min_timeout)) + + # By passing `event_consumer=log_summary()` we enable summary logging. + # See the documentation of the `yapapi.log` module on how to set + # the level of detail and format of the logged information. + async with Executor( + package=package, + max_workers=3, + budget=10.0, + timeout=timeout, + subnet_tag=subnet_tag, + driver=driver, + network=network, + event_consumer=log_summary(log_event_repr), + ) as executor: + + print( + f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n" + f"Using subnet: {TEXT_COLOR_YELLOW}{subnet_tag}{TEXT_COLOR_DEFAULT}, " + f"payment driver: {TEXT_COLOR_YELLOW}{executor.driver}{TEXT_COLOR_DEFAULT}, " + f"and network: {TEXT_COLOR_YELLOW}{executor.network}{TEXT_COLOR_DEFAULT}\n" + ) + + num_tasks = 0 + start_time = datetime.now() + + async for task in executor.submit(worker, [Task(data=frame) for frame in frames]): + num_tasks += 1 + print( + f"{TEXT_COLOR_CYAN}" + f"Task computed: {task}, result: {task.result}, time: {task.running_time}" + f"{TEXT_COLOR_DEFAULT}" + ) + + print( + f"{TEXT_COLOR_CYAN}" + f"{num_tasks} tasks computed, total time: {datetime.now() - start_time}" + f"{TEXT_COLOR_DEFAULT}" + ) + + +if __name__ == "__main__": + parser = build_parser("Render a Blender scene") + now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S") + parser.set_defaults(log_file=f"blender-yapapi-{now}.log") + args = parser.parse_args() + + # This is only required when running on Windows with Python prior to 3.8: + windows_event_loop_fix() + + enable_default_logger( + log_file=args.log_file, + debug_activity_api=True, + debug_market_api=True, + debug_payment_api=True, + ) + + loop = asyncio.get_event_loop() + task = loop.create_task( + main(subnet_tag=args.subnet_tag, driver=args.driver, network=args.network) + ) + + try: + loop.run_until_complete(task) + except NoPaymentAccountError as e: + handbook_url = ( + "https://handbook.golem.network/requestor-tutorials/" + "flash-tutorial-of-requestor-development" + ) + print( + f"{TEXT_COLOR_RED}" + f"No payment account initialized for driver `{e.required_driver}` " + f"and network `{e.required_network}`.\n\n" + f"See {handbook_url} on how to initialize payment accounts for a requestor node." + f"{TEXT_COLOR_DEFAULT}" + ) + except KeyboardInterrupt: + print( + f"{TEXT_COLOR_YELLOW}" + "Shutting down gracefully, please wait a short while " + "or press Ctrl+C to exit immediately..." + f"{TEXT_COLOR_DEFAULT}" + ) + task.cancel() + try: + loop.run_until_complete(task) + print( + f"{TEXT_COLOR_YELLOW}Shutdown completed, thank you for waiting!{TEXT_COLOR_DEFAULT}" + ) + except (asyncio.CancelledError, KeyboardInterrupt): + pass From ef058817eab179fee2f5e49c0e9c3ff7c3a72607 Mon Sep 17 00:00:00 2001 From: azawlocki Date: Mon, 10 May 2021 09:19:59 +0200 Subject: [PATCH 3/7] Fix issues (mypy, coroutine not awaited) --- yapapi/executor/__init__.py | 47 ++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index b1569c83e..656fd689b 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -12,6 +12,7 @@ from typing import ( AsyncContextManager, AsyncIterator, + Awaitable, Callable, Dict, Iterable, @@ -205,7 +206,10 @@ def strategy(self) -> MarketStrategy: async def submit( self, - worker: Callable[[WorkContext, AsyncIterator[Task[D, R]]], AsyncGenerator[Work, None]], + worker: Callable[ + [WorkContext, AsyncIterator[Task[D, R]]], + AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]], + ], data: Union[AsyncIterator[Task[D, R]], Iterable[Task[D, R]]], ) -> AsyncIterator[Task[D, R]]: """Submit a computation to be executed on providers. @@ -389,7 +393,7 @@ async def _submit( self, worker: Callable[ [WorkContext, AsyncIterator[Task[D, R]]], - AsyncGenerator[Work, asyncio.Task] + AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]], ], data: Union[AsyncIterator[Task[D, R]], Iterable[Task[D, R]]], services: Set[asyncio.Task], @@ -530,6 +534,9 @@ async def accept_payment_for_agreement(agreement_id: str, *, partial: bool = Fal storage_manager = await self._stack.enter_async_context(gftp.provider()) async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> None: + def log(msg): + logger.info("\033[33m%s\033[m", msg) + nonlocal last_wid wid = last_wid last_wid += 1 @@ -583,9 +590,7 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> cc = CommandContainer() batch.register(cc) - import colors - - print(colors.yellow("[executor] Batch:"), cc.commands()) + log(f"Batch: {len(cc.commands())} commands") remote = await act.send( cc.commands(), stream_output, deadline=batch_deadline @@ -593,9 +598,9 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> cmds = cc.commands() emit(events.ScriptSent(agr_id=agreement.id, task_id=task_id, cmds=cmds)) - async def get_batch_results(): + async def get_batch_results() -> List[events.CommandEvent]: - print(colors.yellow("[get_batch_results] Waiting for results...")) + log("[get_batch_results] Waiting for results...") results = [] async for evt_ctx in remote: @@ -608,39 +613,33 @@ async def get_batch_results(): raise CommandExecutionError(evt.command, evt.message) emit(events.GettingResults(agr_id=agreement.id, task_id=task_id)) + assert batch await batch.post() emit(events.ScriptFinished(agr_id=agreement.id, task_id=task_id)) await accept_payment_for_agreement(agreement.id, partial=True) - print(colors.yellow("[get_batch_results] Got results")) + log("[get_batch_results] Got results") return results + loop = asyncio.get_event_loop() if batch.wait_for_results or batch.contains_init_step: # Block until the results are available - print( - colors.yellow( - "[executor] Blocking until results are available..." - ) - ) + log("Blocking until batch results are available...") results = await get_batch_results() - print(colors.yellow("[executor] Returning results")) - - async def make_awaitable(): - return results - - batch = await command_generator.asend(make_awaitable()) + log("Returning batch results") + future_results = loop.create_future() + future_results.set_result(results) + batch = await command_generator.asend(future_results) else: # Schedule the coroutine in a separate asyncio task - print(colors.yellow("[executor] Scheduling getting the results...")) - loop = asyncio.get_event_loop() + log("Scheduling getting batch results...") results_task = loop.create_task(get_batch_results()) - print(colors.yellow("[executor] Returning results task")) + log("Returning results task") batch = await command_generator.asend(results_task) - print(colors.yellow("[executor] After asend()")) + log("Awaitable sent to worker") except StopAsyncIteration: - print(colors.yellow("[executor] Got StopAsyncIteration")) break except Exception: From 733d181e0fe858ca5ccb45052e24cdac87325f01 Mon Sep 17 00:00:00 2001 From: azawlocki Date: Mon, 10 May 2021 17:39:43 +0200 Subject: [PATCH 4/7] Optionally yield options together with a batch from a worker function --- examples/blender/blender-async-results.py | 9 +- yapapi/executor/__init__.py | 221 ++++++++++++---------- yapapi/executor/ctx.py | 34 ++-- 3 files changed, 141 insertions(+), 123 deletions(-) diff --git a/examples/blender/blender-async-results.py b/examples/blender/blender-async-results.py index a5e32b754..f11baa2a8 100755 --- a/examples/blender/blender-async-results.py +++ b/examples/blender/blender-async-results.py @@ -9,9 +9,9 @@ NoPaymentAccountError, Task, __version__ as yapapi_version, - WorkContext, windows_event_loop_fix, ) +from yapapi.executor.ctx import ExecOptions, WorkContext from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa from yapapi.package import vm from yapapi.rest.activity import BatchTimeoutError @@ -61,10 +61,9 @@ async def worker(ctx: WorkContext, tasks): output_file = f"output_{frame}.png" ctx.download_file(f"/golem/output/out{frame:04d}.png", output_file) - future_results = yield ctx.commit(wait_for_results=False) - # equivalent to: - # future_results = yield ctx.commit_async() - # print("[Worker]: got future_results: ", future_results) + batch = ctx.commit() + future_results = yield (batch, ExecOptions(wait_for_results=False)) + print("Maybe doing some work before looking at the results...") await asyncio.sleep(1) results = await future_results diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index 656fd689b..f75da26d8 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -19,6 +19,7 @@ List, Optional, Set, + Tuple, TypeVar, Union, cast, @@ -30,7 +31,7 @@ from yapapi.executor.agreements_pool import AgreementsPool from typing_extensions import Final, AsyncGenerator -from .ctx import CaptureContext, CommandContainer, Work, WorkContext +from .ctx import CaptureContext, CommandContainer, ExecOptions, Work, WorkContext from .events import Event from . import events from .task import Task, TaskStatus @@ -42,7 +43,7 @@ from ..rest.activity import CommandExecutionError from ..rest.market import OfferProposal, Subscription from ..storage import gftp -from ._smartq import SmartQueue, Handle +from ._smartq import Consumer, Handle, SmartQueue from .strategy import ( DecreaseScoreForUnconfirmedAgreement, LeastExpensiveLinearPayuMS, @@ -100,6 +101,10 @@ class _ExecutorConfig: traceback: bool = bool(os.getenv("YAPAPI_TRACEBACK", 0)) +WorkItem = Union[Work, Tuple[Work, ExecOptions]] +"""The type of items yielded by a generator created by the `worker` function supplied by user.""" + + D = TypeVar("D") # Type var for task data R = TypeVar("R") # Type var for task result @@ -533,10 +538,111 @@ async def accept_payment_for_agreement(agreement_id: str, *, partial: bool = Fal storage_manager = await self._stack.enter_async_context(gftp.provider()) - async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> None: + def unpack_work_item(item: WorkItem) -> Tuple[Work, ExecOptions]: + """Extract `Work` object and options from a work item. + If the item does not specify options, default ones are provided. + """ + if isinstance(item, tuple): + return item[0], item[1] + else: + return item, ExecOptions() + + async def process_batches( + agreement: rest.market.Agreement, + act: rest.activity.Activity, + work_context: WorkContext, + consumer: Consumer[Task[D, R]], + ) -> None: def log(msg): logger.info("\033[33m%s\033[m", msg) + command_generator = worker( + work_context, + (Task.for_handle(handle, work_queue, emit) async for handle in consumer), + ) + + try: + item = await command_generator.__anext__() + except StopAsyncIteration: + return + + batch, exec_options = unpack_work_item(item) + + while True: + + batch_deadline = ( + datetime.now(timezone.utc) + batch.timeout if batch.timeout else None + ) + try: + current_worker_task = consumer.last_item + if current_worker_task: + emit( + events.TaskStarted( + agr_id=agreement.id, + task_id=current_worker_task.id, + task_data=current_worker_task.data, + ) + ) + task_id = current_worker_task.id if current_worker_task else None + await batch.prepare() + cc = CommandContainer() + batch.register(cc) + + log(f"Batch: {len(cc.commands())} commands") + + remote = await act.send(cc.commands(), stream_output, deadline=batch_deadline) + cmds = cc.commands() + emit(events.ScriptSent(agr_id=agreement.id, task_id=task_id, cmds=cmds)) + + async def get_batch_results() -> List[events.CommandEvent]: + log("[get_batch_results] Waiting for results...") + + results = [] + async for evt_ctx in remote: + evt = evt_ctx.event(agr_id=agreement.id, task_id=task_id, cmds=cmds) + emit(evt) + results.append(evt) + if isinstance(evt, events.CommandExecuted) and not evt.success: + raise CommandExecutionError(evt.command, evt.message) + + emit(events.GettingResults(agr_id=agreement.id, task_id=task_id)) + await batch.post() + emit(events.ScriptFinished(agr_id=agreement.id, task_id=task_id)) + await accept_payment_for_agreement(agreement.id, partial=True) + log("[get_batch_results] Got results") + return results + + loop = asyncio.get_event_loop() + + if exec_options.wait_for_results: + # Block until the results are available + log("Blocking until batch results are available...") + results = await get_batch_results() + log("Returning batch results") + future_results = loop.create_future() + future_results.set_result(results) + else: + # Schedule the coroutine in a separate asyncio task + log("Scheduling getting batch results...") + future_results = loop.create_task(get_batch_results()) + log("Returning results task") + + try: + item = await command_generator.asend(future_results) + except StopAsyncIteration: + break + + log("Awaitable sent to worker") + batch, exec_options = unpack_work_item(item) + + except Exception: + # Raise the exception in the command_generator (the `worker` coroutine). + # If the client code is able to handle it then we'll proceed with + # subsequent batches. Otherwise the worker finishes with error. + await command_generator.athrow(*sys.exc_info()) + + async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> None: + nonlocal last_wid wid = last_wid last_wid += 1 @@ -553,111 +659,28 @@ def log(msg): ) emit(events.WorkerFinished(agr_id=agreement.id)) raise + async with act: + emit(events.ActivityCreated(act_id=act.id, agr_id=agreement.id)) agreements_accepting_debit_notes.add(agreement.id) work_context = WorkContext( f"worker-{wid}", node_info, storage_manager, emitter=emit ) - with work_queue.new_consumer() as consumer: - - command_generator = worker( - work_context, - (Task.for_handle(handle, work_queue, emit) async for handle in consumer), - ) + with work_queue.new_consumer() as consumer: try: - batch: Optional[Work] = await command_generator.__anext__() - except StopAsyncIteration: - batch = None - - while batch: - batch_deadline = ( - datetime.now(timezone.utc) + batch.timeout if batch.timeout else None - ) - try: - current_worker_task = consumer.current_item - if current_worker_task: - emit( - events.TaskStarted( - agr_id=agreement.id, - task_id=current_worker_task.id, - task_data=current_worker_task.data, - ) - ) - task_id = current_worker_task.id if current_worker_task else None - await batch.prepare() - cc = CommandContainer() - batch.register(cc) - - log(f"Batch: {len(cc.commands())} commands") - - remote = await act.send( - cc.commands(), stream_output, deadline=batch_deadline + await process_batches(agreement, act, work_context, consumer) + emit(events.WorkerFinished(agr_id=agreement.id)) + except Exception: + emit( + events.WorkerFinished( + agr_id=agreement.id, exc_info=sys.exc_info() # type: ignore ) - cmds = cc.commands() - emit(events.ScriptSent(agr_id=agreement.id, task_id=task_id, cmds=cmds)) - - async def get_batch_results() -> List[events.CommandEvent]: - - log("[get_batch_results] Waiting for results...") - - results = [] - async for evt_ctx in remote: - evt = evt_ctx.event( - agr_id=agreement.id, task_id=task_id, cmds=cmds - ) - emit(evt) - results.append(evt) - if isinstance(evt, events.CommandExecuted) and not evt.success: - raise CommandExecutionError(evt.command, evt.message) - - emit(events.GettingResults(agr_id=agreement.id, task_id=task_id)) - assert batch - await batch.post() - emit(events.ScriptFinished(agr_id=agreement.id, task_id=task_id)) - await accept_payment_for_agreement(agreement.id, partial=True) - - log("[get_batch_results] Got results") - return results - - loop = asyncio.get_event_loop() - if batch.wait_for_results or batch.contains_init_step: - # Block until the results are available - log("Blocking until batch results are available...") - results = await get_batch_results() - log("Returning batch results") - future_results = loop.create_future() - future_results.set_result(results) - batch = await command_generator.asend(future_results) - else: - # Schedule the coroutine in a separate asyncio task - log("Scheduling getting batch results...") - results_task = loop.create_task(get_batch_results()) - log("Returning results task") - batch = await command_generator.asend(results_task) - - log("Awaitable sent to worker") - - except StopAsyncIteration: - break - - except Exception: - - try: - await command_generator.athrow(*sys.exc_info()) - except Exception: - if self._conf.traceback: - traceback.print_exc() - emit( - events.WorkerFinished( - agr_id=agreement.id, exc_info=sys.exc_info() # type: ignore - ) - ) - raise - - await accept_payment_for_agreement(agreement.id) - emit(events.WorkerFinished(agr_id=agreement.id)) + ) + raise + finally: + await accept_payment_for_agreement(agreement.id) async def worker_starter() -> None: while True: diff --git a/yapapi/executor/ctx.py b/yapapi/executor/ctx.py index 0813f1c72..0e146c8ed 100644 --- a/yapapi/executor/ctx.py +++ b/yapapi/executor/ctx.py @@ -50,10 +50,10 @@ def timeout(self) -> Optional[timedelta]: """Return the optional timeout set for execution of this work.""" return None - @property - def wait_for_results(self) -> bool: - """Return `True` iff yielding this work item should block until the results are returned.""" - return True + # @property + # def wait_for_results(self) -> bool: + # """Return `True` iff yielding this work item should block until the results are returned.""" + # return True @property def contains_init_step(self) -> bool: @@ -237,30 +237,23 @@ async def __on_json_download(on_download: Callable[[bytes], Awaitable], content: class Steps(Work): - def __init__( - self, *steps: Work, timeout: Optional[timedelta] = None, wait_for_results: bool = True - ): + def __init__(self, *steps: Work, timeout: Optional[timedelta] = None): """Create a `Work` item consisting of a sequence of steps (subitems). :param steps: sequence of steps to be executed :param timeout: timeout for waiting for the steps' results - :param wait_for_results: whether yielding this work item should block + # :param wait_for_results: whether yielding this work item should block until the results are available """ self._steps: Tuple[Work, ...] = steps self._timeout: Optional[timedelta] = timeout - self._wait_for_results = wait_for_results + # self._wait_for_results = wait_for_results @property def timeout(self) -> Optional[timedelta]: """Return the optional timeout set for execution of all steps.""" return self._timeout - @property - def wait_for_results(self) -> bool: - """Return `True` iff yielding these steps should block until the results are available.""" - return self._wait_for_results - @property def contains_init_step(self) -> bool: """Return `True` iff the steps include an initialization step.""" @@ -282,6 +275,12 @@ async def post(self): await step.post() +@dataclass +class ExecOptions: + wait_for_results: bool = True + batch_timeout: Optional[timedelta] = None + + class WorkContext: """An object used to schedule commands to be sent to provider.""" @@ -408,7 +407,7 @@ def download_json( _ReceiveJson(self._storage, src_path, on_download, limit, self._emitter) ) - def commit(self, timeout: Optional[timedelta] = None, wait_for_results: bool = True) -> Work: + def commit(self, timeout: Optional[timedelta] = None) -> Work: """Creates a sequence of commands to be sent to provider. :return: Work object containing the sequence of commands @@ -416,10 +415,7 @@ def commit(self, timeout: Optional[timedelta] = None, wait_for_results: bool = T """ steps = self._pending_steps self._pending_steps = [] - return Steps(*steps, timeout=timeout, wait_for_results=wait_for_results) - - def commit_async(self) -> Work: - return self.commit(wait_for_results=False) + return Steps(*steps, timeout=timeout) class CaptureMode(enum.Enum): From a1b9061f20b10404e658f8920dcd9329f2dbfb19 Mon Sep 17 00:00:00 2001 From: azawlocki Date: Thu, 13 May 2021 16:05:52 +0200 Subject: [PATCH 5/7] Accept a suggestion from code review Co-authored-by: shadeofblue --- yapapi/executor/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index f75da26d8..7a45a10df 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -543,7 +543,7 @@ def unpack_work_item(item: WorkItem) -> Tuple[Work, ExecOptions]: If the item does not specify options, default ones are provided. """ if isinstance(item, tuple): - return item[0], item[1] + return item else: return item, ExecOptions() From a7972ebea3289de983b0aefd432f5235e90eae4f Mon Sep 17 00:00:00 2001 From: azawlocki Date: Thu, 13 May 2021 17:12:05 +0200 Subject: [PATCH 6/7] Use batch timeout value set in exec options --- yapapi/executor/__init__.py | 40 +++++++++++++++++-------------------- yapapi/executor/ctx.py | 8 -------- 2 files changed, 18 insertions(+), 30 deletions(-) diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index 7a45a10df..d9082b8e8 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -550,16 +550,9 @@ def unpack_work_item(item: WorkItem) -> Tuple[Work, ExecOptions]: async def process_batches( agreement: rest.market.Agreement, act: rest.activity.Activity, - work_context: WorkContext, + command_generator: AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]], consumer: Consumer[Task[D, R]], ) -> None: - def log(msg): - logger.info("\033[33m%s\033[m", msg) - - command_generator = worker( - work_context, - (Task.for_handle(handle, work_queue, emit) async for handle in consumer), - ) try: item = await command_generator.__anext__() @@ -567,14 +560,24 @@ def log(msg): return batch, exec_options = unpack_work_item(item) + if batch.timeout: + if exec_options.batch_timeout: + logger.warning( + "Overriding batch timeout set with commit(batch_timeout)" + "by the value set in exec options" + ) + else: + exec_options.batch_timeout = batch.timeout while True: batch_deadline = ( - datetime.now(timezone.utc) + batch.timeout if batch.timeout else None + datetime.now(timezone.utc) + exec_options.batch_timeout + if exec_options.batch_timeout + else None ) try: - current_worker_task = consumer.last_item + current_worker_task = consumer.current_item if current_worker_task: emit( events.TaskStarted( @@ -587,16 +590,11 @@ def log(msg): await batch.prepare() cc = CommandContainer() batch.register(cc) - - log(f"Batch: {len(cc.commands())} commands") - remote = await act.send(cc.commands(), stream_output, deadline=batch_deadline) cmds = cc.commands() emit(events.ScriptSent(agr_id=agreement.id, task_id=task_id, cmds=cmds)) async def get_batch_results() -> List[events.CommandEvent]: - log("[get_batch_results] Waiting for results...") - results = [] async for evt_ctx in remote: evt = evt_ctx.event(agr_id=agreement.id, task_id=task_id, cmds=cmds) @@ -609,30 +607,24 @@ async def get_batch_results() -> List[events.CommandEvent]: await batch.post() emit(events.ScriptFinished(agr_id=agreement.id, task_id=task_id)) await accept_payment_for_agreement(agreement.id, partial=True) - log("[get_batch_results] Got results") return results loop = asyncio.get_event_loop() if exec_options.wait_for_results: # Block until the results are available - log("Blocking until batch results are available...") results = await get_batch_results() - log("Returning batch results") future_results = loop.create_future() future_results.set_result(results) else: # Schedule the coroutine in a separate asyncio task - log("Scheduling getting batch results...") future_results = loop.create_task(get_batch_results()) - log("Returning results task") try: item = await command_generator.asend(future_results) except StopAsyncIteration: break - log("Awaitable sent to worker") batch, exec_options = unpack_work_item(item) except Exception: @@ -670,7 +662,11 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> with work_queue.new_consumer() as consumer: try: - await process_batches(agreement, act, work_context, consumer) + tasks = ( + Task.for_handle(handle, work_queue, emit) async for handle in consumer + ) + batch_generator = worker(work_context, tasks) + await process_batches(agreement, act, batch_generator, consumer) emit(events.WorkerFinished(agr_id=agreement.id)) except Exception: emit( diff --git a/yapapi/executor/ctx.py b/yapapi/executor/ctx.py index 0e146c8ed..e06ed2c0c 100644 --- a/yapapi/executor/ctx.py +++ b/yapapi/executor/ctx.py @@ -50,11 +50,6 @@ def timeout(self) -> Optional[timedelta]: """Return the optional timeout set for execution of this work.""" return None - # @property - # def wait_for_results(self) -> bool: - # """Return `True` iff yielding this work item should block until the results are returned.""" - # return True - @property def contains_init_step(self) -> bool: """Return `True` iff this work item contains the initialization step.""" @@ -242,12 +237,9 @@ def __init__(self, *steps: Work, timeout: Optional[timedelta] = None): :param steps: sequence of steps to be executed :param timeout: timeout for waiting for the steps' results - # :param wait_for_results: whether yielding this work item should block - until the results are available """ self._steps: Tuple[Work, ...] = steps self._timeout: Optional[timedelta] = timeout - # self._wait_for_results = wait_for_results @property def timeout(self) -> Optional[timedelta]: From 2b1e0a8c89013be616a7cc5cea068279bce35745 Mon Sep 17 00:00:00 2001 From: azawlocki Date: Fri, 14 May 2021 16:48:55 +0200 Subject: [PATCH 7/7] Fix error handling in `process_batches()` --- examples/blender/blender-async-results.py | 173 ---------------------- yapapi/__init__.py | 9 +- yapapi/executor/__init__.py | 135 +++++++++-------- yapapi/executor/ctx.py | 12 +- 4 files changed, 77 insertions(+), 252 deletions(-) delete mode 100755 examples/blender/blender-async-results.py diff --git a/examples/blender/blender-async-results.py b/examples/blender/blender-async-results.py deleted file mode 100755 index f11baa2a8..000000000 --- a/examples/blender/blender-async-results.py +++ /dev/null @@ -1,173 +0,0 @@ -#!/usr/bin/env python3 -import asyncio -from datetime import datetime, timedelta -import pathlib -import sys - -from yapapi import ( - Executor, - NoPaymentAccountError, - Task, - __version__ as yapapi_version, - windows_event_loop_fix, -) -from yapapi.executor.ctx import ExecOptions, WorkContext -from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa -from yapapi.package import vm -from yapapi.rest.activity import BatchTimeoutError - -examples_dir = pathlib.Path(__file__).resolve().parent.parent -sys.path.append(str(examples_dir)) - -from utils import ( - build_parser, - TEXT_COLOR_CYAN, - TEXT_COLOR_DEFAULT, - TEXT_COLOR_RED, - TEXT_COLOR_YELLOW, -) - - -async def main(subnet_tag, driver=None, network=None): - package = await vm.repo( - image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae", - min_mem_gib=0.5, - min_storage_gib=2.0, - ) - - async def worker(ctx: WorkContext, tasks): - script_dir = pathlib.Path(__file__).resolve().parent - scene_path = str(script_dir / "cubes.blend") - ctx.send_file(scene_path, "/golem/resource/scene.blend") - async for task in tasks: - frame = task.data - crops = [{"outfilebasename": "out", "borders_x": [0.0, 1.0], "borders_y": [0.0, 1.0]}] - ctx.send_json( - "/golem/work/params.json", - { - "scene_file": "/golem/resource/scene.blend", - "resolution": (400, 300), - "use_compositing": False, - "crops": crops, - "samples": 100, - "frames": [frame], - "output_format": "PNG", - "RESOURCES_DIR": "/golem/resources", - "WORK_DIR": "/golem/work", - "OUTPUT_DIR": "/golem/output", - }, - ) - ctx.run("/golem/entrypoints/run-blender.sh") - output_file = f"output_{frame}.png" - ctx.download_file(f"/golem/output/out{frame:04d}.png", output_file) - - batch = ctx.commit() - future_results = yield (batch, ExecOptions(wait_for_results=False)) - - print("Maybe doing some work before looking at the results...") - await asyncio.sleep(1) - results = await future_results - print(f"Got results for {len(results)} commands") - task.accept_result(result=output_file) - - # Iterator over the frame indices that we want to render - frames: range = range(0, 60, 10) - # Worst-case overhead, in minutes, for initialization (negotiation, file transfer etc.) - # TODO: make this dynamic, e.g. depending on the size of files to transfer - init_overhead = 3 - # Providers will not accept work if the timeout is outside of the [5 min, 30min] range. - # We increase the lower bound to 6 min to account for the time needed for our demand to - # reach the providers. - min_timeout, max_timeout = 6, 30 - - timeout = timedelta(minutes=max(min(init_overhead + len(frames) * 2, max_timeout), min_timeout)) - - # By passing `event_consumer=log_summary()` we enable summary logging. - # See the documentation of the `yapapi.log` module on how to set - # the level of detail and format of the logged information. - async with Executor( - package=package, - max_workers=3, - budget=10.0, - timeout=timeout, - subnet_tag=subnet_tag, - driver=driver, - network=network, - event_consumer=log_summary(log_event_repr), - ) as executor: - - print( - f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n" - f"Using subnet: {TEXT_COLOR_YELLOW}{subnet_tag}{TEXT_COLOR_DEFAULT}, " - f"payment driver: {TEXT_COLOR_YELLOW}{executor.driver}{TEXT_COLOR_DEFAULT}, " - f"and network: {TEXT_COLOR_YELLOW}{executor.network}{TEXT_COLOR_DEFAULT}\n" - ) - - num_tasks = 0 - start_time = datetime.now() - - async for task in executor.submit(worker, [Task(data=frame) for frame in frames]): - num_tasks += 1 - print( - f"{TEXT_COLOR_CYAN}" - f"Task computed: {task}, result: {task.result}, time: {task.running_time}" - f"{TEXT_COLOR_DEFAULT}" - ) - - print( - f"{TEXT_COLOR_CYAN}" - f"{num_tasks} tasks computed, total time: {datetime.now() - start_time}" - f"{TEXT_COLOR_DEFAULT}" - ) - - -if __name__ == "__main__": - parser = build_parser("Render a Blender scene") - now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S") - parser.set_defaults(log_file=f"blender-yapapi-{now}.log") - args = parser.parse_args() - - # This is only required when running on Windows with Python prior to 3.8: - windows_event_loop_fix() - - enable_default_logger( - log_file=args.log_file, - debug_activity_api=True, - debug_market_api=True, - debug_payment_api=True, - ) - - loop = asyncio.get_event_loop() - task = loop.create_task( - main(subnet_tag=args.subnet_tag, driver=args.driver, network=args.network) - ) - - try: - loop.run_until_complete(task) - except NoPaymentAccountError as e: - handbook_url = ( - "https://handbook.golem.network/requestor-tutorials/" - "flash-tutorial-of-requestor-development" - ) - print( - f"{TEXT_COLOR_RED}" - f"No payment account initialized for driver `{e.required_driver}` " - f"and network `{e.required_network}`.\n\n" - f"See {handbook_url} on how to initialize payment accounts for a requestor node." - f"{TEXT_COLOR_DEFAULT}" - ) - except KeyboardInterrupt: - print( - f"{TEXT_COLOR_YELLOW}" - "Shutting down gracefully, please wait a short while " - "or press Ctrl+C to exit immediately..." - f"{TEXT_COLOR_DEFAULT}" - ) - task.cancel() - try: - loop.run_until_complete(task) - print( - f"{TEXT_COLOR_YELLOW}Shutdown completed, thank you for waiting!{TEXT_COLOR_DEFAULT}" - ) - except (asyncio.CancelledError, KeyboardInterrupt): - pass diff --git a/yapapi/__init__.py b/yapapi/__init__.py index c7e46d197..cdc3e4d35 100644 --- a/yapapi/__init__.py +++ b/yapapi/__init__.py @@ -6,7 +6,14 @@ from pathlib import Path from pkg_resources import get_distribution -from .executor import Executor, NoPaymentAccountError, Task, WorkContext, CaptureContext +from .executor import ( + CaptureContext, + ExecOptions, + Executor, + NoPaymentAccountError, + Task, + WorkContext, +) def get_version() -> str: diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index d9082b8e8..fd42f2fe0 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -548,90 +548,86 @@ def unpack_work_item(item: WorkItem) -> Tuple[Work, ExecOptions]: return item, ExecOptions() async def process_batches( - agreement: rest.market.Agreement, - act: rest.activity.Activity, + agreement_id: str, + activity: rest.activity.Activity, command_generator: AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]], consumer: Consumer[Task[D, R]], ) -> None: + """Send command batches produced by `command_generator` to `activity`.""" - try: - item = await command_generator.__anext__() - except StopAsyncIteration: - return - - batch, exec_options = unpack_work_item(item) - if batch.timeout: - if exec_options.batch_timeout: - logger.warning( - "Overriding batch timeout set with commit(batch_timeout)" - "by the value set in exec options" - ) - else: - exec_options.batch_timeout = batch.timeout + item = await command_generator.__anext__() while True: + batch, exec_options = unpack_work_item(item) + if batch.timeout: + if exec_options.batch_timeout: + logger.warning( + "Overriding batch timeout set with commit(batch_timeout)" + "by the value set in exec options" + ) + else: + exec_options.batch_timeout = batch.timeout + batch_deadline = ( datetime.now(timezone.utc) + exec_options.batch_timeout if exec_options.batch_timeout else None ) - try: - current_worker_task = consumer.current_item - if current_worker_task: - emit( - events.TaskStarted( - agr_id=agreement.id, - task_id=current_worker_task.id, - task_data=current_worker_task.data, - ) + + current_worker_task = consumer.current_item + if current_worker_task: + emit( + events.TaskStarted( + agr_id=agreement_id, + task_id=current_worker_task.id, + task_data=current_worker_task.data, ) - task_id = current_worker_task.id if current_worker_task else None - await batch.prepare() - cc = CommandContainer() - batch.register(cc) - remote = await act.send(cc.commands(), stream_output, deadline=batch_deadline) - cmds = cc.commands() - emit(events.ScriptSent(agr_id=agreement.id, task_id=task_id, cmds=cmds)) - - async def get_batch_results() -> List[events.CommandEvent]: - results = [] - async for evt_ctx in remote: - evt = evt_ctx.event(agr_id=agreement.id, task_id=task_id, cmds=cmds) - emit(evt) - results.append(evt) - if isinstance(evt, events.CommandExecuted) and not evt.success: - raise CommandExecutionError(evt.command, evt.message) - - emit(events.GettingResults(agr_id=agreement.id, task_id=task_id)) - await batch.post() - emit(events.ScriptFinished(agr_id=agreement.id, task_id=task_id)) - await accept_payment_for_agreement(agreement.id, partial=True) - return results - - loop = asyncio.get_event_loop() - - if exec_options.wait_for_results: - # Block until the results are available - results = await get_batch_results() + ) + task_id = current_worker_task.id if current_worker_task else None + + await batch.prepare() + cc = CommandContainer() + batch.register(cc) + remote = await activity.send(cc.commands(), stream_output, deadline=batch_deadline) + cmds = cc.commands() + emit(events.ScriptSent(agr_id=agreement_id, task_id=task_id, cmds=cmds)) + + async def get_batch_results() -> List[events.CommandEvent]: + results = [] + async for evt_ctx in remote: + evt = evt_ctx.event(agr_id=agreement_id, task_id=task_id, cmds=cmds) + emit(evt) + results.append(evt) + if isinstance(evt, events.CommandExecuted) and not evt.success: + raise CommandExecutionError(evt.command, evt.message) + + emit(events.GettingResults(agr_id=agreement_id, task_id=task_id)) + await batch.post() + emit(events.ScriptFinished(agr_id=agreement_id, task_id=task_id)) + await accept_payment_for_agreement(agreement_id, partial=True) + return results + + loop = asyncio.get_event_loop() + + if exec_options.wait_for_results: + # Block until the results are available + try: future_results = loop.create_future() + results = await get_batch_results() future_results.set_result(results) - else: - # Schedule the coroutine in a separate asyncio task - future_results = loop.create_task(get_batch_results()) - - try: item = await command_generator.asend(future_results) except StopAsyncIteration: - break - - batch, exec_options = unpack_work_item(item) - - except Exception: - # Raise the exception in the command_generator (the `worker` coroutine). - # If the client code is able to handle it then we'll proceed with - # subsequent batches. Otherwise the worker finishes with error. - await command_generator.athrow(*sys.exc_info()) + raise + except Exception: + # Raise the exception in `command_generator` (the `worker` coroutine). + # If the client code is able to handle it then we'll proceed with + # subsequent batches. Otherwise the worker finishes with error. + item = await command_generator.athrow(*sys.exc_info()) + else: + # Schedule the coroutine in a separate asyncio task + future_results = loop.create_task(get_batch_results()) + item = await command_generator.asend(future_results) async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> None: @@ -666,7 +662,10 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> Task.for_handle(handle, work_queue, emit) async for handle in consumer ) batch_generator = worker(work_context, tasks) - await process_batches(agreement, act, batch_generator, consumer) + try: + await process_batches(agreement.id, act, batch_generator, consumer) + except StopAsyncIteration: + pass emit(events.WorkerFinished(agr_id=agreement.id)) except Exception: emit( diff --git a/yapapi/executor/ctx.py b/yapapi/executor/ctx.py index e06ed2c0c..0e225ff81 100644 --- a/yapapi/executor/ctx.py +++ b/yapapi/executor/ctx.py @@ -50,11 +50,6 @@ def timeout(self) -> Optional[timedelta]: """Return the optional timeout set for execution of this work.""" return None - @property - def contains_init_step(self) -> bool: - """Return `True` iff this work item contains the initialization step.""" - return False - class _InitStep(Work): def register(self, commands: CommandContainer): @@ -246,11 +241,6 @@ def timeout(self) -> Optional[timedelta]: """Return the optional timeout set for execution of all steps.""" return self._timeout - @property - def contains_init_step(self) -> bool: - """Return `True` iff the steps include an initialization step.""" - return any(isinstance(step, _InitStep) for step in self._steps) - async def prepare(self): """Execute the `prepare` hook for all the defined steps.""" for step in self._steps: @@ -269,6 +259,8 @@ async def post(self): @dataclass class ExecOptions: + """Options related to command batch execution.""" + wait_for_results: bool = True batch_timeout: Optional[timedelta] = None