Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix peer store random fetch #4289

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion network/src/peer_store/addr_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,15 @@ impl AddrManager {
let addr_info: AddrInfo = self.id_to_info[&self.random_ids[i]].to_owned();
if let Some(socket_addr) = multiaddr_to_socketaddr(&addr_info.addr) {
let ip = socket_addr.ip();
let is_unique_ip = duplicate_ips.insert(ip);
let is_unique_ip = !duplicate_ips.contains(&ip);
// A trick to make our tests work
// TODO remove this after fix the network tests.
let is_test_ip = ip.is_unspecified() || ip.is_loopback();
if (is_test_ip || is_unique_ip)
&& addr_info.is_connectable(now_ms)
&& filter(&addr_info)
{
duplicate_ips.insert(ip);
addr_infos.push(addr_info);
}
if addr_infos.len() == count {
Expand Down
8 changes: 2 additions & 6 deletions network/src/peer_store/peer_store_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,15 @@ impl PeerStore {
/// Return valid addrs that success connected, used for discovery.
pub fn fetch_random_addrs(&mut self, count: usize, required_flags: Flags) -> Vec<AddrInfo> {
// Get info:
// 1. Already connected or Connected within 7 days
// 1. Connected within 7 days

let now_ms = ckb_systemtime::unix_time_as_millis();
let addr_expired_ms = now_ms.saturating_sub(ADDR_TIMEOUT_MS);
let peers = &self.connected_peers;
// get success connected addrs.
self.addr_manager
.fetch_random(count, |peer_addr: &AddrInfo| {
required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
&& (extract_peer_id(&peer_addr.addr)
.map(|peer_id| peers.contains_key(&peer_id))
.unwrap_or_default()
|| peer_addr.connected(|t| t > addr_expired_ms))
&& peer_addr.connected(|t| t > addr_expired_ms)
})
}

Expand Down
2 changes: 1 addition & 1 deletion network/src/protocols/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use p2p::{
use rand::seq::SliceRandom;

pub use self::{
addr::{AddrKnown, AddressManager, MisbehaveResult, Misbehavior},
addr::{AddressManager, MisbehaveResult, Misbehavior},
protocol::{DiscoveryMessage, Node, Nodes},
state::SessionState,
};
Expand Down
11 changes: 7 additions & 4 deletions network/src/protocols/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,11 +479,14 @@ fn test_discovery_behavior() {

wait_connect_state(&node2, 2);

wait_discovery(&node3, |num| num >= 2);
wait_discovery(&node3, |num| num >= 3);

// use node1 instead of node3 because ANNOUNCE_INTERVAL is 24 hour
// node2 can't announce node1 address to node3 within 24 hours
// but the reverse can
let addrs = {
let listen_addr = &node3.listen_addr;
let mut locked = node3.network_state.peer_store.lock();
let listen_addr = &node1.listen_addr;
let mut locked = node1.network_state.peer_store.lock();

locked
.fetch_addrs_to_feeler(6)
Expand All @@ -508,7 +511,7 @@ fn test_discovery_behavior() {
};

for addr in addrs {
node3.dial_addr(
node1.dial_addr(
addr,
TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
);
Expand Down
53 changes: 47 additions & 6 deletions network/src/tests/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ fn test_add_connected_peer() {
0
);
peer_store.add_connected_peer(addr.clone(), SessionType::Outbound);
peer_store.add_addr(addr, Flags::COMPATIBILITY).unwrap();
peer_store.add_outbound_addr(addr, Flags::COMPATIBILITY);
assert_eq!(
peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(),
1
Expand Down Expand Up @@ -314,14 +314,14 @@ fn test_fetch_random_addrs() {
.is_empty());
// get peer addr from outbound
peer_store.add_connected_peer(addr1.clone(), SessionType::Outbound);
peer_store.add_addr(addr1, Flags::COMPATIBILITY).unwrap();
peer_store.add_outbound_addr(addr1, Flags::COMPATIBILITY);
assert_eq!(
peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(),
1
);
// get peer addrs by limit
peer_store.add_connected_peer(addr2.clone(), SessionType::Outbound);
peer_store.add_addr(addr2, Flags::COMPATIBILITY).unwrap();
peer_store.add_outbound_addr(addr2, Flags::COMPATIBILITY);
assert_eq!(
peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(),
2
Expand All @@ -345,7 +345,7 @@ fn test_fetch_random_addrs() {
.mark_connected(0);
assert_eq!(
peer_store.fetch_random_addrs(3, Flags::COMPATIBILITY).len(),
3
2
);
peer_store.remove_disconnected_peer(&addr3);
assert_eq!(
Expand Down Expand Up @@ -424,12 +424,53 @@ fn test_get_random_restrict_addrs_from_same_ip() {
.unwrap();
peer_store.add_connected_peer(addr1.clone(), SessionType::Outbound);
peer_store.add_connected_peer(addr2.clone(), SessionType::Outbound);
peer_store.add_addr(addr1, Flags::COMPATIBILITY).unwrap();
peer_store.add_addr(addr2, Flags::COMPATIBILITY).unwrap();
peer_store.add_outbound_addr(addr1, Flags::COMPATIBILITY);
peer_store.add_outbound_addr(addr2, Flags::COMPATIBILITY);
assert_eq!(
peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(),
1
);
}

#[test]
fn test_get_random_with_connected_peer_and_same_peerid() {
let mut peer_store: PeerStore = Default::default();

let peer_id = PeerId::random().to_base58();
let addr1: Multiaddr = format!("/ip4/225.0.0.1/tcp/1867/p2p/{}", peer_id)
.parse()
.unwrap();
let addr2: Multiaddr = format!("/ip4/225.0.0.2/tcp/43/p2p/{}", peer_id)
.parse()
.unwrap();

peer_store
.add_addr(addr1.clone(), Flags::COMPATIBILITY)
.unwrap();
peer_store.add_outbound_addr(addr2, Flags::COMPATIBILITY);

// Node information that has not been connected must not be selected.
assert_eq!(
peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(),
1
);

// add remains connected node info
peer_store.add_connected_peer(addr1.clone(), SessionType::Outbound);

// Even if the node remains connected, node's info without connection information cannot be selected.
assert_eq!(
peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(),
1
);

peer_store.update_outbound_addr_last_connected_ms(addr1);

// Set connected info to address, it can be selected
assert_eq!(
peer_store.fetch_random_addrs(2, Flags::COMPATIBILITY).len(),
2
);
}

#[test]
Expand Down
3 changes: 2 additions & 1 deletion util/dao/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,13 @@ pub fn pack_dao_data(ar: u64, c: Capacity, s: Capacity, u: Capacity) -> Byte32 {
Byte32::from_slice(&buf).expect("impossible: fail to read array")
}

#[cfg(test)]
mod tests {
pub use super::{extract_dao_data, pack_dao_data};
pub use ckb_types::core::Capacity;
pub use ckb_types::h256;
pub use ckb_types::packed::Byte32;
pub use ckb_types::prelude::Pack;
pub use ckb_types::{h256, H256};

#[test]
#[allow(clippy::unreadable_literal)]
Expand Down
Loading