Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP): Adding Stream mock to tokio-test #4463

Closed
wants to merge 18 commits into from
Closed
275 changes: 275 additions & 0 deletions tokio-test/src/io_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
#![cfg(not(loom))]

//! A mock type implementing [`poll_next`].
//!
//!
//! # Overview
//!
//! TODO
//!
//! # Usage
//!
//! Attempting to write data that the mock isn't expecting will result in a
//! panic.
//!
//! [`AsyncRead`]: tokio::io::AsyncRead
//! [`AsyncWrite`]: tokio::io::AsyncWrite


use tokio::sync::mpsc;
use tokio::time::{Duration, Instant, Sleep};
use tokio_stream::wrappers::UnboundedReceiverStream;
use futures_core::{ready, Stream};
use std::future::Future;
use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{self, Context, Poll, Waker};
use std::{cmp, io};

/// An I/O object that follows a predefined script.
///
/// This value is created by `Builder` and implements `AsyncRead` + `AsyncWrite`. It
/// follows the scenario described by the builder and panics otherwise.
#[derive(Debug)]
pub struct Mock {
inner: Inner,
}

/// A handle to send additional actions to the related `Mock`.
#[derive(Debug)]
pub struct Handle {
tx: mpsc::UnboundedSender<Action>,
}

/// Builds `Mock` instances.
#[derive(Debug, Clone, Default)]
pub struct StreamBuilder {
// Sequence of actions for the Mock to take
actions: VecDeque<Action>,
}

#[derive(Debug, Clone)]
enum Action {
Read(Vec<u8>),
Write(Vec<u8>),
Wait(Duration),
Next(String),
// Wrapped in Arc so that Builder can be cloned and Send.
// Mock is not cloned as does not need to check Rc for ref counts.
ReadError(Option<Arc<io::Error>>),
WriteError(Option<Arc<io::Error>>),
}

struct Inner {
actions: VecDeque<Action>,
waiting: Option<Instant>,
sleep: Option<Pin<Box<Sleep>>>,
read_wait: Option<Waker>,
rx: UnboundedReceiverStream<Action>,
}

impl StreamBuilder {
/// Return a new, empty `Builder.
pub fn new() -> Self {
Self::default()
}

/// Sequence a `read` operation.
///
/// The next operation in the mock's script will be to expect a `read` call
/// and return `buf`.
pub fn read(&mut self, buf: &[u8]) -> &mut Self {
self.actions.push_back(Action::Read(buf.into()));
self
}

/// Sequence a `read` operation.
///
/// The next operation in the mock's script will be to expect a `read` call
/// and return `buf`.
pub fn read_stream(&mut self, string: String) -> &mut Self {
self.actions.push_back(Action::Read(string.into()));
self
}

/// Sequence a `read` operation that produces an error.
///
/// The next operation in the mock's script will be to expect a `read` call
/// and return `error`.
pub fn read_error(&mut self, error: io::Error) -> &mut Self {
let error = Some(error.into());
self.actions.push_back(Action::ReadError(error));
self
}

/// Sequence a `write` operation.
///
/// The next operation in the mock's script will be to expect a `write`
/// call.
pub fn write(&mut self, buf: &[u8]) -> &mut Self {
self.actions.push_back(Action::Write(buf.into()));
self
}

/// Sequence a `write` operation that produces an error.
///
/// The next operation in the mock's script will be to expect a `write`
/// call that provides `error`.
pub fn write_error(&mut self, error: io::Error) -> &mut Self {
let error = Some(error.into());
self.actions.push_back(Action::WriteError(error));
self
}

/// Sequence a wait.
///
/// The next operation in the mock's script will be to wait without doing so
/// for `duration` amount of time.
pub fn wait(&mut self, duration: Duration) -> &mut Self {
let duration = cmp::max(duration, Duration::from_millis(1));
self.actions.push_back(Action::Wait(duration));
self
}
/// calls next value within stream.
///
pub fn next(&mut self) -> &mut Self {
self.actions.pop_front();
self
}

/// Build a `Mock` value according to the defined script.
pub fn build(&mut self) -> Mock {
let (mock, _) = self.build_with_handle();
mock
}

/// Build a `Mock` value paired with a handle
pub fn build_with_handle(&mut self) -> (Mock, Handle) {
let (inner, handle) = Inner::new(self.actions.clone());

let mock = Mock { inner };

(mock, handle)
}
}



