Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to hyper 1.0 and axum 0.7 #1595

Closed
wants to merge 15 commits into from
48 changes: 34 additions & 14 deletions tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ gzip = ["dep:flate2"]
zstd = ["dep:zstd"]
default = ["transport", "codegen", "prost"]
prost = ["dep:prost"]
tls = ["dep:rustls-pemfile", "transport", "dep:tokio-rustls", "dep:rustls", "tokio/rt", "tokio/macros"]
tls = [
"dep:rustls-pemfile",
"transport",
"dep:tokio-rustls",
"dep:rustls",
"tokio/rt",
"tokio/macros",
]
tls-roots = ["tls-roots-common", "dep:rustls-native-certs"]
tls-roots-common = ["tls"]
tls-webpki-roots = ["tls-roots-common", "dep:webpki-roots"]
Expand All @@ -52,29 +59,42 @@ channel = []
[dependencies]
base64 = "0.21"
bytes = "1.0"
http = "0.2"
http = "1.0"
tracing = "0.1"

tokio = "1.0.1"
http-body = "0.4.4"
http-body = "1.0"
http-body-util = "0.1"
percent-encoding = "2.1"
pin-project = "1.0.11"
tower-layer = "0.3"
tower-service = "0.3"

# prost
prost = {version = "0.12", default-features = false, features = ["std"], optional = true}
prost = { version = "0.12", default-features = false, features = [
"std",
], optional = true }

# codegen
async-trait = {version = "0.1.13", optional = true}
async-trait = { version = "0.1.13", optional = true }

# transport
h2 = {version = "0.3.17", optional = true}
hyper = {version = "0.14.26", features = ["full"], optional = true}
hyper-timeout = {version = "0.4", optional = true}
tokio-stream = "0.1"
tower = {version = "0.4.7", default-features = false, features = ["balance", "buffer", "discover", "limit", "load", "make", "timeout", "util"], optional = true}
axum = {version = "0.6.9", default_features = false, optional = true}
h2 = { version = "0.4", optional = true }
hyper = { version = "1.0", features = ["full"], optional = true }
hyper-util = { version = "0.1", features = ["full"] }
hyper-timeout = { version = "0.5", optional = true }
tokio-stream = { version = "0.1", features = ["net"] }
tower = { version = "0.4.7", default-features = false, features = [
"balance",
"buffer",
"discover",
"limit",
"load",
"make",
"timeout",
"util",
], optional = true }
axum = { version = "0.7", default_features = false, optional = true }

# rustls
async-stream = { version = "0.3", optional = true }
Expand All @@ -85,7 +105,7 @@ rustls = { version = "0.21.7", optional = true }
webpki-roots = { version = "0.25.0", optional = true }

# compression
flate2 = {version = "1.0", optional = true}
flate2 = { version = "1.0", optional = true }
zstd = { version = "0.12.3", optional = true }

