From b5224eb0dd6de6007c41e72d36ac51bc7d56b03b Mon Sep 17 00:00:00 2001 From: Longarithm Date: Thu, 9 May 2024 03:00:25 +0400 Subject: [PATCH] implement sticky assignment --- chain/epoch-manager/src/shard_assignment.rs | 26 +++- .../epoch-manager/src/validator_selection.rs | 122 ++++++++++++------ 2 files changed, 103 insertions(+), 45 deletions(-) diff --git a/chain/epoch-manager/src/shard_assignment.rs b/chain/epoch-manager/src/shard_assignment.rs index 3a8275d2d6e..ca522c6bf0d 100644 --- a/chain/epoch-manager/src/shard_assignment.rs +++ b/chain/epoch-manager/src/shard_assignment.rs @@ -20,7 +20,7 @@ pub fn assign_shards( chunk_producers: Vec, num_shards: NumShards, min_validators_per_shard: usize, - prev_chunk_producers_settlement: &[Vec], + prev_remained_chunk_producers_settlement: &[Vec], ) -> Result>, ()> { // If there's not enough chunk producers to fill up a single shard there’s // nothing we can do. Return with an error. @@ -37,13 +37,27 @@ pub fn assign_shards( ); } + // Initially, sort by number of validators first so we fill shards up. let mut result: Vec> = (0..num_shards).map(|_| Vec::new()).collect(); + let mut initial_stakes: Vec = vec![0; num_shards as usize]; - // Initially, sort by number of validators first so we fill shards up. - let mut shard_index: MinHeap<(usize, Balance, ShardId)> = - (0..num_shards).map(|s| (0, 0, s)).collect(); + // First, fill seats with the previous chunk producers that remained online. 
+ for (shard_id, remained_chunk_producers) in + prev_remained_chunk_producers_settlement.iter().enumerate() + { + for &cp_index in remained_chunk_producers { + let cp = chunk_producers[cp_index].clone(); + initial_stakes[shard_id] += cp.get_stake(); + result[shard_id].push(cp); + } + } + let mut shard_index: MinHeap<(usize, Balance, ShardId)> = (0..num_shards) + .map(|i| (result[i as usize].len(), initial_stakes[i as usize], i)) + .collect(); + + // TODO: move used chunk producers to the end of the list - // First, distribute chunk producers until all shards have at least the + // Second, distribute chunk producers until all shards have at least the // minimum requested number. If there are not enough validators to satisfy // that requirement, assign some of the validators to multiple shards. let mut chunk_producers = chunk_producers.into_iter().enumerate().cycle(); @@ -54,7 +68,7 @@ pub fn assign_shards( min_validators_per_shard, ); - // Second, if there are any unassigned chunk producers left, distribute them + // Third, if there are any unassigned chunk producers left, distribute them // between shards trying to balance total stake. let remaining_producers = num_chunk_producers.saturating_sub(num_shards as usize * min_validators_per_shard); diff --git a/chain/epoch-manager/src/validator_selection.rs b/chain/epoch-manager/src/validator_selection.rs index e2db264a24f..9a3750b615d 100644 --- a/chain/epoch-manager/src/validator_selection.rs +++ b/chain/epoch-manager/src/validator_selection.rs @@ -10,6 +10,7 @@ use near_primitives::types::{ use near_primitives::validator_mandates::{ValidatorMandates, ValidatorMandatesConfig}; use num_rational::Ratio; use rand::seq::SliceRandom; +use rand::Rng; use std::cmp::{self, Ordering}; use std::collections::hash_map; use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet}; @@ -156,46 +157,13 @@ pub fn proposals_to_epoch_info( } } - // Assign chunk producers to shards. 
- let num_chunk_producers = chunk_producers.len(); - let chunk_producer_set = - chunk_producers.iter().map(|vs| vs.account_id().clone()).collect::>(); - let minimum_validators_per_shard = - epoch_config.validator_selection_config.minimum_validators_per_shard as usize; - let prev_chunk_producers_settlement = if !will_shard_layout_change { - let s = prev_epoch_info.chunk_producers_settlement(); - let prev_chunk_validator_accounts = s - .iter() - .map(|vs| { - vs.into_iter() - .map(|v| { - let account_id = - prev_epoch_info.get_validator(*v).account_id().clone(); - if chunk_producer_set.contains(&account_id) { - validator_to_index.get(&account_id).copied() - } else { - None - } - }) - .flatten() - .collect::>() - }) - .collect::>(); - prev_chunk_validator_accounts - } else { - vec![] - }; - let shard_assignment = assign_shards( + let shard_assignment = assign_chunk_producers_to_shards( + epoch_config, + &rng_seed, chunk_producers, - shard_ids.len() as NumShards, - minimum_validators_per_shard, - &prev_chunk_producers_settlement, - ) - .map_err(|_| EpochError::NotEnoughValidators { - num_validators: num_chunk_producers as u64, - num_shards: shard_ids.len() as NumShards, - })?; - + prev_epoch_info, + will_shard_layout_change, + )?; let chunk_producers_settlement = shard_assignment .into_iter() .map(|vs| vs.into_iter().map(|v| validator_to_index[v.account_id()]).collect()) @@ -384,6 +352,82 @@ fn select_validators( } } +fn assign_chunk_producers_to_shards( + epoch_config: &EpochConfig, + rng_seed: &RngSeed, + chunk_producers: Vec, + prev_epoch_info: &EpochInfo, + will_shard_layout_change: bool, +) -> Result>, EpochError> { + let shard_ids: Vec<_> = epoch_config.shard_layout.shard_ids().collect(); + let num_chunk_producers = chunk_producers.len(); + let chunk_producer_indices = chunk_producers + .iter() + .enumerate() + .map(|(i, vs)| (vs.account_id().clone(), i)) + .collect::>(); + let minimum_validators_per_shard = + 
epoch_config.validator_selection_config.minimum_validators_per_shard as usize; + let prev_remained_chunk_producers_settlement = if !will_shard_layout_change { + let s = prev_epoch_info.chunk_producers_settlement(); + let prev_chunk_validator_accounts = s + .iter() + .map(|vs| { + vs.into_iter() + .map(|v| { + let account_id = prev_epoch_info.get_validator(*v).account_id().clone(); + chunk_producer_indices.get(&account_id).copied() + }) + .flatten() + .collect::>() + }) + .collect::>(); + prev_chunk_validator_accounts + } else { + vec![vec![]; shard_ids.len()] + }; + + // Keep each chunk producer in at most one shard: a producer could appear in + // several shards due to the minimum_validators_per_shard requirement. + let mut prev_remained_chunk_producers_to_shards = HashMap::>::new(); + for (index, chunk_producers) in prev_remained_chunk_producers_settlement.iter().enumerate() { + for chunk_producer in chunk_producers { + prev_remained_chunk_producers_to_shards + .entry(*chunk_producer) + .or_insert(vec![]) + .push(index); + } + } + + let mut rng = EpochInfo::shard_assignment_shuffling_rng(rng_seed); + let prev_remained_chunk_producers_to_unique_shards = prev_remained_chunk_producers_to_shards + .iter() + .map(|(chunk_producer, shards)| { + let index = rng.gen_range(0..shards.len()); + (*chunk_producer, shards[index]) + }) + .collect::>(); + let mut prev_remained_unique_chunk_producers_settlement = vec![vec![]; shard_ids.len()]; + for (chunk_producer, shard) in prev_remained_chunk_producers_to_unique_shards { + prev_remained_unique_chunk_producers_settlement[shard].push(chunk_producer); + } + for chunk_producers in prev_remained_unique_chunk_producers_settlement.iter_mut() { + chunk_producers.sort(); + } + + let shard_assignment = assign_shards( + chunk_producers, + shard_ids.len() as NumShards, + minimum_validators_per_shard, + &prev_remained_unique_chunk_producers_settlement, + ) + .map_err(|_| EpochError::NotEnoughValidators { + num_validators: num_chunk_producers as u64, + num_shards: shard_ids.len() as 
NumShards, + })?; + + Ok(shard_assignment) +} /// We store stakes in max heap and want to order them such that the validator /// with the largest state and (in case of a tie) lexicographically smallest /// AccountId comes at the top.