Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Windows: Synchronize asynchronous pipe reads and writes #95467

Merged
merged 3 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions library/std/src/sys/windows/c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,12 @@ impl Default for IO_STATUS_BLOCK {
}
}

pub type LPOVERLAPPED_COMPLETION_ROUTINE = unsafe extern "system" fn(
dwErrorCode: DWORD,
dwNumberOfBytesTransfered: DWORD,
lpOverlapped: *mut OVERLAPPED,
);

#[repr(C)]
#[cfg(not(target_pointer_width = "64"))]
pub struct WSADATA {
Expand Down Expand Up @@ -891,6 +897,7 @@ extern "system" {
pub fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD;
pub fn SwitchToThread() -> BOOL;
pub fn Sleep(dwMilliseconds: DWORD);
pub fn SleepEx(dwMilliseconds: DWORD, bAlertable: BOOL) -> DWORD;
pub fn GetProcessId(handle: HANDLE) -> DWORD;
pub fn CopyFileExW(
lpExistingFileName: LPCWSTR,
Expand Down Expand Up @@ -957,13 +964,27 @@ extern "system" {
lpNumberOfBytesRead: LPDWORD,
lpOverlapped: LPOVERLAPPED,
) -> BOOL;
pub fn ReadFileEx(
hFile: BorrowedHandle<'_>,
lpBuffer: LPVOID,
nNumberOfBytesToRead: DWORD,
lpOverlapped: LPOVERLAPPED,
lpCompletionRoutine: LPOVERLAPPED_COMPLETION_ROUTINE,
) -> BOOL;
pub fn WriteFile(
hFile: BorrowedHandle<'_>,
lpBuffer: LPVOID,
nNumberOfBytesToWrite: DWORD,
lpNumberOfBytesWritten: LPDWORD,
lpOverlapped: LPOVERLAPPED,
) -> BOOL;
pub fn WriteFileEx(
hFile: BorrowedHandle<'_>,
lpBuffer: LPVOID,
nNumberOfBytesToWrite: DWORD,
lpOverlapped: LPOVERLAPPED,
lpCompletionRoutine: LPOVERLAPPED_COMPLETION_ROUTINE,
) -> BOOL;
pub fn CloseHandle(hObject: HANDLE) -> BOOL;
pub fn MoveFileExW(lpExistingFileName: LPCWSTR, lpNewFileName: LPCWSTR, dwFlags: DWORD)
-> BOOL;
Expand Down
121 changes: 119 additions & 2 deletions library/std/src/sys/windows/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ fn random_number() -> usize {
}
}

// Abstracts over `ReadFileEx` and `WriteFileEx`
type AlertableIoFn = unsafe extern "system" fn(
BorrowedHandle<'_>,
c::LPVOID,
c::DWORD,
c::LPOVERLAPPED,
c::LPOVERLAPPED_COMPLETION_ROUTINE,
) -> c::BOOL;

impl AnonPipe {
pub fn handle(&self) -> &Handle {
&self.inner
Expand All @@ -182,7 +191,19 @@ impl AnonPipe {
}

pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.read(buf)
let result = unsafe {
let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
self.alertable_io_internal(c::ReadFileEx, buf.as_mut_ptr() as _, len)
};

match result {
// The special treatment of BrokenPipe is to deal with Windows
// pipe semantics, which yields this error when *reading* from
// a pipe after the other end has closed; we interpret that as
// EOF on the pipe.
Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
_ => result,
}
}

pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
Expand All @@ -195,7 +216,10 @@ impl AnonPipe {
}

pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
self.inner.write(buf)
unsafe {
let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
self.alertable_io_internal(c::WriteFileEx, buf.as_ptr() as _, len)
}
}

pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
Expand All @@ -206,6 +230,99 @@ impl AnonPipe {
pub fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored()
}

/// Synchronizes asynchronous reads or writes using our anonymous pipe.
///
/// This is a wrapper around [`ReadFileEx`] or [`WriteFileEx`] that uses
/// [Asynchronous Procedure Call] (APC) to synchronize reads or writes.
///
/// Note: This should not be used for handles we don't create.
///
/// # Safety
///
/// `buf` must be a pointer to a buffer that's valid for reads or writes
/// up to `len` bytes. The `AlertableIoFn` must be either `ReadFileEx` or `WriteFileEx`
///
/// [`ReadFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfileex
/// [`WriteFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefileex
/// [Asynchronous Procedure Call]: https://docs.microsoft.com/en-us/windows/win32/sync/asynchronous-procedure-calls
unsafe fn alertable_io_internal(
&self,
io: AlertableIoFn,
buf: c::LPVOID,
len: c::DWORD,
) -> io::Result<usize> {
// Use "alertable I/O" to synchronize the pipe I/O.
// This has four steps.
//
// STEP 1: Start the asynchronous I/O operation.
// This simply calls either `ReadFileEx` or `WriteFileEx`,
// giving it a pointer to the buffer and callback function.
//
// STEP 2: Enter an alertable state.
// The callback set in step 1 will not be called until the thread
// enters an "alertable" state. This can be done using `SleepEx`.
//
// STEP 3: The callback
// Once the I/O is complete and the thread is in an alertable state,
// the callback will be run on the same thread as the call to
// `ReadFileEx` or `WriteFileEx` done in step 1.
// In the callback we simply set the result of the async operation.
//
// STEP 4: Return the result.
// At this point we'll have a result from the callback function
// and can simply return it. Note that we must not return earlier,
// while the I/O is still in progress.

// The result that will be set from the asynchronous callback.
let mut async_result: Option<AsyncResult> = None;
struct AsyncResult {
error: u32,
transfered: u32,
}

// STEP 3: The callback.
unsafe extern "system" fn callback(
dwErrorCode: u32,
dwNumberOfBytesTransfered: u32,
lpOverlapped: *mut c::OVERLAPPED,
) {
// Set `async_result` using a pointer smuggled through `hEvent`.
let result = AsyncResult { error: dwErrorCode, transfered: dwNumberOfBytesTransfered };
*(*lpOverlapped).hEvent.cast::<Option<AsyncResult>>() = Some(result);
}

// STEP 1: Start the I/O operation.
let mut overlapped: c::OVERLAPPED = crate::mem::zeroed();
// `hEvent` is unused by `ReadFileEx` and `WriteFileEx`.
// Therefore the documentation suggests using it to smuggle a pointer to the callback.
overlapped.hEvent = &mut async_result as *mut _ as *mut _;

// Asynchronous read of the pipe.
// If successful, `callback` will be called once it completes.
let result = io(self.inner.as_handle(), buf, len, &mut overlapped, callback);
if result == c::FALSE {
// We can return here because the call failed.
// After this we must not return until the I/O completes.
return Err(io::Error::last_os_error());
}

// Wait indefinitely for the result.
let result = loop {
// STEP 2: Enter an alertable state.
// The second parameter of `SleepEx` is used to make this sleep alertable.
c::SleepEx(c::INFINITE, c::TRUE);
if let Some(result) = async_result {
break result;
}
};
// STEP 4: Return the result.
// `async_result` is always `Some` at this point
match result.error {
c::ERROR_SUCCESS => Ok(result.transfered as usize),
error => Err(io::Error::from_raw_os_error(error as _)),
}
}
}

pub fn read2(p1: AnonPipe, v1: &mut Vec<u8>, p2: AnonPipe, v2: &mut Vec<u8>) -> io::Result<()> {
Expand Down