Skip to content

Commit

Permalink
core/: Return handler on connection error and closed
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Aug 12, 2021
1 parent ce23cbe commit a332591
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 48 deletions.
6 changes: 4 additions & 2 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,12 @@ where
self.handler.inject_event(event);
}

// TODO: Update comment
//
/// Begins an orderly shutdown of the connection, returning a
/// `Future` that resolves when connection shutdown is complete.
pub fn close(self) -> Close<TMuxer> {
self.muxing.close().0
pub fn close(self) -> (THandler, Close<TMuxer>) {
(self.handler, self.muxing.close().0)
}

/// Polls the connection for events produced by the associated handler
Expand Down
4 changes: 3 additions & 1 deletion core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ pub enum Event<'a, H: IntoConnectionHandler, TE> {
/// The error that occurred, if any. If `None`, the connection
/// has been actively closed.
error: Option<ConnectionError<THandlerError<H>>>,
handler: H::Handler,
},

/// A connection has been established.
Expand Down Expand Up @@ -384,14 +385,15 @@ impl<H: IntoConnectionHandler, TE> Manager<H, TE> {
new_endpoint: new,
}
}
task::Event::Closed { id, error } => {
task::Event::Closed { id, error, handler } => {
let id = ConnectionId(id);
let task = task.remove();
match task.state {
TaskState::Established(connected) => Event::ConnectionClosed {
id,
connected,
error,
handler,
},
TaskState::Pending => unreachable!(
"`Event::Closed` implies (2) occurred on that task and thus (3)."
Expand Down
26 changes: 21 additions & 5 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub enum Event<H: IntoConnectionHandler, TE> {
Closed {
id: TaskId,
error: Option<ConnectionError<THandlerError<H>>>,
handler: H::Handler,
},
}

Expand Down Expand Up @@ -177,7 +178,7 @@ where
},

/// The connection is closing (active close).
Closing(Close<M>),
Closing { closing_muxer: Close<M>, handler: H::Handler },

/// The task is terminating with a final event for the `Manager`.
Terminating(Event<H, E>),
Expand Down Expand Up @@ -265,7 +266,11 @@ where
// Don't accept any further commands.
this.commands.get_mut().close();
// Discard the event, if any, and start a graceful close.
this.state = State::Closing(connection.close());
let (handler, closing_muxer) = connection.close();
this.state = State::Closing {
handler,
closing_muxer,
};
continue 'poll;
}
Poll::Ready(None) => {
Expand Down Expand Up @@ -324,36 +329,47 @@ where
Poll::Ready(Err(error)) => {
// Don't accept any further commands.
this.commands.get_mut().close();
// TODO: Good idea if there is already an error?
let (handler, _) = connection.close();
// Terminate the task with the error, dropping the connection.
let event = Event::Closed {
id,
error: Some(error),
handler,
};
this.state = State::Terminating(event);
}
}
}
}

State::Closing(mut closing) => {
State::Closing {
handler,
mut closing_muxer,
} => {
// Try to gracefully close the connection.
match closing.poll_unpin(cx) {
match closing_muxer.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
let event = Event::Closed {
id: this.id,
error: None,
handler,
};
this.state = State::Terminating(event);
}
Poll::Ready(Err(e)) => {
let event = Event::Closed {
id: this.id,
error: Some(ConnectionError::IO(e)),
handler,
};
this.state = State::Terminating(event);
}
Poll::Pending => {
this.state = State::Closing(closing);
this.state = State::Closing {
handler,
closing_muxer,
};
return Poll::Pending;
}
}
Expand Down
52 changes: 32 additions & 20 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ pub enum PoolEvent<'a, THandler: IntoConnectionHandler, TTransErr> {
pool: &'a mut Pool<THandler, TTransErr>,
/// The remaining number of established connections to the same peer.
num_established: u32,
handler: THandler::Handler,
},

/// A connection attempt failed.
Expand All @@ -114,7 +115,7 @@ pub enum PoolEvent<'a, THandler: IntoConnectionHandler, TTransErr> {
error: PendingConnectionError<TTransErr>,
/// The handler that was supposed to handle the connection,
/// if the connection failed before the handler was consumed.
handler: Option<THandler>,
handler: THandler,
/// The (expected) peer of the failed connection.
peer: Option<PeerId>,
/// A reference to the pool that managed the connection.
Expand Down Expand Up @@ -554,6 +555,7 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
num_established,
error: None,
pool: self,
handler: todo!(),
});
}

Expand All @@ -572,7 +574,7 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
id,
endpoint,
error,
handler: Some(handler),
handler: handler,
peer,
pool: self,
});
Expand All @@ -582,6 +584,7 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
id,
connected,
error,
handler,
} => {
let num_established =
if let Some(conns) = self.established.get_mut(&connected.peer_id) {
Expand All @@ -601,6 +604,7 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
error,
num_established,
pool: self,
handler,
});
}
manager::Event::ConnectionEstablished { entry } => {
Expand All @@ -610,30 +614,38 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {

// Check general established connection limit.
if let Err(e) = self.counters.check_max_established(&endpoint) {
let connected = entry.remove();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint: connected.endpoint,
error: PendingConnectionError::ConnectionLimit(e),
handler: None,
peer,
pool: self,
});
// TODO: Good idea? How should we let the user know that the close
// happened due to a conneciton limit?
entry.start_close();
// let connected = entry.remove();
// return Poll::Ready(PoolEvent::PendingConnectionError {
// id,
// endpoint: connected.endpoint,
// error: PendingConnectionError::ConnectionLimit(e),
// handler: None,
// peer,
// pool: self,
// });
continue;
}

