From 6e757a3af92cbc8e206f50de86289d7a42bfda2c Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 10 Jul 2019 13:08:36 -0700 Subject: [PATCH 01/28] Draft of Async API for gRPC Python --- L58-python-async-api.md | 421 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 421 insertions(+) create mode 100644 L58-python-async-api.md diff --git a/L58-python-async-api.md b/L58-python-async-api.md new file mode 100644 index 000000000..9ca5bfaa0 --- /dev/null +++ b/L58-python-async-api.md @@ -0,0 +1,421 @@ +Async API for gRPC Python +---- +* Author(s): lidiz +* Approver: a11r +* Status: Draft +* Implemented in: Python +* Last updated: 2019-07-09 +* Discussion at: __TBD__ + +## Abstract + +A brand new set of async API that will solve concurrency issues and performance +issues for gRPC Python. This set of API is available to Python 3.6+. + +## Motivation + +* Asynchronous processing that perfectly fits IO-intensive gRPC use cases; +* Resolve a long-living design flaw of thread exhaustion problem; +* Performance is much better than the multi-threading model. + +## Background + +### What Is Asynchronous I/O In Python? + +Quote from [`asyncio`](https://docs.python.org/3/library/asyncio.html) package +documentation: + +> `asyncio` is a library to write concurrent code using the `async`/`await` +> syntax. +> +> `asyncio` is used as a foundation for multiple Python asynchronous frameworks +> that provide high-performance network and web-servers, database connection +> libraries, distributed task queues, etc. + +In the asynchronous I/O model, the computation tasks are packed as generators +(aka. coroutines) that will yield the ownership of thread while it is blocked by +IO operations or wait for other tasks to complete. + +The design of coroutine in Python can trace back to Python 2.5 in [PEP 342 -- +Coroutines via Enhanced Generators](https://www.python.org/dev/peps/pep-0342/). +It introduces new generator methods like `send`, `throw`, and `close`. With +these methods, the caller of the generator can pass values and exceptions into a +paused generator function, hence diminish the need for wiring complex callbacks. + +To further simplify the usage of a coroutine, Python 3.5 introduces +`async`/`await` syntax that can easily transform a normal function into a +coroutine ([PEP 492](https://www.python.org/dev/peps/pep-0492/)). And the +`Twisted` style event loop abstractions is introduced in [PEP +3156](https://www.python.org/dev/peps/pep-3156/). + +Another important piece is the asynchronous generator. It is introduced in +Python 3.6 by [PEP 525](https://www.python.org/dev/peps/pep-0525/). + +To read more about `asyncio` implementation in CPython, see: +* [C-Level + implementation](https://github.com/python/cpython/blob/3.7/Modules/_asynciomodule.c) +* [Python-Level + implementation](https://github.com/python/cpython/blob/3.7/Lib/asyncio) + + + +### Python's Future + +Currently, there are two futures in the standard library. + +* [`concurrent.futures.Future`](https://github.com/python/cpython/blob/master/Lib/concurrent/futures/_base.py#L313) + added in Python 3.2 +* [`asyncio.Future`](https://github.com/python/cpython/blob/master/Lib/asyncio/futures.py) + added in Python 3.4 + +They are built for different threading models. Luckily, the CPython maintainers +are actively trying to make them compatible (they are **almost** compatible). By +the time this design doc is written, comparing to `concurrent.futures.Future`, +`asyncio.Future` has the following difference: + +> - This class is not thread-safe. 
+> +>- result() and exception() do not take a timeout argument and raise an +> exception when the future isn't done yet. +> +>- Callbacks registered with add_done_callback() are always called via the event +> loop's call_soon(). +> +>- This class is not compatible with the wait() and as_completed() methods in +> the concurrent.futures package. + +gRPC Python also has its definition of +[`Future`](https://grpc.github.io/grpc/python/grpc.html#future-interfaces) +interface since 2015. The concurrency story by that time is not that clear as +now. Here is the reasoning of why gRPC Python needed a dedicated `Future` +object, and incompatibilities: + +> Python doesn't have a Future interface in its standard library. In the absence +> of such a standard, three separate, incompatible implementations +> (concurrent.futures.Future, ndb.Future, and asyncio.Future) have appeared. +> This interface attempts to be as compatible as possible with +> concurrent.futures.Future. From ndb.Future it adopts a traceback-object +> accessor method. +> +> Unlike the concrete and implemented Future classes listed above, the Future +> class defined in this module is an entirely abstract interface that anyone may +> implement and use. +> +> The one known incompatibility between this interface and the interface of +> concurrent.futures.Future is that this interface defines its own +> CancelledError and TimeoutError exceptions rather than raising the +> implementation-private concurrent.futures._base.CancelledError and the +> built-in-but-only-in-3.3-and-later TimeoutError. + +Although the design of `Future` in Python finally settled down, it's not +recommended to expose low-level API like `asyncio.Future` object. The Python +documentation suggests that we should let the application to decide which +`Future` implementation they want to use, and hide the ways to operate them +directly. + +> The rule of thumb is to never expose Future objects in user-facing APIs, and +> the recommended way to create a Future object is to call loop.create_future(). +> This way alternative event loop implementations can inject their own optimized +> implementations of a Future object. + +### Python Coroutines in `asyncio` + +Python has several types of coroutine objects (see +[Coroutines](https://docs.python.org/3/reference/datamodel.html#coroutines)). +But this section will focus on how they interact with `asyncio` module. In a +single-threaded application, creating a coroutine object doesn't necessarily +mean it is scheduled to be executed in the event loop. + +The functions defined by `async def` is primarily a function that returns a +Python generator. If the program calls an `async def` function, it will NOT be +executed. This behavior is one of the main reason why mixing `async def` +function with normal function is a bad idea. + +There are three mechanisms to schedule coroutines: + +1. Await the coroutine; +2. Submit the coroutine to the event loop object; +3. Turn coroutine into an `asyncio.Task` object. + +## Proposal + +This gRFC intended to fully utilize `asyncio` and create a Pythonic paradigm for +gRPC Python programming in Python 3 runtime. + +### Two Sets of API In One Package + +The new API will be isolated from the current API. The implementation of the new +API is an entirely different stack than the current stack. On the downside, most +gRPC objects and definitions can't be shared between these two sets of API. +After all, it is not a good practice to introduce uncertainty (return coroutine +or regular object) to our API. 
This decision is made to respect our contract of +API since GA in 2016, including the underlying behaviors. **Developers who use +our current API should not be affected by this effort.** + +For users who want to migrate to new API, the granularity migration is per +channel, per server level. For both channel-side and server-side, the adoption +of `asyncio` does not only include cosmetic changes of adding `async`/`await` +keywords throughout the code base, it also requires thinking about the change +from a potentially multi-threaded application to a single-threaded asynchronous +application. For example, the synchronization methods have to be changed to use +the `asyncio` edition (see [Synchronization +Primitives](https://docs.python.org/3/library/asyncio-sync.html)). + +### Overview of Async API + +All `asyncio` related API will be kept inside the `grpc.aio` module. Take the +most common use case as an example, the new API will diverge from the old API +since the creation of `Channel` and `Server` object. New creation methods are +added to instantiate async `Server` or `Channel` object: + +```Python +import grpc +server = grpc.aio.server(...) +channel = grpc.aio.insecure_channel(...) +channel = grpc.aio.secure_channel(...) +channel = grpc.aio.intercept_channel(...) +``` + +To reduce cognitive burden, the new `asyncio` API should share the same +parameters as the current API, and keep the usage similar, except the following +cases: + +* `grpc.aio.server` no longer require the application to provide a + `ThreadPoolExecutor`; +* Interfaces returning `Future` object are replaced. + +### Channel-Side + +Changes in `grpc.aio.Channel`: +* The object returned by `unary_unary`, `unary_stream`, `stream_unary`, and + `stream_tream` will be `MultiCallable` objects in `grpc.aio`. + +Changes in `grpc.aio.Call`: +* All methods return `asyncio.Future` object. + +Changes for `MultiCallable` classes in `grpc.aio`: +* `grpc.aio.UnaryUnaryMultiCallable` and `grpc.aio.StreamUnaryMultiCallable` + returns `Awaitable` object for `__call__`; +* `grpc.aio.UnaryUnaryMultiCallable` and `grpc.aio.StreamUnaryMultiCallable` + returns `(Awaitable, grpc.aio.Call)` for `with_call`; +* `grpc.aio.UnaryStreamMultiCallable` and `grpc.aio.StreamStreamMultiCallable` + returns an asynchronous generator for `__call__`; +* `grpc.aio.StreamUnaryMultiCallable` and `grpc.aio.StreamStreamMultiCallable` + takes in an asynchronous generator as `request_iterator` argument. + +### Server-Side + +Changes in `grpc.aio.Server`: +* `add_generic_rpc_handlers` method accepts tuple of + `grpc.aio.GenericRpcHandler`; +* `wait_for_termination` method is a coroutine; +* `stop` returns a `asyncio.Event`. + +Changes in `grpc.aio.HandlerCallDetails`: +* `invocation_metadata` method is a coroutine. + +Changes in `grpc.aio.GenericRpcHandler`: +* `service` method takes in `grpc.aio.HandlerCallDetails`; +* `service` method returns a `grpc.aio.RpcMethodHandler`. + +Changes in `grpc.aio.RpcMethodHandler`: +* All servicer handlers are coroutines; +* (Debatable) Deserializer / serializer remain normal Python functions. + +Changes in `grpc.ServicerContext`: +* `invocation_metadata` method returns an `asyncio.Future` object; +* `send_initial_metadata` method is a coroutine and returns an `asyncio.Task` + object. + +Changes in `grpc.aio.*_*_rpc_method_handler`: +* Accepts a coroutine; +* Returns a `grpc.aio.RpcMethodHandler`. 
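For illustration, wrapping a servicer coroutine with the helper described above might look like the sketch below. The helper name follows this proposal and the message types are the usual `helloworld` ones; exact signatures are still subject to change.

```Python
import grpc

from helloworld_pb2 import HelloRequest, HelloReply  # assumed generated messages


async def say_hello(request: HelloRequest, unused_context) -> HelloReply:
    # A plain coroutine provides the servicer behavior.
    return HelloReply(message="Hello, %s!" % request.name)


# Sketch of the proposed grpc.aio counterpart of unary_unary_rpc_method_handler:
# it accepts a coroutine and returns a grpc.aio.RpcMethodHandler.
handler = grpc.aio.unary_unary_rpc_method_handler(
    say_hello,
    request_deserializer=HelloRequest.FromString,
    response_serializer=HelloReply.SerializeToString,
)
```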
+ +Changes in `grpc.aio.method_handlers_generic_handler`: +* `method_handlers` is a dictionary that maps method names to corresponding + `grpc.aio.RpcMethodHandler`; +* Returns `grpc.aio.GenericRpcHandler`. + +### Interceptor + +Changes in `grpc.aio.ServerInterceptor`: +* Accepts `grpc.aio.HandlerCallDetails`; +* `continuation` is a function accepts `grpc.aio.HandlerCallDetails` and returns + `grpc.aio.RpcMethodHandler`. + +Changes in `ClientInterceptor` classes in `grpc.aio`: +* `continuation` is a coroutine that returns `Awaitable` object. + +### Utility Functions + +Changes in `grpc.aio.channel_ready_future`: +* Accepts a `grpc.aio.Channel`; +* Returns a coroutine object. + +### Shared APIs + +APIs in the following categories remain in top level: +* Credentials related classes including for channel, call, and server; +* Channel connectivity Enum class; +* Status code Enum class; +* Compression method Enum class; +* `grpc.RpcError` exception class; +* `grpc.RpcContext` class; +* `grpc.ServiceRpcHandler` class; + +### Re-Use ProtoBuf Generated Code For Services + +Here is an example of the core part of the gRPC ProtoBuf plugin generated code: + +```Python +# Client-side stub +class GreeterStub(object): + def __init__(self, channel): + self.SayHello = channel.unary_unary(...) + +# Server-side servicer +def add_GreeterServicer_to_server(servicer, server): + ... +``` + +As you can see, the `channel` and `server` object are passed in by the +application. Thanks to the dynamic nature of Python, the generated code is +agnostic to which set of API the application uses, as long as we respect the +method name in our API contract. + +### API Reference Doc Generation + +The async API related classes and functions, including shared APIs, should be +generated on the same page. + +## Controversial Details + +### Generator Or Asynchronous Generator? + +For streaming calls, the requests on the client-side are supplied by generator +for now. If a user wants to provide a pre-defined list of request messages, they +can use build-in `iter()` function. But there isn't an equivalent function for +async generator. Should we wrap it inside our library to increase usability? + +```Python +### Current usage of Python generator +stub.SayHelloStreaming(iter([ + HelloRequest(name='Golden'), + HelloRequest(name='Retriever'), + HelloRequest(name='Pan'), + HelloRequest(name='Cake'), +])) + +### The new usage is much verbose for same scenario +class AsyncIter: + def __init__(self, items): + self.items = items + + async def __aiter__(self): + for item in self.items: + yield item + +stub.SayHelloStreaming(AsyncIter([ + HelloRequest(name='Golden'), + HelloRequest(name='Retriever'), + HelloRequest(name='Pan'), + HelloRequest(name='Cake'), +])) +``` + +### Special Async Functions Name + +Fire-and-forget is a valid use case in async programming. For the non-critical +tasks, the program may schedule the execution without checking the result. As +mentioned in "Python Coroutines in `asyncio`" section, the coroutine won't be +scheduled unless we explicitly do so. A dedicated prefix or suffix of the +function should help to remind developers to await the coroutine. 
+ +```Python +### Forget to await RPC +while 1: + stub.ReportLoad(ReportRequest( + timestamp=..., + metrics=[...], + )) # <- RPC not sent + await asyncio.sleep(3) + +### Await on whatever function starts with "Async" +while 1: + await stub.AsyncReportLoad(ReportRequest( + timestamp=..., + metrics=[...], + )) # <- RPC not sent + await asyncio.sleep(3) +``` + +CPython developers also consider this problem. Their solution is that if a +coroutine object gets deallocated without execution, the interpreter will log an +`RuntimeWarning` to the standard error output. + +``` +RuntimeWarning: coroutine '...' was never awaited +``` + +Except for the `MultiCallable` classes, I have inspected our APIs and found no +particularly valid use case for the fire-and-forget pattern. + +### Story for testing + +For new `asyncio` related behavior, we will write unit tests dedicated to the +new stack. However, currently, there are more than a hundred test cases in gRPC +Python. It's a significant amount of work to refactor each one of them for the +new API. + +A straight forward solution is building a wrapper over async API to simulate +current API. The wrapper itself shouldn't be too hard to implement. However, +some tests are tightly coupled with the implementation detail that might break; +manual labor may still be required for this approach. + +We should compare the cost of manual refactoring and wrapper when we want to +reuse the existing test cases. + +## Rationale + +### Compatibility With Other Asynchronous Libraries + +By other asynchronous libraries, I meant libraries that provide their own +Future, Coroutine, Event-loops, like `gevent`/`Twisted`/`Tornado`. In general, +it is challenging to support multiple async mechanisms inter-operate together, +and it has to be discussed case by case. + +For `Tornado`, they are using `asyncio` underneath their abstraction of +asynchronous mechanisms. After v6.0.0, they dropped support for Python 2 +entirely. So, it should work fine with our new API. + +For `Twisted`, with some monkey patch code to connect its `Deferred`/`Future` +object to `asyncio`'s API, it should work. + +For `gevent`, unfortunately, it works by monkey-patching Python APIs that +including `threading` and various transport library, and the work is executed in +`gevent` managed event loop. `gevent` and `asyncio` doesn't compatible with each +other out of box. + +### Wrapping Async Stack To Provide Current API Publicly + +1) It would be quite confusing to mix functions that return `asyncio` coroutine + and normal Python functions; +2) We don't want to imply that switching to use the new stack requires zero code + changes. +3) Also, the current contract of API constraint ourselves from changing their + behavior after GA for two years. + +## Implementation + +* N/A yet From 6cb840cf7c23137893289b4d8c750ae66cee0315 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 22 Jul 2019 10:30:26 -0700 Subject: [PATCH 02/28] Polish wording --- L58-python-async-api.md | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 9ca5bfaa0..9440c7975 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -10,11 +10,11 @@ Async API for gRPC Python ## Abstract A brand new set of async API that will solve concurrency issues and performance -issues for gRPC Python. This set of API is available to Python 3.6+. +issues for gRPC Python, which is available to Python 3.6+. 
## Motivation -* Asynchronous processing that perfectly fits IO-intensive gRPC use cases; +* Asynchronous processing perfectly fits IO-intensive gRPC use cases; * Resolve a long-living design flaw of thread exhaustion problem; * Performance is much better than the multi-threading model. @@ -118,8 +118,8 @@ object, and incompatibilities: > implementation-private concurrent.futures._base.CancelledError and the > built-in-but-only-in-3.3-and-later TimeoutError. -Although the design of `Future` in Python finally settled down, it's not -recommended to expose low-level API like `asyncio.Future` object. The Python +Although the design of `Future` in Python finally settled down, it's **not +recommended** to expose low-level API like `asyncio.Future` object. The Python documentation suggests that we should let the application to decide which `Future` implementation they want to use, and hide the ways to operate them directly. @@ -131,22 +131,19 @@ directly. ### Python Coroutines in `asyncio` -Python has several types of coroutine objects (see -[Coroutines](https://docs.python.org/3/reference/datamodel.html#coroutines)). -But this section will focus on how they interact with `asyncio` module. In a -single-threaded application, creating a coroutine object doesn't necessarily -mean it is scheduled to be executed in the event loop. +In a single-threaded application, creating a coroutine object doesn't +necessarily mean it is scheduled to be executed in the event loop. -The functions defined by `async def` is primarily a function that returns a +The functions defined by `async def`, underlying, is a function that returns a Python generator. If the program calls an `async def` function, it will NOT be executed. This behavior is one of the main reason why mixing `async def` function with normal function is a bad idea. There are three mechanisms to schedule coroutines: -1. Await the coroutine; -2. Submit the coroutine to the event loop object; -3. Turn coroutine into an `asyncio.Task` object. +1. Await the coroutine `await asyncio.sleep(1)`; +2. Submit the coroutine to the event loop object `loop.call_soon(coro)`; +3. Turn coroutine into an `asyncio.Task` object `asyncio.ensure_future(coro)`. ## Proposal @@ -231,7 +228,7 @@ Changes in `grpc.aio.GenericRpcHandler`: Changes in `grpc.aio.RpcMethodHandler`: * All servicer handlers are coroutines; -* (Debatable) Deserializer / serializer remain normal Python functions. +* Deserializer / serializer remain normal Python functions. Changes in `grpc.ServicerContext`: * `invocation_metadata` method returns an `asyncio.Future` object; @@ -367,9 +364,9 @@ coroutine object gets deallocated without execution, the interpreter will log an ``` RuntimeWarning: coroutine '...' was never awaited ``` - + ### Story for testing @@ -418,4 +415,4 @@ other out of box. ## Implementation -* N/A yet +* TBD From 2f120be5a2a5f8f635b3d2032593ff6c61462ef9 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 22 Jul 2019 10:34:03 -0700 Subject: [PATCH 03/28] Add related issues --- L58-python-async-api.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 9440c7975..c8ffde001 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -413,6 +413,11 @@ other out of box. 3) Also, the current contract of API constraint ourselves from changing their behavior after GA for two years. 
+## Related Issues + +* https://github.com/grpc/grpc/issues/6046 +* https://github.com/grpc/grpc/issues/18376 + ## Implementation * TBD From a1a486e86300c40bcc1b24388aeedce2bf82cceb Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 22 Jul 2019 10:47:24 -0700 Subject: [PATCH 04/28] Add demo code snippet --- L58-python-async-api.md | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index c8ffde001..128f16bef 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -192,6 +192,36 @@ cases: `ThreadPoolExecutor`; * Interfaces returning `Future` object are replaced. +### Demo Snippet of Async API + +Server side: +```Python +class AsyncGreeter(helloworld_pb2_grpc.GreeterServicer): + + async def SayHello(self, request, context): + await asyncio.sleep(1) + return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name) + +server = grpc.aio.server() +server.add_insecure_port(":50051") +helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) +server.start() +await server.wait_for_termination() +``` + +Client side: +```Python +# Python 3.6 and lower +with grpc.aio.insecure_channel("localhost:50051") as channel: + stub = echo_pb2_grpc.EchoStub(channel) + response = await stub.Hi(echo_pb2.EchoRequest(message="ping")) + +# Or using asynchronous context manager for Python3.7 and up +async with grpc.aio.insecure_channel("localhost:50051") as channel: + stub = echo_pb2_grpc.EchoStub(channel) + response = await stub.Hi(echo_pb2.EchoRequest(message="ping")) +``` + ### Channel-Side Changes in `grpc.aio.Channel`: From 600dc5e5cab57d1e849af09ac79adb51d228c0b9 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 22 Jul 2019 10:49:49 -0700 Subject: [PATCH 05/28] Add grpc-io discussion link --- L58-python-async-api.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 128f16bef..1cc412f7c 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -4,8 +4,8 @@ Async API for gRPC Python * Approver: a11r * Status: Draft * Implemented in: Python -* Last updated: 2019-07-09 -* Discussion at: __TBD__ +* Last updated: 2019-07-22 +* Discussion at: https://groups.google.com/forum/#!topic/grpc-io/7V7HYM_aph4 ## Abstract From 7e10026a1a31e26e60b053fc8e3756227a20ccb8 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 22 Jul 2019 13:05:27 -0700 Subject: [PATCH 06/28] Correct comment in code snippet --- L58-python-async-api.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 1cc412f7c..151486a38 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -383,7 +383,7 @@ while 1: await stub.AsyncReportLoad(ReportRequest( timestamp=..., metrics=[...], - )) # <- RPC not sent + )) await asyncio.sleep(3) ``` From 7d7f0f3f1ee1e2e4787302a215f24dfe05254cf8 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 23 Jul 2019 16:00:45 -0700 Subject: [PATCH 07/28] Adopt reviewers advice --- L58-python-async-api.md | 27 +++------------------------ 1 file changed, 3 insertions(+), 24 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 151486a38..69cbff451 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -57,19 +57,6 @@ To read more about `asyncio` implementation in CPython, see: * [Python-Level implementation](https://github.com/python/cpython/blob/3.7/Lib/asyncio) - - ### Python's Future Currently, there are two 
futures in the standard library. @@ -211,12 +198,6 @@ await server.wait_for_termination() Client side: ```Python -# Python 3.6 and lower -with grpc.aio.insecure_channel("localhost:50051") as channel: - stub = echo_pb2_grpc.EchoStub(channel) - response = await stub.Hi(echo_pb2.EchoRequest(message="ping")) - -# Or using asynchronous context manager for Python3.7 and up async with grpc.aio.insecure_channel("localhost:50051") as channel: stub = echo_pb2_grpc.EchoStub(channel) response = await stub.Hi(echo_pb2.EchoRequest(message="ping")) @@ -229,7 +210,7 @@ Changes in `grpc.aio.Channel`: `stream_tream` will be `MultiCallable` objects in `grpc.aio`. Changes in `grpc.aio.Call`: -* All methods return `asyncio.Future` object. +* All methods return coroutine object. Changes for `MultiCallable` classes in `grpc.aio`: * `grpc.aio.UnaryUnaryMultiCallable` and `grpc.aio.StreamUnaryMultiCallable` @@ -261,7 +242,7 @@ Changes in `grpc.aio.RpcMethodHandler`: * Deserializer / serializer remain normal Python functions. Changes in `grpc.ServicerContext`: -* `invocation_metadata` method returns an `asyncio.Future` object; +* `invocation_metadata` method returns a coroutine object; * `send_initial_metadata` method is a coroutine and returns an `asyncio.Task` object. @@ -287,6 +268,7 @@ Changes in `ClientInterceptor` classes in `grpc.aio`: ### Utility Functions Changes in `grpc.aio.channel_ready_future`: +* Renamed into `grpc.aio.channel_ready`; * Accepts a `grpc.aio.Channel`; * Returns a coroutine object. @@ -394,9 +376,6 @@ coroutine object gets deallocated without execution, the interpreter will log an ``` RuntimeWarning: coroutine '...' was never awaited ``` - ### Story for testing From 8cad0462973826e2ece0867bda90df514e82faad Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 8 Aug 2019 15:58:55 -0700 Subject: [PATCH 08/28] Adopt reviewer's advice and resolve some discussion --- L58-python-async-api.md | 150 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 144 insertions(+), 6 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 69cbff451..5fc322dad 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -1,10 +1,10 @@ Async API for gRPC Python ---- -* Author(s): lidiz -* Approver: a11r -* Status: Draft +* Author(s): lidizheng +* Approver: gnossen +* Status: In Review * Implemented in: Python -* Last updated: 2019-07-22 +* Last updated: 2019-08-07 * Discussion at: https://groups.google.com/forum/#!topic/grpc-io/7V7HYM_aph4 ## Abstract @@ -187,7 +187,7 @@ class AsyncGreeter(helloworld_pb2_grpc.GreeterServicer): async def SayHello(self, request, context): await asyncio.sleep(1) - return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name) + return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name) server = grpc.aio.server() server.add_insecure_port(":50051") @@ -310,6 +310,79 @@ generated on the same page. ## Controversial Details +### Unified Stub Call + +Currently, for RPC invocations, gRPC Python provides 3 options: `__call__`, +`with_call`, and `futures`. They behaves slightly different in whether block or +not, and the number of return values (see +[UnaryUnaryMultiCallable](https://grpc.github.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable) +for details). Developers have to bare in mind the subtlety between these 3 +options across 4 types of RPC (not all combination are supported). + +Semantically, those 3 options grant users ability to: +1. Directly getting the RPC response. +2. Check metadata of the RPC. +3. 
Control the life cycle of the RPC. +4. Handle the RPC failure. + +It is understandable for current design to provide different options, since +Python doesn't have a consensus `Future` before. The simplicity of an RPC call +will be ruined if the original designer of gRPC Python API tries to merge those +3 options. + +However, thanks to the official definition of `asyncio.Task`/`asyncio.Future`. +They can be merged into one method by returning a `grpc.aio.Call` that extends +(or composites) `asyncio.Task`. + +In `asyncio`, it is expected to `await` on asynchronous operations. Hence, it is +nature for stub calls to return an `asyncio.Task` compatible object, which +representing the RPC call. + +Also, the `grpc.aio.Call` object provides gRPC specific semantics to manipulate +the ongoing RPC. So, all above functionality can be solved by one single merged +method. + +```Python +# Usage 1: A simple call +response = await stub.Hi(...) + +# Usage 2: Check metadata +call = stub.Hi(...) +if validate(await call.initial_metadata()): + response = await call + print(f'Getting response [{response}] with code [{call.code()}]') +else: + raise ValueError('Failed to validate initial metadata') + +# Usage 3: Control the life cycle +call = stub.Hi(...) +await async_stuff_that_takes_time() +if call.is_active() and call.time_remaining() < REMAINING_TIME_THRESHOLD: + call.cancel() + +# Usage 4: Error handling +try: + response = await stub.FailedHi(...) +except grpc.RpcError as rpc_error: + print(f'RPC failed: {rpc_error.code()}') +``` + +### Support Thread And Process Executors + +The new API intended to solve all concurrency issue with asynchronous I/O. +However, it also introduces a migration challenge for our users. For users who +want to gradually migrate from current stack to async stack, their potentially +non-asyncio native logic may block the entire thread. If the thread is blocked, +then the event loop will be blocked, then the whole process will end up +deadlocking. + +This is not needed for users who are willing to build entire program upon +`asyncio`, and it is introducing complex concurrency maintenance burden. + +Also, by supporting this executors, we can allow users to mix async and sync +method handlers on the server side, which further reduce the burden of +migration. + ### Generator Or Asynchronous Generator? For streaming calls, the requests on the client-side are supplied by generator @@ -377,7 +450,7 @@ coroutine object gets deallocated without execution, the interpreter will log an RuntimeWarning: coroutine '...' was never awaited ``` -### Story for testing +## Story For Testing For new `asyncio` related behavior, we will write unit tests dedicated to the new stack. However, currently, there are more than a hundred test cases in gRPC @@ -392,6 +465,71 @@ manual labor may still be required for this approach. We should compare the cost of manual refactoring and wrapper when we want to reuse the existing test cases. +## Other Official Packages + +Besides `grpcio` package, currently gRPC Python also own: + +* `grpcio-tools` (no service) +* `grpcio-testing` +* `grpcio-reflection` +* `grpcio-health-checking` +* `grpcio-channelz` +* `grpcio-status` (no service) + +Apart from `grpcio-tools` and `grpcio-status`, all other packages have at least +one gRPC service implementation. They will also require migration to adopt +`asyncio`. This design doc propose to keep their current API untouched, and add +new sets of async APIs. 
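For example, an `asyncio` edition of the health checking servicer in `grpcio-health-checking` might look roughly like the sketch below. It is deliberately simplified (no `Watch`, no locking, no NOT_FOUND handling) and is only meant to show the shape of such an addition, not its final API.

```Python
from grpc_health.v1 import health_pb2, health_pb2_grpc


class AsyncHealthServicer(health_pb2_grpc.HealthServicer):
    """A simplified sketch of an asyncio-based health servicer."""

    def __init__(self):
        self._statuses = {}

    def set(self, service, status):
        # Record the serving status reported for a service.
        self._statuses[service] = status

    async def Check(self, request, context):
        # Answer with the recorded status, or SERVICE_UNKNOWN if never set.
        status = self._statuses.get(
            request.service, health_pb2.HealthCheckResponse.SERVICE_UNKNOWN)
        return health_pb2.HealthCheckResponse(status=status)
```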
+ +## Flow Control Enforcement + +To propagate HTTP/2 flow control push back, the new async API needs to aware of +the flow control mechanism. Most complex logic is handled in C-Core, except +there is a single rule the wrapper layer needs to follow: there can only be one +outstanding read/write operation on each call. + +It means if the application fires two write in parallel, one of them have to +wait until the other one finishes. Also, that rule doesn't prohibit reading and +writing at the same time. + +So, even if all the read/write is asynchronous in `asyncio`, we will have to +either enforce the rule ourselves by adding locks in our implementation. Or we +can pass down the synchronization responsibility to our users. + +## Concrete Class Instead of Interfaces + +Interface is a design pattern that defines the contract of an entity that allows +different implementation to work seamlessly in a system. In the past, gRPC +Python has been using metaclass based Python interface pattern. It works just +like interface in Golang and Java, except the error is generated in runtime +instead of compile time. + +If the gRPC Python has multiple implementation for a single interface, the use +of the design pattern provides productivity in unifying their behavior. However, +almost non interfaces has second implementation, even if they do, they are +depends directly on our concrete implementation, which should be considered as +inheritance than "implements". + +Also, in the past, dependents of gRPC Python have observed several failure +caused by the interface. The interface constraints our ability to add +experimental API. Once we change even slightly with interfaces, the downstream +implementations are likely to break. + +Since this is a new opportunity for us to re-design, we need to think cautiously +about how do we empower our users to extend our classes. For majority of cases, +we are providing the only implementation for the interface, and we should +convert them into concrete classes. + +On the other hand, there are actually one valid use case that we should keep +abstract class -- interceptors. To be more specific, the following interfaces +will remain the same: + +* grpc.ServerInterceptor +* grpc.UnaryUnaryClientInterceptor +* grpc.UnaryStreamClientInterceptor +* grpc.StreamUnaryClientInterceptor +* grpc.StreamStreamClientInterceptor + ## Rationale ### Compatibility With Other Asynchronous Libraries From 3f966605ab9abaea37b8b8fd847f30c3e131a828 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 8 Aug 2019 16:09:16 -0700 Subject: [PATCH 09/28] Fix some wording --- L58-python-async-api.md | 54 +++++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 5fc322dad..b85a87d89 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -178,6 +178,7 @@ cases: * `grpc.aio.server` no longer require the application to provide a `ThreadPoolExecutor`; * Interfaces returning `Future` object are replaced. +* Client RPC invocation merges from `__call__`, `with_call` and `futures` into one. 
### Demo Snippet of Async API @@ -201,6 +202,9 @@ Client side: async with grpc.aio.insecure_channel("localhost:50051") as channel: stub = echo_pb2_grpc.EchoStub(channel) response = await stub.Hi(echo_pb2.EchoRequest(message="ping")) + + async for response in stub.StreamingHi(...): + process(response) ``` ### Channel-Side @@ -210,15 +214,12 @@ Changes in `grpc.aio.Channel`: `stream_tream` will be `MultiCallable` objects in `grpc.aio`. Changes in `grpc.aio.Call`: -* All methods return coroutine object. +* All methods return coroutine object; +* Implements `grpc.aio.Task` API for unary response; +* Implements asynchronous generator API for stream response; Changes for `MultiCallable` classes in `grpc.aio`: -* `grpc.aio.UnaryUnaryMultiCallable` and `grpc.aio.StreamUnaryMultiCallable` - returns `Awaitable` object for `__call__`; -* `grpc.aio.UnaryUnaryMultiCallable` and `grpc.aio.StreamUnaryMultiCallable` - returns `(Awaitable, grpc.aio.Call)` for `with_call`; -* `grpc.aio.UnaryStreamMultiCallable` and `grpc.aio.StreamStreamMultiCallable` - returns an asynchronous generator for `__call__`; +* Returns a `grpc.aio.Call` object; * `grpc.aio.StreamUnaryMultiCallable` and `grpc.aio.StreamStreamMultiCallable` takes in an asynchronous generator as `request_iterator` argument. @@ -272,6 +273,12 @@ Changes in `grpc.aio.channel_ready_future`: * Accepts a `grpc.aio.Channel`; * Returns a coroutine object. +```Python +async with grpc.aio.insecure_channel(...) as channel: + await grpc.aio.channel_ready(channel) + ... +``` + ### Shared APIs APIs in the following categories remain in top level: @@ -298,10 +305,9 @@ def add_GreeterServicer_to_server(servicer, server): ... ``` -As you can see, the `channel` and `server` object are passed in by the -application. Thanks to the dynamic nature of Python, the generated code is -agnostic to which set of API the application uses, as long as we respect the -method name in our API contract. +Both `channel` and `server` object are passed in by the application. The +generated code is agnostic to which set of API the application uses, as long as +we respect the method name in our API contract. ### API Reference Doc Generation @@ -376,11 +382,8 @@ non-asyncio native logic may block the entire thread. If the thread is blocked, then the event loop will be blocked, then the whole process will end up deadlocking. -This is not needed for users who are willing to build entire program upon -`asyncio`, and it is introducing complex concurrency maintenance burden. - -Also, by supporting this executors, we can allow users to mix async and sync -method handlers on the server side, which further reduce the burden of +However, by supporting the executors, we can **allow mixing async and sync** +**method handlers** on the server side, which further reduce the cost of migration. ### Generator Or Asynchronous Generator? @@ -507,8 +510,8 @@ instead of compile time. If the gRPC Python has multiple implementation for a single interface, the use of the design pattern provides productivity in unifying their behavior. However, almost non interfaces has second implementation, even if they do, they are -depends directly on our concrete implementation, which should be considered as -inheritance than "implements". +depends directly on our concrete implementation, which should better using +inheritance or composition than interfaces. Also, in the past, dependents of gRPC Python have observed several failure caused by the interface. 
The interface constraints our ability to add @@ -517,12 +520,12 @@ implementations are likely to break. Since this is a new opportunity for us to re-design, we need to think cautiously about how do we empower our users to extend our classes. For majority of cases, -we are providing the only implementation for the interface, and we should +we are providing the only implementation for the interface. We should convert them into concrete classes. On the other hand, there are actually one valid use case that we should keep abstract class -- interceptors. To be more specific, the following interfaces -will remain the same: +won't be replaced by concrete classes: * grpc.ServerInterceptor * grpc.UnaryUnaryClientInterceptor @@ -560,11 +563,20 @@ other out of box. 3) Also, the current contract of API constraint ourselves from changing their behavior after GA for two years. +## Pending Topics + +Reviewers has thrown many valuable proposals. This design doc may not be the +ideal place for those discussions. +* Re-design streaming API on the server side to replace the iterator pattern. +* Re-design the connectivity API to be consistent with C-Core. +* Design a easy way to use channel arguments [grpc#19734](https://github.com/grpc/grpc/issues/19734). + ## Related Issues * https://github.com/grpc/grpc/issues/6046 * https://github.com/grpc/grpc/issues/18376 +* https://github.com/grpc/grpc/projects/16 ## Implementation -* TBD +* TODO From cfcf86b9f5c2b1b93d0064ac0d6ddf3e5eb86c18 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 8 Aug 2019 16:25:07 -0700 Subject: [PATCH 10/28] Fix some typo --- L58-python-async-api.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index b85a87d89..4862beb70 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -527,11 +527,11 @@ On the other hand, there are actually one valid use case that we should keep abstract class -- interceptors. 
To be more specific, the following interfaces won't be replaced by concrete classes: -* grpc.ServerInterceptor -* grpc.UnaryUnaryClientInterceptor -* grpc.UnaryStreamClientInterceptor -* grpc.StreamUnaryClientInterceptor -* grpc.StreamStreamClientInterceptor +* `grpc.ServerInterceptor` +* `grpc.UnaryUnaryClientInterceptor` +* `grpc.UnaryStreamClientInterceptor` +* `grpc.StreamUnaryClientInterceptor` +* `grpc.StreamStreamClientInterceptor` ## Rationale From 4cce83379d4e05372e0ba73935baffb4c8409235 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 1 Oct 2019 14:24:15 -0700 Subject: [PATCH 11/28] Summarize discussion and adding concensus to the proposal * Explicitly define the surface API interfaces * Add new streaming API pattern * Add context propagation feature * Add typing feature * Add more rationale about two stacks --- L58-python-async-api.md | 1287 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 1203 insertions(+), 84 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 4862beb70..9c4857fb1 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -4,8 +4,11 @@ Async API for gRPC Python * Approver: gnossen * Status: In Review * Implemented in: Python -* Last updated: 2019-08-07 -* Discussion at: https://groups.google.com/forum/#!topic/grpc-io/7V7HYM_aph4 +* Last updated: 2019-10-01 +* Discussions at: + * https://groups.google.com/forum/#!topic/grpc-io/7V7HYM_aph4 + * https://github.com/lidizheng/grpc-api-examples/pull/1 + * https://github.com/grpc/proposal/pull/155 ## Abstract @@ -139,13 +142,19 @@ gRPC Python programming in Python 3 runtime. ### Two Sets of API In One Package +`asyncio` is great, but not silver bullet to solve everything. It has its +limitations and unique advantages. gRPC Python as a framework should empower +tech-savvy users to use the cutting-edge features, in the same time as we allow +majority of our users to code in the way they familier. + The new API will be isolated from the current API. The implementation of the new API is an entirely different stack than the current stack. On the downside, most gRPC objects and definitions can't be shared between these two sets of API. After all, it is not a good practice to introduce uncertainty (return coroutine or regular object) to our API. This decision is made to respect our contract of API since GA in 2016, including the underlying behaviors. **Developers who use -our current API should not be affected by this effort.** +our current API should not be affected by this effort.** We plan to support two +stacks in the long-run. For users who want to migrate to new API, the granularity migration is per channel, per server level. For both channel-side and server-side, the adoption @@ -171,14 +180,8 @@ channel = grpc.aio.secure_channel(...) channel = grpc.aio.intercept_channel(...) ``` -To reduce cognitive burden, the new `asyncio` API should share the same -parameters as the current API, and keep the usage similar, except the following -cases: - -* `grpc.aio.server` no longer require the application to provide a - `ThreadPoolExecutor`; -* Interfaces returning `Future` object are replaced. -* Client RPC invocation merges from `__call__`, `with_call` and `futures` into one. +To reduce cognitive burden, this gRFC tries to make the new async API share the same +parameters as the current API, and keep the usage similar. 
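As a small illustration of that parity, creating a channel is expected to take the same target and channel arguments, only under the new module; the `grpc.aio` form below is the proposed shape, not a settled signature:

```Python
import grpc

options = [("grpc.enable_retries", 0)]

# Today's blocking API
sync_channel = grpc.insecure_channel("localhost:50051", options=options)

# Proposed async API: same parameters, different module
async_channel = grpc.aio.insecure_channel("localhost:50051", options=options)
```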
### Demo Snippet of Async API @@ -186,7 +189,10 @@ Server side: ```Python class AsyncGreeter(helloworld_pb2_grpc.GreeterServicer): - async def SayHello(self, request, context): + async def SayHello(self, + request: helloworld_pb2_grpc.HelloRequest, + context: grpc.aio.ServicerContext + ) -> helloworld_pb2_grpc.HelloReply: await asyncio.sleep(1) return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name) @@ -207,88 +213,138 @@ async with grpc.aio.insecure_channel("localhost:50051") as channel: process(response) ``` -### Channel-Side +### New Streaming API -Changes in `grpc.aio.Channel`: -* The object returned by `unary_unary`, `unary_stream`, `stream_unary`, and - `stream_tream` will be `MultiCallable` objects in `grpc.aio`. +Existing streaming API has usability problem that its logic complexes user +application. For client side, it requires the request iterator to be defined +above the line of call, and the consumption of responses to be coded below the +line of invocation. So when the application wants to send request based on +received response, it would be quite challenging and usually involves +multi-threading, which is not optimal in Python world. -Changes in `grpc.aio.Call`: -* All methods return coroutine object; -* Implements `grpc.aio.Task` API for unary response; -* Implements asynchronous generator API for stream response; +For server side, the same problem remains for non-trivial streaming servicer +handlers. The `yield` syntax constraint the message sending should happen in the +handler itself, otherwise the user needs to code a dedicated iterator class. For +larger application, it would be a pain to create such iterator class for each +streaming handler. -Changes for `MultiCallable` classes in `grpc.aio`: -* Returns a `grpc.aio.Call` object; -* `grpc.aio.StreamUnaryMultiCallable` and `grpc.aio.StreamStreamMultiCallable` - takes in an asynchronous generator as `request_iterator` argument. +euroelessar@ points out several great reasons for the new streaming API: +* Enable `with` statement for streaming API, which allows clean resource management; +* Easier to buffer outbound messages when peer pushes back (e.g. `yield` will + stop the execution of current coroutine, but `call.write` can be scheduled in + event loop); +* Enable application to have both pending read and pending write in one + coroutine, and react on the first finisher (with iterator syntax this will be + quite hard to get it right). -### Server-Side +So, this gRFC introduces a new pattern of streaming API that reads/writes +message to peer with explicit call. -Changes in `grpc.aio.Server`: -* `add_generic_rpc_handlers` method accepts tuple of - `grpc.aio.GenericRpcHandler`; -* `wait_for_termination` method is a coroutine; -* `stop` returns a `asyncio.Event`. +#### Snippet For New Streaming API -Changes in `grpc.aio.HandlerCallDetails`: -* `invocation_metadata` method is a coroutine. +```Python +### Client side +with stub.StreamingHi() as streaming_call: + request = echo_pb2.EchoRequest(message="ping") + await streaming_call.write(request) + response = await streaming_call.read() + while response: # or response is not grpc.aio.EOF + process(response) + response = await streaming_call.read() + +### Server side +class AsyncGreeter(helloworld_pb2_grpc.GreeterServicer): -Changes in `grpc.aio.GenericRpcHandler`: -* `service` method takes in `grpc.aio.HandlerCallDetails`; -* `service` method returns a `grpc.aio.RpcMethodHandler`. 
+ async def StreamingHi(self, + unused_request_iterator, + context: grpc.aio.ServicerContext + ) -> None: + bootstrap_request = await context.read() + initialize_environment(bootstrap_request) + while has_response(): + response = ... + await context.write(response) +``` -Changes in `grpc.aio.RpcMethodHandler`: -* All servicer handlers are coroutines; -* Deserializer / serializer remain normal Python functions. +#### Snippet For Current Streaming API -Changes in `grpc.ServicerContext`: -* `invocation_metadata` method returns a coroutine object; -* `send_initial_metadata` method is a coroutine and returns an `asyncio.Task` - object. +```Python +### Client side +class RequestIterator: -Changes in `grpc.aio.*_*_rpc_method_handler`: -* Accepts a coroutine; -* Returns a `grpc.aio.RpcMethodHandler`. + def __init__(self): + self._queue = asyncio.Queue() -Changes in `grpc.aio.method_handlers_generic_handler`: -* `method_handlers` is a dictionary that maps method names to corresponding - `grpc.aio.RpcMethodHandler`; -* Returns `grpc.aio.GenericRpcHandler`. + async def send(self, message: HelloRequest): + await self._queue.put(message) -### Interceptor + def __aiter__(self) -> AsyncIterable[HelloRequest]: + return self -Changes in `grpc.aio.ServerInterceptor`: -* Accepts `grpc.aio.HandlerCallDetails`; -* `continuation` is a function accepts `grpc.aio.HandlerCallDetails` and returns - `grpc.aio.RpcMethodHandler`. + async def __anext__(self) -> HelloRequest: + return await self._queue.get(block=True) -Changes in `ClientInterceptor` classes in `grpc.aio`: -* `continuation` is a coroutine that returns `Awaitable` object. +request_iterator = RequestIterator() +# No await needed, the response_iterator is grpc.aio.Call +response_iterator = stub.StreamingHi(request_iterator) -### Utility Functions +# In sending coroutine +await request_iterator.send(proto_message) -Changes in `grpc.aio.channel_ready_future`: -* Renamed into `grpc.aio.channel_ready`; -* Accepts a `grpc.aio.Channel`; -* Returns a coroutine object. +# In receiving coroutine +async for response in response_iterator: + process(response) -```Python -async with grpc.aio.insecure_channel(...) as channel: - await grpc.aio.channel_ready(channel) - ... 
+### Server side +class ResponseIterator: + + def __init__(self): + self._queue = asyncio.Queue() + + async def send(self, message: HelloReply): + await self._queue.put(message) + + def __aiter__(self) -> AsyncIterable[HelloReply]: + return self + + async def __anext__(self) -> HelloReply: + return await self._queue.get() + +async def streaming_hi_worker( + request_iterator: AsyncIterable[HelloRequest], + response_iterator: ResponseIterator + ) -> None: + async for request in request_iterator: + if request.needs_respond: + await response_iterator.send(response) + +class Greeter(helloworld_pb2_grpc.GreeterServicer): + async def StreamingHi(self, + request_iterator: AsyncIterable[HelloRequest], + context: grpc.aio.ServicerContext + ) -> AsyncIterable[HelloReply]: + response_iterator = ResponseIterator() + # Handle the write in another coroutine + asyncio.get_event_loop.create_task(streaming_hi_worker(request_iterator, response_iterator)) + return response_iterator ``` -### Shared APIs +#### Co-existence Of New And Current Streaming API -APIs in the following categories remain in top level: -* Credentials related classes including for channel, call, and server; -* Channel connectivity Enum class; -* Status code Enum class; -* Compression method Enum class; -* `grpc.RpcError` exception class; -* `grpc.RpcContext` class; -* `grpc.ServiceRpcHandler` class; +Existing API still has advantage over simpler use cases of gRPC, and iterator +syntax is Pythonic to use. Also, for backward compatibility concern, +iterator-based API needs to stay. So, both new and current streaming API needs +to be included in the surface API. + +To keep the function signature stable, the new streaming API only requires +enhancement to the context object (`grpc.aio.Call` on client-side, and +`grpc.aio.ServicerContext` on the server-side). + +Notably, both new and current streaming API allows read and write messages +simultaneously in different thread or coroutine. However, **the application should** +**not mixing new and current streaming API at the same time**. Reading or writing to +the iterator and context object might have synchronization issue that the +framework does not guaranteed to protect against. ### Re-Use ProtoBuf Generated Code For Services @@ -314,8 +370,6 @@ we respect the method name in our API contract. The async API related classes and functions, including shared APIs, should be generated on the same page. -## Controversial Details - ### Unified Stub Call Currently, for RPC invocations, gRPC Python provides 3 options: `__call__`, @@ -386,7 +440,7 @@ However, by supporting the executors, we can **allow mixing async and sync** **method handlers** on the server side, which further reduce the cost of migration. -### Generator Or Asynchronous Generator? +### Using Asynchronous Generator For streaming calls, the requests on the client-side are supplied by generator for now. If a user wants to provide a pre-defined list of request messages, they @@ -419,7 +473,7 @@ stub.SayHelloStreaming(AsyncIter([ ])) ``` -### Special Async Functions Name +### No Special Async Functions Naming Pattern Fire-and-forget is a valid use case in async programming. For the non-critical tasks, the program may schedule the execution without checking the result. As @@ -453,7 +507,7 @@ coroutine object gets deallocated without execution, the interpreter will log an RuntimeWarning: coroutine '...' 
was never awaited ``` -## Story For Testing +### Story For Testing For new `asyncio` related behavior, we will write unit tests dedicated to the new stack. However, currently, there are more than a hundred test cases in gRPC @@ -468,7 +522,7 @@ manual labor may still be required for this approach. We should compare the cost of manual refactoring and wrapper when we want to reuse the existing test cases. -## Other Official Packages +### Other Official Packages Besides `grpcio` package, currently gRPC Python also own: @@ -484,7 +538,7 @@ one gRPC service implementation. They will also require migration to adopt `asyncio`. This design doc propose to keep their current API untouched, and add new sets of async APIs. -## Flow Control Enforcement +### Flow Control Enforcement To propagate HTTP/2 flow control push back, the new async API needs to aware of the flow control mechanism. Most complex logic is handled in C-Core, except @@ -499,7 +553,7 @@ So, even if all the read/write is asynchronous in `asyncio`, we will have to either enforce the rule ourselves by adding locks in our implementation. Or we can pass down the synchronization responsibility to our users. -## Concrete Class Instead of Interfaces +### Concrete Class Instead of "Interfaces" Interface is a design pattern that defines the contract of an entity that allows different implementation to work seamlessly in a system. In the past, gRPC @@ -533,11 +587,55 @@ won't be replaced by concrete classes: * `grpc.StreamUnaryClientInterceptor` * `grpc.StreamStreamClientInterceptor` +## Future Features + +### Implicit Context Propagation By `contextvars` + +`contextvars` is a Python 3.7 feature that allows applications to set coroutine +local variables (just like thread local variables). In the past, gRPC Python +uses thread local to achieve two goals: + +* Support distributed tracing library that work across languages; +* Support deadline propagation along the chain of RPCs. + +For distributed tracing library (like OpenCensus), the tracing information is +preserved as one of the metadata, and it is expected to be implicitly promoted +from inbound metadata to outbound metadata. The tracing information will be used +to monitor the life cycle of an request, and the length it has stayed in each +services. + +For deadline propagation, the deadline of upstream server will be implicitly +pass down to downstream server, so downstream services can react to that +information and save computation resources or perform flow control. + +Acceptence critiria of this feature: +* The implimentation of such feature should supported by official package, and +users are not expected to directly access those metadata; +* Application logic has higher priority than the implicit propagation (e.g. + setting timeout explicitly will override the propagated value); +* Application has the choice to diverse coroutine local variables; +* The exception error string should be informative (e.g. pointing out the + timeout is due to upstream deadline). + +Further discussion around this topic, please see related sections under [Rationale]. + +### Introduce Typing To Generated Code + +Typing makes a difference. This gRFC intended to introduce typing to gRPC Python +as much as we can. To change generated code, we need to update the gRPC protoc +plugin to generate Python 3 stubs and servicers. To date, **this gRFC intended to** +**make the new async API compatible with existing generated code**. So, the Python 3 +generated code will have two options: + +1. 
Add a third generation mode to protoc plugin that generates stubs / servicers + with typing; +2. Generating an additional file for the typed code. + ## Rationale ### Compatibility With Other Asynchronous Libraries -By other asynchronous libraries, I meant libraries that provide their own +By other asynchronous libraries, they mean libraries that provide their own Future, Coroutine, Event-loops, like `gevent`/`Twisted`/`Tornado`. In general, it is challenging to support multiple async mechanisms inter-operate together, and it has to be discussed case by case. @@ -563,11 +661,1032 @@ other out of box. 3) Also, the current contract of API constraint ourselves from changing their behavior after GA for two years. +### Explicit Context vs. Implicit Context + +TBD + +## API Interfaces + +### Channel-Side + +```Python +# grpc.aio.Channel +class Channel: + """Affords RPC invocation via generic methods on client-side. + + Channel objects implement the Async Context Manager type, although they need + not support being entered and exited multiple times. + """ + + def subscribe(self, + callback: Callable[[ChannelConnectivity], None], + try_to_connect: bool=False) -> None: + """Subscribe to this Channel's connectivity state machine. + + A Channel may be in any of the states described by ChannelConnectivity. + This method allows application to monitor the state transitions. + The typical use case is to debug or gain better visibility into gRPC + runtime's state. + + Args: + callback: A callable to be invoked with ChannelConnectivity argument. + ChannelConnectivity describes current state of the channel. + The callable will be invoked immediately upon subscription + and again for every change to ChannelConnectivity until it + is unsubscribed or this Channel object goes out of scope. + try_to_connect: A boolean indicating whether or not this Channel + should attempt to connect immediately. If set to False, gRPC + runtime decides when to connect. + """ + + def unsubscribe(self, + callback: Callable[[ChannelConnectivity], None]) -> None: + """Unsubscribes a subscribed callback from this Channel's connectivity. + + Args: + callback: A callable previously registered with this Channel from + having been passed to its "subscribe" method. + """ + + def unary_unary(self, + method: Text, + request_serializer: Optional[Callable[[Any], bytes]]=None, + response_deserializer: Optional[Callable[[bytes], Any]]=None + ) -> grpc.aio.UnaryUnaryMultiCallable: + """Creates a UnaryUnaryMultiCallable for a unary call method. + + Args: + method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the + response message. Response goes undeserialized in case None + is passed. + + Returns: + A UnaryUnaryMultiCallable value for the named unary call method. + """ + + def unary_stream(self, + method: Text, + request_serializer: Optional[Callable[[Any], bytes]]=None, + response_deserializer: Optional[Callable[[bytes], Any]]=None + ) -> grpc.aio.UnaryStreamMultiCallable: + """Creates a UnaryStreamMultiCallable for a server streaming method. + + Args: + method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the + response message. Response goes undeserialized in case None is + passed. 
+ + Returns: + A UnaryStreamMultiCallable value for the name server streaming method. + """ + + def stream_unary(self, + method: Text, + request_serializer: Optional[Callable[[Any], bytes]]=None, + response_deserializer: Optional[Callable[[bytes], Any]]=None + ) -> grpc.aio.StreamUnaryMultiCallable: + """Creates a StreamUnaryMultiCallable for a client streaming method. + + Args: + method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the + response message. Response goes undeserialized in case None is + passed. + + Returns: + A StreamUnaryMultiCallable value for the named client streaming method. + """ + + def stream_stream(self, + method: Text, + request_serializer: Optional[Callable[[Any], bytes]]=None, + response_deserializer: Optional[Callable[[bytes], Any]]=None + ) -> grpc.aio.StreamStreamMultiCallable: + """Creates a StreamStreamMultiCallable for a bi-directional streaming method. + + Args: + method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the + response message. Response goes undeserialized in case None + is passed. + + Returns: + A StreamStreamMultiCallable value for the named bi-directional streaming method. + """ + + def close(self) -> None: + """Closes this Channel and releases all resources held by it. + + Closing the Channel will immediately terminate all RPCs active with the + Channel and it is not valid to invoke new RPCs with the Channel. + + This method is idempotent. + """ +``` + +```Python +# grpc.aio.UnaryUnaryMultiCallable +class UnaryUnaryMultiCallable: + """Affords invoking an async unary RPC from client-side.""" + + def __call__(self, + request: Any, + timeout: Optional[int]=None, + metadata: Optional[Sequence[Tuple[Text, Text]]]=None, + credentials: Optional[grpc.CallCredentials]=None, + wait_for_ready: Optional[bool]=None, + compression: Optional[grpc.Compression]=None + ) -> grpc.aio.Call[Any]: + """Schedules the underlying RPC. + + Args: + request: The request value for the RPC. + timeout: An optional duration of time in seconds to allow + for the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. Only valid for + secure Channel. + wait_for_ready: This is an EXPERIMENTAL argument. An optional + flag to enable wait for ready mechanism + compression: An element of grpc.compression, e.g. + grpc.compression.Gzip. This is an EXPERIMENTAL option. + + Returns: + An awaitable object grpc.aio.Call that returns the response value. + + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ + +# grpc.aio.UnaryStreamMultiCallable +class UnaryStreamMultiCallable: + """Affords invoking an async server streaming RPC from client-side.""" + + def __call__(self, + request: Any, + timeout: Optional[int]=None, + metadata: Optional[Sequence[Tuple[Text, Text]]]=None, + credentials: Optional[grpc.CallCredentials]=None, + wait_for_ready: Optional[bool]=None, + compression: Optional[grpc.Compression]=None + ) -> grpc.aio.Call[AsyncIterable[Any]]: + """Schedules the underlying RPC. 
+ + Args: + request: The request value for the RPC. + timeout: An optional duration of time in seconds to allow + for the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. Only valid for + secure Channel. + wait_for_ready: This is an EXPERIMENTAL argument. An optional + flag to enable wait for ready mechanism + compression: An element of grpc.compression, e.g. + grpc.compression.Gzip. This is an EXPERIMENTAL option. + + Returns: + An awaitable object grpc.aio.Call that returns the async iterator of response values. + + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ + +# grpc.aio.StreamUnaryMultiCallable +class StreamUnaryMultiCallable: + """Affords invoking an async client streaming RPC from client-side.""" + + def __call__(self, + request_iterator: Optional[AsyncIterable[Any]]=None, + timeout: Optional[int]=None, + metadata: Optional[Sequence[Tuple[Text, Text]]]=None, + credentials: Optional[grpc.CallCredentials]=None, + wait_for_ready: Optional[bool]=None, + compression: Optional[grpc.Compression]=None + ) -> grpc.aio.Call[Any]: + """Schedules the underlying RPC. + + Args: + request_iterator: An async iterator that yields request values for + the RPC. + timeout: An optional duration of time in seconds to allow + for the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. Only valid for + secure Channel. + wait_for_ready: This is an EXPERIMENTAL argument. An optional + flag to enable wait for ready mechanism + compression: An element of grpc.compression, e.g. + grpc.compression.Gzip. This is an EXPERIMENTAL option. + + Returns: + An awaitable object grpc.aio.Call presents the RPC. + + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ + +# grpc.aio.StreamStreamMultiCallable +class StreamStreamMultiCallable: + """Affords invoking an async bi-directional RPC from client-side.""" + + def __call__(self, + request_iterator: Optional[AsyncIterable[Any]]=None, + timeout: Optional[int]=None, + metadata: Optional[Sequence[Tuple[Text, Text]]]=None, + credentials: Optional[grpc.CallCredentials]=None, + wait_for_ready: Optional[bool]=None, + compression: Optional[grpc.Compression]=None + ) -> grpc.aio.Call[AsyncIterable[Any]]: + """Schedules the underlying RPC. + + Args: + request_iterator: An async iterator that yields request values for + the RPC. + timeout: An optional duration of time in seconds to allow + for the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. Only valid for + secure Channel. + wait_for_ready: This is an EXPERIMENTAL argument. An optional + flag to enable wait for ready mechanism + compression: An element of grpc.compression, e.g. + grpc.compression.Gzip. This is an EXPERIMENTAL option. + + Returns: + An awaitable object grpc.aio.Call that returns the async iterator of response values. + + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. 
+ """ +``` + +```Python +# grpc.aio.Call +class Call(typing.Awaitable[T], grpc.RpcContext): + """The representation of an RPC on the client-side.""" + + def is_active(self) -> bool: + """Describes whether the RPC is active or has terminated. + + Returns: + True if RPC is active, False otherwise. + """ + + def time_remaining(self) -> float: + """Describes the length of allowed time remaining for the RPC. + + Returns: + A nonnegative float indicating the length of allowed time in seconds + remaining for the RPC to complete before it is considered to have + timed out, or None if no deadline was specified for the RPC. + """ + + def cancel(self) -> None: + """Cancels the RPC. + + Idempotent and has no effect if the RPC has already terminated. + """ + + def add_callback(self, callback: Callable[None, None]) -> None: + """Registers a callback to be called on RPC termination. + + Args: + callback: A no-parameter callable to be called on RPC termination. + + Returns: + True if the callback was added and will be called later; False if + the callback was not added and will not be called (because the RPC + already terminated or some other reason). + """ + + async def initial_metadata(self) -> Sequence[Tuple[Text, Text]]: + """Accesses the initial metadata sent by the server. + + Coroutine continues once the value is available. + + Returns: + The initial :term:`metadata`. + """ + + async def trailing_metadata(self) -> Sequence[Tuple[Text, Text]]: + """Accesses the trailing metadata sent by the server. + + Coroutine continues once the value is available. + + Returns: + The trailing :term:`metadata`. + """ + + async def code(self) -> grpc.StatusCode: + """Accesses the status code sent by the server. + + Coroutine continues once the value is available. + + Returns: + The StatusCode value for the RPC. + """ + + async def details(self) -> Text: + """Accesses the details sent by the server. + + Coroutine continues once the value is available. + + Returns: + The details string of the RPC. + """ + + def __aiter__(self) -> AsyncIterable[Any]: + """Returns the async iterable representation that yields messages. + + Returns: + An async iterable object that yields messages. + """ + + async def read(self) -> Any: + """Reads one message from the RPC. + + Only one read operation is allowed simultaneously. Mixing new streaming API and old + streaming API will resulted in undefined behavior. + + Returns: + A response message of the RPC. + + Raises: + An RpcError exception if the read failed. + """ + + async def write(self, message: Any) -> None: + """Writes one message to the RPC. + + Only one write operation is allowed simultaneously. Mixing new streaming API and old + streaming API will resulted in undefined behavior. + + Raises: + An RpcError exception if the write failed. + """ +``` + +### Server-Side + +```Python +# grpc.aio.Server +class Server: + """Serves RPCs.""" + + def add_generic_rpc_handlers( + self, + generic_rpc_handlers: Iterable[grpc.aio.GenericRpcHandlers] + ) -> None: + """Registers GenericRpcHandlers with this Server. + + This method is only safe to call before the server is started. + + Args: + generic_rpc_handlers: An iterable of GenericRpcHandlers that will be + used to service RPCs. + """ + + def add_insecure_port(self, address: Text) -> int: + """Opens an insecure port for accepting RPCs. + + This method may only be called before starting the server. + + Args: + address: The address for which to open a port. 
If the port is 0, + or not specified in the address, then gRPC runtime will choose a port. + + Returns: + An integer port on which server will accept RPC requests. + """ + + def add_secure_port(self, + address: Text, + server_credentials: grpc.ServerCredentials) -> int: + """Opens a secure port for accepting RPCs. + + This method may only be called before starting the server. + + Args: + address: The address for which to open a port. + if the port is 0, or not specified in the address, then gRPC + runtime will choose a port. + server_credentials: A ServerCredentials object. + + Returns: + An integer port on which server will accept RPC requests. + """ + + def start(self) -> None: + """Starts this Server. + + This method may only be called once. (i.e. it is not idempotent). + """ + + def stop(self, grace: Optional[float]) -> asyncio.Event: + """Stops this Server. + + This method immediately stop service of new RPCs in all cases. + + If a grace period is specified, this method returns immediately + and all RPCs active at the end of the grace period are aborted. + If a grace period is not specified (by passing None for `grace`), + all existing RPCs are aborted immediately and this method + blocks until the last RPC handler terminates. + + This method is idempotent and may be called at any time. + Passing a smaller grace value in a subsequent call will have + the effect of stopping the Server sooner (passing None will + have the effect of stopping the server immediately). Passing + a larger grace value in a subsequent call *will not* have the + effect of stopping the server later (i.e. the most restrictive + grace value is used). + + Args: + grace: A duration of time in seconds or None. + + Returns: + A threading.Event that will be set when this Server has completely + stopped, i.e. when running RPCs either complete or are aborted and + all handlers have terminated. + """ + + async def wait_for_termination(self, timeout: Optional[float]=None) -> bool: + """Block current thread until the server stops. + + This is an EXPERIMENTAL API. + + The wait will not consume computational resources during blocking, and + it will block until one of the two following conditions are met: + + 1) The server is stopped or terminated; + 2) A timeout occurs if timeout is not `None`. + + The timeout argument works in the same way as `threading.Event.wait()`. + https://docs.python.org/3/library/threading.html#threading.Event.wait + + Args: + timeout: A floating point number specifying a timeout for the + operation in seconds. + + Returns: + A bool indicates if the operation times out. + """ +``` + +```Python +# grpc.aio.GenericRpcHandler +class GenericRpcHandler: + """An implementation of arbitrarily many RPC methods.""" + + def service(self, + handler_call_details: grpc.HandlerCallDetails + ) -> Union[grpc.RpcMethodHandler, grpc.aio.RpcMethodHandler, None]: + """Returns the handler for servicing the RPC. + + Args: + handler_call_details: A HandlerCallDetails describing the RPC. + + Returns: + An grpc.RpcMethodHandler if the handler is a normal Python function; + or an grpc.aio.RpcMethodHandler if the handler is a coroutine; + otherwise, None. + """ +``` + +```Python +# grpc.aio.RpcMethodHandler +class RpcMethodHandler: + """An implementation of a single RPC method. + + Attributes: + request_streaming: Whether the RPC supports exactly one request message + or any arbitrary number of request messages. 
+ response_streaming: Whether the RPC supports exactly one response message + or any arbitrary number of response messages. + request_deserializer: A callable behavior that accepts a byte string and + returns an object suitable to be passed to this object's business + logic, or None to indicate that this object's business logic should be + passed the raw request bytes. + response_serializer: A callable behavior that accepts an object produced + by this object's business logic and returns a byte string, or None to + indicate that the byte strings produced by this object's business logic + should be transmitted on the wire as they are. + unary_unary: This object's application-specific business logic as a + callable value that takes a request value and a ServicerContext object + and returns a response value. Only non-None if both request_streaming + and response_streaming are False. + unary_stream: This object's application-specific business logic as a + callable value that takes a request value and a ServicerContext object + and returns an iterator of response values. Only non-None if + request_streaming is False and response_streaming is True. + stream_unary: This object's application-specific business logic as a + callable value that takes an iterator of request values and a + ServicerContext object and returns a response value. Only non-None if + request_streaming is True and response_streaming is False. + stream_stream: This object's application-specific business logic as a + callable value that takes an iterator of request values and a + ServicerContext object and returns an iterator of response values. + Only non-None if request_streaming and response_streaming are both + True. + """ + request_streaming: bool + response_streaming: bool + request_deserializer: Optional[Callable[[bytes], Any]] + response_serializer: Optional[Callable[[Any], bytes]] + unary_unary: Optional[Callable[[Any, grpc.aio.ServicerContext], Any]] + unary_stream: Optional[Callable[[Any, grpc.aio.ServicerContext], Optional[AsyncIterable[Any]]]] + stream_unary: Optional[Callable[[AsyncIterable[Any], grpc.aio.ServicerContext], Any]] + stream_stream: Optional[Callable[[AsyncIterable[Any], grpc.aio.ServicerContext], Optional[AsyncIterable[Any]]]] +``` + +```Python +# grpc.aio.ServicerContext +class ServicerContext(grpc.RpcContext): + """A context object passed to method implementations.""" + + def invocation_metadata(self) -> Optional[Sequence[Tuple[Text, Text]]]: + """Accesses the metadata from the sent by the client. + + Returns: + The invocation :term:`metadata`. + """ + + def peer(self) -> Text: + """Identifies the peer that invoked the RPC being serviced. + + Returns: + A string identifying the peer that invoked the RPC being serviced. + The string format is determined by gRPC runtime. + """ + + def peer_identities(self) -> Iterable[Text]: + """Gets one or more peer identity(s). + + Equivalent to + servicer_context.auth_context().get(servicer_context.peer_identity_key()) + + Returns: + An iterable of the identities, or None if the call is not + authenticated. Each identity is returned as a raw bytes type. + """ + + def peer_identity_key(self) -> Text: + """The auth property used to identify the peer. + + For example, "x509_common_name" or "x509_subject_alternative_name" are + used to identify an SSL peer. + + Returns: + The auth property (string) that indicates the + peer identity, or None if the call is not authenticated. 
+ """ + + def auth_context(self) -> Mapping[Text, Iterable[bytes]]: + """Gets the auth context for the call. + + Returns: + A map of strings to an iterable of bytes for each auth property. + """ + + def set_compression(self, compression: grpc.Compression) -> None: + """Set the compression algorithm to be used for the entire call. + + This is an EXPERIMENTAL method. + + Args: + compression: An element of grpc.compression, e.g. + grpc.compression.Gzip. + """ + + async def send_initial_metadata(self, initial_metadata: Sequence[Tuple[Text, Text]]) -> None: + """Sends the initial metadata value to the client. + + This method need not be called by implementations if they have no + metadata to add to what the gRPC runtime will transmit. + + Args: + initial_metadata: The initial :term:`metadata`. + """ + + async def set_trailing_metadata(self, trailing_metadata: Sequence[Tuple[Text, Text]]) -> None: + """Sends the trailing metadata for the RPC. + + This method need not be called by implementations if they have no + metadata to add to what the gRPC runtime will transmit. + + Args: + trailing_metadata: The trailing :term:`metadata`. + """ + + def abort(self, code: grpc.StatusCode, details: Text) -> NoReturn: + """Raises an exception to terminate the RPC with a non-OK status. + + The code and details passed as arguments will supercede any existing + ones. + + Args: + code: A StatusCode object to be sent to the client. + It must not be StatusCode.OK. + details: A UTF-8-encodable string to be sent to the client upon + termination of the RPC. + + Raises: + Exception: An exception is always raised to signal the abortion the + RPC to the gRPC runtime. + """ + + def abort_with_status(self, status: grpc.Status) -> NoReturn: + """Raises an exception to terminate the RPC with a non-OK status. + + The status passed as argument will supercede any existing status code, + status message and trailing metadata. + + This is an EXPERIMENTAL API. + + Args: + status: A grpc.Status object. The status code in it must not be + StatusCode.OK. + + Raises: + Exception: An exception is always raised to signal the abortion the + RPC to the gRPC runtime. + """ + + def set_code(self, code: grpc.StatusCode) -> None: + """Sets the value to be used as status code upon RPC completion. + + This method need not be called by method implementations if they wish + the gRPC runtime to determine the status code of the RPC. + + Args: + code: A StatusCode object to be sent to the client. + """ + + def set_details(self, details: Text) -> None: + """Sets the value to be used as detail string upon RPC completion. + + This method need not be called by method implementations if they have + no details to transmit. + + Args: + details: A UTF-8-encodable string to be sent to the client upon + termination of the RPC. + """ + + def disable_next_message_compression(self) -> None: + """Disables compression for the next response message. + + This is an EXPERIMENTAL method. + + This method will override any compression configuration set during + server creation or set on the call. + """ + + async def read(self) -> Any: + """Reads one message from the RPC. + + Only one read operation is allowed simultaneously. Mixing new streaming API and old + streaming API will resulted in undefined behavior. + + Returns: + A response message of the RPC. + + Raises: + An RpcError exception if the read failed. + """ + + async def write(self, message: Any) -> None: + """Writes one message to the RPC. + + Only one write operation is allowed simultaneously. 
Mixing new streaming API and old + streaming API will resulted in undefined behavior. + + Raises: + An RpcError exception if the write failed. + """ +``` + +```Python +# grpc.aio.unary_unary_rpc_method_handler +def unary_unary_rpc_method_handler(behavior: Callable[[Any, grpc.aio.ServicerContext], Any], + request_deserializer: Optional[Callable[[bytes], Any]]=None, + response_serializer: Optional[Callable[[Any], bytes]]=None + ) -> grpc.aio.RpcMethodHandler: + """Creates an RpcMethodHandler for a unary-unary RPC method. + + Args: + behavior: The implementation of an RPC that accepts one request + and returns one response. + request_deserializer: An optional behavior for request deserialization. + response_serializer: An optional behavior for response serialization. + + Returns: + An RpcMethodHandler object that is typically used by grpc.Server. + """ + + +# grpc.aio.unary_stream_rpc_method_handler +def unary_stream_rpc_method_handler(behavior: Callable[[Any, grpc.aio.ServicerContext], Optional[AsyncIterable[Any]]], + request_deserializer: Optional[Callable[[bytes], Any]]=None, + response_serializer: Optional[Callable[[Any], bytes]]=None + ) -> grpc.aio.RpcMethodHandler: + """Creates an RpcMethodHandler for a unary-stream RPC method. + + Args: + behavior: The implementation of an RPC that accepts one request + and returns an iterator of response values. + request_deserializer: An optional behavior for request deserialization. + response_serializer: An optional behavior for response serialization. + + Returns: + An RpcMethodHandler object that is typically used by grpc.Server. + """ + + +# grpc.aio.stream_unary_rpc_method_handler +def stream_unary_rpc_method_handler(behavior: Callable[[AsyncIterable[Any], grpc.aio.ServicerContext], Any], + request_deserializer: Optional[Callable[[bytes], Any]]=None, + response_serializer: Optional[Callable[[Any], bytes]]=None + ) -> grpc.aio.RpcMethodHandler: + """Creates an RpcMethodHandler for a stream-unary RPC method. + + Args: + behavior: The implementation of an RPC that accepts an iterator of + request values and returns a single response value. + request_deserializer: An optional behavior for request deserialization. + response_serializer: An optional behavior for response serialization. + + Returns: + An RpcMethodHandler object that is typically used by grpc.Server. + """ + + +# grpc.aio.stream_stream_rpc_method_handler +def stream_stream_rpc_method_handler(behavior: Callable[[AsyncIterable[Any], grpc.aio.ServicerContext], Optional[AsyncIterable[Any]]], + request_deserializer: Optional[Callable[[bytes], Any]]=None, + response_serializer: Optional[Callable[[Any], bytes]]=None + ) -> grpc.aio.RpcMethodHandler: + """Creates an RpcMethodHandler for a stream-stream RPC method. + + Args: + behavior: The implementation of an RPC that accepts an iterator of + request values and returns an iterator of response values. + request_deserializer: An optional behavior for request deserialization. + response_serializer: An optional behavior for response serialization. + + Returns: + An RpcMethodHandler object that is typically used by grpc.Server. + """ +``` + +```Python +# grpc.aio.method_handlers_generic_handler +def method_handlers_generic_handler(service: Text, + method_handlers: Mapping[Text, grpc.aio.RpcMethodHandler] + ) -> grpc.aio.GenericRpcHandler: + """Creates a GenericRpcHandler from RpcMethodHandlers. + + Args: + service: The name of the service that is implemented by the + method_handlers. 
+ method_handlers: A dictionary that maps method names to corresponding + RpcMethodHandler. + + Returns: + A GenericRpcHandler. This is typically added to the grpc.Server object + with add_generic_rpc_handlers() before starting the server. + """ +``` + +### Server Interceptor + +```Python +# grpc.aio.ServerInterceptor +class ServerInterceptor: + """Affords intercepting incoming RPCs on the service-side. + + This is an EXPERIMENTAL API. + """ + + def intercept_service(self, + continuation: Callable[[grpc.HandlerCallDetails], grpc.aio.RpcMethodHandler], + handler_call_details: grpc.HandlerCallDetails + ) -> grpc.aio.RpcMethodHandler: + """Intercepts incoming RPCs before handing them over to a handler. + + Args: + continuation: A function that takes a HandlerCallDetails and + proceeds to invoke the next interceptor in the chain, if any, + or the RPC handler lookup logic, with the call details passed + as an argument, and returns an RpcMethodHandler instance if + the RPC is considered serviced, or None otherwise. + handler_call_details: A HandlerCallDetails describing the RPC. + + Returns: + An RpcMethodHandler with which the RPC may be serviced if the + interceptor chooses to service this RPC, or None otherwise. + """ +``` + +### Client Interceptor + +```Python +# grpc.aio.UnaryUnaryClientInterceptor +class UnaryUnaryClientInterceptor: + """Affords intercepting unary-unary invocations.""" + + def intercept_unary_unary(self, + continuation: Callable[[grpc.ClientCallDetails, Any], grpc.aio.Call[Any]], + client_call_details: grpc.ClientCallDetails, + request: Any + ) -> grpc.aio.Call[Any]: + """Intercepts a unary-unary invocation asynchronously. + + Args: + continuation: A function that proceeds with the invocation by + executing the next interceptor in chain or invoking the + actual RPC on the underlying Channel. It is the interceptor's + responsibility to call it if it decides to move the RPC forward. + The interceptor can use + `response_future = continuation(client_call_details, request)` + to continue with the RPC. `continuation` returns an object that is + both a Call for the RPC and a Future. In the event of RPC + completion, the return Call-Future's result value will be + the response message of the RPC. Should the event terminate + with non-OK status, the returned Call-Future's exception value + will be an RpcError. + client_call_details: A ClientCallDetails object describing the + outgoing RPC. + request: The request value for the RPC. + + Returns: + An object that is both a Call for the RPC and a Future. + In the event of RPC completion, the return Call-Future's + result value will be the response message of the RPC. + Should the event terminate with non-OK status, the returned + Call-Future's exception value will be an RpcError. + """ + raise NotImplementedError() + + +# grpc.aio.UnaryStreamClientInterceptor +class UnaryStreamClientInterceptor: + """Affords intercepting unary-stream invocations.""" + + def intercept_unary_stream(self, + continuation: Callable[[grpc.ClientCallDetails, Any], grpc.aio.Call[AsyncIterable[Any]]], + client_call_details: grpc.ClientCallDetails, + request: Any + ) -> grpc.aio.Call[AsyncIterable[Any]]: + """Intercepts a unary-stream invocation. + + Args: + continuation: A function that proceeds with the invocation by + executing the next interceptor in chain or invoking the + actual RPC on the underlying Channel. It is the interceptor's + responsibility to call it if it decides to move the RPC forward. 
+ The interceptor can use + `response_iterator = continuation(client_call_details, request)` + to continue with the RPC. `continuation` returns an object that is + both a Call for the RPC and an iterator for response values. + Drawing response values from the returned Call-iterator may + raise RpcError indicating termination of the RPC with non-OK + status. + client_call_details: A ClientCallDetails object describing the + outgoing RPC. + request: The request value for the RPC. + + Returns: + An object that is both a Call for the RPC and an iterator of + response values. Drawing response values from the returned + Call-iterator may raise RpcError indicating termination of + the RPC with non-OK status. + """ + raise NotImplementedError() + + +# grpc.aio.StreamUnaryClientInterceptor +class StreamUnaryClientInterceptor: + """Affords intercepting stream-unary invocations.""" + + def intercept_stream_unary(self, + continuation: Callable[[grpc.ClientCallDetails, Optional[AsyncIterable[Any]]], grpc.aio.Call[Any]], + client_call_details: grpc.ClientCallDetails, + request_iterator: AsyncIterable[Any] + ) -> grpc.aio.Call[Any]: + """Intercepts a stream-unary invocation asynchronously. + + Args: + continuation: A function that proceeds with the invocation by + executing the next interceptor in chain or invoking the + actual RPC on the underlying Channel. It is the interceptor's + responsibility to call it if it decides to move the RPC forward. + The interceptor can use + `response_future = continuation(client_call_details, request_iterator)` + to continue with the RPC. `continuation` returns an object that is + both a Call for the RPC and a Future. In the event of RPC completion, + the return Call-Future's result value will be the response message + of the RPC. Should the event terminate with non-OK status, the + returned Call-Future's exception value will be an RpcError. + client_call_details: A ClientCallDetails object describing the + outgoing RPC. + request_iterator: An iterator that yields request values for the RPC. + + Returns: + An object that is both a Call for the RPC and a Future. + In the event of RPC completion, the return Call-Future's + result value will be the response message of the RPC. + Should the event terminate with non-OK status, the returned + Call-Future's exception value will be an RpcError. + """ + + +# grpc.aio.StreamStreamClientInterceptor +class StreamStreamClientInterceptor: + """Affords intercepting stream-stream invocations.""" + + def intercept_stream_stream(self, + continuation: Callable[[grpc.ClientCallDetails, Optional[AsyncIterable[Any]]], grpc.aio.Call[AsyncIterable[Any]]], + client_call_details: grpc.ClientCallDetails, + request_iterator: AsyncIterable[Any] + ) -> grpc.aio.Call[AsyncIterable[Any]]: + """Intercepts a stream-stream invocation. + + Args: + continuation: A function that proceeds with the invocation by + executing the next interceptor in chain or invoking the + actual RPC on the underlying Channel. It is the interceptor's + responsibility to call it if it decides to move the RPC forward. + The interceptor can use + `response_iterator = continuation(client_call_details, request_iterator)` + to continue with the RPC. `continuation` returns an object that is + both a Call for the RPC and an iterator for response values. + Drawing response values from the returned Call-iterator may + raise RpcError indicating termination of the RPC with non-OK + status. + client_call_details: A ClientCallDetails object describing the + outgoing RPC. 
+ request_iterator: An iterator that yields request values for the RPC. + + Returns: + An object that is both a Call for the RPC and an iterator of + response values. Drawing response values from the returned + Call-iterator may raise RpcError indicating termination of + the RPC with non-OK status. + """ +``` + +### Utility Functions + +```Python +# grpc.aio.channel_ready +async def channel_ready(channel: grpc.aio.Channel) -> None: + """Creates a coroutine that ends when a Channel is ready. + + Args: + channel: A Channel object. + """ +``` + +### Shared APIs + +APIs in the following categories remain in top level: +* Credentials related classes including for channel, call, and server; +* Channel connectivity Enum class; +* Status code Enum class; +* Compression method Enum class; +* `grpc.RpcError` exception class; +* `grpc.RpcContext` class; +* `grpc.ClientCallDetails` class; +* `grpc.HandlerCallDetails` class; + + ## Pending Topics Reviewers has thrown many valuable proposals. This design doc may not be the ideal place for those discussions. -* Re-design streaming API on the server side to replace the iterator pattern. * Re-design the connectivity API to be consistent with C-Core. * Design a easy way to use channel arguments [grpc#19734](https://github.com/grpc/grpc/issues/19734). From dd7bc71ebf488ac4c9e133424c9030d1596f8336 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 1 Oct 2019 14:33:19 -0700 Subject: [PATCH 12/28] Add definition of grpc.aio.EOF --- L58-python-async-api.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 9c4857fb1..ad01ff1b6 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -1660,6 +1660,11 @@ class StreamStreamClientInterceptor: ### Utility Functions +```Python +# grpc.aio.EOF is a unique object per process that evaluates to False +(grpc.aio.EOF or False) == False +``` + ```Python # grpc.aio.channel_ready async def channel_ready(channel: grpc.aio.Channel) -> None: From 6c7c12a4949a85bf65587ac584e8b6e68fce994b Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 1 Oct 2019 18:02:23 -0700 Subject: [PATCH 13/28] Expand the definition of EOF --- L58-python-async-api.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index ad01ff1b6..8442ef7ba 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -1662,7 +1662,11 @@ class StreamStreamClientInterceptor: ```Python # grpc.aio.EOF is a unique object per process that evaluates to False -(grpc.aio.EOF or False) == False +class _EOF: + def __bool__(self): + return False + +EOF = _EOF() ``` ```Python From 4f37444d2bf2ff4e66afc50ce78524209ffb382d Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 2 Oct 2019 14:01:26 -0700 Subject: [PATCH 14/28] Fix the function signature of interceptors to be async --- L58-python-async-api.md | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 8442ef7ba..9c72f0b9b 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -1490,7 +1490,7 @@ class ServerInterceptor: This is an EXPERIMENTAL API. 
""" - def intercept_service(self, + async def intercept_service(self, continuation: Callable[[grpc.HandlerCallDetails], grpc.aio.RpcMethodHandler], handler_call_details: grpc.HandlerCallDetails ) -> grpc.aio.RpcMethodHandler: @@ -1517,7 +1517,7 @@ class ServerInterceptor: class UnaryUnaryClientInterceptor: """Affords intercepting unary-unary invocations.""" - def intercept_unary_unary(self, + async def intercept_unary_unary(self, continuation: Callable[[grpc.ClientCallDetails, Any], grpc.aio.Call[Any]], client_call_details: grpc.ClientCallDetails, request: Any @@ -1548,14 +1548,13 @@ class UnaryUnaryClientInterceptor: Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError. """ - raise NotImplementedError() # grpc.aio.UnaryStreamClientInterceptor class UnaryStreamClientInterceptor: """Affords intercepting unary-stream invocations.""" - def intercept_unary_stream(self, + async def intercept_unary_stream(self, continuation: Callable[[grpc.ClientCallDetails, Any], grpc.aio.Call[AsyncIterable[Any]]], client_call_details: grpc.ClientCallDetails, request: Any @@ -1584,14 +1583,13 @@ class UnaryStreamClientInterceptor: Call-iterator may raise RpcError indicating termination of the RPC with non-OK status. """ - raise NotImplementedError() # grpc.aio.StreamUnaryClientInterceptor class StreamUnaryClientInterceptor: """Affords intercepting stream-unary invocations.""" - def intercept_stream_unary(self, + async def intercept_stream_unary(self, continuation: Callable[[grpc.ClientCallDetails, Optional[AsyncIterable[Any]]], grpc.aio.Call[Any]], client_call_details: grpc.ClientCallDetails, request_iterator: AsyncIterable[Any] @@ -1627,7 +1625,7 @@ class StreamUnaryClientInterceptor: class StreamStreamClientInterceptor: """Affords intercepting stream-stream invocations.""" - def intercept_stream_stream(self, + async def intercept_stream_stream(self, continuation: Callable[[grpc.ClientCallDetails, Optional[AsyncIterable[Any]]], grpc.aio.Call[AsyncIterable[Any]]], client_call_details: grpc.ClientCallDetails, request_iterator: AsyncIterable[Any] From 6bd2c9cca59b8b921419a2f81f4e67afcc6087c7 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 2 Oct 2019 16:27:02 -0700 Subject: [PATCH 15/28] Fix a spelling error --- L58-python-async-api.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 9c72f0b9b..46b1331da 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -609,7 +609,7 @@ pass down to downstream server, so downstream services can react to that information and save computation resources or perform flow control. Acceptence critiria of this feature: -* The implimentation of such feature should supported by official package, and +* The implementation of such feature should supported by official package, and users are not expected to directly access those metadata; * Application logic has higher priority than the implicit propagation (e.g. 
setting timeout explicitly will override the propagated value); From 7f5b62562c3f21472c5b9f9b86607503d6dae635 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 2 Oct 2019 16:39:54 -0700 Subject: [PATCH 16/28] Add check_connectivity_state and watch_connectivity_state --- L58-python-async-api.md | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 46b1331da..551721c17 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -707,6 +707,37 @@ class Channel: callback: A callable previously registered with this Channel from having been passed to its "subscribe" method. """ + + def check_connectivity_state(self, try_to_connect: bool=False) -> grpc.ChannelConnectivity: + """Check the connectivity state of a channel. + + This is an EXPERIMENTAL API. + + Args: + try_to_connect: a bool indicate whether the Channel should try to connect to peer or not. + + Returns: + A ChannelConnectivity object. + """ + + async def watch_connectivity_state(self, + last_observed_state: grpc.ChannelConnectivity, + timeout_seconds: float) -> Optional[grpc.ChannelConnectivity]: + """Watch for a change in connectivity state. + + This is an EXPERIMENTAL API. + + Once the channel connectivity state is different from + last_observed_state, the function will return the new connectivity + state. If deadline expires BEFORE the state is changed, None will be + returned. + + Args: + try_to_connect: a bool indicate whether the Channel should try to connect to peer or not. + + Returns: + A ChannelConnectivity object or None. + """ def unary_unary(self, method: Text, From 2b8dd48fcd04c7bd8455858f2b59590dc22ac73c Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 3 Oct 2019 17:08:28 -0700 Subject: [PATCH 17/28] Fix typing in API interfaces * Use Request, Response type vars instead of Any * Use AnyStr instead of Text --- L58-python-async-api.md | 150 ++++++++++++++++++++-------------------- 1 file changed, 75 insertions(+), 75 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 551721c17..3819650b2 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -741,9 +741,9 @@ class Channel: def unary_unary(self, method: Text, - request_serializer: Optional[Callable[[Any], bytes]]=None, - response_deserializer: Optional[Callable[[bytes], Any]]=None - ) -> grpc.aio.UnaryUnaryMultiCallable: + request_serializer: Optional[Callable[[Request], bytes]]=None, + response_deserializer: Optional[Callable[[bytes], Response]]=None + ) -> grpc.aio.UnaryUnaryMultiCallable[Request, Response]: """Creates a UnaryUnaryMultiCallable for a unary call method. Args: @@ -760,9 +760,9 @@ class Channel: def unary_stream(self, method: Text, - request_serializer: Optional[Callable[[Any], bytes]]=None, - response_deserializer: Optional[Callable[[bytes], Any]]=None - ) -> grpc.aio.UnaryStreamMultiCallable: + request_serializer: Optional[Callable[[Request], bytes]]=None, + response_deserializer: Optional[Callable[[bytes], Response]]=None + ) -> grpc.aio.UnaryStreamMultiCallable[Request, Response]: """Creates a UnaryStreamMultiCallable for a server streaming method. 
Args: @@ -779,9 +779,9 @@ class Channel: def stream_unary(self, method: Text, - request_serializer: Optional[Callable[[Any], bytes]]=None, - response_deserializer: Optional[Callable[[bytes], Any]]=None - ) -> grpc.aio.StreamUnaryMultiCallable: + request_serializer: Optional[Callable[[Request], bytes]]=None, + response_deserializer: Optional[Callable[[bytes], Response]]=None + ) -> grpc.aio.StreamUnaryMultiCallable[Request, Response]: """Creates a StreamUnaryMultiCallable for a client streaming method. Args: @@ -798,9 +798,9 @@ class Channel: def stream_stream(self, method: Text, - request_serializer: Optional[Callable[[Any], bytes]]=None, - response_deserializer: Optional[Callable[[bytes], Any]]=None - ) -> grpc.aio.StreamStreamMultiCallable: + request_serializer: Optional[Callable[[Request], bytes]]=None, + response_deserializer: Optional[Callable[[bytes], Response]]=None + ) -> grpc.aio.StreamStreamMultiCallable[Request, Response]: """Creates a StreamStreamMultiCallable for a bi-directional streaming method. Args: @@ -827,17 +827,17 @@ class Channel: ```Python # grpc.aio.UnaryUnaryMultiCallable -class UnaryUnaryMultiCallable: +class UnaryUnaryMultiCallable(Generic[Request, Response]): """Affords invoking an async unary RPC from client-side.""" def __call__(self, - request: Any, + request: Request, timeout: Optional[int]=None, - metadata: Optional[Sequence[Tuple[Text, Text]]]=None, + metadata: Optional[Sequence[Tuple[Text, AnyStr]]]=None, credentials: Optional[grpc.CallCredentials]=None, wait_for_ready: Optional[bool]=None, compression: Optional[grpc.Compression]=None - ) -> grpc.aio.Call[Any]: + ) -> grpc.aio.Call[Response]: """Schedules the underlying RPC. Args: @@ -863,17 +863,17 @@ class UnaryUnaryMultiCallable: """ # grpc.aio.UnaryStreamMultiCallable -class UnaryStreamMultiCallable: +class UnaryStreamMultiCallable(Generic[Request, Response]): """Affords invoking an async server streaming RPC from client-side.""" def __call__(self, - request: Any, + request: Request, timeout: Optional[int]=None, - metadata: Optional[Sequence[Tuple[Text, Text]]]=None, + metadata: Optional[Sequence[Tuple[Text, AnyStr]]]=None, credentials: Optional[grpc.CallCredentials]=None, wait_for_ready: Optional[bool]=None, compression: Optional[grpc.Compression]=None - ) -> grpc.aio.Call[AsyncIterable[Any]]: + ) -> grpc.aio.Call[AsyncIterable[Response]]: """Schedules the underlying RPC. Args: @@ -899,17 +899,17 @@ class UnaryStreamMultiCallable: """ # grpc.aio.StreamUnaryMultiCallable -class StreamUnaryMultiCallable: +class StreamUnaryMultiCallable(Generic[Request, Response]): """Affords invoking an async client streaming RPC from client-side.""" def __call__(self, - request_iterator: Optional[AsyncIterable[Any]]=None, + request_iterator: Optional[AsyncIterable[Request]]=None, timeout: Optional[int]=None, - metadata: Optional[Sequence[Tuple[Text, Text]]]=None, + metadata: Optional[Sequence[Tuple[Text, AnyStr]]]=None, credentials: Optional[grpc.CallCredentials]=None, wait_for_ready: Optional[bool]=None, compression: Optional[grpc.Compression]=None - ) -> grpc.aio.Call[Any]: + ) -> grpc.aio.Call[Response]: """Schedules the underlying RPC. 
Args: @@ -936,17 +936,17 @@ class StreamUnaryMultiCallable: """ # grpc.aio.StreamStreamMultiCallable -class StreamStreamMultiCallable: +class StreamStreamMultiCallable(Generic[Request, Response]): """Affords invoking an async bi-directional RPC from client-side.""" def __call__(self, - request_iterator: Optional[AsyncIterable[Any]]=None, + request_iterator: Optional[AsyncIterable[Request]]=None, timeout: Optional[int]=None, - metadata: Optional[Sequence[Tuple[Text, Text]]]=None, + metadata: Optional[Sequence[Tuple[Text, AnyStr]]]=None, credentials: Optional[grpc.CallCredentials]=None, wait_for_ready: Optional[bool]=None, compression: Optional[grpc.Compression]=None - ) -> grpc.aio.Call[AsyncIterable[Any]]: + ) -> grpc.aio.Call[AsyncIterable[Response]]: """Schedules the underlying RPC. Args: @@ -975,7 +975,7 @@ class StreamStreamMultiCallable: ```Python # grpc.aio.Call -class Call(typing.Awaitable[T], grpc.RpcContext): +class Call(Generic[Request, Response], grpc.RpcContext): """The representation of an RPC on the client-side.""" def is_active(self) -> bool: @@ -1012,7 +1012,7 @@ class Call(typing.Awaitable[T], grpc.RpcContext): already terminated or some other reason). """ - async def initial_metadata(self) -> Sequence[Tuple[Text, Text]]: + async def initial_metadata(self) -> Sequence[Tuple[Text, AnyStr]]: """Accesses the initial metadata sent by the server. Coroutine continues once the value is available. @@ -1021,7 +1021,7 @@ class Call(typing.Awaitable[T], grpc.RpcContext): The initial :term:`metadata`. """ - async def trailing_metadata(self) -> Sequence[Tuple[Text, Text]]: + async def trailing_metadata(self) -> Sequence[Tuple[Text, AnyStr]]: """Accesses the trailing metadata sent by the server. Coroutine continues once the value is available. @@ -1048,14 +1048,14 @@ class Call(typing.Awaitable[T], grpc.RpcContext): The details string of the RPC. """ - def __aiter__(self) -> AsyncIterable[Any]: + def __aiter__(self) -> AsyncIterable[Response]: """Returns the async iterable representation that yields messages. Returns: An async iterable object that yields messages. """ - async def read(self) -> Any: + async def read(self) -> Response: """Reads one message from the RPC. Only one read operation is allowed simultaneously. Mixing new streaming API and old @@ -1068,7 +1068,7 @@ class Call(typing.Awaitable[T], grpc.RpcContext): An RpcError exception if the read failed. """ - async def write(self, message: Any) -> None: + async def write(self, message: Request) -> None: """Writes one message to the RPC. Only one write operation is allowed simultaneously. Mixing new streaming API and old @@ -1208,7 +1208,7 @@ class GenericRpcHandler: ```Python # grpc.aio.RpcMethodHandler -class RpcMethodHandler: +class RpcMethodHandler(Generic[Request, Response]): """An implementation of a single RPC method. 
Attributes: @@ -1244,20 +1244,20 @@ class RpcMethodHandler: """ request_streaming: bool response_streaming: bool - request_deserializer: Optional[Callable[[bytes], Any]] - response_serializer: Optional[Callable[[Any], bytes]] - unary_unary: Optional[Callable[[Any, grpc.aio.ServicerContext], Any]] - unary_stream: Optional[Callable[[Any, grpc.aio.ServicerContext], Optional[AsyncIterable[Any]]]] - stream_unary: Optional[Callable[[AsyncIterable[Any], grpc.aio.ServicerContext], Any]] - stream_stream: Optional[Callable[[AsyncIterable[Any], grpc.aio.ServicerContext], Optional[AsyncIterable[Any]]]] + request_deserializer: Optional[Callable[[bytes], Request]] + response_serializer: Optional[Callable[[Response], bytes]] + unary_unary: Optional[Callable[[Request, grpc.aio.ServicerContext], Response]] + unary_stream: Optional[Callable[[Request, grpc.aio.ServicerContext], Optional[AsyncIterable[Response]]]] + stream_unary: Optional[Callable[[AsyncIterable[Request], grpc.aio.ServicerContext], Response]] + stream_stream: Optional[Callable[[AsyncIterable[Request], grpc.aio.ServicerContext], Optional[AsyncIterable[Response]]]] ``` ```Python # grpc.aio.ServicerContext -class ServicerContext(grpc.RpcContext): +class ServicerContext(Generic[Request, Response], grpc.RpcContext): """A context object passed to method implementations.""" - def invocation_metadata(self) -> Optional[Sequence[Tuple[Text, Text]]]: + def invocation_metadata(self) -> Optional[Sequence[Tuple[Text, AnyStr]]]: """Accesses the metadata from the sent by the client. Returns: @@ -1311,7 +1311,7 @@ class ServicerContext(grpc.RpcContext): grpc.compression.Gzip. """ - async def send_initial_metadata(self, initial_metadata: Sequence[Tuple[Text, Text]]) -> None: + async def send_initial_metadata(self, initial_metadata: Sequence[Tuple[Text, AnyStr]]) -> None: """Sends the initial metadata value to the client. This method need not be called by implementations if they have no @@ -1321,7 +1321,7 @@ class ServicerContext(grpc.RpcContext): initial_metadata: The initial :term:`metadata`. """ - async def set_trailing_metadata(self, trailing_metadata: Sequence[Tuple[Text, Text]]) -> None: + async def set_trailing_metadata(self, trailing_metadata: Sequence[Tuple[Text, AnyStr]]) -> None: """Sends the trailing metadata for the RPC. This method need not be called by implementations if they have no @@ -1395,7 +1395,7 @@ class ServicerContext(grpc.RpcContext): server creation or set on the call. """ - async def read(self) -> Any: + async def read(self) -> Request: """Reads one message from the RPC. Only one read operation is allowed simultaneously. Mixing new streaming API and old @@ -1408,7 +1408,7 @@ class ServicerContext(grpc.RpcContext): An RpcError exception if the read failed. """ - async def write(self, message: Any) -> None: + async def write(self, message: Response) -> None: """Writes one message to the RPC. Only one write operation is allowed simultaneously. 
Mixing new streaming API and old @@ -1421,10 +1421,10 @@ class ServicerContext(grpc.RpcContext): ```Python # grpc.aio.unary_unary_rpc_method_handler -def unary_unary_rpc_method_handler(behavior: Callable[[Any, grpc.aio.ServicerContext], Any], - request_deserializer: Optional[Callable[[bytes], Any]]=None, - response_serializer: Optional[Callable[[Any], bytes]]=None - ) -> grpc.aio.RpcMethodHandler: +def unary_unary_rpc_method_handler(behavior: Callable[[Request, grpc.aio.ServicerContext], Response], + request_deserializer: Optional[Callable[[bytes], Request]]=None, + response_serializer: Optional[Callable[[Response], bytes]]=None + ) -> grpc.aio.RpcMethodHandler[Request, Response]: """Creates an RpcMethodHandler for a unary-unary RPC method. Args: @@ -1439,10 +1439,10 @@ def unary_unary_rpc_method_handler(behavior: Callable[[Any, grpc.aio.ServicerCon # grpc.aio.unary_stream_rpc_method_handler -def unary_stream_rpc_method_handler(behavior: Callable[[Any, grpc.aio.ServicerContext], Optional[AsyncIterable[Any]]], - request_deserializer: Optional[Callable[[bytes], Any]]=None, - response_serializer: Optional[Callable[[Any], bytes]]=None - ) -> grpc.aio.RpcMethodHandler: +def unary_stream_rpc_method_handler(behavior: Callable[[Request, grpc.aio.ServicerContext], Optional[AsyncIterable[Response]]], + request_deserializer: Optional[Callable[[bytes], Request]]=None, + response_serializer: Optional[Callable[[Response], bytes]]=None + ) -> grpc.aio.RpcMethodHandler[Request, Response]: """Creates an RpcMethodHandler for a unary-stream RPC method. Args: @@ -1457,10 +1457,10 @@ def unary_stream_rpc_method_handler(behavior: Callable[[Any, grpc.aio.ServicerCo # grpc.aio.stream_unary_rpc_method_handler -def stream_unary_rpc_method_handler(behavior: Callable[[AsyncIterable[Any], grpc.aio.ServicerContext], Any], - request_deserializer: Optional[Callable[[bytes], Any]]=None, - response_serializer: Optional[Callable[[Any], bytes]]=None - ) -> grpc.aio.RpcMethodHandler: +def stream_unary_rpc_method_handler(behavior: Callable[[AsyncIterable[Request], grpc.aio.ServicerContext], Response], + request_deserializer: Optional[Callable[[bytes], Request]]=None, + response_serializer: Optional[Callable[[Response], bytes]]=None + ) -> grpc.aio.RpcMethodHandler[Reqeust, Response]: """Creates an RpcMethodHandler for a stream-unary RPC method. Args: @@ -1475,10 +1475,10 @@ def stream_unary_rpc_method_handler(behavior: Callable[[AsyncIterable[Any], grpc # grpc.aio.stream_stream_rpc_method_handler -def stream_stream_rpc_method_handler(behavior: Callable[[AsyncIterable[Any], grpc.aio.ServicerContext], Optional[AsyncIterable[Any]]], - request_deserializer: Optional[Callable[[bytes], Any]]=None, - response_serializer: Optional[Callable[[Any], bytes]]=None - ) -> grpc.aio.RpcMethodHandler: +def stream_stream_rpc_method_handler(behavior: Callable[[AsyncIterable[Request], grpc.aio.ServicerContext], Optional[AsyncIterable[Response]]], + request_deserializer: Optional[Callable[[bytes], Request]]=None, + response_serializer: Optional[Callable[[Response], bytes]]=None + ) -> grpc.aio.RpcMethodHandler[Request, Response]: """Creates an RpcMethodHandler for a stream-stream RPC method. 
Args: @@ -1549,10 +1549,10 @@ class UnaryUnaryClientInterceptor: """Affords intercepting unary-unary invocations.""" async def intercept_unary_unary(self, - continuation: Callable[[grpc.ClientCallDetails, Any], grpc.aio.Call[Any]], + continuation: Callable[[grpc.ClientCallDetails, Request], grpc.aio.Call[Response]], client_call_details: grpc.ClientCallDetails, - request: Any - ) -> grpc.aio.Call[Any]: + request: Request + ) -> grpc.aio.Call[Response]: """Intercepts a unary-unary invocation asynchronously. Args: @@ -1586,10 +1586,10 @@ class UnaryStreamClientInterceptor: """Affords intercepting unary-stream invocations.""" async def intercept_unary_stream(self, - continuation: Callable[[grpc.ClientCallDetails, Any], grpc.aio.Call[AsyncIterable[Any]]], + continuation: Callable[[grpc.ClientCallDetails, Request], grpc.aio.Call[AsyncIterable[Response]]], client_call_details: grpc.ClientCallDetails, - request: Any - ) -> grpc.aio.Call[AsyncIterable[Any]]: + request: Request + ) -> grpc.aio.Call[AsyncIterable[Response]]: """Intercepts a unary-stream invocation. Args: @@ -1621,10 +1621,10 @@ class StreamUnaryClientInterceptor: """Affords intercepting stream-unary invocations.""" async def intercept_stream_unary(self, - continuation: Callable[[grpc.ClientCallDetails, Optional[AsyncIterable[Any]]], grpc.aio.Call[Any]], + continuation: Callable[[grpc.ClientCallDetails, Optional[AsyncIterable[Request]]], grpc.aio.Call[Response]], client_call_details: grpc.ClientCallDetails, - request_iterator: AsyncIterable[Any] - ) -> grpc.aio.Call[Any]: + request_iterator: AsyncIterable[Request] + ) -> grpc.aio.Call[Response]: """Intercepts a stream-unary invocation asynchronously. Args: @@ -1657,10 +1657,10 @@ class StreamStreamClientInterceptor: """Affords intercepting stream-stream invocations.""" async def intercept_stream_stream(self, - continuation: Callable[[grpc.ClientCallDetails, Optional[AsyncIterable[Any]]], grpc.aio.Call[AsyncIterable[Any]]], + continuation: Callable[[grpc.ClientCallDetails, Optional[AsyncIterable[Request]]], grpc.aio.Call[AsyncIterable[Response]]], client_call_details: grpc.ClientCallDetails, - request_iterator: AsyncIterable[Any] - ) -> grpc.aio.Call[AsyncIterable[Any]]: + request_iterator: AsyncIterable[Request] + ) -> grpc.aio.Call[AsyncIterable[Response]]: """Intercepts a stream-stream invocation. Args: From 34a5e6b54eb6ea52607b4572c3284593cb97dcaf Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 3 Oct 2019 17:31:54 -0700 Subject: [PATCH 18/28] Make the add_done_callback signature consistent with Future --- L58-python-async-api.md | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 3819650b2..13753f8a9 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -1000,16 +1000,11 @@ class Call(Generic[Request, Response], grpc.RpcContext): Idempotent and has no effect if the RPC has already terminated. """ - def add_callback(self, callback: Callable[None, None]) -> None: + def add_done_callback(self, callback: Callable[[Call], None]) -> None: """Registers a callback to be called on RPC termination. Args: - callback: A no-parameter callable to be called on RPC termination. - - Returns: - True if the callback was added and will be called later; False if - the callback was not added and will not be called (because the RPC - already terminated or some other reason). + callback: A callable that accepts the grpc.aio.Call itself. 
""" async def initial_metadata(self) -> Sequence[Tuple[Text, AnyStr]]: @@ -1417,6 +1412,13 @@ class ServicerContext(Generic[Request, Response], grpc.RpcContext): Raises: An RpcError exception if the write failed. """ + + def add_done_callback(self, callback: Callable[[ServicerContext], None]) -> None: + """Registers a callback to be called on RPC termination. + + Args: + callback: A callable that accepts the grpc.aio.ServicerContext itself. + """ ``` ```Python From fe0a88d21e68b2bb514381f20d0dd570b5b4d9be Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 14 Oct 2019 10:58:50 -0700 Subject: [PATCH 19/28] Fix typo --- L58-python-async-api.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 13753f8a9..91a8d480e 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -145,7 +145,7 @@ gRPC Python programming in Python 3 runtime. `asyncio` is great, but not silver bullet to solve everything. It has its limitations and unique advantages. gRPC Python as a framework should empower tech-savvy users to use the cutting-edge features, in the same time as we allow -majority of our users to code in the way they familier. +majority of our users to code in the way they familiar. The new API will be isolated from the current API. The implementation of the new API is an entirely different stack than the current stack. On the downside, most From cca5c473fcce7303b61045f5500172ce3daf08e1 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 14 Oct 2019 11:27:18 -0700 Subject: [PATCH 20/28] Remove subscribe & unsubscribe from channel object --- L58-python-async-api.md | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 91a8d480e..e1e17f6a1 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -677,36 +677,6 @@ class Channel: Channel objects implement the Async Context Manager type, although they need not support being entered and exited multiple times. """ - - def subscribe(self, - callback: Callable[[ChannelConnectivity], None], - try_to_connect: bool=False) -> None: - """Subscribe to this Channel's connectivity state machine. - - A Channel may be in any of the states described by ChannelConnectivity. - This method allows application to monitor the state transitions. - The typical use case is to debug or gain better visibility into gRPC - runtime's state. - - Args: - callback: A callable to be invoked with ChannelConnectivity argument. - ChannelConnectivity describes current state of the channel. - The callable will be invoked immediately upon subscription - and again for every change to ChannelConnectivity until it - is unsubscribed or this Channel object goes out of scope. - try_to_connect: A boolean indicating whether or not this Channel - should attempt to connect immediately. If set to False, gRPC - runtime decides when to connect. - """ - - def unsubscribe(self, - callback: Callable[[ChannelConnectivity], None]) -> None: - """Unsubscribes a subscribed callback from this Channel's connectivity. - - Args: - callback: A callable previously registered with this Channel from - having been passed to its "subscribe" method. - """ def check_connectivity_state(self, try_to_connect: bool=False) -> grpc.ChannelConnectivity: """Check the connectivity state of a channel. 
From c6b645181aa6d0a43fa67d79050aaf55c878243b Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 14 Oct 2019 12:53:09 -0700 Subject: [PATCH 21/28] Update the code snippets for async-iter API --- L58-python-async-api.md | 79 +++++++++-------------------------------- 1 file changed, 17 insertions(+), 62 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index e1e17f6a1..82aa3d032 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -270,63 +270,22 @@ class AsyncGreeter(helloworld_pb2_grpc.GreeterServicer): ```Python ### Client side -class RequestIterator: - def __init__(self): - self._queue = asyncio.Queue() +async def async_request_iterator() -> HelloRequest: + for request in [...]: + yield response - async def send(self, message: HelloRequest): - await self._queue.put(message) - - def __aiter__(self) -> AsyncIterable[HelloRequest]: - return self - - async def __anext__(self) -> HelloRequest: - return await self._queue.get(block=True) - -request_iterator = RequestIterator() # No await needed, the response_iterator is grpc.aio.Call -response_iterator = stub.StreamingHi(request_iterator) - -# In sending coroutine -await request_iterator.send(proto_message) - -# In receiving coroutine +response_iterator = stub.StreamingHi(async_request_iterator()) async for response in response_iterator: process(response) ### Server side -class ResponseIterator: - - def __init__(self): - self._queue = asyncio.Queue() - - async def send(self, message: HelloReply): - await self._queue.put(message) - - def __aiter__(self) -> AsyncIterable[HelloReply]: - return self - - async def __anext__(self) -> HelloReply: - return await self._queue.get() - -async def streaming_hi_worker( - request_iterator: AsyncIterable[HelloRequest], - response_iterator: ResponseIterator - ) -> None: - async for request in request_iterator: - if request.needs_respond: - await response_iterator.send(response) class Greeter(helloworld_pb2_grpc.GreeterServicer): - async def StreamingHi(self, - request_iterator: AsyncIterable[HelloRequest], - context: grpc.aio.ServicerContext - ) -> AsyncIterable[HelloReply]: - response_iterator = ResponseIterator() - # Handle the write in another coroutine - asyncio.get_event_loop.create_task(streaming_hi_worker(request_iterator, response_iterator)) - return response_iterator + async def StreamingHi(self, request_iterator, context): + async for request in request_iterator: + yield response ``` #### Co-existence Of New And Current Streaming API @@ -457,20 +416,16 @@ stub.SayHelloStreaming(iter([ ])) ### The new usage is much verbose for same scenario -class AsyncIter: - def __init__(self, items): - self.items = items - - async def __aiter__(self): - for item in self.items: - yield item - -stub.SayHelloStreaming(AsyncIter([ - HelloRequest(name='Golden'), - HelloRequest(name='Retriever'), - HelloRequest(name='Pan'), - HelloRequest(name='Cake'), -])) +async def request_iterator(): + for request from [ + HelloRequest(name='Golden'), + HelloRequest(name='Retriever'), + HelloRequest(name='Pan'), + HelloRequest(name='Cake'), + ]: + yield request + +stub.SayHelloStreaming(request_iterator()) ``` ### No Special Async Functions Naming Pattern From d51b0a8517c147fcbb60de0f6a219a34b96f079a Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 18 Oct 2019 11:36:43 -0700 Subject: [PATCH 22/28] Add done_writing API to grpc.aio.Call --- L58-python-async-api.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 
82aa3d032..e23ee8fad 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -997,6 +997,13 @@ class Call(Generic[Request, Response], grpc.RpcContext): Raises: An RpcError exception if the write failed. """ + + def done_writing(self) -> None: + """Notifies server that the client is done sending messages. + + After done_writing is called, any additional invocation to the write + function will fail with RpcError. + """ ``` ### Server-Side From a878cd23024e46b49cb75de2571d38710f57d500 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 18 Oct 2019 13:09:33 -0700 Subject: [PATCH 23/28] Reduce confusion caused by the snippet --- L58-python-async-api.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index e23ee8fad..8319068eb 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -213,7 +213,7 @@ async with grpc.aio.insecure_channel("localhost:50051") as channel: process(response) ``` -### New Streaming API +### New Streaming API - Reader / Writer API Existing streaming API has usability problem that its logic complexes user application. For client side, it requires the request iterator to be defined @@ -240,7 +240,7 @@ euroelessar@ points out several great reasons for the new streaming API: So, this gRFC introduces a new pattern of streaming API that reads/writes message to peer with explicit call. -#### Snippet For New Streaming API +#### Snippet For Reader / Writer Streaming API ```Python ### Client side @@ -260,13 +260,13 @@ class AsyncGreeter(helloworld_pb2_grpc.GreeterServicer): context: grpc.aio.ServicerContext ) -> None: bootstrap_request = await context.read() - initialize_environment(bootstrap_request) - while has_response(): + ...(bootstrap_request) + while ...: response = ... 
await context.write(response) ``` -#### Snippet For Current Streaming API +#### Snippet For Async Iterator Streaming API ```Python ### Client side From 2c18bf39b0ecf44d02768c83bd7a544a3e460fd3 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 4 Nov 2019 17:34:20 -0800 Subject: [PATCH 24/28] Style the example snippets --- L58-python-async-api.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 8319068eb..3af0677c0 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -251,7 +251,9 @@ with stub.StreamingHi() as streaming_call: while response: # or response is not grpc.aio.EOF process(response) response = await streaming_call.read() +``` +```Python ### Server side class AsyncGreeter(helloworld_pb2_grpc.GreeterServicer): @@ -279,7 +281,9 @@ async def async_request_iterator() -> HelloRequest: response_iterator = stub.StreamingHi(async_request_iterator()) async for response in response_iterator: process(response) +``` +```Python ### Server side class Greeter(helloworld_pb2_grpc.GreeterServicer): From d15ff9fe8d2fb4238158894e58818718ec2a8d20 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 22 Jan 2020 14:39:12 -0800 Subject: [PATCH 25/28] Update the client-side interfaces --- L58-python-async-api.md | 147 ++++++++++++++++++++++++++++++---------- 1 file changed, 111 insertions(+), 36 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 3af0677c0..d53cee26d 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -1,10 +1,10 @@ Async API for gRPC Python ---- -* Author(s): lidizheng +* Author(s): lidizheng, pfreixes * Approver: gnossen * Status: In Review * Implemented in: Python -* Last updated: 2019-10-01 +* Last updated: 2020-01-22 * Discussions at: * https://groups.google.com/forum/#!topic/grpc-io/7V7HYM_aph4 * https://github.com/lidizheng/grpc-api-examples/pull/1 @@ -903,18 +903,29 @@ class StreamStreamMultiCallable(Generic[Request, Response]): ``` ```Python -# grpc.aio.Call -class Call(Generic[Request, Response], grpc.RpcContext): - """The representation of an RPC on the client-side.""" +# grpc.aio.RpcContext +class RpcContext: + """Provides RPC-related information and action.""" + + def cancelled(self) -> bool: + """Return True if the RPC is cancelled. + + The RPC is cancelled when the cancellation was requested with cancel(). + + Returns: + A bool indicates whether the RPC is cancelled or not. + """ - def is_active(self) -> bool: - """Describes whether the RPC is active or has terminated. + def done(self) -> bool: + """Return True if the RPC is done. + + An RPC is done if the RPC is completed, cancelled or aborted. Returns: - True if RPC is active, False otherwise. + A bool indicates if the RPC is done. """ - def time_remaining(self) -> float: + def time_remaining(self) -> Optional[float]: """Describes the length of allowed time remaining for the RPC. Returns: @@ -923,33 +934,39 @@ class Call(Generic[Request, Response], grpc.RpcContext): timed out, or None if no deadline was specified for the RPC. """ - def cancel(self) -> None: + def cancel(self) -> bool: """Cancels the RPC. Idempotent and has no effect if the RPC has already terminated. + + Returns: + A bool indicates if the cancellation is performed or not. """ - def add_done_callback(self, callback: Callable[[Call], None]) -> None: + def add_done_callback(self, callback: DoneCallbackType) -> None: """Registers a callback to be called on RPC termination. 
Args: - callback: A callable that accepts the grpc.aio.Call itself. + callback: A callable object will be called with the call object as + its only argument. """ +``` - async def initial_metadata(self) -> Sequence[Tuple[Text, AnyStr]]: - """Accesses the initial metadata sent by the server. +```Python +# grpc.aio.Call +class Call(grpc.aio.RpcContext): + """The abstract base class of an RPC on the client-side.""" - Coroutine continues once the value is available. + async def initial_metadata(self) -> MetadataType: + """Accesses the initial metadata sent by the server. Returns: The initial :term:`metadata`. """ - async def trailing_metadata(self) -> Sequence[Tuple[Text, AnyStr]]: + async def trailing_metadata(self) -> MetadataType: """Accesses the trailing metadata sent by the server. - Coroutine continues once the value is available. - Returns: The trailing :term:`metadata`. """ @@ -957,8 +974,6 @@ class Call(Generic[Request, Response], grpc.RpcContext): async def code(self) -> grpc.StatusCode: """Accesses the status code sent by the server. - Coroutine continues once the value is available. - Returns: The StatusCode value for the RPC. """ @@ -966,47 +981,107 @@ class Call(Generic[Request, Response], grpc.RpcContext): async def details(self) -> Text: """Accesses the details sent by the server. - Coroutine continues once the value is available. - Returns: The details string of the RPC. """ - def __aiter__(self) -> AsyncIterable[Response]: + +# grpc.aio.UnaryUnaryCall +class UnaryUnaryCall(Generic[RequestType, ResponseType], Call): + """The abstract base class of an unary-unary RPC on the client-side.""" + + def __await__(self) -> Awaitable[ResponseType]: + """Await the response message to be ready. + + Returns: + The response message of the RPC. + """ + + +# grpc.aio.UnaryStreamCall +class UnaryStreamCall(Generic[RequestType, ResponseType], Call): + + def __aiter__(self) -> AsyncIterable[ResponseType]: """Returns the async iterable representation that yields messages. + Under the hood, it is calling the "read" method. + Returns: An async iterable object that yields messages. """ - async def read(self) -> Response: - """Reads one message from the RPC. + async def read(self) -> Union[EOFType, ResponseType]: + """Reads one message from the stream. - Only one read operation is allowed simultaneously. Mixing new streaming API and old - streaming API will resulted in undefined behavior. + Read operations must be serialized when called from multiple + coroutines. Returns: - A response message of the RPC. - + A response message, or an `grpc.aio.EOF` to indicate the end of the + stream. + """ + + +# grpc.aio.StreamUnaryCall +class StreamUnaryCall(Generic[RequestType, ResponseType], Call): + + async def write(self, request: RequestType) -> None: + """Writes one message to the stream. + Raises: - An RpcError exception if the read failed. + An RpcError exception if the write failed. """ - async def write(self, message: Request) -> None: - """Writes one message to the RPC. + async def done_writing(self) -> None: + """Notifies server that the client is done sending messages. - Only one write operation is allowed simultaneously. Mixing new streaming API and old - streaming API will resulted in undefined behavior. + After done_writing is called, any additional invocation to the write + function will fail. This function is idempotent. + """ + + def __await__(self) -> Awaitable[ResponseType]: + """Await the response message to be ready. + + Returns: + The response message of the stream. 
+ """ + + +# grpc.aio.StreamStreamCall +class StreamStreamCall(Generic[RequestType, ResponseType], Call): + + def __aiter__(self) -> AsyncIterable[ResponseType]: + """Returns the async iterable representation that yields messages. + + Under the hood, it is calling the "read" method. + + Returns: + An async iterable object that yields messages. + """ + + async def read(self) -> Union[EOFType, ResponseType]: + """Reads one message from the stream. + + Read operations must be serialized when called from multiple + coroutines. + + Returns: + A response message, or an `grpc.aio.EOF` to indicate the end of the + stream. + """ + + async def write(self, request: RequestType) -> None: + """Writes one message to the stream. Raises: An RpcError exception if the write failed. """ - def done_writing(self) -> None: + async def done_writing(self) -> None: """Notifies server that the client is done sending messages. After done_writing is called, any additional invocation to the write - function will fail with RpcError. + function will fail. This function is idempotent. """ ``` From b9074db9201a0f66f22ce94a63a793919ed9d1a6 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 23 Jan 2020 10:09:30 -0800 Subject: [PATCH 26/28] Fix code snippet: "Greeter" -> "AsyncGreeter" --- L58-python-async-api.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index d53cee26d..609f6e183 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -198,7 +198,7 @@ class AsyncGreeter(helloworld_pb2_grpc.GreeterServicer): server = grpc.aio.server() server.add_insecure_port(":50051") -helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) +helloworld_pb2_grpc.add_GreeterServicer_to_server(AsyncGreeter(), server) server.start() await server.wait_for_termination() ``` From 235c477f48246ac3f44fc6e26f7c8f57f53076b1 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 29 Jan 2020 10:57:59 -0800 Subject: [PATCH 27/28] Fix typos & adding 'New Exceptions' section --- L58-python-async-api.md | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index 609f6e183..afbee1b1a 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -1,7 +1,7 @@ Async API for gRPC Python ---- * Author(s): lidizheng, pfreixes -* Approver: gnossen +* Approver: gnossen, rmariano, bloodbare * Status: In Review * Implemented in: Python * Last updated: 2020-01-22 @@ -296,7 +296,7 @@ class Greeter(helloworld_pb2_grpc.GreeterServicer): Existing API still has advantage over simpler use cases of gRPC, and iterator syntax is Pythonic to use. Also, for backward compatibility concern, -iterator-based API needs to stay. So, both new and current streaming API needs +iterator-based API needs to stay. So, both new and current streaming API need to be included in the surface API. 
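For illustration, here is a minimal sketch of how the two streaming styles could sit side by side against the same stream-stream method. It is not part of the proposed interfaces themselves; `stub`, `process`, `HelloRequest`, and the `StreamingHi` method are assumed from the earlier snippets.

```Python
from typing import AsyncIterable


async def use_iterator_style(stub) -> None:
    """Current-style streaming: pass an async iterator of requests."""

    async def requests() -> AsyncIterable[HelloRequest]:
        for name in ('Golden', 'Retriever'):
            yield HelloRequest(name=name)

    async for response in stub.StreamingHi(requests()):
        process(response)


async def use_reader_writer_style(stub) -> None:
    """New-style streaming: omit the request iterator, then read/write explicitly."""
    call = stub.StreamingHi()
    await call.write(HelloRequest(name='Pan'))
    process(await call.read())
    await call.done_writing()
```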
To keep the function signature stable, the new streaming API only requires @@ -421,7 +421,7 @@ stub.SayHelloStreaming(iter([ ### The new usage is much verbose for same scenario async def request_iterator(): - for request from [ + for request in [ HelloRequest(name='Golden'), HelloRequest(name='Retriever'), HelloRequest(name='Pan'), @@ -1721,6 +1721,26 @@ async def channel_ready(channel: grpc.aio.Channel) -> None: """ ``` +### New Exceptions + +```Python +# grpc.BaseError +class BaseError(Exception): + """The base class for all exceptions generated by gRPC framework.""" + +# grpc.AbortError +class AbortError(BaseError): + """Raised when calling abort in servicer methods. + + This exception should not be suppressed. Applications may catch it to + perform certain clean-up logic, and then re-raise it. + """ + +# grpc.UsageError +class UsageError(BaseError): + """Raised when the usage might lead to undefined behavior.""" +``` + ### Shared APIs APIs in the following categories remain in top level: From 16516df2fc6dab01721dda84e2279994c9b8f4ef Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 13 May 2020 10:30:48 -0700 Subject: [PATCH 28/28] Prepare the proposal for merging --- L58-python-async-api.md | 105 ++++++++-------------------------------- 1 file changed, 19 insertions(+), 86 deletions(-) diff --git a/L58-python-async-api.md b/L58-python-async-api.md index afbee1b1a..0cac4d72c 100644 --- a/L58-python-async-api.md +++ b/L58-python-async-api.md @@ -1,10 +1,10 @@ Async API for gRPC Python ---- * Author(s): lidizheng, pfreixes -* Approver: gnossen, rmariano, bloodbare -* Status: In Review +* Approver: gnossen, rmariano +* Status: Approved * Implemented in: Python -* Last updated: 2020-01-22 +* Last updated: 2020-05-13 * Discussions at: * https://groups.google.com/forum/#!topic/grpc-io/7V7HYM_aph4 * https://github.com/lidizheng/grpc-api-examples/pull/1 @@ -145,7 +145,7 @@ gRPC Python programming in Python 3 runtime. `asyncio` is great, but not silver bullet to solve everything. It has its limitations and unique advantages. gRPC Python as a framework should empower tech-savvy users to use the cutting-edge features, in the same time as we allow -majority of our users to code in the way they familiar. +majority of our users to use Python in their most familiar way. The new API will be isolated from the current API. The implementation of the new API is an entirely different stack than the current stack. On the downside, most @@ -199,7 +199,7 @@ class AsyncGreeter(helloworld_pb2_grpc.GreeterServicer): server = grpc.aio.server() server.add_insecure_port(":50051") helloworld_pb2_grpc.add_GreeterServicer_to_server(AsyncGreeter(), server) -server.start() +await server.start() await server.wait_for_termination() ``` @@ -248,7 +248,7 @@ with stub.StreamingHi() as streaming_call: request = echo_pb2.EchoRequest(message="ping") await streaming_call.write(request) response = await streaming_call.read() - while response: # or response is not grpc.aio.EOF + while response != grpc.aio.EOF: process(response) response = await streaming_call.read() ``` @@ -403,35 +403,6 @@ However, by supporting the executors, we can **allow mixing async and sync** **method handlers** on the server side, which further reduce the cost of migration. -### Using Asynchronous Generator - -For streaming calls, the requests on the client-side are supplied by generator -for now. If a user wants to provide a pre-defined list of request messages, they -can use build-in `iter()` function. 
But there isn't an equivalent function for -async generator. Should we wrap it inside our library to increase usability? - -```Python -### Current usage of Python generator -stub.SayHelloStreaming(iter([ - HelloRequest(name='Golden'), - HelloRequest(name='Retriever'), - HelloRequest(name='Pan'), - HelloRequest(name='Cake'), -])) - -### The new usage is much verbose for same scenario -async def request_iterator(): - for request in [ - HelloRequest(name='Golden'), - HelloRequest(name='Retriever'), - HelloRequest(name='Pan'), - HelloRequest(name='Cake'), - ]: - yield request - -stub.SayHelloStreaming(request_iterator()) -``` - ### No Special Async Functions Naming Pattern Fire-and-forget is a valid use case in async programming. For the non-critical @@ -512,40 +483,6 @@ So, even if all the read/write is asynchronous in `asyncio`, we will have to either enforce the rule ourselves by adding locks in our implementation. Or we can pass down the synchronization responsibility to our users. -### Concrete Class Instead of "Interfaces" - -Interface is a design pattern that defines the contract of an entity that allows -different implementation to work seamlessly in a system. In the past, gRPC -Python has been using metaclass based Python interface pattern. It works just -like interface in Golang and Java, except the error is generated in runtime -instead of compile time. - -If the gRPC Python has multiple implementation for a single interface, the use -of the design pattern provides productivity in unifying their behavior. However, -almost non interfaces has second implementation, even if they do, they are -depends directly on our concrete implementation, which should better using -inheritance or composition than interfaces. - -Also, in the past, dependents of gRPC Python have observed several failure -caused by the interface. The interface constraints our ability to add -experimental API. Once we change even slightly with interfaces, the downstream -implementations are likely to break. - -Since this is a new opportunity for us to re-design, we need to think cautiously -about how do we empower our users to extend our classes. For majority of cases, -we are providing the only implementation for the interface. We should -convert them into concrete classes. - -On the other hand, there are actually one valid use case that we should keep -abstract class -- interceptors. To be more specific, the following interfaces -won't be replaced by concrete classes: - -* `grpc.ServerInterceptor` -* `grpc.UnaryUnaryClientInterceptor` -* `grpc.UnaryStreamClientInterceptor` -* `grpc.StreamUnaryClientInterceptor` -* `grpc.StreamStreamClientInterceptor` - ## Future Features ### Implicit Context Propagation By `contextvars` @@ -567,7 +504,7 @@ For deadline propagation, the deadline of upstream server will be implicitly pass down to downstream server, so downstream services can react to that information and save computation resources or perform flow control. -Acceptence critiria of this feature: +Acceptance criteria of this feature: * The implementation of such feature should supported by official package, and users are not expected to directly access those metadata; * Application logic has higher priority than the implicit propagation (e.g. @@ -576,7 +513,7 @@ users are not expected to directly access those metadata; * The exception error string should be informative (e.g. pointing out the timeout is due to upstream deadline). -Further discussion around this topic, please see related sections under [Rationale]. 
+Further discussion around this topic, see [grpc-api-examples#2](https://github.com/lidizheng/grpc-api-examples/pull/2). ### Introduce Typing To Generated Code @@ -622,10 +559,12 @@ other out of box. ### Explicit Context vs. Implicit Context -TBD ## API Interfaces +The proposed API interface might be obsoleted, please refer to our [master](https://github.com/grpc/grpc) +branch or our [API reference](https://grpc.github.io/grpc/python/grpc_asyncio.html). + ### Channel-Side ```Python @@ -668,6 +607,9 @@ class Channel: A ChannelConnectivity object or None. """ + async def channel_ready(self) -> None: + """Creates a coroutine that blocks until the Channel is READY.""" + def unary_unary(self, method: Text, request_serializer: Optional[Callable[[Request], bytes]]=None, @@ -1337,7 +1279,7 @@ class ServicerContext(Generic[Request, Response], grpc.RpcContext): trailing_metadata: The trailing :term:`metadata`. """ - def abort(self, code: grpc.StatusCode, details: Text) -> NoReturn: + async def abort(self, code: grpc.StatusCode, details: Text) -> NoReturn: """Raises an exception to terminate the RPC with a non-OK status. The code and details passed as arguments will supercede any existing @@ -1354,7 +1296,7 @@ class ServicerContext(Generic[Request, Response], grpc.RpcContext): RPC to the gRPC runtime. """ - def abort_with_status(self, status: grpc.Status) -> NoReturn: + async def abort_with_status(self, status: grpc.Status) -> NoReturn: """Raises an exception to terminate the RPC with a non-OK status. The status passed as argument will supercede any existing status code, @@ -1711,16 +1653,6 @@ class _EOF: EOF = _EOF() ``` -```Python -# grpc.aio.channel_ready -async def channel_ready(channel: grpc.aio.Channel) -> None: - """Creates a coroutine that ends when a Channel is ready. - - Args: - channel: A Channel object. - """ -``` - ### New Exceptions ```Python @@ -1758,8 +1690,8 @@ APIs in the following categories remain in top level: Reviewers has thrown many valuable proposals. This design doc may not be the ideal place for those discussions. -* Re-design the connectivity API to be consistent with C-Core. * Design a easy way to use channel arguments [grpc#19734](https://github.com/grpc/grpc/issues/19734). +* Integrate type annotation into generated code [grpc#20479](https://github.com/grpc/grpc/issues/20479). ## Related Issues @@ -1769,4 +1701,5 @@ ideal place for those discussions. ## Implementation -* TODO +* [Project Dashboard: gRPC Async API](https://github.com/grpc/grpc/projects/16) +* API reference available since v1.28.0
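To tie the pieces of the final surface together, a minimal server-side sketch follows. It assumes the generated `helloworld_pb2` / `helloworld_pb2_grpc` modules used throughout the earlier snippets; the `serve()` wrapper and the greeting text are illustrative rather than part of the proposal. Note that `start()` is awaited and that the coroutine `abort()` raises `AbortError`, which application code should re-raise if it catches it for clean-up.

```Python
import asyncio

import grpc
# Assumed: generated from the canonical helloworld.proto used in earlier snippets.
import helloworld_pb2
import helloworld_pb2_grpc


class AsyncGreeter(helloworld_pb2_grpc.GreeterServicer):

    async def SayHello(self, request: helloworld_pb2.HelloRequest,
                       context: grpc.aio.ServicerContext) -> helloworld_pb2.HelloReply:
        if not request.name:
            # abort() is a coroutine; it raises AbortError to terminate the RPC.
            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Empty name')
        return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)


async def serve() -> None:
    server = grpc.aio.server()
    helloworld_pb2_grpc.add_GreeterServicer_to_server(AsyncGreeter(), server)
    server.add_insecure_port('[::]:50051')
    await server.start()  # start() is a coroutine in the final API
    await server.wait_for_termination()


if __name__ == '__main__':
    asyncio.run(serve())
```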
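A matching client-side sketch exercises the awaitable `UnaryUnaryCall`, `add_done_callback`, and the `Channel.channel_ready()` coroutine added in the final revision. Again, this is illustrative only: the `run()` wrapper and the printed output are not part of the proposal, and the generated helloworld modules are assumed as above.

```Python
import asyncio

import grpc
import helloworld_pb2
import helloworld_pb2_grpc


async def run() -> None:
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        # Block until the channel is READY, replacing the free function
        # grpc.aio.channel_ready() removed in the final revision.
        await channel.channel_ready()

        stub = helloworld_pb2_grpc.GreeterStub(channel)

        # Invoking the stub returns a UnaryUnaryCall immediately; awaiting it
        # yields the response message.
        call = stub.SayHello(helloworld_pb2.HelloRequest(name='Retriever'))
        call.add_done_callback(
            lambda done_call: print('RPC done, cancelled:', done_call.cancelled()))

        response = await call
        print(response.message, await call.code())


if __name__ == '__main__':
    asyncio.run(run())
```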