Skip to content

Commit

Permalink
[Refactor] Workers::find_worker_for_action should take PlatformProper…
Browse files Browse the repository at this point in the history
…ties (#1068)

Refactoring out the dependency of `AwaitedAction` when looking up
`PlatformProperties` in `do_try_match()`.
  • Loading branch information
adam-singer committed Jun 29, 2024
1 parent 67e3164 commit f5e7276
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 27 deletions.
15 changes: 5 additions & 10 deletions nativelink-scheduler/src/scheduler_state/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
use lru::LruCache;
use nativelink_config::schedulers::WorkerAllocationStrategy;
use nativelink_error::{error_if, make_input_err, Error, ResultExt};
use nativelink_util::action_messages::{ActionStage, WorkerId};
use nativelink_util::action_messages::WorkerId;
use nativelink_util::platform_properties::PlatformProperties;
use tracing::{event, Level};

use crate::scheduler_state::awaited_action::AwaitedAction;
use crate::worker::{Worker, WorkerTimestamp};

/// A collection of workers that are available to run tasks.
Expand Down Expand Up @@ -96,22 +96,17 @@ impl Workers {
// simulation of worst cases in a single threaded environment.
pub(crate) fn find_worker_for_action(
&self,
awaited_action: &AwaitedAction,
platform_properties: &PlatformProperties,
) -> Option<WorkerId> {
assert!(matches!(
awaited_action.current_state.stage,
ActionStage::Queued
));
let action_properties = &awaited_action.action_info.platform_properties;
let mut workers_iter = self.workers.iter();
let workers_iter = match self.allocation_strategy {
// Use rfind to get the least recently used that satisfies the properties.
WorkerAllocationStrategy::least_recently_used => workers_iter.rfind(|(_, w)| {
w.can_accept_work() && action_properties.is_satisfied_by(&w.platform_properties)
w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
}),
// Use find to get the most recently used that satisfies the properties.
WorkerAllocationStrategy::most_recently_used => workers_iter.find(|(_, w)| {
w.can_accept_work() && action_properties.is_satisfied_by(&w.platform_properties)
w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
}),
};
workers_iter.map(|(_, w)| &w.id).copied()
Expand Down
30 changes: 13 additions & 17 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use crate::operation_state_manager::{
OperationStageFlags, WorkerStateManager,
};
use crate::platform_property_manager::PlatformPropertyManager;
use crate::scheduler_state::awaited_action::AwaitedAction;
use crate::scheduler_state::metrics::Metrics as SchedulerMetrics;
use crate::scheduler_state::state_manager::StateManager;
use crate::scheduler_state::workers::Workers;
Expand Down Expand Up @@ -274,6 +273,17 @@ impl SimpleSchedulerImpl {
match action_state_results {
Ok(mut stream) => {
while let Some(action_state_result) = stream.next().await {
let as_state_result = action_state_result.as_state().await;
let Ok(state) = as_state_result else {
let _ = as_state_result.inspect_err(|err| {
event!(
Level::ERROR,
?err,
"Failed to get action_info from as_state_result stream"
);
});
continue;
};
let action_state_result = action_state_result.as_action_info().await;
let Ok(action_info) = action_state_result else {
let _ = action_state_result.inspect_err(|err| {
Expand All @@ -286,28 +296,14 @@ impl SimpleSchedulerImpl {
continue;
};

let Some(awaited_action): Option<&AwaitedAction> = self
.state_manager
.inner
.queued_actions
.get(action_info.as_ref())
else {
event!(
Level::ERROR,
?action_info,
"queued_actions out of sync with itself"
);
continue;
};

let maybe_worker_id: Option<WorkerId> = {
self.state_manager
.inner
.workers
.find_worker_for_action(awaited_action)
.find_worker_for_action(&action_info.platform_properties)
};

let operation_id = awaited_action.current_state.id.clone();
let operation_id = state.id.clone();
let ret = <StateManager as MatchingEngineStateManager>::update_operation(
&mut self.state_manager,
operation_id.clone(),
Expand Down

0 comments on commit f5e7276

Please sign in to comment.