Skip to content

Commit

Permalink
feat: Introduce async-bridge for tauri + soundcore-lib integration
Browse files Browse the repository at this point in the history
  • Loading branch information
gmallios committed Feb 25, 2024
1 parent 91f9d63 commit 0080353
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 15 deletions.
7 changes: 7 additions & 0 deletions src-tauri/src/async_bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod bridge;
mod command;
mod response;

pub use bridge::*;
pub use command::*;
pub use response::*;
161 changes: 161 additions & 0 deletions src-tauri/src/async_bridge/bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use std::sync::Arc;

use tauri::AppHandle;
use tokio::sync::{mpsc, Mutex};

use soundcore_lib::device::SoundcoreBLEDevice;
use soundcore_lib::{
ble::BLEConnectionManager,
device_manager::{create_device_manager, DeviceManager},
};

use super::{BridgeCommand, BridgeResponse};

struct CommandLoopState<B: BLEConnectionManager> {
manager: DeviceManager<B>,
app_handle: AppHandle,
devices: Vec<Arc<SoundcoreBLEDevice<B::Connection>>>,
}

impl<B: BLEConnectionManager> CommandLoopState<B> {
fn new(manager: DeviceManager<B>, app_handle: AppHandle) -> Self {
Self {
manager,
app_handle,
devices: Vec::new(),
}
}
}

pub async fn async_bridge(
mut input_rx: mpsc::Receiver<BridgeCommand>,
output_tx: mpsc::Sender<BridgeResponse>,
app_handle: AppHandle,
) {
let command_loop = tokio::spawn(async move {
let manager = create_device_manager().await;
let command_loop_state = Arc::new(Mutex::new(CommandLoopState::new(manager, app_handle)));
loop {
while let Some(command) = input_rx.recv().await {
let command_loop_state = command_loop_state.clone();
let response = handle_command(command_loop_state, command, output_tx.clone()).await;
output_tx
.send(response)
.await
.expect("Failed to send response");
}
}
});
}

async fn handle_command<B: BLEConnectionManager>(
command_loop_state: Arc<Mutex<CommandLoopState<B>>>,
command: BridgeCommand,
output_tx: mpsc::Sender<BridgeResponse>,
) -> BridgeResponse {
match command {
BridgeCommand::ScanBle => command_loop_state
.lock()
.await
.manager
.ble_scan(None)
.await
.map(BridgeResponse::ScanResult),
BridgeCommand::DisconnectBle(addr) => {
let addr_clone = addr.clone();
command_loop_state
.lock()
.await
.manager
.disconnect(addr)
.await
.map(|_| BridgeResponse::Disconnected(addr_clone))
}
BridgeCommand::ConnectBle(d) => {
let addr = d.clone().descriptor.addr;
let device = command_loop_state.lock().await.manager.connect(d).await;
let addr_clone = addr.clone();
if let Ok(device) = device {
// Get the state channel and listen for changes in the background
let mut state_channel = device.state_channel().await;
tokio::task::spawn(async move {
while let Ok(()) = state_channel.changed().await {
let state = state_channel.borrow().clone();
// TODO: Add logging
output_tx
.send(BridgeResponse::NewState((addr_clone.clone(), state)))
.await;
}
});
Ok(BridgeResponse::ConnectionEstablished(addr))
} else {
Err(device.err().unwrap())
}
}
}
.map_err(|e| BridgeResponse::Error(e.to_string()))
.unwrap_or_else(|e| e)
}

// #[cfg(test)]
// mod test {
// use super::*;
//
// async fn create_bridge() -> (mpsc::Sender<BridgeCommand>, mpsc::Receiver<BridgeResponse>) {
// let (input_tx, input_rx) = mpsc::channel(1);
// let (output_tx, output_rx) = mpsc::channel(1);
// async_bridge(input_rx, output_tx).await;
// (input_tx, output_rx)
// }
//
// #[tokio::test]
// async fn should_handle_scan_command_and_produce_response() {
// let (input_tx, mut output_rx) = create_bridge().await;
// input_tx
// .send(BridgeCommand::ScanBle)
// .await
// .expect("Failed to send command");
//
// let response = output_rx.recv().await.expect("Failed to receive response");
//
// match response {
// BridgeResponse::ScanResult(res) => {
// assert!(!res.is_empty());
// }
// _ => panic!("Unexpected response: {:?}", response),
// }
// }
//
// #[tokio::test]
// async fn should_handle_connect_command_and_produce_response() {
// let (input_tx, mut output_rx) = create_bridge().await;
// input_tx
// .send(BridgeCommand::ScanBle)
// .await
// .expect("Failed to send command");
//
// let scan_response = output_rx.recv().await.expect("Failed to receive response");
//
// let devices = match scan_response {
// BridgeResponse::ScanResult(res) => res,
// _ => panic!("Unexpected response: {:?}", scan_response),
// };
//
// let device = devices.first().unwrap();
//
// input_tx
// .send(BridgeCommand::ConnectBle(device.clone()))
// .await
// .expect("Failed to send command");
//
// let response = output_rx.recv().await.expect("Failed to receive response");
//
// // TODO: Fix this test
// // match response {
// // BridgeResponse::ConnectionEstablished(addr) => {
// // assert_eq!(addr, device.descriptor.addr);
// // }
// // _ => panic!("Unexpected response: {:?}", response),
// // }
// }
// }
14 changes: 14 additions & 0 deletions src-tauri/src/async_bridge/command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use serde::Deserialize;
use typeshare::typeshare;
use soundcore_lib::btaddr::BluetoothAdrr;

