From a7972ebea3289de983b0aefd432f5235e90eae4f Mon Sep 17 00:00:00 2001 From: azawlocki Date: Thu, 13 May 2021 17:12:05 +0200 Subject: [PATCH] Use batch timeout value set in exec options --- yapapi/executor/__init__.py | 40 +++++++++++++++++-------------------- yapapi/executor/ctx.py | 8 -------- 2 files changed, 18 insertions(+), 30 deletions(-) diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index 7a45a10df..d9082b8e8 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -550,16 +550,9 @@ def unpack_work_item(item: WorkItem) -> Tuple[Work, ExecOptions]: async def process_batches( agreement: rest.market.Agreement, act: rest.activity.Activity, - work_context: WorkContext, + command_generator: AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]], consumer: Consumer[Task[D, R]], ) -> None: - def log(msg): - logger.info("\033[33m%s\033[m", msg) - - command_generator = worker( - work_context, - (Task.for_handle(handle, work_queue, emit) async for handle in consumer), - ) try: item = await command_generator.__anext__() @@ -567,14 +560,24 @@ def log(msg): return batch, exec_options = unpack_work_item(item) + if batch.timeout: + if exec_options.batch_timeout: + logger.warning( + "Overriding batch timeout set with commit(batch_timeout)" + "by the value set in exec options" + ) + else: + exec_options.batch_timeout = batch.timeout while True: batch_deadline = ( - datetime.now(timezone.utc) + batch.timeout if batch.timeout else None + datetime.now(timezone.utc) + exec_options.batch_timeout + if exec_options.batch_timeout + else None ) try: - current_worker_task = consumer.last_item + current_worker_task = consumer.current_item if current_worker_task: emit( events.TaskStarted( @@ -587,16 +590,11 @@ def log(msg): await batch.prepare() cc = CommandContainer() batch.register(cc) - - log(f"Batch: {len(cc.commands())} commands") - remote = await act.send(cc.commands(), stream_output, deadline=batch_deadline) cmds = cc.commands() emit(events.ScriptSent(agr_id=agreement.id, task_id=task_id, cmds=cmds)) async def get_batch_results() -> List[events.CommandEvent]: - log("[get_batch_results] Waiting for results...") - results = [] async for evt_ctx in remote: evt = evt_ctx.event(agr_id=agreement.id, task_id=task_id, cmds=cmds) @@ -609,30 +607,24 @@ async def get_batch_results() -> List[events.CommandEvent]: await batch.post() emit(events.ScriptFinished(agr_id=agreement.id, task_id=task_id)) await accept_payment_for_agreement(agreement.id, partial=True) - log("[get_batch_results] Got results") return results loop = asyncio.get_event_loop() if exec_options.wait_for_results: # Block until the results are available - log("Blocking until batch results are available...") results = await get_batch_results() - log("Returning batch results") future_results = loop.create_future() future_results.set_result(results) else: # Schedule the coroutine in a separate asyncio task - log("Scheduling getting batch results...") future_results = loop.create_task(get_batch_results()) - log("Returning results task") try: item = await command_generator.asend(future_results) except StopAsyncIteration: break - log("Awaitable sent to worker") batch, exec_options = unpack_work_item(item) except Exception: @@ -670,7 +662,11 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> with work_queue.new_consumer() as consumer: try: - await process_batches(agreement, act, work_context, consumer) + tasks = ( + Task.for_handle(handle, work_queue, emit) async for handle in consumer + ) + batch_generator = worker(work_context, tasks) + await process_batches(agreement, act, batch_generator, consumer) emit(events.WorkerFinished(agr_id=agreement.id)) except Exception: emit( diff --git a/yapapi/executor/ctx.py b/yapapi/executor/ctx.py index 0e146c8ed..e06ed2c0c 100644 --- a/yapapi/executor/ctx.py +++ b/yapapi/executor/ctx.py @@ -50,11 +50,6 @@ def timeout(self) -> Optional[timedelta]: """Return the optional timeout set for execution of this work.""" return None - # @property - # def wait_for_results(self) -> bool: - # """Return `True` iff yielding this work item should block until the results are returned.""" - # return True - @property def contains_init_step(self) -> bool: """Return `True` iff this work item contains the initialization step.""" @@ -242,12 +237,9 @@ def __init__(self, *steps: Work, timeout: Optional[timedelta] = None): :param steps: sequence of steps to be executed :param timeout: timeout for waiting for the steps' results - # :param wait_for_results: whether yielding this work item should block - until the results are available """ self._steps: Tuple[Work, ...] = steps self._timeout: Optional[timedelta] = timeout - # self._wait_for_results = wait_for_results @property def timeout(self) -> Optional[timedelta]: