Skip to content

Commit

Permalink
Return error from subscription callbacks (#799)
Browse files Browse the repository at this point in the history
* subscription: Allow errors in subscription callbacks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* subscription: Remove the need to own the error

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* error: Build `ErrorObject` from `CallError` for improved ergonomics

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update examples for the new subscription API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Add alias for subscription result

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* macros: Render server subscription method with `ResultSubscription`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Port `proc_macro` example to new API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Rename `ResultSubscription` to `ReturnTypeSubscription` to avoid confusion

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Port all tests to new subscription API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update documentation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Port benches

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Replace tabs with spaces & add documentation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Add dummy error for subscriptions

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Implement `From` for `SubscriptionError`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Return `SubscriptionError` when parsing params

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Rename `SubscriptionError` to `SubscriptionEmptyError`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Change `accept` signature

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Port tests to new `accept` api

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Implement `pipe_from_try_stream` and `pipe_from_stream` for `PendingSubscription`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Modify examples to ilustrate new API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Fix docs tests

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Rename previously `SubscriptionResult` -> `InnerSubscriptionResult`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Rename `ReturnTypeSubscription` -> `SubscriptionResult`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Remove documentation line

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Implement `PipeFromStreamResult`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Add comment for empty error

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update proc-macros/src/lib.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* Update proc-macros/src/lib.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* Update proc-macros/src/lib.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* Change `ReturnTypeSubscription` -> `SubscriptionResult`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Add `ResultConsumed` for `PipeFromStreamResult`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update examples to use `PipeFromStreamResult`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Replace ConsumedResult with Options

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Log warning when subscription callback fails

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Change ubuntu test names

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* server: Make `pipe` methods of `SubscriptionSink` private

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* server: Remove `pipe_from_stream` method of `SubscriptionSink`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* server: Update PipeFromStreamResult documentation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Adjust tests to `SubscriptionSink::pipe_from_stream` private interface

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Add `accept-reject` API on `SubscriptionSink`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Make `pipe_from_try_stream` public

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Maybe accept the subscription

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Revert "server: Remove `pipe_from_stream` method of `SubscriptionSink`"

This reverts commit d3c3ce9.

* Make `unsubscribe` channel optional on accepting the connection

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Pass `SubscriptionSink` to subscription callbacks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Implement subscription sink state

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Submit `InvalidParams` if sink was never accepted

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Handle rejected sinks

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Remove `PendingSubscription`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Fix doc tests

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* macro: Make subscription sink mutable

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Fix tests and examples

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* macro: Return `sink.reject()` result

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Add test for `SubscriptionSinkState`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Test internal subscription sink state

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Fix `send_error` to not always return `false`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Fix benches

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Remove `PipeFromStreamResult`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Use valid Json-RPC return code for test errors

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Remove `SubscriptionSinkState`"

* Remodel state machine using `Option`s for `SubscriptionSink`s

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Double accept / reject API for `SubscriptionSink`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Implement `SubscriptionAcceptRejectError` for error propagation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Remove `maybe_accept` wrapper

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update comments and documentation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update core/src/server/rpc_module.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* Update core/src/server/rpc_module.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* rpc_server: Add type alias for unsubscription calls

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc_server: Improve comment regarding dropped error

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* style: Single line return errors

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Make comment more verbose

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
Co-authored-by: James Wilson <james@jsdw.me>
  • Loading branch information
3 people authored Jun 29, 2022
1 parent a35f8c3 commit 98c23fc
Show file tree
Hide file tree
Showing 22 changed files with 408 additions and 314 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ jobs:
run: cargo hack check --workspace --each-feature --all-targets

tests_ubuntu:
name: Run nextests on Ubuntu
name: Run tests Ubuntu
runs-on: ubuntu-latest
steps:
- name: Checkout sources
Expand Down
7 changes: 2 additions & 5 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,10 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws
let mut module = gen_rpc_module();

module
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, pending, _ctx| {
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Ok(())
})
.unwrap();

Expand Down
5 changes: 3 additions & 2 deletions core/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,10 @@ impl MethodSink {

if let Err(err) = self.send_raw(json) {
tracing::warn!("Error sending response {:?}", err);
false
} else {
true
}

false
}

/// Helper for sending the general purpose `Error` as a JSON-RPC errors to the client
Expand Down
207 changes: 102 additions & 105 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ use futures_channel::mpsc;
use futures_util::future::Either;
use futures_util::pin_mut;
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStream, TryStreamExt};
use jsonrpsee_types::error::{CallError, ErrorCode, ErrorObject, ErrorObjectOwned, SUBSCRIPTION_CLOSED_WITH_ERROR};
use jsonrpsee_types::error::{
CallError, ErrorCode, ErrorObject, ErrorObjectOwned, INTERNAL_ERROR_CODE,
SUBSCRIPTION_CLOSED_WITH_ERROR, SubscriptionAcceptRejectError
};
use jsonrpsee_types::response::{SubscriptionError, SubscriptionPayloadError};
use jsonrpsee_types::{
ErrorResponse, Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload,
SubscriptionResponse,
ErrorResponse, Id, Params, Request, Response, SubscriptionResult,
SubscriptionId as RpcSubscriptionId, SubscriptionPayload, SubscriptionResponse
};
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
Expand Down Expand Up @@ -87,7 +90,7 @@ pub struct ConnState<'a> {

/// Outcome of a successful terminated subscription.
#[derive(Debug)]
pub enum SubscriptionResult {
pub enum InnerSubscriptionResult {
/// The subscription stream was executed successfully.
Success,
/// The subscription was aborted by the remote peer.
Expand Down Expand Up @@ -382,8 +385,9 @@ impl Methods {
/// use futures_util::StreamExt;
///
/// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| {
/// pending.accept().unwrap().send(&"one answer").unwrap();
/// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| {
/// sink.send(&"one answer").unwrap();
/// Ok(())
/// }).unwrap();
/// let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#).await.unwrap();
/// let resp = serde_json::from_str::<Response<u64>>(&resp).unwrap();
Expand Down Expand Up @@ -443,8 +447,9 @@ impl Methods {
/// use jsonrpsee::{RpcModule, types::EmptyParams};
///
/// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| {
/// pending.accept().unwrap().send(&"one answer").unwrap();
/// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| {
/// sink.send(&"one answer").unwrap();
/// Ok(())
/// }).unwrap();
///
/// let mut sub = module.subscribe("hi", EmptyParams::new()).await.unwrap();
Expand Down Expand Up @@ -653,27 +658,22 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// use jsonrpsee_core::Error;
///
/// let mut ctx = RpcModule::new(99_usize);
/// ctx.register_subscription("sub", "notif_name", "unsub", |params, pending, ctx| {
/// ctx.register_subscription("sub", "notif_name", "unsub", |params, mut sink, ctx| {
/// let x = match params.one::<usize>() {
/// Ok(x) => x,
/// Err(e) => {
/// let err: Error = e.into();
/// pending.reject(err);
/// return;
/// }
/// };
///
/// let mut sink = match pending.accept() {
/// Some(sink) => sink,
/// _ => {
/// return;
/// sink.reject(err);
/// return Ok(());
/// }
/// };
///
/// // Sink is accepted on the first `send` call.
/// std::thread::spawn(move || {
/// let sum = x + (*ctx);
/// let _ = sink.send(&sum);
/// });
///
/// Ok(())
/// });
/// ```
pub fn register_subscription<F>(
Expand All @@ -685,7 +685,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
) -> Result<MethodResourcesBuilder, Error>
where
Context: Send + Sync + 'static,
F: Fn(Params, PendingSubscription, Arc<Context>) + Send + Sync + 'static,
F: Fn(Params, SubscriptionSink, Arc<Context>) -> SubscriptionResult + Send + Sync + 'static,
{
if subscribe_method_name == unsubscribe_method_name {
return Err(Error::SubscriptionNameConflict(subscribe_method_name.into()));
Expand Down Expand Up @@ -740,17 +740,21 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn, claimed| {
let sub_id: RpcSubscriptionId = conn.id_provider.next_id();

let sink = PendingSubscription(Some(InnerPendingSubscription {
sink: method_sink.clone(),
let sink = SubscriptionSink {
inner: method_sink.clone(),
close_notify: Some(conn.close_notify),
method: notif_method_name,
subscribers: subscribers.clone(),
uniq_sub: SubscriptionKey { conn_id: conn.conn_id, sub_id },
id: id.clone().into_owned(),
claimed,
}));
id: Some(id.clone().into_owned()),
unsubscribe: None,
_claimed: claimed,
};

callback(params, sink, ctx.clone());
// The callback returns a `SubscriptionResult` for better ergonomics and is not propagated further.
if let Err(_) = callback(params, sink, ctx.clone()) {
tracing::warn!("subscribe call `{}` failed", subscribe_method_name);
}

true
})),
Expand All @@ -775,101 +779,75 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
}
}

/// Represent a pending subscription which waits to be accepted or rejected.
///
/// Note: you need to call either `PendingSubscription::accept` or `PendingSubscription::reject` otherwise
/// the subscription will be dropped with an `InvalidParams` error.
/// Returns once the unsubscribe method has been called.
type UnsubscribeCall = Option<watch::Receiver<()>>;

/// Represents a single subscription.
#[derive(Debug)]
struct InnerPendingSubscription {
pub struct SubscriptionSink {
/// Sink.
sink: MethodSink,
inner: MethodSink,
/// Get notified when subscribers leave so we can exit
close_notify: Option<SubscriptionPermit>,
/// MethodCallback.
method: &'static str,
/// Shared Mutex of subscriptions for this method.
subscribers: Subscribers,
/// Unique subscription.
uniq_sub: SubscriptionKey,
/// Shared Mutex of subscriptions
subscribers: Subscribers,
/// Request ID.
id: Id<'static>,
/// Id of the `subscription call` (i.e. not the same as subscription id) which is used
/// to reply to subscription method call and must only be used once.
///
/// *Note*: Having some value means the subscription was not accepted or rejected yet.
id: Option<Id<'static>>,
/// Having some value means the subscription was accepted.
unsubscribe: UnsubscribeCall,
/// Claimed resources.
claimed: Option<ResourceGuard>,
_claimed: Option<ResourceGuard>,
}

/// Represent a pending subscription which waits until it's either accepted or rejected.
///
/// This type implements `Drop` for ease of use, e.g. when dropped in error short circuiting via `map_err()?`.
#[derive(Debug)]
pub struct PendingSubscription(Option<InnerPendingSubscription>);

impl PendingSubscription {
impl SubscriptionSink {
/// Reject the subscription call from [`ErrorObject`].
pub fn reject(mut self, err: impl Into<ErrorObjectOwned>) -> bool {
if let Some(inner) = self.0.take() {
let InnerPendingSubscription { sink, id, .. } = inner;
sink.send_error(id, err.into())
pub fn reject(&mut self, err: impl Into<ErrorObjectOwned>) -> Result<(), SubscriptionAcceptRejectError> {
let id = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?;

if self.inner.send_error(id, err.into()) {
Ok(())
} else {
false
Err(SubscriptionAcceptRejectError::RemotePeerAborted)
}
}

/// Attempt to accept the subscription and respond the subscription method call.
///
/// Fails if the connection was closed
pub fn accept(mut self) -> Option<SubscriptionSink> {
let inner = self.0.take()?;

let InnerPendingSubscription { sink, close_notify, method, uniq_sub, subscribers, id, claimed } = inner;
/// Fails if the connection was closed, or if called multiple times.
pub fn accept(&mut self) -> Result<(), SubscriptionAcceptRejectError> {
let id = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?;

if sink.send_response(id, &uniq_sub.sub_id) {
if self.inner.send_response(id, &self.uniq_sub.sub_id) {
let (tx, rx) = watch::channel(());
subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), tx));
Some(SubscriptionSink { inner: sink, close_notify, method, uniq_sub, subscribers, unsubscribe: rx, _claimed: claimed })
self.subscribers.lock().insert(self.uniq_sub.clone(), (self.inner.clone(), tx));
self.unsubscribe = Some(rx);
Ok(())
} else {
None
Err(SubscriptionAcceptRejectError::RemotePeerAborted)
}
}
}

// When dropped it returns an [`InvalidParams`] error to the subscriber
impl Drop for PendingSubscription {
fn drop(&mut self) {
if let Some(inner) = self.0.take() {
let InnerPendingSubscription { sink, id, .. } = inner;
sink.send_error(id, ErrorCode::InvalidParams.into());
}
}
}

/// Represents a single subscription.
#[derive(Debug)]
pub struct SubscriptionSink {
/// Sink.
inner: MethodSink,
/// Get notified when subscribers leave so we can exit
close_notify: Option<SubscriptionPermit>,
/// MethodCallback.
method: &'static str,
/// Unique subscription.
uniq_sub: SubscriptionKey,
/// Shared Mutex of subscriptions for this method.
subscribers: Subscribers,
/// Future that returns when the unsubscribe method has been called.
unsubscribe: watch::Receiver<()>,
/// Claimed resources.
_claimed: Option<ResourceGuard>,
}

impl SubscriptionSink {
/// Send a message back to subscribers.
///
/// Returns `Ok(true)` if the message could be send
/// Returns `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated)
/// Return `Err(err)` if the message could not be serialized.
///
/// Returns
/// - `Ok(true)` if the message could be send.
/// - `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated),
/// or the subscription could not be accepted.
/// - `Err(err)` if the message could not be serialized.
pub fn send<T: Serialize>(&mut self, result: &T) -> Result<bool, serde_json::Error> {
// only possible to trigger when the connection is dropped.
// Cannot accept the subscription.
if let Err(SubscriptionAcceptRejectError::RemotePeerAborted) = self.accept() {
return Ok(false);
}

// Only possible to trigger when the connection is dropped.
if self.is_closed() {
return Ok(false);
}
Expand All @@ -889,14 +867,13 @@ impl SubscriptionSink {
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::{RpcModule, SubscriptionResult};
/// use jsonrpsee_core::server::rpc_module::RpcModule;
/// use jsonrpsee_core::error::{Error, SubscriptionClosed};
/// use jsonrpsee_types::ErrorObjectOwned;
/// use anyhow::anyhow;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
/// let mut sink = pending.accept().unwrap();
/// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
/// // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber
/// // because after the `Err(_)` the stream is terminated.
Expand All @@ -915,6 +892,7 @@ impl SubscriptionSink {
/// }
/// }
/// });
/// Ok(())
/// });
/// ```
pub async fn pipe_from_try_stream<S, T, E>(&mut self, mut stream: S) -> SubscriptionClosed
Expand All @@ -923,14 +901,24 @@ impl SubscriptionSink {
T: Serialize,
E: std::fmt::Display,
{
if let Err(SubscriptionAcceptRejectError::RemotePeerAborted) = self.accept() {
return SubscriptionClosed::RemotePeerAborted;
}

let conn_closed = match self.close_notify.as_ref().map(|cn| cn.handle()) {
Some(cn) => cn,
None => {
return SubscriptionClosed::RemotePeerAborted;
}
None => return SubscriptionClosed::RemotePeerAborted,
};

let mut sub_closed = match self.unsubscribe.as_ref() {
Some(rx) => rx.clone(),
_ => return SubscriptionClosed::Failed(ErrorObject::owned(
INTERNAL_ERROR_CODE,
"Unsubscribe watcher not set after accepting the subscription".to_string(),
None::<()>
)),
};

let mut sub_closed = self.unsubscribe.clone();
let sub_closed_fut = sub_closed.changed();

let conn_closed_fut = conn_closed.notified();
Expand Down Expand Up @@ -983,10 +971,10 @@ impl SubscriptionSink {
/// use jsonrpsee_core::server::rpc_module::RpcModule;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
/// let mut sink = pending.accept().unwrap();
/// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
/// tokio::spawn(async move { sink.pipe_from_stream(stream).await; });
/// Ok(())
/// });
/// ```
pub async fn pipe_from_stream<S, T>(&mut self, stream: S) -> SubscriptionClosed
Expand All @@ -1003,7 +991,10 @@ impl SubscriptionSink {
}

fn is_active_subscription(&self) -> bool {
!self.unsubscribe.has_changed().is_err()
match self.unsubscribe.as_ref() {
Some(unsubscribe) => !unsubscribe.has_changed().is_err(),
_ => false,
}
}

fn build_message<T: Serialize>(&self, result: &T) -> Result<String, serde_json::Error> {
Expand Down Expand Up @@ -1057,7 +1048,13 @@ impl SubscriptionSink {

impl Drop for SubscriptionSink {
fn drop(&mut self) {
if self.is_active_subscription() {
if let Some(id) = self.id.take() {
// Subscription was never accepted / rejected. As such,
// we default to assuming that the params were invalid,
// because that's how the previous PendingSubscription logic
// worked.
self.inner.send_error(id, ErrorCode::InvalidParams.into());
} else if self.is_active_subscription() {
self.subscribers.lock().remove(&self.uniq_sub);
}
}
Expand Down
Loading

1 comment on commit 98c23fc

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 98c23fc Previous: a35f8c3 Ratio
subscriptions/subscribe_response 9791 ns/iter (± 2341) 4601 ns/iter (± 632) 2.13

This comment was automatically generated by workflow using github-action-benchmark.

CC: @niklasad1

Please sign in to comment.