Skip to content

Commit

Permalink
core/: Emit Event::Failed on aborted pending connection
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Aug 19, 2021
1 parent ce0d278 commit 425e777
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 70 deletions.
5 changes: 5 additions & 0 deletions core/src/connection/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ pub enum PendingConnectionError<TTransErr> {
/// An error occurred while negotiating the transport protocol(s).
Transport(TransportError<TTransErr>),

/// Pending connection attempt has been aborted.
Aborted,

/// The peer identity obtained on the connection did not
/// match the one that was expected or is otherwise invalid.
InvalidPeerId,
Expand All @@ -87,6 +90,7 @@ where
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PendingConnectionError::IO(err) => write!(f, "Pending connection: I/O error: {}", err),
PendingConnectionError::Aborted => write!(f, "Pending connection: Aborted."),
PendingConnectionError::Transport(err) => {
write!(f, "Pending connection: Transport error: {}", err)
}
Expand All @@ -106,6 +110,7 @@ where
PendingConnectionError::IO(err) => Some(err),
PendingConnectionError::Transport(err) => Some(err),
PendingConnectionError::InvalidPeerId => None,
PendingConnectionError::Aborted => None,
}
}
}
48 changes: 1 addition & 47 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use super::{
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Connected, ConnectedPoint, Connection, ConnectionError, ConnectionHandler, ConnectionLimit,
Connected, ConnectedPoint, ConnectionError, ConnectionHandler, ConnectionLimit,
IntoConnectionHandler, PendingConnectionError, Substream,
};
use crate::{muxing::StreamMuxer, Executor};
Expand Down Expand Up @@ -277,40 +277,6 @@ impl<H: IntoConnectionHandler, TE> Manager<H, TE> {
ConnectionId(task_id)
}

/// Adds an existing connection to the manager.
pub fn add<M>(&mut self, conn: Connection<M, H::Handler>, info: Connected) -> ConnectionId
where
H: IntoConnectionHandler + Send + 'static,
H::Handler: ConnectionHandler<Substream = Substream<M>> + Send + 'static,
<H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TE: error::Error + Send + 'static,
M: StreamMuxer + Send + Sync + 'static,
M::OutboundSubstream: Send + 'static,
{
let task_id = self.next_task_id;
self.next_task_id.0 += 1;

let (tx, rx) = mpsc::channel(self.task_command_buffer_size);
self.tasks.insert(
task_id,
TaskInfo {
sender: tx,
state: TaskState::Established(info),
},
);

let task: Pin<Box<Task<Pin<Box<future::Pending<_>>>, _, _, _>>> =
Box::pin(Task::established(task_id, self.events_tx.clone(), rx, conn));

if let Some(executor) = &mut self.executor {
executor.exec(task);
} else {
self.local_spawns.push(task);
}

ConnectionId(task_id)
}

/// Gets an entry for a managed connection, if it exists.
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, THandlerInEvent<H>>> {
if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) {
Expand Down Expand Up @@ -496,18 +462,6 @@ impl<'a, I> EstablishedEntry<'a, I> {
}
}

// TODO: Needed?
// /// Instantly removes the entry from the manager, dropping
// /// the command channel to the background task of the connection,
// /// which will thus drop the connection asap without an orderly
// /// close or emitting another event.
// pub fn remove(self) -> Connected {
// match self.task.remove().state {
// TaskState::Established(c) => c,
// TaskState::Pending => unreachable!("By Entry::new()"),
// }
// }

/// Returns the connection ID.
pub fn id(&self) -> ConnectionId {
ConnectionId(*self.task.key())
Expand Down
33 changes: 12 additions & 21 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,24 +131,6 @@ where
},
}
}

/// Create a task for an existing node we are already connected to.
pub fn established(
id: TaskId,
events: mpsc::Sender<Event<H, E>>,
commands: mpsc::Receiver<Command<THandlerInEvent<H>>>,
connection: Connection<M, H::Handler>,
) -> Self {
Task {
id,
events,
commands: commands.fuse(),
state: State::Established {
connection,
event: None,
},
}
}
}

/// The state associated with the `Task` of a connection.
Expand Down Expand Up @@ -227,8 +209,16 @@ where
Poll::Pending => {}
Poll::Ready(None) => {
// The manager has dropped the task; abort.
// TODO: Should we return the handler in this case?
return Poll::Ready(());
// Don't accept any further commands and terminate the
// task with a final event.
this.commands.get_mut().close();
let event = Event::Failed {
id,
handler,
error: PendingConnectionError::Aborted,
};
this.state = State::Terminating(event);
continue 'poll;
}
Poll::Ready(Some(_)) => {
panic!("Task received command while the connection is pending.")
Expand Down Expand Up @@ -280,9 +270,10 @@ where
continue 'poll;
}
Poll::Ready(None) => {
todo!("Safe to assume that this should never happen?");
// The manager has dropped the task or disappeared; abort.
// TODO: Should we return the handler in this case?
return Poll::Ready(());
// return Poll::Ready(());
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,7 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
if let Some(manager::Entry::Established(e)) = self.manager.entry(id) {
e.start_close(None);
// TODO: I removed the disconnected logic, thus depending on start_close to
// eventually trigger a ConnectionClosed event. Make sure that is the case and
// also that the num_established counters are kept consistent.
// eventually trigger a ConnectionClosed event. Make sure that is the case.
}
self.counters.dec_established(endpoint);
}
Expand Down

0 comments on commit 425e777

Please sign in to comment.