Skip to content

Commit

Permalink
Binary serialization over pipes
Browse files Browse the repository at this point in the history
We use `UnixStream::pair()` a few times and reimplement binary ser/de
each time. Add an abstraction that does the serde steps (`DeSerialize`) and
buffer ceremony (`BinPipe`) for us.

Fixes #471.
  • Loading branch information
xy2i committed Sep 7, 2023
1 parent 6d40763 commit 0217b4d
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 81 deletions.
133 changes: 133 additions & 0 deletions src/common/bin_serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
//! Binary serialization, and an implementation over Unix pipes.
use sealed::DeSerializeBytes;
use std::{
io::{self, Read, Write},
marker::PhantomData,
os::{fd::AsRawFd, unix::net::UnixStream},
};

mod sealed {
pub trait DeSerializeBytes {
fn zero_init() -> Self;
fn as_mut_ref(&mut self) -> &mut [u8];
}

impl<const N: usize> DeSerializeBytes for [u8; N] {
fn zero_init() -> [u8; N] {
[0; N]
}
fn as_mut_ref(&mut self) -> &mut [u8] {
self.as_mut_slice()
}
}
}

/// Serialization/deserialization trait using a byte array as storage.
pub trait DeSerialize {
/// Usually `[u8; std::mem::size_of::<Self>()]`.
type Bytes: sealed::DeSerializeBytes;
fn serialize(&self) -> Self::Bytes;
fn deserialize(bytes: Self::Bytes) -> Self;
}

/// A binary pipe that can send and recieve typed messages.
///
/// By default, if only one generic is included,
/// the types of the [BinPipe::write()] and [BinPipe::read()] messages
/// are the same.
pub struct BinPipe<R: DeSerialize, W: DeSerialize = R> {
sock: UnixStream,
_read_marker: PhantomData<R>,
_write_marker: PhantomData<W>,
}

impl<R: DeSerialize, W: DeSerialize> BinPipe<R, W> {
/// A pipe abstracting over a [UnixStream] with easier
/// binary serialization, to help with the buffer sizes and ser/de steps.
/// Uses [UnixStream::pair()].
pub fn pair() -> io::Result<(BinPipe<R, W>, BinPipe<W, R>)> {
let (first, second) = UnixStream::pair()?;
Ok((
BinPipe {
sock: first,
_read_marker: PhantomData::<R>,
_write_marker: PhantomData::<W>,
},
// R and W are inverted here since the type of what's written in one
// pipe is read in the other, and vice versa.
BinPipe {
sock: second,
_read_marker: PhantomData::<W>,
_write_marker: PhantomData::<R>,
},
))
}

/// Read a `R` from the pipe.
pub fn read(&mut self) -> io::Result<R> {
let mut bytes = R::Bytes::zero_init();
self.sock.read_exact(bytes.as_mut_ref())?;
Ok(R::deserialize(bytes))
}

/// Write a `W` to the pipe.
pub fn write(&mut self, bytes: &W) -> io::Result<()> {
self.sock.write_all(bytes.serialize().as_mut_ref())?;
Ok(())
}

/// Calls [TcpStream::set_nonblocking] on the underlying socket.
pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {

Check warning on line 80 in src/common/bin_serde.rs

View workflow job for this annotation

GitHub Actions / build-and-test-msrv

method `set_nonblocking` is never used

Check warning on line 80 in src/common/bin_serde.rs

View workflow job for this annotation

GitHub Actions / build-and-test-msrv

method `set_nonblocking` is never used

Check warning on line 80 in src/common/bin_serde.rs

View workflow job for this annotation

GitHub Actions / build-and-test-minimal

method `set_nonblocking` is never used

Check warning on line 80 in src/common/bin_serde.rs

View workflow job for this annotation

GitHub Actions / build-and-test-minimal

method `set_nonblocking` is never used

Check warning on line 80 in src/common/bin_serde.rs

View workflow job for this annotation

GitHub Actions / build-and-test

method `set_nonblocking` is never used

Check warning on line 80 in src/common/bin_serde.rs

View workflow job for this annotation

GitHub Actions / build-and-test

method `set_nonblocking` is never used
self.sock.set_nonblocking(nonblocking)
}

Check warning on line 82 in src/common/bin_serde.rs

View check run for this annotation

Codecov / codecov/patch

src/common/bin_serde.rs#L80-L82

Added lines #L80 - L82 were not covered by tests
}

