Skip to content

Commit

Permalink
Auto merge of #483 - asomers:aio2, r=posborne
Browse files Browse the repository at this point in the history
Add POSIX AIO support

POSIX AIO is a standard for asynchronous file I/O.  Read, write, and
fsync operations can all take place in the background, with completion
notification delivered by a signal, by a new thread, by kqueue, or not
at all.

The SigEvent class, used for AIO notifications among other things, is
also added.
  • Loading branch information
homu committed Jan 26, 2017
2 parents 8865e78 + b740156 commit d4ba02f
Show file tree
Hide file tree
Showing 7 changed files with 748 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ This project adheres to [Semantic Versioning](http://semver.org/).
## [Unreleased]

### Added
- Added support for POSIX AIO
([#483](https://github.com/nix-rust/nix/pull/483))
- Added support for XNU system control sockets
([#478](https://github.com/nix-rust/nix/pull/478))
- Added support for `ioctl` calls on BSD platforms
([#478](https://github.com/nix-rust/nix/pull/478))
- Added struct `TimeSpec`
([#475](https://github.com/nix-rust/nix/pull/475))
([#483](https://github.com/nix-rust/nix/pull/483))
- Added complete definitions for all kqueue-related constants on all supported
OSes
([#415](https://github.com/nix-rust/nix/pull/415))
Expand Down
249 changes: 249 additions & 0 deletions src/sys/aio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
use {Error, Errno, Result};
use std::os::unix::io::RawFd;
use libc::{c_void, off_t, size_t};
use libc;
use std::marker::PhantomData;
use std::mem;
use std::ptr::{null, null_mut};
use sys::signal::*;
use sys::time::TimeSpec;

/// Mode for `aio_fsync`. Controls whether only data or both data and metadata
/// are synced.
#[repr(i32)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AioFsyncMode {
/// do it like `fsync`
O_SYNC = libc::O_SYNC,
/// on supported operating systems only, do it like `fdatasync`
#[cfg(any(target_os = "openbsd", target_os = "bitrig",
target_os = "netbsd", target_os = "macos", target_os = "ios",
target_os = "linux"))]
O_DSYNC = libc::O_DSYNC
}

/// When used with `lio_listio`, determines whether a given `aiocb` should be
/// used for a read operation, a write operation, or ignored. Has no effect for
/// any other aio functions.
#[repr(i32)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum LioOpcode {
LIO_NOP = libc::LIO_NOP,
LIO_WRITE = libc::LIO_WRITE,
LIO_READ = libc::LIO_READ
}

/// Mode for `lio_listio`.
#[repr(i32)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum LioMode {
/// Requests that `lio_listio` block until all requested operations have
/// been completed
LIO_WAIT = libc::LIO_WAIT,
/// Requests that `lio_listio` return immediately
LIO_NOWAIT = libc::LIO_NOWAIT,
}

/// Return values for `aio_cancel`
#[repr(i32)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AioCancelStat {
/// All outstanding requests were canceled
AioCanceled = libc::AIO_CANCELED,
/// Some requests were not canceled. Their status should be checked with
/// `aio_error`
AioNotCanceled = libc::AIO_NOTCANCELED,
/// All of the requests have already finished
AioAllDone = libc::AIO_ALLDONE,
}

/// The basic structure used by all aio functions. Each `aiocb` represents one
/// I/O request.
#[repr(C)]
pub struct AioCb<'a> {
aiocb: libc::aiocb,
phantom: PhantomData<&'a mut [u8]>
}

impl<'a> AioCb<'a> {
/// Constructs a new `AioCb` with no associated buffer.
///
/// The resulting `AioCb` structure is suitable for use with `aio_fsync`.
/// * `fd` File descriptor. Required for all aio functions.
/// * `prio` If POSIX Prioritized IO is supported, then the operation will
/// be prioritized at the process's priority level minus `prio`
/// * `sigev_notify` Determines how you will be notified of event
/// completion.
pub fn from_fd(fd: RawFd, prio: ::c_int,
sigev_notify: SigevNotify) -> AioCb<'a> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
a.aio_offset = 0;
a.aio_nbytes = 0;
a.aio_buf = null_mut();

let aiocb = AioCb { aiocb: a, phantom: PhantomData};
aiocb
}

/// Constructs a new `AioCb`.
///
/// * `fd` File descriptor. Required for all aio functions.
/// * `offs` File offset
/// * `buf` A memory buffer
/// * `prio` If POSIX Prioritized IO is supported, then the operation will
/// be prioritized at the process's priority level minus `prio`
/// * `sigev_notify` Determines how you will be notified of event
/// completion.
/// * `opcode` This field is only used for `lio_listio`. It determines
/// which operation to use for this individual aiocb
pub fn from_mut_slice(fd: RawFd, offs: off_t, buf: &'a mut [u8],
prio: ::c_int, sigev_notify: SigevNotify,
opcode: LioOpcode) -> AioCb {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
a.aio_offset = offs;
a.aio_nbytes = buf.len() as size_t;
a.aio_buf = buf.as_ptr() as *mut c_void;
a.aio_lio_opcode = opcode as ::c_int;

let aiocb = AioCb { aiocb: a, phantom: PhantomData};
aiocb
}

/// Like `from_mut_slice`, but works on constant slices rather than
/// mutable slices.
///
/// This is technically unsafe, but in practice it's fine
/// to use with any aio functions except `aio_read` and `lio_listio` (with
/// `opcode` set to `LIO_READ`). This method is useful when writing a const
/// buffer with `aio_write`, since from_mut_slice can't work with const
/// buffers.
// Note: another solution to the problem of writing const buffers would be
// to genericize AioCb for both &mut [u8] and &[u8] buffers. aio_read could
// take the former and aio_write could take the latter. However, then
// lio_listio wouldn't work, because that function needs a slice of AioCb,
// and they must all be the same type. We're basically stuck with using an
// unsafe function, since aio (as designed in C) is an unsafe API.
pub unsafe fn from_slice(fd: RawFd, offs: off_t, buf: &'a [u8],
prio: ::c_int, sigev_notify: SigevNotify,
opcode: LioOpcode) -> AioCb {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
a.aio_offset = offs;
a.aio_nbytes = buf.len() as size_t;
a.aio_buf = buf.as_ptr() as *mut c_void;
a.aio_lio_opcode = opcode as ::c_int;

let aiocb = AioCb { aiocb: a, phantom: PhantomData};
aiocb
}

fn common_init(fd: RawFd, prio: ::c_int,
sigev_notify: SigevNotify) -> libc::aiocb {
// Use mem::zeroed instead of explicitly zeroing each field, because the
// number and name of reserved fields is OS-dependent. On some OSes,
// some reserved fields are used the kernel for state, and must be
// explicitly zeroed when allocated.
let mut a = unsafe { mem::zeroed::<libc::aiocb>()};
a.aio_fildes = fd;
a.aio_reqprio = prio;
a.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
a
}

/// Update the notification settings for an existing `aiocb`
pub fn set_sigev_notify(&mut self, sigev_notify: SigevNotify) {
self.aiocb.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
}
}

/// Cancels outstanding AIO requests. If `aiocb` is `None`, then all requests
/// for `fd` will be cancelled. Otherwise, only the given `AioCb` will be
/// cancelled.
pub fn aio_cancel(fd: RawFd, aiocb: Option<&mut AioCb>) -> Result<AioCancelStat> {
let p: *mut libc::aiocb = match aiocb {
None => null_mut(),
Some(x) => &mut x.aiocb
};
match unsafe { libc::aio_cancel(fd, p) } {
libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled),
libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled),
libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone),
-1 => Err(Error::last()),
_ => panic!("unknown aio_cancel return value")
}
}

/// Retrieve error status of an asynchronous operation. If the request has not
/// yet completed, returns `EINPROGRESS`. Otherwise, returns `Ok` or any other
/// error.
pub fn aio_error(aiocb: &mut AioCb) -> Result<()> {
let p: *mut libc::aiocb = &mut aiocb.aiocb;
match unsafe { libc::aio_error(p) } {
0 => Ok(()),
num if num > 0 => Err(Error::from_errno(Errno::from_i32(num))),
-1 => Err(Error::last()),
num => panic!("unknown aio_error return value {:?}", num)
}
}

/// An asynchronous version of `fsync`.
pub fn aio_fsync(mode: AioFsyncMode, aiocb: &mut AioCb) -> Result<()> {
let p: *mut libc::aiocb = &mut aiocb.aiocb;
Errno::result(unsafe { libc::aio_fsync(mode as ::c_int, p) }).map(drop)
}

/// Asynchronously reads from a file descriptor into a buffer
pub fn aio_read(aiocb: &mut AioCb) -> Result<()> {
let p: *mut libc::aiocb = &mut aiocb.aiocb;
Errno::result(unsafe { libc::aio_read(p) }).map(drop)
}

/// Retrieve return status of an asynchronous operation. Should only be called
/// once for each `AioCb`, after `aio_error` indicates that it has completed.
/// The result the same as for `read`, `write`, of `fsync`.
pub fn aio_return(aiocb: &mut AioCb) -> Result<isize> {
let p: *mut libc::aiocb = &mut aiocb.aiocb;
Errno::result(unsafe { libc::aio_return(p) })
}

/// Suspends the calling process until at least one of the specified `AioCb`s
/// has completed, a signal is delivered, or the timeout has passed. If
/// `timeout` is `None`, `aio_suspend` will block indefinitely.
pub fn aio_suspend(list: &[&AioCb], timeout: Option<TimeSpec>) -> Result<()> {
// We must use transmute because Rust doesn't understand that a pointer to a
// Struct is the same as a pointer to its first element.
let plist = unsafe {
mem::transmute::<&[&AioCb], *const [*const libc::aiocb]>(list)
};
let p = plist as *const *const libc::aiocb;
let timep = match timeout {
None => null::<libc::timespec>(),
Some(x) => x.as_ref() as *const libc::timespec
};
Errno::result(unsafe {
libc::aio_suspend(p, list.len() as i32, timep)
}).map(drop)
}

/// Asynchronously writes from a buffer to a file descriptor
pub fn aio_write(aiocb: &mut AioCb) -> Result<()> {
let p: *mut libc::aiocb = &mut aiocb.aiocb;
Errno::result(unsafe { libc::aio_write(p) }).map(drop)
}

/// Submits multiple asynchronous I/O requests with a single system call. The
/// order in which the requests are carried out is not specified.
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
pub fn lio_listio(mode: LioMode, list: &[&mut AioCb],
sigev_notify: SigevNotify) -> Result<()> {
let sigev = SigEvent::new(sigev_notify);
let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
// We must use transmute because Rust doesn't understand that a pointer to a
// Struct is the same as a pointer to its first element.
let plist = unsafe {
mem::transmute::<&[&mut AioCb], *const [*mut libc::aiocb]>(list)
};
let p = plist as *const *mut libc::aiocb;
Errno::result(unsafe {
libc::lio_listio(mode as i32, p, list.len() as i32, sigevp)
}).map(drop)
}
4 changes: 4 additions & 0 deletions src/sys/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#[cfg(any(target_os = "freebsd", target_os = "dragonfly", target_os = "ios",
target_os = "netbsd", target_os = "macos", target_os = "linux"))]
pub mod aio;

