This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

PVF validation host: do not alter niceness (#4525)
We wanted to change the niceness to accommodate the fact that some of the
preparation tasks are low priority. For example, when a node sees that a
new para was onboarded, it may start preparing the PVF right away, even
though all other activities are more important, such as network I/O,
validation of backed candidates, and preparation of immediately needed
PVFs.

However, it turned out that this approach does not work: generally, an
unprivileged process can only raise its nice value (lowering its
priority); it cannot lower it back to the previous value, as the code
assumed.

Apart from that, #4123 assumes all PVFs are prepared in the same way.
Specifically, if a PVF preparation failed before, then PVF pre-checking
will also report it as failed, even though the preparation might have
failed only because it ran at low priority. To avoid such cases, we
decided to simplify the whole preparation model, and low-priority
preparation does not fit well with that.

Closes #4520
pepyakin committed Dec 14, 2021
1 parent ed480c5 commit 69b4791
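
As background for the niceness point in the commit message: below is a minimal standalone sketch, not part of this commit, of why the removed renice round-trip cannot work without privileges. It assumes Linux and the `libc` crate (the very dependency this commit drops from the PVF crate): raising the nice value succeeds for any process, while lowering it back generally requires CAP_SYS_NICE, so the restore step fails.

// Sketch only: mirrors the `libc::setpriority(libc::PRIO_PROCESS, pid, niceness)`
// call removed from node/core/pvf/src/prepare/worker.rs below.
fn set_own_niceness(niceness: i32) -> std::io::Result<()> {
    // `std::process::id()` makes the call apply to the calling process itself.
    let rc = unsafe { libc::setpriority(libc::PRIO_PROCESS as _, std::process::id(), niceness) };
    if rc == -1 {
        return Err(std::io::Error::last_os_error());
    }
    Ok(())
}

fn main() {
    // Deprioritising (raising the nice value) is permitted for any process.
    set_own_niceness(10).expect("raising the nice value should succeed");

    // Restoring the previous priority (lowering the nice value) typically needs
    // CAP_SYS_NICE / root; without it this reports EPERM or EACCES.
    if let Err(err) = set_own_niceness(0) {
        eprintln!("could not restore niceness: {err}");
    }
}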
Showing 7 changed files with 13 additions and 197 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion node/core/pvf/Cargo.toml
@@ -15,7 +15,6 @@ async-process = "1.3.0"
assert_matches = "1.4.0"
futures = "0.3.17"
futures-timer = "3.0.2"
libc = "0.2.109"
slotmap = "1.0"
tracing = "0.1.29"
pin-project = "1.0.8"
59 changes: 3 additions & 56 deletions node/core/pvf/src/host.rs
@@ -486,12 +486,6 @@ async fn handle_execute_pvf(
.await?;
},
ArtifactState::Preparing { waiting_for_response: _ } => {
send_prepare(
prepare_queue,
prepare::ToQueue::Amend { priority, artifact_id: artifact_id.clone() },
)
.await?;

awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
},
ArtifactState::FailedToProcess(error) => {
@@ -525,18 +519,17 @@ async fn handle_heads_up(
*last_time_needed = now;
},
ArtifactState::Preparing { waiting_for_response: _ } => {
// Already preparing. We don't need to send a priority amend either because
// it can't get any lower than the background.
// The artifact is already being prepared, so we don't need to do anything.
},
ArtifactState::FailedToProcess(_) => {},
}
} else {
// The artifact is unknown: register it and put a background job into the prepare queue.
// It's not in the artifacts, so we need to enqueue a job to prepare it.
artifacts.insert_preparing(artifact_id.clone(), Vec::new());

send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue { priority: Priority::Background, pvf: active_pvf },
prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf },
)
.await?;
}
@@ -923,48 +916,6 @@ mod tests {
test.poll_ensure_to_sweeper_is_empty().await;
}

#[async_std::test]
async fn amending_priority() {
let mut test = Builder::default().build();
let mut host = test.host_handle();

host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();

// Run until we receive a prepare request.
let prepare_q_rx = &mut test.to_prepare_queue_rx;
run_until(
&mut test.run,
async {
assert_matches!(
prepare_q_rx.next().await.unwrap(),
prepare::ToQueue::Enqueue { .. }
);
}
.boxed(),
)
.await;

let (result_tx, _result_rx) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
vec![],
Priority::Critical,
result_tx,
)
.await
.unwrap();

run_until(
&mut test.run,
async {
assert_matches!(prepare_q_rx.next().await.unwrap(), prepare::ToQueue::Amend { .. });
}
.boxed(),
)
.await;
}

#[async_std::test]
async fn execute_pvf_requests() {
let mut test = Builder::default().build();
@@ -1007,10 +958,6 @@ mod tests {
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Amend { .. }
);
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
22 changes: 3 additions & 19 deletions node/core/pvf/src/prepare/pool.rs
@@ -53,10 +53,6 @@ pub enum ToPool {
/// this message is processed.
Kill(Worker),

/// If the given worker was started with the background priority, then it will be raised up to
/// normal priority. Otherwise, it's no-op.
BumpPriority(Worker),

/// Request the given worker to start working on the given code.
///
/// Once the job either succeeded or failed, a [`FromPool::Concluded`] message will be sent back.
@@ -65,12 +61,7 @@ pub enum ToPool {
///
/// In either case, the worker is considered busy and no further `StartWork` messages should be
/// sent until either `Concluded` or `Rip` message is received.
StartWork {
worker: Worker,
code: Arc<Vec<u8>>,
artifact_path: PathBuf,
background_priority: bool,
},
StartWork { worker: Worker, code: Arc<Vec<u8>>, artifact_path: PathBuf },
}

/// A message sent from pool to its client.
@@ -214,7 +205,7 @@ fn handle_to_pool(
metrics.prepare_worker().on_begin_spawn();
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
},
ToPool::StartWork { worker, code, artifact_path, background_priority } => {
ToPool::StartWork { worker, code, artifact_path } => {
if let Some(data) = spawned.get_mut(worker) {
if let Some(idle) = data.idle.take() {
let preparation_timer = metrics.time_preparation();
@@ -225,7 +216,6 @@
code,
cache_path.to_owned(),
artifact_path,
background_priority,
preparation_timer,
)
.boxed(),
@@ -248,10 +238,6 @@
// It may be absent if it were previously already removed by `purge_dead`.
let _ = attempt_retire(metrics, spawned, worker);
},
ToPool::BumpPriority(worker) =>
if let Some(data) = spawned.get(worker) {
worker::bump_priority(&data.handle);
},
}
}

@@ -277,11 +263,9 @@ async fn start_work_task<Timer>(
code: Arc<Vec<u8>>,
cache_path: PathBuf,
artifact_path: PathBuf,
background_priority: bool,
_preparation_timer: Option<Timer>,
) -> PoolEvent {
let outcome =
worker::start_work(idle, code, &cache_path, artifact_path, background_priority).await;
let outcome = worker::start_work(idle, code, &cache_path, artifact_path).await;
PoolEvent::StartWork(worker, outcome)
}

80 changes: 5 additions & 75 deletions node/core/pvf/src/prepare/queue.rs
@@ -29,11 +29,8 @@ pub enum ToQueue {
/// This schedules preparation of the given PVF.
///
/// Note that it is incorrect to enqueue the same PVF again without first receiving the
/// [`FromQueue`] response. In case there is a need to bump the priority, use
/// [`ToQueue::Amend`].
/// [`FromQueue`] response.
Enqueue { priority: Priority, pvf: Pvf },
/// Amends the priority for the given [`ArtifactId`] if it is running. If it's not, then it's noop.
Amend { priority: Priority, artifact_id: ArtifactId },
}

/// A response from queue.
@@ -97,15 +94,13 @@ impl WorkerData {
/// there is going to be a limited number of critical jobs and we don't really care if background starve.
#[derive(Default)]
struct Unscheduled {
background: VecDeque<Job>,
normal: VecDeque<Job>,
critical: VecDeque<Job>,
}

impl Unscheduled {
fn queue_mut(&mut self, prio: Priority) -> &mut VecDeque<Job> {
match prio {
Priority::Background => &mut self.background,
Priority::Normal => &mut self.normal,
Priority::Critical => &mut self.critical,
}
@@ -120,14 +115,12 @@ impl Unscheduled {
}

fn is_empty(&self) -> bool {
self.background.is_empty() && self.normal.is_empty() && self.critical.is_empty()
self.normal.is_empty() && self.critical.is_empty()
}

fn next(&mut self) -> Option<Job> {
let mut check = |prio: Priority| self.queue_mut(prio).pop_front();
check(Priority::Critical)
.or_else(|| check(Priority::Normal))
.or_else(|| check(Priority::Background))
check(Priority::Critical).or_else(|| check(Priority::Normal))
}
}
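
For orientation: the `Priority` type referenced throughout this diff is defined in a file that is not rendered on this page. A rough sketch of what it plausibly looks like once the background level is removed (an assumption for illustration, not the actual definition):

/// Sketch (assumption): the preparation priority levels that remain after this
/// change; the real definition lives outside the files rendered in this diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Priority {
    /// Ordinary preparation, e.g. triggered by a heads-up about a newly onboarded PVF.
    Normal,
    /// Preparation that an execution request is waiting on and is needed right away.
    Critical,
}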

Expand Down Expand Up @@ -213,9 +206,6 @@ async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fat
ToQueue::Enqueue { priority, pvf } => {
handle_enqueue(queue, priority, pvf).await?;
},
ToQueue::Amend { priority, artifact_id } => {
handle_amend(queue, priority, artifact_id).await?;
},
}
Ok(())
}
@@ -265,41 +255,6 @@ fn find_idle_worker(queue: &mut Queue) -> Option<Worker> {
queue.workers.iter().filter(|(_, data)| data.is_idle()).map(|(k, _)| k).next()
}

async fn handle_amend(
queue: &mut Queue,
priority: Priority,
artifact_id: ArtifactId,
) -> Result<(), Fatal> {
if let Some(&job) = queue.artifact_id_to_job.get(&artifact_id) {
tracing::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact_id.code_hash,
?priority,
"amending preparation priority.",
);

let mut job_data: &mut JobData = &mut queue.jobs[job];
if job_data.priority < priority {
// The new priority is higher. We should do two things:
// - if the worker was already spawned with the background prio and the new one is not
// (it's already the case, if we are in this branch but we still do the check for
// clarity), then we should tell the pool to bump the priority for the worker.
//
// - save the new priority in the job.

if let Some(worker) = job_data.worker {
if job_data.priority.is_background() && !priority.is_background() {
send_pool(&mut queue.to_pool_tx, pool::ToPool::BumpPriority(worker)).await?;
}
}

job_data.priority = priority;
}
}

Ok(())
}

async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Result<(), Fatal> {
use pool::FromPool::*;
match from_pool {
@@ -469,12 +424,7 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal

send_pool(
&mut queue.to_pool_tx,
pool::ToPool::StartWork {
worker,
code: job_data.pvf.code.clone(),
artifact_path,
background_priority: job_data.priority.is_background(),
},
pool::ToPool::StartWork { worker, code: job_data.pvf.code.clone(), artifact_path },
)
.await?;

@@ -644,7 +594,7 @@ mod tests {
async fn properly_concludes() {
let mut test = Test::new(2, 2);

test.send_queue(ToQueue::Enqueue { priority: Priority::Background, pvf: pvf(1) });
test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

let w = test.workers.insert(());
@@ -713,26 +663,6 @@
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
}

#[async_std::test]
async fn bump_prio_on_urgency_change() {
let mut test = Test::new(2, 2);

test.send_queue(ToQueue::Enqueue { priority: Priority::Background, pvf: pvf(1) });

assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

let w = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w));

assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
test.send_queue(ToQueue::Amend {
priority: Priority::Normal,
artifact_id: pvf(1).as_artifact_id(),
});

assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::BumpPriority(w));
}

#[async_std::test]
async fn worker_mass_die_out_doesnt_stall_queue() {
let mut test = Test::new(2, 2);
37 changes: 2 additions & 35 deletions node/core/pvf/src/prepare/worker.rs
@@ -32,9 +32,6 @@ use parity_scale_codec::{Decode, Encode};
use sp_core::hexdisplay::HexDisplay;
use std::{any::Any, panic, sync::Arc, time::Duration};

const NICENESS_BACKGROUND: i32 = 10;
const NICENESS_FOREGROUND: i32 = 0;

/// The time period after which the preparation worker is considered unresponsive and will be killed.
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
const COMPILATION_TIMEOUT: Duration = Duration::from_secs(60);
@@ -72,22 +69,16 @@ pub async fn start_work(
code: Arc<Vec<u8>>,
cache_path: &Path,
artifact_path: PathBuf,
background_priority: bool,
) -> Outcome {
let IdleWorker { mut stream, pid } = worker;

tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
%background_priority,
"starting prepare for {}",
artifact_path.display(),
);

if background_priority {
renice(pid, NICENESS_BACKGROUND);
}

with_tmp_file(pid, cache_path, |tmp_file| async move {
if let Err(err) = send_request(&mut stream, code, &tmp_file).await {
tracing::warn!(
@@ -172,10 +163,8 @@ pub async fn start_work(
};

match selected {
Selected::Done(result) => {
renice(pid, NICENESS_FOREGROUND);
Outcome::Concluded { worker: IdleWorker { stream, pid }, result }
},
Selected::Done(result) =>
Outcome::Concluded { worker: IdleWorker { stream, pid }, result },
Selected::Deadline => Outcome::TimedOut,
Selected::IoErr => Outcome::DidNotMakeIt,
}
@@ -250,28 +239,6 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf)>
Ok((code, tmp_file))
}

pub fn bump_priority(handle: &WorkerHandle) {
let pid = handle.id();
renice(pid, NICENESS_FOREGROUND);
}

fn renice(pid: u32, niceness: i32) {
tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
"changing niceness to {}",
niceness,
);

// Consider upstreaming this to the `nix` crate.
unsafe {
if -1 == libc::setpriority(libc::PRIO_PROCESS, pid, niceness) {
let err = std::io::Error::last_os_error();
tracing::warn!(target: LOG_TARGET, "failed to set the priority: {:?}", err);
}
}
}

/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {