Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement asynchronous hotplug #156

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ travis-ci = { repository = "a1ien/rusb" }
vendored = [ "libusb1-sys/vendored" ]

[workspace]
members = ["libusb1-sys"]
members = ["libusb1-sys", "rusb-async"]

[dependencies]
libusb1-sys = { path = "libusb1-sys", version = "0.6.0" }
Expand Down
27 changes: 27 additions & 0 deletions rusb-async/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "rusb-async"
version = "0.0.1-alpha-2"
edition = "2021"
authors = [
"Ilya Averyanov <a1ien.n3t@gmail.com>",
"Ryan Butler <thebutlah@gmail.com>",
"Kevin Mehall <km@kevinmehall.net>",
]

description = "Rust library for accessing USB devices."
license = "MIT"
homepage = "https://github.com/a1ien/rusb"
repository = "https://github.com/a1ien/rusb.git"
keywords = ["usb", "libusb", "async"]

[features]
vendored = [ "rusb/vendored" ]

[dependencies]
async-trait = "0.1"
futures = "0.3"
rusb = { path = "..", version = "0.9.1" }
libc = "0.2"

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
29 changes: 29 additions & 0 deletions rusb-async/examples/async_hotplug.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use rusb::Error;
use rusb_async::{Context, HotplugBuilder, HotplugEvent, Registration};

#[tokio::main]
async fn main() -> Result<(), Error> {
if !rusb::has_hotplug() {
eprint!("libusb hotplug api unsupported");
return Ok(());
}

let mut context = Context::new()?;

let mut registration: Registration<Context> = HotplugBuilder::new()
.enumerate(true)
.register(&mut context)?;

while let Some(event) = registration.next_event().await {
match event {
HotplugEvent::Arrived(device) => {
println!("device arrived {:?}", device);
}
HotplugEvent::Left(device) => {
println!("device left {:?}", device);
}
}
}

Ok(())
}
50 changes: 50 additions & 0 deletions rusb-async/examples/read_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use rusb::UsbContext as _;
use rusb_async::{Context, DeviceHandleExt as _};

use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

const BUF_SIZE: usize = 64;

fn convert_argument(input: &str) -> u16 {
if input.starts_with("0x") {
return u16::from_str_radix(input.trim_start_matches("0x"), 16).unwrap();
}
u16::from_str_radix(input, 10)
.expect("Invalid input, be sure to add `0x` for hexadecimal values.")
}

#[tokio::main]
async fn main() {
let args: Vec<String> = std::env::args().collect();

if args.len() < 4 {
eprintln!("Usage: read_async <base-10/0xbase-16> <base-10/0xbase-16> <endpoint>");
return;
}

let vid = convert_argument(args[1].as_ref());
let pid = convert_argument(args[2].as_ref());
let endpoint: u8 = FromStr::from_str(args[3].as_ref()).unwrap();

let ctx = Context::new().expect("Could not initialize libusb");
let device = Arc::new(
ctx.open_device_with_vid_pid(vid, pid)
.expect("Could not find device"),
);

let timeout = Duration::from_secs(10);
let mut buffer = vec![0u8; BUF_SIZE].into_boxed_slice();

loop {
let (bytes, n) = device
.read_bulk_async(endpoint, buffer, timeout)
.await
.expect("Failed to submit transfer");

println!("Got data: {} {:?}", n, &bytes[..n]);

buffer = bytes;
}
}
68 changes: 68 additions & 0 deletions rusb-async/src/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;

use rusb::ffi::*;
use rusb::{Error, UsbContext};

struct EventThread {
thread: Option<JoinHandle<Result<(), Error>>>,
should_quit: Arc<AtomicBool>,
}

impl EventThread {
fn new(context: &mut rusb::Context) -> Self {
let thread_context = context.clone();
let tx = Arc::new(AtomicBool::new(false));
let rx = tx.clone();

let thread = std::thread::spawn(move || -> Result<(), Error> {
while !rx.load(Ordering::SeqCst) {
thread_context.handle_events(Some(Duration::from_millis(0)))?;
}

Ok(())
});

Self {
thread: Some(thread),
should_quit: tx,
}
}
}

impl Drop for EventThread {
fn drop(&mut self) {
self.should_quit.store(true, Ordering::SeqCst);

let _ = self.thread.take().map(|thread| thread.join());
}
}

