Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ready-cache: Ensure cancelation can be observed #668

Merged
merged 10 commits into from
Jun 17, 2022
101 changes: 68 additions & 33 deletions tower/src/ready_cache/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@

use super::error;
use futures_core::Stream;
use futures_util::stream::FuturesUnordered;
use futures_util::{stream::FuturesUnordered, task::AtomicWaker};
pub use indexmap::Equivalent;
use indexmap::IndexMap;
use std::fmt;
use std::future::Future;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::oneshot;
use tower_service::Service;
use tracing::{debug, trace};

Expand Down Expand Up @@ -75,8 +76,18 @@ where
// Safety: This is safe because we do not use `Pin::new_unchecked`.
impl<S, K: Eq + Hash, Req> Unpin for ReadyCache<K, S, Req> {}

type CancelRx = oneshot::Receiver<()>;
type CancelTx = oneshot::Sender<()>;
#[derive(Debug)]
struct Cancel {
waker: AtomicWaker,
canceled: AtomicBool,
}

#[derive(Debug)]
struct CancelRx(Arc<Cancel>);

#[derive(Debug)]
struct CancelTx(Arc<Cancel>);

type CancelPair = (CancelTx, CancelRx);

#[derive(Debug)]
Expand Down Expand Up @@ -195,7 +206,7 @@ where
/// must be called to cause the service to be dropped.
pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
c.send(()).expect("cancel receiver lost");
c.cancel();
true
} else {
false
Expand Down Expand Up @@ -226,14 +237,14 @@ where
///
/// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
pub fn push(&mut self, key: K, svc: S) {
let cancel = oneshot::channel();
let cancel = cancelable();
self.push_pending(key, svc, cancel);
}

fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
// If there is already a service for this key, cancel it.
c.send(()).expect("cancel receiver lost");
c.cancel();
}
self.pending.push(Pending {
key: Some(key),
Expand Down Expand Up @@ -270,21 +281,10 @@ where
// recreated after the service is used.
self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
} else {
// This should not technically be possible. We must have decided to cancel
// a Service (by sending on the CancelTx), yet that same service then
// returns Ready. Since polling a Pending _first_ polls the CancelRx, that
// _should_ always see our CancelTx send. Yet empirically, that isn't true:
//
// https://github.com/tower-rs/tower/issues/415
//
// So, we instead detect the endpoint as canceled at this point. That
// should be fine, since the oneshot is only really there to ensure that
// the Pending is polled again anyway.
//
// We assert that this can't happen in debug mode so that hopefully one day
// we can find a test that triggers this reliably.
debug_assert!(cancel_tx.is_some());
debug!("canceled endpoint removed when ready");
assert!(
cancel_tx.is_some(),
"services that become ready must have a pending cancelation"
);
}
}
Poll::Ready(Some(Err(PendingError::Canceled(_)))) => {
Expand All @@ -294,13 +294,11 @@ where
}
Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => {
let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
if cancel_tx.is_some() {
return Err(error::Failed(key, e.into())).into();
} else {
// See comment for the same clause under Ready(Some(Ok)).
debug_assert!(cancel_tx.is_some());
debug!("canceled endpoint removed on error");
}
assert!(
cancel_tx.is_some(),
"services that return an error must have a pending cancelation"
);
return Err(error::Failed(key, e.into())).into();
}
}
}
Expand Down Expand Up @@ -400,6 +398,28 @@ where
}
}

// === impl Cancel ===

/// Creates a cancelation sender and receiver.
///
/// A `tokio::sync::oneshot` is NOT used, as a `Receiver` is not guaranteed to
/// observe results as soon as a `Sender` fires. Using an `AtomicBool` allows
/// the state to be observed as soon as the cancelation is triggered.
Comment on lines +405 to +407
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 for the comment

fn cancelable() -> CancelPair {
let cx = Arc::new(Cancel {
waker: AtomicWaker::new(),
canceled: AtomicBool::new(false),
});
(CancelTx(cx.clone()), CancelRx(cx))
}

impl CancelTx {
fn cancel(self) {
self.0.canceled.store(true, Ordering::SeqCst);
self.0.waker.wake();
}
}

// === Pending ===

impl<K, S, Req> Future for Pending<K, S, Req>
Expand All @@ -410,9 +430,10 @@ where

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut fut = this.cancel.as_mut().expect("polled after complete");
if let Poll::Ready(r) = Pin::new(&mut fut).poll(cx) {
assert!(r.is_ok(), "cancel sender lost");
// Before checking whether the service is ready, check to see whether
// readiness has been canceled.
let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
if cancel.canceled.load(Ordering::SeqCst) {
let key = this.key.take().expect("polled after complete");
return Err(PendingError::Canceled(key)).into();
}
Expand All @@ -423,7 +444,21 @@ where
.expect("polled after ready")
.poll_ready(cx)
{
Poll::Pending => Poll::Pending,
Poll::Pending => {
// Before returning Pending, register interest in cancelation so
// that this future is polled again if the state changes.
let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
cancel.waker.register(cx.waker());
// Because both the cancel receiver and cancel sender are held
// by the `ReadyCache` (i.e., on a single task), then it must
// not be possible for the cancelation state to change while
// polling a `Pending` service.
assert!(
!cancel.canceled.load(Ordering::SeqCst),
"cancelation cannot be notified while polling a pending service"
);
Poll::Pending
}
Poll::Ready(Ok(())) => {
let key = this.key.take().expect("polled after complete");
let cancel = this.cancel.take().expect("polled after complete");
Expand Down
32 changes: 31 additions & 1 deletion tower/tests/ready_cache/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
#[path = "../support.rs"]
mod support;

use std::pin::Pin;
use tokio_test::{assert_pending, assert_ready, task};
use tower::ready_cache::ReadyCache;
use tower::ready_cache::{error, ReadyCache};
use tower_test::mock;

type Req = &'static str;
Expand Down Expand Up @@ -191,3 +192,32 @@ fn duplicate_key_by_index() {
// _and_ service 0 should now be callable
assert!(task.enter(|cx, _| cache.check_ready(cx, &0)).unwrap());
}

// Tests https://github.com/tower-rs/tower/issues/415
#[tokio::test(flavor = "current_thread")]
async fn cancelation_observed() {
let mut cache = ReadyCache::default();
let mut handles = vec![];

// NOTE This test passes at 129 items, but fails at 130 items (if coop
// schedulding interferes with cancelation).
for _ in 0..130 {
let (svc, mut handle) = tower_test::mock::pair::<(), ()>();
handle.allow(1);
cache.push("ep0", svc);
handles.push(handle);
}

struct Ready(ReadyCache<&'static str, tower_test::mock::Mock<(), ()>, ()>);
impl Unpin for Ready {}
impl std::future::Future for Ready {
type Output = Result<(), error::Failed<&'static str>>;
fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self.get_mut().0.poll_pending(cx)
}
}
Ready(cache).await.unwrap();
}