Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/: Remove TInEvent and TOutEvent #2183

Merged
merged 14 commits into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,18 @@

- Add `From<&PublicKey> for PeerId` (see [PR 2145]).

- Remove `TInEvent` and `TOutEvent` trait paramters on most public types.
`TInEvent` and `TOutEvent` are implied through `THandler` and thus
superflucious. Both are removed in favor of a derivation through `THandler`
(see [PR 2183]).

- Require `ConnectionHandler::{InEvent,OutEvent,Error}` to implement `Debug`
(see [PR 2183]).

[PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145
[PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142
[PR 2137]: https://github.com/libp2p/rust-libp2p/pull/2137/
[PR 2137]: https://github.com/libp2p/rust-libp2p/pull/2137
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183

# 0.29.0 [2021-07-12]

Expand Down
2 changes: 1 addition & 1 deletion core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

mod error;
mod handler;
pub(crate) mod handler;
mod listeners;
mod substream;

Expand Down
13 changes: 8 additions & 5 deletions core/src/connection/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::Multiaddr;
use std::{task::Context, task::Poll};
use std::{fmt::Debug, task::Context, task::Poll};
use super::{Connected, SubstreamEndpoint};

/// The interface of a connection handler.
Expand All @@ -30,14 +30,14 @@ pub trait ConnectionHandler {
///
/// See also [`EstablishedConnection::notify_handler`](super::EstablishedConnection::notify_handler)
/// and [`ConnectionHandler::inject_event`].
type InEvent;
type InEvent: Debug + Send + 'static;
/// The outbound type of events that the handler emits to the `Network`
/// through [`ConnectionHandler::poll`].
///
/// See also [`NetworkEvent::ConnectionEvent`](crate::network::NetworkEvent::ConnectionEvent).
type OutEvent;
type OutEvent: Debug + Send + 'static;
/// The type of errors that the handler can produce when polled by the `Network`.
type Error;
type Error: Debug + Send + 'static;
/// The type of the substream containing the data.
type Substream;
/// Information about a substream. Can be sent to the handler through a `SubstreamEndpoint`,
Expand Down Expand Up @@ -91,6 +91,10 @@ where
}
}

pub(crate) type THandlerInEvent<THandler> = <<THandler as IntoConnectionHandler>::Handler as ConnectionHandler>::InEvent;
pub(crate) type THandlerOutEvent<THandler> = <<THandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent;
pub(crate) type THandlerError<THandler> = <<THandler as IntoConnectionHandler>::Handler as ConnectionHandler>::Error;

/// Event produced by a handler.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ConnectionHandlerEvent<TOutboundOpenInfo, TCustom> {
Expand Down Expand Up @@ -127,4 +131,3 @@ impl<TOutboundOpenInfo, TCustom> ConnectionHandlerEvent<TOutboundOpenInfo, TCust
}
}
}

49 changes: 21 additions & 28 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ use super::{
ConnectionHandler,
IntoConnectionHandler,
PendingConnectionError,
Substream
Substream,
handler::{
THandlerInEvent,
THandlerOutEvent,
THandlerError,
},
};
use task::{Task, TaskId};

Expand Down Expand Up @@ -88,15 +93,15 @@ impl ConnectionId {
}

