diff --git a/protocols/relay/src/v2/relay.rs b/protocols/relay/src/v2/relay.rs index 378c5cce1a4..1a67ff533a1 100644 --- a/protocols/relay/src/v2/relay.rs +++ b/protocols/relay/src/v2/relay.rs @@ -24,6 +24,7 @@ mod handler; pub mod rate_limiter; use crate::v2::message_proto; +use crate::v2::protocol::inbound_hop; use instant::Instant; use libp2p_core::connection::{ConnectedPoint, ConnectionId}; use libp2p_core::multiaddr::Protocol; @@ -156,7 +157,7 @@ pub struct Relay { circuits: CircuitsTracker, /// Queue of actions to return when polled. - queued_actions: VecDeque>, + queued_actions: VecDeque, } impl Relay { @@ -212,14 +213,14 @@ impl NetworkBehaviour for Relay { // Only emit [`CircuitClosed`] for accepted requests. .filter(|c| matches!(c.status, CircuitStatus::Accepted)) { - self.queued_actions - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::CircuitClosed { - src_peer_id: circuit.src_peer_id, - dst_peer_id: circuit.dst_peer_id, - error: Some(std::io::ErrorKind::ConnectionAborted.into()), - }, - )); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::CircuitClosed { + src_peer_id: circuit.src_peer_id, + dst_peer_id: circuit.dst_peer_id, + error: Some(std::io::ErrorKind::ConnectionAborted.into()), + }) + .into(), + ); } } @@ -236,12 +237,17 @@ impl NetworkBehaviour for Relay { } => { let now = Instant::now(); - let handler_event = if remote_addr.iter().any(|p| p == Protocol::P2pCircuit) { + let action = if remote_addr.iter().any(|p| p == Protocol::P2pCircuit) { // Deny reservation requests over relayed circuits. - handler::In::DenyReservationReq { - inbound_reservation_req, - status: message_proto::Status::PermissionDenied, + NetworkBehaviourAction::NotifyHandler { + handler: NotifyHandler::One(connection), + peer_id: event_source, + event: handler::In::DenyReservationReq { + inbound_reservation_req, + status: message_proto::Status::PermissionDenied, + }, } + .into() } else if self .reservations .iter() @@ -255,28 +261,30 @@ impl NetworkBehaviour for Relay { .all(|limiter| limiter.try_next(event_source, &remote_addr, now)) { // Deny reservation exceeding limits. - handler::In::DenyReservationReq { - inbound_reservation_req, - status: message_proto::Status::ResourceLimitExceeded, + NetworkBehaviourAction::NotifyHandler { + handler: NotifyHandler::One(connection), + peer_id: event_source, + event: handler::In::DenyReservationReq { + inbound_reservation_req, + status: message_proto::Status::ResourceLimitExceeded, + }, } + .into() } else { // Accept reservation. self.reservations .entry(event_source) .or_default() .insert(connection); - handler::In::AcceptReservationReq { + + Action::AcceptReservationPrototype { + handler: NotifyHandler::One(connection), + peer_id: event_source, inbound_reservation_req, - addrs: vec![], } }; - self.queued_actions - .push_back(NetworkBehaviourAction::NotifyHandler { - handler: NotifyHandler::One(connection), - peer_id: event_source, - event: handler_event, - }); + self.queued_actions.push_back(action); } handler::Event::ReservationReqAccepted { renewed } => { // Ensure local eventual consistent reservation state matches handler (source of @@ -286,51 +294,51 @@ impl NetworkBehaviour for Relay { .or_default() .insert(connection); - self.queued_actions - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::ReservationReqAccepted { - src_peer_id: event_source, - renewed, - }, - )); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::ReservationReqAccepted { + src_peer_id: event_source, + renewed, + }) + .into(), + ); } handler::Event::ReservationReqAcceptFailed { error } => { - self.queued_actions - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::ReservationReqAcceptFailed { - src_peer_id: event_source, - error, - }, - )); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::ReservationReqAcceptFailed { + src_peer_id: event_source, + error, + }) + .into(), + ); } handler::Event::ReservationReqDenied {} => { - self.queued_actions - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::ReservationReqDenied { - src_peer_id: event_source, - }, - )); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::ReservationReqDenied { + src_peer_id: event_source, + }) + .into(), + ); } handler::Event::ReservationReqDenyFailed { error } => { - self.queued_actions - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::ReservationReqDenyFailed { - src_peer_id: event_source, - error, - }, - )); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::ReservationReqDenyFailed { + src_peer_id: event_source, + error, + }) + .into(), + ); } handler::Event::ReservationTimedOut {} => { self.reservations .get_mut(&event_source) .map(|cs| cs.remove(&connection)); - self.queued_actions - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::ReservationTimedOut { - src_peer_id: event_source, - }, - )); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::ReservationTimedOut { + src_peer_id: event_source, + }) + .into(), + ); } handler::Event::CircuitReqReceived { inbound_circuit_req, @@ -406,7 +414,7 @@ impl NetworkBehaviour for Relay { }, } }; - self.queued_actions.push_back(action); + self.queued_actions.push_back(action.into()); } handler::Event::CircuitReqDenied { circuit_id, @@ -416,13 +424,13 @@ impl NetworkBehaviour for Relay { self.circuits.remove(circuit_id); } - self.queued_actions - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::CircuitReqDenied { - src_peer_id: event_source, - dst_peer_id, - }, - )); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::CircuitReqDenied { + src_peer_id: event_source, + dst_peer_id, + }) + .into(), + ); } handler::Event::CircuitReqDenyFailed { circuit_id, @@ -433,14 +441,14 @@ impl NetworkBehaviour for Relay { self.circuits.remove(circuit_id); } - self.queued_actions - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::CircuitReqDenyFailed { - src_peer_id: event_source, - dst_peer_id, - error, - }, - )); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::CircuitReqDenyFailed { + src_peer_id: event_source, + dst_peer_id, + error, + }) + .into(), + ); } handler::Event::OutboundConnectNegotiated { circuit_id, @@ -451,8 +459,8 @@ impl NetworkBehaviour for Relay { dst_stream, dst_pending_data, } => { - self.queued_actions - .push_back(NetworkBehaviourAction::NotifyHandler { + self.queued_actions.push_back( + NetworkBehaviourAction::NotifyHandler { handler: NotifyHandler::One(src_connection_id), peer_id: src_peer_id, event: handler::In::AcceptAndDriveCircuit { @@ -463,7 +471,9 @@ impl NetworkBehaviour for Relay { dst_stream, dst_pending_data, }, - }); + } + .into(), + ); } handler::Event::OutboundConnectNegotiationFailed { circuit_id, @@ -472,8 +482,8 @@ impl NetworkBehaviour for Relay { inbound_circuit_req, status, } => { - self.queued_actions - .push_back(NetworkBehaviourAction::NotifyHandler { + self.queued_actions.push_back( + NetworkBehaviourAction::NotifyHandler { handler: NotifyHandler::One(src_connection_id), peer_id: src_peer_id, event: handler::In::DenyCircuitReq { @@ -481,20 +491,22 @@ impl NetworkBehaviour for Relay { inbound_circuit_req, status, }, - }); + } + .into(), + ); } handler::Event::CircuitReqAccepted { dst_peer_id, circuit_id, } => { self.circuits.accepted(circuit_id); - self.queued_actions - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::CircuitReqAccepted { - src_peer_id: event_source, - dst_peer_id, - }, - )); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::CircuitReqAccepted { + src_peer_id: event_source, + dst_peer_id, + }) + .into(), + ); } handler::Event::CircuitReqAcceptFailed { dst_peer_id, @@ -502,14 +514,14 @@ impl NetworkBehaviour for Relay { error, } => { self.circuits.remove(circuit_id); - self.queued_actions - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::CircuitReqAcceptFailed { - src_peer_id: event_source, - dst_peer_id, - error, - }, - )); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::CircuitReqAcceptFailed { + src_peer_id: event_source, + dst_peer_id, + error, + }) + .into(), + ); } handler::Event::CircuitClosed { dst_peer_id, @@ -518,14 +530,14 @@ impl NetworkBehaviour for Relay { } => { self.circuits.remove(circuit_id); - self.queued_actions - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::CircuitClosed { - src_peer_id: event_source, - dst_peer_id, - error, - }, - )); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::CircuitClosed { + src_peer_id: event_source, + dst_peer_id, + error, + }) + .into(), + ); } } } @@ -535,23 +547,8 @@ impl NetworkBehaviour for Relay { _cx: &mut Context<'_>, poll_parameters: &mut impl PollParameters, ) -> Poll> { - if let Some(mut event) = self.queued_actions.pop_front() { - // Set external addresses in [`AcceptReservationReq`]. - if let NetworkBehaviourAction::NotifyHandler { - event: handler::In::AcceptReservationReq { ref mut addrs, .. }, - .. - } = &mut event - { - *addrs = poll_parameters - .external_addresses() - .map(|a| { - a.addr - .with(Protocol::P2p((*poll_parameters.local_peer_id()).into())) - }) - .collect(); - } - - return Poll::Ready(event); + if let Some(action) = self.queued_actions.pop_front() { + return Poll::Ready(action.build(poll_parameters)); } Poll::Pending @@ -640,3 +637,49 @@ impl Add for CircuitId { CircuitId(self.0 + rhs) } } + +/// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`] +/// before being returned in [`::poll`]. +enum Action { + Done(NetworkBehaviourAction), + AcceptReservationPrototype { + inbound_reservation_req: inbound_hop::ReservationReq, + handler: NotifyHandler, + peer_id: PeerId, + }, +} + +impl From> for Action { + fn from(action: NetworkBehaviourAction) -> Self { + Self::Done(action) + } +} + +impl Action { + fn build( + self, + poll_parameters: &mut impl PollParameters, + ) -> NetworkBehaviourAction { + match self { + Action::Done(action) => action, + Action::AcceptReservationPrototype { + inbound_reservation_req, + handler, + peer_id, + } => NetworkBehaviourAction::NotifyHandler { + handler, + peer_id, + event: handler::In::AcceptReservationReq { + inbound_reservation_req, + addrs: poll_parameters + .external_addresses() + .map(|a| { + a.addr + .with(Protocol::P2p((*poll_parameters.local_peer_id()).into())) + }) + .collect(), + }, + }, + } + } +}