Skip to content

Commit

Permalink
Revert "Remove network starter that is no longer needed (#6400)"
Browse files Browse the repository at this point in the history
This reverts commit b601d57.
  • Loading branch information
dmitry-markin committed Dec 27, 2024
1 parent ca78179 commit 8d1a027
Show file tree
Hide file tree
Showing 13 changed files with 105 additions and 19 deletions.
4 changes: 3 additions & 1 deletion cumulus/client/relay-chain-minimal-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ async fn new_minimal_relay_chain<Block: BlockT, Network: NetworkBackend<RelayBlo
.chain_get_header(None)
.await?
.ok_or_else(|| RelayChainError::RpcCallError("Unable to fetch best header".to_string()))?;
let (network, sync_service) = build_collator_network::<Network>(
let (network, network_starter, sync_service) = build_collator_network::<Network>(
&config,
net_config,
task_manager.spawn_handle(),
Expand Down Expand Up @@ -262,6 +262,8 @@ async fn new_minimal_relay_chain<Block: BlockT, Network: NetworkBackend<RelayBlo
let overseer_handle =
collator_overseer::spawn_overseer(overseer_args, &task_manager, relay_chain_rpc_client)?;

network_starter.start_network();

Ok(NewMinimalNode { task_manager, overseer_handle })
}

Expand Down
26 changes: 22 additions & 4 deletions cumulus/client/relay-chain-minimal-node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use sc_network::{

use sc_network::{config::FullNetworkConfiguration, NetworkBackend, NotificationService};
use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake};
use sc_service::{error::Error, Configuration, SpawnTaskHandle};
use sc_service::{error::Error, Configuration, NetworkStarter, SpawnTaskHandle};

use std::{iter, sync::Arc};

Expand All @@ -41,7 +41,10 @@ pub(crate) fn build_collator_network<Network: NetworkBackend<Block, Hash>>(
genesis_hash: Hash,
best_header: Header,
notification_metrics: NotificationMetrics,
) -> Result<(Arc<dyn NetworkService>, Arc<dyn sp_consensus::SyncOracle + Send + Sync>), Error> {
) -> Result<
(Arc<dyn NetworkService>, NetworkStarter, Arc<dyn sp_consensus::SyncOracle + Send + Sync>),
Error,
> {
let protocol_id = config.protocol_id();
let (block_announce_config, _notification_service) = get_block_announce_proto_config::<Network>(
protocol_id.clone(),
Expand Down Expand Up @@ -82,16 +85,31 @@ pub(crate) fn build_collator_network<Network: NetworkBackend<Block, Hash>>(
let network_worker = Network::new(network_params)?;
let network_service = network_worker.network_service();

let (network_start_tx, network_start_rx) = futures::channel::oneshot::channel();

// The network worker is responsible for gathering all network messages and processing
// them. This is quite a heavy task, and at the time of the writing of this comment it
// frequently happens that this future takes several seconds or in some situations
// even more than a minute until it has processed its entire queue. This is clearly an
// issue, and ideally we would like to fix the network future to take as little time as
// possible, but we also take the extra harm-prevention measure to execute the networking
// future using `spawn_blocking`.
spawn_handle.spawn_blocking("network-worker", Some("networking"), network_worker.run());
spawn_handle.spawn_blocking("network-worker", Some("networking"), async move {
if network_start_rx.await.is_err() {
tracing::warn!(
"The NetworkStart returned as part of `build_network` has been silently dropped"
);
// This `return` might seem unnecessary, but we don't want to make it look like
// everything is working as normal even though the user is clearly misusing the API.
return
}

network_worker.run().await;
});

let network_starter = NetworkStarter::new(network_start_tx);

Ok((network_service, Arc::new(SyncOracle {})))
Ok((network_service, network_starter, Arc::new(SyncOracle {})))
}

fn adjust_network_config_light_in_peers(config: &mut NetworkConfiguration) {
Expand Down
3 changes: 2 additions & 1 deletion cumulus/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use sc_consensus::{
use sc_network::{config::SyncMode, service::traits::NetworkService, NetworkBackend};
use sc_network_sync::SyncingService;
use sc_network_transactions::TransactionsHandlerController;
use sc_service::{Configuration, SpawnTaskHandle, TaskManager, WarpSyncConfig};
use sc_service::{Configuration, NetworkStarter, SpawnTaskHandle, TaskManager, WarpSyncConfig};
use sc_telemetry::{log, TelemetryWorkerHandle};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_api::ProvideRuntimeApi;
Expand Down Expand Up @@ -439,6 +439,7 @@ pub async fn build_network<'a, Block, Client, RCInterface, IQ, Network>(
Arc<dyn NetworkService>,
TracingUnboundedSender<sc_rpc::system::Request<Block>>,
TransactionsHandlerController<Block::Hash>,
NetworkStarter,
Arc<SyncingService<Block>>,
)>
where
Expand Down
4 changes: 3 additions & 1 deletion cumulus/polkadot-omni-node/lib/src/common/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ pub(crate) trait NodeSpec: BaseNodeSpec {
prometheus_registry.clone(),
);

let (network, system_rpc_tx, tx_handler_controller, sync_service) =
let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
build_network(BuildNetworkParams {
parachain_config: &parachain_config,
net_config,
Expand Down Expand Up @@ -397,6 +397,8 @@ pub(crate) trait NodeSpec: BaseNodeSpec {
)?;
}

start_network.start_network();

Ok(task_manager)
};

Expand Down
3 changes: 2 additions & 1 deletion cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl<NodeSpec: NodeSpecT> ManualSealNode<NodeSpec> {
config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
);

let (network, system_rpc_tx, tx_handler_controller, sync_service) =
let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
Expand Down Expand Up @@ -247,6 +247,7 @@ impl<NodeSpec: NodeSpecT> ManualSealNode<NodeSpec> {
telemetry: telemetry.as_mut(),
})?;

start_network.start_network();
Ok(task_manager)
}
}
4 changes: 3 additions & 1 deletion cumulus/test/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ where
prometheus_registry.clone(),
);

