diff --git a/src/server/mod.rs b/src/server/mod.rs index 380cb80..181f831 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -12,4 +12,4 @@ pub mod grpc; pub mod http; pub mod p2p; -pub const PROTOCOL_VERSION: u64 = 29; +pub const PROTOCOL_VERSION: u64 = 30; diff --git a/src/server/p2p/messages.rs b/src/server/p2p/messages.rs index c20f5b7..fe8c551 100644 --- a/src/server/p2p/messages.rs +++ b/src/server/p2p/messages.rs @@ -209,6 +209,18 @@ impl CatchUpSyncResponse { } } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MetaDataRequest { + pub peer_id: String, + pub my_info: PeerInfo, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MetaDataResponse { + pub peer_id: String, + pub info: PeerInfo, +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct DirectPeerInfoRequest { pub peer_id: String, diff --git a/src/server/p2p/network.rs b/src/server/p2p/network.rs index 3c7195b..2c93dc5 100644 --- a/src/server/p2p/network.rs +++ b/src/server/p2p/network.rs @@ -59,13 +59,7 @@ use tokio::{ }; use super::{ - messages::{ - CatchUpSyncRequest, - CatchUpSyncResponse, - DirectPeerInfoRequest, - DirectPeerInfoResponse, - NotifyNewTipBlock, - }, + messages::{CatchUpSyncRequest, CatchUpSyncResponse, MetaDataRequest, MetaDataResponse, NotifyNewTipBlock}, setup, }; use crate::{ @@ -74,7 +68,14 @@ use crate::{ http::stats_collector::StatsBroadcastClient, p2p::{ client::ServiceClient, - messages::{self, PeerInfo, SyncMissingBlocksRequest, SyncMissingBlocksResponse}, + messages::{ + self, + DirectPeerInfoRequest, + DirectPeerInfoResponse, + PeerInfo, + SyncMissingBlocksRequest, + SyncMissingBlocksResponse, + }, peer_store::{AddPeerStatus, PeerStore}, relay_store::RelayStore, }, @@ -90,6 +91,7 @@ const PEER_INFO_TOPIC: &str = "peer_info"; const BLOCK_NOTIFY_TOPIC: &str = "block_notify"; pub(crate) const SHARE_CHAIN_SYNC_REQ_RESP_PROTOCOL: &str = "/share_chain_sync/5"; pub(crate) const DIRECT_PEER_EXCHANGE_REQ_RESP_PROTOCOL: &str = "/tari_direct_peer_info/5"; +pub(crate) const META_DATA_EXCHANGE_REQ_RESP_PROTOCOL: &str = "/tari_meta_data_info/5"; pub(crate) const CATCH_UP_SYNC_REQUEST_RESPONSE_PROTOCOL: &str = "/catch_up_sync/5"; const LOG_TARGET: &str = "tari::p2pool::server::p2p"; const SYNC_REQUEST_LOG_TARGET: &str = "sync_request"; @@ -106,7 +108,8 @@ const MAX_CATCH_UP_BLOCKS_TO_RETURN: usize = 10; // Time to start up and catch up before we start processing new tip messages const NUM_PEERS_TO_SYNC_PER_ALGO: usize = 32; const NUM_PEERS_INITIAL_SYNC: usize = 100; -const NUM_PEERS_TO_HEIGHT_EXCHANGE: usize = 8; +const NUM_PEERS_TO_META_DATA_EXCHANGE: usize = 8; +const NUM_PEERS_TO_PEER_INFO_EXCHANGE: usize = 8; #[derive(Clone, Debug)] #[allow(clippy::struct_excessive_bools)] @@ -124,7 +127,8 @@ pub(crate) struct Config { pub user_agent: String, pub grey_list_clear_interval: Duration, pub black_list_clear_interval: Duration, - pub chain_height_exchange_interval: Duration, + pub meta_data_exchange_interval: Duration, + pub peer_exchange_interval: Duration, pub is_seed_peer: bool, pub debug_print_chain: bool, pub sync_job_enabled: bool, @@ -151,7 +155,8 @@ impl Default for Config { user_agent: "tari-p2pool".to_string(), grey_list_clear_interval: Duration::from_secs(60 * 15), black_list_clear_interval: Duration::from_secs(60 * 60), - chain_height_exchange_interval: Duration::from_secs(5), + meta_data_exchange_interval: Duration::from_secs(5), + peer_exchange_interval: Duration::from_secs(60 * 60), is_seed_peer: false, debug_print_chain: false, sync_job_enabled: true, @@ -190,6 +195,7 @@ pub struct ServerNetworkBehaviour { pub gossipsub: gossipsub::Behaviour, pub share_chain_sync: cbor::Behaviour>, pub direct_peer_exchange: cbor::Behaviour>, + pub meta_data_exchange: cbor::Behaviour>, pub catch_up_sync: cbor::Behaviour>, pub identify: identify::Behaviour, pub relay_server: Toggle, @@ -794,6 +800,29 @@ where S: ShareChain } } + async fn initiate_meta_data_exchange(&mut self, peer: &PeerId) { + if let Ok(my_info) = self + .create_peer_info(self.swarm.external_addresses().cloned().collect()) + .await + .inspect_err(|error| { + error!(target: LOG_TARGET, "Failed to create peer info: {error:?}"); + }) + { + let local_peer_id = *self.swarm.local_peer_id(); + if peer == &local_peer_id { + return; + } + + self.swarm + .behaviour_mut() + .meta_data_exchange + .send_request(peer, MetaDataRequest { + my_info, + peer_id: local_peer_id.to_base58(), + }); + } + } + async fn handle_direct_peer_exchange_request( &mut self, channel: ResponseChannel>, @@ -873,6 +902,133 @@ where S: ShareChain } } + async fn handle_meta_data_exchange_request( + &mut self, + channel: ResponseChannel>, + request: MetaDataRequest, + ) { + if request.my_info.version != PROTOCOL_VERSION { + // debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", + // request.peer_id); + let _unused = self + .swarm + .behaviour_mut() + .meta_data_exchange + .send_response(channel, Err("Peer has an outdated version".to_string())) + .inspect_err(|e| { + error!(target: LOG_TARGET, "Failed to send peer info response: {e:?}"); + }); + + return; + } + + let source_peer = request.my_info.peer_id; + + info!(target: PEER_INFO_LOGGING_LOG_TARGET, "[META_DATA_REQ] New peer info: {source_peer:?}"); + let local_peer_id = *self.swarm.local_peer_id(); + if let Ok(info) = self + .create_peer_info(self.swarm.external_addresses().cloned().collect()) + .await + .inspect_err(|error| { + error!(target: LOG_TARGET, "Failed to create peer info: {error:?}"); + }) + { + if let Err(e) = self.swarm.behaviour_mut().meta_data_exchange.send_response( + channel, + Ok(MetaDataResponse { + peer_id: local_peer_id.to_base58(), + info, + }), + ) { + error!(target: LOG_TARGET, "Failed to send meta data response to {:?}: {:?}", source_peer, e); + } + } else { + error!(target: LOG_TARGET, "Failed to create peer info"); + } + + match request.peer_id.parse::() { + Ok(peer_id) => { + if self.add_peer(request.my_info, peer_id).await { + // self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); + } + }, + Err(error) => { + error!(target: LOG_TARGET, "Failed to parse peer id: {error:?}"); + }, + } + } + + async fn handle_meta_data_exchange_response(&mut self, response: MetaDataResponse) { + if response.info.version != PROTOCOL_VERSION { + debug!(target: LOG_TARGET, "Peer {} has an outdated version, skipping", response.peer_id); + return; + } + info!(target: PEER_INFO_LOGGING_LOG_TARGET, "[META_DATA_EXCHANGE_RESP] New peer info: {}", response.peer_id); + match response.peer_id.parse::() { + Ok(peer_id) => { + if response.info.squad != self.squad { + warn!(target: LOG_TARGET, "Peer {} is not in the same squad, skipping", peer_id); + let _ = self.swarm.disconnect_peer_id(peer_id); + return; + } + if self.add_peer(response.info.clone(), peer_id).await { + self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); + } + // Once we have peer info from the seed peers, disconnect from them. + if self.network_peer_store.read().await.is_seed_peer(&peer_id) { + warn!(target: LOG_TARGET, "Disconnecting from seed peer {}", peer_id); + let _ = self.swarm.disconnect_peer_id(peer_id); + return; + } + + // If they are talking an older version, disconnect + if response.info.version != PROTOCOL_VERSION { + warn!(target: LOG_TARGET, "Peer {} has an outdated version, disconnecting", peer_id); + let _ = self.swarm.disconnect_peer_id(peer_id); + return; + } + // if we are a seed peer, end here + if self.config.is_seed_peer { + return; + } + + let our_tip_sha3x = self.share_chain_sha3x.chain_pow().await; + + if self.config.sha3x_enabled && response.info.current_sha3x_pow > our_tip_sha3x.as_u128() { + let perform_catch_up_sync = PerformCatchUpSync { + algo: PowAlgorithm::Sha3x, + peer: peer_id, + last_block_from_them: None, + their_height: response.info.current_sha3x_height, + // their_pow: response.info.current_sha3x_pow, + permit: None, + }; + let _unused = self + .inner_request_tx + .send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync)); + } + let our_tip_rx = self.share_chain_random_x.chain_pow().await; + + if self.config.randomx_enabled && response.info.current_random_x_pow > our_tip_rx.as_u128() { + let perform_catch_up_sync = PerformCatchUpSync { + algo: PowAlgorithm::RandomX, + peer: peer_id, + last_block_from_them: None, + their_height: response.info.current_random_x_height, + // their_pow: response.info.current_random_x_pow, + permit: None, + }; + let _unused = self + .inner_request_tx + .send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync)); + } + }, + Err(error) => { + error!(target: LOG_TARGET, "Failed to parse peer id: {error:?}"); + }, + } + } + async fn handle_direct_peer_exchange_response(&mut self, response: DirectPeerInfoResponse) { if response.info.version != PROTOCOL_VERSION { debug!(target: LOG_TARGET, "Peer {} has an outdated version, skipping", response.peer_id); @@ -1285,6 +1441,36 @@ where S: ShareChain gossipsub::Event::GossipsubNotSupported { .. } => {}, } }, + ServerNetworkBehaviourEvent::MetaDataExchange(event) => match event { + request_response::Event::Message { peer: _, message } => match message { + request_response::Message::Request { + request_id: _request_id, + request, + channel, + } => { + self.handle_meta_data_exchange_request(channel, request).await; + }, + request_response::Message::Response { + request_id: _request_id, + response, + } => match response { + Ok(response) => { + self.handle_meta_data_exchange_response(response).await; + }, + Err(error) => { + error!(target: LOG_TARGET, "REQ-RES peer info response error: {error:?}"); + }, + }, + }, + request_response::Event::OutboundFailure { peer, error, .. } => { + // Peers can be offline + debug!(target: LOG_TARGET, "REQ-RES meta data outbound failure: {peer:?} -> {error:?}"); + }, + request_response::Event::InboundFailure { peer, error, .. } => { + error!(target: LOG_TARGET, "REQ-RES meta data inbound failure: {peer:?} -> {error:?}"); + }, + request_response::Event::ResponseSent { .. } => {}, + }, ServerNetworkBehaviourEvent::DirectPeerExchange(event) => match event { request_response::Event::Message { peer: _, message } => match message { request_response::Message::Request { @@ -2179,8 +2365,10 @@ where S: ShareChain let mut black_list_clear_interval = tokio::time::interval(self.config.black_list_clear_interval); black_list_clear_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - let mut chain_height_exchange_interval = tokio::time::interval(self.config.chain_height_exchange_interval); - chain_height_exchange_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + let mut meta_data_exchange_interval = tokio::time::interval(self.config.meta_data_exchange_interval); + meta_data_exchange_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + let mut peer_exchange_interval = tokio::time::interval(self.config.peer_exchange_interval); + peer_exchange_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); let mut connection_stats_publish = tokio::time::interval(Duration::from_secs(10)); connection_stats_publish.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -2200,7 +2388,8 @@ where S: ShareChain tokio::pin!(shutdown_signal); tokio::pin!(grey_list_clear_interval); tokio::pin!(black_list_clear_interval); - tokio::pin!(chain_height_exchange_interval); + tokio::pin!(meta_data_exchange_interval); + tokio::pin!(peer_exchange_interval); tokio::pin!(connection_stats_publish); tokio::pin!(seek_connections_interval); @@ -2334,29 +2523,34 @@ where S: ShareChain warn!(target: LOG_TARGET, "Peer info publishing took too long: {:?}", timer.elapsed()); } }, - _ = chain_height_exchange_interval.tick() => { + _ = meta_data_exchange_interval.tick() => { let timer = Instant::now(); if !self.config.is_seed_peer && self.config.sync_job_enabled { let mut connected_peers = self.swarm.connected_peers().copied().collect::>(); let mut rng = thread_rng(); connected_peers.shuffle(&mut rng); - for peer in connected_peers.iter().take(NUM_PEERS_TO_HEIGHT_EXCHANGE) { + for peer in connected_peers.iter().take(NUM_PEERS_TO_META_DATA_EXCHANGE) { // Update their latest tip. - self.initiate_direct_peer_exchange(peer).await; + self.initiate_meta_data_exchange(peer).await; + + } + // self.try_sync_from_best_peer().await; + } + if timer.elapsed() > MAX_ACCEPTABLE_NETWORK_EVENT_TIMEOUT { + warn!(target: LOG_TARGET, "Chain height exchange took too long: {:?}", timer.elapsed()); + } + }, + _ = peer_exchange_interval.tick() => { + let timer = Instant::now(); + if !self.config.is_seed_peer && self.config.sync_job_enabled { + let mut connected_peers = self.swarm.connected_peers().copied().collect::>(); + let mut rng = thread_rng(); + connected_peers.shuffle(&mut rng); + for peer in connected_peers.iter().take(NUM_PEERS_TO_PEER_INFO_EXCHANGE) { + // Update their latest tip. + self.initiate_direct_peer_exchange(peer).await; - // let _ = self.perform_catch_up_sync(PerformCatchUpSync { - // algo: PowAlgorithm::RandomX, - // peer, - // last_block_from_them: None, - // their_height: 0, - // }); - // let _ = self.perform_catch_up_sync(PerformCatchUpSync { - // algo: PowAlgorithm::Sha3x, - // peer, - // last_block_from_them: None, - // their_height: 0, - // }); } // self.try_sync_from_best_peer().await; } diff --git a/src/server/p2p/setup.rs b/src/server/p2p/setup.rs index bc97022..2261cd2 100644 --- a/src/server/p2p/setup.rs +++ b/src/server/p2p/setup.rs @@ -29,17 +29,23 @@ use tokio::{ }; use super::{ - messages::{CatchUpSyncRequest, CatchUpSyncResponse, DirectPeerInfoRequest, DirectPeerInfoResponse}, + messages::{CatchUpSyncRequest, CatchUpSyncResponse, MetaDataRequest, MetaDataResponse}, Config, ServerNetworkBehaviour, CATCH_UP_SYNC_REQUEST_RESPONSE_PROTOCOL, DIRECT_PEER_EXCHANGE_REQ_RESP_PROTOCOL, + META_DATA_EXCHANGE_REQ_RESP_PROTOCOL, SHARE_CHAIN_SYNC_REQ_RESP_PROTOCOL, STABLE_PRIVATE_KEY_FILE, }; use crate::server::{ config, - p2p::messages::{SyncMissingBlocksRequest, SyncMissingBlocksResponse}, + p2p::messages::{ + DirectPeerInfoRequest, + DirectPeerInfoResponse, + SyncMissingBlocksRequest, + SyncMissingBlocksResponse, + }, }; /// Generates or reads libp2p private key if stable_peer is set to true otherwise returns a random key. @@ -158,6 +164,13 @@ pub(crate) async fn new_swarm(config: &config::Config) -> Result>::new( + [( + StreamProtocol::new(META_DATA_EXCHANGE_REQ_RESP_PROTOCOL), + request_response::ProtocolSupport::Full, + )], + request_response::Config::default().with_request_timeout(Duration::from_secs(60)), // 10 is the default + ), catch_up_sync: cbor::Behaviour::>::new( [( StreamProtocol::new(CATCH_UP_SYNC_REQUEST_RESPONSE_PROTOCOL), diff --git a/src/server/server.rs b/src/server/server.rs index 46475bb..252797e 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -58,7 +58,6 @@ where S: ShareChain let are_we_synced_with_randomx_p2pool = Arc::new(AtomicBool::new(false)); let are_we_synced_with_sha3x_p2pool = Arc::new(AtomicBool::new(false)); let stats_client = stats_collector.create_client(); - let mut p2p_service: p2p::Service = p2p::Service::new( &config, share_chain_sha3x.clone(),