Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge inject_* paired methods #2445

Merged
merged 16 commits into from
Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions protocols/autonat/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,16 @@ impl NetworkBehaviour for Behaviour {
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
remaining_established: usize,
) {
self.inner
.inject_connection_closed(peer, conn, endpoint, handler);
let connections = self.connected.get_mut(peer).expect("Peer is connected.");
connections.remove(conn);
.inject_connection_closed(peer, conn, endpoint, handler, remaining_established);
if remaining_established == 0 {
self.connected.remove(peer);
} else {
let connections = self.connected.get_mut(peer).expect("Peer is connected.");
connections.remove(conn);
}
}

fn inject_dial_failure(
Expand All @@ -368,11 +373,6 @@ impl NetworkBehaviour for Behaviour {
}
}

fn inject_disconnected(&mut self, peer: &PeerId) {
self.inner.inject_disconnected(peer);
self.connected.remove(peer);
}

fn inject_address_change(
&mut self,
peer: &PeerId,
Expand Down
14 changes: 13 additions & 1 deletion protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,19 @@ impl NetworkBehaviour for Floodsub {
self.connected_peers.insert(*id, SmallVec::new());
}

fn inject_disconnected(&mut self, id: &PeerId) {
fn inject_connection_closed(
&mut self,
id: &PeerId,
_: &ConnectionId,
_: &ConnectedPoint,
_: Self::ProtocolsHandler,
remaining_established: usize,
) {
if remaining_established > 0 {
// we only care about peer disconnections
return;
}

let was_in = self.connected_peers.remove(id);
debug_assert!(was_in.is_some());

Expand Down
206 changes: 103 additions & 103 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3055,86 +3055,6 @@ where
)
}

fn inject_disconnected(&mut self, peer_id: &PeerId) {
// remove from mesh, topic_peers, peer_topic and the fanout
debug!("Peer disconnected: {}", peer_id);
{
let topics = match self.peer_topics.get(peer_id) {
Some(topics) => (topics),
None => {
debug_assert!(
self.blacklisted_peers.contains(peer_id),
"Disconnected node not in connected list"
);
return;
}
};

// remove peer from all mappings
for topic in topics {
// check the mesh for the topic
if let Some(mesh_peers) = self.mesh.get_mut(topic) {
// check if the peer is in the mesh and remove it
if mesh_peers.remove(peer_id) {
if let Some(m) = self.metrics.as_mut() {
m.peers_removed(topic, Churn::Dc, 1);
m.set_mesh_peers(topic, mesh_peers.len());
}
};
}

// remove from topic_peers
if let Some(peer_list) = self.topic_peers.get_mut(topic) {
if !peer_list.remove(peer_id) {
// debugging purposes
warn!(
"Disconnected node: {} not in topic_peers peer list",
peer_id
);
}
if let Some(m) = self.metrics.as_mut() {
m.set_topic_peers(topic, peer_list.len())
}
} else {
warn!(
"Disconnected node: {} with topic: {:?} not in topic_peers",
&peer_id, &topic
);
}

// remove from fanout
self.fanout
.get_mut(topic)
.map(|peers| peers.remove(peer_id));
}
}

// Forget px and outbound status for this peer
self.px_peers.remove(peer_id);
self.outbound_peers.remove(peer_id);

// Remove peer from peer_topics and connected_peers
// NOTE: It is possible the peer has already been removed from all mappings if it does not
// support the protocol.
self.peer_topics.remove(peer_id);

// If metrics are enabled, register the disconnection of a peer based on its protocol.
if let Some(metrics) = self.metrics.as_mut() {
let peer_kind = &self
.connected_peers
.get(peer_id)
.expect("Connected peer must be registered")
.kind;
metrics.peer_protocol_disconnected(peer_kind.clone());
}

self.connected_peers.remove(peer_id);

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.remove_peer(peer_id);
}
}

fn inject_connection_established(
&mut self,
peer_id: &PeerId,
Expand Down Expand Up @@ -3228,6 +3148,7 @@ where
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
_: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
remaining_established: usize,
) {
// Remove IP from peer scoring system
if let Some((peer_score, ..)) = &mut self.peer_score {
Expand All @@ -3242,35 +3163,114 @@ where
}
}

