From 2060b976ac590586d1289e73b0df69ff2e00ac4d Mon Sep 17 00:00:00 2001 From: S John CD Date: Mon, 18 Dec 2023 07:40:13 +0000 Subject: [PATCH] mix blocking and async example (wip) --- Cargo.lock | 13 +- xmpl/mix_blocking_and_async/Cargo.toml | 12 ++ .../src/async_manager.rs | 55 ++++++ xmpl/mix_blocking_and_async/src/main.rs | 74 ++++++++ .../src/synchronous_backend_manager.rs | 171 ++++++++++++++++++ 5 files changed, 322 insertions(+), 3 deletions(-) create mode 100644 xmpl/mix_blocking_and_async/Cargo.toml create mode 100644 xmpl/mix_blocking_and_async/src/async_manager.rs create mode 100644 xmpl/mix_blocking_and_async/src/main.rs create mode 100644 xmpl/mix_blocking_and_async/src/synchronous_backend_manager.rs diff --git a/Cargo.lock b/Cargo.lock index 876dd33c..e4ae5a01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -212,6 +212,16 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking_async_mix_example" +version = "0.1.0" +dependencies = [ + "rayon", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "borsh" version = "1.2.1" @@ -2597,10 +2607,7 @@ dependencies = [ name = "tokio_example" version = "0.1.0" dependencies = [ - "rayon", "tokio", - "tracing", - "tracing-subscriber", ] [[package]] diff --git a/xmpl/mix_blocking_and_async/Cargo.toml b/xmpl/mix_blocking_and_async/Cargo.toml new file mode 100644 index 00000000..7bdf50a8 --- /dev/null +++ b/xmpl/mix_blocking_and_async/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "blocking_async_mix_example" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +rayon = "1.8.0" +tokio = { version = "1.35.0", features = ["full"] } +tracing = "0.1.40" +tracing-subscriber = "0.3.18" diff --git a/xmpl/mix_blocking_and_async/src/async_manager.rs b/xmpl/mix_blocking_and_async/src/async_manager.rs new file mode 100644 index 00000000..04933bdc --- /dev/null +++ b/xmpl/mix_blocking_and_async/src/async_manager.rs @@ -0,0 +1,55 @@ +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use tokio::sync::mpsc; +use tokio::sync::oneshot; + +pub struct AsyncManager { + rt: tokio::runtime::Runtime, +} + +impl AsyncManager { + // `enable_all()` enables the IO and timer drivers on the Tokio runtime. If they are not enabled, the runtime is unable to perform IO or timers + fn build_current_thread_runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + } + + // equivalent to #[tokio::main] async fn main() + fn build_multi_thread_runtime(n: usize) -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(n) + .thread_name_fn(|| { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); + format!("tokio-runtime-worker-{}", id) + }) + .enable_all() + .build() + .unwrap() + } + + pub fn new() -> Self { + let rt = Self::build_multi_thread_runtime(2); + Self { rt } + } +} + +// https://ryhl.io/blog/actors-with-tokio/ + +// Set up a channel for communicating. +//let (tx, mut rx) = mpsc::channel(16); + +// std::thread::spawn(move || { +// rt.block_on(async move { +// while let Some(task) = rx.recv().await { +// //tokio::spawn(handle_task(task)); +// } + +// // Once all senders have gone out of scope, +// // the `.recv()` call returns None and it will +// // exit from the while loop and shut down the +// // thread. +// }); +// }); diff --git a/xmpl/mix_blocking_and_async/src/main.rs b/xmpl/mix_blocking_and_async/src/main.rs new file mode 100644 index 00000000..4bc6c854 --- /dev/null +++ b/xmpl/mix_blocking_and_async/src/main.rs @@ -0,0 +1,74 @@ +#![allow(unused_variables)] +#![allow(unused_mut)] +#![allow(unused_imports)] +#![allow(unused_must_use)] +// or simply #[allow(unused)] +#![allow(dead_code)] +mod async_manager; +#[allow(missing_docs)] +mod synchronous_backend_manager; + +use async_manager::AsyncManager; +use std::error::Error; +use synchronous_backend_manager::{OperationHandle, Responder, SynchronousBackendManager}; +use tracing::{debug, error, event, info, trace, warn, Level}; + +async fn some_async_computation(i: u64) -> Result<(), Box> { + let millis = 1000 - 50 * i; + println!("Task {} sleeping for {} ms.", i, millis); + tokio::time::sleep(tokio::time::Duration::from_millis(millis)).await; + println!("Task {} stopping.", i); + Ok(()) +} + +// async entrypoint +async fn run() { + tokio::spawn(async move { + for i in 1..=10 { + let res = some_async_computation(i).await; + //tx.send(res).await.unwrap(); + } + }); +} + +// synchronous computations below --------------------- + +fn some_sync_computation(cmd: Command, resp: &dyn Responder) { + match cmd { + Command::Run(i) => { + let millis = 1000 - 50 * i; + resp.respond_with(format!("Task {} sleeping for {} ms.", i, millis)); + std::thread::sleep(std::time::Duration::from_millis(millis)); + resp.respond_with(format!("Task {} stopping.", i)); + } + _ => resp.respond_with("Not implemented".to_string()), + } +} + +#[derive(Clone)] +enum Command { + Run(u64), + Other, +} + +// structure the application as largely synchronous, with smaller or logically distinct asynchronous portions. +//For instance, a GUI application might want to run the GUI code on the main thread and run a Tokio runtime next to it on another thread. +fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let mut m = SynchronousBackendManager::create()?; + + let (id, mut handle) = m.create_operation(some_sync_computation); + + // testing on another thread + let sync_code = std::thread::spawn(move || { + handle.send_command(Command::Run(0)); + // while let Some(out) = handle.get_response_blocking() { + // info!("{out}"); + // } + // println!("done!") + }); + sync_code.join().unwrap(); + + Ok(()) +} diff --git a/xmpl/mix_blocking_and_async/src/synchronous_backend_manager.rs b/xmpl/mix_blocking_and_async/src/synchronous_backend_manager.rs new file mode 100644 index 00000000..64376a6d --- /dev/null +++ b/xmpl/mix_blocking_and_async/src/synchronous_backend_manager.rs @@ -0,0 +1,171 @@ +use rayon::ThreadPoolBuildError; +use std::any::Any; +use std::collections::HashMap; +use std::sync::Arc; +use tracing::{debug, error, event, info, trace, warn, Level}; + +pub struct SynchronousBackendManager { + pool: rayon::ThreadPool, + operations: HashMap>, + next_id: u32, +} + +impl SynchronousBackendManager { + const N: usize = 0; // auto + + pub fn create() -> Result { + // Build the threadpool + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(Self::N) + .build()?; + Ok(Self { + pool, + operations: HashMap::new(), + next_id: 0, + }) + } + + pub fn create_operation(&mut self, opr: OP) -> (u32, OperationHandle) + where + OP: Fn(I, &dyn Responder) + Send + 'static, // &dyn Responder + I: Clone + Send + 'static, + O: Clone + Send + 'static, + { + let operation_arc = Arc::new(Operation::new()); + // do stuff with operation before it gets consumed + // Receiver does not implement clone; new Receiver handles are created by calling Sender::subscribe. + let mut new_in_rx = operation_arc.in_tx.subscribe(); + // prepare a clone to be consumed by the pool thread + let handle = OperationHandle::new(&operation_arc); + let operation_arc_clone = operation_arc.clone(); + self.operations.insert(self.next_id, operation_arc); + self.next_id += 1; + + // `install` executes the closure within the threadpool. + // Any attempts to use join, scope, or parallel iterators will then operate within that threadpool. + self.pool.spawn(move || { + loop { + match new_in_rx.blocking_recv() { + Ok(cmd) => opr(cmd, operation_arc_clone.as_ref()), + // all Sender halves have dropped, indicating that no further values can be sent on the channel. + Err(RecvError::Closed) => break, + Err(RecvError::Lagged(n)) => { + warn!("The receiver lagged too far behind. Next receive will return the oldest message still retained by the channel. {} skipped messages.", n); + continue; + }, + }; + } + }); + (self.next_id - 1, handle) + } + + // pub fn get_new_handle_for(&self, id: u32) -> OperationHandle { + // self.operations.get(&id).map( |v| OperationHandle::new(v)) + // } +} + +use tokio::sync::broadcast::error::RecvError; +use tokio::sync::broadcast::{self, Receiver, Sender}; + +// wraps in / out queues for a given operation +// I = inputs to the synchronous operation +// O = outputs of the synchronous operation +struct Operation { + in_tx: Sender, + out_tx: Sender, +} + +impl Operation +where + I: Clone, + O: Clone, +{ + fn new() -> Self { + let (in_tx, _) = broadcast::channel(16); + let (out_tx, _) = broadcast::channel(16); + Self { in_tx, out_tx } + } +} + +// trait passed to the synchronous operation for it to respond to the handler +pub trait Responder: Sync + Send { + fn respond_with(&self, out: O); +} + +impl Responder for Operation { + fn respond_with(&self, resp: O) { + // A send operation can only fail if there are no active receivers, implying that the message could never be received. + self.out_tx.send(resp).unwrap_or_else(|error| { + //info!("There are no active receivers: {:?}", error); + 0 + }); + } +} + +pub struct OperationHandle { + operation: Arc>, // smart pointer to the Operation + out_rx: broadcast::Receiver, +} + +impl OperationHandle +where + I: Clone, + O: Clone, +{ + fn new(operation: &Arc>) -> OperationHandle { + Self { + operation: operation.clone(), + out_rx: operation.out_tx.subscribe(), + } + } + + pub fn send_command(&self, cmd: I) { + self.operation.in_tx.send(cmd).unwrap_or_else(|err| { + //info!("There are no active receivers: {:?}", err); + 0 + }); + } + + pub fn get_response_blocking(&mut self) -> Option { + match self.out_rx.blocking_recv() { + Ok(out_msg) => Some(out_msg), + // all Sender have dropped, indicating that no further values can be sent on the channel. + Err(RecvError::Closed) => None, + Err(RecvError::Lagged(n)) => { + warn!("The receiver lagged too far behind. Next receive will return the oldest message still retained by the channel. {} skipped messages.", n); + match self.out_rx.blocking_recv() { + Ok(out_msg) => Some(out_msg), + Err(RecvError::Closed) => None, + _ => panic!("Retrieved `RecvError::Lagged` twice"), + } + } + } + } + + pub async fn get_response(&mut self) -> Option { + match self.out_rx.recv().await { + Ok(out_msg) => Some(out_msg), + // all Sender have dropped, indicating that no further values can be sent on the channel. + Err(RecvError::Closed) => None, + Err(RecvError::Lagged(n)) => { + warn!("The receiver lagged too far behind. Next receive will return the oldest message still retained by the channel. {} skipped messages.", n); + match self.out_rx.recv().await { + Ok(out_msg) => Some(out_msg), + Err(RecvError::Closed) => None, + _ => panic!("Retrieved `RecvError::Lagged` twice"), + } + } + } + } +} + +// Cloning creates a new Handle that can be used on another thread and, within it, a new receiver +impl Clone for OperationHandle { + fn clone(&self) -> Self { + let receiver = self.operation.out_tx.subscribe(); + Self { + operation: self.operation.clone(), + out_rx: receiver, + } + } +}