diff --git a/Cargo.lock b/Cargo.lock index 8412781c682e3..c4bd2532e8ede 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7948,6 +7948,7 @@ dependencies = [ "pin-project", "sc-network-common", "sc-peerset", + "sc-utils", "sp-consensus", "sp-runtime", "substrate-prometheus-endpoint", @@ -8284,6 +8285,7 @@ dependencies = [ "parking_lot 0.12.1", "pin-project", "rand 0.8.5", + "sc-utils", "serde", "serde_json", "thiserror", diff --git a/client/api/src/notifications.rs b/client/api/src/notifications.rs index 9fcc381f9697e..4dd23581d2622 100644 --- a/client/api/src/notifications.rs +++ b/client/api/src/notifications.rs @@ -144,7 +144,9 @@ impl StorageNotifications { filter_keys: Option<&[StorageKey]>, filter_child_keys: Option<&[(StorageKey, Option>)]>, ) -> StorageEventStream { - let receiver = self.0.subscribe(registry::SubscribeOp { filter_keys, filter_child_keys }); + let receiver = self + .0 + .subscribe(registry::SubscribeOp { filter_keys, filter_child_keys }, 100_000); StorageEventStream(receiver) } diff --git a/client/beefy/rpc/src/lib.rs b/client/beefy/rpc/src/lib.rs index 59a133b86214e..4fea98c6eb24e 100644 --- a/client/beefy/rpc/src/lib.rs +++ b/client/beefy/rpc/src/lib.rs @@ -120,7 +120,7 @@ where ) -> Result { let beefy_best_block = Arc::new(RwLock::new(None)); - let stream = best_block_stream.subscribe(); + let stream = best_block_stream.subscribe(100_000); let closure_clone = beefy_best_block.clone(); let future = stream.for_each(move |best_beefy| { let async_clone = closure_clone.clone(); @@ -141,7 +141,7 @@ where fn subscribe_justifications(&self, mut sink: SubscriptionSink) -> SubscriptionResult { let stream = self .finality_proof_stream - .subscribe() + .subscribe(100_000) .map(|vfp| notification::EncodedVersionedFinalityProof::new::(vfp)); let fut = async move { diff --git a/client/beefy/src/lib.rs b/client/beefy/src/lib.rs index 35f7cac55a964..5b6531822a0a1 100644 --- a/client/beefy/src/lib.rs +++ b/client/beefy/src/lib.rs @@ -265,7 +265,7 @@ where // Subscribe to finality notifications and justifications before waiting for runtime pallet and // reuse the streams, so we don't miss notifications while waiting for pallet to be available. let mut finality_notifications = client.finality_notification_stream().fuse(); - let block_import_justif = links.from_block_import_justif_stream.subscribe().fuse(); + let block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse(); // Wait for BEEFY pallet to be active before starting voter. let persisted_state = diff --git a/client/beefy/src/tests.rs b/client/beefy/src/tests.rs index ce6f6ae3c978a..66d3a210d7eba 100644 --- a/client/beefy/src/tests.rs +++ b/client/beefy/src/tests.rs @@ -430,8 +430,8 @@ pub(crate) fn get_beefy_streams( let beefy_rpc_links = net.peer(index).data.beefy_rpc_links.lock().clone().unwrap(); let BeefyRPCLinks { from_voter_justif_stream, from_voter_best_beefy_stream } = beefy_rpc_links; - best_block_streams.push(from_voter_best_beefy_stream.subscribe()); - versioned_finality_proof_streams.push(from_voter_justif_stream.subscribe()); + best_block_streams.push(from_voter_best_beefy_stream.subscribe(100_000)); + versioned_finality_proof_streams.push(from_voter_justif_stream.subscribe(100_000)); }); (best_block_streams, versioned_finality_proof_streams) } @@ -736,7 +736,7 @@ async fn beefy_importing_blocks() { let hashof1 = block.header.hash(); // Import without justifications. - let mut justif_recv = justif_stream.subscribe(); + let mut justif_recv = justif_stream.subscribe(100_000); assert_eq!( block_import .import_block(params(block.clone(), None), HashMap::new()) @@ -779,7 +779,7 @@ async fn beefy_importing_blocks() { let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap(); let block = builder.build().unwrap().block; let hashof2 = block.header.hash(); - let mut justif_recv = justif_stream.subscribe(); + let mut justif_recv = justif_stream.subscribe(100_000); assert_eq!( block_import.import_block(params(block, justif), HashMap::new()).await.unwrap(), ImportResult::Imported(ImportedAux { @@ -823,7 +823,7 @@ async fn beefy_importing_blocks() { let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap(); let block = builder.build().unwrap().block; let hashof3 = block.header.hash(); - let mut justif_recv = justif_stream.subscribe(); + let mut justif_recv = justif_stream.subscribe(100_000); assert_eq!( block_import.import_block(params(block, justif), HashMap::new()).await.unwrap(), ImportResult::Imported(ImportedAux { diff --git a/client/consensus/common/src/import_queue/basic_queue.rs b/client/consensus/common/src/import_queue/basic_queue.rs index b63bc192b2e77..a96b0a47e57c7 100644 --- a/client/consensus/common/src/import_queue/basic_queue.rs +++ b/client/consensus/common/src/import_queue/basic_queue.rs @@ -69,7 +69,7 @@ impl BasicQueue { spawner: &impl sp_core::traits::SpawnEssentialNamed, prometheus_registry: Option<&Registry>, ) -> Self { - let (result_sender, result_port) = buffered_link::buffered_link(); + let (result_sender, result_port) = buffered_link::buffered_link(100_000); let metrics = prometheus_registry.and_then(|r| { Metrics::register(r) @@ -276,10 +276,10 @@ impl BlockImportWorker { use worker_messages::*; let (justification_sender, mut justification_port) = - tracing_unbounded("mpsc_import_queue_worker_justification"); + tracing_unbounded("mpsc_import_queue_worker_justification", 100_000); let (block_import_sender, block_import_port) = - tracing_unbounded("mpsc_import_queue_worker_blocks"); + tracing_unbounded("mpsc_import_queue_worker_blocks", 100_000); let mut worker = BlockImportWorker { result_sender, justification_import, metrics }; @@ -595,7 +595,7 @@ mod tests { #[test] fn prioritizes_finality_work_over_block_import() { - let (result_sender, mut result_port) = buffered_link::buffered_link(); + let (result_sender, mut result_port) = buffered_link::buffered_link(100_000); let (worker, mut finality_sender, mut block_import_sender) = BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None); diff --git a/client/consensus/common/src/import_queue/buffered_link.rs b/client/consensus/common/src/import_queue/buffered_link.rs index e6d3b212fdbac..71adcf2dc2ea9 100644 --- a/client/consensus/common/src/import_queue/buffered_link.rs +++ b/client/consensus/common/src/import_queue/buffered_link.rs @@ -28,7 +28,7 @@ //! # use sp_test_primitives::Block; //! # struct DummyLink; impl Link for DummyLink {} //! # let mut my_link = DummyLink; -//! let (mut tx, mut rx) = buffered_link::(); +//! let (mut tx, mut rx) = buffered_link::(100_000); //! tx.blocks_processed(0, 0, vec![]); //! //! // Calls `my_link.blocks_processed(0, 0, vec![])` when polled. @@ -51,9 +51,11 @@ use super::BlockImportResult; /// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and /// can be used to buffer commands, and the receiver can be used to poll said commands and transfer -/// them to another link. -pub fn buffered_link() -> (BufferedLinkSender, BufferedLinkReceiver) { - let (tx, rx) = tracing_unbounded("mpsc_buffered_link"); +/// them to another link. `queue_size_warning` sets the warning threshold of the channel queue size. +pub fn buffered_link( + queue_size_warning: i64, +) -> (BufferedLinkSender, BufferedLinkReceiver) { + let (tx, rx) = tracing_unbounded("mpsc_buffered_link", queue_size_warning); let tx = BufferedLinkSender { tx }; let rx = BufferedLinkReceiver { rx: rx.fuse() }; (tx, rx) @@ -175,7 +177,7 @@ mod tests { #[test] fn is_closed() { - let (tx, rx) = super::buffered_link::(); + let (tx, rx) = super::buffered_link::(1); assert!(!tx.is_closed()); drop(rx); assert!(tx.is_closed()); diff --git a/client/finality-grandpa/rpc/src/lib.rs b/client/finality-grandpa/rpc/src/lib.rs index dfdad666ba8f3..70ff7ed176869 100644 --- a/client/finality-grandpa/rpc/src/lib.rs +++ b/client/finality-grandpa/rpc/src/lib.rs @@ -104,7 +104,7 @@ where } fn subscribe_justifications(&self, mut sink: SubscriptionSink) -> SubscriptionResult { - let stream = self.justification_stream.subscribe().map( + let stream = self.justification_stream.subscribe(100_000).map( |x: sc_finality_grandpa::GrandpaJustification| { JustificationNotification::from(x) }, diff --git a/client/finality-grandpa/src/communication/gossip.rs b/client/finality-grandpa/src/communication/gossip.rs index 408cbda745e56..cbcafc727d436 100644 --- a/client/finality-grandpa/src/communication/gossip.rs +++ b/client/finality-grandpa/src/communication/gossip.rs @@ -1364,7 +1364,7 @@ impl GossipValidator { None => None, }; - let (tx, rx) = tracing_unbounded("mpsc_grandpa_gossip_validator"); + let (tx, rx) = tracing_unbounded("mpsc_grandpa_gossip_validator", 100_000); let val = GossipValidator { inner: parking_lot::RwLock::new(Inner::new(config)), set_state, diff --git a/client/finality-grandpa/src/communication/periodic.rs b/client/finality-grandpa/src/communication/periodic.rs index 7e50abb96e441..c00fed1296512 100644 --- a/client/finality-grandpa/src/communication/periodic.rs +++ b/client/finality-grandpa/src/communication/periodic.rs @@ -70,6 +70,7 @@ impl NeighborPacketWorker { pub(super) fn new(rebroadcast_period: Duration) -> (Self, NeighborPacketSender) { let (tx, rx) = tracing_unbounded::<(Vec, NeighborPacket>)>( "mpsc_grandpa_neighbor_packet_worker", + 100_000, ); let delay = Delay::new(rebroadcast_period); diff --git a/client/finality-grandpa/src/communication/tests.rs b/client/finality-grandpa/src/communication/tests.rs index eab7bb2df50cf..839b2d52be651 100644 --- a/client/finality-grandpa/src/communication/tests.rs +++ b/client/finality-grandpa/src/communication/tests.rs @@ -135,7 +135,7 @@ impl NetworkEventStream for TestNetwork { &self, _name: &'static str, ) -> Pin + Send>> { - let (tx, rx) = tracing_unbounded("test"); + let (tx, rx) = tracing_unbounded("test", 100_000); let _ = self.sender.unbounded_send(Event::EventStream(tx)); Box::pin(rx) } @@ -253,7 +253,7 @@ fn voter_set_state() -> SharedVoterSetState { // needs to run in a tokio runtime. pub(crate) fn make_test_network() -> (impl Future, TestNetwork) { - let (tx, rx) = tracing_unbounded("test"); + let (tx, rx) = tracing_unbounded("test", 100_000); let net = TestNetwork { sender: tx }; #[derive(Clone)] diff --git a/client/finality-grandpa/src/lib.rs b/client/finality-grandpa/src/lib.rs index efc46d8f93a6d..1597e60bd6061 100644 --- a/client/finality-grandpa/src/lib.rs +++ b/client/finality-grandpa/src/lib.rs @@ -566,7 +566,8 @@ where } })?; - let (voter_commands_tx, voter_commands_rx) = tracing_unbounded("mpsc_grandpa_voter_command"); + let (voter_commands_tx, voter_commands_rx) = + tracing_unbounded("mpsc_grandpa_voter_command", 100_000); let (justification_sender, justification_stream) = GrandpaJustificationStream::channel(); diff --git a/client/finality-grandpa/src/observer.rs b/client/finality-grandpa/src/observer.rs index 1efb71e5903ec..96101a8eda0ab 100644 --- a/client/finality-grandpa/src/observer.rs +++ b/client/finality-grandpa/src/observer.rs @@ -437,7 +437,7 @@ mod tests { aux_schema::load_persistent(&*backend, client.info().genesis_hash, 0, || Ok(voters)) .unwrap(); - let (_tx, voter_command_rx) = tracing_unbounded(""); + let (_tx, voter_command_rx) = tracing_unbounded("test_mpsc_voter_command", 100_000); let observer = ObserverWork::new( client, diff --git a/client/finality-grandpa/src/until_imported.rs b/client/finality-grandpa/src/until_imported.rs index 95b658e92298a..776411f8fb493 100644 --- a/client/finality-grandpa/src/until_imported.rs +++ b/client/finality-grandpa/src/until_imported.rs @@ -579,7 +579,7 @@ mod tests { impl TestChainState { fn new() -> (Self, ImportNotifications) { - let (tx, rx) = tracing_unbounded("test"); + let (tx, rx) = tracing_unbounded("test", 100_000); let state = TestChainState { sender: tx, known_blocks: Arc::new(Mutex::new(HashMap::new())) }; @@ -680,7 +680,7 @@ mod tests { // enact all dependencies before importing the message enact_dependencies(&chain_state); - let (global_tx, global_rx) = tracing_unbounded("test"); + let (global_tx, global_rx) = tracing_unbounded("test", 100_000); let until_imported = UntilGlobalMessageBlocksImported::new( import_notifications, @@ -708,7 +708,7 @@ mod tests { let (chain_state, import_notifications) = TestChainState::new(); let block_status = chain_state.block_status(); - let (global_tx, global_rx) = tracing_unbounded("test"); + let (global_tx, global_rx) = tracing_unbounded("test", 100_000); let until_imported = UntilGlobalMessageBlocksImported::new( import_notifications, @@ -896,7 +896,7 @@ mod tests { let (chain_state, import_notifications) = TestChainState::new(); let block_status = chain_state.block_status(); - let (global_tx, global_rx) = tracing_unbounded("test"); + let (global_tx, global_rx) = tracing_unbounded("test", 100_000); let block_sync_requester = TestBlockSyncRequester::default(); diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 08e498299a1d3..66de263a19a3e 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -208,7 +208,7 @@ where ¶ms.network_config.transport, )?; - let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker"); + let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000); if let Some(path) = ¶ms.network_config.net_config_path { fs::create_dir_all(path)?; @@ -1003,7 +1003,7 @@ where H: ExHashT, { fn event_stream(&self, name: &'static str) -> Pin + Send>> { - let (tx, rx) = out_events::channel(name); + let (tx, rx) = out_events::channel(name, 100_000); let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx)); Box::pin(rx) } diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index 4144d7f19551e..8febdd4726b37 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -32,25 +32,40 @@ //! collection. use futures::{channel::mpsc, prelude::*, ready, stream::FusedStream}; +use log::error; use parking_lot::Mutex; use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64}; use sc_network_common::protocol::event::Event; use std::{ + backtrace::{Backtrace, BacktraceStatus}, cell::RefCell, fmt, pin::Pin, - sync::Arc, + sync::{ + atomic::{AtomicI64, Ordering}, + Arc, + }, task::{Context, Poll}, }; /// Creates a new channel that can be associated to a [`OutChannels`]. /// -/// The name is used in Prometheus reports. -pub fn channel(name: &'static str) -> (Sender, Receiver) { +/// The name is used in Prometheus reports, the queue size threshold is used +/// to warn if there are too many unprocessed events in the channel. +pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver) { let (tx, rx) = mpsc::unbounded(); let metrics = Arc::new(Mutex::new(None)); - let tx = Sender { inner: tx, name, metrics: metrics.clone() }; - let rx = Receiver { inner: rx, name, metrics }; + let queue_size = Arc::new(AtomicI64::new(0)); + let tx = Sender { + inner: tx, + name, + queue_size: queue_size.clone(), + queue_size_warning, + warning_fired: false, + creation_backtrace: Backtrace::capture(), + metrics: metrics.clone(), + }; + let rx = Receiver { inner: rx, name, queue_size, metrics }; (tx, rx) } @@ -63,7 +78,19 @@ pub fn channel(name: &'static str) -> (Sender, Receiver) { /// sync on drop. If someone adds a `#[derive(Clone)]` below, it is **wrong**. pub struct Sender { inner: mpsc::UnboundedSender, + /// Name to identify the channel (e.g., in Prometheus and logs). name: &'static str, + /// Number of events in the queue. Clone of [`Receiver::in_transit`]. + // To not bother with ordering and possible underflow errors of the unsigned counter + // we just use `i64` and `Ordering::Relaxed`, and perceive `queue_size` as approximate. + // It can turn < 0 though. + queue_size: Arc, + /// Threshold queue size to generate an error message in the logs. + queue_size_warning: i64, + /// We generate the error message only once to not spam the logs. + warning_fired: bool, + /// Backtrace of a place where the channel was created. + creation_backtrace: Backtrace, /// Clone of [`Receiver::metrics`]. metrics: Arc>>>>, } @@ -87,6 +114,7 @@ impl Drop for Sender { pub struct Receiver { inner: mpsc::UnboundedReceiver, name: &'static str, + queue_size: Arc, /// Initially contains `None`, and will be set to a value once the corresponding [`Sender`] /// is assigned to an instance of [`OutChannels`]. metrics: Arc>>>>, @@ -97,6 +125,7 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) { + let _ = self.queue_size.fetch_sub(1, Ordering::Relaxed); let metrics = self.metrics.lock().clone(); match metrics.as_ref().map(|m| m.as_ref()) { Some(Some(metrics)) => metrics.event_out(&ev, self.name), @@ -160,12 +189,28 @@ impl OutChannels { /// Sends an event. pub fn send(&mut self, event: Event) { - self.event_streams - .retain(|sender| sender.inner.unbounded_send(event.clone()).is_ok()); + self.event_streams.retain_mut(|sender| { + let queue_size = sender.queue_size.fetch_add(1, Ordering::Relaxed); + if queue_size == sender.queue_size_warning && !sender.warning_fired { + sender.warning_fired = true; + match sender.creation_backtrace.status() { + BacktraceStatus::Captured => error!( + "The number of unprocessed events in channel `{}` reached {}.\n\ + The channel was created at:\n{}", + sender.name, sender.queue_size_warning, sender.creation_backtrace, + ), + _ => error!( + "The number of unprocessed events in channel `{}` reached {}.", + sender.name, sender.queue_size_warning, + ), + } + } + sender.inner.unbounded_send(event.clone()).is_ok() + }); if let Some(metrics) = &*self.metrics { for ev in &self.event_streams { - metrics.event_in(&event, 1, ev.name); + metrics.event_in(&event, ev.name); } } } @@ -232,45 +277,35 @@ impl Metrics { }) } - fn event_in(&self, event: &Event, num: u64, name: &str) { + fn event_in(&self, event: &Event, name: &str) { match event { Event::Dht(_) => { - self.events_total.with_label_values(&["dht", "sent", name]).inc_by(num); + self.events_total.with_label_values(&["dht", "sent", name]).inc(); }, Event::SyncConnected { .. } => { - self.events_total - .with_label_values(&["sync-connected", "sent", name]) - .inc_by(num); + self.events_total.with_label_values(&["sync-connected", "sent", name]).inc(); }, Event::SyncDisconnected { .. } => { - self.events_total - .with_label_values(&["sync-disconnected", "sent", name]) - .inc_by(num); + self.events_total.with_label_values(&["sync-disconnected", "sent", name]).inc(); }, Event::NotificationStreamOpened { protocol, .. } => { format_label("notif-open-", protocol, |protocol_label| { - self.events_total - .with_label_values(&[protocol_label, "sent", name]) - .inc_by(num); + self.events_total.with_label_values(&[protocol_label, "sent", name]).inc(); }); }, Event::NotificationStreamClosed { protocol, .. } => { format_label("notif-closed-", protocol, |protocol_label| { - self.events_total - .with_label_values(&[protocol_label, "sent", name]) - .inc_by(num); + self.events_total.with_label_values(&[protocol_label, "sent", name]).inc(); }); }, Event::NotificationsReceived { messages, .. } => for (protocol, message) in messages { format_label("notif-", protocol, |protocol_label| { - self.events_total - .with_label_values(&[protocol_label, "sent", name]) - .inc_by(num); + self.events_total.with_label_values(&[protocol_label, "sent", name]).inc(); }); - self.notifications_sizes.with_label_values(&[protocol, "sent", name]).inc_by( - num.saturating_mul(u64::try_from(message.len()).unwrap_or(u64::MAX)), - ); + self.notifications_sizes + .with_label_values(&[protocol, "sent", name]) + .inc_by(u64::try_from(message.len()).unwrap_or(u64::MAX)); }, } } diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 7dd818d4c12cb..76f14a721a439 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -1436,7 +1436,7 @@ where state_request_protocol_name: ProtocolName, warp_sync_protocol_name: Option, ) -> Result<(Self, ChainSyncInterfaceHandle, NonDefaultSetConfig), ClientError> { - let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync"); + let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000); let block_announce_config = Self::get_block_announce_proto_config( protocol_id, fork_id, diff --git a/client/network/sync/src/service/network.rs b/client/network/sync/src/service/network.rs index c44398b0f1a9e..b81a65ae731cf 100644 --- a/client/network/sync/src/service/network.rs +++ b/client/network/sync/src/service/network.rs @@ -99,7 +99,7 @@ impl NetworkServiceHandle { impl NetworkServiceProvider { /// Create new `NetworkServiceProvider` pub fn new() -> (Self, NetworkServiceHandle) { - let (tx, rx) = tracing_unbounded("mpsc_network_service_provider"); + let (tx, rx) = tracing_unbounded("mpsc_network_service_provider", 100_000); (Self { rx }, NetworkServiceHandle::new(tx)) } diff --git a/client/network/transactions/Cargo.toml b/client/network/transactions/Cargo.toml index f91651d1508b5..1255dededc3eb 100644 --- a/client/network/transactions/Cargo.toml +++ b/client/network/transactions/Cargo.toml @@ -22,5 +22,6 @@ pin-project = "1.0.12" prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" } sc-network-common = { version = "0.10.0-dev", path = "../common" } sc-peerset = { version = "4.0.0-dev", path = "../../peerset" } +sc-utils = { version = "4.0.0-dev", path = "../../utils" } sp-runtime = { version = "7.0.0", path = "../../../primitives/runtime" } sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" } diff --git a/client/network/transactions/src/lib.rs b/client/network/transactions/src/lib.rs index 4cc76507c6f16..a5adb274d29de 100644 --- a/client/network/transactions/src/lib.rs +++ b/client/network/transactions/src/lib.rs @@ -28,7 +28,7 @@ use crate::config::*; use codec::{Decode, Encode}; -use futures::{channel::mpsc, prelude::*, stream::FuturesUnordered}; +use futures::{prelude::*, stream::FuturesUnordered}; use libp2p::{multiaddr, PeerId}; use log::{debug, trace, warn}; use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; @@ -40,6 +40,7 @@ use sc_network_common::{ utils::{interval, LruHashSet}, ExHashT, }; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_runtime::traits::Block as BlockT; use std::{ collections::{hash_map::Entry, HashMap}, @@ -168,7 +169,7 @@ impl TransactionsHandlerPrototype { metrics_registry: Option<&Registry>, ) -> error::Result<(TransactionsHandler, TransactionsHandlerController)> { let event_stream = service.event_stream("transactions-handler"); - let (to_handler, from_controller) = mpsc::unbounded(); + let (to_handler, from_controller) = tracing_unbounded("mpsc_transactions_handler", 100_000); let handler = TransactionsHandler { protocol_name: self.protocol_name, @@ -197,7 +198,7 @@ impl TransactionsHandlerPrototype { /// Controls the behaviour of a [`TransactionsHandler`] it is connected to. pub struct TransactionsHandlerController { - to_handler: mpsc::UnboundedSender>, + to_handler: TracingUnboundedSender>, } impl TransactionsHandlerController { @@ -246,7 +247,7 @@ pub struct TransactionsHandler< // All connected peers peers: HashMap>, transaction_pool: Arc>, - from_controller: mpsc::UnboundedReceiver>, + from_controller: TracingUnboundedReceiver>, /// Prometheus metrics. metrics: Option, } diff --git a/client/offchain/src/api/http.rs b/client/offchain/src/api/http.rs index 4c97e5a47058d..a47adb3e8026e 100644 --- a/client/offchain/src/api/http.rs +++ b/client/offchain/src/api/http.rs @@ -66,8 +66,8 @@ impl SharedClient { /// Creates a pair of [`HttpApi`] and [`HttpWorker`]. pub fn http(shared_client: SharedClient) -> (HttpApi, HttpWorker) { - let (to_worker, from_api) = tracing_unbounded("mpsc_ocw_to_worker"); - let (to_api, from_worker) = tracing_unbounded("mpsc_ocw_to_api"); + let (to_worker, from_api) = tracing_unbounded("mpsc_ocw_to_worker", 100_000); + let (to_api, from_worker) = tracing_unbounded("mpsc_ocw_to_api", 100_000); let api = HttpApi { to_worker, diff --git a/client/peerset/src/lib.rs b/client/peerset/src/lib.rs index ec09835c4898e..9b1dc6a2d0276 100644 --- a/client/peerset/src/lib.rs +++ b/client/peerset/src/lib.rs @@ -275,7 +275,7 @@ pub struct Peerset { impl Peerset { /// Builds a new peerset from the given configuration. pub fn from_config(config: PeersetConfig) -> (Self, PeersetHandle) { - let (tx, rx) = tracing_unbounded("mpsc_peerset_messages"); + let (tx, rx) = tracing_unbounded("mpsc_peerset_messages", 10_000); let handle = PeersetHandle { tx: tx.clone() }; diff --git a/client/rpc/src/system/tests.rs b/client/rpc/src/system/tests.rs index 00ab9c46861e2..4da49cdd1a0c5 100644 --- a/client/rpc/src/system/tests.rs +++ b/client/rpc/src/system/tests.rs @@ -52,7 +52,7 @@ impl Default for Status { fn api>>(sync: T) -> RpcModule> { let status = sync.into().unwrap_or_default(); let should_have_peers = !status.is_dev; - let (tx, rx) = tracing_unbounded("rpc_system_tests"); + let (tx, rx) = tracing_unbounded("rpc_system_tests", 10_000); thread::spawn(move || { futures::executor::block_on(rx.for_each(move |request| { match request { diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 0a26c00485e2f..1f94f96fae89e 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -962,7 +962,7 @@ where ); spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(chain_sync_service))); - let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc"); + let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000); let future = build_network_future( config.role.clone(), diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 493fd320b7b23..78f2174b89eea 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -1913,13 +1913,13 @@ where { /// Get block import event stream. fn import_notification_stream(&self) -> ImportNotifications { - let (sink, stream) = tracing_unbounded("mpsc_import_notification_stream"); + let (sink, stream) = tracing_unbounded("mpsc_import_notification_stream", 100_000); self.import_notification_sinks.lock().push(sink); stream } fn finality_notification_stream(&self) -> FinalityNotifications { - let (sink, stream) = tracing_unbounded("mpsc_finality_notification_stream"); + let (sink, stream) = tracing_unbounded("mpsc_finality_notification_stream", 100_000); self.finality_notification_sinks.lock().push(sink); stream } diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index 49189dc21ce8d..23265f9672555 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -310,7 +310,8 @@ impl TaskManager { let (signal, on_exit) = exit_future::signal(); // A side-channel for essential tasks to communicate shutdown. - let (essential_failed_tx, essential_failed_rx) = tracing_unbounded("mpsc_essential_tasks"); + let (essential_failed_tx, essential_failed_rx) = + tracing_unbounded("mpsc_essential_tasks", 100); let metrics = prometheus_registry.map(Metrics::register).transpose()?; diff --git a/client/telemetry/Cargo.toml b/client/telemetry/Cargo.toml index 1fc89980e2de8..6ef69d1d97afd 100644 --- a/client/telemetry/Cargo.toml +++ b/client/telemetry/Cargo.toml @@ -20,6 +20,7 @@ libp2p = { version = "0.49.0", default-features = false, features = ["dns", "tcp log = "0.4.17" parking_lot = "0.12.1" pin-project = "1.0.12" +sc-utils = { version = "4.0.0-dev", path = "../utils" } rand = "0.8.5" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.85" diff --git a/client/telemetry/src/lib.rs b/client/telemetry/src/lib.rs index 503a326f76c2b..aa6b841b79164 100644 --- a/client/telemetry/src/lib.rs +++ b/client/telemetry/src/lib.rs @@ -40,6 +40,7 @@ use futures::{channel::mpsc, prelude::*}; use libp2p::Multiaddr; use log::{error, warn}; use parking_lot::Mutex; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use serde::Serialize; use std::{ collections::{ @@ -147,8 +148,8 @@ pub struct SysInfo { pub struct TelemetryWorker { message_receiver: mpsc::Receiver, message_sender: mpsc::Sender, - register_receiver: mpsc::UnboundedReceiver, - register_sender: mpsc::UnboundedSender, + register_receiver: TracingUnboundedReceiver, + register_sender: TracingUnboundedSender, id_counter: Arc, } @@ -163,7 +164,8 @@ impl TelemetryWorker { // error as early as possible. let _transport = initialize_transport()?; let (message_sender, message_receiver) = mpsc::channel(buffer_size); - let (register_sender, register_receiver) = mpsc::unbounded(); + let (register_sender, register_receiver) = + tracing_unbounded("mpsc_telemetry_register", 10_000); Ok(Self { message_receiver, @@ -360,7 +362,7 @@ impl TelemetryWorker { #[derive(Debug, Clone)] pub struct TelemetryWorkerHandle { message_sender: mpsc::Sender, - register_sender: mpsc::UnboundedSender, + register_sender: TracingUnboundedSender, id_counter: Arc, } @@ -386,7 +388,7 @@ impl TelemetryWorkerHandle { #[derive(Debug)] pub struct Telemetry { message_sender: mpsc::Sender, - register_sender: mpsc::UnboundedSender, + register_sender: TracingUnboundedSender, id: Id, connection_notifier: TelemetryConnectionNotifier, endpoints: Option, @@ -460,7 +462,7 @@ impl TelemetryHandle { /// (re-)establishes. #[derive(Clone, Debug)] pub struct TelemetryConnectionNotifier { - register_sender: mpsc::UnboundedSender, + register_sender: TracingUnboundedSender, addresses: Vec, } diff --git a/client/transaction-pool/src/graph/watcher.rs b/client/transaction-pool/src/graph/watcher.rs index 0613300c8684b..df5bb94edfe6d 100644 --- a/client/transaction-pool/src/graph/watcher.rs +++ b/client/transaction-pool/src/graph/watcher.rs @@ -62,7 +62,7 @@ impl Default for Sender { impl Sender { /// Add a new watcher to this sender object. pub fn new_watcher(&mut self, hash: H) -> Watcher { - let (tx, receiver) = tracing_unbounded("mpsc_txpool_watcher"); + let (tx, receiver) = tracing_unbounded("mpsc_txpool_watcher", 100_000); self.receivers.push(tx); Watcher { receiver, hash } } diff --git a/client/transaction-pool/src/revalidation.rs b/client/transaction-pool/src/revalidation.rs index b4b4299240a32..d8c8bea625fb3 100644 --- a/client/transaction-pool/src/revalidation.rs +++ b/client/transaction-pool/src/revalidation.rs @@ -291,7 +291,7 @@ where pool: Arc>, interval: Duration, ) -> (Self, Pin + Send>>) { - let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue"); + let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue", 100_000); let worker = RevalidationWorker::new(api.clone(), pool.clone()); diff --git a/client/utils/src/mpsc.rs b/client/utils/src/mpsc.rs index ee3fba4a5ee67..7db5e49f5bca9 100644 --- a/client/utils/src/mpsc.rs +++ b/client/utils/src/mpsc.rs @@ -45,71 +45,136 @@ mod inner { stream::{FusedStream, Stream}, task::{Context, Poll}, }; - use std::pin::Pin; + use log::error; + use std::{ + backtrace::{Backtrace, BacktraceStatus}, + pin::Pin, + sync::{ + atomic::{AtomicBool, AtomicI64, Ordering}, + Arc, + }, + }; /// Wrapper Type around `UnboundedSender` that increases the global /// measure when a message is added #[derive(Debug)] - pub struct TracingUnboundedSender(&'static str, UnboundedSender); + pub struct TracingUnboundedSender { + inner: UnboundedSender, + name: &'static str, + // To not bother with ordering and possible underflow errors of the unsigned counter + // we just use `i64` and `Ordering::Relaxed`, and perceive `queue_size` as approximate. + // It can turn < 0 though. + queue_size: Arc, + queue_size_warning: i64, + warning_fired: Arc, + creation_backtrace: Arc, + } // Strangely, deriving `Clone` requires that `T` is also `Clone`. impl Clone for TracingUnboundedSender { fn clone(&self) -> Self { - Self(self.0, self.1.clone()) + Self { + inner: self.inner.clone(), + name: self.name, + queue_size: self.queue_size.clone(), + queue_size_warning: self.queue_size_warning, + warning_fired: self.warning_fired.clone(), + creation_backtrace: self.creation_backtrace.clone(), + } } } /// Wrapper Type around `UnboundedReceiver` that decreases the global /// measure when a message is polled #[derive(Debug)] - pub struct TracingUnboundedReceiver(&'static str, UnboundedReceiver); + pub struct TracingUnboundedReceiver { + inner: UnboundedReceiver, + name: &'static str, + queue_size: Arc, + } /// Wrapper around `mpsc::unbounded` that tracks the in- and outflow via - /// `UNBOUNDED_CHANNELS_COUNTER` + /// `UNBOUNDED_CHANNELS_COUNTER` and warns if the message queue grows + /// above the warning threshold. pub fn tracing_unbounded( - key: &'static str, + name: &'static str, + queue_size_warning: i64, ) -> (TracingUnboundedSender, TracingUnboundedReceiver) { let (s, r) = mpsc::unbounded(); - (TracingUnboundedSender(key, s), TracingUnboundedReceiver(key, r)) + let queue_size = Arc::new(AtomicI64::new(0)); + let sender = TracingUnboundedSender { + inner: s, + name, + queue_size: queue_size.clone(), + queue_size_warning, + warning_fired: Arc::new(AtomicBool::new(false)), + creation_backtrace: Arc::new(Backtrace::capture()), + }; + let receiver = TracingUnboundedReceiver { inner: r, name, queue_size }; + (sender, receiver) } impl TracingUnboundedSender { /// Proxy function to mpsc::UnboundedSender pub fn poll_ready(&self, ctx: &mut Context) -> Poll> { - self.1.poll_ready(ctx) + self.inner.poll_ready(ctx) } /// Proxy function to mpsc::UnboundedSender pub fn is_closed(&self) -> bool { - self.1.is_closed() + self.inner.is_closed() } /// Proxy function to mpsc::UnboundedSender pub fn close_channel(&self) { - self.1.close_channel() + self.inner.close_channel() } /// Proxy function to mpsc::UnboundedSender pub fn disconnect(&mut self) { - self.1.disconnect() + self.inner.disconnect() } - /// Proxy function to mpsc::UnboundedSender pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.1.start_send(msg) + // The underlying implementation of [`UnboundedSender::start_send`] is the same as + // [`UnboundedSender::unbounded_send`], so we just reuse the message counting and + // error reporting code from `unbounded_send`. + self.unbounded_send(msg).map_err(TrySendError::into_send_error) } /// Proxy function to mpsc::UnboundedSender pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { - self.1.unbounded_send(msg).map(|s| { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, "send"]).inc(); + self.inner.unbounded_send(msg).map(|s| { + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "send"]).inc(); + + let queue_size = self.queue_size.fetch_add(1, Ordering::Relaxed); + if queue_size == self.queue_size_warning && + !self.warning_fired.load(Ordering::Relaxed) + { + // `warning_fired` and `queue_size` are not synchronized, so it's possible + // that the warning is fired few times before the `warning_fired` is seen + // by all threads. This seems better than introducing a mutex guarding them. + self.warning_fired.store(true, Ordering::Relaxed); + match self.creation_backtrace.status() { + BacktraceStatus::Captured => error!( + "The number of unprocessed messages in channel `{}` reached {}.\n\ + The channel was created at:\n{}", + self.name, self.queue_size_warning, self.creation_backtrace, + ), + _ => error!( + "The number of unprocessed messages in channel `{}` reached {}.", + self.name, self.queue_size_warning, + ), + } + } + s }) } /// Proxy function to mpsc::UnboundedSender pub fn same_receiver(&self, other: &UnboundedSender) -> bool { - self.1.same_receiver(other) + self.inner.same_receiver(other) } } @@ -118,7 +183,7 @@ mod inner { // consume all items, make sure to reflect the updated count let mut count = 0; loop { - if self.1.is_terminated() { + if self.inner.is_terminated() { break } @@ -129,7 +194,9 @@ mod inner { } // and discount the messages if count > 0 { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, "dropped"]).inc_by(count); + UNBOUNDED_CHANNELS_COUNTER + .with_label_values(&[self.name, "dropped"]) + .inc_by(count); } } @@ -137,15 +204,16 @@ mod inner { /// that consumes all messages first and updates the counter pub fn close(&mut self) { self.consume(); - self.1.close() + self.inner.close() } /// Proxy function to mpsc::UnboundedReceiver /// that discounts the messages taken out pub fn try_next(&mut self) -> Result, TryRecvError> { - self.1.try_next().map(|s| { + self.inner.try_next().map(|s| { if s.is_some() { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, "received"]).inc(); + let _ = self.queue_size.fetch_sub(1, Ordering::Relaxed); + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "received"]).inc(); } s }) @@ -165,10 +233,11 @@ mod inner { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let s = self.get_mut(); - match Pin::new(&mut s.1).poll_next(cx) { + match Pin::new(&mut s.inner).poll_next(cx) { Poll::Ready(msg) => { if msg.is_some() { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.0, "received"]).inc(); + let _ = s.queue_size.fetch_sub(1, Ordering::Relaxed); + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, "received"]).inc(); } Poll::Ready(msg) }, @@ -179,7 +248,7 @@ mod inner { impl FusedStream for TracingUnboundedReceiver { fn is_terminated(&self) -> bool { - self.1.is_terminated() + self.inner.is_terminated() } } @@ -223,6 +292,10 @@ mod inner { } fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + // The difference with `TracingUnboundedSender` is intentional. The underlying + // implementation differs for `UnboundedSender` and `&UnboundedSender`: + // the latter closes the channel completely with `close_channel()`, while the former + // only closes this specific sender with `disconnect()`. self.close_channel(); Poll::Ready(Ok(())) } diff --git a/client/utils/src/notification.rs b/client/utils/src/notification.rs index ff527c343f9f2..4917a43265df4 100644 --- a/client/utils/src/notification.rs +++ b/client/utils/src/notification.rs @@ -79,8 +79,8 @@ impl NotificationStream { } /// Subscribe to a channel through which the generic payload can be received. - pub fn subscribe(&self) -> NotificationReceiver { - let receiver = self.hub.subscribe(()); + pub fn subscribe(&self, queue_size_warning: i64) -> NotificationReceiver { + let receiver = self.hub.subscribe((), queue_size_warning); NotificationReceiver { receiver } } } diff --git a/client/utils/src/notification/tests.rs b/client/utils/src/notification/tests.rs index a001fa7e89e95..f813f37d29ddb 100644 --- a/client/utils/src/notification/tests.rs +++ b/client/utils/src/notification/tests.rs @@ -36,7 +36,7 @@ fn notification_channel_simple() { // Create a future to receive a single notification // from the stream and verify its payload. - let future = stream.subscribe().take(1).for_each(move |payload| { + let future = stream.subscribe(100_000).take(1).for_each(move |payload| { let test_payload = closure_payload.clone(); async move { assert_eq!(payload, test_payload); diff --git a/client/utils/src/pubsub.rs b/client/utils/src/pubsub.rs index ba6e9ddc6ca2a..f85f44b498841 100644 --- a/client/utils/src/pubsub.rs +++ b/client/utils/src/pubsub.rs @@ -164,7 +164,7 @@ impl Hub { /// Subscribe to this Hub using the `subs_key: K`. /// /// A subscription with a key `K` is possible if the Registry implements `Subscribe`. - pub fn subscribe(&self, subs_key: K) -> Receiver + pub fn subscribe(&self, subs_key: K, queue_size_warning: i64) -> Receiver where R: Subscribe + Unsubscribe, { @@ -178,7 +178,7 @@ impl Hub { // have the sink disposed. shared_borrowed.registry.subscribe(subs_key, subs_id); - let (tx, rx) = crate::mpsc::tracing_unbounded(self.tracing_key); + let (tx, rx) = crate::mpsc::tracing_unbounded(self.tracing_key, queue_size_warning); assert!(shared_borrowed.sinks.insert(subs_id, tx).is_none(), "Used IDSequence to create another ID. Should be unique until u64 is overflowed. Should be unique."); Receiver { shared: Arc::downgrade(&self.shared), subs_id, rx } diff --git a/client/utils/src/pubsub/tests/normal_operation.rs b/client/utils/src/pubsub/tests/normal_operation.rs index a13c718d74a8f..830388de32e46 100644 --- a/client/utils/src/pubsub/tests/normal_operation.rs +++ b/client/utils/src/pubsub/tests/normal_operation.rs @@ -27,7 +27,7 @@ fn positive_rx_receives_relevant_messages_and_terminates_upon_hub_drop() { // No subscribers yet. That message is not supposed to get to anyone. hub.send(0); - let mut rx_01 = hub.subscribe(SubsKey::new()); + let mut rx_01 = hub.subscribe(SubsKey::new(), 100_000); assert_eq!(hub.subs_count(), 1); // That message is sent after subscription. Should be delivered into rx_01. @@ -49,9 +49,9 @@ fn positive_subs_count_is_correct_upon_drop_of_rxs() { let hub = TestHub::new(TK); assert_eq!(hub.subs_count(), 0); - let rx_01 = hub.subscribe(SubsKey::new()); + let rx_01 = hub.subscribe(SubsKey::new(), 100_000); assert_eq!(hub.subs_count(), 1); - let rx_02 = hub.subscribe(SubsKey::new()); + let rx_02 = hub.subscribe(SubsKey::new(), 100_000); assert_eq!(hub.subs_count(), 2); std::mem::drop(rx_01); @@ -69,11 +69,11 @@ fn positive_subs_count_is_correct_upon_drop_of_rxs_on_cloned_hubs() { assert_eq!(hub_01.subs_count(), 0); assert_eq!(hub_02.subs_count(), 0); - let rx_01 = hub_02.subscribe(SubsKey::new()); + let rx_01 = hub_02.subscribe(SubsKey::new(), 100_000); assert_eq!(hub_01.subs_count(), 1); assert_eq!(hub_02.subs_count(), 1); - let rx_02 = hub_02.subscribe(SubsKey::new()); + let rx_02 = hub_02.subscribe(SubsKey::new(), 100_000); assert_eq!(hub_01.subs_count(), 2); assert_eq!(hub_02.subs_count(), 2); diff --git a/client/utils/src/pubsub/tests/panicking_registry.rs b/client/utils/src/pubsub/tests/panicking_registry.rs index 26ce63bd51b01..cfe8168d80229 100644 --- a/client/utils/src/pubsub/tests/panicking_registry.rs +++ b/client/utils/src/pubsub/tests/panicking_registry.rs @@ -30,7 +30,7 @@ fn t01() { let hub = TestHub::new(TK); assert_hub_props(&hub, 0, 0); - let rx_01 = hub.subscribe(SubsKey::new()); + let rx_01 = hub.subscribe(SubsKey::new(), 100_000); assert_hub_props(&hub, 1, 1); std::mem::drop(rx_01); @@ -45,17 +45,17 @@ fn t02() { assert_hub_props(&hub, 0, 0); // Subscribe rx-01 - let rx_01 = hub.subscribe(SubsKey::new()); + let rx_01 = hub.subscribe(SubsKey::new(), 100_000); assert_hub_props(&hub, 1, 1); // Subscribe rx-02 so that its unsubscription will lead to an attempt to drop rx-01 in the // middle of unsubscription of rx-02 - let rx_02 = hub.subscribe(SubsKey::new().with_receiver(rx_01)); + let rx_02 = hub.subscribe(SubsKey::new().with_receiver(rx_01), 100_000); assert_hub_props(&hub, 2, 2); // Subscribe rx-03 in order to see that it will receive messages after the unclean // unsubscription - let mut rx_03 = hub.subscribe(SubsKey::new()); + let mut rx_03 = hub.subscribe(SubsKey::new(), 100_000); assert_hub_props(&hub, 3, 3); // drop rx-02 leads to an attempt to unsubscribe rx-01 @@ -69,7 +69,7 @@ fn t02() { // Subscribe rx-04 in order to see that it will receive messages after the unclean // unsubscription - let mut rx_04 = hub.subscribe(SubsKey::new()); + let mut rx_04 = hub.subscribe(SubsKey::new(), 100_000); assert_hub_props(&hub, 3, 3); hub.send(2); @@ -96,8 +96,8 @@ fn t02() { } async fn add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(hub: &TestHub) { - let rx_01 = hub.subscribe(SubsKey::new()); - let rx_02 = hub.subscribe(SubsKey::new()); + let rx_01 = hub.subscribe(SubsKey::new(), 100_000); + let rx_02 = hub.subscribe(SubsKey::new(), 100_000); hub.send(1); hub.send(2); @@ -121,9 +121,8 @@ fn t03() { add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; assert_hub_props(&hub, 0, 0); - assert!(catch_unwind(AssertUnwindSafe( - || hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicBefore)) - )) + assert!(catch_unwind(AssertUnwindSafe(|| hub + .subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicBefore), 100_000))) .is_err()); assert_hub_props(&hub, 0, 0); @@ -141,9 +140,8 @@ fn t04() { add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; assert_hub_props(&hub, 0, 0); - assert!(catch_unwind(AssertUnwindSafe( - || hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicAfter)) - )) + assert!(catch_unwind(AssertUnwindSafe(|| hub + .subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicAfter), 100_000))) .is_err()); // the registry has panicked after it has added a subs-id into its internal storage — the @@ -163,8 +161,8 @@ fn t05() { add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; assert_hub_props(&hub, 0, 0); - let rx_01 = - hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicBefore)); + let rx_01 = hub + .subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicBefore), 100_000); assert_hub_props(&hub, 1, 1); add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; @@ -189,7 +187,8 @@ fn t06() { add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; assert_hub_props(&hub, 0, 0); - let rx_01 = hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicAfter)); + let rx_01 = hub + .subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicAfter), 100_000); assert_hub_props(&hub, 1, 1); add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; @@ -214,7 +213,8 @@ fn t07() { add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; assert_hub_props(&hub, 0, 0); - let rx_01 = hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicBefore)); + let rx_01 = + hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicBefore), 100_000); assert_hub_props(&hub, 1, 1); assert!(catch_unwind(AssertUnwindSafe(|| hub.send(1))).is_err()); assert_hub_props(&hub, 1, 1); @@ -235,7 +235,8 @@ fn t08() { add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; assert_hub_props(&hub, 0, 0); - let rx_01 = hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicAfter)); + let rx_01 = + hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicAfter), 100_000); assert_hub_props(&hub, 1, 1); assert!(catch_unwind(AssertUnwindSafe(|| hub.send(1))).is_err()); assert_hub_props(&hub, 1, 1); diff --git a/client/utils/src/status_sinks.rs b/client/utils/src/status_sinks.rs index a1d965d08085e..c536e2c18c6a1 100644 --- a/client/utils/src/status_sinks.rs +++ b/client/utils/src/status_sinks.rs @@ -58,7 +58,7 @@ impl Default for StatusSinks { impl StatusSinks { /// Builds a new empty collection. pub fn new() -> StatusSinks { - let (entries_tx, entries_rx) = tracing_unbounded("status-sinks-entries"); + let (entries_tx, entries_rx) = tracing_unbounded("status-sinks-entries", 100_000); StatusSinks { inner: Mutex::new(Inner { entries: stream::FuturesUnordered::new(), entries_rx }), @@ -196,7 +196,7 @@ mod tests { let status_sinks = StatusSinks::new(); - let (tx, rx) = tracing_unbounded("test"); + let (tx, rx) = tracing_unbounded("test", 100_000); status_sinks.push(Duration::from_millis(100), tx); let mut val_order = 5;