Skip to content

Commit

Permalink
Add test
Browse files Browse the repository at this point in the history
  • Loading branch information
cattibrie committed Jun 28, 2019
1 parent 62c4d83 commit f778a37
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 6 deletions.
1 change: 1 addition & 0 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ mod tests {
use testutil::path::find_bash;
use testutil::{as_bytes, owned_string_vec};
use workunit_store::SafeWorkUnitStore;
use std::sync::Arc;

#[test]
#[cfg(unix)]
Expand Down
126 changes: 120 additions & 6 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ use super::{ExecuteProcessRequest, ExecutionStats, FallibleExecuteProcessResult}
use std;
use std::cmp::min;
use std::collections::btree_map::BTreeMap;
use workunit_store::WorkUnitStore;
use workunit_store::{WorkUnit, generate_random_64bit_string};
use workunit_store::{WorkUnit, WorkUnitStore, generate_random_64bit_string, get_parent_id};
use time::Timespec;
use workunit_store::get_parent_id;

// Environment variable which is exclusively used for cache key invalidation.
// This may be not specified in an ExecuteProcessRequest, and may be populated only by the
Expand Down Expand Up @@ -402,7 +400,6 @@ impl CommandRunner {
.merge_from_bytes(operation.get_response().get_value())
.map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e))));
trace!("Got (nested) execute response: {:?}", execute_response);

