Skip to content

Commit

Permalink
Add AioCb::from_boxed_slice
Browse files Browse the repository at this point in the history
The existing AioCb constructors work for simple programs where
everything is stored on the stack.  But in more complicated programs the
borrow checker can't prove that a buffer will outlive the AioCb that
references it.  Fix this problem by introducting
AioCb::from_boxed_slice, which takes a reference-counted buffer.

Fixes #575
  • Loading branch information
asomers committed Apr 16, 2017
1 parent 8c47de4 commit d3ad3c7
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 34 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).
## [Unreleased]

### Added
- Added `AioCb::from_boxed_slice`
([#582](https://github.com/nix-rust/nix/pull/582)
- Added `nix::unistd::{openat, fstatat, readlink, readlinkat}`
([#551](https://github.com/nix-rust/nix/pull/551))

Expand Down
57 changes: 49 additions & 8 deletions src/sys/aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::io::Write;
use std::io::stderr;
use std::marker::PhantomData;
use std::mem;
use std::rc::Rc;
use std::ptr::{null, null_mut};
use sys::signal::*;
use sys::time::TimeSpec;
Expand Down Expand Up @@ -61,6 +62,17 @@ pub enum AioCancelStat {
AioAllDone = libc::AIO_ALLDONE,
}

/// Private type used by nix to keep buffers from Drop'ing while the kernel has
/// a pointer to them.
#[derive(Clone, Debug)]
enum Keeper<'a> {
none,
/// Keeps a reference to a Boxed slice
boxed(Rc<Box<[u8]>>),
/// Keeps a reference to a slice
phantom(PhantomData<&'a mut [u8]>)
}

/// The basic structure used by all aio functions. Each `aiocb` represents one
/// I/O request.
pub struct AioCb<'a> {
Expand All @@ -69,7 +81,8 @@ pub struct AioCb<'a> {
mutable: bool,
/// Could this `AioCb` potentially have any in-kernel state?
in_progress: bool,
phantom: PhantomData<&'a mut [u8]>
/// Used to keep buffers from Drop'ing
keeper: Keeper<'a>
}

impl<'a> AioCb<'a> {
Expand All @@ -89,7 +102,7 @@ impl<'a> AioCb<'a> {
a.aio_buf = null_mut();

let aiocb = AioCb { aiocb: a, mutable: false, in_progress: false,
phantom: PhantomData};
keeper: Keeper::none};
aiocb
}

Expand All @@ -106,17 +119,43 @@ impl<'a> AioCb<'a> {
/// which operation to use for this individual aiocb
pub fn from_mut_slice(fd: RawFd, offs: off_t, buf: &'a mut [u8],
prio: ::c_int, sigev_notify: SigevNotify,
opcode: LioOpcode) -> AioCb {
opcode: LioOpcode) -> AioCb<'a> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
a.aio_offset = offs;
a.aio_nbytes = buf.len() as size_t;
// casting an immutable buffer to a mutable pointer looks unsafe, but
// technically its only unsafe to dereference it, not to create it.
a.aio_buf = buf.as_ptr() as *mut c_void;
a.aio_lio_opcode = opcode as ::c_int;

let aiocb = AioCb { aiocb: a, mutable: true, in_progress: false,
phantom: PhantomData};
keeper: Keeper::phantom(PhantomData)};
aiocb
}

/// Constructs a new `AioCb`.
///
/// Unlike `from_mut_slice`, this method returns a structure suitable for
/// placement on the heap.
///
/// * `fd` File descriptor. Required for all aio functions.
/// * `offs` File offset
/// * `buf` A shared memory buffer on the heap
/// * `prio` If POSIX Prioritized IO is supported, then the operation will
/// be prioritized at the process's priority level minus `prio`
/// * `sigev_notify` Determines how you will be notified of event
/// completion.
/// * `opcode` This field is only used for `lio_listio`. It determines
/// which operation to use for this individual aiocb
pub fn from_boxed_slice(fd: RawFd, offs: off_t, buf: Rc<Box<[u8]>>,
prio: ::c_int, sigev_notify: SigevNotify,
opcode: LioOpcode) -> AioCb<'a> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
a.aio_offset = offs;
a.aio_nbytes = buf.len() as size_t;
a.aio_buf = buf.as_ptr() as *mut c_void;
a.aio_lio_opcode = opcode as ::c_int;

let aiocb = AioCb{ aiocb: a, mutable: true, in_progress: false,
keeper: Keeper::boxed(buf)};
aiocb
}

Expand All @@ -139,12 +178,15 @@ impl<'a> AioCb<'a> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
a.aio_offset = offs;
a.aio_nbytes = buf.len() as size_t;
// casting an immutable buffer to a mutable pointer looks unsafe,
// but technically its only unsafe to dereference it, not to create
// it.
a.aio_buf = buf.as_ptr() as *mut c_void;
assert!(opcode != LioOpcode::LIO_READ, "Can't read into an immutable buffer");
a.aio_lio_opcode = opcode as ::c_int;

let aiocb = AioCb { aiocb: a, mutable: false, in_progress: false,
phantom: PhantomData};
keeper: Keeper::none};
aiocb
}

