diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index 67c470734..6310d6c45 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "4f921e8c24fd74362f4fa660db203c02dfdc564ae44a23e8d4526d28aab7aafb", + "checksum": "5fb139361c531ebf8263b94f2cf9d4ac159c5ae3616a08fc92e6e0089a8c6c19", "crates": { "addr2line 0.20.0": { "name": "addr2line", @@ -190,6 +190,42 @@ }, "license": "Unlicense OR MIT" }, + "allocator-api2 0.2.16": { + "name": "allocator-api2", + "version": "0.2.16", + "repository": { + "Http": { + "url": "https://crates.io/api/v1/crates/allocator-api2/0.2.16/download", + "sha256": "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" + } + }, + "targets": [ + { + "Library": { + "crate_name": "allocator_api2", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "allocator_api2", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "crate_features": { + "common": [ + "alloc" + ], + "selects": {} + }, + "edition": "2018", + "version": "0.2.16" + }, + "license": "MIT OR Apache-2.0" + }, "android-tzdata 0.1.1": { "name": "android-tzdata", "version": "0.1.1", @@ -2488,6 +2524,10 @@ "id": "futures 0.3.28", "target": "futures" }, + { + "id": "hashbrown 0.14.0", + "target": "hashbrown" + }, { "id": "hex 0.4.3", "target": "hex" @@ -4112,6 +4152,58 @@ }, "license": "MIT OR Apache-2.0" }, + "hashbrown 0.14.0": { + "name": "hashbrown", + "version": "0.14.0", + "repository": { + "Http": { + "url": "https://crates.io/api/v1/crates/hashbrown/0.14.0/download", + "sha256": "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" + } + }, + "targets": [ + { + "Library": { + "crate_name": "hashbrown", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "hashbrown", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "crate_features": { + "common": [ + "ahash", + "allocator-api2", + "default", + "inline-more" + ], + "selects": {} + }, + "deps": { + "common": [ + { + "id": "ahash 0.8.3", + "target": "ahash" + }, + { + "id": "allocator-api2 0.2.16", + "target": "allocator_api2" + } + ], + "selects": {} + }, + "edition": "2021", + "version": "0.14.0" + }, + "license": "MIT OR Apache-2.0" + }, "heck 0.4.1": { "name": "heck", "version": "0.4.1", diff --git a/Cargo.lock b/Cargo.lock index 6565d5af6..85bb12ac7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,6 +37,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -482,6 +488,7 @@ dependencies = [ "filetime", "fixed-buffer", "futures", + "hashbrown 0.14.0", "hex", "http", "hyper", @@ -778,6 +785,16 @@ dependencies = [ "ahash", ] +[[package]] +name = "hashbrown" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +dependencies = [ + "ahash", + "allocator-api2", +] + [[package]] name = "heck" version = "0.4.1" diff --git a/Cargo.toml b/Cargo.toml index 995459608..63944e524 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ uuid = { version = "1.4.0", features = ["v4"] } shlex = "1.1.0" relative-path = "1.8.0" parking_lot = "0.12.1" +hashbrown = "0.14" [dev-dependencies] stdext = "0.3.1" diff --git a/cas/grpc_service/execution_server.rs 
b/cas/grpc_service/execution_server.rs
index 48fb55f3a..fb8bc2a64 100644
--- a/cas/grpc_service/execution_server.rs
+++ b/cas/grpc_service/execution_server.rs
@@ -19,11 +19,12 @@
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
 use futures::{Stream, StreamExt};
 use rand::{thread_rng, Rng};
+use tokio::sync::watch;
 use tokio_stream::wrappers::WatchStream;
 use tonic::{Request, Response, Status};
 
 use ac_utils::get_and_decode_digest;
-use action_messages::{ActionInfo, ActionInfoHashKey, DEFAULT_EXECUTION_PRIORITY};
+use action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
 use common::{log, DigestInfo};
 use config::cas_server::{ExecutionConfig, InstanceName};
 use error::{make_input_err, Error, ResultExt};
@@ -163,6 +164,14 @@ impl ExecutionServer {
         Server::new(self)
     }
 
+    fn to_execute_stream(receiver: watch::Receiver<Arc<ActionState>>) -> Response<ExecuteStream> {
+        let receiver_stream = Box::pin(WatchStream::new(receiver).map(|action_update| {
+            log::info!("\x1b[0;31mexecute Resp Stream\x1b[0m: {:?}", action_update);
+            Ok(action_update.as_ref().clone().into())
+        }));
+        tonic::Response::new(receiver_stream)
+    }
+
     async fn inner_execute(&self, request: Request<ExecuteRequest>) -> Result<Response<ExecuteStream>, Error> {
         let execute_req = request.into_inner();
         let instance_name = execute_req.instance_name;
@@ -194,11 +203,22 @@ impl ExecutionServer {
             .await
             .err_tip(|| "Failed to schedule task")?;
 
-        let receiver_stream = Box::pin(WatchStream::new(rx).map(|action_update| {
-            log::info!("\x1b[0;31mexecute Resp Stream\x1b[0m: {:?}", action_update);
-            Ok(action_update.as_ref().clone().into())
-        }));
-        Ok(tonic::Response::new(receiver_stream))
+        Ok(Self::to_execute_stream(rx))
+    }
+
+    async fn inner_wait_execution(
+        &self,
+        request: Request<WaitExecutionRequest>,
+    ) -> Result<Response<ExecuteStream>, Status> {
+        let unique_qualifier = ActionInfoHashKey::try_from(request.into_inner().name.as_str())
+            .err_tip(|| "Decoding operation name into ActionInfoHashKey")?;
+        let Some(instance_info) = self.instance_infos.get(&unique_qualifier.instance_name) else {
+            return Err(Status::not_found(format!("No scheduler with the instance name {}", unique_qualifier.instance_name)));
+        };
+        let Some(rx) = instance_info.scheduler.find_existing_action(&unique_qualifier).await else {
+            return Err(Status::not_found("Failed to find existing task"));
+        };
+        Ok(Self::to_execute_stream(rx))
     }
 }
 
@@ -224,14 +244,11 @@ impl Execution for ExecutionServer {
         resp
     }
 
-    type WaitExecutionStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send + Sync + 'static>>;
-    async fn wait_execution(
-        &self,
-        request: Request<WaitExecutionRequest>,
-    ) -> Result<Response<Self::WaitExecutionStream>, Status> {
-        use stdext::function_name;
-        let output = format!("{} not yet implemented", function_name!());
-        println!("{:?}", request);
-        Err(Status::unimplemented(output))
+    type WaitExecutionStream = ExecuteStream;
+    async fn wait_execution(&self, request: Request<WaitExecutionRequest>) -> Result<Response<Self::WaitExecutionStream>, Status> {
+        self.inner_wait_execution(request)
+            .await
+            .err_tip(|| "Failed on wait_execution() command")
+            .map_err(|e| e.into())
     }
 }
diff --git a/cas/scheduler/BUILD b/cas/scheduler/BUILD
index fbee22706..25f87cf63 100644
--- a/cas/scheduler/BUILD
+++ b/cas/scheduler/BUILD
@@ -83,6 +83,7 @@ rust_library(
         "//util:error",
         "@crate_index//:parking_lot",
         "@crate_index//:futures",
+        "@crate_index//:hashbrown",
         "@crate_index//:lru",
         "@crate_index//:tokio",
     ],
@@ -109,6 +110,7 @@ rust_library(
         "//util:common",
         "//util:error",
         "@crate_index//:futures",
+        "@crate_index//:parking_lot",
         "@crate_index//:tonic",
         "@crate_index//:tokio",
         "@crate_index//:tokio-stream",
@@ -176,6 +178,7 @@ rust_library(
         "//config",
         "//util:error",
"@crate_index//:futures", + "@crate_index//:tokio", ], ) diff --git a/cas/scheduler/cache_lookup_scheduler.rs b/cas/scheduler/cache_lookup_scheduler.rs index 909df04f6..56ad20d6e 100644 --- a/cas/scheduler/cache_lookup_scheduler.rs +++ b/cas/scheduler/cache_lookup_scheduler.rs @@ -12,20 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; use futures::stream::{FuturesUnordered, StreamExt}; +use tokio::select; use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tonic::Request; use ac_utils::get_and_decode_digest; -use action_messages::{ActionInfo, ActionResult, ActionStage, ActionState}; +use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState}; use common::DigestInfo; use error::Error; use grpc_store::GrpcStore; +use parking_lot::{Mutex, MutexGuard}; use platform_property_manager::PlatformPropertyManager; use proto::build::bazel::remote::execution::v2::{ ActionResult as ProtoActionResult, FindMissingBlobsRequest, GetActionResultRequest, @@ -33,6 +36,11 @@ use proto::build::bazel::remote::execution::v2::{ use scheduler::ActionScheduler; use store::Store; +/// Actions that are having their cache checked or failed cache lookup and are +/// being forwarded upstream. Missing the skip_cache_check actions which are +/// forwarded directly. +type CheckActions = HashMap>>>; + pub struct CacheLookupScheduler { /// A reference to the CAS which is used to validate all the outputs of a /// cached ActionResult still exist. @@ -42,6 +50,8 @@ pub struct CacheLookupScheduler { /// The "real" scheduler to use to perform actions if they were not found /// in the action cache. action_scheduler: Arc, + /// Actions that are currently performing a CacheCheck. + cache_check_actions: Arc>, } async fn get_action_from_store( @@ -112,6 +122,21 @@ async fn validate_outputs_exist( } } +fn subscribe_to_existing_action( + cache_check_actions: &MutexGuard, + unique_qualifier: &ActionInfoHashKey, +) -> Option>> { + cache_check_actions.get(unique_qualifier).map(|tx| { + let current_value = tx.borrow(); + // Subscribe marks the current value as seen, so we have to + // re-send it to all receivers. + // TODO: Fix this when fixed upstream tokio-rs/tokio#5871 + let rx = tx.subscribe(); + let _ = tx.send(current_value.clone()); + rx + }) +} + impl CacheLookupScheduler { pub fn new( cas_store: Arc, @@ -122,6 +147,7 @@ impl CacheLookupScheduler { cas_store, ac_store, action_scheduler, + cache_check_actions: Default::default(), }) } } @@ -142,11 +168,22 @@ impl ActionScheduler for CacheLookupScheduler { stage: ActionStage::CacheCheck, }); let (tx, rx) = watch::channel(current_state.clone()); + let tx = Arc::new(tx); + { + let mut cache_check_actions = self.cache_check_actions.lock(); + // Check this isn't a duplicate request first. 
+            if let Some(rx) = subscribe_to_existing_action(&cache_check_actions, &action_info.unique_qualifier) {
+                return Ok(rx);
+            }
+            cache_check_actions.insert(action_info.unique_qualifier.clone(), tx.clone());
+        }
         let ac_store = self.ac_store.clone();
         let cas_store = self.cas_store.clone();
         let action_scheduler = self.action_scheduler.clone();
+        let cache_check_actions = self.cache_check_actions.clone();
         tokio::spawn(async move {
             let instance_name = action_info.instance_name().clone();
+            let unique_qualifier = action_info.unique_qualifier.clone();
             if let Some(proto_action_result) =
                 get_action_from_store(ac_store, current_state.action_digest(), instance_name.clone()).await
             {
@@ -154,24 +191,50 @@ impl ActionScheduler for CacheLookupScheduler {
                     // Found in the cache, return the result immediately.
                     Arc::make_mut(&mut current_state).stage = ActionStage::CompletedFromCache(proto_action_result);
                     let _ = tx.send(current_state);
+                    cache_check_actions.lock().remove(&unique_qualifier);
                     return;
                 }
             }
             // Not in cache, forward to upstream and proxy state.
-            let mut watch_stream = match action_scheduler.add_action(action_info).await {
-                Ok(rx) => WatchStream::new(rx),
+            match action_scheduler.add_action(action_info).await {
+                Ok(rx) => {
+                    let mut watch_stream = WatchStream::new(rx);
+                    loop {
+                        select!(
+                            Some(action_state) = watch_stream.next() => {
+                                if tx.send(action_state).is_err() {
+                                    break;
+                                }
+                            }
+                            _ = tx.closed() => {
+                                break;
+                            }
+                        )
+                    }
+                }
                 Err(err) => {
                     Arc::make_mut(&mut current_state).stage = ActionStage::Error((err, ActionResult::default()));
                     let _ = tx.send(current_state);
-                    return;
-                }
-            };
-            while let Some(action_state) = watch_stream.next().await {
-                if tx.send(action_state).is_err() {
-                    break;
                 }
             }
+            cache_check_actions.lock().remove(&unique_qualifier);
         });
         Ok(rx)
     }
+
+    async fn find_existing_action(
+        &self,
+        unique_qualifier: &ActionInfoHashKey,
+    ) -> Option<watch::Receiver<Arc<ActionState>>> {
+        {
+            let cache_check_actions = self.cache_check_actions.lock();
+            if let Some(rx) = subscribe_to_existing_action(&cache_check_actions, unique_qualifier) {
+                return Some(rx);
+            }
+        }
+        // A cache-skipped action may be in the upstream scheduler.
+        self.action_scheduler.find_existing_action(unique_qualifier).await
+    }
+
+    async fn clean_recently_completed_actions(&self) {}
 }
diff --git a/cas/scheduler/default_scheduler_factory.rs b/cas/scheduler/default_scheduler_factory.rs
index bbe379678..6a663a07f 100644
--- a/cas/scheduler/default_scheduler_factory.rs
+++ b/cas/scheduler/default_scheduler_factory.rs
@@ -14,8 +14,10 @@
 
 use std::pin::Pin;
 use std::sync::Arc;
+use std::time::Duration;
 
 use futures::Future;
+use tokio::time::interval;
 
 use cache_lookup_scheduler::CacheLookupScheduler;
 use config::schedulers::SchedulerConfig;
@@ -37,7 +39,7 @@ pub fn scheduler_factory<'a>(
             let scheduler = Arc::new(SimpleScheduler::new(config));
             (Some(scheduler.clone()), Some(scheduler))
         }
-        SchedulerConfig::grpc(config) => (Some(Arc::new(GrpcScheduler::new(config).await?)), None),
+        SchedulerConfig::grpc(config) => (Some(Arc::new(GrpcScheduler::new(config)?)), None),
         SchedulerConfig::cache_lookup(config) => {
            let cas_store = store_manager
                .get_store(&config.cas_store)
@@ -54,6 +56,26 @@ pub fn scheduler_factory<'a>(
                 (Some(cache_lookup_scheduler), worker_scheduler)
             }
         };
+
+        if let Some(action_scheduler) = &scheduler.0 {
+            start_cleanup_timer(action_scheduler);
+        }
+
         Ok(scheduler)
     })
 }
+
+fn start_cleanup_timer(action_scheduler: &Arc<dyn ActionScheduler>) {
+    let weak_scheduler = Arc::downgrade(action_scheduler);
+    tokio::spawn(async move {
+        let mut ticker = interval(Duration::from_secs(1));
+        loop {
+            ticker.tick().await;
+            match weak_scheduler.upgrade() {
+                Some(scheduler) => scheduler.clean_recently_completed_actions().await,
+                // If we fail to upgrade, our service is probably destroyed, so return.
+                None => return,
+            }
+        }
+    });
+}
diff --git a/cas/scheduler/grpc_scheduler.rs b/cas/scheduler/grpc_scheduler.rs
index cd8519f8c..0e9815a91 100644
--- a/cas/scheduler/grpc_scheduler.rs
+++ b/cas/scheduler/grpc_scheduler.rs
@@ -17,37 +17,73 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
 use parking_lot::Mutex;
+use tokio::select;
 use tokio::sync::watch;
-use tonic::{transport, Request};
+use tonic::{transport, Request, Streaming};
 
-use action_messages::{ActionInfo, ActionState, DEFAULT_EXECUTION_PRIORITY};
+use action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
 use common::log;
 use error::{make_err, Code, Error, ResultExt};
 use platform_property_manager::PlatformPropertyManager;
 use proto::build::bazel::remote::execution::v2::{
     capabilities_client::CapabilitiesClient, execution_client::ExecutionClient, ExecuteRequest, ExecutionPolicy,
-    GetCapabilitiesRequest,
+    GetCapabilitiesRequest, WaitExecutionRequest,
 };
+use proto::google::longrunning::Operation;
 use scheduler::ActionScheduler;
 
 pub struct GrpcScheduler {
-    endpoint: transport::Channel,
+    capabilities_client: CapabilitiesClient<transport::Channel>,
+    execution_client: ExecutionClient<transport::Channel>,
     platform_property_managers: Mutex<HashMap<String, Arc<PlatformPropertyManager>>>,
 }
 
 impl GrpcScheduler {
-    pub async fn new(config: &config::schedulers::GrpcScheduler) -> Result<Self, Error> {
-        let endpoint = transport::Endpoint::new(config.endpoint.clone())
-            .err_tip(|| format!("Could not parse {} in GrpcScheduler", config.endpoint))?
-            .connect()
-            .await
-            .err_tip(|| format!("Could not connect to {} in GrpcScheduler", config.endpoint))?;
+    pub fn new(config: &config::schedulers::GrpcScheduler) -> Result<Self, Error> {
+        let endpoint = transport::Channel::balance_list(std::iter::once(
+            transport::Endpoint::new(config.endpoint.clone())
+                .err_tip(|| format!("Could not parse {} in GrpcScheduler", config.endpoint))?,
+        ));
         Ok(Self {
-            endpoint,
+            capabilities_client: CapabilitiesClient::new(endpoint.clone()),
+            execution_client: ExecutionClient::new(endpoint),
             platform_property_managers: Mutex::new(HashMap::new()),
         })
     }
+
+    async fn stream_state(mut result_stream: Streaming<Operation>) -> Result<watch::Receiver<Arc<ActionState>>, Error> {
+        if let Some(initial_response) = result_stream
+            .message()
+            .await
+            .err_tip(|| "Receiving response from upstream scheduler")?
+        {
+            let (tx, rx) = watch::channel(Arc::new(initial_response.try_into()?));
+            tokio::spawn(async move {
+                loop {
+                    select!(
+                        _ = tx.closed() => {
+                            log::info!("Client disconnected in GrpcScheduler");
+                            return;
+                        }
+                        Ok(Some(response)) = result_stream.message() => {
+                            match response.try_into() {
+                                Ok(response) => {
+                                    if let Err(err) = tx.send(Arc::new(response)) {
+                                        log::info!("Client disconnected in GrpcScheduler: {}", err);
+                                        return;
+                                    }
+                                }
+                                Err(err) => log::error!("Error converting response to ActionState in GrpcScheduler: {}", err),
+                            }
+                        }
+                    )
+                }
+            });
+            return Ok(rx);
+        }
+        Err(make_err!(Code::Internal, "Upstream scheduler didn't accept action."))
+    }
 }
 
 #[async_trait]
@@ -58,8 +94,9 @@ impl ActionScheduler for GrpcScheduler {
         }
 
         // Not in the cache, lookup the capabilities with the upstream.
-        let mut capabilities_client = CapabilitiesClient::new(self.endpoint.clone());
-        let capabilities = capabilities_client
+        let capabilities = self
+            .capabilities_client
+            .clone()
             .get_capabilities(GetCapabilitiesRequest {
                 instance_name: instance_name.to_string(),
             })
@@ -97,32 +134,34 @@ impl ActionScheduler for GrpcScheduler {
             // TODO: Get me from the original request, not very important as we ignore it.
             results_cache_policy: None,
         };
-        let mut result_stream = ExecutionClient::new(self.endpoint.clone())
+        let result_stream = self
+            .execution_client
+            .clone()
             .execute(Request::new(request))
             .await
             .err_tip(|| "Sending action to upstream scheduler")?
             .into_inner();
-        if let Some(initial_response) = result_stream
-            .message()
-            .await
-            .err_tip(|| "Recieving response from upstream scheduler")?
-        {
-            let (tx, rx) = watch::channel(Arc::new(initial_response.try_into()?));
-            tokio::spawn(async move {
-                while let Ok(Some(response)) = result_stream.message().await {
-                    match response.try_into() {
-                        Ok(response) => {
-                            if let Err(err) = tx.send(Arc::new(response)) {
-                                log::info!("Client disconnected in GrpcScheduler: {}", err);
-                                return;
-                            }
-                        }
-                        Err(err) => log::error!("Error converting response to watch in GrpcScheduler: {}", err),
-                    }
-                }
-            });
-            return Ok(rx);
+        Self::stream_state(result_stream).await
+    }
+
+    async fn find_existing_action(
+        &self,
+        unique_qualifier: &ActionInfoHashKey,
+    ) -> Option<watch::Receiver<Arc<ActionState>>> {
+        let request = WaitExecutionRequest {
+            name: unique_qualifier.action_name(),
+        };
+        let result_stream = self
+            .execution_client
+            .clone()
+            .wait_execution(Request::new(request))
+            .await;
+        if let Err(err) = result_stream {
+            log::info!("Error response looking up action with upstream scheduler: {}", err);
+            return None;
         }
-        Err(make_err!(Code::Internal, "Upstream scheduler didn't accept action."))
+        Self::stream_state(result_stream.unwrap().into_inner()).await.ok()
     }
+
+    async fn clean_recently_completed_actions(&self) {}
 }
diff --git a/cas/scheduler/scheduler.rs b/cas/scheduler/scheduler.rs
index 9dc43c0a6..d0f8dda0c 100644
--- a/cas/scheduler/scheduler.rs
+++ b/cas/scheduler/scheduler.rs
@@ -31,6 +31,15 @@ pub trait ActionScheduler: Sync + Send + Unpin {
 
     /// Adds an action to the scheduler for remote execution.
     async fn add_action(&self, action_info: ActionInfo) -> Result<watch::Receiver<Arc<ActionState>>, Error>;
+
+    /// Find an existing action by its name.
+    async fn find_existing_action(
+        &self,
+        unique_qualifier: &ActionInfoHashKey,
+    ) -> Option<watch::Receiver<Arc<ActionState>>>;
+
+    /// Cleans up the cache of recently completed actions.
+    async fn clean_recently_completed_actions(&self);
 }
 
 /// WorkerScheduler interface is responsible for interactions between the scheduler
diff --git a/cas/scheduler/simple_scheduler.rs b/cas/scheduler/simple_scheduler.rs
index 227d25abc..6272f9e03 100644
--- a/cas/scheduler/simple_scheduler.rs
+++ b/cas/scheduler/simple_scheduler.rs
@@ -12,12 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::borrow::Borrow;
 use std::cmp;
-use std::collections::{BTreeMap, HashMap, HashSet};
+use std::collections::BTreeMap;
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
+use std::time::SystemTime;
 
 use async_trait::async_trait;
 use futures::Future;
+use hashbrown::{HashMap, HashSet};
 use lru::LruCache;
 use parking_lot::Mutex;
 use tokio::sync::{watch, Notify};
@@ -35,6 +39,10 @@ use worker::{Worker, WorkerId, WorkerTimestamp, WorkerUpdate};
 /// If this changes, remember to change the documentation in the config.
 const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;
 
+/// Default timeout for recently completed actions in seconds.
+/// If this changes, remember to change the documentation in the config.
+const DEFAULT_RETAIN_COMPLETED_FOR_S: u64 = 60;
+
 /// Default times a job can retry before failing.
 /// If this changes, remember to change the documentation in the config.
 const DEFAULT_MAX_JOB_RETRIES: usize = 3;
@@ -144,6 +152,32 @@ impl Workers {
     }
 }
 
+struct CompletedAction {
+    completed_time: SystemTime,
+    state: Arc<ActionState>,
+}
+
+impl Hash for CompletedAction {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        ActionInfoHashKey::hash(&self.state.unique_qualifier, state);
+    }
+}
+
+impl PartialEq for CompletedAction {
+    fn eq(&self, other: &Self) -> bool {
+        ActionInfoHashKey::eq(&self.state.unique_qualifier, &other.state.unique_qualifier)
+    }
+}
+
+impl Eq for CompletedAction {}
+
+impl Borrow<ActionInfoHashKey> for CompletedAction {
+    #[inline]
+    fn borrow(&self) -> &ActionInfoHashKey {
+        &self.state.unique_qualifier
+    }
+}
+
 struct SimpleSchedulerImpl {
     // BTreeMap uses `cmp` to do it's comparisons, this is a problem because we want to sort our
     // actions based on priority and insert timestamp but also want to find and join new actions
@@ -159,6 +193,13 @@ struct SimpleSchedulerImpl {
     queued_actions: BTreeMap<Arc<ActionInfo>, AwaitedAction>,
     workers: Workers,
     active_actions: HashMap<Arc<ActionInfo>, RunningAction>,
+    // These actions completed recently but had no listener, they might have
+    // completed while the caller was thinking about calling wait_execution, so
+    // keep their completion state around for a while to send back.
+    // TODO(#192) Revisit if this is the best way to handle recently completed actions.
+    recently_completed_actions: HashSet<CompletedAction>,
+    /// The duration that actions are kept in recently_completed_actions for.
+    retain_completed_for: Duration,
     /// Timeout of how long to evict workers if no response in this given amount of time in seconds.
     worker_timeout_s: u64,
     /// Default times a job can retry before failing.
@@ -168,6 +209,16 @@ struct SimpleSchedulerImpl {
 }
 
 impl SimpleSchedulerImpl {
+    fn subscribe_to_channel(awaited_action: &AwaitedAction) -> watch::Receiver<Arc<ActionState>> {
+        let rx = awaited_action.notify_channel.subscribe();
+        // TODO: Fix this when fixed upstream tokio-rs/tokio#5871
+        awaited_action
+            .notify_channel
+            .send(awaited_action.current_state.clone())
+            .unwrap();
+        rx
+    }
+
     /// Attempts to find a worker to execute an action and begins executing it.
     /// If an action is already running that is cacheable it may merge this action
     /// with the results and state changes of the already running action.
@@ -177,13 +228,7 @@ impl SimpleSchedulerImpl {
     fn add_action(&mut self, action_info: ActionInfo) -> Result<watch::Receiver<Arc<ActionState>>, Error> {
         // Check to see if the action is running, if it is and cacheable, merge the actions.
         if let Some(running_action) = self.active_actions.get_mut(&action_info) {
-            let rx = running_action.action.notify_channel.subscribe();
-            running_action
-                .action
-                .notify_channel
-                .send(running_action.action.current_state.clone())
-                .unwrap();
-            return Ok(rx);
+            return Ok(Self::subscribe_to_channel(&running_action.action));
         }
 
         // Check to see if the action is queued, if it is and cacheable, merge the actions.
@@ -201,10 +246,8 @@ impl SimpleSchedulerImpl {
             Arc::make_mut(&mut arc_action_info).priority = new_priority;
 
             let rx = queued_action.notify_channel.subscribe();
-            queued_action
-                .notify_channel
-                .send(queued_action.current_state.clone())
-                .unwrap();
+            // TODO: Fix this when fixed upstream tokio-rs/tokio#5871
+            let _ = queued_action.notify_channel.send(queued_action.current_state.clone());
 
             // Even if we fail to send our action to the client, we need to add this action back to the
             // queue because it was remove earlier.
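The CompletedAction impls above are what let a HashSet<CompletedAction> be probed directly with an &ActionInfoHashKey: Hash and PartialEq both delegate to state.unique_qualifier, and Borrow<ActionInfoHashKey> tells the set that the two key views are interchangeable. A minimal, self-contained sketch of the same pattern; the two-field Key and the Completed struct are simplified stand-ins for illustration, not the real types:

use std::borrow::Borrow;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

// Simplified stand-in for ActionInfoHashKey (illustration only).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Key {
    instance_name: String,
    salt: u64,
}

struct Completed {
    key: Key,
    exit_code: i32,
}

// Hash/Eq must look at exactly the same data as Key's own impls so that a
// lookup by &Key lands in the same bucket as the stored Completed value.
impl Hash for Completed {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.key.hash(state);
    }
}

impl PartialEq for Completed {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl Eq for Completed {}

// Borrow is what lets HashSet::<Completed>::get accept an &Key.
impl Borrow<Key> for Completed {
    fn borrow(&self) -> &Key {
        &self.key
    }
}

fn main() {
    let mut set = HashSet::new();
    set.insert(Completed {
        key: Key { instance_name: "main".into(), salt: 0 },
        exit_code: 0,
    });

    // No Completed value is needed to perform the lookup.
    let probe = Key { instance_name: "main".into(), salt: 0 };
    assert_eq!(set.get(&probe).map(|c| c.exit_code), Some(0));
}

The invariant to preserve is that Hash and Eq agree between the owned type and the borrowed key; if they ever diverge, lookups silently miss.
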
@@ -238,6 +281,33 @@ impl SimpleSchedulerImpl {
         Ok(rx)
     }
 
+    fn clean_recently_completed_actions(&mut self) {
+        let expiry_time = SystemTime::now().checked_sub(self.retain_completed_for).unwrap();
+        self.recently_completed_actions
+            .retain(|action| action.completed_time > expiry_time);
+    }
+
+    fn find_recently_completed_action(
+        &mut self,
+        unique_qualifier: &ActionInfoHashKey,
+    ) -> Option<watch::Receiver<Arc<ActionState>>> {
+        self.recently_completed_actions
+            .get(unique_qualifier)
+            .map(|action| watch::channel(action.state.clone()).1)
+    }
+
+    fn find_existing_action(&self, unique_qualifier: &ActionInfoHashKey) -> Option<watch::Receiver<Arc<ActionState>>> {
+        self.queued_actions_set
+            .get(unique_qualifier)
+            .and_then(|action_info| self.queued_actions.get(action_info))
+            .or_else(|| {
+                self.active_actions
+                    .get(unique_qualifier)
+                    .map(|running_action| &running_action.action)
+            })
+            .map(Self::subscribe_to_channel)
+    }
+
     fn retry_action(&mut self, action_info: &Arc<ActionInfo>, worker_id: &WorkerId) {
         match self.active_actions.remove(action_info) {
             Some(running_action) => {
@@ -445,20 +515,26 @@ impl SimpleSchedulerImpl {
             .action
             .notify_channel
             .send(running_action.action.current_state.clone());
-        if send_result.is_err() {
-            log::warn!(
-                "Action {} has no more listeners during update_action()",
-                action_info.digest().str()
-            );
-        }
 
         if !running_action.action.current_state.stage.is_finished() {
+            if send_result.is_err() {
+                log::warn!(
+                    "Action {} has no more listeners during update_action()",
+                    action_info.digest().str()
+                );
+            }
             // If the operation is not finished it means the worker is still working on it, so put it
             // back or else we will loose track of the task.
             self.active_actions.insert(action_info, running_action);
             return Ok(());
         }
 
+        // Keep the completion state around for a while in case it is asked for soon.
+        self.recently_completed_actions.insert(CompletedAction {
+            completed_time: SystemTime::now(),
+            state: running_action.action.current_state,
+        });
+
         let worker = self
             .workers
             .workers
@@ -467,8 +543,6 @@ impl SimpleSchedulerImpl {
         worker.complete_action(&action_info);
         self.tasks_or_workers_change_notify.notify_one();
 
-        // TODO(allada) We should probably hold a small queue of recent actions for debugging.
-        // Right now it will drop the action which also disconnects listeners here.
         Ok(())
     }
 }
@@ -511,6 +585,11 @@ impl SimpleScheduler {
             worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S;
         }
 
+        let mut retain_completed_for_s = scheduler_cfg.retain_completed_for_s;
+        if retain_completed_for_s == 0 {
+            retain_completed_for_s = DEFAULT_RETAIN_COMPLETED_FOR_S;
+        }
+
         let mut max_job_retries = scheduler_cfg.max_job_retries;
         if max_job_retries == 0 {
             max_job_retries = DEFAULT_MAX_JOB_RETRIES;
@@ -523,6 +602,8 @@ impl SimpleScheduler {
             queued_actions: BTreeMap::new(),
             workers: Workers::new(scheduler_cfg.allocation_strategy),
             active_actions: HashMap::new(),
+            recently_completed_actions: HashSet::new(),
+            retain_completed_for: Duration::new(retain_completed_for_s, 0),
             worker_timeout_s,
             max_job_retries,
             tasks_or_workers_change_notify: tasks_or_workers_change_notify.clone(),
@@ -584,6 +665,20 @@ impl ActionScheduler for SimpleScheduler {
         let mut inner = self.inner.lock();
         inner.add_action(action_info)
     }
+
+    async fn find_existing_action(
+        &self,
+        unique_qualifier: &ActionInfoHashKey,
+    ) -> Option<watch::Receiver<Arc<ActionState>>> {
+        let mut inner = self.inner.lock();
+        inner
+            .find_existing_action(unique_qualifier)
+            .or_else(|| inner.find_recently_completed_action(unique_qualifier))
+    }
+
+    async fn clean_recently_completed_actions(&self) {
+        self.inner.lock().clean_recently_completed_actions();
+    }
 }
 
 #[async_trait]
diff --git a/cas/scheduler/tests/cache_lookup_scheduler_test.rs b/cas/scheduler/tests/cache_lookup_scheduler_test.rs
index 803c3146d..5c58f7693 100644
--- a/cas/scheduler/tests/cache_lookup_scheduler_test.rs
+++ b/cas/scheduler/tests/cache_lookup_scheduler_test.rs
@@ -22,7 +22,7 @@
 use tokio::{self, join, sync::watch};
 use tokio_stream::wrappers::WatchStream;
 use tokio_stream::StreamExt;
 
-use action_messages::{ActionResult, ActionStage, ActionState, DirectoryInfo};
+use action_messages::{ActionInfoHashKey, ActionResult, ActionStage, ActionState, DirectoryInfo};
 use cache_lookup_scheduler::CacheLookupScheduler;
 use common::DigestInfo;
 use error::{Error, ResultExt};
@@ -145,4 +145,21 @@ mod cache_lookup_scheduler_tests {
         );
         Ok(())
     }
+
+    #[tokio::test]
+    async fn find_existing_action_call_passed() -> Result<(), Error> {
+        let context = make_cache_scheduler()?;
+        let action_name = ActionInfoHashKey {
+            instance_name: "instance".to_string(),
+            digest: DigestInfo::new([8; 32], 1),
+            salt: 1000,
+        };
+        let (actual_result, actual_action_name) = join!(
+            context.cache_scheduler.find_existing_action(&action_name),
+            context.mock_scheduler.expect_find_existing_action(None),
+        );
+        assert_eq!(true, actual_result.is_none());
+        assert_eq!(action_name, actual_action_name);
+        Ok(())
+    }
 }
diff --git a/cas/scheduler/tests/simple_scheduler_test.rs b/cas/scheduler/tests/simple_scheduler_test.rs
index 008dc66e7..528d9ce5d 100644
--- a/cas/scheduler/tests/simple_scheduler_test.rs
+++ b/cas/scheduler/tests/simple_scheduler_test.rs
@@ -138,6 +138,63 @@ mod scheduler_tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn find_executing_action() -> Result<(), Error> {
+        const WORKER_ID: WorkerId = WorkerId(0x1234_5678_9111);
+
+        let scheduler =
+            SimpleScheduler::new_with_callback(&config::schedulers::SimpleScheduler::default(), || async move {});
+        let action_digest = DigestInfo::new([99u8; 32], 512);
+
+        let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, PlatformProperties::default()).await?;
+        let insert_timestamp = make_system_time(1);
+        let client_rx = setup_action(
+            &scheduler,
+            action_digest,
+            PlatformProperties::default(),
+            insert_timestamp,
+        )
+        .await?;
+
+        // Drop our receiver and look up a new one.
+ let unique_qualifier = client_rx.borrow().unique_qualifier.clone(); + drop(client_rx); + let mut client_rx = scheduler + .find_existing_action(&unique_qualifier) + .await + .err_tip(|| "Action not found")?; + + { + // Worker should have been sent an execute command. + let expected_msg_for_worker = UpdateForWorker { + update: Some(update_for_worker::Update::StartAction(StartExecute { + execute_request: Some(ExecuteRequest { + instance_name: INSTANCE_NAME.to_string(), + skip_cache_lookup: true, + action_digest: Some(action_digest.into()), + ..Default::default() + }), + salt: 0, + queued_timestamp: Some(insert_timestamp.into()), + })), + }; + let msg_for_worker = rx_from_worker.recv().await.unwrap(); + assert_eq!(msg_for_worker, expected_msg_for_worker); + } + { + // Client should get notification saying it's being executed. + let action_state = client_rx.borrow_and_update(); + let expected_action_state = ActionState { + // Name is a random string, so we ignore it and just make it the same. + unique_qualifier: action_state.unique_qualifier.clone(), + stage: ActionStage::Executing, + }; + assert_eq!(action_state.as_ref(), &expected_action_state); + } + + Ok(()) + } + #[tokio::test] async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Error> { const WORKER_ID1: WorkerId = WorkerId(0x0011_1111); @@ -663,6 +720,103 @@ mod scheduler_tests { Ok(()) } + #[tokio::test] + async fn update_action_sends_completed_result_after_disconnect() -> Result<(), Error> { + const WORKER_ID: WorkerId = WorkerId(0x1234_5678_9111); + + let scheduler = + SimpleScheduler::new_with_callback(&config::schedulers::SimpleScheduler::default(), || async move {}); + let action_digest = DigestInfo::new([99u8; 32], 512); + + let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, PlatformProperties::default()).await?; + let insert_timestamp = make_system_time(1); + let client_rx = setup_action( + &scheduler, + action_digest, + PlatformProperties::default(), + insert_timestamp, + ) + .await?; + + // Drop our receiver and don't reconnect until completed. + let unique_qualifier = client_rx.borrow().unique_qualifier.clone(); + drop(client_rx); + + { + // Other tests check full data. We only care if we got StartAction. 
+ match rx_from_worker.recv().await.unwrap().update { + Some(update_for_worker::Update::StartAction(_)) => { /* Success */ } + v => panic!("Expected StartAction, got : {v:?}"), + } + } + + let action_info_hash_key = ActionInfoHashKey { + instance_name: INSTANCE_NAME.to_string(), + digest: action_digest, + salt: 0, + }; + let action_result = ActionResult { + output_files: vec![FileInfo { + name_or_path: NameOrPath::Name("hello".to_string()), + digest: DigestInfo::new([5u8; 32], 18), + is_executable: true, + }], + output_folders: vec![DirectoryInfo { + path: "123".to_string(), + tree_digest: DigestInfo::new([9u8; 32], 100), + }], + output_file_symlinks: vec![SymlinkInfo { + name_or_path: NameOrPath::Name("foo".to_string()), + target: "bar".to_string(), + }], + output_directory_symlinks: vec![SymlinkInfo { + name_or_path: NameOrPath::Name("foo2".to_string()), + target: "bar2".to_string(), + }], + exit_code: 0, + stdout_digest: DigestInfo::new([6u8; 32], 19), + stderr_digest: DigestInfo::new([7u8; 32], 20), + execution_metadata: ExecutionMetadata { + worker: WORKER_ID.to_string(), + queued_timestamp: make_system_time(5), + worker_start_timestamp: make_system_time(6), + worker_completed_timestamp: make_system_time(7), + input_fetch_start_timestamp: make_system_time(8), + input_fetch_completed_timestamp: make_system_time(9), + execution_start_timestamp: make_system_time(10), + execution_completed_timestamp: make_system_time(11), + output_upload_start_timestamp: make_system_time(12), + output_upload_completed_timestamp: make_system_time(13), + }, + server_logs: HashMap::default(), + }; + scheduler + .update_action( + &WORKER_ID, + &action_info_hash_key, + ActionStage::Completed(action_result.clone()), + ) + .await?; + + // Now look up a channel after the action has completed. + let mut client_rx = scheduler + .find_existing_action(&unique_qualifier) + .await + .err_tip(|| "Action not found")?; + { + // Client should get notification saying it has been completed. + let action_state = client_rx.borrow_and_update(); + let expected_action_state = ActionState { + // Name is a random string, so we ignore it and just make it the same. 
+                unique_qualifier: action_state.unique_qualifier.clone(),
+                stage: ActionStage::Completed(action_result),
+            };
+            assert_eq!(action_state.as_ref(), &expected_action_state);
+        }
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> {
         const GOOD_WORKER_ID: WorkerId = WorkerId(0x1234_5678_9111);
diff --git a/cas/scheduler/tests/utils/mock_scheduler.rs b/cas/scheduler/tests/utils/mock_scheduler.rs
index 5fd7467be..4c0378823 100644
--- a/cas/scheduler/tests/utils/mock_scheduler.rs
+++ b/cas/scheduler/tests/utils/mock_scheduler.rs
@@ -17,7 +17,7 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
 use tokio::sync::{mpsc, watch, Mutex};
 
-use action_messages::{ActionInfo, ActionState};
+use action_messages::{ActionInfo, ActionInfoHashKey, ActionState};
 use error::{make_input_err, Error};
 use platform_property_manager::PlatformPropertyManager;
 use scheduler::ActionScheduler;
@@ -26,11 +26,13 @@
 enum ActionSchedulerCalls {
     GetPlatformPropertyManager(String),
     AddAction(ActionInfo),
+    FindExistingAction(ActionInfoHashKey),
 }
 
 enum ActionSchedulerReturns {
     GetPlatformPropertyManager(Result<Arc<PlatformPropertyManager>, Error>),
     AddAction(Result<watch::Receiver<Arc<ActionState>>, Error>),
+    FindExistingAction(Option<watch::Receiver<Arc<ActionState>>>),
 }
 
 pub struct MockActionScheduler {
@@ -91,6 +93,24 @@ impl MockActionScheduler {
             .unwrap();
         req
     }
+
+    pub async fn expect_find_existing_action(
+        &self,
+        result: Option<watch::Receiver<Arc<ActionState>>>,
+    ) -> ActionInfoHashKey {
+        let mut rx_call_lock = self.rx_call.lock().await;
+        let ActionSchedulerCalls::FindExistingAction(req) = rx_call_lock
+            .recv()
+            .await
+            .expect("Could not receive msg in mpsc") else {
+            panic!("Got incorrect call waiting for find_existing_action")
+        };
+        self.tx_resp
+            .send(ActionSchedulerReturns::FindExistingAction(result))
+            .map_err(|_| make_input_err!("Could not send request to mpsc"))
+            .unwrap();
+        req
+    }
 }
 
 #[async_trait]
@@ -118,4 +138,20 @@ impl ActionScheduler for MockActionScheduler {
             _ => panic!("Expected add_action return value"),
         }
     }
+
+    async fn find_existing_action(
+        &self,
+        unique_qualifier: &ActionInfoHashKey,
+    ) -> Option<watch::Receiver<Arc<ActionState>>> {
+        self.tx_call
+            .send(ActionSchedulerCalls::FindExistingAction(unique_qualifier.clone()))
+            .expect("Could not send request to mpsc");
+        let mut rx_resp_lock = self.rx_resp.lock().await;
+        match rx_resp_lock.recv().await.expect("Could not receive msg in mpsc") {
+            ActionSchedulerReturns::FindExistingAction(result) => result,
+            _ => panic!("Expected find_existing_action return value"),
+        }
+    }
+
+    async fn clean_recently_completed_actions(&self) {}
 }
diff --git a/config/schedulers.rs b/config/schedulers.rs
index 04d99d4a9..ec58b1e26 100644
--- a/config/schedulers.rs
+++ b/config/schedulers.rs
@@ -91,6 +91,12 @@ pub struct SimpleScheduler {
     /// config.
     pub supported_platform_properties: Option<HashMap<String, PropertyType>>,
 
+    /// The amount of time to retain completed actions in memory in case
+    /// a WaitExecution is called after the action has completed.
+    /// Default: 60 (seconds)
+    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
+    pub retain_completed_for_s: u64,
+
     /// Remove workers from pool once the worker has not responded in this
     /// amount of time in seconds.
     /// Default: 5 (seconds)
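Several places in this change (subscribe_to_existing_action, SimpleSchedulerImpl::subscribe_to_channel) re-send the current value right after calling subscribe(), per the TODOs referencing tokio-rs/tokio#5871. A standalone sketch of that workaround, assuming tokio's watch semantics where a receiver returned by Sender::subscribe() starts with the current value already marked as seen; the helper name and the String payload here are illustrative, not part of this change:

use std::sync::Arc;
use tokio::sync::watch;

// Subscribe a new receiver, then re-send the current value so the new
// subscriber observes the latest state instead of waiting for the next
// transition.
fn subscribe_with_current<T>(tx: &watch::Sender<Arc<T>>) -> watch::Receiver<Arc<T>> {
    let rx = tx.subscribe();
    // Clone the Arc and drop the borrow guard before sending; holding the
    // guard across send() risks a deadlock.
    let current = Arc::clone(&tx.borrow());
    let _ = tx.send(current);
    rx
}

#[tokio::main]
async fn main() {
    let (tx, _keep_alive) = watch::channel(Arc::new("queued".to_string()));
    let mut rx = subscribe_with_current(&tx);
    // Without the re-send, this changed() would hang until the next state
    // transition; with it, the current state is immediately observable.
    rx.changed().await.unwrap();
    assert_eq!(rx.borrow_and_update().as_str(), "queued");
}

The trade-off, which the comment in subscribe_to_existing_action calls out, is that the re-send also wakes every existing receiver with a value it has already observed, so downstream consumers must tolerate duplicate states.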