From f1ab37110db8293821fbcdfdbebf93a28f5c0ebe Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Tue, 16 May 2023 20:39:20 +0200 Subject: [PATCH 1/3] ConnectionHandlerEvent Custom to NotifyBehaviour --- protocols/dcutr/src/handler/direct.rs | 2 +- protocols/dcutr/src/handler/relayed.rs | 48 +++---- protocols/gossipsub/src/handler.rs | 14 +- protocols/identify/src/handler.rs | 21 +-- protocols/kad/src/handler.rs | 48 ++++--- protocols/perf/src/client/handler.rs | 4 +- protocols/perf/src/server/handler.rs | 4 +- protocols/ping/src/handler.rs | 14 +- protocols/relay/src/behaviour/handler.rs | 122 ++++++++++-------- protocols/relay/src/priv_client/handler.rs | 42 +++--- protocols/rendezvous/src/substream_handler.rs | 28 ++-- protocols/request-response/src/handler.rs | 4 +- swarm/src/connection.rs | 2 +- swarm/src/handler.rs | 22 +++- swarm/src/handler/map_out.rs | 4 +- swarm/src/handler/one_shot.rs | 4 +- swarm/src/handler/select.rs | 8 +- 17 files changed, 216 insertions(+), 175 deletions(-) diff --git a/protocols/dcutr/src/handler/direct.rs b/protocols/dcutr/src/handler/direct.rs index a06c9272baa..cd994be5c30 100644 --- a/protocols/dcutr/src/handler/direct.rs +++ b/protocols/dcutr/src/handler/direct.rs @@ -70,7 +70,7 @@ impl ConnectionHandler for Handler { > { if !self.reported { self.reported = true; - return Poll::Ready(ConnectionHandlerEvent::Custom( + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::DirectConnectionEstablished, )); } diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index eb1ad83cecb..c71b27b831d 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -171,12 +171,13 @@ impl Handler { ConnectedPoint::Dialer { address, role_override: _ } => address.clone(), ConnectedPoint::Listener { ..} => unreachable!("`::listen_protocol` denies all incoming substreams as a listener."), }; - self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::InboundConnectRequest { - inbound_connect: Box::new(inbound_connect), - remote_addr, - }, - )); + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundConnectRequest { + inbound_connect: Box::new(inbound_connect), + remote_addr, + }, + )); } // A connection listener denies all incoming substreams, thus none can ever be fully negotiated. future::Either::Right(output) => void::unreachable(output), @@ -197,11 +198,12 @@ impl Handler { self.endpoint.is_listener(), "A connection dialer never initiates a connection upgrade." ); - self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::OutboundConnectNegotiated { - remote_addrs: obs_addrs, - }, - )); + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectNegotiated { + remote_addrs: obs_addrs, + }, + )); } fn on_listen_upgrade_error( @@ -228,21 +230,23 @@ impl Handler { match error { StreamUpgradeError::Timeout => { - self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::OutboundNegotiationFailed { - error: StreamUpgradeError::Timeout, - }, - )); + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundNegotiationFailed { + error: StreamUpgradeError::Timeout, + }, + )); } StreamUpgradeError::NegotiationFailed => { // The remote merely doesn't support the DCUtR protocol. // This is no reason to close the connection, which may // successfully communicate with other protocols already. - self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::OutboundNegotiationFailed { - error: StreamUpgradeError::NegotiationFailed, - }, - )); + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundNegotiationFailed { + error: StreamUpgradeError::NegotiationFailed, + }, + )); } _ => { // Anything else is considered a fatal error or misbehaviour of @@ -342,7 +346,7 @@ impl ConnectionHandler for Handler { self.inbound_connect = None; match result { Ok(addresses) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::InboundConnectNegotiated(addresses), )); } diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 615d4ae132b..8508e026cee 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -232,9 +232,9 @@ impl EnabledHandler { if !self.peer_kind_sent { if let Some(peer_kind) = self.peer_kind.as_ref() { self.peer_kind_sent = true; - return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind( - peer_kind.clone(), - ))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::PeerKind(peer_kind.clone()), + )); } } @@ -261,7 +261,7 @@ impl EnabledHandler { self.last_io_activity = Instant::now(); self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream)); - return Poll::Ready(ConnectionHandlerEvent::Custom(message)); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(message)); } Poll::Ready(Some(Err(error))) => { log::debug!("Failed to read from inbound stream: {error}"); @@ -466,9 +466,9 @@ impl ConnectionHandler for Handler { Handler::Disabled(DisabledHandler::ProtocolUnsupported { peer_kind_sent }) => { if !*peer_kind_sent { *peer_kind_sent = true; - return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind( - PeerKind::NotSupported, - ))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::PeerKind(PeerKind::NotSupported), + )); } Poll::Pending diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index d041fca731a..4c7ae58631e 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -174,13 +174,13 @@ impl Handler { future::Either::Left(remote_info) => { self.update_supported_protocols_for_remote(&remote_info); self.events - .push(ConnectionHandlerEvent::Custom(Event::Identified( + .push(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified( remote_info, ))); } - future::Either::Right(()) => self - .events - .push(ConnectionHandlerEvent::Custom(Event::IdentificationPushed)), + future::Either::Right(()) => self.events.push(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationPushed, + )), } } @@ -192,10 +192,9 @@ impl Handler { >, ) { let err = err.map_upgrade_err(|e| e.into_inner()); - self.events - .push(ConnectionHandlerEvent::Custom(Event::IdentificationError( - err, - ))); + self.events.push(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationError(err), + )); self.trigger_next_identify.reset(self.interval); } @@ -309,7 +308,9 @@ impl ConnectionHandler for Handler { if let Ok(info) = res { self.update_supported_protocols_for_remote(&info); - return Poll::Ready(ConnectionHandlerEvent::Custom(Event::Identified(info))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified( + info, + ))); } } @@ -319,7 +320,7 @@ impl ConnectionHandler for Handler { .map(|()| Event::Identification) .unwrap_or_else(|err| Event::IdentificationError(StreamUpgradeError::Apply(err))); - return Poll::Ready(ConnectionHandlerEvent::Custom(event)); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } Poll::Pending diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 75721c0dc31..99535e083f2 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -707,7 +707,7 @@ where > { if let ProtocolStatus::Confirmed = self.protocol_status { self.protocol_status = ProtocolStatus::Reported; - return Poll::Ready(ConnectionHandlerEvent::Custom( + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( KademliaHandlerEvent::ProtocolConfirmed { endpoint: self.endpoint.clone(), }, @@ -826,7 +826,7 @@ where Err(error) => { *this = OutboundSubstreamState::Done; let event = user_data.map(|user_data| { - ConnectionHandlerEvent::Custom( + ConnectionHandlerEvent::NotifyBehaviour( KademliaHandlerEvent::QueryError { error: KademliaHandlerQueryErr::Io(error), user_data, @@ -844,10 +844,12 @@ where Poll::Ready(Err(error)) => { *this = OutboundSubstreamState::Done; let event = user_data.map(|user_data| { - ConnectionHandlerEvent::Custom(KademliaHandlerEvent::QueryError { - error: KademliaHandlerQueryErr::Io(error), - user_data, - }) + ConnectionHandlerEvent::NotifyBehaviour( + KademliaHandlerEvent::QueryError { + error: KademliaHandlerQueryErr::Io(error), + user_data, + }, + ) }); return Poll::Ready(event); @@ -870,10 +872,12 @@ where Poll::Ready(Err(error)) => { *this = OutboundSubstreamState::Done; let event = user_data.map(|user_data| { - ConnectionHandlerEvent::Custom(KademliaHandlerEvent::QueryError { - error: KademliaHandlerQueryErr::Io(error), - user_data, - }) + ConnectionHandlerEvent::NotifyBehaviour( + KademliaHandlerEvent::QueryError { + error: KademliaHandlerQueryErr::Io(error), + user_data, + }, + ) }); return Poll::Ready(event); @@ -886,7 +890,9 @@ where *this = OutboundSubstreamState::Closing(substream); let event = process_kad_response(msg, user_data); - return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event))); + return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( + event, + ))); } Poll::Pending => { *this = OutboundSubstreamState::WaitingAnswer(substream, user_data); @@ -899,7 +905,9 @@ where user_data, }; - return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event))); + return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( + event, + ))); } Poll::Ready(None) => { *this = OutboundSubstreamState::Done; @@ -910,7 +918,9 @@ where user_data, }; - return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event))); + return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( + event, + ))); } } } @@ -918,7 +928,7 @@ where *this = OutboundSubstreamState::Done; let event = KademliaHandlerEvent::QueryError { error, user_data }; - return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event))); + return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(event))); } OutboundSubstreamState::Closing(mut stream) => match stream.poll_close_unpin(cx) { Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => return Poll::Ready(None), @@ -971,7 +981,7 @@ where Poll::Ready(Some(Ok(KadRequestMsg::FindNode { key }))) => { *this = InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); - return Poll::Ready(Some(ConnectionHandlerEvent::Custom( + return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( KademliaHandlerEvent::FindNodeReq { key, request_id: KademliaRequestId { @@ -983,7 +993,7 @@ where Poll::Ready(Some(Ok(KadRequestMsg::GetProviders { key }))) => { *this = InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); - return Poll::Ready(Some(ConnectionHandlerEvent::Custom( + return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( KademliaHandlerEvent::GetProvidersReq { key, request_id: KademliaRequestId { @@ -998,14 +1008,14 @@ where connection_id, substream, }; - return Poll::Ready(Some(ConnectionHandlerEvent::Custom( + return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( KademliaHandlerEvent::AddProvider { key, provider }, ))); } Poll::Ready(Some(Ok(KadRequestMsg::GetValue { key }))) => { *this = InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); - return Poll::Ready(Some(ConnectionHandlerEvent::Custom( + return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( KademliaHandlerEvent::GetRecord { key, request_id: KademliaRequestId { @@ -1017,7 +1027,7 @@ where Poll::Ready(Some(Ok(KadRequestMsg::PutValue { record }))) => { *this = InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); - return Poll::Ready(Some(ConnectionHandlerEvent::Custom( + return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( KademliaHandlerEvent::PutRecord { record, request_id: KademliaRequestId { diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index f8a1eff5c10..8a6df43d198 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -146,7 +146,7 @@ impl ConnectionHandler for Handler { .pop_front() .expect("requested stream without pending command"); self.queued_events - .push_back(ConnectionHandlerEvent::Custom(Event { + .push_back(ConnectionHandlerEvent::NotifyBehaviour(Event { id, result: Err(error), })); @@ -179,7 +179,7 @@ impl ConnectionHandler for Handler { while let Poll::Ready(Some(result)) = self.outbound.poll_next_unpin(cx) { match result { - Ok(event) => return Poll::Ready(ConnectionHandlerEvent::Custom(event)), + Ok(event) => return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)), Err(e) => { panic!("{e:?}") } diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs index 6f5043b66a3..e8f7b72e605 100644 --- a/protocols/perf/src/server/handler.rs +++ b/protocols/perf/src/server/handler.rs @@ -129,7 +129,9 @@ impl ConnectionHandler for Handler { > { while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) { match result { - Ok(stats) => return Poll::Ready(ConnectionHandlerEvent::Custom(Event { stats })), + Ok(stats) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { stats })) + } Err(e) => { error!("{e:?}") } diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index c21b234b3e3..e8fa40b0a67 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -258,7 +258,9 @@ impl ConnectionHandler for Handler { } State::Inactive { reported: false } => { self.state = State::Inactive { reported: true }; - return Poll::Ready(ConnectionHandlerEvent::Custom(Err(Failure::Unsupported))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err( + Failure::Unsupported, + ))); } State::Active => {} } @@ -274,7 +276,7 @@ impl ConnectionHandler for Handler { Poll::Ready(Ok(stream)) => { // A ping from a remote peer has been answered, wait for the next. self.inbound = Some(protocol::recv_ping(stream).boxed()); - return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(Success::Pong))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(Success::Pong))); } } } @@ -299,7 +301,7 @@ impl ConnectionHandler for Handler { return Poll::Ready(ConnectionHandlerEvent::Close(error)); } - return Poll::Ready(ConnectionHandlerEvent::Custom(Err(error))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(error))); } } @@ -318,9 +320,9 @@ impl ConnectionHandler for Handler { self.failures = 0; self.timer.reset(self.config.interval); self.outbound = Some(OutboundState::Idle(stream)); - return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(Success::Ping { - rtt, - }))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok( + Success::Ping { rtt }, + ))); } Poll::Ready(Err(e)) => { self.pending_errors diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index 752e5aeac89..9c1b8524ec3 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -410,21 +410,23 @@ impl Handler { ) { match request { inbound_hop::Req::Reserve(inbound_reservation_req) => { - self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::ReservationReqReceived { - inbound_reservation_req, - endpoint: self.endpoint.clone(), - renewed: self.active_reservation.is_some(), - }, - )); + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Event::ReservationReqReceived { + inbound_reservation_req, + endpoint: self.endpoint.clone(), + renewed: self.active_reservation.is_some(), + }, + )); } inbound_hop::Req::Connect(inbound_circuit_req) => { - self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::CircuitReqReceived { - inbound_circuit_req, - endpoint: self.endpoint.clone(), - }, - )); + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Event::CircuitReqReceived { + inbound_circuit_req, + endpoint: self.endpoint.clone(), + }, + )); } } } @@ -448,17 +450,18 @@ impl Handler { let (tx, rx) = oneshot::channel(); self.alive_lend_out_substreams.push(rx); - self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::OutboundConnectNegotiated { - circuit_id, - src_peer_id, - src_connection_id, - inbound_circuit_req, - dst_handler_notifier: tx, - dst_stream, - dst_pending_data, - }, - )); + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectNegotiated { + circuit_id, + src_peer_id, + src_connection_id, + inbound_circuit_req, + dst_handler_notifier: tx, + dst_stream, + dst_pending_data, + }, + )); } fn on_listen_upgrade_error( @@ -525,16 +528,17 @@ impl Handler { src_connection_id, } = open_info; - self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::OutboundConnectNegotiationFailed { - circuit_id, - src_peer_id, - src_connection_id, - inbound_circuit_req, - status, - error: non_fatal_error, - }, - )); + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectNegotiationFailed { + circuit_id, + src_peer_id, + src_connection_id, + inbound_circuit_req, + status, + error: non_fatal_error, + }, + )); } } @@ -692,18 +696,22 @@ impl ConnectionHandler for Handler { { match result { Ok(()) => { - return Poll::Ready(ConnectionHandlerEvent::Custom(Event::CircuitClosed { - circuit_id, - dst_peer_id, - error: None, - })) + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::CircuitClosed { + circuit_id, + dst_peer_id, + error: None, + }, + )) } Err(e) => { - return Poll::Ready(ConnectionHandlerEvent::Custom(Event::CircuitClosed { - circuit_id, - dst_peer_id, - error: Some(e), - })) + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::CircuitClosed { + circuit_id, + dst_peer_id, + error: Some(e), + }, + )) } } } @@ -714,13 +722,15 @@ impl ConnectionHandler for Handler { { match result { Ok(()) => { - return Poll::Ready(ConnectionHandlerEvent::Custom(Event::CircuitReqDenied { - circuit_id, - dst_peer_id, - })); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::CircuitReqDenied { + circuit_id, + dst_peer_id, + }, + )); } Err(error) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::CircuitReqDenyFailed { circuit_id, dst_peer_id, @@ -773,7 +783,7 @@ impl ConnectionHandler for Handler { self.circuits.push(circuit); - return Poll::Ready(ConnectionHandlerEvent::Custom( + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::CircuitReqAccepted { circuit_id, dst_peer_id, @@ -781,7 +791,7 @@ impl ConnectionHandler for Handler { )); } Err((circuit_id, dst_peer_id, error)) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::CircuitReqAcceptFailed { circuit_id, dst_peer_id, @@ -799,7 +809,7 @@ impl ConnectionHandler for Handler { .map(|fut| fut.poll_unpin(cx)) { self.active_reservation = None; - return Poll::Ready(ConnectionHandlerEvent::Custom( + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::ReservationTimedOut {}, )); } @@ -816,12 +826,12 @@ impl ConnectionHandler for Handler { .active_reservation .replace(Delay::new(self.config.reservation_duration)) .is_some(); - return Poll::Ready(ConnectionHandlerEvent::Custom( + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::ReservationReqAccepted { renewed }, )); } Err(error) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::ReservationReqAcceptFailed { error }, )); } @@ -834,12 +844,12 @@ impl ConnectionHandler for Handler { match result { Ok(()) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::ReservationReqDenied {}, )) } Err(error) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::ReservationReqDenyFailed { error }, )); } diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index dee51cde664..414ec4fc8df 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -194,9 +194,10 @@ impl Handler { relay_addr: self.remote_addr.clone(), }); - self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::InboundCircuitEstablished { src_peer_id, limit }, - )); + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundCircuitEstablished { src_peer_id, limit }, + )); } Reservation::None => { let src_peer_id = inbound_circuit.src_peer_id(); @@ -254,7 +255,7 @@ impl Handler { ); self.queued_events - .push_back(ConnectionHandlerEvent::Custom(event)); + .push_back(ConnectionHandlerEvent::NotifyBehaviour(event)); } // Outbound circuit @@ -272,9 +273,10 @@ impl Handler { })) { Ok(()) => { self.alive_lend_out_substreams.push(rx); - self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::OutboundCircuitEstablished { limit }, - )); + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundCircuitEstablished { limit }, + )); } Err(_) => debug!( "Oneshot to `client::transport::Dial` future dropped. \ @@ -350,12 +352,13 @@ impl Handler { } let renewal = self.reservation.failed(); - self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::ReservationReqFailed { - renewal, - error: non_fatal_error, - }, - )); + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Event::ReservationReqFailed { + renewal, + error: non_fatal_error, + }, + )); } OutboundOpenInfo::Connect { send_back } => { let non_fatal_error = match error { @@ -382,11 +385,12 @@ impl Handler { let _ = send_back.send(Err(())); - self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::OutboundCircuitReqFailed { - error: non_fatal_error, - }, - )); + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundCircuitReqFailed { + error: non_fatal_error, + }, + )); } } } @@ -485,7 +489,7 @@ impl ConnectionHandler for Handler { }); if let Some((src_peer_id, event)) = maybe_event { self.circuit_deny_futs.remove(&src_peer_id); - return Poll::Ready(ConnectionHandlerEvent::Custom(event)); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } // Send errors to transport. diff --git a/protocols/rendezvous/src/substream_handler.rs b/protocols/rendezvous/src/substream_handler.rs index 6fc5cb561b2..2434b691fb2 100644 --- a/protocols/rendezvous/src/substream_handler.rs +++ b/protocols/rendezvous/src/substream_handler.rs @@ -468,32 +468,28 @@ where match poll_substreams(&mut self.inbound_substreams, cx) { Poll::Ready(Ok((id, message))) => { - return Poll::Ready(ConnectionHandlerEvent::Custom(OutEvent::InboundEvent { - id, - message, - })) + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + OutEvent::InboundEvent { id, message }, + )) } Poll::Ready(Err((id, error))) => { - return Poll::Ready(ConnectionHandlerEvent::Custom(OutEvent::InboundError { - id, - error, - })) + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + OutEvent::InboundError { id, error }, + )) } Poll::Pending => {} } match poll_substreams(&mut self.outbound_substreams, cx) { Poll::Ready(Ok((id, message))) => { - return Poll::Ready(ConnectionHandlerEvent::Custom(OutEvent::OutboundEvent { - id, - message, - })) + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + OutEvent::OutboundEvent { id, message }, + )) } Poll::Ready(Err((id, error))) => { - return Poll::Ready(ConnectionHandlerEvent::Custom(OutEvent::OutboundError { - id, - error, - })) + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + OutEvent::OutboundError { id, error }, + )) } Poll::Pending => {} } diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 54b1b925847..35a2db98bdc 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -296,7 +296,7 @@ where > { // Drain pending events. if let Some(event) = self.pending_events.pop_front() { - return Poll::Ready(ConnectionHandlerEvent::Custom(event)); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { self.pending_events.shrink_to_fit(); } @@ -307,7 +307,7 @@ where Ok(((id, rq), rs_sender)) => { // We received an inbound request. self.keep_alive = KeepAlive::Yes; - return Poll::Ready(ConnectionHandlerEvent::Custom(Event::Request { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Request { request_id: id, request: rq, sender: rs_sender, diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 43be1c98e3a..eab521593ac 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -263,7 +263,7 @@ where requested_substreams.push(SubstreamRequested::new(user_data, timeout, upgrade)); continue; // Poll handler until exhausted. } - Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { + Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) => { return Poll::Ready(Ok(Event::Handler(event))); } Poll::Ready(ConnectionHandlerEvent::Close(err)) => { diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index edefb7a1cb9..dcc7ab1c09d 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -101,7 +101,7 @@ use std::{cmp::Ordering, error, fmt, io, task::Context, task::Poll, time::Durati pub trait ConnectionHandler: Send + 'static { /// A type representing the message(s) a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) can send to a [`ConnectionHandler`] via [`ToSwarm::NotifyHandler`](crate::behaviour::ToSwarm::NotifyHandler) type FromBehaviour: fmt::Debug + Send + 'static; - /// A type representing message(s) a [`ConnectionHandler`] can send to a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) via [`ConnectionHandlerEvent::Custom`]. + /// A type representing message(s) a [`ConnectionHandler`] can send to a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) via [`ConnectionHandlerEvent::NotifyBehaviour`]. type ToBehaviour: fmt::Debug + Send + 'static; /// The type of errors returned by [`ConnectionHandler::poll`]. type Error: error::Error + fmt::Debug + Send + 'static; @@ -508,8 +508,8 @@ pub enum ConnectionHandlerEvent protocol: protocol.map_info(map), } } - ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(val), + ConnectionHandlerEvent::NotifyBehaviour(val) => { + ConnectionHandlerEvent::NotifyBehaviour(val) + } ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val), ConnectionHandlerEvent::ReportRemoteProtocols(support) => { ConnectionHandlerEvent::ReportRemoteProtocols(support) @@ -562,7 +564,9 @@ impl protocol: protocol.map_upgrade(map), } } - ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(val), + ConnectionHandlerEvent::NotifyBehaviour(val) => { + ConnectionHandlerEvent::NotifyBehaviour(val) + } ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val), ConnectionHandlerEvent::ReportRemoteProtocols(support) => { ConnectionHandlerEvent::ReportRemoteProtocols(support) @@ -582,7 +586,9 @@ impl ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } } - ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(map(val)), + ConnectionHandlerEvent::NotifyBehaviour(val) => { + ConnectionHandlerEvent::NotifyBehaviour(map(val)) + } ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val), ConnectionHandlerEvent::ReportRemoteProtocols(support) => { ConnectionHandlerEvent::ReportRemoteProtocols(support) @@ -602,7 +608,9 @@ impl ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } } - ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(val), + ConnectionHandlerEvent::NotifyBehaviour(val) => { + ConnectionHandlerEvent::NotifyBehaviour(val) + } ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(map(val)), ConnectionHandlerEvent::ReportRemoteProtocols(support) => { ConnectionHandlerEvent::ReportRemoteProtocols(support) diff --git a/swarm/src/handler/map_out.rs b/swarm/src/handler/map_out.rs index f3240a76b2d..e92d1403ce6 100644 --- a/swarm/src/handler/map_out.rs +++ b/swarm/src/handler/map_out.rs @@ -76,7 +76,9 @@ where >, > { self.inner.poll(cx).map(|ev| match ev { - ConnectionHandlerEvent::Custom(ev) => ConnectionHandlerEvent::Custom((self.map)(ev)), + ConnectionHandlerEvent::NotifyBehaviour(ev) => { + ConnectionHandlerEvent::NotifyBehaviour((self.map)(ev)) + } ConnectionHandlerEvent::Close(err) => ConnectionHandlerEvent::Close(err), ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index b7c98e189e7..439d3f47ee3 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -157,7 +157,9 @@ where } if !self.events_out.is_empty() { - return Poll::Ready(ConnectionHandlerEvent::Custom(self.events_out.remove(0))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + self.events_out.remove(0), + )); } else { self.events_out.shrink_to_fit(); } diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index 48048befc81..69888da1a01 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -227,8 +227,8 @@ where >, > { match self.proto1.poll(cx) { - Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { - return Poll::Ready(ConnectionHandlerEvent::Custom(Either::Left(event))); + Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Either::Left(event))); } Poll::Ready(ConnectionHandlerEvent::Close(event)) => { return Poll::Ready(ConnectionHandlerEvent::Close(Either::Left(event))); @@ -247,8 +247,8 @@ where }; match self.proto2.poll(cx) { - Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { - return Poll::Ready(ConnectionHandlerEvent::Custom(Either::Right(event))); + Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Either::Right(event))); } Poll::Ready(ConnectionHandlerEvent::Close(event)) => { return Poll::Ready(ConnectionHandlerEvent::Close(Either::Right(event))); From ae88c6c9d3eae9ee7e6b132ecab01a1b7a301ebb Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Tue, 16 May 2023 20:43:15 +0200 Subject: [PATCH 2/3] add changelog --- swarm/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 159c61af9b6..1284f57cffa 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -50,6 +50,8 @@ - Remove deprecated `NetworkBehaviourAction` type. See [PR 3919]. +- Rename `ConnectionHandlerEvent::Custom` to `ConnectionHandlerEvent::NotifyBehaviour`. See [PR 3955]. + [PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605 [PR 3651]: https://github.com/libp2p/rust-libp2p/pull/3651 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 @@ -62,6 +64,7 @@ [PR 3886]: https://github.com/libp2p/rust-libp2p/pull/3886 [PR 3912]: https://github.com/libp2p/rust-libp2p/pull/3912 [PR 3919]: https://github.com/libp2p/rust-libp2p/pull/3919 +[PR 3955]: https://github.com/libp2p/rust-libp2p/pull/3955 ## 0.42.2 From c3c88af1066aa198b904d421335deffa857fe5cd Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Tue, 16 May 2023 20:47:41 +0200 Subject: [PATCH 3/3] fix fmt --- swarm/src/handler/select.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index 69888da1a01..65db4ab525b 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -248,7 +248,9 @@ where match self.proto2.poll(cx) { Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Either::Right(event))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Either::Right( + event, + ))); } Poll::Ready(ConnectionHandlerEvent::Close(event)) => { return Poll::Ready(ConnectionHandlerEvent::Close(Either::Right(event)));