Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ConnectivityEvent::Misbehaved to report unexpected handshakes #1245

Merged
merged 1 commit into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions p2p/src/net/default_backend/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,17 +560,10 @@ where
if let Some(pending_peer) = self.pending.get(&peer_id) {
log::debug!("Sending ConnectivityEvent::HandshakeFailed for peer {peer_id}");

let send_result = self.conn_event_tx.send(ConnectivityEvent::HandshakeFailed {
self.conn_event_tx.send(ConnectivityEvent::HandshakeFailed {
address: pending_peer.address,
error,
});
if let Err(send_error) = send_result {
log::error!(
"Unable to report a failed handshake for peer {} to the front end: {}",
peer_id,
send_error
);
}
})?;
} else {
log::error!("Cannot find pending peer for peer id {peer_id}");
}
Expand Down Expand Up @@ -607,6 +600,12 @@ where

Ok(())
}

PeerEvent::Misbehaved { error } => {
self.conn_event_tx.send(ConnectivityEvent::Misbehaved { peer_id, error })?;

Ok(())
}
}
}

Expand Down
12 changes: 8 additions & 4 deletions p2p/src/net/default_backend/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,15 @@ where
) -> crate::Result<()> {
match msg.categorize() {
CategorizedMessage::Handshake(_) => {
// TODO: this must be reported to the peer manager, so that it can adjust
// the peer's ban score. (We may add a separate PeerEvent for this and Backend
// can then use the now unused ConnectivityEvent::Misbehaved to forward the error
// to the peer manager.)
log::error!("Peer {peer_id} sent unexpected handshake message");

peer_event_tx
.send(PeerEvent::Misbehaved {
error: P2pError::ProtocolError(ProtocolError::UnexpectedMessage(
"Unexpected handshake message".to_owned(),
)),
})
.await?;
}
CategorizedMessage::PeerManagerMessage(msg) => {
peer_event_tx.send(PeerEvent::MessageReceived { message: msg }).await?
Expand Down
3 changes: 3 additions & 0 deletions p2p/src/net/default_backend/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ pub enum PeerEvent {

/// Message received from remote
MessageReceived { message: PeerManagerMessage },

/// Protocol violation
Misbehaved { error: P2pError },
}

/// Events sent by Backend to Peer
Expand Down
46 changes: 46 additions & 0 deletions p2p/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ where
subscribed_to_peer_addresses: BTreeSet<PeerId>,

peer_eviction_random_state: peers_eviction::RandomState,

/// PeerManager's observer for use by tests.
observer: Option<Box<dyn Observer + Send>>,
}

/// Takes IP or socket address and converts it to socket address (adding the default peer port if IP address is used)
Expand All @@ -196,6 +199,26 @@ where
peer_mgr_event_rx: mpsc::UnboundedReceiver<PeerManagerEvent>,
time_getter: TimeGetter,
peerdb_storage: S,
) -> crate::Result<Self> {
Self::new_with_observer(
chain_config,
p2p_config,
handle,
peer_mgr_event_rx,
time_getter,
peerdb_storage,
None,
)
}

pub fn new_with_observer(
chain_config: Arc<ChainConfig>,
p2p_config: Arc<P2pConfig>,
handle: T::ConnectivityHandle,
peer_mgr_event_rx: mpsc::UnboundedReceiver<PeerManagerEvent>,
time_getter: TimeGetter,
peerdb_storage: S,
observer: Option<Box<dyn Observer + Send>>,
) -> crate::Result<Self> {
let mut rng = make_pseudo_rng();
let peerdb = peerdb::PeerDb::new(
Expand All @@ -218,6 +241,7 @@ where
peerdb,
subscribed_to_peer_addresses: BTreeSet::new(),
peer_eviction_random_state: peers_eviction::RandomState::new(&mut rng),
observer,
})
}

Expand Down Expand Up @@ -367,6 +391,10 @@ where
peer.score
);

if let Some(o) = self.observer.as_mut() {
o.on_peer_ban_score_adjustment(peer.address, peer.score)
}

if peer.score >= *self.p2p_config.ban_threshold {
let address = peer.address.as_bannable();
self.ban(address);
Expand All @@ -391,6 +419,10 @@ where
return;
}

if let Some(o) = self.observer.as_mut() {
o.on_peer_ban_score_adjustment(peer_address, score);
}

if score >= *self.p2p_config.ban_threshold {
let address = peer_address.as_bannable();
self.ban(address);
Expand All @@ -414,6 +446,10 @@ where

self.peerdb.ban(address);

if let Some(o) = self.observer.as_mut() {
o.on_peer_ban(address);
}

for peer_id in to_disconnect {
self.disconnect(peer_id, PeerDisconnectionDbAction::Keep, None);
}
Expand Down Expand Up @@ -1538,11 +1574,21 @@ where
self.run_internal(None).await
}