let (network, system_rpc_tx, tx_handler_controller, sync_service) =
let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
build_network(BuildNetworkParams {
parachain_config: &parachain_config,
net_config,
Expand Down Expand Up @@ -540,6 +540,8 @@ where
}
}

start_network.start_network();

Ok((task_manager, client, network, rpc_handlers, transaction_pool, backend))
}

Expand Down
4 changes: 3 additions & 1 deletion polkadot/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ pub fn new_full<
})
};

let (network, system_rpc_tx, tx_handler_controller, sync_service) =
let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
net_config,
Expand Down Expand Up @@ -1383,6 +1383,8 @@ pub fn new_full<
);
}

network_starter.start_network();

Ok(NewFull {
task_manager,
client,
Expand Down
3 changes: 2 additions & 1 deletion substrate/bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
Vec::default(),
));

let (network, system_rpc_tx, tx_handler_controller, sync_service) =
let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
net_config,
Expand Down Expand Up @@ -801,6 +801,7 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
);
}

network_starter.start_network();
Ok(NewFullBase {
task_manager,
client,
Expand Down
59 changes: 56 additions & 3 deletions substrate/client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle,
TaskManager, TransactionPoolAdapter,
};
use futures::{select, FutureExt, StreamExt};
use futures::{channel::oneshot, select, FutureExt, StreamExt};
use jsonrpsee::RpcModule;
use log::info;
use prometheus_endpoint::Registry;
Expand Down Expand Up @@ -845,6 +845,7 @@ pub fn build_network<Block, Net, TxPool, IQ, Client>(
Arc<dyn sc_network::service::traits::NetworkService>,
TracingUnboundedSender<sc_rpc::system::Request<Block>>,
sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
NetworkStarter,
Arc<SyncingService<Block>>,
),
Error,
Expand Down Expand Up @@ -1006,6 +1007,7 @@ pub fn build_network_advanced<Block, Net, TxPool, IQ, Client>(
Arc<dyn sc_network::service::traits::NetworkService>,
TracingUnboundedSender<sc_rpc::system::Request<Block>>,
sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
NetworkStarter,
Arc<SyncingService<Block>>,
),
Error,
Expand Down Expand Up @@ -1146,16 +1148,49 @@ where
announce_block,
);

