Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: bounded channels and backpressure #962

Merged
merged 68 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from 62 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
1f2bd33
bounded channels
niklasad1 Dec 5, 2022
cf1b122
remove bounded subscriptions
niklasad1 Dec 7, 2022
5d67a41
remove resource limiting
niklasad1 Dec 7, 2022
cfa48ee
kill connection once message tx fails
niklasad1 Jan 4, 2023
e103908
switch to tokio::mpsc
niklasad1 Jan 5, 2023
44469eb
Merge remote-tracking branch 'origin/master' into na-server-bounded-c…
niklasad1 Jan 5, 2023
b320766
fix nits
niklasad1 Jan 9, 2023
f3a41d9
make futures_channel hard dependency
niklasad1 Jan 9, 2023
69ac4c4
add real backpressure to rx
niklasad1 Jan 9, 2023
d11ec7e
PoC with crossbeam queue
niklasad1 Jan 10, 2023
6ec941d
remove pipe_from_stream
niklasad1 Jan 13, 2023
2599b8f
bring back Pending and SubscriptionSink again
niklasad1 Jan 16, 2023
1254080
more refactoring
niklasad1 Jan 17, 2023
72f0b7d
add example of old APIs
niklasad1 Jan 17, 2023
ede6f4a
introduce opaque SubscriptionMessage
niklasad1 Jan 17, 2023
64e7da6
feat: make subscription callbacks async
niklasad1 Jan 25, 2023
a77a66a
fix tests
niklasad1 Jan 26, 2023
7e306b3
move non-jsonrpc spec types from types
niklasad1 Jan 26, 2023
1bb875e
Merge remote-tracking branch 'origin/master' into na-server-bounded-c…
niklasad1 Jan 26, 2023
a9002c9
fix nits
niklasad1 Jan 26, 2023
f3d19a5
improve docs
niklasad1 Jan 26, 2023
d6c0996
add pipe_from_stream APIs back
niklasad1 Jan 26, 2023
ec35e8f
cleanup
niklasad1 Jan 27, 2023
c6efb43
Update core/src/server/helpers.rs
niklasad1 Jan 27, 2023
a252c07
Update server/src/server.rs
niklasad1 Jan 27, 2023
78298cd
more cleanup
niklasad1 Jan 27, 2023
7214a5d
Merge remote-tracking branch 'origin/na-server-bounded-channels' into…
niklasad1 Jan 27, 2023
1001603
Update core/src/server/helpers.rs
niklasad1 Jan 30, 2023
701ff75
small fixes
niklasad1 Jan 30, 2023
7458334
Merge remote-tracking branch 'origin/master' into na-server-bounded-c…
niklasad1 Jan 31, 2023
f553649
Merge remote-tracking branch 'origin/master' into na-server-bounded-c…
niklasad1 Feb 1, 2023
93623e5
rpc module: add unit test for backpressure
niklasad1 Feb 2, 2023
91a58f8
Merge remote-tracking branch 'origin/master' into na-server-bounded-c…
niklasad1 Feb 2, 2023
f63c3ff
doc fixes
niklasad1 Feb 2, 2023
8fa7172
fix more nits
niklasad1 Feb 2, 2023
6abfcba
refactor: pipe_from_stream
niklasad1 Feb 3, 2023
2227a7b
fix examples: revert unintentional change
niklasad1 Feb 3, 2023
17a98ee
address grumbles
niklasad1 Feb 3, 2023
9a4cc19
revert: don't require subscriptions to return Result
niklasad1 Feb 3, 2023
109e930
Update core/src/server/helpers.rs
niklasad1 Feb 3, 2023
1487383
grumbles: simplify PendingSubscription
niklasad1 Feb 6, 2023
f468f03
grumbles: fix doc nits
niklasad1 Feb 6, 2023
01a5679
remove pipe_from_stream APIs again
niklasad1 Feb 6, 2023
62496e2
add backpressure test for ws server
niklasad1 Feb 7, 2023
dc37776
rpc module: add `send_timeout` APIs
niklasad1 Feb 7, 2023
3c98441
rpc module: add tokio/time
niklasad1 Feb 7, 2023
8e68156
cleanup
niklasad1 Feb 7, 2023
0226258
Update examples/Cargo.toml
niklasad1 Feb 7, 2023
2024cad
Update server/src/server.rs
niklasad1 Feb 7, 2023
898fcd3
Update server/src/server.rs
niklasad1 Feb 7, 2023
d306593
Update server/src/server.rs
niklasad1 Feb 7, 2023
5a03761
extract `build_message` to `SubscriptionMessage`
niklasad1 Feb 7, 2023
332d182
remove resource limiting leftover
niklasad1 Feb 7, 2023
748b9ac
Merge remote-tracking branch 'origin/na-server-bounded-channels' into…
niklasad1 Feb 8, 2023
e2c07e9
Update core/src/server/rpc_module.rs
niklasad1 Feb 8, 2023
935da72
Update examples/examples/ws_pubsub_broadcast.rs
niklasad1 Feb 8, 2023
ade35a7
Update examples/examples/ws_pubsub_broadcast.rs
niklasad1 Feb 8, 2023
bb89d00
revert unintentional change
niklasad1 Feb 8, 2023
4e95865
Merge remote-tracking branch 'origin/na-server-bounded-channels' into…
niklasad1 Feb 8, 2023
5aae651
Update examples/examples/ws_pubsub_with_params.rs
niklasad1 Feb 8, 2023
9b0d2ef
fix more nits
niklasad1 Feb 8, 2023
d25062b
improve SubscriptionEmptyErr
niklasad1 Feb 8, 2023
d3cbf69
clippy --fix
niklasad1 Feb 8, 2023
1f1148b
bring back subscription limit
niklasad1 Feb 8, 2023
fce6d45
server: `set_message_buffer_capacity`
niklasad1 Feb 8, 2023
c141dbf
rpc module: revert raw_json_request API
niklasad1 Feb 8, 2023
1a18f9b
subscribe_bounded -> subscribe
niklasad1 Feb 8, 2023
039a0d1
CallResponse -> CallOrSubscription
niklasad1 Feb 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::
/// Run jsonrpsee WebSocket server for benchmarks.
#[cfg(not(feature = "jsonrpc-crate"))]
pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::server::ServerHandle) {
use jsonrpsee::server::ServerBuilder;
use jsonrpsee::{core::server::rpc_module::SubscriptionMessage, server::ServerBuilder};

let server = ServerBuilder::default()
.max_request_body_size(u32::MAX)
Expand All @@ -146,11 +146,17 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::se
let mut module = gen_rpc_module();

module
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Ok(())
})
.register_subscription(
SUB_METHOD_NAME,
SUB_METHOD_NAME,
UNSUB_METHOD_NAME,
|_params, pending, _ctx| async move {
let sink = pending.accept().await?;
let msg = SubscriptionMessage::from_json(&"Hello")?;
sink.send(msg).await?;
Ok(())
},
)
.unwrap();