#[cfg(any(target_os = "linux", target_os = "android"))]
pub mod epoll;

Expand Down
103 changes: 103 additions & 0 deletions src/sys/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
use libc;
use {Errno, Error, Result};
use std::mem;
#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
use std::os::unix::io::RawFd;
use std::ptr;

// Currently there is only one definition of c_int in libc, as well as only one
Expand Down Expand Up @@ -403,6 +405,107 @@ pub fn raise(signal: Signal) -> Result<()> {
Errno::result(res).map(drop)
}


#[cfg(target_os = "freebsd")]
pub type type_of_thread_id = libc::lwpid_t;
#[cfg(target_os = "linux")]
pub type type_of_thread_id = libc::pid_t;

/// Used to request asynchronous notification of certain events, for example,
/// with POSIX AIO, POSIX message queues, and POSIX timers.
// sigval is actually a union of a int and a void*. But it's never really used
// as a pointer, because neither libc nor the kernel ever dereference it. nix
// therefore presents it as an intptr_t, which is how kevent uses it.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SigevNotify {
/// No notification will be delivered
SigevNone,
/// The signal given by `signal` will be delivered to the process. The
/// value in `si_value` will be present in the `si_value` field of the
/// `siginfo_t` structure of the queued signal.
SigevSignal { signal: Signal, si_value: libc::intptr_t },
// Note: SIGEV_THREAD is not implemented because libc::sigevent does not
// expose a way to set the union members needed by SIGEV_THREAD.
/// A new `kevent` is posted to the kqueue `kq`. The `kevent`'s `udata`
/// field will contain the value in `udata`.
#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
SigevKevent { kq: RawFd, udata: libc::intptr_t },
/// The signal `signal` is queued to the thread whose LWP ID is given in
/// `thread_id`. The value stored in `si_value` will be present in the
/// `si_value` of the `siginfo_t` structure of the queued signal.
#[cfg(any(target_os = "freebsd", target_os = "linux"))]
SigevThreadId { signal: Signal, thread_id: type_of_thread_id,
si_value: libc::intptr_t },
}

