Skip to content

Commit

Permalink
chore: minor refactor for weighted choose
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Oct 31, 2024
1 parent 758ad0a commit 1ba9ff2
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 18 deletions.
44 changes: 30 additions & 14 deletions src/meta-srv/src/selector/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::metasrv::SelectTarget;
use crate::selector::SelectorOptions;

/// According to the `opts`, choose peers from the `weight_array` through `weighted_choose`.
pub fn choose_peers<W>(opts: &SelectorOptions, weighted_choose: &mut W) -> Result<Vec<Peer>>
pub fn choose_items<W>(opts: &SelectorOptions, weighted_choose: &mut W) -> Result<Vec<Peer>>
where
W: WeightedChoose<Peer>,
{
Expand All @@ -36,20 +36,36 @@ where
}
);

if min_required_items == 1 {
// fast path
return Ok(vec![weighted_choose.choose_one()?]);
}

let available_count = weighted_choose.len();

if opts.allow_duplication {
(0..min_required_items)
.map(|_| weighted_choose.choose_one())
.collect::<Result<_>>()
} else {
let weight_array_len = weighted_choose.len();
// Calculate how many complete rounds of `available_count` items to select,
// plus any additional items needed after complete rounds.
let complete_batches = min_required_items / available_count;
let leftover_items = min_required_items % available_count;
if complete_batches == 0 {
return weighted_choose.choose_multiple(leftover_items);
}

let mut result = Vec::with_capacity(min_required_items);
for _ in 0..complete_batches {
result.extend(weighted_choose.choose_multiple(available_count)?);
}
result.extend(weighted_choose.choose_multiple(leftover_items)?);

// When opts.allow_duplication is false, we need to check that the length of the weighted array is greater than
// or equal to min_required_items, otherwise it may cause an infinite loop.
Ok(result)
} else {
// Ensure the available items are sufficient when duplication is not allowed.
ensure!(
weight_array_len >= min_required_items,
available_count >= min_required_items,
error::NoEnoughAvailableNodeSnafu {
required: min_required_items,
available: weight_array_len,
available: available_count,
select_target: SelectTarget::Datanode
}
);
Expand All @@ -64,7 +80,7 @@ mod tests {

use common_meta::peer::Peer;

use crate::selector::common::choose_peers;
use crate::selector::common::choose_items;
use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
use crate::selector::SelectorOptions;

Expand Down Expand Up @@ -115,7 +131,7 @@ mod tests {
};

let selected_peers: HashSet<_> =
choose_peers(&opts, &mut RandomWeightedChoose::new(weight_array.clone()))
choose_items(&opts, &mut RandomWeightedChoose::new(weight_array.clone()))
.unwrap()
.into_iter()
.collect();
Expand All @@ -129,7 +145,7 @@ mod tests {
};

let selected_result =
choose_peers(&opts, &mut RandomWeightedChoose::new(weight_array.clone()));
choose_items(&opts, &mut RandomWeightedChoose::new(weight_array.clone()));
assert!(selected_result.is_err());

for i in 1..=50 {
Expand All @@ -139,7 +155,7 @@ mod tests {
};

let selected_peers =
choose_peers(&opts, &mut RandomWeightedChoose::new(weight_array.clone())).unwrap();
choose_items(&opts, &mut RandomWeightedChoose::new(weight_array.clone())).unwrap();

assert_eq!(i, selected_peers.len());
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/selector/lease_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use common_meta::peer::Peer;
use crate::error::Result;
use crate::lease;
use crate::metasrv::SelectorContext;
use crate::selector::common::choose_peers;
use crate::selector::common::choose_items;
use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
use crate::selector::{Namespace, Selector, SelectorOptions};

Expand Down Expand Up @@ -53,7 +53,7 @@ impl Selector for LeaseBasedSelector {

// 3. choose peers by weight_array.
let mut weighted_choose = RandomWeightedChoose::new(weight_array);
let selected = choose_peers(&opts, &mut weighted_choose)?;
let selected = choose_items(&opts, &mut weighted_choose)?;

Ok(selected)
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/selector/load_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::error::{self, Result};
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::lease;
use crate::metasrv::SelectorContext;
use crate::selector::common::choose_peers;
use crate::selector::common::choose_items;
use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute};
use crate::selector::weighted_choose::RandomWeightedChoose;
use crate::selector::{Namespace, Selector, SelectorOptions};
Expand Down Expand Up @@ -94,7 +94,7 @@ where

// 5. choose peers by weight_array.
let mut weighted_choose = RandomWeightedChoose::new(weight_array);
let selected = choose_peers(&opts, &mut weighted_choose)?;
let selected = choose_items(&opts, &mut weighted_choose)?;

debug!(
"LoadBasedSelector select peers: {:?}, namespace: {}, opts: {:?}.",
Expand Down
5 changes: 5 additions & 0 deletions src/meta-srv/src/selector/weighted_choose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ where
}

fn choose_multiple(&mut self, amount: usize) -> Result<Vec<Item>> {
if amount >= self.items.len() {
// fast path
return Ok(self.items.iter().map(|item| item.item.clone()).collect());
}

Ok(self
.items
.choose_multiple_weighted(&mut thread_rng(), amount, |item| item.weight as f64)
Expand Down

0 comments on commit 1ba9ff2

Please sign in to comment.