From 02d2b993a9e911cfc301fb939273f6b842671804 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Fri, 8 Jan 2021 13:45:52 -0700 Subject: [PATCH 01/11] Simplify return types of some test utils # Building wheels and fs_util will be skipped. Delete if not intended. [ci skip-build-wheels] --- .../src/remote_cache_tests.rs | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index a6381210919..b1258764a7c 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -30,7 +30,7 @@ struct RoundtripResults { maybe_cached: Result, } -fn create_local_runner() -> (Box, Store, TempDir, StubCAS) { +fn create_local_runner() -> (Box, Store) { let runtime = task_executor::Executor::new(); let base_dir = TempDir::new().unwrap(); let named_cache_dir = base_dir.path().join("named_cache_dir"); @@ -58,15 +58,14 @@ fn create_local_runner() -> (Box, Store, TempDir, StubCA NamedCaches::new(named_cache_dir), true, )); - (runner, store, base_dir, stub_cas) + (runner, store) } fn create_cached_runner( local: Box, store: Store, eager_fetch: bool, -) -> (Box, TempDir, StubActionCache) { - let cache_dir = TempDir::new().unwrap(); +) -> (Box, StubActionCache) { let action_cache = StubActionCache::new().unwrap(); let runner = Box::new( crate::remote_cache::CommandRunner::new( @@ -84,10 +83,10 @@ fn create_cached_runner( ) .expect("caching command runner"), ); - (runner, cache_dir, action_cache) + (runner, action_cache) } -fn create_script(script_exit_code: i8) -> (Process, PathBuf, TempDir) { +fn create_script(script_exit_code: i8) -> (Process, PathBuf) { let script_dir = TempDir::new().unwrap(); let script_path = script_dir.path().join("script"); std::fs::File::create(&script_path) @@ -107,16 +106,16 @@ fn create_script(script_exit_code: i8) -> (Process, PathBuf, TempDir) { ]) .output_files(relative_paths(&["roland"]).collect()); - (process, script_path, script_dir) + (process, script_path) } async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults { - let (local, store, _local_runner_dir, _stub_cas) = create_local_runner(); - let (process, script_path, _script_dir) = create_script(script_exit_code); + let (local, store) = create_local_runner(); + let (process, script_path) = create_script(script_exit_code); let local_result = local.run(process.clone().into(), Context::default()).await; - let (caching, _cache_dir, _stub_action_cache) = create_cached_runner(local, store.clone(), false); + let (caching, _stub_action_cache) = create_cached_runner(local, store.clone(), false); let uncached_result = caching .run(process.clone().into(), Context::default()) @@ -156,9 +155,9 @@ async fn failures_not_cached() { async fn skip_cache_on_error() { WorkunitStore::setup_for_tests(); - let (local, store, _local_runner_dir, _stub_cas) = create_local_runner(); - let (caching, _cache_dir, stub_action_cache) = create_cached_runner(local, store.clone(), false); - let (process, _script_path, _script_dir) = create_script(0); + let (local, store) = create_local_runner(); + let (caching, stub_action_cache) = create_cached_runner(local, store.clone(), false); + let (process, _script_path) = create_script(0); stub_action_cache .always_errors @@ -181,13 +180,12 @@ async fn eager_fetch() { WorkunitStore::setup_for_tests(); async fn run_process(eager_fetch: bool) -> FallibleProcessResultWithPlatform { - let (local, store, _local_runner_dir, _stub_cas) = create_local_runner(); - let (caching, _cache_dir, stub_action_cache) = - create_cached_runner(local, store.clone(), eager_fetch); + let (local, store) = create_local_runner(); + let (caching, stub_action_cache) = create_cached_runner(local, store.clone(), eager_fetch); // Get the `action_digest` for the Process that we're going to run. This will allow us to // insert a bogus value into the `stub_action_cache`. - let (process, _script_path, _script_dir) = create_script(1); + let (process, _script_path) = create_script(1); let (action, command, _exec_request) = make_execute_request(&process, ProcessMetadata::default()).unwrap(); let (_command_digest, action_digest) = ensure_action_stored_locally(&store, &command, &action) From cd018bc1525c727ca9ae0818a6a31ea62c3e02dc Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Mon, 11 Jan 2021 09:40:00 -0700 Subject: [PATCH 02/11] WIP to save my progress with laptop crash [ci skip-build-wheels] --- .../src/remote_cache_tests.rs | 110 +++++++++++++++++- 1 file changed, 109 insertions(+), 1 deletion(-) diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index b1258764a7c..b6871fc3248 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -9,7 +9,7 @@ use std::time::Duration; use async_trait::async_trait; use bazel_protos::gen::build::bazel::remote::execution::v2 as remexec; use fs::RelativePath; -use hashing::Digest; +use hashing::{Digest, EMPTY_DIGEST}; use maplit::hashset; use mock::{StubActionCache, StubCAS}; use remexec::ActionResult; @@ -24,12 +24,68 @@ use crate::{ CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, NamedCaches, Platform, Process, ProcessMetadata, }; +use term::terminfo::parm::Param::Words; struct RoundtripResults { uncached: Result, maybe_cached: Result, } +/// A mock of the local runner used for better hermeticity of the tests. +struct MockLocalCommandRunner { + result: Result, +} + +impl MockLocalCommandRunner { + pub fn new(exit_code: i32) -> MockLocalCommandRunner { + MockLocalCommandRunner { + result: Ok(FallibleProcessResultWithPlatform { + stdout_digest: EMPTY_DIGEST, + stderr_digest: EMPTY_DIGEST, + exit_code, + output_directory: EMPTY_DIGEST, + execution_attempts: vec![], + platform: Platform::current().unwrap(), + }), + } + } +} + +impl CommandRunnerTrait for MockLocalCommandRunner { + async fn run( + &self, + _req: MultiPlatformProcess, + _context: Context, + ) -> Result { + self.result.clone() + } + + fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option { + Some(req.0.get(&None).unwrap().clone()) + } +} + +fn create_store() -> Store { + let runtime = task_executor::Executor::new(); + let stub_cas = StubCAS::builder().build(); + let store_dir = base_dir.path().join("store_dir"); + Store::with_remote( + runtime, + store_dir, + vec![stub_cas.address()], + None, + None, + None, + 1, + 10 * 1024 * 1024, + Duration::from_secs(1), + BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(), + 1, + 1, + ) + .unwrap() +} + fn create_local_runner() -> (Box, Store) { let runtime = task_executor::Executor::new(); let base_dir = TempDir::new().unwrap(); @@ -135,6 +191,58 @@ async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults { } } +#[tokio::test] +async fn cache_read_success() { + WorkunitStore::setup_for_tests(); + let store = create_store(); + let local_runner = Box::new(MockLocalCommandRunner::new(1)); + let (cache_runner, action_cache) = create_cached_runner(local_runner, store, false); + + // Insert a successful ActionResult. + let result = cache_runner.run().await.unwrap(); + assert_eq!(result.exit_code, 0); +} + +/// If the cache has any issues during failures, we should gracefully fallback to the local runner. +#[tokio::test] +async fn cache_read_skipped_on_errors() { + WorkunitStore::setup_for_tests(); + let store = create_store(); + let local_runner = Box::new(MockLocalCommandRunner::new(1)); + let (cache_runner, action_cache) = create_cached_runner(local, store, false); + + let result = cache_runner.run().await.unwrap(); + + let results = run_roundtrip(0).await; + assert_eq!(results.uncached, results.maybe_cached); +} + +#[tokio::test] +async fn cache_write_success() { + WorkunitStore::setup_for_tests(); + let store = create_store(); + let local_runner = Box::new(MockLocalCommandRunner::new(1)); + let (cache_runner, action_cache) = create_cached_runner(local, store, false); + + let result = cache_runner.run().await.unwrap(); + + let results = run_roundtrip(0).await; + assert_eq!(results.uncached, results.maybe_cached); +} + +#[tokio::test] +async fn cache_write_not_for_failures() { + WorkunitStore::setup_for_tests(); + let store = create_store(); + let local_runner = Box::new(MockLocalCommandRunner::new(1)); + let (cache_runner, action_cache) = create_cached_runner(local, store, false); + + let result = cache_runner.run().await.unwrap(); + + let results = run_roundtrip(0).await; + assert_eq!(results.uncached, results.maybe_cached); +} + #[tokio::test] async fn cache_success() { WorkunitStore::setup_for_tests(); From 730e14243c03ea8c059aec2dca66ed29b6963ae0 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Fri, 15 Jan 2021 18:43:03 -0700 Subject: [PATCH 03/11] Try to add StubActionCache::insert --- src/rust/engine/Cargo.lock | 2 + .../src/remote_cache_tests.rs | 93 +++++++++++-------- src/rust/engine/testutil/mock/Cargo.toml | 2 + .../engine/testutil/mock/src/action_cache.rs | 34 +++++++ 4 files changed, 93 insertions(+), 38 deletions(-) diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 518e8324ad3..09691c85b8a 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -1609,8 +1609,10 @@ dependencies = [ "hyper", "log 0.4.11", "parking_lot", + "process_execution", "prost", "prost-types", + "store", "testutil", "tokio", "tonic", diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index b6871fc3248..8107b36fb65 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -24,7 +24,6 @@ use crate::{ CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, NamedCaches, Platform, Process, ProcessMetadata, }; -use term::terminfo::parm::Param::Words; struct RoundtripResults { uncached: Result, @@ -51,6 +50,7 @@ impl MockLocalCommandRunner { } } +#[async_trait] impl CommandRunnerTrait for MockLocalCommandRunner { async fn run( &self, @@ -65,10 +65,10 @@ impl CommandRunnerTrait for MockLocalCommandRunner { } } -fn create_store() -> Store { +fn create_remote_store() -> Store { let runtime = task_executor::Executor::new(); let stub_cas = StubCAS::builder().build(); - let store_dir = base_dir.path().join("store_dir"); + let store_dir = TempDir::new().unwrap().path().join("store_dir"); Store::with_remote( runtime, store_dir, @@ -142,6 +142,13 @@ fn create_cached_runner( (runner, action_cache) } +fn create_process() -> Process { + Process::new(vec![ + testutil::path::find_bash(), + "echo -n hello world".to_string(), + ]) +} + fn create_script(script_exit_code: i8) -> (Process, PathBuf) { let script_dir = TempDir::new().unwrap(); let script_path = script_dir.path().join("script"); @@ -194,54 +201,64 @@ async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults { #[tokio::test] async fn cache_read_success() { WorkunitStore::setup_for_tests(); - let store = create_store(); + let store = create_remote_store(); let local_runner = Box::new(MockLocalCommandRunner::new(1)); - let (cache_runner, action_cache) = create_cached_runner(local_runner, store, false); + let (cache_runner, action_cache) = create_cached_runner(local_runner, store.clone(), false); + + let process = create_process(); + action_cache.insert(&process, &store, 0, EMPTY_DIGEST, EMPTY_DIGEST); - // Insert a successful ActionResult. - let result = cache_runner.run().await.unwrap(); + let result = cache_runner + .run(process.clone().into(), Context::default()) + .await + .unwrap(); assert_eq!(result.exit_code, 0); } -/// If the cache has any issues during failures, we should gracefully fallback to the local runner. +/// If the cache has any issues during reads, we should gracefully fallback to the local runner. #[tokio::test] async fn cache_read_skipped_on_errors() { WorkunitStore::setup_for_tests(); - let store = create_store(); + let store = create_remote_store(); let local_runner = Box::new(MockLocalCommandRunner::new(1)); - let (cache_runner, action_cache) = create_cached_runner(local, store, false); - - let result = cache_runner.run().await.unwrap(); + let (cache_runner, action_cache) = create_cached_runner(local_runner, store.clone(), false); - let results = run_roundtrip(0).await; - assert_eq!(results.uncached, results.maybe_cached); -} + let process = create_process(); + action_cache.insert(&process, &store, 0, EMPTY_DIGEST, EMPTY_DIGEST); + action_cache.always_errors.store(true, Ordering::SeqCst); -#[tokio::test] -async fn cache_write_success() { - WorkunitStore::setup_for_tests(); - let store = create_store(); - let local_runner = Box::new(MockLocalCommandRunner::new(1)); - let (cache_runner, action_cache) = create_cached_runner(local, store, false); - - let result = cache_runner.run().await.unwrap(); - - let results = run_roundtrip(0).await; - assert_eq!(results.uncached, results.maybe_cached); + let result = cache_runner + .run(process.clone().into(), Context::default()) + .await + .unwrap(); + assert_eq!(result.exit_code, 1); } -#[tokio::test] -async fn cache_write_not_for_failures() { - WorkunitStore::setup_for_tests(); - let store = create_store(); - let local_runner = Box::new(MockLocalCommandRunner::new(1)); - let (cache_runner, action_cache) = create_cached_runner(local, store, false); - - let result = cache_runner.run().await.unwrap(); - - let results = run_roundtrip(0).await; - assert_eq!(results.uncached, results.maybe_cached); -} +// #[tokio::test] +// async fn cache_write_success() { +// WorkunitStore::setup_for_tests(); +// let store = create_remote_store(); +// let local_runner = Box::new(MockLocalCommandRunner::new(1)); +// let (cache_runner, action_cache) = create_cached_runner(local, store, false); +// +// let result = cache_runner.run().await.unwrap(); +// +// let results = run_roundtrip(0).await; +// assert_eq!(results.uncached, results.maybe_cached); +// } +// +// #[tokio::test] +// async fn cache_write_not_for_failures() { +// WorkunitStore::setup_for_tests(); +// let store = create_remote_store(); +// let local_runner = Box::new(MockLocalCommandRunner::new(1)); +// let (cache_runner, action_cache) = create_cached_runner(local, store, false); +// +// let result = cache_runner.run().await.unwrap(); +// +// let results = run_roundtrip(0).await; +// assert_eq!(results.uncached, results.maybe_cached); +// } #[tokio::test] async fn cache_success() { diff --git a/src/rust/engine/testutil/mock/Cargo.toml b/src/rust/engine/testutil/mock/Cargo.toml index 524faf9a750..04de9e886cc 100644 --- a/src/rust/engine/testutil/mock/Cargo.toml +++ b/src/rust/engine/testutil/mock/Cargo.toml @@ -14,8 +14,10 @@ hashing = { path = "../../hashing" } hyper = { version = "0.13", features = ["stream", "tcp"] } log = "0.4" parking_lot = "0.11" +process_execution = { path = "../../process_execution" } prost = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" } prost-types = "0.6" +store = { path = "../../fs/store" } testutil = { path = ".." } tokio = { version = "0.2.23", features = ["time"] } tonic = { version = "0.3" } diff --git a/src/rust/engine/testutil/mock/src/action_cache.rs b/src/rust/engine/testutil/mock/src/action_cache.rs index c346705eb52..40242470cf2 100644 --- a/src/rust/engine/testutil/mock/src/action_cache.rs +++ b/src/rust/engine/testutil/mock/src/action_cache.rs @@ -39,10 +39,13 @@ use hashing::{Digest, Fingerprint}; use parking_lot::Mutex; use remexec::action_cache_server::{ActionCache, ActionCacheServer}; use remexec::{ActionResult, GetActionResultRequest, UpdateActionResultRequest}; +use store::Store; use tonic::transport::Server; use tonic::{Request, Response, Status}; use crate::tonic_util::AddrIncomingWithStream; +use process_execution::remote::{ensure_action_stored_locally, make_execute_request}; +use process_execution::{Process, ProcessMetadata}; pub struct StubActionCache { pub action_map: Arc>>, @@ -172,4 +175,35 @@ impl StubActionCache { pub fn address(&self) -> String { format!("{}", self.local_addr) } + + /// + /// Insert an ActionResult for a particular Process. + /// + /// This will calculate the action_digest for the input Process, using the same logic as ` + /// CommandRunner::run()`. + /// + pub async fn insert( + &self, + process: &Process, + store: &Store, + exit_code: i32, + stdout_digest: Digest, + stderr_digest: Digest, + ) { + let (action, command, _exec_request) = + make_execute_request(process, ProcessMetadata::default()).unwrap(); + let (_command_digest, action_digest) = ensure_action_stored_locally(store, &command, &action) + .await + .unwrap(); + let action_result = ActionResult { + exit_code, + stdout_digest: Some(stdout_digest.into()), + stderr_digest: Some(stderr_digest.into()), + ..ActionResult::default() + }; + self + .action_map + .lock() + .insert(action_digest.0, action_result); + } } From 8c09866b4e11d3f5b7c55c7a38f39dc11a6d6029 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Tue, 19 Jan 2021 16:09:05 -0700 Subject: [PATCH 04/11] Finish the remote cache reads tests Thanks Tom and Greg! # Building wheels and fs_util will be skipped. Delete if not intended. [ci skip-build-wheels] --- src/rust/engine/Cargo.lock | 2 - .../src/remote_cache_tests.rs | 122 +++++++++++++----- src/rust/engine/testutil/mock/Cargo.toml | 2 - .../engine/testutil/mock/src/action_cache.rs | 34 ----- 4 files changed, 87 insertions(+), 73 deletions(-) diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 09691c85b8a..518e8324ad3 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -1609,10 +1609,8 @@ dependencies = [ "hyper", "log 0.4.11", "parking_lot", - "process_execution", "prost", "prost-types", - "store", "testutil", "tokio", "tonic", diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 8107b36fb65..c6f3edddb51 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -12,6 +12,7 @@ use fs::RelativePath; use hashing::{Digest, EMPTY_DIGEST}; use maplit::hashset; use mock::{StubActionCache, StubCAS}; +use parking_lot::Mutex; use remexec::ActionResult; use store::{BackoffConfig, Store}; use tempfile::TempDir; @@ -31,12 +32,14 @@ struct RoundtripResults { } /// A mock of the local runner used for better hermeticity of the tests. +#[derive(Clone)] struct MockLocalCommandRunner { result: Result, + call_counter: Arc>, } impl MockLocalCommandRunner { - pub fn new(exit_code: i32) -> MockLocalCommandRunner { + pub fn new(exit_code: i32, call_counter: Arc>) -> MockLocalCommandRunner { MockLocalCommandRunner { result: Ok(FallibleProcessResultWithPlatform { stdout_digest: EMPTY_DIGEST, @@ -46,6 +49,7 @@ impl MockLocalCommandRunner { execution_attempts: vec![], platform: Platform::current().unwrap(), }), + call_counter, } } } @@ -57,6 +61,8 @@ impl CommandRunnerTrait for MockLocalCommandRunner { _req: MultiPlatformProcess, _context: Context, ) -> Result { + let mut calls = self.call_counter.lock(); + *calls += 1; self.result.clone() } @@ -86,7 +92,7 @@ fn create_remote_store() -> Store { .unwrap() } -fn create_local_runner() -> (Box, Store) { +fn create_local_runner_old() -> (Box, Store) { let runtime = task_executor::Executor::new(); let base_dir = TempDir::new().unwrap(); let named_cache_dir = base_dir.path().join("named_cache_dir"); @@ -117,6 +123,12 @@ fn create_local_runner() -> (Box, Store) { (runner, store) } +fn create_local_runner(exit_code: i32) -> (Box, Arc>) { + let call_counter = Arc::new(Mutex::new(0)); + let local_runner = Box::new(MockLocalCommandRunner::new(exit_code, call_counter.clone())); + (local_runner, call_counter) +} + fn create_cached_runner( local: Box, store: Store, @@ -149,6 +161,31 @@ fn create_process() -> Process { ]) } +async fn insert_into_action_cache( + action_cache: &StubActionCache, + process: &Process, + store: &Store, + exit_code: i32, + stdout_digest: Digest, + stderr_digest: Digest, +) { + let (action, command, _exec_request) = + make_execute_request(process, ProcessMetadata::default()).unwrap(); + let (_command_digest, action_digest) = ensure_action_stored_locally(store, &command, &action) + .await + .unwrap(); + let action_result = ActionResult { + exit_code, + stdout_digest: Some(stdout_digest.into()), + stderr_digest: Some(stderr_digest.into()), + ..ActionResult::default() + }; + action_cache + .action_map + .lock() + .insert(action_digest.0, action_result); +} + fn create_script(script_exit_code: i8) -> (Process, PathBuf) { let script_dir = TempDir::new().unwrap(); let script_path = script_dir.path().join("script"); @@ -173,7 +210,7 @@ fn create_script(script_exit_code: i8) -> (Process, PathBuf) { } async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults { - let (local, store) = create_local_runner(); + let (local, store) = create_local_runner_old(); let (process, script_path) = create_script(script_exit_code); let local_result = local.run(process.clone().into(), Context::default()).await; @@ -202,17 +239,35 @@ async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults { async fn cache_read_success() { WorkunitStore::setup_for_tests(); let store = create_remote_store(); - let local_runner = Box::new(MockLocalCommandRunner::new(1)); - let (cache_runner, action_cache) = create_cached_runner(local_runner, store.clone(), false); + let (local_runner, local_runner_call_counter) = create_local_runner(1); + let (cache_runner, action_cache) = + create_cached_runner(local_runner.clone(), store.clone(), false); let process = create_process(); - action_cache.insert(&process, &store, 0, EMPTY_DIGEST, EMPTY_DIGEST); + insert_into_action_cache( + &action_cache, + &process, + &store, + 0, + EMPTY_DIGEST, + EMPTY_DIGEST, + ) + .await; - let result = cache_runner + assert_eq!(*local_runner_call_counter.lock(), 0); + let local_result = local_runner .run(process.clone().into(), Context::default()) .await .unwrap(); - assert_eq!(result.exit_code, 0); + assert_eq!(local_result.exit_code, 1); + assert_eq!(*local_runner_call_counter.lock(), 1); + + let remote_result = cache_runner + .run(process.clone().into(), Context::default()) + .await + .unwrap(); + assert_eq!(remote_result.exit_code, 0); + assert_eq!(*local_runner_call_counter.lock(), 1); } /// If the cache has any issues during reads, we should gracefully fallback to the local runner. @@ -220,18 +275,36 @@ async fn cache_read_success() { async fn cache_read_skipped_on_errors() { WorkunitStore::setup_for_tests(); let store = create_remote_store(); - let local_runner = Box::new(MockLocalCommandRunner::new(1)); - let (cache_runner, action_cache) = create_cached_runner(local_runner, store.clone(), false); + let (local_runner, local_runner_call_counter) = create_local_runner(1); + let (cache_runner, action_cache) = + create_cached_runner(local_runner.clone(), store.clone(), false); let process = create_process(); - action_cache.insert(&process, &store, 0, EMPTY_DIGEST, EMPTY_DIGEST); + insert_into_action_cache( + &action_cache, + &process, + &store, + 0, + EMPTY_DIGEST, + EMPTY_DIGEST, + ) + .await; action_cache.always_errors.store(true, Ordering::SeqCst); - let result = cache_runner + assert_eq!(*local_runner_call_counter.lock(), 0); + let local_result = local_runner .run(process.clone().into(), Context::default()) .await .unwrap(); - assert_eq!(result.exit_code, 1); + assert_eq!(local_result.exit_code, 1); + assert_eq!(*local_runner_call_counter.lock(), 1); + + let remote_result = cache_runner + .run(process.clone().into(), Context::default()) + .await + .unwrap(); + assert_eq!(remote_result.exit_code, 1); + assert_eq!(*local_runner_call_counter.lock(), 2); } // #[tokio::test] @@ -276,27 +349,6 @@ async fn failures_not_cached() { assert_eq!(results.maybe_cached.unwrap().exit_code, 127); // aka the return code for file not found } -#[tokio::test] -async fn skip_cache_on_error() { - WorkunitStore::setup_for_tests(); - - let (local, store) = create_local_runner(); - let (caching, stub_action_cache) = create_cached_runner(local, store.clone(), false); - let (process, _script_path) = create_script(0); - - stub_action_cache - .always_errors - .store(true, Ordering::SeqCst); - - // Run once to ensure the cache is skipped on errors. - let result = caching - .run(process.clone().into(), Context::default()) - .await - .unwrap(); - - assert_eq!(result.exit_code, 0); -} - /// With eager_fetch enabled, we should skip the remote cache if any of the process result's /// digests are invalid. This will force rerunning the process locally. Otherwise, we should use /// the cached result with its non-existent digests. @@ -305,7 +357,7 @@ async fn eager_fetch() { WorkunitStore::setup_for_tests(); async fn run_process(eager_fetch: bool) -> FallibleProcessResultWithPlatform { - let (local, store) = create_local_runner(); + let (local, store) = create_local_runner_old(); let (caching, stub_action_cache) = create_cached_runner(local, store.clone(), eager_fetch); // Get the `action_digest` for the Process that we're going to run. This will allow us to diff --git a/src/rust/engine/testutil/mock/Cargo.toml b/src/rust/engine/testutil/mock/Cargo.toml index 04de9e886cc..524faf9a750 100644 --- a/src/rust/engine/testutil/mock/Cargo.toml +++ b/src/rust/engine/testutil/mock/Cargo.toml @@ -14,10 +14,8 @@ hashing = { path = "../../hashing" } hyper = { version = "0.13", features = ["stream", "tcp"] } log = "0.4" parking_lot = "0.11" -process_execution = { path = "../../process_execution" } prost = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" } prost-types = "0.6" -store = { path = "../../fs/store" } testutil = { path = ".." } tokio = { version = "0.2.23", features = ["time"] } tonic = { version = "0.3" } diff --git a/src/rust/engine/testutil/mock/src/action_cache.rs b/src/rust/engine/testutil/mock/src/action_cache.rs index 40242470cf2..c346705eb52 100644 --- a/src/rust/engine/testutil/mock/src/action_cache.rs +++ b/src/rust/engine/testutil/mock/src/action_cache.rs @@ -39,13 +39,10 @@ use hashing::{Digest, Fingerprint}; use parking_lot::Mutex; use remexec::action_cache_server::{ActionCache, ActionCacheServer}; use remexec::{ActionResult, GetActionResultRequest, UpdateActionResultRequest}; -use store::Store; use tonic::transport::Server; use tonic::{Request, Response, Status}; use crate::tonic_util::AddrIncomingWithStream; -use process_execution::remote::{ensure_action_stored_locally, make_execute_request}; -use process_execution::{Process, ProcessMetadata}; pub struct StubActionCache { pub action_map: Arc>>, @@ -175,35 +172,4 @@ impl StubActionCache { pub fn address(&self) -> String { format!("{}", self.local_addr) } - - /// - /// Insert an ActionResult for a particular Process. - /// - /// This will calculate the action_digest for the input Process, using the same logic as ` - /// CommandRunner::run()`. - /// - pub async fn insert( - &self, - process: &Process, - store: &Store, - exit_code: i32, - stdout_digest: Digest, - stderr_digest: Digest, - ) { - let (action, command, _exec_request) = - make_execute_request(process, ProcessMetadata::default()).unwrap(); - let (_command_digest, action_digest) = ensure_action_stored_locally(store, &command, &action) - .await - .unwrap(); - let action_result = ActionResult { - exit_code, - stdout_digest: Some(stdout_digest.into()), - stderr_digest: Some(stderr_digest.into()), - ..ActionResult::default() - }; - self - .action_map - .lock() - .insert(action_digest.0, action_result); - } } From d53952b7d8c5a84dfa1e0bba88b558cd93e46a57 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Tue, 19 Jan 2021 16:17:32 -0700 Subject: [PATCH 05/11] Delete old helper functions # Building wheels and fs_util will be skipped. Delete if not intended. [ci skip-build-wheels] --- .../src/remote_cache_tests.rs | 212 +++++------------- 1 file changed, 62 insertions(+), 150 deletions(-) diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index c6f3edddb51..83fe027caa1 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -1,7 +1,5 @@ use std::collections::{BTreeMap, HashSet}; use std::convert::TryInto; -use std::io::Write; -use std::path::PathBuf; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; @@ -17,20 +15,14 @@ use remexec::ActionResult; use store::{BackoffConfig, Store}; use tempfile::TempDir; use testutil::data::{TestData, TestDirectory, TestTree}; -use testutil::relative_paths; use workunit_store::WorkunitStore; use crate::remote::{ensure_action_stored_locally, make_execute_request}; use crate::{ CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, - MultiPlatformProcess, NamedCaches, Platform, Process, ProcessMetadata, + MultiPlatformProcess, Platform, Process, ProcessMetadata, }; -struct RoundtripResults { - uncached: Result, - maybe_cached: Result, -} - /// A mock of the local runner used for better hermeticity of the tests. #[derive(Clone)] struct MockLocalCommandRunner { @@ -92,37 +84,6 @@ fn create_remote_store() -> Store { .unwrap() } -fn create_local_runner_old() -> (Box, Store) { - let runtime = task_executor::Executor::new(); - let base_dir = TempDir::new().unwrap(); - let named_cache_dir = base_dir.path().join("named_cache_dir"); - let stub_cas = StubCAS::builder().build(); - let store_dir = base_dir.path().join("store_dir"); - let store = Store::with_remote( - runtime.clone(), - store_dir, - vec![stub_cas.address()], - None, - None, - None, - 1, - 10 * 1024 * 1024, - Duration::from_secs(1), - BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(), - 1, - 1, - ) - .unwrap(); - let runner = Box::new(crate::local::CommandRunner::new( - store.clone(), - runtime.clone(), - base_dir.path().to_owned(), - NamedCaches::new(named_cache_dir), - true, - )); - (runner, store) -} - fn create_local_runner(exit_code: i32) -> (Box, Arc>) { let call_counter = Arc::new(Mutex::new(0)); let local_runner = Box::new(MockLocalCommandRunner::new(exit_code, call_counter.clone())); @@ -186,55 +147,6 @@ async fn insert_into_action_cache( .insert(action_digest.0, action_result); } -fn create_script(script_exit_code: i8) -> (Process, PathBuf) { - let script_dir = TempDir::new().unwrap(); - let script_path = script_dir.path().join("script"); - std::fs::File::create(&script_path) - .and_then(|mut file| { - writeln!( - file, - "echo -n {} > roland && echo Hello && echo >&2 World; exit {}", - TestData::roland().string(), - script_exit_code - ) - }) - .unwrap(); - - let process = Process::new(vec![ - testutil::path::find_bash(), - format!("{}", script_path.display()), - ]) - .output_files(relative_paths(&["roland"]).collect()); - - (process, script_path) -} - -async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults { - let (local, store) = create_local_runner_old(); - let (process, script_path) = create_script(script_exit_code); - - let local_result = local.run(process.clone().into(), Context::default()).await; - - let (caching, _stub_action_cache) = create_cached_runner(local, store.clone(), false); - - let uncached_result = caching - .run(process.clone().into(), Context::default()) - .await; - - assert_eq!(local_result, uncached_result); - - // Removing the file means that were the command to be run again without any caching, it would - // fail due to a FileNotFound error. So, If the second run succeeds, that implies that the - // cache was successfully used. - std::fs::remove_file(&script_path).unwrap(); - let maybe_cached_result = caching.run(process.into(), Context::default()).await; - - RoundtripResults { - uncached: uncached_result, - maybe_cached: maybe_cached_result, - } -} - #[tokio::test] async fn cache_read_success() { WorkunitStore::setup_for_tests(); @@ -333,71 +245,71 @@ async fn cache_read_skipped_on_errors() { // assert_eq!(results.uncached, results.maybe_cached); // } -#[tokio::test] -async fn cache_success() { - WorkunitStore::setup_for_tests(); - let results = run_roundtrip(0).await; - assert_eq!(results.uncached, results.maybe_cached); -} - -#[tokio::test] -async fn failures_not_cached() { - WorkunitStore::setup_for_tests(); - let results = run_roundtrip(1).await; - assert_ne!(results.uncached, results.maybe_cached); - assert_eq!(results.uncached.unwrap().exit_code, 1); - assert_eq!(results.maybe_cached.unwrap().exit_code, 127); // aka the return code for file not found -} +// #[tokio::test] +// async fn cache_success() { +// WorkunitStore::setup_for_tests(); +// let results = run_roundtrip(0).await; +// assert_eq!(results.uncached, results.maybe_cached); +// } +// +// #[tokio::test] +// async fn failures_not_cached() { +// WorkunitStore::setup_for_tests(); +// let results = run_roundtrip(1).await; +// assert_ne!(results.uncached, results.maybe_cached); +// assert_eq!(results.uncached.unwrap().exit_code, 1); +// assert_eq!(results.maybe_cached.unwrap().exit_code, 127); // aka the return code for file not found +// } /// With eager_fetch enabled, we should skip the remote cache if any of the process result's /// digests are invalid. This will force rerunning the process locally. Otherwise, we should use /// the cached result with its non-existent digests. -#[tokio::test] -async fn eager_fetch() { - WorkunitStore::setup_for_tests(); - - async fn run_process(eager_fetch: bool) -> FallibleProcessResultWithPlatform { - let (local, store) = create_local_runner_old(); - let (caching, stub_action_cache) = create_cached_runner(local, store.clone(), eager_fetch); - - // Get the `action_digest` for the Process that we're going to run. This will allow us to - // insert a bogus value into the `stub_action_cache`. - let (process, _script_path) = create_script(1); - let (action, command, _exec_request) = - make_execute_request(&process, ProcessMetadata::default()).unwrap(); - let (_command_digest, action_digest) = ensure_action_stored_locally(&store, &command, &action) - .await - .unwrap(); - - // Insert an ActionResult with missing digests and a return code of 0 (instead of 1). - let bogus_action_result = ActionResult { - exit_code: 0, - stdout_digest: Some(TestData::roland().digest().into()), - stderr_digest: Some(TestData::roland().digest().into()), - ..ActionResult::default() - }; - stub_action_cache - .action_map - .lock() - .insert(action_digest.0, bogus_action_result); - - // Run the process, possibly by pulling from the `ActionCache`. - caching - .run(process.clone().into(), Context::default()) - .await - .unwrap() - } - - let lazy_result = run_process(false).await; - assert_eq!(lazy_result.exit_code, 0); - assert_eq!(lazy_result.stdout_digest, TestData::roland().digest()); - assert_eq!(lazy_result.stderr_digest, TestData::roland().digest()); - - let eager_result = run_process(true).await; - assert_eq!(eager_result.exit_code, 1); - assert_ne!(eager_result.stdout_digest, TestData::roland().digest()); - assert_ne!(eager_result.stderr_digest, TestData::roland().digest()); -} +// #[tokio::test] +// async fn eager_fetch() { +// WorkunitStore::setup_for_tests(); +// +// async fn run_process(eager_fetch: bool) -> FallibleProcessResultWithPlatform { +// let (local, store) = create_local_runner_old(); +// let (caching, stub_action_cache) = create_cached_runner(local, store.clone(), eager_fetch); +// +// // Get the `action_digest` for the Process that we're going to run. This will allow us to +// // insert a bogus value into the `stub_action_cache`. +// let (process, _script_path) = create_script(1); +// let (action, command, _exec_request) = +// make_execute_request(&process, ProcessMetadata::default()).unwrap(); +// let (_command_digest, action_digest) = ensure_action_stored_locally(&store, &command, &action) +// .await +// .unwrap(); +// +// // Insert an ActionResult with missing digests and a return code of 0 (instead of 1). +// let bogus_action_result = ActionResult { +// exit_code: 0, +// stdout_digest: Some(TestData::roland().digest().into()), +// stderr_digest: Some(TestData::roland().digest().into()), +// ..ActionResult::default() +// }; +// stub_action_cache +// .action_map +// .lock() +// .insert(action_digest.0, bogus_action_result); +// +// // Run the process, possibly by pulling from the `ActionCache`. +// caching +// .run(process.clone().into(), Context::default()) +// .await +// .unwrap() +// } +// +// let lazy_result = run_process(false).await; +// assert_eq!(lazy_result.exit_code, 0); +// assert_eq!(lazy_result.stdout_digest, TestData::roland().digest()); +// assert_eq!(lazy_result.stderr_digest, TestData::roland().digest()); +// +// let eager_result = run_process(true).await; +// assert_eq!(eager_result.exit_code, 1); +// assert_ne!(eager_result.stdout_digest, TestData::roland().digest()); +// assert_ne!(eager_result.stderr_digest, TestData::roland().digest()); +// } #[tokio::test] async fn make_tree_from_directory() { From e8184e0e60216c5491376cb94fb6fff42cad621d Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Tue, 19 Jan 2021 17:07:35 -0700 Subject: [PATCH 06/11] Rewrite eager_fetch test # Building wheels and fs_util will be skipped. Delete if not intended. [ci skip-build-wheels] --- .../src/remote_cache_tests.rs | 101 +++++++++--------- 1 file changed, 51 insertions(+), 50 deletions(-) diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 83fe027caa1..79c6102cabe 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -219,6 +219,57 @@ async fn cache_read_skipped_on_errors() { assert_eq!(*local_runner_call_counter.lock(), 2); } +/// With eager_fetch enabled, we should skip the remote cache if any of the process result's +/// digests are invalid. This will force rerunning the process locally. Otherwise, we should use +/// the cached result with its non-existent digests. +#[tokio::test] +async fn cache_read_eager_fetch() { + WorkunitStore::setup_for_tests(); + + async fn run_process(eager_fetch: bool) -> (i32, u32) { + let store = create_remote_store(); + let (local_runner, local_runner_call_counter) = create_local_runner(1); + let (cache_runner, action_cache) = + create_cached_runner(local_runner.clone(), store.clone(), eager_fetch); + + let process = create_process(); + insert_into_action_cache( + &action_cache, + &process, + &store, + 0, + TestData::roland().digest(), + TestData::roland().digest(), + ) + .await; + + assert_eq!(*local_runner_call_counter.lock(), 0); + let local_result = local_runner + .run(process.clone().into(), Context::default()) + .await + .unwrap(); + assert_eq!(local_result.exit_code, 1); + assert_eq!(*local_runner_call_counter.lock(), 1); + + // Run the process, possibly by pulling from the `ActionCache`. + let remote_result = cache_runner + .run(process.clone().into(), Context::default()) + .await + .unwrap(); + + let final_local_count = *local_runner_call_counter.lock(); + (remote_result.exit_code, final_local_count) + } + + let (lazy_exit_code, lazy_local_call_count) = run_process(false).await; + assert_eq!(lazy_exit_code, 0); + assert_eq!(lazy_local_call_count, 1); + + let (eager_exit_code, eager_local_call_count) = run_process(true).await; + assert_eq!(eager_exit_code, 1); + assert_eq!(eager_local_call_count, 2); +} + // #[tokio::test] // async fn cache_write_success() { // WorkunitStore::setup_for_tests(); @@ -261,56 +312,6 @@ async fn cache_read_skipped_on_errors() { // assert_eq!(results.maybe_cached.unwrap().exit_code, 127); // aka the return code for file not found // } -/// With eager_fetch enabled, we should skip the remote cache if any of the process result's -/// digests are invalid. This will force rerunning the process locally. Otherwise, we should use -/// the cached result with its non-existent digests. -// #[tokio::test] -// async fn eager_fetch() { -// WorkunitStore::setup_for_tests(); -// -// async fn run_process(eager_fetch: bool) -> FallibleProcessResultWithPlatform { -// let (local, store) = create_local_runner_old(); -// let (caching, stub_action_cache) = create_cached_runner(local, store.clone(), eager_fetch); -// -// // Get the `action_digest` for the Process that we're going to run. This will allow us to -// // insert a bogus value into the `stub_action_cache`. -// let (process, _script_path) = create_script(1); -// let (action, command, _exec_request) = -// make_execute_request(&process, ProcessMetadata::default()).unwrap(); -// let (_command_digest, action_digest) = ensure_action_stored_locally(&store, &command, &action) -// .await -// .unwrap(); -// -// // Insert an ActionResult with missing digests and a return code of 0 (instead of 1). -// let bogus_action_result = ActionResult { -// exit_code: 0, -// stdout_digest: Some(TestData::roland().digest().into()), -// stderr_digest: Some(TestData::roland().digest().into()), -// ..ActionResult::default() -// }; -// stub_action_cache -// .action_map -// .lock() -// .insert(action_digest.0, bogus_action_result); -// -// // Run the process, possibly by pulling from the `ActionCache`. -// caching -// .run(process.clone().into(), Context::default()) -// .await -// .unwrap() -// } -// -// let lazy_result = run_process(false).await; -// assert_eq!(lazy_result.exit_code, 0); -// assert_eq!(lazy_result.stdout_digest, TestData::roland().digest()); -// assert_eq!(lazy_result.stderr_digest, TestData::roland().digest()); -// -// let eager_result = run_process(true).await; -// assert_eq!(eager_result.exit_code, 1); -// assert_ne!(eager_result.stdout_digest, TestData::roland().digest()); -// assert_ne!(eager_result.stderr_digest, TestData::roland().digest()); -// } - #[tokio::test] async fn make_tree_from_directory() { let store_dir = TempDir::new().unwrap(); From 4a0e0787470ef60467cc0431edf0a344807b8ba3 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Tue, 19 Jan 2021 17:20:08 -0700 Subject: [PATCH 07/11] Factor setup of the action digest differently We need this for the cache write tests # Building wheels and fs_util will be skipped. Delete if not intended. [ci skip-build-wheels] --- .../src/remote_cache_tests.rs | 76 +++++++++---------- 1 file changed, 34 insertions(+), 42 deletions(-) diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 79c6102cabe..07a17255174 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -115,26 +115,26 @@ fn create_cached_runner( (runner, action_cache) } -fn create_process() -> Process { - Process::new(vec![ +async fn create_process(store: &Store) -> (Process, Digest) { + let process = Process::new(vec![ testutil::path::find_bash(), "echo -n hello world".to_string(), - ]) + ]); + let (action, command, _exec_request) = + make_execute_request(&process, ProcessMetadata::default()).unwrap(); + let (_command_digest, action_digest) = ensure_action_stored_locally(store, &command, &action) + .await + .unwrap(); + (process, action_digest) } -async fn insert_into_action_cache( +fn insert_into_action_cache( action_cache: &StubActionCache, - process: &Process, - store: &Store, + action_digest: &Digest, exit_code: i32, stdout_digest: Digest, stderr_digest: Digest, ) { - let (action, command, _exec_request) = - make_execute_request(process, ProcessMetadata::default()).unwrap(); - let (_command_digest, action_digest) = ensure_action_stored_locally(store, &command, &action) - .await - .unwrap(); let action_result = ActionResult { exit_code, stdout_digest: Some(stdout_digest.into()), @@ -155,16 +155,8 @@ async fn cache_read_success() { let (cache_runner, action_cache) = create_cached_runner(local_runner.clone(), store.clone(), false); - let process = create_process(); - insert_into_action_cache( - &action_cache, - &process, - &store, - 0, - EMPTY_DIGEST, - EMPTY_DIGEST, - ) - .await; + let (process, action_digest) = create_process(&store).await; + insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST); assert_eq!(*local_runner_call_counter.lock(), 0); let local_result = local_runner @@ -191,16 +183,8 @@ async fn cache_read_skipped_on_errors() { let (cache_runner, action_cache) = create_cached_runner(local_runner.clone(), store.clone(), false); - let process = create_process(); - insert_into_action_cache( - &action_cache, - &process, - &store, - 0, - EMPTY_DIGEST, - EMPTY_DIGEST, - ) - .await; + let (process, action_digest) = create_process(&store).await; + insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST); action_cache.always_errors.store(true, Ordering::SeqCst); assert_eq!(*local_runner_call_counter.lock(), 0); @@ -232,16 +216,14 @@ async fn cache_read_eager_fetch() { let (cache_runner, action_cache) = create_cached_runner(local_runner.clone(), store.clone(), eager_fetch); - let process = create_process(); + let (process, action_digest) = create_process(&store).await; insert_into_action_cache( &action_cache, - &process, - &store, + &action_digest, 0, TestData::roland().digest(), TestData::roland().digest(), - ) - .await; + ); assert_eq!(*local_runner_call_counter.lock(), 0); let local_result = local_runner @@ -274,15 +256,25 @@ async fn cache_read_eager_fetch() { // async fn cache_write_success() { // WorkunitStore::setup_for_tests(); // let store = create_remote_store(); -// let local_runner = Box::new(MockLocalCommandRunner::new(1)); -// let (cache_runner, action_cache) = create_cached_runner(local, store, false); +// let (local_runner, local_runner_call_counter) = create_local_runner(0); +// let (cache_runner, action_cache) = +// create_cached_runner(local_runner, store.clone(), false); +// let process = create_process(); // -// let result = cache_runner.run().await.unwrap(); +// assert_eq!(*local_runner_call_counter.lock(), 0); +// assert!(action_cache.action_map.lock().is_empty()); // -// let results = run_roundtrip(0).await; -// assert_eq!(results.uncached, results.maybe_cached); -// } +// let local_result = cache_runner +// .run(process.clone().into(), Context::default()) +// .await +// .unwrap(); +// assert_eq!(local_result.exit_code, 0); +// assert_eq!(*local_runner_call_counter.lock(), 1); +// +// assert!() // +// } + // #[tokio::test] // async fn cache_write_not_for_failures() { // WorkunitStore::setup_for_tests(); From f29d6cef6d12acd45254bfb868c78f9d400e8b49 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Tue, 19 Jan 2021 17:52:03 -0700 Subject: [PATCH 08/11] Add StoreSetup to fix the CAS dropping prematurely # Building wheels and fs_util will be skipped. Delete if not intended. [ci skip-build-wheels] --- .../src/remote_cache_tests.rs | 83 +++++++++++-------- 1 file changed, 50 insertions(+), 33 deletions(-) diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 07a17255174..08544f82e89 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -1,5 +1,6 @@ use std::collections::{BTreeMap, HashSet}; use std::convert::TryInto; +use std::path::PathBuf; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; @@ -63,25 +64,39 @@ impl CommandRunnerTrait for MockLocalCommandRunner { } } -fn create_remote_store() -> Store { - let runtime = task_executor::Executor::new(); - let stub_cas = StubCAS::builder().build(); - let store_dir = TempDir::new().unwrap().path().join("store_dir"); - Store::with_remote( - runtime, - store_dir, - vec![stub_cas.address()], - None, - None, - None, - 1, - 10 * 1024 * 1024, - Duration::from_secs(1), - BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(), - 1, - 1, - ) - .unwrap() +// NB: We bundle these into a struct to ensure they share the same lifetime. +struct StoreSetup { + pub store: Store, + pub store_dir: PathBuf, + pub cas: StubCAS, +} + +impl StoreSetup { + pub fn new() -> StoreSetup { + let runtime = task_executor::Executor::new(); + let cas = StubCAS::builder().build(); + let store_dir = TempDir::new().unwrap().path().join("store_dir"); + let store = Store::with_remote( + runtime, + store_dir.clone(), + vec![cas.address()], + None, + None, + None, + 1, + 10 * 1024 * 1024, + Duration::from_secs(1), + BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(), + 1, + 1, + ) + .unwrap(); + StoreSetup { + store, + store_dir, + cas, + } + } } fn create_local_runner(exit_code: i32) -> (Box, Arc>) { @@ -150,12 +165,12 @@ fn insert_into_action_cache( #[tokio::test] async fn cache_read_success() { WorkunitStore::setup_for_tests(); - let store = create_remote_store(); + let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1); let (cache_runner, action_cache) = - create_cached_runner(local_runner.clone(), store.clone(), false); + create_cached_runner(local_runner.clone(), store_setup.store.clone(), false); - let (process, action_digest) = create_process(&store).await; + let (process, action_digest) = create_process(&store_setup.store).await; insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST); assert_eq!(*local_runner_call_counter.lock(), 0); @@ -178,12 +193,12 @@ async fn cache_read_success() { #[tokio::test] async fn cache_read_skipped_on_errors() { WorkunitStore::setup_for_tests(); - let store = create_remote_store(); + let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1); let (cache_runner, action_cache) = - create_cached_runner(local_runner.clone(), store.clone(), false); + create_cached_runner(local_runner.clone(), store_setup.store.clone(), false); - let (process, action_digest) = create_process(&store).await; + let (process, action_digest) = create_process(&store_setup.store).await; insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST); action_cache.always_errors.store(true, Ordering::SeqCst); @@ -211,12 +226,12 @@ async fn cache_read_eager_fetch() { WorkunitStore::setup_for_tests(); async fn run_process(eager_fetch: bool) -> (i32, u32) { - let store = create_remote_store(); + let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1); let (cache_runner, action_cache) = - create_cached_runner(local_runner.clone(), store.clone(), eager_fetch); + create_cached_runner(local_runner.clone(), store_setup.store.clone(), eager_fetch); - let (process, action_digest) = create_process(&store).await; + let (process, action_digest) = create_process(&store_setup.store).await; insert_into_action_cache( &action_cache, &action_digest, @@ -254,12 +269,13 @@ async fn cache_read_eager_fetch() { // #[tokio::test] // async fn cache_write_success() { +// let _logger = env_logger::try_init(); // WorkunitStore::setup_for_tests(); -// let store = create_remote_store(); +// let store_setup = StoreSetup::new(); // let (local_runner, local_runner_call_counter) = create_local_runner(0); // let (cache_runner, action_cache) = -// create_cached_runner(local_runner, store.clone(), false); -// let process = create_process(); +// create_cached_runner(local_runner, store_setup.store.clone(), false); +// let (process, _action_digest) = create_process(&store_setup.store).await; // // assert_eq!(*local_runner_call_counter.lock(), 0); // assert!(action_cache.action_map.lock().is_empty()); @@ -271,8 +287,9 @@ async fn cache_read_eager_fetch() { // assert_eq!(local_result.exit_code, 0); // assert_eq!(*local_runner_call_counter.lock(), 1); // -// assert!() -// +// assert_eq!(action_cache.action_map.lock().len(), 1); +// // let cache_entry = action_cache.action_map.lock().get(&action_digest.0); +// // assert_eq!(cache_entry.unwrap().exit_code, 0); // } // #[tokio::test] From bc54fe0c8ed775ea3b3dfe38245cba8d00f84e0b Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Tue, 19 Jan 2021 18:00:06 -0700 Subject: [PATCH 09/11] Add tests for remote cache writes # Building wheels and fs_util will be skipped. Delete if not intended. [ci skip-build-wheels] --- .../src/remote_cache_tests.rs | 104 +++++++++--------- 1 file changed, 51 insertions(+), 53 deletions(-) diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 08544f82e89..1e910a8ba5b 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -267,59 +267,57 @@ async fn cache_read_eager_fetch() { assert_eq!(eager_local_call_count, 2); } -// #[tokio::test] -// async fn cache_write_success() { -// let _logger = env_logger::try_init(); -// WorkunitStore::setup_for_tests(); -// let store_setup = StoreSetup::new(); -// let (local_runner, local_runner_call_counter) = create_local_runner(0); -// let (cache_runner, action_cache) = -// create_cached_runner(local_runner, store_setup.store.clone(), false); -// let (process, _action_digest) = create_process(&store_setup.store).await; -// -// assert_eq!(*local_runner_call_counter.lock(), 0); -// assert!(action_cache.action_map.lock().is_empty()); -// -// let local_result = cache_runner -// .run(process.clone().into(), Context::default()) -// .await -// .unwrap(); -// assert_eq!(local_result.exit_code, 0); -// assert_eq!(*local_runner_call_counter.lock(), 1); -// -// assert_eq!(action_cache.action_map.lock().len(), 1); -// // let cache_entry = action_cache.action_map.lock().get(&action_digest.0); -// // assert_eq!(cache_entry.unwrap().exit_code, 0); -// } - -// #[tokio::test] -// async fn cache_write_not_for_failures() { -// WorkunitStore::setup_for_tests(); -// let store = create_remote_store(); -// let local_runner = Box::new(MockLocalCommandRunner::new(1)); -// let (cache_runner, action_cache) = create_cached_runner(local, store, false); -// -// let result = cache_runner.run().await.unwrap(); -// -// let results = run_roundtrip(0).await; -// assert_eq!(results.uncached, results.maybe_cached); -// } - -// #[tokio::test] -// async fn cache_success() { -// WorkunitStore::setup_for_tests(); -// let results = run_roundtrip(0).await; -// assert_eq!(results.uncached, results.maybe_cached); -// } -// -// #[tokio::test] -// async fn failures_not_cached() { -// WorkunitStore::setup_for_tests(); -// let results = run_roundtrip(1).await; -// assert_ne!(results.uncached, results.maybe_cached); -// assert_eq!(results.uncached.unwrap().exit_code, 1); -// assert_eq!(results.maybe_cached.unwrap().exit_code, 127); // aka the return code for file not found -// } +#[tokio::test] +async fn cache_write_success() { + WorkunitStore::setup_for_tests(); + let store_setup = StoreSetup::new(); + let (local_runner, local_runner_call_counter) = create_local_runner(0); + let (cache_runner, action_cache) = + create_cached_runner(local_runner, store_setup.store.clone(), false); + let (process, action_digest) = create_process(&store_setup.store).await; + + assert_eq!(*local_runner_call_counter.lock(), 0); + assert!(action_cache.action_map.lock().is_empty()); + + let local_result = cache_runner + .run(process.clone().into(), Context::default()) + .await + .unwrap(); + assert_eq!(local_result.exit_code, 0); + assert_eq!(*local_runner_call_counter.lock(), 1); + + assert_eq!(action_cache.action_map.lock().len(), 1); + let action_map_mutex_guard = action_cache.action_map.lock(); + assert_eq!( + action_map_mutex_guard + .get(&action_digest.0) + .unwrap() + .exit_code, + 0 + ); +} + +#[tokio::test] +async fn cache_write_not_for_failures() { + WorkunitStore::setup_for_tests(); + let store_setup = StoreSetup::new(); + let (local_runner, local_runner_call_counter) = create_local_runner(1); + let (cache_runner, action_cache) = + create_cached_runner(local_runner, store_setup.store.clone(), false); + let (process, _action_digest) = create_process(&store_setup.store).await; + + assert_eq!(*local_runner_call_counter.lock(), 0); + assert!(action_cache.action_map.lock().is_empty()); + + let local_result = cache_runner + .run(process.clone().into(), Context::default()) + .await + .unwrap(); + assert_eq!(local_result.exit_code, 1); + assert_eq!(*local_runner_call_counter.lock(), 1); + + assert!(action_cache.action_map.lock().is_empty()); +} #[tokio::test] async fn make_tree_from_directory() { From 06055748bedadf8aa0d76ab5562808f569cb4bb5 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Tue, 19 Jan 2021 18:02:06 -0700 Subject: [PATCH 10/11] Simplify read tests to not run locally first We already have the atomic counter to keep track of local runs. It was adding unnecessary complexity # Building wheels and fs_util will be skipped. Delete if not intended. [ci skip-build-wheels] --- .../src/remote_cache_tests.rs | 30 +++---------------- 1 file changed, 4 insertions(+), 26 deletions(-) diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 1e910a8ba5b..bd942d510dd 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -174,19 +174,12 @@ async fn cache_read_success() { insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST); assert_eq!(*local_runner_call_counter.lock(), 0); - let local_result = local_runner - .run(process.clone().into(), Context::default()) - .await - .unwrap(); - assert_eq!(local_result.exit_code, 1); - assert_eq!(*local_runner_call_counter.lock(), 1); - let remote_result = cache_runner .run(process.clone().into(), Context::default()) .await .unwrap(); assert_eq!(remote_result.exit_code, 0); - assert_eq!(*local_runner_call_counter.lock(), 1); + assert_eq!(*local_runner_call_counter.lock(), 0); } /// If the cache has any issues during reads, we should gracefully fallback to the local runner. @@ -203,19 +196,12 @@ async fn cache_read_skipped_on_errors() { action_cache.always_errors.store(true, Ordering::SeqCst); assert_eq!(*local_runner_call_counter.lock(), 0); - let local_result = local_runner - .run(process.clone().into(), Context::default()) - .await - .unwrap(); - assert_eq!(local_result.exit_code, 1); - assert_eq!(*local_runner_call_counter.lock(), 1); - let remote_result = cache_runner .run(process.clone().into(), Context::default()) .await .unwrap(); assert_eq!(remote_result.exit_code, 1); - assert_eq!(*local_runner_call_counter.lock(), 2); + assert_eq!(*local_runner_call_counter.lock(), 1); } /// With eager_fetch enabled, we should skip the remote cache if any of the process result's @@ -241,14 +227,6 @@ async fn cache_read_eager_fetch() { ); assert_eq!(*local_runner_call_counter.lock(), 0); - let local_result = local_runner - .run(process.clone().into(), Context::default()) - .await - .unwrap(); - assert_eq!(local_result.exit_code, 1); - assert_eq!(*local_runner_call_counter.lock(), 1); - - // Run the process, possibly by pulling from the `ActionCache`. let remote_result = cache_runner .run(process.clone().into(), Context::default()) .await @@ -260,11 +238,11 @@ async fn cache_read_eager_fetch() { let (lazy_exit_code, lazy_local_call_count) = run_process(false).await; assert_eq!(lazy_exit_code, 0); - assert_eq!(lazy_local_call_count, 1); + assert_eq!(lazy_local_call_count, 0); let (eager_exit_code, eager_local_call_count) = run_process(true).await; assert_eq!(eager_exit_code, 1); - assert_eq!(eager_local_call_count, 2); + assert_eq!(eager_local_call_count, 1); } #[tokio::test] From b053e3421c21f9e090f8a669e1a5197a1b321461 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Tue, 19 Jan 2021 19:22:58 -0700 Subject: [PATCH 11/11] Use AtomicUsize instead of Mutex # Building wheels and fs_util will be skipped. Delete if not intended. [ci skip-build-wheels] --- .../src/remote_cache_tests.rs | 36 +++++++++---------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index bd942d510dd..8cc416e871a 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -1,7 +1,7 @@ use std::collections::{BTreeMap, HashSet}; use std::convert::TryInto; use std::path::PathBuf; -use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -11,7 +11,6 @@ use fs::RelativePath; use hashing::{Digest, EMPTY_DIGEST}; use maplit::hashset; use mock::{StubActionCache, StubCAS}; -use parking_lot::Mutex; use remexec::ActionResult; use store::{BackoffConfig, Store}; use tempfile::TempDir; @@ -28,11 +27,11 @@ use crate::{ #[derive(Clone)] struct MockLocalCommandRunner { result: Result, - call_counter: Arc>, + call_counter: Arc, } impl MockLocalCommandRunner { - pub fn new(exit_code: i32, call_counter: Arc>) -> MockLocalCommandRunner { + pub fn new(exit_code: i32, call_counter: Arc) -> MockLocalCommandRunner { MockLocalCommandRunner { result: Ok(FallibleProcessResultWithPlatform { stdout_digest: EMPTY_DIGEST, @@ -54,8 +53,7 @@ impl CommandRunnerTrait for MockLocalCommandRunner { _req: MultiPlatformProcess, _context: Context, ) -> Result { - let mut calls = self.call_counter.lock(); - *calls += 1; + self.call_counter.fetch_add(1, Ordering::SeqCst); self.result.clone() } @@ -99,8 +97,8 @@ impl StoreSetup { } } -fn create_local_runner(exit_code: i32) -> (Box, Arc>) { - let call_counter = Arc::new(Mutex::new(0)); +fn create_local_runner(exit_code: i32) -> (Box, Arc) { + let call_counter = Arc::new(AtomicUsize::new(0)); let local_runner = Box::new(MockLocalCommandRunner::new(exit_code, call_counter.clone())); (local_runner, call_counter) } @@ -173,13 +171,13 @@ async fn cache_read_success() { let (process, action_digest) = create_process(&store_setup.store).await; insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST); - assert_eq!(*local_runner_call_counter.lock(), 0); + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); let remote_result = cache_runner .run(process.clone().into(), Context::default()) .await .unwrap(); assert_eq!(remote_result.exit_code, 0); - assert_eq!(*local_runner_call_counter.lock(), 0); + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); } /// If the cache has any issues during reads, we should gracefully fallback to the local runner. @@ -195,13 +193,13 @@ async fn cache_read_skipped_on_errors() { insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST); action_cache.always_errors.store(true, Ordering::SeqCst); - assert_eq!(*local_runner_call_counter.lock(), 0); + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); let remote_result = cache_runner .run(process.clone().into(), Context::default()) .await .unwrap(); assert_eq!(remote_result.exit_code, 1); - assert_eq!(*local_runner_call_counter.lock(), 1); + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1); } /// With eager_fetch enabled, we should skip the remote cache if any of the process result's @@ -211,7 +209,7 @@ async fn cache_read_skipped_on_errors() { async fn cache_read_eager_fetch() { WorkunitStore::setup_for_tests(); - async fn run_process(eager_fetch: bool) -> (i32, u32) { + async fn run_process(eager_fetch: bool) -> (i32, usize) { let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1); let (cache_runner, action_cache) = @@ -226,13 +224,13 @@ async fn cache_read_eager_fetch() { TestData::roland().digest(), ); - assert_eq!(*local_runner_call_counter.lock(), 0); + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); let remote_result = cache_runner .run(process.clone().into(), Context::default()) .await .unwrap(); - let final_local_count = *local_runner_call_counter.lock(); + let final_local_count = local_runner_call_counter.load(Ordering::SeqCst); (remote_result.exit_code, final_local_count) } @@ -254,7 +252,7 @@ async fn cache_write_success() { create_cached_runner(local_runner, store_setup.store.clone(), false); let (process, action_digest) = create_process(&store_setup.store).await; - assert_eq!(*local_runner_call_counter.lock(), 0); + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); assert!(action_cache.action_map.lock().is_empty()); let local_result = cache_runner @@ -262,7 +260,7 @@ async fn cache_write_success() { .await .unwrap(); assert_eq!(local_result.exit_code, 0); - assert_eq!(*local_runner_call_counter.lock(), 1); + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1); assert_eq!(action_cache.action_map.lock().len(), 1); let action_map_mutex_guard = action_cache.action_map.lock(); @@ -284,7 +282,7 @@ async fn cache_write_not_for_failures() { create_cached_runner(local_runner, store_setup.store.clone(), false); let (process, _action_digest) = create_process(&store_setup.store).await; - assert_eq!(*local_runner_call_counter.lock(), 0); + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); assert!(action_cache.action_map.lock().is_empty()); let local_result = cache_runner @@ -292,7 +290,7 @@ async fn cache_write_not_for_failures() { .await .unwrap(); assert_eq!(local_result.exit_code, 1); - assert_eq!(*local_runner_call_counter.lock(), 1); + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1); assert!(action_cache.action_map.lock().is_empty()); }