diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2c438029a2..b0c24fdb2e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -92,7 +92,7 @@ jobs: run: cargo hack check --workspace --each-feature --all-targets tests_ubuntu: - name: Run nextests on Ubuntu + name: Run tests Ubuntu runs-on: ubuntu-latest steps: - name: Checkout sources diff --git a/benches/helpers.rs b/benches/helpers.rs index 1cf4321004..af5d39e65d 100644 --- a/benches/helpers.rs +++ b/benches/helpers.rs @@ -141,13 +141,10 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws let mut module = gen_rpc_module(); module - .register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, pending, _ctx| { - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; + .register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| { let x = "Hello"; tokio::spawn(async move { sink.send(&x) }); + Ok(()) }) .unwrap(); diff --git a/core/src/server/helpers.rs b/core/src/server/helpers.rs index 09ded6799f..bf1dcf5fcb 100644 --- a/core/src/server/helpers.rs +++ b/core/src/server/helpers.rs @@ -156,9 +156,10 @@ impl MethodSink { if let Err(err) = self.send_raw(json) { tracing::warn!("Error sending response {:?}", err); + false + } else { + true } - - false } /// Helper for sending the general purpose `Error` as a JSON-RPC errors to the client diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index b387615b59..8020ae9199 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -39,11 +39,14 @@ use futures_channel::mpsc; use futures_util::future::Either; use futures_util::pin_mut; use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStream, TryStreamExt}; -use jsonrpsee_types::error::{CallError, ErrorCode, ErrorObject, ErrorObjectOwned, SUBSCRIPTION_CLOSED_WITH_ERROR}; +use jsonrpsee_types::error::{ + CallError, ErrorCode, ErrorObject, ErrorObjectOwned, INTERNAL_ERROR_CODE, + SUBSCRIPTION_CLOSED_WITH_ERROR, SubscriptionAcceptRejectError +}; use jsonrpsee_types::response::{SubscriptionError, SubscriptionPayloadError}; use jsonrpsee_types::{ - ErrorResponse, Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload, - SubscriptionResponse, + ErrorResponse, Id, Params, Request, Response, SubscriptionResult, + SubscriptionId as RpcSubscriptionId, SubscriptionPayload, SubscriptionResponse }; use parking_lot::Mutex; use rustc_hash::FxHashMap; @@ -87,7 +90,7 @@ pub struct ConnState<'a> { /// Outcome of a successful terminated subscription. #[derive(Debug)] -pub enum SubscriptionResult { +pub enum InnerSubscriptionResult { /// The subscription stream was executed successfully. Success, /// The subscription was aborted by the remote peer. @@ -382,8 +385,9 @@ impl Methods { /// use futures_util::StreamExt; /// /// let mut module = RpcModule::new(()); - /// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| { - /// pending.accept().unwrap().send(&"one answer").unwrap(); + /// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| { + /// sink.send(&"one answer").unwrap(); + /// Ok(()) /// }).unwrap(); /// let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#).await.unwrap(); /// let resp = serde_json::from_str::>(&resp).unwrap(); @@ -443,8 +447,9 @@ impl Methods { /// use jsonrpsee::{RpcModule, types::EmptyParams}; /// /// let mut module = RpcModule::new(()); - /// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| { - /// pending.accept().unwrap().send(&"one answer").unwrap(); + /// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| { + /// sink.send(&"one answer").unwrap(); + /// Ok(()) /// }).unwrap(); /// /// let mut sub = module.subscribe("hi", EmptyParams::new()).await.unwrap(); @@ -653,27 +658,22 @@ impl RpcModule { /// use jsonrpsee_core::Error; /// /// let mut ctx = RpcModule::new(99_usize); - /// ctx.register_subscription("sub", "notif_name", "unsub", |params, pending, ctx| { + /// ctx.register_subscription("sub", "notif_name", "unsub", |params, mut sink, ctx| { /// let x = match params.one::() { /// Ok(x) => x, /// Err(e) => { /// let err: Error = e.into(); - /// pending.reject(err); - /// return; - /// } - /// }; - /// - /// let mut sink = match pending.accept() { - /// Some(sink) => sink, - /// _ => { - /// return; + /// sink.reject(err); + /// return Ok(()); /// } /// }; - /// + /// // Sink is accepted on the first `send` call. /// std::thread::spawn(move || { /// let sum = x + (*ctx); /// let _ = sink.send(&sum); /// }); + /// + /// Ok(()) /// }); /// ``` pub fn register_subscription( @@ -685,7 +685,7 @@ impl RpcModule { ) -> Result where Context: Send + Sync + 'static, - F: Fn(Params, PendingSubscription, Arc) + Send + Sync + 'static, + F: Fn(Params, SubscriptionSink, Arc) -> SubscriptionResult + Send + Sync + 'static, { if subscribe_method_name == unsubscribe_method_name { return Err(Error::SubscriptionNameConflict(subscribe_method_name.into())); @@ -740,17 +740,21 @@ impl RpcModule { MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn, claimed| { let sub_id: RpcSubscriptionId = conn.id_provider.next_id(); - let sink = PendingSubscription(Some(InnerPendingSubscription { - sink: method_sink.clone(), + let sink = SubscriptionSink { + inner: method_sink.clone(), close_notify: Some(conn.close_notify), method: notif_method_name, subscribers: subscribers.clone(), uniq_sub: SubscriptionKey { conn_id: conn.conn_id, sub_id }, - id: id.clone().into_owned(), - claimed, - })); + id: Some(id.clone().into_owned()), + unsubscribe: None, + _claimed: claimed, + }; - callback(params, sink, ctx.clone()); + // The callback returns a `SubscriptionResult` for better ergonomics and is not propagated further. + if let Err(_) = callback(params, sink, ctx.clone()) { + tracing::warn!("subscribe call `{}` failed", subscribe_method_name); + } true })), @@ -775,101 +779,75 @@ impl RpcModule { } } -/// Represent a pending subscription which waits to be accepted or rejected. -/// -/// Note: you need to call either `PendingSubscription::accept` or `PendingSubscription::reject` otherwise -/// the subscription will be dropped with an `InvalidParams` error. +/// Returns once the unsubscribe method has been called. +type UnsubscribeCall = Option>; + +/// Represents a single subscription. #[derive(Debug)] -struct InnerPendingSubscription { +pub struct SubscriptionSink { /// Sink. - sink: MethodSink, + inner: MethodSink, /// Get notified when subscribers leave so we can exit close_notify: Option, /// MethodCallback. method: &'static str, + /// Shared Mutex of subscriptions for this method. + subscribers: Subscribers, /// Unique subscription. uniq_sub: SubscriptionKey, - /// Shared Mutex of subscriptions - subscribers: Subscribers, - /// Request ID. - id: Id<'static>, + /// Id of the `subscription call` (i.e. not the same as subscription id) which is used + /// to reply to subscription method call and must only be used once. + /// + /// *Note*: Having some value means the subscription was not accepted or rejected yet. + id: Option>, + /// Having some value means the subscription was accepted. + unsubscribe: UnsubscribeCall, /// Claimed resources. - claimed: Option, + _claimed: Option, } -/// Represent a pending subscription which waits until it's either accepted or rejected. -/// -/// This type implements `Drop` for ease of use, e.g. when dropped in error short circuiting via `map_err()?`. -#[derive(Debug)] -pub struct PendingSubscription(Option); - -impl PendingSubscription { +impl SubscriptionSink { /// Reject the subscription call from [`ErrorObject`]. - pub fn reject(mut self, err: impl Into) -> bool { - if let Some(inner) = self.0.take() { - let InnerPendingSubscription { sink, id, .. } = inner; - sink.send_error(id, err.into()) + pub fn reject(&mut self, err: impl Into) -> Result<(), SubscriptionAcceptRejectError> { + let id = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?; + + if self.inner.send_error(id, err.into()) { + Ok(()) } else { - false + Err(SubscriptionAcceptRejectError::RemotePeerAborted) } } /// Attempt to accept the subscription and respond the subscription method call. /// - /// Fails if the connection was closed - pub fn accept(mut self) -> Option { - let inner = self.0.take()?; - - let InnerPendingSubscription { sink, close_notify, method, uniq_sub, subscribers, id, claimed } = inner; + /// Fails if the connection was closed, or if called multiple times. + pub fn accept(&mut self) -> Result<(), SubscriptionAcceptRejectError> { + let id = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?; - if sink.send_response(id, &uniq_sub.sub_id) { + if self.inner.send_response(id, &self.uniq_sub.sub_id) { let (tx, rx) = watch::channel(()); - subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), tx)); - Some(SubscriptionSink { inner: sink, close_notify, method, uniq_sub, subscribers, unsubscribe: rx, _claimed: claimed }) + self.subscribers.lock().insert(self.uniq_sub.clone(), (self.inner.clone(), tx)); + self.unsubscribe = Some(rx); + Ok(()) } else { - None + Err(SubscriptionAcceptRejectError::RemotePeerAborted) } } -} - -// When dropped it returns an [`InvalidParams`] error to the subscriber -impl Drop for PendingSubscription { - fn drop(&mut self) { - if let Some(inner) = self.0.take() { - let InnerPendingSubscription { sink, id, .. } = inner; - sink.send_error(id, ErrorCode::InvalidParams.into()); - } - } -} - -/// Represents a single subscription. -#[derive(Debug)] -pub struct SubscriptionSink { - /// Sink. - inner: MethodSink, - /// Get notified when subscribers leave so we can exit - close_notify: Option, - /// MethodCallback. - method: &'static str, - /// Unique subscription. - uniq_sub: SubscriptionKey, - /// Shared Mutex of subscriptions for this method. - subscribers: Subscribers, - /// Future that returns when the unsubscribe method has been called. - unsubscribe: watch::Receiver<()>, - /// Claimed resources. - _claimed: Option, -} -impl SubscriptionSink { /// Send a message back to subscribers. /// - /// Returns `Ok(true)` if the message could be send - /// Returns `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated) - /// Return `Err(err)` if the message could not be serialized. - /// + /// Returns + /// - `Ok(true)` if the message could be send. + /// - `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated), + /// or the subscription could not be accepted. + /// - `Err(err)` if the message could not be serialized. pub fn send(&mut self, result: &T) -> Result { - // only possible to trigger when the connection is dropped. + // Cannot accept the subscription. + if let Err(SubscriptionAcceptRejectError::RemotePeerAborted) = self.accept() { + return Ok(false); + } + + // Only possible to trigger when the connection is dropped. if self.is_closed() { return Ok(false); } @@ -889,14 +867,13 @@ impl SubscriptionSink { /// /// ```no_run /// - /// use jsonrpsee_core::server::rpc_module::{RpcModule, SubscriptionResult}; + /// use jsonrpsee_core::server::rpc_module::RpcModule; /// use jsonrpsee_core::error::{Error, SubscriptionClosed}; /// use jsonrpsee_types::ErrorObjectOwned; /// use anyhow::anyhow; /// /// let mut m = RpcModule::new(()); - /// m.register_subscription("sub", "_", "unsub", |params, pending, _| { - /// let mut sink = pending.accept().unwrap(); + /// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| { /// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]); /// // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber /// // because after the `Err(_)` the stream is terminated. @@ -915,6 +892,7 @@ impl SubscriptionSink { /// } /// } /// }); + /// Ok(()) /// }); /// ``` pub async fn pipe_from_try_stream(&mut self, mut stream: S) -> SubscriptionClosed @@ -923,14 +901,24 @@ impl SubscriptionSink { T: Serialize, E: std::fmt::Display, { + if let Err(SubscriptionAcceptRejectError::RemotePeerAborted) = self.accept() { + return SubscriptionClosed::RemotePeerAborted; + } + let conn_closed = match self.close_notify.as_ref().map(|cn| cn.handle()) { Some(cn) => cn, - None => { - return SubscriptionClosed::RemotePeerAborted; - } + None => return SubscriptionClosed::RemotePeerAborted, + }; + + let mut sub_closed = match self.unsubscribe.as_ref() { + Some(rx) => rx.clone(), + _ => return SubscriptionClosed::Failed(ErrorObject::owned( + INTERNAL_ERROR_CODE, + "Unsubscribe watcher not set after accepting the subscription".to_string(), + None::<()> + )), }; - let mut sub_closed = self.unsubscribe.clone(); let sub_closed_fut = sub_closed.changed(); let conn_closed_fut = conn_closed.notified(); @@ -983,10 +971,10 @@ impl SubscriptionSink { /// use jsonrpsee_core::server::rpc_module::RpcModule; /// /// let mut m = RpcModule::new(()); - /// m.register_subscription("sub", "_", "unsub", |params, pending, _| { - /// let mut sink = pending.accept().unwrap(); + /// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| { /// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]); /// tokio::spawn(async move { sink.pipe_from_stream(stream).await; }); + /// Ok(()) /// }); /// ``` pub async fn pipe_from_stream(&mut self, stream: S) -> SubscriptionClosed @@ -1003,7 +991,10 @@ impl SubscriptionSink { } fn is_active_subscription(&self) -> bool { - !self.unsubscribe.has_changed().is_err() + match self.unsubscribe.as_ref() { + Some(unsubscribe) => !unsubscribe.has_changed().is_err(), + _ => false, + } } fn build_message(&self, result: &T) -> Result { @@ -1057,7 +1048,13 @@ impl SubscriptionSink { impl Drop for SubscriptionSink { fn drop(&mut self) { - if self.is_active_subscription() { + if let Some(id) = self.id.take() { + // Subscription was never accepted / rejected. As such, + // we default to assuming that the params were invalid, + // because that's how the previous PendingSubscription logic + // worked. + self.inner.send_error(id, ErrorCode::InvalidParams.into()); + } else if self.is_active_subscription() { self.subscribers.lock().remove(&self.uniq_sub); } } diff --git a/examples/examples/proc_macro.rs b/examples/examples/proc_macro.rs index 610789e53c..f980196899 100644 --- a/examples/examples/proc_macro.rs +++ b/examples/examples/proc_macro.rs @@ -28,8 +28,9 @@ use std::net::SocketAddr; use jsonrpsee::core::{async_trait, client::Subscription, Error}; use jsonrpsee::proc_macros::rpc; +use jsonrpsee::types::SubscriptionResult; use jsonrpsee::ws_client::WsClientBuilder; -use jsonrpsee::ws_server::{PendingSubscription, WsServerBuilder, WsServerHandle}; +use jsonrpsee::ws_server::{SubscriptionSink, WsServerBuilder, WsServerHandle}; type ExampleHash = [u8; 32]; type ExampleStorageKey = Vec; @@ -60,10 +61,14 @@ impl RpcServer for RpcServerImpl { Ok(vec![storage_key]) } - fn subscribe_storage(&self, pending: PendingSubscription, _keys: Option>) { - if let Some(mut sink) = pending.accept() { - let _ = sink.send(&vec![[0; 32]]); - } + // Note that the server's subscription method must return `SubscriptionResult`. + fn subscribe_storage( + &self, + mut sink: SubscriptionSink, + _keys: Option>, + ) -> SubscriptionResult { + let _ = sink.send(&vec![[0; 32]]); + Ok(()) } } diff --git a/examples/examples/ws_pubsub_broadcast.rs b/examples/examples/ws_pubsub_broadcast.rs index 6c144fc430..e5de2e78d0 100644 --- a/examples/examples/ws_pubsub_broadcast.rs +++ b/examples/examples/ws_pubsub_broadcast.rs @@ -71,12 +71,8 @@ async fn run_server() -> anyhow::Result { std::thread::spawn(move || produce_items(tx2)); - module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, pending, _| { + module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, mut sink, _| { let rx = BroadcastStream::new(tx.clone().subscribe()); - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; tokio::spawn(async move { match sink.pipe_from_try_stream(rx).await { @@ -89,6 +85,7 @@ async fn run_server() -> anyhow::Result { } }; }); + Ok(()) })?; let addr = server.local_addr()?; server.start(module)?; diff --git a/examples/examples/ws_pubsub_with_params.rs b/examples/examples/ws_pubsub_with_params.rs index 432e9a144c..dddc09c1ff 100644 --- a/examples/examples/ws_pubsub_with_params.rs +++ b/examples/examples/ws_pubsub_with_params.rs @@ -66,11 +66,8 @@ async fn run_server() -> anyhow::Result { let server = WsServerBuilder::default().build("127.0.0.1:0").await?; let mut module = RpcModule::new(()); module - .register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, pending, _| { - let (idx, mut sink) = match (params.one(), pending.accept()) { - (Ok(idx), Some(sink)) => (idx, sink), - _ => return, - }; + .register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, mut sink, _| { + let idx = params.one()?; let item = LETTERS.chars().nth(idx); let interval = interval(Duration::from_millis(200)); @@ -78,24 +75,18 @@ async fn run_server() -> anyhow::Result { tokio::spawn(async move { match sink.pipe_from_stream(stream).await { - // Send close notification when subscription stream failed. SubscriptionClosed::Failed(err) => { sink.close(err); } - // Don't send close notification because the stream should run forever. - SubscriptionClosed::Success => (), - // Don't send close because the client has already disconnected. - SubscriptionClosed::RemotePeerAborted => (), + _ => (), }; }); + Ok(()) }) .unwrap(); module - .register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, pending, _| { - let (one, two, mut sink) = match (params.parse::<(usize, usize)>(), pending.accept()) { - (Ok((one, two)), Some(sink)) => (one, two, sink), - _ => return, - }; + .register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, mut sink, _| { + let (one, two) = params.parse::<(usize, usize)>()?; let item = &LETTERS[one..two]; @@ -104,16 +95,14 @@ async fn run_server() -> anyhow::Result { tokio::spawn(async move { match sink.pipe_from_stream(stream).await { - // Send close notification when subscription stream failed. SubscriptionClosed::Failed(err) => { sink.close(err); } - // Don't send close notification because the stream should run forever. - SubscriptionClosed::Success => (), - // Don't send close because the client has already disconnected. - SubscriptionClosed::RemotePeerAborted => (), + _ => (), }; }); + + Ok(()) }) .unwrap(); diff --git a/jsonrpsee/src/lib.rs b/jsonrpsee/src/lib.rs index a792f32796..c60b2660da 100644 --- a/jsonrpsee/src/lib.rs +++ b/jsonrpsee/src/lib.rs @@ -92,7 +92,7 @@ cfg_types! { } cfg_server! { - pub use jsonrpsee_core::server::rpc_module::{PendingSubscription, RpcModule, SubscriptionSink}; + pub use jsonrpsee_core::server::rpc_module::{RpcModule, SubscriptionSink}; } cfg_client_or_server! { diff --git a/proc-macros/src/lib.rs b/proc-macros/src/lib.rs index fed781d73a..0b0b21cae3 100644 --- a/proc-macros/src/lib.rs +++ b/proc-macros/src/lib.rs @@ -49,13 +49,14 @@ pub(crate) mod visitor; /// type that implements `Client` or `SubscriptionClient` (depending on whether trait has /// subscriptions methods or not), namely `HttpClient` and `WsClient`. /// -/// For servers, it will generate a trait mostly equivalent to the input, with two main -/// differences: +/// For servers, it will generate a trait mostly equivalent to the input, with the following differences: /// /// - The trait will have one additional (already implemented) method, `into_rpc`, which turns any object that /// implements the server trait into an `RpcModule`. -/// - For subscription methods, there will be one additional argument inserted right after `&self`: `subscription_sink: -/// PendingSubscription`. It should be used accept or reject a pending subscription. +/// - For subscription methods: +/// - There will be one additional argument inserted right after `&self`: `subscription_sink: SubscriptionSink`. +/// It should be used to accept or reject a subscription and send data back to subscribers. +/// - The return type of the subscription method is `SubscriptionResult` for improved ergonomics. /// /// Since this macro can generate up to two traits, both server and client traits will have /// a new name. For the `Foo` trait, server trait will be named `FooServer`, and client, @@ -98,8 +99,8 @@ pub(crate) mod visitor; /// async fn async_method(&self, param_a: u8, param_b: String) -> u16; /// fn sync_method(&self) -> String; /// -/// // Note that `subscription_sink` was added automatically. -/// fn sub(&self, subscription_sink: PendingSubscription); +/// // Note that `subscription_sink` and `SubscriptionResult` were added automatically. +/// fn sub(&self, subscription_sink: SubscriptionResult) -> SubscriptionResult; /// /// fn into_rpc(self) -> Result { /// // Actual implementation stripped, but inside we will create @@ -214,7 +215,8 @@ pub(crate) mod visitor; /// /// // RPC is put into a separate module to clearly show names of generated entities. /// mod rpc_impl { -/// use jsonrpsee::{proc_macros::rpc, core::async_trait, core::RpcResult, ws_server::PendingSubscription}; +/// use jsonrpsee::{proc_macros::rpc, core::async_trait, core::RpcResult, ws_server::SubscriptionSink}; +/// use jsonrpsee::types::SubscriptionResult; /// /// // Generate both server and client implementations, prepend all the methods with `foo_` prefix. /// #[rpc(client, server, namespace = "foo")] @@ -286,21 +288,21 @@ pub(crate) mod visitor; /// /// // The stream API can be used to pipe items from the underlying stream /// // as subscription responses. -/// fn sub_override_notif_method(&self, pending: PendingSubscription) { -/// let mut sink = pending.accept().unwrap(); +/// fn sub_override_notif_method(&self, mut sink: SubscriptionSink) -> SubscriptionResult { /// tokio::spawn(async move { /// let stream = futures_util::stream::iter(["one", "two", "three"]); /// sink.pipe_from_stream(stream).await; /// }); +/// Ok(()) /// } /// /// // We could've spawned a `tokio` future that yields values while our program works, /// // but for simplicity of the example we will only send two values and then close /// // the subscription. -/// fn sub(&self, pending: PendingSubscription) { -/// let mut sink = pending.accept().unwrap(); +/// fn sub(&self, mut sink: SubscriptionSink) -> SubscriptionResult { /// let _ = sink.send(&"Response_A"); /// let _ = sink.send(&"Response_B"); +/// Ok(()) /// } /// } /// } diff --git a/proc-macros/src/render_server.rs b/proc-macros/src/render_server.rs index 2f7807f66c..e59b5e2d56 100644 --- a/proc-macros/src/render_server.rs +++ b/proc-macros/src/render_server.rs @@ -32,7 +32,7 @@ use crate::helpers::{generate_where_clause, is_option}; use proc_macro2::{Span, TokenStream as TokenStream2}; use quote::{quote, quote_spanned}; use syn::punctuated::Punctuated; -use syn::Token; +use syn::{parse_quote, ReturnType, Token}; impl RpcDescription { pub(super) fn render_server(&self) -> Result { @@ -71,10 +71,16 @@ impl RpcDescription { let subscriptions = self.subscriptions.iter().map(|sub| { let docs = &sub.docs; - let subscription_sink_ty = self.jrps_server_item(quote! { PendingSubscription }); + let subscription_sink_ty = self.jrps_server_item(quote! { SubscriptionSink }); // Add `SubscriptionSink` as the second input parameter to the signature. let subscription_sink: syn::FnArg = syn::parse_quote!(subscription_sink: #subscription_sink_ty); let mut sub_sig = sub.signature.clone(); + + // For ergonomic reasons, the server's subscription method should return `SubscriptionResult`. + let return_ty = self.jrps_server_item(quote! { types::SubscriptionResult }); + let output: ReturnType = parse_quote! { -> #return_ty }; + sub_sig.sig.output = output; + sub_sig.sig.inputs.insert(1, subscription_sink); quote! { #docs @@ -209,7 +215,7 @@ impl RpcDescription { let resources = handle_resource_limits(&sub.resources); handle_register_result(quote! { - rpc.register_subscription(#rpc_sub_name, #rpc_notif_name, #rpc_unsub_name, |params, subscription_sink, context| { + rpc.register_subscription(#rpc_sub_name, #rpc_notif_name, #rpc_unsub_name, |params, mut subscription_sink, context| { #parsing context.as_ref().#rust_method_name(subscription_sink, #params_seq) }) @@ -313,6 +319,7 @@ impl RpcDescription { let params_fields = quote! { #(#params_fields_seq),* }; let tracing = self.jrps_server_item(quote! { tracing }); let err = self.jrps_server_item(quote! { core::Error }); + let sub_err = self.jrps_server_item(quote! { types::SubscriptionEmptyError }); // Code to decode sequence of parameters from a JSON array. let decode_array = { @@ -324,8 +331,8 @@ impl RpcDescription { Err(e) => { #tracing::error!(concat!("Error parsing optional \"", stringify!(#name), "\" as \"", stringify!(#ty), "\": {:?}"), e); let _e: #err = e.into(); - #pending.reject(_e); - return; + #pending.reject(_e)?; + return Err(#sub_err); } }; } @@ -348,8 +355,8 @@ impl RpcDescription { Err(e) => { #tracing::error!(concat!("Error parsing \"", stringify!(#name), "\" as \"", stringify!(#ty), "\": {:?}"), e); let _e: #err = e.into(); - #pending.reject(_e); - return; + #pending.reject(_e)?; + return Err(#sub_err); } }; } @@ -399,8 +406,8 @@ impl RpcDescription { Err(e) => { #tracing::error!("Failed to parse JSON-RPC params as object: {}", e); let _e: #err = e.into(); - #pending.reject(_e); - return; + #pending.reject(_e)?; + return Err(#sub_err); } }; diff --git a/proc-macros/src/rpc_macro.rs b/proc-macros/src/rpc_macro.rs index c12e43bf35..8418740a0b 100644 --- a/proc-macros/src/rpc_macro.rs +++ b/proc-macros/src/rpc_macro.rs @@ -277,7 +277,7 @@ impl RpcDescription { if !matches!(method.sig.output, syn::ReturnType::Default) { return Err(syn::Error::new_spanned( &method, - "Subscription methods must not return anything; the error must send via subscription via either `PendingSubscription::reject` or `SubscripionSink::close`", + "Subscription methods must not return anything; the error must send via subscription via either `SubscriptionSink::reject` or `SubscriptionSink::close`", )); } diff --git a/proc-macros/tests/ui/correct/basic.rs b/proc-macros/tests/ui/correct/basic.rs index db84889c2a..7be44ccda7 100644 --- a/proc-macros/tests/ui/correct/basic.rs +++ b/proc-macros/tests/ui/correct/basic.rs @@ -3,10 +3,11 @@ use std::net::SocketAddr; use jsonrpsee::core::{async_trait, client::ClientT, RpcResult}; +use jsonrpsee::types::SubscriptionResult; use jsonrpsee::proc_macros::rpc; use jsonrpsee::rpc_params; use jsonrpsee::ws_client::*; -use jsonrpsee::ws_server::{PendingSubscription, WsServerBuilder}; +use jsonrpsee::ws_server::{SubscriptionSink, WsServerBuilder}; #[rpc(client, server, namespace = "foo")] pub trait Rpc { @@ -63,28 +64,21 @@ impl RpcServer for RpcServerImpl { Ok(10u16) } - fn sub(&self, pending: PendingSubscription) { - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; + fn sub(&self, mut sink: SubscriptionSink) -> SubscriptionResult { let _ = sink.send(&"Response_A"); let _ = sink.send(&"Response_B"); + Ok(()) } - fn sub_with_params(&self, pending: PendingSubscription, val: u32) { - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; + fn sub_with_params(&self, mut sink: SubscriptionSink, val: u32) -> SubscriptionResult { let _ = sink.send(&val); let _ = sink.send(&val); + Ok(()) } - fn sub_with_override_notif_method(&self, pending: PendingSubscription) { - if let Some(mut sink) = pending.accept() { - let _ = sink.send(&1); - } + fn sub_with_override_notif_method(&self, mut sink: SubscriptionSink) -> SubscriptionResult { + let _ = sink.send(&1); + Ok(()) } } diff --git a/proc-macros/tests/ui/correct/only_server.rs b/proc-macros/tests/ui/correct/only_server.rs index 371d1ca1b6..d8556b09f7 100644 --- a/proc-macros/tests/ui/correct/only_server.rs +++ b/proc-macros/tests/ui/correct/only_server.rs @@ -1,8 +1,9 @@ use std::net::SocketAddr; use jsonrpsee::core::{async_trait, RpcResult}; +use jsonrpsee::types::SubscriptionResult; use jsonrpsee::proc_macros::rpc; -use jsonrpsee::ws_server::{PendingSubscription, WsServerBuilder}; +use jsonrpsee::ws_server::{SubscriptionSink, WsServerBuilder}; #[rpc(server)] pub trait Rpc { @@ -28,14 +29,12 @@ impl RpcServer for RpcServerImpl { Ok(10u16) } - fn sub(&self, pending: PendingSubscription) { - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; + fn sub(&self, mut sink: SubscriptionSink) -> SubscriptionResult { + sink.accept()?; let _ = sink.send(&"Response_A"); let _ = sink.send(&"Response_B"); + Ok(()) } } diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index 72df50fd9e..6e283d94bd 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -44,51 +44,58 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle module.register_method("say_hello", |_, _| Ok("hello")).unwrap(); module - .register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, pending, _| { - let mut sink = pending.accept().unwrap(); + .register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, mut sink, _| { + // Explicit call to accept. + sink.accept().unwrap(); std::thread::spawn(move || loop { if let Ok(false) = sink.send(&"hello from subscription") { break; } std::thread::sleep(Duration::from_millis(50)); }); + Ok(()) }) .unwrap(); module - .register_subscription("subscribe_foo", "subscribe_foo", "unsubscribe_foo", |_, pending, _| { - let mut sink = pending.accept().unwrap(); + .register_subscription("subscribe_foo", "subscribe_foo", "unsubscribe_foo", |_, mut sink, _| { std::thread::spawn(move || loop { + // Implicit call to accept for the first send. if let Ok(false) = sink.send(&1337_usize) { break; } std::thread::sleep(Duration::from_millis(100)); }); + Ok(()) }) .unwrap(); module - .register_subscription("subscribe_add_one", "subscribe_add_one", "unsubscribe_add_one", |params, pending, _| { - let mut count = match params.one::() { - Ok(count) => count, - _ => return, - }; - - let mut sink = pending.accept().unwrap(); - - std::thread::spawn(move || loop { - count = count.wrapping_add(1); - if let Err(_) | Ok(false) = sink.send(&count) { - break; - } - std::thread::sleep(Duration::from_millis(100)); - }); - }) + .register_subscription( + "subscribe_add_one", + "subscribe_add_one", + "unsubscribe_add_one", + |params, mut sink, _| { + let mut count = match params.one::() { + Ok(count) => count, + _ => return Ok(()), + }; + + std::thread::spawn(move || loop { + count = count.wrapping_add(1); + if let Err(_) | Ok(false) = sink.send(&count) { + break; + } + std::thread::sleep(Duration::from_millis(100)); + }); + Ok(()) + }, + ) .unwrap(); module - .register_subscription("subscribe_noop", "subscribe_noop", "unsubscribe_noop", |_, pending, _| { - let sink = pending.accept().unwrap(); + .register_subscription("subscribe_noop", "subscribe_noop", "unsubscribe_noop", |_, mut sink, _| { + sink.accept().unwrap(); std::thread::spawn(move || { std::thread::sleep(Duration::from_secs(1)); let err = ErrorObject::owned( @@ -98,13 +105,12 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle ); sink.close(err); }); + Ok(()) }) .unwrap(); module - .register_subscription("subscribe_5_ints", "n", "unsubscribe_5_ints", |_, pending, _| { - let mut sink = pending.accept().unwrap(); - + .register_subscription("subscribe_5_ints", "n", "unsubscribe_5_ints", |_, mut sink, _| { tokio::spawn(async move { let interval = interval(Duration::from_millis(50)); let stream = IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| c); @@ -116,13 +122,12 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle _ => unreachable!(), } }); + Ok(()) }) .unwrap(); module - .register_subscription("can_reuse_subscription", "n", "u_can_reuse_subscription", |_, pending, _| { - let mut sink = pending.accept().unwrap(); - + .register_subscription("can_reuse_subscription", "n", "u_can_reuse_subscription", |_, mut sink, _| { tokio::spawn(async move { let stream1 = IntervalStream::new(interval(Duration::from_millis(50))) .zip(futures::stream::iter(1..=5)) @@ -141,6 +146,7 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle _ => unreachable!(), } }); + Ok(()) }) .unwrap(); @@ -149,12 +155,10 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle "subscribe_with_err_on_stream", "n", "unsubscribe_with_err_on_stream", - move |_, pending, _| { - let mut sink = pending.accept().unwrap(); - + move |_, mut sink, _| { let err: &'static str = "error on the stream"; - // create stream that produce an error which will cancel the subscription. + // Create stream that produce an error which will cancel the subscription. let stream = futures::stream::iter(vec![Ok(1_u32), Err(err), Ok(2), Ok(3)]); tokio::spawn(async move { match sink.pipe_from_try_stream(stream).await { @@ -164,6 +168,7 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle _ => unreachable!(), } }); + Ok(()) }, ) .unwrap(); @@ -201,9 +206,7 @@ pub async fn websocket_server_with_sleeping_subscription(tx: futures::channel::m let mut module = RpcModule::new(tx); module - .register_subscription("subscribe_sleep", "n", "unsubscribe_sleep", |_, pending, mut tx| { - let mut sink = pending.accept().unwrap(); - + .register_subscription("subscribe_sleep", "n", "unsubscribe_sleep", |_, mut sink, mut tx| { tokio::spawn(async move { let interval = interval(Duration::from_secs(60 * 60)); let stream = IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| c); @@ -212,6 +215,7 @@ pub async fn websocket_server_with_sleeping_subscription(tx: futures::channel::m let send_back = std::sync::Arc::make_mut(&mut tx); send_back.send(()).await.unwrap(); }); + Ok(()) }) .unwrap(); server.start(module).unwrap(); diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 9369cd6b67..81fe1153b3 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -424,8 +424,8 @@ async fn ws_server_should_stop_subscription_after_client_drop() { let mut module = RpcModule::new(tx); module - .register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, pending, mut tx| { - let mut sink = pending.accept().unwrap(); + .register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, mut sink, mut tx| { + sink.accept().unwrap(); tokio::spawn(async move { let close_err = loop { if !sink.send(&1_usize).expect("usize can be serialized; qed") { @@ -436,6 +436,7 @@ async fn ws_server_should_stop_subscription_after_client_drop() { let send_back = Arc::make_mut(&mut tx); send_back.feed(close_err).await.unwrap(); }); + Ok(()) }) .unwrap(); @@ -589,12 +590,7 @@ async fn ws_server_limit_subs_per_conn_works() { let mut module = RpcModule::new(()); module - .register_subscription("subscribe_forever", "n", "unsubscribe_forever", |_, pending, _| { - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; - + .register_subscription("subscribe_forever", "n", "unsubscribe_forever", |_, mut sink, _| { tokio::spawn(async move { let interval = interval(Duration::from_millis(50)); let stream = IntervalStream::new(interval).map(move |_| 0_usize); @@ -606,6 +602,7 @@ async fn ws_server_limit_subs_per_conn_works() { _ => unreachable!(), }; }); + Ok(()) }) .unwrap(); server.start(module).unwrap(); @@ -648,12 +645,7 @@ async fn ws_server_unsub_methods_should_ignore_sub_limit() { let mut module = RpcModule::new(()); module - .register_subscription("subscribe_forever", "n", "unsubscribe_forever", |_, pending, _| { - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; - + .register_subscription("subscribe_forever", "n", "unsubscribe_forever", |_, mut sink, _| { tokio::spawn(async move { let interval = interval(Duration::from_millis(50)); let stream = IntervalStream::new(interval).map(move |_| 0_usize); @@ -665,6 +657,7 @@ async fn ws_server_unsub_methods_should_ignore_sub_limit() { _ => unreachable!(), }; }); + Ok(()) }) .unwrap(); server.start(module).unwrap(); diff --git a/tests/tests/proc_macros.rs b/tests/tests/proc_macros.rs index 5ee1e67407..8b55ce039a 100644 --- a/tests/tests/proc_macros.rs +++ b/tests/tests/proc_macros.rs @@ -43,7 +43,8 @@ use serde_json::json; mod rpc_impl { use jsonrpsee::core::{async_trait, RpcResult}; use jsonrpsee::proc_macros::rpc; - use jsonrpsee::PendingSubscription; + use jsonrpsee::types::SubscriptionResult; + use jsonrpsee::SubscriptionSink; #[rpc(client, server, namespace = "foo")] pub trait Rpc { @@ -166,22 +167,16 @@ mod rpc_impl { Ok(10u16) } - fn sub(&self, pending: PendingSubscription) { - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; + fn sub(&self, mut sink: SubscriptionSink) -> SubscriptionResult { let _ = sink.send(&"Response_A"); let _ = sink.send(&"Response_B"); + Ok(()) } - fn sub_with_params(&self, pending: PendingSubscription, val: u32) { - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; + fn sub_with_params(&self, mut sink: SubscriptionSink, val: u32) -> SubscriptionResult { let _ = sink.send(&val); let _ = sink.send(&val); + Ok(()) } } @@ -194,12 +189,9 @@ mod rpc_impl { #[async_trait] impl OnlyGenericSubscriptionServer for RpcServerImpl { - fn sub(&self, pending: PendingSubscription, _: String) { - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; + fn sub(&self, mut sink: SubscriptionSink, _: String) -> SubscriptionResult { let _ = sink.send(&"hello"); + Ok(()) } } } diff --git a/tests/tests/resource_limiting.rs b/tests/tests/resource_limiting.rs index 56c2f97632..4784be06f2 100644 --- a/tests/tests/resource_limiting.rs +++ b/tests/tests/resource_limiting.rs @@ -33,9 +33,10 @@ use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle}; use jsonrpsee::proc_macros::rpc; use jsonrpsee::types::error::CallError; +use jsonrpsee::types::SubscriptionResult; use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::ws_server::{WsServerBuilder, WsServerHandle}; -use jsonrpsee::{PendingSubscription, RpcModule}; +use jsonrpsee::{RpcModule, SubscriptionSink}; use tokio::time::sleep; fn module_manual() -> Result, Error> { @@ -64,29 +65,25 @@ fn module_manual() -> Result, Error> { // Drop the `SubscriptionSink` to cause the internal `ResourceGuard` allocated per subscription call // to get dropped. This is the equivalent of not having any resource limits (ie, sink is never used). module - .register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, pending, _| { - let mut _sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; + .register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, mut sink, _| { + sink.accept()?; + Ok(()) })? .resource("SUB", 3)?; // Keep the `SubscriptionSink` alive for a bit to validate that `ResourceGuard` is alive // and the subscription method gets limited. module - .register_subscription("subscribe_hello_limit", "s_hello", "unsubscribe_hello_limit", move |_, pending, _| { - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; - + .register_subscription("subscribe_hello_limit", "s_hello", "unsubscribe_hello_limit", move |_, mut sink, _| { tokio::spawn(async move { for val in 0..10 { + // Sink is accepted on the first `send` call. sink.send(&val).unwrap(); sleep(Duration::from_secs(1)).await; } }); + + Ok(()) })? .resource("SUB", 3)?; @@ -122,25 +119,20 @@ fn module_macro() -> RpcModule<()> { } impl RpcServer for () { - fn sub_hello(&self, pending: PendingSubscription) { - let mut _sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; + fn sub_hello(&self, mut sink: SubscriptionSink) -> SubscriptionResult { + sink.accept()?; + Ok(()) } - fn sub_hello_limit(&self, pending: PendingSubscription) { - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; - + fn sub_hello_limit(&self, mut sink: SubscriptionSink) -> SubscriptionResult { tokio::spawn(async move { for val in 0..10 { sink.send(&val).unwrap(); sleep(Duration::from_secs(1)).await; } }); + + Ok(()) } } diff --git a/tests/tests/rpc_module.rs b/tests/tests/rpc_module.rs index e96c74254a..b21a8c52ae 100644 --- a/tests/tests/rpc_module.rs +++ b/tests/tests/rpc_module.rs @@ -30,7 +30,7 @@ use std::time::Duration; use futures::StreamExt; use jsonrpsee::core::error::{Error, SubscriptionClosed}; use jsonrpsee::core::server::rpc_module::*; -use jsonrpsee::types::error::{CallError, ErrorCode, ErrorObject}; +use jsonrpsee::types::error::{CallError, ErrorCode, ErrorObject, PARSE_ERROR_CODE}; use jsonrpsee::types::{EmptyParams, Params}; use serde::{Deserialize, Serialize}; use tokio::time::interval; @@ -70,7 +70,7 @@ fn flatten_rpc_modules() { fn rpc_context_modules_can_register_subscriptions() { let cx = (); let mut cxmodule = RpcModule::new(cx); - cxmodule.register_subscription("hi", "hi", "goodbye", |_, _, _| {}).unwrap(); + cxmodule.register_subscription("hi", "hi", "goodbye", |_, _, _| Ok(())).unwrap(); assert!(cxmodule.method("hi").is_some()); assert!(cxmodule.method("goodbye").is_some()); @@ -203,11 +203,8 @@ async fn calling_method_without_server_using_proc_macro() { async fn subscribing_without_server() { let mut module = RpcModule::new(()); module - .register_subscription("my_sub", "my_sub", "my_unsub", |_, pending, _| { - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; + .register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| { + sink.accept()?; let mut stream_data = vec!['0', '1', '2']; std::thread::spawn(move || { @@ -219,6 +216,7 @@ async fn subscribing_without_server() { let close = ErrorObject::borrowed(0, &"closed successfully", None); sink.close(close.into_owned()); }); + Ok(()) }) .unwrap(); @@ -241,11 +239,8 @@ async fn close_test_subscribing_without_server() { let mut module = RpcModule::new(()); module - .register_subscription("my_sub", "my_sub", "my_unsub", |_, pending, _| { - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; + .register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| { + sink.accept()?; std::thread::spawn(move || { // make sure to only send one item @@ -259,6 +254,7 @@ async fn close_test_subscribing_without_server() { sink.close(SubscriptionClosed::RemotePeerAborted); } }); + Ok(()) }) .unwrap(); @@ -291,21 +287,19 @@ async fn close_test_subscribing_without_server() { async fn subscribing_without_server_bad_params() { let mut module = RpcModule::new(()); module - .register_subscription("my_sub", "my_sub", "my_unsub", |params, pending, _| { + .register_subscription("my_sub", "my_sub", "my_unsub", |params, mut sink, _| { let p = match params.one::() { Ok(p) => p, Err(e) => { let err: Error = e.into(); - let _ = pending.reject(err); - return; + let _ = sink.reject(err); + return Ok(()); } }; - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; + sink.accept()?; sink.send(&p).unwrap(); + Ok(()) }) .unwrap(); @@ -320,18 +314,14 @@ async fn subscribing_without_server_bad_params() { async fn subscribe_unsubscribe_without_server() { let mut module = RpcModule::new(()); module - .register_subscription("my_sub", "my_sub", "my_unsub", |_, pending, _| { - let mut sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; - + .register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| { let interval = interval(Duration::from_millis(200)); let stream = IntervalStream::new(interval).map(move |_| 1); tokio::spawn(async move { sink.pipe_from_stream(stream).await; }); + Ok(()) }) .unwrap(); @@ -358,3 +348,84 @@ async fn subscribe_unsubscribe_without_server() { futures::future::join(sub1, sub2).await; } + +#[tokio::test] +async fn empty_subscription_without_server() { + let mut module = RpcModule::new(()); + module + .register_subscription("my_sub", "my_sub", "my_unsub", |_, mut _sink, _| { + // Sink was never accepted or rejected. Expected to return `InvalidParams`. + Ok(()) + }) + .unwrap(); + + let sub_err = module.subscribe("my_sub", EmptyParams::new()).await.unwrap_err(); + assert!( + matches!(sub_err, Error::Call(CallError::Custom(e)) if e.message().contains("Invalid params") && e.code() == ErrorCode::InvalidParams.code()) + ); +} + +#[tokio::test] +async fn rejected_subscription_without_server() { + let mut module = RpcModule::new(()); + module + .register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| { + let err = ErrorObject::borrowed(PARSE_ERROR_CODE, &"rejected", None); + sink.reject(err.into_owned())?; + Ok(()) + }) + .unwrap(); + + let sub_err = module.subscribe("my_sub", EmptyParams::new()).await.unwrap_err(); + assert!( + matches!(sub_err, Error::Call(CallError::Custom(e)) if e.message().contains("rejected") && e.code() == PARSE_ERROR_CODE) + ); +} + +#[tokio::test] +async fn accepted_twice_subscription_without_server() { + let mut module = RpcModule::new(()); + module + .register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| { + let res = sink.accept(); + assert!(matches!(res, Ok(()))); + + let res = sink.accept(); + assert!(matches!(res, Err(_))); + + let err = ErrorObject::borrowed(PARSE_ERROR_CODE, &"rejected", None); + let res = sink.reject(err.into_owned()); + assert!(matches!(res, Err(_))); + + Ok(()) + }) + .unwrap(); + + let _ = module.subscribe("my_sub", EmptyParams::new()).await.expect("Subscription should not fail"); +} + +#[tokio::test] +async fn reject_twice_subscription_without_server() { + let mut module = RpcModule::new(()); + module + .register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| { + let err = ErrorObject::borrowed(PARSE_ERROR_CODE, &"rejected", None); + let res = sink.reject(err.into_owned()); + assert!(matches!(res, Ok(()))); + + let err = ErrorObject::borrowed(PARSE_ERROR_CODE, &"rejected", None); + let res = sink.reject(err.into_owned()); + assert!(matches!(res, Err(_))); + + let res = sink.accept(); + assert!(matches!(res, Err(_))); + + Ok(()) + }) + .unwrap(); + + let sub_err = module.subscribe("my_sub", EmptyParams::new()).await.unwrap_err(); + assert!( + matches!(sub_err, Error::Call(CallError::Custom(e)) if e.message().contains("rejected") && e.code() == PARSE_ERROR_CODE) + ); +} diff --git a/types/src/error.rs b/types/src/error.rs index 4db09994a9..dfb19d4638 100644 --- a/types/src/error.rs +++ b/types/src/error.rs @@ -80,6 +80,51 @@ impl<'a> fmt::Display for ErrorResponse<'a> { } } +/// The return type of the subscription's method for the rpc server implementation. +/// +/// **Note**: The error does not contain any data and is discarded on drop. +pub type SubscriptionResult = Result<(), SubscriptionEmptyError>; + +/// The error returned by the subscription's method for the rpc server implementation. +/// +/// It contains no data, and neither is the error utilized. It provides an abstraction to make the +/// API more ergonomic while handling errors that may occur during the subscription callback. +#[derive(Debug)] +pub struct SubscriptionEmptyError; + +impl From for SubscriptionEmptyError { + fn from(_: anyhow::Error) -> Self { + SubscriptionEmptyError + } +} + +impl From for SubscriptionEmptyError { + fn from(_: CallError) -> Self { + SubscriptionEmptyError + } +} + +impl<'a> From> for SubscriptionEmptyError { + fn from(_: ErrorObject<'a>) -> Self { + SubscriptionEmptyError + } +} + +impl From for SubscriptionEmptyError { + fn from(_: SubscriptionAcceptRejectError) -> Self { + SubscriptionEmptyError + } +} + +/// The error returned while accepting or rejecting a subscription. +#[derive(Debug)] +pub enum SubscriptionAcceptRejectError { + /// The method was already called. + AlreadyCalled, + /// The remote peer closed the connection or called the unsubscribe method. + RemotePeerAborted, +} + /// Owned variant of [`ErrorObject`]. pub type ErrorObjectOwned = ErrorObject<'static>; @@ -156,6 +201,16 @@ impl<'a> From for ErrorObject<'a> { } } +impl<'a> From for ErrorObject<'a> { + fn from(error: CallError) -> Self { + match error { + CallError::InvalidParams(e) => ErrorObject::owned(INVALID_PARAMS_CODE, e.to_string(), None::<()>), + CallError::Failed(e) => ErrorObject::owned(CALL_EXECUTION_FAILED_CODE, e.to_string(), None::<()>), + CallError::Custom(err) => err, + } + } +} + /// Parse error code. pub const PARSE_ERROR_CODE: i32 = -32700; /// Oversized request error code. diff --git a/types/src/lib.rs b/types/src/lib.rs index db81587161..5c7b4e04bd 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -43,7 +43,7 @@ pub mod response; /// JSON-RPC response error object related types. pub mod error; -pub use error::{ErrorObject, ErrorObjectOwned, ErrorResponse}; +pub use error::{ErrorObject, ErrorObjectOwned, ErrorResponse, SubscriptionEmptyError, SubscriptionResult}; pub use params::{Id, Params, ParamsSequence, ParamsSer, SubscriptionId, TwoPointZero}; pub use request::{InvalidRequest, Notification, NotificationSer, Request, RequestSer}; pub use response::{Response, SubscriptionPayload, SubscriptionResponse}; diff --git a/ws-server/src/lib.rs b/ws-server/src/lib.rs index b90ff5adf2..26d58de6ae 100644 --- a/ws-server/src/lib.rs +++ b/ws-server/src/lib.rs @@ -39,7 +39,7 @@ mod server; mod tests; pub use future::{ServerHandle as WsServerHandle, ShutdownWaiter as WsShutdownWaiter}; -pub use jsonrpsee_core::server::rpc_module::{PendingSubscription, RpcModule, SubscriptionSink}; +pub use jsonrpsee_core::server::rpc_module::{RpcModule, SubscriptionSink}; pub use jsonrpsee_core::{id_providers::*, traits::IdProvider}; pub use jsonrpsee_types as types; pub use server::{Builder as WsServerBuilder, Server as WsServer}; diff --git a/ws-server/src/tests.rs b/ws-server/src/tests.rs index eeb0e170fc..a228e0bf1a 100644 --- a/ws-server/src/tests.rs +++ b/ws-server/src/tests.rs @@ -120,15 +120,13 @@ async fn server_with_handles() -> (SocketAddr, ServerHandle) { }) .unwrap(); module - .register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, pending, _| { - let sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; + .register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, mut sink, _| { + sink.accept()?; std::thread::spawn(move || loop { let _ = &sink; std::thread::sleep(std::time::Duration::from_secs(30)); }); + Ok(()) }) .unwrap(); @@ -517,10 +515,12 @@ async fn register_methods_works() { assert!(module.register_method("say_hello", |_, _| Ok("lo")).is_ok()); assert!(module.register_method("say_hello", |_, _| Ok("lo")).is_err()); assert!(module - .register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, _, _| {}) + .register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, _, _| { Ok(()) }) .is_ok()); assert!(module - .register_subscription("subscribe_hello_again", "subscribe_hello_again", "unsubscribe_hello", |_, _, _| {}) + .register_subscription("subscribe_hello_again", "subscribe_hello_again", "unsubscribe_hello", |_, _, _| { + Ok(()) + }) .is_err()); assert!( module.register_method("subscribe_hello_again", |_, _| Ok("lo")).is_ok(), @@ -532,7 +532,7 @@ async fn register_methods_works() { async fn register_same_subscribe_unsubscribe_is_err() { let mut module = RpcModule::new(()); assert!(matches!( - module.register_subscription("subscribe_hello", "subscribe_hello", "subscribe_hello", |_, _, _| {}), + module.register_subscription("subscribe_hello", "subscribe_hello", "subscribe_hello", |_, _, _| { Ok(()) }), Err(Error::SubscriptionNameConflict(_)) )); } @@ -688,15 +688,14 @@ async fn custom_subscription_id_works() { let addr = server.local_addr().unwrap(); let mut module = RpcModule::new(()); module - .register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, pending, _| { - let sink = match pending.accept() { - Some(sink) => sink, - _ => return, - }; + .register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, mut sink, _| { + sink.accept()?; + std::thread::spawn(move || loop { let _ = &sink; std::thread::sleep(std::time::Duration::from_secs(30)); }); + Ok(()) }) .unwrap(); server.start(module).unwrap();