Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

discover: Timeout stalled resolutions #401

Merged
merged 3 commits into from
Dec 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,12 @@ impl<A: OrigDstAddr> Config<A> {

// Resolves the target via the control plane and balances requests
// over all endpoints returned from the destination service.
const DISCOVER_UPDATE_BUFFER_CAPACITY: usize = 2;
const DISCOVER_UPDATE_BUFFER_CAPACITY: usize = 10;
let balancer_layer = svc::layers()
.push_spawn_ready()
.push(discover::Layer::new(
DISCOVER_UPDATE_BUFFER_CAPACITY,
router_max_idle_age,
map_endpoint::Resolve::new(endpoint::FromMetadata, resolve.clone()),
))
.push(http::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY));
Expand Down
48 changes: 42 additions & 6 deletions linkerd/proxy/discover/src/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use futures::{try_ready, Async, Future, Poll, Stream};
use linkerd2_error::{Error, Never};
use std::fmt;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot};
use tokio::timer::Delay;
use tower::discover;
use tracing_futures::Instrument;

/// Holds an inner value `M` together with the settings applied to buffered
/// discovery updates.
///
/// NOTE(review): the `tower::Service` impl that consumes these fields is only
/// partially visible in this view; the visible part copies `capacity` and
/// `watchdog_timeout` into the future it returns.
#[derive(Clone, Debug)]
pub struct Buffer<M> {
/// Buffer capacity handed on to the returned future.
/// NOTE(review): presumably the bound of the update channel — confirm.
capacity: usize,
/// How long the daemon tolerates a full send buffer before it drops the
/// resolution.
watchdog_timeout: Duration,
/// The wrapped value that produces the underlying discovery future.
inner: M,
}

Expand All @@ -20,24 +23,31 @@ pub struct Discover<K, S> {
/// Future that carries the buffering settings alongside an inner future `F`.
///
/// NOTE(review): its `Future` impl is only partially visible here; the visible
/// part builds a `Daemon` from these settings and spawns it with
/// `tokio::spawn` — confirm the exact hand-off against the full `poll`.
pub struct DiscoverFuture<F, D> {
/// The inner future being driven.
future: F,
/// Buffer capacity copied from the originating `Buffer`.
capacity: usize,
/// Watchdog timeout copied from the originating `Buffer`.
watchdog_timeout: Duration,
/// Ties the type parameter `D` to this future without storing a `D`.
_marker: std::marker::PhantomData<fn() -> D>,
}

/// Background task state that forwards updates from a `discover::Discover`
/// into an mpsc channel.
///
/// While the channel's sender is not ready, a watchdog timer runs; if it
/// expires before the sender becomes ready again, the task ends with an error
/// and the resolution is dropped.
pub struct Daemon<D: discover::Discover> {
/// Source of endpoint changes; polled only once `tx` reports readiness.
discover: D,
/// Receiver of an uninhabited `Never` value, so it can never yield an item.
/// NOTE(review): presumably used only to observe the other half being
/// dropped — confirm.
disconnect_rx: oneshot::Receiver<Never>,
/// Sending half of the channel that carries discovery changes.
tx: mpsc::Sender<discover::Change<D::Key, D::Service>>,
/// Timer armed while `tx` is not ready; reset to `None` once `tx` is ready.
watchdog: Option<Delay>,
/// Duration used to arm `watchdog`.
watchdog_timeout: Duration,
}

/// Opaque marker type; its single `()` field is private, so values can only
/// be constructed inside this module.
///
/// NOTE(review): presumably the error reported when the daemon task has gone
/// away — confirm against the `Discover` impl, which is not visible here.
#[derive(Clone, Debug)]
pub struct Lost(());

impl<M> Buffer<M> {
pub fn new<T>(capacity: usize, inner: M) -> Self
pub fn new<T>(capacity: usize, watchdog_timeout: Duration, inner: M) -> Self
where
Self: tower::Service<T>,
{
Self { capacity, inner }
Self {
capacity,
watchdog_timeout,
inner,
}
}
}

Expand All @@ -63,6 +73,7 @@ where
Self::Future {
future,
capacity: self.capacity,
watchdog_timeout: self.watchdog_timeout,
_marker: std::marker::PhantomData,
}
}
Expand All @@ -88,6 +99,8 @@ where
discover,
disconnect_rx,
tx,
watchdog_timeout: self.watchdog_timeout,
watchdog: None,
};
tokio::spawn(fut.in_current_span());

