Skip to content

Commit

Permalink
Clients that failed to receive a block shouldn't retry right a way
Browse files Browse the repository at this point in the history
This patch prevents clients that timed out or responded with not-found
error to acquire block promise right a way.
  • Loading branch information
inetic committed Oct 23, 2023
1 parent 77ce357 commit 01bbcdb
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 60 deletions.
135 changes: 78 additions & 57 deletions lib/src/block_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl BlockTracker {
shared: Arc::new(Shared {
inner: BlockingMutex::new(Inner {
missing_blocks: HashMap::default(),
clients: Slab::new(),
offering_clients: Slab::new(),
}),
notify_tx,
}),
Expand All @@ -32,15 +32,17 @@ impl BlockTracker {
pub fn require(&self, block_id: BlockId) {
let mut inner = self.shared.inner.lock().unwrap();

let missing_block = inner
.missing_blocks
.entry(block_id)
.or_insert_with(|| MissingBlock {
clients: HashSet::default(),
accepted_by: None,
required: false,
approved: false,
});
let missing_block =
inner
.missing_blocks
.entry(block_id)
.or_insert_with(|| MissingBlockState {
offering_clients: HashSet::default(),
currently_accepted_by: None,
previously_accepted_by: Default::default(),
required: false,
approved: false,
});

if missing_block.required {
return;
Expand All @@ -50,16 +52,17 @@ impl BlockTracker {

missing_block.required = true;

if !missing_block.clients.is_empty() {
if !missing_block.offering_clients.is_empty() {
self.shared.notify();
}
}

/// Approve the block request if offered.
pub fn approve(&self, block_id: &BlockId) {
/// Approve the block request if offered. This is called when `quota` is not `None`, otherwise
/// blocks are pre-approved from `TrackerClient::offer(block_id, OfferState::Approved)`.
pub fn approve(&self, block_id: BlockId) {
let mut inner = self.shared.inner.lock().unwrap();

let Some(missing_block) = inner.missing_blocks.get_mut(block_id) else {
let Some(missing_block) = inner.missing_blocks.get_mut(&block_id) else {
return;
};

Expand All @@ -72,23 +75,23 @@ impl BlockTracker {
missing_block.approved = true;

// If required and offered, notify the waiting acceptors.
if missing_block.required && !missing_block.clients.is_empty() {
if missing_block.required && !missing_block.offering_clients.is_empty() {
self.shared.notify();
}
}

pub fn client(&self) -> BlockTrackerClient {
pub fn client(&self) -> TrackerClient {
let client_id = self
.shared
.inner
.lock()
.unwrap()
.clients
.offering_clients
.insert(HashSet::default());

let notify_rx = self.shared.notify_tx.subscribe();

BlockTrackerClient {
TrackerClient {
shared: self.shared.clone(),
client_id,
notify_rx,
Expand All @@ -102,13 +105,13 @@ pub(crate) enum OfferState {
Approved,
}

pub(crate) struct BlockTrackerClient {
pub(crate) struct TrackerClient {
shared: Arc<Shared>,
client_id: ClientId,
notify_rx: watch::Receiver<()>,
}

impl BlockTrackerClient {
impl TrackerClient {
pub fn acceptor(&self) -> BlockPromiseAcceptor {
BlockPromiseAcceptor {
shared: self.shared.clone(),
Expand All @@ -118,29 +121,35 @@ impl BlockTrackerClient {
}

/// Offer to request the given block by the client with `client_id` if it is, or will become,
/// required. Returns `true` if this block was offered for the first time (by any client), `false` if it was
/// already offered before but not yet accepted or cancelled.
/// required and approved. Returns `true` if this block was offered for the first time (by any
/// client), `false` if it was already offered before but not yet accepted or cancelled.
pub fn offer(&self, block_id: BlockId, state: OfferState) -> bool {
let mut inner = self.shared.inner.lock().unwrap();

if !inner.clients[self.client_id].insert(block_id) {
if !inner.offering_clients[self.client_id].insert(block_id) {
// Already offered
return false;
}

tracing::trace!(?block_id, ?state, "offer");

let missing_block = inner
.missing_blocks
.entry(block_id)
.or_insert_with(|| MissingBlock {
clients: HashSet::default(),
accepted_by: None,
required: false,
approved: false,
});
let missing_block =
inner
.missing_blocks
.entry(block_id)
.or_insert_with(|| MissingBlockState {
offering_clients: HashSet::default(),
currently_accepted_by: None,
previously_accepted_by: Default::default(),
required: false,
approved: false,
});

missing_block.offering_clients.insert(self.client_id);

missing_block.clients.insert(self.client_id);
// The peer could have previously lied about having the block (e.g. block expired). Now
// it's claiming to have it again so we need to reconsider loading from it again.
missing_block.previously_accepted_by.remove(&self.client_id);

match state {
OfferState::Approved => {
Expand All @@ -154,22 +163,25 @@ impl BlockTrackerClient {
}
}

impl Drop for BlockTrackerClient {
impl Drop for TrackerClient {
fn drop(&mut self) {
let mut inner = self.shared.inner.lock().unwrap();
let block_ids = inner.clients.remove(self.client_id);
let block_ids = inner.offering_clients.remove(self.client_id);
let mut notify = false;

for block_id in block_ids {
// unwrap is ok because of the invariant in `Inner`
let missing_block = inner.missing_blocks.get_mut(&block_id).unwrap();

missing_block.clients.remove(&self.client_id);
missing_block.offering_clients.remove(&self.client_id);

if missing_block.unaccept_by(self.client_id) {
notify = true;
}

// When the client reconnects, we allow it to accept the again.
missing_block.previously_accepted_by.remove(&self.client_id);

// TODO: if the block hasn't other offers and isn't required, remove it
}

Expand All @@ -186,8 +198,12 @@ pub(crate) struct BlockPromiseAcceptor {
}

impl BlockPromiseAcceptor {
/// Returns the next required and offered block request. If there is no such request at the
/// moment this function is called, waits until one appears.
/// Returns the next required, offered and approved block request. If there is no such request
/// at the moment this function is called, waits until one appears.
///
/// When the client receives this promise, it can request the block from the peer. The peer
/// either responds and the client can fullfill the promise, or the promise can time out (or be
/// dropped). If the latter, another will `accept` the promise.
///
/// # Cancel safety
///
Expand All @@ -211,15 +227,19 @@ impl BlockPromiseAcceptor {
let inner = &mut *inner;

// TODO: OPTIMIZE (but profile first) this linear lookup
for block_id in &inner.clients[self.client_id] {
for block_id in &inner.offering_clients[self.client_id] {
// unwrap is ok because of the invariant in `Inner`
let missing_block = inner.missing_blocks.get_mut(block_id).unwrap();

if missing_block.required
&& missing_block.approved
&& missing_block.accepted_by.is_none()
&& missing_block.currently_accepted_by.is_none()
&& !missing_block
.previously_accepted_by
.contains(&self.client_id)
{
missing_block.accepted_by = Some(self.client_id);
missing_block.currently_accepted_by = Some(self.client_id);
missing_block.previously_accepted_by.insert(self.client_id);

return Some(BlockPromise {
shared: self.shared.clone(),
Expand Down Expand Up @@ -255,8 +275,8 @@ impl BlockPromise {
return;
};

for client_id in missing_block.clients {
if let Some(block_ids) = inner.clients.get_mut(client_id) {
for client_id in missing_block.offering_clients {
if let Some(block_ids) = inner.offering_clients.get_mut(client_id) {
block_ids.remove(&self.block_id);
}
}
Expand Down Expand Up @@ -284,7 +304,7 @@ impl Drop for BlockPromise {

let mut inner = self.shared.inner.lock().unwrap();

let client = match inner.clients.get_mut(self.client_id) {
let client = match inner.offering_clients.get_mut(self.client_id) {
Some(client) => client,
None => return,
};
Expand All @@ -295,7 +315,7 @@ impl Drop for BlockPromise {

// unwrap is ok because of the invariant in `Inner`
let missing_block = inner.missing_blocks.get_mut(&self.block_id).unwrap();
missing_block.clients.remove(&self.client_id);
missing_block.offering_clients.remove(&self.client_id);

if missing_block.unaccept_by(self.client_id) {
self.shared.notify();
Expand All @@ -316,31 +336,32 @@ impl Shared {

// Invariant: for all `block_id` and `client_id` such that
//
// missing_blocks[block_id].clients.contains(client_id)
// missing_blocks[block_id].offering_clients.contains(client_id)
//
// it must hold that
//
// clients[client_id].contains(block_id)
// offering_clients[client_id].contains(block_id)
//
// and vice-versa.
struct Inner {
missing_blocks: HashMap<BlockId, MissingBlock>,
clients: Slab<HashSet<BlockId>>,
missing_blocks: HashMap<BlockId, MissingBlockState>,
offering_clients: Slab<HashSet<BlockId>>,
}

#[derive(Debug)]
struct MissingBlock {
clients: HashSet<ClientId>,
accepted_by: Option<ClientId>,
struct MissingBlockState {
offering_clients: HashSet<ClientId>,
currently_accepted_by: Option<ClientId>,
previously_accepted_by: HashSet<ClientId>,
required: bool,
approved: bool,
}

impl MissingBlock {
impl MissingBlockState {
fn unaccept_by(&mut self, client_id: ClientId) -> bool {
if let Some(accepted_by) = &self.accepted_by {
if accepted_by == &client_id {
self.accepted_by = None;
if let Some(currently_accepted_by) = &self.currently_accepted_by {
if currently_accepted_by == &client_id {
self.currently_accepted_by = None;
return true;
}
}
Expand Down Expand Up @@ -563,7 +584,7 @@ mod tests {
client.offer(block.id, OfferState::Pending);
assert!(client.acceptor().try_accept().is_none());

tracker.approve(&block.id);
tracker.approve(block.id);
assert_eq!(
client
.acceptor()
Expand Down
4 changes: 2 additions & 2 deletions lib/src/network/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::{
pending::{PendingRequest, PendingRequests, PendingResponse},
};
use crate::{
block_tracker::{BlockPromise, BlockTrackerClient, OfferState},
block_tracker::{BlockPromise, OfferState, TrackerClient},
crypto::{sign::PublicKey, CacheHash, Hashable},
error::{Error, Result},
protocol::{
Expand All @@ -28,7 +28,7 @@ pub(super) struct Client {
pending_requests: Arc<PendingRequests>,
request_tx: mpsc::UnboundedSender<(PendingRequest, Instant)>,
receive_filter: ReceiveFilter,
block_tracker: BlockTrackerClient,
block_tracker: TrackerClient,
_request_sender: ScopedJoinHandle<()>,
}

Expand Down
2 changes: 1 addition & 1 deletion lib/src/repository/vault.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl Vault {
let mut block_ids = tx.missing_block_ids_in_branch(branch_id);

while let Some(block_id) = block_ids.try_next().await? {
self.block_tracker.approve(&block_id);
self.block_tracker.approve(block_id);
}

Ok(())
Expand Down

0 comments on commit 01bbcdb

Please sign in to comment.