diff --git a/src/python/pants/engine/internals/native_engine.pyi b/src/python/pants/engine/internals/native_engine.pyi index 5fa1f14c2f6..382ddae11ea 100644 --- a/src/python/pants/engine/internals/native_engine.pyi +++ b/src/python/pants/engine/internals/native_engine.pyi @@ -62,3 +62,12 @@ class PyTasks: class PyTypes: def __init__(self, **kwargs: Any) -> None: ... + +class PyStubCASBuilder: + def always_errors(self): ... + def build(self, executor: PyExecutor) -> PyStubCAS: ... + +class PyStubCAS: + @classmethod + def builder(cls) -> PyStubCASBuilder: ... + def address(self) -> str: ... diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 0c99f3a76eb..518e8324ad3 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -651,6 +651,7 @@ dependencies = [ "lazy_static", "log 0.4.11", "logging", + "mock", "nailgun", "num_enum", "parking_lot", diff --git a/src/rust/engine/Cargo.toml b/src/rust/engine/Cargo.toml index f3f984ec432..d2a8728e97e 100644 --- a/src/rust/engine/Cargo.toml +++ b/src/rust/engine/Cargo.toml @@ -127,6 +127,7 @@ smallvec = "0.6" store = { path = "fs/store" } task_executor = { path = "task_executor" } tempfile = "3" +testutil_mock = { package = "mock", path = "testutil/mock" } time = "0.1.40" tokio = { version = "0.2.23", features = ["macros", "rt-threaded"] } tryfuture = { path = "tryfuture" } diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 7dd81cba055..932a80a9595 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -199,7 +199,7 @@ impl Core { } fn make_command_runner( - store: &Store, + full_store: &Store, remote_store_servers: &[String], executor: &Executor, local_execution_root_dir: &Path, @@ -218,9 +218,9 @@ impl Core { // with the local command runner. This reduces the surface area of where the remote store is // used to only be the remote cache command runner. let store_for_local_runner = if remote_caching_used && remoting_opts.cache_eager_fetch { - store.clone().into_local_only() + full_store.clone().into_local_only() } else { - store.clone() + full_store.clone() }; let local_command_runner = Core::make_local_execution_runner( @@ -238,7 +238,7 @@ impl Core { if remoting_opts.execution_enable { Box::new(BoundedCommandRunner::new( Core::make_remote_execution_runner( - store, + full_store, process_execution_metadata, &remoting_opts, root_ca_certs, @@ -253,7 +253,7 @@ impl Core { Box::new(process_execution::remote_cache::CommandRunner::new( local_command_runner.into(), process_execution_metadata.clone(), - store.clone(), + full_store.clone(), action_cache_address.as_str(), root_ca_certs.clone(), oauth_bearer_token.clone(), @@ -279,7 +279,7 @@ impl Core { Box::new(process_execution::cache::CommandRunner::new( maybe_remote_enabled_command_runner.into(), process_execution_store, - store.clone(), + full_store.clone(), process_execution_metadata.clone(), )) } else { @@ -378,7 +378,7 @@ impl Core { safe_create_dir_all_ioerror(&local_store_dir) .map_err(|e| format!("Error making directory {:?}: {:?}", local_store_dir, e))?; - let store = Core::make_store( + let full_store = Self::make_store( &executor, &local_store_dir, need_remote_store, @@ -389,14 +389,26 @@ impl Core { ) .map_err(|e| format!("Could not initialize Store: {:?}", e))?; + let store = if (exec_strategy_opts.remote_cache_read || exec_strategy_opts.remote_cache_write) + && remoting_opts.cache_eager_fetch + { + // In remote cache mode with eager fetching, the only interaction with the remote CAS + // should be through the remote cache code paths. Thus, the store seen by the rest of the + // code base should be the local-only store. + full_store.clone().into_local_only() + } else { + // Otherwise, the remote CAS should be visible everywhere. + full_store.clone() + }; + let process_execution_metadata = ProcessMetadata { instance_name: remoting_opts.instance_name.clone(), cache_key_gen_version: remoting_opts.execution_process_cache_namespace.clone(), platform_properties: remoting_opts.execution_extra_platform_properties.clone(), }; - let command_runner = Core::make_command_runner( - &store, + let command_runner = Self::make_command_runner( + &full_store, &remote_store_servers, &executor, &local_execution_root_dir, @@ -412,7 +424,7 @@ impl Core { let graph = Arc::new(InvalidatableGraph(Graph::new())); // These certs are for downloads, not to be confused with the ones used for remoting. - let ca_certs = Core::load_certificates(ca_certs_path)?; + let ca_certs = Self::load_certificates(ca_certs_path)?; let http_client_builder = ca_certs .iter() diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index fb505f3290b..9dcf011a3d9 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -76,6 +76,8 @@ use crate::{ Function, Intrinsics, Params, RemotingOptions, Rule, Scheduler, Session, Tasks, Types, Value, }; +mod testutil; + py_exception!(native_engine, PollTimeout); py_exception!(native_engine, NailgunConnectionException); py_exception!(native_engine, NailgunClientException); @@ -419,6 +421,9 @@ py_module_initializer!(native_engine, |py, m| { m.add_class::(py)?; m.add_class::(py)?; + m.add_class::(py)?; + m.add_class::(py)?; + Ok(()) }); @@ -482,7 +487,7 @@ py_class!(class PyTypes |py| { } }); -py_class!(class PyExecutor |py| { +py_class!(pub class PyExecutor |py| { data executor: Executor; def __new__(_cls, core_threads: usize, max_threads: usize) -> CPyResult { let executor = Executor::new_owned(core_threads, max_threads).map_err(|e| PyErr::new::(py, (e,)))?; diff --git a/src/rust/engine/src/externs/interface/testutil.rs b/src/rust/engine/src/externs/interface/testutil.rs new file mode 100644 index 00000000000..14ab3fd239f --- /dev/null +++ b/src/rust/engine/src/externs/interface/testutil.rs @@ -0,0 +1,48 @@ +use std::sync::Arc; + +use super::PyExecutor; +use cpython::{exc, py_class, PyErr, PyObject, PyResult, PyString}; +use parking_lot::Mutex; +use testutil_mock::{StubCAS, StubCASBuilder}; + +py_class!(pub class PyStubCASBuilder |py| { + data builder: Arc>>; + + def always_errors(&self) -> PyResult { + let mut builder_opt = self.builder(py).lock(); + let builder = builder_opt + .take() + .ok_or_else(|| PyErr::new::(py, (PyString::new(py, "unable to unwrap StubCASBuilder"),)))? + .always_errors(); + *builder_opt = Some(builder); + Ok(py.None()) + } + + def build(&self, executor: PyExecutor) -> PyResult { + let executor = executor.executor(py); + executor.enter(|| { + let mut builder_opt = self.builder(py).lock(); + let builder = builder_opt + .take() + .ok_or_else(|| PyErr::new::(py, (PyString::new(py, "unable to unwrap StubCASBuilder"),)))?; + let cas = builder.build(); + PyStubCAS::create_instance(py, cas) + }) + } +}); + +py_class!(pub class PyStubCAS |py| { + data server: StubCAS; + + @classmethod + def builder(cls) -> PyResult { + let builder = StubCAS::builder(); + PyStubCASBuilder::create_instance(py, Arc::new(Mutex::new(Some(builder)))) + } + + def address(&self) -> PyResult { + let server = self.server(py); + let address = server.address(); + Ok(PyString::new(py, &address)) + } +}); diff --git a/src/rust/engine/testutil/mock/src/cas.rs b/src/rust/engine/testutil/mock/src/cas.rs index 9753c548b3a..87f09d35097 100644 --- a/src/rust/engine/testutil/mock/src/cas.rs +++ b/src/rust/engine/testutil/mock/src/cas.rs @@ -44,7 +44,9 @@ pub struct StubCAS { impl Drop for StubCAS { fn drop(&mut self) { - self.shutdown_sender.take().unwrap().send(()).unwrap(); + if let Some(s) = self.shutdown_sender.take() { + let _ = s.send(()); + } } } diff --git a/src/rust/engine/testutil/mock/src/lib.rs b/src/rust/engine/testutil/mock/src/lib.rs index 96b2204e7b4..a8295875fd1 100644 --- a/src/rust/engine/testutil/mock/src/lib.rs +++ b/src/rust/engine/testutil/mock/src/lib.rs @@ -33,5 +33,5 @@ pub mod execution_server; mod tonic_util; pub use crate::action_cache::StubActionCache; -pub use crate::cas::StubCAS; +pub use crate::cas::{StubCAS, StubCASBuilder}; pub use crate::execution_server::MockExecution; diff --git a/tests/python/pants_test/integration/BUILD b/tests/python/pants_test/integration/BUILD index a2469e0eccc..c6484478ff4 100644 --- a/tests/python/pants_test/integration/BUILD +++ b/tests/python/pants_test/integration/BUILD @@ -57,4 +57,15 @@ python_integration_tests( timeout = 180, ) +python_integration_tests( + name = 'remote_cache_integration', + sources = ['remote_cache_integration_test.py'], + dependencies = [ + 'testprojects/src/python:hello_directory', + ], + uses_pants_run=True, + timeout = 180, +) + + python_library() diff --git a/tests/python/pants_test/integration/remote_cache_integration_test.py b/tests/python/pants_test/integration/remote_cache_integration_test.py new file mode 100644 index 00000000000..7008c5462ab --- /dev/null +++ b/tests/python/pants_test/integration/remote_cache_integration_test.py @@ -0,0 +1,45 @@ +# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). +# Licensed under the Apache License, Version 2.0 (see LICENSE). + +import re + +from pants.engine.internals.native_engine import PyExecutor, PyStubCAS +from pants.option.scope import GLOBAL_SCOPE_CONFIG_SECTION +from pants.testutil.pants_integration_test import run_pants + + +def test_warns_on_remote_cache_errors(): + executor = PyExecutor(2, 4) + builder = PyStubCAS.builder() + builder.always_errors() + cas = builder.build(executor) + address = cas.address() + + pants_run = run_pants( + [ + "--backend-packages=['pants.backend.python']", + "--no-dynamic-ui", + "--level=info", + "package", + "testprojects/src/python/hello/main:main", + ], + use_pantsd=False, + config={ + GLOBAL_SCOPE_CONFIG_SECTION: { + "remote_cache_read": True, + "remote_cache_write": True, + "remote_store_server": address, + } + }, + ) + + pants_run.assert_success() + assert "Failed to read from remote cache: Unimplemented" in pants_run.stderr + assert ( + re.search( + "Failed to write to remote cache:.*StubCAS is configured to always fail", + pants_run.stderr, + re.MULTILINE, + ) + is not None + )