Skip to content

Commit

Permalink
fix(linux): use tokio for BtlePlugConnection
Browse files Browse the repository at this point in the history
btleplug requires tokio, so there needs to be an internal tokio runtime
to make it runtime agnostic.
  • Loading branch information
Oppzippy committed Aug 11, 2023
1 parent e39acd9 commit 84cbdfd
Show file tree
Hide file tree
Showing 10 changed files with 251 additions and 166 deletions.
8 changes: 7 additions & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ crate-type = ["lib"]

[dependencies]
web-sys = { version = "0.3", optional = true, features = ["Window"] }
tokio = { version = "1", features = ["macros", "sync", "time", "rt"] }
tokio = { version = "1", features = [
"macros",
"sync",
"time",
"rt",
"rt-multi-thread",
] }
futures = { version = "0.3" }
uuid = "1.3"
thiserror = "1.0"
Expand Down
4 changes: 2 additions & 2 deletions lib/src/api/connection/connection_registry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashSet, fmt::Debug, rc::Rc};
use std::{collections::HashSet, fmt::Debug, sync::Arc};

use async_trait::async_trait;
use macaddr::MacAddr6;
Expand All @@ -15,5 +15,5 @@ pub trait ConnectionRegistry {
async fn connection(
&self,
mac_address: MacAddr6,
) -> crate::Result<Option<Rc<Self::ConnectionType>>>;
) -> crate::Result<Option<Arc<Self::ConnectionType>>>;
}
16 changes: 14 additions & 2 deletions lib/src/q30/connection/btleplug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ pub use btleplug_connection_registry::*;
pub use btleplug_error::*;

