Commit: Add remote workunits
cattibrie committed Jun 28, 2019
1 parent a04d9fc commit 62c4d83
Showing 6 changed files with 107 additions and 33 deletions.
2 changes: 2 additions & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default.

56 changes: 49 additions & 7 deletions src/rust/engine/process_execution/src/remote.rs
@@ -24,6 +24,9 @@ use std;
use std::cmp::min;
use std::collections::btree_map::BTreeMap;
use workunit_store::WorkUnitStore;
use workunit_store::{WorkUnit, generate_random_64bit_string};
use time::Timespec;
use workunit_store::get_parent_id;

// Environment variable which is exclusively used for cache key invalidation.
// This may be not specified in an ExecuteProcessRequest, and may be populated only by the
@@ -127,7 +130,7 @@ impl super::CommandRunner for CommandRunner {
///
/// TODO: Request jdk_home be created if set.
///
fn run(&self, req: ExecuteProcessRequest, _workunit_store: Arc<WorkUnitStore>) -> BoxFuture<FallibleExecuteProcessResult, String> {
fn run(&self, req: ExecuteProcessRequest, workunit_store: Arc<WorkUnitStore>) -> BoxFuture<FallibleExecuteProcessResult, String> {
let operations_client = self.operations_client.clone();

let store = self.store.clone();
@@ -188,7 +191,7 @@ impl super::CommandRunner for CommandRunner {
let operations_client = operations_client.clone();
let command_runner2 = command_runner2.clone();
let command_runner3 = command_runner3.clone();
let f = command_runner2.extract_execute_response(operation, &mut history);
let f = command_runner2.extract_execute_response(operation, &mut history, workunit_store.clone());
f.map(future::Loop::Break).or_else(move |value| {
match value {
ExecutionError::Fatal(err) => future::err(err).to_boxed(),
@@ -374,6 +377,7 @@ impl CommandRunner {
&self,
operation_or_status: OperationOrStatus,
attempts: &mut ExecutionHistory,
workunit_store: Arc<WorkUnitStore>,
) -> BoxFuture<FallibleExecuteProcessResult, ExecutionError> {
trace!("Got operation response: {:?}", operation_or_status);

@@ -410,21 +414,34 @@ impl CommandRunner {
let output_upload_start = timespec_from(metadata.get_output_upload_start_timestamp());
let output_upload_completed =
timespec_from(metadata.get_output_upload_completed_timestamp());

let parent_id = get_parent_id();
let result_cached = execute_response.get_cached_result();
match (worker_start - enqueued).to_std() {
Ok(duration) => attempts.current_attempt.remote_queue = Some(duration),
Ok(duration) => {
attempts.current_attempt.remote_queue = Some(duration);
maybe_add_workunit(&result_cached, "scheduling", &enqueued, &worker_start, parent_id.clone(), &workunit_store);
},
Err(err) => warn!("Got negative remote queue time: {}", err),
}
match (input_fetch_completed - input_fetch_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration),
Ok(duration) => {
attempts.current_attempt.remote_input_fetch = Some(duration);
maybe_add_workunit(&result_cached, "fetching", &input_fetch_start, &input_fetch_completed, parent_id.clone(), &workunit_store);
},
Err(err) => warn!("Got negative remote input fetch time: {}", err),
}
match (execution_completed - execution_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_execution = Some(duration),
Ok(duration) => {
attempts.current_attempt.remote_execution = Some(duration);
maybe_add_workunit(&result_cached, "executing", &execution_start, &execution_completed, parent_id.clone(), &workunit_store);
},
Err(err) => warn!("Got negative remote execution time: {}", err),
}
match (output_upload_completed - output_upload_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration),
Ok(duration) => {
attempts.current_attempt.remote_output_store = Some(duration);
maybe_add_workunit(&result_cached, "uploading", &output_upload_start, &output_upload_completed, parent_id, &workunit_store);
},
Err(err) => warn!("Got negative remote output store time: {}", err),
}
attempts.current_attempt.was_cache_hit = execute_response.cached_result;
@@ -734,6 +751,30 @@ impl CommandRunner {
}
}

fn maybe_add_workunit(result_cached: &bool, name: &str, start_time: &Timespec, end_time: &Timespec, parent_id: Option<String>, workunit_store: &Arc<WorkUnitStore>) {
// TODO: workunits for scheduling, fetching, executing and uploading should be recorded
// only if '--reporting-zipkin-trace-v2' is set
if !result_cached {
let workunit = WorkUnit {
name: String::from(name),
start_timestamp: timespec_as_float_secs(start_time),
end_timestamp: timespec_as_float_secs(end_time),
span_id: generate_random_64bit_string(),
parent_id,
};
workunit_store.add_workunit(workunit);
}
}

fn timespec_as_float_secs(timespec: &Timespec) -> f64 {
// The returned value is the Timespec's whole number of seconds (i64) plus its
// whole number of nanoseconds (i32), combined into an f64.
// Converting the timestamp to an f64 loses some precision.
let whole_secs = timespec.sec as f64;
let fract_part_in_nanos = timespec.nsec as f64;
whole_secs + fract_part_in_nanos / 1_000_000_000.0
}

fn make_execute_request(
req: &ExecuteProcessRequest,
instance_name: &Option<String>,
Expand Down Expand Up @@ -2648,6 +2689,7 @@ mod tests {
runtime.block_on(command_runner.extract_execute_response(
super::OperationOrStatus::Operation(operation),
&mut ExecutionHistory::default(),
Arc::new(SafeWorkUnitStore::new()),
))
}

6 changes: 5 additions & 1 deletion src/rust/engine/src/lib.rs
@@ -347,7 +347,7 @@ pub extern "C" fn scheduler_metrics(
.lock()
.iter()
.map(|workunit| {
let workunit_zipkin_trace_info = vec![
let mut workunit_zipkin_trace_info = vec![
externs::store_utf8("name"),
externs::store_utf8(&workunit.name),
externs::store_utf8("start_timestamp"),
@@ -357,6 +357,10 @@
externs::store_utf8("span_id"),
externs::store_utf8(&workunit.span_id),
];
if let Some(parent_id) = &workunit.parent_id {
workunit_zipkin_trace_info.push(externs::store_utf8("parent_id"));
workunit_zipkin_trace_info.push(externs::store_utf8(parent_id));
}
externs::store_dict(&workunit_zipkin_trace_info)
})
.collect::<Vec<_>>();
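Editorial note (not part of this commit): `externs::store_utf8` and `externs::store_dict` are engine-specific, but the shape of the change is visible in isolation. A minimal sketch with plain `String`s, assuming only that `store_dict` consumes alternating key/value entries, as the surrounding code suggests:

fn trace_info(name: &str, span_id: &str, parent_id: Option<&str>) -> Vec<String> {
    // Alternating key/value entries; "parent_id" is appended only when set.
    let mut kv = vec![
        "name".to_string(), name.to_string(),
        "span_id".to_string(), span_id.to_string(),
    ];
    if let Some(parent_id) = parent_id {
        kv.push("parent_id".to_string());
        kv.push(parent_id.to_string());
    }
    kv
}
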
47 changes: 22 additions & 25 deletions src/rust/engine/src/nodes.rs
@@ -30,10 +30,8 @@ use process_execution::{self, CommandRunner};
use rule_graph;

use graph::{Entry, Node, NodeError, NodeTracer, NodeVisualizer};
use rand::thread_rng;
use rand::Rng;
use store::{self, StoreFileByDigest};
use workunit_store::{WorkUnit, WorkUnitStore};
use workunit_store::{WorkUnit, WorkUnitStore, generate_random_64bit_string, set_parent_id};

pub type NodeFuture<T> = BoxFuture<T, Failure>;

@@ -1093,38 +1091,43 @@ impl Node for NodeKey {
type Error = Failure;

fn run(self, context: Context) -> NodeFuture<NodeResult> {
let node_name_and_start_timestamp = if context.session.should_record_zipkin_spans() {
let span_id = generate_random_64bit_string();
let node_workunit_params = if context.session.should_record_zipkin_spans() {
let node_name = format!("{}", self);
let start_timestamp_duration = std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap();
let start_timestamp = duration_as_float_secs(&start_timestamp_duration);
Some((node_name, start_timestamp))
Some((node_name, start_timestamp, span_id.clone()))
} else {
None
};
let context2 = context.clone();
match self {
NodeKey::DigestFile(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::DownloadedFile(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::ExecuteProcess(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::ReadLink(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Scandir(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Select(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Snapshot(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Task(n) => n.run(context).map(NodeResult::from).to_boxed(),
}
futures::future::ok(()).and_then(|()| {
set_parent_id(span_id);
match self {
NodeKey::DigestFile(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::DownloadedFile(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::ExecuteProcess(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::ReadLink(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Scandir(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Select(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Snapshot(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Task(n) => n.run(context).map(NodeResult::from).to_boxed(),
}
})
.inspect(move |_: &NodeResult| {
if let Some((node_name, start_timestamp)) = node_name_and_start_timestamp {
if let Some((node_name, start_timestamp, span_id)) = node_workunit_params {
let end_timestamp_duration = std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap();
let end_timestamp = duration_as_float_secs(&end_timestamp_duration);
let workunit = WorkUnit {
name: node_name,
start_timestamp: start_timestamp,
end_timestamp: end_timestamp,
span_id: generate_random_64bit_string(),
start_timestamp,
end_timestamp,
span_id,
parent_id: None,
};
context2.session.workunit_store().add_workunit(workunit)
};
@@ -1162,12 +1165,6 @@ fn duration_as_float_secs(duration: &Duration) -> f64 {
whole_secs_in_duration + fract_part_of_duration_in_micros / 1_000_000.0
}

fn generate_random_64bit_string() -> String {
let mut rng = thread_rng();
let random_u64: u64 = rng.gen();
format!("{:16.x}", random_u64)
}

impl Display for NodeKey {
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
match self {
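Editorial note (not part of this commit): moving the `match` into `futures::future::ok(()).and_then(...)` defers `set_parent_id(span_id)` until the future is first polled, i.e. inside the task that executes this node, so the task-local parent id is scoped to the node's own task rather than to whichever thread called `run`. A minimal sketch of the pattern, assuming `set_parent_id` from `workunit_store` and the futures 0.1 API used here:

use futures::{future, Future};
use workunit_store::set_parent_id;

// The closure runs during poll, inside the executing task, so the task-local
// value set here is visible to everything this node subsequently drives.
fn tag_task(span_id: String) -> impl Future<Item = (), Error = ()> {
    future::ok::<(), ()>(()).and_then(move |()| {
        set_parent_id(span_id);
        future::ok(())
    })
}
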
2 changes: 2 additions & 0 deletions src/rust/engine/workunit_store/Cargo.toml
@@ -6,4 +6,6 @@ authors = [ "Pants Build <pantsbuild@gmail.com>" ]
publish = false

[dependencies]
futures = "0.1.27"
parking_lot = "0.6"
rand = "0.6"
27 changes: 27 additions & 0 deletions src/rust/engine/workunit_store/src/lib.rs
@@ -27,12 +27,16 @@
#![allow(clippy::mutex_atomic)]

use parking_lot::Mutex;
use rand::thread_rng;
use rand::Rng;
use futures::task_local;

pub struct WorkUnit {
pub name: String,
pub start_timestamp: f64,
pub end_timestamp: f64,
pub span_id: String,
pub parent_id: Option<String>,
}

pub trait WorkUnitStore: Send + Sync {
@@ -61,3 +65,26 @@ impl WorkUnitStore for SafeWorkUnitStore {
self.workunits.lock().push(workunit);
}
}

pub fn generate_random_64bit_string() -> String {
let mut rng = thread_rng();
let random_u64: u64 = rng.gen();
format!("{:16.x}", random_u64)
}

task_local! {
static TASK_PARENT_ID: Mutex<Option<String>> = Mutex::new(None)
}

pub fn set_parent_id(parent_id: String) {
TASK_PARENT_ID.with(|task_parent_id| {
*task_parent_id.lock() = Some(parent_id);
})
}

pub fn get_parent_id() -> Option<String> {
TASK_PARENT_ID.with(|task_parent_id| {
let task_parent_id = task_parent_id.lock();
(*task_parent_id).clone()
})
}
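
Editorial note (hypothetical demo, not part of this commit): a parent id stored with `set_parent_id` inside a futures 0.1 task is visible to later code in that same task, and `Future::wait` polls on the current thread within a task context. A runnable sketch, assuming `set_parent_id` and `get_parent_id` from this module are in scope:

use futures::{future, Future};

fn main() {
    let fut = future::ok::<(), ()>(()).and_then(|()| {
        // Simulates NodeKey::run storing the node's span id for its task.
        set_parent_id("00f067aa0ba902b7".to_string());
        // Later, e.g. in extract_execute_response, the same task reads it back.
        assert_eq!(get_parent_id(), Some("00f067aa0ba902b7".to_string()));
        future::ok(())
    });
    fut.wait().unwrap(); // `wait` blocks the current thread; fine for a demo.
}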