Expand All @@ -111,10 +124,33 @@ where
Ok(Async::Ready(n)) => match n {},
}

try_ready!(self
.tx
.poll_ready()
.map_err(|_| tracing::trace!("lost sender")));
// The watchdog bounds the amount of time that the send buffer stays
// full. It exists to release the `discover` resources when the receiver
// has stopped draining the buffer, e.g. because it has been leaked.
match self.tx.poll_ready() {
Ok(Async::Ready(())) => {
self.watchdog = None;
}
Err(_) => {
tracing::trace!("lost sender");
return Err(());
}
Ok(Async::NotReady) => {
let mut watchdog = self
.watchdog
.take()
.unwrap_or_else(|| Delay::new(Instant::now() + self.watchdog_timeout));
if watchdog.poll().expect("timer must not fail").is_ready() {
tracing::warn!(
timeout = ?self.watchdog_timeout,
"dropping resolution due to watchdog",
);
return Err(());
}
self.watchdog = Some(watchdog);
return Ok(Async::NotReady);
}
}

let up = try_ready!(self.discover.poll().map_err(|e| {
let e: Error = e.into();
Expand Down
7 changes: 5 additions & 2 deletions linkerd/proxy/discover/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use linkerd2_error::Error;
use linkerd2_proxy_core::Resolve;
use std::fmt;
use std::time::Duration;

pub mod buffer;
pub mod from_resolve;
Expand All @@ -15,20 +16,22 @@ use self::make_endpoint::MakeEndpoint;
/// Layer configuration for building a buffered discovery stack from a
/// resolver `R` for targets of type `T`.
///
/// The `layer` method wraps a `make_endpoint` value in
/// `MakeEndpoint`/`FromResolve` and then in a `Buffer` configured with
/// `capacity` and `watchdog`.
#[derive(Clone, Debug)]
pub struct Layer<T, R> {
/// Buffer capacity passed to `Buffer::new`.
capacity: usize,
/// Watchdog timeout passed to `Buffer::new`.
watchdog: Duration,
/// Resolver cloned into each `FromResolve` that `layer` builds.
resolve: R,
/// Ties the target type `T` to the layer without storing a `T`.
_marker: std::marker::PhantomData<fn(T)>,
}

// === impl Layer ===

impl<T, R> Layer<T, R> {
pub fn new(capacity: usize, resolve: R) -> Self
pub fn new(capacity: usize, watchdog: Duration, resolve: R) -> Self
where
R: Resolve<T> + Clone,
R::Endpoint: fmt::Debug + Clone + PartialEq,
{
Self {
capacity,
watchdog,
resolve,
_marker: std::marker::PhantomData,
}
Expand All @@ -53,6 +56,6 @@ where
/// Wraps `make_endpoint` in the discovery stack.
///
/// The resolver is cloned into a `FromResolve`, combined with
/// `make_endpoint` through `MakeEndpoint`, and the result is placed behind a
/// `Buffer` configured with this layer's capacity and watchdog timeout.
///
/// The stale two-argument `Buffer::new(self.capacity, make_discover)` call
/// has been dropped: next to the current call it was a syntax error, and
/// `Buffer::new` takes the watchdog timeout as its second argument.
fn layer(&self, make_endpoint: M) -> Self::Service {
    let make_discover =
        MakeEndpoint::new(make_endpoint, FromResolve::new(self.resolve.clone()));
    Buffer::new(self.capacity, self.watchdog, make_discover)
}
}