Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: split metadata and peer exchange #234

Merged
merged 3 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ pub mod grpc;
pub mod http;
pub mod p2p;

pub const PROTOCOL_VERSION: u64 = 29;
pub const PROTOCOL_VERSION: u64 = 30;
12 changes: 12 additions & 0 deletions src/server/p2p/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,18 @@ impl CatchUpSyncResponse {
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MetaDataRequest {
pub peer_id: String,
pub my_info: PeerInfo,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MetaDataResponse {
pub peer_id: String,
pub info: PeerInfo,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DirectPeerInfoRequest {
pub peer_id: String,
Expand Down
252 changes: 223 additions & 29 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,7 @@ use tokio::{
};

use super::{
messages::{
CatchUpSyncRequest,
CatchUpSyncResponse,
DirectPeerInfoRequest,
DirectPeerInfoResponse,
NotifyNewTipBlock,
},
messages::{CatchUpSyncRequest, CatchUpSyncResponse, MetaDataRequest, MetaDataResponse, NotifyNewTipBlock},
setup,
};
use crate::{
Expand All @@ -74,7 +68,14 @@ use crate::{
http::stats_collector::StatsBroadcastClient,
p2p::{
client::ServiceClient,
messages::{self, PeerInfo, SyncMissingBlocksRequest, SyncMissingBlocksResponse},
messages::{
self,
DirectPeerInfoRequest,
DirectPeerInfoResponse,
PeerInfo,
SyncMissingBlocksRequest,
SyncMissingBlocksResponse,
},
peer_store::{AddPeerStatus, PeerStore},
relay_store::RelayStore,
},
Expand All @@ -90,6 +91,7 @@ const PEER_INFO_TOPIC: &str = "peer_info";
const BLOCK_NOTIFY_TOPIC: &str = "block_notify";
pub(crate) const SHARE_CHAIN_SYNC_REQ_RESP_PROTOCOL: &str = "/share_chain_sync/5";
pub(crate) const DIRECT_PEER_EXCHANGE_REQ_RESP_PROTOCOL: &str = "/tari_direct_peer_info/5";
pub(crate) const META_DATA_EXCHANGE_REQ_RESP_PROTOCOL: &str = "/tari_meta_data_info/5";
pub(crate) const CATCH_UP_SYNC_REQUEST_RESPONSE_PROTOCOL: &str = "/catch_up_sync/5";
const LOG_TARGET: &str = "tari::p2pool::server::p2p";
const SYNC_REQUEST_LOG_TARGET: &str = "sync_request";
Expand All @@ -106,7 +108,8 @@ const MAX_CATCH_UP_BLOCKS_TO_RETURN: usize = 10;
// Time to start up and catch up before we start processing new tip messages
const NUM_PEERS_TO_SYNC_PER_ALGO: usize = 32;
const NUM_PEERS_INITIAL_SYNC: usize = 100;
const NUM_PEERS_TO_HEIGHT_EXCHANGE: usize = 8;
const NUM_PEERS_TO_META_DATA_EXCHANGE: usize = 8;
const NUM_PEERS_TO_PEER_INFO_EXCHANGE: usize = 8;

#[derive(Clone, Debug)]
#[allow(clippy::struct_excessive_bools)]
Expand All @@ -124,7 +127,8 @@ pub(crate) struct Config {
pub user_agent: String,
pub grey_list_clear_interval: Duration,
pub black_list_clear_interval: Duration,
pub chain_height_exchange_interval: Duration,
pub meta_data_exchange_interval: Duration,
pub peer_exchange_interval: Duration,
pub is_seed_peer: bool,
pub debug_print_chain: bool,
pub sync_job_enabled: bool,
Expand All @@ -151,7 +155,8 @@ impl Default for Config {
user_agent: "tari-p2pool".to_string(),
grey_list_clear_interval: Duration::from_secs(60 * 15),
black_list_clear_interval: Duration::from_secs(60 * 60),
chain_height_exchange_interval: Duration::from_secs(5),
meta_data_exchange_interval: Duration::from_secs(5),
peer_exchange_interval: Duration::from_secs(60 * 60),
is_seed_peer: false,
debug_print_chain: false,
sync_job_enabled: true,
Expand Down Expand Up @@ -190,6 +195,7 @@ pub struct ServerNetworkBehaviour {
pub gossipsub: gossipsub::Behaviour,
pub share_chain_sync: cbor::Behaviour<SyncMissingBlocksRequest, Result<SyncMissingBlocksResponse, String>>,
pub direct_peer_exchange: cbor::Behaviour<DirectPeerInfoRequest, Result<DirectPeerInfoResponse, String>>,
pub meta_data_exchange: cbor::Behaviour<MetaDataRequest, Result<MetaDataResponse, String>>,
pub catch_up_sync: cbor::Behaviour<CatchUpSyncRequest, Result<CatchUpSyncResponse, String>>,
pub identify: identify::Behaviour,
pub relay_server: Toggle<relay::Behaviour>,
Expand Down Expand Up @@ -794,6 +800,29 @@ where S: ShareChain
}
}

async fn initiate_meta_data_exchange(&mut self, peer: &PeerId) {
if let Ok(my_info) = self
.create_peer_info(self.swarm.external_addresses().cloned().collect())
.await
.inspect_err(|error| {
error!(target: LOG_TARGET, "Failed to create peer info: {error:?}");
})
{
let local_peer_id = *self.swarm.local_peer_id();
if peer == &local_peer_id {
return;
}

self.swarm
.behaviour_mut()
.meta_data_exchange
.send_request(peer, MetaDataRequest {
my_info,
peer_id: local_peer_id.to_base58(),
});
}
}

async fn handle_direct_peer_exchange_request(
&mut self,
channel: ResponseChannel<Result<DirectPeerInfoResponse, String>>,
Expand Down Expand Up @@ -873,6 +902,133 @@ where S: ShareChain
}
}

async fn handle_meta_data_exchange_request(
&mut self,
channel: ResponseChannel<Result<MetaDataResponse, String>>,
request: MetaDataRequest,
) {
if request.my_info.version != PROTOCOL_VERSION {
// debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping",
// request.peer_id);
let _unused = self
.swarm
.behaviour_mut()
.meta_data_exchange
.send_response(channel, Err("Peer has an outdated version".to_string()))
.inspect_err(|e| {
error!(target: LOG_TARGET, "Failed to send peer info response: {e:?}");
});

return;
}

let source_peer = request.my_info.peer_id;

info!(target: PEER_INFO_LOGGING_LOG_TARGET, "[META_DATA_REQ] New peer info: {source_peer:?}");
let local_peer_id = *self.swarm.local_peer_id();
if let Ok(info) = self
.create_peer_info(self.swarm.external_addresses().cloned().collect())
.await
.inspect_err(|error| {
error!(target: LOG_TARGET, "Failed to create peer info: {error:?}");
})
{
if let Err(e) = self.swarm.behaviour_mut().meta_data_exchange.send_response(
channel,
Ok(MetaDataResponse {
peer_id: local_peer_id.to_base58(),
info,
}),
) {
error!(target: LOG_TARGET, "Failed to send meta data response to {:?}: {:?}", source_peer, e);
}
} else {
error!(target: LOG_TARGET, "Failed to create peer info");
}

match request.peer_id.parse::<PeerId>() {
Ok(peer_id) => {
if self.add_peer(request.my_info, peer_id).await {
// self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
},
Err(error) => {
error!(target: LOG_TARGET, "Failed to parse peer id: {error:?}");
},
}
}

async fn handle_meta_data_exchange_response(&mut self, response: MetaDataResponse) {
if response.info.version != PROTOCOL_VERSION {
debug!(target: LOG_TARGET, "Peer {} has an outdated version, skipping", response.peer_id);
return;
}
info!(target: PEER_INFO_LOGGING_LOG_TARGET, "[META_DATA_EXCHANGE_RESP] New peer info: {}", response.peer_id);
match response.peer_id.parse::<PeerId>() {
Ok(peer_id) => {
if response.info.squad != self.squad {
warn!(target: LOG_TARGET, "Peer {} is not in the same squad, skipping", peer_id);
let _ = self.swarm.disconnect_peer_id(peer_id);
return;
}
if self.add_peer(response.info.clone(), peer_id).await {
self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
// Once we have peer info from the seed peers, disconnect from them.
if self.network_peer_store.read().await.is_seed_peer(&peer_id) {
warn!(target: LOG_TARGET, "Disconnecting from seed peer {}", peer_id);
let _ = self.swarm.disconnect_peer_id(peer_id);
return;
}

// If they are talking an older version, disconnect
if response.info.version != PROTOCOL_VERSION {
warn!(target: LOG_TARGET, "Peer {} has an outdated version, disconnecting", peer_id);
let _ = self.swarm.disconnect_peer_id(peer_id);
return;
}
// if we are a seed peer, end here
if self.config.is_seed_peer {
return;
}

let our_tip_sha3x = self.share_chain_sha3x.chain_pow().await;

if self.config.sha3x_enabled && response.info.current_sha3x_pow > our_tip_sha3x.as_u128() {
let perform_catch_up_sync = PerformCatchUpSync {
algo: PowAlgorithm::Sha3x,
peer: peer_id,
last_block_from_them: None,
their_height: response.info.current_sha3x_height,
// their_pow: response.info.current_sha3x_pow,
permit: None,
};
let _unused = self
.inner_request_tx
.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync));
}
let our_tip_rx = self.share_chain_random_x.chain_pow().await;

if self.config.randomx_enabled && response.info.current_random_x_pow > our_tip_rx.as_u128() {
let perform_catch_up_sync = PerformCatchUpSync {
algo: PowAlgorithm::RandomX,
peer: peer_id,
last_block_from_them: None,
their_height: response.info.current_random_x_height,
// their_pow: response.info.current_random_x_pow,
permit: None,
};
let _unused = self
.inner_request_tx
.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync));
}
},
Err(error) => {
error!(target: LOG_TARGET, "Failed to parse peer id: {error:?}");
},
}
}

async fn handle_direct_peer_exchange_response(&mut self, response: DirectPeerInfoResponse) {
if response.info.version != PROTOCOL_VERSION {
debug!(target: LOG_TARGET, "Peer {} has an outdated version, skipping", response.peer_id);
Expand Down Expand Up @@ -1285,6 +1441,36 @@ where S: ShareChain
gossipsub::Event::GossipsubNotSupported { .. } => {},
}
},
ServerNetworkBehaviourEvent::MetaDataExchange(event) => match event {
request_response::Event::Message { peer: _, message } => match message {
request_response::Message::Request {
request_id: _request_id,
request,
channel,
} => {
self.handle_meta_data_exchange_request(channel, request).await;
},
request_response::Message::Response {
request_id: _request_id,
response,
} => match response {
Ok(response) => {
self.handle_meta_data_exchange_response(response).await;
},
Err(error) => {
error!(target: LOG_TARGET, "REQ-RES peer info response error: {error:?}");
},
},
},
request_response::Event::OutboundFailure { peer, error, .. } => {
// Peers can be offline
debug!(target: LOG_TARGET, "REQ-RES meta data outbound failure: {peer:?} -> {error:?}");
},
request_response::Event::InboundFailure { peer, error, .. } => {
error!(target: LOG_TARGET, "REQ-RES meta data inbound failure: {peer:?} -> {error:?}");
},
request_response::Event::ResponseSent { .. } => {},
},
ServerNetworkBehaviourEvent::DirectPeerExchange(event) => match event {
request_response::Event::Message { peer: _, message } => match message {
request_response::Message::Request {
Expand Down Expand Up @@ -2179,8 +2365,10 @@ where S: ShareChain

let mut black_list_clear_interval = tokio::time::interval(self.config.black_list_clear_interval);
black_list_clear_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut chain_height_exchange_interval = tokio::time::interval(self.config.chain_height_exchange_interval);
chain_height_exchange_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut meta_data_exchange_interval = tokio::time::interval(self.config.meta_data_exchange_interval);
meta_data_exchange_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut peer_exchange_interval = tokio::time::interval(self.config.peer_exchange_interval);
peer_exchange_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

let mut connection_stats_publish = tokio::time::interval(Duration::from_secs(10));
connection_stats_publish.set_missed_tick_behavior(MissedTickBehavior::Skip);
Expand All @@ -2200,7 +2388,8 @@ where S: ShareChain
tokio::pin!(shutdown_signal);
tokio::pin!(grey_list_clear_interval);
tokio::pin!(black_list_clear_interval);
tokio::pin!(chain_height_exchange_interval);
tokio::pin!(meta_data_exchange_interval);
tokio::pin!(peer_exchange_interval);
tokio::pin!(connection_stats_publish);
tokio::pin!(seek_connections_interval);

Expand Down Expand Up @@ -2334,29 +2523,34 @@ where S: ShareChain
warn!(target: LOG_TARGET, "Peer info publishing took too long: {:?}", timer.elapsed());
}
},
_ = chain_height_exchange_interval.tick() => {
_ = meta_data_exchange_interval.tick() => {
let timer = Instant::now();
if !self.config.is_seed_peer && self.config.sync_job_enabled {
let mut connected_peers = self.swarm.connected_peers().copied().collect::<Vec::<_>>();
let mut rng = thread_rng();
connected_peers.shuffle(&mut rng);
for peer in connected_peers.iter().take(NUM_PEERS_TO_HEIGHT_EXCHANGE) {
for peer in connected_peers.iter().take(NUM_PEERS_TO_META_DATA_EXCHANGE) {
// Update their latest tip.
self.initiate_direct_peer_exchange(peer).await;
self.initiate_meta_data_exchange(peer).await;

}
// self.try_sync_from_best_peer().await;
}
if timer.elapsed() > MAX_ACCEPTABLE_NETWORK_EVENT_TIMEOUT {
warn!(target: LOG_TARGET, "Chain height exchange took too long: {:?}", timer.elapsed());
}
},

_ = peer_exchange_interval.tick() => {
let timer = Instant::now();
if !self.config.is_seed_peer && self.config.sync_job_enabled {
let mut connected_peers = self.swarm.connected_peers().copied().collect::<Vec::<_>>();
let mut rng = thread_rng();
connected_peers.shuffle(&mut rng);
for peer in connected_peers.iter().take(NUM_PEERS_TO_PEER_INFO_EXCHANGE) {
// Update their latest tip.
self.initiate_direct_peer_exchange(peer).await;

// let _ = self.perform_catch_up_sync(PerformCatchUpSync {
// algo: PowAlgorithm::RandomX,
// peer,
// last_block_from_them: None,
// their_height: 0,
// });
// let _ = self.perform_catch_up_sync(PerformCatchUpSync {
// algo: PowAlgorithm::Sha3x,
// peer,
// last_block_from_them: None,
// their_height: 0,
// });
}
// self.try_sync_from_best_peer().await;
}
Expand Down
Loading
Loading