From 9a2a4349b5861803b16219a91e35c1b89a6483ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Tue, 9 Apr 2024 19:14:15 +0800 Subject: [PATCH] feat(process,iocp): remove APC --- compio-driver/src/iocp/cp/mod.rs | 22 ++--- compio-driver/src/iocp/mod.rs | 53 +++++++++--- compio-process/src/lib.rs | 51 +++++++++--- compio-process/src/windows.rs | 135 ++----------------------------- 4 files changed, 97 insertions(+), 164 deletions(-) diff --git a/compio-driver/src/iocp/cp/mod.rs b/compio-driver/src/iocp/cp/mod.rs index d63b8066..b50ed9b3 100644 --- a/compio-driver/src/iocp/cp/mod.rs +++ b/compio-driver/src/iocp/cp/mod.rs @@ -22,10 +22,9 @@ use compio_buf::arrayvec::ArrayVec; use compio_log::*; use windows_sys::Win32::{ Foundation::{ - GetLastError, RtlNtStatusToDosError, ERROR_BAD_COMMAND, ERROR_BROKEN_PIPE, - ERROR_HANDLE_EOF, ERROR_IO_INCOMPLETE, ERROR_NO_DATA, ERROR_PIPE_CONNECTED, - ERROR_PIPE_NOT_CONNECTED, FACILITY_NTWIN32, INVALID_HANDLE_VALUE, NTSTATUS, STATUS_PENDING, - STATUS_SUCCESS, WAIT_IO_COMPLETION, + RtlNtStatusToDosError, ERROR_BAD_COMMAND, ERROR_BROKEN_PIPE, ERROR_HANDLE_EOF, + ERROR_IO_INCOMPLETE, ERROR_NO_DATA, ERROR_PIPE_CONNECTED, ERROR_PIPE_NOT_CONNECTED, + FACILITY_NTWIN32, INVALID_HANDLE_VALUE, NTSTATUS, STATUS_PENDING, STATUS_SUCCESS, }, Storage::FileSystem::SetFileCompletionNotificationModes, System::{ @@ -118,24 +117,17 @@ impl CompletionPort { Some(timeout) => timeout.as_millis() as u32, None => INFINITE, }; - let res = unsafe { + syscall!( + BOOL, GetQueuedCompletionStatusEx( self.port.as_raw_handle() as _, entries.as_mut_ptr(), DEFAULT_CAPACITY as _, &mut recv_count, timeout, - 1, + 0 ) - }; - if res == 0 { - let err = unsafe { GetLastError() }; - if err == WAIT_IO_COMPLETION { - return Err(io::ErrorKind::Interrupted.into()); - } else { - return Err(io::Error::from_raw_os_error(err as _)); - } - } + )?; trace!("recv_count: {recv_count}"); unsafe { entries.set_len(recv_count as _) }; diff --git a/compio-driver/src/iocp/mod.rs b/compio-driver/src/iocp/mod.rs index d61144ed..cdaefdd8 100644 --- a/compio-driver/src/iocp/mod.rs +++ b/compio-driver/src/iocp/mod.rs @@ -4,9 +4,12 @@ use std::{ mem::ManuallyDrop, os::{ raw::c_void, - windows::prelude::{ - AsRawHandle, AsRawSocket, FromRawHandle, FromRawSocket, IntoRawHandle, IntoRawSocket, - RawHandle, + windows::{ + io::OwnedHandle, + prelude::{ + AsRawHandle, AsRawSocket, FromRawHandle, FromRawSocket, IntoRawHandle, + IntoRawSocket, RawHandle, + }, }, }, pin::Pin, @@ -71,12 +74,26 @@ pub trait IntoRawFd { fn into_raw_fd(self) -> RawFd; } -impl AsRawFd for std::fs::File { - fn as_raw_fd(&self) -> RawFd { - self.as_raw_handle() - } +macro_rules! impl_raw_fd_for_raw_handle { + ($t:ty) => { + impl AsRawFd for $t { + fn as_raw_fd(&self) -> RawFd { + self.as_raw_handle() + } + } + impl IntoRawFd for $t { + fn into_raw_fd(self) -> RawFd { + self.into_raw_handle() + } + } + }; } +impl_raw_fd_for_raw_handle!(std::fs::File); +impl_raw_fd_for_raw_handle!(std::process::ChildStdin); +impl_raw_fd_for_raw_handle!(std::process::ChildStdout); +impl_raw_fd_for_raw_handle!(std::process::ChildStderr); + impl AsRawFd for socket2::Socket { fn as_raw_fd(&self) -> RawFd { self.as_raw_socket() as _ @@ -89,15 +106,27 @@ impl FromRawFd for std::fs::File { } } -impl FromRawFd for socket2::Socket { +impl FromRawFd for std::process::ChildStdin { unsafe fn from_raw_fd(fd: RawFd) -> Self { - Self::from_raw_socket(fd as _) + Self::from(OwnedHandle::from_raw_handle(fd)) } } -impl IntoRawFd for std::fs::File { - fn into_raw_fd(self) -> RawFd { - self.into_raw_handle() +impl FromRawFd for std::process::ChildStdout { + unsafe fn from_raw_fd(fd: RawFd) -> Self { + Self::from(OwnedHandle::from_raw_handle(fd)) + } +} + +impl FromRawFd for std::process::ChildStderr { + unsafe fn from_raw_fd(fd: RawFd) -> Self { + Self::from(OwnedHandle::from_raw_handle(fd)) + } +} + +impl FromRawFd for socket2::Socket { + unsafe fn from_raw_fd(fd: RawFd) -> Self { + Self::from_raw_socket(fd as _) } } diff --git a/compio-process/src/lib.rs b/compio-process/src/lib.rs index 397ae183..8dbe469c 100644 --- a/compio-process/src/lib.rs +++ b/compio-process/src/lib.rs @@ -26,8 +26,9 @@ use std::os::unix::process::CommandExt; use std::os::windows::process::CommandExt; use std::{ffi::OsStr, io, path::Path, process}; -use compio_buf::BufResult; +use compio_buf::{BufResult, IntoInner}; use compio_io::AsyncReadExt; +use compio_runtime::Attacher; use futures_util::future::Either; /// A process builder, providing fine-grained control @@ -216,9 +217,21 @@ impl Command { self.0.create_pidfd(true); } let mut child = self.0.spawn()?; - let stdin = child.stdin.take().map(ChildStdin); - let stdout = child.stdout.take().map(ChildStdout); - let stderr = child.stderr.take().map(ChildStderr); + let stdin = if let Some(stdin) = child.stdin.take() { + Some(ChildStdin::new(stdin)?) + } else { + None + }; + let stdout = if let Some(stdout) = child.stdout.take() { + Some(ChildStdout::new(stdout)?) + } else { + None + }; + let stderr = if let Some(stderr) = child.stderr.take() { + Some(ChildStderr::new(stderr)?) + } else { + None + }; Ok(Child { child, stdin, @@ -388,29 +401,47 @@ impl Child { /// A handle to a child process's standard output (stdout). See /// [`std::process::ChildStdout`]. -pub struct ChildStdout(process::ChildStdout); +pub struct ChildStdout(Attacher); + +impl ChildStdout { + fn new(stdout: process::ChildStdout) -> io::Result { + Attacher::new(stdout).map(Self) + } +} impl From for process::Stdio { fn from(value: ChildStdout) -> Self { - Self::from(value.0) + Self::from(value.0.into_inner()) } } /// A handle to a child process's stderr. See [`std::process::ChildStderr`]. -pub struct ChildStderr(process::ChildStderr); +pub struct ChildStderr(Attacher); + +impl ChildStderr { + fn new(stderr: process::ChildStderr) -> io::Result { + Attacher::new(stderr).map(Self) + } +} impl From for process::Stdio { fn from(value: ChildStderr) -> Self { - Self::from(value.0) + Self::from(value.0.into_inner()) } } /// A handle to a child process's standard input (stdin). See /// [`std::process::ChildStdin`]. -pub struct ChildStdin(process::ChildStdin); +pub struct ChildStdin(Attacher); + +impl ChildStdin { + fn new(stdin: process::ChildStdin) -> io::Result { + Attacher::new(stdin).map(Self) + } +} impl From for process::Stdio { fn from(value: ChildStdin) -> Self { - Self::from(value.0) + Self::from(value.0.into_inner()) } } diff --git a/compio-process/src/windows.rs b/compio-process/src/windows.rs index b3a63b17..79b396f1 100644 --- a/compio-process/src/windows.rs +++ b/compio-process/src/windows.rs @@ -7,17 +7,13 @@ use std::{ }; use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut}; -use compio_driver::{op::BufResultExt, syscall, AsRawFd, OpCode, OpType, Overlapped, RawFd}; +use compio_driver::{ + op::{BufResultExt, Recv, Send}, + syscall, AsRawFd, OpCode, OpType, RawFd, +}; use compio_io::{AsyncRead, AsyncWrite}; use compio_runtime::Runtime; -use windows_sys::Win32::{ - Foundation::ERROR_NOT_FOUND, - Storage::FileSystem::{ReadFileEx, WriteFileEx}, - System::{ - Threading::GetExitCodeProcess, - IO::{CancelIoEx, PostQueuedCompletionStatus, OVERLAPPED}, - }, -}; +use windows_sys::Win32::System::{Threading::GetExitCodeProcess, IO::OVERLAPPED}; use crate::{ChildStderr, ChildStdin, ChildStdout}; @@ -52,81 +48,6 @@ pub async fn child_wait(child: process::Child) -> io::Result = lpoverlapped.cast(); - if let Some(overlapped) = optr.as_ref() { - syscall!( - BOOL, - PostQueuedCompletionStatus( - overlapped.driver as _, - dwnumberofbytestransfered, - 0, - lpoverlapped, - ) - ) - .ok(); - } -} - -#[inline] -fn apc_cancel(fd: RawFd, optr: *mut OVERLAPPED) -> io::Result<()> { - match syscall!(BOOL, CancelIoEx(fd as _, optr)) { - Ok(_) => Ok(()), - Err(e) => { - if e.raw_os_error() == Some(ERROR_NOT_FOUND as _) { - Ok(()) - } else { - Err(e) - } - } - } -} - -struct ReadApc { - fd: RawFd, - buffer: B, -} - -impl ReadApc { - pub fn new(fd: RawFd, buffer: B) -> Self { - Self { fd, buffer } - } -} - -impl OpCode for ReadApc { - unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll> { - let fd = self.fd as _; - let slice = self.get_unchecked_mut().buffer.as_mut_slice(); - syscall!( - BOOL, - ReadFileEx( - fd, - slice.as_mut_ptr() as _, - slice.len() as _, - optr, - Some(apc_callback) - ) - )?; - Poll::Pending - } - - unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> { - apc_cancel(self.fd, optr) - } -} - -impl IntoInner for ReadApc { - type Inner = B; - - fn into_inner(self) -> Self::Inner { - self.buffer - } -} - impl AsRawFd for ChildStdout { fn as_raw_fd(&self) -> RawFd { self.0.as_raw_handle() @@ -136,7 +57,7 @@ impl AsRawFd for ChildStdout { impl AsyncRead for ChildStdout { async fn read(&mut self, buffer: B) -> BufResult { let fd = self.as_raw_fd(); - let op = ReadApc::new(fd, buffer); + let op = Recv::new(fd, buffer); Runtime::current() .submit(op) .await @@ -154,7 +75,7 @@ impl AsRawFd for ChildStderr { impl AsyncRead for ChildStderr { async fn read(&mut self, buffer: B) -> BufResult { let fd = self.as_raw_fd(); - let op = ReadApc::new(fd, buffer); + let op = Recv::new(fd, buffer); Runtime::current() .submit(op) .await @@ -163,46 +84,6 @@ impl AsyncRead for ChildStderr { } } -struct WriteApc { - fd: RawFd, - buffer: B, -} - -impl WriteApc { - pub fn new(fd: RawFd, buffer: B) -> Self { - Self { fd, buffer } - } -} - -impl OpCode for WriteApc { - unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll> { - let slice = self.buffer.as_slice(); - syscall!( - BOOL, - WriteFileEx( - self.fd as _, - slice.as_ptr() as _, - slice.len() as _, - optr, - Some(apc_callback), - ) - )?; - Poll::Pending - } - - unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> { - apc_cancel(self.fd, optr) - } -} - -impl IntoInner for WriteApc { - type Inner = B; - - fn into_inner(self) -> Self::Inner { - self.buffer - } -} - impl AsRawFd for ChildStdin { fn as_raw_fd(&self) -> RawFd { self.0.as_raw_handle() @@ -212,7 +93,7 @@ impl AsRawFd for ChildStdin { impl AsyncWrite for ChildStdin { async fn write(&mut self, buffer: T) -> BufResult { let fd = self.as_raw_fd(); - let op = WriteApc::new(fd, buffer); + let op = Send::new(fd, buffer); Runtime::current().submit(op).await.into_inner() }