diff --git a/src/bind.rs b/src/bind.rs index 76b4c65e695c8..aac4ba72dff3b 100644 --- a/src/bind.rs +++ b/src/bind.rs @@ -63,6 +63,14 @@ where protocol: Protocol, } +pub struct ResponseFuture +where + B: tower_h2::Body + Send + 'static, + ::Buf: Send, +{ + inner: as tower::Service>::Future, +} + /// A type of service binding. /// /// Some services, for various reasons, may not be able to be used to serve multiple @@ -141,9 +149,9 @@ pub type Service = BoundService; pub type Stack = WatchService>; -type StackInner = Reconnect>>>; +type ReconnectStack = Reconnect>; -pub type NewHttp = telemetry::http::service::NewHttp, B, HttpBody>; +pub type NewHttp = orig_proto::Upgrade, B, HttpBody>>>; pub type HttpResponse = http::Response>; @@ -236,13 +244,13 @@ where /// /// When the TLS client configuration is invalidated, this function will /// be called again to bind a new stack. - fn bind_inner_stack( + fn bind_reconnect_stack( &self, ep: &Endpoint, protocol: &Protocol, tls_client_config: &tls::ConditionalClientConfig, - )-> StackInner { - debug!("bind_inner_stack endpoint={:?}, protocol={:?}", ep, protocol); + ) -> ReconnectStack { + debug!("bind_reconnect_stack endpoint={:?}, protocol={:?}", ep, protocol); let addr = ep.address(); let tls = ep.tls_identity().and_then(|identity| { @@ -292,7 +300,7 @@ where /// Binds the endpoint stack used to construct a bound service. /// - /// This will wrap the service stack returned by `bind_inner_stack` + /// This will wrap the service stack returned by `bind_reconnect_stack` /// with a middleware layer that causes it to be re-constructed when /// the TLS client configuration changes. /// @@ -456,8 +464,8 @@ where { type Request = as tower::Service>::Request; type Response = as tower::Service>::Response; - type Error = as tower::Service>::Error; - type Future = as tower::Service>::Future; + type Error = as tower::NewService>::Error; + type Future = ResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { let ready = match self.binding { @@ -487,6 +495,20 @@ where // If they *don't* call `poll_ready` again, that's ok, we won't ever // try to connect again. match ready { + Ok(Async::NotReady) => Ok(Async::NotReady), + + Ok(ready) => { + trace!("poll_ready: ready for business"); + self.debounce_connect_error_log = false; + Ok(ready) + }, + + Err(ReconnectError::Inner(err)) => { + trace!("poll_ready: inner error"); + self.debounce_connect_error_log = false; + Err(err) + }, + Err(ReconnectError::Connect(err)) => { if !self.debounce_connect_error_log { self.debounce_connect_error_log = true; @@ -494,55 +516,50 @@ where } else { debug!("connect error to {:?}: {}", self.endpoint, err); } - match self.binding { - Binding::Bound(ref mut svc) => { - trace!("poll_ready: binding stack after error"); - *svc = self.bind.bind_stack(&self.endpoint, &self.protocol); - }, - Binding::BindsPerRequest { ref mut next } => { - trace!("poll_ready: dropping bound stack after error"); - next.take(); - } - } - // So, this service isn't "ready" yet. Instead of trying to make - // it ready, schedule the task for notification so the caller can + // `Reconnect` is currently idle and needs to be polled to + // rebuild its inner service. Instead of doing this immediately, + // schedule the task for notification so the caller can // determine whether readiness is still necessary (i.e. whether // there are still requests to be sent). // - // But, to return NotReady, we must notify the task. So, - // this notifies the task immediately, and figures that - // whoever owns this service will call `poll_ready` if they - // are still interested. + // This prevents busy-loops when the connection fails + // immediately. task::current().notify(); Ok(Async::NotReady) } - // don't debounce on NotReady... - Ok(Async::NotReady) => Ok(Async::NotReady), - other => { - trace!("poll_ready: ready for business"); - self.debounce_connect_error_log = false; - other - }, + + Err(ReconnectError::NotReady) => { + unreachable!("Reconnect::poll_ready cannot fail with NotReady"); + } } } fn call(&mut self, request: Self::Request) -> Self::Future { - match self.binding { + let inner = match self.binding { Binding::Bound(ref mut svc) => svc.call(request), Binding::BindsPerRequest { ref mut next } => { - // If a service has already been bound in `poll_ready`, consume it. - // Otherwise, bind a new service on-the-spot. - let bind = &self.bind; - let endpoint = &self.endpoint; - let protocol = &self.protocol; - let mut svc = next.take() - .unwrap_or_else(|| { - bind.bind_stack(endpoint, protocol) - }); + let mut svc = next.take().expect("poll_ready must be called before call"); svc.call(request) } - } + }; + ResponseFuture { inner } + } +} + +impl Future for ResponseFuture +where + B: tower_h2::Body + Send + 'static, + ::Buf: Send, +{ + type Item = as tower::Service>::Response; + type Error = as tower::NewService>::Error; + + fn poll(&mut self) -> Poll { + self.inner.poll().map_err(|e| match e { + ReconnectError::Inner(e) => e, + _ => unreachable!("Reconnect response futures can only fail with inner errors"), + }) } } @@ -622,12 +639,12 @@ where B: tower_h2::Body + Send + 'static, ::Buf: Send, { - type Service = StackInner; + type Service = ReconnectStack; fn rebind(&mut self, tls: &tls::ConditionalClientConfig) -> Self::Service { debug!( "rebinding endpoint stack for {:?}:{:?} on TLS config change", self.endpoint, self.protocol, ); - self.bind.bind_inner_stack(&self.endpoint, &self.protocol, tls) + self.bind.bind_reconnect_stack(&self.endpoint, &self.protocol, tls) } }