Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Don't send ActiveLeaves from leaves in db on startup in Overseer (#…
Browse files Browse the repository at this point in the history
…6727)

* Don't send `ActiveLeaves` from leaves in db on startup in Overseer. Wait for fresh leaves instead.

* Don't pass initial set of leaves to Overseer

* Fix compilation error in subsystem-test-helpers
  • Loading branch information
tdimitrov authored Mar 6, 2023
1 parent d6057be commit 177ceca
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 133 deletions.
1 change: 0 additions & 1 deletion node/overseer/src/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ where
.span_per_active_leaf(Default::default())
.active_leaves(Default::default())
.known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE))
.leaves(Default::default())
.spawner(SpawnGlue(spawner))
.metrics(metrics)
.supports_parachains(supports_parachains);
Expand Down
15 changes: 0 additions & 15 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,6 @@ pub struct Overseer<SupportsParachains> {
/// Stores the [`jaeger::Span`] per active leaf.
pub span_per_active_leaf: HashMap<Hash, Arc<jaeger::Span>>,

/// A set of leaves that `Overseer` starts working with.
///
/// Drained at the beginning of `run` and never used again.
pub leaves: Vec<(Hash, BlockNumber)>,

/// The set of the "active leaves".
pub active_leaves: HashMap<Hash, BlockNumber>,

Expand Down Expand Up @@ -728,16 +723,6 @@ where
let metrics = self.metrics.clone();
spawn_metronome_metrics(&mut self, metrics)?;

// Notify about active leaves on startup before starting the loop
for (hash, number) in std::mem::take(&mut self.leaves) {
let _ = self.active_leaves.insert(hash, number);
if let Some((span, status)) = self.on_head_activated(&hash, None).await {
let update =
ActiveLeavesUpdate::start_work(ActivatedLeaf { hash, number, status, span });
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
}

loop {
select! {
msg = self.events_rx.select_next_some() => {
Expand Down
80 changes: 34 additions & 46 deletions node/overseer/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,6 @@ use sp_core::crypto::Pair as _;

use super::*;

fn block_info_to_pair(blocks: impl IntoIterator<Item = BlockInfo>) -> Vec<(Hash, BlockNumber)> {
Vec::from_iter(
blocks
.into_iter()
.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)),
)
}

type SpawnedSubsystem = crate::gen::SpawnedSubsystem<SubsystemError>;

struct TestSubsystem1(metered::MeteredSender<usize>);
Expand Down Expand Up @@ -223,20 +215,16 @@ fn overseer_metrics_work() {
executor::block_on(async move {
let first_block_hash = [1; 32].into();
let second_block_hash = [2; 32].into();
let third_block_hash = [3; 32].into();

let first_block =
BlockInfo { hash: first_block_hash, parent_hash: [0; 32].into(), number: 1 };
let second_block =
BlockInfo { hash: second_block_hash, parent_hash: first_block_hash, number: 2 };
let third_block =
BlockInfo { hash: third_block_hash, parent_hash: second_block_hash, number: 3 };

let registry = prometheus::Registry::new();
let (overseer, handle) =
dummy_overseer_builder(spawner, MockSupportsParachains, Some(&registry))
.unwrap()
.leaves(block_info_to_pair(vec![first_block]))
.build()
.unwrap();

Expand All @@ -245,8 +233,8 @@ fn overseer_metrics_work() {

pin_mut!(overseer_fut);

handle.block_imported(first_block).await;
handle.block_imported(second_block).await;
handle.block_imported(third_block).await;
handle
.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg()))
.await;
Expand All @@ -256,8 +244,8 @@ fn overseer_metrics_work() {
res = overseer_fut => {
assert!(res.is_ok());
let metrics = extract_metrics(&registry);
assert_eq!(metrics["activated"], 3);
assert_eq!(metrics["deactivated"], 2);
assert_eq!(metrics["activated"], 2);
assert_eq!(metrics["deactivated"], 1);
assert_eq!(metrics["relayed"], 1);
},
complete => (),
Expand Down Expand Up @@ -379,14 +367,11 @@ fn overseer_start_stop_works() {
executor::block_on(async move {
let first_block_hash = [1; 32].into();
let second_block_hash = [2; 32].into();
let third_block_hash = [3; 32].into();

let first_block =
BlockInfo { hash: first_block_hash, parent_hash: [0; 32].into(), number: 1 };
let second_block =
BlockInfo { hash: second_block_hash, parent_hash: first_block_hash, number: 2 };
let third_block =
BlockInfo { hash: third_block_hash, parent_hash: second_block_hash, number: 3 };

let (tx_5, mut rx_5) = metered::channel(64);
let (tx_6, mut rx_6) = metered::channel(64);
Expand All @@ -395,7 +380,6 @@ fn overseer_start_stop_works() {
.unwrap()
.replace_candidate_validation(move |_| TestSubsystem5(tx_5))
.replace_candidate_backing(move |_| TestSubsystem6(tx_6))
.leaves(block_info_to_pair(vec![first_block]))
.build()
.unwrap();
let mut handle = Handle::new(handle);
Expand All @@ -406,33 +390,27 @@ fn overseer_start_stop_works() {
let mut ss5_results = Vec::new();
let mut ss6_results = Vec::new();

handle.block_imported(first_block).await;
handle.block_imported(second_block).await;
handle.block_imported(third_block).await;

let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: first_block_hash,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
})),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: second_block_hash,
number: 2,
hash: first_block_hash,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
}),
deactivated: [first_block_hash].as_ref().into(),
deactivated: Default::default(),
}),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: third_block_hash,
number: 3,
hash: second_block_hash,
number: 2,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
}),
deactivated: [second_block_hash].as_ref().into(),
deactivated: [first_block_hash].as_ref().into(),
}),
];

