Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make candidate validation bounded again #2125

Merged
merged 19 commits into from
Jan 21, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

218 changes: 120 additions & 98 deletions polkadot/node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ use polkadot_primitives::{

use parity_scale_codec::Encode;

use futures::{channel::oneshot, prelude::*};
use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};

use std::{
path::PathBuf,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
Expand All @@ -81,6 +82,8 @@ const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
#[cfg(test)]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);

const TASK_LIMIT: usize = 30;
s0me0ne-unkn0wn marked this conversation as resolved.
Show resolved Hide resolved

/// Configuration for the candidate validation subsystem
#[derive(Clone)]
pub struct Config {
Expand Down Expand Up @@ -130,6 +133,83 @@ impl<Context> CandidateValidationSubsystem {
}
}

fn handle_validation_message<S>(
mut sender: S,
validation_host: ValidationHost,
metrics: Metrics,
msg: CandidateValidationMessage,
) -> Pin<Box<dyn Future<Output = ()> + Send>>
where
S: SubsystemSender<RuntimeApiMessage>,
{
match msg {
CandidateValidationMessage::ValidateFromChainState {
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => async move {
let _timer = metrics.time_validate_from_chain_state();
let res = validate_from_chain_state(
&mut sender,
validation_host,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
.boxed(),
CandidateValidationMessage::ValidateFromExhaustive {
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => async move {
let _timer = metrics.time_validate_from_exhaustive();
let res = validate_candidate_exhaustive(
validation_host,
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
.boxed(),
CandidateValidationMessage::PreCheck {
relay_parent,
validation_code_hash,
response_sender,
..
} => async move {
let precheck_result =
precheck_pvf(&mut sender, validation_host, relay_parent, validation_code_hash)
.await;

let _ = response_sender.send(precheck_result);
}
.boxed(),
}
}

#[overseer::contextbounds(CandidateValidation, prefix = self::overseer)]
async fn run<Context>(
mut ctx: Context,
Expand All @@ -156,106 +236,48 @@ async fn run<Context>(
.await?;
ctx.spawn_blocking("pvf-validation-host", task.boxed())?;

let mut tasks = FuturesUnordered::new();

loop {
match ctx.recv().await? {
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_)) => {},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Communication { msg } => match msg {
CandidateValidationMessage::ValidateFromChainState {
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => {
let bg = {
let mut sender = ctx.sender().clone();
let metrics = metrics.clone();
let validation_host = validation_host.clone();

async move {
let _timer = metrics.time_validate_from_chain_state();
let res = validate_from_chain_state(
&mut sender,
validation_host,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
};

ctx.spawn("validate-from-chain-state", bg.boxed())?;
},
CandidateValidationMessage::ValidateFromExhaustive {
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => {
let bg = {
let metrics = metrics.clone();
let validation_host = validation_host.clone();

async move {
let _timer = metrics.time_validate_from_exhaustive();
let res = validate_candidate_exhaustive(
validation_host,
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
};

ctx.spawn("validate-from-exhaustive", bg.boxed())?;
loop {
futures::select! {
comm = ctx.recv().fuse() => {
match comm {
Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_))) => {},
Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {},
Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(FromOrchestra::Communication { msg }) => {
let task = handle_validation_message(ctx.sender().clone(), validation_host.clone(), metrics.clone(), msg);
tasks.push(task);
if tasks.len() >= TASK_LIMIT {
break
}
},
Err(e) => return Err(SubsystemError::from(e)),
}
},
CandidateValidationMessage::PreCheck {
relay_parent,
validation_code_hash,
response_sender,
..
} => {
let bg = {
let mut sender = ctx.sender().clone();
let validation_host = validation_host.clone();

async move {
let precheck_result = precheck_pvf(
&mut sender,
validation_host,
relay_parent,
validation_code_hash,
)
.await;

let _ = response_sender.send(precheck_result);
}
};

ctx.spawn("candidate-validation-pre-check", bg.boxed())?;
_ = tasks.select_next_some() => ()
}
}

gum::debug!(target: LOG_TARGET, "Validation task limit hit");

loop {
futures::select! {
signal = ctx.recv_signal().fuse() => {
match signal {
Ok(OverseerSignal::ActiveLeaves(_)) => {},
Ok(OverseerSignal::BlockFinalized(..)) => {},
Ok(OverseerSignal::Conclude) => return Ok(()),
Err(e) => return Err(SubsystemError::from(e)),
}
},
},
_ = tasks.select_next_some() => {
if tasks.len() < TASK_LIMIT {
break
}
}
}
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion polkadot/node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ pub const PREPARE_BINARY_NAME: &str = "polkadot-prepare-worker";
/// The name of binary spawned to execute a PVF
pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker";

/// The size of incoming message queue
pub const HOST_MESSAGE_QUEUE_SIZE: usize = 10;

/// An alias to not spell the type for the oneshot sender for the PVF execution result.
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;

Expand Down Expand Up @@ -227,7 +230,7 @@ pub async fn start(
Err(err) => return Err(SubsystemError::Context(err)),
};

let (to_host_tx, to_host_rx) = mpsc::channel(10);
let (to_host_tx, to_host_rx) = mpsc::channel(HOST_MESSAGE_QUEUE_SIZE);

let validation_host = ValidationHost { to_host_tx, security_status: security_status.clone() };

Expand Down
5 changes: 4 additions & 1 deletion polkadot/node/core/pvf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ mod worker_interface;
pub mod testing;

pub use error::{InvalidCandidate, PossiblyInvalidError, ValidationError};
pub use host::{start, Config, ValidationHost, EXECUTE_BINARY_NAME, PREPARE_BINARY_NAME};
pub use host::{
start, Config, ValidationHost, EXECUTE_BINARY_NAME, HOST_MESSAGE_QUEUE_SIZE,
PREPARE_BINARY_NAME,
};
pub use metrics::Metrics;
pub use priority::Priority;
pub use worker_interface::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR};
Expand Down
Loading
Loading