Skip to content

Commit

Permalink
Fix memory leak when trying to reconnect
Browse files Browse the repository at this point in the history
I was recreating btleplug's Manager and Adapter, bad idea, lots of Arc reference loops
  • Loading branch information
nullstalgia committed Oct 17, 2024
1 parent e27d833 commit 136e541
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 118 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter", "chrono"] }
tracing-appender = "0.2"
tracing-log = "0.2.0"
rolling-file = "0.2.0"

# console-subscriber = "0.4.0"

[target.'cfg(windows)'.dependencies]
# Using fork until lilopkins/lnk-rs's #25 and #26 resolve, not sure if #21 is ready yet
Expand Down Expand Up @@ -92,3 +92,7 @@ test-log = { version = "0.2.16", default-features = false, features = [
"trace",
"unstable",
] }

# For console-subscriber
# [build]
# rustflags = ["--cfg", "tokio_unstable"]
10 changes: 9 additions & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub enum AppRx {
UpdateReply(UpdateReply),
}

#[derive(Debug)]
pub enum DeviceUpdate {
ConnectedEvent(String),
DisconnectedEvent(String),
Expand Down Expand Up @@ -351,14 +352,17 @@ impl App {
tokio::select! {
// Check for updates from BLE Thread
Some(new_device_info) = self.ble_rx.recv() => {
// debug!("ble: {new_device_info:?}");
AppRx::DeviceUpdate(new_device_info)
}
// HR Notification Updates
Ok(hr_data) = self.broadcast_rx.recv() => {
// debug!("broadcast: {hr_data:?}");
AppRx::AppUpdate(hr_data)
}
// Replies from the executable self-updating task
Some(data) = self.updates.reply_rx.recv() => {
// debug!("update: {data:?}");
AppRx::UpdateReply(data)
}
}
Expand All @@ -370,7 +374,7 @@ impl App {
AppRx::AppUpdate(hr_data) => {
match hr_data {
AppUpdate::HeartRateStatus(data) => {
if data.heart_rate_bpm > 0 {
if data.heart_rate_bpm > 0 || !data.rr_intervals.is_empty() {
// Assume we have proper data now
self.view = AppView::HeartRateView;
if self.sub_state == SubState::ConnectingForHeartRate {
Expand Down Expand Up @@ -546,6 +550,10 @@ impl App {
}

pub fn connect_for_hr(&mut self, quick_connect_device: Option<&DeviceInfo>) {
if self.hr_thread_handle.is_some() {
debug!("Not spawning extra notification thread");
return;
}
let selected_device = if let Some(device) = quick_connect_device {
device
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub async fn run_tui(mut arg_config: TopLevelCmd) -> AppResult<()> {
.file_name()
.expect("Couldn't build log path!")
.to_owned();
// let console = console_subscriber::spawn();
let file_appender = BasicRollingFileAppender::new(
log_name,
RollingConditionBasic::new().max_size(1024 * 1024 * 5),
Expand All @@ -95,6 +96,7 @@ pub async fn run_tui(mut arg_config: TopLevelCmd) -> AppResult<()> {
// Allow everything through but limit lnk to just info, since it spits out a bit too much when reading shortcuts
let env_filter = tracing_subscriber::EnvFilter::new("trace,lnk=info");
tracing_subscriber::registry()
// .with(console)
.with(env_filter)
.with(fmt_layer)
.init();
Expand Down
241 changes: 126 additions & 115 deletions src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::structs::{Characteristic, DeviceInfo};
use btleplug::api::{
Central, CentralEvent, Manager as _, Peripheral, PeripheralProperties, ScanFilter,
};
use btleplug::platform::Manager;
use btleplug::platform::{Adapter, Manager};
use futures::StreamExt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand All @@ -22,39 +22,44 @@ pub async fn bluetooth_event_thread(
pause_signal: Arc<AtomicBool>,
cancel_token: CancellationToken,
) {
info!("Bluetooth CentralEvent thread started!");
// If no event is heard in this period,
// the manager and adapter will be recreated
// (if the scan isn't paused)
let duration = Duration::from_secs(30);

let manager = match Manager::new().await {
Ok(manager) => manager,
Err(e) => {
error!("Failed to create manager: {}", e);
tx.send(DeviceUpdate::Error(ErrorPopup::detailed(
"Failed to create manager: ",
e.into(),
)))
.await
.expect("Failed to send error message");
return;
}
};

let central: Adapter;

'adapter: loop {
info!("Bluetooth CentralEvent thread started!");
if cancel_token.is_cancelled() {
info!("Shutting down Bluetooth CentralEvent thread!");
break 'adapter;
return;
}
let manager = match Manager::new().await {
Ok(manager) => manager,
Err(e) => {
error!("Failed to create manager: {}", e);
tx.send(DeviceUpdate::Error(ErrorPopup::UserMustDismiss(format!(
"Failed to create manager: {}",
e
))))
.await
.expect("Failed to send error message");
tokio::time::sleep(Duration::from_secs(5)).await;
continue 'adapter;
}
};
let central = match manager.adapters().await.and_then(|adapters| {
match manager.adapters().await.and_then(|adapters| {
debug!("Found adapters: {adapters:#?}");
adapters
.into_iter()
.next()
.ok_or(btleplug::Error::DeviceNotFound)
}) {
Ok(central) => central,
Ok(adapter) => {
central = adapter;
break 'adapter;
}
Err(_) => {
error!("No Bluetooth adapters found!");
tx.send(DeviceUpdate::Error(ErrorPopup::UserMustDismiss(
Expand All @@ -64,128 +69,134 @@ pub async fn bluetooth_event_thread(
.await
.expect("Failed to send error message");
tokio::time::sleep(Duration::from_secs(10)).await;
continue 'adapter;
}
};
}

if let Err(e) = central.start_scan(ScanFilter::default()).await {
error!("Scanning failure: {}", e);
tx.send(DeviceUpdate::Error(ErrorPopup::UserMustDismiss(format!(
"Scanning failure: {}",
if let Err(e) = central.start_scan(ScanFilter::default()).await {
error!("Scanning failure: {}", e);
tx.send(DeviceUpdate::Error(ErrorPopup::Fatal(format!(
"Scanning failure: {}",
e
))))
.await
.expect("Failed to send error message");
// tokio::time::sleep(Duration::from_secs(10)).await;
return;
}

let mut events = match central.events().await {
Ok(e) => e,
Err(e) => {
error!("BLE failure: {}", e);
tx.send(DeviceUpdate::Error(ErrorPopup::Fatal(format!(
"BLE failure: {}",
e
))))
.await
.expect("Failed to send error message");
tokio::time::sleep(Duration::from_secs(10)).await;
continue 'adapter;
// tokio::time::sleep(Duration::from_secs(5)).await;
return;
}
let mut events = match central.events().await {
Ok(e) => e,
Err(e) => {
error!("BLE failure: {}", e);
};
info!("Inital scanning started!");
let mut scanning = true;

'events: loop {
if pause_signal.load(Ordering::SeqCst) {
if scanning {
info!("Pausing scan");
central.stop_scan().await.expect("Failed to stop scan!");
scanning = false;
}
} else if !scanning {
info!("Resuming scan");
if let Err(e) = central.start_scan(ScanFilter::default()).await {
error!("Failed to resume scanning: {}", e);
tx.send(DeviceUpdate::Error(ErrorPopup::UserMustDismiss(format!(
"BLE failure: {}",
"Failed to resume scanning: {}",
e
))))
.await
.expect("Failed to send error message");
tokio::time::sleep(Duration::from_secs(5)).await;
continue 'adapter;
break 'events;
}
};
info!("Inital scanning started!");
let mut scanning = true;
scanning = true;
}

'events: loop {
if pause_signal.load(Ordering::SeqCst) {
if scanning {
info!("Pausing scan");
central.stop_scan().await.expect("Failed to stop scan!");
scanning = false;
}
} else if !scanning {
info!("Resuming scan");
if let Err(e) = central.start_scan(ScanFilter::default()).await {
error!("Failed to resume scanning: {}", e);
tx.send(DeviceUpdate::Error(ErrorPopup::UserMustDismiss(format!(
"Failed to resume scanning: {}",
e
))))
.await
.expect("Failed to send error message");
tokio::time::sleep(Duration::from_secs(10)).await;
break 'events;
}
scanning = true;
}
tokio::select! {
Some(event) = events.next() => {
match event {
CentralEvent::DeviceDiscovered(id) | CentralEvent::DeviceUpdated(id) => {
if let Ok(device) = central.peripheral(&id).await {
let properties = device
.properties()
.await
.unwrap()
.unwrap_or(PeripheralProperties::default());
tokio::select! {
Some(event) = events.next() => {
match event {
CentralEvent::DeviceDiscovered(id) | CentralEvent::DeviceUpdated(id) => {
if let Ok(device) = central.peripheral(&id).await {
let properties = device
.properties()
.await
.unwrap()
.unwrap_or(PeripheralProperties::default());

if properties.services.is_empty() {
continue 'events;
}
if properties.services.is_empty() {
continue 'events;
}

// Add the device's information to the discovered list
let device = DeviceInfo::new(
device.id().to_string(),
properties.local_name,
properties.tx_power_level,
properties.address.to_string(),
properties.rssi,
properties.manufacturer_data,
properties.services,
properties.service_data,
device.clone(),
);
// Add the device's information to the discovered list
let device = DeviceInfo::new(
device.id().to_string(),
properties.local_name,
properties.tx_power_level,
properties.address.to_string(),
properties.rssi,
properties.manufacturer_data,
properties.services,
properties.service_data,
device.clone(),
);

// Send a clone of the accumulated device information so far
if tx.send(DeviceUpdate::DeviceInfo(device)).await.is_err() {
error!("Couldn't send device info update!");
break 'adapter;
}
// Send a clone of the accumulated device information so far
if tx.send(DeviceUpdate::DeviceInfo(device)).await.is_err() {
error!("Couldn't send device info update!");
break 'events;
}
}
CentralEvent::DeviceDisconnected(id) => {
warn!("Device disconnected: {}", id);
if tx.send(DeviceUpdate::DisconnectedEvent(id.to_string())).await.is_err() {
error!("Couldn't send DisconnectedEvent!");
break 'adapter;
}
}
CentralEvent::DeviceDisconnected(id) => {
warn!("Device disconnected: {}", id);
if tx.send(DeviceUpdate::DisconnectedEvent(id.to_string())).await.is_err() {
error!("Couldn't send DisconnectedEvent!");
break 'events;
}
CentralEvent::DeviceConnected(id) => {
info!("Device connected: {}", id);
if tx.send(DeviceUpdate::ConnectedEvent(id.to_string())).await.is_err() {
error!("Couldn't send ConnectedEvent!");
break 'adapter;
}
}
CentralEvent::DeviceConnected(id) => {
info!("Device connected: {}", id);
if tx.send(DeviceUpdate::ConnectedEvent(id.to_string())).await.is_err() {
error!("Couldn't send ConnectedEvent!");
break 'events;
}
_ => {}
}
_ => {}
}
_ = cancel_token.cancelled() => {
info!("Shutting down Bluetooth CentralEvent thread!");
break 'adapter;
}
_ = tokio::time::sleep(duration) => {
debug!("CentralEvent timeout");
if !pause_signal.load(Ordering::SeqCst) {
warn!("Restarting manager and adapter!");
break 'events;
}
_ = cancel_token.cancelled() => {
info!("Shutting down Bluetooth CentralEvent thread!");
break 'events;
}
_ = tokio::time::sleep(duration) => {
debug!("CentralEvent timeout");
if !pause_signal.load(Ordering::SeqCst) {
warn!("Restarting scan!");
if scanning {
let _ = central.stop_scan().await;
scanning = false;
}
}
Some(()) = restart_signal.recv() => {
warn!("Got signal to restart BLE manager and adapter!");
debug!("Central State was: {central:#?}");
pause_signal.store(false, Ordering::SeqCst);
break 'events;
}
Some(()) = restart_signal.recv() => {
warn!("Got signal to restart scan from HR Notif thread!");
// debug!("Central State was: {central:#?}");
pause_signal.store(false, Ordering::SeqCst);
if scanning {
let _ = central.stop_scan().await;
scanning = false;
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use btleplug::api::CharPropFlags;
use uuid::Uuid;

/// A struct to hold the information of a Bluetooth device.
#[derive(Clone, Default)]
#[derive(Debug, Clone, Default)]
#[allow(dead_code)]
pub struct DeviceInfo {
// TODO id vs address
Expand Down Expand Up @@ -59,6 +59,7 @@ impl DeviceInfo {
}

/// A struct to hold the information of a GATT Characteristic.
#[derive(Debug)]
pub struct Characteristic {
pub uuid: Uuid,
pub properties: CharPropFlags,
Expand Down

0 comments on commit 136e541

Please sign in to comment.