Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP): Adding Stream mock to tokio-test #4463

Closed
wants to merge 18 commits into from
Closed
5 changes: 5 additions & 0 deletions tokio-test/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ pub struct Builder {
// Sequence of actions for the Mock to take
actions: VecDeque<Action>,
}
#[derive(Debug, Clone, Default)]
pub struct StreamBuilder {
// Sequence of actions for the Mock to take
actions: VecDeque<Action>,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should go in its own file.


#[derive(Debug, Clone)]
enum Action {
Expand Down
373 changes: 373 additions & 0 deletions tokio-test/src/io_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,373 @@
#![cfg(not(loom))]

//! A mock type implementing [`AsyncRead`] and [`AsyncWrite`].
//!
//!
//! # Overview
//!
//! Provides a type that implements [`AsyncRead`] + [`AsyncWrite`] that can be configured
//! to handle an arbitrary sequence of read and write operations. This is useful
//! for writing unit tests for networking services as using an actual network
//! type is fairly non deterministic.
//!
//! # Usage
//!
//! Attempting to write data that the mock isn't expecting will result in a
//! panic.
//!
//! [`AsyncRead`]: tokio::io::AsyncRead
//! [`AsyncWrite`]: tokio::io::AsyncWrite

use tokio::io::{ReadBuf};
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant, Sleep};
use tokio_stream::wrappers::UnboundedReceiverStream;

use futures_core::{Stream};
use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{self, Poll, Waker};
use std::{cmp, io};

/// An I/O object that follows a predefined script.
///
/// This value is created by `Builder` and implements `AsyncRead` + `AsyncWrite`. It
/// follows the scenario described by the builder and panics otherwise.
#[derive(Debug)]
pub struct Mock {
inner: Inner,
}

/// A handle to send additional actions to the related `Mock`.
#[derive(Debug)]
pub struct Handle {
tx: mpsc::UnboundedSender<Action>,
}

/// Builds `Mock` instances.
#[derive(Debug, Clone, Default)]
pub struct Builder {
// Sequence of actions for the Mock to take
actions: VecDeque<Action>,
}
#[derive(Debug, Clone, Default)]
pub struct StreamBuilder {
// Sequence of actions for the Mock to take
actions: VecDeque<Action>,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are there two builders?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry :/ I was sleppy but I did remove these two now


#[derive(Debug, Clone)]
enum Action {
Read(Vec<u8>),
Write(Vec<u8>),
Wait(Duration),
// Wrapped in Arc so that Builder can be cloned and Send.
// Mock is not cloned as does not need to check Rc for ref counts.
ReadError(Option<Arc<io::Error>>),
WriteError(Option<Arc<io::Error>>),
}

struct Inner {
actions: VecDeque<Action>,
waiting: Option<Instant>,
sleep: Option<Pin<Box<Sleep>>>,
read_wait: Option<Waker>,
rx: UnboundedReceiverStream<Action>,
}

impl StreamBuilder {
/// Return a new, empty `Builder.
pub fn new() -> Self {
Self::default()
}

/// Sequence a `read` operation.
///
/// The next operation in the mock's script will be to expect a `read` call
/// and return `buf`.
pub fn read(&mut self, buf: &[u8]) -> &mut Self {
self.actions.push_back(Action::Read(buf.into()));
self
}

/// Sequence a `read` operation that produces an error.
///
/// The next operation in the mock's script will be to expect a `read` call
/// and return `error`.
pub fn read_error(&mut self, error: io::Error) -> &mut Self {
let error = Some(error.into());
self.actions.push_back(Action::ReadError(error));
self
}

/// Sequence a `write` operation.
///
/// The next operation in the mock's script will be to expect a `write`
/// call.
pub fn write(&mut self, buf: &[u8]) -> &mut Self {
self.actions.push_back(Action::Write(buf.into()));
self
}

/// Sequence a `write` operation that produces an error.
///
/// The next operation in the mock's script will be to expect a `write`
/// call that provides `error`.
pub fn write_error(&mut self, error: io::Error) -> &mut Self {
let error = Some(error.into());
self.actions.push_back(Action::WriteError(error));
self
}

/// Sequence a wait.
///
/// The next operation in the mock's script will be to wait without doing so
/// for `duration` amount of time.
pub fn wait(&mut self, duration: Duration) -> &mut Self {
let duration = cmp::max(duration, Duration::from_millis(1));
self.actions.push_back(Action::Wait(duration));
self
}

/// Build a `Mock` value according to the defined script.
pub fn build(&mut self) -> Mock {
let (mock, _) = self.build_with_handle();
mock
}

/// Build a `Mock` value paired with a handle
pub fn build_with_handle(&mut self) -> (Mock, Handle) {
let (inner, handle) = Inner::new(self.actions.clone());

let mock = Mock { inner };

(mock, handle)
}
}



impl Handle {
/// Sequence a `read` operation.
///
/// The next operation in the mock's script will be to expect a `read` call
/// and return `buf`.
pub fn read(&mut self, buf: &[u8]) -> &mut Self {
self.tx.send(Action::Read(buf.into())).unwrap();
self
}

/// Sequence a `read` operation error.
///
/// The next operation in the mock's script will be to expect a `read` call
/// and return `error`.
pub fn read_error(&mut self, error: io::Error) -> &mut Self {
let error = Some(error.into());
self.tx.send(Action::ReadError(error)).unwrap();
self
}

/// Sequence a `write` operation.
///
/// The next operation in the mock's script will be to expect a `write`
/// call.
pub fn write(&mut self, buf: &[u8]) -> &mut Self {
self.tx.send(Action::Write(buf.into())).unwrap();
self
}

/// Sequence a `write` operation error.
///
/// The next operation in the mock's script will be to expect a `write`
/// call error.
pub fn write_error(&mut self, error: io::Error) -> &mut Self {
let error = Some(error.into());
self.tx.send(Action::WriteError(error)).unwrap();
self
}
}

impl Inner {
fn new(actions: VecDeque<Action>) -> (Inner, Handle) {
let (tx, rx) = mpsc::unbounded_channel();

let rx = UnboundedReceiverStream::new(rx);

let inner = Inner {
actions,
sleep: None,
read_wait: None,
rx,
waiting: None,
};

let handle = Handle { tx };

(inner, handle)
}

fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
Pin::new(&mut self.rx).poll_next(cx)
}

fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> {
match self.action() {
Some(&mut Action::Read(ref mut data)) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are implementing a Stream. Streams do not have a read or write method. Instead, you should make an impl Stream for Mock block so that Mock becomes a stream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now I see where I need to go. In impl Steam for Mock i should use :

    type Item = String; // like this because my mock would only use Strings as their items ?

then i would do something like pop from an array or like a loop from async read or async write within poll_next ? like this:

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // TODO
        // Pop item from array who could i iterate self? or do a  while loop like in AsyncRead /AsyncWrite ?
        Poll::Pending
    }

// Figure out how much to copy
let n = cmp::min(dst.remaining(), data.len());

// Copy the data into the `dst` slice
dst.put_slice(&data[..n]);

// Drain the data from the source
data.drain(..n);

Ok(())
}
Some(&mut Action::ReadError(ref mut err)) => {
// As the
let err = err.take().expect("Should have been removed from actions.");
let err = Arc::try_unwrap(err).expect("There are no other references.");
Err(err)
}
Some(_) => {
// Either waiting or expecting a write
Err(io::ErrorKind::WouldBlock.into())
}
None => Ok(()),
}
}

fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
let mut ret = 0;

if self.actions.is_empty() {
return Err(io::ErrorKind::BrokenPipe.into());
}

if let Some(&mut Action::Wait(..)) = self.action() {
return Err(io::ErrorKind::WouldBlock.into());
}

if let Some(&mut Action::WriteError(ref mut err)) = self.action() {
let err = err.take().expect("Should have been removed from actions.");
let err = Arc::try_unwrap(err).expect("There are no other references.");
return Err(err);
}

for i in 0..self.actions.len() {
match self.actions[i] {
Action::Write(ref mut expect) => {
let n = cmp::min(src.len(), expect.len());

assert_eq!(&src[..n], &expect[..n]);

// Drop data that was matched
expect.drain(..n);
src = &src[n..];

ret += n;

if src.is_empty() {
return Ok(ret);
}
}
Action::Wait(..) | Action::WriteError(..) => {
break;
}
_ => {}
}

}

Ok(ret)
}

fn remaining_wait(&mut self) -> Option<Duration> {
match self.action() {
Some(&mut Action::Wait(dur)) => Some(dur),
_ => None,
}
}

fn action(&mut self) -> Option<&mut Action> {
loop {
if self.actions.is_empty() {
return None;
}

match self.actions[0] {
Action::Read(ref mut data) => {
if !data.is_empty() {
break;
}
}
Action::Write(ref mut data) => {
if !data.is_empty() {
break;
}
}
Action::Wait(ref mut dur) => {
if let Some(until) = self.waiting {
let now = Instant::now();

if now < until {
break;
}
Grubba27 marked this conversation as resolved.
Show resolved Hide resolved
} else {
self.waiting = Some(Instant::now() + *dur);
break;
}
}
Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => {
if error.is_some() {
break;
}
}
}

let _action = self.actions.pop_front();
}

self.actions.front_mut()
}
}

// ===== impl Inner =====

impl Mock {
fn maybe_wakeup_reader(&mut self) {
match self.inner.action() {
Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) | None => {
if let Some(waker) = self.inner.read_wait.take() {
waker.wake();
}
}
_ => {}
}
}
}

/// Ensures that Mock isn't dropped with data "inside".
impl Drop for Mock {
fn drop(&mut self) {
// Avoid double panicking, since makes debugging much harder.
if std::thread::panicking() {
return;
}

self.inner.actions.iter().for_each(|a| match a {
Action::Read(data) => assert!(data.is_empty(), "There is still data left to read."),
Action::Write(data) => assert!(data.is_empty(), "There is still data left to write."),
_ => (),
})
}
}

impl fmt::Debug for Inner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Inner {{...}}")
}
}
1 change: 1 addition & 0 deletions tokio-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! Tokio and Futures based testing utilities

pub mod io;
pub mod io_stream;

mod macros;
pub mod task;
Expand Down
Loading