Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cattibrie committed Jun 28, 2019
1 parent f778a37 commit 1cb27b3
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 53 deletions.
4 changes: 2 additions & 2 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl AddAssign<UploadSummary> for ExecutionStats {
}

pub trait CommandRunner: Send + Sync {
fn run(&self, req: ExecuteProcessRequest, workunit_store: Arc<WorkUnitStore>) -> BoxFuture<FallibleExecuteProcessResult, String>;
fn run(&self, req: ExecuteProcessRequest, workunit_store: WorkUnitStore) -> BoxFuture<FallibleExecuteProcessResult, String>;
}

///
Expand All @@ -154,7 +154,7 @@ impl BoundedCommandRunner {
}

impl CommandRunner for BoundedCommandRunner {
fn run(&self, req: ExecuteProcessRequest, workunit_store: Arc<WorkUnitStore>) -> BoxFuture<FallibleExecuteProcessResult, String> {
fn run(&self, req: ExecuteProcessRequest, workunit_store: WorkUnitStore) -> BoxFuture<FallibleExecuteProcessResult, String> {
let inner = self.inner.clone();
self.inner.1.with_acquired(move || inner.0.run(req, workunit_store))
}
Expand Down
9 changes: 5 additions & 4 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ impl super::CommandRunner for CommandRunner {
///
/// Runs a command on this machine in the passed working directory.
///
fn run(&self, req: ExecuteProcessRequest, _workunit_store: Arc<WorkUnitStore>) -> BoxFuture<FallibleExecuteProcessResult, String> {
/// TODO: start to create workunits for local process execution
///
fn run(&self, req: ExecuteProcessRequest, _workunit_store: WorkUnitStore) -> BoxFuture<FallibleExecuteProcessResult, String> {
let workdir = try_future!(tempfile::Builder::new()
.prefix("process-execution")
.tempdir_in(&self.work_dir)
Expand Down Expand Up @@ -341,8 +343,7 @@ mod tests {
use testutil::data::{TestData, TestDirectory};
use testutil::path::find_bash;
use testutil::{as_bytes, owned_string_vec};
use workunit_store::SafeWorkUnitStore;
use std::sync::Arc;
use workunit_store::WorkUnitStore;

#[test]
#[cfg(unix)]
Expand Down Expand Up @@ -900,6 +901,6 @@ mod tests {
};
tokio::runtime::Runtime::new()
.unwrap()
.block_on(runner.run(req, Arc::new(SafeWorkUnitStore::new())))
.block_on(runner.run(req, WorkUnitStore::new()))
}
}
36 changes: 17 additions & 19 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl super::CommandRunner for CommandRunner {
///
/// TODO: Request jdk_home be created if set.
///
fn run(&self, req: ExecuteProcessRequest, workunit_store: Arc<WorkUnitStore>) -> BoxFuture<FallibleExecuteProcessResult, String> {
fn run(&self, req: ExecuteProcessRequest, workunit_store: WorkUnitStore) -> BoxFuture<FallibleExecuteProcessResult, String> {
let operations_client = self.operations_client.clone();

let store = self.store.clone();
Expand Down Expand Up @@ -375,7 +375,7 @@ impl CommandRunner {
&self,
operation_or_status: OperationOrStatus,
attempts: &mut ExecutionHistory,
workunit_store: Arc<WorkUnitStore>,
workunit_store: WorkUnitStore,
) -> BoxFuture<FallibleExecuteProcessResult, ExecutionError> {
trace!("Got operation response: {:?}", operation_or_status);

Expand Down Expand Up @@ -416,28 +416,28 @@ impl CommandRunner {
match (worker_start - enqueued).to_std() {
Ok(duration) => {
attempts.current_attempt.remote_queue = Some(duration);
maybe_add_workunit(&result_cached, "scheduling", &enqueued, &worker_start, parent_id.clone(), &workunit_store);
maybe_add_workunit(result_cached, "scheduling", &enqueued, &worker_start, parent_id.clone(), &workunit_store);
},
Err(err) => warn!("Got negative remote queue time: {}", err),
}
match (input_fetch_completed - input_fetch_start).to_std() {
Ok(duration) => {
attempts.current_attempt.remote_input_fetch = Some(duration);
maybe_add_workunit(&result_cached, "fetching", &input_fetch_start, &input_fetch_completed, parent_id.clone(), &workunit_store);
maybe_add_workunit(result_cached, "fetching", &input_fetch_start, &input_fetch_completed, parent_id.clone(), &workunit_store);
},
Err(err) => warn!("Got negative remote input fetch time: {}", err),
}
match (execution_completed - execution_start).to_std() {
Ok(duration) => {
attempts.current_attempt.remote_execution = Some(duration);
maybe_add_workunit(&result_cached, "executing", &execution_start, &execution_completed, parent_id.clone(), &workunit_store);
maybe_add_workunit(result_cached, "executing", &execution_start, &execution_completed, parent_id.clone(), &workunit_store);
},
Err(err) => warn!("Got negative remote execution time: {}", err),
}
match (output_upload_completed - output_upload_start).to_std() {
Ok(duration) => {
attempts.current_attempt.remote_output_store = Some(duration);
maybe_add_workunit(&result_cached, "uploading", &output_upload_start, &output_upload_completed, parent_id, &workunit_store);
maybe_add_workunit(result_cached, "uploading", &output_upload_start, &output_upload_completed, parent_id, &workunit_store);
},
Err(err) => warn!("Got negative remote output store time: {}", err),
}
Expand Down Expand Up @@ -748,7 +748,7 @@ impl CommandRunner {
}
}

fn maybe_add_workunit(result_cached: &bool, name: &str, start_time: &Timespec, end_time: &Timespec, parent_id: Option<String>, workunit_store: &Arc<WorkUnitStore>) {
fn maybe_add_workunit(result_cached: bool, name: &str, start_time: &Timespec, end_time: &Timespec, parent_id: Option<String>, workunit_store: &WorkUnitStore) {
// TODO: workunits for scheduling, fetching, executing and uploading should be recorded
// only if '--reporting-zipkin-trace-v2' is set
if !result_cached {
Expand All @@ -764,8 +764,6 @@ fn maybe_add_workunit(result_cached: &bool, name: &str, start_time: &Timespec, e
}

fn timespec_as_float_secs(timespec: &Timespec) -> f64 {
// Returning value is formed by representing duration as a hole number of seconds (u64) plus
// a hole number of microseconds (u32) turned into a f64 type.
// Reverting time from duration to f64 decrease precision.
let whole_secs = timespec.sec as f64;
let fract_part_in_nanos = timespec.nsec as f64;
Expand Down Expand Up @@ -966,8 +964,7 @@ mod tests {
use std::ops::Sub;
use std::path::PathBuf;
use std::time::Duration;
use workunit_store::{WorkUnitStore, SafeWorkUnitStore, WorkUnit};
use std::sync::Arc;
use workunit_store::{WorkUnitStore, WorkUnit};
use bazel_protos::remote_execution::ExecutedActionMetadata;
use protobuf::well_known_types::Timestamp;

Expand Down Expand Up @@ -1525,7 +1522,7 @@ mod tests {
store,
);
let result = runtime
.block_on(cmd_runner.run(echo_roland_request(), Arc::new(SafeWorkUnitStore::new())))
.block_on(cmd_runner.run(echo_roland_request(), WorkUnitStore::new()))
.unwrap();
assert_eq!(
result.without_execution_attempts(),
Expand Down Expand Up @@ -1898,7 +1895,7 @@ mod tests {
);

let result = runtime
.block_on(command_runner.run(cat_roland_request(), Arc::new(SafeWorkUnitStore::new())))
.block_on(command_runner.run(cat_roland_request(), WorkUnitStore::new()))
.unwrap();
assert_eq!(
result.without_execution_attempts(),
Expand Down Expand Up @@ -1990,7 +1987,7 @@ mod tests {
1,
store,
)
.run(cat_roland_request(), Arc::new(SafeWorkUnitStore::new()))
.run(cat_roland_request(), WorkUnitStore::new())
.wait();
assert_eq!(
result,
Expand Down Expand Up @@ -2058,7 +2055,7 @@ mod tests {
);

let error = runtime
.block_on(runner.run(cat_roland_request(), Arc::new(SafeWorkUnitStore::new())))
.block_on(runner.run(cat_roland_request(), WorkUnitStore::new()))
.expect_err("Want error");
assert_contains(&error, &format!("{}", missing_digest.0));
}
Expand Down Expand Up @@ -2534,7 +2531,7 @@ mod tests {

#[test]
fn check_that_remote_workunits_are_in_workunit_store() {
let workunit_store = Arc::new(SafeWorkUnitStore::new());
let workunit_store =WorkUnitStore::new();
let op_name = "gimme-foo".to_string();
let testdata = TestData::roland();
let testdata_empty = TestData::empty();
Expand Down Expand Up @@ -2565,7 +2562,8 @@ mod tests {
workunit_store.len(),
4
);
let workunits = workunit_store.get_workunits().lock();
let workunits_arc = workunit_store.get_workunits();
let workunits = workunits_arc.lock();
let scheduling_workunit = &workunits[0];
assert_workunit_params(scheduling_workunit, "scheduling", 0.0, 1.0, None);
let fetching_workunit = &workunits[1];
Expand Down Expand Up @@ -2767,7 +2765,7 @@ mod tests {
.build();
let command_runner = create_command_runner(address, &cas);
let mut runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(command_runner.run(request, Arc::new(SafeWorkUnitStore::new())))
runtime.block_on(command_runner.run(request, WorkUnitStore::new()))
}

fn create_command_runner(address: String, cas: &mock::StubCAS) -> CommandRunner {
Expand Down Expand Up @@ -2803,7 +2801,7 @@ mod tests {
runtime.block_on(command_runner.extract_execute_response(
super::OperationOrStatus::Operation(operation),
&mut ExecutionHistory::default(),
Arc::new(SafeWorkUnitStore::new()),
WorkUnitStore::new(),
))
}

Expand Down
5 changes: 2 additions & 3 deletions src/rust/engine/process_executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ use std::process::exit;
use std::time::Duration;
use store::{BackoffConfig, Store};
use tokio::runtime::Runtime;
use workunit_store::SafeWorkUnitStore;
use std::sync::Arc;
use workunit_store::WorkUnitStore;

/// A binary which takes args of format:
/// process_executor --env=FOO=bar --env=SOME=value --input-digest=abc123 --input-digest-length=80
Expand Down Expand Up @@ -320,7 +319,7 @@ fn main() {
let mut runtime = Runtime::new().unwrap();

let result = runtime
.block_on(runner.run(request, Arc::new(SafeWorkUnitStore::new())))
.block_on(runner.run(request, WorkUnitStore::new()))
.expect("Error executing");

if let Some(output) = args.value_of("materialize-output-to").map(PathBuf::from) {
Expand Down
1 change: 0 additions & 1 deletion src/rust/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ use log::{error, Log};
use logging::logger::LOGGER;
use logging::{Destination, Logger};
use rule_graph::{GraphMaker, RuleGraph};
use workunit_store::WorkUnitStore;

// TODO: Consider renaming and making generic for collections of PyResults.
#[repr(C)]
Expand Down
3 changes: 2 additions & 1 deletion src/rust/engine/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use rule_graph;

use graph::{Entry, Node, NodeError, NodeTracer, NodeVisualizer};
use store::{self, StoreFileByDigest};
use workunit_store::{WorkUnit, WorkUnitStore, generate_random_64bit_string, set_parent_id};
use workunit_store::{WorkUnit, generate_random_64bit_string, set_parent_id};

pub type NodeFuture<T> = BoxFuture<T, Failure>;

Expand Down Expand Up @@ -1127,6 +1127,7 @@ impl Node for NodeKey {
start_timestamp,
end_timestamp,
span_id,
// TODO: set parent_id with the proper value, issue #7969
parent_id: None,
};
context2.session.workunit_store().add_workunit(workunit)
Expand Down
8 changes: 4 additions & 4 deletions src/rust/engine/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use indexmap::IndexMap;
use log::{debug, info, warn};
use parking_lot::Mutex;
use ui::EngineDisplay;
use workunit_store::SafeWorkUnitStore;
use workunit_store::WorkUnitStore;

///
/// A Session represents a related series of requests (generally: one run of the pants CLI) on an
Expand All @@ -37,7 +37,7 @@ struct InnerSession {
// If enabled, Zipkin spans for v2 engine will be collected.
should_record_zipkin_spans: bool,
// A place to store info about workunits in rust part
workunit_store: Arc<SafeWorkUnitStore>,
workunit_store: WorkUnitStore,
}

#[derive(Clone)]
Expand All @@ -55,7 +55,7 @@ impl Session {
roots: Mutex::new(HashSet::new()),
display: EngineDisplay::create(ui_worker_count, should_render_ui).map(Mutex::new),
should_record_zipkin_spans: should_record_zipkin_spans,
workunit_store: Arc::new(SafeWorkUnitStore::new()),
workunit_store: WorkUnitStore::new(),
};
Session(Arc::new(inner_session))
}
Expand All @@ -82,7 +82,7 @@ impl Session {
self.0.should_record_zipkin_spans
}

pub fn workunit_store(&self) -> Arc<SafeWorkUnitStore> {
pub fn workunit_store(&self) -> WorkUnitStore {
self.0.workunit_store.clone()
}
}
Expand Down
32 changes: 13 additions & 19 deletions src/rust/engine/workunit_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use parking_lot::Mutex;
use rand::thread_rng;
use rand::Rng;
use futures::task_local;
use std::sync::Arc;

pub struct WorkUnit {
pub name: String,
Expand All @@ -39,35 +40,28 @@ pub struct WorkUnit {
pub parent_id: Option<String>,
}

pub trait WorkUnitStore: Send + Sync {
fn get_workunits(&self) -> &Mutex<Vec<WorkUnit>>;
fn add_workunit(&self, workunit: WorkUnit);
fn len(&self) -> usize;
#[derive (Clone)]
pub struct WorkUnitStore {
workunits: Arc<Mutex<Vec<WorkUnit>>>,
}

pub struct SafeWorkUnitStore {
pub workunits: Mutex<Vec<WorkUnit>>,
}

impl SafeWorkUnitStore {
pub fn new() -> SafeWorkUnitStore {
SafeWorkUnitStore {
workunits: Mutex::new(Vec::new()),
impl WorkUnitStore {
pub fn new() -> WorkUnitStore {
WorkUnitStore {
workunits: Arc::new(Mutex::new(Vec::new())),
}
}
}

impl WorkUnitStore for SafeWorkUnitStore {
fn get_workunits(&self) -> &Mutex<Vec<WorkUnit>> {
&self.workunits
pub fn get_workunits(&self) -> Arc<Mutex<Vec<WorkUnit>>> {
self.workunits.clone()
}

fn add_workunit(&self, workunit: WorkUnit) {
pub fn add_workunit(&self, workunit: WorkUnit) {
self.workunits.lock().push(workunit);
}

fn len(&self) -> usize {
self.workunits.lock().len().clone()
pub fn len(&self) -> usize {
self.workunits.lock().len()
}
}

Expand Down

0 comments on commit 1cb27b3

Please sign in to comment.