Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Module API refactor #412

Merged
merged 8 commits into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@ pub(crate) const UNSUB_METHOD_NAME: &str = "unsub";
pub async fn http_server() -> String {
let (server_started_tx, server_started_rx) = oneshot::channel();
tokio::spawn(async move {
let mut server =
let server =
HttpServerBuilder::default().max_request_body_size(u32::MAX).build("127.0.0.1:0".parse().unwrap()).unwrap();
let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| (async { Ok("lo") }).boxed()).unwrap();
server.register_module(module).unwrap();
server_started_tx.send(server.local_addr().unwrap()).unwrap();
server.start().await
server.start(module).await
});
format!("http://{}", server_started_rx.await.unwrap())
}
Expand All @@ -30,7 +29,7 @@ pub async fn http_server() -> String {
pub async fn ws_server() -> String {
let (server_started_tx, server_started_rx) = oneshot::channel();
tokio::spawn(async move {
let mut server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| (async { Ok("lo") }).boxed()).unwrap();
Expand All @@ -42,9 +41,8 @@ pub async fn ws_server() -> String {
})
.unwrap();

server.register_module(module).unwrap();
server_started_tx.send(server.local_addr().unwrap()).unwrap();
server.start().await
server.start(module).await
});
format!("ws://{}", server_started_rx.await.unwrap())
}
Expand Down
5 changes: 2 additions & 3 deletions examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,11 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let mut server = HttpServerBuilder::default().build("127.0.0.1:0".parse()?)?;
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse()?)?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
server.register_module(module).unwrap();

let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
tokio::spawn(server.start(module));
Ok(addr)
}
5 changes: 2 additions & 3 deletions examples/proc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let mut server = HttpServerBuilder::default().build("127.0.0.1:0".parse()?)?;
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse()?)?;
let mut module = RpcModule::new(());
module.register_method("state_getPairs", |_, _| Ok(vec![1, 2, 3]))?;
server.register_module(module).unwrap();

let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
tokio::spawn(server.start(module));
Ok(addr)
}
6 changes: 2 additions & 4 deletions examples/weather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ struct WeatherApiCx {
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let mut server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;

let api_client = restson::RestClient::new("http://api.openweathermap.org").unwrap();
let last_weather = Weather::default();
Expand All @@ -126,9 +126,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
})
.unwrap();

server.register_module(module).unwrap();

let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
tokio::spawn(server.start(module));
Ok(addr)
}
5 changes: 2 additions & 3 deletions examples/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let mut server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
server.register_module(module).unwrap();
let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
tokio::spawn(server.start(module));
Ok(addr)
}
5 changes: 2 additions & 3 deletions examples/ws_sub_with_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() -> anyhow::Result<()> {

async fn run_server() -> anyhow::Result<SocketAddr> {
const LETTERS: &str = "abcdefghijklmnopqrstuvxyz";
let mut server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module
.register_subscription("sub_one_param", "unsub_one_param", |params, mut sink, _| {
Expand All @@ -77,8 +77,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
})
.unwrap();

server.register_module(module).unwrap();
let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
tokio::spawn(server.start(module));
Ok(addr)
}
5 changes: 2 additions & 3 deletions examples/ws_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let mut server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_subscription("subscribe_hello", "unsubscribe_hello", |_, mut sink, _| {
std::thread::spawn(move || loop {
Expand All @@ -65,8 +65,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
});
Ok(())
})?;
server.register_module(module).unwrap();
let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
tokio::spawn(server.start(module));
Ok(addr)
}
18 changes: 2 additions & 16 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use jsonrpsee_types::{
TEN_MB_SIZE_BYTES,
};
use jsonrpsee_utils::hyper_helpers::read_response_to_body;
use jsonrpsee_utils::server::rpc_module::RpcModule;
use jsonrpsee_utils::server::{
helpers::{collect_batch_response, send_error},
rpc_module::Methods,
Expand Down Expand Up @@ -160,19 +159,6 @@ pub struct Server {
}

impl Server {
/// Register all methods from a [`Methods`] of provided [`RpcModule`] on this server.
/// In case a method already is registered with the same name, no method is added and a [`Error::MethodAlreadyRegistered`]
/// is returned. Note that the [`RpcModule`] is consumed after this call.
pub fn register_module<Context: Send + Sync + 'static>(&mut self, module: RpcModule<Context>) -> Result<(), Error> {
self.methods.merge(module.into_methods())?;
Ok(())
}

/// Returns a `Vec` with all the method names registered on this server.
pub fn method_names(&self) -> Vec<&'static str> {
self.methods.method_names()
}

