diff --git a/Cargo.toml b/Cargo.toml index 802c9905e..e16186069 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,10 +22,10 @@ rust-version = "1.53" [target.'cfg(unix)'.dependencies] # Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866 # which is still an issue with `resolver = "1"`. -libc = { version = "0.2.62", default-features = false } +libc = { version = "0.2.62", default-features = false, optional = true } [features] -parallel = [] +parallel = ["libc"] [dev-dependencies] tempfile = "3" diff --git a/dev-tools/gen-windows-sys-binding/windows_sys.list b/dev-tools/gen-windows-sys-binding/windows_sys.list index f5ed55e16..ec8b02793 100644 --- a/dev-tools/gen-windows-sys-binding/windows_sys.list +++ b/dev-tools/gen-windows-sys-binding/windows_sys.list @@ -1,5 +1,4 @@ Windows.Win32.Foundation.FILETIME -Windows.Win32.Foundation.INVALID_HANDLE_VALUE Windows.Win32.Foundation.ERROR_NO_MORE_ITEMS Windows.Win32.Foundation.ERROR_SUCCESS Windows.Win32.Foundation.SysFreeString @@ -20,7 +19,7 @@ Windows.Win32.System.Com.COINIT_MULTITHREADED Windows.Win32.System.Com.CoCreateInstance Windows.Win32.System.Com.CoInitializeEx -Windows.Win32.System.Pipes.CreatePipe +Windows.Win32.System.Pipes.PeekNamedPipe Windows.Win32.System.Registry.RegCloseKey Windows.Win32.System.Registry.RegEnumKeyExW diff --git a/src/command_helpers.rs b/src/command_helpers.rs index 3b3e923df..8b6148a79 100644 --- a/src/command_helpers.rs +++ b/src/command_helpers.rs @@ -4,13 +4,12 @@ use std::{ collections::hash_map, ffi::OsString, fmt::Display, - fs::{self, File}, + fs, hash::Hasher, - io::{self, BufRead, BufReader, Read, Write}, + io::{self, Read, Write}, path::Path, - process::{Child, Command, Stdio}, + process::{Child, ChildStderr, Command, Stdio}, sync::Arc, - thread::{self, JoinHandle}, }; use crate::{Error, ErrorKind, Object}; @@ -41,83 +40,164 @@ impl CargoOutput { } } - pub(crate) fn print_thread(&self) -> Result, Error> { - self.warnings.then(PrintThread::new).transpose() + fn stdio_for_warnings(&self) -> Stdio { + if self.warnings { + Stdio::piped() + } else { + Stdio::null() + } } } -pub(crate) struct PrintThread { - handle: Option>, - pipe_writer: Option, +pub(crate) struct StderrForwarder { + inner: Option<(ChildStderr, Vec)>, + #[cfg(feature = "parallel")] + is_non_blocking: bool, + #[cfg(feature = "parallel")] + bytes_available_failed: bool, } -impl PrintThread { - pub(crate) fn new() -> Result { - let (pipe_reader, pipe_writer) = crate::os_pipe::pipe()?; - - // Capture the standard error coming from compilation, and write it out - // with cargo:warning= prefixes. Note that this is a bit wonky to avoid - // requiring the output to be UTF-8, we instead just ship bytes from one - // location to another. - let print = thread::spawn(move || { - let mut stderr = BufReader::with_capacity(4096, pipe_reader); - let mut line = Vec::with_capacity(20); - let stdout = io::stdout(); - - // read_until returns 0 on Eof - while stderr.read_until(b'\n', &mut line).unwrap() != 0 { - { - let mut stdout = stdout.lock(); - - stdout.write_all(b"cargo:warning=").unwrap(); - stdout.write_all(&line).unwrap(); - stdout.write_all(b"\n").unwrap(); - } +const MIN_BUFFER_CAPACITY: usize = 100; - // read_until does not clear the buffer - line.clear(); - } - }); +impl StderrForwarder { + pub(crate) fn new(child: &mut Child) -> Self { + Self { + inner: child + .stderr + .take() + .map(|stderr| (stderr, Vec::with_capacity(MIN_BUFFER_CAPACITY))), + #[cfg(feature = "parallel")] + is_non_blocking: false, + #[cfg(feature = "parallel")] + bytes_available_failed: false, + } + } - Ok(Self { - handle: Some(print), - pipe_writer: Some(pipe_writer), - }) + #[allow(clippy::uninit_vec)] + fn forward_available(&mut self) -> bool { + if let Some((stderr, buffer)) = self.inner.as_mut() { + loop { + let old_data_end = buffer.len(); + + // For non-blocking we check to see if there is data available, so we should try to + // read at least that much. For blocking, always read at least the minimum amount. + #[cfg(not(feature = "parallel"))] + let to_reserve = MIN_BUFFER_CAPACITY; + #[cfg(feature = "parallel")] + let to_reserve = if self.is_non_blocking && !self.bytes_available_failed { + match crate::parallel::stderr::bytes_available(stderr) { + #[cfg(windows)] + Ok(0) => return false, + #[cfg(unix)] + Ok(0) => { + // On Unix, depending on the implementation, we may sometimes get 0 in a + // loop (either there is data available or the pipe is broken), so + // continue with the non-blocking read anyway. + MIN_BUFFER_CAPACITY + } + #[cfg(windows)] + Err(_) => { + // On Windows, if we get an error then the pipe is broken, so flush + // the buffer and bail. + if !buffer.is_empty() { + write_warning(&buffer[..]); + } + self.inner = None; + return true; + } + #[cfg(unix)] + Err(_) => { + // On Unix, depending on the implementation, we may get spurious + // errors so make a note not to use bytes_available again and try + // the non-blocking read anyway. + self.bytes_available_failed = true; + MIN_BUFFER_CAPACITY + } + Ok(bytes_available) => MIN_BUFFER_CAPACITY.max(bytes_available), + } + } else { + MIN_BUFFER_CAPACITY + }; + buffer.reserve(to_reserve); + + // SAFETY: 1) the length is set to the capacity, so we are never using memory beyond + // the underlying buffer and 2) we always call `truncate` below to set the len back + // to the intitialized data. + unsafe { + buffer.set_len(buffer.capacity()); + } + match stderr.read(&mut buffer[old_data_end..]) { + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + // No data currently, yield back. + buffer.truncate(old_data_end); + return false; + } + Err(err) if err.kind() == std::io::ErrorKind::Interrupted => { + // Interrupted, try again. + buffer.truncate(old_data_end); + } + Ok(0) | Err(_) => { + // End of stream: flush remaining data and bail. + if old_data_end > 0 { + write_warning(&buffer[..old_data_end]); + } + self.inner = None; + return true; + } + Ok(bytes_read) => { + buffer.truncate(old_data_end + bytes_read); + let mut consumed = 0; + for line in buffer.split_inclusive(|&b| b == b'\n') { + // Only forward complete lines, leave the rest in the buffer. + if let Some((b'\n', line)) = line.split_last() { + consumed += line.len() + 1; + write_warning(line); + } + } + buffer.drain(..consumed); + } + } + } + } else { + true + } } - /// # Panics - /// - /// Will panic if the pipe writer has already been taken. - pub(crate) fn take_pipe_writer(&mut self) -> File { - self.pipe_writer.take().unwrap() + #[cfg(feature = "parallel")] + pub(crate) fn set_non_blocking(&mut self) -> Result<(), Error> { + assert!(!self.is_non_blocking); + + if let Some((stderr, _)) = self.inner.as_mut() { + crate::parallel::stderr::set_non_blocking(stderr)?; + } + + self.is_non_blocking = true; + Ok(()) } - /// # Panics - /// - /// Will panic if the pipe writer has already been taken. - pub(crate) fn clone_pipe_writer(&self) -> Result { - self.try_clone_pipe_writer().map(Option::unwrap) + #[cfg(feature = "parallel")] + fn forward_all(&mut self) { + while !self.forward_available() {} } - pub(crate) fn try_clone_pipe_writer(&self) -> Result, Error> { - self.pipe_writer - .as_ref() - .map(File::try_clone) - .transpose() - .map_err(From::from) + #[cfg(not(feature = "parallel"))] + fn forward_all(&mut self) { + let forward_result = self.forward_available(); + assert!(forward_result, "Should have consumed all data"); } } -impl Drop for PrintThread { - fn drop(&mut self) { - // Drop pipe_writer first to avoid deadlock - self.pipe_writer.take(); - - self.handle.take().unwrap().join().unwrap(); - } +fn write_warning(line: &[u8]) { + let stdout = io::stdout(); + let mut stdout = stdout.lock(); + stdout.write_all(b"cargo:warning=").unwrap(); + stdout.write_all(line).unwrap(); + stdout.write_all(b"\n").unwrap(); } fn wait_on_child(cmd: &Command, program: &str, child: &mut Child) -> Result<(), Error> { + StderrForwarder::new(child).forward_all(); + let status = match child.wait() { Ok(s) => s, Err(e) => { @@ -193,20 +273,13 @@ pub(crate) fn objects_from_files(files: &[Arc], dst: &Path) -> Result) -> Result<(), Error> { - let mut child = spawn(cmd, program, pipe_writer)?; - wait_on_child(cmd, program, &mut child) -} - pub(crate) fn run( cmd: &mut Command, program: &str, - print: Option<&PrintThread>, + cargo_output: &CargoOutput, ) -> Result<(), Error> { - let pipe_writer = print.map(PrintThread::clone_pipe_writer).transpose()?; - run_inner(cmd, program, pipe_writer)?; - - Ok(()) + let mut child = spawn(cmd, program, cargo_output)?; + wait_on_child(cmd, program, &mut child) } pub(crate) fn run_output( @@ -216,12 +289,7 @@ pub(crate) fn run_output( ) -> Result, Error> { cmd.stdout(Stdio::piped()); - let mut print = cargo_output.print_thread()?; - let mut child = spawn( - cmd, - program, - print.as_mut().map(PrintThread::take_pipe_writer), - )?; + let mut child = spawn(cmd, program, cargo_output)?; let mut stdout = vec![]; child @@ -239,7 +307,7 @@ pub(crate) fn run_output( pub(crate) fn spawn( cmd: &mut Command, program: &str, - pipe_writer: Option, + cargo_output: &CargoOutput, ) -> Result { struct ResetStderr<'cmd>(&'cmd mut Command); @@ -254,10 +322,7 @@ pub(crate) fn spawn( println!("running: {:?}", cmd); let cmd = ResetStderr(cmd); - let child = cmd - .0 - .stderr(pipe_writer.map_or_else(Stdio::null, Stdio::from)) - .spawn(); + let child = cmd.0.stderr(cargo_output.stdio_for_warnings()).spawn(); match child { Ok(child) => Ok(child), Err(ref e) if e.kind() == io::ErrorKind::NotFound => { @@ -307,9 +372,14 @@ pub(crate) fn try_wait_on_child( program: &str, child: &mut Child, stdout: &mut dyn io::Write, + stderr_forwarder: &mut StderrForwarder, ) -> Result, Error> { + stderr_forwarder.forward_available(); + match child.try_wait() { Ok(Some(status)) => { + stderr_forwarder.forward_all(); + let _ = writeln!(stdout, "{}", status); if status.success() { @@ -325,12 +395,15 @@ pub(crate) fn try_wait_on_child( } } Ok(None) => Ok(None), - Err(e) => Err(Error::new( - ErrorKind::ToolExecError, - format!( - "Failed to wait on spawned child process, command {:?} with args {:?}: {}.", - cmd, program, e - ), - )), + Err(e) => { + stderr_forwarder.forward_all(); + Err(Error::new( + ErrorKind::ToolExecError, + format!( + "Failed to wait on spawned child process, command {:?} with args {:?}: {}.", + cmd, program, e + ), + )) + } } } diff --git a/src/lib.rs b/src/lib.rs index c3237dd4b..a76d5321a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,7 +66,6 @@ use std::process::Child; use std::process::Command; use std::sync::{Arc, Mutex}; -mod os_pipe; #[cfg(feature = "parallel")] mod parallel; mod windows; @@ -1042,10 +1041,9 @@ impl Build { let dst = self.get_out_dir()?; let objects = objects_from_files(&self.files, &dst)?; - let print = self.cargo_output.print_thread()?; - self.compile_objects(&objects, print.as_ref())?; - self.assemble(lib_name, &dst.join(gnu_lib_name), &objects, print.as_ref())?; + self.compile_objects(&objects)?; + self.assemble(lib_name, &dst.join(gnu_lib_name), &objects)?; if self.get_target()?.contains("msvc") { let compiler = self.get_base_compiler()?; @@ -1207,15 +1205,14 @@ impl Build { pub fn try_compile_intermediates(&self) -> Result, Error> { let dst = self.get_out_dir()?; let objects = objects_from_files(&self.files, &dst)?; - let print = self.cargo_output.print_thread()?; - self.compile_objects(&objects, print.as_ref())?; + self.compile_objects(&objects)?; Ok(objects.into_iter().map(|v| v.dst).collect()) } #[cfg(feature = "parallel")] - fn compile_objects(&self, objs: &[Object], print: Option<&PrintThread>) -> Result<(), Error> { + fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> { use std::cell::Cell; use parallel::async_executor::{block_on, YieldOnce}; @@ -1223,7 +1220,7 @@ impl Build { if objs.len() <= 1 { for obj in objs { let (mut cmd, name) = self.create_compile_object_cmd(obj)?; - run(&mut cmd, &name, print)?; + run(&mut cmd, &name, &self.cargo_output)?; } return Ok(()); @@ -1280,7 +1277,13 @@ impl Build { parallel::retain_unordered_mut( &mut pendings, |(cmd, program, child, _token)| { - match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) { + match try_wait_on_child( + cmd, + program, + &mut child.0, + &mut stdout, + &mut child.1, + ) { Ok(Some(())) => { // Task done, remove the entry has_made_progress.set(true); @@ -1328,11 +1331,12 @@ impl Build { YieldOnce::default().await } }; - let pipe_writer = print.map(PrintThread::clone_pipe_writer).transpose()?; - let child = spawn(&mut cmd, &program, pipe_writer)?; + let mut child = spawn(&mut cmd, &program, &self.cargo_output)?; + let mut stderr_forwarder = StderrForwarder::new(&mut child); + stderr_forwarder.set_non_blocking()?; cell_update(&pendings, |mut pendings| { - pendings.push((cmd, program, KillOnDrop(child), token)); + pendings.push((cmd, program, KillOnDrop(child, stderr_forwarder), token)); pendings }); @@ -1345,7 +1349,7 @@ impl Build { return block_on(wait_future, spawn_future, &has_made_progress); - struct KillOnDrop(Child); + struct KillOnDrop(Child, StderrForwarder); impl Drop for KillOnDrop { fn drop(&mut self) { @@ -1367,10 +1371,10 @@ impl Build { } #[cfg(not(feature = "parallel"))] - fn compile_objects(&self, objs: &[Object], print: Option<&PrintThread>) -> Result<(), Error> { + fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> { for obj in objs { let (mut cmd, name) = self.create_compile_object_cmd(obj)?; - run(&mut cmd, &name, print)?; + run(&mut cmd, &name, &self.cargo_output)?; } Ok(()) @@ -2145,13 +2149,7 @@ impl Build { Ok((cmd, tool.to_string())) } - fn assemble( - &self, - lib_name: &str, - dst: &Path, - objs: &[Object], - print: Option<&PrintThread>, - ) -> Result<(), Error> { + fn assemble(&self, lib_name: &str, dst: &Path, objs: &[Object]) -> Result<(), Error> { // Delete the destination if it exists as we want to // create on the first iteration instead of appending. let _ = fs::remove_file(dst); @@ -2165,7 +2163,7 @@ impl Build { .chain(self.objects.iter().map(std::ops::Deref::deref)) .collect(); for chunk in objs.chunks(100) { - self.assemble_progressive(dst, chunk, print)?; + self.assemble_progressive(dst, chunk)?; } if self.cuda && self.cuda_file_count() > 0 { @@ -2176,8 +2174,8 @@ impl Build { let dlink = out_dir.join(lib_name.to_owned() + "_dlink.o"); let mut nvcc = self.get_compiler().to_command(); nvcc.arg("--device-link").arg("-o").arg(&dlink).arg(dst); - run(&mut nvcc, "nvcc", print)?; - self.assemble_progressive(dst, &[dlink.as_path()], print)?; + run(&mut nvcc, "nvcc", &self.cargo_output)?; + self.assemble_progressive(dst, &[dlink.as_path()])?; } let target = self.get_target()?; @@ -2209,18 +2207,13 @@ impl Build { // NOTE: We add `s` even if flags were passed using $ARFLAGS/ar_flag, because `s` // here represents a _mode_, not an arbitrary flag. Further discussion of this choice // can be seen in https://github.com/rust-lang/cc-rs/pull/763. - run(ar.arg("s").arg(dst), &cmd, print)?; + run(ar.arg("s").arg(dst), &cmd, &self.cargo_output)?; } Ok(()) } - fn assemble_progressive( - &self, - dst: &Path, - objs: &[&Path], - print: Option<&PrintThread>, - ) -> Result<(), Error> { + fn assemble_progressive(&self, dst: &Path, objs: &[&Path]) -> Result<(), Error> { let target = self.get_target()?; if target.contains("msvc") { @@ -2241,7 +2234,7 @@ impl Build { cmd.arg(dst); } cmd.args(objs); - run(&mut cmd, &program, print)?; + run(&mut cmd, &program, &self.cargo_output)?; } else { let (mut ar, cmd, _any_flags) = self.get_ar()?; @@ -2272,7 +2265,7 @@ impl Build { // NOTE: We add cq here regardless of whether $ARFLAGS/ar_flag have been used because // it dictates the _mode_ ar runs in, which the setter of $ARFLAGS/ar_flag can't // dictate. See https://github.com/rust-lang/cc-rs/pull/763 for further discussion. - run(ar.arg("cq").arg(dst).args(objs), &cmd, print)?; + run(ar.arg("cq").arg(dst).args(objs), &cmd, &self.cargo_output)?; } Ok(()) diff --git a/src/os_pipe/mod.rs b/src/os_pipe/mod.rs deleted file mode 100644 index 0a6ad791f..000000000 --- a/src/os_pipe/mod.rs +++ /dev/null @@ -1,28 +0,0 @@ -//! Adapted from: -//! - -//! - -//! - -//! - -use std::fs::File; - -/// Open a new pipe and return a pair of [`File`] objects for the reader and writer. -/// -/// This corresponds to the `pipe2` library call on Posix and the -/// `CreatePipe` library call on Windows (though these implementation -/// details might change). These pipes are non-inheritable, so new child -/// processes won't receive a copy of them unless they're explicitly -/// passed as stdin/stdout/stderr. -pub fn pipe() -> std::io::Result<(File, File)> { - sys::pipe() -} - -#[cfg(unix)] -#[path = "unix.rs"] -mod sys; - -#[cfg(windows)] -#[path = "windows.rs"] -mod sys; - -#[cfg(all(not(unix), not(windows)))] -compile_error!("Only unix and windows support os_pipe!"); diff --git a/src/os_pipe/unix.rs b/src/os_pipe/unix.rs deleted file mode 100644 index ec4e547f0..000000000 --- a/src/os_pipe/unix.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::{ - fs::File, - io, - os::{raw::c_int, unix::io::FromRawFd}, -}; - -pub(super) fn pipe() -> io::Result<(File, File)> { - let mut fds = [0; 2]; - - // The only known way right now to create atomically set the CLOEXEC flag is - // to use the `pipe2` syscall. This was added to Linux in 2.6.27, glibc 2.9 - // and musl 0.9.3, and some other targets also have it. - #[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "linux", - target_os = "netbsd", - target_os = "openbsd", - target_os = "redox" - ))] - { - unsafe { - cvt(libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC))?; - } - } - - #[cfg(not(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "linux", - target_os = "netbsd", - target_os = "openbsd", - target_os = "redox" - )))] - { - unsafe { - cvt(libc::pipe(fds.as_mut_ptr()))?; - } - - cloexec::set_cloexec(fds[0])?; - cloexec::set_cloexec(fds[1])?; - } - - unsafe { Ok((File::from_raw_fd(fds[0]), File::from_raw_fd(fds[1]))) } -} - -fn cvt(t: c_int) -> io::Result { - if t == -1 { - Err(io::Error::last_os_error()) - } else { - Ok(t) - } -} - -#[cfg(not(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "linux", - target_os = "netbsd", - target_os = "openbsd", - target_os = "redox" -)))] -mod cloexec { - use super::{c_int, cvt, io}; - - #[cfg(not(any( - target_env = "newlib", - target_os = "solaris", - target_os = "illumos", - target_os = "emscripten", - target_os = "fuchsia", - target_os = "l4re", - target_os = "linux", - target_os = "haiku", - target_os = "redox", - target_os = "vxworks", - target_os = "nto", - )))] - pub(super) fn set_cloexec(fd: c_int) -> io::Result<()> { - unsafe { - cvt(libc::ioctl(fd, libc::FIOCLEX))?; - } - - Ok(()) - } - - #[cfg(any( - all( - target_env = "newlib", - not(any(target_os = "espidf", target_os = "horizon")) - ), - target_os = "solaris", - target_os = "illumos", - target_os = "emscripten", - target_os = "fuchsia", - target_os = "l4re", - target_os = "linux", - target_os = "haiku", - target_os = "redox", - target_os = "vxworks", - target_os = "nto", - ))] - pub(super) fn set_cloexec(fd: c_int) -> io::Result<()> { - unsafe { - let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?; - let new = previous | libc::FD_CLOEXEC; - if new != previous { - cvt(libc::fcntl(fd, libc::F_SETFD, new))?; - } - } - - Ok(()) - } - - // FD_CLOEXEC is not supported in ESP-IDF and Horizon OS but there's no need to, - // because neither supports spawning processes. - #[cfg(any(target_os = "espidf", target_os = "horizon"))] - pub(super) fn set_cloexec(_fd: c_int) -> io::Result<()> { - Ok(()) - } -} diff --git a/src/os_pipe/windows.rs b/src/os_pipe/windows.rs deleted file mode 100644 index de0106a2a..000000000 --- a/src/os_pipe/windows.rs +++ /dev/null @@ -1,24 +0,0 @@ -use crate::windows::windows_sys::{CreatePipe, INVALID_HANDLE_VALUE}; -use std::{fs::File, io, os::windows::prelude::*, ptr}; - -/// NOTE: These pipes do not support IOCP. -/// -/// If IOCP is needed, then you might want to emulate -/// anonymous pipes with `CreateNamedPipe`, as Rust's stdlib does. -pub(super) fn pipe() -> io::Result<(File, File)> { - let mut read_pipe = INVALID_HANDLE_VALUE; - let mut write_pipe = INVALID_HANDLE_VALUE; - - let ret = unsafe { CreatePipe(&mut read_pipe, &mut write_pipe, ptr::null_mut(), 0) }; - - if ret == 0 { - Err(io::Error::last_os_error()) - } else { - unsafe { - Ok(( - File::from_raw_handle(read_pipe as RawHandle), - File::from_raw_handle(write_pipe as RawHandle), - )) - } - } -} diff --git a/src/parallel/mod.rs b/src/parallel/mod.rs index 1c6d597ea..d69146dc5 100644 --- a/src/parallel/mod.rs +++ b/src/parallel/mod.rs @@ -1,5 +1,6 @@ pub(crate) mod async_executor; pub(crate) mod job_token; +pub(crate) mod stderr; /// Remove all element in `vec` which `f(element)` returns `false`. /// diff --git a/src/parallel/stderr.rs b/src/parallel/stderr.rs new file mode 100644 index 000000000..1ba712877 --- /dev/null +++ b/src/parallel/stderr.rs @@ -0,0 +1,75 @@ +/// Helpers functions for [ChildStderr]. +use std::{convert::TryInto, process::ChildStderr}; + +use crate::{Error, ErrorKind}; + +#[cfg(all(not(unix), not(windows)))] +compile_error!("Only unix and windows support non-blocking pipes! For other OSes, disable the parallel feature."); + +#[allow(unused_variables)] +pub fn set_non_blocking(stderr: &mut ChildStderr) -> Result<(), Error> { + // On Unix, switch the pipe to non-blocking mode. + // On Windows, we have a different way to be non-blocking. + #[cfg(unix)] + { + use std::os::unix::io::AsRawFd; + let fd = stderr.as_raw_fd(); + debug_assert_eq!( + unsafe { libc::fcntl(fd, libc::F_GETFL, 0) }, + 0, + "stderr should have no flags set" + ); + + if unsafe { libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) } != 0 { + return Err(Error::new( + ErrorKind::IOError, + format!( + "Failed to set flags for child stderr: {}", + std::io::Error::last_os_error() + ), + )); + } + } + + Ok(()) +} + +pub fn bytes_available(stderr: &mut ChildStderr) -> Result { + let mut bytes_available = 0; + #[cfg(windows)] + { + use crate::windows::windows_sys::PeekNamedPipe; + use std::os::windows::io::AsRawHandle; + use std::ptr::null_mut; + if unsafe { + PeekNamedPipe( + stderr.as_raw_handle(), + null_mut(), + 0, + null_mut(), + &mut bytes_available, + null_mut(), + ) + } == 0 + { + return Err(Error::new( + ErrorKind::IOError, + format!( + "PeekNamedPipe failed with {}", + std::io::Error::last_os_error() + ), + )); + } + } + #[cfg(unix)] + { + use std::os::unix::io::AsRawFd; + if unsafe { libc::ioctl(stderr.as_raw_fd(), libc::FIONREAD, &mut bytes_available) } != 0 { + return Err(Error::new( + ErrorKind::IOError, + format!("ioctl failed with {}", std::io::Error::last_os_error()), + )); + } + } + Ok(bytes_available.try_into().unwrap()) +} diff --git a/src/windows/windows_sys.rs b/src/windows/windows_sys.rs index 20a256076..88ca76730 100644 --- a/src/windows/windows_sys.rs +++ b/src/windows/windows_sys.rs @@ -55,16 +55,18 @@ extern "system" { } #[link(name = "kernel32")] extern "system" { - pub fn CreatePipe( - hreadpipe: *mut HANDLE, - hwritepipe: *mut HANDLE, - lppipeattributes: *const SECURITY_ATTRIBUTES, - nsize: u32, - ) -> BOOL; + pub fn OpenSemaphoreA(dwdesiredaccess: u32, binherithandle: BOOL, lpname: PCSTR) -> HANDLE; } #[link(name = "kernel32")] extern "system" { - pub fn OpenSemaphoreA(dwdesiredaccess: u32, binherithandle: BOOL, lpname: PCSTR) -> HANDLE; + pub fn PeekNamedPipe( + hnamedpipe: HANDLE, + lpbuffer: *mut ::core::ffi::c_void, + nbuffersize: u32, + lpbytesread: *mut u32, + lptotalbytesavail: *mut u32, + lpbytesleftthismessage: *mut u32, + ) -> BOOL; } #[link(name = "kernel32")] extern "system" { @@ -148,7 +150,6 @@ pub type HANDLE = *mut ::core::ffi::c_void; pub type HKEY = *mut ::core::ffi::c_void; pub const HKEY_LOCAL_MACHINE: HKEY = invalid_mut(-2147483646i32 as _); pub type HRESULT = i32; -pub const INVALID_HANDLE_VALUE: HANDLE = invalid_mut(-1i32 as _); pub type IUnknown = *mut ::core::ffi::c_void; pub const KEY_READ: REG_SAM_FLAGS = 131097u32; pub const KEY_WOW64_32KEY: REG_SAM_FLAGS = 512u32; @@ -184,18 +185,6 @@ impl ::core::clone::Clone for SAFEARRAYBOUND { *self } } -#[repr(C)] -pub struct SECURITY_ATTRIBUTES { - pub nLength: u32, - pub lpSecurityDescriptor: *mut ::core::ffi::c_void, - pub bInheritHandle: BOOL, -} -impl ::core::marker::Copy for SECURITY_ATTRIBUTES {} -impl ::core::clone::Clone for SECURITY_ATTRIBUTES { - fn clone(&self) -> Self { - *self - } -} pub const SEMAPHORE_MODIFY_STATE: SYNCHRONIZATION_ACCESS_RIGHTS = 2u32; pub type SYNCHRONIZATION_ACCESS_RIGHTS = u32; pub const S_FALSE: HRESULT = 1i32;