Make candidate validation bounded again (#2125)
This PR aims to channel the backpressure of the PVF host's preparation
and execution queues through to the consumers of the candidate validation subsystem.

Related: #708
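
Sketched below is a minimal, self-contained illustration of the pattern the diff introduces in the candidate-validation run loop: in-flight validations are collected in a FuturesUnordered capped at TASK_LIMIT, and once the cap is reached the loop stops pulling new messages and only drains running tasks until it drops back below the limit; the bounded channel feeding it then fills up and senders have to wait, which is how backpressure travels upstream. This is not the subsystem code itself: the mpsc channel, the validate stub, the main driver, and the small TASK_LIMIT of 4 are stand-ins for the overseer plumbing, the PVF host call, and the real limit of 30.

use futures::{channel::mpsc, executor::block_on, stream::FuturesUnordered, SinkExt, StreamExt};

// The real subsystem uses 30; kept tiny here so the demo actually hits the limit.
const TASK_LIMIT: usize = 4;

// Stand-in for one candidate validation. In the subsystem this awaits the PVF
// host, whose own queues are bounded, so awaiting it transfers backpressure.
async fn validate(id: usize) {
    println!("validated candidate {id}");
}

async fn run(mut rx: mpsc::Receiver<usize>) {
    let mut tasks = FuturesUnordered::new();
    loop {
        // Phase 1: accept new work while below the limit, polling in-flight
        // tasks concurrently.
        loop {
            futures::select! {
                msg = rx.next() => match msg {
                    Some(id) => {
                        tasks.push(validate(id));
                        if tasks.len() >= TASK_LIMIT {
                            break
                        }
                    },
                    // All senders dropped: nothing more to validate.
                    None => return,
                },
                _ = tasks.select_next_some() => (),
            }
        }
        // Phase 2: the limit is hit; stop receiving (senders now block on the
        // full channel) and only drain tasks until back under the limit.
        while tasks.len() >= TASK_LIMIT {
            tasks.select_next_some().await;
        }
    }
}

fn main() {
    // A small bounded channel standing in for the overseer -> subsystem path.
    let (mut tx, rx) = mpsc::channel::<usize>(2);
    block_on(async move {
        let producer = async move {
            for id in 0..16 {
                // `send` waits whenever the channel is full: this is the
                // backpressure the consumer's task limit pushes onto senders.
                tx.send(id).await.expect("receiver is alive");
            }
            // `tx` is dropped here, closing the channel and ending `run`.
        };
        futures::join!(producer, run(rx));
    });
}

In the actual subsystem, phase 2 additionally keeps servicing overseer signals via ctx.recv_signal() (see the second select! loop in the lib.rs diff below), which is also why the malus interceptor gains a recv_signal implementation that buffers intercepted non-signal messages.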
s0me0ne-unkn0wn authored Jan 21, 2024
1 parent 21ef949 commit d37a456
Showing 13 changed files with 201 additions and 115 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Generated file; diff not rendered by default.

2 changes: 1 addition & 1 deletion cumulus/client/relay-chain-inprocess-interface/Cargo.toml
@@ -42,7 +42,7 @@ sp-keyring = { path = "../../../substrate/primitives/keyring" }
# Polkadot
polkadot-primitives = { path = "../../../polkadot/primitives" }
polkadot-test-client = { path = "../../../polkadot/node/test/client" }
metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features = ["futures_channel"] }
metered = { package = "prioritized-metered-channel", version = "0.6.1", default-features = false, features = ["futures_channel"] }

# Cumulus
cumulus-test-service = { path = "../../test/service" }
221 changes: 123 additions & 98 deletions polkadot/node/core/candidate-validation/src/lib.rs
@@ -55,10 +55,11 @@ use polkadot_primitives::{

use parity_scale_codec::Encode;

use futures::{channel::oneshot, prelude::*};
use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};

use std::{
path::PathBuf,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
@@ -81,6 +82,11 @@ const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
#[cfg(test)]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);

// The task queue size is chosen to be somewhat bigger than the PVF host incoming queue size
// to allow exhaustive validation messages to fall through in case the tasks are clogged with
// `ValidateFromChainState` messages awaiting data from the runtime
const TASK_LIMIT: usize = 30;

/// Configuration for the candidate validation subsystem
#[derive(Clone)]
pub struct Config {
@@ -130,6 +136,83 @@ impl<Context> CandidateValidationSubsystem {
}
}

fn handle_validation_message<S>(
mut sender: S,
validation_host: ValidationHost,
metrics: Metrics,
msg: CandidateValidationMessage,
) -> Pin<Box<dyn Future<Output = ()> + Send>>
where
S: SubsystemSender<RuntimeApiMessage>,
{
match msg {
CandidateValidationMessage::ValidateFromChainState {
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => async move {
let _timer = metrics.time_validate_from_chain_state();
let res = validate_from_chain_state(
&mut sender,
validation_host,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
.boxed(),
CandidateValidationMessage::ValidateFromExhaustive {
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => async move {
let _timer = metrics.time_validate_from_exhaustive();
let res = validate_candidate_exhaustive(
validation_host,
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
.boxed(),
CandidateValidationMessage::PreCheck {
relay_parent,
validation_code_hash,
response_sender,
..
} => async move {
let precheck_result =
precheck_pvf(&mut sender, validation_host, relay_parent, validation_code_hash)
.await;

let _ = response_sender.send(precheck_result);
}
.boxed(),
}
}

#[overseer::contextbounds(CandidateValidation, prefix = self::overseer)]
async fn run<Context>(
mut ctx: Context,
@@ -156,106 +239,48 @@ async fn run<Context>(
.await?;
ctx.spawn_blocking("pvf-validation-host", task.boxed())?;

let mut tasks = FuturesUnordered::new();

loop {
match ctx.recv().await? {
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_)) => {},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Communication { msg } => match msg {
CandidateValidationMessage::ValidateFromChainState {
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => {
let bg = {
let mut sender = ctx.sender().clone();
let metrics = metrics.clone();
let validation_host = validation_host.clone();

async move {
let _timer = metrics.time_validate_from_chain_state();
let res = validate_from_chain_state(
&mut sender,
validation_host,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
};

ctx.spawn("validate-from-chain-state", bg.boxed())?;
},
CandidateValidationMessage::ValidateFromExhaustive {
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => {
let bg = {
let metrics = metrics.clone();
let validation_host = validation_host.clone();

async move {
let _timer = metrics.time_validate_from_exhaustive();
let res = validate_candidate_exhaustive(
validation_host,
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
};

ctx.spawn("validate-from-exhaustive", bg.boxed())?;
loop {
futures::select! {
comm = ctx.recv().fuse() => {
match comm {
Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_))) => {},
Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {},
Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(FromOrchestra::Communication { msg }) => {
let task = handle_validation_message(ctx.sender().clone(), validation_host.clone(), metrics.clone(), msg);
tasks.push(task);
if tasks.len() >= TASK_LIMIT {
break
}
},
Err(e) => return Err(SubsystemError::from(e)),
}
},
CandidateValidationMessage::PreCheck {
relay_parent,
validation_code_hash,
response_sender,
..
} => {
let bg = {
let mut sender = ctx.sender().clone();
let validation_host = validation_host.clone();

async move {
let precheck_result = precheck_pvf(
&mut sender,
validation_host,
relay_parent,
validation_code_hash,
)
.await;

let _ = response_sender.send(precheck_result);
}
};

ctx.spawn("candidate-validation-pre-check", bg.boxed())?;
_ = tasks.select_next_some() => ()
}
}

gum::debug!(target: LOG_TARGET, "Validation task limit hit");

loop {
futures::select! {
signal = ctx.recv_signal().fuse() => {
match signal {
Ok(OverseerSignal::ActiveLeaves(_)) => {},
Ok(OverseerSignal::BlockFinalized(..)) => {},
Ok(OverseerSignal::Conclude) => return Ok(()),
Err(e) => return Err(SubsystemError::from(e)),
}
},
},
_ = tasks.select_next_some() => {
if tasks.len() < TASK_LIMIT {
break
}
}
}
}
}
}
5 changes: 4 additions & 1 deletion polkadot/node/core/pvf/src/host.rs
@@ -60,6 +60,9 @@ pub const PREPARE_BINARY_NAME: &str = "polkadot-prepare-worker";
/// The name of binary spawned to execute a PVF
pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker";

/// The size of the incoming message queue.
pub const HOST_MESSAGE_QUEUE_SIZE: usize = 10;

/// An alias to not spell the type for the oneshot sender for the PVF execution result.
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;

@@ -227,7 +230,7 @@ pub async fn start(
Err(err) => return Err(SubsystemError::Context(err)),
};

let (to_host_tx, to_host_rx) = mpsc::channel(10);
let (to_host_tx, to_host_rx) = mpsc::channel(HOST_MESSAGE_QUEUE_SIZE);

let validation_host = ValidationHost { to_host_tx, security_status: security_status.clone() };

5 changes: 4 additions & 1 deletion polkadot/node/core/pvf/src/lib.rs
@@ -104,7 +104,10 @@ mod worker_interface;
pub mod testing;

pub use error::{InvalidCandidate, PossiblyInvalidError, ValidationError};
pub use host::{start, Config, ValidationHost, EXECUTE_BINARY_NAME, PREPARE_BINARY_NAME};
pub use host::{
start, Config, ValidationHost, EXECUTE_BINARY_NAME, HOST_MESSAGE_QUEUE_SIZE,
PREPARE_BINARY_NAME,
};
pub use metrics::Metrics;
pub use priority::Priority;
pub use worker_interface::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR};
21 changes: 19 additions & 2 deletions polkadot/node/malus/src/interceptor.rs
@@ -22,7 +22,7 @@
use polkadot_node_subsystem::*;
pub use polkadot_node_subsystem::{messages::*, overseer, FromOrchestra};
use std::{future::Future, pin::Pin};
use std::{collections::VecDeque, future::Future, pin::Pin};

/// Filter incoming and outgoing messages.
pub trait MessageInterceptor<Sender>: Send + Sync + Clone + 'static
@@ -170,6 +170,7 @@ where
inner: Context,
message_filter: Fil,
sender: InterceptedSender<<Context as overseer::SubsystemContext>::Sender, Fil>,
message_buffer: VecDeque<FromOrchestra<<Context as overseer::SubsystemContext>::Message>>,
}

impl<Context, Fil> InterceptedContext<Context, Fil>
@@ -189,7 +190,7 @@
inner: inner.sender().clone(),
message_filter: message_filter.clone(),
};
Self { inner, message_filter, sender }
Self { inner, message_filter, sender, message_buffer: VecDeque::new() }
}
}

@@ -233,6 +234,9 @@
}

async fn recv(&mut self) -> SubsystemResult<FromOrchestra<Self::Message>> {
if let Some(msg) = self.message_buffer.pop_front() {
return Ok(msg)
}
loop {
let msg = self.inner.recv().await?;
if let Some(msg) = self.message_filter.intercept_incoming(self.inner.sender(), msg) {
@@ -241,6 +245,19 @@
}
}

async fn recv_signal(&mut self) -> SubsystemResult<Self::Signal> {
loop {
let msg = self.inner.recv().await?;
if let Some(msg) = self.message_filter.intercept_incoming(self.inner.sender(), msg) {
if let FromOrchestra::Signal(sig) = msg {
return Ok(sig)
} else {
self.message_buffer.push_back(msg)
}
}
}
}

fn spawn(
&mut self,
name: &'static str,
2 changes: 1 addition & 1 deletion polkadot/node/metrics/Cargo.toml
@@ -14,7 +14,7 @@ futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../gum" }

metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features = ["futures_channel"] }
metered = { package = "prioritized-metered-channel", version = "0.6.1", default-features = false, features = ["futures_channel"] }
# Both `sc-service` and `sc-cli` are required by runtime metrics `logger_hook()`.
sc-service = { path = "../../../substrate/client/service" }
sc-cli = { path = "../../../substrate/client/cli" }
(Diffs for the remaining changed files are not shown.)
