Skip to content

Commit

Permalink
port linkerd2-proxy-error::recover and linkerd2-proxy-resolve to …
Browse files Browse the repository at this point in the history
…std::future (#518)

This branch updates the `linkerd2-proxy-resolve` crate to `std::future`.
It also updates the `recover` module in `linkerd2-proxy-error`, which is
a dependency of `resolve`, and `linkerd2-reconnect`, which depends on
`recover` and therefore didn't compile after it was updated.

The `linkerd2-proxy-resolve::recover` code currently only works when the
underlying resolver's resolution and future types are both `Unpin`, and
the backoff type is also `Unpin`. This is because this logic relies a
bit on being able to move these values in and out of `Option`s. I tried
to implement this without requiring these values to be `Unpin`, but the
implementation of the `ResolveFuture`, which drives the resolution until
it's connected and then returns it, was more or less impossible to
translate in its current state. 
 
I suspect that this won't be an issue when updating code that depends on
`recover` — I think the resolutions will probably be `Unpin` without any
intervention from us. If not, we can always either `Box::pin` them, or
try to do a more complete rewrite of this code so that it's refactored
in a way that avoids these issues.

In other cases, we can sometimes just replace code that is difficult to
port to use `async`/`await`. Here, that was not possible, because we are
implementing the `Resolution` trait, rather than a future. We _may_ want
to consider rewriting `Resolution` to be a trait alias for a `Stream`,
so that we can use the `async-stream` crate, but that may have other
downsides, so I decided to not do that just yet.

Hopefully this all makes sense!

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw authored May 19, 2020
1 parent 7d208ef commit dead469
Show file tree
Hide file tree
Showing 10 changed files with 700 additions and 649 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,7 @@ version = "0.1.0"
name = "linkerd2-error"
version = "0.1.0"
dependencies = [
"futures 0.1.26",
"futures 0.3.4",
]

[[package]]
Expand Down Expand Up @@ -1361,12 +1361,12 @@ dependencies = [
name = "linkerd2-proxy-resolve"
version = "0.1.0"
dependencies = [
"futures 0.1.26",
"futures 0.3.4",
"indexmap",
"linkerd2-error",
"linkerd2-proxy-core",
"pin-project",
"tokio 0.1.22",
"tokio 0.2.20",
"tower 0.3.1",
"tracing",
]
Expand Down
2 changes: 1 addition & 1 deletion linkerd/error/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ edition = "2018"
publish = false

[dependencies]
futures = "0.1"
futures = "0.3"
14 changes: 7 additions & 7 deletions linkerd/error/src/recover.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use super::{Error, Never};
use futures::{stream, Stream};
use futures::stream::{self, TryStream};

/// An error recovery strategy.
pub trait Recover<E: Into<Error> = Error> {
type Error: Into<Error>;
type Backoff: Stream<Item = (), Error = Self::Error>;
type Backoff: TryStream<Ok = (), Error = Self::Error>;

/// Given an E-typed error, determine if the error is recoverable.
///
Expand All @@ -25,7 +25,7 @@ pub struct Immediately(());
impl<E, B, F> Recover<E> for F
where
E: Into<Error>,
B: Stream<Item = ()>,
B: TryStream<Ok = ()>,
B::Error: Into<Error>,
F: Fn(E) -> Result<B, E>,
{
Expand All @@ -47,17 +47,17 @@ impl Immediately {

impl<E: Into<Error>> Recover<E> for Immediately {
type Error = Never;
type Backoff = stream::IterOk<Immediately, Never>;
type Backoff = stream::Iter<Immediately>;

fn recover(&self, _: E) -> Result<Self::Backoff, E> {
Ok(stream::iter_ok(Immediately(())))
Ok(stream::iter(Immediately(())))
}
}

impl Iterator for Immediately {
type Item = ();
type Item = Result<(), Never>;

fn next(&mut self) -> Option<Self::Item> {
Some(())
Some(Ok(()))
}
}
2 changes: 1 addition & 1 deletion linkerd/proxy/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Core interfaces needed to implement proxy components
"""

[dependencies]
futures = {package = "futures", version = "0.3" }
futures = "0.3"
linkerd2-error = { path = "../../error" }
tokio = "0.2"
tower = {version = "0.3", default-features = false }
Expand Down
10 changes: 10 additions & 0 deletions linkerd/proxy/core/src/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ pub trait Resolution {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Update<Self::Endpoint>, Self::Error>>;

fn poll_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Update<Self::Endpoint>, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll(cx)
}
}

#[derive(Clone, Debug)]
Expand Down
4 changes: 2 additions & 2 deletions linkerd/proxy/resolve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ Utilities for working with `Resolve` implementations
"""

[dependencies]
futures = "0.1"
futures = "0.3"
linkerd2-error = { path = "../../error" }
linkerd2-proxy-core = { path = "../core" }
indexmap = "1.0"
tokio = "0.1"
tokio = "0.2"
tracing = "0.1"
pin-project = "0.4"

Expand Down
285 changes: 149 additions & 136 deletions linkerd/proxy/resolve/src/map_endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,136 +1,149 @@
// //! A middleware that wraps `Resolutions`, modifying their endpoint type.

// use futures::{try_ready, Async, Future, Poll};
// use linkerd2_proxy_core::resolve;
// use std::net::SocketAddr;

// pub trait MapEndpoint<Target, In> {
// type Out;
// fn map_endpoint(&self, target: &Target, addr: SocketAddr, in_ep: In) -> Self::Out;
// }

// #[derive(Clone, Debug)]
// pub struct Resolve<M, R> {
// resolve: R,
// map: M,
// }

// #[derive(Debug)]
// pub struct ResolveFuture<T, F, M> {
// future: F,
// target: Option<T>,
// map: Option<M>,
// }

// #[derive(Clone, Debug)]
// pub struct Resolution<T, M, R> {
// resolution: R,
// target: T,
// map: M,
// }

// // === impl Resolve ===

// impl<M, R> Resolve<M, R> {
// pub fn new<T>(map: M, resolve: R) -> Self
// where
// Self: resolve::Resolve<T>,
// {
// Self { resolve, map }
// }
// }

// impl<T, M, R> tower::Service<T> for Resolve<M, R>
// where
// T: Clone,
// R: resolve::Resolve<T>,
// M: MapEndpoint<T, R::Endpoint> + Clone,
// {
// type Response = Resolution<T, M, R::Resolution>;
// type Error = R::Error;
// type Future = ResolveFuture<T, R::Future, M>;

// #[inline]
// fn poll_ready(&mut self) -> Poll<(), Self::Error> {
// self.resolve.poll_ready()
// }

// #[inline]
// fn call(&mut self, target: T) -> Self::Future {
// let future = self.resolve.resolve(target.clone());
// Self::Future {
// future,
// target: Some(target),
// map: Some(self.map.clone()),
// }
// }
// }

// // === impl ResolveFuture ===

// impl<T, F, M> Future for ResolveFuture<T, F, M>
// where
// F: Future,
// F::Item: resolve::Resolution,
// M: MapEndpoint<T, <F::Item as resolve::Resolution>::Endpoint>,
// {
// type Item = Resolution<T, M, F::Item>;
// type Error = F::Error;

// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// let resolution = try_ready!(self.future.poll());
// let target = self.target.take().expect("polled after ready");
// let map = self.map.take().expect("polled after ready");
// Ok(Async::Ready(Resolution {
// resolution,
// target,
// map,
// }))
// }
// }

// // === impl Resolution ===

// impl<T, M, R> resolve::Resolution for Resolution<T, M, R>
// where
// R: resolve::Resolution,
// M: MapEndpoint<T, R::Endpoint>,
// {
// type Endpoint = M::Out;
// type Error = R::Error;

// fn poll(&mut self) -> Poll<resolve::Update<M::Out>, Self::Error> {
// let update = match try_ready!(self.resolution.poll()) {
// resolve::Update::Add(eps) => resolve::Update::Add(
// eps.into_iter()
// .map(|(a, ep)| {
// let ep = self.map.map_endpoint(&self.target, a, ep);
// (a, ep)
// })
// .collect(),
// ),
// resolve::Update::Remove(addrs) => resolve::Update::Remove(addrs),
// resolve::Update::DoesNotExist => resolve::Update::DoesNotExist,
// resolve::Update::Empty => resolve::Update::Empty,
// };
// Ok(update.into())
// }
// }

// // === impl MapEndpoint ===

// impl<T, N> MapEndpoint<T, N> for () {
// type Out = N;

// fn map_endpoint(&self, _: &T, _: SocketAddr, ep: N) -> Self::Out {
// ep
// }
// }

// impl<T, In, Out, F: Fn(&T, SocketAddr, In) -> Out> MapEndpoint<T, In> for F {
// type Out = Out;

// fn map_endpoint(&self, target: &T, addr: SocketAddr, ep: In) -> Self::Out {
// (self)(target, addr, ep)
// }
// }
//! A middleware that wraps `Resolutions`, modifying their endpoint type.
use futures::{ready, TryFuture};
use linkerd2_proxy_core::resolve;
use pin_project::pin_project;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait MapEndpoint<Target, In> {
type Out;
fn map_endpoint(&self, target: &Target, addr: SocketAddr, in_ep: In) -> Self::Out;
}

#[derive(Clone, Debug)]
pub struct Resolve<M, R> {
resolve: R,
map: M,
}

#[pin_project]
#[derive(Debug)]
pub struct ResolveFuture<T, F, M> {
#[pin]
future: F,
target: Option<T>,
map: Option<M>,
}

#[pin_project]
#[derive(Clone, Debug)]
pub struct Resolution<T, M, R> {
#[pin]
resolution: R,
target: T,
map: M,
}

// === impl Resolve ===

impl<M, R> Resolve<M, R> {
pub fn new<T>(map: M, resolve: R) -> Self
where
Self: resolve::Resolve<T>,
{
Self { resolve, map }
}
}

impl<T, M, R> tower::Service<T> for Resolve<M, R>
where
T: Clone,
R: resolve::Resolve<T>,
M: MapEndpoint<T, R::Endpoint> + Clone,
{
type Response = Resolution<T, M, R::Resolution>;
type Error = R::Error;
type Future = ResolveFuture<T, R::Future, M>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.resolve.poll_ready(cx)
}

#[inline]
fn call(&mut self, target: T) -> Self::Future {
let future = self.resolve.resolve(target.clone());
Self::Future {
future,
target: Some(target),
map: Some(self.map.clone()),
}
}
}

// === impl ResolveFuture ===

impl<T, F, M> Future for ResolveFuture<T, F, M>
where
F: TryFuture,
F::Ok: resolve::Resolution,
M: MapEndpoint<T, <F::Ok as resolve::Resolution>::Endpoint>,
{
type Output = Result<Resolution<T, M, F::Ok>, F::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let resolution = ready!(this.future.try_poll(cx))?;
let target = this.target.take().expect("polled after ready");
let map = this.map.take().expect("polled after ready");
Poll::Ready(Ok(Resolution {
resolution,
target,
map,
}))
}
}

// === impl Resolution ===

impl<T, M, R> resolve::Resolution for Resolution<T, M, R>
where
R: resolve::Resolution,
M: MapEndpoint<T, R::Endpoint>,
{
type Endpoint = M::Out;
type Error = R::Error;

fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<resolve::Update<M::Out>, Self::Error>> {
let this = self.project();
let update = match ready!(this.resolution.poll(cx))? {
resolve::Update::Add(eps) => {
let mut update = Vec::new();
for (a, ep) in eps.into_iter() {
let ep = this.map.map_endpoint(&this.target, a, ep);
update.push((a, ep));
}

resolve::Update::Add(update)
}
resolve::Update::Remove(addrs) => resolve::Update::Remove(addrs),
resolve::Update::DoesNotExist => resolve::Update::DoesNotExist,
resolve::Update::Empty => resolve::Update::Empty,
};
Poll::Ready(Ok(update))
}
}

// === impl MapEndpoint ===

impl<T, N> MapEndpoint<T, N> for () {
type Out = N;

fn map_endpoint(&self, _: &T, _: SocketAddr, ep: N) -> Self::Out {
ep
}
}

impl<T, In, Out, F: Fn(&T, SocketAddr, In) -> Out> MapEndpoint<T, In> for F {
type Out = Out;

fn map_endpoint(&self, target: &T, addr: SocketAddr, ep: In) -> Self::Out {
(self)(target, addr, ep)
}
}
Loading

0 comments on commit dead469

Please sign in to comment.