diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index 56f38807b431..6897080b0487 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -342,6 +342,7 @@ mod tests { use testutil::path::find_bash; use testutil::{as_bytes, owned_string_vec}; use workunit_store::SafeWorkUnitStore; + use std::sync::Arc; #[test] #[cfg(unix)] diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 01b62b433ba9..8bb4cd2bafdd 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -23,10 +23,8 @@ use super::{ExecuteProcessRequest, ExecutionStats, FallibleExecuteProcessResult} use std; use std::cmp::min; use std::collections::btree_map::BTreeMap; -use workunit_store::WorkUnitStore; -use workunit_store::{WorkUnit, generate_random_64bit_string}; +use workunit_store::{WorkUnit, WorkUnitStore, generate_random_64bit_string, get_parent_id}; use time::Timespec; -use workunit_store::get_parent_id; // Environment variable which is exclusively used for cache key invalidation. // This may be not specified in an ExecuteProcessRequest, and may be populated only by the @@ -402,7 +400,6 @@ impl CommandRunner { .merge_from_bytes(operation.get_response().get_value()) .map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e)))); trace!("Got (nested) execute response: {:?}", execute_response); - if execute_response.get_result().has_execution_metadata() { let metadata = execute_response.get_result().get_execution_metadata(); let enqueued = timespec_from(metadata.get_queued_timestamp()); @@ -969,7 +966,10 @@ mod tests { use std::ops::Sub; use std::path::PathBuf; use std::time::Duration; - use workunit_store::SafeWorkUnitStore; + use workunit_store::{WorkUnitStore, SafeWorkUnitStore, WorkUnit}; + use std::sync::Arc; + use bazel_protos::remote_execution::ExecutedActionMetadata; + use protobuf::well_known_types::Timestamp; #[derive(Debug, PartialEq)] enum StdoutType { @@ -2532,6 +2532,69 @@ mod tests { ) } + #[test] + fn check_that_remote_workunits_are_in_workunit_store() { + let workunit_store = Arc::new(SafeWorkUnitStore::new()); + let op_name = "gimme-foo".to_string(); + let testdata = TestData::roland(); + let testdata_empty = TestData::empty(); + let operation = make_successful_operation_with_metadata( + &op_name, + StdoutType::Digest(testdata.digest()), + StderrType::Raw(testdata_empty.string()), + 0, + ) + .op + .unwrap() + .unwrap(); + let cas = mock::StubCAS::builder() + .file(&TestData::roland()) + .directory(&TestDirectory::containing_roland()) + .build(); + let command_runner = create_command_runner("".to_owned(), &cas); + + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + + let workunit_store_2 = workunit_store.clone(); + runtime.block_on(futures::future::ok(()).and_then(move |()| command_runner.extract_execute_response( + super::OperationOrStatus::Operation(operation), + &mut ExecutionHistory::default(), + workunit_store_2, + ))).unwrap(); + assert_eq!( + workunit_store.len(), + 4 + ); + let workunits = workunit_store.get_workunits().lock(); + let scheduling_workunit = &workunits[0]; + assert_workunit_params(scheduling_workunit, "scheduling", 0.0, 1.0, None); + let fetching_workunit = &workunits[1]; + assert_workunit_params(fetching_workunit, "fetching", 2.0, 3.0, None); + let executing_workunit = &workunits[2]; + assert_workunit_params(executing_workunit, "executing", 4.0, 5.0, None); + let uploading_workunit = &workunits[3]; + assert_workunit_params(uploading_workunit, "uploading", 6.0, 7.0, None); + } + + fn assert_workunit_params(workunit: &WorkUnit, name: &str, start_timestamp: f64, end_timestamp: f64, parent_id: Option) { + assert_eq!( + workunit.name, + String::from(name) + ); + assert_eq!( + workunit.start_timestamp, + start_timestamp + ); + assert_eq!( + workunit.end_timestamp, + end_timestamp + ); + assert_eq!( + workunit.parent_id, + parent_id + ); + } + fn echo_foo_request() -> ExecuteProcessRequest { ExecuteProcessRequest { argv: owned_string_vec(&["/bin/echo", "-n", "foo"]), @@ -2569,11 +2632,12 @@ mod tests { } } - fn make_successful_operation( + fn make_successful_operation_with_maybe_metadata( operation_name: &str, stdout: StdoutType, stderr: StderrType, exit_code: i32, + metadata: Option, ) -> MockOperation { let mut op = bazel_protos::operations::Operation::new(); op.set_name(operation_name.to_string()); @@ -2599,6 +2663,9 @@ mod tests { } } action_result.set_exit_code(exit_code); + if let Some(metadata) = metadata { + action_result.set_execution_metadata(metadata); + }; action_result }); @@ -2614,6 +2681,53 @@ mod tests { MockOperation::new(op) } + fn make_successful_operation( + operation_name: &str, + stdout: StdoutType, + stderr: StderrType, + exit_code: i32, + ) -> MockOperation { + make_successful_operation_with_maybe_metadata( + operation_name, + stdout, + stderr, + exit_code, + None + ) + } + + fn make_successful_operation_with_metadata( + operation_name: &str, + stdout: StdoutType, + stderr: StderrType, + exit_code: i32, + ) -> MockOperation { + let mut metadata = ExecutedActionMetadata::new(); + metadata.set_queued_timestamp(timestamp_only_secs(0)); + metadata.set_worker_start_timestamp(timestamp_only_secs(1)); + metadata.set_worker_completed_timestamp(timestamp_only_secs(8)); + metadata.set_input_fetch_start_timestamp(timestamp_only_secs(2)); + metadata.set_input_fetch_completed_timestamp(timestamp_only_secs(3)); + metadata.set_execution_start_timestamp(timestamp_only_secs(4)); + metadata.set_execution_completed_timestamp(timestamp_only_secs(5)); + metadata.set_output_upload_start_timestamp(timestamp_only_secs(6)); + metadata.set_output_upload_completed_timestamp(timestamp_only_secs(7)); + + make_successful_operation_with_maybe_metadata( + operation_name, + stdout, + stderr, + exit_code, + Some(metadata), + ) + } + + fn timestamp_only_secs(v: i64) -> Timestamp { + let mut dummy_timestamp = Timestamp::new(); + dummy_timestamp.set_seconds(v); + dummy_timestamp + } + fn make_precondition_failure_operation( violations: Vec, ) -> MockOperation { diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index 1285589ca4d1..714fe7cd060f 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -41,6 +41,7 @@ use std::time::Duration; use store::{BackoffConfig, Store}; use tokio::runtime::Runtime; use workunit_store::SafeWorkUnitStore; +use std::sync::Arc; /// A binary which takes args of format: /// process_executor --env=FOO=bar --env=SOME=value --input-digest=abc123 --input-digest-length=80 diff --git a/src/rust/engine/workunit_store/src/lib.rs b/src/rust/engine/workunit_store/src/lib.rs index cfb2238005ac..b6eb08b615b7 100644 --- a/src/rust/engine/workunit_store/src/lib.rs +++ b/src/rust/engine/workunit_store/src/lib.rs @@ -42,6 +42,7 @@ pub struct WorkUnit { pub trait WorkUnitStore: Send + Sync { fn get_workunits(&self) -> &Mutex>; fn add_workunit(&self, workunit: WorkUnit); + fn len(&self) -> usize; } pub struct SafeWorkUnitStore { @@ -64,6 +65,10 @@ impl WorkUnitStore for SafeWorkUnitStore { fn add_workunit(&self, workunit: WorkUnit) { self.workunits.lock().push(workunit); } + + fn len(&self) -> usize { + self.workunits.lock().len().clone() + } } pub fn generate_random_64bit_string() -> String {