Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm): rename Custom variant to NotifyBehaviour #3955

Merged
merged 3 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion protocols/dcutr/src/handler/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl ConnectionHandler for Handler {
> {
if !self.reported {
self.reported = true;
return Poll::Ready(ConnectionHandlerEvent::Custom(
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::DirectConnectionEstablished,
));
}
Expand Down
48 changes: 26 additions & 22 deletions protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,13 @@ impl Handler {
ConnectedPoint::Dialer { address, role_override: _ } => address.clone(),
ConnectedPoint::Listener { ..} => unreachable!("`<Handler as ConnectionHandler>::listen_protocol` denies all incoming substreams as a listener."),
};
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::InboundConnectRequest {
inbound_connect: Box::new(inbound_connect),
remote_addr,
},
));
self.queued_events
.push_back(ConnectionHandlerEvent::NotifyBehaviour(
Event::InboundConnectRequest {
inbound_connect: Box::new(inbound_connect),
remote_addr,
},
));
}
// A connection listener denies all incoming substreams, thus none can ever be fully negotiated.
future::Either::Right(output) => void::unreachable(output),
Expand All @@ -197,11 +198,12 @@ impl Handler {
self.endpoint.is_listener(),
"A connection dialer never initiates a connection upgrade."
);
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::OutboundConnectNegotiated {
remote_addrs: obs_addrs,
},
));
self.queued_events
.push_back(ConnectionHandlerEvent::NotifyBehaviour(
Event::OutboundConnectNegotiated {
remote_addrs: obs_addrs,
},
));
}

fn on_listen_upgrade_error(
Expand All @@ -228,21 +230,23 @@ impl Handler {

match error {
StreamUpgradeError::Timeout => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::OutboundNegotiationFailed {
error: StreamUpgradeError::Timeout,
},
));
self.queued_events
.push_back(ConnectionHandlerEvent::NotifyBehaviour(
Event::OutboundNegotiationFailed {
error: StreamUpgradeError::Timeout,
},
));
}
StreamUpgradeError::NegotiationFailed => {
// The remote merely doesn't support the DCUtR protocol.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::OutboundNegotiationFailed {
error: StreamUpgradeError::NegotiationFailed,
},
));
self.queued_events
.push_back(ConnectionHandlerEvent::NotifyBehaviour(
Event::OutboundNegotiationFailed {
error: StreamUpgradeError::NegotiationFailed,
},
));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
Expand Down Expand Up @@ -342,7 +346,7 @@ impl ConnectionHandler for Handler {
self.inbound_connect = None;
match result {
Ok(addresses) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::InboundConnectNegotiated(addresses),
));
}
Expand Down
14 changes: 7 additions & 7 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,9 @@ impl EnabledHandler {
if !self.peer_kind_sent {
if let Some(peer_kind) = self.peer_kind.as_ref() {
self.peer_kind_sent = true;
return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind(
peer_kind.clone(),
)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::PeerKind(peer_kind.clone()),
));
}
}

Expand All @@ -261,7 +261,7 @@ impl EnabledHandler {
self.last_io_activity = Instant::now();
self.inbound_substream =
Some(InboundSubstreamState::WaitingInput(substream));
return Poll::Ready(ConnectionHandlerEvent::Custom(message));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(message));
}
Poll::Ready(Some(Err(error))) => {
log::debug!("Failed to read from inbound stream: {error}");
Expand Down Expand Up @@ -466,9 +466,9 @@ impl ConnectionHandler for Handler {
Handler::Disabled(DisabledHandler::ProtocolUnsupported { peer_kind_sent }) => {
if !*peer_kind_sent {
*peer_kind_sent = true;
return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind(
PeerKind::NotSupported,
)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::PeerKind(PeerKind::NotSupported),
));
}

Poll::Pending
Expand Down
21 changes: 11 additions & 10 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,13 @@ impl Handler {
future::Either::Left(remote_info) => {
self.update_supported_protocols_for_remote(&remote_info);
self.events
.push(ConnectionHandlerEvent::Custom(Event::Identified(
.push(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
remote_info,
)));
}
future::Either::Right(()) => self
.events
.push(ConnectionHandlerEvent::Custom(Event::IdentificationPushed)),
future::Either::Right(()) => self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationPushed,
)),
}
}

Expand All @@ -192,10 +192,9 @@ impl Handler {
>,
) {
let err = err.map_upgrade_err(|e| e.into_inner());
self.events
.push(ConnectionHandlerEvent::Custom(Event::IdentificationError(
err,
)));
self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(err),
));
self.trigger_next_identify.reset(self.interval);
}

Expand Down Expand Up @@ -309,7 +308,9 @@ impl ConnectionHandler for Handler {

if let Ok(info) = res {
self.update_supported_protocols_for_remote(&info);
return Poll::Ready(ConnectionHandlerEvent::Custom(Event::Identified(info)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
info,
)));
}
}

Expand All @@ -319,7 +320,7 @@ impl ConnectionHandler for Handler {
.map(|()| Event::Identification)
.unwrap_or_else(|err| Event::IdentificationError(StreamUpgradeError::Apply(err)));

return Poll::Ready(ConnectionHandlerEvent::Custom(event));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
}