impl<R: DeSerialize, W: DeSerialize> AsRawFd for BinPipe<R, W> {
fn as_raw_fd(&self) -> std::os::fd::RawFd {
self.sock.as_raw_fd()
}

Check warning on line 88 in src/common/bin_serde.rs

View check run for this annotation

Codecov / codecov/patch

src/common/bin_serde.rs#L86-L88

Added lines #L86 - L88 were not covered by tests
}

impl DeSerialize for i32 {
type Bytes = [u8; std::mem::size_of::<Self>()];

fn serialize(&self) -> Self::Bytes {
self.to_ne_bytes()
}
fn deserialize(bytes: Self::Bytes) -> Self {
Self::from_ne_bytes(bytes)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
pub fn single_type() {
let (mut tx, mut rx) = BinPipe::pair().unwrap();
tx.write(&42i32).unwrap();
assert_eq!(rx.read().unwrap(), 42);
rx.write(&23i32).unwrap();
assert_eq!(tx.read().unwrap(), 23);
}

#[test]
pub fn different_types() {
impl DeSerialize for u8 {
type Bytes = [u8; std::mem::size_of::<Self>()];
fn serialize(&self) -> [u8; 1] {
self.to_ne_bytes()
}
fn deserialize(bytes: [u8; 1]) -> Self {
Self::from_ne_bytes(bytes)
}
}

let (mut tx, mut rx) = BinPipe::pair().unwrap();
tx.write(&42i32).unwrap();
assert_eq!(rx.read().unwrap(), 42);
rx.write(&23u8).unwrap();
assert_eq!(tx.read().unwrap(), 23);
}
}
1 change: 1 addition & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub use command::CommandAndArguments;
pub use context::Context;
pub use error::Error;

pub mod bin_serde;
pub mod command;
pub mod context;
pub mod error;
Expand Down
27 changes: 14 additions & 13 deletions src/exec/no_pty.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
ffi::c_int,
io::{self, Read, Write},
os::unix::{net::UnixStream, process::CommandExt},
io,
os::{fd::AsRawFd, unix::process::CommandExt},

Check warning on line 4 in src/exec/no_pty.rs

View workflow job for this annotation

GitHub Actions / docs

unused import: `fd::AsRawFd`

Check warning on line 4 in src/exec/no_pty.rs

View workflow job for this annotation

GitHub Actions / build-and-test-msrv

unused import: `fd::AsRawFd`

Check warning on line 4 in src/exec/no_pty.rs

View workflow job for this annotation

GitHub Actions / build-and-test-msrv

unused import: `fd::AsRawFd`

Check warning on line 4 in src/exec/no_pty.rs

View workflow job for this annotation

GitHub Actions / build-and-test-minimal

unused import: `fd::AsRawFd`

Check warning on line 4 in src/exec/no_pty.rs

View workflow job for this annotation

GitHub Actions / build-and-test-minimal

unused import: `fd::AsRawFd`

Check warning on line 4 in src/exec/no_pty.rs

View workflow job for this annotation

GitHub Actions / track-dependencies

unused import: `fd::AsRawFd`

Check warning on line 4 in src/exec/no_pty.rs

View workflow job for this annotation

GitHub Actions / build-and-test

unused import: `fd::AsRawFd`

Check warning on line 4 in src/exec/no_pty.rs

View workflow job for this annotation

GitHub Actions / build-and-test

unused import: `fd::AsRawFd`

Check warning on line 4 in src/exec/no_pty.rs

View workflow job for this annotation

GitHub Actions / miri

unused import: `fd::AsRawFd`

Check warning on line 4 in src/exec/no_pty.rs

View workflow job for this annotation

GitHub Actions / miri

unused import: `fd::AsRawFd`
process::Command,
};

