Skip to content

Commit

Permalink
rpc module: remove SubscriptionAnswer (#1025)
Browse files Browse the repository at this point in the history
* rpc module: remove `SubscriptionAnswer`

This abstraction is not very nice and I think it's more readable with Result<MethodResult, Id>
anyway so let's remove it.

* Update server/src/transport/ws.rs
  • Loading branch information
niklasad1 authored Feb 22, 2023
1 parent b4e3b6e commit afe40df
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 50 deletions.
3 changes: 2 additions & 1 deletion benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::se
UNSUB_METHOD_NAME,
|_params, pending, _ctx| async move {
let sink = pending.accept().await?;
let msg = SubscriptionMessage::from_json(&"Hello")?;
let msg = SubscriptionMessage::from("Hello");
sink.send(msg).await?;

Ok(())
},
)
Expand Down
58 changes: 19 additions & 39 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub type AsyncMethod<'a> =
Arc<dyn Send + Sync + Fn(Id<'a>, Params<'a>, ConnectionId, MaxResponseSize) -> BoxFuture<'a, MethodResponse>>;
/// Method callback for subscriptions.
pub type SubscriptionMethod<'a> =
Arc<dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState) -> BoxFuture<'a, SubscriptionAnswered>>;
Arc<dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState) -> BoxFuture<'a, Result<MethodResponse, Id<'a>>>>;
// Method callback to unsubscribe.
type UnsubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, ConnectionId, MaxResponseSize) -> MethodResponse>;

Expand Down Expand Up @@ -97,18 +97,6 @@ impl std::fmt::Display for TrySendError {
}
}

#[derive(Debug, Clone)]
/// Represents whether a subscription was answered or not.
pub enum SubscriptionAnswered {
/// The subscription was already answered and doesn't need to answered again.
/// The response is kept to be logged.
Yes(MethodResponse),
/// The subscription was never answered and needs to be answered.
///
/// This may occur if a subscription dropped without calling `PendingSubscriptionSink::accept` or `PendingSubscriptionSink::reject`.
No(MethodResponse),
}

