From 7a113b9746a89b5094bc51a22b940c6e61a6506e Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 15 Sep 2023 14:28:18 +0200 Subject: [PATCH 1/5] fix: remove needless clone in ws background task --- server/src/transport/ws.rs | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 68180ed307..2360ec9f46 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -60,7 +60,7 @@ pub(crate) struct Batch<'a, L: Logger> { #[derive(Debug, Clone)] pub(crate) struct CallData<'a, L: Logger> { pub(crate) conn_id: usize, - pub(crate) bounded_subscriptions: BoundedSubscriptions, + pub(crate) bounded_subscriptions: &'a BoundedSubscriptions, pub(crate) id_provider: &'a dyn IdProvider, pub(crate) methods: &'a Methods, pub(crate) max_response_body_size: u32, @@ -266,6 +266,7 @@ pub(crate) async fn background_task(sender: Sender, mut receiver: Rec sink: sink.clone(), id_provider, logger: logger.clone(), + bounded_subscriptions, }); tokio::pin!(stopped); @@ -319,11 +320,7 @@ pub(crate) async fn background_task(sender: Sender, mut receiver: Rec } }; - pending_calls.push(tokio::spawn(execute_unchecked_call( - params.clone(), - std::mem::take(&mut data), - bounded_subscriptions.clone(), - ))); + pending_calls.push(tokio::spawn(execute_unchecked_call(params.clone(), std::mem::take(&mut data)))); }; // Drive all running methods to completion. @@ -492,18 +489,15 @@ struct ExecuteCallParams { max_log_length: u32, sink: MethodSink, logger: L, + bounded_subscriptions: BoundedSubscriptions, } -async fn execute_unchecked_call( - params: Arc>, - data: Vec, - bounded_subscriptions: BoundedSubscriptions, -) { +async fn execute_unchecked_call(params: Arc>, data: Vec) { let request_start = params.logger.on_request(TransportProtocol::WebSocket); let first_non_whitespace = data.iter().enumerate().take(128).find(|(_, byte)| !byte.is_ascii_whitespace()); let call_data = CallData { - bounded_subscriptions, + bounded_subscriptions: ¶ms.bounded_subscriptions, conn_id: params.conn_id as usize, max_response_body_size: params.max_response_body_size, max_log_length: params.max_log_length, From d68fc124d59b22b0c08e0308212d6e97ed5f4b27 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 15 Sep 2023 17:10:27 +0200 Subject: [PATCH 2/5] fix(server): fix leak in FuturesUnordered The tokio::spawn handles were never removed from `FutursUnordered` which this commit fixes. Reduces the memory usage signficantly but still slightly worse than v0.16.x --- server/src/transport/ws.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 2360ec9f46..3c73689f7e 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -7,7 +7,7 @@ use crate::PingConfig; use futures_util::future::{self, Either, Fuse}; use futures_util::io::{BufReader, BufWriter}; -use futures_util::stream::{FuturesOrdered, FuturesUnordered}; +use futures_util::stream::FuturesOrdered; use futures_util::{Future, FutureExt, StreamExt}; use hyper::upgrade::Upgraded; use jsonrpsee_core::server::helpers::{ @@ -248,7 +248,7 @@ pub(crate) async fn background_task(sender: Sender, mut receiver: Rec let (conn_tx, conn_rx) = oneshot::channel(); let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); let bounded_subscriptions = BoundedSubscriptions::new(max_subscriptions_per_connection); - let pending_calls = FuturesUnordered::new(); + let (call_complete, pending_calls) = mpsc::channel::<()>(1); // Spawn another task that sends out the responses on the Websocket. let send_task_handle = tokio::spawn(send_task(rx, sender, ping_config.ping_interval(), conn_rx)); @@ -320,7 +320,7 @@ pub(crate) async fn background_task(sender: Sender, mut receiver: Rec } }; - pending_calls.push(tokio::spawn(execute_unchecked_call(params.clone(), std::mem::take(&mut data)))); + tokio::spawn(execute_unchecked_call(params.clone(), std::mem::take(&mut data), call_complete.clone())); }; // Drive all running methods to completion. @@ -492,7 +492,11 @@ struct ExecuteCallParams { bounded_subscriptions: BoundedSubscriptions, } -async fn execute_unchecked_call(params: Arc>, data: Vec) { +async fn execute_unchecked_call( + params: Arc>, + data: Vec, + drop_on_completion: mpsc::Sender<()>, +) { let request_start = params.logger.on_request(TransportProtocol::WebSocket); let first_non_whitespace = data.iter().enumerate().take(128).find(|(_, byte)| !byte.is_ascii_whitespace()); @@ -550,6 +554,8 @@ async fn execute_unchecked_call(params: Arc>, da _ = params.sink.send_error(Id::Null, ErrorCode::ParseError.into()).await; } }; + + drop(drop_on_completion); } #[derive(Debug, Copy, Clone)] @@ -561,14 +567,16 @@ pub(crate) enum Shutdown { /// Enforce a graceful shutdown. /// /// This will return once the connection has been terminated or all pending calls have been executed. -async fn graceful_shutdown( +async fn graceful_shutdown( result: Result, - pending_calls: FuturesUnordered, + pending_calls: mpsc::Receiver<()>, receiver: Receiver, data: Vec, mut conn_tx: oneshot::Sender<()>, send_task_handle: tokio::task::JoinHandle<()>, ) { + let pending_calls = ReceiverStream::new(pending_calls); + match result { Ok(Shutdown::ConnectionClosed) | Err(SokettoError::Closed) => (), Ok(Shutdown::Stopped) | Err(_) => { From 2df1f31cdda5357b3726b86ee107aac79147c461 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 15 Sep 2023 17:34:03 +0200 Subject: [PATCH 3/5] Update server/src/transport/ws.rs --- server/src/transport/ws.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 25f6cd760e..35e6ceeac0 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -560,6 +560,8 @@ async fn execute_unchecked_call( } }; + // NOTE: This channel is only used to indicate that a method call was completed + // thus the drop here tells the main task that method call was completed. drop(drop_on_completion); } From 87f2d6c99a53dd6841d99b319560a44d590f708a Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 15 Sep 2023 17:35:31 +0200 Subject: [PATCH 4/5] cargo fmt --- server/src/transport/ws.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 35e6ceeac0..956a222a15 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -560,8 +560,8 @@ async fn execute_unchecked_call( } }; - // NOTE: This channel is only used to indicate that a method call was completed - // thus the drop here tells the main task that method call was completed. + // NOTE: This channel is only used to indicate that a method call was completed + // thus the drop here tells the main task that method call was completed. drop(drop_on_completion); } From 89c3033526f49e225e6e0a335d549bbf3e35d186 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 15 Sep 2023 17:41:58 +0200 Subject: [PATCH 5/5] wording --- server/src/transport/ws.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 956a222a15..e0fe0029a5 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -250,8 +250,8 @@ pub(crate) async fn background_task(sender: Sender, mut receiver: Rec let bounded_subscriptions = BoundedSubscriptions::new(max_subscriptions_per_connection); // On each method call the `pending_calls` is cloned - // then all pending_calls are dropped then a graceful shutdown - // has occured. + // then when all pending_calls are dropped + // a graceful shutdown can has occur. let (pending_calls, pending_calls_completed) = mpsc::channel::<()>(1); // Spawn another task that sends out the responses on the Websocket.