impl Handle {
/// Sequence a `read` operation.
///
/// The next operation in the mock's script will be to expect a `read` call
/// and return `buf`.
pub fn read(&mut self, buf: &[u8]) -> &mut Self {
self.tx.send(Action::Read(buf.into())).unwrap();
self
}

/// Sequence a `read` operation error.
///
/// The next operation in the mock's script will be to expect a `read` call
/// and return `error`.
pub fn read_error(&mut self, error: io::Error) -> &mut Self {
let error = Some(error.into());
self.tx.send(Action::ReadError(error)).unwrap();
self
}

/// Sequence a `write` operation.
///
/// The next operation in the mock's script will be to expect a `write`
/// call.
pub fn write(&mut self, buf: &[u8]) -> &mut Self {
self.tx.send(Action::Write(buf.into())).unwrap();
self
}

/// Sequence a `write` operation error.
///
/// The next operation in the mock's script will be to expect a `write`
/// call error.
pub fn write_error(&mut self, error: io::Error) -> &mut Self {
let error = Some(error.into());
self.tx.send(Action::WriteError(error)).unwrap();
self
}
}

impl Inner {
fn new(actions: VecDeque<Action>) -> (Inner, Handle) {
let (tx, rx) = mpsc::unbounded_channel();

let rx = UnboundedReceiverStream::new(rx);

let inner = Inner {
actions,
sleep: None,
read_wait: None,
rx,
waiting: None,
};

let handle = Handle { tx };

(inner, handle)
}

fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
Pin::new(&mut self.rx).poll_next(cx)
}

}

// ===== impl Inner =====


impl Stream for Mock {
type Item = String;

fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

loop {
if let Some(ref mut sleep) = self.inner.sleep {
ready!(Pin::new(sleep).poll(_cx));
}

// If a sleep is set, it has already fired
self.inner.sleep = None;
match ready!(self.inner.poll_action(_cx)) {
None => {
return Poll::Pending;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If poll_action returns None, then isn't that because we are at the end and the stream is complete? Then you should return Poll::Ready(None) instead of Poll::Pending.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay now I see what I was doing wrong!!
I don't have any ideia why my mock.next.await is aways returning None for me. perhaps because of the cx ?
Maybe I should use a mock Context too for using pool_next(cx) ? or keep with next() but change the way that I call Action it at io_steram ?

}
Some(action) => {
self.inner.actions.push_back(action);
continue;
}
}
}

}

}

/// Ensures that Mock isn't dropped with data "inside".
impl Drop for Mock {
fn drop(&mut self) {
// Avoid double panicking, since makes debugging much harder.
if std::thread::panicking() {
return;
}

self.inner.actions.iter().for_each(|a| match a {
Action::Read(data) => assert!(data.is_empty(), "There is still data left to read."),
Action::Write(data) => assert!(data.is_empty(), "There is still data left to write."),
_ => (),
})
}
}

impl fmt::Debug for Inner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Inner {{...}}")
}
}
1 change: 1 addition & 0 deletions tokio-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! Tokio and Futures based testing utilities

pub mod io;
pub mod io_stream;

mod macros;
pub mod task;
Expand Down
25 changes: 25 additions & 0 deletions tokio-test/tests/io.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
#![warn(rust_2018_idioms)]

use std::io;



use futures_util::StreamExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use tokio_test::io::Builder;
use tokio_test::io_stream::StreamBuilder;

#[tokio::test]
async fn read() {
Expand Down Expand Up @@ -84,3 +90,22 @@ async fn mock_panics_write_data_left() {
use tokio_test::io::Builder;
Builder::new().write(b"write").build();
}

#[tokio::test]
async fn stream_read(){

let mut mock = StreamBuilder::new()
.read_stream(String::from("hello "))
.read_stream(String::from("world!")).build();
let res = mock.next().await.expect("hello ");
assert_eq!(res, "hello ");
}

#[tokio::test]
async fn stream_write(){
let _unit: u8 = 1;
let _mock = StreamBuilder::new().write(b"hello ").write(b"world!").build();
// let mut task = get task ?
// mock.poll_write(task, unit ).assert(b"hello " or something like this ?
//
}