Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Simplify subsystem jobs #2037

Merged
merged 3 commits into from
Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

129 changes: 23 additions & 106 deletions node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ use polkadot_subsystem::{
messages::{
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData,
ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed,
RuntimeApiRequest,
ProvisionerMessage, StatementDistributionMessage, ValidationFailed, RuntimeApiRequest,
},
};
use polkadot_node_subsystem_util::{
Expand Down Expand Up @@ -93,9 +92,9 @@ struct CandidateBackingJob {
/// The hash of the relay parent on top of which this job is doing it's work.
parent: Hash,
/// Inbound message channel receiving part.
rx_to: mpsc::Receiver<ToJob>,
rx_to: mpsc::Receiver<CandidateBackingMessage>,
/// Outbound message channel sending part.
tx_from: mpsc::Sender<FromJob>,
tx_from: mpsc::Sender<FromJobCommand>,
/// The `ParaId` assigned to this validator
assignment: ParaId,
/// The collator required to author the candidate, if any.
Expand Down Expand Up @@ -151,84 +150,6 @@ impl TableContextTrait for TableContext {
}
}

/// A message type that is sent from `CandidateBackingSubsystem` to `CandidateBackingJob`.
pub enum ToJob {
/// A `CandidateBackingMessage`.
CandidateBacking(CandidateBackingMessage),
/// Stop working.
Stop,
}

impl TryFrom<AllMessages> for ToJob {
type Error = ();

fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
match msg {
AllMessages::CandidateBacking(msg) => Ok(ToJob::CandidateBacking(msg)),
_ => Err(()),
}
}
}

impl From<CandidateBackingMessage> for ToJob {
fn from(msg: CandidateBackingMessage) -> Self {
Self::CandidateBacking(msg)
}
}

impl util::ToJobTrait for ToJob {
const STOP: Self = ToJob::Stop;

fn relay_parent(&self) -> Option<Hash> {
match self {
Self::CandidateBacking(cb) => cb.relay_parent(),
Self::Stop => None,
}
}
}

/// A message type that is sent from `CandidateBackingJob` to `CandidateBackingSubsystem`.
enum FromJob {
AvailabilityStore(AvailabilityStoreMessage),
RuntimeApiMessage(RuntimeApiMessage),
CandidateValidation(CandidateValidationMessage),
CandidateSelection(CandidateSelectionMessage),
Provisioner(ProvisionerMessage),
PoVDistribution(PoVDistributionMessage),
StatementDistribution(StatementDistributionMessage),
}

impl From<FromJob> for FromJobCommand {
fn from(f: FromJob) -> FromJobCommand {
FromJobCommand::SendMessage(match f {
FromJob::AvailabilityStore(msg) => AllMessages::AvailabilityStore(msg),
FromJob::RuntimeApiMessage(msg) => AllMessages::RuntimeApi(msg),
FromJob::CandidateValidation(msg) => AllMessages::CandidateValidation(msg),
FromJob::CandidateSelection(msg) => AllMessages::CandidateSelection(msg),
FromJob::StatementDistribution(msg) => AllMessages::StatementDistribution(msg),
FromJob::PoVDistribution(msg) => AllMessages::PoVDistribution(msg),
FromJob::Provisioner(msg) => AllMessages::Provisioner(msg),
})
}
}

impl TryFrom<AllMessages> for FromJob {
type Error = &'static str;

fn try_from(f: AllMessages) -> Result<Self, Self::Error> {
match f {
AllMessages::AvailabilityStore(msg) => Ok(FromJob::AvailabilityStore(msg)),
AllMessages::RuntimeApi(msg) => Ok(FromJob::RuntimeApiMessage(msg)),
AllMessages::CandidateValidation(msg) => Ok(FromJob::CandidateValidation(msg)),
AllMessages::CandidateSelection(msg) => Ok(FromJob::CandidateSelection(msg)),
AllMessages::StatementDistribution(msg) => Ok(FromJob::StatementDistribution(msg)),
AllMessages::PoVDistribution(msg) => Ok(FromJob::PoVDistribution(msg)),
AllMessages::Provisioner(msg) => Ok(FromJob::Provisioner(msg)),
_ => Err("can't convert this AllMessages variant to FromJob"),
}
}
}

struct InvalidErasureRoot;

