diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index be3178ba8b2..6c0c3840c08 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -516,6 +516,7 @@ where ); Ok(connection_id) } + /// Polls the connection pool for events. pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll> where diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index abe3fcf0705..461e8c7d13f 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -665,423 +665,445 @@ where &mut self.behaviour } - /// Internal function used by everything event-related. - /// - /// Polls the `Swarm` for the next event. - fn poll_next_event( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - // We use a `this` variable because the compiler can't mutably borrow multiple times - // across a `Deref`. - let this = &mut *self; - - loop { - let mut listeners_not_ready = false; - let mut connections_not_ready = false; + fn handle_pool_event( + &mut self, + event: PoolEvent, transport::Boxed<(PeerId, StreamMuxerBox)>>, + ) -> Option>> { + match event { + PoolEvent::ConnectionEstablished { + peer_id, + id, + endpoint, + other_established_connection_ids, + concurrent_dial_errors, + } => { + if self.banned_peers.contains(&peer_id) { + // Mark the connection for the banned peer as banned, thus withholding any + // future events from the connection to the behaviour. + self.banned_peer_connections.insert(id); + self.pool.disconnect(peer_id); + return Some(SwarmEvent::BannedPeer { peer_id, endpoint }); + } else { + let num_established = NonZeroU32::new( + u32::try_from(other_established_connection_ids.len() + 1).unwrap(), + ) + .expect("n + 1 is always non-zero; qed"); + let non_banned_established = other_established_connection_ids + .into_iter() + .filter(|conn_id| !self.banned_peer_connections.contains(conn_id)) + .count(); - // Poll the listener(s) for new connections. - match ListenersStream::poll(Pin::new(&mut this.listeners), cx) { - Poll::Pending => { - listeners_not_ready = true; + log::debug!( + "Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}", + peer_id, + endpoint, + num_established, + non_banned_established + 1, + ); + let failed_addresses = concurrent_dial_errors + .as_ref() + .map(|es| es.iter().map(|(a, _)| a).cloned().collect()); + self.behaviour.inject_connection_established( + &peer_id, + &id, + &endpoint, + failed_addresses.as_ref(), + non_banned_established, + ); + return Some(SwarmEvent::ConnectionEstablished { + peer_id, + num_established, + endpoint, + concurrent_dial_errors, + }); } - Poll::Ready(ListenersEvent::Incoming { - listener_id: _, - upgrade, + } + PoolEvent::PendingOutboundConnectionError { + id: _, + error, + handler, + peer, + } => { + let error = error.into(); + + self.behaviour.inject_dial_failure(peer, handler, &error); + + if let Some(peer) = peer { + log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,); + } else { + log::debug!("Connection attempt to unknown peer failed with {:?}", error); + } + + return Some(SwarmEvent::OutgoingConnectionError { + peer_id: peer, + error, + }); + } + PoolEvent::PendingInboundConnectionError { + id: _, + send_back_addr, + local_addr, + error, + handler, + } => { + log::debug!("Incoming connection failed: {:?}", error); + self.behaviour + .inject_listen_failure(&local_addr, &send_back_addr, handler); + return Some(SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, - }) => { - let handler = this.behaviour.new_handler(); - match this.pool.add_incoming( - upgrade, - handler, - IncomingInfo { - local_addr: &local_addr, - send_back_addr: &send_back_addr, - }, - ) { - Ok(_connection_id) => { - return Poll::Ready(SwarmEvent::IncomingConnection { - local_addr, - send_back_addr, - }); - } - Err((connection_limit, handler)) => { - this.behaviour.inject_listen_failure( - &local_addr, - &send_back_addr, - handler, - ); - log::warn!("Incoming connection rejected: {:?}", connection_limit); - } - }; - } - Poll::Ready(ListenersEvent::NewAddress { - listener_id, - listen_addr, - }) => { - log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr); - if !this.listened_addrs.contains(&listen_addr) { - this.listened_addrs.push(listen_addr.clone()) - } - this.behaviour - .inject_new_listen_addr(listener_id, &listen_addr); - return Poll::Ready(SwarmEvent::NewListenAddr { - listener_id, - address: listen_addr, - }); - } - Poll::Ready(ListenersEvent::AddressExpired { - listener_id, - listen_addr, - }) => { + error, + }); + } + PoolEvent::ConnectionClosed { + id, + connected, + error, + remaining_established_connection_ids, + handler, + .. + } => { + if let Some(error) = error.as_ref() { log::debug!( - "Listener {:?}; Expired address {:?}.", - listener_id, - listen_addr + "Connection closed with error {:?}: {:?}; Total (peer): {}.", + error, + connected, + remaining_established_connection_ids.len() ); - this.listened_addrs.retain(|a| a != &listen_addr); - this.behaviour - .inject_expired_listen_addr(listener_id, &listen_addr); - return Poll::Ready(SwarmEvent::ExpiredListenAddr { - listener_id, - address: listen_addr, - }); - } - Poll::Ready(ListenersEvent::Closed { - listener_id, - addresses, - reason, - }) => { - log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason); - for addr in addresses.iter() { - this.behaviour.inject_expired_listen_addr(listener_id, addr); - } - this.behaviour.inject_listener_closed( - listener_id, - match &reason { - Ok(()) => Ok(()), - Err(err) => Err(err), - }, + } else { + log::debug!( + "Connection closed: {:?}; Total (peer): {}.", + connected, + remaining_established_connection_ids.len() ); - return Poll::Ready(SwarmEvent::ListenerClosed { - listener_id, - addresses, - reason, - }); - } - Poll::Ready(ListenersEvent::Error { listener_id, error }) => { - this.behaviour.inject_listener_error(listener_id, &error); - return Poll::Ready(SwarmEvent::ListenerError { listener_id, error }); } - } - - // Poll the known peers. - match this.pool.poll(cx) { - Poll::Pending => { - connections_not_ready = true; + let peer_id = connected.peer_id; + let endpoint = connected.endpoint; + let num_established = + u32::try_from(remaining_established_connection_ids.len()).unwrap(); + let conn_was_reported = !self.banned_peer_connections.remove(&id); + if conn_was_reported { + let remaining_non_banned = remaining_established_connection_ids + .into_iter() + .filter(|conn_id| !self.banned_peer_connections.contains(conn_id)) + .count(); + self.behaviour.inject_connection_closed( + &peer_id, + &id, + &endpoint, + handler, + remaining_non_banned, + ); } - Poll::Ready(PoolEvent::ConnectionEstablished { - id, + return Some(SwarmEvent::ConnectionClosed { peer_id, endpoint, - other_established_connection_ids, - concurrent_dial_errors, - }) => { - if this.banned_peers.contains(&peer_id) { - // Mark the connection for the banned peer as banned, thus withholding any - // future events from the connection to the behaviour. - this.banned_peer_connections.insert(id); - this.pool.disconnect(peer_id); - return Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }); - } else { - let num_established = NonZeroU32::new( - u32::try_from(other_established_connection_ids.len() + 1).unwrap(), - ) - .expect("n + 1 is always non-zero; qed"); - let non_banned_established = other_established_connection_ids - .into_iter() - .filter(|conn_id| !this.banned_peer_connections.contains(conn_id)) - .count(); - - log::debug!( - "Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}", - peer_id, - endpoint, - num_established, - non_banned_established + 1, - ); - let failed_addresses = concurrent_dial_errors - .as_ref() - .map(|es| es.iter().map(|(a, _)| a).cloned().collect()); - this.behaviour.inject_connection_established( - &peer_id, - &id, - &endpoint, - failed_addresses.as_ref(), - non_banned_established, - ); - return Poll::Ready(SwarmEvent::ConnectionEstablished { - peer_id, - num_established, - endpoint, - concurrent_dial_errors, - }); - } + cause: error, + num_established, + }); + } + PoolEvent::ConnectionEvent { peer_id, id, event } => { + if self.banned_peer_connections.contains(&id) { + log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id); + } else { + self.behaviour.inject_event(peer_id, id, event); } - Poll::Ready(PoolEvent::PendingOutboundConnectionError { - id: _, - error, - handler, - peer, - }) => { - let error = error.into(); + } + PoolEvent::AddressChange { + peer_id, + id, + new_endpoint, + old_endpoint, + } => { + if !self.banned_peer_connections.contains(&id) { + self.behaviour.inject_address_change( + &peer_id, + &id, + &old_endpoint, + &new_endpoint, + ); + } + } + } - this.behaviour.inject_dial_failure(peer, handler, &error); + None + } - if let Some(peer) = peer { - log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,); - } else { - log::debug!("Connection attempt to unknown peer failed with {:?}", error); + fn handle_listeners_event( + &mut self, + event: ListenersEvent>, + ) -> Option>> { + match event { + ListenersEvent::Incoming { + listener_id: _, + upgrade, + local_addr, + send_back_addr, + } => { + let handler = self.behaviour.new_handler(); + match self.pool.add_incoming( + upgrade, + handler, + IncomingInfo { + local_addr: &local_addr, + send_back_addr: &send_back_addr, + }, + ) { + Ok(_connection_id) => { + return Some(SwarmEvent::IncomingConnection { + local_addr, + send_back_addr, + }); } - - return Poll::Ready(SwarmEvent::OutgoingConnectionError { - peer_id: peer, - error, - }); + Err((connection_limit, handler)) => { + self.behaviour + .inject_listen_failure(&local_addr, &send_back_addr, handler); + log::warn!("Incoming connection rejected: {:?}", connection_limit); + } + }; + } + ListenersEvent::NewAddress { + listener_id, + listen_addr, + } => { + log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr); + if !self.listened_addrs.contains(&listen_addr) { + self.listened_addrs.push(listen_addr.clone()) } - Poll::Ready(PoolEvent::PendingInboundConnectionError { - id: _, - send_back_addr, - local_addr, - error, - handler, - }) => { - log::debug!("Incoming connection failed: {:?}", error); - this.behaviour - .inject_listen_failure(&local_addr, &send_back_addr, handler); - return Poll::Ready(SwarmEvent::IncomingConnectionError { - local_addr, - send_back_addr, - error, - }); + self.behaviour + .inject_new_listen_addr(listener_id, &listen_addr); + return Some(SwarmEvent::NewListenAddr { + listener_id, + address: listen_addr, + }); + } + ListenersEvent::AddressExpired { + listener_id, + listen_addr, + } => { + log::debug!( + "Listener {:?}; Expired address {:?}.", + listener_id, + listen_addr + ); + self.listened_addrs.retain(|a| a != &listen_addr); + self.behaviour + .inject_expired_listen_addr(listener_id, &listen_addr); + return Some(SwarmEvent::ExpiredListenAddr { + listener_id, + address: listen_addr, + }); + } + ListenersEvent::Closed { + listener_id, + addresses, + reason, + } => { + log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason); + for addr in addresses.iter() { + self.behaviour.inject_expired_listen_addr(listener_id, addr); } - Poll::Ready(PoolEvent::ConnectionClosed { - id, - connected, - error, - remaining_established_connection_ids, - handler, - .. - }) => { - if let Some(error) = error.as_ref() { - log::debug!( - "Connection closed with error {:?}: {:?}; Total (peer): {}.", - error, - connected, - remaining_established_connection_ids.len() - ); - } else { - log::debug!( - "Connection closed: {:?}; Total (peer): {}.", - connected, - remaining_established_connection_ids.len() - ); - } - let peer_id = connected.peer_id; - let endpoint = connected.endpoint; - let num_established = - u32::try_from(remaining_established_connection_ids.len()).unwrap(); - let conn_was_reported = !this.banned_peer_connections.remove(&id); - if conn_was_reported { - let remaining_non_banned = remaining_established_connection_ids - .into_iter() - .filter(|conn_id| !this.banned_peer_connections.contains(conn_id)) - .count(); - this.behaviour.inject_connection_closed( - &peer_id, - &id, - &endpoint, - handler, - remaining_non_banned, - ); + self.behaviour.inject_listener_closed( + listener_id, + match &reason { + Ok(()) => Ok(()), + Err(err) => Err(err), + }, + ); + return Some(SwarmEvent::ListenerClosed { + listener_id, + addresses, + reason, + }); + } + ListenersEvent::Error { listener_id, error } => { + self.behaviour.inject_listener_error(listener_id, &error); + return Some(SwarmEvent::ListenerError { listener_id, error }); + } + } + None + } + + fn handle_behaviour_event( + &mut self, + event: NetworkBehaviourAction, + ) -> Option>> { + match event { + NetworkBehaviourAction::GenerateEvent(event) => { + return Some(SwarmEvent::Behaviour(event)) + } + NetworkBehaviourAction::Dial { opts, handler } => { + let peer_id = opts.get_peer_id(); + if let Ok(()) = self.dial_with_handler(opts, handler) { + if let Some(peer_id) = peer_id { + return Some(SwarmEvent::Dialing(peer_id)); } - return Poll::Ready(SwarmEvent::ConnectionClosed { - peer_id, - endpoint, - cause: error, - num_established, - }); } - Poll::Ready(PoolEvent::ConnectionEvent { id, peer_id, event }) => { - if this.banned_peer_connections.contains(&id) { - log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id); - } else { - this.behaviour.inject_event(peer_id, id, event); + } + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event, + } => { + assert!(self.pending_event.is_none()); + let handler = match handler { + NotifyHandler::One(connection) => PendingNotifyHandler::One(connection), + NotifyHandler::Any => { + let ids = self + .pool + .iter_established_connections_of_peer(&peer_id) + .collect(); + PendingNotifyHandler::Any(ids) } + }; + + self.pending_event = Some((peer_id, handler, event)); + } + NetworkBehaviourAction::ReportObservedAddr { address, score } => { + // Maps the given `observed_addr`, representing an address of the local + // node observed by a remote peer, onto the locally known listen addresses + // to yield one or more addresses of the local node that may be publicly + // reachable. + // + // I.e. self method incorporates the view of other peers into the listen + // addresses seen by the local node to account for possible IP and port + // mappings performed by intermediate network devices in an effort to + // obtain addresses for the local peer that are also reachable for peers + // other than the peer who reported the `observed_addr`. + // + // The translation is transport-specific. See [`Transport::address_translation`]. + let translated_addresses = { + let transport = self.listeners.transport(); + let mut addrs: Vec<_> = self + .listeners + .listen_addrs() + .filter_map(move |server| transport.address_translation(server, &address)) + .collect(); + + // remove duplicates + addrs.sort_unstable(); + addrs.dedup(); + addrs + }; + for addr in translated_addresses { + self.add_external_address(addr, score); } - Poll::Ready(PoolEvent::AddressChange { - id, - peer_id, - new_endpoint, - old_endpoint, - }) => { - if !this.banned_peer_connections.contains(&id) { - this.behaviour.inject_address_change( - &peer_id, - &id, - &old_endpoint, - &new_endpoint, - ); + } + NetworkBehaviourAction::CloseConnection { + peer_id, + connection, + } => match connection { + CloseConnection::One(connection_id) => { + if let Some(conn) = self.pool.get_established(connection_id) { + conn.start_close(); } } - }; + CloseConnection::All => { + self.pool.disconnect(peer_id); + } + }, + } - // After the network had a chance to make progress, try to deliver - // the pending event emitted by the behaviour in the previous iteration - // to the connection handler(s). The pending event must be delivered - // before polling the behaviour again. If the targeted peer - // meanwhie disconnected, the event is discarded. - if let Some((peer_id, handler, event)) = this.pending_event.take() { - match handler { + None + } + + /// Internal function used by everything event-related. + /// + /// Polls the `Swarm` for the next event. + fn poll_next_event( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + // We use a `this` variable because the compiler can't mutably borrow multiple times + // across a `Deref`. + let this = &mut *self; + + // This loop polls the components below in a prioritized order. + // + // 1. [`NetworkBehaviour`] + // 2. Connection [`Pool`] + // 3. [`ListenersStream`] + // + // (1) is polled before (2) to prioritize local work over work coming from a remote. + // + // (2) is polled before (3) to prioritize existing connections over upgrading new incoming connections. + loop { + match this.pending_event.take() { + // Try to deliver the pending event emitted by the [`NetworkBehaviour`] in the previous + // iteration to the connection handler(s). + Some((peer_id, handler, event)) => match handler { PendingNotifyHandler::One(conn_id) => { - if let Some(mut conn) = this.pool.get_established(conn_id) { - if let Some(event) = notify_one(&mut conn, event, cx) { - this.pending_event = Some((peer_id, handler, event)); - if listeners_not_ready && connections_not_ready { - return Poll::Pending; - } else { - continue; + match this.pool.get_established(conn_id) { + Some(mut conn) => match notify_one(&mut conn, event, cx) { + None => continue, + Some(event) => { + this.pending_event = Some((peer_id, handler, event)); } - } + }, + None => continue, } } PendingNotifyHandler::Any(ids) => { - if let Some((event, ids)) = - notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx) - { - let handler = PendingNotifyHandler::Any(ids); - this.pending_event = Some((peer_id, handler, event)); - if listeners_not_ready && connections_not_ready { - return Poll::Pending; - } else { - continue; + match notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx) { + None => continue, + Some((event, ids)) => { + let handler = PendingNotifyHandler::Any(ids); + this.pending_event = Some((peer_id, handler, event)); } } } - } - } - - debug_assert!(this.pending_event.is_none()); + }, + // No pending event. Allow the [`NetworkBehaviour`] to make progress. + None => { + let behaviour_poll = { + let mut parameters = SwarmPollParameters { + local_peer_id: &this.local_peer_id, + supported_protocols: &this.supported_protocols, + listened_addrs: &this.listened_addrs, + external_addrs: &this.external_addrs, + }; + this.behaviour.poll(cx, &mut parameters) + }; - let behaviour_poll = { - let mut parameters = SwarmPollParameters { - local_peer_id: &this.local_peer_id, - supported_protocols: &this.supported_protocols, - listened_addrs: &this.listened_addrs, - external_addrs: &this.external_addrs, - }; - this.behaviour.poll(cx, &mut parameters) - }; + match behaviour_poll { + Poll::Pending => {} + Poll::Ready(behaviour_event) => { + if let Some(swarm_event) = this.handle_behaviour_event(behaviour_event) + { + return Poll::Ready(swarm_event); + } - match behaviour_poll { - Poll::Pending if listeners_not_ready && connections_not_ready => { - return Poll::Pending - } - Poll::Pending => (), - Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { - return Poll::Ready(SwarmEvent::Behaviour(event)) - } - Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) => { - let peer_id = opts.get_peer_id(); - if let Ok(()) = this.dial_with_handler(opts, handler) { - if let Some(peer_id) = peer_id { - return Poll::Ready(SwarmEvent::Dialing(peer_id)); + continue; } } } - Poll::Ready(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event, - }) => match handler { - NotifyHandler::One(connection) => { - if let Some(mut conn) = this.pool.get_established(connection) { - if let Some(event) = notify_one(&mut conn, event, cx) { - let handler = PendingNotifyHandler::One(connection); - this.pending_event = Some((peer_id, handler, event)); - if listeners_not_ready && connections_not_ready { - return Poll::Pending; - } else { - continue; - } - } - } - } - NotifyHandler::Any => { - let ids = this - .pool - .iter_established_connections_of_peer(&peer_id) - .collect(); - if let Some((event, ids)) = - notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx) - { - let handler = PendingNotifyHandler::Any(ids); - this.pending_event = Some((peer_id, handler, event)); - if listeners_not_ready && connections_not_ready { - return Poll::Pending; - } else { - continue; - } - } - } - }, - Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => { - // Maps the given `observed_addr`, representing an address of the local - // node observed by a remote peer, onto the locally known listen addresses - // to yield one or more addresses of the local node that may be publicly - // reachable. - // - // I.e. this method incorporates the view of other peers into the listen - // addresses seen by the local node to account for possible IP and port - // mappings performed by intermediate network devices in an effort to - // obtain addresses for the local peer that are also reachable for peers - // other than the peer who reported the `observed_addr`. - // - // The translation is transport-specific. See [`Transport::address_translation`]. - let translated_addresses = { - let transport = this.listeners.transport(); - let mut addrs: Vec<_> = this - .listeners - .listen_addrs() - .filter_map(move |server| { - transport.address_translation(server, &address) - }) - .collect(); + } - // remove duplicates - addrs.sort_unstable(); - addrs.dedup(); - addrs - }; - for addr in translated_addresses { - this.add_external_address(addr, score); + // Poll the known peers. + match this.pool.poll(cx) { + Poll::Pending => {} + Poll::Ready(pool_event) => { + if let Some(swarm_event) = this.handle_pool_event(pool_event) { + return Poll::Ready(swarm_event); } + + continue; } - Poll::Ready(NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }) => match connection { - CloseConnection::One(connection_id) => { - if let Some(conn) = this.pool.get_established(connection_id) { - conn.start_close(); - } - } - CloseConnection::All => { - this.pool.disconnect(peer_id); + }; + + // Poll the listener(s) for new connections. + match ListenersStream::poll(Pin::new(&mut this.listeners), cx) { + Poll::Pending => {} + Poll::Ready(listeners_event) => { + if let Some(swarm_event) = this.handle_listeners_event(listeners_event) { + return Poll::Ready(swarm_event); } - }, + + continue; + } } + + return Poll::Pending; } } }