Skip to content

Commit

Permalink
Remove nodes from ReceiveFilter on block expiration
Browse files Browse the repository at this point in the history
Nodes are also removed when a BlockNotFound message arrives. This is so that peers can
re-download the relevant portions of the index.
  • Loading branch information
inetic committed Aug 30, 2023
1 parent 6ecf019 commit b604415
Show file tree
Hide file tree
Showing 15 changed files with 247 additions and 42 deletions.
40 changes: 38 additions & 2 deletions lib/src/network/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use crate::{
crypto::{sign::PublicKey, CacheHash, Hashable},
error::{Error, Result},
protocol::{
BlockData, BlockNonce, InnerNodeMap, LeafNodeSet, MultiBlockPresence, UntrustedProof,
BlockData, BlockId, BlockNonce, InnerNodeMap, LeafNodeSet, MultiBlockPresence,
UntrustedProof,
},
repository::{BlockRequestMode, RepositoryMonitor, Vault},
store::{self, ReceiveFilter},
Expand Down Expand Up @@ -64,6 +65,8 @@ impl Client {
pub async fn run(&mut self, rx: &mut mpsc::Receiver<Response>) -> Result<()> {
self.receive_filter.reset().await?;

let mut reload_index_rx = self.vault.store().client_reload_index_tx.subscribe();

let mut block_promise_acceptor = self.block_tracker.acceptor();

// We're making sure to not send more requests than MAX_PENDING_RESPONSES, but there may be
Expand Down Expand Up @@ -108,6 +111,12 @@ impl Client {
result?;
break;
}
branch_to_reload = reload_index_rx.recv() => {
match branch_to_reload {
Ok(branch_to_reload) => self.reload_index(&branch_to_reload),
Err(_) => (),
}
}
}
}

Expand Down Expand Up @@ -186,6 +195,17 @@ impl Client {
.measure_ok(self.handle_block(data, nonce, block_promise, debug))
.await
}
PendingResponse::BlockNotFound {
block_id,
permit: _permit,
debug,
} => {
self.vault
.monitor
.handle_block_not_found_metric
.measure_ok(self.handle_block_not_found(block_id, debug))
.await
}
}
}

Expand Down Expand Up @@ -347,6 +367,18 @@ impl Client {
}
}

/// Handles a `BlockNotFound` response from the peer: the peer's index indicated it had the
/// block, but the block is no longer available there (e.g. it expired on the peer — see
/// `Vault::receive_block_not_found`). Delegates to the vault so the index nodes referring to
/// this block are removed from the receive filter and can be re-downloaded.
// NOTE(review): `fields(block_id)` uses tracing's shorthand to record the `block_id`
// argument — confirm `BlockId` implements `tracing::Value`; otherwise use
// `fields(block_id = ?block_id)`.
#[instrument(skip_all, fields(block_id), err(Debug))]
async fn handle_block_not_found(
&self,
block_id: BlockId,
_debug: DebugReceivedResponse,
) -> Result<()> {
tracing::trace!("Client received block not found {:?}", block_id);
self.vault
.receive_block_not_found(block_id, &self.receive_filter)
.await
}

// Request again the branches that became completed. This is to cover the following edge
// case:
//
Expand All @@ -366,7 +398,7 @@ impl Client {
// requested as soon as possible.
fn refresh_branches(&self, branches: &[PublicKey]) {
for branch_id in branches {
self.enqueue_request(PendingRequest::RootNode(*branch_id, DebugRequest::start()));
self.reload_index(branch_id);
}
}

Expand Down Expand Up @@ -406,6 +438,10 @@ impl Client {
);
}
}

/// Enqueues a request for the root node of the given branch, causing that branch's index
/// to be (re-)downloaded from the peer.
fn reload_index(&self, branch_id: &PublicKey) {
self.enqueue_request(PendingRequest::RootNode(*branch_id, DebugRequest::start()));
}
}