#[cfg(test)]
pub fn peers(&self) -> &BTreeMap<PeerId, PeerContext> {
&self.peers
}

#[cfg(test)]
pub fn peerdb(&self) -> &peerdb::PeerDb<S> {
&self.peerdb
}
}

pub trait Observer {
fn on_peer_ban_score_adjustment(&mut self, address: SocketAddress, new_score: u32);
fn on_peer_ban(&mut self, address: BannableAddress);
}

#[cfg(test)]
mod tests;
8 changes: 4 additions & 4 deletions p2p/src/peer_manager/tests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async fn ping_timeout() {
})
.unwrap();

let event = expect_recv!(&mut cmd_rx);
let event = expect_recv!(cmd_rx);
match event {
Command::Accept { peer_id: _ } => {}
_ => panic!("unexpected event: {event:?}"),
Expand All @@ -120,7 +120,7 @@ async fn ping_timeout() {
for _ in 0..5 {
time_getter.advance_time(ping_check_period);

let cmd = expect_recv!(&mut cmd_rx);
let cmd = expect_recv!(cmd_rx);
let (peer_id, peer_msg) = cmd_to_peer_man_msg(cmd);
let nonce = assert_matches_return_val!(
peer_msg,
Expand All @@ -138,7 +138,7 @@ async fn ping_timeout() {

// Receive one more ping request but do not send a ping response
time_getter.advance_time(ping_check_period);
let cmd = expect_recv!(&mut cmd_rx);
let cmd = expect_recv!(cmd_rx);
let (_, peer_msg) = cmd_to_peer_man_msg(cmd);
assert_matches!(
peer_msg,
Expand All @@ -148,7 +148,7 @@ async fn ping_timeout() {
time_getter.advance_time(ping_timeout);

// PeerManager should ask backend to close connection
let event = expect_recv!(&mut cmd_rx);
let event = expect_recv!(cmd_rx);
match event {
Command::Disconnect { peer_id } => {
conn_tx.send(ConnectivityEvent::ConnectionClosed { peer_id }).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/sync/tests/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl TestNode {

/// Receives a message from the sync manager.
pub async fn message(&mut self) -> (PeerId, SyncMessage) {
expect_recv!(&mut self.sync_msg_receiver)
expect_recv!(self.sync_msg_receiver)
}

/// Try to receive a message from the sync manager.
Expand Down
34 changes: 24 additions & 10 deletions p2p/src/tests/correct_handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ where
let chain_config = Arc::new(common::chain::config::create_unit_test_config());
let p2p_config = Arc::new(test_p2p_config());

let test_node = TestNode::<TTM>::start(
let mut test_node = TestNode::<TTM>::start(
Arc::clone(&chain_config),
Arc::clone(&p2p_config),
TTM::make_address(),
Expand Down Expand Up @@ -82,11 +82,18 @@ where
// which one it is though).
let _msg = msg_stream.recv().await.unwrap();

// This is mainly needed to ensure that the corresponding events, if any, reach
// peer manager before we end the test.
test_node.expect_no_banning().await;

let test_node_remnants = test_node.join().await;
assert_eq!(
test_node_remnants.peer_mgr.peerdb().list_banned().count(),
0
);

let bans_count = test_node_remnants.peer_mgr.peerdb().list_banned().count();
assert_eq!(bans_count, 0);

assert_eq!(test_node_remnants.peer_mgr.peers().len(), 1);
let peer_score = test_node_remnants.peer_mgr.peers().first_key_value().unwrap().1.score;
assert_eq!(peer_score, 0);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand All @@ -112,7 +119,7 @@ where
let chain_config = Arc::new(common::chain::config::create_unit_test_config());
let p2p_config = Arc::new(test_p2p_config());

let test_node = TestNode::<TTM>::start(
let mut test_node = TestNode::<TTM>::start(
Arc::clone(&chain_config),
Arc::clone(&p2p_config),
TTM::make_address(),
Expand Down Expand Up @@ -149,11 +156,18 @@ where
// which one it is though).
let _msg = msg_stream.recv().await.unwrap();

// This is mainly needed to ensure that the corresponding events, if any, reach
// peer manager before we end the test.
test_node.expect_no_banning().await;

let test_node_remnants = test_node.join().await;
assert_eq!(
test_node_remnants.peer_mgr.peerdb().list_banned().count(),
0
);

let bans_count = test_node_remnants.peer_mgr.peerdb().list_banned().count();
assert_eq!(bans_count, 0);

assert_eq!(test_node_remnants.peer_mgr.peers().len(), 1);
let peer_score = test_node_remnants.peer_mgr.peers().first_key_value().unwrap().1.score;
assert_eq!(peer_score, 0);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down
Loading
Loading