Skip to content

Commit

Permalink
auto merge of #13751 : alexcrichton/rust/io-close-read, r=brson
Browse files Browse the repository at this point in the history
Two new methods were added to TcpStream and UnixStream:

    fn close_read(&mut self) -> IoResult<()>;
    fn close_write(&mut self) -> IoResult<()>;

These two methods map to shutdown()'s behavior (the system call on unix),
closing the reading or writing half of a duplex stream. These methods are
primarily added to allow waking up a pending read in another task. By closing
the reading half of a connection, all pending readers will be woken up and will
return with EndOfFile. The close_write() method was added for symmetry with
close_read(), and I imagine that it will be quite useful at some point.

Implementation-wise, librustuv got the short end of the stick this time. The
native versions just delegate to the shutdown() syscall (easy). The uv versions
can leverage uv_shutdown() for tcp/unix streams, but only for closing the
writing half. Closing the reading half is done through some careful dancing to
wake up a pending reader.

As usual, windows likes to be different from unix. The windows implementation
uses shutdown() for sockets, but shutdown() is not available for named pipes.
Instead, CancelIoEx was used with same fancy synchronization to make sure
everyone knows what's up.

cc #11165
  • Loading branch information
bors committed May 8, 2014
2 parents c217a84 + ec9ade9 commit ab22d99
Show file tree
Hide file tree
Showing 14 changed files with 534 additions and 98 deletions.
6 changes: 5 additions & 1 deletion src/liblibc/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub use consts::os::bsd44::{SOL_SOCKET, SO_KEEPALIVE, SO_ERROR};
pub use consts::os::bsd44::{SO_REUSEADDR, SO_BROADCAST, SHUT_WR, IP_MULTICAST_LOOP};
pub use consts::os::bsd44::{IP_ADD_MEMBERSHIP, IP_DROP_MEMBERSHIP};
pub use consts::os::bsd44::{IPV6_ADD_MEMBERSHIP, IPV6_DROP_MEMBERSHIP};
pub use consts::os::bsd44::{IP_MULTICAST_TTL, IP_TTL};
pub use consts::os::bsd44::{IP_MULTICAST_TTL, IP_TTL, SHUT_RD};

pub use funcs::c95::ctype::{isalnum, isalpha, iscntrl, isdigit};
pub use funcs::c95::ctype::{islower, isprint, ispunct, isspace};
Expand Down Expand Up @@ -226,6 +226,8 @@ pub use funcs::bsd43::{shutdown};
#[cfg(windows)] pub use consts::os::extra::{FILE_WRITE_ATTRIBUTES, FILE_READ_ATTRIBUTES};
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_BUSY, ERROR_IO_PENDING};
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED, WAIT_OBJECT_0};
#[cfg(windows)] pub use consts::os::extra::{ERROR_NOT_FOUND};
#[cfg(windows)] pub use consts::os::extra::{ERROR_OPERATION_ABORTED};
#[cfg(windows)] pub use types::os::common::bsd44::{SOCKET};
#[cfg(windows)] pub use types::os::common::posix01::{stat, utimbuf};
#[cfg(windows)] pub use types::os::arch::extra::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES};
Expand Down Expand Up @@ -1740,8 +1742,10 @@ pub mod consts {
pub static ERROR_NO_DATA: c_int = 232;
pub static ERROR_INVALID_ADDRESS : c_int = 487;
pub static ERROR_PIPE_CONNECTED: c_int = 535;
pub static ERROR_OPERATION_ABORTED: c_int = 995;
pub static ERROR_IO_PENDING: c_int = 997;
pub static ERROR_FILE_INVALID : c_int = 1006;
pub static ERROR_NOT_FOUND: c_int = 1168;
pub static INVALID_HANDLE_VALUE : c_int = -1;

pub static DELETE : DWORD = 0x00010000;
Expand Down
2 changes: 2 additions & 0 deletions src/libnative/io/c_win32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,6 @@ extern "system" {
optlen: *mut libc::c_int) -> libc::c_int;

pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
pub fn CancelIoEx(hFile: libc::HANDLE,
lpOverlapped: libc::LPOVERLAPPED) -> libc::BOOL;
}
13 changes: 12 additions & 1 deletion src/libnative/io/file_unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@

use libc::{c_int, c_void};
use libc;
use std::sync::arc::UnsafeArc;
use std::c_str::CString;
use std::io::IoError;
use std::io;
use std::mem;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;

use io::{IoResult, retry, keep_going};

