Skip to content

Commit

Permalink
Remove redundant reconnect logic (linkerd#87)
Browse files Browse the repository at this point in the history
`bind::BoundService` wraps a `Reconnect` service and handles its Connect
errors. However, `BoundService` exposes `Reconnect`'s Error type to
callers even though these errors can never be returned.

Furthermore, `Reconnect` is allowed be polled after returning an error,
triggering the inner service to be rebuilt. We needlessly duplicate this
logic in `BoundService`.

Before splitting this file up into smaller chunks, let's update
`BoundService` to more narrowly adhere to `Reconnect`s API:

- Only the inner error type is returned. `unreachable!` assertions
  have been made where error variants cannot be returned.
- Do not "rebind" the stack explicitly. Instead, let `Reconnect` do
  this.
- Now BoundService::call may panic if invoked before poll_ready. It's a
  programming error, since `Reconnect` requires that `poll_ready` be
  called first.
  • Loading branch information
olix0r authored Aug 31, 2018
1 parent d98c834 commit b866945
Showing 1 changed file with 61 additions and 44 deletions.
105 changes: 61 additions & 44 deletions src/bind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ where
protocol: Protocol,
}

pub struct ResponseFuture<B>
where
B: tower_h2::Body + Send + 'static,
<B::Data as ::bytes::IntoBuf>::Buf: Send,
{
inner: <Stack<B> as tower::Service>::Future,
}

/// A type of service binding.
///
/// Some services, for various reasons, may not be able to be used to serve multiple
Expand Down Expand Up @@ -141,9 +149,9 @@ pub type Service<B> = BoundService<B>;

pub type Stack<B> = WatchService<tls::ConditionalClientConfig, RebindTls<B>>;

type StackInner<B> = Reconnect<orig_proto::Upgrade<NormalizeUri<NewHttp<B>>>>;
type ReconnectStack<B> = Reconnect<NewHttp<B>>;

pub type NewHttp<B> = telemetry::http::service::NewHttp<Client<B>, B, HttpBody>;
pub type NewHttp<B> = orig_proto::Upgrade<NormalizeUri<telemetry::http::service::NewHttp<Client<B>, B, HttpBody>>>;

pub type HttpResponse = http::Response<telemetry::http::service::ResponseBody<HttpBody>>;

Expand Down Expand Up @@ -236,13 +244,13 @@ where
///
/// When the TLS client configuration is invalidated, this function will
/// be called again to bind a new stack.
fn bind_inner_stack(
fn bind_reconnect_stack(
&self,
ep: &Endpoint,
protocol: &Protocol,
tls_client_config: &tls::ConditionalClientConfig,
)-> StackInner<B> {
debug!("bind_inner_stack endpoint={:?}, protocol={:?}", ep, protocol);
) -> ReconnectStack<B> {
debug!("bind_reconnect_stack endpoint={:?}, protocol={:?}", ep, protocol);
let addr = ep.address();

let tls = ep.tls_identity().and_then(|identity| {
Expand Down Expand Up @@ -292,7 +300,7 @@ where

/// Binds the endpoint stack used to construct a bound service.
///
/// This will wrap the service stack returned by `bind_inner_stack`
/// This will wrap the service stack returned by `bind_reconnect_stack`
/// with a middleware layer that causes it to be re-constructed when
/// the TLS client configuration changes.
///
Expand Down Expand Up @@ -456,8 +464,8 @@ where
{
type Request = <Stack<B> as tower::Service>::Request;
type Response = <Stack<B> as tower::Service>::Response;
type Error = <Stack<B> as tower::Service>::Error;
type Future = <Stack<B> as tower::Service>::Future;
type Error = <NewHttp<B> as tower::NewService>::Error;
type Future = ResponseFuture<B>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
let ready = match self.binding {
Expand Down Expand Up @@ -487,62 +495,71 @@ where
// If they *don't* call `poll_ready` again, that's ok, we won't ever
// try to connect again.
match ready {
Ok(Async::NotReady) => Ok(Async::NotReady),

Ok(ready) => {
trace!("poll_ready: ready for business");
self.debounce_connect_error_log = false;
Ok(ready)
},

Err(ReconnectError::Inner(err)) => {
trace!("poll_ready: inner error");
self.debounce_connect_error_log = false;
Err(err)
},

Err(ReconnectError::Connect(err)) => {
if !self.debounce_connect_error_log {
self.debounce_connect_error_log = true;
warn!("connect error to {:?}: {}", self.endpoint, err);
} else {
debug!("connect error to {:?}: {}", self.endpoint, err);
}
match self.binding {
Binding::Bound(ref mut svc) => {
trace!("poll_ready: binding stack after error");
*svc = self.bind.bind_stack(&self.endpoint, &self.protocol);
},
Binding::BindsPerRequest { ref mut next } => {
trace!("poll_ready: dropping bound stack after error");
next.take();
}
}

// So, this service isn't "ready" yet. Instead of trying to make
// it ready, schedule the task for notification so the caller can
// `Reconnect` is currently idle and needs to be polled to
// rebuild its inner service. Instead of doing this immediately,
// schedule the task for notification so the caller can
// determine whether readiness is still necessary (i.e. whether
// there are still requests to be sent).
//
// But, to return NotReady, we must notify the task. So,
// this notifies the task immediately, and figures that
// whoever owns this service will call `poll_ready` if they
// are still interested.
// This prevents busy-loops when the connection fails
// immediately.
task::current().notify();
Ok(Async::NotReady)
}
// don't debounce on NotReady...
Ok(Async::NotReady) => Ok(Async::NotReady),
other => {
trace!("poll_ready: ready for business");
self.debounce_connect_error_log = false;
other
},

Err(ReconnectError::NotReady) => {
unreachable!("Reconnect::poll_ready cannot fail with NotReady");
}
}
}

fn call(&mut self, request: Self::Request) -> Self::Future {
match self.binding {
let inner = match self.binding {
Binding::Bound(ref mut svc) => svc.call(request),
Binding::BindsPerRequest { ref mut next } => {
// If a service has already been bound in `poll_ready`, consume it.
// Otherwise, bind a new service on-the-spot.
let bind = &self.bind;
let endpoint = &self.endpoint;
let protocol = &self.protocol;
let mut svc = next.take()
.unwrap_or_else(|| {
bind.bind_stack(endpoint, protocol)
});
let mut svc = next.take().expect("poll_ready must be called before call");
svc.call(request)
}
}
};
ResponseFuture { inner }
}
}

impl<B> Future for ResponseFuture<B>
where
B: tower_h2::Body + Send + 'static,
<B::Data as ::bytes::IntoBuf>::Buf: Send,
{
type Item = <Stack<B> as tower::Service>::Response;
type Error = <NewHttp<B> as tower::NewService>::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll().map_err(|e| match e {
ReconnectError::Inner(e) => e,
_ => unreachable!("Reconnect response futures can only fail with inner errors"),
})
}
}

Expand Down Expand Up @@ -622,12 +639,12 @@ where
B: tower_h2::Body + Send + 'static,
<B::Data as ::bytes::IntoBuf>::Buf: Send,
{
type Service = StackInner<B>;
type Service = ReconnectStack<B>;
fn rebind(&mut self, tls: &tls::ConditionalClientConfig) -> Self::Service {
debug!(
"rebinding endpoint stack for {:?}:{:?} on TLS config change",
self.endpoint, self.protocol,
);
self.bind.bind_inner_stack(&self.endpoint, &self.protocol, tls)
self.bind.bind_reconnect_stack(&self.endpoint, &self.protocol, tls)
}
}

0 comments on commit b866945

Please sign in to comment.