Skip to content

Commit

Permalink
Create a separate crate for workunit_store
Browse files Browse the repository at this point in the history
  • Loading branch information
patliu85 authored and cattibrie committed Jun 28, 2019
1 parent c41dee9 commit a04d9fc
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 35 deletions.
2 changes: 2 additions & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/rust/engine/process_execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ time = "0.1.40"
tokio-codec = "0.1"
tokio-process = "0.2.1"
tokio-timer = "0.2"
workunit_store = { path = "../workunit_store" }

[dev-dependencies]
mock = { path = "../testutil/mock" }
Expand Down
7 changes: 4 additions & 3 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use store::UploadSummary;
use workunit_store::WorkUnitStore;

use async_semaphore::AsyncSemaphore;

Expand Down Expand Up @@ -133,7 +134,7 @@ impl AddAssign<UploadSummary> for ExecutionStats {
}

pub trait CommandRunner: Send + Sync {
fn run(&self, req: ExecuteProcessRequest) -> BoxFuture<FallibleExecuteProcessResult, String>;
fn run(&self, req: ExecuteProcessRequest, workunit_store: Arc<WorkUnitStore>) -> BoxFuture<FallibleExecuteProcessResult, String>;
}

///
Expand All @@ -153,9 +154,9 @@ impl BoundedCommandRunner {
}

