Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Credit first peer to return content #1447

Merged
merged 7 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ethportal-peertest/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ fn read_history_content_key_value(
/// Wrapper function for fixtures that directly returns the tuple.
fn read_fixture(file_name: &str) -> (HistoryContentKey, HistoryContentValue) {
read_history_content_key_value(file_name)
.unwrap_or_else(|err| panic!("Error reading fixture: {err}"))
.unwrap_or_else(|err| panic!("Error reading fixture in {file_name}: {err}"))
}

/// History HeaderWithProof content key & value
Expand Down
6 changes: 1 addition & 5 deletions light-client/src/consensus/consensus_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,11 +523,7 @@ impl<R: ConsensusRpc> ConsensusLightClient<R> {
))
})();

if let Ok(is_valid) = res {
is_valid
} else {
false
}
res.unwrap_or_default()
}

fn compute_committee_sign_root(&self, header: Bytes32, slot: u64) -> Result<Node> {
Expand Down
6 changes: 1 addition & 5 deletions light-client/src/consensus/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,7 @@ pub fn is_proof_valid<L: TreeHash>(
Ok(is_valid)
})();

if let Ok(is_valid) = res {
is_valid
} else {
false
}
res.unwrap_or_default()
}

#[derive(SimpleSerialize, Default, Debug)]
Expand Down
85 changes: 51 additions & 34 deletions portalnet/src/find/iterators/findcontent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ struct UtpAndPeerDetails<TNodeId> {
}

#[derive(Debug, Clone)]
pub struct FindContentQuery<TNodeId> {
pub struct FindContentQuery<TNodeId: std::fmt::Display> {
/// The target key we are looking for
target_key: Key<TNodeId>,

Expand All @@ -99,7 +99,7 @@ pub struct FindContentQuery<TNodeId> {

impl<TNodeId> Query<TNodeId> for FindContentQuery<TNodeId>
where
TNodeId: Into<Key<TNodeId>> + Eq + Clone,
TNodeId: Into<Key<TNodeId>> + Eq + Clone + std::fmt::Display,
{
type Response = FindContentQueryResponse<TNodeId>;
type Result = FindContentQueryResult<TNodeId>;
Expand Down Expand Up @@ -189,16 +189,20 @@ where
};
}
FindContentQueryResponse::Content(content) => {
self.content = Some(ContentAndPeer::Content(ContentAndPeerDetails {
content,
peer: peer.clone(),
}));
if self.content.is_none() {
self.content = Some(ContentAndPeer::Content(ContentAndPeerDetails {
content,
peer: peer.clone(),
}));
}
}
FindContentQueryResponse::ConnectionId(connection_id) => {
self.content = Some(ContentAndPeer::Utp(UtpAndPeerDetails {
connection_id: u16::from_be(connection_id),
peer: peer.clone(),
}));
if self.content.is_none() {
self.content = Some(ContentAndPeer::Utp(UtpAndPeerDetails {
connection_id: u16::from_be(connection_id),
peer: peer.clone(),
}));
}
}
}
}
Expand Down Expand Up @@ -361,7 +365,7 @@ where

