Skip to content

Commit

Permalink
feat: make it possible to override method name in subscriptions (#568)
Browse files Browse the repository at this point in the history
* feat: override `method` subscription notif

* Arrow syntax for overwrites (#569)

* check that unique notifs are used

* check that custom sub name is unique

* cargo fmt

* address grumbles

* Update proc-macros/src/rpc_macro.rs

* commit added tests

* Update proc-macros/src/render_server.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update proc-macros/src/render_server.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update proc-macros/src/rpc_macro.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update proc-macros/src/rpc_macro.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update utils/src/server/rpc_module.rs

Co-authored-by: David <dvdplm@gmail.com>

* grumbles

* fix long lines

* Update utils/src/server/rpc_module.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update utils/src/server/rpc_module.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update proc-macros/src/rpc_macro.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update proc-macros/src/render_server.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update proc-macros/src/render_server.rs

Co-authored-by: David <dvdplm@gmail.com>

* more grumbles

Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>
Co-authored-by: David <dvdplm@gmail.com>
  • Loading branch information
3 people authored Nov 19, 2021
1 parent 0e46b5c commit 9c6fd4b
Show file tree
Hide file tree
Showing 17 changed files with 167 additions and 45 deletions.
2 changes: 1 addition & 1 deletion benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| async { Ok("lo") }).unwrap();
module
.register_subscription(SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion examples/proc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
async fn storage_keys(&self, storage_key: StorageKey, hash: Option<Hash>) -> Result<Vec<StorageKey>, Error>;

/// Subscription that takes a `StorageKey` as input and produces a `Vec<Hash>`.
#[subscription(name = "subscribeStorage", item = Vec<Hash>)]
#[subscription(name = "subscribeStorage" => "override", item = Vec<Hash>)]
fn subscribe_storage(&self, keys: Option<Vec<StorageKey>>) -> Result<(), Error>;
}

Expand Down
4 changes: 2 additions & 2 deletions examples/ws_sub_with_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module
.register_subscription("sub_one_param", "unsub_one_param", |params, mut sink, _| {
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, mut sink, _| {
let idx: usize = params.one()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS.chars().nth(idx));
Expand All @@ -72,7 +72,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
})
.unwrap();
module
.register_subscription("sub_params_two", "unsub_params_two", |params, mut sink, _| {
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, mut sink, _| {
let (one, two): (usize, usize) = params.parse()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS[one..two].to_string());
Expand Down
2 changes: 1 addition & 1 deletion examples/ws_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn main() -> anyhow::Result<()> {
async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_subscription("subscribe_hello", "unsubscribe_hello", |_, mut sink, _| {
module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&"hello my friend") {
return;
Expand Down
39 changes: 31 additions & 8 deletions proc-macros/src/attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use proc_macro2::{Span, TokenStream as TokenStream2, TokenTree};
use std::{fmt, iter};
use syn::parse::{Parse, ParseStream, Parser};
use syn::punctuated::Punctuated;
use syn::{spanned::Spanned, Attribute, Error, Token};
use syn::{spanned::Spanned, Attribute, Error, LitInt, LitStr, Token};

pub(crate) struct AttributeMeta {
pub path: syn::Path,
Expand All @@ -48,15 +48,22 @@ pub enum ParamKind {

#[derive(Debug, Clone)]
pub struct Resource {
pub name: syn::LitStr,
pub name: LitStr,
pub assign: Token![=],
pub value: syn::LitInt,
pub value: LitInt,
}

pub struct Aliases {
pub list: Punctuated<syn::LitStr, Token![,]>,
pub struct NameMapping {
pub name: String,
pub mapped: Option<String>,
}

pub struct Bracketed<T> {
pub list: Punctuated<T, Token![,]>,
}

pub type Aliases = Bracketed<LitStr>;

impl Parse for Argument {
fn parse(input: ParseStream) -> syn::Result<Self> {
let label = input.parse()?;
Expand Down Expand Up @@ -91,15 +98,31 @@ impl Parse for Resource {
}
}

impl Parse for Aliases {
impl Parse for NameMapping {
fn parse(input: ParseStream) -> syn::Result<Self> {
let name = input.parse::<LitStr>()?.value();

let mapped = if input.peek(Token![=>]) {
input.parse::<Token![=>]>()?;

Some(input.parse::<LitStr>()?.value())
} else {
None
};

Ok(NameMapping { name, mapped })
}
}

impl<T: Parse> Parse for Bracketed<T> {
fn parse(input: ParseStream) -> syn::Result<Self> {
let content;

syn::bracketed!(content in input);

let list = content.parse_terminated(Parse::parse)?;

Ok(Aliases { list })
Ok(Bracketed { list })
}
}

Expand Down Expand Up @@ -201,7 +224,7 @@ impl Argument {

/// Asserts that the argument is `key = "string"` and gets the value of the string
pub fn string(self) -> syn::Result<String> {
self.value::<syn::LitStr>().map(|lit| lit.value())
self.value::<LitStr>().map(|lit| lit.value())
}
}

Expand Down
12 changes: 11 additions & 1 deletion proc-macros/src/render_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ impl RpcDescription {
let rust_method_name = &sub.signature.sig.ident;
// Name of the RPC method to subscribe to (e.g. `foo_sub`).
let rpc_sub_name = self.rpc_identifier(&sub.name);
// Name of `method` in the subscription response.
let rpc_notif_name_override = sub.notif_name_override.as_ref().map(|m| self.rpc_identifier(m));
// Name of the RPC method to unsubscribe (e.g. `foo_sub`).
let rpc_unsub_name = self.rpc_identifier(&sub.unsubscribe);
// `parsing` is the code associated with parsing structure from the
Expand All @@ -184,8 +186,16 @@ impl RpcDescription {
check_name(&rpc_sub_name, rust_method_name.span());
check_name(&rpc_unsub_name, rust_method_name.span());

let rpc_notif_name = match rpc_notif_name_override {
Some(notif) => {
check_name(&notif, rust_method_name.span());
notif
}
None => rpc_sub_name.clone(),
};

handle_register_result(quote! {
rpc.register_subscription(#rpc_sub_name, #rpc_unsub_name, |params, sink, context| {
rpc.register_subscription(#rpc_sub_name, #rpc_notif_name, #rpc_unsub_name, |params, sink, context| {
#parsing
context.as_ref().#rust_method_name(sink, #params_seq)
})
Expand Down
28 changes: 25 additions & 3 deletions proc-macros/src/rpc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
//! Declaration of the JSON RPC generator procedural macros.
use crate::{
attributes::{optional, parse_param_kind, Aliases, Argument, AttributeMeta, MissingArgument, ParamKind, Resource},
attributes::{
optional, parse_param_kind, Aliases, Argument, AttributeMeta, MissingArgument, NameMapping, ParamKind, Resource,
},
helpers::extract_doc_comments,
};

Expand Down Expand Up @@ -95,6 +97,13 @@ impl RpcMethod {
#[derive(Debug, Clone)]
pub struct RpcSubscription {
pub name: String,
/// When subscribing to an RPC, users can override the content of the `method` field
/// in the JSON data sent to subscribers.
/// Each subscription thus has one method name to set up the subscription,
/// one to unsubscribe and, optionally, a third method name used to describe the
/// payload (aka "notification") sent back from the server to subscribers.
/// If no override is provided, the subscription method name is used.
pub notif_name_override: Option<String>,
pub docs: TokenStream2,
pub unsubscribe: String,
pub params: Vec<(syn::PatIdent, syn::Type)>,
Expand All @@ -111,7 +120,9 @@ impl RpcSubscription {
AttributeMeta::parse(attr)?.retain(["aliases", "item", "name", "param_kind", "unsubscribe_aliases"])?;

let aliases = parse_aliases(aliases)?;
let name = name?.string()?;
let map = name?.value::<NameMapping>()?;
let name = map.name;
let notif_name_override = map.mapped;
let item = item?.value()?;
let param_kind = parse_param_kind(param_kind)?;
let unsubscribe_aliases = parse_aliases(unsubscribe_aliases)?;
Expand All @@ -135,7 +146,18 @@ impl RpcSubscription {
// We've analyzed attributes and don't need them anymore.
sub.attrs.clear();

Ok(Self { name, unsubscribe, unsubscribe_aliases, params, param_kind, item, signature: sub, aliases, docs })
Ok(Self {
name,
notif_name_override,
unsubscribe,
unsubscribe_aliases,
params,
param_kind,
item,
signature: sub,
aliases,
docs,
})
}
}

Expand Down
13 changes: 13 additions & 0 deletions proc-macros/tests/ui/correct/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ pub trait Rpc {

#[subscription(name = "echo", aliases = ["ECHO"], item = u32, unsubscribe_aliases = ["NotInterested", "listenNoMore"])]
fn sub_with_params(&self, val: u32) -> RpcResult<()>;

// This will send data to subscribers with the `method` field in the JSON payload set to `foo_subscribe_override`
// because it's in the `foo` namespace.
#[subscription(name = "subscribe_method" => "subscribe_override", item = u32)]
fn sub_with_override_notif_method(&self) -> RpcResult<()>;
}

pub struct RpcServerImpl;
Expand Down Expand Up @@ -68,6 +73,10 @@ impl RpcServer for RpcServerImpl {
sink.send(&val)?;
sink.send(&val)
}

fn sub_with_override_notif_method(&self, mut sink: SubscriptionSink) -> RpcResult<()> {
sink.send(&1)
}
}

pub async fn websocket_server() -> SocketAddr {
Expand Down Expand Up @@ -102,4 +111,8 @@ async fn main() {
assert_eq!(first_recv, Some("Response_A".to_string()));
let second_recv = sub.next().await.unwrap();
assert_eq!(second_recv, Some("Response_B".to_string()));

let mut sub = client.sub_with_override_notif_method().await.unwrap();
let recv = sub.next().await.unwrap();
assert_eq!(recv, Some(1));
}
12 changes: 12 additions & 0 deletions proc-macros/tests/ui/incorrect/sub/sub_dup_name_override.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use jsonrpsee::{proc_macros::rpc, types::RpcResult};

// Subscription method must not use the same override name.
#[rpc(client, server)]
pub trait DupOverride {
#[subscription(name = "one" => "override", item = u8)]
fn one(&self) -> RpcResult<()>;
#[subscription(name = "two" => "override", item = u8)]
fn two(&self) -> RpcResult<()>;
}

fn main() {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
error: "override" is already defined
--> tests/ui/incorrect/sub/sub_dup_name_override.rs:9:5
|
9 | fn two(&self) -> RpcResult<()>;
| ^^^
10 changes: 10 additions & 0 deletions proc-macros/tests/ui/incorrect/sub/sub_name_override.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use jsonrpsee::{proc_macros::rpc, types::RpcResult};

// Subscription method name conflict with notif override.
#[rpc(client, server)]
pub trait DupName {
#[subscription(name = "one" => "one", item = u8)]
fn one(&self) -> RpcResult<()>;
}

fn main() {}
5 changes: 5 additions & 0 deletions proc-macros/tests/ui/incorrect/sub/sub_name_override.stderr
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
error: "one" is already defined
--> tests/ui/incorrect/sub/sub_name_override.rs:7:5
|
7 | fn one(&self) -> RpcResult<()>;
| ^^^
33 changes: 19 additions & 14 deletions tests/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
module.register_method("say_hello", |_, _| Ok("hello")).unwrap();

module
.register_subscription("subscribe_hello", "unsubscribe_hello", |_, mut sink, _| {
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&"hello from subscription") {
break;
Expand All @@ -52,7 +52,7 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
.unwrap();

module
.register_subscription("subscribe_foo", "unsubscribe_foo", |_, mut sink, _| {
.register_subscription("subscribe_foo", "subscribe_foo", "unsubscribe_foo", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&1337) {
break;
Expand All @@ -64,21 +64,26 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
.unwrap();

module
.register_subscription("subscribe_add_one", "unsubscribe_add_one", |params, mut sink, _| {
let mut count: usize = params.one()?;
std::thread::spawn(move || loop {
count = count.wrapping_add(1);
if let Err(Error::SubscriptionClosed(_)) = sink.send(&count) {
break;
}
std::thread::sleep(Duration::from_millis(100));
});
Ok(())
})
.register_subscription(
"subscribe_add_one",
"subscribe_add_one",
"unsubscribe_add_one",
|params, mut sink, _| {
let mut count: usize = params.one()?;
std::thread::spawn(move || loop {
count = count.wrapping_add(1);
if let Err(Error::SubscriptionClosed(_)) = sink.send(&count) {
break;
}
std::thread::sleep(Duration::from_millis(100));
});
Ok(())
},
)
.unwrap();

module
.register_subscription("subscribe_noop", "unsubscribe_noop", |_, mut sink, _| {
.register_subscription("subscribe_noop", "subscribe_noop", "unsubscribe_noop", |_, mut sink, _| {
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(1));
sink.close("Server closed the stream because it was lazy")
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ async fn ws_server_should_stop_subscription_after_client_drop() {
let mut module = RpcModule::new(tx);

module
.register_subscription("subscribe_hello", "unsubscribe_hello", |_, mut sink, mut tx| {
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, mut sink, mut tx| {
tokio::spawn(async move {
let close_err = loop {
if let Err(Error::SubscriptionClosed(err)) = sink.send(&1) {
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/proc_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ async fn multiple_blocking_calls_overlap() {

#[tokio::test]
async fn subscriptions_do_not_work_for_http_servers() {
let htserver = HttpServerBuilder::default().build("127.0.0.1:0".parse().unwrap()).unwrap();
let htserver = HttpServerBuilder::default().build("127.0.0.1:0").unwrap();
let addr = htserver.local_addr().unwrap();
let htserver_url = format!("http://{}", addr);
let _handle = htserver.start(RpcServerImpl.into_rpc()).unwrap();
Expand Down
Loading

0 comments on commit 9c6fd4b

Please sign in to comment.