Skip to content

Commit

Permalink
retry: Enforce a cap on max replay buffer size (#1043)
Browse files Browse the repository at this point in the history
This branch adds a limit to the maximum number of bytes a `ReplayBody`
will buffer as a defense-in-depth against unbounded buffering.
Currently, the retry policy should ensure that requests whose
`content-length` header is over the limit will not be wrapped with
`ReplayBody`. However, if there's a bug in the retry policy, or if a
request claims to have an acceptable `content-length` but keeps sending
data past that length, we might buffer a potentially unbounded amount of
data. This limit is intended to _ensure_ we don't buffer data over the
limit despite the `content-length`-based retry policy.

The limit is checked whenever new data is added to the buffer, and if it
is reached, the buffer is discarded and the `ReplayBody` will cease
buffering additional data. However, the body will be allowed to continue
streaming so that the _initial_ request may still complete --- but
attempting to retry it will fail, because the buffer was discarded. This
way, in the event of a request that somehow passes the limit, we won't
fail that request immediately, but instead will fall back to not
retrying it.

In order to make this change nicely, the `ReplayBody` has to be
constructed by the retry policy, rather than by the `Retry` service.
This is because the policy knows what the size limits for retries are. To
allow this, I did some refactoring. In particular, I introduced a new
trait for retry policies that's an extension of `tower::retry::Policy`,
but with the addition of a method that's called to modify the request
for a retry. This method returns an `Either`, with `Either::A`
indicating that the request should be retried, and `Either::B`
indicating that it should not; and in the retry case, the policy may
change the request's type. This allows the retry policy to be
responsible for wrapping the request in a `ReplayBody`, rather than
doing that in the `Retry` service. A nice side-effect of this change is
that the `linkerd-retry` crate no longer needs to be HTTP-specific. All
the HTTP-specific code now lives in `linkerd_app_core::retry`, with the
exception of the `ReplayBody` type, which I put in its own crate (mainly
just to allow running its tests without having to compile all of
`app-core`).

Co-authored-by: Oliver Gould <ver@buoyant.io>
  • Loading branch information
hawkw and olix0r authored Jun 14, 2021
1 parent 59b9ad3 commit dea0b09
Show file tree
Hide file tree
Showing 16 changed files with 1,252 additions and 977 deletions.
16 changes: 13 additions & 3 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ dependencies = [
"linkerd-proxy-tcp",
"linkerd-proxy-transport",
"linkerd-reconnect",
"linkerd-retry",
"linkerd-service-profiles",
"linkerd-stack",
"linkerd-stack-metrics",
Expand Down Expand Up @@ -1019,13 +1020,11 @@ dependencies = [
"http-body",
"hyper",
"linkerd-error",
"linkerd-http-box",
"linkerd-stack",
"linkerd-tracing",
"parking_lot",
"pin-project",
"thiserror",
"tokio",
"tower",
"tracing",
]

Expand Down Expand Up @@ -1298,6 +1297,17 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-retry"
version = "0.1.0"
dependencies = [
"linkerd-error",
"linkerd-stack",
"pin-project",
"tower",
"tracing",
]

[[package]]
name = "linkerd-service-profiles"
version = "0.1.0"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ members = [
"linkerd/proxy/tcp",
"linkerd/proxy/transport",
"linkerd/reconnect",
"linkerd/retry",
"linkerd/service-profiles",
"linkerd/signal",
"linkerd/stack",
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ linkerd-proxy-tap = { path = "../../proxy/tap" }
linkerd-proxy-tcp = { path = "../../proxy/tcp" }
linkerd-proxy-transport = { path = "../../proxy/transport" }
linkerd-reconnect = { path = "../../reconnect" }
linkerd-retry = { path = "../../retry" }
linkerd-timeout = { path = "../../timeout" }
linkerd-tracing = { path = "../../tracing" }
linkerd-service-profiles = { path = "../../service-profiles" }
Expand Down
102 changes: 61 additions & 41 deletions linkerd/app/core/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,50 @@ use super::http_metrics::retries::Handle;
use super::metrics::HttpRouteRetry;
use crate::profiles;
use futures::future;
use linkerd_error::Error;
use linkerd_http_classify::{Classify, ClassifyEos, ClassifyResponse};
use linkerd_stack::Param;
use linkerd_http_retry::ReplayBody;
use linkerd_retry as retry;
use linkerd_stack::{layer, Either, Param};
use std::sync::Arc;
use tower::retry::budget::Budget;

pub use linkerd_http_retry::*;

pub fn layer(metrics: HttpRouteRetry) -> NewRetryLayer<NewRetry> {
NewRetryLayer::new(NewRetry::new(metrics))
pub fn layer<N>(
metrics: HttpRouteRetry,
) -> impl layer::Layer<N, Service = retry::NewRetry<NewRetryPolicy, N>> + Clone {
retry::NewRetry::<_, N>::layer(NewRetryPolicy::new(metrics))
}

#[derive(Clone, Debug)]
pub struct NewRetry {
pub struct NewRetryPolicy {
metrics: HttpRouteRetry,
}

#[derive(Clone, Debug)]
pub struct Retry {
pub struct RetryPolicy {
metrics: Handle,
budget: Arc<Budget>,
budget: Arc<retry::Budget>,
response_classes: profiles::http::ResponseClasses,
}

/// Allow buffering requests up to 64 kb
const MAX_BUFFERED_BYTES: usize = 64 * 1024;

// === impl NewRetry ===
// === impl NewRetryPolicy ===

impl NewRetry {
impl NewRetryPolicy {
pub fn new(metrics: HttpRouteRetry) -> Self {
Self { metrics }
}
}

impl NewPolicy<Route> for NewRetry {
type Policy = Retry;
impl retry::NewPolicy<Route> for NewRetryPolicy {
type Policy = RetryPolicy;

fn new_policy(&self, route: &Route) -> Option<Self::Policy> {
let retries = route.route.retries().cloned()?;

let metrics = self.metrics.get_handle(route.param());
Some(Retry {
Some(RetryPolicy {
metrics,
budget: retries.budget().clone(),
response_classes: route.route.response_classes().clone(),
Expand All @@ -55,7 +57,38 @@ impl NewPolicy<Route> for NewRetry {

// === impl RetryPolicy ===

impl<A, B, E> Policy<http::Request<A>, http::Response<B>, E> for Retry
impl RetryPolicy {
    /// Decides whether `req` is eligible to be retried.
    ///
    /// A request is retryable when it has no body (nothing needs to be
    /// buffered), or when it carries a parseable `content-length` header whose
    /// value does not exceed `MAX_BUFFERED_BYTES`. A request that has a body
    /// but no usable `content-length` header is treated as arbitrarily large
    /// and is therefore not retryable.
    fn can_retry<A: http_body::Body>(&self, req: &http::Request<A>) -> bool {
        // Parse the header once up front; the value is needed both for the
        // decision and for the trace events below. A missing, non-UTF-8, or
        // non-numeric header yields `None`.
        let content_length = req
            .headers()
            .get(http::header::CONTENT_LENGTH)
            .and_then(|value| value.to_str().ok()?.parse::<usize>().ok());

        // Requests without bodies can always be retried, as we will not need to
        // buffer the body. If the request *does* have a body, retry it if and
        // only if the request contains a `content-length` header and the
        // content length is <= `MAX_BUFFERED_BYTES` (64 kb).
        let has_body = !req.body().is_end_stream();
        if has_body && content_length.unwrap_or(usize::MAX) > MAX_BUFFERED_BYTES {
            tracing::trace!(
                req.has_body = has_body,
                req.content_length = ?content_length,
                "not retryable",
            );
            return false;
        }

        tracing::trace!(
            req.has_body = has_body,
            req.content_length = ?content_length,
            "retryable",
        );
        true
    }
}

impl<A, B, E> retry::Policy<http::Request<A>, http::Response<B>, E> for RetryPolicy
where
A: http_body::Body + Clone,
{
Expand Down Expand Up @@ -109,33 +142,20 @@ where
}
}

impl<A: http_body::Body> CanRetry<A> for Retry {
fn can_retry(&self, req: &http::Request<A>) -> bool {
let content_length = |req: &http::Request<_>| {
req.headers()
.get(http::header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok()?.parse::<usize>().ok())
};
impl<A, B, E> retry::PrepareRequest<http::Request<A>, http::Response<B>, E> for RetryPolicy
where
A: http_body::Body + Unpin,
A::Error: Into<Error>,
{
type RetryRequest = http::Request<ReplayBody<A>>;

// Requests without bodies can always be retried, as we will not need to
// buffer the body. If the request *does* have a body, retry it if and
// only if the request contains a `content-length` header and the
// content length is <= 64 kb.
let has_body = !req.body().is_end_stream();
if has_body && content_length(&req).unwrap_or(usize::MAX) > MAX_BUFFERED_BYTES {
tracing::trace!(
req.has_body = has_body,
req.content_length = ?content_length(&req),
"not retryable",
);
return false;
fn prepare_request(
&self,
req: http::Request<A>,
) -> Either<Self::RetryRequest, http::Request<A>> {
if self.can_retry(&req) {
return Either::A(req.map(|body| ReplayBody::new(body, MAX_BUFFERED_BYTES)));
}

tracing::trace!(
req.has_body = has_body,
req.content_length = ?content_length(&req),
"retryable",
);
true
Either::B(req)
}
}
16 changes: 13 additions & 3 deletions linkerd/app/inbound/fuzz/Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,7 @@ dependencies = [
"linkerd-proxy-tcp",
"linkerd-proxy-transport",
"linkerd-reconnect",
"linkerd-retry",
"linkerd-service-profiles",
"linkerd-stack",
"linkerd-stack-metrics",
Expand Down Expand Up @@ -863,11 +864,9 @@ dependencies = [
"http",
"http-body",
"linkerd-error",
"linkerd-http-box",
"linkerd-stack",
"parking_lot",
"pin-project",
"tower",
"thiserror",
"tracing",
]

Expand Down Expand Up @@ -1127,6 +1126,17 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-retry"
version = "0.1.0"
dependencies = [
"linkerd-error",
"linkerd-stack",
"pin-project",
"tower",
"tracing",
]

[[package]]
name = "linkerd-service-profiles"
version = "0.1.0"
Expand Down
8 changes: 7 additions & 1 deletion linkerd/app/outbound/src/http/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ impl<E> Outbound<E> {
.http_route_actual
.to_layer::<classify::Response, _>(),
)
// Depending on whether or not the request can be retried,
// it may have one of two `Body` types. This layer unifies
// any `Body` type into `BoxBody` so that the rest of the
// stack doesn't have to implement `Service` for requests
// with both body types.
.push_on_response(http::BoxRequest::erased())
// Sets an optional retry policy.
.push(retry::layer(rt.metrics.http_route_retry.clone()))
// Sets an optional request timeout.
Expand All @@ -134,7 +140,7 @@ impl<E> Outbound<E> {
))
// Strips headers that may be set by this proxy and add an outbound
// canonical-dst-header. The response body is boxed to unify the profile
// stack's response type. withthat of to endpoint stack.
// stack's response type with that of the endpoint stack.
.push(http::NewHeaderFromTarget::<CanonicalDstHeader, _>::layer())
.push_on_response(svc::layers().push(http::BoxResponse::layer()))
.instrument(|l: &Logical| debug_span!("logical", dst = %l.logical_addr))
Expand Down
84 changes: 84 additions & 0 deletions linkerd/http-box/src/erase_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
//! A middleware that boxes HTTP request bodies.
use crate::BoxBody;
use linkerd_error::Error;
use linkerd_stack::{layer, Proxy};
use std::task::{Context, Poll};

/// Boxes request bodies, erasing the original type.
///
/// This is *very* similar to the [`BoxRequest`] middleware. However, that
/// middleware is generic over a specific body type that is erased. A given
/// instance of `BoxRequest` can only erase the type of one particular `Body`
/// type, while this middleware will erase bodies of *any* type.
///
/// An astute reader may ask, why not simply replace `BoxRequest` with this
/// middleware, if it is a more flexible superset of the same behavior? The
/// answer is that in many cases, the use of this more flexible middleware
/// renders request body types uninferrable. If all `BoxRequest`s in the stack
/// are replaced with `EraseRequest`, suddenly a great deal of
/// `check_new_service` and `check_service` checks will require explicit
/// annotations for the pre-erasure body type. This is not great.
///
/// Instead, this type is implemented separately and should be used only when a
/// stack must be able to implement `Service<http::Request<B>>` for *multiple
/// distinct values of `B`*.
#[derive(Debug)]
pub struct EraseRequest<S>(S);

impl<S> EraseRequest<S> {
    /// Wraps `inner` so that request bodies are boxed before reaching it.
    pub fn new(inner: S) -> Self {
        Self(inner)
    }

    /// Returns a layer that wraps an inner stack in an `EraseRequest`.
    pub fn layer() -> impl layer::Layer<S, Service = Self> + Clone + Copy {
        // A capture-free closure is `Copy`, just like the bare constructor.
        layer::mk(|inner: S| Self::new(inner))
    }
}

impl<S: Clone> Clone for EraseRequest<S> {
fn clone(&self) -> Self {
EraseRequest(self.0.clone())
}
}

impl<S, B> tower::Service<http::Request<B>> for EraseRequest<S>
where
B: http_body::Body + Send + 'static,
B::Data: Send + 'static,
B::Error: Into<Error>,
S: tower::Service<http::Request<BoxBody>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
}

#[inline]
fn call(&mut self, req: http::Request<B>) -> Self::Future {
self.0.call(req.map(BoxBody::new))
}
}

impl<S, B, P> Proxy<http::Request<B>, S> for EraseRequest<P>
where
B: http_body::Body + Send + 'static,
B::Data: Send + 'static,
B::Error: Into<Error>,
S: tower::Service<P::Request>,
P: Proxy<http::Request<BoxBody>, S>,
{
type Request = P::Request;
type Response = P::Response;
type Error = P::Error;
type Future = P::Future;

#[inline]
fn proxy(&self, inner: &mut S, req: http::Request<B>) -> Self::Future {
self.0.proxy(inner, req.map(BoxBody::new))
}
}
2 changes: 2 additions & 0 deletions linkerd/http-box/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
#![allow(clippy::inconsistent_struct_constructor)]

mod body;
mod erase_request;
mod request;
mod response;

pub use self::{
body::{BoxBody, Data},
erase_request::EraseRequest,
request::BoxRequest,
response::BoxResponse,
};
Loading

0 comments on commit dea0b09

Please sign in to comment.