/// Error that may occur during `MethodSink::send` or `SubscriptionSink::send`.
#[derive(Debug)]
pub struct DisconnectError(pub SubscriptionMessage);
Expand Down Expand Up @@ -198,7 +186,7 @@ type Subscribers = Arc<Mutex<FxHashMap<SubscriptionKey, (MethodSink, mpsc::Recei
pub enum CallOrSubscription {
/// The subscription callback itself sends back the result
/// so it must not be sent back again.
Subscription(SubscriptionAnswered),
Subscription(MethodResponse),

/// Treat it as ordinary call.
Call(MethodResponse),
Expand All @@ -208,21 +196,15 @@ impl CallOrSubscription {
/// Extract the JSON-RPC response.
pub fn as_response(&self) -> &MethodResponse {
match &self {
Self::Subscription(r) => match r {
SubscriptionAnswered::Yes(r) => r,
SubscriptionAnswered::No(r) => r,
},
Self::Subscription(r) => r,
Self::Call(r) => r,
}
}

/// Convert the `CallOrSubscription` to JSON-RPC response.
/// Extract the JSON-RPC response.
pub fn into_response(self) -> MethodResponse {
match self {
Self::Subscription(r) => match r {
SubscriptionAnswered::Yes(r) => r,
SubscriptionAnswered::No(r) => r,
},
Self::Subscription(r) => r,
Self::Call(r) => r,
}
}
Expand All @@ -243,6 +225,8 @@ pub struct SubscriptionMessage(pub(crate) SubscriptionMessageInner);

impl SubscriptionMessage {
/// Create a new subscription message from JSON.
///
/// Fails if the value couldn't be serialized.
pub fn from_json(t: &impl Serialize) -> Result<Self, serde_json::Error> {
serde_json::to_string(t).map(|json| SubscriptionMessage(SubscriptionMessageInner::NeedsData(json)))
}
Expand All @@ -256,6 +240,12 @@ impl SubscriptionMessage {
}
}

impl From<&str> for SubscriptionMessage {
fn from(msg: &str) -> Self {
SubscriptionMessage(SubscriptionMessageInner::NeedsData(format!("\"{msg}\"")))
}
}

/// Represent a unique subscription entry based on [`RpcSubscriptionId`] and [`ConnectionId`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct SubscriptionKey {
Expand Down Expand Up @@ -485,18 +475,18 @@ impl Methods {
Some(MethodCallback::Subscription(cb)) => {
let conn_state =
ConnState { conn_id: 0, id_provider: &RandomIntegerIdProvider, subscription_permit: p1 };
let res = (cb)(id, params, MethodSink::new(tx.clone()), conn_state).await;
let res = match (cb)(id, params, MethodSink::new(tx.clone()), conn_state).await {
Ok(rp) => rp,
Err(id) => MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError)),
};

// This message is not used because it's used for metrics so we discard in other to
// not read once this is used for subscriptions.
//
// The same information is part of `res` above.
let _ = rx.recv().await.expect("Every call must at least produce one response; qed");

match res {
SubscriptionAnswered::Yes(r) => r,
SubscriptionAnswered::No(r) => r,
}
res
}
Some(MethodCallback::Unsubscription(cb)) => (cb)(id, params, 0, usize::MAX),
};
Expand Down Expand Up @@ -855,17 +845,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {

let id = id.clone().into_owned();

let result = async move {
match rx.await {
Ok(r) => SubscriptionAnswered::Yes(r),
Err(_) => {
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
SubscriptionAnswered::No(response)
}
}
};

Box::pin(result)
Box::pin(async move { rx.await.map_err(|_| id) })
})),
)?
};
Expand Down
20 changes: 10 additions & 10 deletions server/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ use hyper::upgrade::Upgraded;
use jsonrpsee_core::server::helpers::{
batch_response_error, prepare_error, BatchResponseBuilder, BoundedSubscriptions, MethodResponse, MethodSink,
};
use jsonrpsee_core::server::rpc_module::{
CallOrSubscription, ConnState, MethodCallback, Methods, SubscriptionAnswered,
};
use jsonrpsee_core::server::rpc_module::{CallOrSubscription, ConnState, MethodCallback, Methods};
use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str};
use jsonrpsee_core::traits::IdProvider;
use jsonrpsee_core::{Error, JsonRawValue};
Expand Down Expand Up @@ -226,8 +224,13 @@ pub(crate) async fn execute_call<'a, L: Logger>(req: Request<'a>, call: CallData

if let Some(p) = bounded_subscriptions.acquire() {
let conn_state = ConnState { conn_id, id_provider, subscription_permit: p };
let response = callback(id.clone(), params, sink.clone(), conn_state).await;
CallOrSubscription::Subscription(response)
match callback(id, params, sink.clone(), conn_state).await {
Ok(r) => CallOrSubscription::Subscription(r),
Err(id) => {
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
CallOrSubscription::Call(response)
}
}
} else {
let response =
MethodResponse::error(id, reject_too_many_subscriptions(bounded_subscriptions.max()));
Expand Down Expand Up @@ -378,13 +381,10 @@ pub(crate) async fn background_task<L: Logger>(

if let Some(rp) = process_single_request(data, call).await {
match rp {
CallOrSubscription::Subscription(SubscriptionAnswered::Yes(r)) => {
CallOrSubscription::Subscription(r) => {
logger.on_response(&r.result, request_start, TransportProtocol::WebSocket);
}
CallOrSubscription::Subscription(SubscriptionAnswered::No(r)) => {
logger.on_response(&r.result, request_start, TransportProtocol::WebSocket);
sink_permit.send_raw(r.result);
}

CallOrSubscription::Call(r) => {
logger.on_response(&r.result, request_start, TransportProtocol::WebSocket);
sink_permit.send_raw(r.result);
Expand Down

0 comments on commit afe40df

Please sign in to comment.