Skip to content

Commit

Permalink
core/src/connection: Report ConnectionLimit through task
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Aug 18, 2021
1 parent 5c2aef6 commit ce0d278
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 50 deletions.
16 changes: 8 additions & 8 deletions core/src/connection/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub enum ConnectionError<THandlerErr> {
// TODO: Eventually this should also be a custom error?
IO(io::Error),

/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),

/// The connection handler produced an error.
Handler(THandlerErr),
}
Expand All @@ -41,6 +45,9 @@ where
match self {
ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {}", err),
ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err),
ConnectionError::ConnectionLimit(l) => {
write!(f, "Connection error: Connection limit: {}.", l)
}
}
}
}
Expand All @@ -53,6 +60,7 @@ where
match self {
ConnectionError::IO(err) => Some(err),
ConnectionError::Handler(err) => Some(err),
ConnectionError::ConnectionLimit(..) => None,
}
}
}
Expand All @@ -67,10 +75,6 @@ pub enum PendingConnectionError<TTransErr> {
/// match the one that was expected or is otherwise invalid.
InvalidPeerId,

/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),

/// An I/O error occurred on the connection.
// TODO: Eventually this should also be a custom error?
IO(io::Error),
Expand All @@ -89,9 +93,6 @@ where
PendingConnectionError::InvalidPeerId => {
write!(f, "Pending connection: Invalid peer ID.")
}
PendingConnectionError::ConnectionLimit(l) => {
write!(f, "Connection error: Connection limit: {}.", l)
}
}
}
}
Expand All @@ -105,7 +106,6 @@ where
PendingConnectionError::IO(err) => Some(err),
PendingConnectionError::Transport(err) => Some(err),
PendingConnectionError::InvalidPeerId => None,
PendingConnectionError::ConnectionLimit(..) => None,
}
}
}
6 changes: 3 additions & 3 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use super::{
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Connected, ConnectedPoint, Connection, ConnectionError, ConnectionHandler,
Connected, ConnectedPoint, Connection, ConnectionError, ConnectionHandler, ConnectionLimit,
IntoConnectionHandler, PendingConnectionError, Substream,
};
use crate::{muxing::StreamMuxer, Executor};
Expand Down Expand Up @@ -473,15 +473,15 @@ impl<'a, I> EstablishedEntry<'a, I> {
///
/// When the connection is ultimately closed, [`Event::ConnectionClosed`]
/// is emitted by [`Manager::poll`].
pub fn start_close(mut self) {
pub fn start_close(mut self, error: Option<ConnectionLimit>) {
// Clone the sender so that we are guaranteed to have
// capacity for the close command (every sender gets a slot).
match self
.task
.get_mut()
.sender
.clone()
.try_send(task::Command::Close)
.try_send(task::Command::Close(error))
{
Ok(()) => {}
Err(e) => assert!(e.is_disconnected(), "No capacity for close command."),
Expand Down
19 changes: 13 additions & 6 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::{
connection::{
self,
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Close, Connected, Connection, ConnectionError, ConnectionHandler, IntoConnectionHandler,
PendingConnectionError, Substream,
Close, Connected, Connection, ConnectionError, ConnectionHandler, ConnectionLimit,
IntoConnectionHandler, PendingConnectionError, Substream,
},
muxing::StreamMuxer,
Multiaddr,
Expand All @@ -43,7 +43,7 @@ pub enum Command<T> {
NotifyHandler(T),
/// Gracefully close the connection (active close) before
/// terminating the task.
Close,
Close(Option<ConnectionLimit>),
}

/// Events that a task can emit to its manager.
Expand Down Expand Up @@ -178,7 +178,11 @@ where
},

/// The connection is closing (active close).
Closing { closing_muxer: Close<M>, handler: H::Handler },
Closing {
closing_muxer: Close<M>,
handler: H::Handler,
error: Option<ConnectionLimit>,
},

/// The task is terminating with a final event for the `Manager`.
Terminating(Event<H, E>),
Expand Down Expand Up @@ -263,14 +267,15 @@ where
Poll::Ready(Some(Command::NotifyHandler(event))) => {
connection.inject_event(event)
}
Poll::Ready(Some(Command::Close)) => {
Poll::Ready(Some(Command::Close(error))) => {
// Don't accept any further commands.
this.commands.get_mut().close();
// Discard the event, if any, and start a graceful close.
let (handler, closing_muxer) = connection.close();
this.state = State::Closing {
handler,
closing_muxer,
error,
};
continue 'poll;
}
Expand Down Expand Up @@ -346,14 +351,15 @@ where

State::Closing {
handler,
error,
mut closing_muxer,
} => {
// Try to gracefully close the connection.
match closing_muxer.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
let event = Event::Closed {
id: this.id,
error: None,
error: error.map(|limit| ConnectionError::ConnectionLimit(limit)),
handler,
};
this.state = State::Terminating(event);
Expand All @@ -369,6 +375,7 @@ where
Poll::Pending => {
this.state = State::Closing {
handler,
error,
closing_muxer,
};
return Poll::Pending;
Expand Down
34 changes: 6 additions & 28 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

use crate::{
connection::{
self,

handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
manager::{self, Manager, ManagerConfig},
Connected, Connection, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit,
Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit,
IncomingInfo, IntoConnectionHandler, OutgoingInfo, PendingConnectionError, Substream,
},
muxing::StreamMuxer,
Expand Down Expand Up @@ -385,7 +385,7 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
// Count upwards because we push to / pop from the end. See also `Pool::poll`.
for (&id, endpoint) in conns.iter() {
if let Some(manager::Entry::Established(e)) = self.manager.entry(id) {
e.start_close();
e.start_close(None);
// TODO: I removed the disconnected logic, thus depending on start_close to
// eventually trigger a ConnectionClosed event. Make sure that is the case and
// also that the num_established counters are kept consistent.
Expand Down Expand Up @@ -549,37 +549,15 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {

// Check general established connection limit.
if let Err(e) = self.counters.check_max_established(&endpoint) {
// TODO: Good idea? How should we let the user know that the close
// happened due to a conneciton limit?
entry.start_close();
// let connected = entry.remove();
// return Poll::Ready(PoolEvent::PendingConnectionError {
// id,
// endpoint: connected.endpoint,
// error: PendingConnectionError::ConnectionLimit(e),
// handler: None,
// peer,
// pool: self,
// });
entry.start_close(Some(e));
continue;
}

// Check per-peer established connection limit.
let current =
num_peer_established(&self.established, &entry.connected().peer_id);
if let Err(e) = self.counters.check_max_established_per_peer(current) {
// TODO: Good idea? How should we let the user know that the close
// happened due to a conneciton limit?
entry.start_close();
// let connected = entry.remove();
// return Poll::Ready(PoolEvent::PendingConnectionError {
// id,
// endpoint: connected.endpoint,
// error: PendingConnectionError::ConnectionLimit(e),
// handler: None,
// peer,
// pool: self,
// });
entry.start_close(Some(e));
continue;
}

Expand Down Expand Up @@ -769,7 +747,7 @@ impl<TInEvent> EstablishedConnection<'_, TInEvent> {
///
/// Has no effect if the connection is already closing.
pub fn start_close(self) {
self.entry.start_close()
self.entry.start_close(None)
}
}

Expand Down
3 changes: 1 addition & 2 deletions core/src/network/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,7 @@ where
error,
} => f
.debug_struct("DialError")
// TODO: Bring back.
// .field("attempts_remaining", attempts_remaining)
.field("attempts_remaining", &Into::<u32>::into(attempts_remaining))
.field("peer_id", peer_id)
.field("multiaddr", multiaddr)
.field("error", error)
Expand Down
6 changes: 3 additions & 3 deletions core/src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
use super::{DialError, DialingOpts, Network};
use crate::{
connection::{
handler::THandlerInEvent, pool::Pool, Connected, ConnectedPoint, Connection,
ConnectionHandler, ConnectionId, ConnectionLimit, EstablishedConnection,
EstablishedConnectionIter, IntoConnectionHandler, PendingConnection, Substream,
handler::THandlerInEvent, pool::Pool, ConnectedPoint, ConnectionHandler, ConnectionId,
ConnectionLimit, EstablishedConnection, EstablishedConnectionIter, IntoConnectionHandler,
PendingConnection, Substream,
},
Multiaddr, PeerId, StreamMuxer, Transport,
};
Expand Down

0 comments on commit ce0d278

Please sign in to comment.