-
Notifications
You must be signed in to change notification settings - Fork 239
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
L58: Async API for gRPC Python #155
Conversation
L58-python-async-api.md
Outdated
for item in self.items: | ||
yield item | ||
|
||
stub.SayHelloStreaming(AsyncIter([ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should wait until we get complaints before we introduce something like this. I don't think returning canned responses is a common use case outside of test code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only valid use case I can think about in practice: transmitting trunks of a large file.
On the other hand, testing code usability is also something we care about. Not only gRPC Python itself is writing end2end tests, but also our users.
But doing magic for different type of input makes me hesitate a bit. I'm also in favor of doing nothing, and only accepting asynchronous generator/iterator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could always send an email to the python-dev mailing list about implementing an aiter()
function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like this looks easier to read:
stream = stub.SayHelloStreaming()
for chunk in ["Golden", "Retriever", "Pan", "Cake"]:
await stream.write(HelloRequest(name=chunk))
Why is there a need to use async iterators?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the client-side, the async iterator helps existing user to adopt asyncio
if they are using iterators today. On the other hand, the yield
and async for
semantic is Pythonic and useful for some use cases if the request and response are not tightly coupled locally. E.g. there is a robot application that requires uploading its sensor readings, while executes the command sent by its brain server:
async def environment_report_generator(sensors: List[RoboticSensor]):
report = EnvironmentReport()
for sensor in sensors:
report[sensor.name()] = await sensor.read()
yield report
response_iterator = stub.request_for_command(environment_report_generator(sensors))
async def command in response_iterator:
await actuators[command.actuator_name].perform(command.actuator_command)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work @lidizheng !
L58-python-async-api.md
Outdated
|
||
* `grpc.aio.server` no longer require the application to provide a | ||
`ThreadPoolExecutor`; | ||
* Interfaces returning `Future` object are replaced. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or removed since asyncio
already provides a way for running a coroutine as an independent task, and implementing the Future
contract, for example:
task = asyncio.run(stub.Hi(echo_pb2.EchoRequest(message="ping")))
task.add_done_callback(my_callback)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, so for those use case, they should return a Task or our context class instead. The idea being, if we must use Future
mechanism, we should hide the implementation details. Otherwise, we should yield the choice of Future
implementation to our users.
https://docs.python.org/3.7/library/asyncio-future.html#asyncio.Future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/asyncio.run/asyncio.ensure_future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, either the user uses the Asyncio API directly by scheduling a separated task or the aio
library does it internally and returns either an Asyncio.Task
[1] or the context class.
Seems that I will go for the second option only if the object that needs to be returned is not the Asyncio.Task
one. Otherwise we will be adding a superfluous wrapper IMO.
If we return a gRPC object this would need to implement all of the interfaces - like being awaitable - to be used by the set of the asyncio methods that deal with awaitable objects, for example wait
[2] and wait_for
[3]. Otherwise, the user would have a lot of friction using these asyncio methods when a gRPC future object is given as a parameter, for example:
grpc_task = stub.Hi.future(echo_pb2.EchoRequest(message="ping"))
await asyncio.wait([grpc_task])
This makes me think that the simpler the better, so going for the solution where the aio
module does not implement any ad-hoc method, and the user uses the Asyncio API directly. Going for the ad-hoc alternative would need a strong reason.
[1] https://docs.python.org/3/library/asyncio-task.html#asyncio.Task
[2] https://docs.python.org/3/library/asyncio-task.html#asyncio.wait
[3] https://docs.python.org/3/library/asyncio-task.html#asyncio.wait_for
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I fully understand your argument. What is your conclusion here? Do you support returning an asyncio.Task
object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
second thought, and related with this comment [1].
Having the feeling that we would need to return a custom object where internalities will be hidden. For example for the use case of the future()
method of the unary call, we would return a grpc.aio.Call
object which would implement the __await__
method, making it compatible with Asyncio API primitives that are expecting awaitables like wait
, wait_for
, ensure_future
or the await
keyword, so the following should work - I haven't tried it:
call = stub.Hi.future(...)
await call
Ofc the following snippet will have the same effect, since the object returned implement the await method
call = await stub.Hi.future(...)
Internally the grpc.aio.Call
wold start the task in the background, having the chance of using the methods provided by the asyncio.Task
for implementing some of the public methods of the grpc.aio.Call
object, for example
class Call:
def __init__(coro,......):
self._task = asyncio.create_task(coro)
....
def __await__(self):
return self._task.__await__()
async def cancel(self):
self._task.cancel()
....
[1] https://github.com/grpc/proposal/pull/155/files#r311288205
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about make the grpc.aio.Call
class inherit from asyncio.Task
? So we get all the subtle changes across multiple Python versions, and also get the chance to inject the logic we want?
class Call(asyncio.Task):
def __init__(coro, ...):
....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inheriting directly from the asyncio.Task
would put our API at merci of the Asyncio maintainers. If a new method is added for the asyncio.Task
object, will we have 100% confidence that is fully supported in our use case? I don't think so. While not exposing the task gives us the chance and freedom of using the methods that we want and in the way that we want.
But this discussion might be an implementation detail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think both direction is fine for me. My original intent to propose inherit from Task
is to reduce our maintenance burden. It can cut down the boilerplates we have write in grpc.aio.Call
class. The person who implements it can decide which way to go.
Also, there is another upside that users can use isinstance(call, asyncio.Task)
logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm in favor of resolving this during the implementation :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the late reply. I was busy handling an internal issue.
L58-python-async-api.md
Outdated
|
||
* `grpc.aio.server` no longer require the application to provide a | ||
`ThreadPoolExecutor`; | ||
* Interfaces returning `Future` object are replaced. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I fully understand your argument. What is your conclusion here? Do you support returning an asyncio.Task
object?
L58-python-async-api.md
Outdated
* `grpc.aio.UnaryUnaryMultiCallable` and `grpc.aio.StreamUnaryMultiCallable` | ||
returns `Awaitable` object for `__call__`; | ||
* `grpc.aio.UnaryUnaryMultiCallable` and `grpc.aio.StreamUnaryMultiCallable` | ||
returns `(Awaitable, grpc.aio.Call)` for `with_call`; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the sake of simplicity and for having an orthogonal API with the synchronous one I would go for having a with_call
with an interface like this one
response, call = await stub.Hi.with_call(...)
task = asyncio.ensure_future(stub.Hi.with_call(...))
..
..
response, call = task.result()
So returning basically an awaitable object implemented by the coroutine, and having the coroutine the ability to return a tuple with two objects when it has finished.
IMO, if I'm not missing something, this would implement the same contract as the synchronous one and will make the implementation under the hood easier since its only a matter of returning two parameters instead of one when the with_call
is used.
Regarding the object to return for the call, we might return the same object grpc.aio.Call
that we could be using for the future()
method but without yielding if the await
expr is used, take a look to the Future
implementation of Asyncio [1].
The following should work, I did not test it, and the user should not suffer any context switch:
response, call = await stub.Hi.with_call(...)
await call
print(call.status)
Also a bit related, we would need to implement the cancelation of a task, which would impact the with_call
and the __call__
methods.
For example this should be doable:
task = asyncio.ensure_future(stub.Hi.with_call(...))
task.cancel()
Which theoretically should be implementable by changing a bit the call.pyx.pxi
[2] file with something like
try:
await self._waiter_call
except CancelledError:
if not self._canceled:
# Behind the scenes ends up by calling the grpc_call_cancel
# gRPC method
self.cancel()
.....
Later on, this cancellation mechanism might be used by the grpc.aio.Call.cancel
the method, which behind the scenes would cancel the task.
[1] https://github.com/python/cpython/blob/master/Lib/asyncio/futures.py#L260
[2] https://github.com/Skyscanner/grpc/pull/1/files#diff-d07d1d5928e4bc4defeb2e1218c57186R136
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hope we can merge the __call__
, with_call
and futures
. It can be much simpler. By default, the invocation to the stub will return an asyncio.Task
object. Users can control the life cycle of that RPC by invoking methods on that object. On the other hand, if users just care about the result, they can await
on the Task
.
# Normal call
response = await stub.Hi(HiRequest)
# In the case of cancel
call = stub.Hi(HiRequest)
if number_of_greeting > 0:
call.cancel()
else:
response = await call
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merging __call__
, with_call
would mean not having the same implementation btw the sync and async version of the API, are we ok with that?
If so, I kinda agree with your proposal, but adding the following modifications:
- The three original methods for making a call would be implemented with just one way. So
__call__
,with_call
andfuture
implemented under the__call__
method
leveraging on the awaitable property of agrpc.aio.Call
object, so the following expressions would be valid:
response = await stub.Hi(...)
stub.Hi(...).cancel()
asyncio.wait_for(stub.Hi(...), timeout=10)
- I would move everything that is not related to the call managing from the
grpc.aio.Call
object under a newgrpc.aio.Response
object. In the synchronous version,
the_Rendezvous
[1] implements both features, managing the call and returning properties of the response. If we do not change this, with the proposal of unifying thewith_call
method with the__call__
the user will be forced to repeat the following pattern:
call = stub.Hi(...)
response = await call
print(response)
print(call.status)
The alternative with a dedicated response object would be:
response = await stub.Hi(...)
print(response.content)
print(response.code)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Glad you agree with modification 1. Your modification 2 is a good catch that I overlooked the situation when users want to check status-code
and status-message
. It is a valid option; many frameworks are using it.
A slight drawback is that if we are going to add a grpc.aio.Response
, we might want to also add grpc.aio.Request
, grpc.aio.ResponseIterator
etc.. To start a new pattern, we should be cautious and discuss them thoroughly.
To solve this problem, this is my proposal. In current design, if a call finishes abnormally (status not OK), an exception will be raised. We can use the same pattern in the new API.
try:
response = await stub.Hi(...)
except grpc.RpcError as rpc_error:
print(rpc_error.code())
print(rpc_error.details())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any other attribute that the user might need when there is no error? What about initial and trailing metadata [1]?
So just for summarizing the output of the discussion:
- Won't be any more
__call__
,with_call
andfuture
methods, everything will be handled using the unique__call__
available. - The
__call__
method will return agrpc.aio.Call
object which would be awaitable, returning the body once the task is done.
am I right? If the metadata and trailing metadata are not an issue, I'm completely Ok with that proposal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Metadata does matter.
As you suggested, the metadata can be accessed with slightly different way. It separate the receiving of metadata and the entire response message. E.g. with_call
returns only after the RPC is finished. It prevents users from reacting to initial_metadata
if received early.
call = stub.AComplicateHi(...)
if validate(call.initial_metadata()):
response = await call
else:
call.cancel()
IMO this proposal should facilitate the usage to Call
objects since the logic suits asyncio
environment better.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
umm, you mean:
initial_metadata = await call.initial_metadata()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch!
@gnossen @pfreixes @argaen @rmariano Please take another look.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the work done here!
If there is enough agreement about the Unified Stub call [1], could you please create an issue in the gRPC Easy board? We can start working on this ASAP. |
@pfreixes Filed as grpc/grpc#20001 |
* Explicitly define the surface API interfaces * Add new streaming API pattern * Add context propagation feature * Add typing feature * Add more rationale about two stacks
@euroelessar @pfreixes @gnossen PTAL. I have updated the proposal, and here are the four major sections I added in this batch. Comments, suggestions are welcomed.
|
L58-python-async-api.md
Outdated
So, this gRFC introduces a new pattern of streaming API that reads/writes | ||
message to peer with explicit call. | ||
|
||
#### Snippet For New Streaming API |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe I'm missing something, but the snippet for "new streaming API" looks way more confusing and less fluent than the "current streaming API", so I'm a bit lost.
Btw, you're talking about "current streaming API" but you're talking about an API that AFAIK doesn't exist yet and it's being proposed in the proposal - hence my confusion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed it into Reader / Writer
API for the new one, and Async Iterator
for the existing one. Hope this will reduce the confusion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Much better! Thanks.
L58-python-async-api.md
Outdated
request = echo_pb2.EchoRequest(message="ping") | ||
await streaming_call.write(request) | ||
response = await streaming_call.read() | ||
while response: # or response is not grpc.aio.EOF |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: EOF = end of file, perhaps EOS is more fitting? (I have zero context here, so feel free to ignore me).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me, EOF
is more conventional to indicate the state that no more data to read.
unused_request_iterator, | ||
context: grpc.aio.ServicerContext | ||
) -> None: | ||
bootstrap_request = await context.read() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thought: having access to context.read() might not be necessary as (IMHO) anext(request_iterator) does exactly the same thing, but it it might make sense to have servicer_context.read() for consistency as on the client side, one can read using call.read() (and there's no iterator access available on the client side?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussing in https://github.com/grpc/proposal/pull/155/files#r331248629 thread.
Let's move the discussion there.
Client side do have iterators. This API is meant to improve usability to make complex use case easier. Users can decide whether they want to use the reader / writer API or the iterator API.
|
||
Notably, both new and current streaming API allows read and write messages | ||
simultaneously in different thread or coroutine. However, **the application should** | ||
**not mixing new and current streaming API at the same time**. Reading or writing to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even though mixing the two APIs is not supported officially, it might make sense to implement in a way that this is ok to do if it's relatively easy to achieve (e.g. servicer.read() could just call anext(request_iterator) and that's it?).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically, yes. Discussed at https://github.com/grpc/proposal/pull/155/files#r334628099. Wrapping one API over another is a good idea.
The conflict here is that I'm not sure if we should allow users to mixing the usage. To me, it doesn't look like good programming practice, and error-prone (blocking read in multiple coroutine).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work! 👍
### Introduce Typing To Generated Code | ||
|
||
Typing makes a difference. This gRFC intended to introduce typing to gRPC Python | ||
as much as we can. To change generated code, we need to update the gRPC protoc | ||
plugin to generate Python 3 stubs and servicers. To date, **this gRFC intended to** | ||
**make the new async API compatible with existing generated code**. So, the Python 3 | ||
generated code will have two options: | ||
|
||
1. Add a third generation mode to protoc plugin that generates stubs / servicers | ||
with typing; | ||
2. Generating an additional file for the typed code. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not clear from this description whether this would support protobuf messages generated from libraries other than the official one. I would like to request such a feature, and grpclib
provides a nice example for how it could be done as it includes a typing protocol (similar to a golang interface) specifically for this:
class IProtoMessage(Protocol):
@classmethod
def FromString(cls, s: bytes) -> 'IProtoMessage': ...
def SerializeToString(self) -> bytes: ...
From https://github.com/vmagamedov/grpclib/blob/master/grpclib/_protocols.py#L20-L24
Every method of a generated server stub takes in such a protocol type for the request/response message type, allowing you to pass any object which implements those methods and have a type checker like mypy
still work. While the generated output of grpclib
is a little cryptic it essentially boils down to something like this:
class MyServiceStub:
async def ListItems(request: IProtoMessage) -> IProtoMessage:
...
The reason I ask, is because I have an alternative protoc
plugin for Python (see https://github.com/danielgtaylor/python-betterproto) which outputs Python 3.7+ dataclasses which also support that protocol described above. They just work with grpclib
and I would love to have them just work with the official async grpcio
as well. The output of my plugin looks like this:
# Generated by the protocol buffer compiler. DO NOT EDIT!
# sources: int32.proto
# plugin: python-betterproto
from dataclasses import dataclass
import betterproto
@dataclass
class Test(betterproto.Message):
"""Some documentation about the Test message."""
# Some documentation about the count.
count: int = betterproto.int32_field(1)
The base class betterproto.Message
implements the FromString
and SerializeToString
methods, so I can basically do this and it just works:
stub = MyServiceStub(grpc_channel)
request = Test()
response = await stub.ListItems(request)
It would be great to give my users the choice of which gRPC library to use instead of me having to choose one for them. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before we dive into details, let me point to lidizheng/grpc-api-examples#1 where we are having discussion about how typing should be done in AsyncIO.
I found grpclib
is using grpc_tools
which is the protoc
plugin maintained by grpc/grpc repo. Current code generation doesn't support typing, so it allows application to pass in any type of messages. In near future, we are planning to support stubs from existing generated code, so applications should have the choice to between old stubs and new stubs.
@gnossen who might be interested in the usability improvement of betterproto
.
dataclass
and Protocol
are great 3.8 features, maybe this effort can include them as well, some day.
By other asynchronous libraries, they mean libraries that provide their own | ||
Future, Coroutine, Event-loops, like `gevent`/`Twisted`/`Tornado`. In general, | ||
it is challenging to support multiple async mechanisms inter-operate together, | ||
and it has to be discussed case by case. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not expert in Curio
or Trio
at this moment, if I'm honest. I would appreciate if you can provide information about how they interact with AsyncIO and possibility of interacting with other async libraries.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Amazing! Looking forward to integrate on Guillotina framework!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
An object that is both a Call for the RPC and an iterator of | ||
response values. Drawing response values from the returned | ||
Call-iterator may raise RpcError indicating termination of | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also mention the presence of new exception classes, like AioRpcError
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. I adding a section description for exceptions that on our road map: grpc.BaseError
, grpc.AbortError
, and grpc.UsageError
.
As for grpc.RpcError
, existing design of the grpc.RpcError
class is empty (no methods / attributes). It generates confusion for many new gRPC users that it is unclear why the caught exception type doesn't contain any information. I hope at some point, we should promote the design of AioRpcError
to grpc.RpcError
.
Hi, any update on when this might be merged? |
Hey we see this helloworld example that does the unary_unary rpc. is there a working example for unary_stream, stream_unary or stream_stream with aio code? we also saw this example but it's also pseudo code. We could really benefit from some real working async code if there is any. Thanks |
@wintonzheng We are still working on examples. For now, you can check our unit tests for API usage, e.g., https://github.com/grpc/grpc/blob/master/src/python/grpcio_tests/tests_aio/unit/call_test.py. |
Link to gRFC: https://github.com/lidizheng/proposal/blob/grpc-python-async-api/L58-python-async-api.md
Discussions (besides this PR):
Fixes grpc/grpc#19494
Fixes grpc/grpc#19495