Poll::Pending
Expand Down
48 changes: 29 additions & 19 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ where
> {
if let ProtocolStatus::Confirmed = self.protocol_status {
self.protocol_status = ProtocolStatus::Reported;
return Poll::Ready(ConnectionHandlerEvent::Custom(
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::ProtocolConfirmed {
endpoint: self.endpoint.clone(),
},
Expand Down Expand Up @@ -826,7 +826,7 @@ where
Err(error) => {
*this = OutboundSubstreamState::Done;
let event = user_data.map(|user_data| {
ConnectionHandlerEvent::Custom(
ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
Expand All @@ -844,10 +844,12 @@ where
Poll::Ready(Err(error)) => {
*this = OutboundSubstreamState::Done;
let event = user_data.map(|user_data| {
ConnectionHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
})
ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
},
)
});

return Poll::Ready(event);
Expand All @@ -870,10 +872,12 @@ where
Poll::Ready(Err(error)) => {
*this = OutboundSubstreamState::Done;
let event = user_data.map(|user_data| {
ConnectionHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
})
ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
},
)
});

return Poll::Ready(event);
Expand All @@ -886,7 +890,9 @@ where
*this = OutboundSubstreamState::Closing(substream);
let event = process_kad_response(msg, user_data);

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
event,
)));
}
Poll::Pending => {
*this = OutboundSubstreamState::WaitingAnswer(substream, user_data);
Expand All @@ -899,7 +905,9 @@ where
user_data,
};

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
event,
)));
}
Poll::Ready(None) => {
*this = OutboundSubstreamState::Done;
Expand All @@ -910,15 +918,17 @@ where
user_data,
};

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
event,
)));
}
}
}
OutboundSubstreamState::ReportError(error, user_data) => {
*this = OutboundSubstreamState::Done;
let event = KademliaHandlerEvent::QueryError { error, user_data };

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(event)));
}
OutboundSubstreamState::Closing(mut stream) => match stream.poll_close_unpin(cx) {
Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => return Poll::Ready(None),
Expand Down Expand Up @@ -971,7 +981,7 @@ where
Poll::Ready(Some(Ok(KadRequestMsg::FindNode { key }))) => {
*this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::FindNodeReq {
key,
request_id: KademliaRequestId {
Expand All @@ -983,7 +993,7 @@ where
Poll::Ready(Some(Ok(KadRequestMsg::GetProviders { key }))) => {
*this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::GetProvidersReq {
key,
request_id: KademliaRequestId {
Expand All @@ -998,14 +1008,14 @@ where
connection_id,
substream,
};
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::AddProvider { key, provider },
)));
}
Poll::Ready(Some(Ok(KadRequestMsg::GetValue { key }))) => {
*this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::GetRecord {
key,
request_id: KademliaRequestId {
Expand All @@ -1017,7 +1027,7 @@ where
Poll::Ready(Some(Ok(KadRequestMsg::PutValue { record }))) => {
*this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::PutRecord {
record,
request_id: KademliaRequestId {
Expand Down
4 changes: 2 additions & 2 deletions protocols/perf/src/client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl ConnectionHandler for Handler {
.pop_front()
.expect("requested stream without pending command");
self.queued_events
.push_back(ConnectionHandlerEvent::Custom(Event {
.push_back(ConnectionHandlerEvent::NotifyBehaviour(Event {
id,
result: Err(error),
}));
Expand Down Expand Up @@ -179,7 +179,7 @@ impl ConnectionHandler for Handler {

while let Poll::Ready(Some(result)) = self.outbound.poll_next_unpin(cx) {
match result {
Ok(event) => return Poll::Ready(ConnectionHandlerEvent::Custom(event)),
Ok(event) => return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)),
Err(e) => {
panic!("{e:?}")
}
Expand Down
4 changes: 3 additions & 1 deletion protocols/perf/src/server/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ impl ConnectionHandler for Handler {
> {
while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) {
match result {
Ok(stats) => return Poll::Ready(ConnectionHandlerEvent::Custom(Event { stats })),
Ok(stats) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { stats }))
}
Err(e) => {
error!("{e:?}")
}
Expand Down
14 changes: 8 additions & 6 deletions protocols/ping/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,9 @@ impl ConnectionHandler for Handler {
}
State::Inactive { reported: false } => {
self.state = State::Inactive { reported: true };
return Poll::Ready(ConnectionHandlerEvent::Custom(Err(Failure::Unsupported)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(
Failure::Unsupported,
)));
}
State::Active => {}
}
Expand All @@ -274,7 +276,7 @@ impl ConnectionHandler for Handler {
Poll::Ready(Ok(stream)) => {
// A ping from a remote peer has been answered, wait for the next.
self.inbound = Some(protocol::recv_ping(stream).boxed());
return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(Success::Pong)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(Success::Pong)));
}
}
}
Expand All @@ -299,7 +301,7 @@ impl ConnectionHandler for Handler {
return Poll::Ready(ConnectionHandlerEvent::Close(error));
}

return Poll::Ready(ConnectionHandlerEvent::Custom(Err(error)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(error)));
}
}

Expand All @@ -318,9 +320,9 @@ impl ConnectionHandler for Handler {
self.failures = 0;
self.timer.reset(self.config.interval);
self.outbound = Some(OutboundState::Idle(stream));
return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(Success::Ping {
rtt,
})));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(
Success::Ping { rtt },
)));
}
Poll::Ready(Err(e)) => {
self.pending_errors
Expand Down
Loading