Skip to content

Commit

Permalink
Merge branch 'load-balancing'
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Dec 4, 2023
2 parents b9fc79e + d426ad7 commit 3e0bb09
Show file tree
Hide file tree
Showing 8 changed files with 465 additions and 404 deletions.
468 changes: 276 additions & 192 deletions lib/src/block_tracker.rs

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions lib/src/network/choke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,19 +291,19 @@ impl Drop for ChokerInner {
mod tests {
use super::*;
use assert_matches::assert_matches;
use std::iter;

#[tokio::test(flavor = "multi_thread")]
// use simulated time (`start_paused`) to avoid having wait for the timeout.
#[tokio::test(start_paused = true)]
async fn sanity() {
let manager = Manager::new();
let mut chokers = Vec::new();
let mut chokers: Vec<_> = iter::repeat_with(|| manager.new_choker())
.take(MAX_UNCHOKED_COUNT + 1)
.collect();

for _ in 0..(MAX_UNCHOKED_COUNT + 1) {
chokers.push(manager.new_choker());
}

for i in 0..MAX_UNCHOKED_COUNT {
for choker in chokers.iter_mut().take(MAX_UNCHOKED_COUNT) {
assert_matches!(
chokers[i].try_get_permit().await,
choker.try_get_permit().await,
Some(GetPermitResult::Granted)
);
}
Expand Down
69 changes: 36 additions & 33 deletions lib/src/network/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,48 +66,29 @@ impl Client {
self.receive_filter.reset().await?;

let mut reload_index_rx = self.vault.store().client_reload_index_tx.subscribe();

let mut block_promise_acceptor = self.block_tracker.acceptor();
let mut block_offers = self.block_tracker.offers();

// We're making sure to not send more requests than MAX_PENDING_RESPONSES, but there may be
// some unsolicited responses and also the peer may be malicious and send us too many
// responses (so we shoulnd't use unbounded_channel).
let (queued_responses_tx, queued_responses_rx) = mpsc::channel(2 * MAX_PENDING_RESPONSES);

let mut handle_queued_responses = pin!(self.handle_responses(queued_responses_rx));

// NOTE: It is important to keep `remove`ing requests from `pending_requests` in parallel
// with response handling. It is because response handling may take a long time (e.g. due
// to acquiring database write transaction if there are other write transactions currently
// going on - such as when forking) and so waiting for it could cause some requests in
// `pending_requests` to time out.
let mut move_from_pending_into_queued = pin!(async {
loop {
let response = match rx.recv().await {
Some(response) => response,
None => break,
};

let response = match self.pending_requests.remove(response) {
Some(response) => response,
// Unsolicited and non-root response.
None => continue,
};

if queued_responses_tx.send(response).await.is_err() {
break;
}
}
});
let mut prepare_responses = pin!(self.prepare_responses(rx, queued_responses_tx));
let mut handle_responses = pin!(self.handle_responses(queued_responses_rx));

loop {
select! {
block_promise = block_promise_acceptor.accept() => {
block_offer = block_offers.next() => {
let debug = PendingDebugRequest::start();
self.enqueue_request(PendingRequest::Block(block_promise, debug));
self.enqueue_request(PendingRequest::Block(block_offer, debug));
}
_ = &mut move_from_pending_into_queued => break,
result = &mut handle_queued_responses => {
_ = &mut prepare_responses => break,
result = &mut handle_responses => {
result?;
break;
}
Expand All @@ -129,6 +110,29 @@ impl Client {
self.request_tx.send((request, Instant::now())).unwrap();
}

async fn prepare_responses(
&self,
rx: &mut mpsc::Receiver<Response>,
tx: mpsc::Sender<PendingResponse>,
) {
loop {
let response = match rx.recv().await {
Some(response) => response,
None => break,
};

let response = match self.pending_requests.remove(response) {
Some(response) => response,
// Unsolicited and non-root response.
None => continue,
};

if tx.send(response).await.is_err() {
break;
}
}
}

async fn handle_responses(
&self,
mut queued_responses_rx: mpsc::Receiver<PendingResponse>,
Expand Down Expand Up @@ -328,12 +332,12 @@ impl Client {
match self.vault.block_request_mode {
BlockRequestMode::Lazy => {
for node in status.request_blocks {
self.block_tracker.offer(node.block_id, offer_state);
self.block_tracker.register(node.block_id, offer_state);
}
}
BlockRequestMode::Greedy => {
for node in status.request_blocks {
if self.block_tracker.offer(node.block_id, offer_state) {
if self.block_tracker.register(node.block_id, offer_state) {
self.vault.block_tracker.require(node.block_id);
}
}
Expand Down Expand Up @@ -485,16 +489,15 @@ fn start_sender(

monitor.request_queued_metric.record(time_created.elapsed());

let msg = request.to_message();

if !pending_requests.insert(request, peer_permit, client_permit) {
let Some(request) = pending_requests.insert(request, peer_permit, client_permit)
else {
// The same request is already in-flight.
continue;
}
};

*monitor.total_requests_cummulative.get() += 1;

tx.send(Content::Request(msg)).await.unwrap_or(());
tx.send(Content::Request(request)).await.unwrap_or(());
}

// Don't exist so we don't need to check whether `request_tx.send()` fails or not.
Expand Down
Loading

0 comments on commit 3e0bb09

Please sign in to comment.