Skip to content

Commit

Permalink
implement sticky assignment
Browse files Browse the repository at this point in the history
  • Loading branch information
Longarithm committed May 8, 2024
1 parent a1c964d commit b5224eb
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 45 deletions.
26 changes: 20 additions & 6 deletions chain/epoch-manager/src/shard_assignment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub fn assign_shards<T: HasStake + Eq + Clone>(
chunk_producers: Vec<T>,
num_shards: NumShards,
min_validators_per_shard: usize,
prev_chunk_producers_settlement: &[Vec<near_primitives::types::ValidatorId>],
prev_remained_chunk_producers_settlement: &[Vec<usize>],
) -> Result<Vec<Vec<T>>, ()> {
// If there's not enough chunk producers to fill up a single shard there’s
// nothing we can do. Return with an error.
Expand All @@ -37,13 +37,27 @@ pub fn assign_shards<T: HasStake + Eq + Clone>(
);
}

// Initially, sort by number of validators first so we fill shards up.
let mut result: Vec<Vec<T>> = (0..num_shards).map(|_| Vec::new()).collect();
let mut initial_stakes: Vec<Balance> = vec![0; num_shards as usize];

// Initially, sort by number of validators first so we fill shards up.
let mut shard_index: MinHeap<(usize, Balance, ShardId)> =
(0..num_shards).map(|s| (0, 0, s)).collect();
// First, fill seats with previous chunk producer remained online.
for (shard_id, remained_chunk_producers) in
prev_remained_chunk_producers_settlement.iter().enumerate()
{
for &cp_index in remained_chunk_producers {
let cp = chunk_producers[cp_index].clone();
initial_stakes[shard_id] += cp.get_stake();
result[shard_id].push(cp);
}
}
let mut shard_index: MinHeap<(usize, Balance, ShardId)> = (0..num_shards)
.map(|i| (result[i as usize].len(), initial_stakes[i as usize], i))
.collect();

// TODO: move used chunk producers to the end of the list

// First, distribute chunk producers until all shards have at least the
// Second, distribute chunk producers until all shards have at least the
// minimum requested number. If there are not enough validators to satisfy
// that requirement, assign some of the validators to multiple shards.
let mut chunk_producers = chunk_producers.into_iter().enumerate().cycle();
Expand All @@ -54,7 +68,7 @@ pub fn assign_shards<T: HasStake + Eq + Clone>(
min_validators_per_shard,
);

// Second, if there are any unassigned chunk producers left, distribute them
// Third, if there are any unassigned chunk producers left, distribute them
// between shards trying to balance total stake.
let remaining_producers =
num_chunk_producers.saturating_sub(num_shards as usize * min_validators_per_shard);
Expand Down
122 changes: 83 additions & 39 deletions chain/epoch-manager/src/validator_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use near_primitives::types::{
use near_primitives::validator_mandates::{ValidatorMandates, ValidatorMandatesConfig};
use num_rational::Ratio;
use rand::seq::SliceRandom;
use rand::Rng;
use std::cmp::{self, Ordering};
use std::collections::hash_map;
use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
Expand Down Expand Up @@ -156,46 +157,13 @@ pub fn proposals_to_epoch_info(
}
}

