From f0ea9d6f4c0a734ac4c235630f3d8cc51fb48f51 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 6 Feb 2018 07:26:21 -0800 Subject: [PATCH] Switch back to futures from crates.io (#113) Doing so requires copying the `current_thread` executor from GitHub into the repo. --- Cargo.toml | 4 - examples/chat-combinator.rs | 4 +- examples/chat.rs | 2 +- examples/compress.rs | 4 +- examples/connect.rs | 8 +- examples/echo-threads.rs | 6 +- examples/echo-udp.rs | 6 +- examples/echo.rs | 4 +- examples/hello.rs | 3 +- examples/hello_world.rs | 3 +- examples/proxy.rs | 5 +- examples/sink.rs | 4 +- examples/tinydb.rs | 4 +- examples/tinyhttp.rs | 2 +- examples/udp-codec.rs | 4 +- src/executor/current_thread.rs | 413 ++++++++++++++++++++ src/executor/mod.rs | 8 + src/executor/scheduler.rs | 663 +++++++++++++++++++++++++++++++++ src/executor/sleep.rs | 169 +++++++++ src/lib.rs | 1 + tests/buffered.rs | 3 +- tests/chain.rs | 3 +- tests/drop-core.rs | 8 +- tests/echo.rs | 3 +- tests/global.rs | 3 +- tests/limit.rs | 3 +- tests/line-frames.rs | 16 +- tests/pipe-hup.rs | 4 +- tests/stream-buffered.rs | 3 +- tests/tcp.rs | 7 +- tests/udp.rs | 13 +- 31 files changed, 1313 insertions(+), 70 deletions(-) create mode 100644 src/executor/current_thread.rs create mode 100644 src/executor/mod.rs create mode 100644 src/executor/scheduler.rs create mode 100644 src/executor/sleep.rs diff --git a/Cargo.toml b/Cargo.toml index 4c0293ea118..d3720eb374f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,3 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" time = "0.1" - -[patch.crates-io] -futures = { git = "https://github.com/rust-lang-nursery/futures-rs", branch = "tokio-reform" } -mio = { git = "https://github.com/carllerche/mio" } diff --git a/examples/chat-combinator.rs b/examples/chat-combinator.rs index 667f0e9a7b8..76e689b95e4 100644 --- a/examples/chat-combinator.rs +++ b/examples/chat-combinator.rs @@ -29,7 +29,7 @@ use std::io::{Error, ErrorKind, BufReader}; use std::sync::{Arc, Mutex}; use futures::Future; -use futures::future::{self, Executor}; +use futures::future::Executor; use futures::stream::{self, Stream}; use futures_cpupool::CpuPool; use tokio::net::TcpListener; @@ -134,5 +134,5 @@ fn main() { }); // execute server - future::blocking(srv).wait().unwrap(); + srv.wait().unwrap(); } diff --git a/examples/chat.rs b/examples/chat.rs index da8889fd8f6..1b155427db9 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -33,10 +33,10 @@ extern crate tokio; extern crate tokio_io; extern crate bytes; +use tokio::executor::current_thread; use tokio::net::{TcpListener, TcpStream}; use tokio_io::{AsyncRead}; use futures::prelude::*; -use futures::current_thread; use futures::sync::mpsc; use futures::future::{self, Either}; use bytes::{BytesMut, Bytes, BufMut}; diff --git a/examples/compress.rs b/examples/compress.rs index 501548ef619..3098abf746f 100644 --- a/examples/compress.rs +++ b/examples/compress.rs @@ -29,7 +29,7 @@ use std::env; use std::net::SocketAddr; use futures::{Future, Stream, Poll}; -use futures::future::{self, Executor}; +use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::{TcpListener, TcpStream}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -62,7 +62,7 @@ fn main() { Ok(()) }); - future::blocking(server).wait().unwrap(); + server.wait().unwrap(); } /// The main workhorse of this example. 
This'll compress all data read from diff --git a/examples/connect.rs b/examples/connect.rs index f0619fbd490..a4160449f92 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -26,7 +26,7 @@ use std::net::SocketAddr; use std::thread; use futures::sync::mpsc; -use futures::{future, Sink, Stream}; +use futures::{Future, Sink, Stream}; use futures_cpupool::CpuPool; fn main() { @@ -71,9 +71,9 @@ fn main() { // loop. In this case, though, we know it's ok as the event loop isn't // otherwise running anything useful. let mut out = io::stdout(); - future::blocking(stdout.for_each(|chunk| { + stdout.for_each(|chunk| { out.write_all(&chunk) - })).wait().unwrap(); + }).wait().unwrap(); } mod tcp { @@ -244,7 +244,7 @@ fn read_stdin(mut tx: mpsc::Sender>) { Ok(n) => n, }; buf.truncate(n); - tx = match future::blocking(tx.send(buf)).wait() { + tx = match tx.send(buf).wait() { Ok(tx) => tx, Err(_) => break, }; diff --git a/examples/echo-threads.rs b/examples/echo-threads.rs index e2525c8055b..6ce8b1563a9 100644 --- a/examples/echo-threads.rs +++ b/examples/echo-threads.rs @@ -24,7 +24,7 @@ use std::net::SocketAddr; use std::thread; use futures::prelude::*; -use futures::future::{self, Executor}; +use futures::future::Executor; use futures::sync::mpsc; use futures_cpupool::CpuPool; use tokio_io::AsyncRead; @@ -61,7 +61,7 @@ fn main() { next = (next + 1) % channels.len(); Ok(()) }); - future::blocking(srv).wait().unwrap(); + srv.wait().unwrap(); } fn worker(rx: mpsc::UnboundedReceiver) { @@ -88,5 +88,5 @@ fn worker(rx: mpsc::UnboundedReceiver) { Ok(()) }); - future::blocking(done).wait().unwrap(); + done.wait().unwrap(); } diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs index 2ce43bc0e15..f7e2bf09188 100644 --- a/examples/echo-udp.rs +++ b/examples/echo-udp.rs @@ -18,7 +18,7 @@ extern crate tokio_io; use std::{env, io}; use std::net::SocketAddr; -use futures::{future, Future, Poll}; +use futures::{Future, Poll}; use tokio::net::UdpSocket; struct Server { @@ -58,9 +58,9 @@ fn main() { // Next we'll create a future to spawn (the one we defined above) and then // we'll block our current thread waiting on the result of the future - future::blocking(Server { + Server { socket: socket, buf: vec![0; 1024], to_send: None, - }).wait().unwrap(); + }.wait().unwrap(); } diff --git a/examples/echo.rs b/examples/echo.rs index 54a28ff7dd0..558f3a68a3d 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -26,7 +26,7 @@ use std::env; use std::net::SocketAddr; use futures::Future; -use futures::future::{self, Executor}; +use futures::future::Executor; use futures::stream::Stream; use futures_cpupool::CpuPool; use tokio_io::AsyncRead; @@ -114,5 +114,5 @@ fn main() { // And finally now that we've define what our server is, we run it! Here we // just need to execute the future we've created and wait for it to complete // using the standard methods in the `futures` crate. 
- future::blocking(done).wait().unwrap(); + done.wait().unwrap(); } diff --git a/examples/hello.rs b/examples/hello.rs index d9e46d170d8..5ceb431b164 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -19,7 +19,6 @@ extern crate tokio_io; use std::env; use std::net::SocketAddr; -use futures::future; use futures::prelude::*; use tokio::net::TcpListener; @@ -41,5 +40,5 @@ fn main() { Ok(()) }); - future::blocking(server).wait().unwrap(); + server.wait().unwrap(); } diff --git a/examples/hello_world.rs b/examples/hello_world.rs index 5cac1259859..fee06607115 100644 --- a/examples/hello_world.rs +++ b/examples/hello_world.rs @@ -18,9 +18,10 @@ extern crate tokio; extern crate tokio_io; extern crate futures; +use tokio::executor::current_thread; use tokio::net::TcpListener; use tokio_io::io; -use futures::{current_thread, Future, Stream}; +use futures::{Future, Stream}; pub fn main() { let addr = "127.0.0.1:6142".parse().unwrap(); diff --git a/examples/proxy.rs b/examples/proxy.rs index f73dd30d5e7..131fa41b500 100644 --- a/examples/proxy.rs +++ b/examples/proxy.rs @@ -28,7 +28,7 @@ use std::io::{self, Read, Write}; use futures::stream::Stream; use futures::{Future, Poll}; -use futures::future::{self, Executor}; +use futures::future::{Executor}; use futures_cpupool::CpuPool; use tokio::net::{TcpListener, TcpStream}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -92,7 +92,8 @@ fn main() { Ok(()) }); - future::blocking(done).wait().unwrap(); + + done.wait().unwrap(); } // This is a custom type used to have a custom implementation of the diff --git a/examples/sink.rs b/examples/sink.rs index 3fa5f5eda5f..21456adae1b 100644 --- a/examples/sink.rs +++ b/examples/sink.rs @@ -26,7 +26,7 @@ use std::iter; use std::net::SocketAddr; use futures::Future; -use futures::future::{self, Executor}; +use futures::future::Executor; use futures::stream::{self, Stream}; use futures_cpupool::CpuPool; use tokio_io::IoFuture; @@ -46,7 +46,7 @@ fn main() { pool.execute(write(socket).or_else(|_| Ok(()))).unwrap(); Ok(()) }); - future::blocking(server).wait().unwrap(); + server.wait().unwrap(); } fn write(socket: TcpStream) -> IoFuture<()> { diff --git a/examples/tinydb.rs b/examples/tinydb.rs index de75040487e..0a68a314633 100644 --- a/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -51,7 +51,7 @@ use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use futures::prelude::*; -use futures::future::{self, Executor}; +use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::TcpListener; use tokio_io::AsyncRead; @@ -160,7 +160,7 @@ fn main() { Ok(()) }); - future::blocking(done).wait().unwrap(); + done.wait().unwrap(); } impl Request { diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index b0106d63672..2f982484927 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -90,7 +90,7 @@ fn worker(rx: mpsc::UnboundedReceiver) { })).unwrap(); Ok(()) }); - future::blocking(done).wait().unwrap(); + done.wait().unwrap(); } /// "Server logic" is implemented in this function. diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index 5c11e9f3002..c874ebd7810 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -15,7 +15,7 @@ use std::io; use std::net::SocketAddr; use futures::{Future, Stream, Sink}; -use futures::future::{self, Executor}; +use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::{UdpSocket, UdpCodec}; @@ -76,5 +76,5 @@ fn main() { // Spawn the sender of pongs and then wait for our pinger to finish. 
 pool.execute(b.then(|_| Ok(()))).unwrap();
-    drop(future::blocking(a).wait());
+    drop(a.wait());
 }
diff --git a/src/executor/current_thread.rs b/src/executor/current_thread.rs
new file mode 100644
index 00000000000..edc011ed761
--- /dev/null
+++ b/src/executor/current_thread.rs
@@ -0,0 +1,413 @@
+//! Execute tasks on the current thread
+//!
+//! This module implements an executor that keeps futures on the same thread
+//! that they are submitted on. This allows it to execute futures that are
+//! not `Send`.
+//!
+//! Before being able to spawn futures with this module, an executor context
+//! must be set up by calling [`run`]. From within that context, [`spawn`]
+//! may be called with the future to run in the background.
+//!
+//! ```
+//! # extern crate tokio;
+//! # extern crate futures;
+//! # use tokio::executor::current_thread;
+//! use futures::future::lazy;
+//!
+//! // Calling `spawn` here results in a panic
+//! // current_thread::spawn(my_future);
+//!
+//! # pub fn main() {
+//! current_thread::run(|_| {
+//!     // The execution context is set up, futures may be spawned.
+//!     current_thread::spawn(lazy(|| {
+//!         println!("called from the current thread executor");
+//!         Ok(())
+//!     }));
+//! });
+//! # }
+//! ```
+//!
+//! # Execution model
+//!
+//! When an execution context is set up with `run`, the current thread will
+//! block and all the futures managed by the executor are driven to
+//! completion. Whenever a future receives a notification, it is pushed to
+//! the end of a scheduled list. The executor will drain this list, advancing
+//! the state of each future.
+//!
+//! All futures managed by this module will remain on the current thread;
+//! as such, this module is able to safely execute futures that are not
+//! `Send`.
+//!
+//! Once a future is complete, it is dropped. Once all futures are completed,
+//! [`run`] will unblock and return.
+//!
+//! This module makes a best effort to fairly schedule futures that it
+//! manages.
+//!
+//! [`spawn`]: fn.spawn.html
+//! [`run`]: fn.run.html
+
+use super::{scheduler};
+use super::sleep::{self, Sleep, Wakeup};
+
+use futures::Async;
+use futures::executor::{self, Spawn};
+use futures::future::{Future, Executor, ExecuteError, ExecuteErrorKind};
+
+use std::{fmt, thread};
+use std::cell::Cell;
+use std::rc::Rc;
+
+/// Executes futures on the current thread.
+///
+/// All futures executed using this executor will be executed on the current
+/// thread. As such, `run` will wait for these futures to complete before
+/// returning.
+///
+/// For more details, see the [module level](index.html) documentation.
+#[derive(Debug, Clone)]
+pub struct TaskExecutor {
+    // Prevent the handle from moving across threads.
+    _p: ::std::marker::PhantomData<Rc<()>>,
+}
+
+/// A context yielded to the closure provided to `run`.
+///
+/// This context is mostly a future-proofing of the library to add future
+/// contextual information into it. Currently it only carries the
+/// cancellation flag used by `cancel_all_spawned`.
+#[derive(Debug)]
+pub struct Context<'a> {
+    cancel: &'a Cell<bool>,
+}
+
+/// Implements the "blocking" logic for the current thread executor. A
+/// `TaskRunner` will be created during `run` and will sit on the stack until
+/// execution is complete.
+#[derive(Debug)]
+struct TaskRunner<T> {
+    /// Executes futures.
+    scheduler: Scheduler<T>,
+}
+
+struct CurrentRunner {
+    /// When set to true, the executor should return immediately, even if
+    /// there are still futures to run.
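+    /// This is set by `Context::cancel_all_spawned`.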
+    cancel: Cell<bool>,
+
+    /// Number of futures currently being executed by the runner.
+    num_futures: Cell<usize>,
+
+    /// Raw pointer to the current scheduler pusher.
+    ///
+    /// The raw pointer is required in order to store it in a thread-local
+    /// slot.
+    schedule: Cell<Option<*mut Schedule>>,
+}
+
+type Scheduler<T> = scheduler::Scheduler<Task, T>;
+type Schedule = scheduler::Schedule<Task>;
+
+struct Task(Spawn<Box<Future<Item = (), Error = ()>>>);
+
+/// Current thread's task runner state. The schedule pointer is set in
+/// `CurrentRunner::set_schedule`.
+thread_local!(static CURRENT: CurrentRunner = CurrentRunner {
+    cancel: Cell::new(false),
+    num_futures: Cell::new(0),
+    schedule: Cell::new(None),
+});
+
+/// Calls the given closure, then blocks until all futures submitted for
+/// execution complete.
+///
+/// In more detail, this function will block until:
+/// - All executing futures are complete, or
+/// - `cancel_all_spawned` is invoked.
+pub fn run<F, R>(f: F) -> R
+where F: FnOnce(&mut Context) -> R
+{
+    sleep::BlockThread::with_current(|mut sleep| {
+        TaskRunner::enter(&mut sleep, f)
+    })
+}
+
+/// Calls the given closure with a custom sleep strategy.
+///
+/// This function is the same as `run` except that it allows customizing the
+/// sleep strategy.
+pub fn run_with_sleep<S, F, R>(sleep: &mut S, f: F) -> R
+where F: FnOnce(&mut Context) -> R,
+      S: Sleep,
+{
+    TaskRunner::enter(sleep, f)
+}
+
+/// Executes a future on the current thread.
+///
+/// The provided future must complete or be canceled before `run` will
+/// return.
+///
+/// # Panics
+///
+/// This function can only be invoked from the context of a `run` call; any
+/// other use will result in a panic.
+pub fn spawn<F>(future: F)
+where F: Future<Item = (), Error = ()> + 'static
+{
+    execute(future).unwrap_or_else(|_| {
+        panic!("cannot call `execute` unless the thread is already \
+                in the context of a call to `run`")
+    })
+}
+
+/// Returns an executor that executes futures on the current thread.
+///
+/// The user of `TaskExecutor` must ensure that when a future is submitted,
+/// it is done within the context of a call to `run`.
+///
+/// For more details, see the [module level](index.html) documentation.
+pub fn task_executor() -> TaskExecutor {
+    TaskExecutor {
+        _p: ::std::marker::PhantomData,
+    }
+}
+
+impl<F> Executor<F> for TaskExecutor
+where F: Future<Item = (), Error = ()> + 'static
+{
+    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
+        execute(future)
+    }
+}
+
+impl<'a> Context<'a> {
+    /// Cancels *all* executing futures.
+    pub fn cancel_all_spawned(&self) {
+        self.cancel.set(true);
+    }
+}
+
+/// Submits a future to the current executor. This is done by
+/// checking the thread-local variable tracking the current executor.
+///
+/// If this function is not called in the context of an executor, i.e.
+/// outside of `run`, then `Err` is returned.
+///
+/// This function does not panic.
+fn execute<F>(future: F) -> Result<(), ExecuteError<F>>
+where F: Future<Item = (), Error = ()> + 'static,
+{
+    CURRENT.with(|current| {
+        match current.schedule.get() {
+            Some(schedule) => {
+                let spawned = Task::new(future);
+
+                let num_futures = current.num_futures.get();
+                current.num_futures.set(num_futures + 1);
+
+                unsafe { (*schedule).schedule(spawned); }
+
+                Ok(())
+            }
+            None => {
+                Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future))
+            }
+        }
+    })
+}
+
+impl<T> TaskRunner<T>
+where T: Wakeup,
+{
+    /// Return a new `TaskRunner`
+    fn new(wakeup: T) -> TaskRunner<T> {
+        let scheduler = scheduler::Scheduler::new(wakeup);
+
+        TaskRunner {
+            scheduler: scheduler,
+        }
+    }
+
+    /// Enter a new `TaskRunner` context
+    ///
+    /// This function handles advancing the scheduler state and blocking
+    /// while listening for notified futures.
+    ///
+    /// First, a new task runner is created backed by the current
+    /// `sleep::BlockThread` handle. Passing `sleep::BlockThread` into the
+    /// scheduler is how scheduled futures unblock the thread, signalling
+    /// that there is more work to do.
+    ///
+    /// Before any future is polled, the scheduler must be set to a
+    /// thread-local variable so that `execute` is able to submit new
+    /// futures to the current executor. Because `Scheduler::schedule`
+    /// requires `&mut self`, this introduces a mutability hazard. This
+    /// hazard is minimized with some indirection. See `set_schedule` for
+    /// more details.
+    ///
+    /// Once all context is set up, the init closure is invoked. This is the
+    /// "bootstrapping" process that submits the initial futures into the
+    /// scheduler. After this, the function loops and advances the scheduler
+    /// state until all futures complete. When no scheduled futures are
+    /// ready to be advanced, the thread is blocked using `S: Sleep`.
+    fn enter<S, F, R>(sleep: &mut S, f: F) -> R
+    where F: FnOnce(&mut Context) -> R,
+          S: Sleep<Wakeup = T>,
+    {
+        let mut runner = TaskRunner::new(sleep.wakeup());
+
+        CURRENT.with(|current| {
+            // Make sure that another task runner is not set.
+            //
+            // This should not ever be possible due to how `set_schedule`
+            // is set up, but better safe than sorry!
+            assert!(current.schedule.get().is_none());
+
+            // Enter an execution scope
+            let mut ctx = Context {
+                cancel: &current.cancel,
+            };
+
+            // Set the scheduler into the TLS slot and perform the setup
+            // work.
+            //
+            // This could possibly submit other futures for execution.
+            let ret = current.set_schedule(&mut runner.scheduler as &mut Schedule, || {
+                f(&mut ctx)
+            });
+
+            // Execute the runner.
+            //
+            // This function will not return until either
+            //
+            // a) All futures have completed execution
+            // b) `cancel_all_spawned` is called, forcing the executor to
+            //    return.
+            runner.run(sleep, current);
+
+            // Not technically required, but this makes the fact that `ctx`
+            // needs to live until this point explicit.
+            drop(ctx);
+
+            ret
+        })
+    }
+
+    fn run<S>(&mut self, sleep: &mut S, current: &CurrentRunner)
+    where S: Sleep,
+    {
+        use super::scheduler::Tick;
+
+        while current.is_running() {
+            // Try to advance the scheduler state
+            let res = self.scheduler.tick(|scheduler, spawned, notify| {
+                // `scheduler` is a `&mut Scheduler` reference returned back
+                // from the scheduler to us, but only within the context of
+                // this closure.
+                //
+                // This lets us push new futures into the scheduler. It also
+                // lets us pass the scheduler mutable reference into
+                // `set_schedule`, which sets the thread-local variable that
+                // `spawn` uses for submitting new futures to the
+                // "current" executor.
+                //
+                // See `set_schedule` documentation for more details on how
+                // we guard against mutable pointer aliasing.
+                current.set_schedule(scheduler as &mut Schedule, || {
+                    match spawned.0.poll_future_notify(notify, 0) {
+                        Ok(Async::Ready(_)) | Err(_) => {
+                            Async::Ready(())
+                        }
+                        Ok(Async::NotReady) => Async::NotReady,
+                    }
+                })
+            });
+
+            // Process the result of ticking the scheduler
+            match res {
+                // A future completed, so decrement the number of futures
+                // still being executed by the runner.
+                Tick::Data(_) => {
+                    let num_futures = current.num_futures.get();
+                    debug_assert!(num_futures > 0);
+                    current.num_futures.set(num_futures - 1);
+                },
+                Tick::Empty => {
+                    // The scheduler did not have any work to process.
+                    //
+                    // At this point, the scheduler is currently running,
+                    // given that the `while` condition was true and no user
+                    // code has been executed.
+
+                    debug_assert!(current.is_running());
+
+                    // Block the current thread until a future managed by
+                    // the scheduler receives a readiness notification.
+                    sleep.sleep();
+                }
+                Tick::Inconsistent => {
+                    // Yield the thread and loop
+                    thread::yield_now();
+                }
+            }
+        }
+    }
+}
+
+impl CurrentRunner {
+    /// Set the provided schedule handle to the TLS slot for the duration of
+    /// the closure.
+    ///
+    /// `spawn` will access the `CURRENT` thread-local variable in
+    /// order to push a future into the scheduler. This requires a `&mut`
+    /// reference, introducing mutability hazards.
+    ///
+    /// Rust requires that `&mut` references are not aliased, i.e. there are
+    /// never two "live" mutable references to the same piece of data. In
+    /// order to store a `&mut` reference in a thread-local variable, we
+    /// must ensure that one cannot access the scheduler anywhere else.
+    ///
+    /// To do this, we only allow access to the thread local variable from
+    /// within the closure passed to `set_schedule`. This function also
+    /// takes a `&mut` reference to the scheduler, which is essentially
+    /// holding a "lock" on that reference, preventing any other location in
+    /// the code from also getting that `&mut` reference.
+    ///
+    /// When `set_schedule` returns, the thread-local variable containing
+    /// the `&mut` reference is set back to `None`. This is done even if the
+    /// closure panics.
+    ///
+    /// This reduces the odds of introducing pointer aliasing.
+    fn set_schedule<F, R>(&self, schedule: &mut Schedule, f: F) -> R
+    where F: FnOnce() -> R
+    {
+        // Ensure that the runner is removed from the thread-local context
+        // when leaving the scope. This handles cases that involve panicking.
+        struct Reset<'a>(&'a CurrentRunner);
+
+        impl<'a> Drop for Reset<'a> {
+            fn drop(&mut self) {
+                self.0.schedule.set(None);
+            }
+        }
+
+        let _reset = Reset(self);
+
+        self.schedule.set(Some(schedule as *mut Schedule));
+
+        f()
+    }
+
+    fn is_running(&self) -> bool {
+        self.num_futures.get() > 0 && !self.cancel.get()
+    }
+}
+
+impl Task {
+    fn new<T: Future<Item = (), Error = ()> + 'static>(f: T) -> Self {
+        Task(executor::spawn(Box::new(f)))
+    }
+}
+
+impl fmt::Debug for Task {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        fmt.debug_struct("Task")
+            .finish()
+    }
+}
diff --git a/src/executor/mod.rs b/src/executor/mod.rs
new file mode 100644
index 00000000000..896676c8129
--- /dev/null
+++ b/src/executor/mod.rs
@@ -0,0 +1,8 @@
+//! Task execution utilities.
+//!
+//! This module only contains `current_thread`, an executor for multiplexing
+//! many tasks on a single thread.
+
+pub mod current_thread;
+mod scheduler;
+mod sleep;
diff --git a/src/executor/scheduler.rs b/src/executor/scheduler.rs
new file mode 100644
index 00000000000..e9e6e1a3112
--- /dev/null
+++ b/src/executor/scheduler.rs
@@ -0,0 +1,663 @@
+//! An unbounded set of futures.
+
+use super::sleep::Wakeup;
+
+use futures::Async;
+use futures::executor::{self, UnsafeNotify, NotifyHandle};
+
+use std::cell::UnsafeCell;
+use std::fmt::{self, Debug};
+use std::marker::PhantomData;
+use std::mem;
+use std::ptr;
+use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel};
+use std::sync::atomic::{AtomicPtr, AtomicBool};
+use std::sync::{Arc, Weak};
+use std::usize;
+
+/// A generic task-aware scheduler.
+///
+/// This is used both by `FuturesUnordered` and the current-thread executor.
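+///
+/// `T` is the scheduled item (a spawned task, for the current-thread
+/// executor) and `W` is the wakeup strategy used to unblock the thread
+/// driving the scheduler.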
+pub struct Scheduler<T, W> {
+    inner: Arc<Inner<T, W>>,
+    nodes: List<T, W>,
+}
+
+/// Schedule new futures
+pub trait Schedule<T> {
+    /// Schedule a new future.
+    fn schedule(&mut self, item: T);
+}
+
+pub struct Notify<'a, T: 'a, W: 'a>(&'a Arc<Node<T, W>>);
+
+// A linked-list of nodes
+struct List<T, W> {
+    len: usize,
+    head: *const Node<T, W>,
+    tail: *const Node<T, W>,
+}
+
+unsafe impl<T: Send, W: Send> Send for Scheduler<T, W> {}
+unsafe impl<T: Sync, W: Sync> Sync for Scheduler<T, W> {}
+
+// Scheduler is implemented using two linked lists. The first linked list
+// tracks all items managed by a `Scheduler`. This list is stored on the
+// `Scheduler` struct and is **not** thread safe. The second linked list is
+// an implementation of the intrusive MPSC queue algorithm described by
+// 1024cores.net and is stored on `Inner`. This linked list can push items
+// to the back concurrently but only one consumer may pop from the front. To
+// enforce this requirement, all popping will be performed via fns on
+// `Scheduler` that take `&mut self`.
+//
+// When an item is submitted to the set, a node is allocated and inserted in
+// both linked lists. This means that all insertion operations **must**
+// originate from `Scheduler` with `&mut self`. The next call to `tick` will
+// (eventually) see this node and call `poll` on the item.
+//
+// Nodes are wrapped in `Arc` cells which manage the lifetime of the node.
+// However, `Arc` handles are sometimes cast to `*const Node` pointers.
+// Specifically, when a node is stored in at least one of the two lists
+// described above, this represents a logical `Arc` handle. This is how
+// `Scheduler` maintains its reference to all nodes it manages. Each
+// `NotifyHandle` instance is an `Arc<Node>` as well.
+//
+// When `Scheduler` drops, it clears the linked list of all nodes that it
+// manages. When doing so, it must attempt to decrement the reference count
+// (by dropping an Arc handle). However, it can **only** decrement the
+// reference count if the node is not currently stored in the mpsc channel.
+// If the node **is** "queued" in the mpsc channel, then the arc reference
+// count cannot be decremented. Once the node is popped from the mpsc
+// channel, then the final arc reference count can be decremented, thus
+// freeing the node.
+
+#[allow(missing_debug_implementations)]
+struct Inner<T, W> {
+    // The task using `Scheduler`.
+    wakeup: W,
+
+    // Head/tail of the readiness queue
+    head_readiness: AtomicPtr<Node<T, W>>,
+    tail_readiness: UnsafeCell<*const Node<T, W>>,
+
+    // Used as part of the MPSC queue algorithm
+    stub: Arc<Node<T, W>>,
+}
+
+struct Node<T, W> {
+    // The item
+    item: UnsafeCell<Option<T>>,
+
+    // Next pointer for linked list tracking all active nodes
+    next_all: UnsafeCell<*const Node<T, W>>,
+
+    // Previous node in linked list tracking all active nodes
+    prev_all: UnsafeCell<*const Node<T, W>>,
+
+    // Next pointer in readiness queue
+    next_readiness: AtomicPtr<Node<T, W>>,
+
+    // Whether or not this node is currently in the mpsc queue.
+    queued: AtomicBool,
+
+    // Queue that we'll be enqueued to when notified
+    queue: Weak<Inner<T, W>>,
+}
+
+/// Returned by the `Scheduler::tick` function, allowing the caller to
+/// decide what action to take next.
+pub enum Tick<T> {
+    Data(T),
+    Empty,
+    Inconsistent,
+}
+
+/// Returned by `Inner::dequeue`, representing either a dequeue success
+/// (with the dequeued node), an empty list, or an inconsistent state.
+///
+/// The inconsistent state is described in more detail at [1024cores], but
+/// roughly indicates that a node will be ready to dequeue sometime shortly
+/// in the future and the caller should try again soon.
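+/// The current-thread executor reacts to an inconsistent dequeue by
+/// yielding the thread and ticking again (see `TaskRunner::run`).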
+///
+/// [1024cores]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
+enum Dequeue<T, W> {
+    Data(*const Node<T, W>),
+    Empty,
+    Inconsistent,
+}
+
+impl<T, W> Scheduler<T, W>
+where W: Wakeup,
+{
+    /// Constructs a new, empty `Scheduler`
+    ///
+    /// The returned `Scheduler` does not contain any items and, in this
+    /// state, `Scheduler::tick` will return `Tick::Empty`.
+    pub fn new(wakeup: W) -> Self {
+        let stub = Arc::new(Node {
+            item: UnsafeCell::new(None),
+            next_all: UnsafeCell::new(ptr::null()),
+            prev_all: UnsafeCell::new(ptr::null()),
+            next_readiness: AtomicPtr::new(ptr::null_mut()),
+            queued: AtomicBool::new(true),
+            queue: Weak::new(),
+        });
+        let stub_ptr = &*stub as *const Node<T, W>;
+        let inner = Arc::new(Inner {
+            wakeup: wakeup,
+            head_readiness: AtomicPtr::new(stub_ptr as *mut _),
+            tail_readiness: UnsafeCell::new(stub_ptr),
+            stub: stub,
+        });
+
+        Scheduler {
+            inner: inner,
+            nodes: List::new(),
+        }
+    }
+}
+
+impl<T, W> Scheduler<T, W> {
+    /// Advance the scheduler state.
+    ///
+    /// This function should be called whenever the caller is notified via
+    /// a wakeup.
+    pub fn tick<F, R>(&mut self, mut f: F) -> Tick<R>
+    where F: FnMut(&mut Self, &mut T, &Notify<T, W>) -> Async<R>
+    {
+        loop {
+            let node = match unsafe { self.inner.dequeue() } {
+                Dequeue::Empty => {
+                    return Tick::Empty;
+                }
+                Dequeue::Inconsistent => {
+                    return Tick::Inconsistent;
+                }
+                Dequeue::Data(node) => node,
+            };
+
+            debug_assert!(node != self.inner.stub());
+
+            unsafe {
+                if (*(*node).item.get()).is_none() {
+                    // The node has already been released. However, while it
+                    // was being released, another thread notified it, which
+                    // resulted in it getting pushed into the mpsc channel.
+                    //
+                    // In this case, we just decrement the ref count.
+                    let node = ptr2arc(node);
+                    assert!((*node.next_all.get()).is_null());
+                    assert!((*node.prev_all.get()).is_null());
+                    continue
+                };
+
+                // We're going to need to be very careful if the `poll`
+                // function below panics. We need to (a) not leak memory and
+                // (b) ensure that we still don't have any use-after-frees.
+                // To manage this we do a few things:
+                //
+                // * This "bomb" here will call `release_node` if dropped
+                //   abnormally. That way we'll be sure the memory management
+                //   of the `node` is managed correctly.
+                //
+                // * We unlink the node from our internal queue to
+                //   preemptively assume it is complete (will return Ready
+                //   or panic), in which case we'll want to discard it
+                //   regardless.
+                //
+                struct Bomb<'a, T: 'a, W: 'a> {
+                    queue: &'a mut Scheduler<T, W>,
+                    node: Option<Arc<Node<T, W>>>,
+                }
+
+                impl<'a, T, W> Drop for Bomb<'a, T, W> {
+                    fn drop(&mut self) {
+                        if let Some(node) = self.node.take() {
+                            release_node(node);
+                        }
+                    }
+                }
+
+                let mut bomb = Bomb {
+                    node: Some(self.nodes.remove(node)),
+                    queue: self,
+                };
+
+                // Now that the bomb holds the node, create a new scope.
+                // This scope ensures that the borrow will go out of scope
+                // before we mutate the node pointer in `bomb` again.
+                let res = {
+                    let node = bomb.node.as_ref().unwrap();
+
+                    // Get a reference to the inner future. We already
+                    // ensured that the item `is_some`.
+                    let item = (*node.item.get()).as_mut().unwrap();
+
+                    // Unset queued flag... this must be done before
+                    // polling. This ensures that the item gets
+                    // rescheduled if it is notified **during** a call
+                    // to `poll`.
+                    let prev = (*node).queued.swap(false, SeqCst);
+                    assert!(prev);
+
+                    // Poll the underlying item with the appropriate
+                    // `notify` implementation. This is where a large bit of
+                    // the unsafety starts to stem from internally.
+                    // The `notify` instance itself is basically just our
+                    // `Arc<Node<T, W>>` and tracks the mpsc queue of ready
+                    // items.
+                    //
+                    // Critically though, `Node` won't actually access `T`,
+                    // the item, while it's floating around inside of `Task`
+                    // instances. These structs will basically just use `T`
+                    // to size the internal allocation, appropriately
+                    // accessing fields and deallocating the node if need
+                    // be.
+                    let queue = &mut *bomb.queue;
+                    let notify = Notify(bomb.node.as_ref().unwrap());
+                    f(queue, item, &notify)
+                };
+
+                let ret = match res {
+                    Async::NotReady => {
+                        // The future is not done, push it back into the
+                        // "all node" list.
+                        let node = bomb.node.take().unwrap();
+                        bomb.queue.nodes.push_back(node);
+                        continue;
+                    }
+                    Async::Ready(v) => {
+                        // `bomb` will take care of unlinking and releasing
+                        // the node.
+                        Tick::Data(v)
+                    }
+                };
+
+                return ret
+            }
+        }
+    }
+}
+
+impl<T, W> Schedule<T> for Scheduler<T, W> {
+    fn schedule(&mut self, item: T) {
+        let node = Arc::new(Node {
+            item: UnsafeCell::new(Some(item)),
+            next_all: UnsafeCell::new(ptr::null_mut()),
+            prev_all: UnsafeCell::new(ptr::null_mut()),
+            next_readiness: AtomicPtr::new(ptr::null_mut()),
+            queued: AtomicBool::new(true),
+            queue: Arc::downgrade(&self.inner),
+        });
+
+        // Right now our node has a strong reference count of 1. We transfer
+        // ownership of this reference count to our internal linked list
+        // and we'll reclaim ownership through the `remove` function.
+        let ptr = self.nodes.push_back(node);
+
+        // We'll need to get the item "into the system" to start tracking
+        // it, e.g. getting its unpark notifications going to us tracking
+        // which items are ready. To do that we unconditionally enqueue it
+        // for polling here.
+        self.inner.enqueue(ptr);
+    }
+}
+
+fn release_node<T, W>(node: Arc<Node<T, W>>) {
+    // The item is done, try to reset the queued flag. This will prevent
+    // `notify` from doing any more work for this item.
+    let prev = node.queued.swap(true, SeqCst);
+
+    // Drop the item, even if it hasn't finished yet. This is safe
+    // because we're dropping the item on the thread that owns
+    // `Scheduler`, which correctly tracks `T`'s lifetimes and such.
+    unsafe {
+        drop((*node.item.get()).take());
+    }
+
+    // If the queued flag was previously set, then it means that this node
+    // is still in our internal mpsc queue. We then transfer ownership
+    // of our reference count to the mpsc queue, and it'll come along and
+    // free it later, noticing that the item is `None`.
+    //
+    // If, however, the queued flag was *not* set, then we're safe to
+    // release our reference count on the internal node. The queued flag
+    // was set above so all item `enqueue` operations will not actually
+    // enqueue the node, so our node will never see the mpsc queue again.
+    // The node itself will be deallocated once all reference counts have
+    // been dropped by the various owning tasks elsewhere.
+    if prev {
+        mem::forget(node);
+    }
+}
+
+impl<T, W> Debug for Scheduler<T, W> {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        write!(fmt, "Scheduler {{ ... }}")
+    }
+}
+
+impl<T, W> Drop for Scheduler<T, W> {
+    fn drop(&mut self) {
+        // When a `Scheduler` is dropped, we want to drop all items
+        // associated with it. At the same time though, there may be tons of
+        // `Task` handles flying around which contain `Node` references
+        // inside them. We'll let those naturally get deallocated when the
+        // `Task` itself goes out of scope or gets notified.
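+        //
+        // Each node popped here is the strong reference owned by the node
+        // list itself; `release_node` either drops that reference or, if
+        // the node is still queued in the mpsc channel, transfers it to
+        // the channel.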
+        while let Some(node) = self.nodes.pop_front() {
+            release_node(node);
+        }
+
+        // Note that at this point we could still have a bunch of nodes in
+        // the mpsc queue. None of those nodes, however, have items
+        // associated with them, so they're safe to destroy on any thread.
+        // At this point the `Scheduler` struct, the owner of the one
+        // strong reference to `Inner`, will drop the strong reference. At
+        // that point whichever thread releases the strong refcount last
+        // (be it this thread or some other thread as part of an `upgrade`)
+        // will clear out the mpsc queue and free all remaining nodes.
+        //
+        // While that freeing operation isn't guaranteed to happen here,
+        // it's guaranteed to happen "promptly" as no more "blocking work"
+        // will happen while there's a strong refcount held.
+    }
+}
+
+impl<T, W> Inner<T, W> {
+    /// The enqueue function from the 1024cores intrusive MPSC queue
+    /// algorithm.
+    fn enqueue(&self, node: *const Node<T, W>) {
+        unsafe {
+            debug_assert!((*node).queued.load(Relaxed));
+
+            // This action does not require any coordination
+            (*node).next_readiness.store(ptr::null_mut(), Relaxed);
+
+            // Note that these atomic orderings come from 1024cores
+            let node = node as *mut _;
+            let prev = self.head_readiness.swap(node, AcqRel);
+            (*prev).next_readiness.store(node, Release);
+        }
+    }
+
+    /// The dequeue function from the 1024cores intrusive MPSC queue
+    /// algorithm.
+    ///
+    /// Note that this is unsafe as it requires mutual exclusion (only one
+    /// thread can call this) to be guaranteed elsewhere.
+    unsafe fn dequeue(&self) -> Dequeue<T, W> {
+        let mut tail = *self.tail_readiness.get();
+        let mut next = (*tail).next_readiness.load(Acquire);
+
+        if tail == self.stub() {
+            if next.is_null() {
+                return Dequeue::Empty;
+            }
+
+            *self.tail_readiness.get() = next;
+            tail = next;
+            next = (*next).next_readiness.load(Acquire);
+        }
+
+        if !next.is_null() {
+            *self.tail_readiness.get() = next;
+            debug_assert!(tail != self.stub());
+            return Dequeue::Data(tail);
+        }
+
+        if self.head_readiness.load(Acquire) as *const _ != tail {
+            return Dequeue::Inconsistent;
+        }
+
+        self.enqueue(self.stub());
+
+        next = (*tail).next_readiness.load(Acquire);
+
+        if !next.is_null() {
+            *self.tail_readiness.get() = next;
+            return Dequeue::Data(tail);
+        }
+
+        Dequeue::Inconsistent
+    }
+
+    fn stub(&self) -> *const Node<T, W> {
+        &*self.stub
+    }
+}
+
+impl<T, W> Drop for Inner<T, W> {
+    fn drop(&mut self) {
+        // Once we're in the destructor for `Inner` we need to clear out
+        // the mpsc queue of nodes if there's anything left in there.
+        //
+        // Note that each node has a strong reference count associated with
+        // it which is owned by the mpsc queue. All nodes should have had
+        // their items dropped already by the `Scheduler` destructor above,
+        // so we're just pulling out nodes and dropping their refcounts.
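+        //
+        // By the time this destructor runs, no other thread can hold a
+        // handle to `Inner`, so an inconsistent dequeue would indicate a
+        // bug; `abort` is preferred over leaking or double-freeing nodes.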
+        unsafe {
+            loop {
+                match self.dequeue() {
+                    Dequeue::Empty => break,
+                    Dequeue::Inconsistent => abort("inconsistent in drop"),
+                    Dequeue::Data(ptr) => drop(ptr2arc(ptr)),
+                }
+            }
+        }
+    }
+}
+
+impl<T, W> List<T, W> {
+    fn new() -> Self {
+        List {
+            len: 0,
+            head: ptr::null_mut(),
+            tail: ptr::null_mut(),
+        }
+    }
+
+    /// Appends an element to the back of the list
+    fn push_back(&mut self, node: Arc<Node<T, W>>) -> *const Node<T, W> {
+        let ptr = arc2ptr(node);
+
+        unsafe {
+            // Point to the current last node in the list
+            *(*ptr).prev_all.get() = self.tail;
+            *(*ptr).next_all.get() = ptr::null_mut();
+
+            if !self.tail.is_null() {
+                *(*self.tail).next_all.get() = ptr;
+                self.tail = ptr;
+            } else {
+                // This is the first node
+                self.tail = ptr;
+                self.head = ptr;
+            }
+        }
+
+        self.len += 1;
+
+        return ptr
+    }
+
+    /// Pop an element from the front of the list
+    fn pop_front(&mut self) -> Option<Arc<Node<T, W>>> {
+        if self.head.is_null() {
+            // The list is empty
+            return None;
+        }
+
+        self.len -= 1;
+
+        unsafe {
+            // Convert the ptr to Arc<_>
+            let node = ptr2arc(self.head);
+
+            // Update the head pointer
+            self.head = *node.next_all.get();
+
+            // If the pointer is null, then the list is empty
+            if self.head.is_null() {
+                self.tail = ptr::null_mut();
+            } else {
+                *(*self.head).prev_all.get() = ptr::null_mut();
+            }
+
+            Some(node)
+        }
+    }
+
+    /// Remove a specific node
+    unsafe fn remove(&mut self, node: *const Node<T, W>) -> Arc<Node<T, W>> {
+        let node = ptr2arc(node);
+        let next = *node.next_all.get();
+        let prev = *node.prev_all.get();
+        *node.next_all.get() = ptr::null_mut();
+        *node.prev_all.get() = ptr::null_mut();
+
+        if !next.is_null() {
+            *(*next).prev_all.get() = prev;
+        } else {
+            self.tail = prev;
+        }
+
+        if !prev.is_null() {
+            *(*prev).next_all.get() = next;
+        } else {
+            self.head = next;
+        }
+
+        self.len -= 1;
+
+        return node
+    }
+}
+
+impl<'a, T, W> Clone for Notify<'a, T, W> {
+    fn clone(&self) -> Self {
+        Notify(self.0)
+    }
+}
+
+impl<'a, T: fmt::Debug, W: fmt::Debug> fmt::Debug for Notify<'a, T, W> {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        fmt.debug_struct("Notify").finish()
+    }
+}
+
+impl<'a, T, W: Wakeup> From<Notify<'a, T, W>> for NotifyHandle {
+    fn from(handle: Notify<'a, T, W>) -> NotifyHandle {
+        unsafe {
+            let ptr = handle.0.clone();
+            let ptr = mem::transmute::<Arc<Node<T, W>>, *mut ArcNode<T, W>>(ptr);
+            NotifyHandle::new(hide_lt(ptr))
+        }
+    }
+}
+
+struct ArcNode<T, W>(PhantomData<(T, W)>);
+
+// We should never touch `T` on any thread other than the one owning
+// `Scheduler`, so this should be a safe operation.
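+//
+// `ArcNode<T, W>` is a zero-sized stand-in: it only exists so that the raw
+// pointer can be reinterpreted as the real `Arc<Node<T, W>>`, and it never
+// dereferences `T` itself.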
+//
+// `W` already requires `Sync + Send`
+unsafe impl<T, W> Send for ArcNode<T, W> {}
+unsafe impl<T, W> Sync for ArcNode<T, W> {}
+
+impl<T, W: Wakeup> executor::Notify for ArcNode<T, W> {
+    fn notify(&self, _id: usize) {
+        unsafe {
+            let me: *const ArcNode<T, W> = self;
+            let me: *const *const ArcNode<T, W> = &me;
+            let me = me as *const Arc<Node<T, W>>;
+            Node::notify(&*me)
+        }
+    }
+}
+
+unsafe impl<T, W: Wakeup> UnsafeNotify for ArcNode<T, W> {
+    unsafe fn clone_raw(&self) -> NotifyHandle {
+        let me: *const ArcNode<T, W> = self;
+        let me: *const *const ArcNode<T, W> = &me;
+        let me = &*(me as *const Arc<Node<T, W>>);
+        Notify(me).into()
+    }
+
+    unsafe fn drop_raw(&self) {
+        let mut me: *const ArcNode<T, W> = self;
+        let me = &mut me as *mut *const ArcNode<T, W> as *mut Arc<Node<T, W>>;
+        ptr::drop_in_place(me);
+    }
+}
+
+unsafe fn hide_lt<T, W: Wakeup>(p: *mut ArcNode<T, W>) -> *mut UnsafeNotify {
+    mem::transmute(p as *mut UnsafeNotify)
+}
+
+impl<T, W: Wakeup> Node<T, W> {
+    fn notify(me: &Arc<Node<T, W>>) {
+        let inner = match me.queue.upgrade() {
+            Some(inner) => inner,
+            None => return,
+        };
+
+        // It's our job to notify the node that it's ready to get polled,
+        // meaning that we need to enqueue it into the readiness queue. To
+        // do this we flag that we're ready to be queued, and if successful
+        // we then do the literal queueing operation, ensuring that we're
+        // only queued once.
+        //
+        // Once the node is inserted, we make sure to notify the parent
+        // task, as it'll want to come along and pick up our node now.
+        //
+        // Note that we don't change the reference count of the node here,
+        // we're just enqueueing the raw pointer. The `Scheduler`
+        // implementation guarantees that if we set the `queued` flag true
+        // that there's a reference count held by the main `Scheduler`
+        // queue still.
+        let prev = me.queued.swap(true, SeqCst);
+        if !prev {
+            inner.enqueue(&**me);
+            inner.wakeup.wakeup();
+        }
+    }
+}
+
+impl<T, W> Drop for Node<T, W> {
+    fn drop(&mut self) {
+        // Currently a `Node` is sent across all threads for any lifetime,
+        // regardless of `T`. This means that for memory safety we can't
+        // actually touch `T` at any time except when we have a reference
+        // to the `Scheduler` itself.
+        //
+        // Consequently it *should* be the case that we always drop items
+        // from the `Scheduler` instance, but this is a bomb in place to
+        // catch any bugs in that logic.
+        unsafe {
+            if (*self.item.get()).is_some() {
+                abort("item still here when dropping");
+            }
+        }
+    }
+}
+
+fn arc2ptr<T>(ptr: Arc<T>) -> *const T {
+    let addr = &*ptr as *const T;
+    mem::forget(ptr);
+    return addr
+}
+
+unsafe fn ptr2arc<T>(ptr: *const T) -> Arc<T> {
+    let anchor = mem::transmute::<usize, Arc<T>>(0x10);
+    let addr = &*anchor as *const T;
+    mem::forget(anchor);
+    let offset = addr as isize - 0x10;
+    mem::transmute::<isize, Arc<T>>(ptr as isize - offset)
+}
+
+fn abort(s: &str) -> ! {
+    struct DoublePanic;
+
+    impl Drop for DoublePanic {
+        fn drop(&mut self) {
+            panic!("panicking twice to abort the program");
+        }
+    }
+
+    let _bomb = DoublePanic;
+    panic!("{}", s);
+}
diff --git a/src/executor/sleep.rs b/src/executor/sleep.rs
new file mode 100644
index 00000000000..7058d81a2a8
--- /dev/null
+++ b/src/executor/sleep.rs
@@ -0,0 +1,169 @@
+use futures::executor::Notify;
+
+use std::fmt;
+use std::sync::{Arc, Mutex, Condvar};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::time::{Duration, Instant};
+
+/// Puts the current thread to sleep.
+pub trait Sleep {
+    /// Wake up handle.
+    type Wakeup: Wakeup;
+
+    /// Get a new `Wakeup` handle.
+    fn wakeup(&self) -> Self::Wakeup;
+
+    /// Put the current thread to sleep.
+    fn sleep(&mut self);
+
+    /// Put the current thread to sleep for at most `duration`.
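+    /// A wakeup received during the timeout ends the sleep early.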
+    fn sleep_timeout(&mut self, duration: Duration);
+}
+
+/// Wake up a sleeping thread.
+pub trait Wakeup: Clone + Send + 'static {
+    /// Wake up the sleeping thread.
+    fn wakeup(&self);
+}
+
+/// Blocks the current thread
+pub struct BlockThread {
+    state: AtomicUsize,
+    mutex: Mutex<()>,
+    condvar: Condvar,
+}
+
+const IDLE: usize = 0;
+const NOTIFY: usize = 1;
+const SLEEP: usize = 2;
+
+thread_local! {
+    static CURRENT_THREAD_NOTIFY: Arc<BlockThread> = Arc::new(BlockThread {
+        state: AtomicUsize::new(IDLE),
+        mutex: Mutex::new(()),
+        condvar: Condvar::new(),
+    });
+}
+
+// ===== impl BlockThread =====
+
+impl BlockThread {
+    pub fn with_current<F, R>(f: F) -> R
+    where F: FnOnce(&Arc<BlockThread>) -> R,
+    {
+        CURRENT_THREAD_NOTIFY.with(|notify| f(notify))
+    }
+
+    pub fn park(&self) {
+        self.park_timeout(None);
+    }
+
+    pub fn park_timeout(&self, dur: Option<Duration>) {
+        // If currently notified, then we skip sleeping. This is checked
+        // outside of the lock to avoid acquiring a mutex if not necessary.
+        match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
+            NOTIFY => return,
+            IDLE => {},
+            _ => unreachable!(),
+        }
+
+        // The state is currently idle, so obtain the lock and then try to
+        // transition to a sleeping state.
+        let mut m = self.mutex.lock().unwrap();
+
+        // Transition to sleeping
+        match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) {
+            NOTIFY => {
+                // Notified before we could sleep, consume the notification
+                // and exit
+                self.state.store(IDLE, Ordering::SeqCst);
+                return;
+            }
+            IDLE => {},
+            _ => unreachable!(),
+        }
+
+        // Track (until, remaining)
+        let mut time = dur.map(|dur| (Instant::now() + dur, dur));
+
+        loop {
+            m = match time {
+                Some((until, rem)) => {
+                    let (guard, _) = self.condvar.wait_timeout(m, rem).unwrap();
+                    let now = Instant::now();
+
+                    if now >= until {
+                        // Timed out... exit sleep state
+                        self.state.store(IDLE, Ordering::SeqCst);
+                        return;
+                    }
+
+                    time = Some((until, until - now));
+                    guard
+                }
+                None => self.condvar.wait(m).unwrap(),
+            };
+
+            // Transition back to idle, loop otherwise
+            if NOTIFY == self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
+                return;
+            }
+        }
+    }
+
+    fn unpark(&self) {
+        // First, try transitioning from IDLE -> NOTIFY, this does not
+        // require a lock.
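+        // The previous state returned by `compare_and_swap` tells us
+        // whether a sleeper needs to be woken under the lock below.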
+ match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) { + IDLE | NOTIFY => return, + SLEEP => {} + _ => unreachable!(), + } + + // The other half is sleeping, this requires a lock + let _m = self.mutex.lock().unwrap(); + + // Transition from SLEEP -> NOTIFY + match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) { + SLEEP => {} + _ => return, + } + + // Wakeup the sleeper + self.condvar.notify_one(); + } +} + +impl Notify for BlockThread { + fn notify(&self, _unpark_id: usize) { + self.unpark(); + } +} + +impl<'a> Sleep for &'a Arc { + type Wakeup = Arc; + + fn wakeup(&self) -> Self::Wakeup { + (*self).clone() + } + + fn sleep(&mut self) { + self.park(); + } + + fn sleep_timeout(&mut self, duration: Duration) { + self.park_timeout(Some(duration)); + } +} + +impl Wakeup for Arc { + fn wakeup(&self) { + self.unpark(); + } +} + +impl fmt::Debug for BlockThread { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BlockThread").finish() + } +} diff --git a/src/lib.rs b/src/lib.rs index 414ac277456..7cfe7359cb2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -103,5 +103,6 @@ extern crate tokio_io; #[macro_use] extern crate log; +pub mod executor; pub mod net; pub mod reactor; diff --git a/tests/buffered.rs b/tests/buffered.rs index 60b574f6c3c..2ba16b04255 100644 --- a/tests/buffered.rs +++ b/tests/buffered.rs @@ -8,7 +8,6 @@ use std::thread; use std::io::{Read, Write, BufReader, BufWriter}; use futures::Future; -use futures::future::blocking; use futures::stream::Stream; use tokio_io::io::copy; use tokio::net::TcpListener; @@ -55,7 +54,7 @@ fn echo_server() { copy(a, b) }); - let (amt, _, _) = t!(blocking(copied).wait()); + let (amt, _, _) = t!(copied.wait()); let (expected, t2) = t.join().unwrap(); let actual = t2.join().unwrap(); diff --git a/tests/chain.rs b/tests/chain.rs index cd2712584b8..b9ac481860f 100644 --- a/tests/chain.rs +++ b/tests/chain.rs @@ -7,7 +7,6 @@ use std::thread; use std::io::{Write, Read}; use futures::Future; -use futures::future::blocking; use futures::stream::Stream; use tokio_io::io::read_to_end; use tokio::net::TcpListener; @@ -43,7 +42,7 @@ fn chain_clients() { read_to_end(a.chain(b).chain(c), Vec::new()) }); - let (_, data) = t!(blocking(copied).wait()); + let (_, data) = t!(copied.wait()); t.join().unwrap(); assert_eq!(data, b"foo bar baz"); diff --git a/tests/drop-core.rs b/tests/drop-core.rs index 503e81ef5c0..75ac9b7eb1d 100644 --- a/tests/drop-core.rs +++ b/tests/drop-core.rs @@ -4,7 +4,7 @@ extern crate futures; use std::thread; use std::net; -use futures::{future, stream}; +use futures::future; use futures::prelude::*; use futures::sync::oneshot; use tokio::net::TcpListener; @@ -17,7 +17,7 @@ fn tcp_doesnt_block() { let listener = net::TcpListener::bind("127.0.0.1:0").unwrap(); let listener = TcpListener::from_std(listener, &handle).unwrap(); drop(core); - assert!(stream::blocking(listener.incoming()).next().unwrap().is_err()); + assert!(listener.incoming().wait().next().unwrap().is_err()); } #[test] @@ -34,9 +34,9 @@ fn drop_wakes() { drop(tx); future::ok(()) }); - assert!(future::blocking(new_socket.join(drop_tx)).wait().is_err()); + assert!(new_socket.join(drop_tx).wait().is_err()); }); - drop(future::blocking(rx).wait()); + drop(rx.wait()); drop(core); t.join().unwrap(); } diff --git a/tests/echo.rs b/tests/echo.rs index 778bdeb91e8..d5bdae81182 100644 --- a/tests/echo.rs +++ b/tests/echo.rs @@ -8,7 +8,6 @@ use std::net::TcpStream; use std::thread; use futures::Future; -use futures::future::blocking; 
use futures::stream::Stream; use tokio::net::TcpListener; use tokio_io::AsyncRead; @@ -45,7 +44,7 @@ fn echo_server() { let halves = client.map(|s| s.split()); let copied = halves.and_then(|(a, b)| copy(a, b)); - let (amt, _, _) = t!(blocking(copied).wait()); + let (amt, _, _) = t!(copied.wait()); t.join().unwrap(); assert_eq!(amt, msg.len() as u64 * 1024); diff --git a/tests/global.rs b/tests/global.rs index 4702fc11670..bf5682fa06b 100644 --- a/tests/global.rs +++ b/tests/global.rs @@ -3,7 +3,6 @@ extern crate tokio; use std::thread; -use futures::future::blocking; use futures::prelude::*; use tokio::net::{TcpStream, TcpListener}; @@ -24,7 +23,7 @@ fn hammer() { let theirs = srv.incoming().into_future() .map(|(s, _)| s.unwrap()) .map_err(|(s, _)| s); - let (mine, theirs) = t!(blocking(mine.join(theirs)).wait()); + let (mine, theirs) = t!(mine.join(theirs).wait()); assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr())); diff --git a/tests/limit.rs b/tests/limit.rs index 053f438506e..7055ce9b671 100644 --- a/tests/limit.rs +++ b/tests/limit.rs @@ -7,7 +7,6 @@ use std::thread; use std::io::{Write, Read}; use futures::Future; -use futures::future::blocking; use futures::stream::Stream; use tokio_io::io::read_to_end; use tokio::net::TcpListener; @@ -37,7 +36,7 @@ fn limit() { read_to_end(a.take(4), Vec::new()) }); - let (_, data) = t!(blocking(copied).wait()); + let (_, data) = t!(copied.wait()); t.join().unwrap(); assert_eq!(data, b"foo "); diff --git a/tests/line-frames.rs b/tests/line-frames.rs index 27f1d19532f..3785dfeffc9 100644 --- a/tests/line-frames.rs +++ b/tests/line-frames.rs @@ -10,7 +10,7 @@ use std::net::Shutdown; use bytes::{BytesMut, BufMut}; use futures::{Future, Stream, Sink}; -use futures::future::{blocking, Executor}; +use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::{TcpListener, TcpStream}; use tokio_io::codec::{Encoder, Decoder}; @@ -68,20 +68,20 @@ fn echo() { pool.execute(srv.map_err(|e| panic!("srv error: {}", e))).unwrap(); let client = TcpStream::connect(&addr); - let client = blocking(client).wait().unwrap(); - let (client, _) = blocking(write_all(client, b"a\n")).wait().unwrap(); - let (client, buf, amt) = blocking(read(client, vec![0; 1024])).wait().unwrap(); + let client = client.wait().unwrap(); + let (client, _) = write_all(client, b"a\n").wait().unwrap(); + let (client, buf, amt) = read(client, vec![0; 1024]).wait().unwrap(); assert_eq!(amt, 2); assert_eq!(&buf[..2], b"a\n"); - let (client, _) = blocking(write_all(client, b"\n")).wait().unwrap(); - let (client, buf, amt) = blocking(read(client, buf)).wait().unwrap(); + let (client, _) = write_all(client, b"\n").wait().unwrap(); + let (client, buf, amt) = read(client, buf).wait().unwrap(); assert_eq!(amt, 1); assert_eq!(&buf[..1], b"\n"); - let (client, _) = blocking(write_all(client, b"b")).wait().unwrap(); + let (client, _) = write_all(client, b"b").wait().unwrap(); client.shutdown(Shutdown::Write).unwrap(); - let (_client, buf, amt) = blocking(read(client, buf)).wait().unwrap(); + let (_client, buf, amt) = read(client, buf).wait().unwrap(); assert_eq!(amt, 1); assert_eq!(&buf[..1], b"b"); } diff --git a/tests/pipe-hup.rs b/tests/pipe-hup.rs index c04064873fd..0a09fa21c75 100644 --- a/tests/pipe-hup.rs +++ b/tests/pipe-hup.rs @@ -13,12 +13,12 @@ use std::os::unix::io::{AsRawFd, FromRawFd}; use std::thread; use std::time::Duration; -use futures::future::blocking; use mio::event::Evented; use mio::unix::{UnixReady, 
EventedFd}; use mio::{PollOpt, Ready, Token}; use tokio::reactor::{Handle, PollEvented}; use tokio_io::io::read_to_end; +use futures::Future; macro_rules! t { ($e:expr) => (match $e { @@ -81,7 +81,7 @@ fn hup() { let source = PollEvented::new(MyFile::new(read), &handle).unwrap(); let reader = read_to_end(source, Vec::new()); - let (_, content) = t!(blocking(reader).wait()); + let (_, content) = t!(reader.wait()); assert_eq!(&b"Hello!\nGood bye!\n"[..], &content[..]); t.join().unwrap(); } diff --git a/tests/stream-buffered.rs b/tests/stream-buffered.rs index 64786902317..78fe1c37249 100644 --- a/tests/stream-buffered.rs +++ b/tests/stream-buffered.rs @@ -8,7 +8,6 @@ use std::net::TcpStream; use std::thread; use futures::Future; -use futures::future::blocking; use futures::stream::Stream; use tokio_io::io::copy; use tokio_io::AsyncRead; @@ -49,7 +48,7 @@ fn echo_server() { .take(2) .collect(); - t!(blocking(future).wait()); + t!(future.wait()); t.join().unwrap(); } diff --git a/tests/tcp.rs b/tests/tcp.rs index a1cefacd8f0..83cc425c142 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -7,7 +7,6 @@ use std::sync::mpsc::channel; use std::thread; use futures::Future; -use futures::future::blocking; use futures::stream::Stream; use tokio::net::{TcpListener, TcpStream}; @@ -28,7 +27,7 @@ fn connect() { }); let stream = TcpStream::connect(&addr); - let mine = t!(blocking(stream).wait()); + let mine = t!(stream.wait()); let theirs = t.join().unwrap(); assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); @@ -51,7 +50,7 @@ fn accept() { net::TcpStream::connect(&addr).unwrap() }); - let (mine, _remaining) = t!(blocking(client).wait()); + let (mine, _remaining) = t!(client.wait()); let mine = mine.unwrap(); let theirs = t.join().unwrap(); @@ -76,7 +75,7 @@ fn accept2() { }).into_future().map_err(|e| e.0); assert!(rx.try_recv().is_err()); - let (mine, _remaining) = t!(blocking(client).wait()); + let (mine, _remaining) = t!(client.wait()); mine.unwrap(); t.join().unwrap(); } diff --git a/tests/udp.rs b/tests/udp.rs index 42b8906d7d1..f0a47d37c62 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -7,7 +7,6 @@ use std::io; use std::net::SocketAddr; use futures::{Future, Poll, Stream, Sink}; -use futures::future::blocking; use tokio::net::{UdpSocket, UdpCodec}; macro_rules! 
t { @@ -26,7 +25,7 @@ fn send_messages(send: S, recv: R) { { let send = SendMessage::new(a, send.clone(), b_addr, b"1234"); let recv = RecvMessage::new(b, recv.clone(), a_addr, b"1234"); - let (sendt, received) = t!(blocking(send.join(recv)).wait()); + let (sendt, received) = t!(send.join(recv).wait()); a = sendt; b = received; } @@ -34,7 +33,7 @@ fn send_messages(send: S, recv: R) { { let send = SendMessage::new(a, send, b_addr, b""); let recv = RecvMessage::new(b, recv, a_addr, b""); - t!(blocking(send.join(recv)).wait()); + t!(send.join(recv).wait()); } } @@ -173,7 +172,7 @@ fn send_dgrams() { { let send = a.send_dgram(&b"4321"[..], &b_addr); let recv = b.recv_dgram(&mut buf[..]); - let (sendt, received) = t!(blocking(send.join(recv)).wait()); + let (sendt, received) = t!(send.join(recv).wait()); assert_eq!(received.2, 4); assert_eq!(&received.1[..4], b"4321"); a = sendt.0; @@ -183,7 +182,7 @@ fn send_dgrams() { { let send = a.send_dgram(&b""[..], &b_addr); let recv = b.recv_dgram(&mut buf[..]); - let received = t!(blocking(send.join(recv)).wait()).1; + let received = t!(send.join(recv).wait()).1; assert_eq!(received.2, 0); } } @@ -226,7 +225,7 @@ fn send_framed() { let send = a.send(&b"4567"[..]); let recv = b.into_future().map_err(|e| e.0); - let (sendt, received) = t!(blocking(send.join(recv)).wait()); + let (sendt, received) = t!(send.join(recv).wait()); assert_eq!(received.0, Some(())); a_soc = sendt.into_inner(); @@ -239,7 +238,7 @@ fn send_framed() { let send = a.send(&b""[..]); let recv = b.into_future().map_err(|e| e.0); - let received = t!(blocking(send.join(recv)).wait()).1; + let received = t!(send.join(recv).wait()).1; assert_eq!(received.0, Some(())); } }