Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement graceful shutdown for the peer set #3071

Merged
merged 2 commits into from
Nov 18, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions zebra-network/src/peer_set/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

use std::{
collections::HashMap,
convert,
fmt::Debug,
future::Future,
marker::PhantomData,
Expand Down Expand Up @@ -147,7 +148,18 @@ where
<D::Service as Service<Request>>::Future: Send + 'static,
<D::Service as Load>::Metric: Debug,
{
/// Construct a peerset which uses `discover` internally.
/// Construct a peerset which uses `discover` to manage peer connections.
///
/// Arguments:
/// - `config`: configures the peer set connection limit;
/// - `discover`: handles peer connects and disconnects;
/// - `demand_signal`: requests more peers when all peers are busy (unready);
/// - `handle_rx`: receives background task handles,
/// monitors them to make sure they're still running,
/// and shuts down all the tasks as soon as one task exits;
/// - `inv_stream`: receives inventory advertisements for peers,
/// allowing the peer set to direct inventory requests;
/// - `address_book`: when peer set is busy, it logs address book diagnostics.
pub fn new(
config: &Config,
discover: D,
Expand All @@ -172,6 +184,10 @@ where
}
}

/// Check background task handles to make sure they're still running.
///
/// If any background task exits, shuts down all other background tasks,
/// and returns an error.
fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> {
if self.guards.is_empty() {
match self.handle_rx.try_recv() {
Expand All @@ -187,13 +203,28 @@ where
}
}

match Pin::new(&mut self.guards).poll_next(cx) {
Poll::Pending => {}
Poll::Ready(Some(res)) => res??,
Poll::Ready(None) => Err("all background tasks have exited")?,
let exit_error = match Pin::new(&mut self.guards).poll_next(cx) {
Poll::Pending => return Ok(()),
Poll::Ready(Some(res)) => {
info!(
background_tasks = %self.guards.len(),
"a peer set background task exited, shutting down other peer set tasks"
);

// Flatten the join result and inner result,
// then turn Ok() task exits into errors.
res.map_err(Into::into)
.and_then(convert::identity)
.and(Err("a peer set background task exited".into()))
}
Poll::Ready(None) => Err("all peer set background tasks have exited".into()),
};

for guard in self.guards.iter() {
guard.abort();
}

Ok(())
exit_error
}

fn poll_unready(&mut self, cx: &mut Context<'_>) {
Expand Down