// Assign chunk producers to shards.
let num_chunk_producers = chunk_producers.len();
let chunk_producer_set =
chunk_producers.iter().map(|vs| vs.account_id().clone()).collect::<HashSet<_>>();
let minimum_validators_per_shard =
epoch_config.validator_selection_config.minimum_validators_per_shard as usize;
let prev_chunk_producers_settlement = if !will_shard_layout_change {
let s = prev_epoch_info.chunk_producers_settlement();
let prev_chunk_validator_accounts = s
.iter()
.map(|vs| {
vs.into_iter()
.map(|v| {
let account_id =
prev_epoch_info.get_validator(*v).account_id().clone();
if chunk_producer_set.contains(&account_id) {
validator_to_index.get(&account_id).copied()
} else {
None
}
})
.flatten()
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
prev_chunk_validator_accounts
} else {
vec![]
};
let shard_assignment = assign_shards(
let shard_assignment = assign_chunk_producers_to_shards(
epoch_config,
&rng_seed,
chunk_producers,
shard_ids.len() as NumShards,
minimum_validators_per_shard,
&prev_chunk_producers_settlement,
)
.map_err(|_| EpochError::NotEnoughValidators {
num_validators: num_chunk_producers as u64,
num_shards: shard_ids.len() as NumShards,
})?;

prev_epoch_info,
will_shard_layout_change,
)?;
let chunk_producers_settlement = shard_assignment
.into_iter()
.map(|vs| vs.into_iter().map(|v| validator_to_index[v.account_id()]).collect())
Expand Down Expand Up @@ -384,6 +352,82 @@ fn select_validators(
}
}

fn assign_chunk_producers_to_shards(
epoch_config: &EpochConfig,
rng_seed: &RngSeed,
chunk_producers: Vec<ValidatorStake>,
prev_epoch_info: &EpochInfo,
will_shard_layout_change: bool,
) -> Result<Vec<Vec<ValidatorStake>>, EpochError> {
let shard_ids: Vec<_> = epoch_config.shard_layout.shard_ids().collect();
let num_chunk_producers = chunk_producers.len();
let chunk_producer_indices = chunk_producers
.iter()
.enumerate()
.map(|(i, vs)| (vs.account_id().clone(), i))
.collect::<HashMap<_, _>>();
let minimum_validators_per_shard =
epoch_config.validator_selection_config.minimum_validators_per_shard as usize;
let prev_remained_chunk_producers_settlement = if !will_shard_layout_change {
let s = prev_epoch_info.chunk_producers_settlement();
let prev_chunk_validator_accounts = s
.iter()
.map(|vs| {
vs.into_iter()
.map(|v| {
let account_id = prev_epoch_info.get_validator(*v).account_id().clone();
chunk_producer_indices.get(&account_id).copied()
})
.flatten()
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
prev_chunk_validator_accounts
} else {
vec![vec![]; shard_ids.len()]
};

// filter out same shard ids which could appear due to
// minimum_validators_per_shard requirement
let mut prev_remained_chunk_producers_to_shards = HashMap::<usize, Vec<usize>>::new();
for (index, chunk_producers) in prev_remained_chunk_producers_settlement.iter().enumerate() {
for chunk_producer in chunk_producers {
prev_remained_chunk_producers_to_shards
.entry(*chunk_producer)
.or_insert(vec![])
.push(index);
}
}

let mut rng = EpochInfo::shard_assignment_shuffling_rng(rng_seed);
let prev_remained_chunk_producers_to_unique_shards = prev_remained_chunk_producers_to_shards
.iter()
.map(|(chunk_producer, shards)| {
let index = rng.gen_range(0..shards.len());
(*chunk_producer, shards[index])
})
.collect::<HashMap<_, _>>();
let mut prev_remained_unique_chunk_producers_settlement = vec![vec![]; shard_ids.len()];
for (chunk_producer, shard) in prev_remained_chunk_producers_to_unique_shards {
prev_remained_unique_chunk_producers_settlement[shard].push(chunk_producer);
}
for chunk_producers in prev_remained_unique_chunk_producers_settlement.iter_mut() {
chunk_producers.sort();
}

let shard_assignment = assign_shards(
chunk_producers,
shard_ids.len() as NumShards,
minimum_validators_per_shard,
&prev_remained_unique_chunk_producers_settlement,
)
.map_err(|_| EpochError::NotEnoughValidators {
num_validators: num_chunk_producers as u64,
num_shards: shard_ids.len() as NumShards,
})?;

Ok(shard_assignment)
}
/// We store stakes in max heap and want to order them such that the validator
/// with the largest state and (in case of a tie) lexicographically smallest
/// AccountId comes at the top.
Expand Down

0 comments on commit b5224eb

Please sign in to comment.