From 62c4d8309f4b773ad4e045a821c63829b960876f Mon Sep 17 00:00:00 2001 From: Ekaterina Tyurina Date: Fri, 7 Jun 2019 18:04:37 +0100 Subject: [PATCH] Add remote workunits --- src/rust/engine/Cargo.lock | 2 + .../engine/process_execution/src/remote.rs | 56 ++++++++++++++++--- src/rust/engine/src/lib.rs | 6 +- src/rust/engine/src/nodes.rs | 47 ++++++++-------- src/rust/engine/workunit_store/Cargo.toml | 2 + src/rust/engine/workunit_store/src/lib.rs | 27 +++++++++ 6 files changed, 107 insertions(+), 33 deletions(-) diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 1d215138de43..e4a95b9ae296 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -3050,7 +3050,9 @@ dependencies = [ name = "workunit_store" version = "0.0.1" dependencies = [ + "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index a02b58691781..01b62b433ba9 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -24,6 +24,9 @@ use std; use std::cmp::min; use std::collections::btree_map::BTreeMap; use workunit_store::WorkUnitStore; +use workunit_store::{WorkUnit, generate_random_64bit_string}; +use time::Timespec; +use workunit_store::get_parent_id; // Environment variable which is exclusively used for cache key invalidation. // This may be not specified in an ExecuteProcessRequest, and may be populated only by the @@ -127,7 +130,7 @@ impl super::CommandRunner for CommandRunner { /// /// TODO: Request jdk_home be created if set. 
/// - fn run(&self, req: ExecuteProcessRequest, _workunit_store: Arc) -> BoxFuture { + fn run(&self, req: ExecuteProcessRequest, workunit_store: Arc) -> BoxFuture { let operations_client = self.operations_client.clone(); let store = self.store.clone(); @@ -188,7 +191,7 @@ impl super::CommandRunner for CommandRunner { let operations_client = operations_client.clone(); let command_runner2 = command_runner2.clone(); let command_runner3 = command_runner3.clone(); - let f = command_runner2.extract_execute_response(operation, &mut history); + let f = command_runner2.extract_execute_response(operation, &mut history, workunit_store.clone()); f.map(future::Loop::Break).or_else(move |value| { match value { ExecutionError::Fatal(err) => future::err(err).to_boxed(), @@ -374,6 +377,7 @@ impl CommandRunner { &self, operation_or_status: OperationOrStatus, attempts: &mut ExecutionHistory, + workunit_store: Arc, ) -> BoxFuture { trace!("Got operation response: {:?}", operation_or_status); @@ -410,21 +414,34 @@ impl CommandRunner { let output_upload_start = timespec_from(metadata.get_output_upload_start_timestamp()); let output_upload_completed = timespec_from(metadata.get_output_upload_completed_timestamp()); - + let parent_id = get_parent_id(); + let result_cached = execute_response.get_cached_result(); match (worker_start - enqueued).to_std() { - Ok(duration) => attempts.current_attempt.remote_queue = Some(duration), + Ok(duration) => { + attempts.current_attempt.remote_queue = Some(duration); + maybe_add_workunit(&result_cached, "scheduling", &enqueued, &worker_start, parent_id.clone(), &workunit_store); + }, Err(err) => warn!("Got negative remote queue time: {}", err), } match (input_fetch_completed - input_fetch_start).to_std() { - Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration), + Ok(duration) => { + attempts.current_attempt.remote_input_fetch = Some(duration); + maybe_add_workunit(&result_cached, "fetching", &input_fetch_start, 
&input_fetch_completed, parent_id.clone(), &workunit_store); + }, Err(err) => warn!("Got negative remote input fetch time: {}", err), } match (execution_completed - execution_start).to_std() { - Ok(duration) => attempts.current_attempt.remote_execution = Some(duration), + Ok(duration) => { + attempts.current_attempt.remote_execution = Some(duration); + maybe_add_workunit(&result_cached, "executing", &execution_start, &execution_completed, parent_id.clone(), &workunit_store); + }, Err(err) => warn!("Got negative remote execution time: {}", err), } match (output_upload_completed - output_upload_start).to_std() { - Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration), + Ok(duration) => { + attempts.current_attempt.remote_output_store = Some(duration); + maybe_add_workunit(&result_cached, "uploading", &output_upload_start, &output_upload_completed, parent_id, &workunit_store); + }, Err(err) => warn!("Got negative remote output store time: {}", err), } attempts.current_attempt.was_cache_hit = execute_response.cached_result; @@ -734,6 +751,30 @@ impl CommandRunner { } } +fn maybe_add_workunit(result_cached: &bool, name: &str, start_time: &Timespec, end_time: &Timespec, parent_id: Option, workunit_store: &Arc) { +// TODO: workunits for scheduling, fetching, executing and uploading should be recorded +// only if '--reporting-zipkin-trace-v2' is set + if !result_cached { + let workunit = WorkUnit { + name: String::from(name), + start_timestamp: timespec_as_float_secs(start_time), + end_timestamp: timespec_as_float_secs(end_time), + span_id: generate_random_64bit_string(), + parent_id, + }; + workunit_store.add_workunit(workunit); + } +} + +fn timespec_as_float_secs(timespec: &Timespec) -> f64 { + // The returned value is formed by representing the time as a whole number of seconds plus + // a whole number of nanoseconds, combined into an f64. + // Converting the time to an f64 decreases precision.
+ let whole_secs = timespec.sec as f64; + let fract_part_in_nanos = timespec.nsec as f64; + whole_secs + fract_part_in_nanos / 1_000_000_000.0 +} + fn make_execute_request( req: &ExecuteProcessRequest, instance_name: &Option, @@ -2648,6 +2689,7 @@ mod tests { runtime.block_on(command_runner.extract_execute_response( super::OperationOrStatus::Operation(operation), &mut ExecutionHistory::default(), + Arc::new(SafeWorkUnitStore::new()), )) } diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index 5499c0ee0186..34128550aa08 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -347,7 +347,7 @@ pub extern "C" fn scheduler_metrics( .lock() .iter() .map(|workunit| { - let workunit_zipkin_trace_info = vec![ + let mut workunit_zipkin_trace_info = vec![ externs::store_utf8("name"), externs::store_utf8(&workunit.name), externs::store_utf8("start_timestamp"), @@ -357,6 +357,10 @@ pub extern "C" fn scheduler_metrics( externs::store_utf8("span_id"), externs::store_utf8(&workunit.span_id), ]; + if let Some(parent_id) = &workunit.parent_id { + workunit_zipkin_trace_info.push(externs::store_utf8("parent_id")); + workunit_zipkin_trace_info.push(externs::store_utf8(parent_id)); + } externs::store_dict(&workunit_zipkin_trace_info) }) .collect::>(); diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 29324dd8f264..dd923f0e6790 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -30,10 +30,8 @@ use process_execution::{self, CommandRunner}; use rule_graph; use graph::{Entry, Node, NodeError, NodeTracer, NodeVisualizer}; -use rand::thread_rng; -use rand::Rng; use store::{self, StoreFileByDigest}; -use workunit_store::{WorkUnit, WorkUnitStore}; +use workunit_store::{WorkUnit, WorkUnitStore, generate_random_64bit_string, set_parent_id}; pub type NodeFuture = BoxFuture; @@ -1093,38 +1091,43 @@ impl Node for NodeKey { type Error = Failure; fn run(self, context: Context) -> NodeFuture { - let 
node_name_and_start_timestamp = if context.session.should_record_zipkin_spans() { + let span_id = generate_random_64bit_string(); + let node_workunit_params = if context.session.should_record_zipkin_spans() { let node_name = format!("{}", self); let start_timestamp_duration = std::time::SystemTime::now() .duration_since(std::time::SystemTime::UNIX_EPOCH) .unwrap(); let start_timestamp = duration_as_float_secs(&start_timestamp_duration); - Some((node_name, start_timestamp)) + Some((node_name, start_timestamp, span_id.clone())) } else { None }; let context2 = context.clone(); - match self { - NodeKey::DigestFile(n) => n.run(context).map(NodeResult::from).to_boxed(), - NodeKey::DownloadedFile(n) => n.run(context).map(NodeResult::from).to_boxed(), - NodeKey::ExecuteProcess(n) => n.run(context).map(NodeResult::from).to_boxed(), - NodeKey::ReadLink(n) => n.run(context).map(NodeResult::from).to_boxed(), - NodeKey::Scandir(n) => n.run(context).map(NodeResult::from).to_boxed(), - NodeKey::Select(n) => n.run(context).map(NodeResult::from).to_boxed(), - NodeKey::Snapshot(n) => n.run(context).map(NodeResult::from).to_boxed(), - NodeKey::Task(n) => n.run(context).map(NodeResult::from).to_boxed(), - } + futures::future::ok(()).and_then(|()| { + set_parent_id(span_id); + match self { + NodeKey::DigestFile(n) => n.run(context).map(NodeResult::from).to_boxed(), + NodeKey::DownloadedFile(n) => n.run(context).map(NodeResult::from).to_boxed(), + NodeKey::ExecuteProcess(n) => n.run(context).map(NodeResult::from).to_boxed(), + NodeKey::ReadLink(n) => n.run(context).map(NodeResult::from).to_boxed(), + NodeKey::Scandir(n) => n.run(context).map(NodeResult::from).to_boxed(), + NodeKey::Select(n) => n.run(context).map(NodeResult::from).to_boxed(), + NodeKey::Snapshot(n) => n.run(context).map(NodeResult::from).to_boxed(), + NodeKey::Task(n) => n.run(context).map(NodeResult::from).to_boxed(), + } + }) .inspect(move |_: &NodeResult| { - if let Some((node_name, start_timestamp)) = 
node_name_and_start_timestamp { + if let Some((node_name, start_timestamp, span_id)) = node_workunit_params { let end_timestamp_duration = std::time::SystemTime::now() .duration_since(std::time::SystemTime::UNIX_EPOCH) .unwrap(); let end_timestamp = duration_as_float_secs(&end_timestamp_duration); let workunit = WorkUnit { name: node_name, - start_timestamp: start_timestamp, - end_timestamp: end_timestamp, - span_id: generate_random_64bit_string(), + start_timestamp, + end_timestamp, + span_id, + parent_id: None, }; context2.session.workunit_store().add_workunit(workunit) }; @@ -1162,12 +1165,6 @@ fn duration_as_float_secs(duration: &Duration) -> f64 { whole_secs_in_duration + fract_part_of_duration_in_micros / 1_000_000.0 } -fn generate_random_64bit_string() -> String { - let mut rng = thread_rng(); - let random_u64: u64 = rng.gen(); - format!("{:16.x}", random_u64) -} - impl Display for NodeKey { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { match self { diff --git a/src/rust/engine/workunit_store/Cargo.toml b/src/rust/engine/workunit_store/Cargo.toml index 544b15fe6939..9cee707108ee 100644 --- a/src/rust/engine/workunit_store/Cargo.toml +++ b/src/rust/engine/workunit_store/Cargo.toml @@ -6,4 +6,6 @@ authors = [ "Pants Build " ] publish = false [dependencies] +futures = "0.1.27" parking_lot = "0.6" +rand = "0.6" diff --git a/src/rust/engine/workunit_store/src/lib.rs b/src/rust/engine/workunit_store/src/lib.rs index d7c265a19ade..cfb2238005ac 100644 --- a/src/rust/engine/workunit_store/src/lib.rs +++ b/src/rust/engine/workunit_store/src/lib.rs @@ -27,12 +27,16 @@ #![allow(clippy::mutex_atomic)] use parking_lot::Mutex; +use rand::thread_rng; +use rand::Rng; +use futures::task_local; pub struct WorkUnit { pub name: String, pub start_timestamp: f64, pub end_timestamp: f64, pub span_id: String, + pub parent_id: Option, } pub trait WorkUnitStore: Send + Sync { @@ -61,3 +65,26 @@ impl WorkUnitStore for SafeWorkUnitStore { 
self.workunits.lock().push(workunit); } } + +pub fn generate_random_64bit_string() -> String { + let mut rng = thread_rng(); + let random_u64: u64 = rng.gen(); + format!("{:16.x}", random_u64) +} + +task_local! { + static TASK_PARENT_ID: Mutex> = Mutex::new(None) +} + +pub fn set_parent_id(parent_id: String) { + TASK_PARENT_ID.with(|task_parent_id| { + *task_parent_id.lock() = Some(parent_id); + }) +} + +pub fn get_parent_id() -> Option { + TASK_PARENT_ID.with(|task_parent_id| { + let task_parent_id = task_parent_id.lock(); + (*task_parent_id).clone() + }) +}