Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: fix *_closed false positives #5231

Merged
merged 11 commits into from
Dec 5, 2022
12 changes: 12 additions & 0 deletions tokio/src/net/tcp/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ impl ReadHalf<'_> {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// This function is equivalent to [`TcpStream::ready`].
///
/// # Cancel safety
Expand Down Expand Up @@ -273,6 +279,12 @@ impl WriteHalf<'_> {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// This function is equivalent to [`TcpStream::ready`].
///
/// # Cancel safety
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/net/tcp/split_owned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ impl OwnedReadHalf {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// This function is equivalent to [`TcpStream::ready`].
///
/// # Cancel safety
Expand Down Expand Up @@ -355,6 +361,12 @@ impl OwnedWriteHalf {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// This function is equivalent to [`TcpStream::ready`].
///
/// # Cancel safety
Expand Down
6 changes: 6 additions & 0 deletions tokio/src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,12 @@ impl TcpStream {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
Expand Down
4 changes: 3 additions & 1 deletion tokio/src/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,9 @@ impl UdpSocket {
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`.
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Cancel safety
///
Expand Down
4 changes: 3 additions & 1 deletion tokio/src/net/unix/datagram/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ impl UnixDatagram {
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`.
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Cancel safety
///
Expand Down
6 changes: 6 additions & 0 deletions tokio/src/net/unix/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ impl WriteHalf<'_> {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/net/unix/split_owned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ impl OwnedReadHalf {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
Expand Down Expand Up @@ -265,6 +271,12 @@ impl OwnedWriteHalf {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
Expand Down
6 changes: 6 additions & 0 deletions tokio/src/net/unix/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ impl UnixStream {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/net/windows/named_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,12 @@ impl NamedPipeServer {
/// can be used to concurrently read / write to the same pipe on a single
/// task without splitting the pipe.
///
/// The function may complete without the pipe being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Examples
///
/// Concurrently read and write to the pipe on the same task without
Expand Down Expand Up @@ -989,6 +995,12 @@ impl NamedPipeClient {
/// can be used to concurrently read / write to the same pipe on a single
/// task without splitting the pipe.
///
/// The function may complete without the pipe being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Examples
///
/// Concurrently read and write to the pipe on the same task without
Expand Down
16 changes: 13 additions & 3 deletions tokio/src/runtime/io/scheduled_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,14 +510,24 @@ cfg_io_readiness! {
drop(waiters);
}
State::Done => {
let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8;

// Safety: State::Done means it is no longer shared
let w = unsafe { &mut *waiter.get() };

let curr = scheduled_io.readiness.load(Acquire);

// The returned tick might be newer than the event
// which notified our waker. This is ok because the future
// still didn't return `Poll::Ready`.
let tick = TICK.unpack(curr) as u8;

// The readiness state could have been cleared in the meantime,
// but we allow the returned ready set to be empty.
let curr_ready = Ready::from_usize(READINESS.unpack(curr));
let ready = curr_ready.intersection(w.interest);

return Poll::Ready(ReadyEvent {
tick,
ready: Ready::from_interest(w.interest),
ready,
});
}
}
Expand Down
49 changes: 45 additions & 4 deletions tokio/tests/tcp_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,30 +254,34 @@ async fn create_pair() -> (TcpStream, TcpStream) {
(client, server)
}

fn read_until_pending(stream: &mut TcpStream) {
fn read_until_pending(stream: &mut TcpStream) -> usize {
let mut buf = vec![0u8; 1024 * 1024];
let mut total = 0;
loop {
match stream.try_read(&mut buf) {
Ok(_) => (),
Ok(n) => total += n,
Err(err) => {
assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
break;
}
}
}
total
}

fn write_until_pending(stream: &mut TcpStream) {
fn write_until_pending(stream: &mut TcpStream) -> usize {
let buf = vec![0u8; 1024 * 1024];
let mut total = 0;
loop {
match stream.try_write(&buf) {
Ok(_) => (),
Ok(n) => total += n,
Err(err) => {
assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
break;
}
}
}
total
}

#[tokio::test]
Expand Down Expand Up @@ -357,3 +361,40 @@ async fn try_read_buf() {
}
}
}

// read_closed is a best effort event, so test only for no false positives.
#[tokio::test]
async fn read_closed() {
let (client, mut server) = create_pair().await;

let mut ready_fut = task::spawn(client.ready(Interest::READABLE));
assert_pending!(ready_fut.poll());

assert_ok!(server.write_all(b"ping").await);

let ready_event = assert_ok!(ready_fut.await);

assert!(!ready_event.is_read_closed());
}

// write_closed is a best effort event, so test only for no false positives.
#[tokio::test]
async fn write_closed() {
let (mut client, mut server) = create_pair().await;

// Fill the write buffer.
let write_size = write_until_pending(&mut client);
let mut ready_fut = task::spawn(client.ready(Interest::WRITABLE));
assert_pending!(ready_fut.poll());

// Drain the socket to make client writable.
let mut read_size = 0;
while read_size < write_size {
server.readable().await.unwrap();
read_size += read_until_pending(&mut server);
}

let ready_event = assert_ok!(ready_fut.await);

assert!(!ready_event.is_write_closed());
}