Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols/kad/query/peers/closest: Make at_capacity use max #1548

Merged
merged 5 commits into from
Apr 16, 2020
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion protocols/kad/src/query/peers/closest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,9 @@ impl ClosestPeersIter {
/// k closest nodes it has not already queried".
fn at_capacity(&self) -> bool {
match self.state {
State::Stalled => self.num_waiting >= self.config.num_results,
State::Stalled => self.num_waiting >= usize::max(
self.config.num_results, self.config.parallelism
),
State::Iterating { .. } => self.num_waiting >= self.config.parallelism,
State::Finished => true
}
Expand Down Expand Up @@ -691,4 +693,77 @@ mod tests {

QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
}

#[test]
fn stalled_iter_at_capacity_use_max_of_parallelism_and_num_results() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this make for a good property test? The property being the following: "For any ClosestPeersIter, if it is in State::Stalled then it must be at capacity only if num_waiting = max{num_results, parallelism}" ?

let target: KeyBytes = Key::from(PeerId::random()).into();

let mut iter = ClosestPeersIter::with_config(
ClosestPeersIterConfig {
num_results: 4,
parallelism: 8,
..ClosestPeersIterConfig::default()
},
target,
std::iter::empty(),
);
iter.state = State::Stalled;
iter.num_waiting = 7; // Smaller than `parallelism`, but larger than `num_results`.

assert!(!iter.at_capacity());
}

/// When not making progress for `parallelism` time a [`ClosestPeersIter`] becomes
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is likely a bit over the top. I left it here to showcase the issue in a more in-depth scenario. I would suggest removing it before merging. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I don't think it adds much to the other test, especially if the other is turned into a property test. Thanks for writing it though.

/// [`State::Stalled`]. When stalled an iterator is allowed to make more parallel requests up to
/// `num_results`. If `num_results` is smaller than `parallelism` make sure to still allow up to
/// `parallelism` requests in-flight.
#[test]
fn stalled_iter_allows_equal_or_more_than_parallelism_in_flight() {
let now = Instant::now();

let mut num_requests_in_flight = 0;

let parallelism = 8;
let num_results = parallelism / 2;
let target: KeyBytes = Key::from(PeerId::random()).into();

let mut closest_peers = random_peers(100).map(Key::from).collect::<Vec<_>>();
closest_peers.sort_unstable_by(|a, b| {
target.distance(a).cmp(&target.distance(b))
});
let mut pool = closest_peers.split_off(K_VALUE.get());

let mut iter = ClosestPeersIter::with_config(
ClosestPeersIterConfig {
num_results,
parallelism,
..ClosestPeersIterConfig::default()
},
target,
closest_peers.into_iter(),
);

// Have the request for the closest known peer be in-flight for the remainder of the test.
// Thereby the iterator never finishes (it ignores timeouts).
iter.next(now);
num_requests_in_flight += 1;

while matches!(iter.state, State::Iterating{ .. }) {
if let PeersIterState::Waiting(Some(peer)) = iter.next(now) {
let peer = peer.clone().into_owned();
iter.on_success(
&peer,
pool.pop().map(|p| vec![p.preimage().clone()]).unwrap().into_iter(),
);
} else {
panic!("Expected iterator to yield another peer.");
}
}

while matches!(iter.next(now), PeersIterState::Waiting(Some(_))) {
num_requests_in_flight += 1;
}

assert_eq!(usize::max(parallelism, num_results), num_requests_in_flight);
}
}