Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: minor refactor for weighted choose #4917

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 30 additions & 14 deletions src/meta-srv/src/selector/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::metasrv::SelectTarget;
use crate::selector::SelectorOptions;

/// According to the `opts`, choose peers from the `weight_array` through `weighted_choose`.
pub fn choose_peers<W>(opts: &SelectorOptions, weighted_choose: &mut W) -> Result<Vec<Peer>>
pub fn choose_items<W>(opts: &SelectorOptions, weighted_choose: &mut W) -> Result<Vec<Peer>>
where
W: WeightedChoose<Peer>,
{
Expand All @@ -36,20 +36,36 @@ where
}
);

if min_required_items == 1 {
// fast path
return Ok(vec![weighted_choose.choose_one()?]);
}

let available_count = weighted_choose.len();

if opts.allow_duplication {
(0..min_required_items)
.map(|_| weighted_choose.choose_one())
.collect::<Result<_>>()
} else {
let weight_array_len = weighted_choose.len();
// Calculate how many complete rounds of `available_count` items to select,
// plus any additional items needed after complete rounds.
let complete_batches = min_required_items / available_count;
let leftover_items = min_required_items % available_count;
if complete_batches == 0 {
return weighted_choose.choose_multiple(leftover_items);
}

let mut result = Vec::with_capacity(min_required_items);
for _ in 0..complete_batches {
result.extend(weighted_choose.choose_multiple(available_count)?);
}
result.extend(weighted_choose.choose_multiple(leftover_items)?);

// When opts.allow_duplication is false, we need to check that the length of the weighted array is greater than
// or equal to min_required_items, otherwise it may cause an infinite loop.
Ok(result)
} else {
// Ensure the available items are sufficient when duplication is not allowed.
ensure!(
weight_array_len >= min_required_items,
available_count >= min_required_items,
error::NoEnoughAvailableNodeSnafu {
required: min_required_items,
available: weight_array_len,
available: available_count,
select_target: SelectTarget::Datanode
}
);
Expand All @@ -64,7 +80,7 @@ mod tests {

use common_meta::peer::Peer;

use crate::selector::common::choose_peers;
use crate::selector::common::choose_items;
use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
use crate::selector::SelectorOptions;

Expand Down Expand Up @@ -115,7 +131,7 @@ mod tests {
};

let selected_peers: HashSet<_> =
choose_peers(&opts, &mut RandomWeightedChoose::new(weight_array.clone()))
choose_items(&opts, &mut RandomWeightedChoose::new(weight_array.clone()))
.unwrap()
.into_iter()
.collect();
Expand All @@ -129,7 +145,7 @@ mod tests {
};

let selected_result =
choose_peers(&opts, &mut RandomWeightedChoose::new(weight_array.clone()));
choose_items(&opts, &mut RandomWeightedChoose::new(weight_array.clone()));
assert!(selected_result.is_err());

for i in 1..=50 {
Expand All @@ -139,7 +155,7 @@ mod tests {
};

let selected_peers =
choose_peers(&opts, &mut RandomWeightedChoose::new(weight_array.clone())).unwrap();
choose_items(&opts, &mut RandomWeightedChoose::new(weight_array.clone())).unwrap();

assert_eq!(i, selected_peers.len());
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/selector/lease_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use common_meta::peer::Peer;
use crate::error::Result;
use crate::lease;
use crate::metasrv::SelectorContext;
use crate::selector::common::choose_peers;
use crate::selector::common::choose_items;
use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
use crate::selector::{Namespace, Selector, SelectorOptions};

Expand Down Expand Up @@ -53,7 +53,7 @@ impl Selector for LeaseBasedSelector {

// 3. choose peers by weight_array.
let mut weighted_choose = RandomWeightedChoose::new(weight_array);
let selected = choose_peers(&opts, &mut weighted_choose)?;
let selected = choose_items(&opts, &mut weighted_choose)?;

Ok(selected)
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/selector/load_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::error::{self, Result};
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::lease;
use crate::metasrv::SelectorContext;
use crate::selector::common::choose_peers;
use crate::selector::common::choose_items;
use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute};
use crate::selector::weighted_choose::RandomWeightedChoose;
use crate::selector::{Namespace, Selector, SelectorOptions};
Expand Down Expand Up @@ -94,7 +94,7 @@ where

// 5. choose peers by weight_array.
let mut weighted_choose = RandomWeightedChoose::new(weight_array);
let selected = choose_peers(&opts, &mut weighted_choose)?;
let selected = choose_items(&opts, &mut weighted_choose)?;

debug!(
"LoadBasedSelector select peers: {:?}, namespace: {}, opts: {:?}.",
Expand Down