From 186526f2fe0f9f6f4c77d8a13273b11ebd16d767 Mon Sep 17 00:00:00 2001 From: "S.J.R. van Schaik" Date: Fri, 4 Nov 2022 16:14:35 -0400 Subject: [PATCH 1/8] Provide asynchronous interface to rusb in rusb-async --- Cargo.toml | 2 +- rusb-async/Cargo.toml | 26 + rusb-async/examples/read_async.rs | 50 ++ rusb-async/src/context.rs | 70 +++ rusb-async/src/lib.rs | 5 + rusb-async/src/transfer.rs | 865 ++++++++++++++++++++++++++++++ 6 files changed, 1017 insertions(+), 1 deletion(-) create mode 100644 rusb-async/Cargo.toml create mode 100644 rusb-async/examples/read_async.rs create mode 100644 rusb-async/src/context.rs create mode 100644 rusb-async/src/lib.rs create mode 100644 rusb-async/src/transfer.rs diff --git a/Cargo.toml b/Cargo.toml index daf70fa..f977db6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ travis-ci = { repository = "a1ien/rusb" } vendored = [ "libusb1-sys/vendored" ] [workspace] -members = ["libusb1-sys"] +members = ["libusb1-sys", "rusb-async"] [dependencies] libusb1-sys = { path = "libusb1-sys", version = "0.6.0" } diff --git a/rusb-async/Cargo.toml b/rusb-async/Cargo.toml new file mode 100644 index 0000000..a828e38 --- /dev/null +++ b/rusb-async/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "rusb-async" +version = "0.0.1-alpha-2" +edition = "2021" +authors = [ +"Ilya Averyanov ", +"Ryan Butler ", +"Kevin Mehall ", +] + +description = "Rust library for accessing USB devices." +license = "MIT" +homepage = "https://github.com/a1ien/rusb" +repository = "https://github.com/a1ien/rusb.git" +keywords = ["usb", "libusb", "async"] + +[features] +vendored = [ "rusb/vendored" ] + +[dependencies] +async-trait = "0.1" +rusb = { path = "..", version = "0.9.1" } +libc = "0.2" + +[dev-dependencies] +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } diff --git a/rusb-async/examples/read_async.rs b/rusb-async/examples/read_async.rs new file mode 100644 index 0000000..07c63d9 --- /dev/null +++ b/rusb-async/examples/read_async.rs @@ -0,0 +1,50 @@ +use rusb::UsbContext as _; +use rusb_async::{Context, DeviceHandleExt as _}; + +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; + +const BUF_SIZE: usize = 64; + +fn convert_argument(input: &str) -> u16 { + if input.starts_with("0x") { + return u16::from_str_radix(input.trim_start_matches("0x"), 16).unwrap(); + } + u16::from_str_radix(input, 10) + .expect("Invalid input, be sure to add `0x` for hexadecimal values.") +} + +#[tokio::main] +async fn main() { + let args: Vec = std::env::args().collect(); + + if args.len() < 4 { + eprintln!("Usage: read_async "); + return; + } + + let vid = convert_argument(args[1].as_ref()); + let pid = convert_argument(args[2].as_ref()); + let endpoint: u8 = FromStr::from_str(args[3].as_ref()).unwrap(); + + let ctx = Context::new().expect("Could not initialize libusb"); + let device = Arc::new( + ctx.open_device_with_vid_pid(vid, pid) + .expect("Could not find device"), + ); + + let timeout = Duration::from_secs(10); + let mut buffer = Vec::with_capacity(BUF_SIZE); + + loop { + let (bytes, n) = device + .read_bulk_async(endpoint, buffer, timeout) + .await + .expect("Failed to submit transfer"); + + println!("Got data: {} {:?}", n, &bytes[..n]); + + buffer = bytes; + } +} diff --git a/rusb-async/src/context.rs b/rusb-async/src/context.rs new file mode 100644 index 0000000..08a869c --- /dev/null +++ b/rusb-async/src/context.rs @@ -0,0 +1,70 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread::JoinHandle; +use std::time::Duration; + +use rusb::{Error, UsbContext}; +use rusb::ffi::*; + +struct EventThread { + thread: Option>>, + should_quit: Arc, +} + +impl EventThread { + fn new(context: &mut rusb::Context) -> Self { + let thread_context = context.clone(); + let tx = Arc::new(AtomicBool::new(false)); + let rx = tx.clone(); + + let thread = std::thread::spawn(move || -> Result<(), Error> { + while !rx.load(Ordering::SeqCst) { + thread_context.handle_events(Some(Duration::from_millis(0)))?; + } + + Ok(()) + }); + + Self { + thread: Some(thread), + should_quit: tx, + } + } +} + +impl Drop for EventThread { + fn drop(&mut self) { + self.should_quit.store(true, Ordering::SeqCst); + + let _ = self.thread + .take() + .map(|thread| thread.join()); + } +} + +/// A `libusb` context with a dedicated thread to handle events in the background. +#[derive(Clone)] +pub struct Context { + inner: rusb::Context, + _thread: Arc, +} + +impl Context { + /// Opens a new `libusb` context and spawns a thread to handle events in the background for + /// that context. + pub fn new() -> Result { + let mut inner = rusb::Context::new()?; + let thread = EventThread::new(&mut inner); + + Ok(Self { + inner, + _thread: Arc::new(thread), + }) + } +} + +impl UsbContext for Context { + fn as_raw(&self) -> *mut libusb_context { + self.inner.as_raw() + } +} diff --git a/rusb-async/src/lib.rs b/rusb-async/src/lib.rs new file mode 100644 index 0000000..a4754ac --- /dev/null +++ b/rusb-async/src/lib.rs @@ -0,0 +1,5 @@ +pub mod context; +pub mod transfer; + +pub use crate::context::Context; +pub use crate::transfer::{CancellationToken, DeviceHandleExt, Transfer}; diff --git a/rusb-async/src/transfer.rs b/rusb-async/src/transfer.rs new file mode 100644 index 0000000..6a16fde --- /dev/null +++ b/rusb-async/src/transfer.rs @@ -0,0 +1,865 @@ +use std::convert::TryInto; +use std::ffi::c_void; +use std::future::Future; +use std::pin::Pin; +use std::ptr::NonNull; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; +use std::time::Duration; + +use async_trait::async_trait; +use libc::{c_int, c_uint}; +use rusb::{DeviceHandle, Error, UsbContext}; +use rusb::constants::*; +use rusb::ffi::*; + +const LIBUSB_TRANSFER_ACTIVE: c_int = -1; + +fn check_transfer_error(status: c_int) -> Result<(), Error> { + if status < 0 { + Err(match status { + LIBUSB_ERROR_NO_DEVICE => Error::NoDevice, + LIBUSB_ERROR_BUSY => Error::Busy, + LIBUSB_ERROR_NOT_SUPPORTED => Error::NotSupported, + LIBUSB_ERROR_INVALID_PARAM => Error::InvalidParam, + _ => Error::Other, + }) + } else { + Ok(()) + } +} + +struct WrappedTransfer(NonNull); + +// SAFETY: only used behind an `Arc>`. +unsafe impl Send for WrappedTransfer {} + +struct InnerTransfer { + transfer: WrappedTransfer, + _context: T, + status: c_int, + actual_length: c_int, + waker: Option, +} + +impl InnerTransfer { + fn new(context: T) -> Result { + let transfer = unsafe { libusb_alloc_transfer(0) }; + + if transfer.is_null() { + return Err(Error::NoMem); + } + + Ok(Self { + // SAFETY: transfer is not NULL. + transfer: unsafe { WrappedTransfer(NonNull::new_unchecked(transfer)) }, + _context: context, + status: LIBUSB_TRANSFER_ACTIVE, + actual_length: -1, + waker: None, + }) + } + + fn as_ptr(&mut self) -> *mut libusb_transfer { + self.transfer.0.as_ptr() + } +} + +impl Drop for InnerTransfer { + fn drop(&mut self) { + // SAFETY: transfer points to a valid libusb_transfer struct. + unsafe { libusb_free_transfer(self.transfer.0.as_ptr()) }; + } +} + +extern "system" fn transfer_finished( + transfer_ptr: *mut libusb_transfer, +) { + if transfer_ptr.is_null() { + return; + } + + // SAFETY: transfer_ptr is not NULL. + let transfer: &mut libusb_transfer = unsafe { &mut *transfer_ptr }; + let user_data = transfer.user_data; + + if user_data.is_null() { + return; + } + + // SAFETY: user_data is not NULL and the only user always passes a valid `Arc>>`. + let inner = unsafe { Arc::from_raw(user_data as *mut Mutex>) }; + let mut inner = inner.lock().unwrap(); + + inner.status = transfer.status; + inner.actual_length = transfer.actual_length; + + if let Some(waker) = inner.waker.take() { + waker.wake() + } +} + +/// Represents a cancellation token for the [`Transfer`]. This allows the user to cancel the USB +/// transfer while it is still pending. +#[derive(Clone)] +pub struct CancellationToken { + inner: Arc>>, +} + +impl CancellationToken { + /// Asynchronously cancels the pending USB transfer. This function returns immediately, but + /// this does not indicate that the cancellation is complete. Instead this will unblock the + /// task awaiting the [`Transfer`] future and cause it to return [`Error::Interrupted`]. + pub fn cancel(&self) { + let mut inner = self.inner.lock().unwrap(); + let ptr = inner.as_ptr(); + + if inner.status == LIBUSB_TRANSFER_ACTIVE { + // SAFETY: the transfer is guarded by `Arc>` and we only cancel the transfer + // if it is still active. + unsafe { + libusb_cancel_transfer(ptr); + } + } + } +} + +/// Represents a submitted USB transfer that can be polled until completion, in which case it will +/// return the ownership of the associated buffer. In the transfer got cancelled, this returns +/// [`Error::Interrupted`]. +pub struct Transfer { + inner: Arc>>, + _context: T, + buffer: Arc>>>>, +} + +impl Drop for Transfer { + fn drop(&mut self) { + let mut inner = self.inner.lock().unwrap(); + let ptr = inner.as_ptr(); + + inner.waker = None; + + if inner.status == LIBUSB_TRANSFER_ACTIVE { + // SAFETY: the transfer is guarded by `Arc>` and we only cancel the transfer + // if it is still active. + unsafe { + libusb_cancel_transfer(ptr); + } + } + } +} + +impl Future for Transfer { + type Output = Result<(Vec, usize), (Vec, Error)>; + + fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { + let mut inner = self.inner.lock().unwrap(); + + // The transfer has not been completed, cancelled or errored out. Clone the waker and + // return that the transfer is still pending. + if inner.status == LIBUSB_TRANSFER_ACTIVE { + inner.waker = Some(ctx.waker().clone()); + + return Poll::Pending; + } + + // At this point it is safe to claim ownership of the buffer since the transfer_finished + // callback has been called as the transfer has been completed, cancelled or errored out. + // + // In addition, `Future::poll()` should not be called a second time after it returns + // `Poll::Ready`. Thus, it is safe to panic. + let buffer = self.buffer + .lock() + .unwrap() + .take() + .map(|buffer| Pin::>::into_inner(buffer).into_vec()) + .unwrap(); + + // The transfer completed. + if inner.status == LIBUSB_TRANSFER_COMPLETED { + return Poll::Ready(Ok((buffer, inner.actual_length as usize))); + } + + // The transfer has either been cancelled or errored out. + let e = match inner.status { + LIBUSB_TRANSFER_TIMED_OUT => Error::Timeout, + LIBUSB_TRANSFER_CANCELLED => Error::Interrupted, + _ => Error::Other, + }; + + return Poll::Ready(Err((buffer, e))); + } +} + +impl Transfer { + /// Constructs a new bulk transfer to transfer data to/from the bulk endpoint with the address + /// given by the `endpoint` parameter and fills `data` with any data received from the endpoint + /// or writes the contents of `data` to the endpoint depending on the direction of the + /// endpoint. The transfer will claim ownership of `data` until the request completes, gets + /// cancelled or errors out, upon which the ownership of `data` is given back to the task + /// awaiting this transfer. + /// + /// The function blocks up to the amount of time specified by `timeout`. Minimal `timeout` is 1 + /// milliseconds, anything smaller will result in an infinite block. + /// + /// If the return value is `Ok((data, n))`, then `data` is populated with `n` bytes of data + /// received from the endpoint for readable endpoints. Otherwise for writeable endpoints, `n` + /// bytes of `data` were written to the endpoint. + /// + /// ## Errors + /// + /// If this function encounters any form of error while fulfilling the transfer request, an + /// error variant will be returned. If an error variant is returned, no bytes were transferred. + /// + /// The errors returned by polling this transfer include: + /// + /// * `InvalidParam` if the endpoint is not an output endpoint. + /// * `Timeout` if the transfer timed out. + /// * `Interrupted` if the transfer was cancelled. + /// * `Pipe` if the endpoint halted. + /// * `NoDevice` if the device has been disconnected. + /// * `Io` if the transfer encountered an I/O error. + pub fn new_bulk_transfer( + device: &DeviceHandle, + endpoint: u8, + data: Vec, + timeout: Duration, + ) -> Result, Error)> { + let context = device.context().clone(); + let device = unsafe { NonNull::new_unchecked(device.as_raw()) }; + + let max_size = data.len() as i32; + let timeout = timeout.as_millis() as c_uint; + + let mut inner = match InnerTransfer::new(context.clone()) { + Ok(inner) => inner, + Err(e) => return Err((data, e)), + }; + let transfer_ptr = inner.as_ptr(); + let transfer = Arc::new(Mutex::new(inner)); + + let mut buffer: Pin> = data.into_boxed_slice().into(); + + let result = { + let state_ptr = Arc::into_raw(transfer.clone()) as *mut c_void; + let buffer: *mut u8 = buffer.as_mut_ptr(); + + unsafe { + libusb_fill_bulk_transfer( + transfer_ptr, + device.as_ptr(), + endpoint, + buffer, + max_size, + transfer_finished:: as _, + state_ptr, + timeout, + ); + } + + unsafe { libusb_submit_transfer(transfer_ptr) } + }; + + if let Err(e) = check_transfer_error(result) { + return Err((Pin::>::into_inner(buffer).into_vec(), e)); + } + + Ok(Self { + inner: transfer, + _context: context, + buffer: Arc::new(Mutex::new(Some(buffer))), + }) + } + + /// Construct a new control transfer to transfer data to/from the device using a control + /// transfer and fills `data` with any data received during the transfer or writes the contents + /// of `data` to the device depending on the direction of `request_type`. The transfer will + /// claim ownership of `data` until the request completes, gets cancelled or errors out, upon + /// which the ownership of `data` is given back to the task awaiting this transfer. + /// + /// The parameters `request_type`, `request`, `value` and `index` specify the fields of the + /// control transfer setup packet (`bmRequestType`, `bmRequest`, `wValue` and `wIndex` + /// respectively). The values for each of these parameters shall be given in host-endian byte + /// order. The value for the `request_type` parameter can be built with the helper function + /// [request_type()](fn.request_type.html). The meaning of the other parameters depends on the + /// type of control request. + /// + /// As these parameters are stored in the first [`LIBUSB_CONTROL_SETUP_SIZE`] bytes of the + /// control request, the buffer must be at least [`LIBUSB_CONTROL_SETUP_SIZE`] bytes in size. + /// + /// The function blocks up to the amount of time specified by `timeout`. Minimal `timeout` is 1 + /// milliseconds, anything smaller will result in an infinite block. + /// + /// If the return value is `Ok((data, n))`, then `data` is populated with `n` bytes of data + /// received from the device for read requests. This data can be found starting at offset + /// [`LIBUSB_CONTROL_SETUP_SIZE`] in `data`. Otherwise for write requests, `n` bytes of `data` + /// were written to the device. + /// + /// ## Errors + /// + /// If this function encounters any form of error while fulfilling the transfer request, an + /// error variant will be returned. If an error variant is returned, no bytes were transferred. + /// + /// The errors returned by this function include: + /// + /// * `InvalidParam` if buffer is not at least [`LIBUSB_CONTROL_SETUP_SIZE`] bytes in size. + /// * `Timeout` if the transfer timed out. + /// * `Interrupted` if the transfer was cancelled. + /// * `Pipe` if the endpoint halted. + /// * `NoDevice` if the device has been disconnected. + /// * `Io` if the transfer encountered an I/O error. + /// + /// [`LIBUSB_CONTROL_SETUP_SIZE`]: rusb::constants::LIBUSB_CONTROL_SETUP_SIZE + pub fn new_control_transfer( + device: &DeviceHandle, + request_type: u8, + request: u8, + value: u16, + index: u16, + data: Vec, + timeout: Duration, + ) -> Result, Error)> { + if data.len() < LIBUSB_CONTROL_SETUP_SIZE { + return Err((data, Error::InvalidParam)); + } + + let context = device.context().clone(); + // SAFETY: device.as_raw() must be a valid pointer. + let device = unsafe { NonNull::new_unchecked(device.as_raw()) }; + + let max_size: u16 = match (data.len() - LIBUSB_CONTROL_SETUP_SIZE).try_into() { + Ok(n) => n, + Err(_) => return Err((data, Error::InvalidParam)), + }; + + let timeout = timeout.as_millis() as c_uint; + + let mut inner = match InnerTransfer::new(context.clone()) { + Ok(inner) => inner, + Err(e) => return Err((data, e)), + }; + let transfer_ptr = inner.as_ptr(); + let transfer = Arc::new(Mutex::new(inner)); + + let mut buffer: Pin> = data.into_boxed_slice().into(); + + let result = { + let state_ptr = Arc::into_raw(transfer.clone()) as *mut c_void; + let buffer: *mut u8 = buffer.as_mut_ptr(); + + // SAFETY: buffer has at least LIBUSB_CONTROL_SETUP_SIZE bytes and is a valid pointer. + unsafe { + libusb_fill_control_setup( + buffer, + request_type, + request, + value, + index, + max_size + ); + } + + // SAFETY: transfer_ptr, device.as_ptr(), buffer, state_ptr are all valid. This is the + // only user of transfer_finished and transfer_finished gets state_ptr which is of the + // type `Arc>>` as expected. These pointers remain valid until + // `transfer_finished` gets called. + unsafe { + libusb_fill_control_transfer( + transfer_ptr, + device.as_ptr(), + buffer, + transfer_finished:: as _, + state_ptr, + timeout, + ); + } + + // SAFETY: we ensure that transfer_ptr and buffer are valid until completion, + // cancellation or an error occurs. In addition, as buffer is `Pin`, it is guaranteed + // to not move around while the transfer is in progress. + unsafe { libusb_submit_transfer(transfer_ptr) } + }; + + if let Err(e) = check_transfer_error(result) { + return Err((Pin::>::into_inner(buffer).into_vec(), e)); + } + + Ok(Self { + inner: transfer, + _context: context, + buffer: Arc::new(Mutex::new(Some(buffer))), + }) + } + + /// Constructs a new interrupt transfer to transfer data to/from the interrupt endpoint with + /// the address given by the `endpoint` parameter and fills `data` with any data received from + /// the endpoint or writes the contents of `data` to the endpoint depending on the direction of + /// the endpoint. The transfer will claim ownership of `data` until the request completes, gets + /// cancelled or errors out, upon which the ownership of `data` is given back to the task + /// awaiting this transfer. + /// + /// The function blocks up to the amount of time specified by `timeout`. Minimal `timeout` is 1 + /// milliseconds, anything smaller will result in an infinite block. + /// + /// If the return value is `Ok((data, n))`, then `data` is populated with `n` bytes of data + /// received from the endpoint for readable endpoints. Otherwise for writeable endpoints, `n` + /// bytes of `data` were written to the endpoint. + /// + /// ## Errors + /// + /// If this function encounters any form of error while fulfilling the transfer request, an + /// error variant will be returned. If an error variant is returned, no bytes were transferred. + /// + /// The errors returned by polling this transfer include: + /// + /// * `InvalidParam` if the endpoint is not an output endpoint. + /// * `Timeout` if the transfer timed out. + /// * `Interrupted` if the transfer was cancelled. + /// * `Pipe` if the endpoint halted. + /// * `NoDevice` if the device has been disconnected. + /// * `Io` if the transfer encountered an I/O error. + pub fn new_interrupt_transfer( + device: &DeviceHandle, + endpoint: u8, + data: Vec, + timeout: Duration, + ) -> Result, Error)> { + let context = device.context().clone(); + let device = unsafe { NonNull::new_unchecked(device.as_raw()) }; + + let max_size = data.len() as i32; + let timeout = timeout.as_millis() as c_uint; + + let mut inner = match InnerTransfer::new(context.clone()) { + Ok(inner) => inner, + Err(e) => return Err((data, e)), + }; + let transfer_ptr = inner.as_ptr(); + let transfer = Arc::new(Mutex::new(inner)); + + let mut buffer: Pin> = data.into_boxed_slice().into(); + + let result = { + let state_ptr = Arc::into_raw(transfer.clone()) as *mut c_void; + let buffer: *mut u8 = buffer.as_mut_ptr(); + + unsafe { + libusb_fill_interrupt_transfer( + transfer_ptr, + device.as_ptr(), + endpoint, + buffer, + max_size, + transfer_finished:: as _, + state_ptr, + timeout, + ); + } + + unsafe { libusb_submit_transfer(transfer_ptr) } + }; + + if let Err(e) = check_transfer_error(result) { + return Err((Pin::>::into_inner(buffer).into_vec(), e)); + } + + Ok(Self { + inner: transfer, + _context: context, + buffer: Arc::new(Mutex::new(Some(buffer))), + }) + } + + /// Constructs a [`CancellationToken`] that can be used to cancel the USB transfer. + pub fn cancellation_token(&self) -> CancellationToken { + CancellationToken { + inner: self.inner.clone(), + } + } +} + +#[async_trait] +pub trait DeviceHandleExt { + /// Asynchronously reads from a bulk endpoint. + /// + /// This function attempts to asynchronously read from the bulk endpoint with the address given + /// by the `endpoint` parameter and fills `data` with any data received from the endpoint. This + /// function will claim ownership of `data` until the request completes, gets cancelled or + /// errors out, upon which the ownership of `data` is given back to the caller of this function. + /// + /// The function blocks up to the amount of time specified by `timeout`. Minimal `timeout` is 1 + /// milliseconds, anything smaller will result in an infinite block. + /// + /// In case you want the ability to cancel the USB transfer, consider using + /// [`Transfer::new_bulk_transfer()`] instead. + /// + /// If the return value is `Ok((data, n))`, then `data` is populated with `n` bytes of data + /// received from the endpoint. + /// + /// ## Errors + /// + /// If this function encounters any form of error while fulfilling the transfer request, an + /// error variant will be returned. If an error variant is returned, no bytes were read. + /// + /// The errors returned by this function include: + /// + /// * `InvalidParam` if the endpoint is not an input endpoint. + /// * `Timeout` if the transfer timed out. + /// * `Interrupted` if the transfer was cancelled. + /// * `Pipe` if the endpoint halted. + /// * `NoDevice` if the device has been disconnected. + /// * `Io` if the transfer encountered an I/O error. + async fn read_bulk_async( + &self, + endpoint: u8, + data: Vec, + timeout: Duration, + ) -> Result<(Vec, usize), (Vec, Error)>; + + /// Asynchronously reads data using a control transfer. + /// + /// This function attempts to asynchronously read data from the device using a control transfer + /// and fills `data` with any data received during the transfer. This function will claim + /// ownership of `data` until the request completes, gets cancelled or errors out, upon which + /// the ownership of `data` is given back to the caller of this function. + /// + /// The parameters `request_type`, `request`, `value` and `index` specify the fields of the + /// control transfer setup packet (`bmRequestType`, `bmRequest`, `wValue` and `wIndex` + /// respectively). The values for each of these parameters shall be given in host-endian byte + /// order. The value for the `request_type` parameter can be built with the helper function + /// [request_type()](fn.request_type.html). The meaning of the other parameters depends on the + /// type of control request. + /// + /// As these parameters are stored in the first [`LIBUSB_CONTROL_SETUP_SIZE`] bytes of the + /// control request, the buffer must be at least [`LIBUSB_CONTROL_SETUP_SIZE`] bytes in size. + /// + /// The function blocks up to the amount of time specified by `timeout`. Minimal `timeout` is 1 + /// milliseconds, anything smaller will result in an infinite block. + /// + /// In case you want the ability to cancel the USB transfer, consider using + /// [`Transfer::new_control_transfer()`] instead. + /// + /// If the return value is `Ok((data, n))`, then `data` is populated with `n` bytes of data + /// received from the device. This data can be found starting at offset + /// [`LIBUSB_CONTROL_SETUP_SIZE`] in `data`. + /// + /// ## Errors + /// + /// If this function encounters any form of error while fulfilling the transfer request, an + /// error variant will be returned. If an error variant is returned, no bytes were read. + /// + /// The errors returned by this function include: + /// + /// * `InvalidParam` if the `request_type` does not specify a read transfer. + /// * `InvalidParam` if buffer is not at least [`LIBUSB_CONTROL_SETUP_SIZE`] bytes in size. + /// * `Timeout` if the transfer timed out. + /// * `Interrupted` if the transfer was cancelled. + /// * `Pipe` if the endpoint halted. + /// * `NoDevice` if the device has been disconnected. + /// * `Io` if the transfer encountered an I/O error. + /// + /// [`LIBUSB_CONTROL_SETUP_SIZE`]: rusb::constants::LIBUSB_CONTROL_SETUP_SIZE + async fn read_control_async( + &self, + request_type: u8, + request: u8, + value: u16, + index: u16, + data: Vec, + timeout: Duration, + ) -> Result<(Vec, usize), (Vec, Error)>; + + /// Asynchronously reads from an interrupt endpoint. + /// + /// This function attempts to asynchronously read from the interrupt endpoint with the address + /// given by the `endpoint` parameter and fills `data` with any data received from the endpoint. + /// This function will claim ownership of `data` until the request completes, gets cancelled or + /// errors out, upon which the ownership of `data` is given back to the caller of this function. + /// + /// The function blocks up to the amount of time specified by `timeout`. Minimal `timeout` is 1 + /// milliseconds, anything smaller will result in an infinite block. + /// + /// In case you want the ability to cancel the USB transfer, consider using + /// [`Transfer::new_bulk_transfer()`] instead. + /// + /// If the return value is `Ok((data, n))`, then `data` is populated with `n` bytes of data + /// received from the endpoint. + /// + /// ## Errors + /// + /// If this function encounters any form of error while fulfilling the transfer request, an + /// error variant will be returned. If an error variant is returned, no bytes were read. + /// + /// The errors returned by this function include: + /// + /// * `InvalidParam` if the endpoint is not an input endpoint. + /// * `Timeout` if the transfer timed out. + /// * `Interrupted` if the transfer was cancelled. + /// * `Pipe` if the endpoint halted. + /// * `NoDevice` if the device has been disconnected. + /// * `Io` if the transfer encountered an I/O error. + async fn read_interrupt_async( + &self, + endpoint: u8, + data: Vec, + timeout: Duration, + ) -> Result<(Vec, usize), (Vec, Error)>; + + /// Asynchronously writes to a bulk endpoint. + /// + /// This function attempts to asynchronously write the contents of `data` to the bulk endpoint + /// with the address given by the `endpoint` parameter. This function will claim ownership of + /// `data` until the request completes, gets cancelled or errors out, upon which the ownership + /// of `data` is given back to the caller of this function. + /// + /// The function blocks up to the amount of time specified by `timeout`. Minimal `timeout` is 1 + /// milliseconds, anything smaller will result in an infinite block. + /// + /// In case you want the ability to cancel the USB transfer, consider using + /// [`Transfer::new_bulk_transfer()`] instead. + /// + /// If the return value is `Ok((_, n))`, then `n` bytes of `data` were written to the endpoint. + /// + /// ## Errors + /// + /// If this function encounters any form of error while fulfilling the transfer request, an + /// error variant will be returned. If an error variant is returned, no bytes were written. + /// + /// The errors returned by this function include: + /// + /// * `InvalidParam` if the endpoint is not an output endpoint. + /// * `Timeout` if the transfer timed out. + /// * `Interrupted` if the transfer was cancelled. + /// * `Pipe` if the endpoint halted. + /// * `NoDevice` if the device has been disconnected. + /// * `Io` if the transfer encountered an I/O error. + async fn write_bulk_async( + &self, + endpoint: u8, + data: Vec, + timeout: Duration, + ) -> Result<(Vec, usize), (Vec, Error)>; + + /// Asynchronously writes data using a control transfer. + /// + /// This function attempts to asynchronously write data to the device using a control transfer + /// and writes the contents of `data` during the transfer. This function will claim ownership + /// of `data` until the request completes, gets cancelled or errors out, upon which the + /// ownership of `data` is given back to the caller of this function. + /// + /// The parameters `request_type`, `request`, `value` and `index` specify the fields of the + /// control transfer setup packet (`bmRequestType`, `bmRequest`, `wValue` and `wIndex` + /// respectively). The values for each of these parameters shall be given in host-endian byte + /// order. The value for the `request_type` parameter can be built with the helper function + /// [request_type()](fn.request_type.html). The meaning of the other parameters depends on the + /// type of control request. + /// + /// As these parameters are stored in the first [`LIBUSB_CONTROL_SETUP_SIZE`] bytes of the + /// control request, the buffer must be at least [`LIBUSB_CONTROL_SETUP_SIZE`] bytes in size. + /// The actual data must start at offset [`LIBUSB_CONTROL_SETUP_SIZE`]. + /// + /// The function blocks up to the amount of time specified by `timeout`. Minimal `timeout` is 1 + /// milliseconds, anything smaller will result in an infinite block. + /// + /// In case you want the ability to cancel the USB transfer, consider using + /// [`Transfer::new_control_transfer()`] instead. + /// + /// If the return value is `Ok((_, n))`, then `n` bytes have been written to the device. + /// + /// ## Errors + /// + /// If this function encounters any form of error while fulfilling the transfer request, an + /// error variant will be returned. If an error variant is returned, no bytes were written. + /// + /// The errors returned by this function include: + /// + /// * `InvalidParam` if the `request_type` does not specify a write transfer. + /// * `InvalidParam` if buffer is not at least [`LIBUSB_CONTROL_SETUP_SIZE`] bytes in size. + /// * `Timeout` if the transfer timed out. + /// * `Interrupted` if the transfer was cancelled. + /// * `Pipe` if the endpoint halted. + /// * `NoDevice` if the device has been disconnected. + /// * `Io` if the transfer encountered an I/O error. + /// + /// [`LIBUSB_CONTROL_SETUP_SIZE`]: rusb::constants::LIBUSB_CONTROL_SETUP_SIZE + async fn write_control_async( + &self, + request_type: u8, + request: u8, + value: u16, + index: u16, + data: Vec, + timeout: Duration, + ) -> Result<(Vec, usize), (Vec, Error)>; + + /// Asynchronously writes to an interrupt endpoint. + /// + /// This function attempts to asynchronously write the contents of `data` to the interrupt + /// endpoint with the address given by the `endpoint` parameter. This function will claim + /// ownership of `data` until the request completes, gets cancelled or errors out, upon which + /// the ownership of `data` is given back to the caller of this function. + /// + /// The function blocks up to the amount of time specified by `timeout`. Minimal `timeout` is 1 + /// milliseconds, anything smaller will result in an infinite block. + /// + /// In case you want the ability to cancel the USB transfer, consider using + /// [`Transfer::new_interrupt_transfer()`] instead. + /// + /// If the return value is `Ok((_, n))`, then `n` bytes of `data` were written to the endpoint. + /// + /// ## Errors + /// + /// If this function encounters any form of error while fulfilling the transfer request, an + /// error variant will be returned. If an error variant is returned, no bytes were written. + /// + /// The errors returned by this function include: + /// + /// * `InvalidParam` if the endpoint is not an output endpoint. + /// * `Timeout` if the transfer timed out. + /// * `Interrupted` if the transfer was cancelled. + /// * `Pipe` if the endpoint halted. + /// * `NoDevice` if the device has been disconnected. + /// * `Io` if the transfer encountered an I/O error. + async fn write_interrupt_async( + &self, + endpoint: u8, + data: Vec, + timeout: Duration, + ) -> Result<(Vec, usize), (Vec, Error)>; +} + +#[async_trait] +impl DeviceHandleExt for DeviceHandle { + async fn read_bulk_async( + &self, + endpoint: u8, + data: Vec, + timeout: Duration, + ) -> Result<(Vec, usize), (Vec, Error)> { + if endpoint & LIBUSB_ENDPOINT_DIR_MASK != LIBUSB_ENDPOINT_IN { + return Err((data, Error::InvalidParam)); + } + + let transfer = Transfer::new_bulk_transfer( + self, + endpoint, + data, + timeout, + )?; + + Ok(transfer.await?) + } + + async fn read_control_async( + &self, + request_type: u8, + request: u8, + value: u16, + index: u16, + data: Vec, + timeout: Duration, + ) -> Result<(Vec, usize), (Vec, Error)> { + if request_type & LIBUSB_ENDPOINT_DIR_MASK != LIBUSB_ENDPOINT_IN { + return Err((data, Error::InvalidParam)); + } + + let transfer = Transfer::new_control_transfer( + self, + request_type, + request, + value, + index, + data, + timeout, + )?; + + Ok(transfer.await?) + } + + async fn read_interrupt_async( + &self, + endpoint: u8, + data: Vec, + timeout: Duration, + ) -> Result<(Vec, usize), (Vec, Error)> { + if endpoint & LIBUSB_ENDPOINT_DIR_MASK != LIBUSB_ENDPOINT_IN { + return Err((data, Error::InvalidParam)); + } + + let transfer = Transfer::new_interrupt_transfer( + self, + endpoint, + data, + timeout, + )?; + + Ok(transfer.await?) + } + + async fn write_bulk_async( + &self, + endpoint: u8, + data: Vec, + timeout: Duration, + ) -> Result<(Vec, usize), (Vec, Error)> { + if endpoint & LIBUSB_ENDPOINT_DIR_MASK != LIBUSB_ENDPOINT_OUT { + return Err((data, Error::InvalidParam)); + } + + let transfer = Transfer::new_bulk_transfer( + self, + endpoint, + data, + timeout, + )?; + + Ok(transfer.await?) + } + + async fn write_control_async( + &self, + request_type: u8, + request: u8, + value: u16, + index: u16, + data: Vec, + timeout: Duration, + ) -> Result<(Vec, usize), (Vec, Error)> { + if request_type & LIBUSB_ENDPOINT_DIR_MASK != LIBUSB_ENDPOINT_OUT { + return Err((data, Error::InvalidParam)); + } + + let transfer = Transfer::new_control_transfer( + self, + request_type, + request, + value, + index, + data, + timeout, + )?; + + Ok(transfer.await?) + } + + async fn write_interrupt_async( + &self, + endpoint: u8, + data: Vec, + timeout: Duration, + ) -> Result<(Vec, usize), (Vec, Error)> { + if endpoint & LIBUSB_ENDPOINT_DIR_MASK != LIBUSB_ENDPOINT_OUT { + return Err((data, Error::InvalidParam)); + } + + let transfer = Transfer::new_interrupt_transfer( + self, + endpoint, + data, + timeout, + )?; + + Ok(transfer.await?) + } +} From 2fdb5c75e40a2dae1f33596f3aed56d5cf9d2750 Mon Sep 17 00:00:00 2001 From: "S.J.R. van Schaik" Date: Fri, 4 Nov 2022 16:25:28 -0400 Subject: [PATCH 2/8] run cargo fmt --- rusb-async/src/context.rs | 8 +++---- rusb-async/src/transfer.rs | 46 ++++++++------------------------------ 2 files changed, 12 insertions(+), 42 deletions(-) diff --git a/rusb-async/src/context.rs b/rusb-async/src/context.rs index 08a869c..b36e2d6 100644 --- a/rusb-async/src/context.rs +++ b/rusb-async/src/context.rs @@ -1,10 +1,10 @@ -use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::thread::JoinHandle; use std::time::Duration; -use rusb::{Error, UsbContext}; use rusb::ffi::*; +use rusb::{Error, UsbContext}; struct EventThread { thread: Option>>, @@ -36,9 +36,7 @@ impl Drop for EventThread { fn drop(&mut self) { self.should_quit.store(true, Ordering::SeqCst); - let _ = self.thread - .take() - .map(|thread| thread.join()); + let _ = self.thread.take().map(|thread| thread.join()); } } diff --git a/rusb-async/src/transfer.rs b/rusb-async/src/transfer.rs index 6a16fde..f8bd597 100644 --- a/rusb-async/src/transfer.rs +++ b/rusb-async/src/transfer.rs @@ -9,9 +9,9 @@ use std::time::Duration; use async_trait::async_trait; use libc::{c_int, c_uint}; -use rusb::{DeviceHandle, Error, UsbContext}; use rusb::constants::*; use rusb::ffi::*; +use rusb::{DeviceHandle, Error, UsbContext}; const LIBUSB_TRANSFER_ACTIVE: c_int = -1; @@ -72,9 +72,7 @@ impl Drop for InnerTransfer { } } -extern "system" fn transfer_finished( - transfer_ptr: *mut libusb_transfer, -) { +extern "system" fn transfer_finished(transfer_ptr: *mut libusb_transfer) { if transfer_ptr.is_null() { return; } @@ -169,7 +167,8 @@ impl Future for Transfer { // // In addition, `Future::poll()` should not be called a second time after it returns // `Poll::Ready`. Thus, it is safe to panic. - let buffer = self.buffer + let buffer = self + .buffer .lock() .unwrap() .take() @@ -350,14 +349,7 @@ impl Transfer { // SAFETY: buffer has at least LIBUSB_CONTROL_SETUP_SIZE bytes and is a valid pointer. unsafe { - libusb_fill_control_setup( - buffer, - request_type, - request, - value, - index, - max_size - ); + libusb_fill_control_setup(buffer, request_type, request, value, index, max_size); } // SAFETY: transfer_ptr, device.as_ptr(), buffer, state_ptr are all valid. This is the @@ -741,12 +733,7 @@ impl DeviceHandleExt for DeviceHandle { return Err((data, Error::InvalidParam)); } - let transfer = Transfer::new_bulk_transfer( - self, - endpoint, - data, - timeout, - )?; + let transfer = Transfer::new_bulk_transfer(self, endpoint, data, timeout)?; Ok(transfer.await?) } @@ -787,12 +774,7 @@ impl DeviceHandleExt for DeviceHandle { return Err((data, Error::InvalidParam)); } - let transfer = Transfer::new_interrupt_transfer( - self, - endpoint, - data, - timeout, - )?; + let transfer = Transfer::new_interrupt_transfer(self, endpoint, data, timeout)?; Ok(transfer.await?) } @@ -807,12 +789,7 @@ impl DeviceHandleExt for DeviceHandle { return Err((data, Error::InvalidParam)); } - let transfer = Transfer::new_bulk_transfer( - self, - endpoint, - data, - timeout, - )?; + let transfer = Transfer::new_bulk_transfer(self, endpoint, data, timeout)?; Ok(transfer.await?) } @@ -853,12 +830,7 @@ impl DeviceHandleExt for DeviceHandle { return Err((data, Error::InvalidParam)); } - let transfer = Transfer::new_interrupt_transfer( - self, - endpoint, - data, - timeout, - )?; + let transfer = Transfer::new_interrupt_transfer(self, endpoint, data, timeout)?; Ok(transfer.await?) } From c6c75f7473f3675a62bee3d1bf262af97ada710c Mon Sep 17 00:00:00 2001 From: "S.J.R. van Schaik" Date: Thu, 17 Nov 2022 07:44:10 -0500 Subject: [PATCH 3/8] change transfers to use boxed slices rather than vec --- rusb-async/examples/read_async.rs | 2 +- rusb-async/src/transfer.rs | 89 ++++++++++++++----------------- 2 files changed, 42 insertions(+), 49 deletions(-) diff --git a/rusb-async/examples/read_async.rs b/rusb-async/examples/read_async.rs index 07c63d9..7a9262f 100644 --- a/rusb-async/examples/read_async.rs +++ b/rusb-async/examples/read_async.rs @@ -35,7 +35,7 @@ async fn main() { ); let timeout = Duration::from_secs(10); - let mut buffer = Vec::with_capacity(BUF_SIZE); + let mut buffer = vec![0u8; BUF_SIZE].into_boxed_slice(); loop { let (bytes, n) = device diff --git a/rusb-async/src/transfer.rs b/rusb-async/src/transfer.rs index f8bd597..a3d5bd8 100644 --- a/rusb-async/src/transfer.rs +++ b/rusb-async/src/transfer.rs @@ -128,7 +128,7 @@ impl CancellationToken { pub struct Transfer { inner: Arc>>, _context: T, - buffer: Arc>>>>, + buffer: Arc>>>, } impl Drop for Transfer { @@ -149,7 +149,7 @@ impl Drop for Transfer { } impl Future for Transfer { - type Output = Result<(Vec, usize), (Vec, Error)>; + type Output = Result<(Box<[u8]>, usize), (Box<[u8]>, Error)>; fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { let mut inner = self.inner.lock().unwrap(); @@ -172,7 +172,6 @@ impl Future for Transfer { .lock() .unwrap() .take() - .map(|buffer| Pin::>::into_inner(buffer).into_vec()) .unwrap(); // The transfer completed. @@ -222,9 +221,9 @@ impl Transfer { pub fn new_bulk_transfer( device: &DeviceHandle, endpoint: u8, - data: Vec, + mut data: Box<[u8]>, timeout: Duration, - ) -> Result, Error)> { + ) -> Result, Error)> { let context = device.context().clone(); let device = unsafe { NonNull::new_unchecked(device.as_raw()) }; @@ -238,11 +237,9 @@ impl Transfer { let transfer_ptr = inner.as_ptr(); let transfer = Arc::new(Mutex::new(inner)); - let mut buffer: Pin> = data.into_boxed_slice().into(); - let result = { let state_ptr = Arc::into_raw(transfer.clone()) as *mut c_void; - let buffer: *mut u8 = buffer.as_mut_ptr(); + let buffer: *mut u8 = data.as_mut_ptr(); unsafe { libusb_fill_bulk_transfer( @@ -261,13 +258,13 @@ impl Transfer { }; if let Err(e) = check_transfer_error(result) { - return Err((Pin::>::into_inner(buffer).into_vec(), e)); + return Err((data, e)); } Ok(Self { inner: transfer, _context: context, - buffer: Arc::new(Mutex::new(Some(buffer))), + buffer: Arc::new(Mutex::new(Some(data))), }) } @@ -316,9 +313,9 @@ impl Transfer { request: u8, value: u16, index: u16, - data: Vec, + mut data: Box<[u8]>, timeout: Duration, - ) -> Result, Error)> { + ) -> Result, Error)> { if data.len() < LIBUSB_CONTROL_SETUP_SIZE { return Err((data, Error::InvalidParam)); } @@ -341,11 +338,9 @@ impl Transfer { let transfer_ptr = inner.as_ptr(); let transfer = Arc::new(Mutex::new(inner)); - let mut buffer: Pin> = data.into_boxed_slice().into(); - let result = { let state_ptr = Arc::into_raw(transfer.clone()) as *mut c_void; - let buffer: *mut u8 = buffer.as_mut_ptr(); + let buffer: *mut u8 = data.as_mut_ptr(); // SAFETY: buffer has at least LIBUSB_CONTROL_SETUP_SIZE bytes and is a valid pointer. unsafe { @@ -374,13 +369,13 @@ impl Transfer { }; if let Err(e) = check_transfer_error(result) { - return Err((Pin::>::into_inner(buffer).into_vec(), e)); + return Err((data, e)); } Ok(Self { inner: transfer, _context: context, - buffer: Arc::new(Mutex::new(Some(buffer))), + buffer: Arc::new(Mutex::new(Some(data))), }) } @@ -414,9 +409,9 @@ impl Transfer { pub fn new_interrupt_transfer( device: &DeviceHandle, endpoint: u8, - data: Vec, + mut data: Box<[u8]>, timeout: Duration, - ) -> Result, Error)> { + ) -> Result, Error)> { let context = device.context().clone(); let device = unsafe { NonNull::new_unchecked(device.as_raw()) }; @@ -430,11 +425,9 @@ impl Transfer { let transfer_ptr = inner.as_ptr(); let transfer = Arc::new(Mutex::new(inner)); - let mut buffer: Pin> = data.into_boxed_slice().into(); - let result = { let state_ptr = Arc::into_raw(transfer.clone()) as *mut c_void; - let buffer: *mut u8 = buffer.as_mut_ptr(); + let buffer: *mut u8 = data.as_mut_ptr(); unsafe { libusb_fill_interrupt_transfer( @@ -453,13 +446,13 @@ impl Transfer { }; if let Err(e) = check_transfer_error(result) { - return Err((Pin::>::into_inner(buffer).into_vec(), e)); + return Err((data, e)); } Ok(Self { inner: transfer, _context: context, - buffer: Arc::new(Mutex::new(Some(buffer))), + buffer: Arc::new(Mutex::new(Some(data))), }) } @@ -505,9 +498,9 @@ pub trait DeviceHandleExt { async fn read_bulk_async( &self, endpoint: u8, - data: Vec, + data: Box<[u8]>, timeout: Duration, - ) -> Result<(Vec, usize), (Vec, Error)>; + ) -> Result<(Box<[u8]>, usize), (Box<[u8]>, Error)>; /// Asynchronously reads data using a control transfer. /// @@ -558,9 +551,9 @@ pub trait DeviceHandleExt { request: u8, value: u16, index: u16, - data: Vec, + data: Box<[u8]>, timeout: Duration, - ) -> Result<(Vec, usize), (Vec, Error)>; + ) -> Result<(Box<[u8]>, usize), (Box<[u8]>, Error)>; /// Asynchronously reads from an interrupt endpoint. /// @@ -594,9 +587,9 @@ pub trait DeviceHandleExt { async fn read_interrupt_async( &self, endpoint: u8, - data: Vec, + data: Box<[u8]>, timeout: Duration, - ) -> Result<(Vec, usize), (Vec, Error)>; + ) -> Result<(Box<[u8]>, usize), (Box<[u8]>, Error)>; /// Asynchronously writes to a bulk endpoint. /// @@ -629,9 +622,9 @@ pub trait DeviceHandleExt { async fn write_bulk_async( &self, endpoint: u8, - data: Vec, + data: Box<[u8]>, timeout: Duration, - ) -> Result<(Vec, usize), (Vec, Error)>; + ) -> Result<(Box<[u8]>, usize), (Box<[u8]>, Error)>; /// Asynchronously writes data using a control transfer. /// @@ -681,9 +674,9 @@ pub trait DeviceHandleExt { request: u8, value: u16, index: u16, - data: Vec, + data: Box<[u8]>, timeout: Duration, - ) -> Result<(Vec, usize), (Vec, Error)>; + ) -> Result<(Box<[u8]>, usize), (Box<[u8]>, Error)>; /// Asynchronously writes to an interrupt endpoint. /// @@ -716,9 +709,9 @@ pub trait DeviceHandleExt { async fn write_interrupt_async( &self, endpoint: u8, - data: Vec, + data: Box<[u8]>, timeout: Duration, - ) -> Result<(Vec, usize), (Vec, Error)>; + ) -> Result<(Box<[u8]>, usize), (Box<[u8]>, Error)>; } #[async_trait] @@ -726,9 +719,9 @@ impl DeviceHandleExt for DeviceHandle { async fn read_bulk_async( &self, endpoint: u8, - data: Vec, + data: Box<[u8]>, timeout: Duration, - ) -> Result<(Vec, usize), (Vec, Error)> { + ) -> Result<(Box<[u8]>, usize), (Box<[u8]>, Error)> { if endpoint & LIBUSB_ENDPOINT_DIR_MASK != LIBUSB_ENDPOINT_IN { return Err((data, Error::InvalidParam)); } @@ -744,9 +737,9 @@ impl DeviceHandleExt for DeviceHandle { request: u8, value: u16, index: u16, - data: Vec, + data: Box<[u8]>, timeout: Duration, - ) -> Result<(Vec, usize), (Vec, Error)> { + ) -> Result<(Box<[u8]>, usize), (Box<[u8]>, Error)> { if request_type & LIBUSB_ENDPOINT_DIR_MASK != LIBUSB_ENDPOINT_IN { return Err((data, Error::InvalidParam)); } @@ -767,9 +760,9 @@ impl DeviceHandleExt for DeviceHandle { async fn read_interrupt_async( &self, endpoint: u8, - data: Vec, + data: Box<[u8]>, timeout: Duration, - ) -> Result<(Vec, usize), (Vec, Error)> { + ) -> Result<(Box<[u8]>, usize), (Box<[u8]>, Error)> { if endpoint & LIBUSB_ENDPOINT_DIR_MASK != LIBUSB_ENDPOINT_IN { return Err((data, Error::InvalidParam)); } @@ -782,9 +775,9 @@ impl DeviceHandleExt for DeviceHandle { async fn write_bulk_async( &self, endpoint: u8, - data: Vec, + data: Box<[u8]>, timeout: Duration, - ) -> Result<(Vec, usize), (Vec, Error)> { + ) -> Result<(Box<[u8]>, usize), (Box<[u8]>, Error)> { if endpoint & LIBUSB_ENDPOINT_DIR_MASK != LIBUSB_ENDPOINT_OUT { return Err((data, Error::InvalidParam)); } @@ -800,9 +793,9 @@ impl DeviceHandleExt for DeviceHandle { request: u8, value: u16, index: u16, - data: Vec, + data: Box<[u8]>, timeout: Duration, - ) -> Result<(Vec, usize), (Vec, Error)> { + ) -> Result<(Box<[u8]>, usize), (Box<[u8]>, Error)> { if request_type & LIBUSB_ENDPOINT_DIR_MASK != LIBUSB_ENDPOINT_OUT { return Err((data, Error::InvalidParam)); } @@ -823,9 +816,9 @@ impl DeviceHandleExt for DeviceHandle { async fn write_interrupt_async( &self, endpoint: u8, - data: Vec, + data: Box<[u8]>, timeout: Duration, - ) -> Result<(Vec, usize), (Vec, Error)> { + ) -> Result<(Box<[u8]>, usize), (Box<[u8]>, Error)> { if endpoint & LIBUSB_ENDPOINT_DIR_MASK != LIBUSB_ENDPOINT_OUT { return Err((data, Error::InvalidParam)); } From ab556b0d01e57d4b91c2a656bcda53c56316e8bb Mon Sep 17 00:00:00 2001 From: "S.J.R. van Schaik" Date: Thu, 17 Nov 2022 07:56:54 -0500 Subject: [PATCH 4/8] run cargo fmt again --- rusb-async/src/transfer.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/rusb-async/src/transfer.rs b/rusb-async/src/transfer.rs index a3d5bd8..8b26488 100644 --- a/rusb-async/src/transfer.rs +++ b/rusb-async/src/transfer.rs @@ -167,12 +167,7 @@ impl Future for Transfer { // // In addition, `Future::poll()` should not be called a second time after it returns // `Poll::Ready`. Thus, it is safe to panic. - let buffer = self - .buffer - .lock() - .unwrap() - .take() - .unwrap(); + let buffer = self.buffer.lock().unwrap().take().unwrap(); // The transfer completed. if inner.status == LIBUSB_TRANSFER_COMPLETED { From 606a048b34902a2d34c3efc3e04400da20fd578a Mon Sep 17 00:00:00 2001 From: "S.J.R. van Schaik" Date: Mon, 7 Nov 2022 22:21:00 -0500 Subject: [PATCH 5/8] Implement the asynchronous hotplug API --- rusb-async/Cargo.toml | 1 + rusb-async/examples/async_hotplug.rs | 29 +++++++ rusb-async/src/hotplug.rs | 111 +++++++++++++++++++++++++++ rusb-async/src/lib.rs | 2 + 4 files changed, 143 insertions(+) create mode 100644 rusb-async/examples/async_hotplug.rs create mode 100644 rusb-async/src/hotplug.rs diff --git a/rusb-async/Cargo.toml b/rusb-async/Cargo.toml index a828e38..ac2a96b 100644 --- a/rusb-async/Cargo.toml +++ b/rusb-async/Cargo.toml @@ -19,6 +19,7 @@ vendored = [ "rusb/vendored" ] [dependencies] async-trait = "0.1" +futures = "0.3" rusb = { path = "..", version = "0.9.1" } libc = "0.2" diff --git a/rusb-async/examples/async_hotplug.rs b/rusb-async/examples/async_hotplug.rs new file mode 100644 index 0000000..b8048f0 --- /dev/null +++ b/rusb-async/examples/async_hotplug.rs @@ -0,0 +1,29 @@ +use rusb::Error; +use rusb_async::{Context, HotplugBuilder, HotplugEvent, Registration}; + +#[tokio::main] +async fn main() -> Result<(), Error> { + if !rusb::has_hotplug() { + eprint!("libusb hotplug api unsupported"); + return Ok(()); + } + + let mut context = Context::new()?; + + let mut registration: Registration = HotplugBuilder::new() + .enumerate(true) + .register(&mut context)?; + + while let Some(event) = registration.next_event().await { + match event { + HotplugEvent::Arrived(device) => { + println!("device arrived {:?}", device); + } + HotplugEvent::Left(device) => { + println!("device left {:?}", device); + } + } + } + + Ok(()) +} diff --git a/rusb-async/src/hotplug.rs b/rusb-async/src/hotplug.rs new file mode 100644 index 0000000..a846a98 --- /dev/null +++ b/rusb-async/src/hotplug.rs @@ -0,0 +1,111 @@ +use rusb::{Device, Error, UsbContext}; + +use futures::{ + channel::mpsc::{channel, Receiver, Sender}, + SinkExt, StreamExt, +}; +use std::borrow::Borrow; + +/// Events retrieved by polling the [`Registration`] whenever new USB devices arrive or existing +/// USB devices leave. +#[derive(Debug)] +pub enum HotplugEvent { + /// A new device arrived. + Arrived(Device), + /// The specified device left. + Left(Device), +} + +/// Builds hotplug [`Registration`] with custom configuration values. +pub struct HotplugBuilder { + inner: rusb::HotplugBuilder, +} + +impl HotplugBuilder { + /// Returns a new builder with no filter. Devices can optionally be filtered by + /// [`HotplugBuilder::vendor_id`], [`HotplugBuilder::product_id`] and + /// [`HotplugBuilder::class`]. + /// + /// Registration is done by calling [`HotplugBuilder::register`]. + pub fn new() -> Self { + Self { + inner: rusb::HotplugBuilder::new(), + } + } + + /// Devices can optionally be filtered by their vendor ID. + pub fn vendor_id(&mut self, vendor_id: u16) -> &mut Self { + self.inner.vendor_id(vendor_id); + self + } + + /// Devices can optionally be filtered by their product ID. + pub fn product_id(&mut self, product_id: u16) -> &mut Self { + self.inner.product_id(product_id); + self + } + + /// Devices can optionally be filtered by their class. + pub fn class(&mut self, class: u8) -> &mut Self { + self.inner.class(class); + self + } + + /// If `enumerate` is `true`, then devices that are already connected will cause the + /// [`Registration`] to return [`HotplugEvent::Arrived`] events for them. + pub fn enumerate(&mut self, enumerate: bool) -> &mut Self { + self.inner.enumerate(enumerate); + self + } + + /// Registers the hotplug configuration and returns a [`Registration`] object that can be + /// polled for [`HotplugEvents`](HotplugEvent). + pub fn register>( + &mut self, + context: T, + ) -> Result, Error> { + let (tx, rx): (Sender>, Receiver>) = channel(1); + + let hotplug = Box::new(Hotplug { + tx, + }); + + let inner = self.inner.register(context, hotplug)?; + + Ok(Registration { + _inner: inner, + rx, + }) + } +} + +struct Hotplug { + tx: Sender>, +} + +impl rusb::Hotplug for Hotplug { + fn device_arrived(&mut self, device: Device) { + futures::executor::block_on(async { + self.tx.send(HotplugEvent::Arrived(device)).await.unwrap(); + }) + } + + fn device_left(&mut self, device: Device) { + futures::executor::block_on(async { + self.tx.send(HotplugEvent::Left(device)).await.unwrap(); + }); + } +} + +/// The hotplug registration which can be polled for [`HotplugEvents`](HotplugEvent). +pub struct Registration { + _inner: rusb::Registration, + rx: Receiver>, +} + +impl Registration { + /// Creates a future to await the next [`HotplugEvent`]. + pub async fn next_event(&mut self) -> Option> { + self.rx.next().await + } +} diff --git a/rusb-async/src/lib.rs b/rusb-async/src/lib.rs index a4754ac..40fbb6f 100644 --- a/rusb-async/src/lib.rs +++ b/rusb-async/src/lib.rs @@ -1,5 +1,7 @@ pub mod context; +pub mod hotplug; pub mod transfer; pub use crate::context::Context; +pub use crate::hotplug::{HotplugBuilder, HotplugEvent, Registration}; pub use crate::transfer::{CancellationToken, DeviceHandleExt, Transfer}; From ffa956c577fe469b1cfb6aa779b2717307ef5131 Mon Sep 17 00:00:00 2001 From: "S.J.R. van Schaik" Date: Fri, 11 Nov 2022 18:38:53 -0500 Subject: [PATCH 6/8] run cargo fmt --- rusb-async/src/hotplug.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/rusb-async/src/hotplug.rs b/rusb-async/src/hotplug.rs index a846a98..9ccce6a 100644 --- a/rusb-async/src/hotplug.rs +++ b/rusb-async/src/hotplug.rs @@ -66,16 +66,11 @@ impl HotplugBuilder { ) -> Result, Error> { let (tx, rx): (Sender>, Receiver>) = channel(1); - let hotplug = Box::new(Hotplug { - tx, - }); + let hotplug = Box::new(Hotplug { tx }); let inner = self.inner.register(context, hotplug)?; - Ok(Registration { - _inner: inner, - rx, - }) + Ok(Registration { _inner: inner, rx }) } } From a513d0e5a9d955c8452eae63f6785dc2a79831a8 Mon Sep 17 00:00:00 2001 From: "S.J.R. van Schaik" Date: Mon, 1 May 2023 09:57:02 -0400 Subject: [PATCH 7/8] rusb-async: hotplug: use unbounded channel --- rusb-async/src/hotplug.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rusb-async/src/hotplug.rs b/rusb-async/src/hotplug.rs index 9ccce6a..eaf270d 100644 --- a/rusb-async/src/hotplug.rs +++ b/rusb-async/src/hotplug.rs @@ -1,7 +1,7 @@ use rusb::{Device, Error, UsbContext}; use futures::{ - channel::mpsc::{channel, Receiver, Sender}, + channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}, SinkExt, StreamExt, }; use std::borrow::Borrow; @@ -64,7 +64,7 @@ impl HotplugBuilder { &mut self, context: T, ) -> Result, Error> { - let (tx, rx): (Sender>, Receiver>) = channel(1); + let (tx, rx): (UnboundedSender>, UnboundedReceiver>) = unbounded(); let hotplug = Box::new(Hotplug { tx }); @@ -75,7 +75,7 @@ impl HotplugBuilder { } struct Hotplug { - tx: Sender>, + tx: UnboundedSender>, } impl rusb::Hotplug for Hotplug { @@ -95,7 +95,7 @@ impl rusb::Hotplug for Hotplug { /// The hotplug registration which can be polled for [`HotplugEvents`](HotplugEvent). pub struct Registration { _inner: rusb::Registration, - rx: Receiver>, + rx: UnboundedReceiver>, } impl Registration { From 8393e61deb77a1bc2ee475658af6b844373a9aee Mon Sep 17 00:00:00 2001 From: "S.J.R. van Schaik" Date: Mon, 1 May 2023 09:59:41 -0400 Subject: [PATCH 8/8] run cargo fmt --- rusb-async/src/hotplug.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rusb-async/src/hotplug.rs b/rusb-async/src/hotplug.rs index eaf270d..d4ea723 100644 --- a/rusb-async/src/hotplug.rs +++ b/rusb-async/src/hotplug.rs @@ -64,7 +64,10 @@ impl HotplugBuilder { &mut self, context: T, ) -> Result, Error> { - let (tx, rx): (UnboundedSender>, UnboundedReceiver>) = unbounded(); + let (tx, rx): ( + UnboundedSender>, + UnboundedReceiver>, + ) = unbounded(); let hotplug = Box::new(Hotplug { tx });