Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: Remove NotifyHandler::All #1880

Merged
merged 3 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ impl From<ProtocolsHandlerUpgrErr<io::Error>> for KademliaHandlerQueryErr {
}

/// Event to send to the handler.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum KademliaHandlerIn<TUserData> {
/// Resets the (sub)stream associated with the given request ID,
/// thus signaling an error to the remote.
Expand Down Expand Up @@ -429,7 +429,7 @@ pub enum KademliaHandlerIn<TUserData> {

/// Unique identifier for a request. Must be passed back in order to answer a request from
/// the remote.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq)]
pub struct KademliaRequestId {
/// Unique identifier for an incoming connection.
connec_unique_id: UniqueConnecId,
Expand Down
2 changes: 1 addition & 1 deletion protocols/request-response/src/handler/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ where
/// Request substream upgrade protocol.
///
/// Sends a request and receives a response.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct RequestProtocol<TCodec>
where
TCodec: RequestResponseCodec
Expand Down
8 changes: 3 additions & 5 deletions swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ pub trait NetworkBehaviourEventProcess<TEvent> {
/// in whose context it is executing.
///
/// [`Swarm`]: super::Swarm
#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum NetworkBehaviourAction<TInEvent, TOutEvent> {
/// Instructs the `Swarm` to return an event when it is being polled.
GenerateEvent(TOutEvent),
Expand Down Expand Up @@ -264,7 +264,7 @@ pub enum NetworkBehaviourAction<TInEvent, TOutEvent> {
NotifyHandler {
/// The peer for whom a `ProtocolsHandler` should be notified.
peer_id: PeerId,
/// The ID of the connection whose `ProtocolsHandler` to notify.
/// The options w.r.t. which connection handler to notify of the event.
handler: NotifyHandler,
/// The event to send.
event: TInEvent,
Expand Down Expand Up @@ -325,15 +325,13 @@ impl<TInEvent, TOutEvent> NetworkBehaviourAction<TInEvent, TOutEvent> {
}
}

/// The options w.r.t. which connection handlers to notify of an event.
/// The options w.r.t. which connection handler to notify of an event.
#[derive(Debug, Clone)]
pub enum NotifyHandler {
/// Notify a particular connection handler.
One(ConnectionId),
/// Notify an arbitrary connection handler.
Any,
/// Notify all connection handlers.
All
}

/// The available conditions under which a new dialing attempt to
Expand Down
81 changes: 8 additions & 73 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ where
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandleErr>
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
TInEvent: Clone + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: IntoProtocolsHandler + Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandleErr>,
Expand Down Expand Up @@ -661,13 +661,6 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
return Poll::Pending
}
}
PendingNotifyHandler::All(ids) => {
if let Some((event, ids)) = notify_all(ids, &mut peer, event, cx) {
let handler = PendingNotifyHandler::All(ids);
this.pending_event = Some((peer_id, handler, event));
return Poll::Pending
}
}
}
}
}
Expand Down Expand Up @@ -747,14 +740,6 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
return Poll::Pending
}
}
NotifyHandler::All => {
let ids = peer.connections().into_ids().collect();
if let Some((event, ids)) = notify_all(ids, &mut peer, event, cx) {
let handler = PendingNotifyHandler::All(ids);
this.pending_event = Some((peer_id, handler, event));
return Poll::Pending
}
}
}
}
},
Expand All @@ -771,16 +756,15 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
}
}

/// Connections to notify of a pending event.
/// Connection to notify of a pending event.
///
/// The connection IDs to notify of an event are captured at the time
/// the behaviour emits the event, in order not to forward the event
/// to new connections which the behaviour may not have been aware of
/// at the time it issued the request for sending it.
/// The connection IDs out of which to notify one of an event are captured at
/// the time the behaviour emits the event, in order not to forward the event to
/// a new connection which the behaviour may not have been aware of at the time
/// it issued the request for sending it.
enum PendingNotifyHandler {
One(ConnectionId),
Any(SmallVec<[ConnectionId; 10]>),
All(SmallVec<[ConnectionId; 10]>),
}

/// Notify a single connection of an event.
Expand Down Expand Up @@ -855,60 +839,11 @@ where
})
}

/// Notify all of the given connections of a peer of an event.
///
/// Returns `Some` with the given event and a new list of connections if
/// at least one of the given connections is currently not able to receive
/// the event, in which case the current task is scheduled to be woken up and
/// the returned connections are those which still need to receive the event.
///
/// Returns `None` if all connections are either closing or the event
/// was successfully sent to all handlers whose connections are not closing,
/// in either case the event is consumed.
fn notify_all<'a, TTrans, TInEvent, TOutEvent, THandler>(
ids: SmallVec<[ConnectionId; 10]>,
peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>,
event: TInEvent,
cx: &mut Context<'_>,
) -> Option<(TInEvent, SmallVec<[ConnectionId; 10]>)>
where
TTrans: Transport,
TInEvent: Clone,
THandler: IntoConnectionHandler,
{
if ids.len() == 1 {
if let Some(mut conn) = peer.connection(ids[0]) {
return notify_one(&mut conn, event, cx).map(|e| (e, ids))
}
}

let mut pending = SmallVec::new();
for id in ids.into_iter() {
if let Some(mut conn) = peer.connection(id) {
match conn.poll_ready_notify_handler(cx) {
Poll::Pending => pending.push(id),
Poll::Ready(Ok(())) => {
// Can now only fail due to the connection suddenly closing,
// which we ignore.
let _ = conn.notify_handler(event.clone());
},
Poll::Ready(Err(())) => {} // connection is closing
}
}
}

if !pending.is_empty() {
return Some((event, pending))
}

None
}

impl<TBehaviour, TInEvent, TOutEvent, THandler> Stream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Clone + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
{
Expand All @@ -929,7 +864,7 @@ impl<TBehaviour, TInEvent, TOutEvent, THandler> FusedStream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Clone + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
{
Expand Down