From fe350d7c054f80fa3ec0a09c4a2b8ef82e6a9d72 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Wed, 4 Jan 2023 10:27:28 -0500 Subject: [PATCH 1/6] Add broadcast::Sender::len --- tokio/src/sync/broadcast.rs | 86 ++++++++++++++++++++++++++++++++++- tokio/tests/sync_broadcast.rs | 34 ++++++++++++++ 2 files changed, 118 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index ede990b046e..50ad5cef138 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -603,6 +603,88 @@ impl Sender { new_receiver(shared) } + /// Returns the number of queued values. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx1) = broadcast::channel(16); + /// let mut rx2 = tx.subscribe(); + /// + /// tx.send(10).unwrap(); + /// tx.send(20).unwrap(); + /// tx.send(30).unwrap(); + /// + /// assert_eq!(tx.len(), 3); + /// + /// rx1.recv().await.unwrap(); + /// + /// // The len is still 3 since rx2 hasn't seen the first value yet. + /// assert_eq!(tx.len(), 3); + /// + /// rx2.recv().await.unwrap(); + /// + /// assert_eq!(tx.len(), 2); + /// } + /// ``` + pub fn len(&self) -> usize { + let tail = self.shared.tail.lock(); + + let base_idx = (tail.pos & self.shared.mask as u64) as usize; + let mut low = 0; + let mut high = self.shared.buffer.len(); + while low < high { + let mid = low + (high - low) / 2; + let idx = base_idx.wrapping_add(mid) & self.shared.mask; + if self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0 { + low = mid + 1; + } else { + high = mid; + } + } + + self.shared.buffer.len() - low + } + + /// Returns true if there are no queued values. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx1) = broadcast::channel(16); + /// let mut rx2 = tx.subscribe(); + /// + /// assert!(tx.is_empty()); + /// + /// tx.send(10).unwrap(); + /// + /// assert!(!tx.is_empty()); + /// + /// rx1.recv().await.unwrap(); + /// + /// // The queue is still not empty since rx2 hasn't seen the value. + /// assert!(!tx.is_empty()); + /// + /// rx2.recv().await.unwrap(); + /// + /// assert!(tx.is_empty()); + /// } + /// ``` + pub fn is_empty(&self) -> bool { + let tail = self.shared.tail.lock(); + + let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize; + self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0 + } + /// Returns the number of active receivers /// /// An active receiver is a [`Receiver`] handle returned from [`channel`] or @@ -731,7 +813,7 @@ impl Receiver { /// assert_eq!(rx1.len(), 2); /// assert_eq!(rx1.recv().await.unwrap(), 10); /// assert_eq!(rx1.len(), 1); - /// assert_eq!(rx1.recv().await.unwrap(), 20); + /// assert_eq!(rx1.recv().await.unwrap(), 20); /// assert_eq!(rx1.len(), 0); /// } /// ``` @@ -761,7 +843,7 @@ impl Receiver { /// /// assert!(!rx1.is_empty()); /// assert_eq!(rx1.recv().await.unwrap(), 10); - /// assert_eq!(rx1.recv().await.unwrap(), 20); + /// assert_eq!(rx1.recv().await.unwrap(), 20); /// assert!(rx1.is_empty()); /// } /// ``` diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 9aa34841e26..34409d324b4 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -526,3 +526,37 @@ fn resubscribe_to_closed_channel() { let mut rx_resub = rx.resubscribe(); assert_closed!(rx_resub.try_recv()); } + +#[test] +fn sender_len() { + let (tx, mut rx1) = broadcast::channel(4); + let mut rx2 = tx.subscribe(); + + assert_eq!(tx.len(), 0); + assert!(tx.is_empty()); + + tx.send(1).unwrap(); + tx.send(2).unwrap(); + tx.send(3).unwrap(); + + assert_eq!(tx.len(), 3); + assert!(!tx.is_empty()); + + assert_recv!(rx1); + assert_recv!(rx1); + + assert_eq!(tx.len(), 3); + assert!(!tx.is_empty()); + + assert_recv!(rx2); + + assert_eq!(tx.len(), 2); + assert!(!tx.is_empty()); + + tx.send(4).unwrap(); + tx.send(5).unwrap(); + tx.send(6).unwrap(); + + assert_eq!(tx.len(), 4); + assert!(!tx.is_empty()); +} From 623f4dc4ebb2d9690889274600cb2f2912799605 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Wed, 11 Jan 2023 13:46:21 -0500 Subject: [PATCH 2/6] Add a randomized test for broadcast::Sender::len --- tokio/tests/sync_broadcast.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 34409d324b4..9eb2b1701b0 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -11,6 +11,7 @@ use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, }; +use rand::Rng; use std::sync::Arc; macro_rules! assert_recv { @@ -560,3 +561,26 @@ fn sender_len() { assert_eq!(tx.len(), 4); assert!(!tx.is_empty()); } + +#[test] +fn sender_len_random() { + let (tx, mut rx1) = broadcast::channel(16); + let mut rx2 = tx.subscribe(); + + for _ in 0..1000 { + match rand::thread_rng().gen_range(0..3) { + 0 => { + let _ = rx1.try_recv(); + } + 1 => { + let _ = rx2.try_recv(); + } + _ => { + tx.send(0).unwrap(); + } + } + + let expected_len = usize::min(usize::max(rx1.len(), rx2.len()), 16); + assert_eq!(tx.len(), expected_len); + } +} From c3eb08ac637e978e7d66954c02ff5afe123ec93e Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Wed, 11 Jan 2023 13:54:33 -0500 Subject: [PATCH 3/6] fix wasm build --- tokio/tests/sync_broadcast.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 9eb2b1701b0..0031695761b 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -11,6 +11,7 @@ use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, }; +#[cfg(any(not(tokio_wasm), tokio_wasi))] use rand::Rng; use std::sync::Arc; @@ -563,6 +564,7 @@ fn sender_len() { } #[test] +#[cfg(any(not(tokio_wasm), tokio_wasi))] fn sender_len_random() { let (tx, mut rx1) = broadcast::channel(16); let mut rx2 = tx.subscribe(); From d65f7c6496cc6bd108c4b0fcb2c03c6a10bfa86f Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Wed, 11 Jan 2023 14:03:57 -0500 Subject: [PATCH 4/6] less silly cfg --- tokio/tests/sync_broadcast.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 0031695761b..4f794d93925 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -11,7 +11,7 @@ use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, }; -#[cfg(any(not(tokio_wasm), tokio_wasi))] +#[cfg(not(tokio_wasm_not_wasi))] use rand::Rng; use std::sync::Arc; @@ -564,7 +564,7 @@ fn sender_len() { } #[test] -#[cfg(any(not(tokio_wasm), tokio_wasi))] +#[cfg(not(tokio_wasm_not_wasi))] fn sender_len_random() { let (tx, mut rx1) = broadcast::channel(16); let mut rx2 = tx.subscribe(); From 997e61825015d46a1412f12d45b0c0a9b16c967f Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Thu, 12 Jan 2023 09:43:10 -0500 Subject: [PATCH 5/6] review feedback --- tokio/src/sync/broadcast.rs | 9 +++++++++ tokio/tests/sync_broadcast.rs | 6 +++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 50ad5cef138..3001a7a3d7e 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -605,6 +605,15 @@ impl Sender { /// Returns the number of queued values. /// + /// A value is queued until it has either been seen by all receivers that were alive at the time + /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the + /// queue's capacity. + /// + /// # Note + /// + /// In contrast to [`Receiver::len`], this method only reports queued values but not values that + /// have been evicted from the queue before being seen by all receivers. + /// /// # Examples /// /// ``` diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 4f794d93925..67c378b84a6 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -11,8 +11,6 @@ use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, }; -#[cfg(not(tokio_wasm_not_wasi))] -use rand::Rng; use std::sync::Arc; macro_rules! assert_recv { @@ -566,11 +564,13 @@ fn sender_len() { #[test] #[cfg(not(tokio_wasm_not_wasi))] fn sender_len_random() { + use rand::Rng; + let (tx, mut rx1) = broadcast::channel(16); let mut rx2 = tx.subscribe(); for _ in 0..1000 { - match rand::thread_rng().gen_range(0..3) { + match rand::thread_rng().gen_range(0..4) { 0 => { let _ = rx1.try_recv(); } From 6d71d793f05adaa0a953b79318f8559ef57a2898 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Thu, 12 Jan 2023 09:46:02 -0500 Subject: [PATCH 6/6] grammar? --- tokio/src/sync/broadcast.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 3001a7a3d7e..1c6b2caa3bb 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -611,7 +611,7 @@ impl Sender { /// /// # Note /// - /// In contrast to [`Receiver::len`], this method only reports queued values but not values that + /// In contrast to [`Receiver::len`], this method only reports queued values and not values that /// have been evicted from the queue before being seen by all receivers. /// /// # Examples