Skip to content

Commit

Permalink
Move more functions to the right module in linera-core. (#2895)
Browse files Browse the repository at this point in the history
* Move blob ID error checks into remote_node.

* ValidatorUpdater::send_optimized_certificate → RemoteNode::handle_optimized_certificate.

* Move next_block_heights and chain_info to LocalNode.

* Fix formatting, spelling.

* check_blobs_not_found_error → check_blobs_not_found
  • Loading branch information
afck authored Nov 14, 2024
1 parent 258c3f4 commit 2eba9c2
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 103 deletions.
49 changes: 14 additions & 35 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use dashmap::{
use futures::{
future::{self, try_join_all, FusedFuture, Future},
stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt},
TryStreamExt as _,
};
#[cfg(not(target_arch = "wasm32"))]
use linera_base::data_types::Bytecode;
Expand Down Expand Up @@ -305,7 +304,7 @@ where
let mut validators = validators.iter().collect::<Vec<_>>();
validators.shuffle(&mut rand::thread_rng());
for remote_node in validators {
let info = self.local_node.local_chain_info(chain_id).await?;
let info = self.local_node.chain_info(chain_id).await?;
if target_next_block_height <= info.next_block_height {
return Ok(info);
}
Expand All @@ -317,7 +316,7 @@ where
)
.await?;
}
let info = self.local_node.local_chain_info(chain_id).await?;
let info = self.local_node.chain_info(chain_id).await?;
if target_next_block_height <= info.next_block_height {
Ok(info)
} else {
Expand Down Expand Up @@ -1304,8 +1303,13 @@ where

// Obtain the next block height we need in the local node, for each chain.
let local_next_heights = self
.local_next_block_heights(remote_max_heights.keys(), chain_worker_limit)
.await?;
.client
.local_node
.next_block_heights(remote_max_heights.keys(), chain_worker_limit)
.await
.map_err(|error| NodeError::LocalError {
error: error.to_string(),
})?;

// We keep track of the height we've successfully downloaded and checked, per chain.
let mut downloaded_heights = BTreeMap::new();
Expand Down Expand Up @@ -1706,7 +1710,7 @@ where

self.client
.local_node
.local_chain_info(chain_id)
.chain_info(chain_id)
.await
.map_err(Into::into)
}
Expand All @@ -1719,7 +1723,7 @@ where
remote_node: &RemoteNode<P::Node>,
chain_id: ChainId,
) -> Result<(), ChainClientError> {
let local_info = self.client.local_node.local_chain_info(chain_id).await?;
let local_info = self.client.local_node.chain_info(chain_id).await?;
let range = BlockHeightRange {
start: local_info.next_block_height,
limit: None,
Expand Down Expand Up @@ -1797,8 +1801,8 @@ where
Ok(())
}

/// Downloads and processes from the specified validator a confirmed block certificate that
/// uses the given blob. If this succeeds, the blob will be in our storage.
/// Downloads and processes from the specified validator a confirmed block certificates that
/// use the given blobs. If this succeeds, the blob will be in our storage.
async fn update_local_node_with_blobs_from(
&self,
blob_ids: Vec<BlobId>,
Expand Down Expand Up @@ -3071,7 +3075,7 @@ where
chain_id: ChainId,
local_node: &mut LocalNodeClient<S>,
) -> Option<Box<ChainInfo>> {
let Ok(info) = local_node.local_chain_info(chain_id).await else {
let Ok(info) = local_node.chain_info(chain_id).await else {
error!("Fail to read local chain info for {chain_id}");
return None;
};
Expand Down Expand Up @@ -3331,31 +3335,6 @@ where
Ok(())
}

/// Given a list of chain IDs, returns a map that assigns to each of them the next block
/// height, i.e. the lowest block height that we have not processed in the local node yet.
///
/// It makes at most `chain_worker_limit` requests to the local node in parallel.
async fn local_next_block_heights(
&self,
chain_ids: impl IntoIterator<Item = &ChainId>,
chain_worker_limit: usize,
) -> Result<BTreeMap<ChainId, BlockHeight>, NodeError> {
let futures = chain_ids
.into_iter()
.map(|chain_id| async move {
let local_info = self.client.local_node.local_chain_info(*chain_id).await?;
Ok::<_, LocalNodeError>((*chain_id, local_info.next_block_height))
})
.collect::<Vec<_>>();
stream::iter(futures)
.buffer_unordered(chain_worker_limit)
.try_collect()
.await
.map_err(|error| NodeError::LocalNodeQuery {
error: error.to_string(),
})
}

/// Given a set of chain ID-block height pairs, returns a map that assigns to each chain ID
/// the highest height seen.
fn max_height_per_chain(remote_log: &[ChainAndHeight]) -> BTreeMap<ChainId, BlockHeight> {
Expand Down
67 changes: 34 additions & 33 deletions linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
collections::{HashSet, VecDeque},
collections::{BTreeMap, VecDeque},
sync::Arc,
};

use futures::{stream, StreamExt as _, TryStreamExt as _};
use linera_base::{
data_types::{ArithmeticError, Blob, UserApplicationDescription},
data_types::{ArithmeticError, Blob, BlockHeight, UserApplicationDescription},
identifiers::{BlobId, ChainId, MessageId, UserApplicationId},
};
use linera_chain::{
data_types::{Block, BlockProposal, ExecutedBlock},
types::{Certificate, CertificateValue, ConfirmedBlockCertificate, LiteCertificate},
types::{Certificate, ConfirmedBlockCertificate, LiteCertificate},
ChainStateView,
};
use linera_execution::{Query, Response};
Expand Down Expand Up @@ -182,42 +183,18 @@ where
}

/// Given a list of missing `BlobId`s and a `Certificate` for a block:
/// - Makes sure they're required by the block provided
/// - Makes sure there's no duplicate blobs being requested
/// - Searches for the blob in different places of the local node: blob cache,
/// chain manager's pending blobs, and blob storage.
/// - Returns `None` if not all blobs could be found.
pub async fn find_missing_blobs(
&self,
certificate: &Certificate,
missing_blob_ids: &Vec<BlobId>,
mut missing_blob_ids: Vec<BlobId>,
chain_id: ChainId,
) -> Result<Option<Vec<Blob>>, NodeError> {
if missing_blob_ids.is_empty() {
return Ok(Some(Vec::new()));
}

// Find the missing blobs locally and retry.
let required = match certificate.inner() {
CertificateValue::ConfirmedBlock(confirmed) => confirmed.inner().required_blob_ids(),
CertificateValue::ValidatedBlock(validated) => validated.inner().required_blob_ids(),
CertificateValue::Timeout(_) => HashSet::new(),
};
for blob_id in missing_blob_ids {
if !required.contains(blob_id) {
warn!(
"validator requested blob {:?} but it is not required",
blob_id
);
return Err(NodeError::InvalidChainInfoResponse);
}
}
let mut unique_missing_blob_ids = missing_blob_ids.iter().cloned().collect::<HashSet<_>>();
if missing_blob_ids.len() > unique_missing_blob_ids.len() {
warn!("blobs requested by validator contain duplicates");
return Err(NodeError::InvalidChainInfoResponse);
}

let mut chain_manager_pending_blobs = self
.chain_state_view(chain_id)
.await?
Expand All @@ -226,16 +203,18 @@ where
.pending_blobs
.clone();
let mut found_blobs = Vec::new();
for blob_id in missing_blob_ids {
missing_blob_ids.retain(|blob_id| {
if let Some(blob) = chain_manager_pending_blobs.remove(blob_id) {
found_blobs.push(blob);
unique_missing_blob_ids.remove(blob_id);
false
} else {
true
}
}
});

let storage = self.storage_client();
let Some(read_blobs) = storage
.read_blobs(&unique_missing_blob_ids.into_iter().collect::<Vec<_>>())
.read_blobs(&missing_blob_ids)
.await?
.into_iter()
.collect::<Option<Vec<_>>>()
Expand All @@ -260,7 +239,7 @@ where
}

#[instrument(level = "trace", skip(self))]
pub(crate) async fn local_chain_info(
pub(crate) async fn chain_info(
&self,
chain_id: ChainId,
) -> Result<Box<ChainInfo>, LocalNodeError> {
Expand Down Expand Up @@ -332,4 +311,26 @@ where
}
Ok(())
}

/// Given a list of chain IDs, returns a map that assigns to each of them the next block
/// height, i.e. the lowest block height that we have not processed in the local node yet.
///
/// It makes at most `chain_worker_limit` requests to the local node in parallel.
pub async fn next_block_heights(
&self,
chain_ids: impl IntoIterator<Item = &ChainId>,
chain_worker_limit: usize,
) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
let futures = chain_ids
.into_iter()
.map(|chain_id| async move {
let local_info = self.chain_info(*chain_id).await?;
Ok::<_, LocalNodeError>((*chain_id, local_info.next_block_height))
})
.collect::<Vec<_>>();
stream::iter(futures)
.buffer_unordered(chain_worker_limit)
.try_collect()
.await
}
}
4 changes: 4 additions & 0 deletions linera-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ pub enum NodeError {

#[error("Node failed to provide a 'last used by' certificate for the blob")]
InvalidCertificateForBlob(BlobId),
#[error("Node returned a BlobsNotFound error with duplicates")]
DuplicatesInBlobsNotFound,
#[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
UnexpectedEntriesInBlobsNotFound,
#[error("Local error handling validator response")]
LocalError { error: String },
}
Expand Down
60 changes: 58 additions & 2 deletions linera-core/src/remote_node.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

