From 022790e7830803b4a4f2193b78c144b34d01d324 Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Thu, 22 Mar 2018 09:44:54 -0600 Subject: [PATCH] Add LioCb::listio_resubmit It helps deal with errors like EAGAIN, which can result in a subset of an LioCb's operations being queued. --- src/sys/aio.rs | 136 +++++++++++++++++++++++++++++++++++++++---- test/sys/test_aio.rs | 4 +- 2 files changed, 128 insertions(+), 12 deletions(-) diff --git a/src/sys/aio.rs b/src/sys/aio.rs index 8a958c8411..7cb5d6abaa 100644 --- a/src/sys/aio.rs +++ b/src/sys/aio.rs @@ -1,3 +1,4 @@ +// vim: tw=80 //! POSIX Asynchronous I/O //! //! The POSIX AIO interface is used for asynchronous I/O on files and disk-like @@ -31,8 +32,8 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::mem; use std::ptr::{null, null_mut}; -use std::thread; use sys::signal::*; +use std::thread; use sys::time::TimeSpec; libc_enum! { @@ -111,15 +112,15 @@ impl<'a> Debug for Buffer<'a> { // not today. // https://github.com/rust-lang/rust/issues/1563 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match self { - &Buffer::None => write!(fmt, "None"), - &Buffer::Phantom(p) => p.fmt(fmt), - &Buffer::BoxedSlice(ref bs) => { + match *self { + Buffer::None => write!(fmt, "None"), + Buffer::Phantom(p) => p.fmt(fmt), + Buffer::BoxedSlice(ref bs) => { let borrowed : &Borrow<[u8]> = bs.borrow(); write!(fmt, "BoxedSlice({:?})", borrowed as *const Borrow<[u8]>) }, - &Buffer::BoxedMutSlice(ref bms) => { + Buffer::BoxedMutSlice(ref bms) => { let borrowed : &BorrowMut<[u8]> = bms.borrow(); write!(fmt, "BoxedMutSlice({:?})", borrowed as *const BorrowMut<[u8]>) @@ -1059,7 +1060,11 @@ pub struct LioCb<'a> { /// It must live for as long as any of the operations are still being /// processesed, because the aio subsystem uses its address as a unique /// identifier. - list: Vec<*mut libc::aiocb> + list: Vec<*mut libc::aiocb>, + + /// A partial set of results. This field will get populated by + /// `listio_resubmit` when an `LioCb` is resubmitted after an error + results: Vec>> } #[cfg(not(any(target_os = "ios", target_os = "macos")))] @@ -1068,7 +1073,8 @@ impl<'a> LioCb<'a> { pub fn with_capacity(capacity: usize) -> LioCb<'a> { LioCb { aiocbs: Vec::with_capacity(capacity), - list: Vec::with_capacity(capacity) + list: Vec::with_capacity(capacity), + results: Vec::with_capacity(capacity) } } @@ -1109,7 +1115,7 @@ impl<'a> LioCb<'a> { /// LioOpcode::LIO_WRITE)); /// liocb.listio(LioMode::LIO_WAIT, /// SigevNotify::SigevNone).unwrap(); - /// assert_eq!(liocb.aiocbs[0].aio_return().unwrap() as usize, WBUF.len()); + /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); /// # } /// ``` /// @@ -1121,7 +1127,8 @@ impl<'a> LioCb<'a> { let sigev = SigEvent::new(sigev_notify); let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; self.list.clear(); - for a in self.aiocbs.iter_mut() { + for a in &mut self.aiocbs { + a.in_progress = true; self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb); } @@ -1130,6 +1137,114 @@ impl<'a> LioCb<'a> { libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp) }).map(|_| ()) } + + /// Resubmits any incomplete operations with [`lio_listio`]. + /// + /// Sometimes, due to system resource limitations, an `lio_listio` call will + /// return `EIO`, or `EAGAIN`. Or, if a signal is received, it may return + /// `EINTR`. In any of these cases, only a subset of its constituent + /// operations will actually have been initiated. `listio_incomplete` will + /// resubmit any operations that are still uninitiated. + /// + /// After calling `listio_resubmit`, results should be collected by + /// [`LioCb::aio_return`]. + /// + /// # Examples + /// ```no_run + /// # extern crate tempfile; + /// # extern crate nix; + /// # use nix::Error; + /// # use nix::errno::Errno; + /// # use nix::sys::aio::*; + /// # use nix::sys::signal::SigevNotify; + /// # use std::os::unix::io::AsRawFd; + /// # use std::{thread, time}; + /// # use tempfile::tempfile; + /// # fn main() { + /// const WBUF: &[u8] = b"abcdef123456"; + /// let mut f = tempfile().unwrap(); + /// let mut liocb = LioCb::with_capacity(1); + /// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(), + /// 2, //offset + /// WBUF, + /// 0, //priority + /// SigevNotify::SigevNone, + /// LioOpcode::LIO_WRITE)); + /// let mut err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone); + /// while err == Err(Error::Sys(Errno::EIO)) || + /// err == Err(Error::Sys(Errno::EAGAIN)) { + /// thread::sleep(time::Duration::from_millis(10)); + /// err = liocb.listio_resubmit(LioMode::LIO_WAIT, SigevNotify::SigevNone); + /// } + /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); + /// # } + /// ``` + /// + /// # References + /// + /// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html) + /// + /// [`AioCb::aio_return`] (struct.AioCb.html#method.aio_return) + /// [`AioCb::error`] (struct.AioCb.html#method.error) + /// [`LioCb::aio_return`] #method.aio_return + // Note: the addresses of any EINPROGRESS or EOK aiocbs _must_ not be + // changed by this method, because the kernel relies on their addresses + // being stable. + // Note: aiocbs that are Ok(()) must be finalized by aio_return, or else the + // sigev_notify will immediately refire. + pub fn listio_resubmit(&mut self, mode:LioMode, + sigev_notify: SigevNotify) -> Result<()> { + let sigev = SigEvent::new(sigev_notify); + let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; + self.list.clear(); + + while self.results.len() < self.aiocbs.len() { + self.results.push(None); + } + + for (i, a) in self.aiocbs.iter_mut().enumerate() { + if self.results[i].is_some() { + // Already collected final status for this operation + continue; + } + match a.error() { + Ok(()) => { + // aiocb is complete; collect its status and don't resubmit + self.results[i] = Some(a.aio_return()); + }, + Err(Error::Sys(Errno::EAGAIN)) => { + self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb); + }, + Err(Error::Sys(Errno::EINPROGRESS)) => { + // aiocb is was successfully queued; no need to do anything + () + }, + Err(Error::Sys(Errno::EINVAL)) => panic!( + "AioCb was never submitted, or already finalized"), + _ => unreachable!() + } + } + let p = self.list.as_ptr(); + Errno::result(unsafe { + libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp) + }).map(|_| ()) + } + + /// Collect final status for an individual `AioCb` submitted as part of an + /// `LioCb`. + /// + /// This is just like [`AioCb::aio_return`], except it takes into account + /// operations that were restarted by [`LioCb::listio_resubmit`] + /// + /// [`AioCb::aio_return`] (struct.AioCb.html#method.aio_return) + /// [`LioCb::listio_resubmit`] #method.listio_resubmit + pub fn aio_return(&mut self, i: usize) -> Result { + if i >= self.results.len() || self.results[i].is_none() { + self.aiocbs[i].aio_return() + } else { + self.results[i].unwrap() + } + } } #[cfg(not(any(target_os = "ios", target_os = "macos")))] @@ -1146,6 +1261,7 @@ impl<'a> From>> for LioCb<'a> { fn from(src: Vec>) -> LioCb<'a> { LioCb { list: Vec::with_capacity(src.capacity()), + results: Vec::with_capacity(src.capacity()), aiocbs: src, } } diff --git a/test/sys/test_aio.rs b/test/sys/test_aio.rs index 0f7d71e5c1..48399fbdfa 100644 --- a/test/sys/test_aio.rs +++ b/test/sys/test_aio.rs @@ -513,8 +513,8 @@ fn test_liocb_listio_wait() { let err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone); err.expect("lio_listio"); - assert!(liocb.aiocbs[0].aio_return().unwrap() as usize == WBUF.len()); - assert!(liocb.aiocbs[1].aio_return().unwrap() as usize == rlen); + assert!(liocb.aio_return(0).unwrap() as usize == WBUF.len()); + assert!(liocb.aio_return(1).unwrap() as usize == rlen); } assert!(rbuf.deref().deref() == b"3456");