fn start_sender(
Expand Down
6 changes: 0 additions & 6 deletions lib/src/network/dht_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,6 @@ pub const DHT_ROUTERS: &[&str] = &[
pub const MIN_DHT_ANNOUNCE_DELAY: Duration = Duration::from_secs(3 * 60);
pub const MAX_DHT_ANNOUNCE_DELAY: Duration = Duration::from_secs(6 * 60);

#[derive(Clone)]
pub struct ActiveDhtNodes {
pub good: HashSet<SocketAddr>,
pub questionable: HashSet<SocketAddr>,
}

#[async_trait]
pub trait DhtContactsStoreTrait: Sync + Send + 'static {
async fn load_v4(&self) -> io::Result<HashSet<SocketAddrV4>>;
Expand Down
12 changes: 12 additions & 0 deletions lib/src/network/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ pub(super) enum PendingResponse {
permit: Option<ClientPermit>,
debug: DebugReceivedResponse,
},
BlockNotFound {
block_id: BlockId,
permit: Option<ClientPermit>,
debug: DebugReceivedResponse,
},
}

pub(super) struct PendingRequests {
Expand Down Expand Up @@ -184,6 +189,13 @@ impl PendingRequests {
};
Some(r)
}
ProcessedResponse::Failure(processed_response::Failure::Block(block_id, debug)) => {
Some(PendingResponse::BlockNotFound {
block_id,
permit,
debug,
})
}
ProcessedResponse::Failure(_) => None,
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl<'a> Responder<'a> {
.store()
.acquire_read()
.await?
.read_block(&id, &mut content)
.read_block_on_peer_request(&id, &mut content, &self.vault.block_tracker)
.await;

match result {
Expand Down
4 changes: 4 additions & 0 deletions lib/src/protocol/summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ impl NodeState {
matches!(self, Self::Approved)
}

/// Returns `true` if this state is [`Self::Incomplete`].
pub fn is_incomplete(self) -> bool {
matches!(self, Self::Incomplete)
}

pub fn update(&mut self, other: Self) {
*self = match (*self, other) {
(Self::Incomplete, _) | (_, Self::Incomplete) => Self::Incomplete,
Expand Down
3 changes: 3 additions & 0 deletions lib/src/repository/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub(crate) struct RepositoryMonitor {
pub handle_inner_nodes_metric: Metric,
pub handle_leaf_nodes_metric: Metric,
pub handle_block_metric: Metric,
pub handle_block_not_found_metric: Metric,
pub request_queued_metric: Metric,
pub request_inflight_metric: Metric,
pub handle_request_metric: Metric,
Expand Down Expand Up @@ -65,6 +66,7 @@ impl RepositoryMonitor {
let handle_inner_nodes_metric = metrics.get("handle_inner_node");
let handle_leaf_nodes_metric = metrics.get("handle_leaf_node");
let handle_block_metric = metrics.get("handle_block");
let handle_block_not_found_metric = metrics.get("handle_block_not_found");
let request_queued_metric = metrics.get("request queued");
let request_inflight_metric = metrics.get("request inflight");
let handle_request_metric = metrics.get("handle_request");
Expand All @@ -89,6 +91,7 @@ impl RepositoryMonitor {
handle_inner_nodes_metric,
handle_leaf_nodes_metric,
handle_block_metric,
handle_block_not_found_metric,
request_queued_metric,
request_inflight_metric,
handle_request_metric,
Expand Down
30 changes: 25 additions & 5 deletions lib/src/repository/vault.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,19 @@ use crate::{
debug::DebugPrinter,
error::{Error, Result},
event::{EventSender, Payload},
progress::Progress,
protocol::{
BlockData, BlockNonce, InnerNodeMap, LeafNodeSet, MultiBlockPresence, ProofError, RootNode,
BlockData, BlockId, BlockNonce, InnerNodeMap, LeafNodeSet, MultiBlockPresence, ProofError,
UntrustedProof,
},
storage_size::StorageSize,
store::{
self, BlockIdsPage, InnerNodeReceiveStatus, LeafNodeReceiveStatus, ReceiveFilter,
RootNodeReceiveStatus, Store, WriteTransaction,
self, InnerNodeReceiveStatus, LeafNodeReceiveStatus, ReceiveFilter, RootNodeReceiveStatus,
Store, WriteTransaction,
},
};
use futures_util::TryStreamExt;
use sqlx::Row;
use std::{sync::Arc, time::Duration};
use tracing::Level;

#[derive(Clone)]
pub(crate) struct Vault {
Expand Down Expand Up @@ -165,6 +163,28 @@ impl Vault {
Ok(())
}

/// Receive a message that the block has NOT been found on the peer.
///
/// Removes the index nodes referring to `block_id` from the receive filter so that the
/// relevant portion of the peer's index can be downloaded again.
pub async fn receive_block_not_found(
&self,
block_id: BlockId,
receive_filter: &ReceiveFilter,
) -> Result<()> {
// We received a 'block not found' because we sent a request for the block, and we sent
// that request because the index that we downloaded from the peer indicated that the peer
// had the block. But it could have been lying and the block at the peer could have
// expired. If that's the case, then the peer should have updated their index and we'll
// need to re-download the part referring to the `block_id`. Thus we need to remove that
// part from the `receive_filter`.

self.store()
.begin_write()
.await?
.remove_from_receive_filter_index_nodes_for(block_id, receive_filter)
.await?;

Ok(())
}

pub fn metadata(&self) -> Metadata {
Metadata::new(self.store().db().clone())
}
Expand Down
4 changes: 2 additions & 2 deletions lib/src/repository/vault_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ async fn receive_bumped_root_node() {
.await;

let node = vault
.store
.store()
.acquire_read()
.await
.unwrap()
Expand All @@ -451,7 +451,7 @@ async fn receive_bumped_root_node() {
.unwrap();

let node = vault
.store
.store()
.acquire_read()
.await
.unwrap()
Expand Down
23 changes: 4 additions & 19 deletions lib/src/store/block_expiration_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ pub(crate) struct BlockExpirationTracker {
}

impl BlockExpirationTracker {
/// Returns a reference to the database pool this tracker operates on.
pub fn pool(&self) -> &db::Pool {
&self.pool
}

pub async fn enable_expiration(
pool: db::Pool,
expiration_time: Duration,
Expand Down Expand Up @@ -81,25 +85,6 @@ impl BlockExpirationTracker {
self.watch_tx.send(()).unwrap_or(());
}

pub async fn set_as_missing_if_expired(&self, block: &BlockId) -> Result<(), Error> {
let mut tx = self.pool.begin_write().await?;

sqlx::query(
"UPDATE snapshot_leaf_nodes
SET block_presence = ?
WHERE block_id = ? AND block_presence = ?",
)
.bind(SingleBlockPresence::Missing)
.bind(&block)
.bind(SingleBlockPresence::Expired)
.execute(&mut tx)
.await?;

tx.commit().await?;

Ok(())
}

pub fn handle_block_removed(&self, block: &BlockId) {
self.shared.lock().unwrap().handle_block_removed(block);
}
Expand Down
1 change: 1 addition & 0 deletions lib/src/store/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub(super) struct ReceiveStatus {
}

/// Reason for updating the summary
#[derive(Debug)]
pub(super) enum UpdateSummaryReason {
/// Updating summary because a block was removed
BlockRemoved,
Expand Down
14 changes: 13 additions & 1 deletion lib/src/store/inner_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
db,
protocol::{InnerNode, InnerNodeMap, LeafNodeSet, Summary, EMPTY_INNER_HASH, EMPTY_LEAF_HASH},
};
use futures_util::{future, TryStreamExt};
use futures_util::{future, Stream, TryStreamExt};
use sqlx::Row;
use std::convert::TryInto;

Expand Down Expand Up @@ -57,6 +57,18 @@ pub(super) async fn load_children(
.map_err(From::from)
}

/// Load the parent hashes of all inner nodes that have the specified hash.
///
/// Note: despite the original comment, this does not load the nodes themselves — the query
/// selects the `parent` column of rows whose `hash` matches, yielding a stream of parent
/// hashes (a node can appear under multiple parents, hence a stream).
pub(super) fn load_parent_hashes<'a>(
conn: &'a mut db::Connection,
hash: &'a Hash,
) -> impl Stream<Item = Result<Hash, Error>> + 'a {
sqlx::query("SELECT parent FROM snapshot_inner_nodes WHERE hash = ?")
.bind(hash)
.fetch(conn)
.map_ok(|row| row.get(0))
.err_into()
}

pub(super) async fn load(
conn: &mut db::Connection,
hash: &Hash,
Expand Down
24 changes: 24 additions & 0 deletions lib/src/store/leaf_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,30 @@ pub(super) async fn set_missing(
Ok(())
}

/// Marks the block as `Missing` in the leaf nodes if it is currently `Expired`.
///
/// Returns `true` if the block changed status from `Expired` to `Missing` (i.e. at least
/// one leaf-node row was updated), `false` otherwise.
pub(super) async fn set_missing_if_expired(
tx: &mut db::WriteTransaction,
block: &BlockId,
) -> Result<bool, Error> {
// Only rows whose presence is `Expired` are touched; blocks in any other state are
// left unchanged.
let result = sqlx::query(
"UPDATE snapshot_leaf_nodes
SET block_presence = ?
WHERE block_id = ? AND block_presence = ?",
)
.bind(SingleBlockPresence::Missing)
.bind(&block)
.bind(SingleBlockPresence::Expired)
.execute(tx)
.await?;

if result.rows_affected() > 0 {
tracing::warn!("Marking 'Expired' block {block:?} as 'Missing'");
return Ok(true);
}

Ok(false)
}

// Filter nodes that the remote replica has a block for but the local one is missing it.
pub(super) async fn filter_nodes_with_new_blocks(
conn: &mut db::Connection,
Expand Down
Loading

0 comments on commit b604415

Please sign in to comment.