Skip to content

Commit

Permalink
feat: add support for removing cordoned nodes from subnets (#1120)
Browse files Browse the repository at this point in the history
  • Loading branch information
sasa-tomic authored Nov 26, 2024
1 parent ba28578 commit 1ea97a9
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 12 deletions.
25 changes: 23 additions & 2 deletions rs/cli/src/commands/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ pub struct Network {
/// Skip provided subnets.
#[clap(long, num_args(1..))]
pub skip_subnets: Vec<String>,

/// Remove cordoned nodes from their subnets.
#[clap(long)]
pub remove_cordoned_nodes: bool,
}

impl ExecutableCommand for Network {
Expand Down Expand Up @@ -90,6 +94,23 @@ impl ExecutableCommand for Network {
info!("No network ensure operator nodes unassigned requested");
}

if self.remove_cordoned_nodes {
info!("Removing cordoned nodes from their subnets");
let maybe_proposals = runner.network_remove_cordoned_nodes(ctx.forum_post_link(), &self.skip_subnets).await;
match maybe_proposals {
Ok(remove_cordoned_nodes_proposals) => proposals.extend(remove_cordoned_nodes_proposals),
Err(e) => errors.push(DetailedError {
proposal: None,
error: anyhow::anyhow!(
"Failed to calculate proposals for removing cordoned nodes and they won't be submitted. Error received: {:?}",
e
),
}),
}
} else {
info!("No network remove cordoned nodes requested");
}

// This check saves time if there are no proposals to be submitted
// because it won't check for new versions of ic admin
if !proposals.is_empty() {
Expand Down Expand Up @@ -226,10 +247,10 @@ Error {}.:
fn validate(&self, _args: &crate::commands::Args, cmd: &mut clap::Command) {
// At least one of the network operations (heal, ensure operator nodes assigned/unassigned, remove cordoned nodes) must be requested
let network_heal = self.heal || std::env::args().any(|arg| arg == "heal");
if !network_heal && !self.ensure_operator_nodes_assigned && !self.ensure_operator_nodes_unassigned {
if !network_heal && !self.ensure_operator_nodes_assigned && !self.ensure_operator_nodes_unassigned && !self.remove_cordoned_nodes {
cmd.error(
clap::error::ErrorKind::MissingRequiredArgument,
"At least one of '--heal' or '--ensure-operator-nodes-assigned' or '--ensure-operator-nodes-unassigned' must be specified.",
"At least one of '--heal' or '--ensure-operator-nodes-assigned' or '--ensure-operator-nodes-unassigned' or '--remove-cordoned-nodes' must be specified.",
)
.exit()
}
Expand Down
6 changes: 5 additions & 1 deletion rs/cli/src/ic_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ impl IcAdminImpl {
opts.motivation
.map(|m| format!(
"\n\nMotivation: {m}{}",
match opts.forum_post_link {
match &opts.forum_post_link {
Some(link) => format!("\nForum post link: {}\n", link),
None => "".to_string(),
}
Expand All @@ -275,6 +275,10 @@ impl IcAdminImpl {
.unwrap_or_default(),
cmd.args(),
self.neuron.proposer_as_arg_vec(),
match &opts.forum_post_link {
Some(link) if link.to_lowercase().starts_with("https://") => vec!["--proposal-url".to_string(), link.clone()],
_ => vec![],
},
]
.concat()
.as_slice(),
Expand Down
112 changes: 103 additions & 9 deletions rs/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,8 +581,17 @@ impl Runner {
.collect::<IndexMap<_, _>>())
}

async fn get_available_and_healthy_nodes(&self) -> anyhow::Result<(Vec<Node>, IndexMap<PrincipalId, HealthStatus>)> {
try_join(self.registry.available_nodes().map_err(anyhow::Error::from), self.health_client.nodes()).await
async fn get_available_and_healthy_nodes(
&self,
cordoned_features: &[CordonedFeature],
) -> anyhow::Result<(Vec<Node>, IndexMap<PrincipalId, HealthStatus>)> {
let (available_nodes, node_health) =
try_join(self.registry.available_nodes().map_err(anyhow::Error::from), self.health_client.nodes()).await?;
let available_nodes = available_nodes
.into_iter()
.filter(|n| !cordoned_features.iter().any(|cf| n.get_feature(&cf.feature).as_ref() == Some(&cf.value)))
.collect();
Ok((available_nodes, node_health))
}

fn get_operators_to_optimize(
Expand Down Expand Up @@ -736,7 +745,12 @@ impl Runner {
})
.map(|(id, subnet)| (*id, subnet.clone()))
.collect::<IndexMap<_, _>>();
let (mut available_nodes, health_of_nodes) = self.get_available_and_healthy_nodes().await?;
let cordoned_features = self.cordoned_features_fetcher.fetch().await.unwrap_or_else(|e| {
warn!("Failed to fetch cordoned features with error: {:?}", e);
warn!("Will continue running as if no features were cordoned");
vec![]
});
let (mut available_nodes, health_of_nodes) = self.get_available_and_healthy_nodes(&cordoned_features).await?;
let all_node_operators = self.registry.operators().await?;
let all_nodes_map = self.registry.nodes().await?;
let all_nodes = all_nodes_map.values().cloned().collect_vec();
Expand All @@ -757,11 +771,6 @@ impl Runner {
.map(|(_, n)| n.clone())
.into_group_map_by(|node| node.operator.principal);
let nodes_in_subnets_or_proposals = nodes_in_subnets_or_proposals(&subnets_that_have_no_proposals, &nodes_with_proposals_grouped_by_operator);
let cordoned_features = self.cordoned_features_fetcher.fetch().await.unwrap_or_else(|e| {
warn!("Failed to fetch cordoned features with error: {:?}", e);
warn!("Will continue running as if no features were cordoned");
vec![]
});

let operators_to_optimize = self.get_operators_to_optimize(
&all_node_operators,
Expand Down Expand Up @@ -839,7 +848,7 @@ impl Runner {
.await?,
);
subnets_that_have_no_proposals.shift_remove(&change.subnet_id.expect("Subnet ID should be present"));
available_nodes.retain(|n| n.principal != node.principal);
available_nodes.retain(|n| !change.node_ids_added.contains(&n.principal));
} else {
warn!(
"{} node {} of the operator {} in DC {} would worsen decentralization in all subnets!",
Expand Down Expand Up @@ -869,6 +878,91 @@ impl Runner {
self.network_ensure_operator_nodes(forum_post_link, skip_subnets, false).await
}

/// Calculates membership-change proposals that remove cordoned nodes from their subnets.
///
/// For every subnet returned by `get_subnets(skip_subnets)`, each member node is compared
/// against the fetched cordoned features. Nodes whose feature value matches a cordoned
/// feature are requested for removal, and the subnet change is resized by the number of
/// removed nodes so that replacements can be picked from the available (non-cordoned) nodes.
///
/// Best-effort behavior:
/// - If the cordoned features cannot be fetched, a warning is logged and the run continues
///   as if nothing were cordoned (which yields no proposals).
/// - If a change cannot be calculated for a subnet, a warning is logged and that subnet is
///   skipped; the remaining subnets are still processed.
///
/// # Errors
/// Returns an error if the subnets, the available nodes / node health, or the registry nodes
/// cannot be fetched, or if preparing or running a membership change fails.
///
/// # Panics
/// Panics if a subnet member is not present in the registry node map.
pub async fn network_remove_cordoned_nodes(
    &self,
    forum_post_link: Option<String>,
    skip_subnets: &[String],
) -> anyhow::Result<Vec<RunnerProposal>> {
    let subnets_not_skipped = self.get_subnets(skip_subnets).await?;
    let cordoned_features = self.cordoned_features_fetcher.fetch().await.unwrap_or_else(|e| {
        warn!("Failed to fetch cordoned features with error: {:?}", e);
        warn!("Will continue running as if no features were cordoned");
        vec![]
    });
    let (mut available_nodes, health_of_nodes) = self.get_available_and_healthy_nodes(&cordoned_features).await?;
    // Fetch the registry nodes once and derive the flat list from the same map.
    let all_nodes_map = self.registry.nodes().await?;
    let all_nodes = all_nodes_map.values().cloned().collect::<Vec<Node>>();

    let mut changes = vec![];
    // Iterate through all subnets and then through all nodes of each subnet, and check if any of the nodes matches the cordoned features
    for (_subnet_id, subnet) in &subnets_not_skipped {
        let subnet = DecentralizedSubnet::from(subnet);
        // Short form of the ID: everything before the first '-'. `split` always yields at
        // least one item, so this cannot panic even for an ID without a dash.
        let subnet_id_short = subnet.id.to_string().split('-').next().unwrap_or_default().to_string();

        let mut nodes_to_remove_with_explanations = vec![];
        for node in &subnet.nodes {
            let node = all_nodes_map.get(&node.principal).expect("Node should exist");
            // The first matching cordoned feature supplies the (optional) explanation text.
            let matching_explanation = cordoned_features.iter().find_map(|cf| {
                (node.get_feature(&cf.feature).as_ref() == Some(&cf.value))
                    .then(|| cf.explanation.as_ref().map(|e| format!(": {}", e)).unwrap_or_default())
            });
            if let Some(explanation) = matching_explanation {
                nodes_to_remove_with_explanations.push((node.clone(), explanation));
            }
        }

        if nodes_to_remove_with_explanations.is_empty() {
            continue;
        }

        // NOTE(review): the first `resize` argument is presumably the number of nodes to add
        // (one replacement per removed node) — confirm against `SubnetChangeRequest::resize`.
        let change_request = SubnetChangeRequest::new(
            subnet,
            available_nodes.clone(),
            vec![],
            nodes_to_remove_with_explanations.iter().map(|(n, _)| n.clone()).collect_vec(),
            vec![],
        )
        .resize(
            nodes_to_remove_with_explanations.len(),
            0,
            0,
            &health_of_nodes,
            cordoned_features.clone(),
            &all_nodes,
        );

        let change = match change_request {
            Ok(change) => change,
            Err(e) => {
                // Keep the best-effort behavior (other subnets are still processed), but do
                // not hide the fact that cordoned nodes remain in this subnet.
                warn!(
                    "Failed to calculate the removal of cordoned nodes from subnet {}: {:?}",
                    subnet_id_short, e
                );
                continue;
            }
        };

        let change_response = SubnetChangeResponse::new(
            &change,
            &health_of_nodes,
            Some(format!(
                "The following nodes in subnet `{}` have been cordoned and will be removed:\n{}",
                subnet_id_short,
                nodes_to_remove_with_explanations
                    .iter()
                    .map(|(n, e)| format!("- {}{}", n.principal.to_string().split('-').next().unwrap_or_default(), e))
                    .join("\n"),
            )),
        );

        if change_response.node_ids_added.is_empty() && change_response.node_ids_removed.is_empty() {
            continue;
        }

        changes.push(
            self.run_membership_change(
                change_response.clone(),
                replace_proposal_options(&change_response, forum_post_link.clone()).await?,
            )
            .await?,
        );
        // Nodes picked as replacements here must not be offered to the following subnets.
        available_nodes.retain(|n| !change_response.node_ids_added.contains(&n.principal));
    }
    Ok(changes)
}

pub async fn decentralization_change(
&self,
change: &ChangeSubnetMembershipPayload,
Expand Down

0 comments on commit 1ea97a9

Please sign in to comment.