Skip to content

Commit

Permalink
Change sys::aio::lio_listio to sys::aio::LioCb::listio
Browse files Browse the repository at this point in the history
The new LioCb structure allows us to control the exact arguments passed
to lio_listio, guaranteeing that each call gets a unique storage
location for the list argument.  This prevents clients from misusing
lio_listio in a way that causes events to get dropped from a kqueue

Fixes #870
  • Loading branch information
asomers committed Mar 5, 2018
1 parent ad624c8 commit a11432e
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 96 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).
## [Unreleased]

### Added
- Added `sys::aio::LioCb` as a wrapper for `libc::lio_listio`.
([#872](https://github.com/nix-rust/nix/pull/872))
- Added `getsid` in `::nix::unistd`
([#850](https://github.com/nix-rust/nix/pull/850))
- Added `alarm`. ([#830](https://github.com/nix-rust/nix/pull/830))
Expand All @@ -29,6 +31,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).
([#837](https://github.com/nix-rust/nix/pull/837))

### Removed
- Removed `sys::aio::lio_listio`. Use `sys::aio::LioCb::listio` instead.
([#872](https://github.com/nix-rust/nix/pull/872))

## [0.10.0] 2018-01-26

Expand Down
166 changes: 109 additions & 57 deletions src/sys/aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -965,63 +965,6 @@ pub fn aio_suspend(list: &[&AioCb], timeout: Option<TimeSpec>) -> Result<()> {
}).map(drop)
}


/// Submits multiple asynchronous I/O requests with a single system call.
///
/// They are not guaranteed to complete atomically, and the order in which the
/// requests are carried out is not specified. Reads, writes, and fsyncs may be
/// freely mixed.
///
/// This function is useful for reducing the context-switch overhead of
/// submitting many AIO operations. It can also be used with
/// `LioMode::LIO_WAIT` to block on the result of several independent
/// operations. Used that way, it is often useful in programs that otherwise
/// make little use of AIO.
///
/// # Examples
///
/// Use `lio_listio` to submit an aio operation and wait for its completion. In
/// this case, there is no need to use `aio_suspend` to wait or `AioCb#error` to
/// poll.
///
/// ```
/// # extern crate tempfile;
/// # extern crate nix;
/// # use nix::sys::aio::*;
/// # use nix::sys::signal::SigevNotify;
/// # use std::os::unix::io::AsRawFd;
/// # use tempfile::tempfile;
/// # fn main() {
/// const WBUF: &[u8] = b"abcdef123456";
/// let mut f = tempfile().unwrap();
/// let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
/// 2, //offset
/// WBUF,
/// 0, //priority
/// SigevNotify::SigevNone,
/// LioOpcode::LIO_WRITE);
/// lio_listio(LioMode::LIO_WAIT,
/// &[&mut aiocb],
/// SigevNotify::SigevNone).unwrap();
/// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len());
/// # }
/// ```
///
/// # References
///
/// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html)
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
pub fn lio_listio(mode: LioMode, list: &[&mut AioCb],
sigev_notify: SigevNotify) -> Result<()> {
let sigev = SigEvent::new(sigev_notify);
let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
let plist = list as *const [&mut AioCb] as *const [*mut libc::aiocb];
let p = plist as *const *mut libc::aiocb;
Errno::result(unsafe {
libc::lio_listio(mode as i32, p, list.len() as i32, sigevp)
}).map(drop)
}

impl<'a> Debug for AioCb<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("AioCb")
Expand All @@ -1045,3 +988,112 @@ impl<'a> Drop for AioCb<'a> {
assert!(!self.in_progress, "Dropped an in-progress AioCb");
}
}

/// LIO Control Block.
///
/// The basic structure used to issue multiple AIO operations simultaneously.
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
pub struct LioCb<'a> {
/// A collection of [`AioCb`]s. All of these will be issued simultaneously
/// by the [`listio`] method.
///
/// [`AioCb`]: struct.AioCb.html
/// [`listio`]: #method.listio
pub aiocbs: Vec<AioCb<'a>>,

/// The actual list passed to `libc::lio_listio`.
///
/// It must live for as long as any of the operations are still being
/// processesed, because the aio subsystem uses its address as a unique
/// identifier.
list: Vec<*mut libc::aiocb>
}

#[cfg(not(any(target_os = "ios", target_os = "macos")))]
impl<'a> LioCb<'a> {
/// Initialize an empty `LioCb`
pub fn with_capacity(capacity: usize) -> LioCb<'a> {
LioCb {
aiocbs: Vec::with_capacity(capacity),
list: Vec::with_capacity(capacity)
}
}

/// Submits multiple asynchronous I/O requests with a single system call.
///
/// They are not guaranteed to complete atomically, and the order in which
/// the requests are carried out is not specified. Reads, writes, and
/// fsyncs may be freely mixed.
///
/// This function is useful for reducing the context-switch overhead of
/// submitting many AIO operations. It can also be used with
/// `LioMode::LIO_WAIT` to block on the result of several independent
/// operations. Used that way, it is often useful in programs that
/// otherwise make little use of AIO.
///
/// # Examples
///
/// Use `listio` to submit an aio operation and wait for its completion. In
/// this case, there is no need to use `aio_suspend` to wait or
/// `AioCb#error` to poll.
///
/// ```
/// # extern crate tempfile;
/// # extern crate nix;
/// # use nix::sys::aio::*;
/// # use nix::sys::signal::SigevNotify;
/// # use std::os::unix::io::AsRawFd;
/// # use tempfile::tempfile;
/// # fn main() {
/// const WBUF: &[u8] = b"abcdef123456";
/// let mut f = tempfile().unwrap();
/// let mut liocb = LioCb::with_capacity(1);
/// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(),
/// 2, //offset
/// WBUF,
/// 0, //priority
/// SigevNotify::SigevNone,
/// LioOpcode::LIO_WRITE));
/// liocb.listio(LioMode::LIO_WAIT,
/// SigevNotify::SigevNone).unwrap();
/// assert_eq!(liocb.aiocbs[0].aio_return().unwrap() as usize, WBUF.len());
/// # }
/// ```
///
/// # References
///
/// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html)
pub fn listio(&mut self, mode: LioMode,
sigev_notify: SigevNotify) -> Result<()> {
let sigev = SigEvent::new(sigev_notify);
let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
self.list.clear();
for a in self.aiocbs.iter_mut() {
self.list.push(a as *mut AioCb<'a>
as *mut libc::aiocb);
}
let p = self.list.as_ptr();
Errno::result(unsafe {
libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
}).map(|_| ())
}
}

#[cfg(not(any(target_os = "ios", target_os = "macos")))]
impl<'a> Debug for LioCb<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("LioCb")
.field("aiocbs", &self.aiocbs)
.finish()
}
}

#[cfg(not(any(target_os = "ios", target_os = "macos")))]
impl<'a> From<Vec<AioCb<'a>>> for LioCb<'a> {
fn from(src: Vec<AioCb<'a>>) -> LioCb<'a> {
LioCb {
list: Vec::with_capacity(src.capacity()),
aiocbs: src,
}
}
}
89 changes: 50 additions & 39 deletions test/sys/test_aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,12 +505,12 @@ fn test_write_sigev_signal() {
assert!(rbuf == EXPECT);
}

// Test lio_listio with LIO_WAIT, so all AIO ops should be complete by the time
// lio_listio returns.
// Test LioCb::listio with LIO_WAIT, so all AIO ops should be complete by the
// time listio returns.
#[test]
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
fn test_lio_listio_wait() {
fn test_liocb_listio_wait() {
const INITIAL: &[u8] = b"abcdef123456";
const WBUF: &[u8] = b"CDEF";
let mut rbuf = vec![0; 4];
Expand All @@ -522,24 +522,27 @@ fn test_lio_listio_wait() {
f.write_all(INITIAL).unwrap();

{
let mut wcb = AioCb::from_slice( f.as_raw_fd(),
let wcb = AioCb::from_slice( f.as_raw_fd(),
2, //offset
WBUF,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);

let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
let rcb = AioCb::from_mut_slice( f.as_raw_fd(),
8, //offset
&mut rbuf,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ);
let err = lio_listio(LioMode::LIO_WAIT, &[&mut wcb, &mut rcb], SigevNotify::SigevNone);
err.expect("lio_listio failed");

assert!(wcb.aio_return().unwrap() as usize == WBUF.len());
assert!(rcb.aio_return().unwrap() as usize == rlen);
let mut liocb = LioCb::with_capacity(2);
liocb.aiocbs.push(wcb);
liocb.aiocbs.push(rcb);
let err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone);
err.expect("lio_listio");

assert!(liocb.aiocbs[0].aio_return().unwrap() as usize == WBUF.len());
assert!(liocb.aiocbs[1].aio_return().unwrap() as usize == rlen);
}
assert!(rbuf.deref().deref() == b"3456");

Expand All @@ -549,12 +552,12 @@ fn test_lio_listio_wait() {
assert!(rbuf2 == EXPECT);
}

// Test lio_listio with LIO_NOWAIT and no SigEvent, so we must use some other
// Test LioCb::listio with LIO_NOWAIT and no SigEvent, so we must use some other
// mechanism to check for the individual AioCb's completion.
#[test]
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
fn test_lio_listio_nowait() {
fn test_liocb_listio_nowait() {
const INITIAL: &[u8] = b"abcdef123456";
const WBUF: &[u8] = b"CDEF";
let mut rbuf = vec![0; 4];
Expand All @@ -566,26 +569,29 @@ fn test_lio_listio_nowait() {
f.write_all(INITIAL).unwrap();

{
let mut wcb = AioCb::from_slice( f.as_raw_fd(),
let wcb = AioCb::from_slice( f.as_raw_fd(),
2, //offset
WBUF,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);

let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
let rcb = AioCb::from_mut_slice( f.as_raw_fd(),
8, //offset
&mut rbuf,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ);
let err = lio_listio(LioMode::LIO_NOWAIT, &[&mut wcb, &mut rcb], SigevNotify::SigevNone);
err.expect("lio_listio failed");

poll_aio(&mut wcb).unwrap();
poll_aio(&mut rcb).unwrap();
assert!(wcb.aio_return().unwrap() as usize == WBUF.len());
assert!(rcb.aio_return().unwrap() as usize == rlen);
let mut liocb = LioCb::with_capacity(2);
liocb.aiocbs.push(wcb);
liocb.aiocbs.push(rcb);
let err = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone);
err.expect("lio_listio");

poll_aio(&mut liocb.aiocbs[0]).unwrap();
poll_aio(&mut liocb.aiocbs[1]).unwrap();
assert!(liocb.aiocbs[0].aio_return().unwrap() as usize == WBUF.len());
assert!(liocb.aiocbs[1].aio_return().unwrap() as usize == rlen);
}
assert!(rbuf.deref().deref() == b"3456");

Expand All @@ -595,13 +601,13 @@ fn test_lio_listio_nowait() {
assert!(rbuf2 == EXPECT);
}

// Test lio_listio with LIO_NOWAIT and a SigEvent to indicate when all AioCb's
// are complete.
// Test LioCb::listio with LIO_NOWAIT and a SigEvent to indicate when all
// AioCb's are complete.
// FIXME: This test is ignored on mips/mips64 because of failures in qemu in CI.
#[test]
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg_attr(any(target_arch = "mips", target_arch = "mips64", target_env = "musl"), ignore)]
fn test_lio_listio_signal() {
fn test_liocb_listio_signal() {
#[allow(unused_variables)]
let m = ::SIGNAL_MTX.lock().expect("Mutex got poisoned by another test");
const INITIAL: &[u8] = b"abcdef123456";
Expand All @@ -620,29 +626,32 @@ fn test_lio_listio_signal() {
f.write_all(INITIAL).unwrap();

{
let mut wcb = AioCb::from_slice( f.as_raw_fd(),
let wcb = AioCb::from_slice( f.as_raw_fd(),
2, //offset
WBUF,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);

let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
let rcb = AioCb::from_mut_slice( f.as_raw_fd(),
8, //offset
&mut rbuf,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ);
let mut liocb = LioCb::with_capacity(2);
liocb.aiocbs.push(wcb);
liocb.aiocbs.push(rcb);
SIGNALED.store(false, Ordering::Relaxed);
unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap();
let err = lio_listio(LioMode::LIO_NOWAIT, &[&mut wcb, &mut rcb], sigev_notify);
err.expect("lio_listio failed");
let err = liocb.listio(LioMode::LIO_NOWAIT, sigev_notify);
err.expect("lio_listio");
while !SIGNALED.load(Ordering::Relaxed) {
thread::sleep(time::Duration::from_millis(10));
}

assert!(wcb.aio_return().unwrap() as usize == WBUF.len());
assert!(rcb.aio_return().unwrap() as usize == rlen);
assert!(liocb.aiocbs[0].aio_return().unwrap() as usize == WBUF.len());
assert!(liocb.aiocbs[1].aio_return().unwrap() as usize == rlen);
}
assert!(rbuf.deref().deref() == b"3456");

Expand All @@ -652,22 +661,24 @@ fn test_lio_listio_signal() {
assert!(rbuf2 == EXPECT);
}

// Try to use lio_listio to read into an immutable buffer. It should fail
// Try to use LioCb::listio to read into an immutable buffer. It should fail
// FIXME: This test fails to panic on Linux/musl
#[test]
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[should_panic(expected = "Can't read into an immutable buffer")]
#[cfg_attr(target_env = "musl", ignore)]
fn test_lio_listio_read_immutable() {
fn test_liocb_listio_read_immutable() {
let rbuf: &[u8] = b"abcd";
let f = tempfile().unwrap();


let mut rcb = AioCb::from_slice( f.as_raw_fd(),
2, //offset
rbuf,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ);
let _ = lio_listio(LioMode::LIO_NOWAIT, &[&mut rcb], SigevNotify::SigevNone);
let mut liocb = LioCb::from(vec![
AioCb::from_slice( f.as_raw_fd(),
2, //offset
rbuf,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ)
]);
let _ = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone);
}

0 comments on commit a11432e

Please sign in to comment.