Skip to content

Commit

Permalink
Binary serialization over pipes
Browse files Browse the repository at this point in the history
We use `UnixStream::pair()` a few times and reimplement binary ser/de
each time. Add an abstraction that does the serde (`BinSerDe`) and buffer
ceremony (`BinPipe`) for us.

Fixes #471.
  • Loading branch information
xy2i committed Aug 29, 2023
1 parent 9a7f38f commit 218a2f7
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 94 deletions.
99 changes: 99 additions & 0 deletions src/common/bin_serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//! Binary serialization, and an implementation over Unix pipes.
use std::{
io::{self, Read, Write},
marker::PhantomData,
os::unix::net::UnixStream,
};

/// Serialization/deserialization trait using a byte array as storage.
pub trait BinSerDe<const N: usize> {
fn serialize(&self) -> [u8; N];
fn deserialize(val: [u8; N]) -> Self;
}

/// A pipe abstracting over a [UnixStream] with easier
/// binary serialization, to help with the buffer sizes and ser/de steps.
/// Uses [UnixStream::pair()].
pub fn pair<const N1: usize, T1: BinSerDe<N1>, const N2: usize, T2: BinSerDe<N2>>(
) -> io::Result<(BinPipe<N1, T1, N2, T2>, BinPipe<N2, T2, N1, T1>)> {
let (a, b) = UnixStream::pair()?;
Ok((
BinPipe {
sock: a,
_t1: PhantomData::<T1>,
_t2: PhantomData::<T2>,
},
BinPipe {
sock: b,
_t1: PhantomData::<T2>,
_t2: PhantomData::<T1>,
},
))
}

/// A binary pipe that can send and recieve typed messages.
///
/// By default, the types of the [BinPipe::write()] and [BinPipe::read()]
/// messages are the same, but you can specify extra generic arguments to make
/// them differ. In this case, T1 is the type in [BinPipe::write()]
/// and T2 is returned from [BinPipe::read()].
pub struct BinPipe<const N1: usize, T1: BinSerDe<N1>, const N2: usize = N1, T2: BinSerDe<N2> = T1> {
pub sock: UnixStream,
_t1: PhantomData<T1>,
_t2: PhantomData<T2>,
}

impl<const N1: usize, const N2: usize, T1: BinSerDe<N1>, T2: BinSerDe<N2>> BinPipe<N1, T1, N2, T2> {
/// Write `T` to the pipe.
pub fn write(&mut self, val: &T1) -> io::Result<()> {
self.sock.write_all(&val.serialize())?;
Ok(())
}
/// Read `T` from the pipe.
pub fn read(&mut self) -> io::Result<T2> {
let mut buf = [0u8; N2];
self.sock.read_exact(buf.as_mut_slice())?;
Ok(T2::deserialize(buf))
}
}