Expand Down Expand Up @@ -178,6 +178,17 @@ impl rtio::RtioPipe for FileDesc {
fn clone(&self) -> Box<rtio::RtioPipe:Send> {
box FileDesc { inner: self.inner.clone() } as Box<rtio::RtioPipe:Send>
}

// Only supported on named pipes currently. Note that this doesn't have an
// impact on the std::io primitives, this is never called via
// std::io::PipeStream. If the functionality is exposed in the future, then
// these methods will need to be implemented.
fn close_read(&mut self) -> Result<(), IoError> {
Err(io::standard_error(io::InvalidInput))
}
fn close_write(&mut self) -> Result<(), IoError> {
Err(io::standard_error(io::InvalidInput))
}
}

impl rtio::RtioTTY for FileDesc {
Expand Down
11 changes: 11 additions & 0 deletions src/libnative/io/file_win32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,17 @@ impl rtio::RtioPipe for FileDesc {
fn clone(&self) -> Box<rtio::RtioPipe:Send> {
box FileDesc { inner: self.inner.clone() } as Box<rtio::RtioPipe:Send>
}

// Only supported on named pipes currently. Note that this doesn't have an
// impact on the std::io primitives, this is never called via
// std::io::PipeStream. If the functionality is exposed in the future, then
// these methods will need to be implemented.
fn close_read(&mut self) -> IoResult<()> {
Err(io::standard_error(io::InvalidInput))
}
fn close_write(&mut self) -> IoResult<()> {
Err(io::standard_error(io::InvalidInput))
}
}

impl rtio::RtioTTY for FileDesc {
Expand Down
7 changes: 4 additions & 3 deletions src/libnative/io/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,10 @@ impl rtio::RtioTcpStream for TcpStream {
} as Box<rtio::RtioTcpStream:Send>
}
fn close_write(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe {
libc::shutdown(self.fd(), libc::SHUT_WR)
})
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
}
fn close_read(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/libnative/io/pipe_unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ impl rtio::RtioPipe for UnixStream {
inner: self.inner.clone(),
} as Box<rtio::RtioPipe:Send>
}

fn close_write(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
}
fn close_read(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
175 changes: 145 additions & 30 deletions src/libnative/io/pipe_win32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,17 @@
//! the test suite passing (the suite is in libstd), and that's good enough for
//! me!

use std::c_str::CString;
use libc;
use std::c_str::CString;
use std::intrinsics;
use std::io;
use std::os::win32::as_utf16_p;
use std::os;
use std::ptr;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use std::intrinsics;
use std::sync::atomics;
use std::unstable::mutex;

use super::IoResult;
use super::c;
Expand Down Expand Up @@ -124,6 +128,20 @@ impl Drop for Event {

struct Inner {
handle: libc::HANDLE,
lock: mutex::NativeMutex,
read_closed: atomics::AtomicBool,
write_closed: atomics::AtomicBool,
}

impl Inner {
fn new(handle: libc::HANDLE) -> Inner {
Inner {
handle: handle,
lock: unsafe { mutex::NativeMutex::new() },
read_closed: atomics::AtomicBool::new(false),
write_closed: atomics::AtomicBool::new(false),
}
}
}

impl Drop for Inner {
Expand Down Expand Up @@ -218,7 +236,7 @@ impl UnixStream {
loop {
match UnixStream::try_connect(p) {
Some(handle) => {
let inner = Inner { handle: handle };
let inner = Inner::new(handle);
let mut mode = libc::PIPE_TYPE_BYTE |
libc::PIPE_READMODE_BYTE |
libc::PIPE_WAIT;
Expand Down Expand Up @@ -275,6 +293,24 @@ impl UnixStream {
}

fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } }

fn read_closed(&self) -> bool {
unsafe { (*self.inner.get()).read_closed.load(atomics::SeqCst) }
}

fn write_closed(&self) -> bool {
unsafe { (*self.inner.get()).write_closed.load(atomics::SeqCst) }
}

fn cancel_io(&self) -> IoResult<()> {
match unsafe { c::CancelIoEx(self.handle(), ptr::mut_null()) } {
0 if os::errno() == libc::ERROR_NOT_FOUND as uint => {
Ok(())
}
0 => Err(super::last_error()),
_ => Ok(())
}
}
}

impl rtio::RtioPipe for UnixStream {
Expand All @@ -287,31 +323,60 @@ impl rtio::RtioPipe for UnixStream {
let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() };
overlapped.hEvent = self.read.get_ref().handle();

// Pre-flight check to see if the reading half has been closed. This
// must be done before issuing the ReadFile request, but after we
// acquire the lock.
//
// See comments in close_read() about why this lock is necessary.
let guard = unsafe { (*self.inner.get()).lock.lock() };
if self.read_closed() {
return Err(io::standard_error(io::EndOfFile))
}

// Issue a nonblocking requests, succeeding quickly if it happened to
// succeed.
let ret = unsafe {
libc::ReadFile(self.handle(),
buf.as_ptr() as libc::LPVOID,
buf.len() as libc::DWORD,
&mut bytes_read,
&mut overlapped)
};
if ret == 0 {
let err = unsafe { libc::GetLastError() };
if err == libc::ERROR_IO_PENDING as libc::DWORD {
let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
&mut overlapped,
&mut bytes_read,
libc::TRUE)
};
if ret == 0 {
return Err(super::last_error())
}
} else {
if ret != 0 { return Ok(bytes_read as uint) }

// If our errno doesn't say that the I/O is pending, then we hit some
// legitimate error and reeturn immediately.
if os::errno() != libc::ERROR_IO_PENDING as uint {
return Err(super::last_error())
}

// Now that we've issued a successful nonblocking request, we need to
// wait for it to finish. This can all be done outside the lock because
// we'll see any invocation of CancelIoEx. We also call this in a loop
// because we're woken up if the writing half is closed, we just need to
// realize that the reading half wasn't closed and we go right back to
// sleep.
drop(guard);
loop {
let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
&mut overlapped,
&mut bytes_read,
libc::TRUE)
};
// If we succeeded, or we failed for some reason other than
// CancelIoEx, return immediately
if ret != 0 { return Ok(bytes_read as uint) }
if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
return Err(super::last_error())
}
}

Ok(bytes_read as uint)
// If the reading half is now closed, then we're done. If we woke up
// because the writing half was closed, keep trying.
if self.read_closed() {
return Err(io::standard_error(io::EndOfFile))
}
}
}