/// Returns socket address to which the server is bound.
pub fn local_addr(&self) -> Result<SocketAddr, Error> {
self.local_addr.ok_or_else(|| Error::Custom("Local address not found".into()))
Expand All @@ -184,15 +170,15 @@ impl Server {
}

/// Start the server.
pub async fn start(self) -> Result<(), Error> {
pub async fn start(self, methods: impl Into<Methods>) -> Result<(), Error> {
// Lock the stop mutex so existing stop handles can wait for server to stop.
// It will be unlocked once this function returns.
let _stop_handle = self.stop_handle.lock().await;

let methods = Arc::new(self.methods);
let max_request_body_size = self.max_request_body_size;
let access_control = self.access_control;
let mut stop_receiver = self.stop_pair.1;
let methods = methods.into();

let make_service = make_service_fn(move |_| {
let methods = methods.clone();
Expand Down
15 changes: 6 additions & 9 deletions http-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn server() -> SocketAddr {
}

async fn server_with_handles() -> (SocketAddr, JoinHandle<Result<(), Error>>, StopHandle) {
let mut server = HttpServerBuilder::default().build("127.0.0.1:0".parse().unwrap()).unwrap();
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse().unwrap()).unwrap();
let ctx = TestContext;
let mut module = RpcModule::new(ctx);
let addr = server.local_addr().unwrap();
Expand Down Expand Up @@ -87,9 +87,8 @@ async fn server_with_handles() -> (SocketAddr, JoinHandle<Result<(), Error>>, St
})
.unwrap();

server.register_module(module).unwrap();
let stop_handle = server.stop_handle();
let join_handle = tokio::spawn(async move { server.start().with_default_timeout().await.unwrap() });
let join_handle = tokio::spawn(async move { server.start(module).with_default_timeout().await.unwrap() });
(addr, join_handle, stop_handle)
}

Expand Down Expand Up @@ -323,23 +322,21 @@ async fn can_register_modules() {
let cx2 = Vec::<u8>::new();
let mut mod2 = RpcModule::new(cx2);

let mut server = HttpServerBuilder::default().build("127.0.0.1:0".parse().unwrap()).unwrap();
assert_eq!(server.method_names().len(), 0);
assert_eq!(mod1.method_names().count(), 0);
mod1.register_method("bla", |_, cx| Ok(format!("Gave me {}", cx))).unwrap();
mod1.register_method("bla2", |_, cx| Ok(format!("Gave me {}", cx))).unwrap();
mod2.register_method("yada", |_, cx| Ok(format!("Gave me {:?}", cx))).unwrap();

// Won't register, name clashes
mod2.register_method("bla", |_, cx| Ok(format!("Gave me {:?}", cx))).unwrap();

server.register_module(mod1).unwrap();
assert_eq!(server.method_names().len(), 2);
assert_eq!(mod1.method_names().count(), 2);

let err = server.register_module(mod2).unwrap_err();
let err = mod1.merge(mod2).unwrap_err();

let expected_err = Error::MethodAlreadyRegistered(String::from("bla"));
assert_eq!(err.to_string(), expected_err.to_string());
assert_eq!(server.method_names().len(), 2);
assert_eq!(mod1.method_names().count(), 2);
}

#[tokio::test]
Expand Down
5 changes: 2 additions & 3 deletions proc-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,14 +325,13 @@ pub fn rpc_client_api(input_token_stream: TokenStream) -> TokenStream {
///
/// std::thread::spawn(move || {
/// let rt = tokio::runtime::Runtime::new().unwrap();
/// let mut server = rt.block_on(WsServerBuilder::default().build("127.0.0.1:0")).unwrap();
/// let server = rt.block_on(WsServerBuilder::default().build("127.0.0.1:0")).unwrap();
/// // `into_rpc()` method was generated inside of the `RpcServer` trait under the hood.
/// server.register_module(RpcServerImpl.into_rpc().unwrap()).unwrap();
///
/// rt.block_on(async move {
/// server_started_tx.send(server.local_addr().unwrap()).unwrap();
///
/// server.start().await
/// server.start(RpcServerImpl.into_rpc()).await
/// });
/// });
///
Expand Down
Loading