diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index a6381210919..8cc416e871a 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -1,72 +1,113 @@ use std::collections::{BTreeMap, HashSet}; use std::convert::TryInto; -use std::io::Write; use std::path::PathBuf; -use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use bazel_protos::gen::build::bazel::remote::execution::v2 as remexec; use fs::RelativePath; -use hashing::Digest; +use hashing::{Digest, EMPTY_DIGEST}; use maplit::hashset; use mock::{StubActionCache, StubCAS}; use remexec::ActionResult; use store::{BackoffConfig, Store}; use tempfile::TempDir; use testutil::data::{TestData, TestDirectory, TestTree}; -use testutil::relative_paths; use workunit_store::WorkunitStore; use crate::remote::{ensure_action_stored_locally, make_execute_request}; use crate::{ CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, - MultiPlatformProcess, NamedCaches, Platform, Process, ProcessMetadata, + MultiPlatformProcess, Platform, Process, ProcessMetadata, }; -struct RoundtripResults { - uncached: Result, - maybe_cached: Result, +/// A mock of the local runner used for better hermeticity of the tests. +#[derive(Clone)] +struct MockLocalCommandRunner { + result: Result, + call_counter: Arc, } -fn create_local_runner() -> (Box, Store, TempDir, StubCAS) { - let runtime = task_executor::Executor::new(); - let base_dir = TempDir::new().unwrap(); - let named_cache_dir = base_dir.path().join("named_cache_dir"); - let stub_cas = StubCAS::builder().build(); - let store_dir = base_dir.path().join("store_dir"); - let store = Store::with_remote( - runtime.clone(), - store_dir, - vec![stub_cas.address()], - None, - None, - None, - 1, - 10 * 1024 * 1024, - Duration::from_secs(1), - BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(), - 1, - 1, - ) - .unwrap(); - let runner = Box::new(crate::local::CommandRunner::new( - store.clone(), - runtime.clone(), - base_dir.path().to_owned(), - NamedCaches::new(named_cache_dir), - true, - )); - (runner, store, base_dir, stub_cas) +impl MockLocalCommandRunner { + pub fn new(exit_code: i32, call_counter: Arc) -> MockLocalCommandRunner { + MockLocalCommandRunner { + result: Ok(FallibleProcessResultWithPlatform { + stdout_digest: EMPTY_DIGEST, + stderr_digest: EMPTY_DIGEST, + exit_code, + output_directory: EMPTY_DIGEST, + execution_attempts: vec![], + platform: Platform::current().unwrap(), + }), + call_counter, + } + } +} + +#[async_trait] +impl CommandRunnerTrait for MockLocalCommandRunner { + async fn run( + &self, + _req: MultiPlatformProcess, + _context: Context, + ) -> Result { + self.call_counter.fetch_add(1, Ordering::SeqCst); + self.result.clone() + } + + fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option { + Some(req.0.get(&None).unwrap().clone()) + } +} + +// NB: We bundle these into a struct to ensure they share the same lifetime. +struct StoreSetup { + pub store: Store, + pub store_dir: PathBuf, + pub cas: StubCAS, +} + +impl StoreSetup { + pub fn new() -> StoreSetup { + let runtime = task_executor::Executor::new(); + let cas = StubCAS::builder().build(); + let store_dir = TempDir::new().unwrap().path().join("store_dir"); + let store = Store::with_remote( + runtime, + store_dir.clone(), + vec![cas.address()], + None, + None, + None, + 1, + 10 * 1024 * 1024, + Duration::from_secs(1), + BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(), + 1, + 1, + ) + .unwrap(); + StoreSetup { + store, + store_dir, + cas, + } + } +} + +fn create_local_runner(exit_code: i32) -> (Box, Arc) { + let call_counter = Arc::new(AtomicUsize::new(0)); + let local_runner = Box::new(MockLocalCommandRunner::new(exit_code, call_counter.clone())); + (local_runner, call_counter) } fn create_cached_runner( local: Box, store: Store, eager_fetch: bool, -) -> (Box, TempDir, StubActionCache) { - let cache_dir = TempDir::new().unwrap(); +) -> (Box, StubActionCache) { let action_cache = StubActionCache::new().unwrap(); let runner = Box::new( crate::remote_cache::CommandRunner::new( @@ -84,144 +125,174 @@ fn create_cached_runner( ) .expect("caching command runner"), ); - (runner, cache_dir, action_cache) + (runner, action_cache) } -fn create_script(script_exit_code: i8) -> (Process, PathBuf, TempDir) { - let script_dir = TempDir::new().unwrap(); - let script_path = script_dir.path().join("script"); - std::fs::File::create(&script_path) - .and_then(|mut file| { - writeln!( - file, - "echo -n {} > roland && echo Hello && echo >&2 World; exit {}", - TestData::roland().string(), - script_exit_code - ) - }) - .unwrap(); - +async fn create_process(store: &Store) -> (Process, Digest) { let process = Process::new(vec![ testutil::path::find_bash(), - format!("{}", script_path.display()), - ]) - .output_files(relative_paths(&["roland"]).collect()); - - (process, script_path, script_dir) + "echo -n hello world".to_string(), + ]); + let (action, command, _exec_request) = + make_execute_request(&process, ProcessMetadata::default()).unwrap(); + let (_command_digest, action_digest) = ensure_action_stored_locally(store, &command, &action) + .await + .unwrap(); + (process, action_digest) } -async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults { - let (local, store, _local_runner_dir, _stub_cas) = create_local_runner(); - let (process, script_path, _script_dir) = create_script(script_exit_code); - - let local_result = local.run(process.clone().into(), Context::default()).await; - - let (caching, _cache_dir, _stub_action_cache) = create_cached_runner(local, store.clone(), false); - - let uncached_result = caching - .run(process.clone().into(), Context::default()) - .await; - - assert_eq!(local_result, uncached_result); - - // Removing the file means that were the command to be run again without any caching, it would - // fail due to a FileNotFound error. So, If the second run succeeds, that implies that the - // cache was successfully used. - std::fs::remove_file(&script_path).unwrap(); - let maybe_cached_result = caching.run(process.into(), Context::default()).await; - - RoundtripResults { - uncached: uncached_result, - maybe_cached: maybe_cached_result, - } +fn insert_into_action_cache( + action_cache: &StubActionCache, + action_digest: &Digest, + exit_code: i32, + stdout_digest: Digest, + stderr_digest: Digest, +) { + let action_result = ActionResult { + exit_code, + stdout_digest: Some(stdout_digest.into()), + stderr_digest: Some(stderr_digest.into()), + ..ActionResult::default() + }; + action_cache + .action_map + .lock() + .insert(action_digest.0, action_result); } #[tokio::test] -async fn cache_success() { +async fn cache_read_success() { WorkunitStore::setup_for_tests(); - let results = run_roundtrip(0).await; - assert_eq!(results.uncached, results.maybe_cached); -} + let store_setup = StoreSetup::new(); + let (local_runner, local_runner_call_counter) = create_local_runner(1); + let (cache_runner, action_cache) = + create_cached_runner(local_runner.clone(), store_setup.store.clone(), false); -#[tokio::test] -async fn failures_not_cached() { - WorkunitStore::setup_for_tests(); - let results = run_roundtrip(1).await; - assert_ne!(results.uncached, results.maybe_cached); - assert_eq!(results.uncached.unwrap().exit_code, 1); - assert_eq!(results.maybe_cached.unwrap().exit_code, 127); // aka the return code for file not found + let (process, action_digest) = create_process(&store_setup.store).await; + insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST); + + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); + let remote_result = cache_runner + .run(process.clone().into(), Context::default()) + .await + .unwrap(); + assert_eq!(remote_result.exit_code, 0); + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); } +/// If the cache has any issues during reads, we should gracefully fallback to the local runner. #[tokio::test] -async fn skip_cache_on_error() { +async fn cache_read_skipped_on_errors() { WorkunitStore::setup_for_tests(); + let store_setup = StoreSetup::new(); + let (local_runner, local_runner_call_counter) = create_local_runner(1); + let (cache_runner, action_cache) = + create_cached_runner(local_runner.clone(), store_setup.store.clone(), false); - let (local, store, _local_runner_dir, _stub_cas) = create_local_runner(); - let (caching, _cache_dir, stub_action_cache) = create_cached_runner(local, store.clone(), false); - let (process, _script_path, _script_dir) = create_script(0); - - stub_action_cache - .always_errors - .store(true, Ordering::SeqCst); + let (process, action_digest) = create_process(&store_setup.store).await; + insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST); + action_cache.always_errors.store(true, Ordering::SeqCst); - // Run once to ensure the cache is skipped on errors. - let result = caching + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); + let remote_result = cache_runner .run(process.clone().into(), Context::default()) .await .unwrap(); - - assert_eq!(result.exit_code, 0); + assert_eq!(remote_result.exit_code, 1); + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1); } /// With eager_fetch enabled, we should skip the remote cache if any of the process result's /// digests are invalid. This will force rerunning the process locally. Otherwise, we should use /// the cached result with its non-existent digests. #[tokio::test] -async fn eager_fetch() { +async fn cache_read_eager_fetch() { WorkunitStore::setup_for_tests(); - async fn run_process(eager_fetch: bool) -> FallibleProcessResultWithPlatform { - let (local, store, _local_runner_dir, _stub_cas) = create_local_runner(); - let (caching, _cache_dir, stub_action_cache) = - create_cached_runner(local, store.clone(), eager_fetch); - - // Get the `action_digest` for the Process that we're going to run. This will allow us to - // insert a bogus value into the `stub_action_cache`. - let (process, _script_path, _script_dir) = create_script(1); - let (action, command, _exec_request) = - make_execute_request(&process, ProcessMetadata::default()).unwrap(); - let (_command_digest, action_digest) = ensure_action_stored_locally(&store, &command, &action) + async fn run_process(eager_fetch: bool) -> (i32, usize) { + let store_setup = StoreSetup::new(); + let (local_runner, local_runner_call_counter) = create_local_runner(1); + let (cache_runner, action_cache) = + create_cached_runner(local_runner.clone(), store_setup.store.clone(), eager_fetch); + + let (process, action_digest) = create_process(&store_setup.store).await; + insert_into_action_cache( + &action_cache, + &action_digest, + 0, + TestData::roland().digest(), + TestData::roland().digest(), + ); + + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); + let remote_result = cache_runner + .run(process.clone().into(), Context::default()) .await .unwrap(); - // Insert an ActionResult with missing digests and a return code of 0 (instead of 1). - let bogus_action_result = ActionResult { - exit_code: 0, - stdout_digest: Some(TestData::roland().digest().into()), - stderr_digest: Some(TestData::roland().digest().into()), - ..ActionResult::default() - }; - stub_action_cache - .action_map - .lock() - .insert(action_digest.0, bogus_action_result); - - // Run the process, possibly by pulling from the `ActionCache`. - caching - .run(process.clone().into(), Context::default()) - .await - .unwrap() + let final_local_count = local_runner_call_counter.load(Ordering::SeqCst); + (remote_result.exit_code, final_local_count) } - let lazy_result = run_process(false).await; - assert_eq!(lazy_result.exit_code, 0); - assert_eq!(lazy_result.stdout_digest, TestData::roland().digest()); - assert_eq!(lazy_result.stderr_digest, TestData::roland().digest()); + let (lazy_exit_code, lazy_local_call_count) = run_process(false).await; + assert_eq!(lazy_exit_code, 0); + assert_eq!(lazy_local_call_count, 0); + + let (eager_exit_code, eager_local_call_count) = run_process(true).await; + assert_eq!(eager_exit_code, 1); + assert_eq!(eager_local_call_count, 1); +} + +#[tokio::test] +async fn cache_write_success() { + WorkunitStore::setup_for_tests(); + let store_setup = StoreSetup::new(); + let (local_runner, local_runner_call_counter) = create_local_runner(0); + let (cache_runner, action_cache) = + create_cached_runner(local_runner, store_setup.store.clone(), false); + let (process, action_digest) = create_process(&store_setup.store).await; + + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); + assert!(action_cache.action_map.lock().is_empty()); + + let local_result = cache_runner + .run(process.clone().into(), Context::default()) + .await + .unwrap(); + assert_eq!(local_result.exit_code, 0); + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1); + + assert_eq!(action_cache.action_map.lock().len(), 1); + let action_map_mutex_guard = action_cache.action_map.lock(); + assert_eq!( + action_map_mutex_guard + .get(&action_digest.0) + .unwrap() + .exit_code, + 0 + ); +} + +#[tokio::test] +async fn cache_write_not_for_failures() { + WorkunitStore::setup_for_tests(); + let store_setup = StoreSetup::new(); + let (local_runner, local_runner_call_counter) = create_local_runner(1); + let (cache_runner, action_cache) = + create_cached_runner(local_runner, store_setup.store.clone(), false); + let (process, _action_digest) = create_process(&store_setup.store).await; + + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); + assert!(action_cache.action_map.lock().is_empty()); + + let local_result = cache_runner + .run(process.clone().into(), Context::default()) + .await + .unwrap(); + assert_eq!(local_result.exit_code, 1); + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1); - let eager_result = run_process(true).await; - assert_eq!(eager_result.exit_code, 1); - assert_ne!(eager_result.stdout_digest, TestData::roland().digest()); - assert_ne!(eager_result.stderr_digest, TestData::roland().digest()); + assert!(action_cache.action_map.lock().is_empty()); } #[tokio::test]