From 8d1a0273de9e4666a337eb31a3bbd31947c21a30 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 27 Dec 2024 12:30:50 +0000 Subject: [PATCH] Revert "Remove network starter that is no longer needed (#6400)" This reverts commit b601d57aa07b344338f9526073b718923a9223bb. --- .../relay-chain-minimal-node/src/lib.rs | 4 +- .../relay-chain-minimal-node/src/network.rs | 26 ++++++-- cumulus/client/service/src/lib.rs | 3 +- .../polkadot-omni-node/lib/src/common/spec.rs | 4 +- .../lib/src/nodes/manual_seal.rs | 3 +- cumulus/test/service/src/lib.rs | 4 +- polkadot/node/service/src/lib.rs | 4 +- substrate/bin/node/cli/src/service.rs | 3 +- substrate/client/service/src/builder.rs | 59 ++++++++++++++++++- substrate/client/service/src/lib.rs | 4 +- templates/minimal/node/src/service.rs | 3 +- templates/parachain/node/src/service.rs | 4 +- templates/solochain/node/src/service.rs | 3 +- 13 files changed, 105 insertions(+), 19 deletions(-) diff --git a/cumulus/client/relay-chain-minimal-node/src/lib.rs b/cumulus/client/relay-chain-minimal-node/src/lib.rs index f70a73a5d5ce..a3d858ea40c9 100644 --- a/cumulus/client/relay-chain-minimal-node/src/lib.rs +++ b/cumulus/client/relay-chain-minimal-node/src/lib.rs @@ -224,7 +224,7 @@ async fn new_minimal_relay_chain( + let (network, network_starter, sync_service) = build_collator_network::( &config, net_config, task_manager.spawn_handle(), @@ -262,6 +262,8 @@ async fn new_minimal_relay_chain>( genesis_hash: Hash, best_header: Header, notification_metrics: NotificationMetrics, -) -> Result<(Arc, Arc), Error> { +) -> Result< + (Arc, NetworkStarter, Arc), + Error, +> { let protocol_id = config.protocol_id(); let (block_announce_config, _notification_service) = get_block_announce_proto_config::( protocol_id.clone(), @@ -82,6 +85,8 @@ pub(crate) fn build_collator_network>( let network_worker = Network::new(network_params)?; let network_service = network_worker.network_service(); + let (network_start_tx, network_start_rx) = futures::channel::oneshot::channel(); + // The network worker is responsible for gathering all network messages and processing // them. This is quite a heavy task, and at the time of the writing of this comment it // frequently happens that this future takes several seconds or in some situations @@ -89,9 +94,22 @@ pub(crate) fn build_collator_network>( // issue, and ideally we would like to fix the network future to take as little time as // possible, but we also take the extra harm-prevention measure to execute the networking // future using `spawn_blocking`. - spawn_handle.spawn_blocking("network-worker", Some("networking"), network_worker.run()); + spawn_handle.spawn_blocking("network-worker", Some("networking"), async move { + if network_start_rx.await.is_err() { + tracing::warn!( + "The NetworkStart returned as part of `build_network` has been silently dropped" + ); + // This `return` might seem unnecessary, but we don't want to make it look like + // everything is working as normal even though the user is clearly misusing the API. + return + } + + network_worker.run().await; + }); + + let network_starter = NetworkStarter::new(network_start_tx); - Ok((network_service, Arc::new(SyncOracle {}))) + Ok((network_service, network_starter, Arc::new(SyncOracle {}))) } fn adjust_network_config_light_in_peers(config: &mut NetworkConfiguration) { diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs index 912109c2ad32..ae83f2ade3f6 100644 --- a/cumulus/client/service/src/lib.rs +++ b/cumulus/client/service/src/lib.rs @@ -40,7 +40,7 @@ use sc_consensus::{ use sc_network::{config::SyncMode, service::traits::NetworkService, NetworkBackend}; use sc_network_sync::SyncingService; use sc_network_transactions::TransactionsHandlerController; -use sc_service::{Configuration, SpawnTaskHandle, TaskManager, WarpSyncConfig}; +use sc_service::{Configuration, NetworkStarter, SpawnTaskHandle, TaskManager, WarpSyncConfig}; use sc_telemetry::{log, TelemetryWorkerHandle}; use sc_utils::mpsc::TracingUnboundedSender; use sp_api::ProvideRuntimeApi; @@ -439,6 +439,7 @@ pub async fn build_network<'a, Block, Client, RCInterface, IQ, Network>( Arc, TracingUnboundedSender>, TransactionsHandlerController, + NetworkStarter, Arc>, )> where diff --git a/cumulus/polkadot-omni-node/lib/src/common/spec.rs b/cumulus/polkadot-omni-node/lib/src/common/spec.rs index 868368f3ca1a..f81df654a460 100644 --- a/cumulus/polkadot-omni-node/lib/src/common/spec.rs +++ b/cumulus/polkadot-omni-node/lib/src/common/spec.rs @@ -289,7 +289,7 @@ pub(crate) trait NodeSpec: BaseNodeSpec { prometheus_registry.clone(), ); - let (network, system_rpc_tx, tx_handler_controller, sync_service) = + let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) = build_network(BuildNetworkParams { parachain_config: ¶chain_config, net_config, @@ -397,6 +397,8 @@ pub(crate) trait NodeSpec: BaseNodeSpec { )?; } + start_network.start_network(); + Ok(task_manager) }; diff --git a/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs b/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs index f33865ad45cd..bf5a41b2608a 100644 --- a/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs +++ b/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs @@ -103,7 +103,7 @@ impl ManualSealNode { config.prometheus_config.as_ref().map(|cfg| &cfg.registry), ); - let (network, system_rpc_tx, tx_handler_controller, sync_service) = + let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, client: client.clone(), @@ -247,6 +247,7 @@ impl ManualSealNode { telemetry: telemetry.as_mut(), })?; + start_network.start_network(); Ok(task_manager) } } diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs index 2c13d20333a7..96fe123853aa 100644 --- a/cumulus/test/service/src/lib.rs +++ b/cumulus/test/service/src/lib.rs @@ -374,7 +374,7 @@ where prometheus_registry.clone(), ); - let (network, system_rpc_tx, tx_handler_controller, sync_service) = + let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) = build_network(BuildNetworkParams { parachain_config: ¶chain_config, net_config, @@ -540,6 +540,8 @@ where } } + start_network.start_network(); + Ok((task_manager, client, network, rpc_handlers, transaction_pool, backend)) } diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 227bc5253994..d2424474302a 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -1003,7 +1003,7 @@ pub fn new_full< }) }; - let (network, system_rpc_tx, tx_handler_controller, sync_service) = + let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, net_config, @@ -1383,6 +1383,8 @@ pub fn new_full< ); } + network_starter.start_network(); + Ok(NewFull { task_manager, client, diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index 5f6806c235f6..5e370f43275b 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -513,7 +513,7 @@ pub fn new_full_base::Hash>>( Vec::default(), )); - let (network, system_rpc_tx, tx_handler_controller, sync_service) = + let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, net_config, @@ -801,6 +801,7 @@ pub fn new_full_base::Hash>>( ); } + network_starter.start_network(); Ok(NewFullBase { task_manager, client, diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index a47a05c0a190..8321005980d6 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -25,7 +25,7 @@ use crate::{ start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle, TaskManager, TransactionPoolAdapter, }; -use futures::{select, FutureExt, StreamExt}; +use futures::{channel::oneshot, select, FutureExt, StreamExt}; use jsonrpsee::RpcModule; use log::info; use prometheus_endpoint::Registry; @@ -845,6 +845,7 @@ pub fn build_network( Arc, TracingUnboundedSender>, sc_network_transactions::TransactionsHandlerController<::Hash>, + NetworkStarter, Arc>, ), Error, @@ -1006,6 +1007,7 @@ pub fn build_network_advanced( Arc, TracingUnboundedSender>, sc_network_transactions::TransactionsHandlerController<::Hash>, + NetworkStarter, Arc>, ), Error, @@ -1146,6 +1148,22 @@ where announce_block, ); + // TODO: Normally, one is supposed to pass a list of notifications protocols supported by the + // node through the `NetworkConfiguration` struct. But because this function doesn't know in + // advance which components, such as GrandPa or Polkadot, will be plugged on top of the + // service, it is unfortunately not possible to do so without some deep refactoring. To + // bypass this problem, the `NetworkService` provides a `register_notifications_protocol` + // method that can be called even after the network has been initialized. However, we want to + // avoid the situation where `register_notifications_protocol` is called *after* the network + // actually connects to other peers. For this reason, we delay the process of the network + // future until the user calls `NetworkStarter::start_network`. + // + // This entire hack should eventually be removed in favour of passing the list of protocols + // through the configuration. + // + // See also https://github.com/paritytech/substrate/issues/6827 + let (network_start_tx, network_start_rx) = oneshot::channel(); + // The network worker is responsible for gathering all network messages and processing // them. This is quite a heavy task, and at the time of the writing of this comment it // frequently happens that this future takes several seconds or in some situations @@ -1153,9 +1171,26 @@ where // issue, and ideally we would like to fix the network future to take as little time as // possible, but we also take the extra harm-prevention measure to execute the networking // future using `spawn_blocking`. - spawn_handle.spawn_blocking("network-worker", Some("networking"), future); + spawn_handle.spawn_blocking("network-worker", Some("networking"), async move { + if network_start_rx.await.is_err() { + log::warn!( + "The NetworkStart returned as part of `build_network` has been silently dropped" + ); + // This `return` might seem unnecessary, but we don't want to make it look like + // everything is working as normal even though the user is clearly misusing the API. + return + } + + future.await + }); - Ok((network, system_rpc_tx, tx_handler_controller, sync_service.clone())) + Ok(( + network, + system_rpc_tx, + tx_handler_controller, + NetworkStarter(network_start_tx), + sync_service.clone(), + )) } /// Configuration for [`build_default_syncing_engine`]. @@ -1384,3 +1419,21 @@ where warp_sync_protocol_name, )?)) } + +/// Object used to start the network. +#[must_use] +pub struct NetworkStarter(oneshot::Sender<()>); + +impl NetworkStarter { + /// Create a new NetworkStarter + pub fn new(sender: oneshot::Sender<()>) -> Self { + NetworkStarter(sender) + } + + /// Start the network. Call this after all sub-components have been initialized. + /// + /// > **Note**: If you don't call this function, the networking will not work. + pub fn start_network(self) { + let _ = self.0.send(()); + } +} diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index 2a3144a33e1a..5b79f3eef93e 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -61,8 +61,8 @@ pub use self::{ new_client, new_db_backend, new_full_client, new_full_parts, new_full_parts_record_import, new_full_parts_with_genesis_builder, new_wasm_executor, propagate_transaction_notifications, spawn_tasks, BuildNetworkAdvancedParams, - BuildNetworkParams, DefaultSyncingEngineConfig, KeystoreContainer, SpawnTasksParams, - TFullBackend, TFullCallExecutor, TFullClient, + BuildNetworkParams, DefaultSyncingEngineConfig, KeystoreContainer, NetworkStarter, + SpawnTasksParams, TFullBackend, TFullCallExecutor, TFullClient, }, client::{ClientConfig, LocalCallExecutor}, error::Error, diff --git a/templates/minimal/node/src/service.rs b/templates/minimal/node/src/service.rs index 5988dbf3ce6e..b4e6fc0b728b 100644 --- a/templates/minimal/node/src/service.rs +++ b/templates/minimal/node/src/service.rs @@ -134,7 +134,7 @@ pub fn new_full::Ha config.prometheus_config.as_ref().map(|cfg| &cfg.registry), ); - let (network, system_rpc_tx, tx_handler_controller, sync_service) = + let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, net_config, @@ -264,5 +264,6 @@ pub fn new_full::Ha _ => {}, } + network_starter.start_network(); Ok(task_manager) } diff --git a/templates/parachain/node/src/service.rs b/templates/parachain/node/src/service.rs index 8c526317283e..57ffcb9049d8 100644 --- a/templates/parachain/node/src/service.rs +++ b/templates/parachain/node/src/service.rs @@ -270,7 +270,7 @@ pub async fn start_parachain_node( // NOTE: because we use Aura here explicitly, we can use `CollatorSybilResistance::Resistant` // when starting the network. - let (network, system_rpc_tx, tx_handler_controller, sync_service) = + let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) = build_network(BuildNetworkParams { parachain_config: ¶chain_config, net_config, @@ -406,5 +406,7 @@ pub async fn start_parachain_node( )?; } + start_network.start_network(); + Ok((task_manager, client)) } diff --git a/templates/solochain/node/src/service.rs b/templates/solochain/node/src/service.rs index 79d97fbab8df..d6fcebe239f7 100644 --- a/templates/solochain/node/src/service.rs +++ b/templates/solochain/node/src/service.rs @@ -169,7 +169,7 @@ pub fn new_full< Vec::default(), )); - let (network, system_rpc_tx, tx_handler_controller, sync_service) = + let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, net_config, @@ -329,5 +329,6 @@ pub fn new_full< ); } + network_starter.start_network(); Ok(task_manager) }