/// A connection `Manager` orchestrates the I/O of a set of connections.
pub struct Manager<I, O, H, E, HE> {
pub struct Manager<H: IntoConnectionHandler, E> {
/// The tasks of the managed connections.
///
/// Each managed connection is associated with a (background) task
/// spawned onto an executor. Each `TaskInfo` in `tasks` is linked to such a
/// background task via a channel. Closing that channel (i.e. dropping
/// the sender in the associated `TaskInfo`) stops the background task,
/// which will attempt to gracefully close the connection.
tasks: FnvHashMap<TaskId, TaskInfo<I>>,
tasks: FnvHashMap<TaskId, TaskInfo<THandlerInEvent<H>>>,

/// Next available identifier for a new connection / task.
next_task_id: TaskId,
Expand All @@ -115,13 +120,13 @@ pub struct Manager<I, O, H, E, HE> {

/// Sender distributed to managed tasks for reporting events back
/// to the manager.
events_tx: mpsc::Sender<task::Event<O, H, E, HE>>,
events_tx: mpsc::Sender<task::Event<H, E>>,

/// Receiver for events reported from managed tasks.
events_rx: mpsc::Receiver<task::Event<O, H, E, HE>>
events_rx: mpsc::Receiver<task::Event<H, E>>
}

impl<I, O, H, E, HE> fmt::Debug for Manager<I, O, H, E, HE>
impl<H: IntoConnectionHandler, E> fmt::Debug for Manager<H, E>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_map()
Expand Down Expand Up @@ -179,7 +184,7 @@ enum TaskState {

/// Events produced by the [`Manager`].
#[derive(Debug)]
pub enum Event<'a, I, O, H, TE, HE> {
pub enum Event<'a, H: IntoConnectionHandler, TE> {
/// A connection attempt has failed.
PendingConnectionError {
/// The connection ID.
Expand All @@ -206,35 +211,35 @@ pub enum Event<'a, I, O, H, TE, HE> {
connected: Connected,
/// The error that occurred, if any. If `None`, the connection
/// has been actively closed.
error: Option<ConnectionError<HE>>,
error: Option<ConnectionError<THandlerError<H>>>,
},

/// A connection has been established.
ConnectionEstablished {
/// The entry associated with the new connection.
entry: EstablishedEntry<'a, I>,
entry: EstablishedEntry<'a, THandlerInEvent<H>>,
},

/// A connection handler has produced an event.
ConnectionEvent {
/// The entry associated with the connection that produced the event.
entry: EstablishedEntry<'a, I>,
entry: EstablishedEntry<'a, THandlerInEvent<H>>,
/// The produced event.
event: O
event: THandlerOutEvent<H>
},

/// A connection to a node has changed its address.
AddressChange {
/// The entry associated with the connection that changed address.
entry: EstablishedEntry<'a, I>,
entry: EstablishedEntry<'a, THandlerInEvent<H>>,
/// The former [`ConnectedPoint`].
old_endpoint: ConnectedPoint,
/// The new [`ConnectedPoint`].
new_endpoint: ConnectedPoint,
},
}

impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
impl<H: IntoConnectionHandler, TE> Manager<H, TE> {
/// Creates a new connection manager.
pub fn new(config: ManagerConfig) -> Self {
let (tx, rx) = mpsc::channel(config.task_event_buffer_size);
Expand All @@ -255,19 +260,13 @@ impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
/// processing the node's events.
pub fn add_pending<F, M>(&mut self, future: F, handler: H) -> ConnectionId
where
I: Send + 'static,
O: Send + 'static,
TE: error::Error + Send + 'static,
HE: error::Error + Send + 'static,
M: StreamMuxer + Send + Sync + 'static,
M::OutboundSubstream: Send + 'static,
F: Future<Output = ConnectResult<M, TE>> + Send + 'static,
H: IntoConnectionHandler + Send + 'static,
H::Handler: ConnectionHandler<
Substream = Substream<M>,
InEvent = I,
OutEvent = O,
Error = HE
> + Send + 'static,
<H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
{
Expand All @@ -293,15 +292,9 @@ impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
H: IntoConnectionHandler + Send + 'static,
H::Handler: ConnectionHandler<
Substream = Substream<M>,
InEvent = I,
OutEvent = O,
Error = HE
> + Send + 'static,
<H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TE: error::Error + Send + 'static,
HE: error::Error + Send + 'static,
I: Send + 'static,
O: Send + 'static,
M: StreamMuxer + Send + Sync + 'static,
M::OutboundSubstream: Send + 'static,
{
Expand All @@ -313,7 +306,7 @@ impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
sender: tx, state: TaskState::Established(info)
});

let task: Pin<Box<Task<Pin<Box<future::Pending<_>>>, _, _, _, _, _>>> =
let task: Pin<Box<Task<Pin<Box<future::Pending<_>>>, _, _, _>>> =
Box::pin(Task::established(task_id, self.events_tx.clone(), rx, conn));

if let Some(executor) = &mut self.executor {
Expand All @@ -326,7 +319,7 @@ impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
}

/// Gets an entry for a managed connection, if it exists.
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, I>> {
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, THandlerInEvent<H>>> {
if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) {
Some(Entry::new(task))
} else {
Expand All @@ -340,7 +333,7 @@ impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
}

/// Polls the manager for events relating to the managed connections.
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Event<'a, I, O, H, TE, HE>> {
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Event<'a, H, TE>> {
// Advance the content of `local_spawns`.
while let Poll::Ready(Some(_)) = self.local_spawns.poll_next_unpin(cx) {}

Expand Down
45 changes: 26 additions & 19 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ use crate::{
IntoConnectionHandler,
PendingConnectionError,
Substream,
handler::{
THandlerInEvent,
THandlerOutEvent,
THandlerError,
},
},
};
use futures::{prelude::*, channel::mpsc, stream};
Expand All @@ -53,23 +58,23 @@ pub enum Command<T> {

/// Events that a task can emit to its manager.
#[derive(Debug)]
pub enum Event<T, H, TE, HE> {
pub enum Event<H: IntoConnectionHandler, TE> {
/// A connection to a node has succeeded.
Established { id: TaskId, info: Connected },
/// A pending connection failed.
Failed { id: TaskId, error: PendingConnectionError<TE>, handler: H },
/// A node we are connected to has changed its address.
AddressChange { id: TaskId, new_address: Multiaddr },
/// Notify the manager of an event from the connection.
Notify { id: TaskId, event: T },
Notify { id: TaskId, event: THandlerOutEvent<H> },
/// A connection closed, possibly due to an error.
///
/// If `error` is `None`, the connection has completed
/// an active orderly close.
Closed { id: TaskId, error: Option<ConnectionError<HE>> }
Closed { id: TaskId, error: Option<ConnectionError<THandlerError<H>>> }
}

impl<T, H, TE, HE> Event<T, H, TE, HE> {
impl<H: IntoConnectionHandler, TE> Event<H, TE> {
pub fn id(&self) -> &TaskId {
match self {
Event::Established { id, .. } => id,
Expand All @@ -82,7 +87,7 @@ impl<T, H, TE, HE> Event<T, H, TE, HE> {
}

/// A `Task` is a [`Future`] that handles a single connection.
pub struct Task<F, M, H, I, O, E>
pub struct Task<F, M, H, E>
where
M: StreamMuxer,
H: IntoConnectionHandler,
Expand All @@ -92,16 +97,16 @@ where
id: TaskId,

/// Sender to emit events to the manager of this task.
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>,
events: mpsc::Sender<Event<H, E>>,

/// Receiver for commands sent by the manager of this task.
commands: stream::Fuse<mpsc::Receiver<Command<I>>>,
commands: stream::Fuse<mpsc::Receiver<Command<THandlerInEvent<H>>>>,

/// Inner state of this `Task`.
state: State<F, M, H, O, E>,
state: State<F, M, H, E>,
}

impl<F, M, H, I, O, E> Task<F, M, H, I, O, E>
impl<F, M, H, E> Task<F, M, H, E>
where
M: StreamMuxer,
H: IntoConnectionHandler,
Expand All @@ -110,8 +115,8 @@ where
/// Create a new task to connect and handle some node.
pub fn pending(
id: TaskId,
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>,
commands: mpsc::Receiver<Command<I>>,
events: mpsc::Sender<Event<H, E>>,
commands: mpsc::Receiver<Command<THandlerInEvent<H>>>,
future: F,
handler: H
) -> Self {
Expand All @@ -129,8 +134,8 @@ where
/// Create a task for an existing node we are already connected to.
pub fn established(
id: TaskId,
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>,
commands: mpsc::Receiver<Command<I>>,
events: mpsc::Sender<Event<H, E>>,
commands: mpsc::Receiver<Command<THandlerInEvent<H>>>,
connection: Connection<M, H::Handler>
) -> Self {
Task {
Expand All @@ -143,7 +148,7 @@ where
}

/// The state associated with the `Task` of a connection.
enum State<F, M, H, O, E>
enum State<F, M, H, E>
where
M: StreamMuxer,
H: IntoConnectionHandler,
Expand All @@ -165,33 +170,35 @@ where
/// is polled for new events in this state, otherwise the event
/// must be sent to the `Manager` before the connection can be
/// polled again.
event: Option<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>
event: Option<Event<H, E>>,
},

/// The connection is closing (active close).
Closing(Close<M>),

/// The task is terminating with a final event for the `Manager`.
Terminating(Event<O, H, E, <H::Handler as ConnectionHandler>::Error>),
Terminating(Event<H, E>),

/// The task has finished.
Done
}

impl<F, M, H, I, O, E> Unpin for Task<F, M, H, I, O, E>
impl<F, M, H, E> Unpin for Task<F, M, H, E>
where
M: StreamMuxer,
H: IntoConnectionHandler,
H::Handler: ConnectionHandler<Substream = Substream<M>>
{
}

impl<F, M, H, I, O, E> Future for Task<F, M, H, I, O, E>
impl<F, M, H, E> Future for Task<F, M, H, E>
where
M: StreamMuxer,
F: Future<Output = ConnectResult<M, E>>,
H: IntoConnectionHandler,
H::Handler: ConnectionHandler<Substream = Substream<M>, InEvent = I, OutEvent = O>
H::Handler: ConnectionHandler<
Substream = Substream<M>,
> + Send + 'static,
{
type Output = ();

Expand Down
Loading