From 63e5fb3c265a4ac8d5ad14f3ba401dd196cac422 Mon Sep 17 00:00:00 2001 From: Gabriel Grubba <70247653+Grubba27@users.noreply.github.com> Date: Wed, 2 Feb 2022 18:51:23 -0300 Subject: [PATCH 01/17] (WIP): Draft of PR --- tokio-test/tests/io.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tokio-test/tests/io.rs b/tokio-test/tests/io.rs index f164abaf1ba..c3412259f5f 100644 --- a/tokio-test/tests/io.rs +++ b/tokio-test/tests/io.rs @@ -84,3 +84,20 @@ async fn mock_panics_write_data_left() { use tokio_test::io::Builder; Builder::new().write(b"write").build(); } + +#[tokio::test] +async fn stream_read(){ + + let mut mock = Builder::new().read(b"hello ").read(b"world!").build(); + // let res = mock.poll_next(); or something like this ? + assert_eq!(res, b"hello "); +} + +#[tokio::test] +async fn stream_write(){ + let unit: u8 = 1; + let mut mock = Builder::new().write(b"hello ").write(b"world!").build(); + // let mut task = get task ? + // mock.poll_write(task, unit ).assert(b"hello " or something like this ? + // +} \ No newline at end of file From 6864246ca1e9ae336b667dad3f9936665690f4af Mon Sep 17 00:00:00 2001 From: Gabriel Grubba <70247653+Grubba27@users.noreply.github.com> Date: Wed, 2 Feb 2022 18:59:55 -0300 Subject: [PATCH 02/17] Added stream builder --- tokio-test/src/io.rs | 80 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/tokio-test/src/io.rs b/tokio-test/src/io.rs index 4ec66a45d1a..9bcab139436 100644 --- a/tokio-test/src/io.rs +++ b/tokio-test/src/io.rs @@ -53,6 +53,11 @@ pub struct Builder { // Sequence of actions for the Mock to take actions: VecDeque, } +#[derive(Debug, Clone, Default)] +pub struct StreamBuilder { + // Sequence of actions for the Mock to take + actions: VecDeque, +} #[derive(Debug, Clone)] enum Action { @@ -143,6 +148,81 @@ impl Builder { } } + +impl StreamBuilder { + /// Return a new, empty `Builder. + pub fn new() -> Self { + Self::default() + } + + /// call next value ? + pub fn poll_next(&mut self, buf: &[u8]) { + /// TODO + } + + /// Sequence a `read` operation. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `buf`. + pub fn read(&mut self, buf: &[u8]) -> &mut Self { + self.actions.push_back(Action::Read(buf.into())); + self + } + + /// Sequence a `read` operation that produces an error. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `error`. + pub fn read_error(&mut self, error: io::Error) -> &mut Self { + let error = Some(error.into()); + self.actions.push_back(Action::ReadError(error)); + self + } + + /// Sequence a `write` operation. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call. + pub fn write(&mut self, buf: &[u8]) -> &mut Self { + self.actions.push_back(Action::Write(buf.into())); + self + } + + /// Sequence a `write` operation that produces an error. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call that provides `error`. + pub fn write_error(&mut self, error: io::Error) -> &mut Self { + let error = Some(error.into()); + self.actions.push_back(Action::WriteError(error)); + self + } + + /// Sequence a wait. + /// + /// The next operation in the mock's script will be to wait without doing so + /// for `duration` amount of time. + pub fn wait(&mut self, duration: Duration) -> &mut Self { + let duration = cmp::max(duration, Duration::from_millis(1)); + self.actions.push_back(Action::Wait(duration)); + self + } + + /// Build a `Mock` value according to the defined script. + pub fn build(&mut self) -> Mock { + let (mock, _) = self.build_with_handle(); + mock + } + + /// Build a `Mock` value paired with a handle + pub fn build_with_handle(&mut self) -> (Mock, Handle) { + let (inner, handle) = Inner::new(self.actions.clone()); + + let mock = Mock { inner }; + + (mock, handle) + } +} impl Handle { /// Sequence a `read` operation. /// From 13d740d01fef71c6bf1af224861872f59c55ed5d Mon Sep 17 00:00:00 2001 From: Gabriel Grubba <70247653+Grubba27@users.noreply.github.com> Date: Mon, 7 Feb 2022 22:33:40 -0300 Subject: [PATCH 03/17] started adjusting and addressing comments --- tokio-test/src/io.rs | 75 -------- tokio-test/src/io_stream.rs | 374 ++++++++++++++++++++++++++++++++++++ tokio-test/src/lib.rs | 1 + tokio-test/tests/io.rs | 5 +- 4 files changed, 378 insertions(+), 77 deletions(-) create mode 100644 tokio-test/src/io_stream.rs diff --git a/tokio-test/src/io.rs b/tokio-test/src/io.rs index 9bcab139436..1ffdc18a4c2 100644 --- a/tokio-test/src/io.rs +++ b/tokio-test/src/io.rs @@ -148,81 +148,6 @@ impl Builder { } } - -impl StreamBuilder { - /// Return a new, empty `Builder. - pub fn new() -> Self { - Self::default() - } - - /// call next value ? - pub fn poll_next(&mut self, buf: &[u8]) { - /// TODO - } - - /// Sequence a `read` operation. - /// - /// The next operation in the mock's script will be to expect a `read` call - /// and return `buf`. - pub fn read(&mut self, buf: &[u8]) -> &mut Self { - self.actions.push_back(Action::Read(buf.into())); - self - } - - /// Sequence a `read` operation that produces an error. - /// - /// The next operation in the mock's script will be to expect a `read` call - /// and return `error`. - pub fn read_error(&mut self, error: io::Error) -> &mut Self { - let error = Some(error.into()); - self.actions.push_back(Action::ReadError(error)); - self - } - - /// Sequence a `write` operation. - /// - /// The next operation in the mock's script will be to expect a `write` - /// call. - pub fn write(&mut self, buf: &[u8]) -> &mut Self { - self.actions.push_back(Action::Write(buf.into())); - self - } - - /// Sequence a `write` operation that produces an error. - /// - /// The next operation in the mock's script will be to expect a `write` - /// call that provides `error`. - pub fn write_error(&mut self, error: io::Error) -> &mut Self { - let error = Some(error.into()); - self.actions.push_back(Action::WriteError(error)); - self - } - - /// Sequence a wait. - /// - /// The next operation in the mock's script will be to wait without doing so - /// for `duration` amount of time. - pub fn wait(&mut self, duration: Duration) -> &mut Self { - let duration = cmp::max(duration, Duration::from_millis(1)); - self.actions.push_back(Action::Wait(duration)); - self - } - - /// Build a `Mock` value according to the defined script. - pub fn build(&mut self) -> Mock { - let (mock, _) = self.build_with_handle(); - mock - } - - /// Build a `Mock` value paired with a handle - pub fn build_with_handle(&mut self) -> (Mock, Handle) { - let (inner, handle) = Inner::new(self.actions.clone()); - - let mock = Mock { inner }; - - (mock, handle) - } -} impl Handle { /// Sequence a `read` operation. /// diff --git a/tokio-test/src/io_stream.rs b/tokio-test/src/io_stream.rs new file mode 100644 index 00000000000..58cfa7f08a8 --- /dev/null +++ b/tokio-test/src/io_stream.rs @@ -0,0 +1,374 @@ +#![cfg(not(loom))] + +//! A mock type implementing [`AsyncRead`] and [`AsyncWrite`]. +//! +//! +//! # Overview +//! +//! Provides a type that implements [`AsyncRead`] + [`AsyncWrite`] that can be configured +//! to handle an arbitrary sequence of read and write operations. This is useful +//! for writing unit tests for networking services as using an actual network +//! type is fairly non deterministic. +//! +//! # Usage +//! +//! Attempting to write data that the mock isn't expecting will result in a +//! panic. +//! +//! [`AsyncRead`]: tokio::io::AsyncRead +//! [`AsyncWrite`]: tokio::io::AsyncWrite + +use tokio::io::{ReadBuf}; +use tokio::sync::mpsc; +use tokio::time::{self, Duration, Instant, Sleep}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use futures_core::{ready, Stream}; +use std::collections::VecDeque; +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{self, Poll, Waker}; +use std::{cmp, io}; + +/// An I/O object that follows a predefined script. +/// +/// This value is created by `Builder` and implements `AsyncRead` + `AsyncWrite`. It +/// follows the scenario described by the builder and panics otherwise. +#[derive(Debug)] +pub struct Mock { + inner: Inner, +} + +/// A handle to send additional actions to the related `Mock`. +#[derive(Debug)] +pub struct Handle { + tx: mpsc::UnboundedSender, +} + +/// Builds `Mock` instances. +#[derive(Debug, Clone, Default)] +pub struct Builder { + // Sequence of actions for the Mock to take + actions: VecDeque, +} +#[derive(Debug, Clone, Default)] +pub struct StreamBuilder { + // Sequence of actions for the Mock to take + actions: VecDeque, +} + +#[derive(Debug, Clone)] +enum Action { + Read(Vec), + Write(Vec), + Wait(Duration), + // Wrapped in Arc so that Builder can be cloned and Send. + // Mock is not cloned as does not need to check Rc for ref counts. + ReadError(Option>), + WriteError(Option>), +} + +struct Inner { + actions: VecDeque, + waiting: Option, + sleep: Option>>, + read_wait: Option, + rx: UnboundedReceiverStream, +} + +impl StreamBuilder { + /// Return a new, empty `Builder. + pub fn new() -> Self { + Self::default() + } + + /// Sequence a `read` operation. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `buf`. + pub fn read(&mut self, buf: &[u8]) -> &mut Self { + self.actions.push_back(Action::Read(buf.into())); + self + } + + /// Sequence a `read` operation that produces an error. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `error`. + pub fn read_error(&mut self, error: io::Error) -> &mut Self { + let error = Some(error.into()); + self.actions.push_back(Action::ReadError(error)); + self + } + + /// Sequence a `write` operation. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call. + pub fn write(&mut self, buf: &[u8]) -> &mut Self { + self.actions.push_back(Action::Write(buf.into())); + self + } + + /// Sequence a `write` operation that produces an error. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call that provides `error`. + pub fn write_error(&mut self, error: io::Error) -> &mut Self { + let error = Some(error.into()); + self.actions.push_back(Action::WriteError(error)); + self + } + + /// Sequence a wait. + /// + /// The next operation in the mock's script will be to wait without doing so + /// for `duration` amount of time. + pub fn wait(&mut self, duration: Duration) -> &mut Self { + let duration = cmp::max(duration, Duration::from_millis(1)); + self.actions.push_back(Action::Wait(duration)); + self + } + + /// Build a `Mock` value according to the defined script. + pub fn build(&mut self) -> Mock { + let (mock, _) = self.build_with_handle(); + mock + } + + /// Build a `Mock` value paired with a handle + pub fn build_with_handle(&mut self) -> (Mock, Handle) { + let (inner, handle) = Inner::new(self.actions.clone()); + + let mock = Mock { inner }; + + (mock, handle) + } +} + + + +impl Handle { + /// Sequence a `read` operation. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `buf`. + pub fn read(&mut self, buf: &[u8]) -> &mut Self { + self.tx.send(Action::Read(buf.into())).unwrap(); + self + } + + /// Sequence a `read` operation error. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `error`. + pub fn read_error(&mut self, error: io::Error) -> &mut Self { + let error = Some(error.into()); + self.tx.send(Action::ReadError(error)).unwrap(); + self + } + + /// Sequence a `write` operation. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call. + pub fn write(&mut self, buf: &[u8]) -> &mut Self { + self.tx.send(Action::Write(buf.into())).unwrap(); + self + } + + /// Sequence a `write` operation error. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call error. + pub fn write_error(&mut self, error: io::Error) -> &mut Self { + let error = Some(error.into()); + self.tx.send(Action::WriteError(error)).unwrap(); + self + } +} + +impl Inner { + fn new(actions: VecDeque) -> (Inner, Handle) { + let (tx, rx) = mpsc::unbounded_channel(); + + let rx = UnboundedReceiverStream::new(rx); + + let inner = Inner { + actions, + sleep: None, + read_wait: None, + rx, + waiting: None, + }; + + let handle = Handle { tx }; + + (inner, handle) + } + + fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll> { + Pin::new(&mut self.rx).poll_next(cx) + } + + fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> { + match self.action() { + Some(&mut Action::Read(ref mut data)) => { + // Figure out how much to copy + let n = cmp::min(dst.remaining(), data.len()); + + // Copy the data into the `dst` slice + dst.put_slice(&data[..n]); + + // Drain the data from the source + data.drain(..n); + + Ok(()) + } + Some(&mut Action::ReadError(ref mut err)) => { + // As the + let err = err.take().expect("Should have been removed from actions."); + let err = Arc::try_unwrap(err).expect("There are no other references."); + Err(err) + } + Some(_) => { + // Either waiting or expecting a write + Err(io::ErrorKind::WouldBlock.into()) + } + None => Ok(()), + } + } + + fn write(&mut self, mut src: &[u8]) -> io::Result { + let mut ret = 0; + + if self.actions.is_empty() { + return Err(io::ErrorKind::BrokenPipe.into()); + } + + if let Some(&mut Action::Wait(..)) = self.action() { + return Err(io::ErrorKind::WouldBlock.into()); + } + + if let Some(&mut Action::WriteError(ref mut err)) = self.action() { + let err = err.take().expect("Should have been removed from actions."); + let err = Arc::try_unwrap(err).expect("There are no other references."); + return Err(err); + } + + for i in 0..self.actions.len() { + match self.actions[i] { + Action::Write(ref mut expect) => { + let n = cmp::min(src.len(), expect.len()); + + assert_eq!(&src[..n], &expect[..n]); + + // Drop data that was matched + expect.drain(..n); + src = &src[n..]; + + ret += n; + + if src.is_empty() { + return Ok(ret); + } + } + Action::Wait(..) | Action::WriteError(..) => { + break; + } + _ => {} + } + + } + + Ok(ret) + } + + fn remaining_wait(&mut self) -> Option { + match self.action() { + Some(&mut Action::Wait(dur)) => Some(dur), + _ => None, + } + } + + fn action(&mut self) -> Option<&mut Action> { + loop { + if self.actions.is_empty() { + return None; + } + + match self.actions[0] { + Action::Read(ref mut data) => { + if !data.is_empty() { + break; + } + } + Action::Write(ref mut data) => { + if !data.is_empty() { + break; + } + } + Action::Wait(ref mut dur) => { + if let Some(until) = self.waiting { + let now = Instant::now(); + + if now < until { + break; + } + } else { + self.waiting = Some(Instant::now() + *dur); + break; + } + } + Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => { + if error.is_some() { + break; + } + } + } + + let _action = self.actions.pop_front(); + } + + self.actions.front_mut() + } +} + +// ===== impl Inner ===== + +impl Mock { + fn maybe_wakeup_reader(&mut self) { + match self.inner.action() { + Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) | None => { + if let Some(waker) = self.inner.read_wait.take() { + waker.wake(); + } + } + _ => {} + } + } +} + +/// Ensures that Mock isn't dropped with data "inside". +impl Drop for Mock { + fn drop(&mut self) { + // Avoid double panicking, since makes debugging much harder. + if std::thread::panicking() { + return; + } + + self.inner.actions.iter().for_each(|a| match a { + Action::Read(data) => assert!(data.is_empty(), "There is still data left to read."), + Action::Write(data) => assert!(data.is_empty(), "There is still data left to write."), + _ => (), + }) + } +} + +impl fmt::Debug for Inner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Inner {{...}}") + } +} diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index de3f0864a94..6df57d9ac77 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -12,6 +12,7 @@ //! Tokio and Futures based testing utilities pub mod io; +pub mod io_stream; mod macros; pub mod task; diff --git a/tokio-test/tests/io.rs b/tokio-test/tests/io.rs index c3412259f5f..344b70fe768 100644 --- a/tokio-test/tests/io.rs +++ b/tokio-test/tests/io.rs @@ -3,6 +3,7 @@ use std::io; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_test::io::Builder; +use tokio_test::io_stream::StreamBuilder; #[tokio::test] async fn read() { @@ -88,7 +89,7 @@ async fn mock_panics_write_data_left() { #[tokio::test] async fn stream_read(){ - let mut mock = Builder::new().read(b"hello ").read(b"world!").build(); + let mut mock = StreamBuilder::new().read(b"hello ").read(b"world!").build(); // let res = mock.poll_next(); or something like this ? assert_eq!(res, b"hello "); } @@ -96,7 +97,7 @@ async fn stream_read(){ #[tokio::test] async fn stream_write(){ let unit: u8 = 1; - let mut mock = Builder::new().write(b"hello ").write(b"world!").build(); + let mut mock = StreamBuilder::new().write(b"hello ").write(b"world!").build(); // let mut task = get task ? // mock.poll_write(task, unit ).assert(b"hello " or something like this ? // From fa77e9f306c166f32ae7f48cf695cba877df72d7 Mon Sep 17 00:00:00 2001 From: Gabriel Grubba <70247653+Grubba27@users.noreply.github.com> Date: Mon, 7 Feb 2022 22:38:26 -0300 Subject: [PATCH 04/17] adjusted rust rust lint --- tokio-test/src/io_stream.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tokio-test/src/io_stream.rs b/tokio-test/src/io_stream.rs index 58cfa7f08a8..78668b41e66 100644 --- a/tokio-test/src/io_stream.rs +++ b/tokio-test/src/io_stream.rs @@ -20,13 +20,12 @@ use tokio::io::{ReadBuf}; use tokio::sync::mpsc; -use tokio::time::{self, Duration, Instant, Sleep}; +use tokio::time::{Duration, Instant, Sleep}; use tokio_stream::wrappers::UnboundedReceiverStream; use futures_core::{ready, Stream}; use std::collections::VecDeque; use std::fmt; -use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::{self, Poll, Waker}; From e5b9c8ab793dbf6046f5d94f6a62ec6ea72ff6bc Mon Sep 17 00:00:00 2001 From: Gabriel Grubba <70247653+Grubba27@users.noreply.github.com> Date: Mon, 7 Feb 2022 22:43:32 -0300 Subject: [PATCH 05/17] run cargo clippy --- tokio-test/src/io_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-test/src/io_stream.rs b/tokio-test/src/io_stream.rs index 78668b41e66..241bdb4f1a4 100644 --- a/tokio-test/src/io_stream.rs +++ b/tokio-test/src/io_stream.rs @@ -23,7 +23,7 @@ use tokio::sync::mpsc; use tokio::time::{Duration, Instant, Sleep}; use tokio_stream::wrappers::UnboundedReceiverStream; -use futures_core::{ready, Stream}; +use futures_core::{Stream}; use std::collections::VecDeque; use std::fmt; use std::pin::Pin; From 553abd0e7a4eaa874e64c9a56fc68d09e165a66d Mon Sep 17 00:00:00 2001 From: Gabriel Grubba <70247653+Grubba27@users.noreply.github.com> Date: Tue, 8 Feb 2022 19:55:52 -0300 Subject: [PATCH 06/17] started addressing comments --- .idea/modules.xml | 8 ++ .idea/vcs.xml | 6 ++ .idea/workspace.xml | 165 ++++++++++++++++++++++++++++++++++++ tokio-test/src/io_stream.rs | 94 +++----------------- tokio.iml | 27 ++++++ 5 files changed, 219 insertions(+), 81 deletions(-) create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 .idea/workspace.xml create mode 100644 tokio.iml diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 00000000000..58056f1ea06 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 00000000000..94a25f7f4cb --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml new file mode 100644 index 00000000000..a580550c6d3 --- /dev/null +++ b/.idea/workspace.xml @@ -0,0 +1,165 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 1644282037649 + + + + + + + + + + + - - \ No newline at end of file From e7ce82e37c2909f0952ad832561526d1f7620e6f Mon Sep 17 00:00:00 2001 From: Gabriel Grubba <70247653+Grubba27@users.noreply.github.com> Date: Tue, 8 Feb 2022 20:05:25 -0300 Subject: [PATCH 09/17] removed ./idea files --- tokio.iml | 27 --------------------------- 1 file changed, 27 deletions(-) delete mode 100644 tokio.iml diff --git a/tokio.iml b/tokio.iml deleted file mode 100644 index 22d7ad5cdd9..00000000000 --- a/tokio.iml +++ /dev/null @@ -1,27 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file From 7d8eb26179bb397d3b526c43c33b25962d6ab693 Mon Sep 17 00:00:00 2001 From: Gabriel Grubba <70247653+Grubba27@users.noreply.github.com> Date: Tue, 8 Feb 2022 20:07:48 -0300 Subject: [PATCH 10/17] removed StreamBuilder struct from io --- tokio-test/src/io.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tokio-test/src/io.rs b/tokio-test/src/io.rs index 1ffdc18a4c2..4ec66a45d1a 100644 --- a/tokio-test/src/io.rs +++ b/tokio-test/src/io.rs @@ -53,11 +53,6 @@ pub struct Builder { // Sequence of actions for the Mock to take actions: VecDeque, } -#[derive(Debug, Clone, Default)] -pub struct StreamBuilder { - // Sequence of actions for the Mock to take - actions: VecDeque, -} #[derive(Debug, Clone)] enum Action { From 29aee9d1a911fd1e542019a0d82b5f1b9b620435 Mon Sep 17 00:00:00 2001 From: Gabriel Grubba <70247653+Grubba27@users.noreply.github.com> Date: Wed, 9 Feb 2022 19:57:46 -0300 Subject: [PATCH 11/17] addresed comments --- tokio-test/src/io_stream.rs | 46 +++++++++++++++++++++++++++++-------- tokio-test/tests/io.rs | 10 +++++--- 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/tokio-test/src/io_stream.rs b/tokio-test/src/io_stream.rs index 28171f21e4c..caae899f537 100644 --- a/tokio-test/src/io_stream.rs +++ b/tokio-test/src/io_stream.rs @@ -1,14 +1,11 @@ #![cfg(not(loom))] -//! A mock type implementing [`AsyncRead`] and [`AsyncWrite`]. +//! A mock type implementing [`poll_next`]. //! //! //! # Overview //! -//! Provides a type that implements [`AsyncRead`] + [`AsyncWrite`] that can be configured -//! to handle an arbitrary sequence of read and write operations. This is useful -//! for writing unit tests for networking services as using an actual network -//! type is fairly non deterministic. +//! TODO //! //! # Usage //! @@ -22,7 +19,8 @@ use tokio::sync::mpsc; use tokio::time::{Duration, Instant, Sleep}; use tokio_stream::wrappers::UnboundedReceiverStream; -use futures_core::{Stream}; +use futures_core::{ready, Stream}; +use std::future::Future; use std::collections::VecDeque; use std::fmt; use std::pin::Pin; @@ -57,6 +55,7 @@ enum Action { Read(Vec), Write(Vec), Wait(Duration), + Next(String), // Wrapped in Arc so that Builder can be cloned and Send. // Mock is not cloned as does not need to check Rc for ref counts. ReadError(Option>), @@ -124,6 +123,12 @@ impl StreamBuilder { self.actions.push_back(Action::Wait(duration)); self } + /// calls next value within stream. + /// + pub fn next(&mut self) -> &mut Self { + self.actions.pop_front(); + self + } /// Build a `Mock` value according to the defined script. pub fn build(&mut self) -> Mock { @@ -230,6 +235,11 @@ impl Inner { break; } } + Action::Next(ref mut data) => { + if !data.is_empty() { + break; + } + } Action::Wait(ref mut dur) => { if let Some(until) = self.waiting { let now = Instant::now(); @@ -274,10 +284,26 @@ impl Mock { impl Stream for Mock { type Item = String; - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - // TODO - // Pop item from array who could i iterate self? or do a while loop like in AsyncRead /AsyncWrite ? - Poll::Pending + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + + loop { + if let Some(ref mut sleep) = self.inner.sleep { + ready!(Pin::new(sleep).poll(_cx)); + } + + // If a sleep is set, it has already fired + self.inner.sleep = None; + match ready!(self.inner.poll_action(_cx)) { + None => { + return Poll::Pending; + } + Some(action) => { + self.inner.actions.push_back(action); + continue; + } + } + } + } } diff --git a/tokio-test/tests/io.rs b/tokio-test/tests/io.rs index 344b70fe768..a68b9de0dbf 100644 --- a/tokio-test/tests/io.rs +++ b/tokio-test/tests/io.rs @@ -1,7 +1,12 @@ #![warn(rust_2018_idioms)] use std::io; +use std::pin::Pin; +use std::task::{Context, Waker, Poll}; +use futures_core::Stream; +use futures_util::StreamExt; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::sync::futures; use tokio_test::io::Builder; use tokio_test::io_stream::StreamBuilder; @@ -88,10 +93,9 @@ async fn mock_panics_write_data_left() { #[tokio::test] async fn stream_read(){ - let mut mock = StreamBuilder::new().read(b"hello ").read(b"world!").build(); - // let res = mock.poll_next(); or something like this ? - assert_eq!(res, b"hello "); + let res = mock.next().await.expect("hello "); + assert_eq!(res, "hello "); } #[tokio::test] From 0631ec5b6078b1d564618df47c4ced61f69ff31f Mon Sep 17 00:00:00 2001 From: Gabriel Grubba <70247653+Grubba27@users.noreply.github.com> Date: Wed, 9 Feb 2022 20:01:47 -0300 Subject: [PATCH 12/17] ran rust linter --- tokio-test/tests/io.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tokio-test/tests/io.rs b/tokio-test/tests/io.rs index a68b9de0dbf..c4f079db461 100644 --- a/tokio-test/tests/io.rs +++ b/tokio-test/tests/io.rs @@ -1,12 +1,12 @@ #![warn(rust_2018_idioms)] use std::io; -use std::pin::Pin; -use std::task::{Context, Waker, Poll}; -use futures_core::Stream; + + + use futures_util::StreamExt; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::sync::futures; + use tokio_test::io::Builder; use tokio_test::io_stream::StreamBuilder; @@ -100,8 +100,8 @@ async fn stream_read(){ #[tokio::test] async fn stream_write(){ - let unit: u8 = 1; - let mut mock = StreamBuilder::new().write(b"hello ").write(b"world!").build(); + let _unit: u8 = 1; + let _mock = StreamBuilder::new().write(b"hello ").write(b"world!").build(); // let mut task = get task ? // mock.poll_write(task, unit ).assert(b"hello " or something like this ? // From f9e78fdee2af3b99bfcf5e8b4f3ea393607469ef Mon Sep 17 00:00:00 2001 From: Gabriel Grubba <70247653+Grubba27@users.noreply.github.com> Date: Wed, 9 Feb 2022 20:10:47 -0300 Subject: [PATCH 13/17] adjusted stream read and next --- tokio-test/src/io_stream.rs | 74 +++++-------------------------------- tokio-test/tests/io.rs | 5 ++- 2 files changed, 13 insertions(+), 66 deletions(-) diff --git a/tokio-test/src/io_stream.rs b/tokio-test/src/io_stream.rs index caae899f537..0d5c11e583a 100644 --- a/tokio-test/src/io_stream.rs +++ b/tokio-test/src/io_stream.rs @@ -85,6 +85,15 @@ impl StreamBuilder { self } + /// Sequence a `read` operation. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `buf`. + pub fn read_stream(&mut self, string: String) -> &mut Self { + self.actions.push_back(Action::Read(string.into())); + self + } + /// Sequence a `read` operation that produces an error. /// /// The next operation in the mock's script will be to expect a `read` call @@ -211,75 +220,10 @@ impl Inner { Pin::new(&mut self.rx).poll_next(cx) } - fn remaining_wait(&mut self) -> Option { - match self.action() { - Some(&mut Action::Wait(dur)) => Some(dur), - _ => None, - } - } - - fn action(&mut self) -> Option<&mut Action> { - loop { - if self.actions.is_empty() { - return None; - } - - match self.actions[0] { - Action::Read(ref mut data) => { - if !data.is_empty() { - break; - } - } - Action::Write(ref mut data) => { - if !data.is_empty() { - break; - } - } - Action::Next(ref mut data) => { - if !data.is_empty() { - break; - } - } - Action::Wait(ref mut dur) => { - if let Some(until) = self.waiting { - let now = Instant::now(); - - if now < until { - break; - } - } else { - self.waiting = Some(Instant::now() + *dur); - break; - } - } - Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => { - if error.is_some() { - break; - } - } - } - - let _action = self.actions.pop_front(); - } - - self.actions.front_mut() - } } // ===== impl Inner ===== -impl Mock { - fn maybe_wakeup_reader(&mut self) { - match self.inner.action() { - Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) | None => { - if let Some(waker) = self.inner.read_wait.take() { - waker.wake(); - } - } - _ => {} - } - } -} impl Stream for Mock { type Item = String; diff --git a/tokio-test/tests/io.rs b/tokio-test/tests/io.rs index c4f079db461..32bd42c23ba 100644 --- a/tokio-test/tests/io.rs +++ b/tokio-test/tests/io.rs @@ -93,7 +93,10 @@ async fn mock_panics_write_data_left() { #[tokio::test] async fn stream_read(){ - let mut mock = StreamBuilder::new().read(b"hello ").read(b"world!").build(); + + let mut mock = StreamBuilder::new() + .read_stream(String::from("hello ")) + .read_stream(String::from("world!")).build(); let res = mock.next().await.expect("hello "); assert_eq!(res, "hello "); } From ab39083e7e7b35713ad728b6b0935477804a890c Mon Sep 17 00:00:00 2001 From: Gabriel Grubba <70247653+Grubba27@users.noreply.github.com> Date: Thu, 10 Feb 2022 18:52:54 -0300 Subject: [PATCH 14/17] started addressing comments --- tokio-test/src/io_stream.rs | 67 ++++++++++++++++++++++++++++++++----- tokio-test/tests/io.rs | 11 +++--- 2 files changed, 65 insertions(+), 13 deletions(-) diff --git a/tokio-test/src/io_stream.rs b/tokio-test/src/io_stream.rs index 0d5c11e583a..bc31ffe7126 100644 --- a/tokio-test/src/io_stream.rs +++ b/tokio-test/src/io_stream.rs @@ -90,7 +90,7 @@ impl StreamBuilder { /// The next operation in the mock's script will be to expect a `read` call /// and return `buf`. pub fn read_stream(&mut self, string: String) -> &mut Self { - self.actions.push_back(Action::Read(string.into())); + self.actions.push_back(Action::Next(string)); self } @@ -134,10 +134,6 @@ impl StreamBuilder { } /// calls next value within stream. /// - pub fn next(&mut self) -> &mut Self { - self.actions.pop_front(); - self - } /// Build a `Mock` value according to the defined script. pub fn build(&mut self) -> Mock { @@ -185,6 +181,14 @@ impl Handle { self.tx.send(Action::Write(buf.into())).unwrap(); self } + /// Sequence a `write` operation. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call. + pub fn read_stream(&mut self, string: String) -> &mut Self { + self.tx.send(Action::Next(string)).unwrap(); + self + } /// Sequence a `write` operation error. /// @@ -220,6 +224,52 @@ impl Inner { Pin::new(&mut self.rx).poll_next(cx) } + fn action(&mut self) -> Option<&mut Action> { + loop { + if self.actions.is_empty() { + return None; + } + + match self.actions[0] { + Action::Read(ref mut data) => { + if !data.is_empty() { + break; + } + } + Action::Write(ref mut data) => { + if !data.is_empty() { + break; + } + } + Action::Next(ref mut data) => { + if !data.is_empty() { + break; + } + } + Action::Wait(ref mut dur) => { + if let Some(until) = self.waiting { + let now = Instant::now(); + + if now < until { + break; + } + } else { + self.waiting = Some(Instant::now() + *dur); + break; + } + } + Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => { + if error.is_some() { + break; + } + } + } + + let _action = self.actions.pop_front(); + } + + self.actions.front_mut() + } } // ===== impl Inner ===== @@ -235,16 +285,17 @@ impl Stream for Mock { ready!(Pin::new(sleep).poll(_cx)); } + // If a sleep is set, it has already fired self.inner.sleep = None; match ready!(self.inner.poll_action(_cx)) { - None => { - return Poll::Pending; - } Some(action) => { self.inner.actions.push_back(action); continue; } + None => { + return Poll::Ready(None); + } } } diff --git a/tokio-test/tests/io.rs b/tokio-test/tests/io.rs index 32bd42c23ba..78a8f70d22f 100644 --- a/tokio-test/tests/io.rs +++ b/tokio-test/tests/io.rs @@ -1,7 +1,8 @@ #![warn(rust_2018_idioms)] +use std::future::Future; use std::io; - +use futures_core::{Stream, TryStream}; use futures_util::StreamExt; @@ -93,12 +94,12 @@ async fn mock_panics_write_data_left() { #[tokio::test] async fn stream_read(){ - let mut mock = StreamBuilder::new() .read_stream(String::from("hello ")) - .read_stream(String::from("world!")).build(); - let res = mock.next().await.expect("hello "); - assert_eq!(res, "hello "); + .build(); + let txt = String::from("hello "); + let res = mock.next().await; + assert_eq!(res, Some(txt)); } #[tokio::test] From 24fad5c7203a76f4df2e424b64344d2ffb8a91fb Mon Sep 17 00:00:00 2001 From: Gabriel Grubba <70247653+Grubba27@users.noreply.github.com> Date: Thu, 30 Jun 2022 11:54:09 -0300 Subject: [PATCH 15/17] fix: adjusted missing returns --- tokio-test/src/io_stream.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/tokio-test/src/io_stream.rs b/tokio-test/src/io_stream.rs index bc31ffe7126..09bea4d8856 100644 --- a/tokio-test/src/io_stream.rs +++ b/tokio-test/src/io_stream.rs @@ -282,16 +282,30 @@ impl Stream for Mock { loop { if let Some(ref mut sleep) = self.inner.sleep { - ready!(Pin::new(sleep).poll(_cx)); + ready!(Pin::new(sleep).poll_next(_cx)); } // If a sleep is set, it has already fired self.inner.sleep = None; match ready!(self.inner.poll_action(_cx)) { - Some(action) => { - self.inner.actions.push_back(action); - continue; + Some(Action::Read(data)) => { + return Poll::Ready(Some(data.to_string())); + } + Some(Action::Write(data)) => { + return Poll::Ready(Some(data.to_string())); + } + Some(Action::Next(data)) => { + return Poll::Ready(Some(data)); + } + Some(Action::Wait(dur)) => { + self.inner.sleep = Some(dur); + } + Some(Action::ReadError(error)) => { + return Poll::Ready(Some(error.unwrap().to_string())); + } + Some(Action::WriteError(error)) => { + return Poll::Ready(Some(error.unwrap().to_string())); } None => { return Poll::Ready(None); From b22f664d3ff2eac7370fb07a0e533649bfaf11dc Mon Sep 17 00:00:00 2001 From: Gabriel Grubba <70247653+Grubba27@users.noreply.github.com> Date: Thu, 30 Jun 2022 12:09:18 -0300 Subject: [PATCH 16/17] fix compilation errors --- tokio-test/src/io_stream.rs | 25 ++++++++++++++++--------- tokio-test/tests/io.rs | 13 ++++++++----- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/tokio-test/src/io_stream.rs b/tokio-test/src/io_stream.rs index 09bea4d8856..3257803f5d2 100644 --- a/tokio-test/src/io_stream.rs +++ b/tokio-test/src/io_stream.rs @@ -276,45 +276,52 @@ impl Inner { impl Stream for Mock { - type Item = String; + type Item = Vec; fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { loop { if let Some(ref mut sleep) = self.inner.sleep { - ready!(Pin::new(sleep).poll_next(_cx)); + ready!(Pin::new(sleep).poll(_cx)); } // If a sleep is set, it has already fired self.inner.sleep = None; - match ready!(self.inner.poll_action(_cx)) { + + // match ready! + match ready!(self.inner.poll_action(_cx)) { Some(Action::Read(data)) => { - return Poll::Ready(Some(data.to_string())); + return Poll::Ready(Some(data)); } Some(Action::Write(data)) => { - return Poll::Ready(Some(data.to_string())); + return Poll::Ready(Some(data)); } Some(Action::Next(data)) => { - return Poll::Ready(Some(data)); + return Poll::Ready(Some(data.as_bytes().to_vec())); } Some(Action::Wait(dur)) => { - self.inner.sleep = Some(dur); + self.inner.sleep = Some(delay_for(dur)); } Some(Action::ReadError(error)) => { - return Poll::Ready(Some(error.unwrap().to_string())); + return Poll::Ready(None); } Some(Action::WriteError(error)) => { - return Poll::Ready(Some(error.unwrap().to_string())); + return Poll::Ready(None); + } + Some(_) => { + continue; } None => { return Poll::Ready(None); } + } } } + } /// Ensures that Mock isn't dropped with data "inside". diff --git a/tokio-test/tests/io.rs b/tokio-test/tests/io.rs index 78a8f70d22f..8432a458b32 100644 --- a/tokio-test/tests/io.rs +++ b/tokio-test/tests/io.rs @@ -99,14 +99,17 @@ async fn stream_read(){ .build(); let txt = String::from("hello "); let res = mock.next().await; + assert_eq!(res.unwrap(), txt); assert_eq!(res, Some(txt)); } #[tokio::test] async fn stream_write(){ - let _unit: u8 = 1; - let _mock = StreamBuilder::new().write(b"hello ").write(b"world!").build(); - // let mut task = get task ? - // mock.poll_write(task, unit ).assert(b"hello " or something like this ? - // + let mut mock = StreamBuilder::new() + .write_stream(String::from("hello ")) + .build(); + let txt = String::from("hello "); + let res = mock.next().await; + assert_eq!(res.unwrap(), txt); + assert_eq!(res, Some(txt)); } \ No newline at end of file From dac639db6e66426223079dd543f3f179650b8706 Mon Sep 17 00:00:00 2001 From: Gabriel Grubba <70247653+Grubba27@users.noreply.github.com> Date: Mon, 20 Mar 2023 09:23:01 -0300 Subject: [PATCH 17/17] Update tokio-test/src/io_stream.rs Co-authored-by: devensiv --- tokio-test/src/io_stream.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tokio-test/src/io_stream.rs b/tokio-test/src/io_stream.rs index 3257803f5d2..cd6315cc35a 100644 --- a/tokio-test/src/io_stream.rs +++ b/tokio-test/src/io_stream.rs @@ -252,6 +252,8 @@ impl Inner { if now < until { break; + } else { + self.waiting = None; } } else { self.waiting = Some(Instant::now() + *dur);