Skip to content

Commit

Permalink
Add LioCb::listio_resubmit
Browse files Browse the repository at this point in the history
It helps deal with errors like EAGAIN, which can result in a subset of
an LioCb's operations being queued.  The test is only enabled on
FreeBSD, because it requires intimate knowledge of AIO system limits.
  • Loading branch information
asomers committed Apr 7, 2018
1 parent 4729935 commit 2942d73
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 15 deletions.
7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ rand = "0.4"
tempdir = "0.3"
tempfile = "2"

[target.'cfg(target_os = "freebsd")'.dev-dependencies]
sysctl = "0.1"

[[test]]
name = "test"
path = "test/test.rs"
Expand All @@ -37,6 +40,10 @@ path = "test/test.rs"
name = "test-aio-drop"
path = "test/sys/test_aio_drop.rs"

[[test]]
name = "test-lio-listio-resubmit"
path = "test/sys/test_lio_listio_resubmit.rs"

[[test]]
name = "test-mount"
path = "test/test_mount.rs"
Expand Down
160 changes: 147 additions & 13 deletions src/sys/aio.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// vim: tw=80
//! POSIX Asynchronous I/O
//!
//! The POSIX AIO interface is used for asynchronous I/O on files and disk-like
Expand Down Expand Up @@ -31,8 +32,8 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::ptr::{null, null_mut};
use std::thread;
use sys::signal::*;
use std::thread;
use sys::time::TimeSpec;