pub(crate) async fn new_connection_registry() -> crate::Result<BtlePlugConnectionRegistry> {
let manager = Manager::new().await?;
Ok(BtlePlugConnectionRegistry::new(manager))
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(1)
.build()
.unwrap();
runtime
.handle()
.to_owned()
.spawn(async move {
let manager = Manager::new().await?;
Ok(BtlePlugConnectionRegistry::new(manager, runtime))
})
.await
.unwrap()
}
216 changes: 128 additions & 88 deletions lib/src/q30/connection/btleplug/btleplug_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use btleplug::{
use futures::StreamExt;
use macaddr::MacAddr6;
use tokio::{
runtime::Handle,
sync::{
mpsc::{self, error::TrySendError},
watch,
Expand All @@ -27,6 +28,7 @@ use super::mac_address::IntoMacAddr;

#[derive(Debug)]
pub struct BtlePlugConnection {
handle: Handle,
peripheral: Peripheral,
write_characteristic: Characteristic,
read_characteristic: Characteristic,
Expand All @@ -35,94 +37,111 @@ pub struct BtlePlugConnection {
}

impl BtlePlugConnection {
pub async fn new(adapter: Adapter, peripheral: Peripheral) -> crate::Result<Self> {
peripheral.connect().await?;
peripheral.discover_services().await?;

let service = peripheral
.services()
.into_iter()
.find(|service| is_soundcore_service_uuid(&service.uuid))
.ok_or(crate::Error::ServiceNotFound {
uuid: SERVICE_UUID,
source: None,
})?;

let write_characteristic = service
.characteristics
.iter()
.find(|characteristic| characteristic.uuid == WRITE_CHARACTERISTIC_UUID)
.ok_or(crate::Error::CharacteristicNotFound {
uuid: WRITE_CHARACTERISTIC_UUID,
source: None,
})?;
let read_characteristic = service
.characteristics
.iter()
.find(|characteristic| characteristic.uuid == READ_CHARACTERISTIC_UUID)
.ok_or(crate::Error::CharacteristicNotFound {
uuid: READ_CHARACTERISTIC_UUID,
source: None,
})?;

peripheral.subscribe(read_characteristic).await?;

let (connection_status_sender, connection_status_receiver) =
watch::channel(ConnectionStatus::Connected);

let connection_status_handle = {
let mut events = adapter.events().await?;
let peripheral = peripheral.to_owned();
tokio::spawn(async move {
loop {
if let Some(event) = events.next().await {
match event {
CentralEvent::DeviceConnected(peripheral_id) => {
if peripheral_id == peripheral.id() {
connection_status_sender
.send_replace(ConnectionStatus::Connected);
pub async fn new(
adapter: Adapter,
peripheral: Peripheral,
handle: Handle,
) -> crate::Result<Self> {
let handle2 = handle.clone();
handle
.spawn(async move {
let handle = handle2;

peripheral.connect().await?;
peripheral.discover_services().await?;

let service = peripheral
.services()
.into_iter()
.find(|service| is_soundcore_service_uuid(&service.uuid))
.ok_or(crate::Error::ServiceNotFound {
uuid: SERVICE_UUID,
source: None,
})?;

let write_characteristic = service
.characteristics
.iter()
.find(|characteristic| characteristic.uuid == WRITE_CHARACTERISTIC_UUID)
.ok_or(crate::Error::CharacteristicNotFound {
uuid: WRITE_CHARACTERISTIC_UUID,
source: None,
})?;
let read_characteristic = service
.characteristics
.iter()
.find(|characteristic| characteristic.uuid == READ_CHARACTERISTIC_UUID)
.ok_or(crate::Error::CharacteristicNotFound {
uuid: READ_CHARACTERISTIC_UUID,
source: None,
})?;

peripheral.subscribe(read_characteristic).await?;

let (connection_status_sender, connection_status_receiver) =
watch::channel(ConnectionStatus::Connected);

let connection_status_handle = {
let mut events = adapter.events().await?;
let peripheral = peripheral.to_owned();
tokio::spawn(async move {
loop {
if let Some(event) = events.next().await {
match event {
CentralEvent::DeviceConnected(peripheral_id) => {
if peripheral_id == peripheral.id() {
connection_status_sender
.send_replace(ConnectionStatus::Connected);
}
}
CentralEvent::DeviceDisconnected(peripheral_id) => {
if peripheral_id == peripheral.id() {
connection_status_sender
.send_replace(ConnectionStatus::Disconnected);
}
}
_ => (),
}
}
CentralEvent::DeviceDisconnected(peripheral_id) => {
if peripheral_id == peripheral.id() {
connection_status_sender
.send_replace(ConnectionStatus::Disconnected);
}
}
_ => (),
}
}
}
})
};

let connection = BtlePlugConnection {
peripheral,
write_characteristic: write_characteristic.to_owned(),
read_characteristic: read_characteristic.to_owned(),
connection_status_receiver,
connection_status_handle,
handle: handle.to_owned(),
};
Ok(connection)
})
};

let connection = BtlePlugConnection {
peripheral,
write_characteristic: write_characteristic.to_owned(),
read_characteristic: read_characteristic.to_owned(),
connection_status_receiver,
connection_status_handle,
};

Ok(connection)
.await
.unwrap()
}
}

#[async_trait(?Send)]
impl Connection for BtlePlugConnection {
async fn name(&self) -> crate::Result<String> {
let maybe_name = self
.peripheral
.properties()
.await?
.map(|property| property.local_name);

match maybe_name {
Some(Some(name)) => Ok(name),
_ => Err(crate::Error::NameNotFound {
mac_address: self.peripheral.address().to_string(),
}),
}
let peripheral = self.peripheral.to_owned();
self.handle
.spawn(async move {
let maybe_name = peripheral
.properties()
.await?
.map(|property| property.local_name);

match maybe_name {
Some(Some(name)) => Ok(name),
_ => Err(crate::Error::NameNotFound {
mac_address: peripheral.address().to_string(),
}),
}
})
.await
.unwrap()
}

fn connection_status(&self) -> watch::Receiver<ConnectionStatus> {
Expand All @@ -135,29 +154,50 @@ impl Connection for BtlePlugConnection {

#[instrument(level = "trace", skip(self))]
async fn write_with_response(&self, data: &[u8]) -> crate::Result<()> {
self.peripheral
.write(&self.write_characteristic, data, WriteType::WithResponse)
.await?;
Ok(())
let data = data.to_owned();
let peripheral = self.peripheral.to_owned();
let write_characteristic = self.write_characteristic.to_owned();
self.handle
.spawn(async move {
peripheral
.write(&write_characteristic, &data, WriteType::WithResponse)
.await?;
Ok(())
})
.await
.unwrap()
}

#[instrument(level = "trace", skip(self))]
async fn write_without_response(&self, data: &[u8]) -> crate::Result<()> {
self.peripheral
.write(&self.write_characteristic, data, WriteType::WithoutResponse)
.await?;
Ok(())
let data = data.to_owned();
let peripheral = self.peripheral.to_owned();
let write_characteristic = self.write_characteristic.to_owned();
self.handle
.spawn(async move {
peripheral
.write(&write_characteristic, &data, WriteType::WithoutResponse)
.await?;
Ok(())
})
.await
.unwrap()
}

async fn inbound_packets_channel(&self) -> crate::Result<mpsc::Receiver<Vec<u8>>> {
// This queue should always be really small unless something is malfunctioning
let (sender, receiver) = mpsc::channel(100);

let mut notifications = self.peripheral.notifications().await?;
let peripheral = self.peripheral.to_owned();
let mut notifications = self
.handle
.spawn(async move { peripheral.notifications().await })
.await
.unwrap()?;

let read_characteristic_uuid = self.read_characteristic.uuid;

tokio::spawn(async move {
self.handle.spawn(async move {
let span = trace_span!("inbound_packets_channel async task");
let _enter = span.enter();
while let Some(data) = notifications.next().await {
Expand Down
Loading

0 comments on commit 84cbdfd

Please sign in to comment.