Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc module: remove SubscriptionAnswer #1025

Merged
merged 5 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::se
UNSUB_METHOD_NAME,
|_params, pending, _ctx| async move {
let sink = pending.accept().await?;
let msg = SubscriptionMessage::from_json(&"Hello")?;
let msg = SubscriptionMessage::from("Hello");
sink.send(msg).await?;

Ok(())
},
)
Expand Down
58 changes: 19 additions & 39 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub type AsyncMethod<'a> =
Arc<dyn Send + Sync + Fn(Id<'a>, Params<'a>, ConnectionId, MaxResponseSize) -> BoxFuture<'a, MethodResponse>>;
/// Method callback for subscriptions.
pub type SubscriptionMethod<'a> =
Arc<dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState) -> BoxFuture<'a, SubscriptionAnswered>>;
Arc<dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState) -> BoxFuture<'a, Result<MethodResponse, Id<'a>>>>;
// Method callback to unsubscribe.
type UnsubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, ConnectionId, MaxResponseSize) -> MethodResponse>;

Expand Down Expand Up @@ -97,18 +97,6 @@ impl std::fmt::Display for TrySendError {
}
}

#[derive(Debug, Clone)]
/// Represents whether a subscription was answered or not.
pub enum SubscriptionAnswered {
/// The subscription was already answered and doesn't need to answered again.
/// The response is kept to be logged.
Yes(MethodResponse),
/// The subscription was never answered and needs to be answered.
///
/// This may occur if a subscription dropped without calling `PendingSubscriptionSink::accept` or `PendingSubscriptionSink::reject`.
No(MethodResponse),
}

/// Error that may occur during `MethodSink::send` or `SubscriptionSink::send`.
#[derive(Debug)]
pub struct DisconnectError(pub SubscriptionMessage);
Expand Down Expand Up @@ -198,7 +186,7 @@ type Subscribers = Arc<Mutex<FxHashMap<SubscriptionKey, (MethodSink, mpsc::Recei
pub enum CallOrSubscription {
/// The subscription callback itself sends back the result
/// so it must not be sent back again.
Subscription(SubscriptionAnswered),
Subscription(MethodResponse),

/// Treat it as ordinary call.
Call(MethodResponse),
Expand All @@ -208,21 +196,15 @@ impl CallOrSubscription {
/// Extract the JSON-RPC response.
pub fn as_response(&self) -> &MethodResponse {
match &self {
Self::Subscription(r) => match r {
SubscriptionAnswered::Yes(r) => r,
SubscriptionAnswered::No(r) => r,
},
Self::Subscription(r) => r,
Self::Call(r) => r,
}
}

/// Convert the `CallOrSubscription` to JSON-RPC response.
/// Extract the JSON-RPC response.
pub fn into_response(self) -> MethodResponse {
match self {
Self::Subscription(r) => match r {
SubscriptionAnswered::Yes(r) => r,
SubscriptionAnswered::No(r) => r,
},
Self::Subscription(r) => r,
Self::Call(r) => r,
}
}
Expand All @@ -243,6 +225,8 @@ pub struct SubscriptionMessage(pub(crate) SubscriptionMessageInner);

impl SubscriptionMessage {
/// Create a new subscription message from JSON.
///
/// Fails if the value couldn't be serialized.
pub fn from_json(t: &impl Serialize) -> Result<Self, serde_json::Error> {
serde_json::to_string(t).map(|json| SubscriptionMessage(SubscriptionMessageInner::NeedsData(json)))
}
Expand All @@ -256,6 +240,12 @@ impl SubscriptionMessage {
}
}

impl From<&str> for SubscriptionMessage {
fn from(msg: &str) -> Self {
SubscriptionMessage(SubscriptionMessageInner::NeedsData(format!("\"{msg}\"")))
}
}

/// Represent a unique subscription entry based on [`RpcSubscriptionId`] and [`ConnectionId`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct SubscriptionKey {
Expand Down Expand Up @@ -513,18 +503,18 @@ impl Methods {
Some(MethodKind::Subscription(cb)) => {
let conn_state =
ConnState { conn_id: 0, id_provider: &RandomIntegerIdProvider, subscription_permit: p1 };
let res = (cb)(id, params, MethodSink::new(tx.clone()), conn_state).await;
let res = match (cb)(id, params, MethodSink::new(tx.clone()), conn_state).await {
Ok(rp) => rp,
Err(id) => MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError)),
};

// This message is not used because it's used for metrics so we discard in other to
// not read once this is used for subscriptions.
//
// The same information is part of `res` above.
let _ = rx.recv().await.expect("Every call must at least produce one response; qed");

match res {
SubscriptionAnswered::Yes(r) => r,
SubscriptionAnswered::No(r) => r,
}
res
}
Some(MethodKind::Unsubscription(cb)) => (cb)(id, params, 0, usize::MAX),
};
Expand Down Expand Up @@ -883,17 +873,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {

let id = id.clone().into_owned();

let result = async move {
match rx.await {
Ok(r) => SubscriptionAnswered::Yes(r),
Err(_) => {
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
SubscriptionAnswered::No(response)
}
}
};

Box::pin(result)
Box::pin(async move { rx.await.map_err(|_| id) })
})),
)?
};
Expand Down
18 changes: 10 additions & 8 deletions server/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use hyper::upgrade::Upgraded;
use jsonrpsee_core::server::helpers::{
prepare_error, BatchResponse, BatchResponseBuilder, BoundedSubscriptions, MethodResponse, MethodSink,
};
use jsonrpsee_core::server::rpc_module::{CallOrSubscription, ConnState, MethodKind, Methods, SubscriptionAnswered};
use jsonrpsee_core::server::rpc_module::{CallOrSubscription, ConnState, MethodKind, Methods};
use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str};
use jsonrpsee_core::traits::IdProvider;
use jsonrpsee_core::{Error, JsonRawValue};
Expand Down Expand Up @@ -224,8 +224,13 @@ pub(crate) async fn execute_call<'a, L: Logger>(req: Request<'a>, call: CallData

if let Some(p) = bounded_subscriptions.acquire() {
let conn_state = ConnState { conn_id, id_provider, subscription_permit: p };
let response = callback(id.clone(), params, sink.clone(), conn_state).await;
CallOrSubscription::Subscription(response)
match callback(id, params, sink.clone(), conn_state).await {
Ok(r) => CallOrSubscription::Subscription(r),
Err(id) => {
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
CallOrSubscription::Call(response)
}
}
} else {
let response =
MethodResponse::error(id, reject_too_many_subscriptions(bounded_subscriptions.max()));
Expand Down Expand Up @@ -376,13 +381,10 @@ pub(crate) async fn background_task<L: Logger>(

if let Some(rp) = process_single_request(data, call).await {
match rp {
CallOrSubscription::Subscription(SubscriptionAnswered::Yes(r)) => {
logger.on_response(&r.result, request_start, TransportProtocol::WebSocket);
}
CallOrSubscription::Subscription(SubscriptionAnswered::No(r)) => {
CallOrSubscription::Subscription(r) => {
logger.on_response(&r.result, request_start, TransportProtocol::WebSocket);
sink_permit.send_raw(r.result);
}

CallOrSubscription::Call(r) => {
logger.on_response(&r.result, request_start, TransportProtocol::WebSocket);
sink_permit.send_raw(r.result);
Expand Down