libc_enum! {
Expand Down Expand Up @@ -111,15 +112,15 @@ impl<'a> Debug for Buffer<'a> {
// not today.
// https://github.com/rust-lang/rust/issues/1563
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
&Buffer::None => write!(fmt, "None"),
&Buffer::Phantom(p) => p.fmt(fmt),
&Buffer::BoxedSlice(ref bs) => {
match *self {
Buffer::None => write!(fmt, "None"),
Buffer::Phantom(p) => p.fmt(fmt),
Buffer::BoxedSlice(ref bs) => {
let borrowed : &Borrow<[u8]> = bs.borrow();
write!(fmt, "BoxedSlice({:?})",
borrowed as *const Borrow<[u8]>)
},
&Buffer::BoxedMutSlice(ref bms) => {
Buffer::BoxedMutSlice(ref bms) => {
let borrowed : &BorrowMut<[u8]> = bms.borrow();
write!(fmt, "BoxedMutSlice({:?})",
borrowed as *const BorrowMut<[u8]>)
Expand Down Expand Up @@ -265,7 +266,7 @@ impl<'a> AioCb<'a> {
/// operations, but only if the borrow checker can guarantee that the slice
/// will outlive the `AioCb`. That will usually be the case if the `AioCb`
/// is stack-allocated. If the borrow checker gives you trouble, try using
/// [`from_bytes_mut`](#method.from_bytes_mut) instead.
/// [`from_boxed_mut_slice`](#method.from_boxed_mut_slice) instead.
///
/// # Parameters
///
Expand Down Expand Up @@ -1059,7 +1060,11 @@ pub struct LioCb<'a> {
/// It must live for as long as any of the operations are still being
/// processed, because the aio subsystem uses its address as a unique
/// identifier.
list: Vec<*mut libc::aiocb>
list: Vec<*mut libc::aiocb>,

/// A partial set of results. This field will get populated by
/// `listio_resubmit` when an `LioCb` is resubmitted after an error.
results: Vec<Option<Result<isize>>>
}

#[cfg(not(any(target_os = "ios", target_os = "macos")))]
Expand All @@ -1068,7 +1073,8 @@ impl<'a> LioCb<'a> {
pub fn with_capacity(capacity: usize) -> LioCb<'a> {
// Build an empty `LioCb`.  Each internal vector is created with room for
// `capacity` entries: the `AioCb`s themselves, the raw pointer list handed
// to `lio_listio`, and the per-operation results.
LioCb {
aiocbs: Vec::with_capacity(capacity),
list: Vec::with_capacity(capacity)
list: Vec::with_capacity(capacity),
results: Vec::with_capacity(capacity)
}
}

Expand All @@ -1087,8 +1093,8 @@ impl<'a> LioCb<'a> {
/// # Examples
///
/// Use `listio` to submit an aio operation and wait for its completion. In
/// this case, there is no need to use `aio_suspend` to wait or
/// `AioCb#error` to poll.
/// this case, there is no need to use [`aio_suspend`] to wait or
/// [`AioCb::error`] to poll.
///
/// ```
/// # extern crate tempfile;
Expand All @@ -1109,19 +1115,23 @@ impl<'a> LioCb<'a> {
/// LioOpcode::LIO_WRITE));
/// liocb.listio(LioMode::LIO_WAIT,
/// SigevNotify::SigevNone).unwrap();
/// assert_eq!(liocb.aiocbs[0].aio_return().unwrap() as usize, WBUF.len());
/// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
/// # }
/// ```
///
/// # References
///
/// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html)
///
/// [`aio_suspend`]: fn.aio_suspend.html
/// [`AioCb::error`]: struct.AioCb.html#method.error
pub fn listio(&mut self, mode: LioMode,
sigev_notify: SigevNotify) -> Result<()> {
let sigev = SigEvent::new(sigev_notify);
let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
self.list.clear();
for a in self.aiocbs.iter_mut() {
for a in &mut self.aiocbs {
a.in_progress = true;
self.list.push(a as *mut AioCb<'a>
as *mut libc::aiocb);
}
Expand All @@ -1130,6 +1140,129 @@ impl<'a> LioCb<'a> {
libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
}).map(|_| ())
}

/// Resubmits any incomplete operations with [`lio_listio`].
///
/// Sometimes, due to system resource limitations, an `lio_listio` call will
/// return `EIO`, or `EAGAIN`.  Or, if a signal is received, it may return
/// `EINTR`.  In any of these cases, only a subset of its constituent
/// operations will actually have been initiated.  `listio_resubmit` will
/// resubmit any operations that are still uninitiated.
///
/// Operations that have already completed are finalized by this method and
/// their results are cached.  That includes operations that completed with
/// an error: they are not resubmitted, and the error is returned by
/// [`LioCb::aio_return`].
///
/// After calling `listio_resubmit`, results should be collected by
/// [`LioCb::aio_return`].
///
/// # Panics
///
/// Panics if an operation's error status is `EINVAL`, which means that it
/// was never submitted or was already finalized outside of this `LioCb`.
///
/// # Examples
/// ```no_run
/// # extern crate tempfile;
/// # extern crate nix;
/// # use nix::Error;
/// # use nix::errno::Errno;
/// # use nix::sys::aio::*;
/// # use nix::sys::signal::SigevNotify;
/// # use std::os::unix::io::AsRawFd;
/// # use std::{thread, time};
/// # use tempfile::tempfile;
/// # fn main() {
/// const WBUF: &[u8] = b"abcdef123456";
/// let mut f = tempfile().unwrap();
/// let mut liocb = LioCb::with_capacity(1);
/// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(),
///     2,   //offset
///     WBUF,
///     0,   //priority
///     SigevNotify::SigevNone,
///     LioOpcode::LIO_WRITE));
/// let mut err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone);
/// while err == Err(Error::Sys(Errno::EIO)) ||
///       err == Err(Error::Sys(Errno::EAGAIN)) {
///     thread::sleep(time::Duration::from_millis(10));
///     err = liocb.listio_resubmit(LioMode::LIO_WAIT, SigevNotify::SigevNone);
/// }
/// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
/// # }
/// ```
///
/// # References
///
/// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html)
///
/// [`lio_listio`]: http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
/// [`LioCb::aio_return`]: struct.LioCb.html#method.aio_return
// Note: the addresses of any EINPROGRESS or EOK aiocbs _must_ not be
// changed by this method, because the kernel relies on their addresses
// being stable.
// Note: aiocbs that are Ok(()) must be finalized by aio_return, or else the
// sigev_notify will immediately refire.
pub fn listio_resubmit(&mut self, mode:LioMode,
                       sigev_notify: SigevNotify) -> Result<()> {
    let sigev = SigEvent::new(sigev_notify);
    let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
    self.list.clear();

    // Ensure that every operation has a slot for its final result
    while self.results.len() < self.aiocbs.len() {
        self.results.push(None);
    }

    for (i, a) in self.aiocbs.iter_mut().enumerate() {
        if self.results[i].is_some() {
            // Already collected final status for this operation
            continue;
        }
        match a.error() {
            Ok(()) => {
                // aiocb is complete; collect its status and don't resubmit
                self.results[i] = Some(a.aio_return());
            },
            Err(Error::Sys(Errno::EAGAIN)) => {
                // aiocb was never queued; it must be submitted again
                self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb);
            },
            Err(Error::Sys(Errno::EINPROGRESS)) => {
                // aiocb was successfully queued; no need to do anything
            },
            Err(Error::Sys(Errno::EINVAL)) => panic!(
                "AioCb was never submitted, or already finalized"),
            Err(e) => {
                // aiocb failed for some other reason (for example it was
                // cancelled, or the I/O itself failed).  Resubmitting it
                // would be wrong and panicking would hide a recoverable
                // error from the caller.  Finalize it like any other
                // completed operation, but cache the status reported by
                // `error`, since that is the reason the operation failed.
                let _ = a.aio_return();
                self.results[i] = Some(Err(e));
            }
        }
    }
    let p = self.list.as_ptr();
    Errno::result(unsafe {
        libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
    }).map(|_| ())
}

/// Collect final status for an individual `AioCb` submitted as part of an
/// `LioCb`.
///
/// This is just like [`AioCb::aio_return`], except it takes into account
/// operations that were restarted by [`LioCb::listio_resubmit`].  If a
/// result was already cached for operation `i`, that cached result is
/// returned; otherwise the request is forwarded to the `i`th `AioCb`.
///
/// # Panics
///
/// Panics if no result is cached for `i` and `i` is out of bounds for
/// `aiocbs`.
///
/// [`AioCb::aio_return`]: struct.AioCb.html#method.aio_return
/// [`LioCb::listio_resubmit`]: #method.listio_resubmit
pub fn aio_return(&mut self, i: usize) -> Result<isize> {
    // `results` may be shorter than `aiocbs`, hence the checked lookup
    match self.results.get(i).and_then(|slot| *slot) {
        Some(cached) => cached,
        None => self.aiocbs[i].aio_return(),
    }
}

/// Retrieve error status of an individual `AioCb` submitted as part of an
/// `LioCb`.
///
/// This is just like [`AioCb::error`], except it takes into account
/// operations that were restarted by [`LioCb::listio_resubmit`].
///
/// If a final result has already been cached for operation `i`, the status
/// is derived from it: `Ok(())` when the cached result is a success, or the
/// cached error otherwise.  If nothing is cached, the `i`th `AioCb` is
/// queried directly.
///
/// # Panics
///
/// Panics if no result is cached for `i` and `i` is out of bounds for
/// `aiocbs`.
///
/// [`AioCb::error`]: struct.AioCb.html#method.error
/// [`LioCb::listio_resubmit`]: #method.listio_resubmit
pub fn error(&mut self, i: usize) -> Result<()> {
    // `results` may be shorter than `aiocbs`, hence the checked lookup
    match self.results.get(i).and_then(|slot| *slot) {
        // A cached failure must not be reported as success
        Some(cached) => cached.map(|_| ()),
        None => self.aiocbs[i].error(),
    }
}
}

#[cfg(not(any(target_os = "ios", target_os = "macos")))]
Expand All @@ -1146,6 +1279,7 @@ impl<'a> From<Vec<AioCb<'a>>> for LioCb<'a> {
/// Wrap an existing vector of `AioCb`s in an `LioCb`.  The auxiliary
/// vectors start out empty, with the same capacity as `src`.
fn from(src: Vec<AioCb<'a>>) -> LioCb<'a> {
    // Read the capacity before `src` is moved into the new struct
    let capacity = src.capacity();
    LioCb {
        aiocbs: src,
        list: Vec::with_capacity(capacity),
        results: Vec::with_capacity(capacity),
    }
}
Expand Down
4 changes: 2 additions & 2 deletions test/sys/test_aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,8 +513,8 @@ fn test_liocb_listio_wait() {
let err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone);
err.expect("lio_listio");

assert!(liocb.aiocbs[0].aio_return().unwrap() as usize == WBUF.len());
assert!(liocb.aiocbs[1].aio_return().unwrap() as usize == rlen);
assert!(liocb.aio_return(0).unwrap() as usize == WBUF.len());
assert!(liocb.aio_return(1).unwrap() as usize == rlen);
}
assert!(rbuf.deref().deref() == b"3456");

Expand Down
108 changes: 108 additions & 0 deletions test/sys/test_lio_listio_resubmit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// vim: tw=80

// The `sysctl` crate is only a dev-dependency on FreeBSD, but Cargo builds
// this test binary on every target.  Compile the whole crate out elsewhere,
// so that the unconditional `use sysctl::CtlValue` below cannot break the
// build.
#![cfg(target_os = "freebsd")]

extern crate nix;
#[cfg(target_os = "freebsd")]
extern crate sysctl;
extern crate tempfile;

use nix::Error;
use nix::errno::*;
use nix::libc::off_t;
use nix::sys::aio::*;
use nix::sys::signal::SigevNotify;
use nix::unistd::{SysconfVar, sysconf};
use std::os::unix::io::AsRawFd;
use std::{thread, time};
use sysctl::CtlValue;
use tempfile::tempfile;

const BYTES_PER_OP: usize = 512;

/// Attempt to collect final status for all of `liocb`'s operations, freeing
/// system resources
fn finish_liocb(liocb: &mut LioCb) {
for j in 0..liocb.aiocbs.len() {
loop {
let e = liocb.error(j);
match e {
Ok(()) => break,
Err(Error::Sys(Errno::EINPROGRESS)) =>
thread::sleep(time::Duration::from_millis(10)),
Err(x) => panic!("aio_error({:?})", x)
}
}
assert_eq!(liocb.aio_return(j).unwrap(), BYTES_PER_OP as isize);
}
}

// Deliberately exceed system resource limits, causing lio_listio to return EIO.
// This test must run in its own process since it deliberately uses all AIO
// resources. ATM it is only enabled on FreeBSD, because I don't know how to
// check system AIO limits on other operating systems.
//
// Outline:
// 1. Look up the system's AIO limits.
// 2. Choose a number of LioCbs, and a number of operations for each, whose
//    total exceeds the per-process queue limit.
// 3. Submit them all, resubmitting whenever a submission is only partially
//    queued.
// 4. Verify that every single operation eventually completes in full.
#[test]
#[cfg(target_os = "freebsd")]
fn test_lio_listio_resubmit() {
// How many times listio_resubmit had to be called, summed over all LioCbs
let mut resubmit_count = 0;

// Lookup system resource limits
// alm: the AIO_LISTIO_MAX sysconf value
let alm = sysconf(SysconfVar::AIO_LISTIO_MAX)
.expect("sysconf").unwrap() as usize;
// maqpp: the vfs.aio.max_aio_queue_per_proc sysctl value
let maqpp = if let CtlValue::Int(x) = sysctl::value(
"vfs.aio.max_aio_queue_per_proc").unwrap(){
x as usize
} else {
panic!("unknown sysctl");
};

// Find lio_listio sizes that satisfy the AIO_LISTIO_MAX constraint and also
// result in a final lio_listio call that can only partially be queued
// target_ops deliberately overshoots maqpp by half of alm
let target_ops = maqpp + alm / 2;
// Ceiling division of target_ops by (alm - 2)
let num_listios = (target_ops + alm - 3) / (alm - 2);
// Ceiling division of target_ops by num_listios
let ops_per_listio = (target_ops + num_listios - 1) / num_listios;
// All of the LioCbs but the last must fit below maqpp
assert!((num_listios - 1) * ops_per_listio < maqpp,
"the last lio_listio won't make any progress; fix the algorithm");
println!("Using {:?} LioCbs of {:?} operations apiece", num_listios,
ops_per_listio);

let f = tempfile().unwrap();
// One zero-filled buffer of BYTES_PER_OP bytes per operation.  The AioCbs
// below are built from slices of these buffers.
let buffer_set = (0..num_listios).map(|_| {
(0..ops_per_listio).map(|_| {
vec![0u8; BYTES_PER_OP]
}).collect::<Vec<_>>()
}).collect::<Vec<_>>();

// Build and submit each LioCb in turn
let mut liocbs = (0..num_listios).map(|i| {
let mut liocb = LioCb::with_capacity(ops_per_listio);
for j in 0..ops_per_listio {
// Every operation writes to its own BYTES_PER_OP-sized region of the file
let offset = (BYTES_PER_OP * (i * ops_per_listio + j)) as off_t;
let wcb = AioCb::from_slice( f.as_raw_fd(),
offset,
&buffer_set[i][j][..],
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);
liocb.aiocbs.push(wcb);
}
let mut err = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone);
while err == Err(Error::Sys(Errno::EIO)) ||
err == Err(Error::Sys(Errno::EAGAIN)) ||
err == Err(Error::Sys(Errno::EINTR)) {
// The submission did not fully succeed.  Wait a little while, then
// resubmit and count the attempt.
thread::sleep(time::Duration::from_millis(10));
resubmit_count += 1;
err = liocb.listio_resubmit(LioMode::LIO_NOWAIT,
SigevNotify::SigevNone);
}
liocb
}).collect::<Vec<_>>();

// Ensure that every AioCb completed
for liocb in liocbs.iter_mut() {
finish_liocb(liocb);
}

// If nothing ever had to be resubmitted, then listio_resubmit was never
// called and the test proves nothing about it.
if resubmit_count > 0 {
println!("Resubmitted {:?} times, test passed", resubmit_count);
} else {
println!("Never resubmitted. Test ambiguous");
}
}

0 comments on commit 2942d73

Please sign in to comment.