From 195bda64f73584c1a67e45f8483a844b9200169f Mon Sep 17 00:00:00 2001 From: Alexey Gerasev Date: Sat, 3 Feb 2024 11:47:46 +0700 Subject: [PATCH] Move uninit slice, add pop to vec methods --- async/src/tests.rs | 23 +++++- async/src/traits/consumer.rs | 57 ++++++++++++++- async/src/traits/producer.rs | 2 +- blocking/src/tests.rs | 132 ++++++++++++++++++++++++----------- blocking/src/wrap/cons.rs | 24 ++++++- blocking/src/wrap/prod.rs | 2 +- src/traits/consumer.rs | 27 ++++--- src/utils.rs | 19 +++-- 8 files changed, 226 insertions(+), 60 deletions(-) diff --git a/async/src/tests.rs b/async/src/tests.rs index 2d099ca..14f9056 100644 --- a/async/src/tests.rs +++ b/async/src/tests.rs @@ -71,18 +71,37 @@ fn push_pop_slice() { async move { let mut prod = prod; let data = (0..COUNT).collect::>(); - prod.push_slice_all(&data).await.unwrap(); + prod.push_exact(&data).await.unwrap(); }, async move { let mut cons = cons; let mut data = [0; COUNT + 1]; - let count = cons.pop_slice_all(&mut data).await.unwrap_err(); + let count = cons.pop_exact(&mut data).await.unwrap_err(); assert_eq!(count, COUNT); assert!(data.into_iter().take(COUNT).eq(0..COUNT)); }, ); } +#[test] +fn push_pop_vec() { + let (prod, cons) = AsyncHeapRb::::new(3).split(); + execute!( + async move { + let mut prod = prod; + let data = (0..COUNT).collect::>(); + prod.push_exact(&data).await.unwrap(); + }, + async move { + let mut cons = cons; + let mut data = Vec::new(); + cons.pop_until_end(&mut data).await; + assert_eq!(data.len(), COUNT); + assert!(data.into_iter().eq(0..COUNT)); + }, + ); +} + #[test] fn sink_stream() { use futures::{ diff --git a/async/src/traits/consumer.rs b/async/src/traits/consumer.rs index 5e2abdf..f3ca8f3 100644 --- a/async/src/traits/consumer.rs +++ b/async/src/traits/consumer.rs @@ -43,7 +43,7 @@ pub trait AsyncConsumer: Consumer { /// Future returns: /// + `Ok` - the whole slice is filled with the items from the buffer. /// + `Err(count)` - the buffer is empty and the corresponding producer was dropped, number of items copied to slice is returned. - fn pop_slice_all<'a: 'b, 'b>(&'a mut self, slice: &'b mut [Self::Item]) -> PopSliceFuture<'a, 'b, Self> + fn pop_exact<'a: 'b, 'b>(&'a mut self, slice: &'b mut [Self::Item]) -> PopSliceFuture<'a, 'b, Self> where Self::Item: Copy, { @@ -54,6 +54,14 @@ pub trait AsyncConsumer: Consumer { } } + #[cfg(feature = "alloc")] + fn pop_until_end<'a: 'b, 'b>(&'a mut self, vec: &'b mut alloc::vec::Vec) -> PopVecFuture<'a, 'b, Self> { + PopVecFuture { + owner: self, + vec: Some(vec), + } + } + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> where Self: Unpin, @@ -177,6 +185,53 @@ where } } +#[cfg(feature = "alloc")] +pub struct PopVecFuture<'a, 'b, A: AsyncConsumer + ?Sized> { + owner: &'a mut A, + vec: Option<&'b mut alloc::vec::Vec>, +} +#[cfg(feature = "alloc")] +impl<'a, 'b, A: AsyncConsumer> Unpin for PopVecFuture<'a, 'b, A> {} +#[cfg(feature = "alloc")] +impl<'a, 'b, A: AsyncConsumer> FusedFuture for PopVecFuture<'a, 'b, A> { + fn is_terminated(&self) -> bool { + self.vec.is_none() + } +} +#[cfg(feature = "alloc")] +impl<'a, 'b, A: AsyncConsumer> Future for PopVecFuture<'a, 'b, A> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut waker_registered = false; + loop { + let closed = self.owner.is_closed(); + let vec = self.vec.take().unwrap(); + + loop { + if vec.len() == vec.capacity() { + vec.reserve(vec.capacity().max(16)); + } + let n = self.owner.pop_slice_uninit(vec.spare_capacity_mut()); + if n == 0 { + break; + } + unsafe { vec.set_len(vec.len() + n) }; + } + + if closed { + break Poll::Ready(()); + } + self.vec.replace(vec); + if waker_registered { + break Poll::Pending; + } + self.owner.register_waker(cx.waker()); + waker_registered = true; + } + } +} + pub struct WaitOccupiedFuture<'a, A: AsyncConsumer + ?Sized> { owner: &'a A, count: usize, diff --git a/async/src/traits/producer.rs b/async/src/traits/producer.rs index dbf17a3..29a2732 100644 --- a/async/src/traits/producer.rs +++ b/async/src/traits/producer.rs @@ -59,7 +59,7 @@ pub trait AsyncProducer: Producer { /// Future returns: /// + `Ok` - all slice contents are copied. /// + `Err(count)` - the corresponding consumer was dropped, number of copied items returned. - fn push_slice_all<'a: 'b, 'b>(&'a mut self, slice: &'b [Self::Item]) -> PushSliceFuture<'a, 'b, Self> + fn push_exact<'a: 'b, 'b>(&'a mut self, slice: &'b [Self::Item]) -> PushSliceFuture<'a, 'b, Self> where Self::Item: Copy, { diff --git a/blocking/src/tests.rs b/blocking/src/tests.rs index abf2176..960f1c3 100644 --- a/blocking/src/tests.rs +++ b/blocking/src/tests.rs @@ -1,6 +1,7 @@ use crate::{traits::*, wrap::WaitError, BlockingHeapRb}; use std::{ io::{Read, Write}, + sync::Arc, thread, time::Duration, vec, @@ -17,6 +18,7 @@ This book fully embraces the potential of Rust to empower its users. It's a frie - Nicholas Matsakis and Aaron Turon "; +const N_REP: usize = 10; const TIMEOUT: Option = Some(Duration::from_millis(1000)); @@ -26,16 +28,19 @@ fn wait() { let rb = BlockingHeapRb::::new(7); let (mut prod, mut cons) = rb.split(); - let smsg = THE_BOOK_FOREWORD; - - let pjh = thread::spawn(move || { - let mut bytes = smsg; - prod.set_timeout(TIMEOUT); - while !bytes.is_empty() { - assert_eq!(prod.wait_vacant(1), Ok(())); - let n = prod.push_slice(bytes); - assert!(n > 0); - bytes = &bytes[n..bytes.len()] + let smsg = Arc::new(THE_BOOK_FOREWORD.repeat(N_REP)); + + let pjh = thread::spawn({ + let smsg = smsg.clone(); + move || { + let mut bytes = smsg.as_slice(); + prod.set_timeout(TIMEOUT); + while !bytes.is_empty() { + assert_eq!(prod.wait_vacant(1), Ok(())); + let n = prod.push_slice(bytes); + assert!(n > 0); + bytes = &bytes[n..bytes.len()] + } } }); @@ -59,7 +64,7 @@ fn wait() { pjh.join().unwrap(); let rmsg = cjh.join().unwrap(); - assert_eq!(smsg, rmsg); + assert_eq!(*smsg, rmsg); } #[test] @@ -68,25 +73,65 @@ fn slice_all() { let rb = BlockingHeapRb::::new(7); let (mut prod, mut cons) = rb.split(); - let smsg = THE_BOOK_FOREWORD; + let smsg = Arc::new(THE_BOOK_FOREWORD.repeat(N_REP)); - let pjh = thread::spawn(move || { - let bytes = smsg; - prod.set_timeout(TIMEOUT); - assert_eq!(prod.push_all_slice(bytes), bytes.len()); + let pjh = thread::spawn({ + let smsg = smsg.clone(); + move || { + let bytes = smsg; + prod.set_timeout(TIMEOUT); + assert_eq!(prod.push_exact(&bytes), bytes.len()); + } }); - let cjh = thread::spawn(move || { - let mut bytes = vec![0u8; smsg.len()]; - cons.set_timeout(TIMEOUT); - assert_eq!(cons.pop_all_slice(&mut bytes), bytes.len()); - bytes + let cjh = thread::spawn({ + let smsg = smsg.clone(); + move || { + let mut bytes = vec![0u8; smsg.len()]; + cons.set_timeout(TIMEOUT); + assert_eq!(cons.pop_exact(&mut bytes), bytes.len()); + bytes + } }); pjh.join().unwrap(); let rmsg = cjh.join().unwrap(); - assert_eq!(smsg, rmsg); + assert_eq!(*smsg, rmsg); +} + +#[test] +#[cfg_attr(miri, ignore)] +fn vec_all() { + let rb = BlockingHeapRb::::new(7); + let (mut prod, mut cons) = rb.split(); + + let smsg = Arc::new(THE_BOOK_FOREWORD.repeat(N_REP)); + + let pjh = thread::spawn({ + let smsg = smsg.clone(); + move || { + let bytes = smsg; + prod.set_timeout(TIMEOUT); + assert_eq!(prod.push_exact(&bytes), bytes.len()); + } + }); + + let cjh = thread::spawn({ + let smsg = smsg.clone(); + move || { + let mut bytes = Vec::new(); + cons.set_timeout(TIMEOUT); + cons.pop_until_end(&mut bytes); + assert_eq!(bytes.len(), smsg.len()); + bytes + } + }); + + pjh.join().unwrap(); + let rmsg = cjh.join().unwrap(); + + assert_eq!(*smsg, rmsg); } #[test] @@ -95,12 +140,15 @@ fn iter_all() { let rb = BlockingHeapRb::::new(7); let (mut prod, mut cons) = rb.split(); - let smsg = THE_BOOK_FOREWORD; + let smsg = Arc::new(THE_BOOK_FOREWORD.repeat(N_REP)); - let pjh = thread::spawn(move || { - prod.set_timeout(TIMEOUT); - let bytes = smsg; - assert_eq!(prod.push_all_iter(bytes.iter().copied()), bytes.len()); + let pjh = thread::spawn({ + let smsg = smsg.clone(); + move || { + prod.set_timeout(TIMEOUT); + let bytes = smsg; + assert_eq!(prod.push_all_iter(bytes.iter().copied()), bytes.len()); + } }); let cjh = thread::spawn(move || { @@ -111,7 +159,7 @@ fn iter_all() { pjh.join().unwrap(); let rmsg = cjh.join().unwrap(); - assert_eq!(smsg, rmsg); + assert_eq!(*smsg, rmsg); } #[test] @@ -120,23 +168,29 @@ fn write_read() { let rb = BlockingHeapRb::::new(7); let (mut prod, mut cons) = rb.split(); - let smsg = THE_BOOK_FOREWORD; + let smsg = Arc::new(THE_BOOK_FOREWORD.repeat(N_REP)); - let pjh = thread::spawn(move || { - prod.set_timeout(TIMEOUT); - let bytes = smsg; - prod.write_all(bytes).unwrap(); + let pjh = thread::spawn({ + let smsg = smsg.clone(); + move || { + prod.set_timeout(TIMEOUT); + let bytes = smsg; + prod.write_all(&bytes).unwrap(); + } }); - let cjh = thread::spawn(move || { - cons.set_timeout(TIMEOUT); - let mut bytes = Vec::new(); - assert_eq!(cons.read_to_end(&mut bytes).unwrap(), smsg.len()); - bytes + let cjh = thread::spawn({ + let smsg = smsg.clone(); + move || { + cons.set_timeout(TIMEOUT); + let mut bytes = Vec::new(); + assert_eq!(cons.read_to_end(&mut bytes).unwrap(), smsg.len()); + bytes + } }); pjh.join().unwrap(); let rmsg = cjh.join().unwrap(); - assert_eq!(smsg, rmsg); + assert_eq!(*smsg, rmsg); } diff --git a/blocking/src/wrap/cons.rs b/blocking/src/wrap/cons.rs index 42eb25e..d559b92 100644 --- a/blocking/src/wrap/cons.rs +++ b/blocking/src/wrap/cons.rs @@ -67,7 +67,7 @@ impl BlockingCons where ::Item: Copy, { - pub fn pop_all_slice(&mut self, mut slice: &mut [::Item]) -> usize { + pub fn pop_exact(&mut self, mut slice: &mut [::Item]) -> usize { if slice.is_empty() { return 0; } @@ -83,6 +83,28 @@ where } count } + + #[cfg(feature = "alloc")] + pub fn pop_until_end(&mut self, vec: &mut alloc::vec::Vec<::Item>) { + if self.is_closed() && self.is_empty() { + return; + } + for _ in wait_iter!(self) { + loop { + if vec.len() == vec.capacity() { + vec.reserve(vec.capacity().max(16)); + } + let n = self.base.pop_slice_uninit(vec.spare_capacity_mut()); + if n == 0 { + break; + } + unsafe { vec.set_len(vec.len() + n) }; + } + if self.is_closed() && self.is_empty() { + break; + } + } + } } #[cfg(feature = "std")] diff --git a/blocking/src/wrap/prod.rs b/blocking/src/wrap/prod.rs index c54ab78..7fec66d 100644 --- a/blocking/src/wrap/prod.rs +++ b/blocking/src/wrap/prod.rs @@ -83,7 +83,7 @@ impl BlockingProd where ::Item: Copy, { - pub fn push_all_slice(&mut self, mut slice: &[::Item]) -> usize { + pub fn push_exact(&mut self, mut slice: &[::Item]) -> usize { if slice.is_empty() { return 0; } diff --git a/src/traits/consumer.rs b/src/traits/consumer.rs index 7e0be92..89ed088 100644 --- a/src/traits/consumer.rs +++ b/src/traits/consumer.rs @@ -2,7 +2,7 @@ use super::{ observer::{DelegateObserver, Observer}, utils::modulus, }; -use crate::utils::{slice_assume_init_mut, slice_assume_init_ref, write_uninit_slice}; +use crate::utils::{move_uninit_slice, slice_as_uninit_mut, slice_assume_init_mut, slice_assume_init_ref}; use core::{iter::Chain, marker::PhantomData, mem::MaybeUninit, ptr, slice}; #[cfg(feature = "std")] use std::io::{self, Write}; @@ -121,26 +121,23 @@ pub trait Consumer: Observer { } } - /// Removes items from the ring buffer and writes them into a slice. + /// Removes items from the ring buffer and writes them into an uninit slice. /// /// Returns count of items been removed. - fn pop_slice(&mut self, elems: &mut [Self::Item]) -> usize - where - Self::Item: Copy, - { + fn pop_slice_uninit(&mut self, elems: &mut [MaybeUninit]) -> usize { let (left, right) = self.occupied_slices(); let count = if elems.len() < left.len() { - unsafe { write_uninit_slice(elems, left.get_unchecked(..elems.len())) }; + move_uninit_slice(elems, unsafe { left.get_unchecked(..elems.len()) }); elems.len() } else { let (left_elems, elems) = elems.split_at_mut(left.len()); - unsafe { write_uninit_slice(left_elems, left) }; + move_uninit_slice(left_elems, left); left.len() + if elems.len() < right.len() { - unsafe { write_uninit_slice(elems, right.get_unchecked(..elems.len())) }; + move_uninit_slice(elems, unsafe { right.get_unchecked(..elems.len()) }); elems.len() } else { - unsafe { write_uninit_slice(elems.get_unchecked_mut(..right.len()), right) }; + move_uninit_slice(unsafe { elems.get_unchecked_mut(..right.len()) }, right); right.len() } }; @@ -148,6 +145,16 @@ pub trait Consumer: Observer { count } + /// Removes items from the ring buffer and writes them into a slice. + /// + /// Returns count of items been removed. + fn pop_slice(&mut self, elems: &mut [Self::Item]) -> usize + where + Self::Item: Copy, + { + self.pop_slice_uninit(unsafe { slice_as_uninit_mut(elems) }) + } + /// Returns an iterator that removes items one by one from the ring buffer. fn pop_iter(&mut self) -> PopIter<&mut Self, Self> where diff --git a/src/utils.rs b/src/utils.rs index 68645fd..e70e05b 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,17 +1,24 @@ #[cfg(feature = "alloc")] use alloc::{boxed::Box, vec::Vec}; -use core::mem::{self, MaybeUninit}; +use core::{ + mem::{self, MaybeUninit}, + ptr, +}; // TODO: Remove on `maybe_uninit_uninit_array` stabilization. pub fn uninit_array() -> [MaybeUninit; N] { unsafe { MaybeUninit::<[MaybeUninit; N]>::uninit().assume_init() } } +// TODO: Remove on `maybe_uninit_slice` stabilization. +pub unsafe fn slice_as_uninit_mut(slice: &mut [T]) -> &mut [MaybeUninit] { + &mut *(slice as *mut [T] as *mut [MaybeUninit]) +} + // TODO: Remove on `maybe_uninit_slice` stabilization. pub unsafe fn slice_assume_init_ref(slice: &[MaybeUninit]) -> &[T] { &*(slice as *const [MaybeUninit] as *const [T]) } - // TODO: Remove on `maybe_uninit_slice` stabilization. pub unsafe fn slice_assume_init_mut(slice: &mut [MaybeUninit]) -> &mut [T] { &mut *(slice as *mut [MaybeUninit] as *mut [T]) @@ -24,9 +31,11 @@ pub fn write_slice<'a, T: Copy>(dst: &'a mut [MaybeUninit], src: &[T]) -> &'a unsafe { slice_assume_init_mut(dst) } } -pub unsafe fn write_uninit_slice<'a, T: Copy>(dst: &'a mut [T], src: &[MaybeUninit]) -> &'a mut [T] { - dst.copy_from_slice(slice_assume_init_ref(src)); - dst +pub fn move_uninit_slice(dst: &mut [MaybeUninit], src: &[MaybeUninit]) { + assert_eq!(dst.len(), src.len()); + for i in 0..dst.len() { + unsafe { *dst.get_unchecked_mut(i) = ptr::read(src.get_unchecked(i) as *const _) }; + } } pub fn array_to_uninit(value: [T; N]) -> [MaybeUninit; N] {