Skip to content

Commit

Permalink
port buffer to std::future (#488)
Browse files Browse the repository at this point in the history
This branch ports the `linkerd2-buffer` to use `std::future` and `tokio`
0.2. I've also re-enabled the `push_spawn_buffer` fns in `svc`.

Like #486 and #487, this change should be pretty straightforward to
read, as it's fairly mechanical in nature. I've basically just changed
the code to account for the new APIs. This one might be a _little_ more
interesting, since it shows off some of the API differences in
`tokio::sync` in 0.2 as well.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw authored Apr 24, 2020
1 parent f424f0a commit ce96fd1
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 157 deletions.
23 changes: 7 additions & 16 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -896,11 +896,13 @@ dependencies = [
name = "linkerd2-buffer"
version = "0.1.0"
dependencies = [
"futures 0.1.26",
"futures 0.3.4",
"linkerd2-error",
"tokio 0.1.22",
"tower 0.1.1",
"tower-test 0.1.0",
"pin-project",
"tokio 0.2.17",
"tokio-test",
"tower 0.3.1",
"tower-test",
"tracing",
"tracing-futures 0.1.0",
]
Expand Down Expand Up @@ -1487,7 +1489,7 @@ dependencies = [
"tokio-connect",
"tokio-test",
"tower 0.3.1",
"tower-test 0.3.0",
"tower-test",
"tracing",
]

Expand Down Expand Up @@ -2955,17 +2957,6 @@ dependencies = [
"tower-util",
]

[[package]]
name = "tower-test"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99a0b25036e2c58f681a04a5b312e9283d229445ad5e1221a1b7a4630df6fbdf"
dependencies = [
"futures 0.1.26",
"tokio-sync",
"tower-service 0.2.0",
]

[[package]]
name = "tower-test"
version = "0.3.0"
Expand Down
22 changes: 11 additions & 11 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,17 @@ impl<S> Stack<S> {
self.push(stack::MakeReadyLayer::new())
}

// /// Buffer requests when when the next layer is out of capacity.
// pub fn spawn_buffer<Req>(self, capacity: usize) -> Stack<buffer::Buffer<Req, S::Future>>
// where
// Req: Send + 'static,
// S: Service<Req> + Send + 'static,
// S::Response: Send + 'static,
// S::Error: Into<Error> + Send + Sync,
// S::Future: Send,
// {
// self.push(buffer::SpawnBufferLayer::new(capacity))
// }
/// Buffer requests when when the next layer is out of capacity.
pub fn spawn_buffer<Req>(self, capacity: usize) -> Stack<buffer::Buffer<Req, S::Future>>
where
Req: Send + 'static,
S: Service<Req> + Send + 'static,
S::Response: Send + 'static,
S::Error: Into<Error> + Send + Sync,
S::Future: Send,
{
self.push(buffer::SpawnBufferLayer::new(capacity))
}

// /// Assuming `S` implements `NewService` or `MakeService`, applies the given
// /// `L`-typed layer on each service produced by `S`.
Expand Down
12 changes: 7 additions & 5 deletions linkerd/buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ edition = "2018"
publish = false

[dependencies]
futures = "0.1"
futures = "0.3"
linkerd2-error = { path = "../error" }
tokio = "0.1"
tower = "0.1"
tokio = { version = "0.2", features = ["sync", "stream", "macros"] }
tower = "0.3"
tracing = "0.1"
tracing-futures = "0.1"
tracing-futures = { version = "0.1", features = ["std-future"] }
pin-project = "0.4"

[dev-dependencies]
tower-test = "0.1"
tower-test = "0.3"
tokio-test = "0.2"
71 changes: 40 additions & 31 deletions linkerd/buffer/src/dispatch.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use crate::error::ServiceError;
use crate::InFlight;
use futures::{Async, Future, Poll, Stream};
use linkerd2_error::{Error, Never};
use std::sync::Arc;
use linkerd2_error::Error;
use pin_project::pin_project;
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::sync::{mpsc, watch};
use tracing::trace;

/// A future that drives the inner service.
#[pin_project]
pub struct Dispatch<S, Req, F> {
inner: Option<S>,
#[pin]
rx: mpsc::Receiver<InFlight<Req, F>>,
ready: watch::Sender<Poll<(), ServiceError>>,
ready: watch::Sender<Poll<Result<(), ServiceError>>>,
}

impl<S, Req> Dispatch<S, Req, S::Future>
Expand All @@ -23,7 +26,7 @@ where
pub(crate) fn new(
inner: S,
rx: mpsc::Receiver<InFlight<Req, S::Future>>,
ready: watch::Sender<Poll<(), ServiceError>>,
ready: watch::Sender<Poll<Result<(), ServiceError>>>,
) -> Self {
Self {
inner: Some(inner),
Expand All @@ -36,7 +39,7 @@ where
macro_rules! return_ready {
() => {{
trace!("Complete");
return Ok(Async::Ready(()));
return Poll::Ready(());
}};
}

Expand All @@ -55,76 +58,82 @@ where
S::Response: Send + 'static,
S::Future: Send + 'static,
{
type Item = ();
type Error = Never;
type Output = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut this = self.project();
// Complete the task when all services have dropped.
return_ready_if!(self.ready.poll_close().expect("must not fail").is_ready());
return_ready_if!({
// `watch::Sender::poll_close` is private in `tokio::sync`.
let closed = this.ready.closed();
tokio::pin!(closed);
closed.poll(cx).is_ready()
});

// Drive requests from the queue to the inner service.
loop {
let ready = match self.inner.as_mut() {
Some(inner) => inner.poll_ready(),
let ready = match this.inner.as_mut() {
Some(inner) => inner.poll_ready(cx),
None => {
// This is safe because ready.poll_close has returned NotReady.
return Ok(Async::NotReady);
// This is safe because ready.closed() has returned Pending.
return Poll::Pending;
}
};

match ready {
// If it's not ready, wait for it..
Ok(Async::NotReady) => {
return_ready_if!(self.ready.broadcast(Ok(Async::NotReady)).is_err());
Poll::Pending => {
return_ready_if!(this.ready.broadcast(Poll::Pending).is_err());

trace!("Waiting for inner service");
return Ok(Async::NotReady);
return Poll::Pending;
}

// If the service fails, propagate the failure to all pending
// requests and then complete.
Err(error) => {
Poll::Ready(Err(error)) => {
let shared = ServiceError(Arc::new(error.into()));
trace!(%shared, "Inner service failed");

// First, notify services of the readiness change to prevent new requests from
// being buffered.
let is_active = self.ready.broadcast(Err(shared.clone())).is_ok();
let is_active = this
.ready
.broadcast(Poll::Ready(Err(shared.clone())))
.is_ok();

// Propagate the error to all in-flight requests.
while let Ok(Async::Ready(Some(InFlight { tx, .. }))) = self.rx.poll() {
while let Poll::Ready(Some(InFlight { tx, .. })) = this.rx.poll_recv(cx) {
let _ = tx.send(Err(shared.clone().into()));
}

// Drop the inner Service to free its resources. It won't be used again.
self.inner = None;
let _ = this.inner.take();

// Ensure the task remains active until all services have observed the error.
return_ready_if!(!is_active);

// This is safe because ready.poll_close has returned NotReady. The task will
// This is safe because ready.closed() has returned Pending. The task will
// complete when all observes have dropped their interest in `ready`.
return Ok(Async::NotReady);
return Poll::Pending;
}

// If inner service can receive requests, start polling the channel.
Ok(Async::Ready(())) => {
return_ready_if!(self.ready.broadcast(Ok(Async::Ready(()))).is_err());
Poll::Ready(Ok(())) => {
return_ready_if!(this.ready.broadcast(Poll::Ready(Ok(()))).is_err());
trace!("Ready for requests");
}
}

// The inner service is ready, so poll for new requests.
match self.rx.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),

match futures::ready!(this.rx.poll_recv(cx)) {
// All senders have been dropped, complete.
Err(_) | Ok(Async::Ready(None)) => return_ready!(),
None => return_ready!(),

// If a request was ready return it to the caller.
Ok(Async::Ready(Some(InFlight { request, tx }))) => {
Some(InFlight { request, tx }) => {
trace!("Dispatching a request");
let fut = self
let fut = this
.inner
.as_mut()
.expect("Service must not be dropped")
Expand Down
3 changes: 1 addition & 2 deletions linkerd/buffer/src/layer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::Buffer;
use futures::Future;
use linkerd2_error::Error;
use tracing_futures::Instrument;

Expand Down Expand Up @@ -38,7 +37,7 @@ where

fn layer(&self, inner: S) -> Self::Service {
let (buffer, dispatch) = crate::new(inner, self.capacity);
tokio::spawn(dispatch.in_current_span().map_err(|n| match n {}));
tokio::spawn(dispatch.in_current_span());
buffer
}
}
Loading

0 comments on commit ce96fd1

Please sign in to comment.