From 5060e9845beef59b2d010994a89ce3d2d1fb30b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tymoteusz=20Wi=C5=9Bniewski?= Date: Tue, 31 Oct 2023 14:01:27 +0100 Subject: [PATCH 1/9] net: add support for anonymous unix pipes --- tokio/src/net/unix/pipe.rs | 222 ++++++++++++++++++++++++++++++----- tokio/tests/net_unix_pipe.rs | 79 +++++++++++++ 2 files changed, 274 insertions(+), 27 deletions(-) diff --git a/tokio/src/net/unix/pipe.rs b/tokio/src/net/unix/pipe.rs index 0b2508a9257..f898040260b 100644 --- a/tokio/src/net/unix/pipe.rs +++ b/tokio/src/net/unix/pipe.rs @@ -6,7 +6,8 @@ use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready}; use mio::unix::pipe as mio_pipe; use std::fs::File; use std::io::{self, Read, Write}; -use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; +use std::os::fd::OwnedFd; +use std::os::unix::fs::OpenOptionsExt; use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}; use std::path::Path; use std::pin::Pin; @@ -16,6 +17,33 @@ cfg_io_util! { use bytes::BufMut; } +/// Creates a new anonymous Unix pipe. +/// +/// This function will open a new pipe and associate both pipe ends with the default +/// event loop. +/// +/// If you need to create a pipe for communicating with a spawned process, you can +/// also see how to use `Stdio::piped()` with [`tokio::process`]. +/// +/// [`tokio::process`]: crate::process +/// +/// # Errors +/// +/// If creating a pipe fails, this function will return with the related OS error. +/// +/// # Panics +/// +/// This function panics if it is not called from within a runtime with +/// IO enabled. +/// +/// The runtime is usually set implicitly when this function is called +/// from a future driven by a tokio runtime, otherwise runtime can be set +/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. +pub fn new() -> io::Result<(Sender, Receiver)> { + let (tx, rx) = mio_pipe::new()?; + Ok((Sender::from_mio(tx)?, Receiver::from_mio(rx)?)) +} + /// Options and flags which can be used to configure how a FIFO file is opened. /// /// This builder allows configuring how to create a pipe end from a FIFO file. @@ -218,7 +246,7 @@ impl OpenOptions { let file = options.open(path)?; - if !self.unchecked && !is_fifo(&file)? { + if !self.unchecked && !is_pipe(file.as_fd())? { return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe")); } @@ -338,15 +366,40 @@ impl Sender { /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub fn from_file(mut file: File) -> io::Result { - if !is_fifo(&file)? { + pub fn from_file(file: File) -> io::Result { + Sender::from_owned_fd(file.into()) + } + + /// Creates a new `Sender` from an [`OwnedFd`]. + /// + /// This function is intended to construct a pipe from an [`OwnedFd`] representing + /// an anonymous pipe or a special FIFO file. It will check if the file descriptor + /// is a pipe and has write access, set it in non-blocking mode and perform the + /// conversion. + /// + /// # Errors + /// + /// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe + /// or it does not have write access. Also fails with any standard OS error if it + /// occurs. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result { + if !is_pipe(owned_fd.as_fd())? { return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe")); } - let flags = get_file_flags(&file)?; + let flags = get_file_flags(owned_fd.as_fd())?; if has_write_access(flags) { - set_nonblocking(&mut file, flags)?; - Sender::from_file_unchecked(file) + set_nonblocking(owned_fd.as_fd(), flags)?; + Sender::from_owned_fd_unchecked(owned_fd) } else { Err(io::Error::new( io::ErrorKind::InvalidInput, @@ -394,8 +447,28 @@ impl Sender { /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_file_unchecked(file: File) -> io::Result { - let raw_fd = file.into_raw_fd(); - let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(raw_fd) }; + Sender::from_owned_fd_unchecked(file.into()) + } + + /// Creates a new `Sender` from an [`OwnedFd`] without checking pipe properties. + /// + /// This function is intended to construct a pipe from an [`OwnedFd`] representing + /// an anonymous pipe or a special FIFO file. The conversion assumes nothing about + /// the underlying pipe; it is left up to the user to make sure that the file + /// descriptor represents the writing end of a pipe and the pipe is set in + /// non-blocking mode. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result { + // Safety: OwnedFd represents a valid, open file descriptor. + let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(owned_fd.into_raw_fd()) }; Sender::from_mio(mio_tx) } @@ -623,6 +696,21 @@ impl Sender { .registration() .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) } + + /// Converts the pipe into an [`OwnedFd`]. + /// + /// This function will deregister this pipe end from the event loop, set + /// it in blocking mode and perform the conversion. + pub fn into_owned_fd(self) -> io::Result { + let mio_pipe = self.io.into_inner()?; + set_blocking(&mio_pipe)?; + + // Safety: the pipe is now deregistered from the event loop + // and we are the only owner of this pipe end. + let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) }; + + Ok(owned_fd) + } } impl AsyncWrite for Sender { @@ -764,15 +852,40 @@ impl Receiver { /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub fn from_file(mut file: File) -> io::Result { - if !is_fifo(&file)? { + pub fn from_file(file: File) -> io::Result { + Receiver::from_owned_fd(file.into()) + } + + /// Creates a new `Receiver` from an [`OwnedFd`]. + /// + /// This function is intended to construct a pipe from an [`OwnedFd`] representing + /// an anonymous pipe or a special FIFO file. It will check if the file descriptor + /// is a pipe and has read access, set it in non-blocking mode and perform the + /// conversion. + /// + /// # Errors + /// + /// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe + /// or it does not have read access. Also fails with any standard OS error if it + /// occurs. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result { + if !is_pipe(owned_fd.as_fd())? { return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe")); } - let flags = get_file_flags(&file)?; + let flags = get_file_flags(owned_fd.as_fd())?; if has_read_access(flags) { - set_nonblocking(&mut file, flags)?; - Receiver::from_file_unchecked(file) + set_nonblocking(owned_fd.as_fd(), flags)?; + Receiver::from_owned_fd_unchecked(owned_fd) } else { Err(io::Error::new( io::ErrorKind::InvalidInput, @@ -820,8 +933,28 @@ impl Receiver { /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_file_unchecked(file: File) -> io::Result { - let raw_fd = file.into_raw_fd(); - let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(raw_fd) }; + Receiver::from_owned_fd_unchecked(file.into()) + } + + /// Creates a new `Receiver` from an [`OwnedFd`] without checking pipe properties. + /// + /// This function is intended to construct a pipe from an [`OwnedFd`] representing + /// an anonymous pipe or a special FIFO file. The conversion assumes nothing about + /// the underlying pipe; it is left up to the user to make sure that the file + /// descriptor represents the reading end of a pipe and the pipe is set in + /// non-blocking mode. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result { + // Safety: OwnedFd represents a valid, open file descriptor. + let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(owned_fd.into_raw_fd()) }; Receiver::from_mio(mio_rx) } @@ -1146,6 +1279,21 @@ impl Receiver { }) } } + + /// Converts the pipe into an [`OwnedFd`]. + /// + /// This function will deregister this pipe end from the event loop, set + /// it in blocking mode and perform the conversion. + pub fn into_owned_fd(self) -> io::Result { + let mio_pipe = self.io.into_inner()?; + set_blocking(&mio_pipe)?; + + // Safety: the pipe is now deregistered from the event loop + // and we are the only owner of this pipe end. + let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) }; + + Ok(owned_fd) + } } impl AsyncRead for Receiver { @@ -1172,15 +1320,20 @@ impl AsFd for Receiver { } } -/// Checks if file is a FIFO -fn is_fifo(file: &File) -> io::Result { - Ok(file.metadata()?.file_type().is_fifo()) +/// Checks if the file descriptor is a pipe or a FIFO. +fn is_pipe(fd: BorrowedFd<'_>) -> io::Result { + let mut stat: libc::stat = unsafe { std::mem::zeroed() }; + let r = unsafe { libc::fstat(fd.as_raw_fd(), &mut stat) }; + if r == -1 { + Err(io::Error::last_os_error()) + } else { + Ok((stat.st_mode & libc::S_IFMT) == libc::S_IFIFO) + } } /// Gets file descriptor's flags by fcntl. -fn get_file_flags(file: &File) -> io::Result { - let fd = file.as_raw_fd(); - let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; +fn get_file_flags(fd: BorrowedFd<'_>) -> io::Result { + let flags = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) }; if flags < 0 { Err(io::Error::last_os_error()) } else { @@ -1200,14 +1353,12 @@ fn has_write_access(flags: libc::c_int) -> bool { mode == libc::O_WRONLY || mode == libc::O_RDWR } -/// Sets file's flags with `O_NONBLOCK` by fcntl. -fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()> { - let fd = file.as_raw_fd(); - +/// Sets file descriptor's flags with `O_NONBLOCK` by fcntl. +fn set_nonblocking(fd: BorrowedFd<'_>, current_flags: libc::c_int) -> io::Result<()> { let flags = current_flags | libc::O_NONBLOCK; if flags != current_flags { - let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags) }; + let ret = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, flags) }; if ret < 0 { return Err(io::Error::last_os_error()); } @@ -1215,3 +1366,20 @@ fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<() Ok(()) } + +/// Removes `O_NONBLOCK` from fd's flags. +fn set_blocking(fd: &T) -> io::Result<()> { + let previous = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) }; + if previous == -1 { + return Err(io::Error::last_os_error()); + } + + let new = previous & !libc::O_NONBLOCK; + + let r = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, new) }; + if r == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } +} diff --git a/tokio/tests/net_unix_pipe.rs b/tokio/tests/net_unix_pipe.rs index c96d6e70fbd..90c288468ec 100644 --- a/tokio/tests/net_unix_pipe.rs +++ b/tokio/tests/net_unix_pipe.rs @@ -8,6 +8,7 @@ use tokio_test::{assert_err, assert_ok, assert_pending, assert_ready_ok}; use std::fs::File; use std::io; +use std::os::fd::{FromRawFd, OwnedFd}; use std::os::unix::fs::OpenOptionsExt; use std::os::unix::io::AsRawFd; use std::path::{Path, PathBuf}; @@ -427,3 +428,81 @@ async fn try_read_buf() -> std::io::Result<()> { Ok(()) } + +#[tokio::test] +async fn anon_pipe_simple_send() -> io::Result<()> { + const DATA: &[u8] = b"this is some data to write to the pipe"; + + let (mut writer, mut reader) = pipe::new()?; + + // Create a reading task which should wait for data from the pipe. + let mut read_fut = task::spawn(async move { + let mut buf = vec![0; DATA.len()]; + reader.read_exact(&mut buf).await?; + Ok::<_, io::Error>(buf) + }); + assert_pending!(read_fut.poll()); + + writer.write_all(DATA).await?; + + // Let the IO driver poll events for the reader. + while !read_fut.is_woken() { + tokio::task::yield_now().await; + } + + // Reading task should be ready now. + let read_data = assert_ready_ok!(read_fut.poll()); + assert_eq!(&read_data, DATA); + + Ok(()) +} + +#[tokio::test] +async fn anon_pipe_spawn_echo() -> std::io::Result<()> { + use tokio::process::Command; + + const DATA: &str = "this is some data to write to the pipe"; + + let (tx, mut rx) = pipe::new()?; + + let status = Command::new("echo") + .arg("-n") + .arg(DATA) + .stdout(tx.into_owned_fd()?) + .status(); + + let mut buf = vec![0; DATA.len()]; + rx.read_exact(&mut buf).await?; + assert_eq!(String::from_utf8(buf).unwrap(), DATA); + + let exit_code = status.await?; + assert!(exit_code.success()); + + // Check if the pipe is closed. + buf = Vec::new(); + let total = assert_ok!(rx.try_read(&mut buf)); + assert_eq!(total, 0); + + Ok(()) +} + +#[tokio::test] +#[cfg(target_os = "linux")] +async fn anon_pipe_from_owned_fd() -> std::io::Result<()> { + use nix::fcntl::OFlag; + + const DATA: &[u8] = b"this is some data to write to the pipe"; + + let fds = nix::unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK)?; + let (rx_fd, tx_fd) = unsafe { (OwnedFd::from_raw_fd(fds.0), OwnedFd::from_raw_fd(fds.1)) }; + + let mut rx = pipe::Receiver::from_owned_fd(rx_fd)?; + let mut tx = pipe::Sender::from_owned_fd(tx_fd)?; + + let mut buf = vec![0; DATA.len()]; + tx.write_all(DATA).await?; + rx.read_exact(&mut buf).await?; + assert_eq!(buf, DATA); + + Ok(()) +} From 09651e5bb9a31ae111ecb7712838a9a14274c49a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tymoteusz=20Wi=C5=9Bniewski?= Date: Fri, 3 Nov 2023 14:56:14 +0100 Subject: [PATCH 2/9] use `os::unix::io` instead of `os::fd` --- tokio/src/net/unix/pipe.rs | 3 +-- tokio/tests/net_unix_pipe.rs | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/tokio/src/net/unix/pipe.rs b/tokio/src/net/unix/pipe.rs index f898040260b..f49fbc8f27c 100644 --- a/tokio/src/net/unix/pipe.rs +++ b/tokio/src/net/unix/pipe.rs @@ -6,9 +6,8 @@ use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready}; use mio::unix::pipe as mio_pipe; use std::fs::File; use std::io::{self, Read, Write}; -use std::os::fd::OwnedFd; use std::os::unix::fs::OpenOptionsExt; -use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}; +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/tokio/tests/net_unix_pipe.rs b/tokio/tests/net_unix_pipe.rs index 90c288468ec..fd02a86f7e5 100644 --- a/tokio/tests/net_unix_pipe.rs +++ b/tokio/tests/net_unix_pipe.rs @@ -8,9 +8,8 @@ use tokio_test::{assert_err, assert_ok, assert_pending, assert_ready_ok}; use std::fs::File; use std::io; -use std::os::fd::{FromRawFd, OwnedFd}; use std::os::unix::fs::OpenOptionsExt; -use std::os::unix::io::AsRawFd; +use std::os::unix::io::{AsRawFd, FromRawFd, OwnedFd}; use std::path::{Path, PathBuf}; /// Helper struct which will clean up temporary files once dropped. From bb165648f55c22be5e76216e933dc2e25a6da84a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tymoteusz=20Wi=C5=9Bniewski?= Date: Fri, 3 Nov 2023 15:09:48 +0100 Subject: [PATCH 3/9] fix macOS and android CI --- tokio/src/net/unix/pipe.rs | 4 +++- tokio/tests/net_unix_pipe.rs | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/tokio/src/net/unix/pipe.rs b/tokio/src/net/unix/pipe.rs index f49fbc8f27c..b7b0b51db03 100644 --- a/tokio/src/net/unix/pipe.rs +++ b/tokio/src/net/unix/pipe.rs @@ -1326,7 +1326,9 @@ fn is_pipe(fd: BorrowedFd<'_>) -> io::Result { if r == -1 { Err(io::Error::last_os_error()) } else { - Ok((stat.st_mode & libc::S_IFMT) == libc::S_IFIFO) + // on some platforms `st_mode` is larger than `S_IFMT` and `S_IFIFO`. + #[allow(clippy::useless_conversion)] + Ok((stat.st_mode & libc::S_IFMT) == libc::S_IFIFO.into()) } } diff --git a/tokio/tests/net_unix_pipe.rs b/tokio/tests/net_unix_pipe.rs index fd02a86f7e5..8f239cbfe2c 100644 --- a/tokio/tests/net_unix_pipe.rs +++ b/tokio/tests/net_unix_pipe.rs @@ -9,7 +9,7 @@ use tokio_test::{assert_err, assert_ok, assert_pending, assert_ready_ok}; use std::fs::File; use std::io; use std::os::unix::fs::OpenOptionsExt; -use std::os::unix::io::{AsRawFd, FromRawFd, OwnedFd}; +use std::os::unix::io::AsRawFd; use std::path::{Path, PathBuf}; /// Helper struct which will clean up temporary files once dropped. @@ -489,6 +489,7 @@ async fn anon_pipe_spawn_echo() -> std::io::Result<()> { #[cfg(target_os = "linux")] async fn anon_pipe_from_owned_fd() -> std::io::Result<()> { use nix::fcntl::OFlag; + use std::os::unix::io::{FromRawFd, OwnedFd}; const DATA: &[u8] = b"this is some data to write to the pipe"; From 5f0b31b2425e40c5eb5ad15d79c5e40a2af4ed85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tymoteusz=20Wi=C5=9Bniewski?= Date: Fri, 3 Nov 2023 15:27:01 +0100 Subject: [PATCH 4/9] fix android CI again --- tokio/src/net/unix/pipe.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tokio/src/net/unix/pipe.rs b/tokio/src/net/unix/pipe.rs index b7b0b51db03..b9b7eed782e 100644 --- a/tokio/src/net/unix/pipe.rs +++ b/tokio/src/net/unix/pipe.rs @@ -1326,9 +1326,7 @@ fn is_pipe(fd: BorrowedFd<'_>) -> io::Result { if r == -1 { Err(io::Error::last_os_error()) } else { - // on some platforms `st_mode` is larger than `S_IFMT` and `S_IFIFO`. - #[allow(clippy::useless_conversion)] - Ok((stat.st_mode & libc::S_IFMT) == libc::S_IFIFO.into()) + Ok((stat.st_mode as libc::mode_t & libc::S_IFMT) == libc::S_IFIFO) } } From 024f938a121c74f981e9072cc204d7fe73893579 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tymoteusz=20Wi=C5=9Bniewski?= Date: Sat, 9 Dec 2023 18:52:42 +0100 Subject: [PATCH 5/9] rename `new` to `pipe` --- tokio/src/net/unix/pipe.rs | 2 +- tokio/tests/net_unix_pipe.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/net/unix/pipe.rs b/tokio/src/net/unix/pipe.rs index b9b7eed782e..9d8cb47a57e 100644 --- a/tokio/src/net/unix/pipe.rs +++ b/tokio/src/net/unix/pipe.rs @@ -38,7 +38,7 @@ cfg_io_util! { /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. -pub fn new() -> io::Result<(Sender, Receiver)> { +pub fn pipe() -> io::Result<(Sender, Receiver)> { let (tx, rx) = mio_pipe::new()?; Ok((Sender::from_mio(tx)?, Receiver::from_mio(rx)?)) } diff --git a/tokio/tests/net_unix_pipe.rs b/tokio/tests/net_unix_pipe.rs index 8f239cbfe2c..5537c8fd1a7 100644 --- a/tokio/tests/net_unix_pipe.rs +++ b/tokio/tests/net_unix_pipe.rs @@ -432,7 +432,7 @@ async fn try_read_buf() -> std::io::Result<()> { async fn anon_pipe_simple_send() -> io::Result<()> { const DATA: &[u8] = b"this is some data to write to the pipe"; - let (mut writer, mut reader) = pipe::new()?; + let (mut writer, mut reader) = pipe::pipe()?; // Create a reading task which should wait for data from the pipe. let mut read_fut = task::spawn(async move { @@ -462,7 +462,7 @@ async fn anon_pipe_spawn_echo() -> std::io::Result<()> { const DATA: &str = "this is some data to write to the pipe"; - let (tx, mut rx) = pipe::new()?; + let (tx, mut rx) = pipe::pipe()?; let status = Command::new("echo") .arg("-n") From 106e9357adbb7d791e0042a93c116eedf2720e47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tymoteusz=20Wi=C5=9Bniewski?= Date: Sat, 9 Dec 2023 19:35:59 +0100 Subject: [PATCH 6/9] Improve docs for `pipe()` --- tokio/src/net/unix/pipe.rs | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/tokio/src/net/unix/pipe.rs b/tokio/src/net/unix/pipe.rs index 9d8cb47a57e..a6b02f29fbf 100644 --- a/tokio/src/net/unix/pipe.rs +++ b/tokio/src/net/unix/pipe.rs @@ -21,15 +21,41 @@ cfg_io_util! { /// This function will open a new pipe and associate both pipe ends with the default /// event loop. /// -/// If you need to create a pipe for communicating with a spawned process, you can -/// also see how to use `Stdio::piped()` with [`tokio::process`]. +/// If you need to create a pipe for communication with a spawned process, you can +/// use [`Stdio::piped()`] instead. /// -/// [`tokio::process`]: crate::process +/// [`Stdio::piped()`]: std::process::Stdio::piped /// /// # Errors /// /// If creating a pipe fails, this function will return with the related OS error. /// +/// # Examples +/// +/// Create a pipe and pass the writing end to a spawned process. +/// +/// ```no_run +/// use tokio::net::unix::pipe; +/// use tokio::process::Command; +/// # use tokio::io::AsyncReadExt; +/// # use std::error::Error; +/// +/// # async fn dox() -> Result<(), Box> { +/// let (tx, mut rx) = pipe::pipe()?; +/// let mut buffer = String::new(); +/// +/// let status = Command::new("echo") +/// .arg("Hello, world!") +/// .stdout(tx.into_owned_fd()?) +/// .status(); +/// rx.read_to_string(&mut buffer).await?; +/// +/// assert!(status.await?.success()); +/// assert_eq!(buffer, "Hello, world!\n"); +/// # Ok(()) +/// # } +/// ``` +/// /// # Panics /// /// This function panics if it is not called from within a runtime with From 1e21ca4b65139af69c65baa91cf48a1bbe8b1a4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tymoteusz=20Wi=C5=9Bniewski?= Date: Sat, 9 Dec 2023 20:17:27 +0100 Subject: [PATCH 7/9] split `into_owned_fd` into two functions --- tokio/src/net/unix/pipe.rs | 34 +++++++++++++++++++++++++++------- tokio/tests/net_unix_pipe.rs | 28 +++++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 8 deletions(-) diff --git a/tokio/src/net/unix/pipe.rs b/tokio/src/net/unix/pipe.rs index a6b02f29fbf..10d38711a55 100644 --- a/tokio/src/net/unix/pipe.rs +++ b/tokio/src/net/unix/pipe.rs @@ -46,7 +46,7 @@ cfg_io_util! { /// /// let status = Command::new("echo") /// .arg("Hello, world!") -/// .stdout(tx.into_owned_fd()?) +/// .stdout(tx.into_blocking_fd()?) /// .status(); /// rx.read_to_string(&mut buffer).await?; /// @@ -722,13 +722,23 @@ impl Sender { .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) } - /// Converts the pipe into an [`OwnedFd`]. + /// Converts the pipe into an [`OwnedFd`] in blocking mode. /// /// This function will deregister this pipe end from the event loop, set /// it in blocking mode and perform the conversion. - pub fn into_owned_fd(self) -> io::Result { + pub fn into_blocking_fd(self) -> io::Result { + let fd = self.into_nonblocking_fd()?; + set_blocking(&fd)?; + Ok(fd) + } + + /// Converts the pipe into an [`OwnedFd`] in nonblocking mode. + /// + /// This function will deregister this pipe end from the event loop and + /// perform the conversion. Returned file descriptor will be in nonblocking + /// mode. + pub fn into_nonblocking_fd(self) -> io::Result { let mio_pipe = self.io.into_inner()?; - set_blocking(&mio_pipe)?; // Safety: the pipe is now deregistered from the event loop // and we are the only owner of this pipe end. @@ -1305,13 +1315,23 @@ impl Receiver { } } - /// Converts the pipe into an [`OwnedFd`]. + /// Converts the pipe into an [`OwnedFd`] in blocking mode. /// /// This function will deregister this pipe end from the event loop, set /// it in blocking mode and perform the conversion. - pub fn into_owned_fd(self) -> io::Result { + pub fn into_blocking_fd(self) -> io::Result { + let fd = self.into_nonblocking_fd()?; + set_blocking(&fd)?; + Ok(fd) + } + + /// Converts the pipe into an [`OwnedFd`] in nonblocking mode. + /// + /// This function will deregister this pipe end from the event loop and + /// perform the conversion. Returned file descriptor will be in nonblocking + /// mode. + pub fn into_nonblocking_fd(self) -> io::Result { let mio_pipe = self.io.into_inner()?; - set_blocking(&mio_pipe)?; // Safety: the pipe is now deregistered from the event loop // and we are the only owner of this pipe end. diff --git a/tokio/tests/net_unix_pipe.rs b/tokio/tests/net_unix_pipe.rs index 5537c8fd1a7..6706880ed1b 100644 --- a/tokio/tests/net_unix_pipe.rs +++ b/tokio/tests/net_unix_pipe.rs @@ -467,7 +467,7 @@ async fn anon_pipe_spawn_echo() -> std::io::Result<()> { let status = Command::new("echo") .arg("-n") .arg(DATA) - .stdout(tx.into_owned_fd()?) + .stdout(tx.into_blocking_fd()?) .status(); let mut buf = vec![0; DATA.len()]; @@ -506,3 +506,29 @@ async fn anon_pipe_from_owned_fd() -> std::io::Result<()> { Ok(()) } + +#[tokio::test] +async fn anon_pipe_into_nonblocking_fd() -> std::io::Result<()> { + let (tx, rx) = pipe::pipe()?; + + let tx_fd = tx.into_nonblocking_fd()?; + let rx_fd = rx.into_nonblocking_fd()?; + + assert!(is_nonblocking(&tx_fd)?); + assert!(is_nonblocking(&rx_fd)?); + + Ok(()) +} + +#[tokio::test] +async fn anon_pipe_into_blocking_fd() -> std::io::Result<()> { + let (tx, rx) = pipe::pipe()?; + + let tx_fd = tx.into_blocking_fd()?; + let rx_fd = rx.into_blocking_fd()?; + + assert!(!is_nonblocking(&tx_fd)?); + assert!(!is_nonblocking(&rx_fd)?); + + Ok(()) +} From 9097f3d4064c5624315b176dffd6df8982267b03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tymoteusz=20Wi=C5=9Bniewski?= Date: Sat, 9 Dec 2023 20:37:05 +0100 Subject: [PATCH 8/9] add safety comments --- tokio/src/net/unix/pipe.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tokio/src/net/unix/pipe.rs b/tokio/src/net/unix/pipe.rs index 10d38711a55..5ba34b7f568 100644 --- a/tokio/src/net/unix/pipe.rs +++ b/tokio/src/net/unix/pipe.rs @@ -1367,8 +1367,14 @@ impl AsFd for Receiver { /// Checks if the file descriptor is a pipe or a FIFO. fn is_pipe(fd: BorrowedFd<'_>) -> io::Result { + // Safety: `libc::stat` is C-like struct used for syscalls and all-zero + // byte pattern forms a valid value. let mut stat: libc::stat = unsafe { std::mem::zeroed() }; + + // Safety: it's safe to call `fstat` with a valid, open file descriptor + // and a valid pointer to a `stat` struct. let r = unsafe { libc::fstat(fd.as_raw_fd(), &mut stat) }; + if r == -1 { Err(io::Error::last_os_error()) } else { @@ -1378,6 +1384,7 @@ fn is_pipe(fd: BorrowedFd<'_>) -> io::Result { /// Gets file descriptor's flags by fcntl. fn get_file_flags(fd: BorrowedFd<'_>) -> io::Result { + // Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor. let flags = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) }; if flags < 0 { Err(io::Error::last_os_error()) @@ -1403,6 +1410,8 @@ fn set_nonblocking(fd: BorrowedFd<'_>, current_flags: libc::c_int) -> io::Result let flags = current_flags | libc::O_NONBLOCK; if flags != current_flags { + // Safety: it's safe to use `fcntl` to set the `O_NONBLOCK` flag of a valid, + // open file descriptor. let ret = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, flags) }; if ret < 0 { return Err(io::Error::last_os_error()); @@ -1414,6 +1423,7 @@ fn set_nonblocking(fd: BorrowedFd<'_>, current_flags: libc::c_int) -> io::Result /// Removes `O_NONBLOCK` from fd's flags. fn set_blocking(fd: &T) -> io::Result<()> { + // Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor. let previous = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) }; if previous == -1 { return Err(io::Error::last_os_error()); @@ -1421,6 +1431,8 @@ fn set_blocking(fd: &T) -> io::Result<()> { let new = previous & !libc::O_NONBLOCK; + // Safety: it's safe to use `fcntl` to unset the `O_NONBLOCK` flag of a valid, + // open file descriptor. let r = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, new) }; if r == -1 { Err(io::Error::last_os_error()) From f6350638e6aa3116243a0c92e30d09109b9b2848 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sat, 30 Dec 2023 16:24:41 +0100 Subject: [PATCH 9/9] Update tokio/src/net/unix/pipe.rs --- tokio/src/net/unix/pipe.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/net/unix/pipe.rs b/tokio/src/net/unix/pipe.rs index 5ba34b7f568..7c279134dbf 100644 --- a/tokio/src/net/unix/pipe.rs +++ b/tokio/src/net/unix/pipe.rs @@ -735,7 +735,7 @@ impl Sender { /// Converts the pipe into an [`OwnedFd`] in nonblocking mode. /// /// This function will deregister this pipe end from the event loop and - /// perform the conversion. Returned file descriptor will be in nonblocking + /// perform the conversion. The returned file descriptor will be in nonblocking /// mode. pub fn into_nonblocking_fd(self) -> io::Result { let mio_pipe = self.io.into_inner()?;