Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

PVF validation host: do not alter niceness #4525

Merged
merged 1 commit into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ async-process = "1.3.0"
assert_matches = "1.4.0"
futures = "0.3.17"
futures-timer = "3.0.2"
libc = "0.2.109"
slotmap = "1.0"
tracing = "0.1.29"
pin-project = "1.0.8"
Expand Down
59 changes: 3 additions & 56 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,12 +486,6 @@ async fn handle_execute_pvf(
.await?;
},
ArtifactState::Preparing { waiting_for_response: _ } => {
send_prepare(
prepare_queue,
prepare::ToQueue::Amend { priority, artifact_id: artifact_id.clone() },
)
.await?;

awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
},
ArtifactState::FailedToProcess(error) => {
Expand Down Expand Up @@ -525,18 +519,17 @@ async fn handle_heads_up(
*last_time_needed = now;
},
ArtifactState::Preparing { waiting_for_response: _ } => {
// Already preparing. We don't need to send a priority amend either because
// it can't get any lower than the background.
// The artifact is already being prepared, so we don't need to do anything.
},
ArtifactState::FailedToProcess(_) => {},
}
} else {
// The artifact is unknown: register it and put a background job into the prepare queue.
// It's not in the artifacts, so we need to enqueue a job to prepare it.
artifacts.insert_preparing(artifact_id.clone(), Vec::new());

send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue { priority: Priority::Background, pvf: active_pvf },
prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf },
)
.await?;
}
Expand Down Expand Up @@ -923,48 +916,6 @@ mod tests {
test.poll_ensure_to_sweeper_is_empty().await;
}

#[async_std::test]
async fn amending_priority() {
let mut test = Builder::default().build();
let mut host = test.host_handle();

host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();

// Run until we receive a prepare request.
let prepare_q_rx = &mut test.to_prepare_queue_rx;
run_until(
&mut test.run,
async {
assert_matches!(
prepare_q_rx.next().await.unwrap(),
prepare::ToQueue::Enqueue { .. }
);
}
.boxed(),
)
.await;

let (result_tx, _result_rx) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
vec![],
Priority::Critical,
result_tx,
)
.await
.unwrap();

run_until(
&mut test.run,
async {
assert_matches!(prepare_q_rx.next().await.unwrap(), prepare::ToQueue::Amend { .. });
}
.boxed(),
)
.await;
}

