diff --git a/linkerd/stack/src/failfast.rs b/linkerd/stack/src/failfast.rs index 2dba1ce693..04396edb2e 100644 --- a/linkerd/stack/src/failfast.rs +++ b/linkerd/stack/src/failfast.rs @@ -1,13 +1,10 @@ //! A middleware that limits the amount of time the service may be not ready //! before requests are failed. -mod gate; #[cfg(test)] mod test; -pub use self::gate::Gate; - -use crate::layer; +use crate::{gate, layer}; use futures::{FutureExt, TryFuture}; use linkerd_error::Error; use pin_project::pin_project; @@ -15,15 +12,10 @@ use std::{ future::Future, mem, pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, task::{Context, Poll}, }; use thiserror::Error; use tokio::{ - sync::Notify, task, time::{self, Duration, Instant, Sleep}, }; @@ -48,7 +40,7 @@ pub struct FailFast { timeout: Duration, wait: Pin>, state: State, - shared: Arc, + gate: Option, } /// An error representing that an operation timed out. @@ -65,12 +57,6 @@ enum State { Invalid, } -#[derive(Debug)] -struct Shared { - notify: Notify, - in_failfast: AtomicBool, -} - #[pin_project(project = ResponseFutureProj)] pub enum ResponseFuture { Inner(#[pin] F), @@ -80,18 +66,9 @@ pub enum ResponseFuture { // === impl FailFast === impl FailFast { - /// Returns a layer for producing a `FailFast` without a paired [`Gate`]. + /// Returns a layer for producing a `FailFast` without a paired [`gate::Gate`]. pub fn layer(timeout: Duration) -> impl layer::Layer + Clone { - layer::mk(move |inner| { - Self::new( - timeout, - Arc::new(Shared { - notify: Notify::new(), - in_failfast: AtomicBool::new(false), - }), - inner, - ) - }) + layer::mk(move |inner| Self::new(timeout, None, inner)) } /// Returns a layer for producing a `FailFast` pair wrapping an inner @@ -104,22 +81,18 @@ impl FailFast { pub fn layer_gated( timeout: Duration, inner_layer: L, - ) -> impl layer::Layer> + Clone + ) -> impl layer::Layer> + Clone where L: layer::Layer + Clone, { layer::mk(move |inner| { - let shared = Arc::new(Shared { - notify: Notify::new(), - in_failfast: AtomicBool::new(false), - }); - let inner = Self::new(timeout, shared.clone(), inner); - let inner = inner_layer.layer(inner); - Gate::new(inner, shared) + let (tx, rx) = gate::channel(); + let inner = inner_layer.layer(Self::new(timeout, Some(tx), inner)); + gate::Gate::new(inner, rx) }) } - fn new(timeout: Duration, shared: Arc, inner: S) -> Self { + fn new(timeout: Duration, gate: Option, inner: S) -> Self { Self { timeout, // The sleep is reset whenever the service becomes unavailable; this @@ -127,7 +100,7 @@ impl FailFast { // now. wait: Box::pin(time::sleep(Duration::default())), state: State::Open(inner), - shared, + gate, } } } @@ -195,15 +168,19 @@ where } warn!("Service entering failfast after {:?}", self.timeout); - self.shared.in_failfast.store(true, Ordering::Release); + if let Some(gate) = self.gate.as_ref() { + gate.shut(); + } - let shared = self.shared.clone(); + let gate = self.gate.clone(); self.state = State::FailFast(task::spawn(async move { let res = inner.ready().await; // Notify the paired `Gate` instances to begin // advertising readiness so that the failfast // service can advance. - shared.exit_failfast(); + if let Some(gate) = gate { + gate.open(); + } match res { Ok(_) => { info!("Service has recovered"); @@ -285,15 +262,3 @@ where } } } - -// === impl Shared === - -impl Shared { - fn exit_failfast(&self) { - // The load part of this operation can be `Relaxed` because this task - // is the only place where the the value is ever set. - if self.in_failfast.swap(false, Ordering::Release) { - self.notify.notify_waiters(); - } - } -} diff --git a/linkerd/stack/src/failfast/gate.rs b/linkerd/stack/src/failfast/gate.rs deleted file mode 100644 index 85a1f9146a..0000000000 --- a/linkerd/stack/src/failfast/gate.rs +++ /dev/null @@ -1,107 +0,0 @@ -use super::Shared; -use futures::{ready, FutureExt}; -use std::{ - sync::{atomic::Ordering, Arc}, - task::{Context, Poll}, -}; -use tokio_util::sync::ReusableBoxFuture; -use tracing::trace; - -/// A middleware which, when paired with a [`FailFast`] middleware, advertises -/// the *actual* readiness state of the [`FailFast`]'s inner service up the -/// stack. -/// -/// A [`FailFast`]/[`Gate`] pair is primarily intended to be used in -/// conjunction with a `tower::Buffer`. By placing the [`FailFast`] middleware -/// inside of the `Buffer` and the `Gate` middleware outside of the buffer, -/// the buffer's queue can be proactively drained when the inner service enters -/// failfast, while the outer `Gate` middleware will continue to return -/// [`Poll::Pending`] from its `poll_ready` method. This can be used to fail any -/// requests that have already been dispatched to the inner service while it is in -/// failfast, while allowing a load balancer or other traffic distributor to -/// send any new requests to a different backend until this backend actually -/// becomes available. -/// -/// A `Layer`, such as a `Buffer` layer, may be wrapped in a new `Layer` which -/// produces a [`FailFast`]/[`Gate`] pair around the inner `Layer`'s -/// service using the [`FailFast::layer_gated`] function. -/// -/// [`FailFast`]: super::FailFast -/// [`FailFast::layer_gated`]: super::FailFast::layer_gated -#[derive(Debug)] -pub struct Gate { - inner: S, - shared: Arc, - - /// Are we currently waiting on a notification that the inner service has - /// exited failfast? - is_waiting: bool, - - /// Future awaiting a notification from the inner `FailFast` service. - waiting: ReusableBoxFuture<'static, ()>, -} - -// === impl Gate === - -impl Gate { - pub(super) fn new(inner: S, shared: Arc) -> Self { - Self { - inner, - shared, - is_waiting: false, - waiting: ReusableBoxFuture::new(async move { unreachable!() }), - } - } -} - -impl Clone for Gate -where - S: Clone, -{ - fn clone(&self) -> Self { - Self::new(self.inner.clone(), self.shared.clone()) - } -} - -impl tower::Service for Gate -where - S: tower::Service, -{ - type Response = S::Response; - type Error = S::Error; - type Future = S::Future; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - loop { - // Check the inner service's state. If it's not in failfast, use the - // inner service directly. - if !self.shared.in_failfast.load(Ordering::Acquire) { - trace!("service is not in failfast"); - self.is_waiting = false; - return self.inner.poll_ready(cx); - } - trace!("service in failfast"); - - // Ensure that this task is notified when the inner service exits - // failfast. Return pending until we are notified about a change. - if !self.is_waiting { - trace!("resetting watch"); - let shared = self.shared.clone(); - self.waiting.set(async move { - shared.notify.notified().await; - trace!("notified"); - }); - self.is_waiting = true; - } - trace!("waiting for service to become ready",); - ready!(self.waiting.poll_unpin(cx)); - trace!("service became ready"); - self.is_waiting = false; - } - } - - #[inline] - fn call(&mut self, req: T) -> Self::Future { - self.inner.call(req) - } -} diff --git a/linkerd/stack/src/failfast/test.rs b/linkerd/stack/src/failfast/test.rs index 8c9d98b0f6..7ff84bdc9c 100644 --- a/linkerd/stack/src/failfast/test.rs +++ b/linkerd/stack/src/failfast/test.rs @@ -10,11 +10,7 @@ async fn fails_fast() { let max_unavailable = Duration::from_millis(100); let (service, mut handle) = mock::pair::<(), ()>(); - let shared = Arc::new(Shared { - notify: tokio::sync::Notify::new(), - in_failfast: AtomicBool::new(false), - }); - let mut service = Spawn::new(FailFast::new(max_unavailable, shared, service)); + let mut service = Spawn::new(FailFast::new(max_unavailable, None, service)); // The inner starts unavailable. handle.allow(0); diff --git a/linkerd/stack/src/gate.rs b/linkerd/stack/src/gate.rs new file mode 100644 index 0000000000..f0ce490427 --- /dev/null +++ b/linkerd/stack/src/gate.rs @@ -0,0 +1,261 @@ +use crate::Service; +use futures::{ready, FutureExt}; +use std::{ + sync::{atomic::AtomicBool, Arc}, + task::{Context, Poll}, +}; +use tokio::sync::Notify; +use tokio_util::sync::ReusableBoxFuture; +use tracing::{debug, trace}; + +/// A middleware that alters its readiness state according to a gate channel. +pub struct Gate { + inner: S, + rx: Rx, + is_waiting: bool, + waiting: ReusableBoxFuture<'static, ()>, +} + +/// Observes gate state changes. +#[derive(Clone, Debug)] +pub struct Rx(Arc, Arc<()>); + +/// Changes the gate state. +#[derive(Clone, Debug)] +pub struct Tx(Arc); + +#[derive(Debug)] +struct Shared { + open: AtomicBool, + notify: Notify, + closed: Notify, +} + +/// Creates a new gate channel. +pub fn channel() -> (Tx, Rx) { + let shared = Arc::new(Shared { + open: AtomicBool::new(true), + notify: Notify::new(), + closed: Notify::new(), + }); + (Tx(shared.clone()), Rx(shared, Arc::new(()))) +} + +// === impl Rx === + +impl Rx { + /// Indicates whether the gate is open. + pub fn is_open(&self) -> bool { + self.0.open.load(std::sync::atomic::Ordering::Acquire) + } + + /// Indicates whether the gate is closed. + #[inline] + pub fn is_shut(&self) -> bool { + !self.is_open() + } + + /// Waits for the gate state to change. + pub async fn changed(&self) -> bool { + self.0.notify.notified().await; + self.is_open() + } + + /// Waits for the gate state to change to be open. + pub async fn opened(&self) { + if self.is_open() { + return; + } + + while !self.changed().await {} + } +} + +impl Drop for Rx { + fn drop(&mut self) { + let Rx(_, handle) = self; + if Arc::strong_count(handle) == 1 { + self.0.closed.notify_waiters(); + } + } +} + +// === impl Tx === + +impl Tx { + /// Returns when all associated `Rx` clones are dropped. + pub async fn closed(&self) { + self.0.closed.notified().await; + } + + /// Opens the gate. + pub fn open(&self) { + if !self.0.open.swap(true, std::sync::atomic::Ordering::Release) { + debug!("Gate opened"); + self.0.notify.notify_waiters(); + } + } + + /// Closes the gate. + pub fn shut(&self) { + if self + .0 + .open + .swap(false, std::sync::atomic::Ordering::Release) + { + debug!("Gate shut"); + self.0.notify.notify_waiters(); + } + } +} + +// === impl Gate === + +impl Gate { + pub fn channel(inner: S) -> (Tx, Self) { + let (tx, rx) = channel(); + (tx, Self::new(inner, rx)) + } + + pub fn new(inner: S, rx: Rx) -> Self { + Self { + inner, + rx, + is_waiting: false, + waiting: ReusableBoxFuture::new(futures::future::pending()), + } + } +} + +impl Clone for Gate +where + S: Clone, +{ + fn clone(&self) -> Self { + Self::new(self.inner.clone(), self.rx.clone()) + } +} + +impl Service for Gate +where + S: Service, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // If the gate is shut, wait for it to open by storing a future that + // will complete when it opens. This ensures that waiters are notified. + while self.rx.is_shut() { + trace!(gate.open = false); + if !self.is_waiting { + let rx = self.rx.clone(); + self.waiting.set(async move { rx.opened().await }); + self.is_waiting = true; + } + ready!(self.waiting.poll_unpin(cx)); + } + + debug_assert!(self.rx.is_open()); + self.is_waiting = false; + trace!(gate.open = true); + + // When the gate is open, poll the inner service. + self.inner.poll_ready(cx) + } + + #[inline] + fn call(&mut self, req: Req) -> Self::Future { + self.inner.call(req) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn gate() { + let (tx, rx) = channel(); + let (mut gate, mut handle) = + tower_test::mock::spawn_with::<(), (), _, _>(move |inner| Gate::new(inner, rx.clone())); + + handle.allow(1); + tx.shut(); + assert!(gate.poll_ready().is_pending()); + + tx.open(); + assert!(gate.poll_ready().is_ready()); + } + + #[tokio::test] + async fn gate_polls_inner() { + let (tx, rx) = channel(); + let (mut gate, mut handle) = + tower_test::mock::spawn_with::<(), (), _, _>(move |inner| Gate::new(inner, rx.clone())); + + handle.allow(0); + assert!(gate.poll_ready().is_pending()); + + tx.shut(); + assert!(gate.poll_ready().is_pending()); + + tx.open(); + assert!(gate.poll_ready().is_pending()); + + handle.allow(1); + assert!(gate.poll_ready().is_ready()); + } + + #[tokio::test] + async fn notifies_on_open() { + let (tx, rx) = channel(); + let (mut gate, mut handle) = + tower_test::mock::spawn_with::<(), (), _, _>(move |inner| Gate::new(inner, rx.clone())); + + // Start with a shut gate on an available inner service. + handle.allow(1); + tx.shut(); + + // Wait for the gated service to become ready. + assert!(gate.poll_ready().is_pending()); + + // Open the gate and verify that the readiness future fires. + tx.open(); + assert!(gate.poll_ready().is_ready()); + } + + #[tokio::test] + async fn channel_closes() { + let (tx, rx) = channel(); + let mut closed = tokio_test::task::spawn(tx.closed()); + assert!(closed.poll().is_pending()); + drop(rx); + assert!(closed.poll().is_ready()); + } + + #[tokio::test] + async fn channel_closes_after_clones() { + let (tx, rx0) = channel(); + let mut closed = tokio_test::task::spawn(tx.closed()); + let rx1 = rx0.clone(); + assert!(closed.poll().is_pending()); + drop(rx0); + assert!(closed.poll().is_pending()); + drop(rx1); + assert!(closed.poll().is_ready()); + } + + #[tokio::test] + async fn channel_closes_after_clones_reordered() { + let (tx, rx0) = channel(); + let mut closed = tokio_test::task::spawn(tx.closed()); + let rx1 = rx0.clone(); + assert!(closed.poll().is_pending()); + drop(rx1); + assert!(closed.poll().is_pending()); + drop(rx0); + assert!(closed.poll().is_ready()); + } +} diff --git a/linkerd/stack/src/lib.rs b/linkerd/stack/src/lib.rs index 26c370f678..b419d03c47 100644 --- a/linkerd/stack/src/lib.rs +++ b/linkerd/stack/src/lib.rs @@ -12,6 +12,7 @@ mod fail; mod fail_on_error; pub mod failfast; mod filter; +pub mod gate; pub mod layer; mod lazy; mod loadshed; @@ -37,8 +38,9 @@ pub use self::{ either::{Either, NewEither}, fail::Fail, fail_on_error::FailOnError, - failfast::{FailFast, FailFastError, Gate}, + failfast::{FailFast, FailFastError}, filter::{Filter, FilterLayer, Predicate}, + gate::Gate, lazy::{Lazy, NewLazy}, loadshed::{LoadShed, LoadShedError}, map_err::{MapErr, MapErrBoxed, NewMapErr, WrapErr}, diff --git a/linkerd/stack/src/queue.rs b/linkerd/stack/src/queue.rs index 077b01c666..2e9368f8d0 100644 --- a/linkerd/stack/src/queue.rs +++ b/linkerd/stack/src/queue.rs @@ -1,5 +1,6 @@ use crate::{ - failfast::{self, FailFast}, + failfast::FailFast, + gate, layer::{self, Layer}, BoxService, ExtractParam, NewService, Service, }; @@ -33,7 +34,7 @@ pub struct NewQueueWithoutTimeout { _req: PhantomData, } -pub type Queue = failfast::Gate, Req>>; +pub type Queue = gate::Gate, Req>>; pub type QueueWithoutTimeout = Buffer, Req>; @@ -48,7 +49,7 @@ impl NewQueue { } } - /// Returns a [`Layer`] that constructs new [`failfast::Gate`]d [`Buffer`]s + /// Returns a [`Layer`] that constructs new [`gate::Gate`]d [`Buffer`]s /// using an `X`-typed [`ExtractParam`] implementation to extract /// [`Capacity`] and [`Timeout`] from a `T`-typed target. #[inline]