Skip to content

Commit

Permalink
Use batch timeout value set in exec options
Browse files Browse the repository at this point in the history
  • Loading branch information
azawlocki committed May 14, 2021
1 parent a1b9061 commit a7972eb
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 30 deletions.
40 changes: 18 additions & 22 deletions yapapi/executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,31 +550,34 @@ def unpack_work_item(item: WorkItem) -> Tuple[Work, ExecOptions]:
async def process_batches(
agreement: rest.market.Agreement,
act: rest.activity.Activity,
work_context: WorkContext,
command_generator: AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]],
consumer: Consumer[Task[D, R]],
) -> None:
def log(msg):
logger.info("\033[33m%s\033[m", msg)

command_generator = worker(
work_context,
(Task.for_handle(handle, work_queue, emit) async for handle in consumer),
)

try:
item = await command_generator.__anext__()
except StopAsyncIteration:
return

batch, exec_options = unpack_work_item(item)
if batch.timeout:
if exec_options.batch_timeout:
logger.warning(
"Overriding batch timeout set with commit(batch_timeout)"
"by the value set in exec options"
)
else:
exec_options.batch_timeout = batch.timeout

while True:

batch_deadline = (
datetime.now(timezone.utc) + batch.timeout if batch.timeout else None
datetime.now(timezone.utc) + exec_options.batch_timeout
if exec_options.batch_timeout
else None
)
try:
current_worker_task = consumer.last_item
current_worker_task = consumer.current_item
if current_worker_task:
emit(
events.TaskStarted(
Expand All @@ -587,16 +590,11 @@ def log(msg):
await batch.prepare()
cc = CommandContainer()
batch.register(cc)

log(f"Batch: {len(cc.commands())} commands")

remote = await act.send(cc.commands(), stream_output, deadline=batch_deadline)
cmds = cc.commands()
emit(events.ScriptSent(agr_id=agreement.id, task_id=task_id, cmds=cmds))

async def get_batch_results() -> List[events.CommandEvent]:
log("[get_batch_results] Waiting for results...")

results = []
async for evt_ctx in remote:
evt = evt_ctx.event(agr_id=agreement.id, task_id=task_id, cmds=cmds)
Expand All @@ -609,30 +607,24 @@ async def get_batch_results() -> List[events.CommandEvent]:
await batch.post()
emit(events.ScriptFinished(agr_id=agreement.id, task_id=task_id))
await accept_payment_for_agreement(agreement.id, partial=True)
log("[get_batch_results] Got results")
return results

loop = asyncio.get_event_loop()

if exec_options.wait_for_results:
# Block until the results are available
log("Blocking until batch results are available...")
results = await get_batch_results()
log("Returning batch results")
future_results = loop.create_future()
future_results.set_result(results)
else:
# Schedule the coroutine in a separate asyncio task
log("Scheduling getting batch results...")
future_results = loop.create_task(get_batch_results())
log("Returning results task")

try:
item = await command_generator.asend(future_results)
except StopAsyncIteration:
break

log("Awaitable sent to worker")
batch, exec_options = unpack_work_item(item)

except Exception:
Expand Down Expand Up @@ -670,7 +662,11 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) ->

with work_queue.new_consumer() as consumer:
try:
await process_batches(agreement, act, work_context, consumer)
tasks = (
Task.for_handle(handle, work_queue, emit) async for handle in consumer
)
batch_generator = worker(work_context, tasks)
await process_batches(agreement, act, batch_generator, consumer)
emit(events.WorkerFinished(agr_id=agreement.id))
except Exception:
emit(
Expand Down
8 changes: 0 additions & 8 deletions yapapi/executor/ctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@ def timeout(self) -> Optional[timedelta]:
"""Return the optional timeout set for execution of this work."""
return None

# @property
# def wait_for_results(self) -> bool:
# """Return `True` iff yielding this work item should block until the results are returned."""
# return True

@property
def contains_init_step(self) -> bool:
"""Return `True` iff this work item contains the initialization step."""
Expand Down Expand Up @@ -242,12 +237,9 @@ def __init__(self, *steps: Work, timeout: Optional[timedelta] = None):
:param steps: sequence of steps to be executed
:param timeout: timeout for waiting for the steps' results
# :param wait_for_results: whether yielding this work item should block
until the results are available
"""
self._steps: Tuple[Work, ...] = steps
self._timeout: Optional[timedelta] = timeout
# self._wait_for_results = wait_for_results

@property
def timeout(self) -> Optional[timedelta]:
Expand Down

0 comments on commit a7972eb

Please sign in to comment.