From 7c2ba16038ba647337b067465a89bb62f9cc4c28 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 28 Feb 2023 16:25:41 +0000 Subject: [PATCH 1/8] stack: Make `failfast::Gate` general purpose `failfast::Gate` is a stack middleware that supports changing its readiness based on some external signal (the failfast state changing). Nothing about the gate's behavior is specific to failfast, though; and this type of gating is general useful on its own. To setup further reuse, this change moves `stack::failfast::gate` to `stack::gate`. The gate is now tested independently from failfast. Failfast continues to maintain an (optional) `gate::Tx` so that it can control a gate. --- linkerd/stack/src/failfast.rs | 69 +++------- linkerd/stack/src/failfast/gate.rs | 107 --------------- linkerd/stack/src/failfast/test.rs | 6 +- linkerd/stack/src/gate.rs | 210 +++++++++++++++++++++++++++++ linkerd/stack/src/lib.rs | 4 +- linkerd/stack/src/queue.rs | 5 +- 6 files changed, 234 insertions(+), 167 deletions(-) delete mode 100644 linkerd/stack/src/failfast/gate.rs create mode 100644 linkerd/stack/src/gate.rs diff --git a/linkerd/stack/src/failfast.rs b/linkerd/stack/src/failfast.rs index 2dba1ce693..04396edb2e 100644 --- a/linkerd/stack/src/failfast.rs +++ b/linkerd/stack/src/failfast.rs @@ -1,13 +1,10 @@ //! A middleware that limits the amount of time the service may be not ready //! before requests are failed. -mod gate; #[cfg(test)] mod test; -pub use self::gate::Gate; - -use crate::layer; +use crate::{gate, layer}; use futures::{FutureExt, TryFuture}; use linkerd_error::Error; use pin_project::pin_project; @@ -15,15 +12,10 @@ use std::{ future::Future, mem, pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, task::{Context, Poll}, }; use thiserror::Error; use tokio::{ - sync::Notify, task, time::{self, Duration, Instant, Sleep}, }; @@ -48,7 +40,7 @@ pub struct FailFast { timeout: Duration, wait: Pin>, state: State, - shared: Arc, + gate: Option, } /// An error representing that an operation timed out. @@ -65,12 +57,6 @@ enum State { Invalid, } -#[derive(Debug)] -struct Shared { - notify: Notify, - in_failfast: AtomicBool, -} - #[pin_project(project = ResponseFutureProj)] pub enum ResponseFuture { Inner(#[pin] F), @@ -80,18 +66,9 @@ pub enum ResponseFuture { // === impl FailFast === impl FailFast { - /// Returns a layer for producing a `FailFast` without a paired [`Gate`]. + /// Returns a layer for producing a `FailFast` without a paired [`gate::Gate`]. pub fn layer(timeout: Duration) -> impl layer::Layer + Clone { - layer::mk(move |inner| { - Self::new( - timeout, - Arc::new(Shared { - notify: Notify::new(), - in_failfast: AtomicBool::new(false), - }), - inner, - ) - }) + layer::mk(move |inner| Self::new(timeout, None, inner)) } /// Returns a layer for producing a `FailFast` pair wrapping an inner @@ -104,22 +81,18 @@ impl FailFast { pub fn layer_gated( timeout: Duration, inner_layer: L, - ) -> impl layer::Layer> + Clone + ) -> impl layer::Layer> + Clone where L: layer::Layer + Clone, { layer::mk(move |inner| { - let shared = Arc::new(Shared { - notify: Notify::new(), - in_failfast: AtomicBool::new(false), - }); - let inner = Self::new(timeout, shared.clone(), inner); - let inner = inner_layer.layer(inner); - Gate::new(inner, shared) + let (tx, rx) = gate::channel(); + let inner = inner_layer.layer(Self::new(timeout, Some(tx), inner)); + gate::Gate::new(inner, rx) }) } - fn new(timeout: Duration, shared: Arc, inner: S) -> Self { + fn new(timeout: Duration, gate: Option, inner: S) -> Self { Self { timeout, // The sleep is reset whenever the service becomes unavailable; this @@ -127,7 +100,7 @@ impl FailFast { // now. wait: Box::pin(time::sleep(Duration::default())), state: State::Open(inner), - shared, + gate, } } } @@ -195,15 +168,19 @@ where } warn!("Service entering failfast after {:?}", self.timeout); - self.shared.in_failfast.store(true, Ordering::Release); + if let Some(gate) = self.gate.as_ref() { + gate.shut(); + } - let shared = self.shared.clone(); + let gate = self.gate.clone(); self.state = State::FailFast(task::spawn(async move { let res = inner.ready().await; // Notify the paired `Gate` instances to begin // advertising readiness so that the failfast // service can advance. - shared.exit_failfast(); + if let Some(gate) = gate { + gate.open(); + } match res { Ok(_) => { info!("Service has recovered"); @@ -285,15 +262,3 @@ where } } } - -// === impl Shared === - -impl Shared { - fn exit_failfast(&self) { - // The load part of this operation can be `Relaxed` because this task - // is the only place where the the value is ever set. - if self.in_failfast.swap(false, Ordering::Release) { - self.notify.notify_waiters(); - } - } -} diff --git a/linkerd/stack/src/failfast/gate.rs b/linkerd/stack/src/failfast/gate.rs deleted file mode 100644 index 85a1f9146a..0000000000 --- a/linkerd/stack/src/failfast/gate.rs +++ /dev/null @@ -1,107 +0,0 @@ -use super::Shared; -use futures::{ready, FutureExt}; -use std::{ - sync::{atomic::Ordering, Arc}, - task::{Context, Poll}, -}; -use tokio_util::sync::ReusableBoxFuture; -use tracing::trace; - -/// A middleware which, when paired with a [`FailFast`] middleware, advertises -/// the *actual* readiness state of the [`FailFast`]'s inner service up the -/// stack. -/// -/// A [`FailFast`]/[`Gate`] pair is primarily intended to be used in -/// conjunction with a `tower::Buffer`. By placing the [`FailFast`] middleware -/// inside of the `Buffer` and the `Gate` middleware outside of the buffer, -/// the buffer's queue can be proactively drained when the inner service enters -/// failfast, while the outer `Gate` middleware will continue to return -/// [`Poll::Pending`] from its `poll_ready` method. This can be used to fail any -/// requests that have already been dispatched to the inner service while it is in -/// failfast, while allowing a load balancer or other traffic distributor to -/// send any new requests to a different backend until this backend actually -/// becomes available. -/// -/// A `Layer`, such as a `Buffer` layer, may be wrapped in a new `Layer` which -/// produces a [`FailFast`]/[`Gate`] pair around the inner `Layer`'s -/// service using the [`FailFast::layer_gated`] function. -/// -/// [`FailFast`]: super::FailFast -/// [`FailFast::layer_gated`]: super::FailFast::layer_gated -#[derive(Debug)] -pub struct Gate { - inner: S, - shared: Arc, - - /// Are we currently waiting on a notification that the inner service has - /// exited failfast? - is_waiting: bool, - - /// Future awaiting a notification from the inner `FailFast` service. - waiting: ReusableBoxFuture<'static, ()>, -} - -// === impl Gate === - -impl Gate { - pub(super) fn new(inner: S, shared: Arc) -> Self { - Self { - inner, - shared, - is_waiting: false, - waiting: ReusableBoxFuture::new(async move { unreachable!() }), - } - } -} - -impl Clone for Gate -where - S: Clone, -{ - fn clone(&self) -> Self { - Self::new(self.inner.clone(), self.shared.clone()) - } -} - -impl tower::Service for Gate -where - S: tower::Service, -{ - type Response = S::Response; - type Error = S::Error; - type Future = S::Future; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - loop { - // Check the inner service's state. If it's not in failfast, use the - // inner service directly. - if !self.shared.in_failfast.load(Ordering::Acquire) { - trace!("service is not in failfast"); - self.is_waiting = false; - return self.inner.poll_ready(cx); - } - trace!("service in failfast"); - - // Ensure that this task is notified when the inner service exits - // failfast. Return pending until we are notified about a change. - if !self.is_waiting { - trace!("resetting watch"); - let shared = self.shared.clone(); - self.waiting.set(async move { - shared.notify.notified().await; - trace!("notified"); - }); - self.is_waiting = true; - } - trace!("waiting for service to become ready",); - ready!(self.waiting.poll_unpin(cx)); - trace!("service became ready"); - self.is_waiting = false; - } - } - - #[inline] - fn call(&mut self, req: T) -> Self::Future { - self.inner.call(req) - } -} diff --git a/linkerd/stack/src/failfast/test.rs b/linkerd/stack/src/failfast/test.rs index 8c9d98b0f6..7ff84bdc9c 100644 --- a/linkerd/stack/src/failfast/test.rs +++ b/linkerd/stack/src/failfast/test.rs @@ -10,11 +10,7 @@ async fn fails_fast() { let max_unavailable = Duration::from_millis(100); let (service, mut handle) = mock::pair::<(), ()>(); - let shared = Arc::new(Shared { - notify: tokio::sync::Notify::new(), - in_failfast: AtomicBool::new(false), - }); - let mut service = Spawn::new(FailFast::new(max_unavailable, shared, service)); + let mut service = Spawn::new(FailFast::new(max_unavailable, None, service)); // The inner starts unavailable. handle.allow(0); diff --git a/linkerd/stack/src/gate.rs b/linkerd/stack/src/gate.rs new file mode 100644 index 0000000000..140fc06053 --- /dev/null +++ b/linkerd/stack/src/gate.rs @@ -0,0 +1,210 @@ +use crate::Service; +use futures::{ready, FutureExt}; +use std::{ + sync::{atomic::AtomicBool, Arc}, + task::{Context, Poll}, +}; +use tokio::sync::Notify; +use tokio_util::sync::ReusableBoxFuture; +use tracing::debug; + +/// A middleware that alters its readiness state according to a gate channel. +pub struct Gate { + inner: S, + rx: Rx, + is_waiting: bool, + waiting: ReusableBoxFuture<'static, ()>, +} + +/// Observes gate state changes. +#[derive(Clone, Debug)] +pub struct Rx(Arc); + +/// Changes the gate state. +#[derive(Clone, Debug)] +pub struct Tx(Arc); + +#[derive(Debug)] +struct Shared { + open: AtomicBool, + notify: Notify, +} + +/// Creates a new gate channel. +pub fn channel() -> (Tx, Rx) { + let shared = Arc::new(Shared { + open: AtomicBool::new(true), + notify: Notify::new(), + }); + (Tx(shared.clone()), Rx(shared)) +} + +// === impl Rx === + +impl Rx { + /// Indicates whether the gate is open. + pub fn is_open(&self) -> bool { + self.0.open.load(std::sync::atomic::Ordering::Acquire) + } + + /// Indicates whether the gate is closed. + #[inline] + pub fn is_shut(&self) -> bool { + !self.is_open() + } + + /// Waits for the gate state to change. + pub async fn changed(&self) -> bool { + self.0.notify.notified().await; + self.is_open() + } +} + +// === impl Tx === + +impl Tx { + /// Opens the gate. + pub fn open(&self) { + if !self.0.open.swap(true, std::sync::atomic::Ordering::Release) { + debug!("Gate opened"); + self.0.notify.notify_waiters(); + } + } + + /// Closes the gate. + pub fn shut(&self) { + if self + .0 + .open + .swap(false, std::sync::atomic::Ordering::Release) + { + debug!("Gate shut"); + self.0.notify.notify_waiters(); + } + } +} + +// === impl Gate === + +impl Gate { + pub fn channel(inner: S) -> (Tx, Self) { + let (tx, rx) = channel(); + (tx, Self::new(inner, rx)) + } + + pub fn new(inner: S, rx: Rx) -> Self { + let (waiting, is_waiting) = if rx.is_open() { + let waiting = ReusableBoxFuture::new(async { unreachable!() }); + (waiting, false) + } else { + let Rx(rx) = rx.clone(); + let waiting = ReusableBoxFuture::new(async move { + rx.notify.notified().await; + }); + (waiting, true) + }; + + Self { + inner, + rx, + is_waiting, + waiting, + } + } +} + +impl Clone for Gate +where + S: Clone, +{ + fn clone(&self) -> Self { + Self::new(self.inner.clone(), self.rx.clone()) + } +} + +impl Service for Gate +where + S: Service, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // If the gate is shut, wait for it to open by storing a future that + // will complete when it opens. This ensures that waiters are notified. + if self.rx.is_shut() { + if !self.is_waiting { + let rx = self.rx.clone(); + self.waiting.set(async move { + rx.changed().await; + }); + self.is_waiting = true; + } + ready!(self.waiting.poll_unpin(cx)); + } + + debug_assert!(self.rx.is_open()); + self.is_waiting = false; + + // When the gate is open, poll the inner service. + self.inner.poll_ready(cx) + } + + #[inline] + fn call(&mut self, req: Req) -> Self::Future { + self.inner.call(req) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ServiceExt; + + #[tokio::test] + async fn gate() { + let (tx, rx) = channel(); + let (mut gate, mut handle) = + tower_test::mock::spawn_with::<(), (), _, _>(move |inner| Gate::new(inner, rx.clone())); + + handle.allow(0); + assert!(gate.poll_ready().is_pending()); + + tx.shut(); + assert!(gate.poll_ready().is_pending()); + + tx.open(); + assert!(gate.poll_ready().is_pending()); + + handle.allow(1); + assert!(gate.poll_ready().is_ready()); + } + + #[tokio::test] + async fn notifies_on_open() { + let (tx, rx) = channel(); + let (inner, mut handle) = tower_test::mock::pair::<(), ()>(); + let mut gate = Gate::new(inner, rx); + + // Start with a shut gate on an available inner service. + handle.allow(1); + tx.shut(); + + // Wait for the gated service to become ready. + let mut ready = gate.ready(); + tokio::select! { + biased; + _ = &mut ready => panic!("unexpected ready"), + _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {} + } + + // Open the gate and verify that the readiness future fires. + tx.open(); + tokio::select! { + biased; + _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {} + _ = ready => println!("notified"), + } + } +} diff --git a/linkerd/stack/src/lib.rs b/linkerd/stack/src/lib.rs index 26c370f678..b419d03c47 100644 --- a/linkerd/stack/src/lib.rs +++ b/linkerd/stack/src/lib.rs @@ -12,6 +12,7 @@ mod fail; mod fail_on_error; pub mod failfast; mod filter; +pub mod gate; pub mod layer; mod lazy; mod loadshed; @@ -37,8 +38,9 @@ pub use self::{ either::{Either, NewEither}, fail::Fail, fail_on_error::FailOnError, - failfast::{FailFast, FailFastError, Gate}, + failfast::{FailFast, FailFastError}, filter::{Filter, FilterLayer, Predicate}, + gate::Gate, lazy::{Lazy, NewLazy}, loadshed::{LoadShed, LoadShedError}, map_err::{MapErr, MapErrBoxed, NewMapErr, WrapErr}, diff --git a/linkerd/stack/src/queue.rs b/linkerd/stack/src/queue.rs index 077b01c666..a55009a4a4 100644 --- a/linkerd/stack/src/queue.rs +++ b/linkerd/stack/src/queue.rs @@ -1,5 +1,6 @@ use crate::{ - failfast::{self, FailFast}, + failfast::FailFast, + gate, layer::{self, Layer}, BoxService, ExtractParam, NewService, Service, }; @@ -33,7 +34,7 @@ pub struct NewQueueWithoutTimeout { _req: PhantomData, } -pub type Queue = failfast::Gate, Req>>; +pub type Queue = gate::Gate, Req>>; pub type QueueWithoutTimeout = Buffer, Req>; From 401fa5b5436933d21fb33c9b3d91dc5a319c27dd Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 28 Feb 2023 17:55:25 +0000 Subject: [PATCH 2/8] trace events --- linkerd/stack/src/gate.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/linkerd/stack/src/gate.rs b/linkerd/stack/src/gate.rs index 140fc06053..7f4e6a0221 100644 --- a/linkerd/stack/src/gate.rs +++ b/linkerd/stack/src/gate.rs @@ -6,7 +6,7 @@ use std::{ }; use tokio::sync::Notify; use tokio_util::sync::ReusableBoxFuture; -use tracing::debug; +use tracing::{debug, trace}; /// A middleware that alters its readiness state according to a gate channel. pub struct Gate { @@ -133,7 +133,8 @@ where fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // If the gate is shut, wait for it to open by storing a future that // will complete when it opens. This ensures that waiters are notified. - if self.rx.is_shut() { + while self.rx.is_shut() { + trace!(gate.open = false); if !self.is_waiting { let rx = self.rx.clone(); self.waiting.set(async move { @@ -146,6 +147,7 @@ where debug_assert!(self.rx.is_open()); self.is_waiting = false; + trace!(gate.open = true); // When the gate is open, poll the inner service. self.inner.poll_ready(cx) From f046ea9ab282df8e2eeaa1d8af9466ae99466aa9 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 28 Feb 2023 18:04:09 +0000 Subject: [PATCH 3/8] fix test error condition --- linkerd/stack/src/gate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkerd/stack/src/gate.rs b/linkerd/stack/src/gate.rs index 7f4e6a0221..0c5cc85b5b 100644 --- a/linkerd/stack/src/gate.rs +++ b/linkerd/stack/src/gate.rs @@ -205,7 +205,7 @@ mod tests { tx.open(); tokio::select! { biased; - _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {} + _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => panic!("timed out"), _ = ready => println!("notified"), } } From bc1c5545e54862dcb6d4af431b6ab4265565c330 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 28 Feb 2023 18:23:49 +0000 Subject: [PATCH 4/8] golf --- linkerd/stack/src/gate.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/linkerd/stack/src/gate.rs b/linkerd/stack/src/gate.rs index 0c5cc85b5b..96bd43c6a6 100644 --- a/linkerd/stack/src/gate.rs +++ b/linkerd/stack/src/gate.rs @@ -13,7 +13,7 @@ pub struct Gate { inner: S, rx: Rx, is_waiting: bool, - waiting: ReusableBoxFuture<'static, ()>, + waiting: ReusableBoxFuture<'static, bool>, } /// Observes gate state changes. @@ -94,13 +94,11 @@ impl Gate { pub fn new(inner: S, rx: Rx) -> Self { let (waiting, is_waiting) = if rx.is_open() { - let waiting = ReusableBoxFuture::new(async { unreachable!() }); + let waiting = ReusableBoxFuture::new(async { true }); (waiting, false) } else { - let Rx(rx) = rx.clone(); - let waiting = ReusableBoxFuture::new(async move { - rx.notify.notified().await; - }); + let rx = rx.clone(); + let waiting = ReusableBoxFuture::new(async move { rx.changed().await }); (waiting, true) }; @@ -137,9 +135,7 @@ where trace!(gate.open = false); if !self.is_waiting { let rx = self.rx.clone(); - self.waiting.set(async move { - rx.changed().await; - }); + self.waiting.set(async move { rx.changed().await }); self.is_waiting = true; } ready!(self.waiting.poll_unpin(cx)); From 6c7bffe5565d6accd10b0f48fc902a2bf2fd4bf6 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 28 Feb 2023 19:37:56 +0000 Subject: [PATCH 5/8] more gate tests --- linkerd/stack/src/gate.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/linkerd/stack/src/gate.rs b/linkerd/stack/src/gate.rs index 96bd43c6a6..24320ceaed 100644 --- a/linkerd/stack/src/gate.rs +++ b/linkerd/stack/src/gate.rs @@ -166,6 +166,20 @@ mod tests { let (mut gate, mut handle) = tower_test::mock::spawn_with::<(), (), _, _>(move |inner| Gate::new(inner, rx.clone())); + handle.allow(1); + tx.shut(); + assert!(gate.poll_ready().is_pending()); + + tx.open(); + assert!(gate.poll_ready().is_ready()); + } + + #[tokio::test] + async fn gate_polls_inner() { + let (tx, rx) = channel(); + let (mut gate, mut handle) = + tower_test::mock::spawn_with::<(), (), _, _>(move |inner| Gate::new(inner, rx.clone())); + handle.allow(0); assert!(gate.poll_ready().is_pending()); From c19ebde08a1dc256673ed842ccea55bde5d4d553 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 28 Feb 2023 19:51:39 +0000 Subject: [PATCH 6/8] fixup docstring --- linkerd/stack/src/queue.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkerd/stack/src/queue.rs b/linkerd/stack/src/queue.rs index a55009a4a4..2e9368f8d0 100644 --- a/linkerd/stack/src/queue.rs +++ b/linkerd/stack/src/queue.rs @@ -49,7 +49,7 @@ impl NewQueue { } } - /// Returns a [`Layer`] that constructs new [`failfast::Gate`]d [`Buffer`]s + /// Returns a [`Layer`] that constructs new [`gate::Gate`]d [`Buffer`]s /// using an `X`-typed [`ExtractParam`] implementation to extract /// [`Capacity`] and [`Timeout`] from a `T`-typed target. #[inline] From 5bebe1133dc2250bd3d862e96a2c4fb5b9d7bf12 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 28 Feb 2023 20:19:15 +0000 Subject: [PATCH 7/8] Add Tx::closed, tests --- linkerd/stack/src/gate.rs | 71 ++++++++++++++++++++++++++++++--------- 1 file changed, 55 insertions(+), 16 deletions(-) diff --git a/linkerd/stack/src/gate.rs b/linkerd/stack/src/gate.rs index 24320ceaed..31548344fb 100644 --- a/linkerd/stack/src/gate.rs +++ b/linkerd/stack/src/gate.rs @@ -18,7 +18,7 @@ pub struct Gate { /// Observes gate state changes. #[derive(Clone, Debug)] -pub struct Rx(Arc); +pub struct Rx(Arc, Arc<()>); /// Changes the gate state. #[derive(Clone, Debug)] @@ -28,6 +28,7 @@ pub struct Tx(Arc); struct Shared { open: AtomicBool, notify: Notify, + closed: Notify, } /// Creates a new gate channel. @@ -35,8 +36,9 @@ pub fn channel() -> (Tx, Rx) { let shared = Arc::new(Shared { open: AtomicBool::new(true), notify: Notify::new(), + closed: Notify::new(), }); - (Tx(shared.clone()), Rx(shared)) + (Tx(shared.clone()), Rx(shared, Arc::new(()))) } // === impl Rx === @@ -60,9 +62,23 @@ impl Rx { } } +impl Drop for Rx { + fn drop(&mut self) { + let Rx(_, handle) = self; + if Arc::strong_count(handle) == 1 { + self.0.closed.notify_waiters(); + } + } +} + // === impl Tx === impl Tx { + /// Returns when all associated `Rx` clones are dropped. + pub async fn closed(&self) { + self.0.closed.notified().await; + } + /// Opens the gate. pub fn open(&self) { if !self.0.open.swap(true, std::sync::atomic::Ordering::Release) { @@ -158,7 +174,6 @@ where #[cfg(test)] mod tests { use super::*; - use crate::ServiceExt; #[tokio::test] async fn gate() { @@ -196,27 +211,51 @@ mod tests { #[tokio::test] async fn notifies_on_open() { let (tx, rx) = channel(); - let (inner, mut handle) = tower_test::mock::pair::<(), ()>(); - let mut gate = Gate::new(inner, rx); + let (mut gate, mut handle) = + tower_test::mock::spawn_with::<(), (), _, _>(move |inner| Gate::new(inner, rx.clone())); // Start with a shut gate on an available inner service. handle.allow(1); tx.shut(); // Wait for the gated service to become ready. - let mut ready = gate.ready(); - tokio::select! { - biased; - _ = &mut ready => panic!("unexpected ready"), - _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {} - } + assert!(gate.poll_ready().is_pending()); // Open the gate and verify that the readiness future fires. tx.open(); - tokio::select! { - biased; - _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => panic!("timed out"), - _ = ready => println!("notified"), - } + assert!(gate.poll_ready().is_ready()); + } + + #[tokio::test] + async fn channel_closes() { + let (tx, rx) = channel(); + let mut closed = tokio_test::task::spawn(tx.closed()); + assert!(closed.poll().is_pending()); + drop(rx); + assert!(closed.poll().is_ready()); + } + + #[tokio::test] + async fn channel_closes_after_clones() { + let (tx, rx0) = channel(); + let mut closed = tokio_test::task::spawn(tx.closed()); + let rx1 = rx0.clone(); + assert!(closed.poll().is_pending()); + drop(rx0); + assert!(closed.poll().is_pending()); + drop(rx1); + assert!(closed.poll().is_ready()); + } + + #[tokio::test] + async fn channel_closes_after_clones_reordered() { + let (tx, rx0) = channel(); + let mut closed = tokio_test::task::spawn(tx.closed()); + let rx1 = rx0.clone(); + assert!(closed.poll().is_pending()); + drop(rx1); + assert!(closed.poll().is_pending()); + drop(rx0); + assert!(closed.poll().is_ready()); } } From 347213236b2069930da9a5b6cd9ce223eb5cddb5 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 28 Feb 2023 20:41:41 +0000 Subject: [PATCH 8/8] simplify more --- linkerd/stack/src/gate.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/linkerd/stack/src/gate.rs b/linkerd/stack/src/gate.rs index 31548344fb..f0ce490427 100644 --- a/linkerd/stack/src/gate.rs +++ b/linkerd/stack/src/gate.rs @@ -13,7 +13,7 @@ pub struct Gate { inner: S, rx: Rx, is_waiting: bool, - waiting: ReusableBoxFuture<'static, bool>, + waiting: ReusableBoxFuture<'static, ()>, } /// Observes gate state changes. @@ -60,6 +60,15 @@ impl Rx { self.0.notify.notified().await; self.is_open() } + + /// Waits for the gate state to change to be open. + pub async fn opened(&self) { + if self.is_open() { + return; + } + + while !self.changed().await {} + } } impl Drop for Rx { @@ -109,20 +118,11 @@ impl Gate { } pub fn new(inner: S, rx: Rx) -> Self { - let (waiting, is_waiting) = if rx.is_open() { - let waiting = ReusableBoxFuture::new(async { true }); - (waiting, false) - } else { - let rx = rx.clone(); - let waiting = ReusableBoxFuture::new(async move { rx.changed().await }); - (waiting, true) - }; - Self { inner, rx, - is_waiting, - waiting, + is_waiting: false, + waiting: ReusableBoxFuture::new(futures::future::pending()), } } } @@ -151,7 +151,7 @@ where trace!(gate.open = false); if !self.is_waiting { let rx = self.rx.clone(); - self.waiting.set(async move { rx.changed().await }); + self.waiting.set(async move { rx.opened().await }); self.is_waiting = true; } ready!(self.waiting.poll_unpin(cx));