impl<TNodeId> FindContentQuery<TNodeId>
where
TNodeId: Into<Key<TNodeId>> + Eq + Clone,
TNodeId: Into<Key<TNodeId>> + Eq + Clone + std::fmt::Display,
{
/// Creates a new query with the given configuration.
pub fn with_config<I>(
Expand Down Expand Up @@ -456,8 +460,9 @@ mod tests {
use discv5::enr::NodeId;
use quickcheck::*;
use rand::{thread_rng, Rng};
use std::time::Duration;
use std::{cmp::min, time::Duration};
use test_log::test;
use tracing::debug;

type TestQuery = FindContentQuery<NodeId>;

Expand Down Expand Up @@ -525,42 +530,44 @@ mod tests {
}
}

#[test]
#[test_log::test]
fn termination_and_parallelism() {
fn prop(mut query: TestQuery) {
let now = Instant::now();
let mut rng = thread_rng();

let mut expected = query
let mut remaining = query
.closest_peers
.values()
.map(|e| e.key().clone())
.collect::<Vec<_>>();
let num_known = expected.len();
let num_known = remaining.len();
let max_parallelism = usize::min(query.config.parallelism, num_known);

let target = query.target_key.clone();
let mut remaining;
let mut expected: Vec<_>;
let mut num_failures = 0;

let found_content: Vec<u8> = vec![0xef];
let mut content_peer = None;

'finished: loop {
if expected.is_empty() {
if remaining.is_empty() {
debug!("ending test: no more peers to pull from");
break;
}
// Split off the next up to `parallelism` expected peers.
else if expected.len() < max_parallelism {
remaining = Vec::new();
} else {
remaining = expected.split_off(max_parallelism);
// Split off the next (up to) `parallelism` peers, who we expect to poll.
let num_expected = min(max_parallelism, remaining.len());
expected = remaining.drain(..num_expected).collect();
}

// Advance the query for maximum parallelism.
for k in expected.iter() {
match query.poll(now) {
QueryState::Finished => break 'finished,
QueryState::Finished => {
debug!("Ending test loop: query state is finished");
break 'finished;
}
QueryState::Waiting(Some(p)) => assert_eq!(&p, k.preimage()),
QueryState::Waiting(None) => panic!("Expected another peer."),
QueryState::WaitingAtCapacity => panic!("Unexpectedly reached capacity."),
Expand All @@ -583,7 +590,11 @@ mod tests {
k.preimage(),
FindContentQueryResponse::Content(found_content.clone()),
);
content_peer = Some(k.clone());
// The first peer to return the content should be the one reported at
// the end.
if content_peer.is_none() {
content_peer = Some(k.clone());
}
} else {
let num_closer = rng.gen_range(0..query.config.num_results + 1);
let closer_peers = random_nodes(num_closer).collect::<Vec<_>>();
Expand All @@ -602,8 +613,6 @@ mod tests {

// Re-sort the remaining expected peers for the next "round".
remaining.sort_by_key(|k| target.distance(k));

expected = remaining;
}

// The query must be finished.
Expand All @@ -613,12 +622,16 @@ mod tests {
// Determine if all peers have been contacted by the query. This _must_ be
// the case if the query finished without content and with fewer than the
// requested number of results.
let all_contacted = query.closest_peers.values().all(|e| {
!matches!(
e.state(),
QueryPeerState::NotContacted | QueryPeerState::Waiting { .. }
)
});
let final_peers = query.closest_peers.clone();
let uncontacted: Vec<_> = final_peers
.values()
.filter(|e| {
matches!(
e.state(),
QueryPeerState::NotContacted | QueryPeerState::Waiting { .. }
)
})
.collect();

let target_key = query.target_key.clone();
let num_results = query.config.num_results;
Expand Down Expand Up @@ -652,7 +665,11 @@ mod tests {
// have been failures.
assert!(num_known < num_results || num_failures > 0);
// All peers must have been contacted.
assert!(all_contacted, "Not all peers have been contacted.");
assert_eq!(
uncontacted.len(),
0,
"Not all peers have been contacted: {uncontacted:?}"
);
} else {
assert_eq!(num_results, closest_nodes.len(), "Too many results.");
}
Expand All @@ -661,7 +678,7 @@ mod tests {
}
}

QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
QuickCheck::new().tests(100).quickcheck(prop as fn(_) -> _)
}

#[test]
Expand Down
6 changes: 3 additions & 3 deletions portalnet/src/overlay/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ where
let utp_processing = UtpProcessing::from(&*self);
tokio::spawn(async move {
Self::process_received_content(
content.clone(),
content,
false,
content_key,
callback,
Expand Down Expand Up @@ -971,9 +971,9 @@ where
Ok(Content::ConnectionId(cid_send.to_be()))
}
}
// If we don't have data to send back or can't obtain a permit, send the requester a
// If we can't obtain a permit or don't have data to send back, send the requester a
// list of closer ENRs.
(Ok(Some(_)), _) | (Ok(None), _) => {
(Ok(_), None) | (Ok(None), _) => {
let mut enrs = self.find_nodes_close_to_content(content_key);
enrs.retain(|enr| source != &enr.node_id());
pop_while_ssz_bytes_len_gt(&mut enrs, MAX_PORTAL_CONTENT_PAYLOAD_SIZE);
Expand Down
Loading