impl CommandRunner for BoundedCommandRunner {
fn run(&self, req: ExecuteProcessRequest) -> BoxFuture<FallibleExecuteProcessResult, String> {
fn run(&self, req: ExecuteProcessRequest, workunit_store: Arc<WorkUnitStore>) -> BoxFuture<FallibleExecuteProcessResult, String> {
let inner = self.inner.clone();
self.inner.1.with_acquired(move || inner.0.run(req))
self.inner.1.with_acquired(move || inner.0.run(req, workunit_store))
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tokio_process::CommandExt;
use super::{ExecuteProcessRequest, FallibleExecuteProcessResult};

use bytes::{Bytes, BytesMut};
use workunit_store::WorkUnitStore;

pub struct CommandRunner {
store: Store,
Expand Down Expand Up @@ -205,7 +206,7 @@ impl super::CommandRunner for CommandRunner {
///
/// Runs a command on this machine in the passed working directory.
///
fn run(&self, req: ExecuteProcessRequest) -> BoxFuture<FallibleExecuteProcessResult, String> {
fn run(&self, req: ExecuteProcessRequest, _workunit_store: Arc<WorkUnitStore>) -> BoxFuture<FallibleExecuteProcessResult, String> {
let workdir = try_future!(tempfile::Builder::new()
.prefix("process-execution")
.tempdir_in(&self.work_dir)
Expand Down Expand Up @@ -340,6 +341,7 @@ mod tests {
use testutil::data::{TestData, TestDirectory};
use testutil::path::find_bash;
use testutil::{as_bytes, owned_string_vec};
use workunit_store::SafeWorkUnitStore;

#[test]
#[cfg(unix)]
Expand Down Expand Up @@ -897,6 +899,6 @@ mod tests {
};
tokio::runtime::Runtime::new()
.unwrap()
.block_on(runner.run(req))
.block_on(runner.run(req, Arc::new(SafeWorkUnitStore::new())))
}
}
14 changes: 8 additions & 6 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use super::{ExecuteProcessRequest, ExecutionStats, FallibleExecuteProcessResult}
use std;
use std::cmp::min;
use std::collections::btree_map::BTreeMap;
use workunit_store::WorkUnitStore;

// Environment variable which is exclusively used for cache key invalidation.
// This may be not specified in an ExecuteProcessRequest, and may be populated only by the
Expand Down Expand Up @@ -126,7 +127,7 @@ impl super::CommandRunner for CommandRunner {
///
/// TODO: Request jdk_home be created if set.
///
fn run(&self, req: ExecuteProcessRequest) -> BoxFuture<FallibleExecuteProcessResult, String> {
fn run(&self, req: ExecuteProcessRequest, _workunit_store: Arc<WorkUnitStore>) -> BoxFuture<FallibleExecuteProcessResult, String> {
let operations_client = self.operations_client.clone();

let store = self.store.clone();
Expand Down Expand Up @@ -927,6 +928,7 @@ mod tests {
use std::ops::Sub;
use std::path::PathBuf;
use std::time::Duration;
use workunit_store::SafeWorkUnitStore;

#[derive(Debug, PartialEq)]
enum StdoutType {
Expand Down Expand Up @@ -1482,7 +1484,7 @@ mod tests {
store,
);
let result = runtime
.block_on(cmd_runner.run(echo_roland_request()))
.block_on(cmd_runner.run(echo_roland_request(), Arc::new(SafeWorkUnitStore::new())))
.unwrap();
assert_eq!(
result.without_execution_attempts(),
Expand Down Expand Up @@ -1855,7 +1857,7 @@ mod tests {
);

let result = runtime
.block_on(command_runner.run(cat_roland_request()))
.block_on(command_runner.run(cat_roland_request(), Arc::new(SafeWorkUnitStore::new())))
.unwrap();
assert_eq!(
result.without_execution_attempts(),
Expand Down Expand Up @@ -1947,7 +1949,7 @@ mod tests {
1,
store,
)
.run(cat_roland_request())
.run(cat_roland_request(), Arc::new(SafeWorkUnitStore::new()))
.wait();
assert_eq!(
result,
Expand Down Expand Up @@ -2015,7 +2017,7 @@ mod tests {
);

let error = runtime
.block_on(runner.run(cat_roland_request()))
.block_on(runner.run(cat_roland_request(), Arc::new(SafeWorkUnitStore::new())))
.expect_err("Want error");
assert_contains(&error, &format!("{}", missing_digest.0));
}
Expand Down Expand Up @@ -2610,7 +2612,7 @@ mod tests {
.build();
let command_runner = create_command_runner(address, &cas);
let mut runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(command_runner.run(request))
runtime.block_on(command_runner.run(request, Arc::new(SafeWorkUnitStore::new())))
}

fn create_command_runner(address: String, cas: &mock::StubCAS) -> CommandRunner {
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/process_executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ hashing = { path = "../hashing" }
process_execution = { path = "../process_execution" }
store = { path = "../fs/store" }
tokio = "0.1"
workunit_store = { path = "../workunit_store"}
3 changes: 2 additions & 1 deletion src/rust/engine/process_executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use std::process::exit;
use std::time::Duration;
use store::{BackoffConfig, Store};
use tokio::runtime::Runtime;
use workunit_store::SafeWorkUnitStore;

/// A binary which takes args of format:
/// process_executor --env=FOO=bar --env=SOME=value --input-digest=abc123 --input-digest-length=80
Expand Down Expand Up @@ -318,7 +319,7 @@ fn main() {
let mut runtime = Runtime::new().unwrap();

let result = runtime
.block_on(runner.run(request))
.block_on(runner.run(request, Arc::new(SafeWorkUnitStore::new())))
.expect("Error executing");

if let Some(output) = args.value_of("materialize-output-to").map(PathBuf::from) {
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ pub extern "C" fn scheduler_metrics(
.flat_map(|(metric, value)| vec![externs::store_utf8(metric), externs::store_i64(value)])
.collect::<Vec<_>>();
if session.should_record_zipkin_spans() {
let workunits = session
let workunits = session.workunit_store()
.get_workunits()
.lock()
.iter()
Expand Down
6 changes: 3 additions & 3 deletions src/rust/engine/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,11 +415,11 @@ impl WrappedNode for ExecuteProcess {

fn run(self, context: Context) -> NodeFuture<ProcessResult> {
let request = self.0;

let workunit_store = context.session.workunit_store();
context
.core
.command_runner
.run(request)
.run(request, workunit_store)
.map(ProcessResult)
.map_err(|e| throw(&format!("Failed to execute process: {}", e)))
.to_boxed()
Expand Down Expand Up @@ -1126,7 +1126,7 @@ impl Node for NodeKey {
end_timestamp: end_timestamp,
span_id: generate_random_64bit_string(),
};
context2.session.add_workunit(workunit)
context2.session.workunit_store().add_workunit(workunit)
};
})
.to_boxed()
Expand Down
28 changes: 11 additions & 17 deletions src/rust/engine/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use indexmap::IndexMap;
use log::{debug, info, warn};
use parking_lot::Mutex;
use ui::EngineDisplay;
use workunit_store::{WorkUnit, WorkUnitStore};
use workunit_store::SafeWorkUnitStore;

///
/// A Session represents a related series of requests (generally: one run of the pants CLI) on an
Expand All @@ -37,26 +37,12 @@ struct InnerSession {
// If enabled, Zipkin spans for v2 engine will be collected.
should_record_zipkin_spans: bool,
// A place to store info about workunits in rust part
workunits: Mutex<Vec<WorkUnit>>,
workunit_store: Arc<SafeWorkUnitStore>,
}

#[derive(Clone)]
pub struct Session(Arc<InnerSession>);

impl WorkUnitStore for Session {
fn should_record_zipkin_spans(&self) -> bool {
self.0.should_record_zipkin_spans
}

fn get_workunits(&self) -> &Mutex<Vec<WorkUnit>> {
&self.0.workunits
}

fn add_workunit(&self, workunit: WorkUnit) {
self.0.workunits.lock().push(workunit);
}
}

impl Session {
pub fn new(
scheduler: &Scheduler,
Expand All @@ -69,7 +55,7 @@ impl Session {
roots: Mutex::new(HashSet::new()),
display: EngineDisplay::create(ui_worker_count, should_render_ui).map(Mutex::new),
should_record_zipkin_spans: should_record_zipkin_spans,
workunits: Mutex::new(Vec::new()),
workunit_store: Arc::new(SafeWorkUnitStore::new()),
};
Session(Arc::new(inner_session))
}
Expand All @@ -91,6 +77,14 @@ impl Session {
pub fn display(&self) -> &Option<Mutex<EngineDisplay>> {
&self.0.display
}

pub fn should_record_zipkin_spans(&self) -> bool {
self.0.should_record_zipkin_spans
}

pub fn workunit_store(&self) -> Arc<SafeWorkUnitStore> {
self.0.workunit_store.clone()
}
}

pub struct ExecutionRequest {
Expand Down
25 changes: 23 additions & 2 deletions src/rust/engine/workunit_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,29 @@ pub struct WorkUnit {
pub span_id: String,
}

pub trait WorkUnitStore {
fn should_record_zipkin_spans(&self) -> bool;
pub trait WorkUnitStore: Send + Sync {
fn get_workunits(&self) -> &Mutex<Vec<WorkUnit>>;
fn add_workunit(&self, workunit: WorkUnit);
}

pub struct SafeWorkUnitStore {
pub workunits: Mutex<Vec<WorkUnit>>,
}

impl SafeWorkUnitStore {
pub fn new() -> SafeWorkUnitStore {
SafeWorkUnitStore {
workunits: Mutex::new(Vec::new()),
}
}
}

impl WorkUnitStore for SafeWorkUnitStore {
fn get_workunits(&self) -> &Mutex<Vec<WorkUnit>> {
&self.workunits
}

fn add_workunit(&self, workunit: WorkUnit) {
self.workunits.lock().push(workunit);
}
}

0 comments on commit a04d9fc

Please sign in to comment.