diff --git a/Cargo.toml b/Cargo.toml index daf70fa..91a7981 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ travis-ci = { repository = "a1ien/rusb" } vendored = [ "libusb1-sys/vendored" ] [workspace] -members = ["libusb1-sys"] +members = [ "libusb1-sys", "rusb-async" ] [dependencies] libusb1-sys = { path = "libusb1-sys", version = "0.6.0" } diff --git a/rusb-async/Cargo.toml b/rusb-async/Cargo.toml new file mode 100644 index 0000000..c58ea2c --- /dev/null +++ b/rusb-async/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "rusb-async" +version = "0.0.1-alpha" +edition = "2021" +authors = [ +"Ilya Averyanov ", +"Ryan Butler ", +"Kevin Mehall " +] + +description = "Rust library for accessing USB devices." +license = "MIT" +homepage = "https://github.com/a1ien/rusb" +repository = "https://github.com/a1ien/rusb.git" +keywords = ["usb", "libusb", "async"] + +[features] +vendored = [ "rusb/vendored" ] + +[dependencies] +rusb = { path = "..", version = "0.9.1" } +libc = "0.2" diff --git a/rusb-async/examples/read_async.rs b/rusb-async/examples/read_async.rs new file mode 100644 index 0000000..7685f08 --- /dev/null +++ b/rusb-async/examples/read_async.rs @@ -0,0 +1,54 @@ +use rusb_async::TransferPool; + +use rusb::{Context, UsbContext}; + +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; + +fn convert_argument(input: &str) -> u16 { + if input.starts_with("0x") { + return u16::from_str_radix(input.trim_start_matches("0x"), 16).unwrap(); + } + u16::from_str_radix(input, 10) + .expect("Invalid input, be sure to add `0x` for hexadecimal values.") +} + +fn main() { + let args: Vec = std::env::args().collect(); + + if args.len() < 4 { + eprintln!("Usage: read_async "); + return; + } + + let vid = convert_argument(args[1].as_ref()); + let pid = convert_argument(args[2].as_ref()); + let endpoint: u8 = FromStr::from_str(args[3].as_ref()).unwrap(); + + let ctx = Context::new().expect("Could not initialize libusb"); + let device = Arc::new( + ctx.open_device_with_vid_pid(vid, pid) + .expect("Could not find device"), + ); + + const NUM_TRANSFERS: usize = 32; + const BUF_SIZE: usize = 64; + + let mut async_pool = TransferPool::new(device).expect("Failed to create async pool!"); + + while async_pool.pending() < NUM_TRANSFERS { + async_pool + .submit_bulk(endpoint, Vec::with_capacity(BUF_SIZE)) + .expect("Failed to submit transfer"); + } + + let timeout = Duration::from_secs(10); + loop { + let data = async_pool.poll(timeout).expect("Transfer failed"); + println!("Got data: {} {:?}", data.len(), data); + async_pool + .submit_bulk(endpoint, data) + .expect("Failed to resubmit transfer"); + } +} diff --git a/rusb-async/examples/read_write_async.rs b/rusb-async/examples/read_write_async.rs new file mode 100644 index 0000000..4ca8119 --- /dev/null +++ b/rusb-async/examples/read_write_async.rs @@ -0,0 +1,73 @@ +use rusb_async::TransferPool; + +use rusb::{Context, UsbContext}; + +use std::time::Duration; +use std::{sync::Arc, thread}; + +fn main() { + let args: Vec = std::env::args().collect(); + + if args.len() < 5 { + eprintln!("Usage: read_write_async (all numbers hex)"); + return; + } + + let vid = u16::from_str_radix(args[1].as_ref(), 16).unwrap(); + let pid = u16::from_str_radix(args[2].as_ref(), 16).unwrap(); + let out_endpoint = u8::from_str_radix(args[3].as_ref(), 16).unwrap(); + let in_endpoint = u8::from_str_radix(args[4].as_ref(), 16).unwrap(); + + let ctx = Context::new().expect("Could not initialize libusb"); + let device = Arc::new( + ctx.open_device_with_vid_pid(vid, pid) + .expect("Could not find device"), + ); + + thread::spawn({ + let device = device.clone(); + move || { + let mut write_pool = TransferPool::new(device).expect("Failed to create async pool!"); + + let mut i = 0u8; + + loop { + let mut buf = if write_pool.pending() < 8 { + Vec::with_capacity(64) + } else { + write_pool + .poll(Duration::from_secs(5)) + .expect("Failed to poll OUT transfer") + }; + + buf.clear(); + buf.push(i); + buf.resize(64, 0x2); + + write_pool + .submit_bulk(out_endpoint, buf) + .expect("Failed to submit OUT transfer"); + println!("Wrote {}", i); + i = i.wrapping_add(1); + } + } + }); + + let mut read_pool = TransferPool::new(device).expect("Failed to create async pool!"); + + while read_pool.pending() < 8 { + read_pool + .submit_bulk(in_endpoint, Vec::with_capacity(1024)) + .expect("Failed to submit IN transfer"); + } + + loop { + let data = read_pool + .poll(Duration::from_secs(10)) + .expect("Failed to poll IN transfer"); + println!("Got data: {} {:?}", data.len(), data[0]); + read_pool + .submit_bulk(in_endpoint, data) + .expect("Failed to resubmit IN transfer"); + } +} diff --git a/rusb-async/src/error.rs b/rusb-async/src/error.rs new file mode 100644 index 0000000..73fb42c --- /dev/null +++ b/rusb-async/src/error.rs @@ -0,0 +1,48 @@ +use std::{fmt, result}; + +/// A result of a function that may return a `Error`. +pub type Result = result::Result; + +#[derive(Debug)] +pub enum Error { + /// No transfers pending + NoTransfersPending, + + /// Poll timed out + PollTimeout, + + /// Transfer is stalled + Stall, + + /// Device was disconnected + Disconnected, + + /// Device sent more data than expected + Overflow, + + /// Other Error + Other(&'static str), + + /// Error code on other failure + Errno(&'static str, i32), + + /// Transfer was cancelled + Cancelled, +} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> std::result::Result<(), fmt::Error> { + match self { + Error::NoTransfersPending => fmt.write_str("No transfers pending"), + Error::PollTimeout => fmt.write_str("Poll timed out"), + Error::Stall => fmt.write_str("Transfer is stalled"), + Error::Disconnected => fmt.write_str("Device was disconnected"), + Error::Overflow => fmt.write_str("Device sent more data than expected"), + Error::Other(s) => write!(fmt, "Other Error: {s}"), + Error::Errno(s, n) => write!(fmt, "{s} ERRNO: {n}"), + Error::Cancelled => fmt.write_str("Transfer was cancelled"), + } + } +} + +impl std::error::Error for Error {} diff --git a/rusb-async/src/lib.rs b/rusb-async/src/lib.rs new file mode 100644 index 0000000..38b2422 --- /dev/null +++ b/rusb-async/src/lib.rs @@ -0,0 +1,286 @@ +use rusb::ffi::{self, constants::*}; + +use std::convert::TryInto; +use std::ptr::NonNull; + +use std::sync::atomic::{AtomicBool, Ordering}; + +mod error; +mod pool; + +use error::{Error, Result}; + +pub use pool::TransferPool; + +struct Transfer { + ptr: NonNull, + buffer: Vec, +} + +impl Transfer { + // Invariant: Caller must ensure `device` outlives this transfer + unsafe fn bulk( + device: *mut ffi::libusb_device_handle, + endpoint: u8, + mut buffer: Vec, + ) -> Self { + // non-isochronous endpoints (e.g. control, bulk, interrupt) specify a value of 0 + // This is step 1 of async API + + let ptr = + NonNull::new(ffi::libusb_alloc_transfer(0)).expect("Could not allocate transfer!"); + + let user_data = Box::into_raw(Box::new(AtomicBool::new(false))).cast::(); + + let length = if endpoint & ffi::constants::LIBUSB_ENDPOINT_DIR_MASK + == ffi::constants::LIBUSB_ENDPOINT_OUT + { + // for OUT endpoints: the currently valid data in the buffer + buffer.len() + } else { + // for IN endpoints: the full capacity + buffer.capacity() + } + .try_into() + .unwrap(); + + ffi::libusb_fill_bulk_transfer( + ptr.as_ptr(), + device, + endpoint, + buffer.as_mut_ptr(), + length, + Self::transfer_cb, + user_data, + 0, + ); + + Self { ptr, buffer } + } + + // Invariant: Caller must ensure `device` outlives this transfer + unsafe fn control( + device: *mut ffi::libusb_device_handle, + + request_type: u8, + request: u8, + value: u16, + index: u16, + data: &[u8], + ) -> Self { + let mut buf = Vec::with_capacity(data.len() + LIBUSB_CONTROL_SETUP_SIZE); + + let length = data.len() as u16; + + ffi::libusb_fill_control_setup( + buf.as_mut_ptr() as *mut u8, + request_type, + request, + value, + index, + length, + ); + Self::control_raw(device, buf) + } + + // Invariant: Caller must ensure `device` outlives this transfer + unsafe fn control_raw(device: *mut ffi::libusb_device_handle, mut buffer: Vec) -> Self { + // non-isochronous endpoints (e.g. control, bulk, interrupt) specify a value of 0 + // This is step 1 of async API + + let ptr = + NonNull::new(ffi::libusb_alloc_transfer(0)).expect("Could not allocate transfer!"); + + let user_data = Box::into_raw(Box::new(AtomicBool::new(false))).cast::(); + + ffi::libusb_fill_control_transfer( + ptr.as_ptr(), + device, + buffer.as_mut_ptr(), + Self::transfer_cb, + user_data, + 0, + ); + + Self { ptr, buffer } + } + + // Invariant: Caller must ensure `device` outlives this transfer + unsafe fn interrupt( + device: *mut ffi::libusb_device_handle, + endpoint: u8, + mut buffer: Vec, + ) -> Self { + // non-isochronous endpoints (e.g. control, bulk, interrupt) specify a value of 0 + // This is step 1 of async API + + let ptr = + NonNull::new(ffi::libusb_alloc_transfer(0)).expect("Could not allocate transfer!"); + + let user_data = Box::into_raw(Box::new(AtomicBool::new(false))).cast::(); + + let length = if endpoint & ffi::constants::LIBUSB_ENDPOINT_DIR_MASK + == ffi::constants::LIBUSB_ENDPOINT_OUT + { + // for OUT endpoints: the currently valid data in the buffer + buffer.len() + } else { + // for IN endpoints: the full capacity + buffer.capacity() + } + .try_into() + .unwrap(); + + ffi::libusb_fill_interrupt_transfer( + ptr.as_ptr(), + device, + endpoint, + buffer.as_mut_ptr(), + length, + Self::transfer_cb, + user_data, + 0, + ); + + Self { ptr, buffer } + } + + // Invariant: Caller must ensure `device` outlives this transfer + unsafe fn iso( + device: *mut ffi::libusb_device_handle, + endpoint: u8, + mut buffer: Vec, + iso_packets: i32, + ) -> Self { + // isochronous endpoints + // This is step 1 of async API + let ptr = NonNull::new(ffi::libusb_alloc_transfer(iso_packets)) + .expect("Could not allocate transfer!"); + + let user_data = Box::into_raw(Box::new(AtomicBool::new(false))).cast::(); + + let length = if endpoint & ffi::constants::LIBUSB_ENDPOINT_DIR_MASK + == ffi::constants::LIBUSB_ENDPOINT_OUT + { + // for OUT endpoints: the currently valid data in the buffer + buffer.len() + } else { + // for IN endpoints: the full capacity + buffer.capacity() + } + .try_into() + .unwrap(); + + ffi::libusb_fill_iso_transfer( + ptr.as_ptr(), + device, + endpoint, + buffer.as_mut_ptr(), + length, + iso_packets, + Self::transfer_cb, + user_data, + 0, + ); + ffi::libusb_set_iso_packet_lengths(ptr.as_ptr(), (length / iso_packets) as u32); + + Self { ptr, buffer } + } + // Part of step 4 of async API the transfer is finished being handled when + // `poll()` is called. + extern "system" fn transfer_cb(transfer: *mut ffi::libusb_transfer) { + // Safety: transfer is still valid because libusb just completed + // it but we haven't told anyone yet. user_data remains valid + // because it is freed only with the transfer. + // After the store to completed, these may no longer be valid if + // the polling thread freed it after seeing it completed. + let completed = unsafe { + let transfer = &mut *transfer; + &*transfer.user_data.cast::() + }; + completed.store(true, Ordering::SeqCst); + } + + fn transfer(&self) -> &ffi::libusb_transfer { + // Safety: transfer remains valid as long as self + unsafe { self.ptr.as_ref() } + } + + fn completed_flag(&self) -> &AtomicBool { + // Safety: transfer and user_data remain valid as long as self + unsafe { &*self.transfer().user_data.cast::() } + } + + /// Prerequisite: self.buffer ans self.ptr are both correctly set + fn swap_buffer(&mut self, new_buf: Vec) -> Vec { + let transfer_struct = unsafe { self.ptr.as_mut() }; + + let data = std::mem::replace(&mut self.buffer, new_buf); + + // Update transfer struct for new buffer + transfer_struct.actual_length = 0; // TODO: Is this necessary? + transfer_struct.buffer = self.buffer.as_mut_ptr(); + transfer_struct.length = self.buffer.capacity() as i32; + + data + } + + // Step 3 of async API + fn submit(&mut self) -> Result<()> { + self.completed_flag().store(false, Ordering::SeqCst); + let errno = unsafe { ffi::libusb_submit_transfer(self.ptr.as_ptr()) }; + + match errno { + 0 => Ok(()), + LIBUSB_ERROR_NO_DEVICE => Err(Error::Disconnected), + LIBUSB_ERROR_BUSY => { + unreachable!("We shouldn't be calling submit on transfers already submitted!") + } + LIBUSB_ERROR_NOT_SUPPORTED => Err(Error::Other("Transfer not supported")), + LIBUSB_ERROR_INVALID_PARAM => { + Err(Error::Other("Transfer size bigger than OS supports")) + } + _ => Err(Error::Errno("Error while submitting transfer: ", errno)), + } + } + + fn cancel(&mut self) { + unsafe { + ffi::libusb_cancel_transfer(self.ptr.as_ptr()); + } + } + + fn handle_completed(&mut self) -> Result> { + assert!(self.completed_flag().load(Ordering::Relaxed)); + let err = match self.transfer().status { + LIBUSB_TRANSFER_COMPLETED => { + debug_assert!(self.transfer().length >= self.transfer().actual_length); + unsafe { + self.buffer.set_len(self.transfer().actual_length as usize); + } + let data = self.swap_buffer(Vec::new()); + return Ok(data); + } + LIBUSB_TRANSFER_CANCELLED => Error::Cancelled, + LIBUSB_TRANSFER_ERROR => Error::Other("Error occurred during transfer execution"), + LIBUSB_TRANSFER_TIMED_OUT => { + unreachable!("We are using timeout=0 which means no timeout") + } + LIBUSB_TRANSFER_STALL => Error::Stall, + LIBUSB_TRANSFER_NO_DEVICE => Error::Disconnected, + LIBUSB_TRANSFER_OVERFLOW => Error::Overflow, + _ => panic!("Found an unexpected error value for transfer status"), + }; + Err(err) + } +} + +/// Invariant: transfer must not be pending +impl Drop for Transfer { + fn drop(&mut self) { + unsafe { + drop(Box::from_raw(self.transfer().user_data)); + ffi::libusb_free_transfer(self.ptr.as_ptr()); + } + } +} diff --git a/rusb-async/src/pool.rs b/rusb-async/src/pool.rs new file mode 100644 index 0000000..b4e6b33 --- /dev/null +++ b/rusb-async/src/pool.rs @@ -0,0 +1,183 @@ +use std::collections::VecDeque; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use rusb::{ffi, DeviceHandle, UsbContext}; + +use crate::{error::Error, error::Result, Transfer}; + +use std::sync::atomic::{AtomicBool, Ordering}; + +/// Represents a pool of asynchronous transfers, that can be polled to completion +pub struct TransferPool { + device: Arc>, + pending: VecDeque, +} + +impl TransferPool { + pub fn new(device: Arc>) -> Result { + Ok(Self { + device, + pending: VecDeque::new(), + }) + } + + pub fn submit_bulk(&mut self, endpoint: u8, buf: Vec) -> Result<()> { + // Safety: If transfer is submitted, it is pushed onto `pending` where it will be + // dropped before `device` is freed. + unsafe { + let mut transfer = Transfer::bulk(self.device.as_raw(), endpoint, buf); + transfer.submit()?; + self.pending.push_back(transfer); + Ok(()) + } + } + + pub fn submit_control( + &mut self, + request_type: u8, + request: u8, + value: u16, + index: u16, + data: &[u8], + ) -> Result<()> { + // Safety: If transfer is submitted, it is pushed onto `pending` where it will be + // dropped before `device` is freed. + unsafe { + let mut transfer = Transfer::control( + self.device.as_raw(), + request_type, + request, + value, + index, + data, + ); + transfer.submit()?; + self.pending.push_back(transfer); + Ok(()) + } + } + + pub unsafe fn submit_control_raw(&mut self, buffer: Vec) -> Result<()> { + // Safety: If transfer is submitted, it is pushed onto `pending` where it will be + // dropped before `device` is freed. + unsafe { + let mut transfer = Transfer::control_raw(self.device.as_raw(), buffer); + transfer.submit()?; + self.pending.push_back(transfer); + Ok(()) + } + } + + pub fn submit_interrupt(&mut self, endpoint: u8, buf: Vec) -> Result<()> { + // Safety: If transfer is submitted, it is pushed onto `pending` where it will be + // dropped before `device` is freed. + unsafe { + let mut transfer = Transfer::interrupt(self.device.as_raw(), endpoint, buf); + transfer.submit()?; + self.pending.push_back(transfer); + Ok(()) + } + } + + pub fn submit_iso(&mut self, endpoint: u8, buf: Vec, iso_packets: i32) -> Result<()> { + // Safety: If transfer is submitted, it is pushed onto `pending` where it will be + // dropped before `device` is freed. + unsafe { + let mut transfer = Transfer::iso(self.device.as_raw(), endpoint, buf, iso_packets); + transfer.submit()?; + self.pending.push_back(transfer); + Ok(()) + } + } + + pub fn poll(&mut self, timeout: Duration) -> Result> { + let next = self.pending.front().ok_or(Error::NoTransfersPending)?; + if poll_completed(self.device.context(), timeout, next.completed_flag()) { + let mut transfer = self.pending.pop_front().unwrap(); + let res = transfer.handle_completed(); + res + } else { + Err(Error::PollTimeout) + } + } + + pub fn cancel_all(&mut self) { + // Cancel in reverse order to avoid a race condition in which one + // transfer is cancelled but another submitted later makes its way onto + // the bus. + for transfer in self.pending.iter_mut().rev() { + transfer.cancel(); + } + } + + /// Returns the number of async transfers pending + pub fn pending(&self) -> usize { + self.pending.len() + } +} + +unsafe impl Send for TransferPool {} +unsafe impl Sync for TransferPool {} + +impl Drop for TransferPool { + fn drop(&mut self) { + self.cancel_all(); + while self.pending() > 0 { + self.poll(Duration::from_secs(1)).ok(); + } + } +} + +/// This is effectively libusb_handle_events_timeout_completed, but with +/// `completed` as `AtomicBool` instead of `c_int` so it is safe to access +/// without the events lock held. It also continues polling until completion, +/// timeout, or error, instead of potentially returning early. +/// +/// This design is based on +/// https://libusb.sourceforge.io/api-1.0/libusb_mtasync.html#threadwait +/// +/// Returns `true` when `completed` becomes true, `false` on timeout, and panics on +/// any other libusb error. +fn poll_completed(ctx: &impl UsbContext, timeout: Duration, completed: &AtomicBool) -> bool { + use ffi::constants::LIBUSB_ERROR_TIMEOUT; + + let deadline = Instant::now() + timeout; + + let mut err = 0; + while err == 0 && !completed.load(Ordering::SeqCst) && deadline > Instant::now() { + let remaining = deadline.saturating_duration_since(Instant::now()); + let timeval = libc::timeval { + tv_sec: remaining.as_secs().try_into().unwrap(), + tv_usec: remaining.subsec_micros().try_into().unwrap(), + }; + unsafe { + if ffi::libusb_try_lock_events(ctx.as_raw()) == 0 { + if !completed.load(Ordering::SeqCst) + && ffi::libusb_event_handling_ok(ctx.as_raw()) != 0 + { + err = ffi::libusb_handle_events_locked(ctx.as_raw(), &timeval as *const _); + } + ffi::libusb_unlock_events(ctx.as_raw()); + } else { + ffi::libusb_lock_event_waiters(ctx.as_raw()); + if !completed.load(Ordering::SeqCst) + && ffi::libusb_event_handler_active(ctx.as_raw()) != 0 + { + ffi::libusb_wait_for_event(ctx.as_raw(), &timeval as *const _); + } + ffi::libusb_unlock_event_waiters(ctx.as_raw()); + } + } + } + + match err { + 0 => completed.load(Ordering::SeqCst), + LIBUSB_ERROR_TIMEOUT => false, + _ => panic!( + "Error {} when polling transfers: {}", + err, + unsafe { std::ffi::CStr::from_ptr(ffi::libusb_strerror(err)) }.to_string_lossy() + ), + } +}