Expand Down Expand Up @@ -494,7 +472,6 @@ fn overseer_finalize_works() {
.unwrap()
.replace_candidate_validation(move |_| TestSubsystem5(tx_5))
.replace_candidate_backing(move |_| TestSubsystem6(tx_6))
.leaves(block_info_to_pair(vec![first_block, second_block]))
.build()
.unwrap();
let mut handle = Handle::new(handle);
Expand All @@ -505,22 +482,32 @@ fn overseer_finalize_works() {
let mut ss5_results = Vec::new();
let mut ss6_results = Vec::new();

// activate two blocks
handle.block_imported(first_block).await;
handle.block_imported(second_block).await;

// this should stop work on both forks we started with earlier.
handle.block_finalized(third_block).await;

let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: first_block_hash,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
})),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: second_block_hash,
number: 2,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
})),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: first_block_hash,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
}),
deactivated: Default::default(),
}),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: second_block_hash,
number: 2,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
}),
deactivated: Default::default(),
}),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
deactivated: [first_block_hash, second_block_hash].as_ref().into(),
..Default::default()
Expand Down Expand Up @@ -590,7 +577,6 @@ fn overseer_finalize_leaf_preserves_it() {
.unwrap()
.replace_candidate_validation(move |_| TestSubsystem5(tx_5))
.replace_candidate_backing(move |_| TestSubsystem6(tx_6))
.leaves(block_info_to_pair(vec![first_block.clone(), second_block]))
.build()
.unwrap();
let mut handle = Handle::new(handle);
Expand All @@ -601,6 +587,8 @@ fn overseer_finalize_leaf_preserves_it() {
let mut ss5_results = Vec::new();
let mut ss6_results = Vec::new();

handle.block_imported(first_block.clone()).await;
handle.block_imported(second_block).await;
// This should stop work on the second block, but only the
// second block.
handle.block_finalized(first_block).await;
Expand Down
60 changes: 0 additions & 60 deletions node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ use {
polkadot_node_network_protocol::{
peer_set::PeerSetProtocolNames, request_response::ReqProtocolNames,
},
polkadot_overseer::BlockInfo,
sc_client_api::BlockBackend,
sp_core::traits::SpawnNamed,
sp_trie::PrefixedMemoryDB,
Expand Down Expand Up @@ -129,10 +128,6 @@ pub use {rococo_runtime, rococo_runtime_constants};
#[cfg(feature = "westend-native")]
pub use {westend_runtime, westend_runtime_constants};

/// The maximum number of active leaves we forward to the [`Overseer`] on startup.
#[cfg(any(test, feature = "full-node"))]
const MAX_ACTIVE_LEAVES: usize = 4;

/// Provides the header and block number for a hash.
///
/// Decouples `sc_client_api::Backend` and `sp_blockchain::HeaderBackend`.
Expand Down Expand Up @@ -668,56 +663,6 @@ impl IsCollator {
}
}

/// Returns the active leaves the overseer should start with.
#[cfg(feature = "full-node")]
async fn active_leaves<RuntimeApi, ExecutorDispatch>(
select_chain: &impl SelectChain<Block>,
client: &FullClient<RuntimeApi, ExecutorDispatch>,
) -> Result<Vec<BlockInfo>, Error>
where
RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi, ExecutorDispatch>>
+ Send
+ Sync
+ 'static,
RuntimeApi::RuntimeApi:
RuntimeApiCollection<StateBackend = sc_client_api::StateBackendFor<FullBackend, Block>>,
ExecutorDispatch: NativeExecutionDispatch + 'static,
{
let best_block = select_chain.best_chain().await?;

let mut leaves = select_chain
.leaves()
.await
.unwrap_or_default()
.into_iter()
.filter_map(|hash| {
let number = HeaderBackend::number(client, hash).ok()??;

// Only consider leaves that are in maximum an uncle of the best block.
if number < best_block.number().saturating_sub(1) {
return None
} else if hash == best_block.hash() {
return None
};

let parent_hash = client.header(hash).ok()??.parent_hash;

Some(BlockInfo { hash, parent_hash, number })
})
.collect::<Vec<_>>();

// Sort by block number and get the maximum number of leaves
leaves.sort_by_key(|b| b.number);

leaves.push(BlockInfo {
hash: best_block.hash(),
parent_hash: *best_block.parent_hash(),
number: *best_block.number(),
});

Ok(leaves.into_iter().rev().take(MAX_ACTIVE_LEAVES).collect())
}

pub const AVAILABILITY_CONFIG: AvailabilityConfig = AvailabilityConfig {
col_data: parachains_db::REAL_COLUMNS.col_availability_data,
col_meta: parachains_db::REAL_COLUMNS.col_availability_meta,
Expand Down Expand Up @@ -1009,10 +954,6 @@ where

let overseer_client = client.clone();
let spawner = task_manager.spawn_handle();
// Cannot use the `RelayChainSelection`, since that'd require a setup _and running_ overseer
// which we are about to setup.
let active_leaves =
futures::executor::block_on(active_leaves(select_chain.as_longest_chain(), &*client))?;

let authority_discovery_service = if auth_or_collator || overseer_enable_anyways {
use futures::StreamExt;
Expand Down Expand Up @@ -1068,7 +1009,6 @@ where
.generate::<service::SpawnTaskHandle, FullClient<RuntimeApi, ExecutorDispatch>>(
overseer_connector,
OverseerGenArgs {
leaves: active_leaves,
keystore,
runtime_client: overseer_client.clone(),
parachains_db,
Expand Down
12 changes: 2 additions & 10 deletions node/service/src/overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ pub use polkadot_overseer::{
HeadSupportsParachains,
};
use polkadot_overseer::{
metrics::Metrics as OverseerMetrics, BlockInfo, InitializedOverseerBuilder, MetricsTrait,
Overseer, OverseerConnector, OverseerHandle, SpawnGlue,
metrics::Metrics as OverseerMetrics, InitializedOverseerBuilder, MetricsTrait, Overseer,
OverseerConnector, OverseerHandle, SpawnGlue,
};

use polkadot_primitives::runtime_api::ParachainHost;
Expand Down Expand Up @@ -81,8 +81,6 @@ where
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
/// Set of initial relay chain leaves to track.
pub leaves: Vec<BlockInfo>,
/// The keystore to use for i.e. validator keys.
pub keystore: Arc<LocalKeystore>,
/// Runtime client generic, providing the `ProvieRuntimeApi` trait besides others.
Expand Down Expand Up @@ -131,7 +129,6 @@ where
/// with all default values.
pub fn prepared_overseer_builder<Spawner, RuntimeClient>(
OverseerGenArgs {
leaves,
keystore,
runtime_client,
parachains_db,
Expand Down Expand Up @@ -309,11 +306,6 @@ where
Metrics::register(registry)?,
))
.chain_selection(ChainSelectionSubsystem::new(chain_selection_config, parachains_db))
.leaves(Vec::from_iter(
leaves
.into_iter()
.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)),
))
.activation_external_listeners(Default::default())
.span_per_active_leaf(Default::default())
.active_leaves(Default::default())
Expand Down
1 change: 0 additions & 1 deletion node/subsystem-test-helpers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,6 @@ mod tests {
dummy_overseer_builder(spawner.clone(), AlwaysSupportsParachains, None)
.unwrap()
.replace_collator_protocol(|_| ForwardSubsystem(tx))
.leaves(vec![])
.build()
.unwrap();

Expand Down

0 comments on commit 177ceca

Please sign in to comment.