let addr = format!("ws://{}", server.local_addr().unwrap());
Expand Down
4 changes: 2 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ soketto = { version = "0.7.1", optional = true }
parking_lot = { version = "0.12", optional = true }
tokio = { version = "1.16", optional = true }
wasm-bindgen-futures = { version = "0.4.19", optional = true }
futures-channel = { version = "0.3.14", optional = true }
futures-timer = { version = "3", optional = true }
globset = { version = "0.4", optional = true }
http = { version = "0.2.7", optional = true }
Expand All @@ -45,7 +44,8 @@ server = [
"rand",
"tokio/rt",
"tokio/sync",
"futures-channel",
"tokio/macros",
"tokio/time",
]
client = ["futures-util/sink", "tokio/sync"]
async-client = [
Expand Down
2 changes: 1 addition & 1 deletion core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use tracing::instrument;

use super::{generate_batch_id_range, FrontToBack, IdKind, RequestIdManager};

/// Wrapper over a [`oneshot::Receiver`](futures_channel::oneshot::Receiver) that reads
/// Wrapper over a [`oneshot::Receiver`](tokio::sync::oneshot::Receiver) that reads
/// the underlying channel once and then stores the result in String.
/// It is possible that the error is read more than once if several calls are made
/// when the background thread has been terminated.
Expand Down
122 changes: 77 additions & 45 deletions core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
use std::fmt;

use jsonrpsee_types::error::{
CallError, ErrorObject, ErrorObjectOwned, CALL_EXECUTION_FAILED_CODE, INVALID_PARAMS_CODE, SUBSCRIPTION_CLOSED,
UNKNOWN_ERROR_CODE,
CallError, ErrorObject, ErrorObjectOwned, CALL_EXECUTION_FAILED_CODE, INVALID_PARAMS_CODE, UNKNOWN_ERROR_CODE,
};

/// Convenience type for displaying errors.
Expand Down Expand Up @@ -108,21 +107,6 @@ pub enum Error {
/// Access control verification of HTTP headers failed.
#[error("HTTP header: `{0}` value: `{1}` verification failed")]
HttpHeaderRejected(&'static str, String),
/// Failed to execute a method because a resource was already at capacity
#[error("Resource at capacity: {0}")]
ResourceAtCapacity(&'static str),
/// Failed to register a resource due to a name conflict
#[error("Resource name already taken: {0}")]
ResourceNameAlreadyTaken(&'static str),
/// Failed to initialize resources for a method at startup
#[error("Resource name `{0}` not found for method `{1}`")]
ResourceNameNotFoundForMethod(&'static str, &'static str),
/// Trying to claim resources for a method execution, but the method resources have not been initialized
#[error("Method `{0}` has uninitialized resources")]
UninitializedMethod(Box<str>),
/// Failed to register a resource due to a maximum number of resources already registered
#[error("Maximum number of resources reached")]
MaxResourcesReached,
/// Custom error.
#[error("Custom error: {0}")]
Custom(String),
Expand Down Expand Up @@ -161,34 +145,6 @@ impl From<Error> for ErrorObjectOwned {
}
}

/// A type to represent when a subscription gets closed
/// by either the server or client side.
#[derive(Clone, Debug)]
pub enum SubscriptionClosed {
/// The remote peer closed the connection or called the unsubscribe method.
RemotePeerAborted,
/// The subscription was completed successfully by the server.
Success,
/// The subscription failed during execution by the server.
Failed(ErrorObject<'static>),
}

impl From<SubscriptionClosed> for ErrorObjectOwned {
fn from(err: SubscriptionClosed) -> Self {
match err {
SubscriptionClosed::RemotePeerAborted => {
ErrorObject::owned(SUBSCRIPTION_CLOSED, "Subscription was closed by the remote peer", None::<()>)
}
SubscriptionClosed::Success => ErrorObject::owned(
SUBSCRIPTION_CLOSED,
"Subscription was completed by the server successfully",
None::<()>,
),
SubscriptionClosed::Failed(err) => err,
}
}
}

/// Generic transport error.
#[derive(Debug, thiserror::Error)]
pub enum GenericTransportError {
Expand Down Expand Up @@ -229,3 +185,79 @@ impl From<hyper::Error> for Error {
Error::Transport(hyper_err.into())
}
}

/// The error returned by the subscription's method for the rpc server implementation.
///
/// It provides an abstraction to make the API more ergonomic while handling errors
/// that may occur during the subscription callback.
#[derive(Debug)]
pub enum SubscriptionCallbackError {
/// Error cause is propagated by other code or connection related.
None,
/// Some error happened to be logged.
Some(String),
}

// User defined error.
impl From<anyhow::Error> for SubscriptionCallbackError {
fn from(e: anyhow::Error) -> Self {
Self::Some(format!("Other: {}", e.to_string()))
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
}
}

// User defined error.
impl From<Box<dyn std::error::Error>> for SubscriptionCallbackError {
fn from(e: Box<dyn std::error::Error>) -> Self {
Self::Some(format!("Other: {}", e.to_string()))
}
}

impl From<CallError> for SubscriptionCallbackError {
fn from(e: CallError) -> Self {
Self::Some(e.to_string())
}
}

impl From<SubscriptionAcceptRejectError> for SubscriptionCallbackError {
fn from(_: SubscriptionAcceptRejectError) -> Self {
Self::None
}
}

impl From<serde_json::Error> for SubscriptionCallbackError {
fn from(e: serde_json::Error) -> Self {
Self::Some(format!("Failed to parse SubscriptionMessage::from_json: {}", e.to_string()))
}
}

#[cfg(feature = "server")]
impl From<crate::server::rpc_module::TrySendError> for SubscriptionCallbackError {
fn from(e: crate::server::rpc_module::TrySendError) -> Self {
Self::Some(format!("SubscriptionSink::try_send failed: {}", e.to_string()))
}
}

#[cfg(feature = "server")]
impl From<crate::server::rpc_module::DisconnectError> for SubscriptionCallbackError {
fn from(e: crate::server::rpc_module::DisconnectError) -> Self {
Self::Some(format!("SubscriptionSink::send failed: {}", e.to_string()))
}
}

#[cfg(feature = "server")]
impl From<crate::server::rpc_module::SendTimeoutError> for SubscriptionCallbackError {
fn from(e: crate::server::rpc_module::SendTimeoutError) -> Self {
Self::Some(format!("SubscriptionSink::send_timeout failed: {}", e.to_string()))
}
}

/// The error returned while accepting or rejecting a subscription.
#[derive(Debug, Copy, Clone)]
pub enum SubscriptionAcceptRejectError {
/// The method was already called.
AlreadyCalled,
/// The remote peer closed the connection or called the unsubscribe method.
RemotePeerAborted,
/// The subscription response message was too large.
MessageTooLarge,
}
10 changes: 9 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,19 @@ cfg_client! {
/// Shared tracing helpers to trace RPC calls.
pub mod tracing;
pub use async_trait::async_trait;
pub use error::Error;
pub use error::{Error, SubscriptionAcceptRejectError, SubscriptionCallbackError};

/// JSON-RPC result.
pub type RpcResult<T> = std::result::Result<T, Error>;

/// The return type of the subscription's method for the rpc server implementation.
///
/// **Note**: The error does not contain any data and is discarded on drop.
pub type SubscriptionResult = Result<(), SubscriptionCallbackError>;

/// Empty server `RpcParams` type to use while registering modules.
pub type EmptyServerParams = Vec<()>;

/// Re-exports for proc-macro library to not require any additional
/// dependencies to be explicitly added on the client side.
#[doc(hidden)]
Expand Down
Loading