Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/src: Remove poll_broadcast connection notification mechanism #1527

Merged
merged 3 commits into from
Mar 31, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 4 additions & 35 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ where
/// the associated user data.
#[derive(Debug)]
struct TaskInfo<I, C> {
/// channel endpoint to send messages to the task
/// Channel endpoint to send messages to the task.
//
// Note: Only established tasks can handle commands. Do not send commands to
// pending tasks (see `[<Task as Future>::poll]`).
mxinden marked this conversation as resolved.
Show resolved Hide resolved
sender: mpsc::Sender<task::Command<I>>,
/// The state of the task as seen by the `Manager`.
state: TaskState<C>,
Expand Down Expand Up @@ -286,40 +289,6 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
ConnectionId(task_id)
}

/// Notifies the handlers of all managed connections of an event.
///
/// This function is "atomic", in the sense that if `Poll::Pending` is
/// returned then no event has been sent.
#[must_use]
pub fn poll_broadcast(&mut self, event: &I, cx: &mut Context) -> Poll<()>
where
I: Clone
{
for task in self.tasks.values_mut() {
if let Poll::Pending = task.sender.poll_ready(cx) { // (*)
return Poll::Pending;
}
}

for (id, task) in self.tasks.iter_mut() {
let cmd = task::Command::NotifyHandler(event.clone());
match task.sender.start_send(cmd) {
Ok(()) => {},
Err(e) if e.is_full() => unreachable!("by (*)"),
Err(e) if e.is_disconnected() => {
// The background task ended. The manager will eventually be
// informed through an `Error` event from the task.
log::trace!("Connection dropped: {:?}", id);
},
Err(e) => {
log::error!("Unexpected error: {:?}", e);
}
}
}

Poll::Ready(())
}

/// Gets an entry for a managed connection, if it exists.
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, I, C>> {
if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) {
Expand Down
41 changes: 16 additions & 25 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ where
commands: stream::Fuse<mpsc::Receiver<Command<I>>>,

/// Inner state of this `Task`.
state: State<F, M, H, I, O, E, C>,
state: State<F, M, H, O, E, C>,
}

impl<F, M, H, I, O, E, C> Task<F, M, H, I, O, E, C>
Expand All @@ -111,7 +111,6 @@ where
state: State::Pending {
future: Box::pin(future),
handler,
events: Vec::new()
},
}
}
Expand All @@ -133,7 +132,7 @@ where
}

/// The state associated with the `Task` of a connection.
enum State<F, M, H, I, O, E, C>
enum State<F, M, H, O, E, C>
where
M: StreamMuxer,
H: IntoConnectionHandler<C>,
Expand All @@ -146,12 +145,6 @@ where
future: Pin<Box<F>>,
/// The intended handler for the established connection.
handler: H,
/// While we are dialing the future, we need to buffer the events received via
/// `Command::NotifyHandler` so that they get delivered to the `handler`
/// once the connection is established. We can't leave these in `Task::receiver`
/// because we have to detect if the connection attempt has been aborted (by
/// dropping the corresponding `sender` owned by the manager).
events: Vec<I>
},

/// The connection is established and a new event is ready to be emitted.
Expand Down Expand Up @@ -198,30 +191,29 @@ where

'poll: loop {
match std::mem::replace(&mut this.state, State::Done) {
State::Pending { mut future, handler, mut events } => {
// Process commands from the manager.
loop {
match Stream::poll_next(Pin::new(&mut this.commands), cx) {
Poll::Pending => break,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(Command::NotifyHandler(event))) =>
events.push(event),
}
State::Pending { mut future, handler } => {
// Check if the manager aborted this task by dropping the `commands`
// channel sender side.
match Stream::poll_next(Pin::new(&mut this.commands), cx) {
Poll::Pending => {},
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(Command::NotifyHandler(_))) => unreachable!(
"Manager does not allow sending commands to pending tasks.",
)
}
// Check if the connection succeeded.
match Future::poll(Pin::new(&mut future), cx) {
Poll::Ready(Ok((info, muxer))) => {
let mut c = Connection::new(muxer, handler.into_handler(&info));
for event in events {
c.inject_event(event)
}
this.state = State::EstablishedReady {
connection: Some(c),
connection: Some(Connection::new(
muxer,
handler.into_handler(&info),
)),
event: Event::Established { id, info }
}
}
Poll::Pending => {
this.state = State::Pending { future, handler, events };
this.state = State::Pending { future, handler };
return Poll::Pending
}
Poll::Ready(Err(error)) => {
Expand Down Expand Up @@ -338,4 +330,3 @@ where
}
}
}

12 changes: 0 additions & 12 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,18 +330,6 @@ where
id
}

/// Sends an event to all nodes.
///
/// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event
/// has been sent to any node yet.
#[must_use]
pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()>
where
TInEvent: Clone
{
self.manager.poll_broadcast(event, cx)
}

/// Adds an existing established connection to the pool.
///
/// Returns the assigned connection ID on success. An error is returned
Expand Down
13 changes: 0 additions & 13 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,18 +269,6 @@ where
})
}

/// Notifies the connection handler of _every_ connection of _every_ peer of an event.
///
/// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event
/// has been sent to any node yet.
#[must_use]
pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()>
where
TInEvent: Clone
{
self.pool.poll_broadcast(event, cx)
}

/// Returns a list of all connected peers, i.e. peers to whom the `Network`
/// has at least one established connection.
pub fn connected_peers(&self) -> impl Iterator<Item = &TPeerId> {
Expand Down Expand Up @@ -641,4 +629,3 @@ impl NetworkConfig {
self
}
}