Skip to content

Commit

Permalink
Move control::Bind to svc::NewClient (linkerd#86)
Browse files Browse the repository at this point in the history
The `control::destination` exposes an important trait, `Bind`, that
abstracts the logic of instantiating a new service for an individual
endpoint (i.e., in a load balancer).

This interface is not specific to our service discovery implementation,
and can easily be used to model other types of client factory.

In the spirit of consolidating our HTTP-specific logic, and making the
core APIs of the proxy more visible, this change renames the `Bind`
trait to `NewClient`, simplifies the trait to have fewer type
parameters, and documents this new generalized API.
  • Loading branch information
olix0r authored Aug 28, 2018
1 parent 8ea9a36 commit d98c834
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 54 deletions.
15 changes: 6 additions & 9 deletions src/bind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use tower_service as tower;
use tower_h2;
use tower_reconnect::{Reconnect, Error as ReconnectError};

use control;
use control::destination::Endpoint;
use ctx;
use svc::NewClient;
use telemetry;
use transparency::{self, HttpBody, h1, orig_proto};
use transport;
Expand Down Expand Up @@ -360,19 +360,16 @@ impl<C, B> Bind<C, B> {
}
}

impl<B> control::destination::Bind for BindProtocol<ctx::Proxy, B>
impl<B> NewClient for BindProtocol<ctx::Proxy, B>
where
B: tower_h2::Body + Send + 'static,
<B::Data as ::bytes::IntoBuf>::Buf: Send,
{
type Endpoint = Endpoint;
type Request = http::Request<B>;
type Response = HttpResponse;
type Error = <Service<B> as tower::Service>::Error;
type Service = Service<B>;
type BindError = ();
type Target = Endpoint;
type Error = ();
type Client = Service<B>;

fn bind(&self, ep: &Endpoint) -> Result<Self::Service, Self::BindError> {
fn new_client(&mut self, ep: &Endpoint) -> Result<Self::Client, ()> {
Ok(self.bind.bind_service(ep, &self.protocol))
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/control/destination/background/destination_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,12 @@ impl<T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
) {
let (update_str, update, addr) = match change {
CacheChange::Insertion { key, value } => {
("insert", Update::Bind(key, value.clone()), key)
("insert", Update::NewClient(key, value.clone()), key)
},
CacheChange::Removal { key } => ("remove", Update::Remove(key), key),
CacheChange::Modification { key, new_value } => (
"change metadata for",
Update::Bind(key, new_value.clone()),
Update::NewClient(key, new_value.clone()),
key,
),
};
Expand Down
2 changes: 1 addition & 1 deletion src/control/destination/background/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ where
// them onto the new watch first
match occ.get().addrs {
Exists::Yes(ref cache) => for (&addr, meta) in cache {
let update = Update::Bind(addr, meta.clone());
let update = Update::NewClient(addr, meta.clone());
resolve.responder.update_tx
.unbounded_send(update)
.expect("unbounded_send does not fail");
Expand Down
56 changes: 18 additions & 38 deletions src/control/destination/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ use futures::{
Poll,
Stream
};
use http;
use tower_discover::{Change, Discover};
use tower_service::Service;

use dns;
use svc::NewClient;
use tls;
use transport::{DnsNameAndPort, HostAndPort};

Expand Down Expand Up @@ -76,7 +76,7 @@ struct Responder {

/// A `tower_discover::Discover`, given to a `tower_balance::Balance`.
#[derive(Debug)]
pub struct Resolution<B> {
pub struct Resolution<N> {
/// Receives updates from the controller.
update_rx: mpsc::UnboundedReceiver<Update>,

Expand All @@ -86,8 +86,8 @@ pub struct Resolution<B> {
/// reference has been dropped.
_active: Arc<()>,

/// Binds an update endpoint to a Service.
bind: B,
/// Creates clients for each new endpoint in the resolution.
new_endpoint: N,
}

/// Metadata describing an endpoint.
Expand Down Expand Up @@ -120,34 +120,11 @@ enum Update {
///
/// If there was already an endpoint in the load balancer for this
/// address, it should be replaced with the new one.
Bind(SocketAddr, Metadata),
NewClient(SocketAddr, Metadata),
/// Indicates that the endpoint for this `SocketAddr` should be removed.
Remove(SocketAddr),
}

/// Bind a `SocketAddr` with a protocol.
pub trait Bind {
/// The type of endpoint upon which a `Service` is bound.
type Endpoint;

/// Requests handled by the discovered services
type Request;

/// Responses given by the discovered services
type Response;

/// Errors produced by the discovered services
type Error;

type BindError;

/// The discovered `Service` instance.
type Service: Service<Request = Self::Request, Response = Self::Response, Error = Self::Error>;

/// Bind a service from an endpoint.
fn bind(&self, addr: &Self::Endpoint) -> Result<Self::Service, Self::BindError>;
}

/// Returns a `Resolver` and a background task future.
///
/// The `Resolver` is used by a listener to request resolutions, while
Expand Down Expand Up @@ -179,7 +156,10 @@ pub fn new(

impl Resolver {
/// Start watching for address changes for a certain authority.
pub fn resolve<B>(&self, authority: &DnsNameAndPort, bind: B) -> Resolution<B> {
pub fn resolve<N>(&self, authority: &DnsNameAndPort, new_endpoint: N) -> Resolution<N>
where
N: NewClient,
{
trace!("resolve; authority={:?}", authority);
let (update_tx, update_rx) = mpsc::unbounded();
let active = Arc::new(());
Expand All @@ -200,22 +180,22 @@ impl Resolver {
Resolution {
update_rx,
_active: active,
bind,
new_endpoint,
}
}
}

// ==== impl Resolution =====

impl<B, A> Discover for Resolution<B>
impl<N> Discover for Resolution<N>
where
B: Bind<Endpoint = Endpoint, Request = http::Request<A>>,
N: NewClient<Target = Endpoint>,
{
type Key = SocketAddr;
type Request = B::Request;
type Response = B::Response;
type Error = B::Error;
type Service = B::Service;
type Request = <N::Client as Service>::Request;
type Response = <N::Client as Service>::Response;
type Error = <N::Client as Service>::Error;
type Service = N::Client;
type DiscoverError = ();

fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
Expand All @@ -225,14 +205,14 @@ where
let update = try_ready!(up).expect("destination stream must be infinite");

match update {
Update::Bind(addr, meta) => {
Update::NewClient(addr, meta) => {
// We expect the load balancer to handle duplicate inserts
// by replacing the old endpoint with the new one, so
// insertions of new endpoints and metadata changes for
// existing ones can be handled in the same way.
let endpoint = Endpoint::new(addr, meta);

let service = self.bind.bind(&endpoint).map_err(|_| ())?;
let service = self.new_endpoint.new_client(&endpoint).map_err(|_| ())?;

return Ok(Async::Ready(Change::Insert(addr, service)));
},
Expand Down
1 change: 0 additions & 1 deletion src/control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,5 @@ pub mod pb;
mod remote_stream;
mod serve_http;

pub use self::destination::Bind;
pub use self::observe::Observe;
pub use self::serve_http::serve_http;
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ mod logging;
mod map_err;
mod outbound;
pub mod stream;
mod svc;
pub mod task;
pub mod telemetry;
mod transparency;
Expand Down
7 changes: 4 additions & 3 deletions src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use tower_h2_balance::{PendingUntilFirstData, PendingUntilFirstDataBody};
use linkerd2_proxy_router::Recognize;

use bind::{self, Bind, Protocol};
use control::destination::{self, Bind as BindTrait, Resolution};
use control::destination::{self, Resolution};
use svc::NewClient;
use ctx;
use telemetry::http::service::{ResponseBody as SensorBody};
use timeout::Timeout;
Expand Down Expand Up @@ -223,8 +224,8 @@ where
// in the Balancer forever. However, when we finally add
// circuit-breaking, this should be able to take care of itself,
// closing down when the connection is no longer usable.
if let Some((addr, bind)) = opt.take() {
let svc = bind.bind(&addr.into())
if let Some((addr, mut bind)) = opt.take() {
let svc = bind.new_client(&addr.into())
.map_err(|_| BindError::External { addr })?;
Ok(Async::Ready(Change::Insert(addr, svc)))
} else {
Expand Down
56 changes: 56 additions & 0 deletions src/svc/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//! Infrastructure for proxying request-response message streams
//!
//! This module contains utilities for proxying request-response streams. This
//! module borrows (and re-exports) from `tower`.
//!
//! ## Clients
//!
//! A client is a `Service` through which the proxy may dispatch requests.
//!
//! In the proxy, there are currently two types of clients:
//!
//! - As the proxy routes requests to an outbound `Destination`, a client
//! service is resolves the destination to and load balances requests
//! over its endpoints.
//!
//! - As an outbound load balancer dispatches a request to an endpoint, or as
//! the inbound proxy fowards an inbound request, a client service models an
//! individual `SocketAddr`.
//!
//! ## TODO
//!
//! * Move HTTP-specific service infrastructure into `svc::http`.
pub use tower_service::Service;

pub trait NewClient {

/// Describes a resource to which the client will be attached.
///
/// Depending on the implementation, the target may describe a logical name
/// to be resolved (i.e. via DNS) and load balanced, or it may describe a
/// specific network address to which one or more connections will be
/// established, or it may describe an entirely arbitrary "virtual" service
/// (i.e. that exists locally in memory).
type Target;

/// Indicates why the provided `Target` cannot be used to instantiate a client.
type Error;

/// Serves requests on behalf of a target.
///
/// `Client`s are expected to acquire resources lazily as
/// `Service::poll_ready` is called. `Service::poll_ready` must not return
/// `Async::Ready` until the service is ready to service requests.
/// `Service::call` must not be called until `Service::poll_ready` returns
/// `Async::Ready`. When `Service::poll_ready` returns an error, the
/// client must be discarded.
type Client: Service;

/// Creates a client
///
/// If the provided `Target` is valid, immediately return a `Client` that may
/// become ready lazily, i.e. as the target is resolved and connections are
/// established.
fn new_client(&mut self, t: &Self::Target) -> Result<Self::Client, Self::Error>;
}

0 comments on commit d98c834

Please sign in to comment.