// TODO: Normally, one is supposed to pass a list of notifications protocols supported by the
// node through the `NetworkConfiguration` struct. But because this function doesn't know in
// advance which components, such as GrandPa or Polkadot, will be plugged on top of the
// service, it is unfortunately not possible to do so without some deep refactoring. To
// bypass this problem, the `NetworkService` provides a `register_notifications_protocol`
// method that can be called even after the network has been initialized. However, we want to
// avoid the situation where `register_notifications_protocol` is called *after* the network
// actually connects to other peers. For this reason, we delay the process of the network
// future until the user calls `NetworkStarter::start_network`.
//
// This entire hack should eventually be removed in favour of passing the list of protocols
// through the configuration.
//
// See also https://github.com/paritytech/substrate/issues/6827
let (network_start_tx, network_start_rx) = oneshot::channel();

// The network worker is responsible for gathering all network messages and processing
// them. This is quite a heavy task, and at the time of the writing of this comment it
// frequently happens that this future takes several seconds or in some situations
// even more than a minute until it has processed its entire queue. This is clearly an
// issue, and ideally we would like to fix the network future to take as little time as
// possible, but we also take the extra harm-prevention measure to execute the networking
// future using `spawn_blocking`.
spawn_handle.spawn_blocking("network-worker", Some("networking"), future);
spawn_handle.spawn_blocking("network-worker", Some("networking"), async move {
if network_start_rx.await.is_err() {
log::warn!(
"The NetworkStart returned as part of `build_network` has been silently dropped"
);
// This `return` might seem unnecessary, but we don't want to make it look like
// everything is working as normal even though the user is clearly misusing the API.
return
}

future.await
});

Ok((network, system_rpc_tx, tx_handler_controller, sync_service.clone()))
Ok((
network,
system_rpc_tx,
tx_handler_controller,
NetworkStarter(network_start_tx),
sync_service.clone(),
))
}

/// Configuration for [`build_default_syncing_engine`].
Expand Down Expand Up @@ -1384,3 +1419,21 @@ where
warp_sync_protocol_name,
)?))
}

/// Object used to start the network.
#[must_use]
pub struct NetworkStarter(oneshot::Sender<()>);

impl NetworkStarter {
/// Create a new NetworkStarter
pub fn new(sender: oneshot::Sender<()>) -> Self {
NetworkStarter(sender)
}

/// Start the network. Call this after all sub-components have been initialized.
///
/// > **Note**: If you don't call this function, the networking will not work.
pub fn start_network(self) {
let _ = self.0.send(());
}
}
4 changes: 2 additions & 2 deletions substrate/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ pub use self::{
new_client, new_db_backend, new_full_client, new_full_parts, new_full_parts_record_import,
new_full_parts_with_genesis_builder, new_wasm_executor,
propagate_transaction_notifications, spawn_tasks, BuildNetworkAdvancedParams,
BuildNetworkParams, DefaultSyncingEngineConfig, KeystoreContainer, SpawnTasksParams,
TFullBackend, TFullCallExecutor, TFullClient,
BuildNetworkParams, DefaultSyncingEngineConfig, KeystoreContainer, NetworkStarter,
SpawnTasksParams, TFullBackend, TFullCallExecutor, TFullClient,
},
client::{ClientConfig, LocalCallExecutor},
error::Error,
Expand Down
3 changes: 2 additions & 1 deletion templates/minimal/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub fn new_full<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Ha
config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
);

let (network, system_rpc_tx, tx_handler_controller, sync_service) =
let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
net_config,
Expand Down Expand Up @@ -264,5 +264,6 @@ pub fn new_full<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Ha
_ => {},
}

network_starter.start_network();
Ok(task_manager)
}
4 changes: 3 additions & 1 deletion templates/parachain/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ pub async fn start_parachain_node(

// NOTE: because we use Aura here explicitly, we can use `CollatorSybilResistance::Resistant`
// when starting the network.
let (network, system_rpc_tx, tx_handler_controller, sync_service) =
let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
build_network(BuildNetworkParams {
parachain_config: &parachain_config,
net_config,
Expand Down Expand Up @@ -406,5 +406,7 @@ pub async fn start_parachain_node(
)?;
}

start_network.start_network();

Ok((task_manager, client))
}
3 changes: 2 additions & 1 deletion templates/solochain/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ pub fn new_full<
Vec::default(),
));

let (network, system_rpc_tx, tx_handler_controller, sync_service) =
let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
net_config,
Expand Down Expand Up @@ -329,5 +329,6 @@ pub fn new_full<
);
}

network_starter.start_network();
Ok(task_manager)
}

0 comments on commit 8d1a027

Please sign in to comment.