diff --git a/tokio-test/src/io_stream.rs b/tokio-test/src/io_stream.rs new file mode 100644 index 00000000000..cd6315cc35a --- /dev/null +++ b/tokio-test/src/io_stream.rs @@ -0,0 +1,349 @@ +#![cfg(not(loom))] + +//! A mock type implementing [`poll_next`]. +//! +//! +//! # Overview +//! +//! TODO +//! +//! # Usage +//! +//! Attempting to write data that the mock isn't expecting will result in a +//! panic. +//! +//! [`AsyncRead`]: tokio::io::AsyncRead +//! [`AsyncWrite`]: tokio::io::AsyncWrite + + +use tokio::sync::mpsc; +use tokio::time::{Duration, Instant, Sleep}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use futures_core::{ready, Stream}; +use std::future::Future; +use std::collections::VecDeque; +use std::fmt; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{self, Context, Poll, Waker}; +use std::{cmp, io}; + +/// An I/O object that follows a predefined script. +/// +/// This value is created by `Builder` and implements `AsyncRead` + `AsyncWrite`. It +/// follows the scenario described by the builder and panics otherwise. +#[derive(Debug)] +pub struct Mock { + inner: Inner, +} + +/// A handle to send additional actions to the related `Mock`. +#[derive(Debug)] +pub struct Handle { + tx: mpsc::UnboundedSender, +} + +/// Builds `Mock` instances. +#[derive(Debug, Clone, Default)] +pub struct StreamBuilder { + // Sequence of actions for the Mock to take + actions: VecDeque, +} + +#[derive(Debug, Clone)] +enum Action { + Read(Vec), + Write(Vec), + Wait(Duration), + Next(String), + // Wrapped in Arc so that Builder can be cloned and Send. + // Mock is not cloned as does not need to check Rc for ref counts. + ReadError(Option>), + WriteError(Option>), +} + +struct Inner { + actions: VecDeque, + waiting: Option, + sleep: Option>>, + read_wait: Option, + rx: UnboundedReceiverStream, +} + +impl StreamBuilder { + /// Return a new, empty `Builder. + pub fn new() -> Self { + Self::default() + } + + /// Sequence a `read` operation. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `buf`. + pub fn read(&mut self, buf: &[u8]) -> &mut Self { + self.actions.push_back(Action::Read(buf.into())); + self + } + + /// Sequence a `read` operation. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `buf`. + pub fn read_stream(&mut self, string: String) -> &mut Self { + self.actions.push_back(Action::Next(string)); + self + } + + /// Sequence a `read` operation that produces an error. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `error`. + pub fn read_error(&mut self, error: io::Error) -> &mut Self { + let error = Some(error.into()); + self.actions.push_back(Action::ReadError(error)); + self + } + + /// Sequence a `write` operation. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call. + pub fn write(&mut self, buf: &[u8]) -> &mut Self { + self.actions.push_back(Action::Write(buf.into())); + self + } + + /// Sequence a `write` operation that produces an error. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call that provides `error`. + pub fn write_error(&mut self, error: io::Error) -> &mut Self { + let error = Some(error.into()); + self.actions.push_back(Action::WriteError(error)); + self + } + + /// Sequence a wait. + /// + /// The next operation in the mock's script will be to wait without doing so + /// for `duration` amount of time. + pub fn wait(&mut self, duration: Duration) -> &mut Self { + let duration = cmp::max(duration, Duration::from_millis(1)); + self.actions.push_back(Action::Wait(duration)); + self + } + /// calls next value within stream. + /// + + /// Build a `Mock` value according to the defined script. + pub fn build(&mut self) -> Mock { + let (mock, _) = self.build_with_handle(); + mock + } + + /// Build a `Mock` value paired with a handle + pub fn build_with_handle(&mut self) -> (Mock, Handle) { + let (inner, handle) = Inner::new(self.actions.clone()); + + let mock = Mock { inner }; + + (mock, handle) + } +} + + + +impl Handle { + /// Sequence a `read` operation. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `buf`. + pub fn read(&mut self, buf: &[u8]) -> &mut Self { + self.tx.send(Action::Read(buf.into())).unwrap(); + self + } + + /// Sequence a `read` operation error. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `error`. + pub fn read_error(&mut self, error: io::Error) -> &mut Self { + let error = Some(error.into()); + self.tx.send(Action::ReadError(error)).unwrap(); + self + } + + /// Sequence a `write` operation. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call. + pub fn write(&mut self, buf: &[u8]) -> &mut Self { + self.tx.send(Action::Write(buf.into())).unwrap(); + self + } + /// Sequence a `write` operation. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call. + pub fn read_stream(&mut self, string: String) -> &mut Self { + self.tx.send(Action::Next(string)).unwrap(); + self + } + + /// Sequence a `write` operation error. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call error. + pub fn write_error(&mut self, error: io::Error) -> &mut Self { + let error = Some(error.into()); + self.tx.send(Action::WriteError(error)).unwrap(); + self + } +} + +impl Inner { + fn new(actions: VecDeque) -> (Inner, Handle) { + let (tx, rx) = mpsc::unbounded_channel(); + + let rx = UnboundedReceiverStream::new(rx); + + let inner = Inner { + actions, + sleep: None, + read_wait: None, + rx, + waiting: None, + }; + + let handle = Handle { tx }; + + (inner, handle) + } + + fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll> { + Pin::new(&mut self.rx).poll_next(cx) + } + + fn action(&mut self) -> Option<&mut Action> { + loop { + if self.actions.is_empty() { + return None; + } + + match self.actions[0] { + Action::Read(ref mut data) => { + if !data.is_empty() { + break; + } + } + Action::Write(ref mut data) => { + if !data.is_empty() { + break; + } + } + Action::Next(ref mut data) => { + if !data.is_empty() { + break; + } + } + Action::Wait(ref mut dur) => { + if let Some(until) = self.waiting { + let now = Instant::now(); + + if now < until { + break; + } else { + self.waiting = None; + } + } else { + self.waiting = Some(Instant::now() + *dur); + break; + } + } + Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => { + if error.is_some() { + break; + } + } + } + + let _action = self.actions.pop_front(); + } + + self.actions.front_mut() + } +} + +// ===== impl Inner ===== + + +impl Stream for Mock { + type Item = Vec; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + + loop { + if let Some(ref mut sleep) = self.inner.sleep { + ready!(Pin::new(sleep).poll(_cx)); + } + + + // If a sleep is set, it has already fired + self.inner.sleep = None; + + // match ready! + match ready!(self.inner.poll_action(_cx)) { + Some(Action::Read(data)) => { + return Poll::Ready(Some(data)); + } + Some(Action::Write(data)) => { + return Poll::Ready(Some(data)); + } + Some(Action::Next(data)) => { + return Poll::Ready(Some(data.as_bytes().to_vec())); + } + Some(Action::Wait(dur)) => { + self.inner.sleep = Some(delay_for(dur)); + } + Some(Action::ReadError(error)) => { + return Poll::Ready(None); + } + Some(Action::WriteError(error)) => { + return Poll::Ready(None); + } + Some(_) => { + continue; + } + None => { + return Poll::Ready(None); + } + + } + } + + } + + +} + +/// Ensures that Mock isn't dropped with data "inside". +impl Drop for Mock { + fn drop(&mut self) { + // Avoid double panicking, since makes debugging much harder. + if std::thread::panicking() { + return; + } + + self.inner.actions.iter().for_each(|a| match a { + Action::Read(data) => assert!(data.is_empty(), "There is still data left to read."), + Action::Write(data) => assert!(data.is_empty(), "There is still data left to write."), + _ => (), + }) + } +} + +impl fmt::Debug for Inner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Inner {{...}}") + } +} diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index de3f0864a94..6df57d9ac77 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -12,6 +12,7 @@ //! Tokio and Futures based testing utilities pub mod io; +pub mod io_stream; mod macros; pub mod task; diff --git a/tokio-test/tests/io.rs b/tokio-test/tests/io.rs index f164abaf1ba..8432a458b32 100644 --- a/tokio-test/tests/io.rs +++ b/tokio-test/tests/io.rs @@ -1,8 +1,15 @@ #![warn(rust_2018_idioms)] +use std::future::Future; use std::io; +use futures_core::{Stream, TryStream}; + + +use futures_util::StreamExt; use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio_test::io::Builder; +use tokio_test::io_stream::StreamBuilder; #[tokio::test] async fn read() { @@ -84,3 +91,25 @@ async fn mock_panics_write_data_left() { use tokio_test::io::Builder; Builder::new().write(b"write").build(); } + +#[tokio::test] +async fn stream_read(){ + let mut mock = StreamBuilder::new() + .read_stream(String::from("hello ")) + .build(); + let txt = String::from("hello "); + let res = mock.next().await; + assert_eq!(res.unwrap(), txt); + assert_eq!(res, Some(txt)); +} + +#[tokio::test] +async fn stream_write(){ + let mut mock = StreamBuilder::new() + .write_stream(String::from("hello ")) + .build(); + let txt = String::from("hello "); + let res = mock.next().await; + assert_eq!(res.unwrap(), txt); + assert_eq!(res, Some(txt)); +} \ No newline at end of file