From 9b37761d90f60e8620da273f0c75cc0fd23dd725 Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Wed, 6 Dec 2023 22:08:40 -0500 Subject: [PATCH 01/15] Update deps, hyper 1.0, axum 0.7 --- tonic/Cargo.toml | 46 +++++++++++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 013cc6e72..747635056 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -28,7 +28,14 @@ gzip = ["dep:flate2"] zstd = ["dep:zstd"] default = ["transport", "codegen", "prost"] prost = ["dep:prost"] -tls = ["dep:rustls-pemfile", "transport", "dep:tokio-rustls", "dep:rustls", "tokio/rt", "tokio/macros"] +tls = [ + "dep:rustls-pemfile", + "transport", + "dep:tokio-rustls", + "dep:rustls", + "tokio/rt", + "tokio/macros", +] tls-roots = ["tls-roots-common", "dep:rustls-native-certs"] tls-roots-common = ["tls"] tls-webpki-roots = ["tls-roots-common", "dep:webpki-roots"] @@ -52,29 +59,42 @@ channel = [] [dependencies] base64 = "0.21" bytes = "1.0" -http = "0.2" +http = "1.0" tracing = "0.1" tokio = "1.0.1" -http-body = "0.4.4" +http-body = "1.0" +http-body-util = "0.1" percent-encoding = "2.1" pin-project = "1.0.11" tower-layer = "0.3" tower-service = "0.3" # prost -prost = {version = "0.12", default-features = false, features = ["std"], optional = true} +prost = { version = "0.12", default-features = false, features = [ + "std", +], optional = true } # codegen -async-trait = {version = "0.1.13", optional = true} +async-trait = { version = "0.1.13", optional = true } # transport -h2 = {version = "0.3.17", optional = true} -hyper = {version = "0.14.26", features = ["full"], optional = true} -hyper-timeout = {version = "0.4", optional = true} +h2 = { version = "0.4", optional = true } +hyper = { version = "1.0", features = ["full"], optional = true } +hyper-util = { version = "0.1", features = ["full"] } +hyper-timeout = { version = "0.5", optional = true } tokio-stream = "0.1" -tower = {version = "0.4.7", default-features = false, features = ["balance", "buffer", "discover", "limit", "load", "make", "timeout", "util"], optional = true} -axum = {version = "0.6.9", default_features = false, optional = true} +tower = { version = "0.4.7", default-features = false, features = [ + "balance", + "buffer", + "discover", + "limit", + "load", + "make", + "timeout", + "util", +], optional = true } +axum = { version = "0.7", default_features = false, optional = true } # rustls async-stream = { version = "0.3", optional = true } @@ -85,7 +105,7 @@ rustls = { version = "0.21.7", optional = true } webpki-roots = { version = "0.25.0", optional = true } # compression -flate2 = {version = "1.0", optional = true} +flate2 = { version = "1.0", optional = true } zstd = { version = "0.12.3", optional = true } [dev-dependencies] @@ -94,8 +114,8 @@ quickcheck = "1.0" quickcheck_macros = "1.0" rand = "0.8" static_assertions = "1.0" -tokio = {version = "1.0", features = ["rt", "macros"]} -tower = {version = "0.4.7", features = ["full"]} +tokio = { version = "1.0", features = ["rt", "macros"] } +tower = { version = "0.4.7", features = ["full"] } [package.metadata.docs.rs] all-features = true From 2d4595a59ab6845f56e662081abc9f50755ea840 Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Wed, 6 Dec 2023 22:17:34 -0500 Subject: [PATCH 02/15] Update imports to use util crates Some low-hanging import fixes --- tonic/src/body.rs | 6 ++---- tonic/src/transport/channel/endpoint.rs | 4 ++-- tonic/src/transport/channel/mod.rs | 3 ++- tonic/src/transport/server/mod.rs | 2 +- tonic/src/transport/service/connection.rs | 3 ++- tonic/src/transport/service/discover.rs | 2 +- tonic/src/transport/service/io.rs | 2 +- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/tonic/src/body.rs b/tonic/src/body.rs index ef95eec47..624b5f692 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -1,9 +1,7 @@ //! HTTP specific body utilities. -use http_body::Body; - /// A type erased HTTP body used for tonic services. -pub type BoxBody = http_body::combinators::UnsyncBoxBody; +pub type BoxBody = http_body_util::combinators::UnsyncBoxBody; /// Convert a [`http_body::Body`] into a [`BoxBody`]. pub(crate) fn boxed(body: B) -> BoxBody @@ -16,7 +14,7 @@ where /// Create an empty `BoxBody` pub fn empty_body() -> BoxBody { - http_body::Empty::new() + http_body_util::Empty::new() .map_err(|err| match err {}) .boxed_unsync() } diff --git a/tonic/src/transport/channel/endpoint.rs b/tonic/src/transport/channel/endpoint.rs index 6aacb57a5..ac23de98e 100644 --- a/tonic/src/transport/channel/endpoint.rs +++ b/tonic/src/transport/channel/endpoint.rs @@ -313,7 +313,7 @@ impl Endpoint { /// Create a channel from this config. pub async fn connect(&self) -> Result { - let mut http = hyper::client::connect::HttpConnector::new(); + let mut http = hyper_util::client::legacy::connect::HttpConnector::new(); http.enforce_http(false); http.set_nodelay(self.tcp_nodelay); http.set_keepalive(self.tcp_keepalive); @@ -334,7 +334,7 @@ impl Endpoint { /// The channel returned by this method does not attempt to connect to the endpoint until first /// use. pub fn connect_lazy(&self) -> Channel { - let mut http = hyper::client::connect::HttpConnector::new(); + let mut http = hyper_util::client::legacy::connect::HttpConnector::new(); http.enforce_http(false); http.set_nodelay(self.tcp_nodelay); http.set_keepalive(self.tcp_keepalive); diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index b510a6980..94ed09953 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -17,7 +17,7 @@ use http::{ uri::{InvalidUri, Uri}, Request, Response, }; -use hyper::client::connect::Connection as HyperConnection; +use hyper_util::client::legacy::connect::Connection as HyperConnection; use std::{ fmt, future::Future, @@ -236,3 +236,4 @@ impl fmt::Debug for ResponseFuture { f.debug_struct("ResponseFuture").finish() } } + diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index 7f2ffde2b..9123e98eb 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -63,7 +63,7 @@ use tower::{ Service, ServiceBuilder, }; -type BoxHttpBody = http_body::combinators::UnsyncBoxBody; +type BoxHttpBody = http_body_util::combinators::UnsyncBoxBody; type BoxService = tower::util::BoxService, Response, crate::Error>; type TraceInterceptor = Arc) -> tracing::Span + Send + Sync + 'static>; diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index 46a88dda5..10a45c0ec 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -5,8 +5,8 @@ use crate::{ }; use http::Uri; use hyper::client::conn::Builder; -use hyper::client::connect::Connection as HyperConnection; use hyper::client::service::Connect as HyperConnect; +use hyper_util::client::legacy::connect::Connection as HyperConnection; use std::{ fmt, task::{Context, Poll}, @@ -126,3 +126,4 @@ impl fmt::Debug for Connection { f.debug_struct("Connection").finish() } } + diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index 2d23ca74c..fb31c4f92 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -32,7 +32,7 @@ impl Stream for DynamicServiceStream { Poll::Pending | Poll::Ready(None) => Poll::Pending, Poll::Ready(Some(change)) => match change { Change::Insert(k, endpoint) => { - let mut http = hyper::client::connect::HttpConnector::new(); + let mut http = hyper_util::client::legacy::connect::HttpConnector::new(); http.set_nodelay(endpoint.tcp_nodelay); http.set_keepalive(endpoint.tcp_keepalive); http.set_connect_timeout(endpoint.connect_timeout); diff --git a/tonic/src/transport/service/io.rs b/tonic/src/transport/service/io.rs index 2230b9b2e..e5e75287a 100644 --- a/tonic/src/transport/service/io.rs +++ b/tonic/src/transport/service/io.rs @@ -1,5 +1,5 @@ use crate::transport::server::Connected; -use hyper::client::connect::{Connected as HyperConnected, Connection}; +use hyper_util::client::legacy::connect::{Connected as HyperConnected, Connection}; use std::io; use std::io::IoSlice; use std::pin::Pin; From 16b6ae2ce698332b3da3ce74d112c005036f2ee0 Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Wed, 6 Dec 2023 23:39:25 -0500 Subject: [PATCH 03/15] Use Axum Request and Response in transport This commit is primarily converting Request and Response types within the transport module to Axum 0.7 Request/Response. There is still more to come to finish this conversion. There are also small changes such as updating the hyper service builder syntax. Over the course of this commit,it was discovered that hyper-util is missing `http2_max_pending_accept_reset_streams`. --- tonic/src/transport/channel/mod.rs | 22 ++++++++------------ tonic/src/transport/server/mod.rs | 23 ++++++++++++--------- tonic/src/transport/service/connection.rs | 25 ++++++++--------------- tonic/src/transport/service/router.rs | 25 ++++++++++------------- 4 files changed, 42 insertions(+), 53 deletions(-) diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index 94ed09953..b1bbc6046 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -10,13 +10,9 @@ pub use endpoint::Endpoint; pub use tls::ClientTlsConfig; use super::service::{Connection, DynamicServiceStream, SharedExec}; -use crate::body::BoxBody; use crate::transport::Executor; use bytes::Bytes; -use http::{ - uri::{InvalidUri, Uri}, - Request, Response, -}; +use http::uri::{InvalidUri, Uri}; use hyper_util::client::legacy::connect::Connection as HyperConnection; use std::{ fmt, @@ -30,6 +26,7 @@ use tokio::{ sync::mpsc::{channel, Sender}, }; +use axum::{extract::Request, response::Response, body::Body}; use tower::balance::p2c::Balance; use tower::{ buffer::{self, Buffer}, @@ -38,7 +35,7 @@ use tower::{ Service, }; -type Svc = Either, Response, crate::Error>>; +type Svc = Either>; const DEFAULT_BUFFER_SIZE: usize = 1024; @@ -67,14 +64,14 @@ const DEFAULT_BUFFER_SIZE: usize = 1024; /// cloning the `Channel` type is cheap and encouraged. #[derive(Clone)] pub struct Channel { - svc: Buffer>, + svc: Buffer, } /// A future that resolves to an HTTP response. /// /// This is returned by the `Service::call` on [`Channel`]. pub struct ResponseFuture { - inner: buffer::future::ResponseFuture<>>::Future>, + inner: buffer::future::ResponseFuture<>::Future>, } impl Channel { @@ -200,8 +197,8 @@ impl Channel { } } -impl Service> for Channel { - type Response = http::Response; +impl Service for Channel { + type Response = Response; type Error = super::Error; type Future = ResponseFuture; @@ -209,7 +206,7 @@ impl Service> for Channel { Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source) } - fn call(&mut self, request: http::Request) -> Self::Future { + fn call(&mut self, request: Request) -> Self::Future { let inner = Service::call(&mut self.svc, request); ResponseFuture { inner } @@ -217,7 +214,7 @@ impl Service> for Channel { } impl Future for ResponseFuture { - type Output = Result, super::Error>; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let val = ready!(Pin::new(&mut self.inner).poll(cx)).map_err(super::Error::from_source)?; @@ -236,4 +233,3 @@ impl fmt::Debug for ResponseFuture { f.debug_struct("ResponseFuture").finish() } } - diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index 9123e98eb..7835fd9ab 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -13,6 +13,7 @@ pub use super::service::Routes; pub use super::service::RoutesBuilder; pub use conn::{Connected, TcpConnectInfo}; +use hyper_util::rt::TokioExecutor; #[cfg(feature = "tls")] pub use tls::ServerTlsConfig; @@ -534,16 +535,17 @@ impl Server { _io: PhantomData, }; - let server = hyper::Server::builder(incoming) - .http2_only(http2_only) - .http2_initial_connection_window_size(init_connection_window_size) - .http2_initial_stream_window_size(init_stream_window_size) - .http2_max_concurrent_streams(max_concurrent_streams) - .http2_keep_alive_interval(http2_keepalive_interval) - .http2_keep_alive_timeout(http2_keepalive_timeout) - .http2_adaptive_window(http2_adaptive_window.unwrap_or_default()) - .http2_max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams) - .http2_max_frame_size(max_frame_size); + let server = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()) + .http2() + .initial_connection_window_size(init_connection_window_size) + .initial_stream_window_size(init_stream_window_size) + .max_concurrent_streams(max_concurrent_streams) + .keep_alive_interval(http2_keepalive_interval) + .keep_alive_timeout(http2_keepalive_timeout) + .adaptive_window(http2_adaptive_window.unwrap_or_default()) + // FIXME: wait for this to be added to hyper-util + //.max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams) + .max_frame_size(max_frame_size); if let Some(signal) = signal { server @@ -885,3 +887,4 @@ where future::ready(Ok(svc)) } } + diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index 10a45c0ec..6184de983 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -1,12 +1,8 @@ use super::{grpc_timeout::GrpcTimeout, reconnect::Reconnect, AddOrigin, UserAgent}; -use crate::{ - body::BoxBody, - transport::{BoxFuture, Endpoint}, -}; +use crate::transport::{BoxFuture, Endpoint}; use http::Uri; -use hyper::client::conn::Builder; -use hyper::client::service::Connect as HyperConnect; -use hyper_util::client::legacy::connect::Connection as HyperConnection; +use hyper::client::conn::http2::Builder; +use hyper_util::client::legacy::connect::{Connect as HyperConnect, Connection as HyperConnection}; use std::{ fmt, task::{Context, Poll}, @@ -21,9 +17,8 @@ use tower::{ }; use tower_service::Service; -pub(crate) type Request = http::Request; -pub(crate) type Response = http::Response; - +pub(crate) type Request = axum::extract::Request; +pub(crate) type Response = axum::response::Response; pub(crate) struct Connection { inner: BoxService, } @@ -36,12 +31,10 @@ impl Connection { C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, { - let mut settings = Builder::new() - .http2_initial_stream_window_size(endpoint.init_stream_window_size) - .http2_initial_connection_window_size(endpoint.init_connection_window_size) - .http2_only(true) - .http2_keep_alive_interval(endpoint.http2_keep_alive_interval) - .executor(endpoint.executor.clone()) + let mut settings = Builder::new(endpoint.executor) + .initial_stream_window_size(endpoint.init_stream_window_size) + .initial_connection_window_size(endpoint.init_connection_window_size) + .keep_alive_interval(endpoint.http2_keep_alive_interval) .clone(); if let Some(val) = endpoint.http2_keep_alive_timeout { diff --git a/tonic/src/transport/service/router.rs b/tonic/src/transport/service/router.rs index 85636c4d4..ab3d43978 100644 --- a/tonic/src/transport/service/router.rs +++ b/tonic/src/transport/service/router.rs @@ -1,9 +1,5 @@ -use crate::{ - body::{boxed, BoxBody}, - server::NamedService, -}; -use http::{Request, Response}; -use hyper::Body; +use crate::{body::boxed, server::NamedService}; +use axum::{extract::Request, response::Response}; use pin_project::pin_project; use std::{ convert::Infallible, @@ -31,7 +27,7 @@ impl RoutesBuilder { /// Add a new service. pub fn add_service(&mut self, svc: S) -> &mut Self where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -53,7 +49,7 @@ impl Routes { /// Create a new routes with `svc` already added to it. pub fn new(svc: S) -> Self where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -68,7 +64,7 @@ impl Routes { /// Add a new service. pub fn add_service(mut self, svc: S) -> Self where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -103,8 +99,8 @@ async fn unimplemented() -> impl axum::response::IntoResponse { (status, headers) } -impl Service> for Routes { - type Response = Response; +impl Service for Routes { + type Response = Response; type Error = crate::Error; type Future = RoutesFuture; @@ -113,13 +109,13 @@ impl Service> for Routes { Poll::Ready(Ok(())) } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, req: Request) -> Self::Future { RoutesFuture(self.router.call(req)) } } #[pin_project] -pub struct RoutesFuture(#[pin] axum::routing::future::RouteFuture); +pub struct RoutesFuture(#[pin] axum::routing::future::RouteFuture); impl fmt::Debug for RoutesFuture { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -128,7 +124,7 @@ impl fmt::Debug for RoutesFuture { } impl Future for RoutesFuture { - type Output = Result, crate::Error>; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match ready!(self.project().0.poll(cx)) { @@ -137,3 +133,4 @@ impl Future for RoutesFuture { } } } + From a3f9d8021fcaba0f079e584bd90fb1e93964f586 Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Wed, 6 Dec 2023 23:52:39 -0500 Subject: [PATCH 04/15] Chop out AddrStream Replaced with `tokio::net::TcpStream`. Inspired by https://github.com/hyperium/hyper/issues/2850 --- tonic/src/transport/server/conn.rs | 12 ------------ tonic/src/transport/server/incoming.rs | 9 +++------ 2 files changed, 3 insertions(+), 18 deletions(-) diff --git a/tonic/src/transport/server/conn.rs b/tonic/src/transport/server/conn.rs index 907cf4965..37bcc561b 100644 --- a/tonic/src/transport/server/conn.rs +++ b/tonic/src/transport/server/conn.rs @@ -1,4 +1,3 @@ -use hyper::server::conn::AddrStream; use std::net::SocketAddr; use tokio::net::TcpStream; @@ -86,17 +85,6 @@ impl TcpConnectInfo { } } -impl Connected for AddrStream { - type ConnectInfo = TcpConnectInfo; - - fn connect_info(&self) -> Self::ConnectInfo { - TcpConnectInfo { - local_addr: Some(self.local_addr()), - remote_addr: Some(self.remote_addr()), - } - } -} - impl Connected for TcpStream { type ConnectInfo = TcpConnectInfo; diff --git a/tonic/src/transport/server/incoming.rs b/tonic/src/transport/server/incoming.rs index 61aadc93d..ce7175173 100644 --- a/tonic/src/transport/server/incoming.rs +++ b/tonic/src/transport/server/incoming.rs @@ -1,9 +1,5 @@ use super::{Connected, Server}; use crate::transport::service::ServerIo; -use hyper::server::{ - accept::Accept, - conn::{AddrIncoming, AddrStream}, -}; use std::{ net::SocketAddr, pin::Pin, @@ -12,7 +8,7 @@ use std::{ }; use tokio::{ io::{AsyncRead, AsyncWrite}, - net::TcpListener, + net::{TcpListener, TcpStream}, }; use tokio_stream::{Stream, StreamExt}; @@ -187,7 +183,7 @@ impl TcpIncoming { } impl Stream for TcpIncoming { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_accept(cx) @@ -207,3 +203,4 @@ mod tests { let _t3 = TcpIncoming::new(addr, true, None).unwrap(); } } + From c54b484de2b6fa90946a54e0d317a7f3eb8e89bf Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Thu, 7 Dec 2023 00:01:44 -0500 Subject: [PATCH 05/15] Replace AddrIncoming with TcpListener --- tonic/src/transport/server/incoming.rs | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/tonic/src/transport/server/incoming.rs b/tonic/src/transport/server/incoming.rs index ce7175173..dc42bc002 100644 --- a/tonic/src/transport/server/incoming.rs +++ b/tonic/src/transport/server/incoming.rs @@ -123,7 +123,7 @@ enum SelectOutput { /// of `AsyncRead + AsyncWrite` that communicate with clients that connect to a socket address. #[derive(Debug)] pub struct TcpIncoming { - inner: AddrIncoming, + inner: TcpListener, } impl TcpIncoming { @@ -163,22 +163,16 @@ impl TcpIncoming { nodelay: bool, keepalive: Option, ) -> Result { - let mut inner = AddrIncoming::bind(&addr)?; + let mut inner = TcpListener::bind(&addr)?; inner.set_nodelay(nodelay); inner.set_keepalive(keepalive); Ok(TcpIncoming { inner }) } +} - /// Creates a new `TcpIncoming` from an existing `tokio::net::TcpListener`. - pub fn from_listener( - listener: TcpListener, - nodelay: bool, - keepalive: Option, - ) -> Result { - let mut inner = AddrIncoming::from_listener(listener)?; - inner.set_nodelay(nodelay); - inner.set_keepalive(keepalive); - Ok(TcpIncoming { inner }) +impl From for TcpIncoming { + fn from(inner: TcpListener) -> Self { + TcpIncoming { inner } } } From e0b454cd1e1f80a8adecfbe726b39993de6aec0d Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Thu, 7 Dec 2023 00:35:05 -0500 Subject: [PATCH 06/15] Update impl of http_body::Body - use `Frame`. - update `poll_data` to `poll_frame` - remove `poll_trailers` in most places I am not sure that a simple rename of `poll_data` to `poll_frame` is right. I also left one instance of `poll_trailers` in *tonic/src/codec/decode.rs*. --- tonic/src/codec/encode.rs | 14 ++++---------- tonic/src/codec/prost.rs | 14 ++++---------- tonic/src/service/interceptor.rs | 21 ++++++++------------- 3 files changed, 16 insertions(+), 33 deletions(-) diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index 13eb2c96d..ccb2a7945 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -3,7 +3,7 @@ use super::{EncodeBuf, Encoder, DEFAULT_MAX_SEND_MESSAGE_SIZE, HEADER_SIZE}; use crate::{Code, Status}; use bytes::{BufMut, Bytes, BytesMut}; use http::HeaderMap; -use http_body::Body; +use http_body::{Body, Frame}; use pin_project::pin_project; use std::{ pin::Pin, @@ -319,10 +319,10 @@ where self.state.is_end_stream } - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll, Self::Error>>> { let self_proj = self.project(); match ready!(self_proj.inner.poll_next(cx)) { Some(Ok(d)) => Some(Ok(d)).into(), @@ -336,11 +336,5 @@ where None => None.into(), } } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Status>> { - Poll::Ready(self.project().state.trailers()) - } } + diff --git a/tonic/src/codec/prost.rs b/tonic/src/codec/prost.rs index d2f1652f4..74e307352 100644 --- a/tonic/src/codec/prost.rs +++ b/tonic/src/codec/prost.rs @@ -268,7 +268,7 @@ mod tests { mod body { use crate::Status; use bytes::Bytes; - use http_body::Body; + use http_body::{Body, Frame}; use std::{ pin::Pin, task::{Context, Poll}, @@ -299,10 +299,10 @@ mod tests { type Data = Bytes; type Error = Status; - fn poll_data( + fn poll_frame( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll, Self::Error>>> { // every other call to poll_data returns data let should_send = self.count % 2 == 0; let data_len = self.data.len(); @@ -325,13 +325,7 @@ mod tests { Poll::Ready(None) } } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) - } } } } + diff --git a/tonic/src/service/interceptor.rs b/tonic/src/service/interceptor.rs index cadff466f..a23b5aede 100644 --- a/tonic/src/service/interceptor.rs +++ b/tonic/src/service/interceptor.rs @@ -232,7 +232,8 @@ where mod tests { #[allow(unused_imports)] use super::*; - use http::header::HeaderMap; + use http_body::Frame; + use http_body_util::Empty; use std::{ pin::Pin, task::{Context, Poll}, @@ -246,19 +247,12 @@ mod tests { type Data = Bytes; type Error = Status; - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, _cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll, Self::Error>>> { Poll::Ready(None) } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) - } } #[tokio::test] @@ -318,19 +312,20 @@ mod tests { #[tokio::test] async fn doesnt_change_http_method() { - let svc = tower::service_fn(|request: http::Request| async move { + let svc = tower::service_fn(|request: http::Request| async move { assert_eq!(request.method(), http::Method::OPTIONS); - Ok::<_, hyper::Error>(hyper::Response::new(hyper::Body::empty())) + Ok::<_, hyper::Error>(hyper::Response::new(Empty::new())) }); let svc = InterceptedService::new(svc, Ok); let request = http::Request::builder() .method(http::Method::OPTIONS) - .body(hyper::Body::empty()) + .body(Empty::new()) .unwrap(); svc.oneshot(request).await.unwrap(); } } + From ba2299f08eb94a82fd233376b8f01efea9ee114d Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Thu, 7 Dec 2023 01:22:40 -0500 Subject: [PATCH 07/15] More http2 method renames --- tonic/src/transport/service/connection.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index 6184de983..170000c37 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -38,15 +38,15 @@ impl Connection { .clone(); if let Some(val) = endpoint.http2_keep_alive_timeout { - settings.http2_keep_alive_timeout(val); + settings.keep_alive_timeout(val); } if let Some(val) = endpoint.http2_keep_alive_while_idle { - settings.http2_keep_alive_while_idle(val); + settings.keep_alive_while_idle(val); } if let Some(val) = endpoint.http2_adaptive_window { - settings.http2_adaptive_window(val); + settings.adaptive_window(val); } let stack = ServiceBuilder::new() From 85da9ef87b231afd40c09f8117d9a11280dd79b3 Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Thu, 7 Dec 2023 01:23:04 -0500 Subject: [PATCH 08/15] Use BodyExt trait --- tonic/src/body.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tonic/src/body.rs b/tonic/src/body.rs index 624b5f692..568849bf2 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -1,5 +1,7 @@ //! HTTP specific body utilities. +use http_body_util::BodyExt; + /// A type erased HTTP body used for tonic services. pub type BoxBody = http_body_util::combinators::UnsyncBoxBody; @@ -18,3 +20,4 @@ pub fn empty_body() -> BoxBody { .map_err(|err| match err {}) .boxed_unsync() } + From 686dbf5c214f01757ca413b18fe3f09dc19f4701 Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Thu, 7 Dec 2023 01:31:44 -0500 Subject: [PATCH 09/15] Remove `hyper::Error::is_connect` `is_connect` was deprecated when the higher-level client was removed from hyper. The corresponding comments are removed as well. --- tonic/src/status.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tonic/src/status.rs b/tonic/src/status.rs index da8b792e5..d0ba18a34 100644 --- a/tonic/src/status.rs +++ b/tonic/src/status.rs @@ -412,13 +412,7 @@ impl Status { // > status. Note that the frequency of PINGs is highly dependent on the network // > environment, implementations are free to adjust PING frequency based on network and // > application requirements, which is why it's mapped to unavailable here. - // - // Likewise, if we are unable to connect to the server, map this to UNAVAILABLE. This is - // consistent with the behavior of a C++ gRPC client when the server is not running, and - // matches the spec of: - // > The service is currently unavailable. This is most likely a transient condition that - // > can be corrected if retried with a backoff. - if err.is_timeout() || err.is_connect() { + if err.is_timeout() { return Some(Status::unavailable(err.to_string())); } @@ -1009,3 +1003,4 @@ mod tests { assert_eq!(status.details(), DETAILS); } } + From 5e32e00f9b8425ccb0b9985ac4b373295701df82 Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Thu, 7 Dec 2023 01:34:17 -0500 Subject: [PATCH 10/15] Add Clone to Extensions::insert `http::Extensions::insert` requires `T: Clone` so we add it. --- tonic/src/extensions.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tonic/src/extensions.rs b/tonic/src/extensions.rs index 67a7137be..896a9f873 100644 --- a/tonic/src/extensions.rs +++ b/tonic/src/extensions.rs @@ -24,7 +24,7 @@ impl Extensions { /// If a extension of this type already existed, it will /// be returned. #[inline] - pub fn insert(&mut self, val: T) -> Option { + pub fn insert(&mut self, val: T) -> Option { self.inner.insert(val) } @@ -95,3 +95,4 @@ impl GrpcMethod { self.method } } + From cd056733739c6b419849386ad41e686962105a93 Mon Sep 17 00:00:00 2001 From: Ludea Date: Tue, 19 Dec 2023 06:50:59 +0000 Subject: [PATCH 11/15] Use TcpStream instead of hyper connect --- tonic/src/transport/server/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index 7835fd9ab..b78bae881 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -41,7 +41,7 @@ use crate::server::NamedService; use bytes::Bytes; use http::{Request, Response}; use http_body::Body as _; -use hyper::{server::accept, Body}; +use hyper::Body; use pin_project::pin_project; use std::{ convert::Infallible, @@ -55,6 +55,7 @@ use std::{ time::Duration, }; use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::TcpStream; use tokio_stream::Stream; use tower::{ layer::util::{Identity, Stack}, @@ -525,7 +526,7 @@ impl Server { let svc = self.service_builder.service(svc); let tcp = incoming::tcp_incoming(incoming, self); - let incoming = accept::from_stream::<_, _, crate::Error>(tcp); + let incoming = TcpStream::accept::from_stream::<_, _, crate::Error>(tcp); let svc = MakeSvc { inner: svc, From 46ee6280e44b4d84899ce4efc7f6872f2f99ca5d Mon Sep 17 00:00:00 2001 From: Ludea Date: Tue, 19 Dec 2023 06:43:16 +0000 Subject: [PATCH 12/15] add poll_frame, delete poll_trailers --- tonic/src/transport/server/recover_error.rs | 24 ++++++--------------- 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/tonic/src/transport/server/recover_error.rs b/tonic/src/transport/server/recover_error.rs index 6d7e55bf4..20c583aac 100644 --- a/tonic/src/transport/server/recover_error.rs +++ b/tonic/src/transport/server/recover_error.rs @@ -98,30 +98,18 @@ where type Data = B::Data; type Error = B::Error; - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { - match self.project().inner.as_pin_mut() { - Some(b) => b.poll_data(cx), - None => Poll::Ready(None), - } + ) -> Poll, Self::Error>>> { + Pin::new(&mut self.0).poll_frame(cx) } - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - match self.project().inner.as_pin_mut() { - Some(b) => b.poll_trailers(cx), - None => Poll::Ready(Ok(None)), - } + fn size_hint(&self) -> http_body::SizeHint { + self.0.size_hint() } fn is_end_stream(&self) -> bool { - match &self.inner { - Some(b) => b.is_end_stream(), - None => true, - } + self.body.is_end_stream() } } From 60eef43ecb9132819a11614d52f5a2224dfdf123 Mon Sep 17 00:00:00 2001 From: Ivan Krivosheev Date: Fri, 12 Jan 2024 20:58:18 +0300 Subject: [PATCH 13/15] Fixed incoming --- tonic/src/body.rs | 1 - tonic/src/codec/encode.rs | 10 +++--- tonic/src/service/interceptor.rs | 3 +- tonic/src/transport/server/incoming.rs | 40 +++++++++++++-------- tonic/src/transport/server/recover_error.rs | 26 +++++++++----- 5 files changed, 50 insertions(+), 30 deletions(-) diff --git a/tonic/src/body.rs b/tonic/src/body.rs index 568849bf2..428c0dade 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -20,4 +20,3 @@ pub fn empty_body() -> BoxBody { .map_err(|err| match err {}) .boxed_unsync() } - diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index ccb2a7945..fdaa450ff 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -315,17 +315,13 @@ where type Data = Bytes; type Error = Status; - fn is_end_stream(&self) -> bool { - self.state.is_end_stream - } - fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { let self_proj = self.project(); match ready!(self_proj.inner.poll_next(cx)) { - Some(Ok(d)) => Some(Ok(d)).into(), + Some(Ok(d)) => Some(Ok(Frame::data(d))).into(), Some(Err(status)) => match self_proj.state.role { Role::Client => Some(Err(status)).into(), Role::Server => { @@ -336,5 +332,9 @@ where None => None.into(), } } + + fn is_end_stream(&self) -> bool { + self.state.is_end_stream + } } diff --git a/tonic/src/service/interceptor.rs b/tonic/src/service/interceptor.rs index a23b5aede..5f6a1b42b 100644 --- a/tonic/src/service/interceptor.rs +++ b/tonic/src/service/interceptor.rs @@ -312,7 +312,7 @@ mod tests { #[tokio::test] async fn doesnt_change_http_method() { - let svc = tower::service_fn(|request: http::Request| async move { + let svc = tower::service_fn(|request: http::Request>| async move { assert_eq!(request.method(), http::Method::OPTIONS); Ok::<_, hyper::Error>(hyper::Response::new(Empty::new())) @@ -328,4 +328,3 @@ mod tests { svc.oneshot(request).await.unwrap(); } } - diff --git a/tonic/src/transport/server/incoming.rs b/tonic/src/transport/server/incoming.rs index dc42bc002..af60d319c 100644 --- a/tonic/src/transport/server/incoming.rs +++ b/tonic/src/transport/server/incoming.rs @@ -1,7 +1,7 @@ use super::{Connected, Server}; use crate::transport::service::ServerIo; use std::{ - net::SocketAddr, + net::{SocketAddr, TcpListener as StdTcpListener}, pin::Pin, task::{Context, Poll}, time::Duration, @@ -10,6 +10,7 @@ use tokio::{ io::{AsyncRead, AsyncWrite}, net::{TcpListener, TcpStream}, }; +use tokio_stream::wrappers::TcpListenerStream; use tokio_stream::{Stream, StreamExt}; #[cfg(not(feature = "tls"))] @@ -123,7 +124,9 @@ enum SelectOutput { /// of `AsyncRead + AsyncWrite` that communicate with clients that connect to a socket address. #[derive(Debug)] pub struct TcpIncoming { - inner: TcpListener, + inner: TcpListenerStream, + tcp_keepalive_timeout: Option, + tcp_nodelay: bool, } impl TcpIncoming { @@ -160,19 +163,29 @@ impl TcpIncoming { /// # } pub fn new( addr: SocketAddr, - nodelay: bool, - keepalive: Option, + tcp_nodelay: bool, + tcp_keepalive_timeout: Option, ) -> Result { - let mut inner = TcpListener::bind(&addr)?; - inner.set_nodelay(nodelay); - inner.set_keepalive(keepalive); - Ok(TcpIncoming { inner }) + let std_listener = StdTcpListener::bind(addr)?; + let inner = TcpListenerStream::new(TcpListener::from_std(std_listener)?); + Ok(Self { + inner, + tcp_nodelay, + tcp_keepalive_timeout, + }) } -} -impl From for TcpIncoming { - fn from(inner: TcpListener) -> Self { - TcpIncoming { inner } + /// Creates a new `TcpIncoming` from an existing `tokio::net::TcpListener`. + pub fn from_listener( + listener: TcpListener, + tcp_nodelay: bool, + tcp_keepalive_timeout: Option, + ) -> Result { + Ok(Self { + inner: TcpListenerStream::new(listener), + tcp_nodelay, + tcp_keepalive_timeout, + }) } } @@ -180,7 +193,7 @@ impl Stream for TcpIncoming { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_accept(cx) + Pin::new(&mut self.inner).poll_next(cx) } } @@ -197,4 +210,3 @@ mod tests { let _t3 = TcpIncoming::new(addr, true, None).unwrap(); } } - diff --git a/tonic/src/transport/server/recover_error.rs b/tonic/src/transport/server/recover_error.rs index 20c583aac..d976b167e 100644 --- a/tonic/src/transport/server/recover_error.rs +++ b/tonic/src/transport/server/recover_error.rs @@ -1,5 +1,6 @@ use crate::Status; use http::Response; +use http_body::{Body, Frame, SizeHint}; use pin_project::pin_project; use std::{ future::Future, @@ -91,9 +92,9 @@ impl MaybeEmptyBody { } } -impl http_body::Body for MaybeEmptyBody +impl Body for MaybeEmptyBody where - B: http_body::Body + Send, + B: Body + Send, { type Data = B::Data; type Error = B::Error; @@ -101,15 +102,24 @@ where fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll, Self::Error>>> { - Pin::new(&mut self.0).poll_frame(cx) + ) -> Poll, Self::Error>>> { + match self.project().inner.as_pin_mut() { + Some(b) => b.poll_frame(cx), + None => Poll::Ready(None), + } } - fn size_hint(&self) -> http_body::SizeHint { - self.0.size_hint() + fn is_end_stream(&self) -> bool { + match &self.inner { + Some(b) => b.is_end_stream(), + None => true, + } } - fn is_end_stream(&self) -> bool { - self.body.is_end_stream() + fn size_hint(&self) -> SizeHint { + match &self.inner { + Some(b) => b.size_hint(), + None => SizeHint::default(), + } } } From 03a428d3eac4e7b2bdaa838f186392345e5ad137 Mon Sep 17 00:00:00 2001 From: Ivan Krivosheev Date: Sat, 13 Jan 2024 17:53:22 +0300 Subject: [PATCH 14/15] Fixed --- tonic/Cargo.toml | 2 +- tonic/benches/decode.rs | 29 +++--- tonic/src/codec/decode.rs | 7 +- tonic/src/codec/encode.rs | 1 - tonic/src/codec/prost.rs | 1 - tonic/src/extensions.rs | 1 - tonic/src/status.rs | 1 - tonic/src/transport/channel/endpoint.rs | 12 +-- tonic/src/transport/channel/mod.rs | 12 +-- tonic/src/transport/mod.rs | 5 +- tonic/src/transport/server/conn.rs | 9 +- tonic/src/transport/server/mod.rs | 112 ++++++++++------------ tonic/src/transport/service/connection.rs | 12 +-- tonic/src/transport/service/connector.rs | 31 +++--- tonic/src/transport/service/io.rs | 28 +++--- tonic/src/transport/service/router.rs | 7 +- 16 files changed, 130 insertions(+), 140 deletions(-) diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 747635056..aa070f3ac 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -83,7 +83,7 @@ h2 = { version = "0.4", optional = true } hyper = { version = "1.0", features = ["full"], optional = true } hyper-util = { version = "0.1", features = ["full"] } hyper-timeout = { version = "0.5", optional = true } -tokio-stream = "0.1" +tokio-stream = { version = "0.1", features = ["net"] } tower = { version = "0.4.7", default-features = false, features = [ "balance", "buffer", diff --git a/tonic/benches/decode.rs b/tonic/benches/decode.rs index 5c7cd0159..ca9984c51 100644 --- a/tonic/benches/decode.rs +++ b/tonic/benches/decode.rs @@ -1,11 +1,13 @@ -use bencher::{benchmark_group, benchmark_main, Bencher}; -use bytes::{Buf, BufMut, Bytes, BytesMut}; -use http_body::Body; use std::{ fmt::{Error, Formatter}, pin::Pin, task::{Context, Poll}, }; + +use bencher::{benchmark_group, benchmark_main, Bencher}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use http_body::{Body, Frame, SizeHint}; + use tonic::{codec::DecodeBuf, codec::Decoder, Status, Streaming}; macro_rules! bench { @@ -58,23 +60,24 @@ impl Body for MockBody { type Data = Bytes; type Error = Status; - fn poll_data( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll>> { + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { if self.data.has_remaining() { let split = std::cmp::min(self.chunk_size, self.data.remaining()); - Poll::Ready(Some(Ok(self.data.split_to(split)))) + Poll::Ready(Some(Ok(Frame::data(self.data.split_to(split))))) } else { Poll::Ready(None) } } - fn poll_trailers( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) + fn is_end_stream(&self) -> bool { + !self.data.is_empty() + } + + fn size_hint(&self) -> SizeHint { + SizeHint::with_exact(self.data.len() as u64) } } diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index cb88a0649..4ba5318de 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -4,6 +4,7 @@ use crate::{body::BoxBody, metadata::MetadataMap, Code, Status}; use bytes::{Buf, BufMut, BytesMut}; use http::StatusCode; use http_body::Body; +use http_body_util::BodyExt; use std::{ fmt, future, pin::Pin, @@ -122,7 +123,9 @@ impl Streaming { decoder: Box::new(decoder), inner: StreamingInner { body: body - .map_data(|mut buf| buf.copy_to_bytes(buf.remaining())) + .map_frame(|mut frame| { + frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining())) + }) .map_err(|err| Status::map_error(err.into())) .boxed_unsync(), state: State::ReadHeader, @@ -231,7 +234,7 @@ impl StreamingInner { // Returns Some(()) if data was found or None if the loop in `poll_next` should break fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll, Status>> { - let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) { + let chunk = match ready!(Pin::new(&mut self.body).poll_frame(cx)) { Some(Ok(d)) => Some(d), Some(Err(status)) => { if self.direction == Direction::Request && status.code() == Code::Cancelled { diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index fdaa450ff..5586cc8de 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -337,4 +337,3 @@ where self.state.is_end_stream } } - diff --git a/tonic/src/codec/prost.rs b/tonic/src/codec/prost.rs index 74e307352..5b85ba24d 100644 --- a/tonic/src/codec/prost.rs +++ b/tonic/src/codec/prost.rs @@ -328,4 +328,3 @@ mod tests { } } } - diff --git a/tonic/src/extensions.rs b/tonic/src/extensions.rs index 896a9f873..f74ec8910 100644 --- a/tonic/src/extensions.rs +++ b/tonic/src/extensions.rs @@ -95,4 +95,3 @@ impl GrpcMethod { self.method } } - diff --git a/tonic/src/status.rs b/tonic/src/status.rs index d0ba18a34..2e691df0c 100644 --- a/tonic/src/status.rs +++ b/tonic/src/status.rs @@ -1003,4 +1003,3 @@ mod tests { assert_eq!(status.details(), DETAILS); } } - diff --git a/tonic/src/transport/channel/endpoint.rs b/tonic/src/transport/channel/endpoint.rs index ac23de98e..4636b0fee 100644 --- a/tonic/src/transport/channel/endpoint.rs +++ b/tonic/src/transport/channel/endpoint.rs @@ -7,9 +7,9 @@ use crate::transport::service::TlsConnector; use crate::transport::{service::SharedExec, Error, Executor}; use bytes::Bytes; use http::{uri::Uri, HeaderValue}; +use hyper::rt; use std::{fmt, future::Future, pin::Pin, str::FromStr, time::Duration}; -use tower::make::MakeConnection; -// use crate::transport::E +use tower_service::Service; /// Channel builder. /// @@ -359,8 +359,8 @@ impl Endpoint { /// The [`connect_timeout`](Endpoint::connect_timeout) will still be applied. pub async fn connect_with_connector(&self, connector: C) -> Result where - C: MakeConnection + Send + 'static, - C::Connection: Unpin + Send + 'static, + C: Service + Send + 'static, + C::Response: rt::Read + rt::Write + Send + Unpin + 'static, C::Future: Send + 'static, crate::Error: From + Send + 'static, { @@ -384,8 +384,8 @@ impl Endpoint { /// uses a Unix socket transport. pub fn connect_with_connector_lazy(&self, connector: C) -> Channel where - C: MakeConnection + Send + 'static, - C::Connection: Unpin + Send + 'static, + C: Service + Send + 'static, + C::Response: rt::Read + rt::Write + Send + Unpin + 'static, C::Future: Send + 'static, crate::Error: From + Send + 'static, { diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index b1bbc6046..420fd1f3c 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -21,12 +21,10 @@ use std::{ pin::Pin, task::{ready, Context, Poll}, }; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - sync::mpsc::{channel, Sender}, -}; +use tokio::sync::mpsc::{channel, Sender}; -use axum::{extract::Request, response::Response, body::Body}; +use axum::{body::Body, extract::Request, response::Response}; +use hyper::rt; use tower::balance::p2c::Balance; use tower::{ buffer::{self, Buffer}, @@ -149,7 +147,7 @@ impl Channel { C: Service + Send + 'static, C::Error: Into + Send, C::Future: Unpin + Send, - C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, + C::Response: rt::Read + rt::Write + HyperConnection + Unpin + Send + 'static, { let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE); let executor = endpoint.executor.clone(); @@ -166,7 +164,7 @@ impl Channel { C: Service + Send + 'static, C::Error: Into + Send, C::Future: Unpin + Send, - C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, + C::Response: rt::Read + rt::Write + HyperConnection + Unpin + Send + 'static, { let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE); let executor = endpoint.executor.clone(); diff --git a/tonic/src/transport/mod.rs b/tonic/src/transport/mod.rs index a0435c797..8c892e5bc 100644 --- a/tonic/src/transport/mod.rs +++ b/tonic/src/transport/mod.rs @@ -107,8 +107,9 @@ pub use self::service::grpc_timeout::TimeoutExpired; #[cfg(feature = "tls")] #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] pub use self::tls::Certificate; -pub use axum::{body::BoxBody as AxumBoxBody, Router as AxumRouter}; -pub use hyper::{Body, Uri}; +pub use axum::{body::Body as AxumBoxBody, Router as AxumRouter}; +pub use hyper::body::Body; +pub use hyper::Uri; pub(crate) use self::service::executor::Executor; diff --git a/tonic/src/transport/server/conn.rs b/tonic/src/transport/server/conn.rs index 37bcc561b..11fcc4fd0 100644 --- a/tonic/src/transport/server/conn.rs +++ b/tonic/src/transport/server/conn.rs @@ -1,13 +1,14 @@ use std::net::SocketAddr; -use tokio::net::TcpStream; - -#[cfg(feature = "tls")] -use crate::transport::Certificate; #[cfg(feature = "tls")] use std::sync::Arc; + +use tokio::net::TcpStream; #[cfg(feature = "tls")] use tokio_rustls::server::TlsStream; +#[cfg(feature = "tls")] +use crate::transport::Certificate; + /// Trait that connected IO resources implement and use to produce info about the connection. /// /// The goal for this trait is to allow users to implement diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index b78bae881..768bd29f8 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -13,7 +13,7 @@ pub use super::service::Routes; pub use super::service::RoutesBuilder; pub use conn::{Connected, TcpConnectInfo}; -use hyper_util::rt::TokioExecutor; +use hyper_util::rt::{TokioExecutor, TokioIo}; #[cfg(feature = "tls")] pub use tls::ServerTlsConfig; @@ -36,12 +36,12 @@ use crate::transport::Error; use self::recover_error::RecoverError; use super::service::{GrpcTimeout, ServerIo}; -use crate::body::BoxBody; use crate::server::NamedService; +use axum::extract::Request; +use axum::response::Response; use bytes::Bytes; -use http::{Request, Response}; -use http_body::Body as _; -use hyper::Body; +use http_body_util::BodyExt; +use hyper::body::Body; use pin_project::pin_project; use std::{ convert::Infallible, @@ -55,7 +55,6 @@ use std::{ time::Duration, }; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::net::TcpStream; use tokio_stream::Stream; use tower::{ layer::util::{Identity, Stack}, @@ -65,9 +64,8 @@ use tower::{ Service, ServiceBuilder, }; -type BoxHttpBody = http_body_util::combinators::UnsyncBoxBody; -type BoxService = tower::util::BoxService, Response, crate::Error>; -type TraceInterceptor = Arc) -> tracing::Span + Send + Sync + 'static>; +type BoxService = tower::util::BoxService; +type TraceInterceptor = Arc) -> tracing::Span + Send + Sync + 'static>; const DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS: u64 = 20; @@ -361,7 +359,7 @@ impl Server { /// route around different services. pub fn add_service(&mut self, svc: S) -> Router where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -382,7 +380,7 @@ impl Server { /// As a result, one cannot use this to toggle between two identically named implementations. pub fn add_optional_service(&mut self, svc: Option) -> Router where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -496,15 +494,15 @@ impl Server { ) -> Result<(), super::Error> where L: Layer, - L::Service: Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: Into + Send, + L::Service: Service> + Clone + Send + 'static, + <>::Service as Service>::Future: Send + 'static, + <>::Service as Service>::Error: Into + Send, I: Stream>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IO::ConnectInfo: Clone + Send + Sync + 'static, IE: Into, F: Future, - ResBody: http_body::Body + Send + 'static, + ResBody: Body + Send + 'static, ResBody::Error: Into, { let trace_interceptor = self.trace_interceptor.clone(); @@ -525,9 +523,7 @@ impl Server { let svc = self.service_builder.service(svc); - let tcp = incoming::tcp_incoming(incoming, self); - let incoming = TcpStream::accept::from_stream::<_, _, crate::Error>(tcp); - + let incoming = incoming::tcp_incoming(incoming, self); let svc = MakeSvc { inner: svc, concurrency_limit, @@ -536,7 +532,9 @@ impl Server { _io: PhantomData, }; - let server = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()) + let mut builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); + + builder .http2() .initial_connection_window_size(init_connection_window_size) .initial_stream_window_size(init_stream_window_size) @@ -548,15 +546,8 @@ impl Server { //.max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams) .max_frame_size(max_frame_size); - if let Some(signal) = signal { - server - .serve(svc) - .with_graceful_shutdown(signal) - .await - .map_err(super::Error::from_source)? - } else { - server.serve(svc).await.map_err(super::Error::from_source)?; - } + let io = TokioIo::new(incoming); + let connection = builder.serve_connection(io, svc); Ok(()) } @@ -572,7 +563,7 @@ impl Router { /// Add a new service to this router. pub fn add_service(mut self, svc: S) -> Self where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -591,7 +582,7 @@ impl Router { #[allow(clippy::type_complexity)] pub fn add_optional_service(mut self, svc: Option) -> Self where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -617,10 +608,10 @@ impl Router { pub async fn serve(self, addr: SocketAddr) -> Result<(), super::Error> where L: Layer, - L::Service: Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: Into + Send, - ResBody: http_body::Body + Send + 'static, + L::Service: Service> + Clone + Send + 'static, + <>::Service as Service>::Future: Send + 'static, + <>::Service as Service>::Error: Into + Send, + ResBody: Body + Send + 'static, ResBody::Error: Into, { let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive) @@ -647,10 +638,10 @@ impl Router { ) -> Result<(), super::Error> where L: Layer, - L::Service: Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: Into + Send, - ResBody: http_body::Body + Send + 'static, + L::Service: Service> + Clone + Send + 'static, + <>::Service as Service>::Future: Send + 'static, + <>::Service as Service>::Error: Into + Send, + ResBody: Body + Send + 'static, ResBody::Error: Into, { let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive) @@ -676,10 +667,10 @@ impl Router { IO::ConnectInfo: Clone + Send + Sync + 'static, IE: Into, L: Layer, - L::Service: Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: Into + Send, - ResBody: http_body::Body + Send + 'static, + L::Service: Service> + Clone + Send + 'static, + <>::Service as Service>::Future: Send + 'static, + <>::Service as Service>::Error: Into + Send, + ResBody: Body + Send + 'static, ResBody::Error: Into, { self.server @@ -711,10 +702,10 @@ impl Router { IE: Into, F: Future, L: Layer, - L::Service: Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: Into + Send, - ResBody: http_body::Body + Send + 'static, + L::Service: Service> + Clone + Send + 'static, + <>::Service as Service>::Future: Send + 'static, + <>::Service as Service>::Error: Into + Send, + ResBody: Body + Send + 'static, ResBody::Error: Into, { self.server @@ -726,10 +717,10 @@ impl Router { pub fn into_service(self) -> L::Service where L: Layer, - L::Service: Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: Into + Send, - ResBody: http_body::Body + Send + 'static, + L::Service: Service> + Clone + Send + 'static, + <>::Service as Service>::Future: Send + 'static, + <>::Service as Service>::Error: Into + Send, + ResBody: Body + Send + 'static, ResBody::Error: Into, { self.server.service_builder.service(self.routes.prepare()) @@ -747,14 +738,14 @@ struct Svc { trace_interceptor: Option, } -impl Service> for Svc +impl Service for Svc where - S: Service, Response = Response>, + S: Service>, S::Error: Into, - ResBody: http_body::Body + Send + 'static, + ResBody: Body + Send + 'static, ResBody::Error: Into, { - type Response = Response; + type Response = Response; type Error = crate::Error; type Future = SvcFuture; @@ -762,7 +753,7 @@ where self.inner.poll_ready(cx).map_err(Into::into) } - fn call(&mut self, mut req: Request) -> Self::Future { + fn call(&mut self, mut req: Request) -> Self::Future { let span = if let Some(trace_interceptor) = &self.trace_interceptor { let (parts, body) = req.into_parts(); let bodyless_request = Request::from_parts(parts, ()); @@ -795,10 +786,10 @@ impl Future for SvcFuture where F: Future, E>>, E: Into, - ResBody: http_body::Body + Send + 'static, + ResBody: Body + Send + 'static, ResBody::Error: Into, { - type Output = Result, crate::Error>; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -827,10 +818,10 @@ struct MakeSvc { impl Service<&ServerIo> for MakeSvc where IO: Connected, - S: Service, Response = Response> + Clone + Send + 'static, + S: Service> + Clone + Send + 'static, S::Future: Send + 'static, S::Error: Into + Send, - ResBody: http_body::Body + Send + 'static, + ResBody: Body + Send + 'static, ResBody::Error: Into, { type Response = BoxService; @@ -857,7 +848,7 @@ where let svc = ServiceBuilder::new() .layer(BoxService::layer()) - .map_request(move |mut request: Request| { + .map_request(move |mut request: Request| { match &conn_info { tower::util::Either::A(inner) => { request.extensions_mut().insert(inner.clone()); @@ -888,4 +879,3 @@ where future::ready(Ok(svc)) } } - diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index 170000c37..389fc73a8 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -2,12 +2,11 @@ use super::{grpc_timeout::GrpcTimeout, reconnect::Reconnect, AddOrigin, UserAgen use crate::transport::{BoxFuture, Endpoint}; use http::Uri; use hyper::client::conn::http2::Builder; -use hyper_util::client::legacy::connect::{Connect as HyperConnect, Connection as HyperConnection}; +use hyper::rt; use std::{ fmt, task::{Context, Poll}, }; -use tokio::io::{AsyncRead, AsyncWrite}; use tower::load::Load; use tower::{ layer::Layer, @@ -29,7 +28,7 @@ impl Connection { C: Service + Send + 'static, C::Error: Into + Send, C::Future: Unpin + Send, - C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, + C::Response: rt::Read + rt::Write + Unpin + Send + 'static, { let mut settings = Builder::new(endpoint.executor) .initial_stream_window_size(endpoint.init_stream_window_size) @@ -61,9 +60,7 @@ impl Connection { .option_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d))) .into_inner(); - let connector = HyperConnect::new(connector, settings); let conn = Reconnect::new(connector, endpoint.uri.clone(), is_lazy); - let inner = stack.layer(conn); Self { @@ -76,7 +73,7 @@ impl Connection { C: Service + Send + 'static, C::Error: Into + Send, C::Future: Unpin + Send, - C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, + C::Response: rt::Read + rt::Write + Unpin + Send + 'static, { Self::new(connector, endpoint, false).ready_oneshot().await } @@ -86,7 +83,7 @@ impl Connection { C: Service + Send + 'static, C::Error: Into + Send, C::Future: Unpin + Send, - C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, + C::Response: rt::Read + rt::Write + Unpin + Send + 'static, { Self::new(connector, endpoint, true) } @@ -119,4 +116,3 @@ impl fmt::Debug for Connection { f.debug_struct("Connection").finish() } } - diff --git a/tonic/src/transport/service/connector.rs b/tonic/src/transport/service/connector.rs index a5e0d9eb9..7a4a0eab2 100644 --- a/tonic/src/transport/service/connector.rs +++ b/tonic/src/transport/service/connector.rs @@ -1,12 +1,15 @@ +use std::fmt; +use std::task::{Context, Poll}; + +use http::Uri; +use hyper::rt; +use hyper_util::rt::TokioIo; +use tower_service::Service; + use super::super::BoxFuture; use super::io::BoxedIo; #[cfg(feature = "tls")] use super::tls::TlsConnector; -use http::Uri; -use std::fmt; -use std::task::{Context, Poll}; -use tower::make::MakeConnection; -use tower_service::Service; pub(crate) struct Connector { inner: C, @@ -47,8 +50,8 @@ impl Connector { impl Service for Connector where - C: MakeConnection, - C::Connection: Unpin + Send + 'static, + C: Service, + C::Response: rt::Read + rt::Write + Unpin + Send + 'static, C::Future: Send + 'static, crate::Error: From + Send + 'static, { @@ -57,7 +60,7 @@ where type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - MakeConnection::poll_ready(&mut self.inner, cx).map_err(Into::into) + self.inner.poll_ready(cx).map_err(Into::into) } fn call(&mut self, uri: Uri) -> Self::Future { @@ -69,7 +72,7 @@ where #[cfg(feature = "tls")] let is_https = uri.scheme_str() == Some("https"); - let connect = self.inner.make_connection(uri); + let connect = self.inner.call(uri); Box::pin(async move { let io = connect.await?; @@ -77,12 +80,12 @@ where #[cfg(feature = "tls")] { if let Some(tls) = tls { - if is_https { - let conn = tls.connect(io).await?; - return Ok(BoxedIo::new(conn)); + return if is_https { + let io = tls.connect(TokioIo::new(io)).await?; + Ok(io) } else { - return Ok(BoxedIo::new(io)); - } + Ok(BoxedIo::new(io)) + }; } else if is_https { return Err(HttpsUriWithoutTlsSupport(()).into()); } diff --git a/tonic/src/transport/service/io.rs b/tonic/src/transport/service/io.rs index e5e75287a..9be7fa343 100644 --- a/tonic/src/transport/service/io.rs +++ b/tonic/src/transport/service/io.rs @@ -1,19 +1,23 @@ -use crate::transport::server::Connected; -use hyper_util::client::legacy::connect::{Connected as HyperConnected, Connection}; +use hyper::rt; use std::io; use std::io::IoSlice; use std::pin::Pin; use std::task::{Context, Poll}; + +use hyper_util::client::legacy::connect::{Connected as HyperConnected, Connection}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; #[cfg(feature = "tls")] use tokio_rustls::server::TlsStream; +use tower::util::Either; + +use crate::transport::server::Connected; pub(in crate::transport) trait Io: - AsyncRead + AsyncWrite + Send + 'static + rt::Read + rt::Write + Send + 'static { } -impl Io for T where T: AsyncRead + AsyncWrite + Send + 'static {} +impl Io for T where T: rt::Read + rt::Write + Send + 'static {} pub(crate) struct BoxedIo(Pin>); @@ -40,17 +44,17 @@ impl Connected for BoxedIo { #[derive(Copy, Clone)] pub(crate) struct NoneConnectInfo; -impl AsyncRead for BoxedIo { +impl rt::Read for BoxedIo { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, + buf: rt::ReadBufCursor<'_>, ) -> Poll> { Pin::new(&mut self.0).poll_read(cx, buf) } } -impl AsyncWrite for BoxedIo { +impl rt::Write for BoxedIo { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -67,6 +71,10 @@ impl AsyncWrite for BoxedIo { Pin::new(&mut self.0).poll_shutdown(cx) } + fn is_write_vectored(&self) -> bool { + self.0.is_write_vectored() + } + fn poll_write_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -74,10 +82,6 @@ impl AsyncWrite for BoxedIo { ) -> Poll> { Pin::new(&mut self.0).poll_write_vectored(cx, bufs) } - - fn is_write_vectored(&self) -> bool { - self.0.is_write_vectored() - } } pub(crate) enum ServerIo { @@ -86,8 +90,6 @@ pub(crate) enum ServerIo { TlsIo(Box>), } -use tower::util::Either; - #[cfg(feature = "tls")] type ServerIoConnectInfo = Either<::ConnectInfo, as Connected>::ConnectInfo>; diff --git a/tonic/src/transport/service/router.rs b/tonic/src/transport/service/router.rs index ab3d43978..89f2fc8ff 100644 --- a/tonic/src/transport/service/router.rs +++ b/tonic/src/transport/service/router.rs @@ -1,4 +1,4 @@ -use crate::{body::boxed, server::NamedService}; +use crate::server::NamedService; use axum::{extract::Request, response::Response}; use pin_project::pin_project; use std::{ @@ -8,7 +8,6 @@ use std::{ pin::Pin, task::{ready, Context, Poll}, }; -use tower::ServiceExt; use tower_service::Service; /// A [`Service`] router. @@ -72,7 +71,6 @@ impl Routes { S::Future: Send + 'static, S::Error: Into + Send, { - let svc = svc.map_response(|res| res.map(axum::body::boxed)); self.router = self .router .route_service(&format!("/{}/*rest", S::NAME), svc); @@ -128,9 +126,8 @@ impl Future for RoutesFuture { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match ready!(self.project().0.poll(cx)) { - Ok(res) => Ok(res.map(boxed)).into(), + Ok(res) => Ok(res).into(), Err(err) => match err {}, } } } - From 69f904c1a4bbfa5670ada0159c3b3575398beff0 Mon Sep 17 00:00:00 2001 From: Ivan Krivosheev Date: Sat, 13 Jan 2024 17:55:28 +0300 Subject: [PATCH 15/15] Fixes --- tonic/src/transport/service/tls.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tonic/src/transport/service/tls.rs b/tonic/src/transport/service/tls.rs index f956132fb..add4be892 100644 --- a/tonic/src/transport/service/tls.rs +++ b/tonic/src/transport/service/tls.rs @@ -3,6 +3,7 @@ use crate::transport::{ server::{Connected, TlsStream}, Certificate, Identity, }; +use hyper_util::rt::TokioIo; use std::{fmt, sync::Arc}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_rustls::{ @@ -83,7 +84,7 @@ impl TlsConnector { _ => return Err(TlsError::H2NotNegotiated.into()), }; - BoxedIo::new(io) + BoxedIo::new(TokioIo::new(io)) }; Ok(tls_io)