Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP): Adding Stream mock to tokio-test #4463

Closed
wants to merge 18 commits into from
Closed
305 changes: 305 additions & 0 deletions tokio-test/src/io_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
#![cfg(not(loom))]

//! A mock type implementing [`AsyncRead`] and [`AsyncWrite`].
//!
//!
//! # Overview
//!
//! Provides a type that implements [`AsyncRead`] + [`AsyncWrite`] that can be configured
//! to handle an arbitrary sequence of read and write operations. This is useful
//! for writing unit tests for networking services as using an actual network
//! type is fairly non deterministic.
//!
//! # Usage
//!
//! Attempting to write data that the mock isn't expecting will result in a
//! panic.
//!
//! [`AsyncRead`]: tokio::io::AsyncRead
//! [`AsyncWrite`]: tokio::io::AsyncWrite


use tokio::sync::mpsc;
use tokio::time::{Duration, Instant, Sleep};
use tokio_stream::wrappers::UnboundedReceiverStream;
use futures_core::{Stream};
use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{self, Context, Poll, Waker};
use std::{cmp, io};

/// An I/O object that follows a predefined script.
///
/// This value is created by `Builder` and implements `AsyncRead` + `AsyncWrite`. It
/// follows the scenario described by the builder and panics otherwise.
#[derive(Debug)]
pub struct Mock {
inner: Inner,
}

/// A handle to send additional actions to the related `Mock`.
#[derive(Debug)]
pub struct Handle {
tx: mpsc::UnboundedSender<Action>,
}

/// Builds `Mock` instances.
#[derive(Debug, Clone, Default)]
pub struct StreamBuilder {
// Sequence of actions for the Mock to take
actions: VecDeque<Action>,
}

#[derive(Debug, Clone)]
enum Action {
Read(Vec<u8>),
Write(Vec<u8>),
Wait(Duration),
// Wrapped in Arc so that Builder can be cloned and Send.
// Mock is not cloned as does not need to check Rc for ref counts.
ReadError(Option<Arc<io::Error>>),
WriteError(Option<Arc<io::Error>>),
}

struct Inner {
actions: VecDeque<Action>,
waiting: Option<Instant>,
sleep: Option<Pin<Box<Sleep>>>,
read_wait: Option<Waker>,
rx: UnboundedReceiverStream<Action>,
}

impl StreamBuilder {
/// Return a new, empty `Builder.
pub fn new() -> Self {
Self::default()
}

/// Sequence a `read` operation.
///
/// The next operation in the mock's script will be to expect a `read` call
/// and return `buf`.
pub fn read(&mut self, buf: &[u8]) -> &mut Self {
self.actions.push_back(Action::Read(buf.into()));
self
}

/// Sequence a `read` operation that produces an error.
///
/// The next operation in the mock's script will be to expect a `read` call
/// and return `error`.
pub fn read_error(&mut self, error: io::Error) -> &mut Self {
let error = Some(error.into());
self.actions.push_back(Action::ReadError(error));
self
}

/// Sequence a `write` operation.
///
/// The next operation in the mock's script will be to expect a `write`
/// call.
pub fn write(&mut self, buf: &[u8]) -> &mut Self {
self.actions.push_back(Action::Write(buf.into()));
self
}

/// Sequence a `write` operation that produces an error.
///
/// The next operation in the mock's script will be to expect a `write`
/// call that provides `error`.
pub fn write_error(&mut self, error: io::Error) -> &mut Self {
let error = Some(error.into());
self.actions.push_back(Action::WriteError(error));
self
}

/// Sequence a wait.
///
/// The next operation in the mock's script will be to wait without doing so
/// for `duration` amount of time.
pub fn wait(&mut self, duration: Duration) -> &mut Self {
let duration = cmp::max(duration, Duration::from_millis(1));
self.actions.push_back(Action::Wait(duration));
self
}

/// Build a `Mock` value according to the defined script.
pub fn build(&mut self) -> Mock {
let (mock, _) = self.build_with_handle();
mock
}

/// Build a `Mock` value paired with a handle
pub fn build_with_handle(&mut self) -> (Mock, Handle) {
let (inner, handle) = Inner::new(self.actions.clone());

let mock = Mock { inner };

(mock, handle)
}
}