use soundcore_lib::device_manager::DiscoveredDevice;

#[typeshare]
#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase", tag = "command", content = "payload")]
pub enum BridgeCommand {
ScanBle,
ConnectBle(DiscoveredDevice),
DisconnectBle(BluetoothAdrr),
}
17 changes: 17 additions & 0 deletions src-tauri/src/async_bridge/response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use serde::Serialize;

use soundcore_lib::api::SoundcoreDeviceState;
use soundcore_lib::btaddr::BluetoothAdrr;
use soundcore_lib::device_manager::DiscoveredDevice;

#[derive(Debug, Serialize, Clone)]
pub enum BridgeResponse {
ScanResult(Vec<DiscoveredDevice>),
ConnectionEstablished(BluetoothAdrr),
NewState((BluetoothAdrr, SoundcoreDeviceState)),
Disconnected(BluetoothAdrr),
Error(String),
}

unsafe impl Send for BridgeResponse {}
unsafe impl Sync for BridgeResponse {}
65 changes: 50 additions & 15 deletions src-tauri/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,24 @@
windows_subsystem = "windows"
)]

use std::sync::Arc;

use log::info;
use mpsc::channel;
use tauri::async_runtime::{Mutex, RwLock};
use tauri::Manager;
use tauri_plugin_log::LogTarget;
use tokio::sync::mpsc;

use bluetooth_lib::platform::BthScanner;
use bluetooth_lib::Scanner;
use frontend_types::BthScanResult;
use soundcore_lib::base::SoundcoreDevice;
use soundcore_lib::types::{SupportedModels, SOUNDCORE_NAME_MODEL_MAP};
use soundcore_lib::types::{SOUNDCORE_NAME_MODEL_MAP, SupportedModels};

use std::sync::Arc;
use tauri::async_runtime::{Mutex, RwLock};
use tauri::Manager;
use tauri_plugin_log::LogTarget;
use crate::async_bridge::{async_bridge, BridgeCommand, BridgeResponse};

pub(crate) mod async_bridge;
mod device;
pub(crate) mod frontend_types;
mod tray;
Expand All @@ -22,15 +29,6 @@ pub(crate) mod utils;
#[cfg(target_os = "macos")]
mod server;

// #[tauri::command]
// fn close_all(state: State<DeviceState>) -> Result<(), ()> {
// let mut device_state = state.device.lock().map_err(|_| ())?;
// *device_state = None;
// let rfcomm = RFCOMM_STATE.lock().map_err(|_| ())?;
// rfcomm.close();
// Ok(())
// }

#[tauri::command]
async fn scan_for_devices() -> Vec<BthScanResult> {
let res = BthScanner::new().scan().await;
Expand All @@ -51,25 +49,47 @@ async fn scan_for_devices() -> Vec<BthScanResult> {
struct SoundcoreAppState {
device: Arc<Mutex<Option<Box<dyn SoundcoreDevice>>>>,
model: Arc<RwLock<Option<SupportedModels>>>,
bridge_tx: Mutex<mpsc::Sender<BridgeCommand>>,
}

fn main() {
#[tokio::main]
async fn main() {
tauri::async_runtime::set(tokio::runtime::Handle::current());
#[cfg(target_os = "macos")]
server::launch_server();

// Bring up bridge

// builder()
// .filter(None, log::LevelFilter::Debug)
// .filter_module("h2", log::LevelFilter::Off)
// .filter_module("hyper", log::LevelFilter::Off)
// .filter_module("tower", log::LevelFilter::Off)
// .init();
let (input_tx, input_rx) = channel(1);
let (output_tx, mut output_rx) = channel(1);

tauri::Builder::default()
.setup(|app| {
let bridge_app_handle = app.handle();
tokio::spawn(async_bridge(input_rx, output_tx, bridge_app_handle));

let app_handle = app.handle();
tokio::spawn(async move {
loop {
if let Some(resp) = output_rx.recv().await {
handle_bridge_output(resp, &app_handle);
}
}
});
Ok(())
})
.system_tray(tray::get_system_tray())
.on_system_tray_event(tray::handle_tray_event)
.manage(SoundcoreAppState {
device: Arc::new(Mutex::new(None)),
model: Arc::new(RwLock::new(None)),
bridge_tx: Mutex::new(input_tx),
})
.invoke_handler(tauri::generate_handler![
tray::set_tray_device_status,
Expand All @@ -87,6 +107,7 @@ fn main() {
device::set_eq,
device::get_eq,
scan_for_devices,
send_bridge_command
])
.plugin(tauri_plugin_log::Builder::default().targets([
LogTarget::LogDir,
Expand Down Expand Up @@ -120,3 +141,17 @@ fn main() {
_ => {}
});
}

fn handle_bridge_output<R: tauri::Runtime>(resp: BridgeResponse, manager: &impl Manager<R>) {
info!("Received response from bridge: {:?}", resp);
manager.emit_all("bridge-response", resp).unwrap();
}

#[tauri::command]
async fn send_bridge_command(
app_state: tauri::State<'_, SoundcoreAppState>,
command: BridgeCommand,
) -> Result<(), String> {
let tx = app_state.bridge_tx.lock().await;
tx.send(command).await.map_err(|e| e.to_string())
}

0 comments on commit 0080353

Please sign in to comment.