-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Yield completed tasks from Engine.map()
#44
Conversation
yapapi/runner/__init__.py
Outdated
@@ -329,7 +324,7 @@ async def find_offers(): | |||
await proposal.respond(builder.props, builder.cons) | |||
emit(Event.ProposalResponded(prop_id=proposal.id)) | |||
except Exception as ex: | |||
emit(Event.ProposalFailed(sub_id=subscription.id, reason=str(ex))) | |||
emit(Event.ProposalFailed(prop_id=proposal.id, reason=str(ex))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This error has been reported by mypy
but only after I added a return type annotation to def find_offers()
.
yapapi/runner/__init__.py
Outdated
@@ -409,8 +404,8 @@ async def worker_starter(): | |||
task = None | |||
try: | |||
agreement = await b.proposal.agreement() | |||
provider_id = (await agreement.details()).view_prov(Identification) | |||
emit(Event.AgreementCreated(agr_id=agreement.id, provider_id=provider_id)) | |||
prov_idn = (await agreement.details()).view_prov(Identification) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
provider_id = ...
shadowed another declaration with a different type
): | ||
if datetime.now(timezone.utc) > self._expires: | ||
raise TimeoutError(f"task timeout exceeded. timeout={self._conf.timeout}") | ||
|
||
if not get_done_task: | ||
get_done_task = loop.create_task(done_queue.get()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This (asyncio) task just removes a single completed (yapapi) task from done_queue.
@@ -508,113 +516,11 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): | |||
await self._stack.aclose() | |||
|
|||
|
|||
class TaskStatus(Enum): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code has been moved to yapapi/runner/task.py
e9a2310
to
d005c4d
Compare
57d6ef3
to
1d477a9
Compare
8b6de25
to
89226f5
Compare
89226f5
to
fc2ed6e
Compare
Resolves #26
Engine.map()
now yields completed tasks. Its return type is nowAsyncIterator[Task[D, R]]
, whereD
andR
are type variables representing task data and results, respectively.The
Task
class is in a separate moduleyapapi.runner.task
.