[dev-dependencies]
Expand All @@ -94,8 +114,8 @@ quickcheck = "1.0"
quickcheck_macros = "1.0"
rand = "0.8"
static_assertions = "1.0"
tokio = {version = "1.0", features = ["rt", "macros"]}
tower = {version = "0.4.7", features = ["full"]}
tokio = { version = "1.0", features = ["rt", "macros"] }
tower = { version = "0.4.7", features = ["full"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
29 changes: 16 additions & 13 deletions tonic/benches/decode.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use bencher::{benchmark_group, benchmark_main, Bencher};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use http_body::Body;
use std::{
fmt::{Error, Formatter},
pin::Pin,
task::{Context, Poll},
};

use bencher::{benchmark_group, benchmark_main, Bencher};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use http_body::{Body, Frame, SizeHint};

use tonic::{codec::DecodeBuf, codec::Decoder, Status, Streaming};

macro_rules! bench {
Expand Down Expand Up @@ -58,23 +60,24 @@ impl Body for MockBody {
type Data = Bytes;
type Error = Status;

fn poll_data(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
if self.data.has_remaining() {
let split = std::cmp::min(self.chunk_size, self.data.remaining());
Poll::Ready(Some(Ok(self.data.split_to(split))))
Poll::Ready(Some(Ok(Frame::data(self.data.split_to(split)))))
} else {
Poll::Ready(None)
}
}

fn poll_trailers(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
fn is_end_stream(&self) -> bool {
!self.data.is_empty()
}

fn size_hint(&self) -> SizeHint {
SizeHint::with_exact(self.data.len() as u64)
}
}

Expand Down
6 changes: 3 additions & 3 deletions tonic/src/body.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! HTTP specific body utilities.

use http_body::Body;
use http_body_util::BodyExt;

/// A type erased HTTP body used for tonic services.
pub type BoxBody = http_body::combinators::UnsyncBoxBody<bytes::Bytes, crate::Status>;
pub type BoxBody = http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, crate::Status>;

/// Convert a [`http_body::Body`] into a [`BoxBody`].
pub(crate) fn boxed<B>(body: B) -> BoxBody
Expand All @@ -16,7 +16,7 @@ where

/// Create an empty `BoxBody`
pub fn empty_body() -> BoxBody {
http_body::Empty::new()
http_body_util::Empty::new()
.map_err(|err| match err {})
.boxed_unsync()
}
7 changes: 5 additions & 2 deletions tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use bytes::{Buf, BufMut, BytesMut};
use http::StatusCode;
use http_body::Body;
use http_body_util::BodyExt;
use std::{
fmt, future,
pin::Pin,
Expand Down Expand Up @@ -122,7 +123,9 @@
decoder: Box::new(decoder),
inner: StreamingInner {
body: body
.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))
.map_frame(|mut frame| {
frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))
})
.map_err(|err| Status::map_error(err.into()))
.boxed_unsync(),
state: State::ReadHeader,
Expand Down Expand Up @@ -231,7 +234,7 @@

// Returns Some(()) if data was found or None if the loop in `poll_next` should break
fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, Status>> {
let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
let chunk = match ready!(Pin::new(&mut self.body).poll_frame(cx)) {
Some(Ok(d)) => Some(d),
Some(Err(status)) => {
if self.direction == Direction::Request && status.code() == Code::Cancelled {
Expand All @@ -246,7 +249,7 @@
};

Poll::Ready(if let Some(data) = chunk {
self.buf.put(data);

Check failure on line 252 in tonic/src/codec/decode.rs

View workflow job for this annotation

GitHub Actions / check (ubuntu-latest)

the trait bound `http_body::Frame<bytes::Bytes>: Buf` is not satisfied

Check failure on line 252 in tonic/src/codec/decode.rs

View workflow job for this annotation

GitHub Actions / Interop Tests (ubuntu-latest)

the trait bound `http_body::Frame<bytes::Bytes>: Buf` is not satisfied

Check failure on line 252 in tonic/src/codec/decode.rs

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest)

the trait bound `http_body::Frame<bytes::Bytes>: Buf` is not satisfied

Check failure on line 252 in tonic/src/codec/decode.rs

View workflow job for this annotation

GitHub Actions / Check MSRV

the trait bound `http_body::Frame<bytes::Bytes>: Buf` is not satisfied

Check failure on line 252 in tonic/src/codec/decode.rs

View workflow job for this annotation

GitHub Actions / Check MSRV

the trait bound `http_body::Frame<bytes::Bytes>: bytes::Buf` is not satisfied
Ok(Some(()))
} else {
// FIXME: improve buf usage.
Expand All @@ -264,7 +267,7 @@

fn poll_response(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Status>> {
if let Direction::Response(status) = self.direction {
match ready!(Pin::new(&mut self.body).poll_trailers(cx)) {

Check failure on line 270 in tonic/src/codec/decode.rs

View workflow job for this annotation

GitHub Actions / check (ubuntu-latest)

no method named `poll_trailers` found for struct `Pin<&mut UnsyncBoxBody<bytes::Bytes, Status>>` in the current scope

Check failure on line 270 in tonic/src/codec/decode.rs

View workflow job for this annotation

GitHub Actions / Interop Tests (ubuntu-latest)

no method named `poll_trailers` found for struct `Pin<&mut UnsyncBoxBody<bytes::Bytes, Status>>` in the current scope

Check failure on line 270 in tonic/src/codec/decode.rs

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest)

no method named `poll_trailers` found for struct `Pin<&mut UnsyncBoxBody<bytes::Bytes, status::Status>>` in the current scope
Ok(trailer) => {
if let Err(e) = crate::status::infer_grpc_status(trailer.as_ref(), status) {
if let Some(e) = e {
Expand Down Expand Up @@ -356,7 +359,7 @@

// Trailers were not caught during poll_next and thus lets poll for
// them manually.
let map = future::poll_fn(|cx| Pin::new(&mut self.inner.body).poll_trailers(cx))

Check failure on line 362 in tonic/src/codec/decode.rs

View workflow job for this annotation

GitHub Actions / check (ubuntu-latest)

no method named `poll_trailers` found for struct `Pin<&mut UnsyncBoxBody<bytes::Bytes, Status>>` in the current scope

Check failure on line 362 in tonic/src/codec/decode.rs

View workflow job for this annotation

GitHub Actions / Interop Tests (ubuntu-latest)

no method named `poll_trailers` found for struct `Pin<&mut UnsyncBoxBody<bytes::Bytes, Status>>` in the current scope

Check failure on line 362 in tonic/src/codec/decode.rs

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest)

no method named `poll_trailers` found for struct `Pin<&mut UnsyncBoxBody<bytes::Bytes, status::Status>>` in the current scope

Check failure on line 362 in tonic/src/codec/decode.rs

View workflow job for this annotation

GitHub Actions / Check MSRV

no method named `poll_trailers` found for struct `Pin<&mut UnsyncBoxBody<bytes::Bytes, status::Status>>` in the current scope
.await
.map_err(|e| Status::from_error(Box::new(e)));

Expand Down
19 changes: 6 additions & 13 deletions tonic/src/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{EncodeBuf, Encoder, DEFAULT_MAX_SEND_MESSAGE_SIZE, HEADER_SIZE};
use crate::{Code, Status};
use bytes::{BufMut, Bytes, BytesMut};
use http::HeaderMap;
use http_body::Body;
use http_body::{Body, Frame};
use pin_project::pin_project;
use std::{
pin::Pin,
Expand Down Expand Up @@ -315,17 +315,13 @@ where
type Data = Bytes;
type Error = Status;

fn is_end_stream(&self) -> bool {
self.state.is_end_stream
}

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let self_proj = self.project();
match ready!(self_proj.inner.poll_next(cx)) {
Some(Ok(d)) => Some(Ok(d)).into(),
Some(Ok(d)) => Some(Ok(Frame::data(d))).into(),
Some(Err(status)) => match self_proj.state.role {
Role::Client => Some(Err(status)).into(),
Role::Server => {
Expand All @@ -337,10 +333,7 @@ where
}
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Status>> {
Poll::Ready(self.project().state.trailers())
fn is_end_stream(&self) -> bool {
self.state.is_end_stream
}
}
13 changes: 3 additions & 10 deletions tonic/src/codec/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ mod tests {
mod body {
use crate::Status;
use bytes::Bytes;
use http_body::Body;
use http_body::{Body, Frame};
use std::{
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -299,10 +299,10 @@ mod tests {
type Data = Bytes;
type Error = Status;

fn poll_data(
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
// every other call to poll_data returns data
let should_send = self.count % 2 == 0;
let data_len = self.data.len();
Expand All @@ -325,13 +325,6 @@ mod tests {
Poll::Ready(None)
}
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}
}
}
2 changes: 1 addition & 1 deletion tonic/src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl Extensions {
/// If a extension of this type already existed, it will
/// be returned.
#[inline]
pub fn insert<T: Send + Sync + 'static>(&mut self, val: T) -> Option<T> {
pub fn insert<T: Send + Sync + Clone + 'static>(&mut self, val: T) -> Option<T> {
self.inner.insert(val)
}

Expand Down
20 changes: 7 additions & 13 deletions tonic/src/service/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ where
mod tests {
#[allow(unused_imports)]
use super::*;
use http::header::HeaderMap;
use http_body::Frame;
use http_body_util::Empty;
use std::{
pin::Pin,
task::{Context, Poll},
Expand All @@ -246,19 +247,12 @@ mod tests {
type Data = Bytes;
type Error = Status;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Poll::Ready(None)
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}

#[tokio::test]
Expand Down Expand Up @@ -318,17 +312,17 @@ mod tests {

#[tokio::test]
async fn doesnt_change_http_method() {
let svc = tower::service_fn(|request: http::Request<hyper::Body>| async move {
let svc = tower::service_fn(|request: http::Request<Empty<()>>| async move {
assert_eq!(request.method(), http::Method::OPTIONS);

Ok::<_, hyper::Error>(hyper::Response::new(hyper::Body::empty()))
Ok::<_, hyper::Error>(hyper::Response::new(Empty::new()))
});

let svc = InterceptedService::new(svc, Ok);

let request = http::Request::builder()
.method(http::Method::OPTIONS)
.body(hyper::Body::empty())
.body(Empty::new())
.unwrap();

svc.oneshot(request).await.unwrap();
Expand Down
8 changes: 1 addition & 7 deletions tonic/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,7 @@ impl Status {
// > status. Note that the frequency of PINGs is highly dependent on the network
// > environment, implementations are free to adjust PING frequency based on network and
// > application requirements, which is why it's mapped to unavailable here.
//
// Likewise, if we are unable to connect to the server, map this to UNAVAILABLE. This is
// consistent with the behavior of a C++ gRPC client when the server is not running, and
// matches the spec of:
// > The service is currently unavailable. This is most likely a transient condition that
// > can be corrected if retried with a backoff.
if err.is_timeout() || err.is_connect() {
if err.is_timeout() {
return Some(Status::unavailable(err.to_string()));
}

Expand Down
16 changes: 8 additions & 8 deletions tonic/src/transport/channel/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use crate::transport::service::TlsConnector;
use crate::transport::{service::SharedExec, Error, Executor};
use bytes::Bytes;
use http::{uri::Uri, HeaderValue};
use hyper::rt;
use std::{fmt, future::Future, pin::Pin, str::FromStr, time::Duration};
use tower::make::MakeConnection;
// use crate::transport::E
use tower_service::Service;

/// Channel builder.
///
Expand Down Expand Up @@ -313,7 +313,7 @@ impl Endpoint {

/// Create a channel from this config.
pub async fn connect(&self) -> Result<Channel, Error> {
let mut http = hyper::client::connect::HttpConnector::new();
let mut http = hyper_util::client::legacy::connect::HttpConnector::new();
http.enforce_http(false);
http.set_nodelay(self.tcp_nodelay);
http.set_keepalive(self.tcp_keepalive);
Expand All @@ -334,7 +334,7 @@ impl Endpoint {
/// The channel returned by this method does not attempt to connect to the endpoint until first
/// use.
pub fn connect_lazy(&self) -> Channel {
let mut http = hyper::client::connect::HttpConnector::new();
let mut http = hyper_util::client::legacy::connect::HttpConnector::new();
http.enforce_http(false);
http.set_nodelay(self.tcp_nodelay);
http.set_keepalive(self.tcp_keepalive);
Expand All @@ -359,8 +359,8 @@ impl Endpoint {
/// The [`connect_timeout`](Endpoint::connect_timeout) will still be applied.
pub async fn connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error>
where
C: MakeConnection<Uri> + Send + 'static,
C::Connection: Unpin + Send + 'static,
C: Service<Uri> + Send + 'static,
C::Response: rt::Read + rt::Write + Send + Unpin + 'static,
C::Future: Send + 'static,
crate::Error: From<C::Error> + Send + 'static,
{
Expand All @@ -384,8 +384,8 @@ impl Endpoint {
/// uses a Unix socket transport.
pub fn connect_with_connector_lazy<C>(&self, connector: C) -> Channel
where
C: MakeConnection<Uri> + Send + 'static,
C::Connection: Unpin + Send + 'static,
C: Service<Uri> + Send + 'static,
C::Response: rt::Read + rt::Write + Send + Unpin + 'static,
C::Future: Send + 'static,
crate::Error: From<C::Error> + Send + 'static,
{
Expand Down
Loading
Loading