Skip to content

Commit

Permalink
fix websocket subscription performance issue
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Jun 27, 2024
1 parent 189e665 commit 0ea9561
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions rpc/src/module/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use async_trait::async_trait;
use broadcast::error::RecvError;
use ckb_async_runtime::Handle;
use ckb_jsonrpc_types::Topic;
use ckb_logger::error;
use ckb_notify::NotifyController;
use ckb_stop_handler::new_tokio_exit_rx;
use futures_util::{stream::BoxStream, Stream};
use jsonrpc_core::Result;
use jsonrpc_utils::{pub_sub::PublishMsg, rpc};
use tokio::sync::broadcast;

const NOTIFY_CHANNEL_SIZE: usize = 128;

/// RPC Module Subscription that CKB node will push new messages to subscribers, support with WebSocket or TCP.
///
/// RPC subscriptions require a full duplex connection. CKB offers such connections in the form of
Expand Down Expand Up @@ -211,9 +215,19 @@ impl SubscriptionRpc for SubscriptionRpcImpl {
};
let mut rx = tx.subscribe();
Ok(Box::pin(async_stream::stream! {
while let Ok(msg) = rx.recv().await {
yield msg;
}
loop {
match rx.recv().await {
Ok(msg) => {
yield msg;
}
Err(RecvError::Lagged(cnt)) => {
error!("subscription lagged error: {:?}", cnt);
}
Err(RecvError::Closed) => {
break;
}
}
}
}))
}
}
Expand All @@ -232,11 +246,11 @@ impl SubscriptionRpcImpl {
let mut reject_transaction_receiver = handle
.block_on(notify_controller.subscribe_reject_transaction(SUBSCRIBER_NAME.to_string()));

let (new_tip_header_sender, _) = broadcast::channel(10);
let (new_tip_block_sender, _) = broadcast::channel(10);
let (proposed_transaction_sender, _) = broadcast::channel(10);
let (new_transaction_sender, _) = broadcast::channel(10);
let (new_reject_transaction_sender, _) = broadcast::channel(10);
let (new_tip_header_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
let (new_tip_block_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
let (proposed_transaction_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
let (new_transaction_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);
let (new_reject_transaction_sender, _) = broadcast::channel(NOTIFY_CHANNEL_SIZE);

let stop_rx = new_tokio_exit_rx();
handle.spawn({
Expand Down Expand Up @@ -266,7 +280,10 @@ impl SubscriptionRpcImpl {
_ = stop_rx.cancelled() => {
break;
},
else => break,
else => {
error!("SubscriptionRpcImpl tokio::select! unexpected error");
break;
}
}
}
}
Expand Down

0 comments on commit 0ea9561

Please sign in to comment.