Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2p protocol versioning support #1182

Merged
merged 6 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions dns_server/src/crawler_p2p/crawler/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use p2p::{
config::NodeType,
error::{DialError, P2pError},
net::types::PeerInfo,
protocol::NETWORK_PROTOCOL_CURRENT,
testing_utils::TEST_PROTOCOL_VERSION,
types::{peer_id::PeerId, socket_address::SocketAddress},
};
use rstest::rstest;
Expand Down Expand Up @@ -60,7 +60,7 @@ fn basic(#[case] seed: Seed) {
address: node1,
peer_info: PeerInfo {
peer_id: peer1,
protocol_version: NETWORK_PROTOCOL_CURRENT,
protocol_version: TEST_PROTOCOL_VERSION,
network: *chain_config.magic_bytes(),
software_version: *chain_config.software_version(),
user_agent: mintlayer_core_user_agent(),
Expand Down Expand Up @@ -92,7 +92,7 @@ fn basic(#[case] seed: Seed) {
address: node2,
peer_info: PeerInfo {
peer_id: peer2,
protocol_version: NETWORK_PROTOCOL_CURRENT,
protocol_version: TEST_PROTOCOL_VERSION,
network: *chain_config.magic_bytes(),
software_version: *chain_config.software_version(),
user_agent: mintlayer_core_user_agent(),
Expand Down Expand Up @@ -172,7 +172,7 @@ fn randomized(#[case] seed: Seed) {
address,
peer_info: PeerInfo {
peer_id: PeerId::new(),
protocol_version: NETWORK_PROTOCOL_CURRENT,
protocol_version: TEST_PROTOCOL_VERSION,
network: *chain_config.magic_bytes(),
software_version: *chain_config.software_version(),
user_agent: mintlayer_core_user_agent(),
Expand All @@ -191,7 +191,7 @@ fn randomized(#[case] seed: Seed) {
address,
peer_info: PeerInfo {
peer_id: PeerId::new(),
protocol_version: NETWORK_PROTOCOL_CURRENT,
protocol_version: TEST_PROTOCOL_VERSION,
network: [255, 255, 255, 255],
software_version: *chain_config.software_version(),
user_agent: mintlayer_core_user_agent(),
Expand Down Expand Up @@ -242,7 +242,7 @@ fn incompatible_node(#[case] seed: Seed) {
address: node1,
peer_info: PeerInfo {
peer_id: peer1,
protocol_version: NETWORK_PROTOCOL_CURRENT,
protocol_version: TEST_PROTOCOL_VERSION,
network: [255, 255, 255, 255],
software_version: *chain_config.software_version(),
user_agent: mintlayer_core_user_agent(),
Expand Down
3 changes: 2 additions & 1 deletion dns_server/src/crawler_p2p/crawler_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ where
} => {
unreachable!("unexpected inbound connection");
}
ConnectivityEvent::ConnectionError { address, error } => {
ConnectivityEvent::ConnectionError { address, error }
| ConnectivityEvent::HandshakeFailed { address, error } => {
self.send_crawler_event(CrawlerEvent::ConnectionError { address, error });
}
ConnectivityEvent::ConnectionClosed { peer_id } => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use p2p::{
types::{ConnectivityEvent, PeerInfo, SyncingEvent},
ConnectivityService, NetworkingService, SyncingEventReceiver,
},
protocol::NETWORK_PROTOCOL_CURRENT,
testing_utils::TEST_PROTOCOL_VERSION,
types::{
ip_or_socket_address::IpOrSocketAddress, peer_id::PeerId, services::Services,
socket_address::SocketAddress,
Expand Down Expand Up @@ -148,7 +148,7 @@ impl ConnectivityService<MockNetworkingService> for MockConnectivityHandle {
let peer_id = PeerId::new();
let peer_info = PeerInfo {
peer_id,
protocol_version: NETWORK_PROTOCOL_CURRENT,
protocol_version: TEST_PROTOCOL_VERSION,
network: *node.chain_config.magic_bytes(),
software_version: SemVer::new(1, 2, 3),
user_agent: mintlayer_core_user_agent(),
Expand Down
3 changes: 3 additions & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ utils = { path = "../utils/" }

async-trait.workspace = true
bytes.workspace = true
enum-iterator.workspace = true
futures.workspace = true
hex.workspace = true
itertools.workspace = true
jsonrpsee = { workspace = true, features = ["macros"] }
num-derive.workspace = true
num-traits.workspace = true
once_cell.workspace = true
parity-scale-codec.workspace = true
serde.workspace = true
Expand Down
1 change: 1 addition & 0 deletions p2p/backend-test-suite/src/ban.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ where
SyncingEvent::Connected {
peer_id,
common_services: _,
protocol_version: _,
sync_msg_rx,
} => (peer_id, sync_msg_rx),
e => panic!("Unexpected event type: {e:?}"),
Expand Down
2 changes: 2 additions & 0 deletions p2p/backend-test-suite/src/block_announcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ where
SyncingEvent::Connected {
peer_id: _,
common_services: _,
protocol_version: _,
sync_msg_rx,
} => sync_msg_rx,
event => panic!("Unexpected event: {event:?}"),
Expand Down Expand Up @@ -134,6 +135,7 @@ where
SyncingEvent::Connected {
peer_id: _,
common_services: _,
protocol_version: _,
sync_msg_rx,
} => sync_msg_rx,
event => panic!("Unexpected event: {event:?}"),
Expand Down
14 changes: 7 additions & 7 deletions p2p/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ use common::{
};
use mempool::error::{Error as MempoolError, MempoolBanScore};

use crate::protocol::NetworkProtocolVersion;
use crate::protocol::ProtocolVersion;

/// Errors related to invalid data/peer information that results in connection getting closed
/// and the peer getting banned.
#[derive(Error, Debug, PartialEq, Eq)]
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum ProtocolError {
#[error("Peer has an unsupported network protocol: {0:?}")]
UnsupportedProtocol(NetworkProtocolVersion),
UnsupportedProtocol(ProtocolVersion),
#[error("Peer is in different network. Our network {0:?}, their network {1:?}")]
DifferentNetwork([u8; 4], [u8; 4]),
#[error("Peer is unresponsive")]
Expand Down Expand Up @@ -64,7 +64,7 @@ pub enum ProtocolError {
}

/// Peer state errors (Errors either for an individual peer or for the [`PeerManager`](crate::peer_manager::PeerManager))
#[derive(Error, Debug, PartialEq, Eq)]
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum PeerError {
#[error("Peer doesn't exist")]
PeerDoesntExist,
Expand All @@ -90,7 +90,7 @@ pub enum PeerError {
}

/// Errors related to establishing a connection with a remote peer
#[derive(Error, Debug, PartialEq, Eq)]
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum DialError {
#[error("Tried to dial self")]
AttemptToDialSelf,
Expand All @@ -105,7 +105,7 @@ pub enum DialError {
}

/// Conversion errors
#[derive(Error, Debug, PartialEq, Eq)]
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum ConversionError {
#[error("Invalid address: {0}")]
InvalidAddress(String),
Expand All @@ -121,7 +121,7 @@ pub enum MessageCodecError {
InvalidEncodedData(serialization::Error),
}

#[derive(Error, Debug, PartialEq, Eq)]
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum P2pError {
#[error("Protocol violation: {0}")]
ProtocolError(ProtocolError),
Expand Down
17 changes: 10 additions & 7 deletions p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,11 @@ pub mod testing_utils;
pub mod utils;

mod peer_manager_event;
#[cfg(test)]
mod tests;

pub use p2p_types as types;
use types::socket_address::SocketAddress;

pub use crate::{
peer_manager_event::PeerManagerEvent,
types::p2p_event::{P2pEvent, P2pEventHandler},
};

use std::{
marker::PhantomData,
net::{Ipv4Addr, Ipv6Addr, SocketAddr},
Expand Down Expand Up @@ -75,12 +71,19 @@ use crate::{
},
};

pub use p2p_types as types;

pub use crate::{
peer_manager_event::PeerManagerEvent,
types::p2p_event::{P2pEvent, P2pEventHandler},
};

/// Result type with P2P errors
pub type Result<T> = core::result::Result<T, P2pError>;

struct P2p<T: NetworkingService> {
/// A sender for the peer manager events.
pub tx_peer_manager: mpsc::UnboundedSender<PeerManagerEvent>,
tx_peer_manager: mpsc::UnboundedSender<PeerManagerEvent>,
mempool_handle: MempoolHandle,

backend_shutdown_sender: oneshot::Sender<()>,
Expand Down
42 changes: 39 additions & 3 deletions p2p/src/net/default_backend/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,18 @@ use crate::{
default_backend::{
peer,
transport::{TransportListener, TransportSocket},
types::{BackendEvent, Command, Message, PeerEvent},
types::{BackendEvent, Command, PeerEvent},
},
types::{services::Services, ConnectivityEvent, PeerInfo, SyncingEvent},
},
protocol::{ProtocolVersion, SupportedProtocolVersion},
types::{peer_address::PeerAddress, peer_id::PeerId},
P2pEvent, P2pEventHandler,
};

use super::{
peer::ConnectionInfo,
types::{HandshakeNonce, P2pTimestamp},
types::{HandshakeNonce, Message, P2pTimestamp},
};

/// Buffer size of the channel to the SyncManager peer task.
Expand All @@ -75,6 +76,8 @@ struct PeerContext {

inbound: bool,

protocol_version: SupportedProtocolVersion,

user_agent: UserAgent,

software_version: SemVer,
Expand Down Expand Up @@ -134,14 +137,19 @@ pub struct Backend<T: TransportSocket> {
syncing_event_tx: mpsc::UnboundedSender<SyncingEvent>,

/// List of incoming commands to the backend; we put them in a queue
/// to make receiving commands can run concurrently with other backend operations
/// to make sure receiving commands can run concurrently with other backend operations
command_queue: FuturesUnordered<BackendTask<T>>,

shutdown: Arc<SeqCstAtomicBool>,
shutdown_receiver: oneshot::Receiver<()>,

events_controller: EventsController<P2pEvent>,
subscribers_receiver: mpsc::UnboundedReceiver<P2pEventHandler>,

/// The protocol version that this node is running. Normally this will be
/// equal to default_networking_service::PREFERRED_PROTOCOL_VERSION, but it can be
/// overridden for testing purposes.
node_protocol_version: ProtocolVersion,
}

impl<T> Backend<T>
Expand All @@ -161,6 +169,7 @@ where
shutdown: Arc<SeqCstAtomicBool>,
shutdown_receiver: oneshot::Receiver<()>,
subscribers_receiver: mpsc::UnboundedReceiver<P2pEventHandler>,
node_protocol_version: ProtocolVersion,
) -> Self {
let (peer_event_tx, peer_event_rx) = mpsc::unbounded_channel();
Self {
Expand All @@ -181,6 +190,7 @@ where
shutdown_receiver,
events_controller: EventsController::new(),
subscribers_receiver,
node_protocol_version,
}
}

Expand Down Expand Up @@ -235,6 +245,7 @@ where
SyncingEvent::Connected {
peer_id,
common_services: peer.common_services,
protocol_version: peer.protocol_version,
sync_msg_rx,
},
&self.shutdown,
Expand Down Expand Up @@ -350,6 +361,7 @@ where
receiver_address,
peer_event_tx,
backend_event_rx,
self.node_protocol_version,
);
let shutdown = Arc::clone(&self.shutdown);
let local_time = P2pTimestamp::from_duration_since_epoch(self.time_getter.get_time());
Expand Down Expand Up @@ -400,6 +412,7 @@ where
}

let common_services = peer_info.common_services;
let protocol_version = peer_info.protocol_version;
let inbound = connection_info == ConnectionInfo::Inbound;
let user_agent = peer_info.user_agent.clone();
let software_version = peer_info.software_version;
Expand Down Expand Up @@ -430,6 +443,7 @@ where
handle,
address,
inbound,
protocol_version,
user_agent,
software_version,
common_services,
Expand Down Expand Up @@ -536,6 +550,28 @@ where

PeerEvent::MessageReceived { message } => self.handle_message(peer_id, message),

PeerEvent::HandshakeFailed { error } => {
if let Some(pending_peer) = self.pending.get(&peer_id) {
log::debug!("Sending ConnectivityEvent::HandshakeFailed for peer {peer_id}");

let send_result = self.conn_event_tx.send(ConnectivityEvent::HandshakeFailed {
address: pending_peer.address,
error,
});
if let Err(send_error) = send_result {
log::error!(
"Unable to report a failed handshake for peer {} to the front end: {}",
peer_id,
send_error
);
}
} else {
log::error!("Cannot find pending peer for peer id {peer_id}");
}

Ok(())
}

PeerEvent::ConnectionClosed => {
if let Some(pending_peer) = self.pending.remove(&peer_id) {
match pending_peer.connection_info {
Expand Down
Loading
Loading