Skip to content

Commit

Permalink
avoid panic'ing when peer is not on the connected peer list
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Jan 8, 2024
1 parent 5f43d26 commit 34dc65d
Showing 1 changed file with 148 additions and 152 deletions.
300 changes: 148 additions & 152 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,28 +711,27 @@ where
// Send to peers we know are subscribed to the topic.
let mut publish_failed = true;
for peer_id in recipient_peers.iter() {
tracing::trace!(peer=%peer_id, "Sending message to peer");
let sender = &mut self
.connected_peers
.get_mut(peer_id)
.expect("The peer must be connected")
.sender;

match sender.publish(
raw_message.clone(),
self.config.publish_queue_duration(),
self.metrics.as_mut(),
) {
Ok(_) => publish_failed = false,
Err(_) => {
self.failed_messages.entry(*peer_id).or_default().priority += 1;

tracing::warn!(peer=%peer_id, "Publish queue full. Could not publish to peer");
// Downscore the peer due to failed message.
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) {
tracing::trace!(peer=%peer_id, "Sending message to peer");
match peer.sender.publish(
raw_message.clone(),
self.config.publish_queue_duration(),
self.metrics.as_mut(),
) {
Ok(_) => publish_failed = false,
Err(_) => {
self.failed_messages.entry(*peer_id).or_default().priority += 1;

tracing::warn!(peer=%peer_id, "Publish queue full. Could not publish to peer");
// Downscore the peer due to failed message.
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
}
}
}
} else {
tracing::error!(peer = %peer_id,
"Could not PUBLISH, peer doesn't exist in connected peer list");
}
}

Expand Down Expand Up @@ -1013,19 +1012,18 @@ where

for peer_id in added_peers {
// Send a GRAFT control message
tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer");
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.graft(&peer_id, topic_hash.clone());
}
let sender = &mut self
.connected_peers
.get_mut(&peer_id)
.expect("Peer must be connected")
.sender;

sender.graft(Graft {
topic_hash: topic_hash.clone(),
});
if let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) {
tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer");
peer.sender.graft(Graft {
topic_hash: topic_hash.clone(),
});
} else {
tracing::error!(peer = %peer_id,
"Could not GRAFT, peer doesn't exist in connected peer list");
}

// If the peer did not previously exist in any mesh, inform the handler
peer_added_to_mesh(
Expand Down Expand Up @@ -1117,17 +1115,14 @@ where
}
for peer_id in peers {
// Send a PRUNE control message
tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer");
let on_unsubscribe = true;
let prune =
self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe);
let sender = &mut self
.connected_peers
.get_mut(&peer_id)
.expect("Peer must be connected")
.sender;

sender.prune(prune);
let prune = self.make_prune(topic_hash, &peer_id, self.config.do_px(), true);
if let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) {
tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer");
peer.sender.prune(prune);
} else {
tracing::error!(peer = %peer_id,
"Could not PRUNE, peer doesn't exist in connected peer list");
}

// If the peer did not previously exist in any mesh, inform the handler
peer_removed_from_mesh(
Expand Down Expand Up @@ -1282,34 +1277,35 @@ where
Instant::now() + self.config.iwant_followup_time(),
);
}
tracing::trace!(
peer=%peer_id,
"IHAVE: Asking for the following messages from peer: {:?}",
iwant_ids_vec
);

let sender = &mut self
.connected_peers
.get_mut(peer_id)
.expect("Peer must be connected")
.sender;
if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) {
tracing::trace!(
peer=%peer_id,
"IHAVE: Asking for the following messages from peer: {:?}",
iwant_ids_vec
);

if sender
.iwant(IWant {
message_ids: iwant_ids_vec,
})
.is_err()
{
tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IWANT");
if peer
.sender
.iwant(IWant {
message_ids: iwant_ids_vec,
})
.is_err()
{
tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IWANT");

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
}
// Increment failed message count
self.failed_messages
.entry(*peer_id)
.or_default()
.non_priority += 1;
}
// Increment failed message count
self.failed_messages
.entry(*peer_id)
.or_default()
.non_priority += 1;
} else {
tracing::error!(peer = %peer_id,
"Could not IWANT, peer doesn't exist in connected peer list");
}
}
tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
Expand Down Expand Up @@ -1345,30 +1341,30 @@ where
"IWANT: Peer has asked for message too many times; ignoring request"
);
} else {

Check failure on line 1343 in protocols/gossipsub/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (nightly-2023-09-10)

this `else { if .. }` block can be collapsed
tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
let sender = &mut self
.connected_peers
.get_mut(peer_id)
.expect("Peer must be connected")
.sender;

if sender
.forward(
msg,
self.config.forward_queue_duration(),
self.metrics.as_mut(),
)
.is_err()
{
// Downscore the peer
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) {
tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
if peer
.sender
.forward(
msg,
self.config.forward_queue_duration(),
self.metrics.as_mut(),
)
.is_err()
{
// Downscore the peer
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
}
// Increment the failed message count
self.failed_messages
.entry(*peer_id)
.or_default()
.non_priority += 1;
}
// Increment the failed message count
self.failed_messages
.entry(*peer_id)
.or_default()
.non_priority += 1;
} else {
tracing::error!(peer = %peer_id,
"Could not IWANT, peer doesn't exist in connected peer list");
}
}
}
Expand Down Expand Up @@ -2016,13 +2012,13 @@ where

