Skip to content

Commit

Permalink
sync: add buffer_len method for Sender in bounded
Browse files Browse the repository at this point in the history
  • Loading branch information
blasrodri committed Aug 28, 2020
1 parent 8270774 commit 19990ae
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 0 deletions.
4 changes: 4 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,4 +546,8 @@ impl<T> Sender<T> {
false
}
}
/// TODO: Add docs
pub fn buffer_len(&self) -> usize {
self.chan.available_permits()
}
}
14 changes: 14 additions & 0 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ pub(crate) trait Semaphore {
fn forget(&self, permit: &mut Self::Permit);

fn close(&self);

fn available_permits(&self) -> usize;
}

struct Chan<T, S> {
Expand Down Expand Up @@ -199,6 +201,10 @@ where
pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
self.inner.try_send(value, &mut self.permit)
}

pub(crate) fn available_permits(&self) -> usize {
self.inner.semaphore.available_permits()
}
}

impl<T> Tx<T, (crate::sync::semaphore_ll::Semaphore, usize)> {
Expand Down Expand Up @@ -465,6 +471,10 @@ impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) {
fn close(&self) {
self.0.close();
}

fn available_permits(&self) -> usize {
self.0.available_permits()
}
}

// ===== impl Semaphore for AtomicUsize =====
Expand Down Expand Up @@ -528,4 +538,8 @@ impl Semaphore for AtomicUsize {
fn close(&self) {
self.fetch_or(1, Release);
}

fn available_permits(&self) -> usize {
self.load(Relaxed)
}
}
9 changes: 9 additions & 0 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,3 +534,12 @@ async fn blocking_send_async() {
let (mut tx, _rx) = mpsc::channel::<()>(1);
let _ = tx.blocking_send(());
}

#[tokio::test]
async fn buffer_len_bounded() {
let buffer_size = 10;
let (mut tx, _rx) = mpsc::channel(buffer_size);
tx.try_send(0).unwrap();
tx.try_send(0).unwrap();
assert_eq!(tx.buffer_len(), buffer_size - 2);
}

0 comments on commit 19990ae

Please sign in to comment.