Skip to content

Commit

Permalink
Handle GRPC body errors (#493)
Browse files Browse the repository at this point in the history
When gRPC implementations encounter an underlying transport error, they
surface the error via an `UNAVAILABLE` gRPC status code. Linkerd breaks this
behavior by converting these errors to stream resets, which instead appear as
an `UNKNOWN` error to applications.

This change modifies the proxy's error handling logic to set the correct gRPC
status code when a stream fails with a hyper error, which should work more
seamlessly with gRPC error handling. This status is only set when the request's
content-type indicates the stream is sending gRPC messages.
  • Loading branch information
zaharidichev authored Apr 30, 2020
1 parent c72c5f5 commit 610309e
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 59 deletions.
172 changes: 124 additions & 48 deletions linkerd/app/core/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::proxy::identity;
use futures::{Async, Poll};
use http::{header::HeaderValue, StatusCode};
use linkerd2_buffer as buffer;
use linkerd2_error::Error;
Expand All @@ -9,10 +10,10 @@ use linkerd2_lock as lock;
use linkerd2_proxy_http::HasH2Reason;
use linkerd2_timeout::{error::ResponseTimeout, FailFastError};
use tower_grpc::{self as grpc, Code};
use tracing::debug;
use tracing::{debug, warn};

pub fn layer<B: Default>() -> respond::RespondLayer<NewRespond<B>> {
respond::RespondLayer::new(NewRespond(std::marker::PhantomData))
pub fn layer() -> respond::RespondLayer<NewRespond> {
respond::RespondLayer::new(NewRespond(()))
}

#[derive(Clone, Default)]
Expand All @@ -35,20 +36,82 @@ pub enum Reason {
Unexpected,
}

#[derive(Debug)]
pub struct NewRespond<B>(std::marker::PhantomData<fn() -> B>);
#[derive(Copy, Clone, Debug)]
pub struct NewRespond(());

#[derive(Copy, Clone, Debug)]
pub enum Respond<B> {
Http1(http::Version, std::marker::PhantomData<fn() -> B>),
pub enum Respond {
Http1(http::Version),
Http2 { is_grpc: bool },
}

impl<A, B: Default> respond::NewRespond<http::Request<A>> for NewRespond<B> {
type Response = http::Response<B>;
type Respond = Respond<B>;
/// Response body wrapper that distinguishes gRPC streams from all other
/// response bodies.
pub enum ResponseBody<B> {
/// A non-gRPC body; the wrapped body is used as-is.
NonGrpc(B),
/// A gRPC body whose stream failures are reported through trailers.
Grpc {
/// The wrapped response body.
inner: B,
/// Trailers built from an error returned while polling `inner` for data;
/// `None` until such an error occurs, and again once they are taken.
trailers: Option<http::HeaderMap>,
},
}

/// `Payload` implementation that forwards to the wrapped body and, for gRPC
/// bodies, turns data-stream errors into gRPC status trailers.
impl<B: hyper::body::Payload> hyper::body::Payload for ResponseBody<B>
where
    B::Error: Into<Error>,
{
    type Data = B::Data;
    type Error = B::Error;

    /// Polls the wrapped body for its next data frame.
    ///
    /// For a `Grpc` body, an error from the inner body is not returned:
    /// trailers are built from it with `set_grpc_status`, stored, and the
    /// data stream is reported as finished.
    ///
    /// # Panics
    ///
    /// Panics if called on a `Grpc` body that already holds error-derived
    /// trailers.
    fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
        let (body, pending_trailers) = match self {
            Self::NonGrpc(body) => return body.poll_data(),
            Self::Grpc { inner, trailers } => (inner, trailers),
        };

        // Stored trailers mean the data stream was already reported as ended,
        // so data must not be polled again.
        assert!(pending_trailers.is_none());

        body.poll_data().or_else(|error| {
            let error = error.into();
            let mut error_trailers = http::HeaderMap::new();
            let code = set_grpc_status(&error, &mut error_trailers);
            debug!(%error, grpc.status = ?code, "Handling gRPC stream failure");
            *pending_trailers = Some(error_trailers);
            Ok(Async::Ready(None))
        })
    }

    /// Yields error-derived trailers if any are stored (consuming them);
    /// otherwise polls the wrapped body for its trailers.
    fn poll_trailers(&mut self) -> futures::Poll<Option<http::HeaderMap>, Self::Error> {
        let (body, pending_trailers) = match self {
            Self::NonGrpc(body) => return body.poll_trailers(),
            Self::Grpc { inner, trailers } => (inner, trailers),
        };

        if let Some(stored) = pending_trailers.take() {
            return Ok(Async::Ready(Some(stored)));
        }
        body.poll_trailers()
    }

    /// Reports whether the stream has ended. A `Grpc` body that still holds
    /// stored trailers is never at end-of-stream.
    fn is_end_stream(&self) -> bool {
        match self {
            Self::Grpc {
                trailers: Some(_), ..
            } => false,
            Self::Grpc { inner, .. } | Self::NonGrpc(inner) => inner.is_end_stream(),
        }
    }
}

fn new_respond(&self, req: &http::Request<A>) -> Self::Respond {
/// The default response body is a `NonGrpc` wrapper around `B::default()`.
impl<B: Default + hyper::body::Payload> Default for ResponseBody<B> {
    fn default() -> Self {
        Self::NonGrpc(B::default())
    }
}

impl<ReqB, RspB: Default + hyper::body::Payload>
respond::NewRespond<http::Request<ReqB>, http::Response<RspB>> for NewRespond
{
type Response = http::Response<ResponseBody<RspB>>;
type Respond = Respond;

fn new_respond(&self, req: &http::Request<ReqB>) -> Self::Respond {
match req.version() {
http::Version::HTTP_2 => {
let is_grpc = req
Expand All @@ -58,54 +121,62 @@ impl<A, B: Default> respond::NewRespond<http::Request<A>> for NewRespond<B> {
.unwrap_or(false);
Respond::Http2 { is_grpc }
}
version => Respond::Http1(version, self.0),
version => Respond::Http1(version),
}
}
}

impl<B> Clone for NewRespond<B> {
fn clone(&self) -> Self {
NewRespond(self.0)
}
}
impl<RspB: Default + hyper::body::Payload> respond::Respond<http::Response<RspB>> for Respond {
type Response = http::Response<ResponseBody<RspB>>;

impl<B: Default> respond::Respond for Respond<B> {
type Response = http::Response<B>;
fn respond(
&self,
reseponse: Result<http::Response<RspB>, Error>,
) -> Result<Self::Response, Error> {
match reseponse {
Ok(response) => Ok(response.map(|b| match *self {
Respond::Http2 { is_grpc } if is_grpc == true => ResponseBody::Grpc {
inner: b,
trailers: None,
},
_ => ResponseBody::NonGrpc(b),
})),
Err(error) => {
warn!("Failed to proxy request: {}", error);

fn respond(&self, error: Error) -> Result<Self::Response, Error> {
tracing::warn!("Failed to proxy request: {}", error);
if let Respond::Http2 { is_grpc } = self {
if let Some(reset) = error.h2_reason() {
debug!(%reset, "Propagating HTTP2 reset");
return Err(error);
}

if let Respond::Http2 { is_grpc } = self {
if let Some(reset) = error.h2_reason() {
debug!(%reset, "Propagating HTTP2 reset");
return Err(error);
}
if *is_grpc {
let mut rsp = http::Response::builder()
.version(http::Version::HTTP_2)
.header(http::header::CONTENT_LENGTH, "0")
.body(ResponseBody::default())
.expect("app::errors response is valid");
let code = set_grpc_status(&error, rsp.headers_mut());
debug!(?code, "Handling error with gRPC status");
return Ok(rsp);
}
}

if *is_grpc {
let mut rsp = http::Response::builder()
.version(http::Version::HTTP_2)
let version = match self {
Respond::Http1(ref version) => version.clone(),
Respond::Http2 { .. } => http::Version::HTTP_2,
};

let status = http_status(&error);
debug!(%status, ?version, "Handling error with HTTP response");
Ok(http::Response::builder()
.version(version)
.status(status)
.header(http::header::CONTENT_LENGTH, "0")
.body(B::default())
.expect("app::errors response is valid");
let code = set_grpc_status(&error, rsp.headers_mut());
debug!(?code, "Handling error with gRPC status");
return Ok(rsp);
.body(ResponseBody::default())
.expect("error response must be valid"))
}
}

let version = match self {
Respond::Http1(ref version, _) => version.clone(),
Respond::Http2 { .. } => http::Version::HTTP_2,
};

let status = http_status(&error);
debug!(%status, ?version, "Handling error with HTTP response");
Ok(http::Response::builder()
.version(version)
.status(status)
.header(http::header::CONTENT_LENGTH, "0")
.body(B::default())
.expect("error response must be valid"))
}
}

Expand Down Expand Up @@ -159,6 +230,11 @@ fn set_grpc_status(error: &Error, headers: &mut http::HeaderMap) -> grpc::Code {
headers.insert(GRPC_MESSAGE, msg);
}
code
} else if error.is::<hyper::error::Error>() {
let code = Code::Unavailable;
headers.insert(GRPC_STATUS, code_header(code));
headers.insert(GRPC_MESSAGE, HeaderValue::from_static("connection closed"));
code
} else if let Some(e) = error.downcast_ref::<lock::error::ServiceError>() {
set_grpc_status(e.inner(), headers)
} else if let Some(e) = error.downcast_ref::<buffer::error::ServiceError>() {
Expand Down
22 changes: 11 additions & 11 deletions linkerd/error-respond/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@ use futures::{Async, Future, Poll};
use linkerd2_error::Error;

/// Creates an error responder for a request.
pub trait NewRespond<Req, E = Error> {
pub trait NewRespond<Req, Rsp, E = Error> {
type Response;
type Respond: Respond<E, Response = Self::Response>;
type Respond: Respond<Rsp, E, Response = Self::Response>;

fn new_respond(&self, req: &Req) -> Self::Respond;
}

/// Creates a response for an error.
pub trait Respond<E = Error> {
pub trait Respond<Rsp, E = Error> {
type Response;

fn respond(&self, error: E) -> Result<Self::Response, E>;
fn respond(&self, response: Result<Rsp, E>) -> Result<Self::Response, E>;
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -57,9 +56,9 @@ impl<N: Clone, S> tower::layer::Layer<S> for RespondLayer<N> {
impl<Req, N, S> tower::Service<Req> for RespondService<N, S>
where
S: tower::Service<Req>,
N: NewRespond<Req, S::Error, Response = S::Response>,
N: NewRespond<Req, S::Response, S::Error>,
{
type Response = S::Response;
type Response = N::Response;
type Error = S::Error;
type Future = RespondFuture<N::Respond, S::Future>;

Expand All @@ -77,15 +76,16 @@ where
impl<R, F> Future for RespondFuture<R, F>
where
F: Future,
R: Respond<F::Error, Response = F::Item>,
R: Respond<F::Item, F::Error>,
{
type Item = F::Item;
type Item = R::Response;
type Error = F::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner.poll() {
Ok(ok) => Ok(ok),
Err(err) => self.respond.respond(err).map(Async::Ready),
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(rsp)) => self.respond.respond(Ok(rsp)).map(Async::Ready),
Err(err) => self.respond.respond(Err(err)).map(Async::Ready),
}
}
}

0 comments on commit 610309e

Please sign in to comment.