mix blocking and async example (wip)
john-cd committed Dec 18, 2023
1 parent c322832 commit 2060b97
Showing 5 changed files with 322 additions and 3 deletions.
13 changes: 10 additions & 3 deletions Cargo.lock

Cargo.lock is a generated file; its diff is not rendered here.

12 changes: 12 additions & 0 deletions xmpl/mix_blocking_and_async/Cargo.toml
@@ -0,0 +1,12 @@
[package]
name = "blocking_async_mix_example"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rayon = "1.8.0"
tokio = { version = "1.35.0", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
55 changes: 55 additions & 0 deletions xmpl/mix_blocking_and_async/src/async_manager.rs
@@ -0,0 +1,55 @@
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
// `mpsc` is only used in the channel wiring sketched at the end of this file; `oneshot` is not used yet (wip).
use tokio::sync::mpsc;
use tokio::sync::oneshot;

pub struct AsyncManager {
rt: tokio::runtime::Runtime,
}

impl AsyncManager {
    // `enable_all()` enables the I/O and timer drivers on the Tokio runtime.
    // Without them, the runtime cannot perform I/O or use timers.
fn build_current_thread_runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
}

    // Equivalent to the runtime that `#[tokio::main]` builds for `async fn main()` (attribute form sketched below).
fn build_multi_thread_runtime(n: usize) -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(n)
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("tokio-runtime-worker-{}", id)
})
.enable_all()
.build()
.unwrap()
}
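
    // For reference, the attribute form that the builder above replaces (values are illustrative):
    //
    // #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
    // async fn main() { /* ... */ }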

pub fn new() -> Self {
let rt = Self::build_multi_thread_runtime(2);
Self { rt }
}
}

// Actor pattern, from https://ryhl.io/blog/actors-with-tokio/
// (a completed sketch follows at the end of this file).

// Set up a channel for communicating.
// let (tx, mut rx) = mpsc::channel(16);

// std::thread::spawn(move || {
// rt.block_on(async move {
// while let Some(task) = rx.recv().await {
// //tokio::spawn(handle_task(task));
// }

// // Once all senders have gone out of scope,
// // the `.recv()` call returns None and it will
// // exit from the while loop and shut down the
// // thread.
// });
// });
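
A minimal completion of the commented-out wiring above, assuming the actor pattern from the linked post: a plain thread owns the (private) current-thread runtime built in this file and drains an `mpsc` channel of sleep durations. The function name and message type are illustrative, not part of the commit.

pub fn spawn_runtime_on_dedicated_thread() -> mpsc::Sender<u64> {
    // Channel used by synchronous callers to hand work to the async side.
    let (tx, mut rx) = mpsc::channel::<u64>(16);
    let rt = AsyncManager::build_current_thread_runtime();
    std::thread::spawn(move || {
        rt.block_on(async move {
            // Once all senders have gone out of scope, `recv()` returns `None`,
            // the loop exits, and the thread shuts down.
            while let Some(millis) = rx.recv().await {
                tokio::spawn(async move {
                    tokio::time::sleep(tokio::time::Duration::from_millis(millis)).await;
                });
            }
            // Note: on a current-thread runtime, spawned tasks are only polled while
            // `block_on` runs; tasks still pending here are dropped with the runtime.
        });
    });
    tx
}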
74 changes: 74 additions & 0 deletions xmpl/mix_blocking_and_async/src/main.rs
@@ -0,0 +1,74 @@
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_imports)]
#![allow(unused_must_use)]
// or simply #![allow(unused)]
#![allow(dead_code)]
mod async_manager;
#[allow(missing_docs)]
mod synchronous_backend_manager;

use async_manager::AsyncManager;
use std::error::Error;
use synchronous_backend_manager::{OperationHandle, Responder, SynchronousBackendManager};
use tracing::{debug, error, event, info, trace, warn, Level};

async fn some_async_computation(i: u64) -> Result<(), Box<dyn Error>> {
let millis = 1000 - 50 * i;
println!("Task {} sleeping for {} ms.", i, millis);
tokio::time::sleep(tokio::time::Duration::from_millis(millis)).await;
println!("Task {} stopping.", i);
Ok(())
}

// Async entry point (wip): not yet called from `main`; `tokio::spawn` below requires a running Tokio runtime (e.g. one owned by `AsyncManager`).
async fn run() {
tokio::spawn(async move {
for i in 1..=10 {
let res = some_async_computation(i).await;
//tx.send(res).await.unwrap();
}
});
}

// synchronous computations below ---------------------

fn some_sync_computation(cmd: Command, resp: &dyn Responder<String>) {
match cmd {
Command::Run(i) => {
let millis = 1000 - 50 * i;
resp.respond_with(format!("Task {} sleeping for {} ms.", i, millis));
std::thread::sleep(std::time::Duration::from_millis(millis));
resp.respond_with(format!("Task {} stopping.", i));
}
_ => resp.respond_with("Not implemented".to_string()),
}
}

#[derive(Clone)]
enum Command {
Run(u64),
Other,
}

// Structure the application as largely synchronous, with smaller or logically distinct asynchronous portions.
// For instance, a GUI application might run the GUI code on the main thread and a Tokio runtime next to it
// on another thread (a standalone sketch of that arrangement follows `main` below).
fn main() -> Result<(), Box<dyn Error>> {
tracing_subscriber::fmt::init();

let mut m = SynchronousBackendManager::create()?;

let (id, mut handle) = m.create_operation(some_sync_computation);

// testing on another thread
let sync_code = std::thread::spawn(move || {
handle.send_command(Command::Run(0));
// while let Some(out) = handle.get_response_blocking() {
// info!("{out}");
// }
// println!("done!")
});
sync_code.join().unwrap();

Ok(())
}
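
The `run()` function and `AsyncManager` are not yet wired into `main` (the commit is marked wip). A rough, self-contained sketch of the arrangement described in the comment above `main` (synchronous code on the main thread, a Tokio runtime on a second thread) could look like the following. `main_sketch` is an illustrative name, and the runtime is built directly because `AsyncManager` does not yet expose its runtime.

fn main_sketch() {
    // Channel used by the async side to report results back to the synchronous side.
    let (tx, rx) = std::sync::mpsc::channel::<String>();

    // The Tokio runtime lives on its own thread, next to the synchronous main thread.
    let async_side = std::thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .expect("failed to build the Tokio runtime");
        rt.block_on(async move {
            for i in 1..=3u64 {
                tokio::time::sleep(tokio::time::Duration::from_millis(100 * i)).await;
                // Ignore the error if the receiver has already been dropped.
                let _ = tx.send(format!("async task {i} done"));
            }
        });
    });

    // Meanwhile, the main thread stays synchronous and simply drains the channel;
    // `recv()` returns `Err` once the sender is dropped, ending the loop.
    while let Ok(msg) = rx.recv() {
        println!("{msg}");
    }
    async_side.join().unwrap();
}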
171 changes: 171 additions & 0 deletions xmpl/mix_blocking_and_async/src/synchronous_backend_manager.rs
@@ -0,0 +1,171 @@
use rayon::ThreadPoolBuildError;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::{self, Sender};
use tracing::{debug, error, event, info, trace, warn, Level};

pub struct SynchronousBackendManager {
pool: rayon::ThreadPool,
operations: HashMap<u32, Arc<dyn Any>>,
next_id: u32,
}

impl SynchronousBackendManager {
    const N: usize = 0; // 0 lets Rayon pick the number of threads automatically.

pub fn create() -> Result<Self, ThreadPoolBuildError> {
// Build the threadpool
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(Self::N)
.build()?;
Ok(Self {
pool,
operations: HashMap::new(),
next_id: 0,
})
}

pub fn create_operation<I, O, OP>(&mut self, opr: OP) -> (u32, OperationHandle<I, O>)
where
        OP: Fn(I, &dyn Responder<O>) + Send + 'static,
I: Clone + Send + 'static,
O: Clone + Send + 'static,
{
let operation_arc = Arc::new(Operation::new());
        // Subscribe to the input channel before the `Arc` is moved into the pool thread.
        // `Receiver` does not implement `Clone`; new receiver handles are created by calling `Sender::subscribe`.
let mut new_in_rx = operation_arc.in_tx.subscribe();
// prepare a clone to be consumed by the pool thread
let handle = OperationHandle::new(&operation_arc);
let operation_arc_clone = operation_arc.clone();
self.operations.insert(self.next_id, operation_arc);
self.next_id += 1;

        // `spawn` asynchronously runs the closure on a thread in the pool.
        // Any use of `join`, `scope`, or parallel iterators inside it will then operate within that pool.
self.pool.spawn(move || {
loop {
match new_in_rx.blocking_recv() {
Ok(cmd) => opr(cmd, operation_arc_clone.as_ref()),
// all Sender halves have dropped, indicating that no further values can be sent on the channel.
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(n)) => {
warn!("The receiver lagged too far behind. Next receive will return the oldest message still retained by the channel. {} skipped messages.", n);
continue;
},
};
}
});
(self.next_id - 1, handle)
}

    // Not implemented yet: this would require downcasting the stored `Arc<dyn Any>`
    // back to `Arc<Operation<I, O>>` before a new handle could be created.
    // pub fn get_new_handle_for(&self, id: u32) -> OperationHandle<I, O> {
    //     self.operations.get(&id).map(|v| OperationHandle::new(v))
    // }
}

// Wraps the input/output broadcast channels for a given operation.
// I = inputs to the synchronous operation
// O = outputs of the synchronous operation
struct Operation<I, O> {
in_tx: Sender<I>,
out_tx: Sender<O>,
}

impl<I, O> Operation<I, O>
where
I: Clone,
O: Clone,
{
fn new() -> Self {
let (in_tx, _) = broadcast::channel(16);
let (out_tx, _) = broadcast::channel(16);
Self { in_tx, out_tx }
}
}

// Trait passed to the synchronous operation so that it can send responses back to the `OperationHandle`.
pub trait Responder<O>: Sync + Send {
fn respond_with(&self, out: O);
}

impl<I: Send, O: Send> Responder<O> for Operation<I, O> {
    fn respond_with(&self, resp: O) {
        // A send operation can only fail if there are no active receivers,
        // in which case the response is simply dropped.
        self.out_tx.send(resp).unwrap_or_else(|_err| {
            info!("There are no active receivers; the response was dropped.");
            0
        });
    }
}

pub struct OperationHandle<I, O> {
operation: Arc<Operation<I, O>>, // smart pointer to the Operation
out_rx: broadcast::Receiver<O>,
}

impl<I, O> OperationHandle<I, O>
where
I: Clone,
O: Clone,
{
fn new(operation: &Arc<Operation<I, O>>) -> OperationHandle<I, O> {
Self {
operation: operation.clone(),
out_rx: operation.out_tx.subscribe(),
}
}

    pub fn send_command(&self, cmd: I) {
        // Sending fails only if there are no active receivers; the command is then discarded.
        self.operation.in_tx.send(cmd).unwrap_or_else(|_err| {
            info!("There are no active receivers; the command was dropped.");
            0
        });
    }

pub fn get_response_blocking(&mut self) -> Option<O> {
match self.out_rx.blocking_recv() {
Ok(out_msg) => Some(out_msg),
            // All `Sender` halves have dropped, so no further values can be sent on the channel.
Err(RecvError::Closed) => None,
Err(RecvError::Lagged(n)) => {
warn!("The receiver lagged too far behind. Next receive will return the oldest message still retained by the channel. {} skipped messages.", n);
match self.out_rx.blocking_recv() {
Ok(out_msg) => Some(out_msg),
Err(RecvError::Closed) => None,
_ => panic!("Retrieved `RecvError::Lagged` twice"),
}
}
}
}

pub async fn get_response(&mut self) -> Option<O> {
match self.out_rx.recv().await {
Ok(out_msg) => Some(out_msg),
            // All `Sender` halves have dropped, so no further values can be sent on the channel.
Err(RecvError::Closed) => None,
Err(RecvError::Lagged(n)) => {
warn!("The receiver lagged too far behind. Next receive will return the oldest message still retained by the channel. {} skipped messages.", n);
match self.out_rx.recv().await {
Ok(out_msg) => Some(out_msg),
Err(RecvError::Closed) => None,
_ => panic!("Retrieved `RecvError::Lagged` twice"),
}
}
}
}
}

// Cloning creates a new handle, with its own broadcast receiver, that can be used on another thread
// (see the usage sketch below).
impl<I, O> Clone for OperationHandle<I, O> {
fn clone(&self) -> Self {
let receiver = self.operation.out_tx.subscribe();
Self {
operation: self.operation.clone(),
out_rx: receiver,
}
}
}
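
Not part of this commit, but a sketch of how the cloning behaviour above could be exercised: one handle sends commands while a clone, which gets its own broadcast receiver, drains responses on a second thread. `echo` and `usage_sketch` are illustrative names.

// A synchronous operation that simply echoes its input back through the responder.
fn echo(input: u64, resp: &dyn Responder<String>) {
    resp.respond_with(format!("processed {input}"));
}

fn usage_sketch() -> Result<(), ThreadPoolBuildError> {
    let mut manager = SynchronousBackendManager::create()?;
    let (_id, handle) = manager.create_operation(echo);

    // The clone can be moved to another thread and read independently.
    let mut reader = handle.clone();
    let reader_thread = std::thread::spawn(move || {
        if let Some(out) = reader.get_response_blocking() {
            println!("{out}");
        }
    });

    handle.send_command(41);
    reader_thread.join().unwrap();
    Ok(())
}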
