diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 51e328227e30..1d215138de43 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -1653,6 +1653,7 @@ dependencies = [ "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-process 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "workunit_store 0.0.1", ] [[package]] @@ -1665,6 +1666,7 @@ dependencies = [ "process_execution 0.0.1", "store 0.1.0", "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "workunit_store 0.0.1", ] [[package]] diff --git a/src/rust/engine/process_execution/Cargo.toml b/src/rust/engine/process_execution/Cargo.toml index 153dc5135849..5885d9444068 100644 --- a/src/rust/engine/process_execution/Cargo.toml +++ b/src/rust/engine/process_execution/Cargo.toml @@ -25,6 +25,7 @@ time = "0.1.40" tokio-codec = "0.1" tokio-process = "0.2.1" tokio-timer = "0.2" +workunit_store = { path = "../workunit_store" } [dev-dependencies] mock = { path = "../testutil/mock" } diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index de608e7678cf..9ded47e94064 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -37,6 +37,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use store::UploadSummary; +use workunit_store::WorkUnitStore; use async_semaphore::AsyncSemaphore; @@ -133,7 +134,7 @@ impl AddAssign for ExecutionStats { } pub trait CommandRunner: Send + Sync { - fn run(&self, req: ExecuteProcessRequest) -> BoxFuture; + fn run(&self, req: ExecuteProcessRequest, workunit_store: Arc) -> BoxFuture; } /// @@ -153,9 +154,9 @@ impl BoundedCommandRunner { } impl CommandRunner for BoundedCommandRunner { - fn run(&self, req: ExecuteProcessRequest) -> BoxFuture { + fn run(&self, req: ExecuteProcessRequest, workunit_store: Arc) -> BoxFuture { let inner = self.inner.clone(); - self.inner.1.with_acquired(move || inner.0.run(req)) + self.inner.1.with_acquired(move || inner.0.run(req, workunit_store)) } } diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index 5d3c6d2ae4af..56f38807b431 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -21,6 +21,7 @@ use tokio_process::CommandExt; use super::{ExecuteProcessRequest, FallibleExecuteProcessResult}; use bytes::{Bytes, BytesMut}; +use workunit_store::WorkUnitStore; pub struct CommandRunner { store: Store, @@ -205,7 +206,7 @@ impl super::CommandRunner for CommandRunner { /// /// Runs a command on this machine in the passed working directory. /// - fn run(&self, req: ExecuteProcessRequest) -> BoxFuture { + fn run(&self, req: ExecuteProcessRequest, _workunit_store: Arc) -> BoxFuture { let workdir = try_future!(tempfile::Builder::new() .prefix("process-execution") .tempdir_in(&self.work_dir) @@ -340,6 +341,7 @@ mod tests { use testutil::data::{TestData, TestDirectory}; use testutil::path::find_bash; use testutil::{as_bytes, owned_string_vec}; + use workunit_store::SafeWorkUnitStore; #[test] #[cfg(unix)] @@ -897,6 +899,6 @@ mod tests { }; tokio::runtime::Runtime::new() .unwrap() - .block_on(runner.run(req)) + .block_on(runner.run(req, Arc::new(SafeWorkUnitStore::new()))) } } diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 3c893aeceecc..a02b58691781 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -23,6 +23,7 @@ use super::{ExecuteProcessRequest, ExecutionStats, FallibleExecuteProcessResult} use std; use std::cmp::min; use std::collections::btree_map::BTreeMap; +use workunit_store::WorkUnitStore; // Environment variable which is exclusively used for cache key invalidation. // This may be not specified in an ExecuteProcessRequest, and may be populated only by the @@ -126,7 +127,7 @@ impl super::CommandRunner for CommandRunner { /// /// TODO: Request jdk_home be created if set. /// - fn run(&self, req: ExecuteProcessRequest) -> BoxFuture { + fn run(&self, req: ExecuteProcessRequest, _workunit_store: Arc) -> BoxFuture { let operations_client = self.operations_client.clone(); let store = self.store.clone(); @@ -927,6 +928,7 @@ mod tests { use std::ops::Sub; use std::path::PathBuf; use std::time::Duration; + use workunit_store::SafeWorkUnitStore; #[derive(Debug, PartialEq)] enum StdoutType { @@ -1482,7 +1484,7 @@ mod tests { store, ); let result = runtime - .block_on(cmd_runner.run(echo_roland_request())) + .block_on(cmd_runner.run(echo_roland_request(), Arc::new(SafeWorkUnitStore::new()))) .unwrap(); assert_eq!( result.without_execution_attempts(), @@ -1855,7 +1857,7 @@ mod tests { ); let result = runtime - .block_on(command_runner.run(cat_roland_request())) + .block_on(command_runner.run(cat_roland_request(), Arc::new(SafeWorkUnitStore::new()))) .unwrap(); assert_eq!( result.without_execution_attempts(), @@ -1947,7 +1949,7 @@ mod tests { 1, store, ) - .run(cat_roland_request()) + .run(cat_roland_request(), Arc::new(SafeWorkUnitStore::new())) .wait(); assert_eq!( result, @@ -2015,7 +2017,7 @@ mod tests { ); let error = runtime - .block_on(runner.run(cat_roland_request())) + .block_on(runner.run(cat_roland_request(), Arc::new(SafeWorkUnitStore::new()))) .expect_err("Want error"); assert_contains(&error, &format!("{}", missing_digest.0)); } @@ -2610,7 +2612,7 @@ mod tests { .build(); let command_runner = create_command_runner(address, &cas); let mut runtime = tokio::runtime::Runtime::new().unwrap(); - runtime.block_on(command_runner.run(request)) + runtime.block_on(command_runner.run(request, Arc::new(SafeWorkUnitStore::new()))) } fn create_command_runner(address: String, cas: &mock::StubCAS) -> CommandRunner { diff --git a/src/rust/engine/process_executor/Cargo.toml b/src/rust/engine/process_executor/Cargo.toml index ebefbdf7cd36..60b4a86b4c21 100644 --- a/src/rust/engine/process_executor/Cargo.toml +++ b/src/rust/engine/process_executor/Cargo.toml @@ -12,3 +12,4 @@ hashing = { path = "../hashing" } process_execution = { path = "../process_execution" } store = { path = "../fs/store" } tokio = "0.1" +workunit_store = { path = "../workunit_store"} diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index 6e8c6a1e90d8..1285589ca4d1 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -40,6 +40,7 @@ use std::process::exit; use std::time::Duration; use store::{BackoffConfig, Store}; use tokio::runtime::Runtime; +use workunit_store::SafeWorkUnitStore; /// A binary which takes args of format: /// process_executor --env=FOO=bar --env=SOME=value --input-digest=abc123 --input-digest-length=80 @@ -318,7 +319,7 @@ fn main() { let mut runtime = Runtime::new().unwrap(); let result = runtime - .block_on(runner.run(request)) + .block_on(runner.run(request, Arc::new(SafeWorkUnitStore::new()))) .expect("Error executing"); if let Some(output) = args.value_of("materialize-output-to").map(PathBuf::from) { diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index 741fdcaa6776..5499c0ee0186 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -342,7 +342,7 @@ pub extern "C" fn scheduler_metrics( .flat_map(|(metric, value)| vec![externs::store_utf8(metric), externs::store_i64(value)]) .collect::>(); if session.should_record_zipkin_spans() { - let workunits = session + let workunits = session.workunit_store() .get_workunits() .lock() .iter() diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index c230419c7abb..29324dd8f264 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -415,11 +415,11 @@ impl WrappedNode for ExecuteProcess { fn run(self, context: Context) -> NodeFuture { let request = self.0; - + let workunit_store = context.session.workunit_store(); context .core .command_runner - .run(request) + .run(request, workunit_store) .map(ProcessResult) .map_err(|e| throw(&format!("Failed to execute process: {}", e))) .to_boxed() @@ -1126,7 +1126,7 @@ impl Node for NodeKey { end_timestamp: end_timestamp, span_id: generate_random_64bit_string(), }; - context2.session.add_workunit(workunit) + context2.session.workunit_store().add_workunit(workunit) }; }) .to_boxed() diff --git a/src/rust/engine/src/scheduler.rs b/src/rust/engine/src/scheduler.rs index 06e1d2883a61..468ff11bc00f 100644 --- a/src/rust/engine/src/scheduler.rs +++ b/src/rust/engine/src/scheduler.rs @@ -18,7 +18,7 @@ use indexmap::IndexMap; use log::{debug, info, warn}; use parking_lot::Mutex; use ui::EngineDisplay; -use workunit_store::{WorkUnit, WorkUnitStore}; +use workunit_store::SafeWorkUnitStore; /// /// A Session represents a related series of requests (generally: one run of the pants CLI) on an @@ -37,26 +37,12 @@ struct InnerSession { // If enabled, Zipkin spans for v2 engine will be collected. should_record_zipkin_spans: bool, // A place to store info about workunits in rust part - workunits: Mutex>, + workunit_store: Arc, } #[derive(Clone)] pub struct Session(Arc); -impl WorkUnitStore for Session { - fn should_record_zipkin_spans(&self) -> bool { - self.0.should_record_zipkin_spans - } - - fn get_workunits(&self) -> &Mutex> { - &self.0.workunits - } - - fn add_workunit(&self, workunit: WorkUnit) { - self.0.workunits.lock().push(workunit); - } -} - impl Session { pub fn new( scheduler: &Scheduler, @@ -69,7 +55,7 @@ impl Session { roots: Mutex::new(HashSet::new()), display: EngineDisplay::create(ui_worker_count, should_render_ui).map(Mutex::new), should_record_zipkin_spans: should_record_zipkin_spans, - workunits: Mutex::new(Vec::new()), + workunit_store: Arc::new(SafeWorkUnitStore::new()), }; Session(Arc::new(inner_session)) } @@ -91,6 +77,14 @@ impl Session { pub fn display(&self) -> &Option> { &self.0.display } + + pub fn should_record_zipkin_spans(&self) -> bool { + self.0.should_record_zipkin_spans + } + + pub fn workunit_store(&self) -> Arc { + self.0.workunit_store.clone() + } } pub struct ExecutionRequest { diff --git a/src/rust/engine/workunit_store/src/lib.rs b/src/rust/engine/workunit_store/src/lib.rs index 840b963f48d1..d7c265a19ade 100644 --- a/src/rust/engine/workunit_store/src/lib.rs +++ b/src/rust/engine/workunit_store/src/lib.rs @@ -35,8 +35,29 @@ pub struct WorkUnit { pub span_id: String, } -pub trait WorkUnitStore { - fn should_record_zipkin_spans(&self) -> bool; +pub trait WorkUnitStore: Send + Sync { fn get_workunits(&self) -> &Mutex>; fn add_workunit(&self, workunit: WorkUnit); } + +pub struct SafeWorkUnitStore { + pub workunits: Mutex>, +} + +impl SafeWorkUnitStore { + pub fn new() -> SafeWorkUnitStore { + SafeWorkUnitStore { + workunits: Mutex::new(Vec::new()), + } + } +} + +impl WorkUnitStore for SafeWorkUnitStore { + fn get_workunits(&self) -> &Mutex> { + &self.workunits + } + + fn add_workunit(&self, workunit: WorkUnit) { + self.workunits.lock().push(workunit); + } +}