Skip to content

Commit

Permalink
fix(dre): Explicitly check if nodes are in open proposals for network…
Browse files Browse the repository at this point in the history
… --ensure-*

Not completely sure if this check is required, but better to be on the safe side.
  • Loading branch information
sasa-tomic committed Nov 15, 2024
1 parent 9d086ab commit 4f63870
Showing 1 changed file with 82 additions and 28 deletions.
110 changes: 82 additions & 28 deletions rs/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,23 +590,22 @@ impl Runner {
node_operators_all: &IndexMap<PrincipalId, Operator>,
all_nodes_grouped_by_operator: &HashMap<PrincipalId, Vec<Node>>,
available_nodes_grouped_by_operator: &HashMap<PrincipalId, Vec<Node>>,
nodes_in_subnets_or_proposals: &AHashMap<PrincipalId, Node>,
nodes_all: &IndexMap<PrincipalId, Node>,
subnets: &IndexMap<PrincipalId, Subnet>,
ensure_assigned: bool,
) -> Vec<(PrincipalId, String, Vec<Node>)> {
let nodes_in_subnets = subnets
.values()
.flat_map(|s| s.nodes.iter().map(|n| (n.principal, n)))
.collect::<AHashMap<_, _>>();

node_operators_all
.iter()
.filter_map(|(operator_id, operator)| {
all_nodes_grouped_by_operator.get(operator_id).and_then(|operator_nodes| {
let condition = if ensure_assigned {
operator_nodes.iter().all(|node| !nodes_in_subnets.contains_key(&node.principal))
operator_nodes
.iter()
.all(|node| !nodes_in_subnets_or_proposals.contains_key(&node.principal))
} else {
operator_nodes.iter().all(|node| nodes_in_subnets.contains_key(&node.principal))
operator_nodes
.iter()
.all(|node| nodes_in_subnets_or_proposals.contains_key(&node.principal))
};

if condition {
Expand All @@ -620,7 +619,7 @@ impl Runner {
} else {
operator_nodes
.iter()
.filter_map(|node| nodes_in_subnets.get(&node.principal))
.filter_map(|node| nodes_in_subnets_or_proposals.get(&node.principal))
.map(|n| (*n).clone())
.collect::<Vec<_>>()
};
Expand Down Expand Up @@ -715,13 +714,28 @@ impl Runner {
best_change
}

pub fn num_nodes_per_operator(all_nodes: &[Node], operator_id: &PrincipalId) -> usize {
all_nodes.iter().filter(|n| &n.operator.principal == operator_id).count()
}

pub async fn network_ensure_operator_nodes(
&self,
forum_post_link: Option<String>,
skip_subnets: &[String],
ensure_assigned: bool,
) -> anyhow::Result<Vec<RunnerProposal>> {
let mut subnets = self.get_subnets(skip_subnets).await?;
let subnets_not_skipped = self.get_subnets(skip_subnets).await?;
let mut subnets_that_have_no_proposals = subnets_not_skipped
.iter()
.filter(|(subnet_id, subnet)| match &subnet.proposal {
Some(p) => {
info!("Skipping subnet {} as it has a pending proposal {}", subnet_id, p.id);
false
}
None => true,
})
.map(|(id, subnet)| (*id, subnet.clone()))
.collect::<IndexMap<_, _>>();
let (mut available_nodes, health_of_nodes) = self.get_available_and_healthy_nodes().await?;
let all_node_operators = self.registry.operators().await?;
let all_nodes_map = self.registry.nodes().await?;
Expand All @@ -734,6 +748,15 @@ impl Runner {
.iter()
.map(|n| (*n).clone())
.into_group_map_by(|node| all_nodes_map.get(&node.principal).expect("Node should exist").operator.principal);
let nodes_with_proposals_grouped_by_operator = self
.registry
.nodes_and_proposals()
.await?
.iter()
.filter(|(_, n)| n.proposal.is_some())
.map(|(_, n)| n.clone())
.into_group_map_by(|node| node.operator.principal);
let nodes_in_subnets_or_proposals = nodes_in_subnets_or_proposals(&subnets_that_have_no_proposals, &nodes_with_proposals_grouped_by_operator);
let cordoned_features = self.cordoned_features_fetcher.fetch().await.unwrap_or_else(|e| {
warn!("Failed to fetch cordoned features with error: {:?}", e);
warn!("Will continue running as if no features were cordoned");
Expand All @@ -744,8 +767,8 @@ impl Runner {
&all_node_operators,
&all_nodes_grouped_by_operator,
&available_nodes_grouped_by_operator,
&nodes_in_subnets_or_proposals,
&all_nodes_map,
&subnets,
ensure_assigned,
);

Expand Down Expand Up @@ -773,7 +796,7 @@ impl Runner {
let node = healthy_operator_nodes.first().expect("At least one node should be present");
let best_change = self
.get_best_change_for_operator(
&subnets,
&subnets_that_have_no_proposals,
&available_nodes,
&health_of_nodes,
node,
Expand All @@ -784,26 +807,38 @@ impl Runner {
.await;

if let Some(change) = best_change {
info!(
"{} node {} of the operator {} in DC {} {} subnet {}",
if ensure_assigned { "Adding" } else { "Removing" },
node.principal.to_string().split_once('-').unwrap().0,
operator_id.to_string().split_once('-').unwrap().0,
dc,
if ensure_assigned { "to" } else { "from" },
change
.subnet_id
.expect("Subnet ID should be present")
.to_string()
.split_once('-')
.unwrap()
.0,
);
if ensure_assigned {
// If this is the single assigned node of the removed operator, we should not remove it
let node_id_removed = change.node_ids_removed.first().expect("At least one node should be present");
let node_removed = all_nodes_map.get(node_id_removed).expect("Node should exist");
if Self::num_nodes_per_operator(&all_nodes, &node_removed.operator.principal) < 2 {
warn!(
"Skipping node {} of the operator {} in DC {} as it is the only node of the operator",
node.principal.to_string().split_once('-').unwrap().0,
operator_id.to_string().split_once('-').unwrap().0,
dc
);
continue;
}
} else {
// Ensuring that some nodes are unassigned ==> prevent that we're making the same problem again by assigning all nodes of the newly picked operator
let node_id_added = change.node_ids_added.first().expect("At least one node should be present");
let node_added = all_nodes_map.get(node_id_added).expect("Node should exist");
if Self::num_nodes_per_operator(&available_nodes, &node_added.operator.principal) < 2 {
warn!(
"Skipping node {} of the operator {} in DC {} as it does not have enough available nodes",
node_added.principal.to_string().split_once('-').unwrap().0,
node_added.operator.principal.to_string().split_once('-').unwrap().0,
dc
);
continue;
}
};
changes.push(
self.run_membership_change(change.clone(), replace_proposal_options(&change, forum_post_link.clone()).await?)
.await?,
);
subnets.shift_remove(&change.subnet_id.expect("Subnet ID should be present"));
subnets_that_have_no_proposals.shift_remove(&change.subnet_id.expect("Subnet ID should be present"));
available_nodes.retain(|n| n.principal != node.principal);
} else {
warn!(
Expand Down Expand Up @@ -1038,6 +1073,25 @@ impl Runner {
}
}

fn nodes_in_subnets_or_proposals(
subnets: &IndexMap<PrincipalId, Subnet>,
nodes_with_proposals_grouped_by_operator: &HashMap<PrincipalId, Vec<Node>>,
) -> AHashMap<PrincipalId, Node> {
let nodes_in_subnets = subnets
.values()
.flat_map(|s| s.nodes.iter().map(|n| (n.principal, n.clone())))
.collect::<AHashMap<_, _>>();
let nodes_in_subnets_or_proposals = nodes_in_subnets
.into_iter()
.chain(
nodes_with_proposals_grouped_by_operator
.iter()
.flat_map(|(_, nodes)| nodes.iter().map(|n| (n.principal, n.clone()))),
)
.collect::<AHashMap<_, _>>();
nodes_in_subnets_or_proposals
}

pub async fn replace_proposal_options(change: &SubnetChangeResponse, forum_post_link: Option<String>) -> anyhow::Result<ic_admin::ProposeOptions> {
let subnet_id = change.subnet_id.ok_or_else(|| anyhow::anyhow!("subnet_id is required"))?.to_string();

Expand Down

0 comments on commit 4f63870

Please sign in to comment.