impl BinSerDe<4> for i32 {
fn serialize(&self) -> [u8; 4] {
self.to_ne_bytes()
}
fn deserialize(val: [u8; 4]) -> Self {
i32::from_ne_bytes(val)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
pub fn single_type_send_recv() {
let (mut tx, mut rx) = pair().unwrap();
tx.write(&42i32).unwrap();
assert_eq!(rx.read().unwrap(), 42);
rx.write(&23i32).unwrap();
assert_eq!(tx.read().unwrap(), 23);
}

#[test]
pub fn different_send_recv_types() {
impl BinSerDe<1> for u8 {
fn serialize(&self) -> [u8; 1] {
self.to_ne_bytes()
}
fn deserialize(val: [u8; 1]) -> Self {
u8::from_ne_bytes(val)
}
}

let (mut tx, mut rx) = pair().unwrap();
tx.write(&42i32).unwrap();
assert_eq!(rx.read().unwrap(), 42);
rx.write(&23u8).unwrap();
assert_eq!(tx.read().unwrap(), 23);
}
}
1 change: 1 addition & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub use command::CommandAndArguments;
pub use context::Context;
pub use error::Error;

pub mod bin_serde;
pub mod command;
pub mod context;
pub mod error;
Expand Down
36 changes: 17 additions & 19 deletions src/exec/no_pty.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use std::{
ffi::c_int,
io::{self, Read, Write},
os::unix::{net::UnixStream, process::CommandExt},
process::Command,
};
use std::{ffi::c_int, io, os::unix::process::CommandExt, process::Command};

use super::{
event::PollEvent,
event::{EventRegistry, Process, StopReason},
io_util::was_interrupted,
terminate_process, ExitReason, HandleSigchld, ProcessOutput,
};
use crate::system::signal::{
consts::*, register_handlers, SignalHandler, SignalHandlerBehavior, SignalNumber, SignalSet,
SignalStream,
use crate::{
common::bin_serde::{pair, BinPipe},
system::signal::{
consts::*, register_handlers, SignalHandler, SignalHandlerBehavior, SignalNumber,
SignalSet, SignalStream,
},
};
use crate::{
exec::{handle_sigchld, opt_fmt, signal_fmt},
Expand Down Expand Up @@ -46,11 +44,11 @@ pub(super) fn exec_no_pty(sudo_pid: ProcessId, mut command: Command) -> io::Resu
// FIXME (ogsudo): Some extra config happens here if selinux is available.

// Use a pipe to get the IO error if `exec` fails.
let (mut errpipe_tx, errpipe_rx) = UnixStream::pair()?;
let (mut errpipe_tx, errpipe_rx) = pair()?;

Check warning on line 47 in src/exec/no_pty.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/no_pty.rs#L47

Added line #L47 was not covered by tests

// Don't close the error pipe as we need it to retrieve the error code if the command execution
// fails.
file_closer.except(&errpipe_tx);
file_closer.except(&errpipe_tx.sock);

Check warning on line 51 in src/exec/no_pty.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/no_pty.rs#L51

Added line #L51 was not covered by tests

let ForkResult::Parent(command_pid) = fork().map_err(|err| {
dev_warn!("unable to fork command process: {err}");
Expand All @@ -72,7 +70,7 @@ pub(super) fn exec_no_pty(sudo_pid: ProcessId, mut command: Command) -> io::Resu
// If `exec` returns, it means that executing the command failed. Send the error to the
// monitor using the pipe.
if let Some(error_code) = err.raw_os_error() {
errpipe_tx.write_all(&error_code.to_ne_bytes()).ok();
errpipe_tx.write(&error_code).ok();

Check warning on line 73 in src/exec/no_pty.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/no_pty.rs#L73

Added line #L73 was not covered by tests
}

return Ok(ProcessOutput::ChildExit);
Expand Down Expand Up @@ -108,7 +106,7 @@ struct ExecClosure {
command_pid: Option<ProcessId>,
sudo_pid: ProcessId,
parent_pgrp: ProcessId,
errpipe_rx: UnixStream,
errpipe_rx: BinPipe<4, i32>,
signal_stream: &'static SignalStream,
signal_handlers: [SignalHandler; ExecClosure::SIGNALS.len()],
}
Expand All @@ -122,10 +120,12 @@ impl ExecClosure {
fn new(
command_pid: ProcessId,
sudo_pid: ProcessId,
errpipe_rx: UnixStream,
errpipe_rx: BinPipe<4, i32>,

Check warning on line 123 in src/exec/no_pty.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/no_pty.rs#L123

Added line #L123 was not covered by tests
registry: &mut EventRegistry<Self>,
) -> io::Result<Self> {
registry.register_event(&errpipe_rx, PollEvent::Readable, |_| ExecEvent::ErrPipe);
registry.register_event(&errpipe_rx.sock, PollEvent::Readable, |_| {
ExecEvent::ErrPipe
});

Check warning on line 128 in src/exec/no_pty.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/no_pty.rs#L126-L128

Added lines #L126 - L128 were not covered by tests

let signal_stream = SignalStream::init()?;

Expand Down Expand Up @@ -287,13 +287,11 @@ impl Process for ExecClosure {
match event {
ExecEvent::Signal => self.on_signal(registry),
ExecEvent::ErrPipe => {
let mut buf = 0i32.to_ne_bytes();
match self.errpipe_rx.read_exact(&mut buf) {
match self.errpipe_rx.read() {

Check warning on line 290 in src/exec/no_pty.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/no_pty.rs#L290

Added line #L290 was not covered by tests
Err(err) if was_interrupted(&err) => { /* Retry later */ }
Err(err) => registry.set_break(err),
Ok(_) => {
Ok(error_code) => {

Check warning on line 293 in src/exec/no_pty.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/no_pty.rs#L293

Added line #L293 was not covered by tests
// Received error code from the command, forward it to the parent.
let error_code = i32::from_ne_bytes(buf);
registry.set_break(io::Error::from_raw_os_error(error_code));
}
}
Expand Down
Loading

0 comments on commit 218a2f7

Please sign in to comment.