fn write(&mut self, buf: &[u8]) -> IoResult<()> {
Expand All @@ -325,27 +390,47 @@ impl rtio::RtioPipe for UnixStream {

while offset < buf.len() {
let mut bytes_written = 0;

// This sequence below is quite similar to the one found in read().
// Some careful looping is done to ensure that if close_write() is
// invoked we bail out early, and if close_read() is invoked we keep
// going after we woke up.
//
// See comments in close_read() about why this lock is necessary.
let guard = unsafe { (*self.inner.get()).lock.lock() };
if self.write_closed() {
return Err(io::standard_error(io::BrokenPipe))
}
let ret = unsafe {
libc::WriteFile(self.handle(),
buf.slice_from(offset).as_ptr() as libc::LPVOID,
(buf.len() - offset) as libc::DWORD,
&mut bytes_written,
&mut overlapped)
};
drop(guard);

if ret == 0 {
let err = unsafe { libc::GetLastError() };
if err == libc::ERROR_IO_PENDING as libc::DWORD {
let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
&mut overlapped,
&mut bytes_written,
libc::TRUE)
};
if ret == 0 {
if os::errno() != libc::ERROR_IO_PENDING as uint {
return Err(super::last_error())
}
let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
&mut overlapped,
&mut bytes_written,
libc::TRUE)
};
// If we weren't aborted, this was a legit error, if we were
// aborted, then check to see if the write half was actually
// closed or whether we woke up from the read half closing.
if ret == 0 {
if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
return Err(super::last_error())
}
} else {
return Err(super::last_error())
if self.write_closed() {
return Err(io::standard_error(io::BrokenPipe))
}
continue; // retry
}
}
offset += bytes_written as uint;
Expand All @@ -360,6 +445,36 @@ impl rtio::RtioPipe for UnixStream {
write: None,
} as Box<rtio::RtioPipe:Send>
}

fn close_read(&mut self) -> IoResult<()> {
// On windows, there's no actual shutdown() method for pipes, so we're
// forced to emulate the behavior manually at the application level. To
// do this, we need to both cancel any pending requests, as well as
// prevent all future requests from succeeding. These two operations are
// not atomic with respect to one another, so we must use a lock to do
// so.
//
// The read() code looks like:
//
// 1. Make sure the pipe is still open
// 2. Submit a read request
// 3. Wait for the read request to finish
//
// The race this lock is preventing is if another thread invokes
// close_read() between steps 1 and 2. By atomically executing steps 1
// and 2 with a lock with respect to close_read(), we're guaranteed that
// no thread will erroneously sit in a read forever.
let _guard = unsafe { (*self.inner.get()).lock.lock() };
unsafe { (*self.inner.get()).read_closed.store(true, atomics::SeqCst) }
self.cancel_io()
}

fn close_write(&mut self) -> IoResult<()> {
// see comments in close_read() for why this lock is necessary
let _guard = unsafe { (*self.inner.get()).lock.lock() };
unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) }
self.cancel_io()
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -520,7 +635,7 @@ impl UnixAcceptor {

// Transfer ownership of our handle into this stream
Ok(UnixStream {
inner: UnsafeArc::new(Inner { handle: handle }),
inner: UnsafeArc::new(Inner::new(handle)),
read: None,
write: None,
})
Expand Down
Loading

0 comments on commit ab22d99

Please sign in to comment.