impl Handle {
/// Sequence a `read` operation.
///
/// The next operation in the mock's script will be to expect a `read` call
/// and return `buf`.
pub fn read(&mut self, buf: &[u8]) -> &mut Self {
self.tx.send(Action::Read(buf.into())).unwrap();
self
}

/// Sequence a `read` operation error.
///
/// The next operation in the mock's script will be to expect a `read` call
/// and return `error`.
pub fn read_error(&mut self, error: io::Error) -> &mut Self {
let error = Some(error.into());
self.tx.send(Action::ReadError(error)).unwrap();
self
}

/// Sequence a `write` operation.
///
/// The next operation in the mock's script will be to expect a `write`
/// call.
pub fn write(&mut self, buf: &[u8]) -> &mut Self {
self.tx.send(Action::Write(buf.into())).unwrap();
self
}

/// Sequence a `write` operation error.
///
/// The next operation in the mock's script will be to expect a `write`
/// call error.
pub fn write_error(&mut self, error: io::Error) -> &mut Self {
let error = Some(error.into());
self.tx.send(Action::WriteError(error)).unwrap();
self
}
}

impl Inner {
fn new(actions: VecDeque<Action>) -> (Inner, Handle) {
let (tx, rx) = mpsc::unbounded_channel();

let rx = UnboundedReceiverStream::new(rx);

let inner = Inner {
actions,
sleep: None,
read_wait: None,
rx,
waiting: None,
};

let handle = Handle { tx };

(inner, handle)
}

fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
Pin::new(&mut self.rx).poll_next(cx)
}

fn remaining_wait(&mut self) -> Option<Duration> {
match self.action() {
Some(&mut Action::Wait(dur)) => Some(dur),
_ => None,
}
}

fn action(&mut self) -> Option<&mut Action> {
loop {
if self.actions.is_empty() {
return None;
}

match self.actions[0] {
Action::Read(ref mut data) => {
if !data.is_empty() {
break;
}
}
Action::Write(ref mut data) => {
if !data.is_empty() {
break;
}
}
Action::Wait(ref mut dur) => {
if let Some(until) = self.waiting {
let now = Instant::now();

if now < until {
break;
}
Grubba27 marked this conversation as resolved.
Show resolved Hide resolved
} else {
self.waiting = Some(Instant::now() + *dur);
break;
}
}
Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => {
if error.is_some() {
break;
}
}
}

let _action = self.actions.pop_front();
}

self.actions.front_mut()
}
}

// ===== impl Inner =====

impl Mock {
fn maybe_wakeup_reader(&mut self) {
match self.inner.action() {
Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) | None => {
if let Some(waker) = self.inner.read_wait.take() {
waker.wake();
}
}
_ => {}
}
}
}

impl Stream for Mock {
type Item = String;

fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// TODO
// Pop item from array who could i iterate self? or do a while loop like in AsyncRead /AsyncWrite ?
Poll::Pending
}

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically you need to return the next action from the list of actions, but if that action is a sleep, then you need to sleep instead. And if you are polled during a sleep, you need to keep sleeping until the sleep completes.

The other mock type implements this by having a loop, where the loop first checks if its currently sleeping, then if the ready! macro lets it get past the sleep (i.e. if the sleep has completed), then it pops the next item. If the next item is a sleep, then it sets up that new sleep and goes around the loop. Otherwise it returns the item.


/// Ensures that Mock isn't dropped with data "inside".
impl Drop for Mock {
fn drop(&mut self) {
// Avoid double panicking, since makes debugging much harder.
if std::thread::panicking() {
return;
}

self.inner.actions.iter().for_each(|a| match a {
Action::Read(data) => assert!(data.is_empty(), "There is still data left to read."),
Action::Write(data) => assert!(data.is_empty(), "There is still data left to write."),
_ => (),
})
}
}

impl fmt::Debug for Inner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Inner {{...}}")
}
}
1 change: 1 addition & 0 deletions tokio-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! Tokio and Futures based testing utilities

pub mod io;
pub mod io_stream;

mod macros;
pub mod task;
Expand Down
18 changes: 18 additions & 0 deletions tokio-test/tests/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::io;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_test::io::Builder;
use tokio_test::io_stream::StreamBuilder;

#[tokio::test]
async fn read() {
Expand Down Expand Up @@ -84,3 +85,20 @@ async fn mock_panics_write_data_left() {
use tokio_test::io::Builder;
Builder::new().write(b"write").build();
}

#[tokio::test]
async fn stream_read(){

let mut mock = StreamBuilder::new().read(b"hello ").read(b"world!").build();
// let res = mock.poll_next(); or something like this ?
assert_eq!(res, b"hello ");
}

#[tokio::test]
async fn stream_write(){
let unit: u8 = 1;
let mut mock = StreamBuilder::new().write(b"hello ").write(b"world!").build();
// let mut task = get task ?
// mock.poll_write(task, unit ).assert(b"hello " or something like this ?
//
}