diff --git a/Cargo.lock b/Cargo.lock index 1300be292e94..308b47bce39d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4939,7 +4939,6 @@ dependencies = [ name = "polkadot-node-core-bitfield-signing" version = "0.1.0" dependencies = [ - "derive_more", "futures 0.3.8", "polkadot-node-subsystem", "polkadot-node-subsystem-util", diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs index 000c121c429e..6adec2747a75 100644 --- a/node/core/backing/src/lib.rs +++ b/node/core/backing/src/lib.rs @@ -40,8 +40,7 @@ use polkadot_subsystem::{ messages::{ AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage, CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData, - ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed, - RuntimeApiRequest, + ProvisionerMessage, StatementDistributionMessage, ValidationFailed, RuntimeApiRequest, }, }; use polkadot_node_subsystem_util::{ @@ -93,9 +92,9 @@ struct CandidateBackingJob { /// The hash of the relay parent on top of which this job is doing it's work. parent: Hash, /// Inbound message channel receiving part. - rx_to: mpsc::Receiver, + rx_to: mpsc::Receiver, /// Outbound message channel sending part. - tx_from: mpsc::Sender, + tx_from: mpsc::Sender, /// The `ParaId` assigned to this validator assignment: ParaId, /// The collator required to author the candidate, if any. @@ -151,84 +150,6 @@ impl TableContextTrait for TableContext { } } -/// A message type that is sent from `CandidateBackingSubsystem` to `CandidateBackingJob`. -pub enum ToJob { - /// A `CandidateBackingMessage`. - CandidateBacking(CandidateBackingMessage), - /// Stop working. - Stop, -} - -impl TryFrom for ToJob { - type Error = (); - - fn try_from(msg: AllMessages) -> Result { - match msg { - AllMessages::CandidateBacking(msg) => Ok(ToJob::CandidateBacking(msg)), - _ => Err(()), - } - } -} - -impl From for ToJob { - fn from(msg: CandidateBackingMessage) -> Self { - Self::CandidateBacking(msg) - } -} - -impl util::ToJobTrait for ToJob { - const STOP: Self = ToJob::Stop; - - fn relay_parent(&self) -> Option { - match self { - Self::CandidateBacking(cb) => cb.relay_parent(), - Self::Stop => None, - } - } -} - -/// A message type that is sent from `CandidateBackingJob` to `CandidateBackingSubsystem`. -enum FromJob { - AvailabilityStore(AvailabilityStoreMessage), - RuntimeApiMessage(RuntimeApiMessage), - CandidateValidation(CandidateValidationMessage), - CandidateSelection(CandidateSelectionMessage), - Provisioner(ProvisionerMessage), - PoVDistribution(PoVDistributionMessage), - StatementDistribution(StatementDistributionMessage), -} - -impl From for FromJobCommand { - fn from(f: FromJob) -> FromJobCommand { - FromJobCommand::SendMessage(match f { - FromJob::AvailabilityStore(msg) => AllMessages::AvailabilityStore(msg), - FromJob::RuntimeApiMessage(msg) => AllMessages::RuntimeApi(msg), - FromJob::CandidateValidation(msg) => AllMessages::CandidateValidation(msg), - FromJob::CandidateSelection(msg) => AllMessages::CandidateSelection(msg), - FromJob::StatementDistribution(msg) => AllMessages::StatementDistribution(msg), - FromJob::PoVDistribution(msg) => AllMessages::PoVDistribution(msg), - FromJob::Provisioner(msg) => AllMessages::Provisioner(msg), - }) - } -} - -impl TryFrom for FromJob { - type Error = &'static str; - - fn try_from(f: AllMessages) -> Result { - match f { - AllMessages::AvailabilityStore(msg) => Ok(FromJob::AvailabilityStore(msg)), - AllMessages::RuntimeApi(msg) => Ok(FromJob::RuntimeApiMessage(msg)), - AllMessages::CandidateValidation(msg) => Ok(FromJob::CandidateValidation(msg)), - AllMessages::CandidateSelection(msg) => Ok(FromJob::CandidateSelection(msg)), - AllMessages::StatementDistribution(msg) => Ok(FromJob::StatementDistribution(msg)), - AllMessages::PoVDistribution(msg) => Ok(FromJob::PoVDistribution(msg)), - AllMessages::Provisioner(msg) => Ok(FromJob::Provisioner(msg)), - _ => Err("can't convert this AllMessages variant to FromJob"), - } - } -} - struct InvalidErasureRoot; // It looks like it's not possible to do an `impl From` given the current state of @@ -301,12 +222,10 @@ fn table_attested_to_backed( impl CandidateBackingJob { /// Run asynchronously. async fn run_loop(mut self) -> Result<(), Error> { - while let Some(msg) = self.rx_to.next().await { - match msg { - ToJob::CandidateBacking(msg) => { - self.process_msg(msg).await?; - } - ToJob::Stop => break, + loop { + match self.rx_to.next().await { + Some(msg) => self.process_msg(msg).await?, + None => break, } } @@ -317,9 +236,7 @@ impl CandidateBackingJob { &mut self, candidate: CandidateReceipt, ) -> Result<(), Error> { - self.tx_from.send(FromJob::CandidateSelection( - CandidateSelectionMessage::Invalid(self.parent, candidate) - )).await?; + self.tx_from.send(AllMessages::from(CandidateSelectionMessage::Invalid(self.parent, candidate)).into()).await?; Ok(()) } @@ -664,7 +581,7 @@ impl CandidateBackingJob { } async fn send_to_provisioner(&mut self, msg: ProvisionerMessage) -> Result<(), Error> { - self.tx_from.send(FromJob::Provisioner(msg)).await?; + self.tx_from.send(AllMessages::from(msg).into()).await?; Ok(()) } @@ -674,9 +591,9 @@ impl CandidateBackingJob { descriptor: CandidateDescriptor, pov: Arc, ) -> Result<(), Error> { - self.tx_from.send(FromJob::PoVDistribution( + self.tx_from.send(AllMessages::from( PoVDistributionMessage::DistributePoV(self.parent, descriptor, pov), - )).await.map_err(Into::into) + ).into()).await.map_err(Into::into) } async fn request_pov_from_distribution( @@ -685,9 +602,9 @@ impl CandidateBackingJob { ) -> Result, Error> { let (tx, rx) = oneshot::channel(); - self.tx_from.send(FromJob::PoVDistribution( + self.tx_from.send(AllMessages::from( PoVDistributionMessage::FetchPoV(self.parent, descriptor, tx) - )).await?; + ).into()).await?; Ok(rx.await?) } @@ -699,13 +616,14 @@ impl CandidateBackingJob { ) -> Result { let (tx, rx) = oneshot::channel(); - self.tx_from.send(FromJob::CandidateValidation( + self.tx_from.send( + AllMessages::from( CandidateValidationMessage::ValidateFromChainState( candidate, pov, tx, ) - ) + ).into(), ).await?; Ok(rx.await??) @@ -719,7 +637,7 @@ impl CandidateBackingJob { available_data: AvailableData, ) -> Result<(), Error> { let (tx, rx) = oneshot::channel(); - self.tx_from.send(FromJob::AvailabilityStore( + self.tx_from.send(AllMessages::from( AvailabilityStoreMessage::StoreAvailableData( candidate_hash, id, @@ -727,7 +645,7 @@ impl CandidateBackingJob { available_data, tx, ) - ) + ).into(), ).await?; let _ = rx.await?; @@ -777,15 +695,14 @@ impl CandidateBackingJob { async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> { let smsg = StatementDistributionMessage::Share(self.parent, s); - self.tx_from.send(FromJob::StatementDistribution(smsg)).await?; + self.tx_from.send(AllMessages::from(smsg).into()).await?; Ok(()) } } impl util::JobTrait for CandidateBackingJob { - type ToJob = ToJob; - type FromJob = FromJob; + type ToJob = CandidateBackingMessage; type Error = Error; type RunArgs = SyncCryptoStorePtr; type Metrics = Metrics; @@ -798,7 +715,7 @@ impl util::JobTrait for CandidateBackingJob { keystore: SyncCryptoStorePtr, metrics: Metrics, rx_to: mpsc::Receiver, - mut tx_from: mpsc::Sender, + mut tx_from: mpsc::Sender, ) -> Pin> + Send>> { async move { macro_rules! try_runtime_api { @@ -1000,7 +917,7 @@ impl metrics::Metrics for Metrics { } } -delegated_subsystem!(CandidateBackingJob(SyncCryptoStorePtr, Metrics) <- ToJob as CandidateBackingSubsystem); +delegated_subsystem!(CandidateBackingJob(SyncCryptoStorePtr, Metrics) <- CandidateBackingMessage as CandidateBackingSubsystem); #[cfg(test)] mod tests { @@ -1013,7 +930,7 @@ mod tests { GroupRotationInfo, }; use polkadot_subsystem::{ - messages::RuntimeApiRequest, + messages::{RuntimeApiRequest, RuntimeApiMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal, }; use polkadot_node_primitives::InvalidCandidate; diff --git a/node/core/bitfield-signing/Cargo.toml b/node/core/bitfield-signing/Cargo.toml index 1d60d85eb886..3abe9680aed7 100644 --- a/node/core/bitfield-signing/Cargo.toml +++ b/node/core/bitfield-signing/Cargo.toml @@ -14,4 +14,3 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } wasm-timer = "0.2.5" thiserror = "1.0.22" -derive_more = "0.99.11" diff --git a/node/core/bitfield-signing/src/lib.rs b/node/core/bitfield-signing/src/lib.rs index 139fed6ddc35..7937d908efbd 100644 --- a/node/core/bitfield-signing/src/lib.rs +++ b/node/core/bitfield-signing/src/lib.rs @@ -25,16 +25,15 @@ use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr}; use polkadot_node_subsystem::{ messages::{ AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage, - BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage, RuntimeApiRequest, + BitfieldSigningMessage, RuntimeApiMessage, RuntimeApiRequest, }, errors::RuntimeApiError, }; use polkadot_node_subsystem_util::{ - self as util, JobManager, JobTrait, ToJobTrait, Validator, FromJobCommand, - metrics::{self, prometheus}, + self as util, JobManager, JobTrait, Validator, FromJobCommand, metrics::{self, prometheus}, }; use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex}; -use std::{convert::TryFrom, pin::Pin, time::Duration, iter::FromIterator}; +use std::{pin::Pin, time::Duration, iter::FromIterator}; use wasm_timer::{Delay, Instant}; use thiserror::Error; @@ -45,76 +44,6 @@ const LOG_TARGET: &str = "bitfield_signing"; /// Each `BitfieldSigningJob` prepares a signed bitfield for a single relay parent. pub struct BitfieldSigningJob; -/// Messages which a `BitfieldSigningJob` is prepared to receive. -#[allow(missing_docs)] -pub enum ToJob { - BitfieldSigning(BitfieldSigningMessage), - Stop, -} - -impl ToJobTrait for ToJob { - const STOP: Self = ToJob::Stop; - - fn relay_parent(&self) -> Option { - match self { - Self::BitfieldSigning(bsm) => bsm.relay_parent(), - Self::Stop => None, - } - } -} - -impl TryFrom for ToJob { - type Error = (); - - fn try_from(msg: AllMessages) -> Result { - match msg { - AllMessages::BitfieldSigning(bsm) => Ok(ToJob::BitfieldSigning(bsm)), - _ => Err(()), - } - } -} - -impl From for ToJob { - fn from(bsm: BitfieldSigningMessage) -> ToJob { - ToJob::BitfieldSigning(bsm) - } -} - -/// Messages which may be sent from a `BitfieldSigningJob`. -#[allow(missing_docs)] -#[derive(Debug, derive_more::From)] -pub enum FromJob { - AvailabilityStore(AvailabilityStoreMessage), - BitfieldDistribution(BitfieldDistributionMessage), - CandidateBacking(CandidateBackingMessage), - RuntimeApi(RuntimeApiMessage), -} - -impl From for FromJobCommand { - fn from(from_job: FromJob) -> FromJobCommand { - FromJobCommand::SendMessage(match from_job { - FromJob::AvailabilityStore(asm) => AllMessages::AvailabilityStore(asm), - FromJob::BitfieldDistribution(bdm) => AllMessages::BitfieldDistribution(bdm), - FromJob::CandidateBacking(cbm) => AllMessages::CandidateBacking(cbm), - FromJob::RuntimeApi(ram) => AllMessages::RuntimeApi(ram), - }) - } -} - -impl TryFrom for FromJob { - type Error = (); - - fn try_from(msg: AllMessages) -> Result { - match msg { - AllMessages::AvailabilityStore(asm) => Ok(Self::AvailabilityStore(asm)), - AllMessages::BitfieldDistribution(bdm) => Ok(Self::BitfieldDistribution(bdm)), - AllMessages::CandidateBacking(cbm) => Ok(Self::CandidateBacking(cbm)), - AllMessages::RuntimeApi(ram) => Ok(Self::RuntimeApi(ram)), - _ => Err(()), - } - } -} - /// Errors we may encounter in the course of executing the `BitfieldSigningSubsystem`. #[derive(Debug, Error)] pub enum Error { @@ -145,7 +74,7 @@ async fn get_core_availability( relay_parent: Hash, core: CoreState, validator_idx: ValidatorIndex, - sender: &Mutex<&mut mpsc::Sender>, + sender: &Mutex<&mut mpsc::Sender>, ) -> Result { if let CoreState::Occupied(core) = core { let (tx, rx) = oneshot::channel(); @@ -153,10 +82,10 @@ async fn get_core_availability( .lock() .await .send( - RuntimeApiMessage::Request( + AllMessages::from(RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::CandidatePendingAvailability(core.para_id, tx), - ).into(), + )).into(), ) .await?; @@ -174,11 +103,11 @@ async fn get_core_availability( .lock() .await .send( - AvailabilityStoreMessage::QueryChunkAvailability( + AllMessages::from(AvailabilityStoreMessage::QueryChunkAvailability( committed_candidate_receipt.hash(), validator_idx, tx, - ).into(), + )).into(), ) .await?; return rx.await.map_err(Into::into); @@ -188,9 +117,14 @@ async fn get_core_availability( } /// delegates to the v1 runtime API -async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender) -> Result, Error> { +async fn get_availability_cores( + relay_parent: Hash, + sender: &mut mpsc::Sender, +) -> Result, Error> { let (tx, rx) = oneshot::channel(); - sender.send(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::AvailabilityCores(tx)).into()).await?; + sender + .send(AllMessages::from(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::AvailabilityCores(tx))).into()) + .await?; match rx.await { Ok(Ok(out)) => Ok(out), Ok(Err(runtime_err)) => Err(runtime_err.into()), @@ -206,7 +140,7 @@ async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender, + sender: &mut mpsc::Sender, ) -> Result { // get the set of availability cores from the runtime let availability_cores = get_availability_cores(relay_parent, sender).await?; @@ -275,8 +209,7 @@ impl metrics::Metrics for Metrics { } impl JobTrait for BitfieldSigningJob { - type ToJob = ToJob; - type FromJob = FromJob; + type ToJob = BitfieldSigningMessage; type Error = Error; type RunArgs = SyncCryptoStorePtr; type Metrics = Metrics; @@ -289,8 +222,8 @@ impl JobTrait for BitfieldSigningJob { relay_parent: Hash, keystore: Self::RunArgs, metrics: Self::Metrics, - _receiver: mpsc::Receiver, - mut sender: mpsc::Sender, + _receiver: mpsc::Receiver, + mut sender: mpsc::Sender, ) -> Pin> + Send>> { let metrics = metrics.clone(); async move { @@ -330,7 +263,11 @@ impl JobTrait for BitfieldSigningJob { metrics.on_bitfield_signed(); sender - .send(BitfieldDistributionMessage::DistributeBitfield(relay_parent, signed_bitfield).into()) + .send( + AllMessages::from( + BitfieldDistributionMessage::DistributeBitfield(relay_parent, signed_bitfield), + ).into(), + ) .await .map_err(Into::into) } @@ -345,8 +282,7 @@ pub type BitfieldSigningSubsystem = JobManager CoreState { CoreState::Occupied(OccupiedCore { @@ -373,12 +309,18 @@ mod tests { loop { futures::select! { m = receiver.next() => match m.unwrap() { - RuntimeApi(RuntimeApiMessage::Request(rp, RuntimeApiRequest::AvailabilityCores(tx))) => { + FromJobCommand::SendMessage( + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(rp, RuntimeApiRequest::AvailabilityCores(tx)), + ), + ) => { assert_eq!(relay_parent, rp); tx.send(Ok(vec![CoreState::Free, occupied_core(1), occupied_core(2)])).unwrap(); }, - RuntimeApi( - RuntimeApiMessage::Request(rp, RuntimeApiRequest::CandidatePendingAvailability(para_id, tx)) + FromJobCommand::SendMessage( + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(rp, RuntimeApiRequest::CandidatePendingAvailability(para_id, tx)), + ), ) => { assert_eq!(relay_parent, rp); @@ -388,7 +330,11 @@ mod tests { tx.send(Ok(None)).unwrap(); } }, - AvailabilityStore(AvailabilityStoreMessage::QueryChunkAvailability(_, vidx, tx)) => { + FromJobCommand::SendMessage( + AllMessages::AvailabilityStore( + AvailabilityStoreMessage::QueryChunkAvailability(_, vidx, tx), + ), + ) => { assert_eq!(validator_index, vidx); tx.send(true).unwrap(); diff --git a/node/core/candidate-selection/src/lib.rs b/node/core/candidate-selection/src/lib.rs index dc2692431c0c..0eb524297917 100644 --- a/node/core/candidate-selection/src/lib.rs +++ b/node/core/candidate-selection/src/lib.rs @@ -30,86 +30,21 @@ use polkadot_node_subsystem::{ }, }; use polkadot_node_subsystem_util::{ - self as util, delegated_subsystem, JobTrait, ToJobTrait, FromJobCommand, - metrics::{self, prometheus}, + self as util, delegated_subsystem, JobTrait, FromJobCommand, metrics::{self, prometheus}, }; use polkadot_primitives::v1::{CandidateReceipt, CollatorId, Hash, Id as ParaId, PoV}; -use std::{convert::TryFrom, pin::Pin}; +use std::pin::Pin; use thiserror::Error; const LOG_TARGET: &'static str = "candidate_selection"; struct CandidateSelectionJob { - sender: mpsc::Sender, - receiver: mpsc::Receiver, + sender: mpsc::Sender, + receiver: mpsc::Receiver, metrics: Metrics, seconded_candidate: Option, } -/// This enum defines the messages that the provisioner is prepared to receive. -#[derive(Debug)] -pub enum ToJob { - /// The provisioner message is the main input to the provisioner. - CandidateSelection(CandidateSelectionMessage), - /// This message indicates that the provisioner should shut itself down. - Stop, -} - -impl ToJobTrait for ToJob { - const STOP: Self = Self::Stop; - - fn relay_parent(&self) -> Option { - match self { - Self::CandidateSelection(csm) => csm.relay_parent(), - Self::Stop => None, - } - } -} - -impl TryFrom for ToJob { - type Error = (); - - fn try_from(msg: AllMessages) -> Result { - match msg { - AllMessages::CandidateSelection(csm) => Ok(Self::CandidateSelection(csm)), - _ => Err(()), - } - } -} - -impl From for ToJob { - fn from(csm: CandidateSelectionMessage) -> Self { - Self::CandidateSelection(csm) - } -} - -#[derive(Debug)] -enum FromJob { - Backing(CandidateBackingMessage), - Collator(CollatorProtocolMessage), -} - -impl From for FromJobCommand { - fn from(from_job: FromJob) -> FromJobCommand { - FromJobCommand::SendMessage(match from_job { - FromJob::Backing(msg) => AllMessages::CandidateBacking(msg), - FromJob::Collator(msg) => AllMessages::CollatorProtocol(msg), - }) - } -} - -impl TryFrom for FromJob { - type Error = (); - - fn try_from(msg: AllMessages) -> Result { - match msg { - AllMessages::CandidateBacking(msg) => Ok(FromJob::Backing(msg)), - AllMessages::CollatorProtocol(msg) => Ok(FromJob::Collator(msg)), - _ => Err(()), - } - } -} - #[derive(Debug, Error)] enum Error { #[error(transparent)] @@ -123,40 +58,32 @@ enum Error { } impl JobTrait for CandidateSelectionJob { - type ToJob = ToJob; - type FromJob = FromJob; + type ToJob = CandidateSelectionMessage; type Error = Error; type RunArgs = (); type Metrics = Metrics; const NAME: &'static str = "CandidateSelectionJob"; - /// Run a job for the parent block indicated - // - // this function is in charge of creating and executing the job's main loop #[tracing::instrument(skip(_relay_parent, _run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))] fn run( _relay_parent: Hash, _run_args: Self::RunArgs, metrics: Self::Metrics, - receiver: mpsc::Receiver, - sender: mpsc::Sender, + receiver: mpsc::Receiver, + sender: mpsc::Sender, ) -> Pin> + Send>> { - Box::pin(async move { - let job = CandidateSelectionJob::new(metrics, sender, receiver); - - // it isn't necessary to break run_loop into its own function, - // but it's convenient to separate the concerns in this way - job.run_loop().await - }) + async move { + CandidateSelectionJob::new(metrics, sender, receiver).run_loop().await + }.boxed() } } impl CandidateSelectionJob { pub fn new( metrics: Metrics, - sender: mpsc::Sender, - receiver: mpsc::Receiver, + sender: mpsc::Sender, + receiver: mpsc::Receiver, ) -> Self { Self { sender, @@ -166,28 +93,23 @@ impl CandidateSelectionJob { } } - async fn run_loop(mut self) -> Result<(), Error> { - self.run_loop_borrowed().await - } - - /// this function exists for testing and should not generally be used; use `run_loop` instead. - async fn run_loop_borrowed(&mut self) -> Result<(), Error> { - while let Some(msg) = self.receiver.next().await { - match msg { - ToJob::CandidateSelection(CandidateSelectionMessage::Collation( + async fn run_loop(&mut self) -> Result<(), Error> { + loop { + match self.receiver.next().await { + Some(CandidateSelectionMessage::Collation( relay_parent, para_id, collator_id, )) => { self.handle_collation(relay_parent, para_id, collator_id).await; } - ToJob::CandidateSelection(CandidateSelectionMessage::Invalid( + Some(CandidateSelectionMessage::Invalid( _, candidate_receipt, )) => { self.handle_invalid(candidate_receipt).await; } - ToJob::Stop => break, + None => break, } } @@ -283,16 +205,16 @@ async fn get_collation( relay_parent: Hash, para_id: ParaId, collator_id: CollatorId, - mut sender: mpsc::Sender, + mut sender: mpsc::Sender, ) -> Result<(CandidateReceipt, PoV), Error> { let (tx, rx) = oneshot::channel(); sender - .send(FromJob::Collator(CollatorProtocolMessage::FetchCollation( + .send(AllMessages::from(CollatorProtocolMessage::FetchCollation( relay_parent, collator_id, para_id, tx, - ))) + )).into()) .await?; rx.await.map_err(Into::into) } @@ -301,15 +223,15 @@ async fn second_candidate( relay_parent: Hash, candidate_receipt: CandidateReceipt, pov: PoV, - sender: &mut mpsc::Sender, + sender: &mut mpsc::Sender, metrics: &Metrics, ) -> Result<(), Error> { match sender - .send(FromJob::Backing(CandidateBackingMessage::Second( + .send(AllMessages::from(CandidateBackingMessage::Second( relay_parent, candidate_receipt, pov, - ))) + )).into()) .await { Err(err) => { @@ -326,12 +248,12 @@ async fn second_candidate( async fn forward_invalidity_note( received_from: &CollatorId, - sender: &mut mpsc::Sender, + sender: &mut mpsc::Sender, ) -> Result<(), Error> { sender - .send(FromJob::Collator(CollatorProtocolMessage::ReportCollator( + .send(AllMessages::from(CollatorProtocolMessage::ReportCollator( received_from.clone(), - ))) + )).into()) .await .map_err(Into::into) } @@ -420,7 +342,7 @@ impl metrics::Metrics for Metrics { } } -delegated_subsystem!(CandidateSelectionJob((), Metrics) <- ToJob as CandidateSelectionSubsystem); +delegated_subsystem!(CandidateSelectionJob((), Metrics) <- CandidateSelectionMessage as CandidateSelectionSubsystem); #[cfg(test)] mod tests { @@ -436,7 +358,7 @@ mod tests { postconditions: Postconditions, ) where Preconditions: FnOnce(&mut CandidateSelectionJob), - TestBuilder: FnOnce(mpsc::Sender, mpsc::Receiver) -> Test, + TestBuilder: FnOnce(mpsc::Sender, mpsc::Receiver) -> Test, Test: Future, Postconditions: FnOnce(CandidateSelectionJob, Result<(), Error>), { @@ -453,7 +375,7 @@ mod tests { let (_, job_result) = futures::executor::block_on(future::join( test(to_job_tx, from_job_rx), - job.run_loop_borrowed(), + job.run_loop(), )); postconditions(job, job_result); @@ -479,12 +401,10 @@ mod tests { |_job| {}, |mut to_job, mut from_job| async move { to_job - .send(ToJob::CandidateSelection( - CandidateSelectionMessage::Collation( - relay_parent, - para_id, - collator_id_clone.clone(), - ), + .send(CandidateSelectionMessage::Collation( + relay_parent, + para_id, + collator_id_clone.clone(), )) .await .unwrap(); @@ -492,12 +412,12 @@ mod tests { while let Some(msg) = from_job.next().await { match msg { - FromJob::Collator(CollatorProtocolMessage::FetchCollation( + FromJobCommand::SendMessage(AllMessages::CollatorProtocol(CollatorProtocolMessage::FetchCollation( got_relay_parent, collator_id, got_para_id, return_sender, - )) => { + ))) => { assert_eq!(got_relay_parent, relay_parent); assert_eq!(got_para_id, para_id); assert_eq!(collator_id, collator_id_clone); @@ -506,11 +426,11 @@ mod tests { .send((candidate_receipt.clone(), pov.clone())) .unwrap(); } - FromJob::Backing(CandidateBackingMessage::Second( + FromJobCommand::SendMessage(AllMessages::CandidateBacking(CandidateBackingMessage::Second( got_relay_parent, got_candidate_receipt, got_pov, - )) => { + ))) => { assert_eq!(got_relay_parent, relay_parent); assert_eq!(got_candidate_receipt, candidate_receipt); assert_eq!(got_pov, pov); @@ -546,12 +466,10 @@ mod tests { |job| job.seconded_candidate = Some(prev_collator_id.clone()), |mut to_job, mut from_job| async move { to_job - .send(ToJob::CandidateSelection( - CandidateSelectionMessage::Collation( - relay_parent, - para_id, - collator_id_clone, - ), + .send(CandidateSelectionMessage::Collation( + relay_parent, + para_id, + collator_id_clone, )) .await .unwrap(); @@ -559,11 +477,11 @@ mod tests { while let Some(msg) = from_job.next().await { match msg { - FromJob::Backing(CandidateBackingMessage::Second( + FromJobCommand::SendMessage(AllMessages::CandidateBacking(CandidateBackingMessage::Second( _got_relay_parent, _got_candidate_receipt, _got_pov, - )) => { + ))) => { *was_seconded_clone.lock().await = true; } other => panic!("unexpected message from job: {:?}", other), @@ -595,18 +513,16 @@ mod tests { |job| job.seconded_candidate = Some(collator_id.clone()), |mut to_job, mut from_job| async move { to_job - .send(ToJob::CandidateSelection( - CandidateSelectionMessage::Invalid(relay_parent, candidate_receipt), - )) + .send(CandidateSelectionMessage::Invalid(relay_parent, candidate_receipt)) .await .unwrap(); std::mem::drop(to_job); while let Some(msg) = from_job.next().await { match msg { - FromJob::Collator(CollatorProtocolMessage::ReportCollator( + FromJobCommand::SendMessage(AllMessages::CollatorProtocol(CollatorProtocolMessage::ReportCollator( got_collator_id, - )) => { + ))) => { assert_eq!(got_collator_id, collator_id_clone); *sent_report_clone.lock().await = true; diff --git a/node/core/provisioner/src/lib.rs b/node/core/provisioner/src/lib.rs index 91655951cc0a..c013e11b8f00 100644 --- a/node/core/provisioner/src/lib.rs +++ b/node/core/provisioner/src/lib.rs @@ -26,99 +26,31 @@ use futures::{ }; use polkadot_node_subsystem::{ errors::{ChainApiError, RuntimeApiError}, - messages::{ - AllMessages, ChainApiMessage, ProvisionableData, ProvisionerInherentData, - ProvisionerMessage, RuntimeApiMessage, - }, + messages::{ChainApiMessage, ProvisionableData, ProvisionerInherentData, ProvisionerMessage, AllMessages}, }; use polkadot_node_subsystem_util::{ - self as util, - delegated_subsystem, FromJobCommand, - request_availability_cores, request_persisted_validation_data, JobTrait, ToJobTrait, - metrics::{self, prometheus}, + self as util, delegated_subsystem, FromJobCommand, + request_availability_cores, request_persisted_validation_data, JobTrait, metrics::{self, prometheus}, }; use polkadot_primitives::v1::{ BackedCandidate, BlockNumber, CoreState, Hash, OccupiedCoreAssumption, SignedAvailabilityBitfield, ValidatorIndex, }; -use std::{convert::TryFrom, pin::Pin}; -use std::collections::BTreeMap; +use std::{pin::Pin, collections::BTreeMap}; use thiserror::Error; const LOG_TARGET: &str = "provisioner"; struct ProvisioningJob { relay_parent: Hash, - sender: mpsc::Sender, - receiver: mpsc::Receiver, + sender: mpsc::Sender, + receiver: mpsc::Receiver, provisionable_data_channels: Vec>, backed_candidates: Vec, signed_bitfields: Vec, metrics: Metrics, } -/// This enum defines the messages that the provisioner is prepared to receive. -pub enum ToJob { - /// The provisioner message is the main input to the provisioner. - Provisioner(ProvisionerMessage), - /// This message indicates that the provisioner should shut itself down. - Stop, -} - -impl ToJobTrait for ToJob { - const STOP: Self = Self::Stop; - - fn relay_parent(&self) -> Option { - match self { - Self::Provisioner(pm) => pm.relay_parent(), - Self::Stop => None, - } - } -} - -impl TryFrom for ToJob { - type Error = (); - - fn try_from(msg: AllMessages) -> Result { - match msg { - AllMessages::Provisioner(pm) => Ok(Self::Provisioner(pm)), - _ => Err(()), - } - } -} - -impl From for ToJob { - fn from(pm: ProvisionerMessage) -> Self { - Self::Provisioner(pm) - } -} - -enum FromJob { - ChainApi(ChainApiMessage), - Runtime(RuntimeApiMessage), -} - -impl From for FromJobCommand { - fn from(from_job: FromJob) -> FromJobCommand { - FromJobCommand::SendMessage(match from_job { - FromJob::ChainApi(cam) => AllMessages::ChainApi(cam), - FromJob::Runtime(ram) => AllMessages::RuntimeApi(ram), - }) - } -} - -impl TryFrom for FromJob { - type Error = (); - - fn try_from(msg: AllMessages) -> Result { - match msg { - AllMessages::ChainApi(chain) => Ok(FromJob::ChainApi(chain)), - AllMessages::RuntimeApi(runtime) => Ok(FromJob::Runtime(runtime)), - _ => Err(()), - } - } -} - #[derive(Debug, Error)] enum Error { #[error(transparent)] @@ -141,8 +73,7 @@ enum Error { } impl JobTrait for ProvisioningJob { - type ToJob = ToJob; - type FromJob = FromJob; + type ToJob = ProvisionerMessage; type Error = Error; type RunArgs = (); type Metrics = Metrics; @@ -157,8 +88,8 @@ impl JobTrait for ProvisioningJob { relay_parent: Hash, _run_args: Self::RunArgs, metrics: Self::Metrics, - receiver: mpsc::Receiver, - sender: mpsc::Sender, + receiver: mpsc::Receiver, + sender: mpsc::Sender, ) -> Pin> + Send>> { async move { let job = ProvisioningJob::new(relay_parent, metrics, sender, receiver); @@ -175,8 +106,8 @@ impl ProvisioningJob { pub fn new( relay_parent: Hash, metrics: Metrics, - sender: mpsc::Sender, - receiver: mpsc::Receiver, + sender: mpsc::Sender, + receiver: mpsc::Receiver, ) -> Self { Self { relay_parent, @@ -190,13 +121,13 @@ impl ProvisioningJob { } async fn run_loop(mut self) -> Result<(), Error> { - while let Some(msg) = self.receiver.next().await { - use ProvisionerMessage::{ - ProvisionableData, RequestBlockAuthorshipData, RequestInherentData, - }; + use ProvisionerMessage::{ + ProvisionableData, RequestBlockAuthorshipData, RequestInherentData, + }; - match msg { - ToJob::Provisioner(RequestInherentData(_, return_sender)) => { + loop { + match self.receiver.next().await { + Some(RequestInherentData(_, return_sender)) => { let _timer = self.metrics.time_request_inherent_data(); if let Err(err) = send_inherent_data( @@ -214,10 +145,10 @@ impl ProvisioningJob { self.metrics.on_inherent_data_request(Ok(())); } } - ToJob::Provisioner(RequestBlockAuthorshipData(_, sender)) => { + Some(RequestBlockAuthorshipData(_, sender)) => { self.provisionable_data_channels.push(sender) } - ToJob::Provisioner(ProvisionableData(_, data)) => { + Some(ProvisionableData(_, data)) => { let _timer = self.metrics.time_provisionable_data(); let mut bad_indices = Vec::new(); @@ -252,7 +183,7 @@ impl ProvisioningJob { .map(|(_, item)| item) .collect(); } - ToJob::Stop => break, + None => break, } } @@ -298,7 +229,7 @@ async fn send_inherent_data( bitfields: &[SignedAvailabilityBitfield], candidates: &[BackedCandidate], return_sender: oneshot::Sender, - mut from_job: mpsc::Sender, + mut from_job: mpsc::Sender, ) -> Result<(), Error> { let availability_cores = request_availability_cores(relay_parent, &mut from_job) .await? @@ -368,7 +299,7 @@ async fn select_candidates( bitfields: &[SignedAvailabilityBitfield], candidates: &[BackedCandidate], relay_parent: Hash, - sender: &mut mpsc::Sender, + sender: &mut mpsc::Sender, ) -> Result, Error> { let block_number = get_block_number_under_construction(relay_parent, sender).await?; @@ -432,14 +363,14 @@ async fn select_candidates( #[tracing::instrument(level = "trace", skip(sender), fields(subsystem = LOG_TARGET))] async fn get_block_number_under_construction( relay_parent: Hash, - sender: &mut mpsc::Sender, + sender: &mut mpsc::Sender, ) -> Result { let (tx, rx) = oneshot::channel(); sender - .send(FromJob::ChainApi(ChainApiMessage::BlockNumber( + .send(AllMessages::from(ChainApiMessage::BlockNumber( relay_parent, tx, - ))) + )).into()) .await .map_err(|e| Error::ChainApiMessageSend(e))?; match rx.await? { @@ -558,7 +489,7 @@ impl metrics::Metrics for Metrics { } -delegated_subsystem!(ProvisioningJob((), Metrics) <- ToJob as ProvisioningSubsystem); +delegated_subsystem!(ProvisioningJob((), Metrics) <- ProvisionerMessage as ProvisioningSubsystem); #[cfg(test)] mod tests; diff --git a/node/core/provisioner/src/tests.rs b/node/core/provisioner/src/tests.rs index 3cf06b9e7586..1cfe0cf29fc3 100644 --- a/node/core/provisioner/src/tests.rs +++ b/node/core/provisioner/src/tests.rs @@ -193,13 +193,13 @@ mod select_candidates { use futures_timer::Delay; use super::super::*; use super::{build_occupied_core, default_bitvec, occupied_core, scheduled_core}; - use polkadot_node_subsystem::messages::RuntimeApiRequest::{ - AvailabilityCores, PersistedValidationData as PersistedValidationDataReq, + use polkadot_node_subsystem::messages::{ + AllMessages, RuntimeApiMessage, + RuntimeApiRequest::{AvailabilityCores, PersistedValidationData as PersistedValidationDataReq}, }; use polkadot_primitives::v1::{ BlockNumber, CandidateDescriptor, CommittedCandidateReceipt, PersistedValidationData, }; - use FromJob::{ChainApi, Runtime}; const BLOCK_UNDER_PRODUCTION: BlockNumber = 128; @@ -207,9 +207,9 @@ mod select_candidates { overseer_factory: OverseerFactory, test_factory: TestFactory, ) where - OverseerFactory: FnOnce(mpsc::Receiver) -> Overseer, + OverseerFactory: FnOnce(mpsc::Receiver) -> Overseer, Overseer: Future, - TestFactory: FnOnce(mpsc::Sender) -> Test, + TestFactory: FnOnce(mpsc::Sender) -> Test, Test: Future, { let (tx, rx) = mpsc::channel(64); @@ -297,20 +297,20 @@ mod select_candidates { ] } - async fn mock_overseer(mut receiver: mpsc::Receiver) { + async fn mock_overseer(mut receiver: mpsc::Receiver) { use ChainApiMessage::BlockNumber; use RuntimeApiMessage::Request; while let Some(from_job) = receiver.next().await { match from_job { - ChainApi(BlockNumber(_relay_parent, tx)) => { + FromJobCommand::SendMessage(AllMessages::ChainApi(BlockNumber(_relay_parent, tx))) => { tx.send(Ok(Some(BLOCK_UNDER_PRODUCTION - 1))).unwrap() } - Runtime(Request( + FromJobCommand::SendMessage(AllMessages::RuntimeApi(Request( _parent_hash, PersistedValidationDataReq(_para_id, _assumption, tx), - )) => tx.send(Ok(Some(Default::default()))).unwrap(), - Runtime(Request(_parent_hash, AvailabilityCores(tx))) => { + ))) => tx.send(Ok(Some(Default::default()))).unwrap(), + FromJobCommand::SendMessage(AllMessages::RuntimeApi(Request(_parent_hash, AvailabilityCores(tx)))) => { tx.send(Ok(mock_availability_cores())).unwrap() } // non-exhaustive matches are fine for testing @@ -321,14 +321,14 @@ mod select_candidates { #[test] fn handles_overseer_failure() { - let overseer = |rx: mpsc::Receiver| async move { + let overseer = |rx: mpsc::Receiver| async move { // drop the receiver so it closes and the sender can't send, then just sleep long enough that // this is almost certainly not the first of the two futures to complete std::mem::drop(rx); Delay::new(std::time::Duration::from_secs(1)).await; }; - let test = |mut tx: mpsc::Sender| async move { + let test = |mut tx: mpsc::Sender| async move { // wait so that the overseer can drop the rx before we attempt to send Delay::new(std::time::Duration::from_millis(50)).await; let result = select_candidates(&[], &[], &[], Default::default(), &mut tx).await; @@ -341,7 +341,7 @@ mod select_candidates { #[test] fn can_succeed() { - test_harness(mock_overseer, |mut tx: mpsc::Sender| async move { + test_harness(mock_overseer, |mut tx: mpsc::Sender| async move { let result = select_candidates(&[], &[], &[], Default::default(), &mut tx).await; println!("{:?}", result); assert!(result.is_ok()); @@ -403,7 +403,7 @@ mod select_candidates { .map(|&idx| candidates[idx].clone()) .collect(); - test_harness(mock_overseer, |mut tx: mpsc::Sender| async move { + test_harness(mock_overseer, |mut tx: mpsc::Sender| async move { let result = select_candidates(&mock_cores, &[], &candidates, Default::default(), &mut tx) .await; diff --git a/node/subsystem-util/src/lib.rs b/node/subsystem-util/src/lib.rs index 2620a1f25e49..8abfcad206cb 100644 --- a/node/subsystem-util/src/lib.rs +++ b/node/subsystem-util/src/lib.rs @@ -26,7 +26,7 @@ use polkadot_node_subsystem::{ errors::RuntimeApiError, - messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender}, + messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender, BoundToRelayParent}, FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult, }; use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::Stream}; @@ -56,6 +56,7 @@ use std::{ pin::Pin, task::{Poll, Context}, time::Duration, + fmt, }; use streamunordered::{StreamUnordered, StreamYield}; use thiserror::Error; @@ -117,18 +118,11 @@ pub async fn request_from_runtime( ) -> Result, Error> where RequestBuilder: FnOnce(RuntimeApiSender) -> RuntimeApiRequest, - FromJob: TryFrom, - >::Error: std::fmt::Debug, + FromJob: From, { let (tx, rx) = oneshot::channel(); - sender - .send( - AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx))) - .try_into() - .map_err(|err| Error::SenderConversion(format!("{:?}", err)))?, - ) - .await?; + sender.send(AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx))).into()).await?; Ok(rx) } @@ -157,8 +151,7 @@ macro_rules! specialize_requests { sender: &mut mpsc::Sender, ) -> Result, Error> where - FromJob: TryFrom, - >::Error: std::fmt::Debug, + FromJob: From, { request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant( $( $param_name, )* tx @@ -308,8 +301,7 @@ impl Validator { mut sender: mpsc::Sender, ) -> Result where - FromJob: TryFrom, - >::Error: std::fmt::Debug, + FromJob: From, { // Note: request_validators and request_session_index_for_child do not and cannot // run concurrently: they both have a mutable handle to the same sender. @@ -392,17 +384,6 @@ impl Validator { } } -/// ToJob is expected to be an enum declaring the set of messages of interest to a particular job. -/// -/// Normally, this will be some subset of `Allmessages`, and a `Stop` variant. -pub trait ToJobTrait: TryFrom { - /// The `Stop` variant of the ToJob enum. - const STOP: Self; - - /// If the message variant contains its relay parent, return it here - fn relay_parent(&self) -> Option; -} - struct AbortOnDrop(future::AbortHandle); impl Drop for AbortOnDrop { @@ -415,7 +396,6 @@ impl Drop for AbortOnDrop { struct JobHandle { _abort_handle: AbortOnDrop, to_job: mpsc::Sender, - finished: oneshot::Receiver<()>, } impl JobHandle { @@ -425,22 +405,6 @@ impl JobHandle { } } -impl JobHandle { - /// Stop this job gracefully. - /// - /// If it hasn't shut itself down after `JOB_GRACEFUL_STOP_DURATION`, abort it. - async fn stop(mut self) { - // we don't actually care if the message couldn't be sent - if self.to_job.send(ToJob::STOP).await.is_err() { - return; - } - - let stop_timer = Delay::new(JOB_GRACEFUL_STOP_DURATION); - - future::select(stop_timer, self.finished).await; - } -} - /// This module reexports Prometheus types and defines the [`Metrics`] trait. pub mod metrics { /// Reexport Substrate Prometheus types. @@ -486,16 +450,29 @@ pub enum FromJobCommand { SpawnBlocking(&'static str, Pin + Send>>), } +impl fmt::Debug for FromJobCommand { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::SendMessage(msg) => write!(fmt, "FromJobCommand::SendMessage({:?})", msg), + Self::Spawn(name, _) => write!(fmt, "FromJobCommand::Spawn({})", name), + Self::SpawnBlocking(name, _) => write!(fmt, "FromJobCommand::SpawnBlocking({})", name), + } + } +} + +impl From for FromJobCommand { + fn from(msg: AllMessages) -> Self { + Self::SendMessage(msg) + } +} + /// This trait governs jobs. /// /// Jobs are instantiated and killed automatically on appropriate overseer messages. -/// Other messages are passed along to and from the job via the overseer to other -/// subsystems. +/// Other messages are passed along to and from the job via the overseer to other subsystems. pub trait JobTrait: Unpin { - /// Message type to the job. Typically a subset of AllMessages. - type ToJob: 'static + ToJobTrait + Send; - /// Message type from the job. Typically a subset of AllMessages. - type FromJob: 'static + Into + Send; + /// Message type used to send messages to the job. + type ToJob: 'static + BoundToRelayParent + Send; /// Job runtime error. type Error: 'static + std::error::Error + Send; /// Extra arguments this job needs to run properly. @@ -512,13 +489,15 @@ pub trait JobTrait: Unpin { /// Name of the job, i.e. `CandidateBackingJob` const NAME: &'static str; - /// Run a job for the parent block indicated + /// Run a job for the given relay `parent`. + /// + /// The job should be ended when `receiver` returns `None`. fn run( parent: Hash, run_args: Self::RunArgs, metrics: Self::Metrics, receiver: mpsc::Receiver, - sender: mpsc::Sender, + sender: mpsc::Sender, ) -> Pin> + Send>>; } @@ -546,7 +525,7 @@ pub enum JobsError { pub struct Jobs { spawner: Spawner, running: HashMap>, - outgoing_msgs: StreamUnordered>, + outgoing_msgs: StreamUnordered>, #[pin] job: std::marker::PhantomData, errors: Option, JobsError)>>, @@ -585,7 +564,6 @@ impl Jobs { fn spawn_job(&mut self, parent_hash: Hash, run_args: Job::RunArgs, metrics: Job::Metrics) -> Result<(), Error> { let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY); let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY); - let (finished_tx, finished) = oneshot::channel(); let err_tx = self.errors.clone(); @@ -609,23 +587,13 @@ impl Jobs { } }); - let future = async move { - // job errors are already handled within the future, meaning - // that any errors here are due to the abortable mechanism. - // failure to abort isn't of interest. - let _ = future.await; - // transmission failure here is only possible if the receiver is closed, - // which means the handle is dropped, which means we don't care anymore - let _ = finished_tx.send(()); - }; - self.spawner.spawn(Job::NAME, future.boxed()); + self.spawner.spawn(Job::NAME, future.map(drop).boxed()); self.outgoing_msgs.push(from_job_rx); let handle = JobHandle { _abort_handle: AbortOnDrop(abort_handle), to_job: to_job_tx, - finished, }; self.running.insert(parent_hash, handle); @@ -635,9 +603,7 @@ impl Jobs { /// Stop the job associated with this `parent_hash`. pub async fn stop_job(&mut self, parent_hash: Hash) { - if let Some(handle) = self.running.remove(&parent_hash) { - handle.stop().await; - } + self.running.remove(&parent_hash); } /// Send a message to the appropriate job for this `parent_hash`. @@ -656,7 +622,7 @@ where Spawner: SpawnNamed, Job: JobTrait, { - type Item = Job::FromJob; + type Item = FromJobCommand; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { @@ -706,7 +672,7 @@ where Context: SubsystemContext, Job: 'static + JobTrait, Job::RunArgs: Clone, - Job::ToJob: TryFrom + TryFrom<::Message> + Sync, + Job::ToJob: From<::Message> + Sync, { /// Creates a new `Subsystem`. pub fn new(spawner: Spawner, run_args: Job::RunArgs, metrics: Job::Metrics) -> Self { @@ -783,7 +749,7 @@ where } } - // if we have a channel on which to forward errors, do so + /// Forward a given error to the higher context using the given error channel. async fn fwd_err( hash: Option, err: JobsError, @@ -798,7 +764,9 @@ where } } - // handle an incoming message. return true if we should break afterwards. + /// Handle an incoming message. + /// + /// Returns `true` when this job manager should shutdown. async fn handle_incoming( incoming: SubsystemResult>, jobs: &mut Jobs, @@ -833,44 +801,12 @@ where } } Ok(Signal(Conclude)) => { - // Breaking the loop ends fn run, which drops `jobs`, which immediately drops all ongoing work. - // We can afford to wait a little while to shut them all down properly before doing that. - // - // Forwarding the stream to a drain means we wait until all of the items in the stream - // have completed. Contrast with `into_future`, which turns it into a future of `(head, rest_stream)`. - use futures::sink::drain; - use futures::stream::FuturesUnordered; - use futures::stream::StreamExt; - - if let Err(e) = jobs - .running - .drain() - .map(|(_, handle)| handle.stop()) - .collect::>() - .map(Ok) - .forward(drain()) - .await - { - tracing::error!( - job = Job::NAME, - err = ?e, - "failed to stop a job on conclude signal", - ); - let e = Error::from(e); - Self::fwd_err(None, JobsError::Utility(e), err_tx).await; - } - + jobs.running.clear(); return true; } Ok(Communication { msg }) => { if let Ok(to_job) = ::try_from(msg) { - match to_job.relay_parent() { - Some(hash) => jobs.send_msg(hash, to_job).await, - None => tracing::debug!( - job = Job::NAME, - "trying to send a message to a job without specifying a relay parent", - ), - } + jobs.send_msg(to_job.relay_parent(), to_job).await; } } Ok(Signal(BlockFinalized(_))) => {} @@ -889,11 +825,10 @@ where // handle a command from a job. async fn handle_from_job( - outgoing: Option, + outgoing: Option, ctx: &mut Context, ) -> SubsystemResult<()> { - let cmd: FromJobCommand = outgoing.expect("the Jobs stream never ends; qed").into(); - match cmd { + match outgoing.expect("the Jobs stream never ends; qed") { FromJobCommand::SendMessage(msg) => ctx.send_message(msg).await, FromJobCommand::Spawn(name, task) => ctx.spawn(name, task).await?, FromJobCommand::SpawnBlocking(name, task) => ctx.spawn_blocking(name, task).await?, @@ -907,10 +842,9 @@ impl Subsystem for JobManager::Message: Into, Job: 'static + JobTrait + Send, Job::RunArgs: Clone + Sync, - Job::ToJob: TryFrom + Sync, + Job::ToJob: From<::Message> + Sync, Job::Metrics: Sync, { fn start(self, ctx: Context) -> SpawnedSubsystem { @@ -985,7 +919,7 @@ macro_rules! delegated_subsystem { where Spawner: Clone + $crate::reexports::SpawnNamed + Send + Unpin, Context: $crate::reexports::SubsystemContext, - ::Message: Into<$to_job>, + $to_job: From<::Message>, { #[doc = "Creates a new "] #[doc = $subsystem_name] @@ -1006,7 +940,7 @@ macro_rules! delegated_subsystem { where Spawner: $crate::reexports::SpawnNamed + Send + Clone + Unpin + 'static, Context: $crate::reexports::SubsystemContext, - ::Message: Into<$to_job>, + $to_job: From<::Message>, { fn start(self, ctx: Context) -> $crate::reexports::SpawnedSubsystem { self.manager.start(ctx) @@ -1061,22 +995,17 @@ impl Future for Timeout { #[cfg(test)] mod tests { - use super::{JobManager, JobTrait, JobsError, TimeoutExt, ToJobTrait, FromJobCommand}; + use super::*; use thiserror::Error; use polkadot_node_subsystem::{ - messages::{AllMessages, CandidateSelectionMessage}, - ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, + messages::{AllMessages, CandidateSelectionMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal, + SpawnedSubsystem, }; use assert_matches::assert_matches; - use futures::{ - channel::mpsc, - executor, - stream::{self, StreamExt}, - future, Future, FutureExt, SinkExt, - }; + use futures::{channel::mpsc, executor, StreamExt, future, Future, FutureExt, SinkExt}; use polkadot_primitives::v1::Hash; use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context}; - use std::{collections::HashMap, convert::TryFrom, pin::Pin, time::Duration}; + use std::{pin::Pin, time::Duration}; // basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do; // you can leave the subsystem itself to the job manager. @@ -1087,69 +1016,7 @@ mod tests { // job structs are constructed within JobTrait::run // most will want to retain the sender and receiver, as well as whatever other data they like struct FakeCandidateSelectionJob { - receiver: mpsc::Receiver, - } - - // ToJob implementations require the following properties: - // - // - have a Stop variant (to impl ToJobTrait) - // - impl ToJobTrait - // - impl TryFrom - // - impl From (from SubsystemContext::Message) - // - // Mostly, they are just a type-safe subset of AllMessages that this job is prepared to receive - enum ToJob { - CandidateSelection(CandidateSelectionMessage), - Stop, - } - - impl ToJobTrait for ToJob { - const STOP: Self = ToJob::Stop; - - fn relay_parent(&self) -> Option { - match self { - Self::CandidateSelection(csm) => csm.relay_parent(), - Self::Stop => None, - } - } - } - - impl TryFrom for ToJob { - type Error = (); - - fn try_from(msg: AllMessages) -> Result { - match msg { - AllMessages::CandidateSelection(csm) => Ok(ToJob::CandidateSelection(csm)), - _ => Err(()), - } - } - } - - impl From for ToJob { - fn from(csm: CandidateSelectionMessage) -> ToJob { - ToJob::CandidateSelection(csm) - } - } - - // FromJob must be infallibly convertable into FromJobCommand. - // - // It exists to be a type-safe subset of FromJobCommand that this job is specified to send. - // - // Note: the Clone impl here is not generally required; it's just ueful for this test context because - // we include it in the RunArgs - #[derive(Clone)] - enum FromJob { - Test, - } - - impl From for FromJobCommand { - fn from(from_job: FromJob) -> FromJobCommand { - match from_job { - FromJob::Test => FromJobCommand::SendMessage( - AllMessages::CandidateSelection(CandidateSelectionMessage::default()) - ), - } - } + receiver: mpsc::Receiver, } // Error will mostly be a wrapper to make the try operator more convenient; @@ -1162,17 +1029,9 @@ mod tests { } impl JobTrait for FakeCandidateSelectionJob { - type ToJob = ToJob; - type FromJob = FromJob; + type ToJob = CandidateSelectionMessage; type Error = Error; - // RunArgs can be anything that a particular job needs supplied from its external context - // in order to create the Job. In this case, they're a hashmap of parents to the mock outputs - // expected from that job. - // - // Note that it's not recommended to use something as heavy as a hashmap in production: the - // RunArgs get cloned so that each job gets its own owned copy. If you need that, wrap it in - // an Arc. Within a testing context, that efficiency is less important. - type RunArgs = HashMap>; + type RunArgs = bool; type Metrics = (); const NAME: &'static str = "FakeCandidateSelectionJob"; @@ -1181,21 +1040,23 @@ mod tests { // // this function is in charge of creating and executing the job's main loop fn run( - parent: Hash, - mut run_args: Self::RunArgs, + _: Hash, + run_args: Self::RunArgs, _metrics: Self::Metrics, - receiver: mpsc::Receiver, - mut sender: mpsc::Sender, + receiver: mpsc::Receiver, + mut sender: mpsc::Sender, ) -> Pin> + Send>> { async move { let job = FakeCandidateSelectionJob { receiver }; - // most jobs will have a request-response cycle at the heart of their run loop. - // however, in this case, we never receive valid messages, so we may as well - // just send all of our (mock) output messages now - let mock_output = run_args.remove(&parent).unwrap_or_default(); - let mut stream = stream::iter(mock_output.into_iter().map(Ok)); - sender.send_all(&mut stream).await?; + if run_args { + sender.send(FromJobCommand::SendMessage( + CandidateSelectionMessage::Invalid( + Default::default(), + Default::default(), + ).into(), + )).await?; + } // it isn't necessary to break run_loop into its own function, // but it's convenient to separate the concerns in this way @@ -1207,12 +1068,12 @@ mod tests { impl FakeCandidateSelectionJob { async fn run_loop(mut self) -> Result<(), Error> { - while let Some(msg) = self.receiver.next().await { - match msg { - ToJob::CandidateSelection(_csm) => { + loop { + match self.receiver.next().await { + Some(_csm) => { unimplemented!("we'd report the collator to the peer set manager here, but that's not implemented yet"); } - ToJob::Stop => break, + None => break, } } @@ -1228,7 +1089,7 @@ mod tests { type OverseerHandle = test_helpers::TestSubsystemContextHandle; fn test_harness>( - run_args: HashMap>, + run_args: bool, test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option, JobsError)>) -> T, ) { let _ = env_logger::builder() @@ -1259,13 +1120,8 @@ mod tests { #[test] fn starting_and_stopping_job_works() { let relay_parent: Hash = [0; 32].into(); - let mut run_args = HashMap::new(); - let _ = run_args.insert( - relay_parent.clone(), - vec![FromJob::Test], - ); - test_harness(run_args, |mut overseer_handle, err_rx| async move { + test_harness(true, |mut overseer_handle, err_rx| async move { overseer_handle .send(FromOverseer::Signal(OverseerSignal::ActiveLeaves( ActiveLeavesUpdate::start_work(relay_parent), @@ -1293,13 +1149,8 @@ mod tests { #[test] fn sending_to_a_non_running_job_do_not_stop_the_subsystem() { let relay_parent = Hash::repeat_byte(0x01); - let mut run_args = HashMap::new(); - let _ = run_args.insert( - relay_parent.clone(), - vec![FromJob::Test], - ); - test_harness(run_args, |mut overseer_handle, err_rx| async move { + test_harness(true, |mut overseer_handle, err_rx| async move { overseer_handle .send(FromOverseer::Signal(OverseerSignal::ActiveLeaves( ActiveLeavesUpdate::start_work(relay_parent), @@ -1334,7 +1185,7 @@ mod tests { let (context, _) = make_subsystem_context::(pool.clone()); let SpawnedSubsystem { name, .. } = - FakeCandidateSelectionSubsystem::new(pool, HashMap::new(), ()).start(context); + FakeCandidateSelectionSubsystem::new(pool, false, ()).start(context); assert_eq!(name, "FakeCandidateSelection"); } } diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index eef51fe47a4b..727814381f0b 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -39,8 +39,13 @@ use polkadot_primitives::v1::{ ValidationCode, ValidatorId, ValidationData, CandidateHash, ValidatorIndex, ValidatorSignature, InboundDownwardMessage, InboundHrmpMessage, }; -use std::sync::Arc; -use std::collections::btree_map::BTreeMap; +use std::{sync::Arc, collections::btree_map::BTreeMap}; + +/// Subsystem messages where each message is always bound to a relay parent. +pub trait BoundToRelayParent { + /// Returns the relay parent this message is bound to. + fn relay_parent(&self) -> Hash; +} /// A notification of a new backed candidate. #[derive(Debug)] @@ -56,12 +61,11 @@ pub enum CandidateSelectionMessage { Invalid(Hash, CandidateReceipt), } -impl CandidateSelectionMessage { - /// If the current variant contains the relay parent hash, return it. - pub fn relay_parent(&self) -> Option { +impl BoundToRelayParent for CandidateSelectionMessage { + fn relay_parent(&self) -> Hash { match self { - Self::Collation(hash, ..) => Some(*hash), - Self::Invalid(hash, _) => Some(*hash), + Self::Collation(hash, ..) => *hash, + Self::Invalid(hash, _) => *hash, } } } @@ -86,13 +90,12 @@ pub enum CandidateBackingMessage { Statement(Hash, SignedFullStatement), } -impl CandidateBackingMessage { - /// If the current variant contains the relay parent hash, return it. - pub fn relay_parent(&self) -> Option { +impl BoundToRelayParent for CandidateBackingMessage { + fn relay_parent(&self) -> Hash { match self { - Self::GetBackedCandidates(hash, _) => Some(*hash), - Self::Second(hash, _, _) => Some(*hash), - Self::Statement(hash, _) => Some(*hash), + Self::GetBackedCandidates(hash, _) => *hash, + Self::Second(hash, _, _) => *hash, + Self::Statement(hash, _) => *hash, } } } @@ -273,10 +276,9 @@ impl BitfieldDistributionMessage { #[derive(Debug)] pub enum BitfieldSigningMessage {} -impl BitfieldSigningMessage { - /// If the current variant contains the relay parent hash, return it. - pub fn relay_parent(&self) -> Option { - None +impl BoundToRelayParent for BitfieldSigningMessage { + fn relay_parent(&self) -> Hash { + match *self {} } } @@ -525,13 +527,12 @@ pub enum ProvisionerMessage { ProvisionableData(Hash, ProvisionableData), } -impl ProvisionerMessage { - /// If the current variant contains the relay parent hash, return it. - pub fn relay_parent(&self) -> Option { +impl BoundToRelayParent for ProvisionerMessage { + fn relay_parent(&self) -> Hash { match self { - Self::RequestBlockAuthorshipData(hash, _) => Some(*hash), - Self::RequestInherentData(hash, _) => Some(*hash), - Self::ProvisionableData(hash, _) => Some(*hash), + Self::RequestBlockAuthorshipData(hash, _) => *hash, + Self::RequestInherentData(hash, _) => *hash, + Self::ProvisionableData(hash, _) => *hash, } } }