Expand All @@ -11,9 +11,12 @@ use super::{
io_util::was_interrupted,
terminate_process, ExitReason, HandleSigchld, ProcessOutput,
};
use crate::system::signal::{
consts::*, register_handlers, SignalHandler, SignalHandlerBehavior, SignalNumber, SignalSet,
SignalStream,
use crate::{
common::bin_serde::BinPipe,
system::signal::{
consts::*, register_handlers, SignalHandler, SignalHandlerBehavior, SignalNumber,
SignalSet, SignalStream,
},
};
use crate::{
exec::{handle_sigchld, opt_fmt, signal_fmt},
Expand Down Expand Up @@ -46,7 +49,7 @@ pub(super) fn exec_no_pty(sudo_pid: ProcessId, mut command: Command) -> io::Resu
// FIXME (ogsudo): Some extra config happens here if selinux is available.

// Use a pipe to get the IO error if `exec` fails.
let (mut errpipe_tx, errpipe_rx) = UnixStream::pair()?;
let (mut errpipe_tx, errpipe_rx) = BinPipe::pair()?;

Check warning on line 52 in src/exec/no_pty.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/no_pty.rs#L52

Added line #L52 was not covered by tests

// Don't close the error pipe as we need it to retrieve the error code if the command execution
// fails.
Expand All @@ -72,7 +75,7 @@ pub(super) fn exec_no_pty(sudo_pid: ProcessId, mut command: Command) -> io::Resu
// If `exec` returns, it means that executing the command failed. Send the error to the
// monitor using the pipe.
if let Some(error_code) = err.raw_os_error() {
errpipe_tx.write_all(&error_code.to_ne_bytes()).ok();
errpipe_tx.write(&error_code).ok();

Check warning on line 78 in src/exec/no_pty.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/no_pty.rs#L78

Added line #L78 was not covered by tests
}

return Ok(ProcessOutput::ChildExit);
Expand Down Expand Up @@ -108,7 +111,7 @@ struct ExecClosure {
command_pid: Option<ProcessId>,
sudo_pid: ProcessId,
parent_pgrp: ProcessId,
errpipe_rx: UnixStream,
errpipe_rx: BinPipe<i32>,
signal_stream: &'static SignalStream,
signal_handlers: [SignalHandler; ExecClosure::SIGNALS.len()],
}
Expand All @@ -122,7 +125,7 @@ impl ExecClosure {
fn new(
command_pid: ProcessId,
sudo_pid: ProcessId,
errpipe_rx: UnixStream,
errpipe_rx: BinPipe<i32>,

Check warning on line 128 in src/exec/no_pty.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/no_pty.rs#L128

Added line #L128 was not covered by tests
registry: &mut EventRegistry<Self>,
) -> io::Result<Self> {
registry.register_event(&errpipe_rx, PollEvent::Readable, |_| ExecEvent::ErrPipe);
Expand Down Expand Up @@ -287,13 +290,11 @@ impl Process for ExecClosure {
match event {
ExecEvent::Signal => self.on_signal(registry),
ExecEvent::ErrPipe => {
let mut buf = 0i32.to_ne_bytes();
match self.errpipe_rx.read_exact(&mut buf) {
match self.errpipe_rx.read() {

Check warning on line 293 in src/exec/no_pty.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/no_pty.rs#L293

Added line #L293 was not covered by tests
Err(err) if was_interrupted(&err) => { /* Retry later */ }
Err(err) => registry.set_break(err),
Ok(_) => {
Ok(error_code) => {

Check warning on line 296 in src/exec/no_pty.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/no_pty.rs#L296

Added line #L296 was not covered by tests
// Received error code from the command, forward it to the parent.
let error_code = i32::from_ne_bytes(buf);
registry.set_break(io::Error::from_raw_os_error(error_code));
}
}
Expand Down
Loading

0 comments on commit 0217b4d

Please sign in to comment.