Expand Down Expand Up @@ -284,7 +326,6 @@ impl<'a> Debug for AioCb<'a> {
.field("aio_sigevent", &SigEvent::from(&self.aiocb.aio_sigevent))
.field("mutable", &self.mutable)
.field("in_progress", &self.in_progress)
.field("phantom", &self.phantom)
.finish()
}
}
Expand Down
80 changes: 54 additions & 26 deletions test/sys/test_aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use nix::sys::aio::*;
use nix::sys::signal::*;
use nix::sys::time::{TimeSpec, TimeValLike};
use std::io::{Write, Read, Seek, SeekFrom};
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::rc::Rc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{thread, time};
Expand All @@ -25,12 +27,12 @@ fn poll_aio(mut aiocb: &mut AioCb) -> Result<()> {
// AioCancelStat value.
#[test]
fn test_cancel() {
let mut wbuf = "CDEF".to_string().into_bytes();
let wbuf: &'static [u8] = b"CDEF";

let f = tempfile().unwrap();
let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(),
let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
0, //offset
&mut wbuf,
&wbuf,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
Expand All @@ -49,12 +51,12 @@ fn test_cancel() {
// Tests using aio_cancel_all for all outstanding IOs.
#[test]
fn test_aio_cancel_all() {
let mut wbuf = "CDEF".to_string().into_bytes();
let wbuf: &'static [u8] = b"CDEF";

let f = tempfile().unwrap();
let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(),
let mut aiocb = AioCb::from_slice(f.as_raw_fd(),
0, //offset
&mut wbuf,
&wbuf,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
Expand Down Expand Up @@ -90,7 +92,7 @@ fn test_aio_suspend() {
const INITIAL: &'static [u8] = b"abcdef123456";
const WBUF: &'static [u8] = b"CDEF";
let timeout = TimeSpec::seconds(10);
let mut rbuf = vec![0; 4];
let rbuf = Rc::new(vec![0; 4].into_boxed_slice());
let mut f = tempfile().unwrap();
f.write(INITIAL).unwrap();

Expand All @@ -101,9 +103,9 @@ fn test_aio_suspend() {
SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);

let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
let mut rcb = AioCb::from_boxed_slice( f.as_raw_fd(),
8, //offset
&mut rbuf,
rbuf.clone(),
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ);
Expand All @@ -128,6 +130,31 @@ fn test_aio_suspend() {
// for completion
#[test]
fn test_read() {
const INITIAL: &'static [u8] = b"abcdef123456";
let rbuf = Rc::new(vec![0; 4].into_boxed_slice());
const EXPECT: &'static [u8] = b"cdef";
let mut f = tempfile().unwrap();
f.write(INITIAL).unwrap();
{
let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(),
2, //offset
rbuf.clone(),
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
aiocb.read().unwrap();

let err = poll_aio(&mut aiocb);
assert!(err == Ok(()));
assert!(aiocb.aio_return().unwrap() as usize == EXPECT.len());
}

assert!(EXPECT == rbuf.deref().deref());
}

// Tests from_mut_slice
#[test]
fn test_read_into_mut_slice() {
const INITIAL: &'static [u8] = b"abcdef123456";
let mut rbuf = vec![0; 4];
const EXPECT: &'static [u8] = b"cdef";
Expand All @@ -154,7 +181,7 @@ fn test_read() {
#[test]
#[should_panic(expected = "Can't read into an immutable buffer")]
fn test_read_immutable_buffer() {
let rbuf = vec![0; 4];
let rbuf: &'static [u8] = b"CDEF";
let f = tempfile().unwrap();
let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
2, //offset
Expand All @@ -165,28 +192,29 @@ fn test_read_immutable_buffer() {
aiocb.read().unwrap();
}


// Test a simple aio operation with no completion notification. We must poll
// for completion. Unlike test_aio_read, this test uses AioCb::from_slice
#[test]
fn test_write() {
const INITIAL: &'static [u8] = b"abcdef123456";
const WBUF: &'static [u8] = b"CDEF"; //"CDEF".to_string().into_bytes();
let wbuf = "CDEF".to_string().into_bytes();
let mut rbuf = Vec::new();
const EXPECT: &'static [u8] = b"abCDEF123456";

let mut f = tempfile().unwrap();
f.write(INITIAL).unwrap();
let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
2, //offset
&WBUF,
&wbuf,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
aiocb.write().unwrap();

let err = poll_aio(&mut aiocb);
assert!(err == Ok(()));
assert!(aiocb.aio_return().unwrap() as usize == WBUF.len());
assert!(aiocb.aio_return().unwrap() as usize == wbuf.len());

f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf).unwrap();
Expand Down Expand Up @@ -249,7 +277,7 @@ fn test_write_sigev_signal() {
fn test_lio_listio_wait() {
const INITIAL: &'static [u8] = b"abcdef123456";
const WBUF: &'static [u8] = b"CDEF";
let mut rbuf = vec![0; 4];
let rbuf = Rc::new(vec![0; 4].into_boxed_slice());
let mut rbuf2 = Vec::new();
const EXPECT: &'static [u8] = b"abCDEF123456";
let mut f = tempfile().unwrap();
Expand All @@ -264,9 +292,9 @@ fn test_lio_listio_wait() {
SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);

let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
let mut rcb = AioCb::from_boxed_slice( f.as_raw_fd(),
8, //offset
&mut rbuf,
rbuf.clone(),
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ);
Expand All @@ -276,7 +304,7 @@ fn test_lio_listio_wait() {
assert!(wcb.aio_return().unwrap() as usize == WBUF.len());
assert!(rcb.aio_return().unwrap() as usize == WBUF.len());
}
assert!(rbuf == b"3456");
assert!(rbuf.deref().deref() == b"3456");

f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf2).unwrap();
Expand All @@ -291,7 +319,7 @@ fn test_lio_listio_wait() {
fn test_lio_listio_nowait() {
const INITIAL: &'static [u8] = b"abcdef123456";
const WBUF: &'static [u8] = b"CDEF";
let mut rbuf = vec![0; 4];
let rbuf = Rc::new(vec![0; 4].into_boxed_slice());
let mut rbuf2 = Vec::new();
const EXPECT: &'static [u8] = b"abCDEF123456";
let mut f = tempfile().unwrap();
Expand All @@ -306,9 +334,9 @@ fn test_lio_listio_nowait() {
SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);

let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
let mut rcb = AioCb::from_boxed_slice( f.as_raw_fd(),
8, //offset
&mut rbuf,
rbuf.clone(),
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ);
Expand All @@ -320,7 +348,7 @@ fn test_lio_listio_nowait() {
assert!(wcb.aio_return().unwrap() as usize == WBUF.len());
assert!(rcb.aio_return().unwrap() as usize == WBUF.len());
}
assert!(rbuf == b"3456");
assert!(rbuf.deref().deref() == b"3456");

f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf2).unwrap();
Expand All @@ -336,7 +364,7 @@ fn test_lio_listio_signal() {
let _ = SIGUSR2_MTX.lock().expect("Mutex got poisoned by another test");
const INITIAL: &'static [u8] = b"abcdef123456";
const WBUF: &'static [u8] = b"CDEF";
let mut rbuf = vec![0; 4];
let rbuf = Rc::new(vec![0; 4].into_boxed_slice());
let mut rbuf2 = Vec::new();
const EXPECT: &'static [u8] = b"abCDEF123456";
let mut f = tempfile().unwrap();
Expand All @@ -356,9 +384,9 @@ fn test_lio_listio_signal() {
SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);

let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
let mut rcb = AioCb::from_boxed_slice( f.as_raw_fd(),
8, //offset
&mut rbuf,
rbuf.clone(),
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ);
Expand All @@ -373,7 +401,7 @@ fn test_lio_listio_signal() {
assert!(wcb.aio_return().unwrap() as usize == WBUF.len());
assert!(rcb.aio_return().unwrap() as usize == WBUF.len());
}
assert!(rbuf == b"3456");
assert!(rbuf.deref().deref() == b"3456");

f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf2).unwrap();
Expand All @@ -386,7 +414,7 @@ fn test_lio_listio_signal() {
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[should_panic(expected = "Can't read into an immutable buffer")]
fn test_lio_listio_read_immutable() {
let rbuf = vec![0; 4];
let rbuf: &'static [u8] = b"abcd";
let f = tempfile().unwrap();


Expand Down

0 comments on commit d3ad3c7

Please sign in to comment.