diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 013cc6e72..aa070f3ac 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -28,7 +28,14 @@ gzip = ["dep:flate2"] zstd = ["dep:zstd"] default = ["transport", "codegen", "prost"] prost = ["dep:prost"] -tls = ["dep:rustls-pemfile", "transport", "dep:tokio-rustls", "dep:rustls", "tokio/rt", "tokio/macros"] +tls = [ + "dep:rustls-pemfile", + "transport", + "dep:tokio-rustls", + "dep:rustls", + "tokio/rt", + "tokio/macros", +] tls-roots = ["tls-roots-common", "dep:rustls-native-certs"] tls-roots-common = ["tls"] tls-webpki-roots = ["tls-roots-common", "dep:webpki-roots"] @@ -52,29 +59,42 @@ channel = [] [dependencies] base64 = "0.21" bytes = "1.0" -http = "0.2" +http = "1.0" tracing = "0.1" tokio = "1.0.1" -http-body = "0.4.4" +http-body = "1.0" +http-body-util = "0.1" percent-encoding = "2.1" pin-project = "1.0.11" tower-layer = "0.3" tower-service = "0.3" # prost -prost = {version = "0.12", default-features = false, features = ["std"], optional = true} +prost = { version = "0.12", default-features = false, features = [ + "std", +], optional = true } # codegen -async-trait = {version = "0.1.13", optional = true} +async-trait = { version = "0.1.13", optional = true } # transport -h2 = {version = "0.3.17", optional = true} -hyper = {version = "0.14.26", features = ["full"], optional = true} -hyper-timeout = {version = "0.4", optional = true} -tokio-stream = "0.1" -tower = {version = "0.4.7", default-features = false, features = ["balance", "buffer", "discover", "limit", "load", "make", "timeout", "util"], optional = true} -axum = {version = "0.6.9", default_features = false, optional = true} +h2 = { version = "0.4", optional = true } +hyper = { version = "1.0", features = ["full"], optional = true } +hyper-util = { version = "0.1", features = ["full"] } +hyper-timeout = { version = "0.5", optional = true } +tokio-stream = { version = "0.1", features = ["net"] } +tower = { version = "0.4.7", default-features = false, features = [ + "balance", + "buffer", + "discover", + "limit", + "load", + "make", + "timeout", + "util", +], optional = true } +axum = { version = "0.7", default_features = false, optional = true } # rustls async-stream = { version = "0.3", optional = true } @@ -85,7 +105,7 @@ rustls = { version = "0.21.7", optional = true } webpki-roots = { version = "0.25.0", optional = true } # compression -flate2 = {version = "1.0", optional = true} +flate2 = { version = "1.0", optional = true } zstd = { version = "0.12.3", optional = true } [dev-dependencies] @@ -94,8 +114,8 @@ quickcheck = "1.0" quickcheck_macros = "1.0" rand = "0.8" static_assertions = "1.0" -tokio = {version = "1.0", features = ["rt", "macros"]} -tower = {version = "0.4.7", features = ["full"]} +tokio = { version = "1.0", features = ["rt", "macros"] } +tower = { version = "0.4.7", features = ["full"] } [package.metadata.docs.rs] all-features = true diff --git a/tonic/benches/decode.rs b/tonic/benches/decode.rs index 5c7cd0159..ca9984c51 100644 --- a/tonic/benches/decode.rs +++ b/tonic/benches/decode.rs @@ -1,11 +1,13 @@ -use bencher::{benchmark_group, benchmark_main, Bencher}; -use bytes::{Buf, BufMut, Bytes, BytesMut}; -use http_body::Body; use std::{ fmt::{Error, Formatter}, pin::Pin, task::{Context, Poll}, }; + +use bencher::{benchmark_group, benchmark_main, Bencher}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use http_body::{Body, Frame, SizeHint}; + use tonic::{codec::DecodeBuf, codec::Decoder, Status, Streaming}; macro_rules! bench { @@ -58,23 +60,24 @@ impl Body for MockBody { type Data = Bytes; type Error = Status; - fn poll_data( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll>> { + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { if self.data.has_remaining() { let split = std::cmp::min(self.chunk_size, self.data.remaining()); - Poll::Ready(Some(Ok(self.data.split_to(split)))) + Poll::Ready(Some(Ok(Frame::data(self.data.split_to(split))))) } else { Poll::Ready(None) } } - fn poll_trailers( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) + fn is_end_stream(&self) -> bool { + !self.data.is_empty() + } + + fn size_hint(&self) -> SizeHint { + SizeHint::with_exact(self.data.len() as u64) } } diff --git a/tonic/src/body.rs b/tonic/src/body.rs index ef95eec47..428c0dade 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -1,9 +1,9 @@ //! HTTP specific body utilities. -use http_body::Body; +use http_body_util::BodyExt; /// A type erased HTTP body used for tonic services. -pub type BoxBody = http_body::combinators::UnsyncBoxBody; +pub type BoxBody = http_body_util::combinators::UnsyncBoxBody; /// Convert a [`http_body::Body`] into a [`BoxBody`]. pub(crate) fn boxed(body: B) -> BoxBody @@ -16,7 +16,7 @@ where /// Create an empty `BoxBody` pub fn empty_body() -> BoxBody { - http_body::Empty::new() + http_body_util::Empty::new() .map_err(|err| match err {}) .boxed_unsync() } diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index cb88a0649..4ba5318de 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -4,6 +4,7 @@ use crate::{body::BoxBody, metadata::MetadataMap, Code, Status}; use bytes::{Buf, BufMut, BytesMut}; use http::StatusCode; use http_body::Body; +use http_body_util::BodyExt; use std::{ fmt, future, pin::Pin, @@ -122,7 +123,9 @@ impl Streaming { decoder: Box::new(decoder), inner: StreamingInner { body: body - .map_data(|mut buf| buf.copy_to_bytes(buf.remaining())) + .map_frame(|mut frame| { + frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining())) + }) .map_err(|err| Status::map_error(err.into())) .boxed_unsync(), state: State::ReadHeader, @@ -231,7 +234,7 @@ impl StreamingInner { // Returns Some(()) if data was found or None if the loop in `poll_next` should break fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll, Status>> { - let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) { + let chunk = match ready!(Pin::new(&mut self.body).poll_frame(cx)) { Some(Ok(d)) => Some(d), Some(Err(status)) => { if self.direction == Direction::Request && status.code() == Code::Cancelled { diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index 13eb2c96d..5586cc8de 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -3,7 +3,7 @@ use super::{EncodeBuf, Encoder, DEFAULT_MAX_SEND_MESSAGE_SIZE, HEADER_SIZE}; use crate::{Code, Status}; use bytes::{BufMut, Bytes, BytesMut}; use http::HeaderMap; -use http_body::Body; +use http_body::{Body, Frame}; use pin_project::pin_project; use std::{ pin::Pin, @@ -315,17 +315,13 @@ where type Data = Bytes; type Error = Status; - fn is_end_stream(&self) -> bool { - self.state.is_end_stream - } - - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll, Self::Error>>> { let self_proj = self.project(); match ready!(self_proj.inner.poll_next(cx)) { - Some(Ok(d)) => Some(Ok(d)).into(), + Some(Ok(d)) => Some(Ok(Frame::data(d))).into(), Some(Err(status)) => match self_proj.state.role { Role::Client => Some(Err(status)).into(), Role::Server => { @@ -337,10 +333,7 @@ where } } - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Status>> { - Poll::Ready(self.project().state.trailers()) + fn is_end_stream(&self) -> bool { + self.state.is_end_stream } } diff --git a/tonic/src/codec/prost.rs b/tonic/src/codec/prost.rs index d2f1652f4..5b85ba24d 100644 --- a/tonic/src/codec/prost.rs +++ b/tonic/src/codec/prost.rs @@ -268,7 +268,7 @@ mod tests { mod body { use crate::Status; use bytes::Bytes; - use http_body::Body; + use http_body::{Body, Frame}; use std::{ pin::Pin, task::{Context, Poll}, @@ -299,10 +299,10 @@ mod tests { type Data = Bytes; type Error = Status; - fn poll_data( + fn poll_frame( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll, Self::Error>>> { // every other call to poll_data returns data let should_send = self.count % 2 == 0; let data_len = self.data.len(); @@ -325,13 +325,6 @@ mod tests { Poll::Ready(None) } } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) - } } } } diff --git a/tonic/src/extensions.rs b/tonic/src/extensions.rs index 67a7137be..f74ec8910 100644 --- a/tonic/src/extensions.rs +++ b/tonic/src/extensions.rs @@ -24,7 +24,7 @@ impl Extensions { /// If a extension of this type already existed, it will /// be returned. #[inline] - pub fn insert(&mut self, val: T) -> Option { + pub fn insert(&mut self, val: T) -> Option { self.inner.insert(val) } diff --git a/tonic/src/service/interceptor.rs b/tonic/src/service/interceptor.rs index cadff466f..5f6a1b42b 100644 --- a/tonic/src/service/interceptor.rs +++ b/tonic/src/service/interceptor.rs @@ -232,7 +232,8 @@ where mod tests { #[allow(unused_imports)] use super::*; - use http::header::HeaderMap; + use http_body::Frame; + use http_body_util::Empty; use std::{ pin::Pin, task::{Context, Poll}, @@ -246,19 +247,12 @@ mod tests { type Data = Bytes; type Error = Status; - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, _cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll, Self::Error>>> { Poll::Ready(None) } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) - } } #[tokio::test] @@ -318,17 +312,17 @@ mod tests { #[tokio::test] async fn doesnt_change_http_method() { - let svc = tower::service_fn(|request: http::Request| async move { + let svc = tower::service_fn(|request: http::Request>| async move { assert_eq!(request.method(), http::Method::OPTIONS); - Ok::<_, hyper::Error>(hyper::Response::new(hyper::Body::empty())) + Ok::<_, hyper::Error>(hyper::Response::new(Empty::new())) }); let svc = InterceptedService::new(svc, Ok); let request = http::Request::builder() .method(http::Method::OPTIONS) - .body(hyper::Body::empty()) + .body(Empty::new()) .unwrap(); svc.oneshot(request).await.unwrap(); diff --git a/tonic/src/status.rs b/tonic/src/status.rs index da8b792e5..2e691df0c 100644 --- a/tonic/src/status.rs +++ b/tonic/src/status.rs @@ -412,13 +412,7 @@ impl Status { // > status. Note that the frequency of PINGs is highly dependent on the network // > environment, implementations are free to adjust PING frequency based on network and // > application requirements, which is why it's mapped to unavailable here. - // - // Likewise, if we are unable to connect to the server, map this to UNAVAILABLE. This is - // consistent with the behavior of a C++ gRPC client when the server is not running, and - // matches the spec of: - // > The service is currently unavailable. This is most likely a transient condition that - // > can be corrected if retried with a backoff. - if err.is_timeout() || err.is_connect() { + if err.is_timeout() { return Some(Status::unavailable(err.to_string())); } diff --git a/tonic/src/transport/channel/endpoint.rs b/tonic/src/transport/channel/endpoint.rs index 6aacb57a5..4636b0fee 100644 --- a/tonic/src/transport/channel/endpoint.rs +++ b/tonic/src/transport/channel/endpoint.rs @@ -7,9 +7,9 @@ use crate::transport::service::TlsConnector; use crate::transport::{service::SharedExec, Error, Executor}; use bytes::Bytes; use http::{uri::Uri, HeaderValue}; +use hyper::rt; use std::{fmt, future::Future, pin::Pin, str::FromStr, time::Duration}; -use tower::make::MakeConnection; -// use crate::transport::E +use tower_service::Service; /// Channel builder. /// @@ -313,7 +313,7 @@ impl Endpoint { /// Create a channel from this config. pub async fn connect(&self) -> Result { - let mut http = hyper::client::connect::HttpConnector::new(); + let mut http = hyper_util::client::legacy::connect::HttpConnector::new(); http.enforce_http(false); http.set_nodelay(self.tcp_nodelay); http.set_keepalive(self.tcp_keepalive); @@ -334,7 +334,7 @@ impl Endpoint { /// The channel returned by this method does not attempt to connect to the endpoint until first /// use. pub fn connect_lazy(&self) -> Channel { - let mut http = hyper::client::connect::HttpConnector::new(); + let mut http = hyper_util::client::legacy::connect::HttpConnector::new(); http.enforce_http(false); http.set_nodelay(self.tcp_nodelay); http.set_keepalive(self.tcp_keepalive); @@ -359,8 +359,8 @@ impl Endpoint { /// The [`connect_timeout`](Endpoint::connect_timeout) will still be applied. pub async fn connect_with_connector(&self, connector: C) -> Result where - C: MakeConnection + Send + 'static, - C::Connection: Unpin + Send + 'static, + C: Service + Send + 'static, + C::Response: rt::Read + rt::Write + Send + Unpin + 'static, C::Future: Send + 'static, crate::Error: From + Send + 'static, { @@ -384,8 +384,8 @@ impl Endpoint { /// uses a Unix socket transport. pub fn connect_with_connector_lazy(&self, connector: C) -> Channel where - C: MakeConnection + Send + 'static, - C::Connection: Unpin + Send + 'static, + C: Service + Send + 'static, + C::Response: rt::Read + rt::Write + Send + Unpin + 'static, C::Future: Send + 'static, crate::Error: From + Send + 'static, { diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index b510a6980..420fd1f3c 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -10,14 +10,10 @@ pub use endpoint::Endpoint; pub use tls::ClientTlsConfig; use super::service::{Connection, DynamicServiceStream, SharedExec}; -use crate::body::BoxBody; use crate::transport::Executor; use bytes::Bytes; -use http::{ - uri::{InvalidUri, Uri}, - Request, Response, -}; -use hyper::client::connect::Connection as HyperConnection; +use http::uri::{InvalidUri, Uri}; +use hyper_util::client::legacy::connect::Connection as HyperConnection; use std::{ fmt, future::Future, @@ -25,11 +21,10 @@ use std::{ pin::Pin, task::{ready, Context, Poll}, }; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - sync::mpsc::{channel, Sender}, -}; +use tokio::sync::mpsc::{channel, Sender}; +use axum::{body::Body, extract::Request, response::Response}; +use hyper::rt; use tower::balance::p2c::Balance; use tower::{ buffer::{self, Buffer}, @@ -38,7 +33,7 @@ use tower::{ Service, }; -type Svc = Either, Response, crate::Error>>; +type Svc = Either>; const DEFAULT_BUFFER_SIZE: usize = 1024; @@ -67,14 +62,14 @@ const DEFAULT_BUFFER_SIZE: usize = 1024; /// cloning the `Channel` type is cheap and encouraged. #[derive(Clone)] pub struct Channel { - svc: Buffer>, + svc: Buffer, } /// A future that resolves to an HTTP response. /// /// This is returned by the `Service::call` on [`Channel`]. pub struct ResponseFuture { - inner: buffer::future::ResponseFuture<>>::Future>, + inner: buffer::future::ResponseFuture<>::Future>, } impl Channel { @@ -152,7 +147,7 @@ impl Channel { C: Service + Send + 'static, C::Error: Into + Send, C::Future: Unpin + Send, - C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, + C::Response: rt::Read + rt::Write + HyperConnection + Unpin + Send + 'static, { let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE); let executor = endpoint.executor.clone(); @@ -169,7 +164,7 @@ impl Channel { C: Service + Send + 'static, C::Error: Into + Send, C::Future: Unpin + Send, - C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, + C::Response: rt::Read + rt::Write + HyperConnection + Unpin + Send + 'static, { let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE); let executor = endpoint.executor.clone(); @@ -200,8 +195,8 @@ impl Channel { } } -impl Service> for Channel { - type Response = http::Response; +impl Service for Channel { + type Response = Response; type Error = super::Error; type Future = ResponseFuture; @@ -209,7 +204,7 @@ impl Service> for Channel { Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source) } - fn call(&mut self, request: http::Request) -> Self::Future { + fn call(&mut self, request: Request) -> Self::Future { let inner = Service::call(&mut self.svc, request); ResponseFuture { inner } @@ -217,7 +212,7 @@ impl Service> for Channel { } impl Future for ResponseFuture { - type Output = Result, super::Error>; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let val = ready!(Pin::new(&mut self.inner).poll(cx)).map_err(super::Error::from_source)?; diff --git a/tonic/src/transport/mod.rs b/tonic/src/transport/mod.rs index a0435c797..8c892e5bc 100644 --- a/tonic/src/transport/mod.rs +++ b/tonic/src/transport/mod.rs @@ -107,8 +107,9 @@ pub use self::service::grpc_timeout::TimeoutExpired; #[cfg(feature = "tls")] #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] pub use self::tls::Certificate; -pub use axum::{body::BoxBody as AxumBoxBody, Router as AxumRouter}; -pub use hyper::{Body, Uri}; +pub use axum::{body::Body as AxumBoxBody, Router as AxumRouter}; +pub use hyper::body::Body; +pub use hyper::Uri; pub(crate) use self::service::executor::Executor; diff --git a/tonic/src/transport/server/conn.rs b/tonic/src/transport/server/conn.rs index 907cf4965..11fcc4fd0 100644 --- a/tonic/src/transport/server/conn.rs +++ b/tonic/src/transport/server/conn.rs @@ -1,14 +1,14 @@ -use hyper::server::conn::AddrStream; use std::net::SocketAddr; -use tokio::net::TcpStream; - -#[cfg(feature = "tls")] -use crate::transport::Certificate; #[cfg(feature = "tls")] use std::sync::Arc; + +use tokio::net::TcpStream; #[cfg(feature = "tls")] use tokio_rustls::server::TlsStream; +#[cfg(feature = "tls")] +use crate::transport::Certificate; + /// Trait that connected IO resources implement and use to produce info about the connection. /// /// The goal for this trait is to allow users to implement @@ -86,17 +86,6 @@ impl TcpConnectInfo { } } -impl Connected for AddrStream { - type ConnectInfo = TcpConnectInfo; - - fn connect_info(&self) -> Self::ConnectInfo { - TcpConnectInfo { - local_addr: Some(self.local_addr()), - remote_addr: Some(self.remote_addr()), - } - } -} - impl Connected for TcpStream { type ConnectInfo = TcpConnectInfo; diff --git a/tonic/src/transport/server/incoming.rs b/tonic/src/transport/server/incoming.rs index 61aadc93d..af60d319c 100644 --- a/tonic/src/transport/server/incoming.rs +++ b/tonic/src/transport/server/incoming.rs @@ -1,19 +1,16 @@ use super::{Connected, Server}; use crate::transport::service::ServerIo; -use hyper::server::{ - accept::Accept, - conn::{AddrIncoming, AddrStream}, -}; use std::{ - net::SocketAddr, + net::{SocketAddr, TcpListener as StdTcpListener}, pin::Pin, task::{Context, Poll}, time::Duration, }; use tokio::{ io::{AsyncRead, AsyncWrite}, - net::TcpListener, + net::{TcpListener, TcpStream}, }; +use tokio_stream::wrappers::TcpListenerStream; use tokio_stream::{Stream, StreamExt}; #[cfg(not(feature = "tls"))] @@ -127,7 +124,9 @@ enum SelectOutput { /// of `AsyncRead + AsyncWrite` that communicate with clients that connect to a socket address. #[derive(Debug)] pub struct TcpIncoming { - inner: AddrIncoming, + inner: TcpListenerStream, + tcp_keepalive_timeout: Option, + tcp_nodelay: bool, } impl TcpIncoming { @@ -164,33 +163,37 @@ impl TcpIncoming { /// # } pub fn new( addr: SocketAddr, - nodelay: bool, - keepalive: Option, + tcp_nodelay: bool, + tcp_keepalive_timeout: Option, ) -> Result { - let mut inner = AddrIncoming::bind(&addr)?; - inner.set_nodelay(nodelay); - inner.set_keepalive(keepalive); - Ok(TcpIncoming { inner }) + let std_listener = StdTcpListener::bind(addr)?; + let inner = TcpListenerStream::new(TcpListener::from_std(std_listener)?); + Ok(Self { + inner, + tcp_nodelay, + tcp_keepalive_timeout, + }) } /// Creates a new `TcpIncoming` from an existing `tokio::net::TcpListener`. pub fn from_listener( listener: TcpListener, - nodelay: bool, - keepalive: Option, + tcp_nodelay: bool, + tcp_keepalive_timeout: Option, ) -> Result { - let mut inner = AddrIncoming::from_listener(listener)?; - inner.set_nodelay(nodelay); - inner.set_keepalive(keepalive); - Ok(TcpIncoming { inner }) + Ok(Self { + inner: TcpListenerStream::new(listener), + tcp_nodelay, + tcp_keepalive_timeout, + }) } } impl Stream for TcpIncoming { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_accept(cx) + Pin::new(&mut self.inner).poll_next(cx) } } diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index 7f2ffde2b..768bd29f8 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -13,6 +13,7 @@ pub use super::service::Routes; pub use super::service::RoutesBuilder; pub use conn::{Connected, TcpConnectInfo}; +use hyper_util::rt::{TokioExecutor, TokioIo}; #[cfg(feature = "tls")] pub use tls::ServerTlsConfig; @@ -35,12 +36,12 @@ use crate::transport::Error; use self::recover_error::RecoverError; use super::service::{GrpcTimeout, ServerIo}; -use crate::body::BoxBody; use crate::server::NamedService; +use axum::extract::Request; +use axum::response::Response; use bytes::Bytes; -use http::{Request, Response}; -use http_body::Body as _; -use hyper::{server::accept, Body}; +use http_body_util::BodyExt; +use hyper::body::Body; use pin_project::pin_project; use std::{ convert::Infallible, @@ -63,9 +64,8 @@ use tower::{ Service, ServiceBuilder, }; -type BoxHttpBody = http_body::combinators::UnsyncBoxBody; -type BoxService = tower::util::BoxService, Response, crate::Error>; -type TraceInterceptor = Arc) -> tracing::Span + Send + Sync + 'static>; +type BoxService = tower::util::BoxService; +type TraceInterceptor = Arc) -> tracing::Span + Send + Sync + 'static>; const DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS: u64 = 20; @@ -359,7 +359,7 @@ impl Server { /// route around different services. pub fn add_service(&mut self, svc: S) -> Router where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -380,7 +380,7 @@ impl Server { /// As a result, one cannot use this to toggle between two identically named implementations. pub fn add_optional_service(&mut self, svc: Option) -> Router where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -494,15 +494,15 @@ impl Server { ) -> Result<(), super::Error> where L: Layer, - L::Service: Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: Into + Send, + L::Service: Service> + Clone + Send + 'static, + <>::Service as Service>::Future: Send + 'static, + <>::Service as Service>::Error: Into + Send, I: Stream>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IO::ConnectInfo: Clone + Send + Sync + 'static, IE: Into, F: Future, - ResBody: http_body::Body + Send + 'static, + ResBody: Body + Send + 'static, ResBody::Error: Into, { let trace_interceptor = self.trace_interceptor.clone(); @@ -523,9 +523,7 @@ impl Server { let svc = self.service_builder.service(svc); - let tcp = incoming::tcp_incoming(incoming, self); - let incoming = accept::from_stream::<_, _, crate::Error>(tcp); - + let incoming = incoming::tcp_incoming(incoming, self); let svc = MakeSvc { inner: svc, concurrency_limit, @@ -534,26 +532,22 @@ impl Server { _io: PhantomData, }; - let server = hyper::Server::builder(incoming) - .http2_only(http2_only) - .http2_initial_connection_window_size(init_connection_window_size) - .http2_initial_stream_window_size(init_stream_window_size) - .http2_max_concurrent_streams(max_concurrent_streams) - .http2_keep_alive_interval(http2_keepalive_interval) - .http2_keep_alive_timeout(http2_keepalive_timeout) - .http2_adaptive_window(http2_adaptive_window.unwrap_or_default()) - .http2_max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams) - .http2_max_frame_size(max_frame_size); - - if let Some(signal) = signal { - server - .serve(svc) - .with_graceful_shutdown(signal) - .await - .map_err(super::Error::from_source)? - } else { - server.serve(svc).await.map_err(super::Error::from_source)?; - } + let mut builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); + + builder + .http2() + .initial_connection_window_size(init_connection_window_size) + .initial_stream_window_size(init_stream_window_size) + .max_concurrent_streams(max_concurrent_streams) + .keep_alive_interval(http2_keepalive_interval) + .keep_alive_timeout(http2_keepalive_timeout) + .adaptive_window(http2_adaptive_window.unwrap_or_default()) + // FIXME: wait for this to be added to hyper-util + //.max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams) + .max_frame_size(max_frame_size); + + let io = TokioIo::new(incoming); + let connection = builder.serve_connection(io, svc); Ok(()) } @@ -569,7 +563,7 @@ impl Router { /// Add a new service to this router. pub fn add_service(mut self, svc: S) -> Self where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -588,7 +582,7 @@ impl Router { #[allow(clippy::type_complexity)] pub fn add_optional_service(mut self, svc: Option) -> Self where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -614,10 +608,10 @@ impl Router { pub async fn serve(self, addr: SocketAddr) -> Result<(), super::Error> where L: Layer, - L::Service: Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: Into + Send, - ResBody: http_body::Body + Send + 'static, + L::Service: Service> + Clone + Send + 'static, + <>::Service as Service>::Future: Send + 'static, + <>::Service as Service>::Error: Into + Send, + ResBody: Body + Send + 'static, ResBody::Error: Into, { let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive) @@ -644,10 +638,10 @@ impl Router { ) -> Result<(), super::Error> where L: Layer, - L::Service: Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: Into + Send, - ResBody: http_body::Body + Send + 'static, + L::Service: Service> + Clone + Send + 'static, + <>::Service as Service>::Future: Send + 'static, + <>::Service as Service>::Error: Into + Send, + ResBody: Body + Send + 'static, ResBody::Error: Into, { let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive) @@ -673,10 +667,10 @@ impl Router { IO::ConnectInfo: Clone + Send + Sync + 'static, IE: Into, L: Layer, - L::Service: Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: Into + Send, - ResBody: http_body::Body + Send + 'static, + L::Service: Service> + Clone + Send + 'static, + <>::Service as Service>::Future: Send + 'static, + <>::Service as Service>::Error: Into + Send, + ResBody: Body + Send + 'static, ResBody::Error: Into, { self.server @@ -708,10 +702,10 @@ impl Router { IE: Into, F: Future, L: Layer, - L::Service: Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: Into + Send, - ResBody: http_body::Body + Send + 'static, + L::Service: Service> + Clone + Send + 'static, + <>::Service as Service>::Future: Send + 'static, + <>::Service as Service>::Error: Into + Send, + ResBody: Body + Send + 'static, ResBody::Error: Into, { self.server @@ -723,10 +717,10 @@ impl Router { pub fn into_service(self) -> L::Service where L: Layer, - L::Service: Service, Response = Response> + Clone + Send + 'static, - <>::Service as Service>>::Future: Send + 'static, - <>::Service as Service>>::Error: Into + Send, - ResBody: http_body::Body + Send + 'static, + L::Service: Service> + Clone + Send + 'static, + <>::Service as Service>::Future: Send + 'static, + <>::Service as Service>::Error: Into + Send, + ResBody: Body + Send + 'static, ResBody::Error: Into, { self.server.service_builder.service(self.routes.prepare()) @@ -744,14 +738,14 @@ struct Svc { trace_interceptor: Option, } -impl Service> for Svc +impl Service for Svc where - S: Service, Response = Response>, + S: Service>, S::Error: Into, - ResBody: http_body::Body + Send + 'static, + ResBody: Body + Send + 'static, ResBody::Error: Into, { - type Response = Response; + type Response = Response; type Error = crate::Error; type Future = SvcFuture; @@ -759,7 +753,7 @@ where self.inner.poll_ready(cx).map_err(Into::into) } - fn call(&mut self, mut req: Request) -> Self::Future { + fn call(&mut self, mut req: Request) -> Self::Future { let span = if let Some(trace_interceptor) = &self.trace_interceptor { let (parts, body) = req.into_parts(); let bodyless_request = Request::from_parts(parts, ()); @@ -792,10 +786,10 @@ impl Future for SvcFuture where F: Future, E>>, E: Into, - ResBody: http_body::Body + Send + 'static, + ResBody: Body + Send + 'static, ResBody::Error: Into, { - type Output = Result, crate::Error>; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -824,10 +818,10 @@ struct MakeSvc { impl Service<&ServerIo> for MakeSvc where IO: Connected, - S: Service, Response = Response> + Clone + Send + 'static, + S: Service> + Clone + Send + 'static, S::Future: Send + 'static, S::Error: Into + Send, - ResBody: http_body::Body + Send + 'static, + ResBody: Body + Send + 'static, ResBody::Error: Into, { type Response = BoxService; @@ -854,7 +848,7 @@ where let svc = ServiceBuilder::new() .layer(BoxService::layer()) - .map_request(move |mut request: Request| { + .map_request(move |mut request: Request| { match &conn_info { tower::util::Either::A(inner) => { request.extensions_mut().insert(inner.clone()); diff --git a/tonic/src/transport/server/recover_error.rs b/tonic/src/transport/server/recover_error.rs index 6d7e55bf4..d976b167e 100644 --- a/tonic/src/transport/server/recover_error.rs +++ b/tonic/src/transport/server/recover_error.rs @@ -1,5 +1,6 @@ use crate::Status; use http::Response; +use http_body::{Body, Frame, SizeHint}; use pin_project::pin_project; use std::{ future::Future, @@ -91,37 +92,34 @@ impl MaybeEmptyBody { } } -impl http_body::Body for MaybeEmptyBody +impl Body for MaybeEmptyBody where - B: http_body::Body + Send, + B: Body + Send, { type Data = B::Data; type Error = B::Error; - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll, Self::Error>>> { match self.project().inner.as_pin_mut() { - Some(b) => b.poll_data(cx), + Some(b) => b.poll_frame(cx), None => Poll::Ready(None), } } - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - match self.project().inner.as_pin_mut() { - Some(b) => b.poll_trailers(cx), - None => Poll::Ready(Ok(None)), - } - } - fn is_end_stream(&self) -> bool { match &self.inner { Some(b) => b.is_end_stream(), None => true, } } + + fn size_hint(&self) -> SizeHint { + match &self.inner { + Some(b) => b.size_hint(), + None => SizeHint::default(), + } + } } diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index 46a88dda5..389fc73a8 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -1,17 +1,12 @@ use super::{grpc_timeout::GrpcTimeout, reconnect::Reconnect, AddOrigin, UserAgent}; -use crate::{ - body::BoxBody, - transport::{BoxFuture, Endpoint}, -}; +use crate::transport::{BoxFuture, Endpoint}; use http::Uri; -use hyper::client::conn::Builder; -use hyper::client::connect::Connection as HyperConnection; -use hyper::client::service::Connect as HyperConnect; +use hyper::client::conn::http2::Builder; +use hyper::rt; use std::{ fmt, task::{Context, Poll}, }; -use tokio::io::{AsyncRead, AsyncWrite}; use tower::load::Load; use tower::{ layer::Layer, @@ -21,9 +16,8 @@ use tower::{ }; use tower_service::Service; -pub(crate) type Request = http::Request; -pub(crate) type Response = http::Response; - +pub(crate) type Request = axum::extract::Request; +pub(crate) type Response = axum::response::Response; pub(crate) struct Connection { inner: BoxService, } @@ -34,26 +28,24 @@ impl Connection { C: Service + Send + 'static, C::Error: Into + Send, C::Future: Unpin + Send, - C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, + C::Response: rt::Read + rt::Write + Unpin + Send + 'static, { - let mut settings = Builder::new() - .http2_initial_stream_window_size(endpoint.init_stream_window_size) - .http2_initial_connection_window_size(endpoint.init_connection_window_size) - .http2_only(true) - .http2_keep_alive_interval(endpoint.http2_keep_alive_interval) - .executor(endpoint.executor.clone()) + let mut settings = Builder::new(endpoint.executor) + .initial_stream_window_size(endpoint.init_stream_window_size) + .initial_connection_window_size(endpoint.init_connection_window_size) + .keep_alive_interval(endpoint.http2_keep_alive_interval) .clone(); if let Some(val) = endpoint.http2_keep_alive_timeout { - settings.http2_keep_alive_timeout(val); + settings.keep_alive_timeout(val); } if let Some(val) = endpoint.http2_keep_alive_while_idle { - settings.http2_keep_alive_while_idle(val); + settings.keep_alive_while_idle(val); } if let Some(val) = endpoint.http2_adaptive_window { - settings.http2_adaptive_window(val); + settings.adaptive_window(val); } let stack = ServiceBuilder::new() @@ -68,9 +60,7 @@ impl Connection { .option_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d))) .into_inner(); - let connector = HyperConnect::new(connector, settings); let conn = Reconnect::new(connector, endpoint.uri.clone(), is_lazy); - let inner = stack.layer(conn); Self { @@ -83,7 +73,7 @@ impl Connection { C: Service + Send + 'static, C::Error: Into + Send, C::Future: Unpin + Send, - C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, + C::Response: rt::Read + rt::Write + Unpin + Send + 'static, { Self::new(connector, endpoint, false).ready_oneshot().await } @@ -93,7 +83,7 @@ impl Connection { C: Service + Send + 'static, C::Error: Into + Send, C::Future: Unpin + Send, - C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, + C::Response: rt::Read + rt::Write + Unpin + Send + 'static, { Self::new(connector, endpoint, true) } diff --git a/tonic/src/transport/service/connector.rs b/tonic/src/transport/service/connector.rs index a5e0d9eb9..7a4a0eab2 100644 --- a/tonic/src/transport/service/connector.rs +++ b/tonic/src/transport/service/connector.rs @@ -1,12 +1,15 @@ +use std::fmt; +use std::task::{Context, Poll}; + +use http::Uri; +use hyper::rt; +use hyper_util::rt::TokioIo; +use tower_service::Service; + use super::super::BoxFuture; use super::io::BoxedIo; #[cfg(feature = "tls")] use super::tls::TlsConnector; -use http::Uri; -use std::fmt; -use std::task::{Context, Poll}; -use tower::make::MakeConnection; -use tower_service::Service; pub(crate) struct Connector { inner: C, @@ -47,8 +50,8 @@ impl Connector { impl Service for Connector where - C: MakeConnection, - C::Connection: Unpin + Send + 'static, + C: Service, + C::Response: rt::Read + rt::Write + Unpin + Send + 'static, C::Future: Send + 'static, crate::Error: From + Send + 'static, { @@ -57,7 +60,7 @@ where type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - MakeConnection::poll_ready(&mut self.inner, cx).map_err(Into::into) + self.inner.poll_ready(cx).map_err(Into::into) } fn call(&mut self, uri: Uri) -> Self::Future { @@ -69,7 +72,7 @@ where #[cfg(feature = "tls")] let is_https = uri.scheme_str() == Some("https"); - let connect = self.inner.make_connection(uri); + let connect = self.inner.call(uri); Box::pin(async move { let io = connect.await?; @@ -77,12 +80,12 @@ where #[cfg(feature = "tls")] { if let Some(tls) = tls { - if is_https { - let conn = tls.connect(io).await?; - return Ok(BoxedIo::new(conn)); + return if is_https { + let io = tls.connect(TokioIo::new(io)).await?; + Ok(io) } else { - return Ok(BoxedIo::new(io)); - } + Ok(BoxedIo::new(io)) + }; } else if is_https { return Err(HttpsUriWithoutTlsSupport(()).into()); } diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index 2d23ca74c..fb31c4f92 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -32,7 +32,7 @@ impl Stream for DynamicServiceStream { Poll::Pending | Poll::Ready(None) => Poll::Pending, Poll::Ready(Some(change)) => match change { Change::Insert(k, endpoint) => { - let mut http = hyper::client::connect::HttpConnector::new(); + let mut http = hyper_util::client::legacy::connect::HttpConnector::new(); http.set_nodelay(endpoint.tcp_nodelay); http.set_keepalive(endpoint.tcp_keepalive); http.set_connect_timeout(endpoint.connect_timeout); diff --git a/tonic/src/transport/service/io.rs b/tonic/src/transport/service/io.rs index 2230b9b2e..9be7fa343 100644 --- a/tonic/src/transport/service/io.rs +++ b/tonic/src/transport/service/io.rs @@ -1,19 +1,23 @@ -use crate::transport::server::Connected; -use hyper::client::connect::{Connected as HyperConnected, Connection}; +use hyper::rt; use std::io; use std::io::IoSlice; use std::pin::Pin; use std::task::{Context, Poll}; + +use hyper_util::client::legacy::connect::{Connected as HyperConnected, Connection}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; #[cfg(feature = "tls")] use tokio_rustls::server::TlsStream; +use tower::util::Either; + +use crate::transport::server::Connected; pub(in crate::transport) trait Io: - AsyncRead + AsyncWrite + Send + 'static + rt::Read + rt::Write + Send + 'static { } -impl Io for T where T: AsyncRead + AsyncWrite + Send + 'static {} +impl Io for T where T: rt::Read + rt::Write + Send + 'static {} pub(crate) struct BoxedIo(Pin>); @@ -40,17 +44,17 @@ impl Connected for BoxedIo { #[derive(Copy, Clone)] pub(crate) struct NoneConnectInfo; -impl AsyncRead for BoxedIo { +impl rt::Read for BoxedIo { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, + buf: rt::ReadBufCursor<'_>, ) -> Poll> { Pin::new(&mut self.0).poll_read(cx, buf) } } -impl AsyncWrite for BoxedIo { +impl rt::Write for BoxedIo { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -67,6 +71,10 @@ impl AsyncWrite for BoxedIo { Pin::new(&mut self.0).poll_shutdown(cx) } + fn is_write_vectored(&self) -> bool { + self.0.is_write_vectored() + } + fn poll_write_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -74,10 +82,6 @@ impl AsyncWrite for BoxedIo { ) -> Poll> { Pin::new(&mut self.0).poll_write_vectored(cx, bufs) } - - fn is_write_vectored(&self) -> bool { - self.0.is_write_vectored() - } } pub(crate) enum ServerIo { @@ -86,8 +90,6 @@ pub(crate) enum ServerIo { TlsIo(Box>), } -use tower::util::Either; - #[cfg(feature = "tls")] type ServerIoConnectInfo = Either<::ConnectInfo, as Connected>::ConnectInfo>; diff --git a/tonic/src/transport/service/router.rs b/tonic/src/transport/service/router.rs index 85636c4d4..89f2fc8ff 100644 --- a/tonic/src/transport/service/router.rs +++ b/tonic/src/transport/service/router.rs @@ -1,9 +1,5 @@ -use crate::{ - body::{boxed, BoxBody}, - server::NamedService, -}; -use http::{Request, Response}; -use hyper::Body; +use crate::server::NamedService; +use axum::{extract::Request, response::Response}; use pin_project::pin_project; use std::{ convert::Infallible, @@ -12,7 +8,6 @@ use std::{ pin::Pin, task::{ready, Context, Poll}, }; -use tower::ServiceExt; use tower_service::Service; /// A [`Service`] router. @@ -31,7 +26,7 @@ impl RoutesBuilder { /// Add a new service. pub fn add_service(&mut self, svc: S) -> &mut Self where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -53,7 +48,7 @@ impl Routes { /// Create a new routes with `svc` already added to it. pub fn new(svc: S) -> Self where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -68,7 +63,7 @@ impl Routes { /// Add a new service. pub fn add_service(mut self, svc: S) -> Self where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -76,7 +71,6 @@ impl Routes { S::Future: Send + 'static, S::Error: Into + Send, { - let svc = svc.map_response(|res| res.map(axum::body::boxed)); self.router = self .router .route_service(&format!("/{}/*rest", S::NAME), svc); @@ -103,8 +97,8 @@ async fn unimplemented() -> impl axum::response::IntoResponse { (status, headers) } -impl Service> for Routes { - type Response = Response; +impl Service for Routes { + type Response = Response; type Error = crate::Error; type Future = RoutesFuture; @@ -113,13 +107,13 @@ impl Service> for Routes { Poll::Ready(Ok(())) } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, req: Request) -> Self::Future { RoutesFuture(self.router.call(req)) } } #[pin_project] -pub struct RoutesFuture(#[pin] axum::routing::future::RouteFuture); +pub struct RoutesFuture(#[pin] axum::routing::future::RouteFuture); impl fmt::Debug for RoutesFuture { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -128,11 +122,11 @@ impl fmt::Debug for RoutesFuture { } impl Future for RoutesFuture { - type Output = Result, crate::Error>; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match ready!(self.project().0.poll(cx)) { - Ok(res) => Ok(res.map(boxed)).into(), + Ok(res) => Ok(res).into(), Err(err) => match err {}, } } diff --git a/tonic/src/transport/service/tls.rs b/tonic/src/transport/service/tls.rs index f956132fb..add4be892 100644 --- a/tonic/src/transport/service/tls.rs +++ b/tonic/src/transport/service/tls.rs @@ -3,6 +3,7 @@ use crate::transport::{ server::{Connected, TlsStream}, Certificate, Identity, }; +use hyper_util::rt::TokioIo; use std::{fmt, sync::Arc}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_rustls::{ @@ -83,7 +84,7 @@ impl TlsConnector { _ => return Err(TlsError::H2NotNegotiated.into()), }; - BoxedIo::new(io) + BoxedIo::new(TokioIo::new(io)) }; Ok(tls_io)