Skip to content

Commit

Permalink
feat(process,iocp): remove APC
Browse files Browse the repository at this point in the history
  • Loading branch information
Berrysoft committed Apr 9, 2024
1 parent 4a359eb commit 9a2a434
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 164 deletions.
22 changes: 7 additions & 15 deletions compio-driver/src/iocp/cp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ use compio_buf::arrayvec::ArrayVec;
use compio_log::*;
use windows_sys::Win32::{
Foundation::{
GetLastError, RtlNtStatusToDosError, ERROR_BAD_COMMAND, ERROR_BROKEN_PIPE,
ERROR_HANDLE_EOF, ERROR_IO_INCOMPLETE, ERROR_NO_DATA, ERROR_PIPE_CONNECTED,
ERROR_PIPE_NOT_CONNECTED, FACILITY_NTWIN32, INVALID_HANDLE_VALUE, NTSTATUS, STATUS_PENDING,
STATUS_SUCCESS, WAIT_IO_COMPLETION,
RtlNtStatusToDosError, ERROR_BAD_COMMAND, ERROR_BROKEN_PIPE, ERROR_HANDLE_EOF,
ERROR_IO_INCOMPLETE, ERROR_NO_DATA, ERROR_PIPE_CONNECTED, ERROR_PIPE_NOT_CONNECTED,
FACILITY_NTWIN32, INVALID_HANDLE_VALUE, NTSTATUS, STATUS_PENDING, STATUS_SUCCESS,
},
Storage::FileSystem::SetFileCompletionNotificationModes,
System::{
Expand Down Expand Up @@ -118,24 +117,17 @@ impl CompletionPort {
Some(timeout) => timeout.as_millis() as u32,
None => INFINITE,
};
let res = unsafe {
syscall!(
BOOL,
GetQueuedCompletionStatusEx(
self.port.as_raw_handle() as _,
entries.as_mut_ptr(),
DEFAULT_CAPACITY as _,
&mut recv_count,
timeout,
1,
0
)
};
if res == 0 {
let err = unsafe { GetLastError() };
if err == WAIT_IO_COMPLETION {
return Err(io::ErrorKind::Interrupted.into());
} else {
return Err(io::Error::from_raw_os_error(err as _));
}
}
)?;
trace!("recv_count: {recv_count}");
unsafe { entries.set_len(recv_count as _) };

Expand Down
53 changes: 41 additions & 12 deletions compio-driver/src/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ use std::{
mem::ManuallyDrop,
os::{
raw::c_void,
windows::prelude::{
AsRawHandle, AsRawSocket, FromRawHandle, FromRawSocket, IntoRawHandle, IntoRawSocket,
RawHandle,
windows::{
io::OwnedHandle,
prelude::{
AsRawHandle, AsRawSocket, FromRawHandle, FromRawSocket, IntoRawHandle,
IntoRawSocket, RawHandle,
},
},
},
pin::Pin,
Expand Down Expand Up @@ -71,12 +74,26 @@ pub trait IntoRawFd {
fn into_raw_fd(self) -> RawFd;
}

impl AsRawFd for std::fs::File {
fn as_raw_fd(&self) -> RawFd {
self.as_raw_handle()
}
macro_rules! impl_raw_fd_for_raw_handle {
($t:ty) => {
impl AsRawFd for $t {
fn as_raw_fd(&self) -> RawFd {
self.as_raw_handle()
}
}
impl IntoRawFd for $t {
fn into_raw_fd(self) -> RawFd {
self.into_raw_handle()
}
}
};
}

impl_raw_fd_for_raw_handle!(std::fs::File);
impl_raw_fd_for_raw_handle!(std::process::ChildStdin);
impl_raw_fd_for_raw_handle!(std::process::ChildStdout);
impl_raw_fd_for_raw_handle!(std::process::ChildStderr);

impl AsRawFd for socket2::Socket {
fn as_raw_fd(&self) -> RawFd {
self.as_raw_socket() as _
Expand All @@ -89,15 +106,27 @@ impl FromRawFd for std::fs::File {
}
}

impl FromRawFd for socket2::Socket {
impl FromRawFd for std::process::ChildStdin {
unsafe fn from_raw_fd(fd: RawFd) -> Self {
Self::from_raw_socket(fd as _)
Self::from(OwnedHandle::from_raw_handle(fd))
}
}

impl IntoRawFd for std::fs::File {
fn into_raw_fd(self) -> RawFd {
self.into_raw_handle()
impl FromRawFd for std::process::ChildStdout {
unsafe fn from_raw_fd(fd: RawFd) -> Self {
Self::from(OwnedHandle::from_raw_handle(fd))
}
}

impl FromRawFd for std::process::ChildStderr {
unsafe fn from_raw_fd(fd: RawFd) -> Self {
Self::from(OwnedHandle::from_raw_handle(fd))
}
}

impl FromRawFd for socket2::Socket {
unsafe fn from_raw_fd(fd: RawFd) -> Self {
Self::from_raw_socket(fd as _)
}
}

Expand Down
51 changes: 41 additions & 10 deletions compio-process/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use std::os::unix::process::CommandExt;
use std::os::windows::process::CommandExt;
use std::{ffi::OsStr, io, path::Path, process};

use compio_buf::BufResult;
use compio_buf::{BufResult, IntoInner};
use compio_io::AsyncReadExt;
use compio_runtime::Attacher;
use futures_util::future::Either;

/// A process builder, providing fine-grained control
Expand Down Expand Up @@ -216,9 +217,21 @@ impl Command {
self.0.create_pidfd(true);
}
let mut child = self.0.spawn()?;
let stdin = child.stdin.take().map(ChildStdin);
let stdout = child.stdout.take().map(ChildStdout);
let stderr = child.stderr.take().map(ChildStderr);
let stdin = if let Some(stdin) = child.stdin.take() {
Some(ChildStdin::new(stdin)?)
} else {
None
};
let stdout = if let Some(stdout) = child.stdout.take() {
Some(ChildStdout::new(stdout)?)
} else {
None
};
let stderr = if let Some(stderr) = child.stderr.take() {
Some(ChildStderr::new(stderr)?)
} else {
None
};
Ok(Child {
child,
stdin,
Expand Down Expand Up @@ -388,29 +401,47 @@ impl Child {

/// A handle to a child process's standard output (stdout). See
/// [`std::process::ChildStdout`].
pub struct ChildStdout(process::ChildStdout);
pub struct ChildStdout(Attacher<process::ChildStdout>);

impl ChildStdout {
fn new(stdout: process::ChildStdout) -> io::Result<Self> {
Attacher::new(stdout).map(Self)
}
}

impl From<ChildStdout> for process::Stdio {
fn from(value: ChildStdout) -> Self {
Self::from(value.0)
Self::from(value.0.into_inner())
}
}

/// A handle to a child process's stderr. See [`std::process::ChildStderr`].
pub struct ChildStderr(process::ChildStderr);
pub struct ChildStderr(Attacher<process::ChildStderr>);

impl ChildStderr {
fn new(stderr: process::ChildStderr) -> io::Result<Self> {
Attacher::new(stderr).map(Self)
}
}

impl From<ChildStderr> for process::Stdio {
fn from(value: ChildStderr) -> Self {
Self::from(value.0)
Self::from(value.0.into_inner())
}
}

/// A handle to a child process's standard input (stdin). See
/// [`std::process::ChildStdin`].
pub struct ChildStdin(process::ChildStdin);
pub struct ChildStdin(Attacher<process::ChildStdin>);

impl ChildStdin {
fn new(stdin: process::ChildStdin) -> io::Result<Self> {
Attacher::new(stdin).map(Self)
}
}

impl From<ChildStdin> for process::Stdio {
fn from(value: ChildStdin) -> Self {
Self::from(value.0)
Self::from(value.0.into_inner())
}
}
135 changes: 8 additions & 127 deletions compio-process/src/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,13 @@ use std::{
};

use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
use compio_driver::{op::BufResultExt, syscall, AsRawFd, OpCode, OpType, Overlapped, RawFd};
use compio_driver::{
op::{BufResultExt, Recv, Send},
syscall, AsRawFd, OpCode, OpType, RawFd,
};
use compio_io::{AsyncRead, AsyncWrite};
use compio_runtime::Runtime;
use windows_sys::Win32::{
Foundation::ERROR_NOT_FOUND,
Storage::FileSystem::{ReadFileEx, WriteFileEx},
System::{
Threading::GetExitCodeProcess,
IO::{CancelIoEx, PostQueuedCompletionStatus, OVERLAPPED},
},
};
use windows_sys::Win32::System::{Threading::GetExitCodeProcess, IO::OVERLAPPED};

use crate::{ChildStderr, ChildStdin, ChildStdout};

Expand Down Expand Up @@ -52,81 +48,6 @@ pub async fn child_wait(child: process::Child) -> io::Result<process::ExitStatus
Ok(process::ExitStatus::from_raw(code as _))
}

unsafe extern "system" fn apc_callback(
_dwerrorcode: u32,
dwnumberofbytestransfered: u32,
lpoverlapped: *mut OVERLAPPED,
) {
let optr: *mut Overlapped<()> = lpoverlapped.cast();
if let Some(overlapped) = optr.as_ref() {
syscall!(
BOOL,
PostQueuedCompletionStatus(
overlapped.driver as _,
dwnumberofbytestransfered,
0,
lpoverlapped,
)
)
.ok();
}
}

#[inline]
fn apc_cancel(fd: RawFd, optr: *mut OVERLAPPED) -> io::Result<()> {
match syscall!(BOOL, CancelIoEx(fd as _, optr)) {
Ok(_) => Ok(()),
Err(e) => {
if e.raw_os_error() == Some(ERROR_NOT_FOUND as _) {
Ok(())
} else {
Err(e)
}
}
}
}

struct ReadApc<B: IoBufMut> {
fd: RawFd,
buffer: B,
}

impl<B: IoBufMut> ReadApc<B> {
pub fn new(fd: RawFd, buffer: B) -> Self {
Self { fd, buffer }
}
}

impl<B: IoBufMut> OpCode for ReadApc<B> {
unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
let fd = self.fd as _;
let slice = self.get_unchecked_mut().buffer.as_mut_slice();
syscall!(
BOOL,
ReadFileEx(
fd,
slice.as_mut_ptr() as _,
slice.len() as _,
optr,
Some(apc_callback)
)
)?;
Poll::Pending
}

unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
apc_cancel(self.fd, optr)
}
}

impl<B: IoBufMut> IntoInner for ReadApc<B> {
type Inner = B;

fn into_inner(self) -> Self::Inner {
self.buffer
}
}

impl AsRawFd for ChildStdout {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_handle()
Expand All @@ -136,7 +57,7 @@ impl AsRawFd for ChildStdout {
impl AsyncRead for ChildStdout {
async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
let fd = self.as_raw_fd();
let op = ReadApc::new(fd, buffer);
let op = Recv::new(fd, buffer);
Runtime::current()
.submit(op)
.await
Expand All @@ -154,7 +75,7 @@ impl AsRawFd for ChildStderr {
impl AsyncRead for ChildStderr {
async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
let fd = self.as_raw_fd();
let op = ReadApc::new(fd, buffer);
let op = Recv::new(fd, buffer);
Runtime::current()
.submit(op)
.await
Expand All @@ -163,46 +84,6 @@ impl AsyncRead for ChildStderr {
}
}

struct WriteApc<B: IoBuf> {
fd: RawFd,
buffer: B,
}

impl<B: IoBuf> WriteApc<B> {
pub fn new(fd: RawFd, buffer: B) -> Self {
Self { fd, buffer }
}
}

impl<B: IoBuf> OpCode for WriteApc<B> {
unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
let slice = self.buffer.as_slice();
syscall!(
BOOL,
WriteFileEx(
self.fd as _,
slice.as_ptr() as _,
slice.len() as _,
optr,
Some(apc_callback),
)
)?;
Poll::Pending
}

unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
apc_cancel(self.fd, optr)
}
}

impl<B: IoBuf> IntoInner for WriteApc<B> {
type Inner = B;

fn into_inner(self) -> Self::Inner {
self.buffer
}
}

impl AsRawFd for ChildStdin {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_handle()
Expand All @@ -212,7 +93,7 @@ impl AsRawFd for ChildStdin {
impl AsyncWrite for ChildStdin {
async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
let fd = self.as_raw_fd();
let op = WriteApc::new(fd, buffer);
let op = Send::new(fd, buffer);
Runtime::current().submit(op).await.into_inner()
}

Expand Down

0 comments on commit 9a2a434

Please sign in to comment.