Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into macro-diagnostics
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed May 8, 2021
2 parents 2c36849 + 05c3cf3 commit bca8723
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 19 deletions.
6 changes: 3 additions & 3 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ edition = "2018"
# If you copy one of the examples into a new project, you should be using
# [dependencies] instead.
[dev-dependencies]
tokio = { version = "1.0.0", features = ["full", "tracing"] }
tokio-util = { version = "0.6.3", features = ["full"] }
tokio-stream = { version = "0.1" }
tokio = { version = "1.0.0", path = "../tokio",features = ["full", "tracing"] }
tokio-util = { version = "0.6.3", path = "../tokio-util",features = ["full"] }
tokio-stream = { version = "0.1", path = "../tokio-stream" }

tracing = "0.1"
tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] }
Expand Down
2 changes: 1 addition & 1 deletion tokio-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ signal = ["tokio/signal"]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
tokio = { version = "1.2.0", path = "../tokio", features = ["sync"] }
tokio-util = { version = "0.6.3", optional = true }
tokio-util = { version = "0.6.3", path = "../tokio-util", optional = true }

[dev-dependencies]
tokio = { version = "1.2.0", path = "../tokio", features = ["full", "test-util"] }
Expand Down
8 changes: 4 additions & 4 deletions tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ rt = ["tokio/rt"]
__docs_rs = ["futures-util"]

[dependencies]
tokio = { version = "1.0.0", features = ["sync"] }
tokio = { version = "1.0.0", path = "../tokio", features = ["sync"] }

bytes = "1.0.0"
futures-core = "0.3.0"
Expand All @@ -47,9 +47,9 @@ pin-project-lite = "0.2.0"
slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`

[dev-dependencies]
tokio = { version = "1.0.0", features = ["full"] }
tokio-test = { version = "0.4.0" }
tokio-stream = { version = "0.1" }
tokio = { version = "1.0.0", path = "../tokio", features = ["full"] }
tokio-test = { version = "0.4.0", path = "../tokio-test" }
tokio-stream = { version = "0.1", path = "../tokio-stream" }

async-stream = "0.3.0"
futures = "0.3.0"
Expand Down
14 changes: 14 additions & 0 deletions tokio-util/src/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,17 @@ where
tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
}
}

#[cfg(unix)]
impl<T: std::os::unix::io::AsRawFd> std::os::unix::io::AsRawFd for Compat<T> {
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
self.inner.as_raw_fd()
}
}

#[cfg(windows)]
impl<T: std::os::windows::io::AsRawHandle> std::os::windows::io::AsRawHandle for Compat<T> {
fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
self.inner.as_raw_handle()
}
}
24 changes: 21 additions & 3 deletions tokio/src/io/util/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ use std::{
/// that can be used as in-memory IO types. Writing to one of the pairs will
/// allow that data to be read from the other, and vice versa.
///
/// # Closing a `DuplexStream`
///
/// If one end of the `DuplexStream` channel is dropped, any pending reads on
/// the other side will continue to read data until the buffer is drained, then
/// they will signal EOF by returning 0 bytes. Any writes to the other side,
/// including pending ones (that are waiting for free space in the buffer) will
/// return `Err(BrokenPipe)` immediately.
///
/// # Example
///
/// ```
Expand Down Expand Up @@ -134,7 +142,8 @@ impl AsyncWrite for DuplexStream {
impl Drop for DuplexStream {
fn drop(&mut self) {
// notify the other side of the closure
self.write.lock().close();
self.write.lock().close_write();
self.read.lock().close_read();
}
}

Expand All @@ -151,12 +160,21 @@ impl Pipe {
}
}

fn close(&mut self) {
fn close_write(&mut self) {
self.is_closed = true;
// needs to notify any readers that no more data will come
if let Some(waker) = self.read_waker.take() {
waker.wake();
}
}

fn close_read(&mut self) {
self.is_closed = true;
// needs to notify any writers that they have to abort
if let Some(waker) = self.write_waker.take() {
waker.wake();
}
}
}

impl AsyncRead for Pipe {
Expand Down Expand Up @@ -217,7 +235,7 @@ impl AsyncWrite for Pipe {
mut self: Pin<&mut Self>,
_: &mut task::Context<'_>,
) -> Poll<std::io::Result<()>> {
self.close();
self.close_write();
Poll::Ready(Ok(()))
}
}
29 changes: 24 additions & 5 deletions tokio/tests/io_mem_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,25 @@ async fn disconnect() {
t2.await.unwrap();
}

#[tokio::test]
async fn disconnect_reader() {
let (a, mut b) = duplex(2);

let t1 = tokio::spawn(async move {
// this will block, as not all data fits into duplex
b.write_all(b"ping").await.unwrap_err();
});

let t2 = tokio::spawn(async move {
// here we drop the reader side, and we expect the writer in the other
// task to exit with an error
drop(a);
});

t2.await.unwrap();
t1.await.unwrap();
}

#[tokio::test]
async fn max_write_size() {
let (mut a, mut b) = duplex(32);
Expand All @@ -73,11 +92,11 @@ async fn max_write_size() {
assert_eq!(n, 4);
});

let t2 = tokio::spawn(async move {
let mut buf = [0u8; 4];
b.read_exact(&mut buf).await.unwrap();
});
let mut buf = [0u8; 4];
b.read_exact(&mut buf).await.unwrap();

t1.await.unwrap();
t2.await.unwrap();

// drop b only after task t1 finishes writing
drop(b);
}
3 changes: 0 additions & 3 deletions tokio/tests/macros_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,6 @@ async fn join_with_select() {
async fn use_future_in_if_condition() {
use tokio::time::{self, Duration};

let sleep = time::sleep(Duration::from_millis(50));
tokio::pin!(sleep);

tokio::select! {
_ = time::sleep(Duration::from_millis(50)), if false => {
panic!("if condition ignored")
Expand Down

0 comments on commit bca8723

Please sign in to comment.