// Remove the connection from the list
// If there are no connections left, inject_disconnected will remove the mapping entirely.
if let Some(connections) = self.connected_peers.get_mut(peer_id) {
let index = connections
.connections
.iter()
.position(|v| v == connection_id)
.expect("Previously established connection to peer must be present");
connections.connections.remove(index);
if remaining_established != 0 {
// Remove the connection from the list
if let Some(connections) = self.connected_peers.get_mut(peer_id) {
let index = connections
.connections
.iter()
.position(|v| v == connection_id)
.expect("Previously established connection to peer must be present");
connections.connections.remove(index);

// If there are more connections and this peer is in a mesh, inform the first connection
// handler.
if !connections.connections.is_empty() {
if let Some(topics) = self.peer_topics.get(peer_id) {
for topic in topics {
if let Some(mesh_peers) = self.mesh.get(topic) {
if mesh_peers.contains(peer_id) {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: *peer_id,
event: Arc::new(GossipsubHandlerIn::JoinedMesh),
handler: NotifyHandler::One(connections.connections[0]),
});
break;
}
}
}
}
}
}
} else {
// remove from mesh, topic_peers, peer_topic and the fanout
debug!("Peer disconnected: {}", peer_id);
{
let topics = match self.peer_topics.get(peer_id) {
Some(topics) => (topics),
None => {
debug_assert!(
self.blacklisted_peers.contains(peer_id),
"Disconnected node not in connected list"
);
return;
}
};

// If there are more connections and this peer is in a mesh, inform the first connection
// handler.
if !connections.connections.is_empty() {
if let Some(topics) = self.peer_topics.get(peer_id) {
for topic in topics {
if let Some(mesh_peers) = self.mesh.get(topic) {
if mesh_peers.contains(peer_id) {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: *peer_id,
event: Arc::new(GossipsubHandlerIn::JoinedMesh),
handler: NotifyHandler::One(connections.connections[0]),
});
break;
// remove peer from all mappings
for topic in topics {
// check the mesh for the topic
if let Some(mesh_peers) = self.mesh.get_mut(topic) {
// check if the peer is in the mesh and remove it
if mesh_peers.remove(peer_id) {
if let Some(m) = self.metrics.as_mut() {
m.peers_removed(topic, Churn::Dc, 1);
m.set_mesh_peers(topic, mesh_peers.len());
}
};
}

// remove from topic_peers
if let Some(peer_list) = self.topic_peers.get_mut(topic) {
if !peer_list.remove(peer_id) {
// debugging purposes
warn!(
"Disconnected node: {} not in topic_peers peer list",
peer_id
);
}
if let Some(m) = self.metrics.as_mut() {
m.set_topic_peers(topic, peer_list.len())
}
} else {
warn!(
"Disconnected node: {} with topic: {:?} not in topic_peers",
&peer_id, &topic
);
}

// remove from fanout
self.fanout
.get_mut(topic)
.map(|peers| peers.remove(peer_id));
}
}

// Forget px and outbound status for this peer
self.px_peers.remove(peer_id);
self.outbound_peers.remove(peer_id);

// Remove peer from peer_topics and connected_peers
// NOTE: It is possible the peer has already been removed from all mappings if it does not
// support the protocol.
self.peer_topics.remove(peer_id);

// If metrics are enabled, register the disconnection of a peer based on its protocol.
if let Some(metrics) = self.metrics.as_mut() {
let peer_kind = &self
.connected_peers
.get(peer_id)
.expect("Connected peer must be registered")
.kind;
metrics.peer_protocol_disconnected(peer_kind.clone());
}

self.connected_peers.remove(peer_id);

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.remove_peer(peer_id);
}
}
}

Expand Down
30 changes: 28 additions & 2 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,32 @@ mod tests {
peer
}

fn disconnect_peer<D, F>(gs: &mut Gossipsub<D, F>, peer_id: &PeerId)
where
D: DataTransform + Default + Clone + Send + 'static,
F: TopicSubscriptionFilter + Clone + Default + Send + 'static,
{
if let Some(peer_connections) = gs.connected_peers.get(peer_id) {
let fake_endpoint = ConnectedPoint::Dialer {
address: Multiaddr::empty(),
role_override: Endpoint::Dialer,
}; // this is not relevant
// peer_connections.connections should never be empty.
let mut active_connections = peer_connections.connections.len() - 1;
for conn_id in peer_connections.connections.clone() {
let handler = gs.new_handler();
gs.inject_connection_closed(
peer_id,
&conn_id,
&fake_endpoint,
handler,
active_connections,
);
active_connections = active_connections.saturating_sub(1);
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

// Converts a protobuf message into a gossipsub message for reading the Gossipsub event queue.
fn proto_to_message(rpc: &crate::rpc_proto::Rpc) -> GossipsubRpc {
// Store valid messages.
Expand Down Expand Up @@ -1382,7 +1408,7 @@ mod tests {
flush_events(&mut gs);

//disconnect peer
gs.inject_disconnected(peer);
disconnect_peer(&mut gs, peer);

gs.heartbeat();

Expand Down Expand Up @@ -5335,7 +5361,7 @@ mod tests {
gs.handle_graft(&peers[0], subscribe_topic_hash);

// The node disconnects
gs.inject_disconnected(&peers[0]);
disconnect_peer(&mut gs, &peers[0]);

// We unsubscribe from the topic.
let _ = gs.unsubscribe(&Topic::new(topic));
Expand Down
11 changes: 5 additions & 6 deletions protocols/identify/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,12 @@ impl NetworkBehaviour for Identify {
conn: &ConnectionId,
_: &ConnectedPoint,
_: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
remaining_established: usize,
) {
if let Some(addrs) = self.connected.get_mut(peer_id) {
if remaining_established == 0 {
self.connected.remove(peer_id);
self.pending_push.remove(peer_id);
} else if let Some(addrs) = self.connected.get_mut(peer_id) {
addrs.remove(conn);
}
}
Expand All @@ -282,11 +286,6 @@ impl NetworkBehaviour for Identify {
}
}

fn inject_disconnected(&mut self, peer_id: &PeerId) {
self.connected.remove(peer_id);
self.pending_push.remove(peer_id);
}

fn inject_new_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
if self.config.push_listen_addr_updates {
self.pending_push.extend(self.connected.keys());
Expand Down
19 changes: 14 additions & 5 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1949,12 +1949,21 @@ where
}
}

fn inject_disconnected(&mut self, id: &PeerId) {
for query in self.queries.iter_mut() {
query.on_failure(id);
fn inject_connection_closed(
&mut self,
id: &PeerId,
_: &ConnectionId,
_: &ConnectedPoint,
_: <Self::ProtocolsHandler as libp2p_swarm::IntoProtocolsHandler>::Handler,
remaining_established: usize,
) {
if remaining_established == 0 {
for query in self.queries.iter_mut() {
query.on_failure(id);
}
self.connection_updated(*id, None, NodeStatus::Disconnected);
self.connected_peers.remove(id);
}
self.connection_updated(*id, None, NodeStatus::Disconnected);
self.connected_peers.remove(id);
}

fn inject_event(
Expand Down
Loading