/// Used to request asynchronous notification of the completion of certain
/// events, such as POSIX AIO and timers.
#[repr(C)]
pub struct SigEvent {
sigevent: libc::sigevent
}

impl SigEvent {
// Note: this constructor does not allow the user to set the
// sigev_notify_kevent_flags field. That's considered ok because on FreeBSD
// at least those flags don't do anything useful. That field is part of a
// union that shares space with the more genuinely useful
// Note: This constructor also doesn't allow the caller to set the
// sigev_notify_function or sigev_notify_attributes fields, which are
// required for SIGEV_THREAD. That's considered ok because on no operating
// system is SIGEV_THREAD the most efficient way to deliver AIO
// notification. FreeBSD and Dragonfly programs should prefer SIGEV_KEVENT.
// Linux, Solaris, and portable programs should prefer SIGEV_THREAD_ID or
// SIGEV_SIGNAL. That field is part of a union that shares space with the
// more genuinely useful sigev_notify_thread_id
pub fn new(sigev_notify: SigevNotify) -> SigEvent {
let mut sev = unsafe { mem::zeroed::<libc::sigevent>()};
sev.sigev_notify = match sigev_notify {
SigevNotify::SigevNone => libc::SIGEV_NONE,
SigevNotify::SigevSignal{..} => libc::SIGEV_SIGNAL,
#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
SigevNotify::SigevKevent{..} => libc::SIGEV_KEVENT,
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
SigevNotify::SigevThreadId{..} => libc::SIGEV_THREAD_ID
};
sev.sigev_signo = match sigev_notify {
SigevNotify::SigevSignal{ signal, .. } => signal as ::c_int,
#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
SigevNotify::SigevKevent{ kq, ..} => kq,
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
SigevNotify::SigevThreadId{ signal, .. } => signal as ::c_int,
_ => 0
};
sev.sigev_value.sival_ptr = match sigev_notify {
SigevNotify::SigevNone => ptr::null_mut::<libc::c_void>(),
SigevNotify::SigevSignal{ si_value, .. } => si_value as *mut ::c_void,
#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
SigevNotify::SigevKevent{ udata, .. } => udata as *mut ::c_void,
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
SigevNotify::SigevThreadId{ si_value, .. } => si_value as *mut ::c_void,
};
SigEvent::set_tid(&mut sev, &sigev_notify);
SigEvent{sigevent: sev}
}

#[cfg(any(target_os = "linux", target_os = "freebsd"))]
fn set_tid(sev: &mut libc::sigevent, sigev_notify: &SigevNotify) {
sev.sigev_notify_thread_id = match sigev_notify {
&SigevNotify::SigevThreadId { thread_id, .. } => thread_id,
_ => 0 as type_of_thread_id
};
}

#[cfg(not(any(target_os = "freebsd", target_os = "linux")))]
fn set_tid(_sev: &mut libc::sigevent, _sigev_notify: &SigevNotify) {
}

pub fn sigevent(&self) -> libc::sigevent {
self.sigevent
}
}


#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit d4ba02f

Please sign in to comment.