/// A `libusb` context with a dedicated thread to handle events in the background.
#[derive(Clone)]
pub struct Context {
inner: rusb::Context,
_thread: Arc<EventThread>,
}

impl Context {
/// Opens a new `libusb` context and spawns a thread to handle events in the background for
/// that context.
pub fn new() -> Result<Self, Error> {
let mut inner = rusb::Context::new()?;
let thread = EventThread::new(&mut inner);

Ok(Self {
inner,
_thread: Arc::new(thread),
})
}
}

impl UsbContext for Context {
fn as_raw(&self) -> *mut libusb_context {
self.inner.as_raw()
}
}
109 changes: 109 additions & 0 deletions rusb-async/src/hotplug.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use rusb::{Device, Error, UsbContext};

use futures::{
channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
SinkExt, StreamExt,
};
use std::borrow::Borrow;

/// Events retrieved by polling the [`Registration`] whenever new USB devices arrive or existing
/// USB devices leave.
#[derive(Debug)]
pub enum HotplugEvent<T: UsbContext> {
/// A new device arrived.
Arrived(Device<T>),
/// The specified device left.
Left(Device<T>),
}

/// Builds hotplug [`Registration`] with custom configuration values.
pub struct HotplugBuilder {
inner: rusb::HotplugBuilder,
}

impl HotplugBuilder {
/// Returns a new builder with no filter. Devices can optionally be filtered by
/// [`HotplugBuilder::vendor_id`], [`HotplugBuilder::product_id`] and
/// [`HotplugBuilder::class`].
///
/// Registration is done by calling [`HotplugBuilder::register`].
pub fn new() -> Self {
Self {
inner: rusb::HotplugBuilder::new(),
}
}

/// Devices can optionally be filtered by their vendor ID.
pub fn vendor_id(&mut self, vendor_id: u16) -> &mut Self {
self.inner.vendor_id(vendor_id);
self
}

/// Devices can optionally be filtered by their product ID.
pub fn product_id(&mut self, product_id: u16) -> &mut Self {
self.inner.product_id(product_id);
self
}

/// Devices can optionally be filtered by their class.
pub fn class(&mut self, class: u8) -> &mut Self {
self.inner.class(class);
self
}

/// If `enumerate` is `true`, then devices that are already connected will cause the
/// [`Registration`] to return [`HotplugEvent::Arrived`] events for them.
pub fn enumerate(&mut self, enumerate: bool) -> &mut Self {
self.inner.enumerate(enumerate);
self
}

/// Registers the hotplug configuration and returns a [`Registration`] object that can be
/// polled for [`HotplugEvents`](HotplugEvent).
pub fn register<U: rusb::UsbContext + 'static, T: Borrow<U>>(
&mut self,
context: T,
) -> Result<Registration<U>, Error> {
let (tx, rx): (
UnboundedSender<HotplugEvent<U>>,
UnboundedReceiver<HotplugEvent<U>>,
) = unbounded();

let hotplug = Box::new(Hotplug { tx });

let inner = self.inner.register(context, hotplug)?;

Ok(Registration { _inner: inner, rx })
}
}

struct Hotplug<T: UsbContext> {
tx: UnboundedSender<HotplugEvent<T>>,
}

impl<T: UsbContext> rusb::Hotplug<T> for Hotplug<T> {
fn device_arrived(&mut self, device: Device<T>) {
futures::executor::block_on(async {
self.tx.send(HotplugEvent::Arrived(device)).await.unwrap();
})
}

fn device_left(&mut self, device: Device<T>) {
futures::executor::block_on(async {
self.tx.send(HotplugEvent::Left(device)).await.unwrap();
});
}
}

/// The hotplug registration which can be polled for [`HotplugEvents`](HotplugEvent).
pub struct Registration<T: UsbContext> {
_inner: rusb::Registration<T>,
rx: UnboundedReceiver<HotplugEvent<T>>,
}

impl<T: UsbContext> Registration<T> {
/// Creates a future to await the next [`HotplugEvent`].
pub async fn next_event(&mut self) -> Option<HotplugEvent<T>> {
self.rx.next().await
}
}
7 changes: 7 additions & 0 deletions rusb-async/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pub mod context;
pub mod hotplug;
pub mod transfer;

pub use crate::context::Context;
pub use crate::hotplug::{HotplugBuilder, HotplugEvent, Registration};
pub use crate::transfer::{CancellationToken, DeviceHandleExt, Transfer};
Loading