-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Return batch results (as awaitable) to the worker function #366
Conversation
@@ -378,15 +408,18 @@ def download_json( | |||
_ReceiveJson(self._storage, src_path, on_download, limit, self._emitter) | |||
) | |||
|
|||
def commit(self, timeout: Optional[timedelta] = None) -> Work: | |||
def commit(self, timeout: Optional[timedelta] = None, wait_for_results: bool = True) -> Work: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd make this an internal interface and expose only the commit_async
to users
def commit(self, timeout: Optional[timedelta] = None, wait_for_results: bool = True) -> Work: | |
def commit(self, timeout: Optional[timedelta] = None, __wait_for_results: bool = True) -> Work: |
output_file = f"output_{frame}.png" | ||
ctx.download_file(f"/golem/output/out{frame:04d}.png", output_file) | ||
|
||
future_results = yield ctx.commit(wait_for_results=False) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as per TOOWTDI, I'd go with your "equivalent to" suggestion:
future_results = yield ctx.commit(wait_for_results=False) | |
future_results = yield ctx.commit_async() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as agreed with @azawlocki let's change it to a yielded tuple (the batch + optional parameters)
print(colors.yellow("[get_batch_results] Got results")) | ||
return results | ||
|
||
if batch.wait_for_results or batch.contains_init_step: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we assuming that existence of the init step should make the call blocking?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just to prevent the user from sending another exe script before deploy
and start
commands are executed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm.... I guess we might want to change the interface re: the init step anyway but for now, maybe we should add assert not batch.contains_init_step
in the else
clause here?
yapapi/executor/__init__.py
Outdated
# Block until the results are available | ||
print( | ||
colors.yellow( | ||
"[executor] Blocking until results are available..." | ||
) | ||
) | ||
results = await get_batch_results() | ||
print(colors.yellow("[executor] Returning results")) | ||
|
||
async def make_awaitable(): | ||
return results | ||
|
||
batch = await command_generator.asend(make_awaitable()) | ||
else: | ||
# Schedule the coroutine in a separate asyncio task | ||
print(colors.yellow("[executor] Scheduling getting the results...")) | ||
loop = asyncio.get_event_loop() | ||
results_task = loop.create_task(get_batch_results()) | ||
print(colors.yellow("[executor] Returning results task")) | ||
batch = await command_generator.asend(results_task) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe you don't have to make_awaitable
... since get_batch_results()
itself is an awaitable, isn't it?
# Block until the results are available | |
print( | |
colors.yellow( | |
"[executor] Blocking until results are available..." | |
) | |
) | |
results = await get_batch_results() | |
print(colors.yellow("[executor] Returning results")) | |
async def make_awaitable(): | |
return results | |
batch = await command_generator.asend(make_awaitable()) | |
else: | |
# Schedule the coroutine in a separate asyncio task | |
print(colors.yellow("[executor] Scheduling getting the results...")) | |
loop = asyncio.get_event_loop() | |
results_task = loop.create_task(get_batch_results()) | |
print(colors.yellow("[executor] Returning results task")) | |
batch = await command_generator.asend(results_task) | |
# Block until the results are available | |
print( | |
colors.yellow( | |
"[executor] Blocking until results are available..." | |
) | |
) | |
print(colors.yellow("[executor] Returning results")) | |
results = get_batch_results() | |
else: | |
# Schedule the coroutine in a separate asyncio task | |
print(colors.yellow("[executor] Scheduling getting the results...")) | |
loop = asyncio.get_event_loop() | |
results = loop.create_task(get_batch_results()) | |
print(colors.yellow("[executor] Returning results task")) | |
batch = await command_generator.asend(results) |
though, I still don't get how the first one is blocking since it returns an awaitable anyway ... will the .commit()
indeed block in this case? ... I generally still don't grok this async magic fully :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, now I get it -> it does await the results but then re-wraps them in an awaitable... but I still think it won't be compatible with the current code -> iow, I guess there will be an exception saying that an awaitable was never awaited in case of the blocking call.... (won't there?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the blocking case we do results = await get_batch_results()
to block the execution until the results are ready. When this statement is executed we have the results ready in results
and we could simply send them as is, doing:
command_generator.asend(results)
But that would mean that the things that are returned by yield
are sometimes immediate results (for non-blocking ctx.commit()
) and sometimes Awaitable
(for blocking ctx.commit()
):
results_or_future = yield ctx.commit(wait_for_results=dunno)
I thought that this could be confusing. We would also have to modify the type of worker
so that it's a generator to which one can send both immediate values and awaitables (the type of things that can be asend()
ed to an async generator is a part of the generator's type).
So in order for future_results = yield ctx.commit(...)
to always return an Awaitable
I'm wrapping results
in make_awaitable()
, which does not really block anything, since it just returns results
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, that's why I think trying to cover it with one method is not the best idea and it would be better to have two separate commit
s:
- one (the current
def commit()
) would be always blocking and would not return any results - the other (
async def commit_and_get_results(blocking: bool = True)
) would be either blocking or non-blocking but would always return an awaitable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it's not commit()
that returns an awaitable, it's yield
! ctx.commit()
always returns a batch of commands (an instance Work
). Anyway, returning awaitable in the first case, that is, when we're not interested in the results, is not a problem, in 6986d7b I'm wrapping the results in a future instead of a coroutine thus avoiding a warning about a coroutine that's never awaited.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Therefore I think that just a single commit()
method, with blocking
flag defaulting to True
should be enough.
return Steps(*steps, timeout=timeout, wait_for_results=wait_for_results) | ||
|
||
def commit_async(self) -> Work: | ||
return self.commit(wait_for_results=False) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return self.commit(wait_for_results=False) | |
return self.commit(__wait_for_results=False) |
"""Creates a sequence of commands to be sent to provider. | ||
|
||
:return: Work object containing the sequence of commands | ||
scheduled within this work context before calling this method) | ||
""" | ||
steps = self._pending_steps | ||
self._pending_steps = [] | ||
return Steps(*steps, timeout=timeout) | ||
return Steps(*steps, timeout=timeout, wait_for_results=wait_for_results) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return Steps(*steps, timeout=timeout, wait_for_results=wait_for_results) | |
return Steps(*steps, timeout=timeout, wait_for_results=__wait_for_results) |
print(colors.yellow("[get_batch_results] Got results")) | ||
return results | ||
|
||
if batch.wait_for_results or batch.contains_init_step: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
iow:
if batch.wait_for_results or batch.contains_init_step: | |
if batch.wait_for_results: |
return results | ||
|
||
batch = await command_generator.asend(make_awaitable()) | ||
else: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
plus:
else: | |
else: | |
assert not batch.contains_init_step, "init step must be blocking" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, this looks like we're trying to assert
some properties of data that we do not control, namely the batch
argument. I think raising an exception would be more appropriate if this condition is not satisfied.
Another approach I think would be to leave the code as is (i.e. override the blocking
flag if the batch contains initialization commands) and document the blocking
flag saying something like:
`future_results = yield ctx.commit(blocking=False)` returns without waiting
for batch results _whenever possible_, if the batch contains `Deploy` or `Start`
commands, `yield` will block until they are executed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, from the discussion with @azawlocki :
- it would be the user's responsibility to know whether
deploy
andstart
should be blocking and if they choose to run them in a non-blocking way, it's on them to cope with the exception (or lack of it -> because some other exeunit may not require it) - plus, we'd add it to the documentation of the VM payload that vm exeunit requires the deploy/start to finish before one can issue any other commands
Superseded by #367 |
Snippet from a modified
blender
example (https://github.com/golemfactory/yapapi/pull/366/files#diff-8181a3cb89d4ebc32f70360978ba0a86112b9249a500a7528854aaf071536563):