From 0ea9561c7c28b217388b0f88ac30290834d7877f Mon Sep 17 00:00:00 2001 From: yukang Date: Thu, 27 Jun 2024 11:16:40 +0800 Subject: [PATCH] fix websocket subscription performance issue --- rpc/src/module/subscription.rs | 35 +++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/rpc/src/module/subscription.rs b/rpc/src/module/subscription.rs index 1c6e846dcef..21a4a4a08c2 100644 --- a/rpc/src/module/subscription.rs +++ b/rpc/src/module/subscription.rs @@ -1,6 +1,8 @@ use async_trait::async_trait; +use broadcast::error::RecvError; use ckb_async_runtime::Handle; use ckb_jsonrpc_types::Topic; +use ckb_logger::error; use ckb_notify::NotifyController; use ckb_stop_handler::new_tokio_exit_rx; use futures_util::{stream::BoxStream, Stream}; @@ -8,6 +10,8 @@ use jsonrpc_core::Result; use jsonrpc_utils::{pub_sub::PublishMsg, rpc}; use tokio::sync::broadcast; +const NOTIFY_CHANNEL_SIZE: usize = 128; + /// RPC Module Subscription that CKB node will push new messages to subscribers, support with WebSocket or TCP. /// /// RPC subscriptions require a full duplex connection. CKB offers such connections in the form of @@ -211,9 +215,19 @@ impl SubscriptionRpc for SubscriptionRpcImpl { }; let mut rx = tx.subscribe(); Ok(Box::pin(async_stream::stream! { - while let Ok(msg) = rx.recv().await { - yield msg; - } + loop { + match rx.recv().await { + Ok(msg) => { + yield msg; + } + Err(RecvError::Lagged(cnt)) => { + error!("subscription lagged error: {:?}", cnt); + } + Err(RecvError::Closed) => { + break; + } + } + } })) } } @@ -232,11 +246,11 @@ impl SubscriptionRpcImpl { let mut reject_transaction_receiver = handle .block_on(notify_controller.subscribe_reject_transaction(SUBSCRIBER_NAME.to_string())); - let (new_tip_header_sender, _) = broadcast::channel(10); - let (new_tip_block_sender, _) = broadcast::channel(10); - let (proposed_transaction_sender, _) = broadcast::channel(10); - let (new_transaction_sender, _) = broadcast::channel(10); - let (new_reject_transaction_sender, _) = broadcast::channel(10); + let (new_tip_header_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE); + let (new_tip_block_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE); + let (proposed_transaction_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE); + let (new_transaction_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE); + let (new_reject_transaction_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE); let stop_rx = new_tokio_exit_rx(); handle.spawn({ @@ -266,7 +280,10 @@ impl SubscriptionRpcImpl { _ = stop_rx.cancelled() => { break; }, - else => break, + else => { + error!("SubscriptionRpcImpl tokio::select! unexpected error"); + break; + } } } }