diff --git a/src/bind.rs b/src/bind.rs index 7b08f07115f75..76b4c65e695c8 100644 --- a/src/bind.rs +++ b/src/bind.rs @@ -9,9 +9,9 @@ use tower_service as tower; use tower_h2; use tower_reconnect::{Reconnect, Error as ReconnectError}; -use control; use control::destination::Endpoint; use ctx; +use svc::NewClient; use telemetry; use transparency::{self, HttpBody, h1, orig_proto}; use transport; @@ -360,19 +360,16 @@ impl Bind { } } -impl control::destination::Bind for BindProtocol +impl NewClient for BindProtocol where B: tower_h2::Body + Send + 'static, ::Buf: Send, { - type Endpoint = Endpoint; - type Request = http::Request; - type Response = HttpResponse; - type Error = as tower::Service>::Error; - type Service = Service; - type BindError = (); + type Target = Endpoint; + type Error = (); + type Client = Service; - fn bind(&self, ep: &Endpoint) -> Result { + fn new_client(&mut self, ep: &Endpoint) -> Result { Ok(self.bind.bind_service(ep, &self.protocol)) } } diff --git a/src/control/destination/background/destination_set.rs b/src/control/destination/background/destination_set.rs index e9401ef86b6c7..d23a5ae25173e 100644 --- a/src/control/destination/background/destination_set.rs +++ b/src/control/destination/background/destination_set.rs @@ -254,12 +254,12 @@ impl> DestinationSet { ) { let (update_str, update, addr) = match change { CacheChange::Insertion { key, value } => { - ("insert", Update::Bind(key, value.clone()), key) + ("insert", Update::NewClient(key, value.clone()), key) }, CacheChange::Removal { key } => ("remove", Update::Remove(key), key), CacheChange::Modification { key, new_value } => ( "change metadata for", - Update::Bind(key, new_value.clone()), + Update::NewClient(key, new_value.clone()), key, ), }; diff --git a/src/control/destination/background/mod.rs b/src/control/destination/background/mod.rs index 9d85cd197de89..f9adb3f524872 100644 --- a/src/control/destination/background/mod.rs +++ b/src/control/destination/background/mod.rs @@ -226,7 +226,7 @@ where // them onto the new watch first match occ.get().addrs { Exists::Yes(ref cache) => for (&addr, meta) in cache { - let update = Update::Bind(addr, meta.clone()); + let update = Update::NewClient(addr, meta.clone()); resolve.responder.update_tx .unbounded_send(update) .expect("unbounded_send does not fail"); diff --git a/src/control/destination/mod.rs b/src/control/destination/mod.rs index 42aa4bd1199cb..288c4ef9fd4ec 100644 --- a/src/control/destination/mod.rs +++ b/src/control/destination/mod.rs @@ -36,11 +36,11 @@ use futures::{ Poll, Stream }; -use http; use tower_discover::{Change, Discover}; use tower_service::Service; use dns; +use svc::NewClient; use tls; use transport::{DnsNameAndPort, HostAndPort}; @@ -76,7 +76,7 @@ struct Responder { /// A `tower_discover::Discover`, given to a `tower_balance::Balance`. #[derive(Debug)] -pub struct Resolution { +pub struct Resolution { /// Receives updates from the controller. update_rx: mpsc::UnboundedReceiver, @@ -86,8 +86,8 @@ pub struct Resolution { /// reference has been dropped. _active: Arc<()>, - /// Binds an update endpoint to a Service. - bind: B, + /// Creates clients for each new endpoint in the resolution. + new_endpoint: N, } /// Metadata describing an endpoint. @@ -120,34 +120,11 @@ enum Update { /// /// If there was already an endpoint in the load balancer for this /// address, it should be replaced with the new one. - Bind(SocketAddr, Metadata), + NewClient(SocketAddr, Metadata), /// Indicates that the endpoint for this `SocketAddr` should be removed. Remove(SocketAddr), } -/// Bind a `SocketAddr` with a protocol. -pub trait Bind { - /// The type of endpoint upon which a `Service` is bound. - type Endpoint; - - /// Requests handled by the discovered services - type Request; - - /// Responses given by the discovered services - type Response; - - /// Errors produced by the discovered services - type Error; - - type BindError; - - /// The discovered `Service` instance. - type Service: Service; - - /// Bind a service from an endpoint. - fn bind(&self, addr: &Self::Endpoint) -> Result; -} - /// Returns a `Resolver` and a background task future. /// /// The `Resolver` is used by a listener to request resolutions, while @@ -179,7 +156,10 @@ pub fn new( impl Resolver { /// Start watching for address changes for a certain authority. - pub fn resolve(&self, authority: &DnsNameAndPort, bind: B) -> Resolution { + pub fn resolve(&self, authority: &DnsNameAndPort, new_endpoint: N) -> Resolution + where + N: NewClient, + { trace!("resolve; authority={:?}", authority); let (update_tx, update_rx) = mpsc::unbounded(); let active = Arc::new(()); @@ -200,22 +180,22 @@ impl Resolver { Resolution { update_rx, _active: active, - bind, + new_endpoint, } } } // ==== impl Resolution ===== -impl Discover for Resolution +impl Discover for Resolution where - B: Bind>, + N: NewClient, { type Key = SocketAddr; - type Request = B::Request; - type Response = B::Response; - type Error = B::Error; - type Service = B::Service; + type Request = ::Request; + type Response = ::Response; + type Error = ::Error; + type Service = N::Client; type DiscoverError = (); fn poll(&mut self) -> Poll, Self::DiscoverError> { @@ -225,14 +205,14 @@ where let update = try_ready!(up).expect("destination stream must be infinite"); match update { - Update::Bind(addr, meta) => { + Update::NewClient(addr, meta) => { // We expect the load balancer to handle duplicate inserts // by replacing the old endpoint with the new one, so // insertions of new endpoints and metadata changes for // existing ones can be handled in the same way. let endpoint = Endpoint::new(addr, meta); - let service = self.bind.bind(&endpoint).map_err(|_| ())?; + let service = self.new_endpoint.new_client(&endpoint).map_err(|_| ())?; return Ok(Async::Ready(Change::Insert(addr, service))); }, diff --git a/src/control/mod.rs b/src/control/mod.rs index b319f75952b59..3c37838d186b3 100644 --- a/src/control/mod.rs +++ b/src/control/mod.rs @@ -6,6 +6,5 @@ pub mod pb; mod remote_stream; mod serve_http; -pub use self::destination::Bind; pub use self::observe::Observe; pub use self::serve_http::serve_http; diff --git a/src/lib.rs b/src/lib.rs index 9b29d6966b0f1..8752190ba2e3a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -88,6 +88,7 @@ mod logging; mod map_err; mod outbound; pub mod stream; +mod svc; pub mod task; pub mod telemetry; mod transparency; diff --git a/src/outbound.rs b/src/outbound.rs index 4f47217bd11fe..9aa129e001cdd 100644 --- a/src/outbound.rs +++ b/src/outbound.rs @@ -15,7 +15,8 @@ use tower_h2_balance::{PendingUntilFirstData, PendingUntilFirstDataBody}; use linkerd2_proxy_router::Recognize; use bind::{self, Bind, Protocol}; -use control::destination::{self, Bind as BindTrait, Resolution}; +use control::destination::{self, Resolution}; +use svc::NewClient; use ctx; use telemetry::http::service::{ResponseBody as SensorBody}; use timeout::Timeout; @@ -223,8 +224,8 @@ where // in the Balancer forever. However, when we finally add // circuit-breaking, this should be able to take care of itself, // closing down when the connection is no longer usable. - if let Some((addr, bind)) = opt.take() { - let svc = bind.bind(&addr.into()) + if let Some((addr, mut bind)) = opt.take() { + let svc = bind.new_client(&addr.into()) .map_err(|_| BindError::External { addr })?; Ok(Async::Ready(Change::Insert(addr, svc))) } else { diff --git a/src/svc/mod.rs b/src/svc/mod.rs new file mode 100644 index 0000000000000..31336192e0c58 --- /dev/null +++ b/src/svc/mod.rs @@ -0,0 +1,56 @@ +//! Infrastructure for proxying request-response message streams +//! +//! This module contains utilities for proxying request-response streams. This +//! module borrows (and re-exports) from `tower`. +//! +//! ## Clients +//! +//! A client is a `Service` through which the proxy may dispatch requests. +//! +//! In the proxy, there are currently two types of clients: +//! +//! - As the proxy routes requests to an outbound `Destination`, a client +//! service is resolves the destination to and load balances requests +//! over its endpoints. +//! +//! - As an outbound load balancer dispatches a request to an endpoint, or as +//! the inbound proxy fowards an inbound request, a client service models an +//! individual `SocketAddr`. +//! +//! ## TODO +//! +//! * Move HTTP-specific service infrastructure into `svc::http`. + +pub use tower_service::Service; + +pub trait NewClient { + + /// Describes a resource to which the client will be attached. + /// + /// Depending on the implementation, the target may describe a logical name + /// to be resolved (i.e. via DNS) and load balanced, or it may describe a + /// specific network address to which one or more connections will be + /// established, or it may describe an entirely arbitrary "virtual" service + /// (i.e. that exists locally in memory). + type Target; + + /// Indicates why the provided `Target` cannot be used to instantiate a client. + type Error; + + /// Serves requests on behalf of a target. + /// + /// `Client`s are expected to acquire resources lazily as + /// `Service::poll_ready` is called. `Service::poll_ready` must not return + /// `Async::Ready` until the service is ready to service requests. + /// `Service::call` must not be called until `Service::poll_ready` returns + /// `Async::Ready`. When `Service::poll_ready` returns an error, the + /// client must be discarded. + type Client: Service; + + /// Creates a client + /// + /// If the provided `Target` is valid, immediately return a `Client` that may + /// become ready lazily, i.e. as the target is resolved and connections are + /// established. + fn new_client(&mut self, t: &Self::Target) -> Result; +}