Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts

To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
  (not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout

Once the `Connection` is created, it handles its own timeouts.
But we also need to handle timeouts during handshake setup.
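A minimal sketch of the last invariant, assuming stand-in types rather than Zebra's real handshake code (the diff below applies `tokio::time::timeout` in the same way):

```rust
use std::time::Duration;
use tokio::{io::AsyncReadExt, net::TcpStream, time::timeout};

// Stand-in for zebra-network's request timeout constant; the real value may differ.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(20);

async fn read_greeting(stream: &mut TcpStream) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let mut buf = vec![0u8; 256];
    // If the peer never sends anything, this await returns an `Elapsed` error
    // instead of hanging the handshake forever.
    let n = timeout(REQUEST_TIMEOUT, stream.read(&mut buf)).await??;
    buf.truncate(n);
    Ok(buf)
}
```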

* Avoid hangs by adding a timeout to the candidate set update

Also increase the fanout from 1 to 2, to increase address diversity.

But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.

Also log Peers response errors in the CandidateSet.
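A hedged sketch of the crawler-side pattern described above; `CandidateSet`, the error type, and the timeout value are simplified assumptions, not Zebra's real definitions:

```rust
use std::time::Duration;
use tokio::time::timeout;

struct CandidateSet;

#[derive(Debug)]
enum CrawlError {
    Temporary,
    Permanent,
}

impl CandidateSet {
    async fn update(&mut self) -> Result<(), CrawlError> {
        // The real method sends a fanout of GetAddr requests and merges
        // the responses into the address book.
        Ok(())
    }
}

async fn crawl_once(candidates: &mut CandidateSet) -> Result<(), CrawlError> {
    // The timeout bounds the whole update, so a stalled peer can't hang the crawler.
    match timeout(Duration::from_secs(20), candidates.update()).await {
        // The timeout and temporary errors are logged and dropped, because
        // the crawler task exits if this function returns an error.
        Err(elapsed) => tracing::info!(?elapsed, "candidate set update timed out"),
        Ok(Err(CrawlError::Temporary)) => tracing::info!("temporary candidate set error"),
        // Only permanent errors propagate and stop the crawler.
        Ok(Err(e @ CrawlError::Permanent)) => return Err(e),
        Ok(Ok(())) => {}
    }
    Ok(())
}
```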

* Use the select macro in the crawler to reduce hangs

Unlike the `select!` macro, the `select` function is biased towards
its first argument, risking starvation of the other future.

As a side-benefit, this change also makes the code a lot easier to read
and maintain.
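A toy contrast between the two (not Zebra code): `future::select` always polls its first argument first, while the `select!` macros poll ready branches in random order:

```rust
use futures::future::{self, Either};

#[tokio::main]
async fn main() {
    // Both futures are immediately ready, so `future::select` always
    // resolves to its first argument: a constantly-ready first future
    // can starve the second indefinitely.
    match future::select(future::ready("first"), future::ready("second")).await {
        Either::Left((msg, _unpolled)) => println!("always prints: {msg}"),
        Either::Right((msg, _unpolled)) => println!("never prints: {msg}"),
    }

    // `tokio::select!` polls ready branches in random order by default,
    // so neither branch can permanently starve the other.
    tokio::select! {
        msg = future::ready("a") => println!("sometimes a: {msg}"),
        msg = future::ready("b") => println!("sometimes b: {msg}"),
    }
}
```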

* Split CrawlerAction::Demand into separate actions

This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.

That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
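One plausible shape for the split; the variant names and fields below are illustrative, not necessarily the exact enum in this commit:

```rust
use std::net::SocketAddr;

enum CrawlerAction {
    /// Too many handshakes are already in flight: drop the demand signal.
    DemandDrop,
    /// `candidates.next()` returned a peer: start a handshake with it.
    DemandHandshake { candidate: SocketAddr },
    /// `candidates.next()` returned no peer: crawl for more addresses.
    DemandCrawl,
}
```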

* Spawn a separate task for each handshake

This change avoids deadlocks by letting each handshake make progress
independently.
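A minimal sketch of the idea, with illustrative names and an assumed timeout value (not Zebra's actual dialer):

```rust
use std::{net::SocketAddr, time::Duration};
use tokio::{net::TcpStream, task::JoinHandle, time::timeout};

// Assumed value for illustration; Zebra's real constant may differ.
const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(4);

async fn handshake(addr: SocketAddr) -> Result<(), std::io::Error> {
    // Connect and (in the real code) exchange version/verack messages.
    let _stream = TcpStream::connect(addr).await?;
    Ok(())
}

// Dialing returns immediately with a JoinHandle, so one stalled peer can't
// block the crawler loop or any other in-flight handshake.
fn dial(addr: SocketAddr) -> JoinHandle<Result<(), std::io::Error>> {
    tokio::spawn(async move {
        timeout(HANDSHAKE_TIMEOUT, handshake(addr))
            .await
            .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "handshake timed out"))?
    })
}
```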

* Move the dial task into a separate function

This refactor improves readability.

* Fix buggy future::select function usage

And document the correctness of the new code.
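The property the new comments document: `future::select` polls its first argument first and reports it as `Either::Left`, so argument order encodes priority. A toy model of that pattern (not Zebra's code):

```rust
use futures::future::{self, Either};

#[tokio::main]
async fn main() {
    // Cancellation comes first, so if both futures were ready,
    // cancellation would win; the match arms mirror that order.
    let cancel = future::ready("cancelled");
    let timer = future::pending::<&str>();

    match future::select(cancel, timer).await {
        Either::Left((msg, _timer)) => println!("{msg}"),
        Either::Right((msg, _cancel)) => println!("{msg}"),
    }
}
```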
teor2345 authored Apr 7, 2021
1 parent 4185754 commit 375c8d8
Showing 6 changed files with 396 additions and 144 deletions.
8 changes: 8 additions & 0 deletions zebra-network/src/constants.rs
@@ -49,6 +49,14 @@ pub const LIVE_PEER_DURATION: Duration = Duration::from_secs(60 + 20 + 20 + 20);
/// connected peer.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60);

/// The number of GetAddr requests sent when crawling for new peers.
///
/// ## SECURITY
///
/// The fanout should be greater than 1, to ensure that Zebra's address book is
/// not dominated by a single peer.
pub const GET_ADDR_FANOUT: usize = 2;

/// Truncate timestamps in outbound address messages to this time interval.
///
/// This is intended to prevent a peer from learning exactly when we received
37 changes: 30 additions & 7 deletions zebra-network/src/peer/connection.rs
Expand Up @@ -372,6 +372,22 @@ where
match self.state {
State::AwaitingRequest => {
trace!("awaiting client request or peer message");
// CORRECTNESS
//
// Currently, select prefers the first future if multiple
// futures are ready.
//
// The peer can starve client requests if it sends an
// uninterrupted series of messages. But this is unlikely in
// practice, due to network delays.
//
// If both futures are ready, there's no particular reason
// to prefer one over the other.
//
// TODO: use `futures::select!`, which chooses a ready future
// at random, avoiding starvation
// (To use `select!`, we'll need to map the different
// results to a new enum type.)
match future::select(peer_rx.next(), self.client_rx.next()).await {
Either::Left((None, _)) => {
self.fail_with(PeerError::ConnectionClosed);
@@ -404,14 +420,21 @@ where
.request_timer
.as_mut()
.expect("timeout must be set while awaiting response");
- let cancel = future::select(timer_ref, tx.cancellation());
- match future::select(peer_rx.next(), cancel)
// CORRECTNESS
//
// Currently, select prefers the first future if multiple
// futures are ready.
//
// If multiple futures are ready, we want the cancellation
// to take priority, then the timeout, then peer responses.
let cancel = future::select(tx.cancellation(), timer_ref);
match future::select(cancel, peer_rx.next())
.instrument(span.clone())
.await
{
- Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed),
- Either::Left((Some(Err(e)), _)) => self.fail_with(e),
- Either::Left((Some(Ok(peer_msg)), _cancel)) => {
Either::Right((None, _)) => self.fail_with(PeerError::ConnectionClosed),
Either::Right((Some(Err(e)), _)) => self.fail_with(e),
Either::Right((Some(Ok(peer_msg)), _cancel)) => {
// Try to process the message using the handler.
// This extremely awkward construction avoids
// keeping a live reference to handler across the
Expand Down Expand Up @@ -455,7 +478,7 @@ where
};
}
}
- Either::Right((Either::Left(_), _peer_fut)) => {
Either::Left((Either::Right(_), _peer_fut)) => {
trace!(parent: &span, "client request timed out");
let e = PeerError::ClientRequestTimeout;
self.state = match self.state {
Expand All @@ -478,7 +501,7 @@ where
),
};
}
- Either::Right((Either::Right(_), _peer_fut)) => {
Either::Left((Either::Left(_), _peer_fut)) => {
trace!(parent: &span, "client request was cancelled");
self.state = State::AwaitingRequest;
}
9 changes: 9 additions & 0 deletions zebra-network/src/peer/error.rs
@@ -98,4 +98,13 @@ pub enum HandshakeError {
/// The remote peer offered a version older than our minimum version.
#[error("Peer offered obsolete version: {0:?}")]
ObsoleteVersion(crate::protocol::external::types::Version),
/// Sending or receiving a message timed out.
#[error("Timeout when sending or receiving a message to peer")]
Timeout,
}

impl From<tokio::time::error::Elapsed> for HandshakeError {
fn from(_source: tokio::time::error::Elapsed) -> Self {
HandshakeError::Timeout
}
}
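With this `From` impl, a bounded network await can be unwrapped with `??`: the outer `?` converts the timeout's `Elapsed` into `HandshakeError::Timeout`, and the inner `?` propagates the error from the message itself. A self-contained sketch with a simplified error type:

```rust
use std::time::Duration;
use tokio::time::timeout;

#[derive(Debug)]
enum HandshakeError {
    Timeout,
}

impl From<tokio::time::error::Elapsed> for HandshakeError {
    fn from(_source: tokio::time::error::Elapsed) -> Self {
        HandshakeError::Timeout
    }
}

async fn next_message() -> Result<(), HandshakeError> {
    // Simulate a peer that never replies.
    let never_ready = std::future::pending::<Result<(), HandshakeError>>();
    // Outer `?`: Elapsed -> HandshakeError::Timeout via the impl above.
    // Inner `?`: would propagate an error from the message itself.
    timeout(Duration::from_millis(10), never_ready).await??;
    Ok(())
}

#[tokio::main]
async fn main() {
    assert!(matches!(next_message().await, Err(HandshakeError::Timeout)));
}
```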
168 changes: 128 additions & 40 deletions zebra-network/src/peer/handshake.rs
@@ -12,7 +12,7 @@ use futures::{
channel::{mpsc, oneshot},
prelude::*,
};
- use tokio::{net::TcpStream, sync::broadcast};
use tokio::{net::TcpStream, sync::broadcast, time::timeout};
use tokio_util::codec::Framed;
use tower::Service;
use tracing::{span, Level, Span};
@@ -34,6 +34,12 @@ use super::{Client, Connection, ErrorSlot, HandshakeError, PeerError};

/// A [`Service`] that handshakes with a remote peer and constructs a
/// client/server pair.
///
/// CORRECTNESS
///
/// To avoid hangs, each handshake (or its connector) should be:
/// - launched in a separate task, and
/// - wrapped in a timeout.
#[derive(Clone)]
pub struct Handshake<S> {
config: Config,
@@ -211,6 +217,10 @@ where
let fut = async move {
debug!("connecting to remote peer");

// CORRECTNESS
//
// As a defence-in-depth against hangs, every send or next on stream
// should be wrapped in a timeout.
let mut stream = Framed::new(
tcp_stream,
Codec::builder()
@@ -260,11 +270,10 @@ where
};

debug!(?version, "sending initial version message");
- stream.send(version).await?;
timeout(constants::REQUEST_TIMEOUT, stream.send(version)).await??;

- let remote_msg = stream
-     .next()
-     .await
let remote_msg = timeout(constants::REQUEST_TIMEOUT, stream.next())
.await?
.ok_or(HandshakeError::ConnectionClosed)??;

// Check that we got a Version and destructure its fields into the local scope.
@@ -293,11 +302,10 @@ where
return Err(HandshakeError::NonceReuse);
}

- stream.send(Message::Verack).await?;
timeout(constants::REQUEST_TIMEOUT, stream.send(Message::Verack)).await??;

- let remote_msg = stream
-     .next()
-     .await
let remote_msg = timeout(constants::REQUEST_TIMEOUT, stream.next())
.await?
.ok_or(HandshakeError::ConnectionClosed)??;
if let Message::Verack = remote_msg {
debug!("got verack from remote peer");
@@ -376,22 +384,42 @@ where
future::ready(Ok(msg))
});

// CORRECTNESS
//
// Every message and error must update the peer address state via
// the inbound_ts_collector.
let inbound_ts_collector = timestamp_collector.clone();
let peer_rx = peer_rx
.then(move |msg| {
- // Add a metric for inbound messages and fire a timestamp event.
- let mut timestamp_collector = timestamp_collector.clone();
// Add a metric for inbound messages and errors.
// Fire a timestamp or failure event.
let mut inbound_ts_collector = inbound_ts_collector.clone();
async move {
- if let Ok(msg) = &msg {
-     metrics::counter!(
-         "zcash.net.in.messages",
-         1,
-         "command" => msg.to_string(),
-         "addr" => addr.to_string(),
-     );
-     use futures::sink::SinkExt;
-     let _ = timestamp_collector
-         .send(MetaAddr::new_responded(&addr, &remote_services))
-         .await;
match &msg {
Ok(msg) => {
metrics::counter!(
"zcash.net.in.messages",
1,
"command" => msg.to_string(),
"addr" => addr.to_string(),
);
// the collector doesn't depend on network activity,
// so this await should not hang
let _ = inbound_ts_collector
.send(MetaAddr::new_responded(&addr, &remote_services))
.await;
}
Err(err) => {
metrics::counter!(
"zebra.net.in.errors",
1,
"error" => err.to_string(),
"addr" => addr.to_string(),
);
let _ = inbound_ts_collector
.send(MetaAddr::new_errored(&addr, &remote_services))
.await;
}
}
msg
}
@@ -452,6 +480,16 @@ where
.boxed(),
);

// CORRECTNESS
//
// To prevent hangs:
// - every await that depends on the network must have a timeout (or interval)
// - every error/shutdown must update the address book state and return
//
// The address book state can be updated via `ClientRequest.tx`, or the
// timestamp_collector.
//
// Returning from the spawned closure terminates the connection's heartbeat task.
let heartbeat_span = tracing::debug_span!(parent: connection_span, "heartbeat");
tokio::spawn(
async move {
@@ -460,11 +498,23 @@

let mut shutdown_rx = shutdown_rx;
let mut server_tx = server_tx;
let mut timestamp_collector = timestamp_collector.clone();
let mut interval_stream = tokio::time::interval(constants::HEARTBEAT_INTERVAL);
loop {
let shutdown_rx_ref = Pin::new(&mut shutdown_rx);
- match future::select(interval_stream.next(), shutdown_rx_ref).await {
-     Either::Left(_) => {
let mut send_addr_err = false;

// CORRECTNESS
//
// Currently, select prefers the first future if multiple
// futures are ready.
//
// Starvation is impossible here, because interval has a
// slow rate, and shutdown is a oneshot. If both futures
// are ready, we want the shutdown to take priority over
// sending a useless heartbeat.
match future::select(shutdown_rx_ref, interval_stream.next()).await {
Either::Right(_) => {
let (tx, rx) = oneshot::channel();
let request = Request::Ping(Nonce::default());
tracing::trace!(?request, "queueing heartbeat request");
@@ -474,19 +524,28 @@
span: tracing::Span::current(),
}) {
Ok(()) => {
- match server_tx.flush().await {
-     Ok(()) => {}
// TODO: also wait on the shutdown_rx here
match timeout(
constants::HEARTBEAT_INTERVAL,
server_tx.flush(),
)
.await
{
Ok(Ok(())) => {
}
Ok(Err(e)) => {
tracing::warn!(
?e,
"flushing client request failed, shutting down"
);
send_addr_err = true;
}
Err(e) => {
- // We can't get the client request for this failure,
- // so we can't send an error back here. But that's ok,
- // because:
- // - this error never happens (or it's very rare)
- // - if the flush() fails, the server hasn't
- //   received the request
tracing::warn!(
"flushing client request failed: {:?}",
e
?e,
"flushing client request timed out, shutting down"
);
send_addr_err = true;
}
}
}
@@ -514,17 +573,46 @@
// Heartbeats are checked internally to the
// connection logic, but we need to wait on the
// response to avoid canceling the request.
- match rx.await {
-     Ok(_) => tracing::trace!("got heartbeat response"),
-     Err(_) => {
-         tracing::trace!(
//
// TODO: also wait on the shutdown_rx here
match timeout(constants::HEARTBEAT_INTERVAL, rx).await {
Ok(Ok(_)) => tracing::trace!("got heartbeat response"),
Ok(Err(e)) => {
tracing::warn!(
?e,
"error awaiting heartbeat response, shutting down"
);
- return;
send_addr_err = true;
}
Err(e) => {
tracing::warn!(
?e,
"heartbeat response timed out, shutting down"
);
send_addr_err = true;
}
}
}
- Either::Right(_) => return, // got shutdown signal
Either::Left(_) => {
tracing::trace!("shutting down due to Client shut down");
// awaiting a local task won't hang
let _ = timestamp_collector
.send(MetaAddr::new_shutdown(&addr, &remote_services))
.await;
return;
}
}
if send_addr_err {
// We can't get the client request for this failure,
// so we can't send an error back on `tx`. So
// we just update the address book with a failure.
let _ = timestamp_collector
.send(MetaAddr::new_errored(
&addr,
&remote_services,
))
.await;
return;
}
}
}