use custom_debug_derive::Debug;
use futures::{stream::FuturesUnordered, StreamExt};
Expand All @@ -13,7 +13,7 @@ use linera_base::{
};
use linera_chain::{
data_types::BlockProposal,
types::{Certificate, ConfirmedBlockCertificate, LiteCertificate},
types::{Certificate, CertificateValue, ConfirmedBlockCertificate, LiteCertificate},
};
use linera_execution::committee::ValidatorName;
use rand::seq::SliceRandom as _;
Expand Down Expand Up @@ -82,6 +82,29 @@ impl<N: ValidatorNode> RemoteNode<N> {
self.check_and_return_info(response, chain_id)
}

pub(crate) async fn handle_optimized_certificate(
&mut self,
certificate: &Certificate,
delivery: CrossChainMessageDelivery,
) -> Result<Box<ChainInfo>, NodeError> {
if certificate.is_signed_by(&self.name) {
let result = self
.handle_lite_certificate(certificate.lite_certificate(), delivery)
.await;
match result {
Err(NodeError::MissingCertificateValue) => {
warn!(
"Validator {} forgot a certificate value that they signed before",
self.name
);
}
_ => return result,
}
}
self.handle_certificate(certificate.clone(), vec![], delivery)
.await
}

fn check_and_return_info(
&self,
response: ChainInfoResponse,
Expand Down Expand Up @@ -278,4 +301,37 @@ impl<N: ValidatorNode> RemoteNode<N> {
}
Some(blobs)
}

/// Checks that requesting these blobs when trying to handle this certificate is legitimate,
/// i.e. that there are no duplicates and the blobs are actually required.
pub fn check_blobs_not_found(
&self,
certificate: &Certificate,
blob_ids: &[BlobId],
) -> Result<(), NodeError> {
// Find the missing blobs locally and retry.
let required = match certificate.inner() {
CertificateValue::ConfirmedBlock(confirmed) => confirmed.inner().required_blob_ids(),
CertificateValue::ValidatedBlock(validated) => validated.inner().required_blob_ids(),
CertificateValue::Timeout(_) => HashSet::new(),
};
for blob_id in blob_ids {
if !required.contains(blob_id) {
warn!(
"validator {} requested blob {blob_id:?} but it is not required",
self.name
);
return Err(NodeError::UnexpectedEntriesInBlobsNotFound);
}
}
let unique_missing_blob_ids = blob_ids.iter().cloned().collect::<HashSet<_>>();
if blob_ids.len() > unique_missing_blob_ids.len() {
warn!(
"blobs requested by validator {} contain duplicates",
self.name
);
return Err(NodeError::DuplicatesInBlobsNotFound);
}
Ok(())
}
}
Loading

0 comments on commit 2eba9c2

Please sign in to comment.