Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Simplify subsystem jobs #2037

Merged
merged 3 commits into from
Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,6 @@ impl TableContextTrait for TableContext {
pub enum ToJob {
/// A `CandidateBackingMessage`.
CandidateBacking(CandidateBackingMessage),
/// Stop working.
Stop,
}

impl TryFrom<AllMessages> for ToJob {
Expand All @@ -177,12 +175,9 @@ impl From<CandidateBackingMessage> for ToJob {
}

impl util::ToJobTrait for ToJob {
const STOP: Self = ToJob::Stop;

fn relay_parent(&self) -> Option<Hash> {
fn relay_parent(&self) -> Hash {
match self {
Self::CandidateBacking(cb) => cb.relay_parent(),
Self::Stop => None,
}
}
}
Expand Down Expand Up @@ -301,12 +296,12 @@ fn table_attested_to_backed(
impl CandidateBackingJob {
/// Run asynchronously.
async fn run_loop(mut self) -> Result<(), Error> {
while let Some(msg) = self.rx_to.next().await {
match msg {
ToJob::CandidateBacking(msg) => {
loop {
match self.rx_to.next().await {
Some(ToJob::CandidateBacking(msg)) => {
self.process_msg(msg).await?;
}
ToJob::Stop => break,
},
None => break,
}
}

Expand Down
6 changes: 1 addition & 5 deletions node/core/bitfield-signing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,12 @@ pub struct BitfieldSigningJob;
#[allow(missing_docs)]
pub enum ToJob {
BitfieldSigning(BitfieldSigningMessage),
Stop,
}

impl ToJobTrait for ToJob {
const STOP: Self = ToJob::Stop;

fn relay_parent(&self) -> Option<Hash> {
fn relay_parent(&self) -> Hash {
match self {
Self::BitfieldSigning(bsm) => bsm.relay_parent(),
Self::Stop => None,
}
}
}
Expand Down
40 changes: 13 additions & 27 deletions node/core/candidate-selection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,12 @@ struct CandidateSelectionJob {
pub enum ToJob {
/// The provisioner message is the main input to the provisioner.
CandidateSelection(CandidateSelectionMessage),
/// This message indicates that the provisioner should shut itself down.
Stop,
}

impl ToJobTrait for ToJob {
const STOP: Self = Self::Stop;

fn relay_parent(&self) -> Option<Hash> {
fn relay_parent(&self) -> Hash {
match self {
Self::CandidateSelection(csm) => csm.relay_parent(),
Self::Stop => None,
}
}
}
Expand Down Expand Up @@ -142,13 +137,9 @@ impl JobTrait for CandidateSelectionJob {
receiver: mpsc::Receiver<ToJob>,
sender: mpsc::Sender<FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
Box::pin(async move {
let job = CandidateSelectionJob::new(metrics, sender, receiver);

// it isn't necessary to break run_loop into its own function,
// but it's convenient to separate the concerns in this way
job.run_loop().await
})
async move {
CandidateSelectionJob::new(metrics, sender, receiver).run_loop().await
}.boxed()
}
}

Expand All @@ -166,28 +157,23 @@ impl CandidateSelectionJob {
}
}

async fn run_loop(mut self) -> Result<(), Error> {
self.run_loop_borrowed().await
}

/// this function exists for testing and should not generally be used; use `run_loop` instead.
async fn run_loop_borrowed(&mut self) -> Result<(), Error> {
while let Some(msg) = self.receiver.next().await {
match msg {
ToJob::CandidateSelection(CandidateSelectionMessage::Collation(
async fn run_loop(&mut self) -> Result<(), Error> {
loop {
match self.receiver.next().await {
Some(ToJob::CandidateSelection(CandidateSelectionMessage::Collation(
relay_parent,
para_id,
collator_id,
)) => {
))) => {
self.handle_collation(relay_parent, para_id, collator_id).await;
}
ToJob::CandidateSelection(CandidateSelectionMessage::Invalid(
Some(ToJob::CandidateSelection(CandidateSelectionMessage::Invalid(
_,
candidate_receipt,
)) => {
))) => {
self.handle_invalid(candidate_receipt).await;
}
ToJob::Stop => break,
None => break,
}
}

Expand Down Expand Up @@ -453,7 +439,7 @@ mod tests {

let (_, job_result) = futures::executor::block_on(future::join(
test(to_job_tx, from_job_rx),
job.run_loop_borrowed(),
job.run_loop(),
));

postconditions(job, job_result);
Expand Down
25 changes: 10 additions & 15 deletions node/core/provisioner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,12 @@ struct ProvisioningJob {
pub enum ToJob {
/// The provisioner message is the main input to the provisioner.
Provisioner(ProvisionerMessage),
/// This message indicates that the provisioner should shut itself down.
Stop,
}

impl ToJobTrait for ToJob {
const STOP: Self = Self::Stop;

fn relay_parent(&self) -> Option<Hash> {
fn relay_parent(&self) -> Hash {
match self {
Self::Provisioner(pm) => pm.relay_parent(),
Self::Stop => None,
}
}
}
Expand Down Expand Up @@ -190,13 +185,13 @@ impl ProvisioningJob {
}

async fn run_loop(mut self) -> Result<(), Error> {
while let Some(msg) = self.receiver.next().await {
use ProvisionerMessage::{
ProvisionableData, RequestBlockAuthorshipData, RequestInherentData,
};
use ProvisionerMessage::{
ProvisionableData, RequestBlockAuthorshipData, RequestInherentData,
};

match msg {
ToJob::Provisioner(RequestInherentData(_, return_sender)) => {
loop {
match self.receiver.next().await {
Some(ToJob::Provisioner(RequestInherentData(_, return_sender))) => {
let _timer = self.metrics.time_request_inherent_data();

if let Err(err) = send_inherent_data(
Expand All @@ -214,10 +209,10 @@ impl ProvisioningJob {
self.metrics.on_inherent_data_request(Ok(()));
}
}
ToJob::Provisioner(RequestBlockAuthorshipData(_, sender)) => {
Some(ToJob::Provisioner(RequestBlockAuthorshipData(_, sender))) => {
self.provisionable_data_channels.push(sender)
}
ToJob::Provisioner(ProvisionableData(_, data)) => {
Some(ToJob::Provisioner(ProvisionableData(_, data))) => {
let _timer = self.metrics.time_provisionable_data();

let mut bad_indices = Vec::new();
Expand Down Expand Up @@ -252,7 +247,7 @@ impl ProvisioningJob {
.map(|(_, item)| item)
.collect();
}
ToJob::Stop => break,
None => break,
}
}

Expand Down
Loading