From 72024eaea84b7ced204c395eb4a5077f07ad4cb4 Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 5 Jan 2022 12:59:44 +0100 Subject: [PATCH 01/22] feat(rpc module): add_stream to subscription sink --- core/Cargo.toml | 2 + core/src/server/helpers.rs | 24 ++++++++++- core/src/server/rpc_module.rs | 70 +++++++++++++++++++++++++------- http-server/src/server.rs | 3 +- tests/tests/integration_tests.rs | 38 +++++++++++++++++ ws-server/Cargo.toml | 1 + ws-server/src/server.rs | 34 +++++++++++----- 7 files changed, 146 insertions(+), 26 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index d1967ab828..d7f9381712 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -11,6 +11,7 @@ anyhow = "1" arrayvec = "0.7.1" async-trait = "0.1" beef = { version = "0.5.1", features = ["impl_serde"] } +async-channel = { version = "1.6", optional = true } thiserror = "1" futures-channel = { version = "0.3.14", default-features = false } futures-util = { version = "0.3.14", default-features = false, optional = true } @@ -29,6 +30,7 @@ tokio = { version = "1.8", features = ["rt"], optional = true } default = [] http-helpers = ["futures-util"] server = [ + "async-channel", "futures-util", "rustc-hash", "tracing", diff --git a/core/src/server/helpers.rs b/core/src/server/helpers.rs index 5ef8b56248..942f136da6 100644 --- a/core/src/server/helpers.rs +++ b/core/src/server/helpers.rs @@ -28,13 +28,15 @@ use std::io; use crate::{to_json_raw_value, Error}; use futures_channel::mpsc; -use futures_util::stream::StreamExt; +use futures_util::task::{Context, Poll}; +use futures_util::{Sink, StreamExt}; use jsonrpsee_types::error::{ CallError, ErrorCode, ErrorObject, ErrorResponse, CALL_EXECUTION_FAILED_CODE, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG, UNKNOWN_ERROR_CODE, }; use jsonrpsee_types::{Id, InvalidRequest, Response}; use serde::Serialize; +use std::pin::Pin; /// Bounded writer that allows writing at most `max_len` bytes. /// @@ -90,6 +92,26 @@ pub struct MethodSink { max_response_size: u32, } +impl Sink for MethodSink { + type Error = Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.tx).poll_ready(cx).map_err(Into::into) + } + + fn start_send(mut self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> { + Pin::new(&mut self.tx).start_send(item).map_err(Into::into) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.tx).poll_flush(cx).map_err(Into::into) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.tx).poll_close(cx).map_err(Into::into) + } +} + impl MethodSink { /// Create a new `MethodSink` with unlimited response size pub fn new(tx: mpsc::UnboundedSender) -> Self { diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 8911de999c..3eafb5571e 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -38,7 +38,7 @@ use crate::to_json_raw_value; use crate::traits::{IdProvider, ToRpcParams}; use beef::Cow; use futures_channel::{mpsc, oneshot}; -use futures_util::{future::BoxFuture, FutureExt, StreamExt}; +use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt}; use jsonrpsee_types::error::{invalid_subscription_err, ErrorCode, CALL_EXECUTION_FAILED_CODE}; use jsonrpsee_types::{ Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload, SubscriptionResponse, @@ -51,16 +51,29 @@ use serde::{de::DeserializeOwned, Serialize}; /// implemented as a function pointer to a `Fn` function taking four arguments: /// the `id`, `params`, a channel the function uses to communicate the result (or error) /// back to `jsonrpsee`, and the connection ID (useful for the websocket transport). -pub type SyncMethod = Arc bool>; +pub type SyncMethod = + Arc bool>; /// Similar to [`SyncMethod`], but represents an asynchronous handler and takes an additional argument containing a [`ResourceGuard`] if configured. pub type AsyncMethod<'a> = Arc< - dyn Send + Sync + Fn(Id<'a>, Params<'a>, MethodSink, Option, &dyn IdProvider) -> BoxFuture<'a, bool>, + dyn Send + + Sync + + Fn( + Id<'a>, + Params<'a>, + MethodSink, + ConnectionState, + Option, + &dyn IdProvider, + ) -> BoxFuture<'a, bool>, >; /// Connection ID, used for stateful protocol such as WebSockets. /// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value. pub type ConnectionId = usize; /// Raw RPC response. pub type RawRpcResponse = (String, mpsc::UnboundedReceiver, mpsc::UnboundedSender); +/// Connection state for stateful protocols such as WebSocket +/// This is used to keep track whether the connection has been closed. +pub type ConnectionState = Option>; type Subscribers = Arc)>>>; @@ -157,6 +170,7 @@ impl MethodCallback { pub fn execute( &self, sink: &MethodSink, + conn: ConnectionState, req: Request<'_>, conn_id: ConnectionId, claimed: Option, @@ -174,7 +188,7 @@ impl MethodCallback { conn_id ); - let result = (callback)(id, params, sink, conn_id, id_gen); + let result = (callback)(id, params, sink, conn, conn_id, id_gen); // Release claimed resources drop(claimed); @@ -192,7 +206,7 @@ impl MethodCallback { conn_id ); - MethodResult::Async((callback)(id, params, sink, claimed, id_gen)) + MethodResult::Async((callback)(id, params, sink, conn, claimed, id_gen)) } }; @@ -310,13 +324,14 @@ impl Methods { pub fn execute( &self, sink: &MethodSink, + conn: ConnectionState, req: Request, conn_id: ConnectionId, id_gen: &dyn IdProvider, ) -> MethodResult { tracing::trace!("[Methods::execute] Executing request: {:?}", req); match self.callbacks.get(&*req.method) { - Some(callback) => callback.execute(sink, req, conn_id, None, id_gen), + Some(callback) => callback.execute(sink, conn, req, conn_id, None, id_gen), None => { sink.send_error(req.id, ErrorCode::MethodNotFound.into()); MethodResult::Sync(false) @@ -329,6 +344,7 @@ impl Methods { pub fn execute_with_resources<'r>( &self, sink: &MethodSink, + rx: ConnectionState, req: Request<'r>, conn_id: ConnectionId, resources: &Resources, @@ -337,7 +353,7 @@ impl Methods { tracing::trace!("[Methods::execute_with_resources] Executing request: {:?}", req); match self.callbacks.get_key_value(&*req.method) { Some((&name, callback)) => match callback.claim(&req.method, resources) { - Ok(guard) => Ok((name, callback.execute(sink, req, conn_id, Some(guard), id_gen))), + Ok(guard) => Ok((name, callback.execute(sink, rx, req, conn_id, Some(guard), id_gen))), Err(err) => { tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err); sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); @@ -426,8 +442,9 @@ impl Methods { async fn inner_call(&self, req: Request<'_>) -> RawRpcResponse { let (tx, mut rx) = mpsc::unbounded(); let sink = MethodSink::new(tx.clone()); + let (_tx, rx_2) = async_channel::unbounded(); - if let MethodResult::Async(fut) = self.execute(&sink, req, 0, &RandomIntegerIdProvider) { + if let MethodResult::Async(fut) = self.execute(&sink, Some(rx_2), req, 0, &RandomIntegerIdProvider) { fut.await; } @@ -527,7 +544,7 @@ impl RpcModule { let ctx = self.ctx.clone(); let callback = self.methods.verify_and_insert( method_name, - MethodCallback::new_sync(Arc::new(move |id, params, sink, _, _| match callback(params, &*ctx) { + MethodCallback::new_sync(Arc::new(move |id, params, sink, _, _, _| match callback(params, &*ctx) { Ok(res) => sink.send_response(id, res), Err(err) => sink.send_call_error(id, err), })), @@ -550,7 +567,7 @@ impl RpcModule { let ctx = self.ctx.clone(); let callback = self.methods.verify_and_insert( method_name, - MethodCallback::new_async(Arc::new(move |id, params, sink, claimed, _| { + MethodCallback::new_async(Arc::new(move |id, params, sink, _, claimed, _| { let ctx = ctx.clone(); let future = async move { let result = match callback(params, ctx).await { @@ -585,7 +602,7 @@ impl RpcModule { let ctx = self.ctx.clone(); let callback = self.methods.verify_and_insert( method_name, - MethodCallback::new_async(Arc::new(move |id, params, sink, claimed, _| { + MethodCallback::new_async(Arc::new(move |id, params, sink, _, claimed, _| { let ctx = ctx.clone(); tokio::task::spawn_blocking(move || { @@ -671,7 +688,7 @@ impl RpcModule { let subscribers = subscribers.clone(); self.methods.mut_callbacks().insert( subscribe_method_name, - MethodCallback::new_sync(Arc::new(move |id, params, method_sink, conn_id, id_provider| { + MethodCallback::new_sync(Arc::new(move |id, params, method_sink, conn, conn_id, id_provider| { let (conn_tx, conn_rx) = oneshot::channel::<()>(); let sub_id = { @@ -687,6 +704,7 @@ impl RpcModule { let sink = SubscriptionSink { inner: method_sink.clone(), + close: conn, method: notif_method_name, subscribers: subscribers.clone(), uniq_sub: SubscriptionKey { conn_id, sub_id }, @@ -710,7 +728,7 @@ impl RpcModule { { self.methods.mut_callbacks().insert( unsubscribe_method_name, - MethodCallback::new_sync(Arc::new(move |id, params, sink, conn_id, _| { + MethodCallback::new_sync(Arc::new(move |id, params, sink, _, conn_id, _| { let sub_id = match params.one::() { Ok(sub_id) => sub_id, Err(_) => { @@ -764,6 +782,8 @@ impl RpcModule { pub struct SubscriptionSink { /// Sink. inner: MethodSink, + /// Close + close: ConnectionState, /// MethodCallback. method: &'static str, /// Unique subscription. @@ -786,6 +806,28 @@ impl SubscriptionSink { self.inner_send(msg).map_err(Into::into) } + /// Consume the sink by passing a stream to be sent via the sink. + pub async fn add_stream(mut self, mut stream: S) + where + S: Stream + Unpin, + T: Serialize, + { + let mut rx = self.close.take().expect("close must be Some; please file an issue"); + + loop { + tokio::select! { + Some(item) = stream.next() => { + if let Err(Error::SubscriptionClosed(_)) = self.send(&item) { + break; + } + }, + // "close" only returns None when connection closed. + _ = rx.next() => break, + else => break, + } + } + } + /// Returns whether this channel is closed without needing a context. pub fn is_closed(&self) -> bool { self.inner.is_closed() @@ -806,7 +848,7 @@ impl SubscriptionSink { self.inner.send_raw(msg).map_err(|_| Some(SubscriptionClosedReason::ConnectionReset)) } Some(_) => Err(Some(SubscriptionClosedReason::Unsubscribed)), - // NOTE(niklasad1): this should be unreachble, after the first error is detected the subscription is closed. + // NOTE(niklasad1): this should be unreachable, after the first error is detected the subscription is closed. None => Err(None), }; diff --git a/http-server/src/server.rs b/http-server/src/server.rs index fbbc86e779..016ab8092b 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -335,7 +335,7 @@ impl Server { middleware.on_call(req.method.as_ref()); // NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here. - match methods.execute_with_resources(&sink, req, 0, &resources, &NoopIdProvider) { + match methods.execute_with_resources(&sink, None, req, 0, &resources, &NoopIdProvider) { Ok((name, MethodResult::Sync(success))) => { middleware.on_result(name, success, request_start); } @@ -363,6 +363,7 @@ impl Server { join_all(batch.into_iter().filter_map( move |req| match methods.execute_with_resources( &sink, + None, req, 0, &resources, diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 874e504c12..7bebb5a143 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -359,6 +359,44 @@ async fn ws_server_should_stop_subscription_after_client_drop() { assert!(matches!(close_err.close_reason(), &SubscriptionClosedReason::ConnectionReset)); } +#[tokio::test] +async fn ws_server_cancel_stream_after_reset_conn() { + use futures::{channel::mpsc, SinkExt, StreamExt}; + use jsonrpsee::{ws_server::WsServerBuilder, RpcModule}; + + let server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap(); + let server_url = format!("ws://{}", server.local_addr().unwrap()); + + let (tx, mut rx) = mpsc::channel(1); + let mut module = RpcModule::new(tx); + + module + .register_subscription("subscribe_never_produce", "n", "unsubscribe_never_produce", |_, sink, mut tx| { + // create stream that doesn't produce items. + let stream = futures::stream::empty::(); + tokio::spawn(async move { + sink.add_stream(stream).await; + let send_back = Arc::make_mut(&mut tx); + send_back.feed(()).await.unwrap(); + }); + Ok(()) + }) + .unwrap(); + + server.start(module).unwrap(); + + let client = WsClientBuilder::default().build(&server_url).await.unwrap(); + let _sub1: Subscription = + client.subscribe("subscribe_never_produce", None, "unsubscribe_never_produce").await.unwrap(); + let _sub2: Subscription = + client.subscribe("subscribe_never_produce", None, "unsubscribe_never_produce").await.unwrap(); + + // terminate connection. + drop(client); + assert_eq!(Some(()), rx.next().await, "subscription stream should be terminated after the client was dropped"); + assert_eq!(Some(()), rx.next().await, "subscription stream should be terminated after the client was dropped"); +} + #[tokio::test] async fn ws_batch_works() { let server_addr = websocket_server().await; diff --git a/ws-server/Cargo.toml b/ws-server/Cargo.toml index ff6123e45d..f8674bee5d 100644 --- a/ws-server/Cargo.toml +++ b/ws-server/Cargo.toml @@ -10,6 +10,7 @@ homepage = "https://github.com/paritytech/jsonrpsee" documentation = "https://docs.rs/jsonrpsee-ws-server" [dependencies] +async-channel = "1.6.1" futures-channel = "0.3.14" futures-util = { version = "0.3.14", default-features = false, features = ["io", "async-await-macro"] } jsonrpsee-types = { path = "../types", version = "0.7.0" } diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index d23ec05b96..f8e9651d1d 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -137,6 +137,8 @@ impl Server { }, ))); + tracing::info!("Accepting new connection, {}/{}", connections.count(), self.cfg.max_connections); + id = id.wrapping_add(1); } Err(MonitoredError::Selector(err)) => { @@ -293,6 +295,8 @@ async fn background_task( builder.set_max_message_size(max_request_body_size as usize); let (mut sender, mut receiver) = builder.finish(); let (tx, mut rx) = mpsc::unbounded::(); + let (conn_tx, conn_rx) = async_channel::unbounded(); + let stop_server2 = stop_server.clone(); let sink = MethodSink::new_with_limit(tx, max_request_body_size); @@ -301,21 +305,21 @@ async fn background_task( // Send results back to the client. tokio::spawn(async move { while !stop_server2.shutdown_requested() { - match rx.next().await { - Some(response) => { - // If websocket message send fail then terminate the connection. - if let Err(err) = send_ws_message(&mut sender, response).await { - tracing::error!("WS transport error: {:?}; terminate connection", err); - break; - } + if let Some(response) = rx.next().await { + // If websocket message send fail then terminate the connection. + if let Err(err) = send_ws_message(&mut sender, response).await { + tracing::error!("WS transport error: {:?}; terminate connection", err); + break; } - None => break, - }; + } else { + break; + } } // terminate connection. let _ = sender.close().await; // NOTE(niklasad1): when the receiver is dropped no further requests or subscriptions // will be possible. + drop(conn_tx); }); // Buffer for incoming data. @@ -370,7 +374,15 @@ async fn background_task( tracing::debug!("recv method call={}", req.method); tracing::trace!("recv: req={:?}", req); - match methods.execute_with_resources(&sink, req, conn_id, &resources, &*id_provider) { + + match methods.execute_with_resources( + &sink, + Some(conn_rx.clone()), + req, + conn_id, + &resources, + &*id_provider, + ) { Ok((name, MethodResult::Sync(success))) => { middleware.on_result(name, success, request_start); middleware.on_response(request_start); @@ -405,6 +417,7 @@ async fn background_task( let sink = sink.clone(); let id_provider = id_provider.clone(); + let conn_rx2 = conn_rx.clone(); let fut = async move { // Batch responses must be sent back as a single message so we read the results from each // request in the batch and read the results off of a new channel, `rx_batch`, and then send the @@ -418,6 +431,7 @@ async fn background_task( join_all(batch.into_iter().filter_map(move |req| { match methods.execute_with_resources( &sink_batch, + Some(conn_rx2.clone()), req, conn_id, resources, From fe176df492806ee7f17b8854c7752c151e8ebf8d Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 5 Jan 2022 18:41:54 +0100 Subject: [PATCH 02/22] fix some nits --- core/src/server/helpers.rs | 20 -------------------- core/src/server/rpc_module.rs | 11 +++++++++-- 2 files changed, 9 insertions(+), 22 deletions(-) diff --git a/core/src/server/helpers.rs b/core/src/server/helpers.rs index 942f136da6..ae188bf92e 100644 --- a/core/src/server/helpers.rs +++ b/core/src/server/helpers.rs @@ -92,26 +92,6 @@ pub struct MethodSink { max_response_size: u32, } -impl Sink for MethodSink { - type Error = Error; - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.tx).poll_ready(cx).map_err(Into::into) - } - - fn start_send(mut self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> { - Pin::new(&mut self.tx).start_send(item).map_err(Into::into) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.tx).poll_flush(cx).map_err(Into::into) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.tx).poll_close(cx).map_err(Into::into) - } -} - impl MethodSink { /// Create a new `MethodSink` with unlimited response size pub fn new(tx: mpsc::UnboundedSender) -> Self { diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 3eafb5571e..b53462f6a9 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -821,8 +821,15 @@ impl SubscriptionSink { break; } }, - // "close" only returns None when connection closed. - _ = rx.next() => break, + v = rx.next() => { + if let Some(_) = v { + // No messages should be sent over this channel. + () + } else { + // Channel dropped by the server. + break; + } + }, else => break, } } From a29f988cf96ff04b8283423d44419e6670596791 Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 12 Jan 2022 15:21:04 +0100 Subject: [PATCH 03/22] unify parameters to rpc methods --- core/src/server/rpc_module.rs | 113 +++++++++++++++++----------------- http-server/src/server.rs | 5 +- ws-server/src/server.rs | 25 +++----- 3 files changed, 65 insertions(+), 78 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index b53462f6a9..c7165616c3 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -51,20 +51,10 @@ use serde::{de::DeserializeOwned, Serialize}; /// implemented as a function pointer to a `Fn` function taking four arguments: /// the `id`, `params`, a channel the function uses to communicate the result (or error) /// back to `jsonrpsee`, and the connection ID (useful for the websocket transport). -pub type SyncMethod = - Arc bool>; +pub type SyncMethod = Arc bool>; /// Similar to [`SyncMethod`], but represents an asynchronous handler and takes an additional argument containing a [`ResourceGuard`] if configured. pub type AsyncMethod<'a> = Arc< - dyn Send - + Sync - + Fn( - Id<'a>, - Params<'a>, - MethodSink, - ConnectionState, - Option, - &dyn IdProvider, - ) -> BoxFuture<'a, bool>, + dyn Send + Sync + Fn(Id<'a>, Params<'a>, MethodSink, ConnectionId, Option) -> BoxFuture<'a, bool>, >; /// Connection ID, used for stateful protocol such as WebSockets. /// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value. @@ -73,7 +63,23 @@ pub type ConnectionId = usize; pub type RawRpcResponse = (String, mpsc::UnboundedReceiver, mpsc::UnboundedSender); /// Connection state for stateful protocols such as WebSocket /// This is used to keep track whether the connection has been closed. -pub type ConnectionState = Option>; +pub type MaybeConnState<'a> = Option>; + +/// Data for stateful connections. +pub struct ConnState<'a> { + /// Connection ID + pub conn_id: ConnectionId, + /// Channel to know whether the connection is closed or not. + pub close: async_channel::Receiver<()>, + /// ID provider. + pub id_provider: &'a dyn IdProvider, +} + +impl<'a> std::fmt::Debug for ConnState<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Server").field("conn_id", &self.conn_id).field("close", &self.close).finish() + } +} type Subscribers = Arc)>>>; @@ -170,11 +176,9 @@ impl MethodCallback { pub fn execute( &self, sink: &MethodSink, - conn: ConnectionState, + conn_state: MaybeConnState<'_>, req: Request<'_>, - conn_id: ConnectionId, claimed: Option, - id_gen: &dyn IdProvider, ) -> MethodResult { let id = req.id.clone(); let params = Params::new(req.params.map(|params| params.get())); @@ -182,13 +186,13 @@ impl MethodCallback { let result = match &self.callback { MethodKind::Sync(callback) => { tracing::trace!( - "[MethodCallback::execute] Executing sync callback, params={:?}, req.id={:?}, conn_id={:?}", + "[MethodCallback::execute] Executing sync callback, params={:?}, req.id={:?}, conn_state={:?}", params, id, - conn_id + conn_state ); - let result = (callback)(id, params, sink, conn, conn_id, id_gen); + let result = (callback)(id, params, sink, conn_state); // Release claimed resources drop(claimed); @@ -199,14 +203,15 @@ impl MethodCallback { let sink = sink.clone(); let params = params.into_owned(); let id = id.into_owned(); + let conn_id = conn_state.map(|s| s.conn_id).unwrap_or(0); tracing::trace!( - "[MethodCallback::execute] Executing async callback, params={:?}, req.id={:?}, conn_id={:?}", + "[MethodCallback::execute] Executing async callback, params={:?}, req.id={:?}, conn_state={:?}", params, id, - conn_id + conn_id, ); - MethodResult::Async((callback)(id, params, sink, conn, claimed, id_gen)) + MethodResult::Async((callback)(id, params, sink, conn_id, claimed)) } }; @@ -321,17 +326,10 @@ impl Methods { } /// Attempt to execute a callback, sending the resulting JSON (success or error) to the specified sink. - pub fn execute( - &self, - sink: &MethodSink, - conn: ConnectionState, - req: Request, - conn_id: ConnectionId, - id_gen: &dyn IdProvider, - ) -> MethodResult { + pub fn execute(&self, sink: &MethodSink, conn_state: MaybeConnState, req: Request) -> MethodResult { tracing::trace!("[Methods::execute] Executing request: {:?}", req); match self.callbacks.get(&*req.method) { - Some(callback) => callback.execute(sink, conn, req, conn_id, None, id_gen), + Some(callback) => callback.execute(sink, conn_state, req, None), None => { sink.send_error(req.id, ErrorCode::MethodNotFound.into()); MethodResult::Sync(false) @@ -344,16 +342,14 @@ impl Methods { pub fn execute_with_resources<'r>( &self, sink: &MethodSink, - rx: ConnectionState, + conn_state: MaybeConnState<'r>, req: Request<'r>, - conn_id: ConnectionId, resources: &Resources, - id_gen: &dyn IdProvider, ) -> Result<(&'static str, MethodResult), Cow<'r, str>> { tracing::trace!("[Methods::execute_with_resources] Executing request: {:?}", req); match self.callbacks.get_key_value(&*req.method) { Some((&name, callback)) => match callback.claim(&req.method, resources) { - Ok(guard) => Ok((name, callback.execute(sink, rx, req, conn_id, Some(guard), id_gen))), + Ok(guard) => Ok((name, callback.execute(sink, conn_state, req, Some(guard)))), Err(err) => { tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err); sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); @@ -444,7 +440,9 @@ impl Methods { let sink = MethodSink::new(tx.clone()); let (_tx, rx_2) = async_channel::unbounded(); - if let MethodResult::Async(fut) = self.execute(&sink, Some(rx_2), req, 0, &RandomIntegerIdProvider) { + let conn_state = Some(ConnState { conn_id: 0, close: rx_2, id_provider: &RandomIntegerIdProvider }); + + if let MethodResult::Async(fut) = self.execute(&sink, conn_state, req) { fut.await; } @@ -544,7 +542,7 @@ impl RpcModule { let ctx = self.ctx.clone(); let callback = self.methods.verify_and_insert( method_name, - MethodCallback::new_sync(Arc::new(move |id, params, sink, _, _, _| match callback(params, &*ctx) { + MethodCallback::new_sync(Arc::new(move |id, params, sink, _| match callback(params, &*ctx) { Ok(res) => sink.send_response(id, res), Err(err) => sink.send_call_error(id, err), })), @@ -567,7 +565,7 @@ impl RpcModule { let ctx = self.ctx.clone(); let callback = self.methods.verify_and_insert( method_name, - MethodCallback::new_async(Arc::new(move |id, params, sink, _, claimed, _| { + MethodCallback::new_async(Arc::new(move |id, params, sink, _, claimed| { let ctx = ctx.clone(); let future = async move { let result = match callback(params, ctx).await { @@ -602,7 +600,7 @@ impl RpcModule { let ctx = self.ctx.clone(); let callback = self.methods.verify_and_insert( method_name, - MethodCallback::new_async(Arc::new(move |id, params, sink, _, claimed, _| { + MethodCallback::new_async(Arc::new(move |id, params, sink, _, claimed| { let ctx = ctx.clone(); tokio::task::spawn_blocking(move || { @@ -688,12 +686,13 @@ impl RpcModule { let subscribers = subscribers.clone(); self.methods.mut_callbacks().insert( subscribe_method_name, - MethodCallback::new_sync(Arc::new(move |id, params, method_sink, conn, conn_id, id_provider| { + MethodCallback::new_sync(Arc::new(move |id, params, method_sink, conn| { let (conn_tx, conn_rx) = oneshot::channel::<()>(); + let c = conn.expect("conn must be Some; this is bug"); let sub_id = { - let sub_id: RpcSubscriptionId = id_provider.next_id().into_owned(); - let uniq_sub = SubscriptionKey { conn_id, sub_id: sub_id.clone() }; + let sub_id: RpcSubscriptionId = c.id_provider.next_id().into_owned(); + let uniq_sub = SubscriptionKey { conn_id: c.conn_id, sub_id: sub_id.clone() }; subscribers.lock().insert(uniq_sub, (method_sink.clone(), conn_rx)); @@ -704,10 +703,10 @@ impl RpcModule { let sink = SubscriptionSink { inner: method_sink.clone(), - close: conn, + close: c.close, method: notif_method_name, subscribers: subscribers.clone(), - uniq_sub: SubscriptionKey { conn_id, sub_id }, + uniq_sub: SubscriptionKey { conn_id: c.conn_id, sub_id }, is_connected: Some(conn_tx), }; if let Err(err) = callback(params, sink, ctx.clone()) { @@ -728,7 +727,9 @@ impl RpcModule { { self.methods.mut_callbacks().insert( unsubscribe_method_name, - MethodCallback::new_sync(Arc::new(move |id, params, sink, _, conn_id, _| { + MethodCallback::new_sync(Arc::new(move |id, params, sink, conn_state| { + let conn = conn_state.expect("conn must be Some; this is bug"); + let sub_id = match params.one::() { Ok(sub_id) => sub_id, Err(_) => { @@ -745,7 +746,11 @@ impl RpcModule { }; let sub_id = sub_id.into_owned(); - if subscribers.lock().remove(&SubscriptionKey { conn_id, sub_id: sub_id.clone() }).is_some() { + if subscribers + .lock() + .remove(&SubscriptionKey { conn_id: conn.conn_id, sub_id: sub_id.clone() }) + .is_some() + { sink.send_response(id, "Unsubscribed") } else { let err = to_json_raw_value(&format!( @@ -783,7 +788,7 @@ pub struct SubscriptionSink { /// Sink. inner: MethodSink, /// Close - close: ConnectionState, + close: async_channel::Receiver<()>, /// MethodCallback. method: &'static str, /// Unique subscription. @@ -812,8 +817,6 @@ impl SubscriptionSink { S: Stream + Unpin, T: Serialize, { - let mut rx = self.close.take().expect("close must be Some; please file an issue"); - loop { tokio::select! { Some(item) = stream.next() => { @@ -821,15 +824,9 @@ impl SubscriptionSink { break; } }, - v = rx.next() => { - if let Some(_) = v { - // No messages should be sent over this channel. - () - } else { - // Channel dropped by the server. - break; - } - }, + // No messages should be sent over this channel (just ignore and continue) + Some(_) = self.close.next() => {}, + // Stream or connection was dropped => close stream. else => break, } } diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 0ff7b7a594..d699d9cde5 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -40,7 +40,6 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{Error as HyperError, Method}; use jsonrpsee_core::error::{Error, GenericTransportError}; use jsonrpsee_core::http_helpers::{self, read_body}; -use jsonrpsee_core::id_providers::NoopIdProvider; use jsonrpsee_core::middleware::Middleware; use jsonrpsee_core::server::helpers::{collect_batch_response, prepare_error, MethodSink}; use jsonrpsee_core::server::resource_limiting::Resources; @@ -457,7 +456,7 @@ async fn process_validated_request( middleware.on_call(req.method.as_ref()); // NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here. - match methods.execute_with_resources(&sink, None, req, 0, &resources, &NoopIdProvider) { + match methods.execute_with_resources(&sink, None, req, &resources) { Ok((name, MethodResult::Sync(success))) => { middleware.on_result(name, success, request_start); } @@ -483,7 +482,7 @@ async fn process_validated_request( let middleware = &middleware; join_all(batch.into_iter().filter_map(move |req| { - match methods.execute_with_resources(&sink, None, req, 0, &resources, &NoopIdProvider) { + match methods.execute_with_resources(&sink, None, req, &resources) { Ok((name, MethodResult::Sync(success))) => { middleware.on_result(name, success, request_start); None diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index f8e9651d1d..be5282475a 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -41,7 +41,7 @@ use jsonrpsee_core::id_providers::RandomIntegerIdProvider; use jsonrpsee_core::middleware::Middleware; use jsonrpsee_core::server::helpers::{collect_batch_response, prepare_error, MethodSink}; use jsonrpsee_core::server::resource_limiting::Resources; -use jsonrpsee_core::server::rpc_module::{ConnectionId, MethodResult, Methods}; +use jsonrpsee_core::server::rpc_module::{ConnState, ConnectionId, MethodResult, Methods}; use jsonrpsee_core::traits::IdProvider; use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES}; use soketto::connection::Error as SokettoError; @@ -375,14 +375,9 @@ async fn background_task( tracing::debug!("recv method call={}", req.method); tracing::trace!("recv: req={:?}", req); - match methods.execute_with_resources( - &sink, - Some(conn_rx.clone()), - req, - conn_id, - &resources, - &*id_provider, - ) { + let conn_state = Some(ConnState { conn_id, close: conn_rx.clone(), id_provider: &*id_provider }); + + match methods.execute_with_resources(&sink, conn_state, req, &resources) { Ok((name, MethodResult::Sync(success))) => { middleware.on_result(name, success, request_start); middleware.on_response(request_start); @@ -429,14 +424,10 @@ async fn background_task( tracing::trace!("recv: batch={:?}", batch); if !batch.is_empty() { join_all(batch.into_iter().filter_map(move |req| { - match methods.execute_with_resources( - &sink_batch, - Some(conn_rx2.clone()), - req, - conn_id, - resources, - &*id_provider, - ) { + let conn_state = + Some(ConnState { conn_id, close: conn_rx2.clone(), id_provider: &*id_provider }); + + match methods.execute_with_resources(&sink_batch, conn_state, req, resources) { Ok((name, MethodResult::Sync(success))) => { middleware.on_result(name, success, request_start); None From 6aa22e28551e60a878bbcfac7ddd810bbd7c6ef6 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 12 Jan 2022 17:50:06 +0100 Subject: [PATCH 04/22] Update core/src/server/rpc_module.rs --- core/src/server/rpc_module.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index c7165616c3..4438018e6e 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -62,7 +62,6 @@ pub type ConnectionId = usize; /// Raw RPC response. pub type RawRpcResponse = (String, mpsc::UnboundedReceiver, mpsc::UnboundedSender); /// Connection state for stateful protocols such as WebSocket -/// This is used to keep track whether the connection has been closed. pub type MaybeConnState<'a> = Option>; /// Data for stateful connections. From 9bdea0d77b56f10b0cdb7fe7709c224ff6885823 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 13 Jan 2022 11:43:28 +0100 Subject: [PATCH 05/22] Update tests/tests/integration_tests.rs Co-authored-by: David --- tests/tests/integration_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index e76fb19baa..45a7928c91 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -360,7 +360,7 @@ async fn ws_server_should_stop_subscription_after_client_drop() { } #[tokio::test] -async fn ws_server_cancel_stream_after_reset_conn() { +async fn ws_server_cancels_stream_after_reset_conn() { use futures::{channel::mpsc, SinkExt, StreamExt}; use jsonrpsee::{ws_server::WsServerBuilder, RpcModule}; From 79a8e55ee149a01b195ad3d4895a2c41aeaf7991 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 13 Jan 2022 12:29:24 +0100 Subject: [PATCH 06/22] address grumbles --- core/src/server/rpc_module.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 4438018e6e..80796d7e32 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -51,7 +51,7 @@ use serde::{de::DeserializeOwned, Serialize}; /// implemented as a function pointer to a `Fn` function taking four arguments: /// the `id`, `params`, a channel the function uses to communicate the result (or error) /// back to `jsonrpsee`, and the connection ID (useful for the websocket transport). -pub type SyncMethod = Arc bool>; +pub type SyncMethod = Arc) -> bool>; /// Similar to [`SyncMethod`], but represents an asynchronous handler and takes an additional argument containing a [`ResourceGuard`] if configured. pub type AsyncMethod<'a> = Arc< dyn Send + Sync + Fn(Id<'a>, Params<'a>, MethodSink, ConnectionId, Option) -> BoxFuture<'a, bool>, @@ -61,8 +61,6 @@ pub type AsyncMethod<'a> = Arc< pub type ConnectionId = usize; /// Raw RPC response. pub type RawRpcResponse = (String, mpsc::UnboundedReceiver, mpsc::UnboundedSender); -/// Connection state for stateful protocols such as WebSocket -pub type MaybeConnState<'a> = Option>; /// Data for stateful connections. pub struct ConnState<'a> { @@ -76,7 +74,7 @@ pub struct ConnState<'a> { impl<'a> std::fmt::Debug for ConnState<'a> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Server").field("conn_id", &self.conn_id).field("close", &self.close).finish() + f.debug_struct("ConnState").field("conn_id", &self.conn_id).field("close", &self.close).finish() } } @@ -175,7 +173,7 @@ impl MethodCallback { pub fn execute( &self, sink: &MethodSink, - conn_state: MaybeConnState<'_>, + conn_state: Option, req: Request<'_>, claimed: Option, ) -> MethodResult { @@ -325,7 +323,7 @@ impl Methods { } /// Attempt to execute a callback, sending the resulting JSON (success or error) to the specified sink. - pub fn execute(&self, sink: &MethodSink, conn_state: MaybeConnState, req: Request) -> MethodResult { + pub fn execute(&self, sink: &MethodSink, conn_state: Option, req: Request) -> MethodResult { tracing::trace!("[Methods::execute] Executing request: {:?}", req); match self.callbacks.get(&*req.method) { Some(callback) => callback.execute(sink, conn_state, req, None), @@ -341,7 +339,7 @@ impl Methods { pub fn execute_with_resources<'r>( &self, sink: &MethodSink, - conn_state: MaybeConnState<'r>, + conn_state: Option>, req: Request<'r>, resources: &Resources, ) -> Result<(&'static str, MethodResult), Cow<'r, str>> { @@ -727,7 +725,7 @@ impl RpcModule { self.methods.mut_callbacks().insert( unsubscribe_method_name, MethodCallback::new_sync(Arc::new(move |id, params, sink, conn_state| { - let conn = conn_state.expect("conn must be Some; this is bug"); + let c = conn_state.expect("conn must be Some; this is bug"); let sub_id = match params.one::() { Ok(sub_id) => sub_id, @@ -747,7 +745,7 @@ impl RpcModule { if subscribers .lock() - .remove(&SubscriptionKey { conn_id: conn.conn_id, sub_id: sub_id.clone() }) + .remove(&SubscriptionKey { conn_id: c.conn_id, sub_id: sub_id.clone() }) .is_some() { sink.send_response(id, "Unsubscribed") From 7e81acfb6bc76bcde23d983402da61191683f4ca Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 13 Jan 2022 13:17:13 +0100 Subject: [PATCH 07/22] fix subscription tests --- core/src/server/rpc_module.rs | 16 ++++++++-------- tests/tests/rpc_module.rs | 11 ++++++----- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 80796d7e32..6f988fc138 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -60,7 +60,7 @@ pub type AsyncMethod<'a> = Arc< /// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value. pub type ConnectionId = usize; /// Raw RPC response. -pub type RawRpcResponse = (String, mpsc::UnboundedReceiver, mpsc::UnboundedSender); +pub type RawRpcResponse = (String, mpsc::UnboundedReceiver, async_channel::Sender<()>); /// Data for stateful connections. pub struct ConnState<'a> { @@ -434,17 +434,17 @@ impl Methods { /// Wrapper over [`Methods::execute`] to execute a callback. async fn inner_call(&self, req: Request<'_>) -> RawRpcResponse { let (tx, mut rx) = mpsc::unbounded(); - let sink = MethodSink::new(tx.clone()); - let (_tx, rx_2) = async_channel::unbounded(); + let sink = MethodSink::new(tx); + let (close_tx, close_rx) = async_channel::unbounded(); - let conn_state = Some(ConnState { conn_id: 0, close: rx_2, id_provider: &RandomIntegerIdProvider }); + let conn_state = Some(ConnState { conn_id: 0, close: close_rx, id_provider: &RandomIntegerIdProvider }); if let MethodResult::Async(fut) = self.execute(&sink, conn_state, req) { fut.await; } let resp = rx.next().await.expect("tx and rx still alive; qed"); - (resp, rx, tx) + (resp, rx, close_tx) } /// Helper to create a subscription on the `RPC module` without having to spin up a server. @@ -831,7 +831,7 @@ impl SubscriptionSink { /// Returns whether this channel is closed without needing a context. pub fn is_closed(&self) -> bool { - self.inner.is_closed() + self.inner.is_closed() || self.close.is_closed() } fn build_message(&self, result: &T) -> Result { @@ -893,7 +893,7 @@ impl Drop for SubscriptionSink { /// Wrapper struct that maintains a subscription "mainly" for testing. #[derive(Debug)] pub struct Subscription { - tx: mpsc::UnboundedSender, + tx: async_channel::Sender<()>, rx: mpsc::UnboundedReceiver, sub_id: RpcSubscriptionId<'static>, } @@ -901,7 +901,7 @@ pub struct Subscription { impl Subscription { /// Close the subscription channel. pub fn close(&mut self) { - self.tx.close_channel(); + self.tx.close(); } /// Get the subscription ID diff --git a/tests/tests/rpc_module.rs b/tests/tests/rpc_module.rs index 0308605fa4..e7e5396eba 100644 --- a/tests/tests/rpc_module.rs +++ b/tests/tests/rpc_module.rs @@ -215,11 +215,12 @@ async fn close_test_subscribing_without_server() { let mut module = RpcModule::new(()); module .register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| { - std::thread::spawn(move || loop { - if let Err(Error::SubscriptionClosed(_)) = sink.send(&"lo") { - return; + std::thread::spawn(move || { + // make sure to only send one item + sink.send(&"lo").unwrap(); + while !sink.is_closed() { + std::thread::sleep(std::time::Duration::from_millis(500)); } - std::thread::sleep(std::time::Duration::from_millis(500)); }); Ok(()) }) @@ -232,5 +233,5 @@ async fn close_test_subscribing_without_server() { // close the subscription to ensure it doesn't return any items. my_sub.close(); - assert!(matches!(my_sub.next::().await, None)); + assert!(matches!(my_sub.next::().await, Some(Err(Error::SubscriptionClosed(_))))); } From 69825988d4857b39480a22d0a525ea80a1e12378 Mon Sep 17 00:00:00 2001 From: Niklas Date: Mon, 17 Jan 2022 15:11:29 +0100 Subject: [PATCH 08/22] new type for `SubscriptionCallback` and glue code --- core/src/server/rpc_module.rs | 70 +++++++++---- http-server/src/server.rs | 122 +++++++++++++++++----- ws-server/src/server.rs | 184 +++++++++++++++++++++++++++------- 3 files changed, 295 insertions(+), 81 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 6f988fc138..c430a3911a 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -36,7 +36,6 @@ use crate::server::helpers::MethodSink; use crate::server::resource_limiting::{ResourceGuard, ResourceTable, ResourceVec, Resources}; use crate::to_json_raw_value; use crate::traits::{IdProvider, ToRpcParams}; -use beef::Cow; use futures_channel::{mpsc, oneshot}; use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt}; use jsonrpsee_types::error::{invalid_subscription_err, ErrorCode, CALL_EXECUTION_FAILED_CODE}; @@ -51,11 +50,14 @@ use serde::{de::DeserializeOwned, Serialize}; /// implemented as a function pointer to a `Fn` function taking four arguments: /// the `id`, `params`, a channel the function uses to communicate the result (or error) /// back to `jsonrpsee`, and the connection ID (useful for the websocket transport). -pub type SyncMethod = Arc) -> bool>; +pub type SyncMethod = Arc bool>; /// Similar to [`SyncMethod`], but represents an asynchronous handler and takes an additional argument containing a [`ResourceGuard`] if configured. pub type AsyncMethod<'a> = Arc< dyn Send + Sync + Fn(Id<'a>, Params<'a>, MethodSink, ConnectionId, Option) -> BoxFuture<'a, bool>, >; +/// Method callback for subscriptions. +pub type SubscriptionMethod = Arc bool>; + /// Connection ID, used for stateful protocol such as WebSockets. /// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value. pub type ConnectionId = usize; @@ -89,11 +91,13 @@ struct SubscriptionKey { /// Callback wrapper that can be either sync or async. #[derive(Clone)] -enum MethodKind { +pub enum MethodKind { /// Synchronous method handler. Sync(SyncMethod), /// Asynchronous method handler. Async(AsyncMethod<'static>), + /// Subscription handler, + Subscription(SubscriptionMethod), } /// Information about resources the method uses during its execution. Initialized when the the server starts. @@ -109,7 +113,8 @@ enum MethodResources { /// plus a table with resources it needs to claim to run #[derive(Clone, Debug)] pub struct MethodCallback { - callback: MethodKind, + /// TODO: Deref impl... + pub callback: MethodKind, resources: MethodResources, } @@ -160,6 +165,13 @@ impl MethodCallback { MethodCallback { callback: MethodKind::Async(callback), resources: MethodResources::Uninitialized([].into()) } } + fn new_subscription(callback: SubscriptionMethod) -> Self { + MethodCallback { + callback: MethodKind::Subscription(callback), + resources: MethodResources::Uninitialized([].into()), + } + } + /// Attempt to claim resources prior to executing a method. On success returns a guard that releases /// claimed resources when dropped. pub fn claim(&self, name: &str, resources: &Resources) -> Result { @@ -169,6 +181,7 @@ impl MethodCallback { } } + /* /// Execute the callback, sending the resulting JSON (success or error) to the specified sink. pub fn execute( &self, @@ -213,7 +226,7 @@ impl MethodCallback { }; result - } + }*/ } impl Debug for MethodKind { @@ -221,6 +234,7 @@ impl Debug for MethodKind { match self { Self::Async(_) => write!(f, "Async"), Self::Sync(_) => write!(f, "Sync"), + Self::Subscription(_) => write!(f, "Subscription"), } } } @@ -322,6 +336,13 @@ impl Methods { self.callbacks.get_key_value(method_name).map(|(k, v)| (*k, v)) } + /// Returns callback handle for a method name + /// + pub fn as_callback(&self, method: &str) -> Option<&MethodCallback> { + self.callbacks.get(method) + } + + /* /// Attempt to execute a callback, sending the resulting JSON (success or error) to the specified sink. pub fn execute(&self, sink: &MethodSink, conn_state: Option, req: Request) -> MethodResult { tracing::trace!("[Methods::execute] Executing request: {:?}", req); @@ -332,8 +353,9 @@ impl Methods { MethodResult::Sync(false) } } - } + }*/ + /* /// Attempt to execute a callback while checking that the call does not exhaust the available resources, // sending the resulting JSON (success or error) to the specified sink. pub fn execute_with_resources<'r>( @@ -358,7 +380,7 @@ impl Methods { Err(req.method) } } - } + }*/ /// Helper to call a method on the `RPC module` without having to spin up a server. /// @@ -437,11 +459,18 @@ impl Methods { let sink = MethodSink::new(tx); let (close_tx, close_rx) = async_channel::unbounded(); - let conn_state = Some(ConnState { conn_id: 0, close: close_rx, id_provider: &RandomIntegerIdProvider }); + let id = req.id.clone(); + let params = Params::new(req.params.map(|params| params.get())); - if let MethodResult::Async(fut) = self.execute(&sink, conn_state, req) { - fut.await; - } + let _result = match self.as_callback(&req.method).map(|c| &c.callback) { + None => todo!(), + Some(MethodKind::Sync(cb)) => (cb)(id, params, &sink), + Some(MethodKind::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), sink, 0, None).await, + Some(MethodKind::Subscription(cb)) => { + let conn_state = ConnState { conn_id: 0, close: close_rx, id_provider: &RandomIntegerIdProvider }; + (cb)(id, params, &sink, conn_state) + } + }; let resp = rx.next().await.expect("tx and rx still alive; qed"); (resp, rx, close_tx) @@ -539,7 +568,7 @@ impl RpcModule { let ctx = self.ctx.clone(); let callback = self.methods.verify_and_insert( method_name, - MethodCallback::new_sync(Arc::new(move |id, params, sink, _| match callback(params, &*ctx) { + MethodCallback::new_sync(Arc::new(move |id, params, sink| match callback(params, &*ctx) { Ok(res) => sink.send_response(id, res), Err(err) => sink.send_call_error(id, err), })), @@ -683,13 +712,12 @@ impl RpcModule { let subscribers = subscribers.clone(); self.methods.mut_callbacks().insert( subscribe_method_name, - MethodCallback::new_sync(Arc::new(move |id, params, method_sink, conn| { + MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn| { let (conn_tx, conn_rx) = oneshot::channel::<()>(); - let c = conn.expect("conn must be Some; this is bug"); let sub_id = { - let sub_id: RpcSubscriptionId = c.id_provider.next_id().into_owned(); - let uniq_sub = SubscriptionKey { conn_id: c.conn_id, sub_id: sub_id.clone() }; + let sub_id: RpcSubscriptionId = conn.id_provider.next_id().into_owned(); + let uniq_sub = SubscriptionKey { conn_id: conn.conn_id, sub_id: sub_id.clone() }; subscribers.lock().insert(uniq_sub, (method_sink.clone(), conn_rx)); @@ -700,10 +728,10 @@ impl RpcModule { let sink = SubscriptionSink { inner: method_sink.clone(), - close: c.close, + close: conn.close, method: notif_method_name, subscribers: subscribers.clone(), - uniq_sub: SubscriptionKey { conn_id: c.conn_id, sub_id }, + uniq_sub: SubscriptionKey { conn_id: conn.conn_id, sub_id }, is_connected: Some(conn_tx), }; if let Err(err) = callback(params, sink, ctx.clone()) { @@ -724,9 +752,7 @@ impl RpcModule { { self.methods.mut_callbacks().insert( unsubscribe_method_name, - MethodCallback::new_sync(Arc::new(move |id, params, sink, conn_state| { - let c = conn_state.expect("conn must be Some; this is bug"); - + MethodCallback::new_subscription(Arc::new(move |id, params, sink, conn| { let sub_id = match params.one::() { Ok(sub_id) => sub_id, Err(_) => { @@ -745,7 +771,7 @@ impl RpcModule { if subscribers .lock() - .remove(&SubscriptionKey { conn_id: c.conn_id, sub_id: sub_id.clone() }) + .remove(&SubscriptionKey { conn_id: conn.conn_id, sub_id: sub_id.clone() }) .is_some() { sink.send_response(id, "Unsubscribed") diff --git a/http-server/src/server.rs b/http-server/src/server.rs index d699d9cde5..2d2c3c9bb4 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -43,10 +43,10 @@ use jsonrpsee_core::http_helpers::{self, read_body}; use jsonrpsee_core::middleware::Middleware; use jsonrpsee_core::server::helpers::{collect_batch_response, prepare_error, MethodSink}; use jsonrpsee_core::server::resource_limiting::Resources; -use jsonrpsee_core::server::rpc_module::{MethodResult, Methods}; +use jsonrpsee_core::server::rpc_module::{MethodKind, Methods}; use jsonrpsee_core::TEN_MB_SIZE_BYTES; use jsonrpsee_types::error::ErrorCode; -use jsonrpsee_types::{Id, Notification, Request}; +use jsonrpsee_types::{Id, Notification, Params, Request}; use serde_json::value::RawValue; use socket2::{Domain, Socket, Type}; @@ -453,22 +453,52 @@ async fn process_validated_request( // Single request or notification if is_single { if let Ok(req) = serde_json::from_slice::(&body) { + let method = req.method.as_ref(); middleware.on_call(req.method.as_ref()); - // NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here. - match methods.execute_with_resources(&sink, None, req, &resources) { - Ok((name, MethodResult::Sync(success))) => { - middleware.on_result(name, success, request_start); - } - Ok((name, MethodResult::Async(fut))) => { - let success = fut.await; + let id = req.id.clone(); + let params = Params::new(req.params.map(|params| params.get())); - middleware.on_result(name, success, request_start); - } - Err(name) => { - middleware.on_result(name.as_ref(), false, request_start); + let result = match methods.as_callback(method) { + None => { + sink.send_error(req.id, ErrorCode::MethodNotFound.into()); + false } - } + Some(method) => match &method.callback { + MethodKind::Sync(callback) => match method.claim(&req.method, &resources) { + Ok(guard) => { + let result = (callback)(id, params, &sink); + drop(guard); + result + } + Err(err) => { + tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err); + sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); + false + } + }, + MethodKind::Async(callback) => match method.claim(&req.method, &resources) { + Ok(guard) => { + let result = + (callback)(id.into_owned(), params.into_owned(), sink.clone(), 0, Some(guard)).await; + result + } + Err(err) => { + tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err); + sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); + false + } + }, + MethodKind::Subscription(_) => { + tracing::error!("Subscriptions not supported on HTTP"); + sink.send_error(req.id, ErrorCode::InternalError.into()); + false + } + }, + }; + middleware.on_result(&req.method, result, request_start); + // TODO: shouldn't `on_response` be called here?! + //middleware.on_response(request_start); } else if let Ok(_req) = serde_json::from_slice::(&body) { return Ok::<_, HyperError>(response::ok_response("".into())); } else { @@ -482,19 +512,61 @@ async fn process_validated_request( let middleware = &middleware; join_all(batch.into_iter().filter_map(move |req| { - match methods.execute_with_resources(&sink, None, req, &resources) { - Ok((name, MethodResult::Sync(success))) => { - middleware.on_result(name, success, request_start); - None - } - Ok((name, MethodResult::Async(fut))) => Some(async move { - let success = fut.await; - middleware.on_result(name, success, request_start); - }), - Err(name) => { - middleware.on_result(name.as_ref(), false, request_start); + let id = req.id.clone(); + let params = Params::new(req.params.map(|params| params.get())); + + match methods.as_callback(&req.method) { + None => { + sink.send_error(req.id, ErrorCode::MethodNotFound.into()); None } + Some(method) => match &method.callback { + MethodKind::Sync(callback) => match method.claim(&req.method, &resources) { + Ok(guard) => { + let result = (callback)(id, params, &sink); + middleware.on_result(&req.method, result, request_start); + drop(guard); + None + } + Err(err) => { + tracing::error!( + "[Methods::execute_with_resources] failed to lock resources: {:?}", + err + ); + sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); + middleware.on_result(&req.method, false, request_start); + None + } + }, + MethodKind::Async(callback) => match method.claim(&req.method, &resources) { + Ok(guard) => { + let sink = sink.clone(); + let id = id.into_owned(); + let params = params.into_owned(); + let callback = callback.clone(); + + Some(async move { + let result = (callback)(id, params, sink, 0, Some(guard)).await; + middleware.on_result(&req.method, result, request_start); + }) + } + Err(err) => { + tracing::error!( + "[Methods::execute_with_resources] failed to lock resources: {:?}", + err + ); + sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); + middleware.on_result(&req.method, false, request_start); + None + } + }, + MethodKind::Subscription(_) => { + tracing::error!("Subscriptions not supported on HTTP"); + sink.send_error(req.id, ErrorCode::InternalError.into()); + middleware.on_result(&req.method, false, request_start); + None + } + }, } })) .await; diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index be5282475a..6e2db63d1e 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -41,9 +41,10 @@ use jsonrpsee_core::id_providers::RandomIntegerIdProvider; use jsonrpsee_core::middleware::Middleware; use jsonrpsee_core::server::helpers::{collect_batch_response, prepare_error, MethodSink}; use jsonrpsee_core::server::resource_limiting::Resources; -use jsonrpsee_core::server::rpc_module::{ConnState, ConnectionId, MethodResult, Methods}; +use jsonrpsee_core::server::rpc_module::{ConnState, ConnectionId, MethodKind, Methods}; use jsonrpsee_core::traits::IdProvider; use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES}; +use jsonrpsee_types::Params; use soketto::connection::Error as SokettoError; use soketto::handshake::{server::Response, Server as SokettoServer}; use soketto::Sender; @@ -370,33 +371,88 @@ async fn background_task( match data.get(0) { Some(b'{') => { if let Ok(req) = serde_json::from_slice::(&data) { - middleware.on_call(req.method.as_ref()); - tracing::debug!("recv method call={}", req.method); tracing::trace!("recv: req={:?}", req); - let conn_state = Some(ConnState { conn_id, close: conn_rx.clone(), id_provider: &*id_provider }); - - match methods.execute_with_resources(&sink, conn_state, req, &resources) { - Ok((name, MethodResult::Sync(success))) => { - middleware.on_result(name, success, request_start); - middleware.on_response(request_start); - } - Ok((name, MethodResult::Async(fut))) => { - let request_start = request_start; + let name = &req.method; + let id = req.id.clone(); + let params = Params::new(req.params.map(|params| params.get())); - let fut = async move { - let success = fut.await; - middleware.on_result(name, success, request_start); - middleware.on_response(request_start); - }; + middleware.on_call(name); - method_executors.add(fut.boxed()); - } - Err(name) => { - middleware.on_result(name.as_ref(), false, request_start); + match methods.as_callback(&name) { + None => { + sink.send_error(req.id, ErrorCode::MethodNotFound.into()); middleware.on_response(request_start); } + Some(method) => { + match &method.callback { + MethodKind::Sync(callback) => match method.claim(name, &resources) { + Ok(guard) => { + let result = (callback)(id, params, &sink); + + middleware.on_result(&name, result, request_start); + middleware.on_response(request_start); + drop(guard); + } + Err(err) => { + tracing::error!( + "[Methods::execute_with_resources] failed to lock resources: {:?}", + err + ); + sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); + middleware.on_result(&name, false, request_start); + middleware.on_response(request_start); + } + }, + MethodKind::Async(callback) => match method.claim(&req.method, &resources) { + Ok(guard) => { + let sink = sink.clone(); + let id = id.into_owned(); + let params = params.into_owned(); + // TODO: return name from the callback handle to avoid `to_string`. + let name = name.to_string(); + + let fut = async move { + let result = (callback)(id, params, sink, conn_id, Some(guard)).await; + middleware.on_result(&name, result, request_start); + middleware.on_response(request_start); + }; + + method_executors.add(fut.boxed()); + } + Err(err) => { + tracing::error!( + "[Methods::execute_with_resources] failed to lock resources: {:?}", + err + ); + sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); + middleware.on_result(&name, false, request_start); + middleware.on_response(request_start); + } + }, + MethodKind::Subscription(callback) => match method.claim(&req.method, &resources) { + Ok(guard) => { + let conn_state = + ConnState { conn_id, close: conn_rx.clone(), id_provider: &*id_provider }; + + let result = callback(id, params, &sink, conn_state); + middleware.on_result(&name, result, request_start); + middleware.on_response(request_start); + drop(guard); + } + Err(err) => { + tracing::error!( + "[Methods::execute_with_resources] failed to lock resources: {:?}", + err + ); + sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); + middleware.on_result(&name, false, request_start); + middleware.on_response(request_start); + } + }, + } + } } } else { let (id, code) = prepare_error(&data); @@ -424,22 +480,82 @@ async fn background_task( tracing::trace!("recv: batch={:?}", batch); if !batch.is_empty() { join_all(batch.into_iter().filter_map(move |req| { - let conn_state = - Some(ConnState { conn_id, close: conn_rx2.clone(), id_provider: &*id_provider }); + let id = req.id.clone(); + let params = Params::new(req.params.map(|params| params.get())); + let name = &req.method; - match methods.execute_with_resources(&sink_batch, conn_state, req, resources) { - Ok((name, MethodResult::Sync(success))) => { - middleware.on_result(name, success, request_start); - None - } - Ok((name, MethodResult::Async(fut))) => Some(async move { - let success = fut.await; - middleware.on_result(name, success, request_start); - }), - Err(name) => { - middleware.on_result(name.as_ref(), false, request_start); + match methods.as_callback(name) { + None => { + sink_batch.send_error(req.id, ErrorCode::MethodNotFound.into()); None } + Some(method) => match &method.callback { + MethodKind::Sync(callback) => match method.claim(name, &resources) { + Ok(guard) => { + let result = (callback)(id, params, &sink_batch); + middleware.on_result(&req.method, result, request_start); + drop(guard); + None + } + Err(err) => { + tracing::error!( + "[Methods::execute_with_resources] failed to lock resources: {:?}", + err + ); + sink_batch.send_error(req.id, ErrorCode::ServerIsBusy.into()); + middleware.on_result(&req.method, false, request_start); + None + } + }, + MethodKind::Async(callback) => match method.claim(&req.method, &resources) { + Ok(guard) => { + let sink_batch = sink_batch.clone(); + let id = id.into_owned(); + let params = params.into_owned(); + + Some(async move { + let result = + (callback)(id, params, sink_batch, conn_id, Some(guard)).await; + middleware.on_result(&req.method, result, request_start); + }) + } + Err(err) => { + tracing::error!( + "[Methods::execute_with_resources] failed to lock resources: {:?}", + err + ); + sink_batch.send_error(req.id, ErrorCode::ServerIsBusy.into()); + middleware.on_result(&req.method, false, request_start); + None + } + }, + MethodKind::Subscription(callback) => { + match method.claim(&req.method, &resources) { + Ok(guard) => { + let conn_state = ConnState { + conn_id, + close: conn_rx2.clone(), + id_provider: &*id_provider, + }; + + let result = callback(id, params, &sink_batch, conn_state); + middleware.on_result(&req.method, result, request_start); + drop(guard); + None + } + Err(err) => { + tracing::error!( + "[Methods::execute_with_resources] failed to lock resources: {:?}", + err + ); + + sink_batch.send_error(req.id, ErrorCode::ServerIsBusy.into()); + middleware.on_result(&req.method, false, request_start); + None + } + } + } + }, } })) .await; From d589d240d5510b1b82feb67c0e13ee0f1ce191f3 Mon Sep 17 00:00:00 2001 From: Niklas Date: Mon, 17 Jan 2022 17:50:40 +0100 Subject: [PATCH 09/22] remove unsed code --- core/src/server/rpc_module.rs | 103 ++---------------------- http-server/src/server.rs | 28 +++---- ws-server/src/server.rs | 145 +++++++++++++++++----------------- 3 files changed, 92 insertions(+), 184 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index c430a3911a..54634395a2 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -96,7 +96,7 @@ pub enum MethodKind { Sync(SyncMethod), /// Asynchronous method handler. Async(AsyncMethod<'static>), - /// Subscription handler, + /// Subscription method handler Subscription(SubscriptionMethod), } @@ -113,8 +113,7 @@ enum MethodResources { /// plus a table with resources it needs to claim to run #[derive(Clone, Debug)] pub struct MethodCallback { - /// TODO: Deref impl... - pub callback: MethodKind, + callback: MethodKind, resources: MethodResources, } @@ -181,52 +180,10 @@ impl MethodCallback { } } - /* - /// Execute the callback, sending the resulting JSON (success or error) to the specified sink. - pub fn execute( - &self, - sink: &MethodSink, - conn_state: Option, - req: Request<'_>, - claimed: Option, - ) -> MethodResult { - let id = req.id.clone(); - let params = Params::new(req.params.map(|params| params.get())); - - let result = match &self.callback { - MethodKind::Sync(callback) => { - tracing::trace!( - "[MethodCallback::execute] Executing sync callback, params={:?}, req.id={:?}, conn_state={:?}", - params, - id, - conn_state - ); - - let result = (callback)(id, params, sink, conn_state); - - // Release claimed resources - drop(claimed); - - MethodResult::Sync(result) - } - MethodKind::Async(callback) => { - let sink = sink.clone(); - let params = params.into_owned(); - let id = id.into_owned(); - let conn_id = conn_state.map(|s| s.conn_id).unwrap_or(0); - tracing::trace!( - "[MethodCallback::execute] Executing async callback, params={:?}, req.id={:?}, conn_state={:?}", - params, - id, - conn_id, - ); - - MethodResult::Async((callback)(id, params, sink, conn_id, claimed)) - } - }; - - result - }*/ + /// Get handle to the callback. + pub fn inner(&self) -> &MethodKind { + &self.callback + } } impl Debug for MethodKind { @@ -336,52 +293,6 @@ impl Methods { self.callbacks.get_key_value(method_name).map(|(k, v)| (*k, v)) } - /// Returns callback handle for a method name - /// - pub fn as_callback(&self, method: &str) -> Option<&MethodCallback> { - self.callbacks.get(method) - } - - /* - /// Attempt to execute a callback, sending the resulting JSON (success or error) to the specified sink. - pub fn execute(&self, sink: &MethodSink, conn_state: Option, req: Request) -> MethodResult { - tracing::trace!("[Methods::execute] Executing request: {:?}", req); - match self.callbacks.get(&*req.method) { - Some(callback) => callback.execute(sink, conn_state, req, None), - None => { - sink.send_error(req.id, ErrorCode::MethodNotFound.into()); - MethodResult::Sync(false) - } - } - }*/ - - /* - /// Attempt to execute a callback while checking that the call does not exhaust the available resources, - // sending the resulting JSON (success or error) to the specified sink. - pub fn execute_with_resources<'r>( - &self, - sink: &MethodSink, - conn_state: Option>, - req: Request<'r>, - resources: &Resources, - ) -> Result<(&'static str, MethodResult), Cow<'r, str>> { - tracing::trace!("[Methods::execute_with_resources] Executing request: {:?}", req); - match self.callbacks.get_key_value(&*req.method) { - Some((&name, callback)) => match callback.claim(&req.method, resources) { - Ok(guard) => Ok((name, callback.execute(sink, conn_state, req, Some(guard)))), - Err(err) => { - tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err); - sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); - Ok((name, MethodResult::Sync(false))) - } - }, - None => { - sink.send_error(req.id, ErrorCode::MethodNotFound.into()); - Err(req.method) - } - } - }*/ - /// Helper to call a method on the `RPC module` without having to spin up a server. /// /// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation. @@ -462,7 +373,7 @@ impl Methods { let id = req.id.clone(); let params = Params::new(req.params.map(|params| params.get())); - let _result = match self.as_callback(&req.method).map(|c| &c.callback) { + let _result = match self.method(&req.method).map(|c| &c.callback) { None => todo!(), Some(MethodKind::Sync(cb)) => (cb)(id, params, &sink), Some(MethodKind::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), sink, 0, None).await, diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 2d2c3c9bb4..b25cb08b2c 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -454,18 +454,18 @@ async fn process_validated_request( if is_single { if let Ok(req) = serde_json::from_slice::(&body) { let method = req.method.as_ref(); - middleware.on_call(req.method.as_ref()); + middleware.on_call(method); let id = req.id.clone(); let params = Params::new(req.params.map(|params| params.get())); - let result = match methods.as_callback(method) { + let result = match methods.method_with_name(method) { None => { sink.send_error(req.id, ErrorCode::MethodNotFound.into()); false } - Some(method) => match &method.callback { - MethodKind::Sync(callback) => match method.claim(&req.method, &resources) { + Some((name, method_callback)) => match method_callback.inner() { + MethodKind::Sync(callback) => match method_callback.claim(&req.method, &resources) { Ok(guard) => { let result = (callback)(id, params, &sink); drop(guard); @@ -477,7 +477,7 @@ async fn process_validated_request( false } }, - MethodKind::Async(callback) => match method.claim(&req.method, &resources) { + MethodKind::Async(callback) => match method_callback.claim(name, &resources) { Ok(guard) => { let result = (callback)(id.into_owned(), params.into_owned(), sink.clone(), 0, Some(guard)).await; @@ -498,7 +498,7 @@ async fn process_validated_request( }; middleware.on_result(&req.method, result, request_start); // TODO: shouldn't `on_response` be called here?! - //middleware.on_response(request_start); + // middleware.on_response(request_start); } else if let Ok(_req) = serde_json::from_slice::(&body) { return Ok::<_, HyperError>(response::ok_response("".into())); } else { @@ -515,16 +515,16 @@ async fn process_validated_request( let id = req.id.clone(); let params = Params::new(req.params.map(|params| params.get())); - match methods.as_callback(&req.method) { + match methods.method_with_name(&req.method) { None => { sink.send_error(req.id, ErrorCode::MethodNotFound.into()); None } - Some(method) => match &method.callback { - MethodKind::Sync(callback) => match method.claim(&req.method, &resources) { + Some((name, method_callback)) => match method_callback.inner() { + MethodKind::Sync(callback) => match method_callback.claim(name, &resources) { Ok(guard) => { let result = (callback)(id, params, &sink); - middleware.on_result(&req.method, result, request_start); + middleware.on_result(name, result, request_start); drop(guard); None } @@ -534,11 +534,11 @@ async fn process_validated_request( err ); sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); - middleware.on_result(&req.method, false, request_start); + middleware.on_result(name, false, request_start); None } }, - MethodKind::Async(callback) => match method.claim(&req.method, &resources) { + MethodKind::Async(callback) => match method_callback.claim(name, &resources) { Ok(guard) => { let sink = sink.clone(); let id = id.into_owned(); @@ -547,7 +547,7 @@ async fn process_validated_request( Some(async move { let result = (callback)(id, params, sink, 0, Some(guard)).await; - middleware.on_result(&req.method, result, request_start); + middleware.on_result(name, result, request_start); }) } Err(err) => { @@ -556,7 +556,7 @@ async fn process_validated_request( err ); sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); - middleware.on_result(&req.method, false, request_start); + middleware.on_result(name, false, request_start); None } }, diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 6e2db63d1e..f81d418cee 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -374,85 +374,80 @@ async fn background_task( tracing::debug!("recv method call={}", req.method); tracing::trace!("recv: req={:?}", req); - let name = &req.method; let id = req.id.clone(); let params = Params::new(req.params.map(|params| params.get())); - middleware.on_call(name); + middleware.on_call(&req.method); - match methods.as_callback(&name) { + match methods.method_with_name(&req.method) { None => { sink.send_error(req.id, ErrorCode::MethodNotFound.into()); middleware.on_response(request_start); } - Some(method) => { - match &method.callback { - MethodKind::Sync(callback) => match method.claim(name, &resources) { - Ok(guard) => { - let result = (callback)(id, params, &sink); - + Some((name, method)) => match &method.inner() { + MethodKind::Sync(callback) => match method.claim(name, &resources) { + Ok(guard) => { + let result = (callback)(id, params, &sink); + + middleware.on_result(name, result, request_start); + middleware.on_response(request_start); + drop(guard); + } + Err(err) => { + tracing::error!( + "[Methods::execute_with_resources] failed to lock resources: {:?}", + err + ); + sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); + middleware.on_result(name, false, request_start); + middleware.on_response(request_start); + } + }, + MethodKind::Async(callback) => match method.claim(name, &resources) { + Ok(guard) => { + let sink = sink.clone(); + let id = id.into_owned(); + let params = params.into_owned(); + + let fut = async move { + let result = (callback)(id, params, sink, conn_id, Some(guard)).await; middleware.on_result(&name, result, request_start); middleware.on_response(request_start); - drop(guard); - } - Err(err) => { - tracing::error!( - "[Methods::execute_with_resources] failed to lock resources: {:?}", - err - ); - sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); - middleware.on_result(&name, false, request_start); - middleware.on_response(request_start); - } - }, - MethodKind::Async(callback) => match method.claim(&req.method, &resources) { - Ok(guard) => { - let sink = sink.clone(); - let id = id.into_owned(); - let params = params.into_owned(); - // TODO: return name from the callback handle to avoid `to_string`. - let name = name.to_string(); - - let fut = async move { - let result = (callback)(id, params, sink, conn_id, Some(guard)).await; - middleware.on_result(&name, result, request_start); - middleware.on_response(request_start); - }; - - method_executors.add(fut.boxed()); - } - Err(err) => { - tracing::error!( - "[Methods::execute_with_resources] failed to lock resources: {:?}", - err - ); - sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); - middleware.on_result(&name, false, request_start); - middleware.on_response(request_start); - } - }, - MethodKind::Subscription(callback) => match method.claim(&req.method, &resources) { - Ok(guard) => { - let conn_state = - ConnState { conn_id, close: conn_rx.clone(), id_provider: &*id_provider }; + }; - let result = callback(id, params, &sink, conn_state); - middleware.on_result(&name, result, request_start); - middleware.on_response(request_start); - drop(guard); - } - Err(err) => { - tracing::error!( - "[Methods::execute_with_resources] failed to lock resources: {:?}", - err - ); - sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); - middleware.on_result(&name, false, request_start); - middleware.on_response(request_start); - } - }, - } - } + method_executors.add(fut.boxed()); + } + Err(err) => { + tracing::error!( + "[Methods::execute_with_resources] failed to lock resources: {:?}", + err + ); + sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); + middleware.on_result(name, false, request_start); + middleware.on_response(request_start); + } + }, + MethodKind::Subscription(callback) => match method.claim(&req.method, &resources) { + Ok(guard) => { + let conn_state = + ConnState { conn_id, close: conn_rx.clone(), id_provider: &*id_provider }; + + let result = callback(id, params, &sink, conn_state); + middleware.on_result(name, result, request_start); + middleware.on_response(request_start); + drop(guard); + } + Err(err) => { + tracing::error!( + "[Methods::execute_with_resources] failed to lock resources: {:?}", + err + ); + sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); + middleware.on_result(name, false, request_start); + middleware.on_response(request_start); + } + }, + }, } } else { let (id, code) = prepare_error(&data); @@ -484,16 +479,16 @@ async fn background_task( let params = Params::new(req.params.map(|params| params.get())); let name = &req.method; - match methods.as_callback(name) { + match methods.method_with_name(name) { None => { sink_batch.send_error(req.id, ErrorCode::MethodNotFound.into()); None } - Some(method) => match &method.callback { - MethodKind::Sync(callback) => match method.claim(name, &resources) { + Some((name, method_callback)) => match &method_callback.inner() { + MethodKind::Sync(callback) => match method_callback.claim(name, resources) { Ok(guard) => { let result = (callback)(id, params, &sink_batch); - middleware.on_result(&req.method, result, request_start); + middleware.on_result(name, result, request_start); drop(guard); None } @@ -507,7 +502,9 @@ async fn background_task( None } }, - MethodKind::Async(callback) => match method.claim(&req.method, &resources) { + MethodKind::Async(callback) => match method_callback + .claim(&req.method, resources) + { Ok(guard) => { let sink_batch = sink_batch.clone(); let id = id.into_owned(); @@ -530,7 +527,7 @@ async fn background_task( } }, MethodKind::Subscription(callback) => { - match method.claim(&req.method, &resources) { + match method_callback.claim(&req.method, resources) { Ok(guard) => { let conn_state = ConnState { conn_id, From 92bb97e5120cb7373b4831029af9f6f666e95c3c Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 18 Jan 2022 09:16:08 +0100 Subject: [PATCH 10/22] remove todo --- core/src/server/rpc_module.rs | 4 ++-- ws-server/src/server.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 54634395a2..1ed5dce031 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -364,7 +364,7 @@ impl Methods { Ok((resp, rx)) } - /// Wrapper over [`Methods::execute`] to execute a callback. + /// Execute a callback. async fn inner_call(&self, req: Request<'_>) -> RawRpcResponse { let (tx, mut rx) = mpsc::unbounded(); let sink = MethodSink::new(tx); @@ -374,7 +374,7 @@ impl Methods { let params = Params::new(req.params.map(|params| params.get())); let _result = match self.method(&req.method).map(|c| &c.callback) { - None => todo!(), + None => sink.send_error(req.id, ErrorCode::MethodNotFound.into()), Some(MethodKind::Sync(cb)) => (cb)(id, params, &sink), Some(MethodKind::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), sink, 0, None).await, Some(MethodKind::Subscription(cb)) => { diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index f81d418cee..fc75721e25 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -411,7 +411,7 @@ async fn background_task( let fut = async move { let result = (callback)(id, params, sink, conn_id, Some(guard)).await; - middleware.on_result(&name, result, request_start); + middleware.on_result(name, result, request_start); middleware.on_response(request_start); }; From b9598ca684f3d4fd4148e440351e108cd9cd0407 Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 18 Jan 2022 09:37:12 +0100 Subject: [PATCH 11/22] add missing feature tokio/macros --- core/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index d7f9381712..a401f37c7f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -24,7 +24,7 @@ serde = { version = "1.0", default-features = false, features = ["derive"] } serde_json = { version = "1", features = ["raw_value"] } soketto = "0.7.1" parking_lot = { version = "0.11", optional = true } -tokio = { version = "1.8", features = ["rt"], optional = true } +tokio = { version = "1.8", features = ["rt", "macros"], optional = true } [features] default = [] From bce48da88b5f628cfccbf5cddc9cb576213ae00a Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 18 Jan 2022 15:35:54 +0100 Subject: [PATCH 12/22] make `add_stream` cancel-safe --- core/Cargo.toml | 2 +- core/src/server/rpc_module.rs | 27 +++++++++++++++++++-------- ws-server/src/server.rs | 10 ++++++---- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index a401f37c7f..d7f9381712 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -24,7 +24,7 @@ serde = { version = "1.0", default-features = false, features = ["derive"] } serde_json = { version = "1", features = ["raw_value"] } soketto = "0.7.1" parking_lot = { version = "0.11", optional = true } -tokio = { version = "1.8", features = ["rt", "macros"], optional = true } +tokio = { version = "1.8", features = ["rt"], optional = true } [features] default = [] diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 1ed5dce031..c9db23e14a 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -37,6 +37,7 @@ use crate::server::resource_limiting::{ResourceGuard, ResourceTable, ResourceVec use crate::to_json_raw_value; use crate::traits::{IdProvider, ToRpcParams}; use futures_channel::{mpsc, oneshot}; +use futures_util::future::Either; use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt}; use jsonrpsee_types::error::{invalid_subscription_err, ErrorCode, CALL_EXECUTION_FAILED_CODE}; use jsonrpsee_types::{ @@ -751,17 +752,27 @@ impl SubscriptionSink { S: Stream + Unpin, T: Serialize, { + let mut close_stream = self.close.clone(); + let mut item = stream.next(); + let mut close = close_stream.next(); + loop { - tokio::select! { - Some(item) = stream.next() => { - if let Err(Error::SubscriptionClosed(_)) = self.send(&item) { + match futures_util::future::select(item, close).await { + Either::Left((Some(i), c)) => { + if let Err(Error::SubscriptionClosed(_)) = self.send(&i) { break; } - }, - // No messages should be sent over this channel (just ignore and continue) - Some(_) = self.close.next() => {}, - // Stream or connection was dropped => close stream. - else => break, + close = c; + item = stream.next(); + } + // No messages should be sent over this channel + // if that's occur just continue. + Either::Right((Some(_), i)) => { + item = i; + close = close_stream.next(); + } + // Stream or connection has been terminated. + Either::Right((None, _)) | Either::Left((None, _)) => break, } } } diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index fc75721e25..542dbd753a 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -316,11 +316,13 @@ async fn background_task( break; } } - // terminate connection. + + // Terminate connection and send close message. let _ = sender.close().await; - // NOTE(niklasad1): when the receiver is dropped no further requests or subscriptions - // will be possible. - drop(conn_tx); + + // Force `conn_tx` to this async block and close it down + // when the connection closes to be on safe side. + conn_tx.close(); }); // Buffer for incoming data. From 6d16927c8025fbd499fe926c02e5ec4a85cb683a Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 19 Jan 2022 15:38:59 +0100 Subject: [PATCH 13/22] rename add_stream and return status --- core/src/server/rpc_module.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index c9db23e14a..7151934190 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -746,8 +746,12 @@ impl SubscriptionSink { self.inner_send(msg).map_err(Into::into) } - /// Consume the sink by passing a stream to be sent via the sink. - pub async fn add_stream(mut self, mut stream: S) + /// Read data from the stream and send back data on the subscription when items gets produced by the stream. + /// + /// Returns `Ok(())` if the stream or connection was terminated. + /// Returns `Err(_)` if one of the items couldn't be serialized. + /// + pub async fn read_stream_and_send(mut self, mut stream: S) -> Result<(), Error> where S: Stream + Unpin, T: Serialize, @@ -759,9 +763,11 @@ impl SubscriptionSink { loop { match futures_util::future::select(item, close).await { Either::Left((Some(i), c)) => { - if let Err(Error::SubscriptionClosed(_)) = self.send(&i) { - break; - } + match self.send(&i) { + Ok => (), + Err(Error::SubscriptionClosed(_)) => return Ok(()), + err => return err, + }; close = c; item = stream.next(); } @@ -772,7 +778,7 @@ impl SubscriptionSink { close = close_stream.next(); } // Stream or connection has been terminated. - Either::Right((None, _)) | Either::Left((None, _)) => break, + Either::Right((None, _)) | Either::Left((None, _)) => return Ok(()), } } } From a47a96509871cdfe9dff61d509b2f06e8bd673ee Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 19 Jan 2022 15:47:38 +0100 Subject: [PATCH 14/22] fix nits --- core/src/server/rpc_module.rs | 8 ++++---- tests/tests/integration_tests.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 7151934190..200719aaa3 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -764,9 +764,9 @@ impl SubscriptionSink { match futures_util::future::select(item, close).await { Either::Left((Some(i), c)) => { match self.send(&i) { - Ok => (), - Err(Error::SubscriptionClosed(_)) => return Ok(()), - err => return err, + Ok(_) => (), + Err(Error::SubscriptionClosed(_)) => break Ok(()), + Err(err) => break Err(err), }; close = c; item = stream.next(); @@ -778,7 +778,7 @@ impl SubscriptionSink { close = close_stream.next(); } // Stream or connection has been terminated. - Either::Right((None, _)) | Either::Left((None, _)) => return Ok(()), + Either::Right((None, _)) | Either::Left((None, _)) => break Ok(()), } } } diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 45a7928c91..b6aa707c02 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -375,7 +375,7 @@ async fn ws_server_cancels_stream_after_reset_conn() { // create stream that doesn't produce items. let stream = futures::stream::empty::(); tokio::spawn(async move { - sink.add_stream(stream).await; + sink.read_stream_and_send(stream).await.unwrap(); let send_back = Arc::make_mut(&mut tx); send_back.feed(()).await.unwrap(); }); From 07f80e276a01d54fa612775f2659e5dbde662c82 Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 19 Jan 2022 16:11:46 +0100 Subject: [PATCH 15/22] rename stream API -> streamify --- core/src/server/rpc_module.rs | 2 +- tests/tests/integration_tests.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 200719aaa3..065d24ad6f 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -751,7 +751,7 @@ impl SubscriptionSink { /// Returns `Ok(())` if the stream or connection was terminated. /// Returns `Err(_)` if one of the items couldn't be serialized. /// - pub async fn read_stream_and_send(mut self, mut stream: S) -> Result<(), Error> + pub async fn streamify(mut self, mut stream: S) -> Result<(), Error> where S: Stream + Unpin, T: Serialize, diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index b6aa707c02..e22fdc70f1 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -375,7 +375,7 @@ async fn ws_server_cancels_stream_after_reset_conn() { // create stream that doesn't produce items. let stream = futures::stream::empty::(); tokio::spawn(async move { - sink.read_stream_and_send(stream).await.unwrap(); + sink.streamify(stream).await.unwrap(); let send_back = Arc::make_mut(&mut tx); send_back.feed(()).await.unwrap(); }); From bedf808f3e683a750b696ac30d94a307cf055ca5 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 19 Jan 2022 16:20:31 +0100 Subject: [PATCH 16/22] Update core/src/server/rpc_module.rs --- core/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 065d24ad6f..fd169e498a 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -772,7 +772,7 @@ impl SubscriptionSink { item = stream.next(); } // No messages should be sent over this channel - // if that's occur just continue. + // if that occurred just ignore and continue. Either::Right((Some(_), i)) => { item = i; close = close_stream.next(); From 03ef66987640a8f1fa5aa25483994adc6b3b7a67 Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 19 Jan 2022 18:32:30 +0100 Subject: [PATCH 17/22] provide proper close reason --- core/src/server/rpc_module.rs | 21 +++++++++++++++++---- tests/tests/helpers.rs | 2 +- tests/tests/rpc_module.rs | 17 ++++++++++++++--- 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 065d24ad6f..6d0e4219e1 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -765,7 +765,10 @@ impl SubscriptionSink { Either::Left((Some(i), c)) => { match self.send(&i) { Ok(_) => (), - Err(Error::SubscriptionClosed(_)) => break Ok(()), + Err(Error::SubscriptionClosed(close_reason)) => { + self.close(&close_reason); + break Ok(()); + } Err(err) => break Err(err), }; close = c; @@ -777,8 +780,13 @@ impl SubscriptionSink { item = i; close = close_stream.next(); } - // Stream or connection has been terminated. - Either::Right((None, _)) | Either::Left((None, _)) => break Ok(()), + // Connection terminated. + Either::Right((None, _)) => { + self.close(&SubscriptionClosed::new(SubscriptionClosedReason::ConnectionReset)); + break Ok(()); + } + // Stream terminated. + Either::Left((None, _)) => break Ok(()), } } } @@ -820,11 +828,16 @@ impl SubscriptionSink { } /// Close the subscription sink with a customized error message. - pub fn close(&mut self, msg: &str) { + pub fn close_with_custom_message(&mut self, msg: &str) { let close_reason = SubscriptionClosedReason::Server(msg.to_string()).into(); self.inner_close(Some(&close_reason)); } + /// Provide close from `SubscriptionClosed`. + pub fn close(&mut self, close_reason: &SubscriptionClosed) { + self.inner_close(Some(close_reason)); + } + fn inner_close(&mut self, close_reason: Option<&SubscriptionClosed>) { self.is_connected.take(); if let Some((sink, _)) = self.subscribers.lock().remove(&self.uniq_sub) { diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index b8551cc68c..eef02a3841 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -85,7 +85,7 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle .register_subscription("subscribe_noop", "subscribe_noop", "unsubscribe_noop", |_, mut sink, _| { std::thread::spawn(move || { std::thread::sleep(Duration::from_secs(1)); - sink.close("Server closed the stream because it was lazy") + sink.close_with_custom_message("Server closed the stream because it was lazy") }); Ok(()) }) diff --git a/tests/tests/rpc_module.rs b/tests/tests/rpc_module.rs index e7e5396eba..ae200241bb 100644 --- a/tests/tests/rpc_module.rs +++ b/tests/tests/rpc_module.rs @@ -26,8 +26,8 @@ use std::collections::HashMap; +use jsonrpsee::core::error::{Error, SubscriptionClosed, SubscriptionClosedReason}; use jsonrpsee::core::server::rpc_module::*; -use jsonrpsee::core::Error; use jsonrpsee::types::{EmptyParams, Params}; use serde::{Deserialize, Serialize}; @@ -205,9 +205,10 @@ async fn subscribing_without_server() { } let sub_err = my_sub.next::().await.unwrap().unwrap_err(); + let exp = SubscriptionClosed::new(SubscriptionClosedReason::Server("No close reason provided".to_string())); // The subscription is now closed by the server. - assert!(matches!(sub_err, Error::SubscriptionClosed(_))); + assert!(matches!(sub_err, Error::SubscriptionClosed(close_reason) if close_reason == exp)); } #[tokio::test] @@ -221,6 +222,10 @@ async fn close_test_subscribing_without_server() { while !sink.is_closed() { std::thread::sleep(std::time::Duration::from_millis(500)); } + // Get the close reason. + if let Error::SubscriptionClosed(close_reason) = sink.send(&"lo").unwrap_err() { + sink.close(&close_reason); + } }); Ok(()) }) @@ -233,5 +238,11 @@ async fn close_test_subscribing_without_server() { // close the subscription to ensure it doesn't return any items. my_sub.close(); - assert!(matches!(my_sub.next::().await, Some(Err(Error::SubscriptionClosed(_))))); + + // in this case, the unsubscribe method was not called and will be treated as the + // connetion was closed. + let exp = SubscriptionClosed::new(SubscriptionClosedReason::ConnectionReset); + assert!( + matches!(my_sub.next::().await, Some(Err(Error::SubscriptionClosed(close_reason))) if close_reason == exp) + ); } From f7aa544d1799677c45328a2044c62ebfa21dd0cf Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 19 Jan 2022 18:35:03 +0100 Subject: [PATCH 18/22] spelling --- tests/tests/rpc_module.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/tests/rpc_module.rs b/tests/tests/rpc_module.rs index ae200241bb..a3355a98e8 100644 --- a/tests/tests/rpc_module.rs +++ b/tests/tests/rpc_module.rs @@ -239,8 +239,8 @@ async fn close_test_subscribing_without_server() { // close the subscription to ensure it doesn't return any items. my_sub.close(); - // in this case, the unsubscribe method was not called and will be treated as the - // connetion was closed. + // In this case, the unsubscribe method was not called and + // it will be treated as the connection was closed. let exp = SubscriptionClosed::new(SubscriptionClosedReason::ConnectionReset); assert!( matches!(my_sub.next::().await, Some(Err(Error::SubscriptionClosed(close_reason))) if close_reason == exp) From 72f4726182d2209b8d5fa7d76d4638029e212605 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 20 Jan 2022 11:04:56 +0100 Subject: [PATCH 19/22] consume_and_streamify + docs --- core/src/server/rpc_module.rs | 23 ++++++++++++++++++++--- http-server/src/server.rs | 3 --- tests/tests/integration_tests.rs | 2 +- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 2ad98b2cdb..dcffb1e356 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -746,12 +746,26 @@ impl SubscriptionSink { self.inner_send(msg).map_err(Into::into) } - /// Read data from the stream and send back data on the subscription when items gets produced by the stream. + /// Consumes the `SubscriptionSink` and reads data from the `stream` and sends back data on the subscription + /// when items gets produced by the stream. /// /// Returns `Ok(())` if the stream or connection was terminated. /// Returns `Err(_)` if one of the items couldn't be serialized. /// - pub async fn streamify(mut self, mut stream: S) -> Result<(), Error> + /// # Examples + /// + /// ```no_run + /// + /// use jsonrpsee_core::server::rpc_module::RpcModule; + /// + /// let mut m = RpcModule::new(()); + /// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| { + /// let stream = futures_util::stream::iter(vec![1_u32, 2, 3]); + /// tokio::spawn(sink.consume_and_streamify(stream)); + /// Ok(()) + /// }); + /// ``` + pub async fn consume_and_streamify(mut self, mut stream: S) -> Result<(), Error> where S: Stream + Unpin, T: Serialize, @@ -769,7 +783,10 @@ impl SubscriptionSink { self.close(&close_reason); break Ok(()); } - Err(err) => break Err(err), + Err(err) => { + tracing::warn!("Subscription got error: {:?} terminating task", err); + break Err(err) + } }; close = c; item = stream.next(); diff --git a/http-server/src/server.rs b/http-server/src/server.rs index b25cb08b2c..71c8d0f6e1 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -497,15 +497,12 @@ async fn process_validated_request( }, }; middleware.on_result(&req.method, result, request_start); - // TODO: shouldn't `on_response` be called here?! - // middleware.on_response(request_start); } else if let Ok(_req) = serde_json::from_slice::(&body) { return Ok::<_, HyperError>(response::ok_response("".into())); } else { let (id, code) = prepare_error(&body); sink.send_error(id, code.into()); } - // Batch of requests or notifications } else if let Ok(batch) = serde_json::from_slice::>(&body) { if !batch.is_empty() { diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index e22fdc70f1..119fbe0b71 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -375,7 +375,7 @@ async fn ws_server_cancels_stream_after_reset_conn() { // create stream that doesn't produce items. let stream = futures::stream::empty::(); tokio::spawn(async move { - sink.streamify(stream).await.unwrap(); + sink.consume_and_streamify(stream).await.unwrap(); let send_back = Arc::make_mut(&mut tx); send_back.feed(()).await.unwrap(); }); From 7a20a52c0adfcbbce6f61f1fc4889d27890fcb4d Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 20 Jan 2022 11:44:27 +0100 Subject: [PATCH 20/22] fmt --- core/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index dcffb1e356..e3c7eab506 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -785,7 +785,7 @@ impl SubscriptionSink { } Err(err) => { tracing::warn!("Subscription got error: {:?} terminating task", err); - break Err(err) + break Err(err); } }; close = c; From de4ac6a1db7e3fb9320691b0b1e4934bdcaba770 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 20 Jan 2022 12:36:57 +0100 Subject: [PATCH 21/22] rename API pipe_from_stream --- core/src/server/rpc_module.rs | 4 ++-- tests/tests/integration_tests.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index e3c7eab506..84732c3f3e 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -761,11 +761,11 @@ impl SubscriptionSink { /// let mut m = RpcModule::new(()); /// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| { /// let stream = futures_util::stream::iter(vec![1_u32, 2, 3]); - /// tokio::spawn(sink.consume_and_streamify(stream)); + /// tokio::spawn(sink.pipe_from_stream(stream)); /// Ok(()) /// }); /// ``` - pub async fn consume_and_streamify(mut self, mut stream: S) -> Result<(), Error> + pub async fn pipe_from_stream(mut self, mut stream: S) -> Result<(), Error> where S: Stream + Unpin, T: Serialize, diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 119fbe0b71..6a6713ec59 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -375,7 +375,7 @@ async fn ws_server_cancels_stream_after_reset_conn() { // create stream that doesn't produce items. let stream = futures::stream::empty::(); tokio::spawn(async move { - sink.consume_and_streamify(stream).await.unwrap(); + sink.pipe_from_stream(stream).await.unwrap(); let send_back = Arc::make_mut(&mut tx); send_back.feed(()).await.unwrap(); }); From fa15cfb36156f3c4380d6fdec757ce9338a11b3d Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 20 Jan 2022 18:52:24 +0100 Subject: [PATCH 22/22] improve logging; indicate which subscription method that failed --- core/src/server/rpc_module.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 84732c3f3e..74f1b34038 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -776,15 +776,15 @@ impl SubscriptionSink { loop { match futures_util::future::select(item, close).await { - Either::Left((Some(i), c)) => { - match self.send(&i) { + Either::Left((Some(result), c)) => { + match self.send(&result) { Ok(_) => (), Err(Error::SubscriptionClosed(close_reason)) => { self.close(&close_reason); break Ok(()); } Err(err) => { - tracing::warn!("Subscription got error: {:?} terminating task", err); + tracing::error!("subscription `{}` failed to send item got error: {:?}", self.method, err); break Err(err); } };