Skip to content

Commit

Permalink
feat(process): add compio-process crate
Browse files Browse the repository at this point in the history
On Linux, it tries to use pidfd with nightly feature enabled, otherwise,
it uses spawn_blocking.
  • Loading branch information
Berrysoft committed May 2, 2024
1 parent 62a3a9e commit cfe1708
Show file tree
Hide file tree
Showing 18 changed files with 943 additions and 24 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ members = [
"compio-io",
"compio-tls",
"compio-log",
"compio-process",
]
resolver = "2"

Expand All @@ -34,6 +35,7 @@ compio-signal = { path = "./compio-signal", version = "0.2.0" }
compio-dispatcher = { path = "./compio-dispatcher", version = "0.2.0" }
compio-log = { path = "./compio-log", version = "0.1.0" }
compio-tls = { path = "./compio-tls", version = "0.2.0", default-features = false }
compio-process = { path = "./compio-process", version = "0.1.0" }

flume = "0.11.0"
cfg-if = "1.0.0"
Expand Down
13 changes: 9 additions & 4 deletions compio-driver/src/iocp/cp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use compio_buf::arrayvec::ArrayVec;
use compio_log::*;
use windows_sys::Win32::{
Foundation::{
RtlNtStatusToDosError, ERROR_BAD_COMMAND, ERROR_HANDLE_EOF, ERROR_IO_INCOMPLETE,
ERROR_NO_DATA, FACILITY_NTWIN32, INVALID_HANDLE_VALUE, NTSTATUS, STATUS_PENDING,
STATUS_SUCCESS,
RtlNtStatusToDosError, ERROR_BAD_COMMAND, ERROR_BROKEN_PIPE, ERROR_HANDLE_EOF,
ERROR_IO_INCOMPLETE, ERROR_NO_DATA, ERROR_PIPE_CONNECTED, ERROR_PIPE_NOT_CONNECTED,
FACILITY_NTWIN32, INVALID_HANDLE_VALUE, NTSTATUS, STATUS_PENDING, STATUS_SUCCESS,
},
Storage::FileSystem::SetFileCompletionNotificationModes,
System::{
Expand Down Expand Up @@ -177,7 +177,12 @@ impl CompletionPort {
} else {
let error = unsafe { RtlNtStatusToDosError(overlapped.base.Internal as _) };
match error {
ERROR_IO_INCOMPLETE | ERROR_HANDLE_EOF | ERROR_NO_DATA => Ok(0),
ERROR_IO_INCOMPLETE
| ERROR_HANDLE_EOF
| ERROR_BROKEN_PIPE
| ERROR_PIPE_CONNECTED
| ERROR_PIPE_NOT_CONNECTED
| ERROR_NO_DATA => Ok(0),
_ => Err(io::Error::from_raw_os_error(error as _)),
}
};
Expand Down
36 changes: 36 additions & 0 deletions compio-driver/src/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,24 @@ impl AsRawFd for OwnedSocket {
}
}

impl AsRawFd for std::process::ChildStdin {
fn as_raw_fd(&self) -> RawFd {
self.as_raw_handle() as _
}
}

impl AsRawFd for std::process::ChildStdout {
fn as_raw_fd(&self) -> RawFd {
self.as_raw_handle() as _
}
}

impl AsRawFd for std::process::ChildStderr {
fn as_raw_fd(&self) -> RawFd {
self.as_raw_handle() as _
}
}

impl From<OwnedHandle> for OwnedFd {
fn from(value: OwnedHandle) -> Self {
Self::File(value)
Expand All @@ -112,6 +130,24 @@ impl From<std::fs::File> for OwnedFd {
}
}

impl From<std::process::ChildStdin> for OwnedFd {
fn from(value: std::process::ChildStdin) -> Self {
Self::File(OwnedHandle::from(value))
}
}

impl From<std::process::ChildStdout> for OwnedFd {
fn from(value: std::process::ChildStdout) -> Self {
Self::File(OwnedHandle::from(value))
}
}

impl From<std::process::ChildStderr> for OwnedFd {
fn from(value: std::process::ChildStderr) -> Self {
Self::File(OwnedHandle::from(value))
}
}

impl From<OwnedSocket> for OwnedFd {
fn from(value: OwnedSocket) -> Self {
Self::Socket(value)
Expand Down
14 changes: 9 additions & 5 deletions compio-driver/src/iocp/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use windows_sys::{
core::GUID,
Win32::{
Foundation::{
CloseHandle, GetLastError, ERROR_HANDLE_EOF, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING,
ERROR_NOT_FOUND, ERROR_NO_DATA, ERROR_PIPE_CONNECTED,
CloseHandle, GetLastError, ERROR_BROKEN_PIPE, ERROR_HANDLE_EOF, ERROR_IO_INCOMPLETE,
ERROR_IO_PENDING, ERROR_NOT_FOUND, ERROR_NO_DATA, ERROR_PIPE_CONNECTED,
ERROR_PIPE_NOT_CONNECTED,
},
Networking::WinSock::{
closesocket, setsockopt, shutdown, socklen_t, WSAIoctl, WSARecv, WSARecvFrom, WSASend,
Expand All @@ -45,9 +46,12 @@ fn winapi_result(transferred: u32) -> Poll<io::Result<usize>> {
assert_ne!(error, 0);
match error {
ERROR_IO_PENDING => Poll::Pending,
ERROR_IO_INCOMPLETE | ERROR_HANDLE_EOF | ERROR_PIPE_CONNECTED | ERROR_NO_DATA => {
Poll::Ready(Ok(transferred as _))
}
ERROR_IO_INCOMPLETE
| ERROR_HANDLE_EOF
| ERROR_BROKEN_PIPE
| ERROR_PIPE_CONNECTED
| ERROR_PIPE_NOT_CONNECTED
| ERROR_NO_DATA => Poll::Ready(Ok(transferred as _)),
_ => Poll::Ready(Err(io::Error::from_raw_os_error(error as _))),
}
}
Expand Down
12 changes: 12 additions & 0 deletions compio-driver/src/iour/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,3 +555,15 @@ impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
self.buffer
}
}

impl<S: AsRawFd> OpCode for PollOnce<S> {
fn create_entry(self: Pin<&mut Self>) -> OpEntry {
let flags = match self.interest {
Interest::Readable => libc::POLLIN,
Interest::Writable => libc::POLLOUT,
};
opcode::PollAdd::new(Fd(self.fd.as_raw_fd()), flags as _)
.build()
.into()
}
}
4 changes: 2 additions & 2 deletions compio-driver/src/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ pub use crate::sys::op::{
};
#[cfg(unix)]
pub use crate::sys::op::{
CreateDir, CreateSocket, FileStat, HardLink, OpenFile, PathStat, ReadVectoredAt, Rename,
Symlink, Unlink, WriteVectoredAt,
CreateDir, CreateSocket, FileStat, HardLink, Interest, OpenFile, PathStat, PollOnce,
ReadVectoredAt, Rename, Symlink, Unlink, WriteVectoredAt,
};
use crate::{
sys::{sockaddr_storage, socklen_t},
Expand Down
11 changes: 1 addition & 10 deletions compio-driver/src/poll/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub(crate) use libc::{sockaddr_storage, socklen_t};
use polling::{Event, Events, PollMode, Poller};
use slab::Slab;

use crate::{syscall, AsyncifyPool, Entry, OutEntries, ProactorBuilder};
use crate::{op::Interest, syscall, AsyncifyPool, Entry, OutEntries, ProactorBuilder};

pub(crate) mod op;

Expand Down Expand Up @@ -88,15 +88,6 @@ pub struct WaitArg {
pub interest: Interest,
}

/// The interest of the operation
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Interest {
/// Represents a read operation.
Readable,
/// Represents a write operation.
Writable,
}

#[derive(Debug, Default)]
struct FdQueue {
read_queue: VecDeque<usize>,
Expand Down
15 changes: 15 additions & 0 deletions compio-driver/src/poll/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,3 +748,18 @@ impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
self.buffer
}
}

impl<S: AsRawFd> OpCode for PollOnce<S> {
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
Ok(Decision::wait_for(self.fd.as_raw_fd(), self.interest))
}

fn on_event(self: Pin<&mut Self>, event: &Event) -> Poll<io::Result<usize>> {
match self.interest {
Interest::Readable => debug_assert!(event.readable),
Interest::Writable => debug_assert!(event.writable),
}

Poll::Ready(Ok(0))
}
}
22 changes: 22 additions & 0 deletions compio-driver/src/unix/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,25 @@ impl<T: IoVectoredBuf, S> IntoInner for SendVectored<T, S> {
self.buffer
}
}

/// The interest to poll a file descriptor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Interest {
/// Represents a read operation.
Readable,
/// Represents a write operation.
Writable,
}

/// Poll a file descriptor for specified [`Interest`].
pub struct PollOnce<S> {
pub(crate) fd: SharedFd<S>,
pub(crate) interest: Interest,
}

impl<S> PollOnce<S> {
/// Create [`PollOnce`].
pub fn new(fd: SharedFd<S>, interest: Interest) -> Self {
Self { fd, interest }
}
}
4 changes: 2 additions & 2 deletions compio-fs/src/named_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ impl NamedPipeServer {
///
/// // Write fails with an OS-specific error after client has been
/// // disconnected.
/// let e = client.write("ping").await.0.unwrap_err();
/// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_NOT_CONNECTED as i32));
/// let e = client.write("ping").await.0.unwrap();
/// assert_eq!(e, 0);
/// # })
/// ```
pub fn disconnect(&self) -> io::Result<()> {
Expand Down
36 changes: 36 additions & 0 deletions compio-process/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
name = "compio-process"
version = "0.1.0"
description = "Processes for compio"
categories = ["asynchronous"]
keywords = ["async", "process"]
edition = { workspace = true }
authors = { workspace = true }
readme = { workspace = true }
license = { workspace = true }
repository = { workspace = true }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
compio-buf = { workspace = true }
compio-driver = { workspace = true }
compio-io = { workspace = true }
compio-runtime = { workspace = true }

cfg-if = { workspace = true }
futures-util = { workspace = true }

[target.'cfg(windows)'.dependencies]
windows-sys = { workspace = true }

[dev-dependencies]
compio-macros = { workspace = true }

[features]
default = []

linux_pidfd = []
nightly = ["linux_pidfd"]
Loading

0 comments on commit cfe1708

Please sign in to comment.