// Check per-peer established connection limit.
let current =
num_peer_established(&self.established, &entry.connected().peer_id);
if let Err(e) = self.counters.check_max_established_per_peer(current) {
let connected = entry.remove();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint: connected.endpoint,
error: PendingConnectionError::ConnectionLimit(e),
handler: None,
peer,
pool: self,
});
// TODO: Good idea? How should we let the user know that the close
// happened due to a conneciton limit?
entry.start_close();
// let connected = entry.remove();
// return Poll::Ready(PoolEvent::PendingConnectionError {
// id,
// endpoint: connected.endpoint,
// error: PendingConnectionError::ConnectionLimit(e),
// handler: None,
// peer,
// pool: self,
// });
continue;
}

// Peer ID checks must already have happened. See `add_pending`.
Expand Down
37 changes: 19 additions & 18 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mod event;
pub mod peer;

pub use crate::connection::{ConnectionCounters, ConnectionLimits};
pub use event::{IncomingConnection, NetworkEvent};
pub use event::{IncomingConnection, NetworkEvent, DialAttemptsRemaining};
pub use peer::Peer;

use crate::{
Expand Down Expand Up @@ -438,19 +438,22 @@ where
log::warn!("Dialing aborted: {:?}", e);
}
}
// TODO: Include handler in event.
event
}
Poll::Ready(PoolEvent::ConnectionClosed {
id,
connected,
error,
num_established,
handler,
..
}) => NetworkEvent::ConnectionClosed {
id,
connected,
num_established,
error,
handler,
},
Poll::Ready(PoolEvent::ConnectionEvent { connection, event }) => {
NetworkEvent::ConnectionEvent { connection, event }
Expand Down Expand Up @@ -563,7 +566,7 @@ fn on_connection_failed<'a, TTrans, THandler>(
id: ConnectionId,
endpoint: ConnectedPoint,
error: PendingConnectionError<TTrans::Error>,
handler: Option<THandler>,
handler: THandler,
) -> (
Option<DialingOpts<PeerId, THandler>>,
NetworkEvent<'a, TTrans, THandlerInEvent<THandler>, THandlerOutEvent<THandler>, THandler>,
Expand Down Expand Up @@ -592,27 +595,21 @@ where
let failed_addr = attempt.current.1.clone();

let (opts, attempts_remaining) = if num_remain > 0 {
if let Some(handler) = handler {
let next_attempt = attempt.remaining.remove(0);
let opts = DialingOpts {
peer: peer_id,
handler,
address: next_attempt,
remaining: attempt.remaining,
};
(Some(opts), num_remain)
} else {
// The error is "fatal" for the dialing attempt, since
// the handler was already consumed. All potential
// remaining connection attempts are thus void.
(None, 0)
}
let next_attempt = attempt.remaining.remove(0);
let opts = DialingOpts {
peer: peer_id,
handler,
address: next_attempt,
remaining: attempt.remaining,
};
(Some(opts), DialAttemptsRemaining::Some(num_remain))
} else {
(None, 0)
(None, DialAttemptsRemaining::None(handler))
};

(
opts,
// TODO: This is the place to return the handler.
NetworkEvent::DialError {
attempts_remaining,
peer_id,
Expand All @@ -625,20 +622,24 @@ where
match endpoint {
ConnectedPoint::Dialer { address } => (
None,
// TODO: This is the place to return the handler.
NetworkEvent::UnknownPeerDialError {
multiaddr: address,
error,
handler,
},
),
ConnectedPoint::Listener {
local_addr,
send_back_addr,
} => (
None,
// TODO: This is the place to return the handler.
NetworkEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error,
handler,
},
),
}
Expand Down
17 changes: 15 additions & 2 deletions core/src/network/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ where
send_back_addr: Multiaddr,
/// The error that happened.
error: PendingConnectionError<TTrans::Error>,
handler: THandler,
},

/// A new connection to a peer has been established.
Expand Down Expand Up @@ -124,12 +125,13 @@ where
error: Option<ConnectionError<<THandler::Handler as ConnectionHandler>::Error>>,
/// The remaining number of established connections to the same peer.
num_established: u32,
handler: THandler::Handler,
},

/// A dialing attempt to an address of a peer failed.
DialError {
/// The number of remaining dialing attempts.
attempts_remaining: u32,
attempts_remaining: DialAttemptsRemaining<THandler>,

/// Id of the peer we were trying to dial.
peer_id: PeerId,
Expand All @@ -148,6 +150,8 @@ where

/// The error that happened.
error: PendingConnectionError<TTrans::Error>,

handler: THandler,
},

/// An established connection produced an event.
Expand All @@ -169,6 +173,12 @@ where
},
}

pub enum DialAttemptsRemaining<THandler> {
// TODO: Make this a NonZeroU32.
Some(u32),
None(THandler),
}

impl<TTrans, TInEvent, TOutEvent, THandler> fmt::Debug
for NetworkEvent<'_, TTrans, TInEvent, TOutEvent, THandler>
where
Expand Down Expand Up @@ -221,6 +231,8 @@ where
local_addr,
send_back_addr,
error,
// TODO: Should this be printed as well?
handler: _,
} => f
.debug_struct("IncomingConnectionError")
.field("local_addr", local_addr)
Expand Down Expand Up @@ -249,7 +261,8 @@ where
error,
} => f
.debug_struct("DialError")
.field("attempts_remaining", attempts_remaining)
// TODO: Bring back.
// .field("attempts_remaining", attempts_remaining)
.field("peer_id", peer_id)
.field("multiaddr", multiaddr)
.field("error", error)
Expand Down

0 comments on commit a332591

Please sign in to comment.