if execute_response.get_result().has_execution_metadata() {
let metadata = execute_response.get_result().get_execution_metadata();
let enqueued = timespec_from(metadata.get_queued_timestamp());
Expand Down Expand Up @@ -969,7 +966,10 @@ mod tests {
use std::ops::Sub;
use std::path::PathBuf;
use std::time::Duration;
use workunit_store::SafeWorkUnitStore;
use workunit_store::{WorkUnitStore, SafeWorkUnitStore, WorkUnit};
use std::sync::Arc;
use bazel_protos::remote_execution::ExecutedActionMetadata;
use protobuf::well_known_types::Timestamp;

#[derive(Debug, PartialEq)]
enum StdoutType {
Expand Down Expand Up @@ -2532,6 +2532,69 @@ mod tests {
)
}

#[test]
fn check_that_remote_workunits_are_in_workunit_store() {
let workunit_store = Arc::new(SafeWorkUnitStore::new());
let op_name = "gimme-foo".to_string();
let testdata = TestData::roland();
let testdata_empty = TestData::empty();
let operation = make_successful_operation_with_metadata(
&op_name,
StdoutType::Digest(testdata.digest()),
StderrType::Raw(testdata_empty.string()),
0,
)
.op
.unwrap()
.unwrap();
let cas = mock::StubCAS::builder()
.file(&TestData::roland())
.directory(&TestDirectory::containing_roland())
.build();
let command_runner = create_command_runner("".to_owned(), &cas);

let mut runtime = tokio::runtime::Runtime::new().unwrap();

let workunit_store_2 = workunit_store.clone();
runtime.block_on(futures::future::ok(()).and_then(move |()| command_runner.extract_execute_response(
super::OperationOrStatus::Operation(operation),
&mut ExecutionHistory::default(),
workunit_store_2,
))).unwrap();
assert_eq!(
workunit_store.len(),
4
);
let workunits = workunit_store.get_workunits().lock();
let scheduling_workunit = &workunits[0];
assert_workunit_params(scheduling_workunit, "scheduling", 0.0, 1.0, None);
let fetching_workunit = &workunits[1];
assert_workunit_params(fetching_workunit, "fetching", 2.0, 3.0, None);
let executing_workunit = &workunits[2];
assert_workunit_params(executing_workunit, "executing", 4.0, 5.0, None);
let uploading_workunit = &workunits[3];
assert_workunit_params(uploading_workunit, "uploading", 6.0, 7.0, None);
}

fn assert_workunit_params(workunit: &WorkUnit, name: &str, start_timestamp: f64, end_timestamp: f64, parent_id: Option<String>) {
assert_eq!(
workunit.name,
String::from(name)
);
assert_eq!(
workunit.start_timestamp,
start_timestamp
);
assert_eq!(
workunit.end_timestamp,
end_timestamp
);
assert_eq!(
workunit.parent_id,
parent_id
);
}

fn echo_foo_request() -> ExecuteProcessRequest {
ExecuteProcessRequest {
argv: owned_string_vec(&["/bin/echo", "-n", "foo"]),
Expand Down Expand Up @@ -2569,11 +2632,12 @@ mod tests {
}
}

fn make_successful_operation(
fn make_successful_operation_with_maybe_metadata(
operation_name: &str,
stdout: StdoutType,
stderr: StderrType,
exit_code: i32,
metadata: Option<ExecutedActionMetadata>,
) -> MockOperation {
let mut op = bazel_protos::operations::Operation::new();
op.set_name(operation_name.to_string());
Expand All @@ -2599,6 +2663,9 @@ mod tests {
}
}
action_result.set_exit_code(exit_code);
if let Some(metadata) = metadata {
action_result.set_execution_metadata(metadata);
};
action_result
});

Expand All @@ -2614,6 +2681,53 @@ mod tests {
MockOperation::new(op)
}

fn make_successful_operation(
operation_name: &str,
stdout: StdoutType,
stderr: StderrType,
exit_code: i32,
) -> MockOperation {
make_successful_operation_with_maybe_metadata(
operation_name,
stdout,
stderr,
exit_code,
None
)
}

fn make_successful_operation_with_metadata(
operation_name: &str,
stdout: StdoutType,
stderr: StderrType,
exit_code: i32,
) -> MockOperation {
let mut metadata = ExecutedActionMetadata::new();
metadata.set_queued_timestamp(timestamp_only_secs(0));
metadata.set_worker_start_timestamp(timestamp_only_secs(1));
metadata.set_worker_completed_timestamp(timestamp_only_secs(8));
metadata.set_input_fetch_start_timestamp(timestamp_only_secs(2));
metadata.set_input_fetch_completed_timestamp(timestamp_only_secs(3));
metadata.set_execution_start_timestamp(timestamp_only_secs(4));
metadata.set_execution_completed_timestamp(timestamp_only_secs(5));
metadata.set_output_upload_start_timestamp(timestamp_only_secs(6));
metadata.set_output_upload_completed_timestamp(timestamp_only_secs(7));

make_successful_operation_with_maybe_metadata(
operation_name,
stdout,
stderr,
exit_code,
Some(metadata),
)
}

fn timestamp_only_secs(v: i64) -> Timestamp {
let mut dummy_timestamp = Timestamp::new();
dummy_timestamp.set_seconds(v);
dummy_timestamp
}

fn make_precondition_failure_operation(
violations: Vec<bazel_protos::error_details::PreconditionFailure_Violation>,
) -> MockOperation {
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/process_executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use std::time::Duration;
use store::{BackoffConfig, Store};
use tokio::runtime::Runtime;
use workunit_store::SafeWorkUnitStore;
use std::sync::Arc;

/// A binary which takes args of format:
/// process_executor --env=FOO=bar --env=SOME=value --input-digest=abc123 --input-digest-length=80
Expand Down
5 changes: 5 additions & 0 deletions src/rust/engine/workunit_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct WorkUnit {
pub trait WorkUnitStore: Send + Sync {
fn get_workunits(&self) -> &Mutex<Vec<WorkUnit>>;
fn add_workunit(&self, workunit: WorkUnit);
fn len(&self) -> usize;
}

pub struct SafeWorkUnitStore {
Expand All @@ -64,6 +65,10 @@ impl WorkUnitStore for SafeWorkUnitStore {
fn add_workunit(&self, workunit: WorkUnit) {
self.workunits.lock().push(workunit);
}

fn len(&self) -> usize {
self.workunits.lock().len().clone()
}
}

pub fn generate_random_64bit_string() -> String {
Expand Down

0 comments on commit f778a37

Please sign in to comment.