From 92a615d3b50cd15123f0d9cf7043c2de389e695a Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 17 Feb 2022 11:09:27 +0100 Subject: [PATCH 1/8] update pubsub examples --- examples/Cargo.toml | 1 + examples/ws_sub_with_params.rs | 28 +++++++++++----- examples/ws_subscription.rs | 61 ++++++++++++++++++++++++++-------- 3 files changed, 69 insertions(+), 21 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index dfc9e15a66..377ae9e481 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -14,6 +14,7 @@ jsonrpsee = { path = "../jsonrpsee", features = ["full"] } tracing = "0.1" tracing-subscriber = { version = "0.3.3", features = ["env-filter"] } tokio = { version = "1.8", features = ["full"] } +tokio-stream = { version = "0.1", features = ["sync"] } [[example]] name = "http" diff --git a/examples/ws_sub_with_params.rs b/examples/ws_sub_with_params.rs index df8cd32729..3ef8533940 100644 --- a/examples/ws_sub_with_params.rs +++ b/examples/ws_sub_with_params.rs @@ -25,11 +25,15 @@ // DEALINGS IN THE SOFTWARE. use std::net::SocketAddr; +use std::time::Duration; +use futures::StreamExt; use jsonrpsee::core::client::SubscriptionClientT; use jsonrpsee::rpc_params; use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::ws_server::{RpcModule, WsServerBuilder}; +use tokio::time::interval; +use tokio_stream::wrappers::IntervalStream; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -61,21 +65,29 @@ async fn run_server() -> anyhow::Result { let server = WsServerBuilder::default().build("127.0.0.1:0").await?; let mut module = RpcModule::new(()); module - .register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, mut sink, _| { + .register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, sink, _| { let idx: usize = params.one()?; - std::thread::spawn(move || loop { - let _ = sink.send(&LETTERS.chars().nth(idx)); - std::thread::sleep(std::time::Duration::from_millis(50)); + let item = LETTERS.chars().nth(idx); + + let interval = interval(Duration::from_millis(200)); + let stream = IntervalStream::new(interval).map(move |_| item); + + tokio::spawn(async move { + let _ = sink.pipe_from_stream(stream).await; }); Ok(()) }) .unwrap(); module - .register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, mut sink, _| { + .register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, sink, _| { let (one, two): (usize, usize) = params.parse()?; - std::thread::spawn(move || loop { - let _ = sink.send(&LETTERS[one..two].to_string()); - std::thread::sleep(std::time::Duration::from_millis(100)); + let item = &LETTERS[one..two]; + + let interval = interval(Duration::from_millis(200)); + let stream = IntervalStream::new(interval).map(move |_| item); + + tokio::spawn(async move { + let _ = sink.pipe_from_stream(stream).await; }); Ok(()) }) diff --git a/examples/ws_subscription.rs b/examples/ws_subscription.rs index 16cb170d8d..f32754c4af 100644 --- a/examples/ws_subscription.rs +++ b/examples/ws_subscription.rs @@ -24,16 +24,25 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use std::net::SocketAddr; +//! Example that shows how to broadcasts the produced values to active all subscriptions using `mpsc channels`. +//! +//! It's possible to use `tokio::sync::broadcast` too but because the Receiver doesn't implement +//! stream we chose this `mpsc channels` in this example. +use std::{net::SocketAddr, sync::Arc}; + +use futures::channel::mpsc; use jsonrpsee::core::client::{Subscription, SubscriptionClientT}; -use jsonrpsee::core::Error; use jsonrpsee::rpc_params; use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::ws_server::{RpcModule, WsServerBuilder}; +use tokio::sync::Mutex; const NUM_SUBSCRIPTION_RESPONSES: usize = 5; +/// Sinks that can be shared across threads. +type SharedSinks = Arc>>>; + #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::FmtSubscriber::builder() @@ -45,13 +54,12 @@ async fn main() -> anyhow::Result<()> { let url = format!("ws://{}", addr); let client = WsClientBuilder::default().build(&url).await?; - let mut subscribe_hello: Subscription = - client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; + let mut sub: Subscription = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; let mut i = 0; while i <= NUM_SUBSCRIPTION_RESPONSES { - let r = subscribe_hello.next().await; - tracing::info!("received {:?}", r); + let r = sub.next().await.unwrap().unwrap(); + tracing::info!("{}", r); i += 1; } @@ -60,13 +68,18 @@ async fn main() -> anyhow::Result<()> { async fn run_server() -> anyhow::Result { let server = WsServerBuilder::default().build("127.0.0.1:0").await?; - let mut module = RpcModule::new(()); - module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", |_, mut sink, _| { - std::thread::spawn(move || loop { - if let Err(Error::SubscriptionClosed(_)) = sink.send(&"hello my friend") { - return; - } - std::thread::sleep(std::time::Duration::from_secs(1)); + let sinks = SharedSinks::default(); + let mut module = RpcModule::new(sinks.clone()); + + // Produce new items for the server to publish. + tokio::spawn(produce_items(sinks)); + + module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", |_, sink, ctx| { + let ctx = ctx.clone(); + tokio::spawn(async move { + let (tx, rx) = mpsc::unbounded(); + ctx.lock().await.push(tx); + let _ = sink.pipe_from_stream(rx).await; }); Ok(()) })?; @@ -74,3 +87,25 @@ async fn run_server() -> anyhow::Result { server.start(module)?; Ok(addr) } + +/// Produce new values that are sent to each active subscription. +async fn produce_items(sinks: SharedSinks) { + let mut count = 0; + loop { + let mut to_remove = Vec::new(); + + for (idx, sink) in sinks.lock().await.iter().enumerate() { + if sink.unbounded_send(count).is_err() { + to_remove.push(idx); + } + } + + // If the channel is closed remove that channel. + for rm in to_remove { + sinks.lock().await.remove(rm); + } + + count += 1; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } +} From efb0dda861a06962c1e76df26987b7a7f0c7b97c Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 17 Feb 2022 11:14:23 +0100 Subject: [PATCH 2/8] Update examples/ws_subscription.rs --- examples/ws_subscription.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/ws_subscription.rs b/examples/ws_subscription.rs index f32754c4af..275cdcb8c2 100644 --- a/examples/ws_subscription.rs +++ b/examples/ws_subscription.rs @@ -27,7 +27,7 @@ //! Example that shows how to broadcasts the produced values to active all subscriptions using `mpsc channels`. //! //! It's possible to use `tokio::sync::broadcast` too but because the Receiver doesn't implement -//! stream we chose this `mpsc channels` in this example. +//! stream thus `mpsc channels` were picked in this example. use std::{net::SocketAddr, sync::Arc}; From 9c0de64eb642e191fd6b639c242defe882172d28 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 17 Feb 2022 11:17:47 +0100 Subject: [PATCH 3/8] remove some docs --- examples/ws_subscription.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/examples/ws_subscription.rs b/examples/ws_subscription.rs index 275cdcb8c2..7c6db5d888 100644 --- a/examples/ws_subscription.rs +++ b/examples/ws_subscription.rs @@ -25,9 +25,6 @@ // DEALINGS IN THE SOFTWARE. //! Example that shows how to broadcasts the produced values to active all subscriptions using `mpsc channels`. -//! -//! It's possible to use `tokio::sync::broadcast` too but because the Receiver doesn't implement -//! stream thus `mpsc channels` were picked in this example. use std::{net::SocketAddr, sync::Arc}; From 0c62167c3571282d09f9209eb2620e7e3723ee72 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 17 Feb 2022 11:28:43 +0100 Subject: [PATCH 4/8] remove needless clone --- examples/ws_subscription.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/ws_subscription.rs b/examples/ws_subscription.rs index 7c6db5d888..effba61de2 100644 --- a/examples/ws_subscription.rs +++ b/examples/ws_subscription.rs @@ -72,7 +72,6 @@ async fn run_server() -> anyhow::Result { tokio::spawn(produce_items(sinks)); module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", |_, sink, ctx| { - let ctx = ctx.clone(); tokio::spawn(async move { let (tx, rx) = mpsc::unbounded(); ctx.lock().await.push(tx); From ba8af8b5e9bd217891037c33f6142183aed2d41e Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 17 Feb 2022 12:33:59 +0100 Subject: [PATCH 5/8] simplify example --- examples/ws_subscription.rs | 75 +++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 41 deletions(-) diff --git a/examples/ws_subscription.rs b/examples/ws_subscription.rs index effba61de2..bedd75bd9f 100644 --- a/examples/ws_subscription.rs +++ b/examples/ws_subscription.rs @@ -24,22 +24,22 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! Example that shows how to broadcasts the produced values to active all subscriptions using `mpsc channels`. +//! Example that shows how to broadcasts the produced values to all active subscriptions using `tokio::sync::broadcast`. -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; +use std::time::Duration; -use futures::channel::mpsc; +use futures::future; +use futures::StreamExt; use jsonrpsee::core::client::{Subscription, SubscriptionClientT}; use jsonrpsee::rpc_params; use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::ws_server::{RpcModule, WsServerBuilder}; -use tokio::sync::Mutex; +use tokio::sync::broadcast; +use tokio_stream::wrappers::BroadcastStream; const NUM_SUBSCRIPTION_RESPONSES: usize = 5; -/// Sinks that can be shared across threads. -type SharedSinks = Arc>>>; - #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::FmtSubscriber::builder() @@ -50,32 +50,35 @@ async fn main() -> anyhow::Result<()> { let addr = run_server().await?; let url = format!("ws://{}", addr); - let client = WsClientBuilder::default().build(&url).await?; - let mut sub: Subscription = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; + let client1 = WsClientBuilder::default().build(&url).await?; + let client2 = WsClientBuilder::default().build(&url).await?; + let sub1: Subscription = client1.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; + let sub2: Subscription = client2.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; - let mut i = 0; - while i <= NUM_SUBSCRIPTION_RESPONSES { - let r = sub.next().await.unwrap().unwrap(); - tracing::info!("{}", r); - i += 1; - } + let fut1 = sub1.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub1 rx: {:?}", r) }); + let fut2 = sub2.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub2 rx: {:?}", r) }); + + future::join(fut1, fut2).await; Ok(()) } async fn run_server() -> anyhow::Result { let server = WsServerBuilder::default().build("127.0.0.1:0").await?; - let sinks = SharedSinks::default(); - let mut module = RpcModule::new(sinks.clone()); + let mut module = RpcModule::new(()); + let (tx, _) = broadcast::channel(1024); + + tokio::spawn(produce_items(tx.clone())); - // Produce new items for the server to publish. - tokio::spawn(produce_items(sinks)); + module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, sink, _| { + let rx = tx.subscribe(); + + // Convert stream from `Item = Result` to `Item = T::Serialize`. + let stream = + BroadcastStream::new(rx).take_while(|r| future::ready(r.is_ok())).filter_map(|r| future::ready(r.ok())); - module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", |_, sink, ctx| { tokio::spawn(async move { - let (tx, rx) = mpsc::unbounded(); - ctx.lock().await.push(tx); - let _ = sink.pipe_from_stream(rx).await; + let _ = sink.pipe_from_stream(stream).await; }); Ok(()) })?; @@ -84,24 +87,14 @@ async fn run_server() -> anyhow::Result { Ok(addr) } -/// Produce new values that are sent to each active subscription. -async fn produce_items(sinks: SharedSinks) { - let mut count = 0; +// Naive example that broadcasts the produced values to all subscribers. +async fn produce_items(tx: broadcast::Sender) { + let mut i = 0; loop { - let mut to_remove = Vec::new(); - - for (idx, sink) in sinks.lock().await.iter().enumerate() { - if sink.unbounded_send(count).is_err() { - to_remove.push(idx); - } - } - - // If the channel is closed remove that channel. - for rm in to_remove { - sinks.lock().await.remove(rm); - } - - count += 1; - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // This might fail if no receivers are alive + // could occur if no subscriptions are active... + let _ = tx.send(i); + i += 1; + tokio::time::sleep(Duration::from_secs(1)).await; } } From 0e71e563fbfffa4a92dce9ee3b004072c095799a Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 23 Feb 2022 13:06:41 +0100 Subject: [PATCH 6/8] simplify code with async-broadcast channel --- examples/Cargo.toml | 9 +++++---- ...subscription.rs => ws_pubsub_broadcast.rs} | 19 +++++-------------- ...ith_params.rs => ws_pubsub_with_params.rs} | 0 3 files changed, 10 insertions(+), 18 deletions(-) rename examples/{ws_subscription.rs => ws_pubsub_broadcast.rs} (84%) rename examples/{ws_sub_with_params.rs => ws_pubsub_with_params.rs} (100%) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 377ae9e481..ff9e7e128a 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -8,6 +8,7 @@ publish = false [dev-dependencies] anyhow = "1" +async-broadcast = "0.3" env_logger = "0.9" futures = "0.3" jsonrpsee = { path = "../jsonrpsee", features = ["full"] } @@ -37,12 +38,12 @@ name = "ws" path = "ws.rs" [[example]] -name = "ws_subscription" -path = "ws_subscription.rs" +name = "ws_pubsub_broadcast" +path = "ws_pubsub_broadcast.rs" [[example]] -name = "ws_sub_with_params" -path = "ws_sub_with_params.rs" +name = "ws_pubsub_with_params" +path = "ws_pubsub_with_params.rs" [[example]] name = "proc_macro" diff --git a/examples/ws_subscription.rs b/examples/ws_pubsub_broadcast.rs similarity index 84% rename from examples/ws_subscription.rs rename to examples/ws_pubsub_broadcast.rs index bedd75bd9f..cb9328b0ce 100644 --- a/examples/ws_subscription.rs +++ b/examples/ws_pubsub_broadcast.rs @@ -35,8 +35,6 @@ use jsonrpsee::core::client::{Subscription, SubscriptionClientT}; use jsonrpsee::rpc_params; use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::ws_server::{RpcModule, WsServerBuilder}; -use tokio::sync::broadcast; -use tokio_stream::wrappers::BroadcastStream; const NUM_SUBSCRIPTION_RESPONSES: usize = 5; @@ -66,19 +64,15 @@ async fn main() -> anyhow::Result<()> { async fn run_server() -> anyhow::Result { let server = WsServerBuilder::default().build("127.0.0.1:0").await?; let mut module = RpcModule::new(()); - let (tx, _) = broadcast::channel(1024); + let (tx, rx) = async_broadcast::broadcast(16); tokio::spawn(produce_items(tx.clone())); module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, sink, _| { - let rx = tx.subscribe(); - - // Convert stream from `Item = Result` to `Item = T::Serialize`. - let stream = - BroadcastStream::new(rx).take_while(|r| future::ready(r.is_ok())).filter_map(|r| future::ready(r.ok())); + let rx = rx.clone(); tokio::spawn(async move { - let _ = sink.pipe_from_stream(stream).await; + let _ = sink.pipe_from_stream(rx).await; }); Ok(()) })?; @@ -88,12 +82,9 @@ async fn run_server() -> anyhow::Result { } // Naive example that broadcasts the produced values to all subscribers. -async fn produce_items(tx: broadcast::Sender) { +async fn produce_items(tx: async_broadcast::Sender) { let mut i = 0; - loop { - // This might fail if no receivers are alive - // could occur if no subscriptions are active... - let _ = tx.send(i); + while let Ok(_) = tx.broadcast(i).await { i += 1; tokio::time::sleep(Duration::from_secs(1)).await; } diff --git a/examples/ws_sub_with_params.rs b/examples/ws_pubsub_with_params.rs similarity index 100% rename from examples/ws_sub_with_params.rs rename to examples/ws_pubsub_with_params.rs From 18580d96f5c506da17bf7334becbe661fefc9c3f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 2 Apr 2022 16:41:05 +0200 Subject: [PATCH 7/8] use tokio broadcast for smaller dependency tree --- examples/Cargo.toml | 1 - examples/ws_pubsub_broadcast.rs | 28 +++++++++++++++++----------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index ff9e7e128a..a4510f7807 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -8,7 +8,6 @@ publish = false [dev-dependencies] anyhow = "1" -async-broadcast = "0.3" env_logger = "0.9" futures = "0.3" jsonrpsee = { path = "../jsonrpsee", features = ["full"] } diff --git a/examples/ws_pubsub_broadcast.rs b/examples/ws_pubsub_broadcast.rs index cb9328b0ce..4162c574ef 100644 --- a/examples/ws_pubsub_broadcast.rs +++ b/examples/ws_pubsub_broadcast.rs @@ -27,7 +27,6 @@ //! Example that shows how to broadcasts the produced values to all active subscriptions using `tokio::sync::broadcast`. use std::net::SocketAddr; -use std::time::Duration; use futures::future; use futures::StreamExt; @@ -35,6 +34,8 @@ use jsonrpsee::core::client::{Subscription, SubscriptionClientT}; use jsonrpsee::rpc_params; use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::ws_server::{RpcModule, WsServerBuilder}; +use tokio::sync::broadcast; +use tokio_stream::wrappers::BroadcastStream; const NUM_SUBSCRIPTION_RESPONSES: usize = 5; @@ -64,15 +65,16 @@ async fn main() -> anyhow::Result<()> { async fn run_server() -> anyhow::Result { let server = WsServerBuilder::default().build("127.0.0.1:0").await?; let mut module = RpcModule::new(()); - let (tx, rx) = async_broadcast::broadcast(16); + let (tx, _rx) = broadcast::channel(16); + let tx2 = tx.clone(); - tokio::spawn(produce_items(tx.clone())); + std::thread::spawn(move || produce_items(tx2)); module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, sink, _| { - let rx = rx.clone(); + let rx = BroadcastStream::new(tx.clone().subscribe()); tokio::spawn(async move { - let _ = sink.pipe_from_stream(rx).await; + let _ = sink.pipe_from_try_stream(rx).await; }); Ok(()) })?; @@ -81,11 +83,15 @@ async fn run_server() -> anyhow::Result { Ok(addr) } -// Naive example that broadcasts the produced values to all subscribers. -async fn produce_items(tx: async_broadcast::Sender) { - let mut i = 0; - while let Ok(_) = tx.broadcast(i).await { - i += 1; - tokio::time::sleep(Duration::from_secs(1)).await; +// Naive example that broadcasts the produced values to all active subscribers. +fn produce_items(tx: broadcast::Sender) { + for c in 1..=100 { + std::thread::sleep(std::time::Duration::from_secs(1)); + + // This might fail if no receivers are alive, could occur if no subscriptions are active... + // Also be aware that this will succeed when at least one receiver is alive + // Thus, clients connecting at different point in time will not receive + // the items sent before the subscription got established. + let _ = tx.send(c); } } From 241bd8acd370c52039a612f5e73283670477feb9 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 2 Apr 2022 17:04:07 +0200 Subject: [PATCH 8/8] Update examples/ws_pubsub_broadcast.rs --- examples/ws_pubsub_broadcast.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/ws_pubsub_broadcast.rs b/examples/ws_pubsub_broadcast.rs index 4162c574ef..4759c6cb73 100644 --- a/examples/ws_pubsub_broadcast.rs +++ b/examples/ws_pubsub_broadcast.rs @@ -24,7 +24,7 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! Example that shows how to broadcasts the produced values to all active subscriptions using `tokio::sync::broadcast`. +//! Example that shows how to broadcast to all active subscriptions using `tokio::sync::broadcast`. use std::net::SocketAddr;