StreamManager: Add mechanism to close the request iterator #6263
Codecov Report
Patch coverage:
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #6263      +/-   ##
==========================================
- Coverage   97.88%   97.88%   -0.01%
==========================================
  Files        1104     1104
  Lines       95760    95819      +59
==========================================
+ Hits        93735    93790      +55
- Misses       2025     2029       +4
        self._executor.submit(self._init_request_queue).result()

    async def _init_request_queue(self) -> None:
        await asyncio.sleep(0)
Why this sleep? It's not needed if the only point is to make the function async, since async def is enough for that and you don't actually need an await (note this is different from generator functions, which do need at least one yield to be recognized as a generator).
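The distinction above can be checked with the standard inspect module. This is a minimal sketch; the function names are hypothetical and are not taken from the PR.

```python
import asyncio
import inspect


async def no_await() -> int:
    # No await inside, yet `async def` alone makes this a coroutine function.
    return 1


def plain_function():
    # Without a yield, this is an ordinary function, not a generator function.
    return 1


def generator_function():
    # A single yield is what turns a plain def into a generator function.
    yield 1


is_coroutine_fn = inspect.iscoroutinefunction(no_await)
plain_is_generator = inspect.isgeneratorfunction(plain_function)
gen_is_generator = inspect.isgeneratorfunction(generator_function)
result = asyncio.run(no_await())
```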
"It's not needed if the only point is to make the function async"
Did not realize that, thanks!
@@ -121,6 +121,11 @@ def __init__(self, grpc_client: quantum.QuantumEngineServiceAsyncClient):
        # interface.
        self._response_demux = ResponseDemux()
        self._next_available_message_id = 0
        self._executor.submit(self._init_request_queue).result()
It looks like self._request_queue is only ever accessed on the duet side (in StreamManager.submit). Is this to ensure that it gets bound to the correct event loop when constructed? If so, I would suggest just submitting a job to construct the queue but then assigning it here:

    # Construct queue in executor to ensure it binds to the correct event loop
    self._request_queue = self._executor.submit(self._make_request_queue).result()

    async def _make_request_queue(self) -> asyncio.Queue[QuantumRunStreamRequest]:
        return asyncio.Queue()

But would be good to check that this is needed. In the current implementation asyncio.Queue doesn't bind its event loop until either get or put are called, so it would be fine to construct it on the duet side and pass it to the executor to use, but this may have changed in Python 3.10 when the loop arg was removed.
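A self-contained sketch of that pattern, assuming a plain background thread running an asyncio event loop as a stand-in for the executor (the helper names and the use of asyncio.run_coroutine_threadsafe are illustrative assumptions, not the PR's code):

```python
import asyncio
import threading

# Hypothetical stand-in for the executor: an event loop running in its own thread.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()


async def make_request_queue() -> asyncio.Queue:
    # Runs on the loop thread, so the queue is created where it will be used.
    return asyncio.Queue()


async def put_and_get(queue: asyncio.Queue, item: str) -> str:
    # Exercise the queue on the same loop that constructed it.
    await queue.put(item)
    return await queue.get()


# Submit from the calling thread and block until the loop thread returns the queue.
request_queue = asyncio.run_coroutine_threadsafe(make_request_queue(), loop).result()
echoed = asyncio.run_coroutine_threadsafe(
    put_and_get(request_queue, "request"), loop
).result()

# Shut the background loop down cleanly.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```

Constructing the queue inside a coroutine keeps the construction and all later get and put calls on one loop, regardless of when a given Python version binds the queue to its loop.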
Nice, thanks for the suggestion, updated.
It does indeed fail if the queue is constructed in the duet thread.
 async def _request_iterator(
     request_queue: asyncio.Queue,
 ) -> AsyncIterator[quantum.QuantumRunStreamRequest]:
     """The request iterator for Quantum Engine client RPC quantum_run_stream().

     Every call to this method generates a new iterator.
     """
-    while True:
-        yield await request_queue.get()
+    while (request := await request_queue.get()) != StreamManager._STOP_SIGNAL:
I'd suggest using None as the stop signal instead, then can simplify this a bit:
-    while (request := await request_queue.get()) != StreamManager._STOP_SIGNAL:
+    while request := await request_queue.get():
Would also be good to explicitly type the queue:
    self._request_queue: asyncio.Queue[Optional[QuantumRunStreamRequest]] = ...
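A minimal runnable sketch of a None-terminated request iterator, using strings as hypothetical stand-ins for QuantumRunStreamRequest. As a variation on the suggestion, it compares against None explicitly, since a bare truthiness test would also stop on any falsy request:

```python
import asyncio
from typing import AsyncIterator, List, Optional


async def request_iterator(
    request_queue: "asyncio.Queue[Optional[str]]",
) -> AsyncIterator[str]:
    # Yield requests until the None stop signal is dequeued.
    while (request := await request_queue.get()) is not None:
        yield request


async def main() -> List[str]:
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
    for item in ("first", "second", None):
        queue.put_nowait(item)
    # The iterator ends on None, so the comprehension terminates by itself.
    return [request async for request in request_iterator(queue)]


received = asyncio.run(main())
```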
Updated. I kept the _STOP_SIGNAL constant (but set it to None) because it makes the call to request_queue.put() more explicit about what the signal means.
7734b59 to 7a7a7a9
This PR builds on top of #6253 . Please review the last two commits.
This fix stems from an issue where requests were not sent after a stream broke and reopened. The root cause is that the request iterator from the previous stream keeps running: although the Quantum Engine client no longer actively pulls from it, the iterator still dequeues from the request queue behind the scenes.
This PR adds a dedicated stop signal to be sent to the request queue to signal that the iterator should stop. In addition to the issue above, this also addresses the TODO that the request iterator should be closed upon stream closing in order to send a half close to the server.
To make this fix work, I also made the request queue local to the execution and stream coroutines. Otherwise, once a user stops the manager, the queue is cleared in the duet thread while the stream coroutine tries to send a stop signal to the queue in the asyncio thread, leading to a race condition. This addresses another TODO.
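The failure mode and the stop-signal fix can be illustrated with a small asyncio-only sketch. Everything here is a simplified assumption for illustration: forward_requests stands in for the request iterator, strings stand in for requests, and no gRPC stream or duet thread is involved.

```python
import asyncio
from typing import List, Optional, Tuple

STOP_SIGNAL = None  # hypothetical sentinel, mirroring the PR's _STOP_SIGNAL


async def forward_requests(
    queue: "asyncio.Queue[Optional[str]]", sent: List[str]
) -> None:
    # Stand-in for a request iterator: forwards requests until the stop signal.
    while (request := await queue.get()) is not STOP_SIGNAL:
        sent.append(request)


async def reopen_without_stop() -> Tuple[List[str], List[str]]:
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
    old_sent: List[str] = []
    new_sent: List[str] = []
    old = asyncio.create_task(forward_requests(queue, old_sent))
    await asyncio.sleep(0.01)  # the old iterator is now waiting on the queue
    # The stream "breaks" here, but nothing tells the old iterator to stop.
    new = asyncio.create_task(forward_requests(queue, new_sent))
    await asyncio.sleep(0.01)
    await queue.put("request")
    await asyncio.sleep(0.01)
    for task in (old, new):
        task.cancel()
    await asyncio.gather(old, new, return_exceptions=True)
    return old_sent, new_sent


async def reopen_with_stop() -> Tuple[List[str], List[str]]:
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
    old_sent: List[str] = []
    new_sent: List[str] = []
    old = asyncio.create_task(forward_requests(queue, old_sent))
    await asyncio.sleep(0.01)
    # On stream break, tell the old iterator to stop and wait for it to exit.
    await queue.put(STOP_SIGNAL)
    await old
    new = asyncio.create_task(forward_requests(queue, new_sent))
    await queue.put("request")
    await queue.put(STOP_SIGNAL)
    await new
    return old_sent, new_sent


stale_result = asyncio.run(reopen_without_stop())
fixed_result = asyncio.run(reopen_with_stop())
```

In the first scenario the stale consumer is first in line on the queue and takes the request meant for the reopened stream; in the second, the stop signal retires it before the new consumer starts.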
@maffoo @wcourtney