// If we need to send grafts to peer, do so immediately, rather than waiting for the
// heartbeat.
let sender = &mut self
.connected_peers
.get_mut(propagation_source)
.expect("Peer must be connected")
.sender;
for topic_hash in topics_to_graft.into_iter() {
sender.graft(Graft { topic_hash });
if let Some(peer) = &mut self.connected_peers.get_mut(propagation_source) {
for topic_hash in topics_to_graft.into_iter() {
peer.sender.graft(Graft { topic_hash });
}
} else {
tracing::error!(peer = %propagation_source,
"Could not GRAFT, peer doesn't exist in connected peer list");
}

// Notify the application of the subscriptions
Expand Down Expand Up @@ -2505,28 +2501,29 @@ where
}

// send an IHAVE message
let sender = &mut self
.connected_peers
.get_mut(&peer_id)
.expect("Peer must be connected")
.sender;
if sender
.ihave(IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
})
.is_err()
{
tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IHAVE");
if let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) {
if peer
.sender
.ihave(IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
})
.is_err()
{
tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IHAVE");

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(&peer_id);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(&peer_id);
}
// Increment failed message count
self.failed_messages
.entry(peer_id)
.or_default()
.non_priority += 1;
}
// Increment failed message count
self.failed_messages
.entry(peer_id)
.or_default()
.non_priority += 1;
} else {
tracing::error!(peer = %peer_id,
"Could not IHAVE, peer doesn't exist in connected peer list");
}
}
}
Expand Down Expand Up @@ -2607,13 +2604,12 @@ where
self.config.do_px() && !no_px.contains(peer_id),
false,
);
let sender = &mut self
.connected_peers
.get_mut(peer_id)
.expect("Peer must be connected")
.sender;

sender.prune(prune);
if let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) {

Check failure on line 2607 in protocols/gossipsub/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / clippy (nightly-2023-09-10)

this expression creates a reference which is immediately dereferenced by the compiler
peer.sender.prune(prune);
} else {
tracing::error!(peer = %peer_id,
"Could not PRUNE, peer doesn't exist in connected peer list");
}

// inform the handler
peer_removed_from_mesh(
Expand Down Expand Up @@ -2679,29 +2675,30 @@ where
// forward the message to peers
if !recipient_peers.is_empty() {
for peer_id in recipient_peers.iter() {
tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer");
let sender = &mut self
.connected_peers
.get_mut(peer_id)
.expect("Peer must be connected")
.sender;
if sender
.forward(
message.clone(),
self.config.forward_queue_duration(),
self.metrics.as_mut(),
)
.is_err()
{
// Downscore the peer
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
if let Some(peer) = self.connected_peers.get_mut(peer_id) {
tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer");
if peer
.sender
.forward(
message.clone(),
self.config.forward_queue_duration(),
self.metrics.as_mut(),
)
.is_err()
{
// Downscore the peer
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
}
// Increment the failed message count
self.failed_messages
.entry(*peer_id)
.or_default()
.non_priority += 1;
}
// Increment the failed message count
self.failed_messages
.entry(*peer_id)
.or_default()
.non_priority += 1;
} else {
tracing::error!(peer = %peer_id,
"Could not FORWARD, peer doesn't exist in connected peer list");
}
}
tracing::debug!("Completed forwarding message");
Expand Down Expand Up @@ -2850,14 +2847,13 @@ where

tracing::debug!(peer=%peer_id, "New peer connected");
// We need to send our subscriptions to the newly-connected node.
let sender = &mut self
.connected_peers
.get_mut(&peer_id)
.expect("Peer must be connected")
.sender;

for topic_hash in self.mesh.clone().into_keys() {
sender.subscribe(topic_hash);
if let Some(peer) = self.connected_peers.get_mut(&peer_id) {
for topic_hash in self.mesh.clone().into_keys() {
peer.sender.subscribe(topic_hash);
}
} else {
tracing::error!(peer = %peer_id,
"Could not SUBSCRIBE, peer doesn't exist in connected peer list");
}
}

Expand Down

0 comments on commit 34dc65d

Please sign in to comment.