Skip to content

Commit

Permalink
Don't timeout when receiving too many messages from choked peer
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed May 20, 2024
1 parent 4a8e73c commit 5ddb344
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 36 deletions.
12 changes: 6 additions & 6 deletions lib/src/network/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{
constants::MAX_PENDING_RESPONSES,
constants::MAX_PENDING_REQUESTS_PER_CLIENT,
debug_payload::{DebugResponse, PendingDebugRequest},
message::{Content, Response, ResponseDisambiguator},
pending::{PendingRequest, PendingRequests, PendingResponse, ProcessedResponse},
Expand Down Expand Up @@ -43,10 +43,10 @@ impl Client {
// processing responses (which sometimes takes a while).
let (send_queue_tx, send_queue_rx) = mpsc::unbounded_channel();

// We're making sure to not send more requests than MAX_PENDING_RESPONSES, but there may be
// some unsolicited responses and also the peer may be malicious and send us too many
// responses (so we shoulnd't use unbounded_channel).
let (recv_queue_tx, recv_queue_rx) = mpsc::channel(2 * MAX_PENDING_RESPONSES);
// We're making sure to not send more requests than MAX_PENDING_REQUESTS_PER_CLIENT, but
// there may be some unsolicited responses and also the peer may be malicious and send us
// too many responses (so we shoulnd't use unbounded_channel).
let (recv_queue_tx, recv_queue_rx) = mpsc::channel(2 * MAX_PENDING_REQUESTS_PER_CLIENT);

let inner = Inner {
vault,
Expand Down Expand Up @@ -144,7 +144,7 @@ impl Inner {
send_queue_rx: &mut mpsc::UnboundedReceiver<(PendingRequest, Instant)>,
) {
// Limits requests per link (peer + repo)
let link_request_limiter = Arc::new(Semaphore::new(MAX_PENDING_RESPONSES));
let link_request_limiter = Arc::new(Semaphore::new(MAX_PENDING_REQUESTS_PER_CLIENT));

loop {
let Some((request, timestamp)) = send_queue_rx.recv().await else {
Expand Down
21 changes: 12 additions & 9 deletions lib/src/network/constants.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::time::Duration;

// If a response to a pending request is not received within this time, a request timeout error is
// triggered.
/// If a response to a pending request is not received within this time, a request timeout error is
/// triggered.
pub(super) const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

// Maximum number of request which have been sent but for which we haven't received a response yet.
// Higher values give better performance but too high risks congesting the network. Also there is a
// point of diminishing returns. 32 seems to be the sweet spot based on a simple experiment.
/// Maximum number of requests that have been sent to a given peer but for which we haven't received
/// a response yet. Higher values give better performance but too high risks congesting the
/// network. There is also a point of diminishing returns. 32 seems to be the sweet spot based on a
/// simple experiment.
/// NOTE: This limit is protecting the peer against being overhelmed by too many requests from us.
// TODO: run more precise benchmarks to find the actual optimum.
pub(super) const MAX_REQUESTS_IN_FLIGHT: usize = 32;
pub(super) const MAX_IN_FLIGHT_REQUESTS_PER_PEER: usize = 32;

// Maximum number of respones that a `Client` received but had not yet processed before the client
// is allowed to send more requests.
pub(super) const MAX_PENDING_RESPONSES: usize = 2 * MAX_REQUESTS_IN_FLIGHT;
/// Maximum number of requests that have been sent on a given `Client` but for which the response
/// hasn't yet been processed (although it may have been received).
/// NOTE: This limit is protecting us against being overhelmed by too many responses from the peer.
pub(super) const MAX_PENDING_REQUESTS_PER_CLIENT: usize = 2 * MAX_IN_FLIGHT_REQUESTS_PER_PEER;
10 changes: 7 additions & 3 deletions lib/src/network/message_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{
choke,
client::Client,
connection::ConnectionPermit,
constants::MAX_REQUESTS_IN_FLIGHT,
constants::MAX_IN_FLIGHT_REQUESTS_PER_PEER,
crypto::{self, DecryptingStream, EncryptingSink, EstablishError, RecvError, Role, SendError},
message::{Content, MessageChannelId, Request, Response},
message_dispatcher::{ContentSink, ContentStream, MessageDispatcher},
Expand All @@ -15,6 +15,7 @@ use super::{
};
use crate::{
collections::{hash_map::Entry, HashMap},
network::constants::MAX_PENDING_REQUESTS_PER_CLIENT,
repository::{LocalId, Vault},
};
use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
Expand Down Expand Up @@ -71,7 +72,7 @@ impl MessageBroker {
that_runtime_id,
dispatcher: MessageDispatcher::new(),
links: HashMap::default(),
request_limiter: Arc::new(Semaphore::new(MAX_REQUESTS_IN_FLIGHT)),
request_limiter: Arc::new(Semaphore::new(MAX_IN_FLIGHT_REQUESTS_PER_PEER)),
pex_peer,
monitor,
tracker,
Expand Down Expand Up @@ -304,7 +305,10 @@ async fn run_link(
pex_rx: &mut PexReceiver,
choker: choke::Choker,
) -> ControlFlow {
let (request_tx, request_rx) = mpsc::channel(1);
// If the peer is choked we may still receive requests from them but we won't process them until
// the peer is unchoked. Therefore, the capacity of this channel must be large enough to
// accomodate any such requests.
let (request_tx, request_rx) = mpsc::channel(MAX_PENDING_REQUESTS_PER_CLIENT);
let (response_tx, response_rx) = mpsc::channel(1);
let (content_tx, content_rx) = mpsc::channel(1);

Expand Down
21 changes: 5 additions & 16 deletions lib/src/network/message_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ use std::{
Arc,
},
task::{Context, Poll, Waker},
time::Duration,
};
use tokio::{
runtime, select,
sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
sync::{Mutex as AsyncMutex, Notify, Semaphore},
time::{self, Duration},
};

// Time after which if no message is received, the connection is dropped.
Expand Down Expand Up @@ -396,15 +396,13 @@ struct MultiStream {

impl MultiStream {
fn new() -> Self {
const MAX_QUEUED_MESSAGES: usize = 32;

let inner = Arc::new(BlockingMutex::new(MultiStreamInner {
streams: SelectAll::new(),
waker: None,
}));

let stream_added = Arc::new(Notify::new());
let (tx, rx) = mpsc::channel(MAX_QUEUED_MESSAGES);
let (tx, rx) = mpsc::channel(1);

let _runner =
scoped_task::spawn(multi_stream_runner(inner.clone(), tx, stream_added.clone()));
Expand Down Expand Up @@ -471,18 +469,9 @@ async fn multi_stream_runner(
stream_added.notified().await;

while let Some((permit_id, message)) = (Recv { inner: &inner }).await {
// Close the connection if the sender is sending too many messages that we're not handling
// in a reasonable time. Note that if we don't have some of the repositories that the peer
// has, then they'll send some small number of messages from their Barrier code. That's
// fine because that number does not exceed MAX_QUEUED_MESSAGES and so the above `tx.send`
// won't block for long.

// FIXME: When this timeout is triggered it closes this stream which closes all its channels
// and the links attached to them. This can cause sync to stop. We should find a better way
// to handle this or at least restart the channels when this happens.
match time::timeout(KEEP_ALIVE_RECV_INTERVAL, tx.send((permit_id, message))).await {
Ok(Ok(())) => (),
Err(_) | Ok(Err(_)) => break,
match tx.send((permit_id, message)).await {
Ok(()) => (),
Err(_) => break,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/src/network/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{
choke,
client::Client,
constants::MAX_REQUESTS_IN_FLIGHT,
constants::MAX_IN_FLIGHT_REQUESTS_PER_PEER,
message::{Content, Request, Response},
server::Server,
};
Expand Down Expand Up @@ -618,7 +618,7 @@ fn create_client(repo: Vault) -> ClientData {
repo,
send_tx,
recv_rx,
Arc::new(Semaphore::new(MAX_REQUESTS_IN_FLIGHT)),
Arc::new(Semaphore::new(MAX_IN_FLIGHT_REQUESTS_PER_PEER)),
);

(client, send_rx, recv_tx)
Expand Down

0 comments on commit 5ddb344

Please sign in to comment.