// It looks like it's not possible to do an `impl From` given the current state of
Expand Down Expand Up @@ -301,12 +222,10 @@ fn table_attested_to_backed(
impl CandidateBackingJob {
/// Run asynchronously.
async fn run_loop(mut self) -> Result<(), Error> {
while let Some(msg) = self.rx_to.next().await {
match msg {
ToJob::CandidateBacking(msg) => {
self.process_msg(msg).await?;
}
ToJob::Stop => break,
loop {
match self.rx_to.next().await {
Some(msg) => self.process_msg(msg).await?,
None => break,
}
}

Expand All @@ -317,9 +236,7 @@ impl CandidateBackingJob {
&mut self,
candidate: CandidateReceipt,
) -> Result<(), Error> {
self.tx_from.send(FromJob::CandidateSelection(
CandidateSelectionMessage::Invalid(self.parent, candidate)
)).await?;
self.tx_from.send(AllMessages::from(CandidateSelectionMessage::Invalid(self.parent, candidate)).into()).await?;

Ok(())
}
Expand Down Expand Up @@ -664,7 +581,7 @@ impl CandidateBackingJob {
}

async fn send_to_provisioner(&mut self, msg: ProvisionerMessage) -> Result<(), Error> {
self.tx_from.send(FromJob::Provisioner(msg)).await?;
self.tx_from.send(AllMessages::from(msg).into()).await?;

Ok(())
}
Expand All @@ -674,9 +591,9 @@ impl CandidateBackingJob {
descriptor: CandidateDescriptor,
pov: Arc<PoV>,
) -> Result<(), Error> {
self.tx_from.send(FromJob::PoVDistribution(
self.tx_from.send(AllMessages::from(
PoVDistributionMessage::DistributePoV(self.parent, descriptor, pov),
)).await.map_err(Into::into)
).into()).await.map_err(Into::into)
}

async fn request_pov_from_distribution(
Expand All @@ -685,9 +602,9 @@ impl CandidateBackingJob {
) -> Result<Arc<PoV>, Error> {
let (tx, rx) = oneshot::channel();

self.tx_from.send(FromJob::PoVDistribution(
self.tx_from.send(AllMessages::from(
PoVDistributionMessage::FetchPoV(self.parent, descriptor, tx)
)).await?;
).into()).await?;

Ok(rx.await?)
}
Expand All @@ -699,13 +616,14 @@ impl CandidateBackingJob {
) -> Result<ValidationResult, Error> {
let (tx, rx) = oneshot::channel();

self.tx_from.send(FromJob::CandidateValidation(
self.tx_from.send(
AllMessages::from(
CandidateValidationMessage::ValidateFromChainState(
candidate,
pov,
tx,
)
)
).into(),
).await?;

Ok(rx.await??)
Expand All @@ -719,15 +637,15 @@ impl CandidateBackingJob {
available_data: AvailableData,
) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.tx_from.send(FromJob::AvailabilityStore(
self.tx_from.send(AllMessages::from(
AvailabilityStoreMessage::StoreAvailableData(
candidate_hash,
id,
n_validators,
available_data,
tx,
)
)
).into(),
).await?;

let _ = rx.await?;
Expand Down Expand Up @@ -777,15 +695,14 @@ impl CandidateBackingJob {
async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> {
let smsg = StatementDistributionMessage::Share(self.parent, s);

self.tx_from.send(FromJob::StatementDistribution(smsg)).await?;
self.tx_from.send(AllMessages::from(smsg).into()).await?;

Ok(())
}
}

impl util::JobTrait for CandidateBackingJob {
type ToJob = ToJob;
type FromJob = FromJob;
type ToJob = CandidateBackingMessage;
type Error = Error;
type RunArgs = SyncCryptoStorePtr;
type Metrics = Metrics;
Expand All @@ -798,7 +715,7 @@ impl util::JobTrait for CandidateBackingJob {
keystore: SyncCryptoStorePtr,
metrics: Metrics,
rx_to: mpsc::Receiver<Self::ToJob>,
mut tx_from: mpsc::Sender<Self::FromJob>,
mut tx_from: mpsc::Sender<FromJobCommand>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
macro_rules! try_runtime_api {
Expand Down Expand Up @@ -1000,7 +917,7 @@ impl metrics::Metrics for Metrics {
}
}

delegated_subsystem!(CandidateBackingJob(SyncCryptoStorePtr, Metrics) <- ToJob as CandidateBackingSubsystem);
delegated_subsystem!(CandidateBackingJob(SyncCryptoStorePtr, Metrics) <- CandidateBackingMessage as CandidateBackingSubsystem);

#[cfg(test)]
mod tests {
Expand All @@ -1013,7 +930,7 @@ mod tests {
GroupRotationInfo,
};
use polkadot_subsystem::{
messages::RuntimeApiRequest,
messages::{RuntimeApiRequest, RuntimeApiMessage},
ActiveLeavesUpdate, FromOverseer, OverseerSignal,
};
use polkadot_node_primitives::InvalidCandidate;
Expand Down
1 change: 0 additions & 1 deletion node/core/bitfield-signing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
wasm-timer = "0.2.5"
thiserror = "1.0.22"
derive_more = "0.99.11"
Loading