Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Don't send ActiveLeaves from leaves in db on startup in Overseer #6727

Merged
merged 6 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion node/overseer/src/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ where
.span_per_active_leaf(Default::default())
.active_leaves(Default::default())
.known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE))
.leaves(Default::default())
.spawner(SpawnGlue(spawner))
.metrics(metrics)
.supports_parachains(supports_parachains);
Expand Down
15 changes: 0 additions & 15 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,6 @@ pub struct Overseer<SupportsParachains> {
/// Stores the [`jaeger::Span`] per active leaf.
pub span_per_active_leaf: HashMap<Hash, Arc<jaeger::Span>>,

/// A set of leaves that `Overseer` starts working with.
///
/// Drained at the beginning of `run` and never used again.
pub leaves: Vec<(Hash, BlockNumber)>,

/// The set of the "active leaves".
pub active_leaves: HashMap<Hash, BlockNumber>,

Expand Down Expand Up @@ -728,16 +723,6 @@ where
let metrics = self.metrics.clone();
spawn_metronome_metrics(&mut self, metrics)?;

// Notify about active leaves on startup before starting the loop
for (hash, number) in std::mem::take(&mut self.leaves) {
let _ = self.active_leaves.insert(hash, number);
if let Some((span, status)) = self.on_head_activated(&hash, None).await {
let update =
ActiveLeavesUpdate::start_work(ActivatedLeaf { hash, number, status, span });
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
}

loop {
select! {
msg = self.events_rx.select_next_some() => {
Expand Down
80 changes: 34 additions & 46 deletions node/overseer/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,6 @@ use sp_core::crypto::Pair as _;

use super::*;

fn block_info_to_pair(blocks: impl IntoIterator<Item = BlockInfo>) -> Vec<(Hash, BlockNumber)> {
Vec::from_iter(
blocks
.into_iter()
.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)),
)
}

type SpawnedSubsystem = crate::gen::SpawnedSubsystem<SubsystemError>;

struct TestSubsystem1(metered::MeteredSender<usize>);
Expand Down Expand Up @@ -223,20 +215,16 @@ fn overseer_metrics_work() {
executor::block_on(async move {
let first_block_hash = [1; 32].into();
let second_block_hash = [2; 32].into();
let third_block_hash = [3; 32].into();

let first_block =
BlockInfo { hash: first_block_hash, parent_hash: [0; 32].into(), number: 1 };
let second_block =
BlockInfo { hash: second_block_hash, parent_hash: first_block_hash, number: 2 };
let third_block =
BlockInfo { hash: third_block_hash, parent_hash: second_block_hash, number: 3 };

let registry = prometheus::Registry::new();
let (overseer, handle) =
dummy_overseer_builder(spawner, MockSupportsParachains, Some(&registry))
.unwrap()
.leaves(block_info_to_pair(vec![first_block]))
.build()
.unwrap();

Expand All @@ -245,8 +233,8 @@ fn overseer_metrics_work() {

pin_mut!(overseer_fut);

handle.block_imported(first_block).await;
handle.block_imported(second_block).await;
handle.block_imported(third_block).await;
handle
.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg()))
.await;
Expand All @@ -256,8 +244,8 @@ fn overseer_metrics_work() {
res = overseer_fut => {
assert!(res.is_ok());
let metrics = extract_metrics(&registry);
assert_eq!(metrics["activated"], 3);
assert_eq!(metrics["deactivated"], 2);
assert_eq!(metrics["activated"], 2);
assert_eq!(metrics["deactivated"], 1);
assert_eq!(metrics["relayed"], 1);
},
complete => (),
Expand Down Expand Up @@ -379,14 +367,11 @@ fn overseer_start_stop_works() {
executor::block_on(async move {
let first_block_hash = [1; 32].into();
let second_block_hash = [2; 32].into();
let third_block_hash = [3; 32].into();

let first_block =
BlockInfo { hash: first_block_hash, parent_hash: [0; 32].into(), number: 1 };
let second_block =
BlockInfo { hash: second_block_hash, parent_hash: first_block_hash, number: 2 };
let third_block =
BlockInfo { hash: third_block_hash, parent_hash: second_block_hash, number: 3 };

let (tx_5, mut rx_5) = metered::channel(64);
let (tx_6, mut rx_6) = metered::channel(64);
Expand All @@ -395,7 +380,6 @@ fn overseer_start_stop_works() {
.unwrap()
.replace_candidate_validation(move |_| TestSubsystem5(tx_5))
.replace_candidate_backing(move |_| TestSubsystem6(tx_6))
.leaves(block_info_to_pair(vec![first_block]))
.build()
.unwrap();
let mut handle = Handle::new(handle);
Expand All @@ -406,33 +390,27 @@ fn overseer_start_stop_works() {
let mut ss5_results = Vec::new();
let mut ss6_results = Vec::new();

handle.block_imported(first_block).await;
handle.block_imported(second_block).await;
handle.block_imported(third_block).await;

let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: first_block_hash,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
})),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: second_block_hash,
number: 2,
hash: first_block_hash,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
}),
deactivated: [first_block_hash].as_ref().into(),
deactivated: Default::default(),
}),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: third_block_hash,
number: 3,
hash: second_block_hash,
number: 2,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
}),
deactivated: [second_block_hash].as_ref().into(),
deactivated: [first_block_hash].as_ref().into(),
}),
];

Expand Down Expand Up @@ -494,7 +472,6 @@ fn overseer_finalize_works() {
.unwrap()
.replace_candidate_validation(move |_| TestSubsystem5(tx_5))
.replace_candidate_backing(move |_| TestSubsystem6(tx_6))
.leaves(block_info_to_pair(vec![first_block, second_block]))
.build()
.unwrap();
let mut handle = Handle::new(handle);
Expand All @@ -505,22 +482,32 @@ fn overseer_finalize_works() {
let mut ss5_results = Vec::new();
let mut ss6_results = Vec::new();

// activate two blocks
handle.block_imported(first_block).await;
handle.block_imported(second_block).await;

// this should stop work on both forks we started with earlier.
handle.block_finalized(third_block).await;

let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: first_block_hash,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
})),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: second_block_hash,
number: 2,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
})),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: first_block_hash,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
}),
deactivated: Default::default(),
}),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
hash: second_block_hash,
number: 2,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
}),
deactivated: Default::default(),
}),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
deactivated: [first_block_hash, second_block_hash].as_ref().into(),
..Default::default()
Expand Down Expand Up @@ -590,7 +577,6 @@ fn overseer_finalize_leaf_preserves_it() {
.unwrap()
.replace_candidate_validation(move |_| TestSubsystem5(tx_5))
.replace_candidate_backing(move |_| TestSubsystem6(tx_6))
.leaves(block_info_to_pair(vec![first_block.clone(), second_block]))
.build()
.unwrap();
let mut handle = Handle::new(handle);
Expand All @@ -601,6 +587,8 @@ fn overseer_finalize_leaf_preserves_it() {
let mut ss5_results = Vec::new();
let mut ss6_results = Vec::new();

handle.block_imported(first_block.clone()).await;
handle.block_imported(second_block).await;
// This should stop work on the second block, but only the
// second block.
handle.block_finalized(first_block).await;
Expand Down
60 changes: 0 additions & 60 deletions node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ use {
polkadot_node_network_protocol::{
peer_set::PeerSetProtocolNames, request_response::ReqProtocolNames,
},
polkadot_overseer::BlockInfo,
sc_client_api::BlockBackend,
sp_core::traits::SpawnNamed,
sp_trie::PrefixedMemoryDB,
Expand Down Expand Up @@ -129,10 +128,6 @@ pub use {rococo_runtime, rococo_runtime_constants};
#[cfg(feature = "westend-native")]
pub use {westend_runtime, westend_runtime_constants};

/// The maximum number of active leaves we forward to the [`Overseer`] on startup.
#[cfg(any(test, feature = "full-node"))]
const MAX_ACTIVE_LEAVES: usize = 4;

/// Provides the header and block number for a hash.
///
/// Decouples `sc_client_api::Backend` and `sp_blockchain::HeaderBackend`.
Expand Down Expand Up @@ -668,56 +663,6 @@ impl IsCollator {
}
}

/// Returns the active leaves the overseer should start with.
#[cfg(feature = "full-node")]
async fn active_leaves<RuntimeApi, ExecutorDispatch>(
select_chain: &impl SelectChain<Block>,
client: &FullClient<RuntimeApi, ExecutorDispatch>,
) -> Result<Vec<BlockInfo>, Error>
where
RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi, ExecutorDispatch>>
+ Send
+ Sync
+ 'static,
RuntimeApi::RuntimeApi:
RuntimeApiCollection<StateBackend = sc_client_api::StateBackendFor<FullBackend, Block>>,
ExecutorDispatch: NativeExecutionDispatch + 'static,
{
let best_block = select_chain.best_chain().await?;

let mut leaves = select_chain
.leaves()
.await
.unwrap_or_default()
.into_iter()
.filter_map(|hash| {
let number = HeaderBackend::number(client, hash).ok()??;

// Only consider leaves that are in maximum an uncle of the best block.
if number < best_block.number().saturating_sub(1) {
return None
} else if hash == best_block.hash() {
return None
};

let parent_hash = client.header(hash).ok()??.parent_hash;

Some(BlockInfo { hash, parent_hash, number })
})
.collect::<Vec<_>>();

// Sort by block number and get the maximum number of leaves
leaves.sort_by_key(|b| b.number);

leaves.push(BlockInfo {
hash: best_block.hash(),
parent_hash: *best_block.parent_hash(),
number: *best_block.number(),
});

Ok(leaves.into_iter().rev().take(MAX_ACTIVE_LEAVES).collect())
}

pub const AVAILABILITY_CONFIG: AvailabilityConfig = AvailabilityConfig {
col_data: parachains_db::REAL_COLUMNS.col_availability_data,
col_meta: parachains_db::REAL_COLUMNS.col_availability_meta,
Expand Down Expand Up @@ -1009,10 +954,6 @@ where

let overseer_client = client.clone();
let spawner = task_manager.spawn_handle();
// Cannot use the `RelayChainSelection`, since that'd require a setup _and running_ overseer
// which we are about to setup.
let active_leaves =
futures::executor::block_on(active_leaves(select_chain.as_longest_chain(), &*client))?;

let authority_discovery_service = if auth_or_collator || overseer_enable_anyways {
use futures::StreamExt;
Expand Down Expand Up @@ -1068,7 +1009,6 @@ where
.generate::<service::SpawnTaskHandle, FullClient<RuntimeApi, ExecutorDispatch>>(
overseer_connector,
OverseerGenArgs {
leaves: active_leaves,
keystore,
runtime_client: overseer_client.clone(),
parachains_db,
Expand Down
12 changes: 2 additions & 10 deletions node/service/src/overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ pub use polkadot_overseer::{
HeadSupportsParachains,
};
use polkadot_overseer::{
metrics::Metrics as OverseerMetrics, BlockInfo, InitializedOverseerBuilder, MetricsTrait,
Overseer, OverseerConnector, OverseerHandle, SpawnGlue,
metrics::Metrics as OverseerMetrics, InitializedOverseerBuilder, MetricsTrait, Overseer,
OverseerConnector, OverseerHandle, SpawnGlue,
};

use polkadot_primitives::runtime_api::ParachainHost;
Expand Down Expand Up @@ -81,8 +81,6 @@ where
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
/// Set of initial relay chain leaves to track.
pub leaves: Vec<BlockInfo>,
/// The keystore to use for i.e. validator keys.
pub keystore: Arc<LocalKeystore>,
/// Runtime client generic, providing the `ProvieRuntimeApi` trait besides others.
Expand Down Expand Up @@ -131,7 +129,6 @@ where
/// with all default values.
pub fn prepared_overseer_builder<Spawner, RuntimeClient>(
OverseerGenArgs {
leaves,
keystore,
runtime_client,
parachains_db,
Expand Down Expand Up @@ -309,11 +306,6 @@ where
Metrics::register(registry)?,
))
.chain_selection(ChainSelectionSubsystem::new(chain_selection_config, parachains_db))
.leaves(Vec::from_iter(
leaves
.into_iter()
.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)),
))
.activation_external_listeners(Default::default())
.span_per_active_leaf(Default::default())
.active_leaves(Default::default())
Expand Down
1 change: 0 additions & 1 deletion node/subsystem-test-helpers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,6 @@ mod tests {
dummy_overseer_builder(spawner.clone(), AlwaysSupportsParachains, None)
.unwrap()
.replace_collator_protocol(|_| ForwardSubsystem(tx))
.leaves(vec![])
.build()
.unwrap();

Expand Down