Skip to content

Commit

Permalink
Revert "Make candidate validation bounded again (#2125)"
Browse files Browse the repository at this point in the history
This reverts commit d37a456.
  • Loading branch information
alexggh committed Jan 29, 2024
1 parent 84a246c commit e9bb9e5
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 201 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cumulus/client/relay-chain-inprocess-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ sp-keyring = { path = "../../../substrate/primitives/keyring" }
# Polkadot
polkadot-primitives = { path = "../../../polkadot/primitives" }
polkadot-test-client = { path = "../../../polkadot/node/test/client" }
metered = { package = "prioritized-metered-channel", version = "0.6.1", default-features = false, features = ["futures_channel"] }
metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features = ["futures_channel"] }

# Cumulus
cumulus-test-service = { path = "../../test/service" }
221 changes: 98 additions & 123 deletions polkadot/node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@ use polkadot_primitives::{

use parity_scale_codec::Encode;

use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};
use futures::{channel::oneshot, prelude::*};

use std::{
path::PathBuf,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
Expand All @@ -82,11 +81,6 @@ const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
#[cfg(test)]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);

// The task queue size is chosen to be somewhat bigger than the PVF host incoming queue size
// to allow exhaustive validation messages to fall through in case the tasks are clogged with
// `ValidateFromChainState` messages awaiting data from the runtime
const TASK_LIMIT: usize = 30;

/// Configuration for the candidate validation subsystem
#[derive(Clone)]
pub struct Config {
Expand Down Expand Up @@ -136,83 +130,6 @@ impl<Context> CandidateValidationSubsystem {
}
}

fn handle_validation_message<S>(
mut sender: S,
validation_host: ValidationHost,
metrics: Metrics,
msg: CandidateValidationMessage,
) -> Pin<Box<dyn Future<Output = ()> + Send>>
where
S: SubsystemSender<RuntimeApiMessage>,
{
match msg {
CandidateValidationMessage::ValidateFromChainState {
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => async move {
let _timer = metrics.time_validate_from_chain_state();
let res = validate_from_chain_state(
&mut sender,
validation_host,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
.boxed(),
CandidateValidationMessage::ValidateFromExhaustive {
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => async move {
let _timer = metrics.time_validate_from_exhaustive();
let res = validate_candidate_exhaustive(
validation_host,
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
.boxed(),
CandidateValidationMessage::PreCheck {
relay_parent,
validation_code_hash,
response_sender,
..
} => async move {
let precheck_result =
precheck_pvf(&mut sender, validation_host, relay_parent, validation_code_hash)
.await;

let _ = response_sender.send(precheck_result);
}
.boxed(),
}
}

#[overseer::contextbounds(CandidateValidation, prefix = self::overseer)]
async fn run<Context>(
mut ctx: Context,
Expand All @@ -239,48 +156,106 @@ async fn run<Context>(
.await?;
ctx.spawn_blocking("pvf-validation-host", task.boxed())?;

let mut tasks = FuturesUnordered::new();

loop {
loop {
futures::select! {
comm = ctx.recv().fuse() => {
match comm {
Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_))) => {},
Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {},
Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(FromOrchestra::Communication { msg }) => {
let task = handle_validation_message(ctx.sender().clone(), validation_host.clone(), metrics.clone(), msg);
tasks.push(task);
if tasks.len() >= TASK_LIMIT {
break
}
},
Err(e) => return Err(SubsystemError::from(e)),
}
match ctx.recv().await? {
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_)) => {},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Communication { msg } => match msg {
CandidateValidationMessage::ValidateFromChainState {
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => {
let bg = {
let mut sender = ctx.sender().clone();
let metrics = metrics.clone();
let validation_host = validation_host.clone();

async move {
let _timer = metrics.time_validate_from_chain_state();
let res = validate_from_chain_state(
&mut sender,
validation_host,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
};

ctx.spawn("validate-from-chain-state", bg.boxed())?;
},
_ = tasks.select_next_some() => ()
}
}

gum::debug!(target: LOG_TARGET, "Validation task limit hit");

loop {
futures::select! {
signal = ctx.recv_signal().fuse() => {
match signal {
Ok(OverseerSignal::ActiveLeaves(_)) => {},
Ok(OverseerSignal::BlockFinalized(..)) => {},
Ok(OverseerSignal::Conclude) => return Ok(()),
Err(e) => return Err(SubsystemError::from(e)),
}
CandidateValidationMessage::ValidateFromExhaustive {
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => {
let bg = {
let metrics = metrics.clone();
let validation_host = validation_host.clone();

async move {
let _timer = metrics.time_validate_from_exhaustive();
let res = validate_candidate_exhaustive(
validation_host,
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
};

ctx.spawn("validate-from-exhaustive", bg.boxed())?;
},
_ = tasks.select_next_some() => {
if tasks.len() < TASK_LIMIT {
break
}
}
}
CandidateValidationMessage::PreCheck {
relay_parent,
validation_code_hash,
response_sender,
..
} => {
let bg = {
let mut sender = ctx.sender().clone();
let validation_host = validation_host.clone();

async move {
let precheck_result = precheck_pvf(
&mut sender,
validation_host,
relay_parent,
validation_code_hash,
)
.await;

let _ = response_sender.send(precheck_result);
}
};

ctx.spawn("candidate-validation-pre-check", bg.boxed())?;
},
},
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions polkadot/node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ pub const PREPARE_BINARY_NAME: &str = "polkadot-prepare-worker";
/// The name of binary spawned to execute a PVF
pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker";

/// The size of incoming message queue
pub const HOST_MESSAGE_QUEUE_SIZE: usize = 10;

/// An alias to not spell the type for the oneshot sender for the PVF execution result.
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;

Expand Down Expand Up @@ -230,7 +227,7 @@ pub async fn start(
Err(err) => return Err(SubsystemError::Context(err)),
};

let (to_host_tx, to_host_rx) = mpsc::channel(HOST_MESSAGE_QUEUE_SIZE);
let (to_host_tx, to_host_rx) = mpsc::channel(10);

let validation_host = ValidationHost { to_host_tx, security_status: security_status.clone() };

Expand Down
5 changes: 1 addition & 4 deletions polkadot/node/core/pvf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,7 @@ mod worker_interface;
pub mod testing;

pub use error::{InvalidCandidate, PossiblyInvalidError, ValidationError};
pub use host::{
start, Config, ValidationHost, EXECUTE_BINARY_NAME, HOST_MESSAGE_QUEUE_SIZE,
PREPARE_BINARY_NAME,
};
pub use host::{start, Config, ValidationHost, EXECUTE_BINARY_NAME, PREPARE_BINARY_NAME};
pub use metrics::Metrics;
pub use priority::Priority;
pub use worker_interface::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR};
Expand Down
21 changes: 2 additions & 19 deletions polkadot/node/malus/src/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
use polkadot_node_subsystem::*;
pub use polkadot_node_subsystem::{messages::*, overseer, FromOrchestra};
use std::{collections::VecDeque, future::Future, pin::Pin};
use std::{future::Future, pin::Pin};

/// Filter incoming and outgoing messages.
pub trait MessageInterceptor<Sender>: Send + Sync + Clone + 'static
Expand Down Expand Up @@ -170,7 +170,6 @@ where
inner: Context,
message_filter: Fil,
sender: InterceptedSender<<Context as overseer::SubsystemContext>::Sender, Fil>,
message_buffer: VecDeque<FromOrchestra<<Context as overseer::SubsystemContext>::Message>>,
}

impl<Context, Fil> InterceptedContext<Context, Fil>
Expand All @@ -190,7 +189,7 @@ where
inner: inner.sender().clone(),
message_filter: message_filter.clone(),
};
Self { inner, message_filter, sender, message_buffer: VecDeque::new() }
Self { inner, message_filter, sender }
}
}

Expand Down Expand Up @@ -234,9 +233,6 @@ where
}

async fn recv(&mut self) -> SubsystemResult<FromOrchestra<Self::Message>> {
if let Some(msg) = self.message_buffer.pop_front() {
return Ok(msg)
}
loop {
let msg = self.inner.recv().await?;
if let Some(msg) = self.message_filter.intercept_incoming(self.inner.sender(), msg) {
Expand All @@ -245,19 +241,6 @@ where
}
}

async fn recv_signal(&mut self) -> SubsystemResult<Self::Signal> {
loop {
let msg = self.inner.recv().await?;
if let Some(msg) = self.message_filter.intercept_incoming(self.inner.sender(), msg) {
if let FromOrchestra::Signal(sig) = msg {
return Ok(sig)
} else {
self.message_buffer.push_back(msg)
}
}
}
}

fn spawn(
&mut self,
name: &'static str,
Expand Down
2 changes: 1 addition & 1 deletion polkadot/node/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../gum" }

metered = { package = "prioritized-metered-channel", version = "0.6.1", default-features = false, features = ["futures_channel"] }
metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features = ["futures_channel"] }
# Both `sc-service` and `sc-cli` are required by runtime metrics `logger_hook()`.
sc-service = { path = "../../../substrate/client/service" }
sc-cli = { path = "../../../substrate/client/cli" }
Expand Down
Loading

0 comments on commit e9bb9e5

Please sign in to comment.