#[async_std::test]
async fn execute_pvf_requests() {
let mut test = Builder::default().build();
Expand Down Expand Up @@ -1007,10 +958,6 @@ mod tests {
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Amend { .. }
);
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
Expand Down
22 changes: 3 additions & 19 deletions node/core/pvf/src/prepare/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ pub enum ToPool {
/// this message is processed.
Kill(Worker),

/// If the given worker was started with the background priority, then it will be raised up to
/// normal priority. Otherwise, it's no-op.
BumpPriority(Worker),

/// Request the given worker to start working on the given code.
///
/// Once the job either succeeded or failed, a [`FromPool::Concluded`] message will be sent back.
Expand All @@ -65,12 +61,7 @@ pub enum ToPool {
///
/// In either case, the worker is considered busy and no further `StartWork` messages should be
/// sent until either `Concluded` or `Rip` message is received.
StartWork {
worker: Worker,
code: Arc<Vec<u8>>,
artifact_path: PathBuf,
background_priority: bool,
},
StartWork { worker: Worker, code: Arc<Vec<u8>>, artifact_path: PathBuf },
}

/// A message sent from pool to its client.
Expand Down Expand Up @@ -214,7 +205,7 @@ fn handle_to_pool(
metrics.prepare_worker().on_begin_spawn();
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
},
ToPool::StartWork { worker, code, artifact_path, background_priority } => {
ToPool::StartWork { worker, code, artifact_path } => {
if let Some(data) = spawned.get_mut(worker) {
if let Some(idle) = data.idle.take() {
let preparation_timer = metrics.time_preparation();
Expand All @@ -225,7 +216,6 @@ fn handle_to_pool(
code,
cache_path.to_owned(),
artifact_path,
background_priority,
preparation_timer,
)
.boxed(),
Expand All @@ -248,10 +238,6 @@ fn handle_to_pool(
// It may be absent if it were previously already removed by `purge_dead`.
let _ = attempt_retire(metrics, spawned, worker);
},
ToPool::BumpPriority(worker) =>
if let Some(data) = spawned.get(worker) {
worker::bump_priority(&data.handle);
},
}
}

Expand All @@ -277,11 +263,9 @@ async fn start_work_task<Timer>(
code: Arc<Vec<u8>>,
cache_path: PathBuf,
artifact_path: PathBuf,
background_priority: bool,
_preparation_timer: Option<Timer>,
) -> PoolEvent {
let outcome =
worker::start_work(idle, code, &cache_path, artifact_path, background_priority).await;
let outcome = worker::start_work(idle, code, &cache_path, artifact_path).await;
PoolEvent::StartWork(worker, outcome)
}

Expand Down
80 changes: 5 additions & 75 deletions node/core/pvf/src/prepare/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ pub enum ToQueue {
/// This schedules preparation of the given PVF.
///
/// Note that it is incorrect to enqueue the same PVF again without first receiving the
/// [`FromQueue`] response. In case there is a need to bump the priority, use
/// [`ToQueue::Amend`].
/// [`FromQueue`] response.
Enqueue { priority: Priority, pvf: Pvf },
/// Amends the priority for the given [`ArtifactId`] if it is running. If it's not, then it's noop.
Amend { priority: Priority, artifact_id: ArtifactId },
}

/// A response from queue.
Expand Down Expand Up @@ -97,15 +94,13 @@ impl WorkerData {
/// there is going to be a limited number of critical jobs and we don't really care if background starve.
#[derive(Default)]
struct Unscheduled {
background: VecDeque<Job>,
normal: VecDeque<Job>,
critical: VecDeque<Job>,
}

impl Unscheduled {
fn queue_mut(&mut self, prio: Priority) -> &mut VecDeque<Job> {
match prio {
Priority::Background => &mut self.background,
Priority::Normal => &mut self.normal,
Priority::Critical => &mut self.critical,
}
Expand All @@ -120,14 +115,12 @@ impl Unscheduled {
}

fn is_empty(&self) -> bool {
self.background.is_empty() && self.normal.is_empty() && self.critical.is_empty()
self.normal.is_empty() && self.critical.is_empty()
}

fn next(&mut self) -> Option<Job> {
let mut check = |prio: Priority| self.queue_mut(prio).pop_front();
check(Priority::Critical)
.or_else(|| check(Priority::Normal))
.or_else(|| check(Priority::Background))
check(Priority::Critical).or_else(|| check(Priority::Normal))
}
}

Expand Down Expand Up @@ -213,9 +206,6 @@ async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fat
ToQueue::Enqueue { priority, pvf } => {
handle_enqueue(queue, priority, pvf).await?;
},
ToQueue::Amend { priority, artifact_id } => {
handle_amend(queue, priority, artifact_id).await?;
},
}
Ok(())
}
Expand Down Expand Up @@ -265,41 +255,6 @@ fn find_idle_worker(queue: &mut Queue) -> Option<Worker> {
queue.workers.iter().filter(|(_, data)| data.is_idle()).map(|(k, _)| k).next()
}

async fn handle_amend(
queue: &mut Queue,
priority: Priority,
artifact_id: ArtifactId,
) -> Result<(), Fatal> {
if let Some(&job) = queue.artifact_id_to_job.get(&artifact_id) {
tracing::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact_id.code_hash,
?priority,
"amending preparation priority.",
);

let mut job_data: &mut JobData = &mut queue.jobs[job];
if job_data.priority < priority {
// The new priority is higher. We should do two things:
// - if the worker was already spawned with the background prio and the new one is not
// (it's already the case, if we are in this branch but we still do the check for
// clarity), then we should tell the pool to bump the priority for the worker.
//
// - save the new priority in the job.

if let Some(worker) = job_data.worker {
if job_data.priority.is_background() && !priority.is_background() {
send_pool(&mut queue.to_pool_tx, pool::ToPool::BumpPriority(worker)).await?;
}
}

job_data.priority = priority;
}
}

Ok(())
}

async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Result<(), Fatal> {
use pool::FromPool::*;
match from_pool {
Expand Down Expand Up @@ -469,12 +424,7 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal

send_pool(
&mut queue.to_pool_tx,
pool::ToPool::StartWork {
worker,
code: job_data.pvf.code.clone(),
artifact_path,
background_priority: job_data.priority.is_background(),
},
pool::ToPool::StartWork { worker, code: job_data.pvf.code.clone(), artifact_path },
)
.await?;

Expand Down Expand Up @@ -644,7 +594,7 @@ mod tests {
async fn properly_concludes() {
let mut test = Test::new(2, 2);

test.send_queue(ToQueue::Enqueue { priority: Priority::Background, pvf: pvf(1) });
test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

let w = test.workers.insert(());
Expand Down Expand Up @@ -713,26 +663,6 @@ mod tests {
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
}

#[async_std::test]
async fn bump_prio_on_urgency_change() {
let mut test = Test::new(2, 2);

test.send_queue(ToQueue::Enqueue { priority: Priority::Background, pvf: pvf(1) });

assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

let w = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w));

assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
test.send_queue(ToQueue::Amend {
priority: Priority::Normal,
artifact_id: pvf(1).as_artifact_id(),
});

assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::BumpPriority(w));
}

#[async_std::test]
async fn worker_mass_die_out_doesnt_stall_queue() {
let mut test = Test::new(2, 2);
Expand Down
37 changes: 2 additions & 35 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ use parity_scale_codec::{Decode, Encode};
use sp_core::hexdisplay::HexDisplay;
use std::{any::Any, panic, sync::Arc, time::Duration};

const NICENESS_BACKGROUND: i32 = 10;
const NICENESS_FOREGROUND: i32 = 0;

/// The time period after which the preparation worker is considered unresponsive and will be killed.
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
const COMPILATION_TIMEOUT: Duration = Duration::from_secs(60);
Expand Down Expand Up @@ -72,22 +69,16 @@ pub async fn start_work(
code: Arc<Vec<u8>>,
cache_path: &Path,
artifact_path: PathBuf,
background_priority: bool,
) -> Outcome {
let IdleWorker { mut stream, pid } = worker;

tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
%background_priority,
"starting prepare for {}",
artifact_path.display(),
);

if background_priority {
renice(pid, NICENESS_BACKGROUND);
}

with_tmp_file(pid, cache_path, |tmp_file| async move {
if let Err(err) = send_request(&mut stream, code, &tmp_file).await {
tracing::warn!(
Expand Down Expand Up @@ -172,10 +163,8 @@ pub async fn start_work(
};

match selected {
Selected::Done(result) => {
renice(pid, NICENESS_FOREGROUND);
Outcome::Concluded { worker: IdleWorker { stream, pid }, result }
},
Selected::Done(result) =>
Outcome::Concluded { worker: IdleWorker { stream, pid }, result },
Selected::Deadline => Outcome::TimedOut,
Selected::IoErr => Outcome::DidNotMakeIt,
}
Expand Down Expand Up @@ -250,28 +239,6 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf)>
Ok((code, tmp_file))
}

pub fn bump_priority(handle: &WorkerHandle) {
let pid = handle.id();
renice(pid, NICENESS_FOREGROUND);
}

fn renice(pid: u32, niceness: i32) {
tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
"changing niceness to {}",
niceness,
);

// Consider upstreaming this to the `nix` crate.
unsafe {
if -1 == libc::setpriority(libc::PRIO_PROCESS, pid, niceness) {
let err = std::io::Error::last_os_error();
tracing::warn!(target: LOG_TARGET, "failed to set the priority: {:?}", err);
}
}
}

/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {
Expand Down
Loading