Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Make sure collators join the validation network #1010

Merged
merged 1 commit into from
Apr 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ pub trait Network: Send + Sync {
/// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
fn checked_statements(&self, relay_parent: Hash) -> Pin<Box<dyn Stream<Item=SignedStatement>>>;
fn checked_statements(&self, relay_parent: Hash) -> Pin<Box<dyn Stream<Item=SignedStatement> + Send>>;
}

impl Network for polkadot_network::protocol::Service {
fn checked_statements(&self, relay_parent: Hash) -> Pin<Box<dyn Stream<Item=SignedStatement>>> {
polkadot_network::protocol::Service::checked_statements(self, relay_parent)
fn checked_statements(&self, relay_parent: Hash) -> Pin<Box<dyn Stream<Item=SignedStatement> + Send>> {
polkadot_network::protocol::Service::checked_statements(self, relay_parent).boxed()
}
}

Expand Down Expand Up @@ -241,12 +241,14 @@ fn build_collator_service<S, P, Extrinsic>(
let (service, handles) = service;
let spawner = service.spawn_task_handle();

let polkadot_network = match handles.polkadot_network {
None => return Err(
"Collator cannot run when Polkadot-specific networking has not been started".into()
),
Some(n) => n,
};
let polkadot_network = handles.polkadot_network
.ok_or_else(|| "Collator cannot run when Polkadot-specific networking has not been started")?;

// We don't require this here, but we need to make sure that the validation service is started.
// This service makes sure the collator is joining the correct gossip topics and receives the appropiate
// messages.
handles.validation_service_handle
.ok_or_else(|| "Collator cannot run when validation networking has not been started")?;

let client = service.client();

Expand Down
3 changes: 1 addition & 2 deletions network/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1375,7 +1375,7 @@ impl<N: NetworkServiceOps> Service<N> {
/// Take care to drop the stream, as the sending side will not be cleaned
/// up until it is.
pub fn checked_statements(&self, relay_parent: Hash)
-> Pin<Box<dyn Stream<Item = SignedStatement>>> {
-> impl Stream<Item = SignedStatement> + Send {
let (tx, rx) = oneshot::channel();
let mut sender = self.sender.clone();

Expand All @@ -1400,7 +1400,6 @@ impl<N: NetworkServiceOps> Service<N> {
}
})
.flatten_stream()
.boxed()
}
}

Expand Down
19 changes: 17 additions & 2 deletions service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,12 @@ pub fn westend_new_full(
/// Handles to other sub-services that full nodes instantiate, which consumers
/// of the node may use.
#[cfg(feature = "full-node")]
#[derive(Default)]
pub struct FullNodeHandles {
/// A handle to the Polkadot networking protocol.
pub polkadot_network: Option<network_protocol::Service>,
/// A handle to the validation service.
pub validation_service_handle: Option<consensus::ServiceHandle>,
}

/// Builds a new service for a full client.
Expand Down Expand Up @@ -389,7 +392,7 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(
let client = service.client();
let known_oracle = client.clone();

let mut handles = FullNodeHandles { polkadot_network: None };
let mut handles = FullNodeHandles::default();
let select_chain = if let Some(select_chain) = service.select_chain() {
select_chain
} else {
Expand Down Expand Up @@ -427,7 +430,7 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(
service.spawn_task_handle(),
).map_err(|e| format!("Could not spawn network worker: {:?}", e))?;

if let Role::Authority { .. } = &role {
let authority_handles = if is_collator || role.is_authority() {
let availability_store = {
use std::path::PathBuf;

Expand Down Expand Up @@ -464,6 +467,18 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(

service.spawn_essential_task("validation-service", Box::pin(validation_service));

handles.validation_service_handle = Some(validation_service_handle.clone());

Some((validation_service_handle, availability_store))
} else {
None
};

if role.is_authority() {
let (validation_service_handle, availability_store) = authority_handles
.clone()
.expect("Authority handles are set for authority nodes; qed");

let proposer = consensus::ProposerFactory::new(
client.clone(),
service.transaction_pool(),
Expand Down