Skip to content

Commit

Permalink
fix use of remote store when in eager cache mode (#11468)
Browse files Browse the repository at this point in the history
### Problem

As described in #11455, in remote cache mode with eager fetching, the only accesses to the remote CAS should be through the remote cache code; the remainder of the Pants code base should only see the local store. Intrinsics, such as download_file_to_digest, still have access to the remote CAS currently, which means that any issues with the remote CAS will cause an error instead of being a warning if the access to the remote CAS had occurred through the remote cache code. The intended behavior is for all problems with the remote CAS to be warnings when in remote cache mode.

### Solution

In remote cache mode with eager fetching, only expose the local store to most of the Pants code base.

### Result

Added an integration test to test that warnings are generated in remote cache mode.

Fixes #11455
  • Loading branch information
Tom Dyas committed Jan 15, 2021
1 parent 2dc94ca commit c2065ea
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 13 deletions.
9 changes: 9 additions & 0 deletions src/python/pants/engine/internals/native_engine.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,12 @@ class PyTasks:

class PyTypes:
def __init__(self, **kwargs: Any) -> None: ...

class PyStubCASBuilder:
def always_errors(self): ...
def build(self, executor: PyExecutor) -> PyStubCAS: ...

class PyStubCAS:
@classmethod
def builder(cls) -> PyStubCASBuilder: ...
def address(self) -> str: ...
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ smallvec = "0.6"
store = { path = "fs/store" }
task_executor = { path = "task_executor" }
tempfile = "3"
testutil_mock = { package = "mock", path = "testutil/mock" }
time = "0.1.40"
tokio = { version = "0.2.23", features = ["macros", "rt-threaded"] }
tryfuture = { path = "tryfuture" }
Expand Down
32 changes: 22 additions & 10 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl Core {
}

fn make_command_runner(
store: &Store,
full_store: &Store,
remote_store_servers: &[String],
executor: &Executor,
local_execution_root_dir: &Path,
Expand All @@ -218,9 +218,9 @@ impl Core {
// with the local command runner. This reduces the surface area of where the remote store is
// used to only be the remote cache command runner.
let store_for_local_runner = if remote_caching_used && remoting_opts.cache_eager_fetch {
store.clone().into_local_only()
full_store.clone().into_local_only()
} else {
store.clone()
full_store.clone()
};

let local_command_runner = Core::make_local_execution_runner(
Expand All @@ -238,7 +238,7 @@ impl Core {
if remoting_opts.execution_enable {
Box::new(BoundedCommandRunner::new(
Core::make_remote_execution_runner(
store,
full_store,
process_execution_metadata,
&remoting_opts,
root_ca_certs,
Expand All @@ -253,7 +253,7 @@ impl Core {
Box::new(process_execution::remote_cache::CommandRunner::new(
local_command_runner.into(),
process_execution_metadata.clone(),
store.clone(),
full_store.clone(),
action_cache_address.as_str(),
root_ca_certs.clone(),
oauth_bearer_token.clone(),
Expand All @@ -279,7 +279,7 @@ impl Core {
Box::new(process_execution::cache::CommandRunner::new(
maybe_remote_enabled_command_runner.into(),
process_execution_store,
store.clone(),
full_store.clone(),
process_execution_metadata.clone(),
))
} else {
Expand Down Expand Up @@ -378,7 +378,7 @@ impl Core {

safe_create_dir_all_ioerror(&local_store_dir)
.map_err(|e| format!("Error making directory {:?}: {:?}", local_store_dir, e))?;
let store = Core::make_store(
let full_store = Self::make_store(
&executor,
&local_store_dir,
need_remote_store,
Expand All @@ -389,14 +389,26 @@ impl Core {
)
.map_err(|e| format!("Could not initialize Store: {:?}", e))?;

let store = if (exec_strategy_opts.remote_cache_read || exec_strategy_opts.remote_cache_write)
&& remoting_opts.cache_eager_fetch
{
// In remote cache mode with eager fetching, the only interaction with the remote CAS
// should be through the remote cache code paths. Thus, the store seen by the rest of the
// code base should be the local-only store.
full_store.clone().into_local_only()
} else {
// Otherwise, the remote CAS should be visible everywhere.
full_store.clone()
};

let process_execution_metadata = ProcessMetadata {
instance_name: remoting_opts.instance_name.clone(),
cache_key_gen_version: remoting_opts.execution_process_cache_namespace.clone(),
platform_properties: remoting_opts.execution_extra_platform_properties.clone(),
};

let command_runner = Core::make_command_runner(
&store,
let command_runner = Self::make_command_runner(
&full_store,
&remote_store_servers,
&executor,
&local_execution_root_dir,
Expand All @@ -412,7 +424,7 @@ impl Core {
let graph = Arc::new(InvalidatableGraph(Graph::new()));

// These certs are for downloads, not to be confused with the ones used for remoting.
let ca_certs = Core::load_certificates(ca_certs_path)?;
let ca_certs = Self::load_certificates(ca_certs_path)?;

let http_client_builder = ca_certs
.iter()
Expand Down
7 changes: 6 additions & 1 deletion src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ use crate::{
Function, Intrinsics, Params, RemotingOptions, Rule, Scheduler, Session, Tasks, Types, Value,
};

mod testutil;

py_exception!(native_engine, PollTimeout);
py_exception!(native_engine, NailgunConnectionException);
py_exception!(native_engine, NailgunClientException);
Expand Down Expand Up @@ -419,6 +421,9 @@ py_module_initializer!(native_engine, |py, m| {
m.add_class::<externs::fs::PyDigest>(py)?;
m.add_class::<externs::fs::PySnapshot>(py)?;

m.add_class::<self::testutil::PyStubCAS>(py)?;
m.add_class::<self::testutil::PyStubCASBuilder>(py)?;

Ok(())
});

Expand Down Expand Up @@ -482,7 +487,7 @@ py_class!(class PyTypes |py| {
}
});

py_class!(class PyExecutor |py| {
py_class!(pub class PyExecutor |py| {
data executor: Executor;
def __new__(_cls, core_threads: usize, max_threads: usize) -> CPyResult<Self> {
let executor = Executor::new_owned(core_threads, max_threads).map_err(|e| PyErr::new::<exc::Exception, _>(py, (e,)))?;
Expand Down
48 changes: 48 additions & 0 deletions src/rust/engine/src/externs/interface/testutil.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::sync::Arc;

use super::PyExecutor;
use cpython::{exc, py_class, PyErr, PyObject, PyResult, PyString};
use parking_lot::Mutex;
use testutil_mock::{StubCAS, StubCASBuilder};

py_class!(pub class PyStubCASBuilder |py| {
data builder: Arc<Mutex<Option<StubCASBuilder>>>;

def always_errors(&self) -> PyResult<PyObject> {
let mut builder_opt = self.builder(py).lock();
let builder = builder_opt
.take()
.ok_or_else(|| PyErr::new::<exc::Exception, _>(py, (PyString::new(py, "unable to unwrap StubCASBuilder"),)))?
.always_errors();
*builder_opt = Some(builder);
Ok(py.None())
}

def build(&self, executor: PyExecutor) -> PyResult<PyStubCAS> {
let executor = executor.executor(py);
executor.enter(|| {
let mut builder_opt = self.builder(py).lock();
let builder = builder_opt
.take()
.ok_or_else(|| PyErr::new::<exc::Exception, _>(py, (PyString::new(py, "unable to unwrap StubCASBuilder"),)))?;
let cas = builder.build();
PyStubCAS::create_instance(py, cas)
})
}
});

py_class!(pub class PyStubCAS |py| {
data server: StubCAS;

@classmethod
def builder(cls) -> PyResult<PyStubCASBuilder> {
let builder = StubCAS::builder();
PyStubCASBuilder::create_instance(py, Arc::new(Mutex::new(Some(builder))))
}

def address(&self) -> PyResult<PyString> {
let server = self.server(py);
let address = server.address();
Ok(PyString::new(py, &address))
}
});
4 changes: 3 additions & 1 deletion src/rust/engine/testutil/mock/src/cas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ pub struct StubCAS {

impl Drop for StubCAS {
fn drop(&mut self) {
self.shutdown_sender.take().unwrap().send(()).unwrap();
if let Some(s) = self.shutdown_sender.take() {
let _ = s.send(());
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/testutil/mock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ pub mod execution_server;
mod tonic_util;

pub use crate::action_cache::StubActionCache;
pub use crate::cas::StubCAS;
pub use crate::cas::{StubCAS, StubCASBuilder};
pub use crate::execution_server::MockExecution;
11 changes: 11 additions & 0 deletions tests/python/pants_test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,15 @@ python_integration_tests(
timeout = 180,
)

python_integration_tests(
name = 'remote_cache_integration',
sources = ['remote_cache_integration_test.py'],
dependencies = [
'testprojects/src/python:hello_directory',
],
uses_pants_run=True,
timeout = 180,
)


python_library()
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import re

from pants.engine.internals.native_engine import PyExecutor, PyStubCAS
from pants.option.scope import GLOBAL_SCOPE_CONFIG_SECTION
from pants.testutil.pants_integration_test import run_pants


def test_warns_on_remote_cache_errors():
executor = PyExecutor(2, 4)
builder = PyStubCAS.builder()
builder.always_errors()
cas = builder.build(executor)
address = cas.address()

pants_run = run_pants(
[
"--backend-packages=['pants.backend.python']",
"--no-dynamic-ui",
"--level=info",
"package",
"testprojects/src/python/hello/main:main",
],
use_pantsd=False,
config={
GLOBAL_SCOPE_CONFIG_SECTION: {
"remote_cache_read": True,
"remote_cache_write": True,
"remote_store_server": address,
}
},
)

pants_run.assert_success()
assert "Failed to read from remote cache: Unimplemented" in pants_run.stderr
assert (
re.search(
"Failed to write to remote cache:.*StubCAS is configured to always fail",
pants_run.stderr,
re.MULTILINE,
)
is not None
)

0 comments on commit c2065ea

Please sign in to comment.