Skip to content

Commit

Permalink
Add LioCb::listio_resubmit
Browse files Browse the repository at this point in the history
It helps deal with errors like EAGAIN, which can result in a subset of
an LioCb's operations being queued.
  • Loading branch information
asomers committed Mar 22, 2018
1 parent a35baf2 commit c458f49
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 12 deletions.
136 changes: 126 additions & 10 deletions src/sys/aio.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// vim: tw=80
//! POSIX Asynchronous I/O
//!
//! The POSIX AIO interface is used for asynchronous I/O on files and disk-like
Expand Down Expand Up @@ -31,8 +32,8 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::ptr::{null, null_mut};
use std::thread;
use sys::signal::*;
use std::thread;
use sys::time::TimeSpec;

libc_enum! {
Expand Down Expand Up @@ -111,15 +112,15 @@ impl<'a> Debug for Buffer<'a> {
// not today.
// https://github.com/rust-lang/rust/issues/1563
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
&Buffer::None => write!(fmt, "None"),
&Buffer::Phantom(p) => p.fmt(fmt),
&Buffer::BoxedSlice(ref bs) => {
match *self {
Buffer::None => write!(fmt, "None"),
Buffer::Phantom(p) => p.fmt(fmt),
Buffer::BoxedSlice(ref bs) => {
let borrowed : &Borrow<[u8]> = bs.borrow();
write!(fmt, "BoxedSlice({:?})",
borrowed as *const Borrow<[u8]>)
},
&Buffer::BoxedMutSlice(ref bms) => {
Buffer::BoxedMutSlice(ref bms) => {
let borrowed : &BorrowMut<[u8]> = bms.borrow();
write!(fmt, "BoxedMutSlice({:?})",
borrowed as *const BorrowMut<[u8]>)
Expand Down Expand Up @@ -1059,7 +1060,11 @@ pub struct LioCb<'a> {
/// It must live for as long as any of the operations are still being
/// processesed, because the aio subsystem uses its address as a unique
/// identifier.
list: Vec<*mut libc::aiocb>
list: Vec<*mut libc::aiocb>,

/// A partial set of results. This field will get populated by
/// `listio_resubmit` when an `LioCb` is resubmitted after an error
results: Vec<Option<Result<isize>>>
}

#[cfg(not(any(target_os = "ios", target_os = "macos")))]
Expand All @@ -1068,7 +1073,8 @@ impl<'a> LioCb<'a> {
pub fn with_capacity(capacity: usize) -> LioCb<'a> {
LioCb {
aiocbs: Vec::with_capacity(capacity),
list: Vec::with_capacity(capacity)
list: Vec::with_capacity(capacity),
results: Vec::with_capacity(capacity)
}
}

Expand Down Expand Up @@ -1109,7 +1115,7 @@ impl<'a> LioCb<'a> {
/// LioOpcode::LIO_WRITE));
/// liocb.listio(LioMode::LIO_WAIT,
/// SigevNotify::SigevNone).unwrap();
/// assert_eq!(liocb.aiocbs[0].aio_return().unwrap() as usize, WBUF.len());
/// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
/// # }
/// ```
///
Expand All @@ -1121,7 +1127,8 @@ impl<'a> LioCb<'a> {
let sigev = SigEvent::new(sigev_notify);
let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
self.list.clear();
for a in self.aiocbs.iter_mut() {
for a in &mut self.aiocbs {
//for a in self.aiocbs.iter_mut() {
self.list.push(a as *mut AioCb<'a>
as *mut libc::aiocb);
}
Expand All @@ -1130,6 +1137,114 @@ impl<'a> LioCb<'a> {
libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
}).map(|_| ())
}

/// Resubmits any incomplete operations with [`lio_listio`].
///
/// Sometimes, due to system resource limitations, an `lio_listio` call will
/// return `EIO`, or `EAGAIN`. Or, if a signal is received, it may return
/// `EINTR`. In any of these cases, only a subset of its constituent
/// operations will actually have been initiated. `listio_incomplete` will
/// resubmit any operations that are still uninitiated.
///
/// After calling `listio_resubmit`, results should be collected by
/// [`LioCb::aio_return`].
///
/// # Examples
/// ```no_run
/// # extern crate tempfile;
/// # extern crate nix;
/// # use nix::Error;
/// # use nix::errno::Errno;
/// # use nix::sys::aio::*;
/// # use nix::sys::signal::SigevNotify;
/// # use std::os::unix::io::AsRawFd;
/// # use std::{thread, time};
/// # use tempfile::tempfile;
/// # fn main() {
/// const WBUF: &[u8] = b"abcdef123456";
/// let mut f = tempfile().unwrap();
/// let mut liocb = LioCb::with_capacity(1);
/// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(),
/// 2, //offset
/// WBUF,
/// 0, //priority
/// SigevNotify::SigevNone,
/// LioOpcode::LIO_WRITE));
/// let mut err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone);
/// while err == Err(Error::Sys(Errno::EIO)) ||
/// err == Err(Error::Sys(Errno::EAGAIN)) {
/// thread::sleep(time::Duration::from_millis(10));
/// err = liocb.listio_resubmit(LioMode::LIO_WAIT, SigevNotify::SigevNone);
/// }
/// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
/// # }
/// ```
///
/// # References
///
/// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html)
///
/// [`AioCb::aio_return`] (struct.AioCb.html#method.aio_return)
/// [`AioCb::error`] (struct.AioCb.html#method.error)
/// [`LioCb::aio_return`] #method.aio_return
// Note: the addresses of any EINPROGRESS or EOK aiocbs _must_ not be
// changed by this method, because the kernel relies on their addresses
// being stable.
// Note: aiocbs that are Ok(()) must be finalized by aio_return, or else the
// sigev_notify will immediately refire.
pub fn listio_resubmit(&mut self, mode:LioMode,
sigev_notify: SigevNotify) -> Result<()> {
let sigev = SigEvent::new(sigev_notify);
let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
self.list.clear();

while self.results.len() < self.aiocbs.len() {
self.results.push(None);
}

for (i, a) in self.aiocbs.iter_mut().enumerate() {
if self.results[i].is_some() {
// Already collected final status for this operation
continue;
}
match a.error() {
Ok(()) => {
// aiocb is complete; collect its status and don't resubmit
self.results[i] = Some(a.aio_return());
},
Err(Error::Sys(Errno::EAGAIN)) => {
self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb);
},
Err(Error::Sys(Errno::EINPROGRESS)) => {
// aiocb is was successfully queued; no need to do anything
()
},
Err(Error::Sys(Errno::EINVAL)) => panic!(
"AioCb was never submitted, or already finalized"),
_ => unreachable!()
}
}
let p = self.list.as_ptr();
Errno::result(unsafe {
libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
}).map(|_| ())
}

/// Collect final status for an individual `AioCb` submitted as part of an
/// `LioCb`.
///
/// This is just like [`AioCb::aio_return`], except it takes into account
/// operations that were restarted by [`LioCb::listio_resubmit`]
///
/// [`AioCb::aio_return`] (struct.AioCb.html#method.aio_return)
/// [`LioCb::listio_resubmit`] #method.listio_resubmit
pub fn aio_return(&mut self, i: usize) -> Result<isize> {
if i >= self.results.len() || self.results[i].is_none() {
self.aiocbs[i].aio_return()
} else {
self.results[i].unwrap()
}
}
}

#[cfg(not(any(target_os = "ios", target_os = "macos")))]
Expand All @@ -1146,6 +1261,7 @@ impl<'a> From<Vec<AioCb<'a>>> for LioCb<'a> {
fn from(src: Vec<AioCb<'a>>) -> LioCb<'a> {
LioCb {
list: Vec::with_capacity(src.capacity()),
results: Vec::with_capacity(src.capacity()),
aiocbs: src,
}
}
Expand Down
4 changes: 2 additions & 2 deletions test/sys/test_aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,8 +513,8 @@ fn test_liocb_listio_wait() {
let err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone);
err.expect("lio_listio");

assert!(liocb.aiocbs[0].aio_return().unwrap() as usize == WBUF.len());
assert!(liocb.aiocbs[1].aio_return().unwrap() as usize == rlen);
assert!(liocb.aio_return(0).unwrap() as usize == WBUF.len());
assert!(liocb.aio_return(1).unwrap() as usize == rlen);
}
assert!(rbuf.deref().deref() == b"3456");

Expand Down

0 comments on commit c458f49

Please sign in to comment.