chain: ShardManager: add ability to serve partial chunks from ShardChunk #6377

Merged
merged 16 commits into from
Mar 9, 2022
12 changes: 12 additions & 0 deletions Cargo.lock

2 changes: 2 additions & 0 deletions chain/chunks/Cargo.toml
@@ -16,6 +16,7 @@ tracing = "0.1.13"
borsh = "0.9"
lru = "0.7.2"
reed-solomon-erasure = "4"
visibility = "*"

near-crypto = { path = "../../core/crypto" }
near-primitives = { path = "../../core/primitives" }
@@ -33,3 +34,4 @@ assert_matches = "1.5.0"
[features]
byzantine_asserts = ["near-chain/byzantine_asserts"]
expensive_tests = []
test_features = []
Contributor

comment. Also maybe we should call it 'reconstruct_partial_chunk_on_the_fly' or something like that?

Contributor Author

I’m planning to make GCing a configurable feature.

180 changes: 167 additions & 13 deletions chain/chunks/src/lib.rs
@@ -101,9 +101,9 @@ use near_primitives::hash::{hash, CryptoHash};
use near_primitives::merkle::{merklize, verify_path, MerklePath};
use near_primitives::receipt::Receipt;
use near_primitives::sharding::{
- ChunkHash, EncodedShardChunk, PartialEncodedChunk, PartialEncodedChunkPart,
- PartialEncodedChunkV1, PartialEncodedChunkV2, ReceiptList, ReceiptProof, ReedSolomonWrapper,
- ShardChunkHeader, ShardProof,
+ ChunkHash, EncodedShardChunk, EncodedShardChunkBody, PartialEncodedChunk,
+ PartialEncodedChunkPart, PartialEncodedChunkV1, PartialEncodedChunkV2, ReceiptList,
+ ReceiptProof, ReedSolomonWrapper, ShardChunk, ShardChunkHeader, ShardProof,
};
use near_primitives::time::Clock;
use near_primitives::transaction::SignedTransaction;
@@ -995,13 +995,27 @@ impl ShardsManager {
request: PartialEncodedChunkRequestMsg,
route_back: CryptoHash,
chain_store: &mut ChainStore,
rs: &mut ReedSolomonWrapper,
) {
debug!(target: "chunks", "Received partial encoded chunk request for {:?}, part_ordinals: {:?}, shards: {:?}, I'm {:?}", request.chunk_hash.0, request.part_ords, request.tracking_shards, self.me);

let response = if let Some(entry) = self.encoded_chunks.get(&request.chunk_hash) {
Self::prepare_partial_encoded_chunk_response_from_cache(request, entry)
} else if let Ok(partial_chunk) = chain_store.get_partial_chunk(&request.chunk_hash) {
Self::prepare_partial_encoded_chunk_response_from_partial(request, partial_chunk)
} else if let Ok(chunk) = chain_store.get_chunk(&request.chunk_hash).map(|ch| ch.clone()) {
Contributor

We should have metrics for all three cases here (this can be done in a separate PR).

// Note: we need to clone the chunk because otherwise we would be
// holding multiple references to chain_store. One through the
// chunk and another through chain_store which we need to pass down
// to do further fetches.

// If we are archival node we might have garbage collected the
// partial chunk while we still keep the chunk itself. We can get
// the chunk, recalculate the parts and respond to the request.
//
// TODO(#6242): This is currently not implemented and effectively
// this is dead code.
self.prepare_partial_encoded_chunk_response_from_chunk(request, chain_store, rs, chunk)
} else {
None
};
@@ -1013,11 +1027,12 @@ impl ShardsManager {
}
}
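
For readers skimming the diff, the lookup order in `process_partial_encoded_chunk_request` (in-memory cache, then stored partial chunk, then full chunk) can be sketched roughly as below, together with the per-case counters a reviewer asked for. This is a minimal, self-contained sketch and not nearcore code: `Tiers`, `ResponseSource`, and the `u64` chunk key are hypothetical stand-ins, and the counters are a plain `HashMap` rather than a real metrics library.

```rust
use std::collections::HashMap;

/// Which source answered a request (hypothetical names, not from the PR).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ResponseSource {
    Cache,
    PartialChunk,
    FullChunk,
    NotFound,
}

/// Toy stand-in for the three lookups; each map plays the role of one source.
pub struct Tiers {
    pub cache: HashMap<u64, Vec<u8>>,
    pub partial_chunks: HashMap<u64, Vec<u8>>,
    pub full_chunks: HashMap<u64, Vec<u8>>,
    /// Hit counters per source; a real node would export these as metrics.
    pub hits: HashMap<ResponseSource, u64>,
}

impl Tiers {
    /// Tries the cheapest source first and records which one answered.
    pub fn lookup(&mut self, chunk_hash: u64) -> (ResponseSource, Option<Vec<u8>>) {
        let (source, data) = if let Some(d) = self.cache.get(&chunk_hash) {
            (ResponseSource::Cache, Some(d.clone()))
        } else if let Some(d) = self.partial_chunks.get(&chunk_hash) {
            (ResponseSource::PartialChunk, Some(d.clone()))
        } else if let Some(d) = self.full_chunks.get(&chunk_hash) {
            // In the PR this is the branch that re-encodes the chunk.
            (ResponseSource::FullChunk, Some(d.clone()))
        } else {
            (ResponseSource::NotFound, None)
        };
        *self.hits.entry(source).or_insert(0) += 1;
        (source, data)
    }
}
```

Counting at the single exit point keeps the three branches free of bookkeeping, which is one way the metrics could be added in a follow-up without touching the response-building helpers.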

- /// Prepares response to a partial encoded chunk request from
- /// a corresponding encoded_chunks cache entry. If the entry can satisfy
- /// the requests (i.e. contains all necessary parts and shards) the method
- /// returns a [`PartialEncodedChunkResponseMsg`] object; otherwise returns
- /// `None`.
+ /// Prepares response to a partial encoded chunk request from an entry in
+ /// an `encoded_chunks` in-memory cache.
+ ///
+ /// If the entry can satisfy the requests (i.e. contains all necessary parts
+ /// and shards) the method returns a [`PartialEncodedChunkResponseMsg`]
+ /// object; otherwise returns `None`.
fn prepare_partial_encoded_chunk_response_from_cache(
request: PartialEncodedChunkRequestMsg,
entry: &EncodedChunksCacheEntry,
@@ -1037,11 +1052,13 @@ impl ShardsManager {
)
}

- /// Prepares response to a partial encoded chunk request from
- /// a corresponding partial chunk read from the storage. If the partial
- /// chunk can satisfy the requests (i.e. contains all necessary parts and
- /// shards) the method returns a [`PartialEncodedChunkResponseMsg`] object;
- /// otherwise returns `None`.
+ /// Prepares response to a partial encoded chunk request from a partial
+ /// chunk read from the storage.
+ ///
+ /// If the partial chunk can satisfy the requests (i.e. contains all
+ /// necessary parts and shards) the method returns
+ /// a [`PartialEncodedChunkResponseMsg`] object; otherwise returns `None`.
+ #[cfg_attr(feature = "test_features", visibility::make(pub))]
fn prepare_partial_encoded_chunk_response_from_partial(
request: PartialEncodedChunkRequestMsg,
partial_chunk: &PartialEncodedChunk,
@@ -1075,6 +1092,143 @@ impl ShardsManager {
)
}

/// Prepares response to a partial encoded chunk request from a chunk read
/// from the storage.
///
/// This requires encoding the chunk and as such is a computationally
/// expensive operation. If possible, the request should be served from
/// `EncodedChunksCacheEntry` or `PartialEncodedChunk` instead.
#[cfg_attr(feature = "test_features", visibility::make(pub))]
fn prepare_partial_encoded_chunk_response_from_chunk(
Contributor

Maybe we should move it to a separate file, to keep lib.rs a little smaller?

Contributor Author

@mina86, Mar 3, 2022

It’s not immediately obvious to me what should be moved to a separate file, to be honest. The code that handles partial encoded chunk requests is less than 250 lines, so moving just that wouldn’t really help reduce the size of lib.rs, which is 2700 lines. It would take more care and thought to figure out which of the methods in the struct belong together in a separate file.

Contributor

Yeah, I don't think this makes sense to move to a separate file, as it seems to be pretty entangled with the rest.

The easiest wins in terms of keeping files small and logic clean are usually around various small helpers. Things like RequestPool or SealsManager seem like better candidates for extraction.

&mut self,
request: PartialEncodedChunkRequestMsg,
chain_store: &mut ChainStore,
rs: &mut ReedSolomonWrapper,
chunk: ShardChunk,
) -> Option<PartialEncodedChunkResponseMsg> {
// TODO(mina86): Is it correct to use rs.data_shard_count() and
// rs.total_shard_count() here? Can those values change over time?
let total_parts = rs.total_shard_count();
for ord in request.part_ords.iter() {
let ord: usize = (*ord).try_into().unwrap();
if ord >= total_parts {
debug!(target:"chunks", "Not sending {:?}, requested part_ord {} but we only expect {} total",
request.chunk_hash.0, ord, total_parts);
return None;
}
}

// Figure out basic parameters such as shard id, epoch id, protocol
// version and shard layout. Those will be needed later when fetching
// other data and constructing encoded shard chunk.
let header = chunk.cloned_header();
let prev_block_hash = header.prev_block_hash_ref();
let shard_id = header.shard_id();
let shard_layout = self
.runtime_adapter
.get_shard_layout_from_prev_block(prev_block_hash)
.map_err(|err| {
debug!(target: "chunks", "Not sending {:?}, failed to get shard layout: {}", request.chunk_hash.0, err);
})
.ok()?;

// Get outgoing receipts for the chunk and construct vector of their
// proofs.
let outgoing_receipts = chain_store
.get_outgoing_receipts_for_shard(
&*self.runtime_adapter,
*chunk.prev_block(),
shard_id,
chunk.height_included(),
)
.map_err(|err| {
debug!(target: "chunks", "Not sending {:?}, failed to get outgoing receipts: {}", request.chunk_hash.0, err);
}).ok()?;
let outgoing_receipts_hashes =
Chain::build_receipts_hashes(&outgoing_receipts, &shard_layout);
let (outgoing_receipts_root, outgoing_receipts_proofs) =
merklize(&outgoing_receipts_hashes);
if header.outgoing_receipts_root() != outgoing_receipts_root {
error!(target: "chunks",
"Not sending {:?}, expected outgoing receipts root doesn’t match calculated: {} != {}",
request.chunk_hash.0, header.outgoing_receipts_root(), outgoing_receipts_root);
return None;
}

let mut receipts_by_shard =
Chain::group_receipts_by_shard(outgoing_receipts.clone(), &shard_layout);
let receipt_proofs: Vec<_> = outgoing_receipts_proofs
.iter()
.enumerate()
.map(|(proof_shard_id, proof)| {
let proof_shard_id = proof_shard_id as u64;
let receipts = receipts_by_shard.remove(&proof_shard_id).unwrap_or_else(Vec::new);
let shard_proof = ShardProof {
from_shard_id: shard_id,
to_shard_id: proof_shard_id,
proof: proof.clone(),
};
ReceiptProof(receipts, shard_proof)
})
.collect();

// Construct EncodedShardChunk. If we earlier determined that we will
// need parity parts, instruct the constructor to calculate them as
// well. Otherwise we won’t bother.
let (parts, encoded_length) = EncodedShardChunk::encode_transaction_receipts(
rs,
chunk.transactions().to_vec(),
&outgoing_receipts).map_err(|err| {
debug!(target: "chunks", "Not sending {:?}, failed to encode transaction receipts: {}", request.chunk_hash.0, err);
}).ok()?;
if header.encoded_length() != encoded_length {
error!(target: "chunks",
"Not sending {:?}, expected encoded length doesn’t match calculated: {} != {}",
request.chunk_hash.0, header.encoded_length(), encoded_length);
return None;
}

let mut content = EncodedShardChunkBody { parts };
if let Err(err) = content.reconstruct(rs) {
error!(target: "chunks",
"Not sending {:?}, failed to reconstruct RS parity parts: {}",
request.chunk_hash.0, err);
return None;
}

let (encoded_merkle_root, merkle_paths) = content.get_merkle_hash_and_paths();
if header.encoded_merkle_root() != encoded_merkle_root {
error!(target: "chunks",
"Not sending {:?}, expected encoded Merkle root doesn’t match calculated: {} != {}",
request.chunk_hash.0, header.encoded_merkle_root(), encoded_merkle_root);
return None;
}

let parts_iter = request.part_ords.into_iter().map(|part_ord| {
let ord: usize = part_ord.try_into().unwrap();
content.parts[ord].take().map(|part| PartialEncodedChunkPart {
part_ord,
part,
merkle_proof: merkle_paths[ord].clone(),
})
});

// Same process for receipts as above for parts.
let present_receipts: HashMap<ShardId, _> =
receipt_proofs.iter().map(|receipt| (receipt.1.to_shard_id, receipt)).collect();
let receipts_iter = request
.tracking_shards
.iter()
.map(|shard_id| present_receipts.get(shard_id).map(|x| *x).cloned());

// Pass iterators to function, same as cache case.
Self::prepare_partial_encoded_chunk_response_from_iters(
request.chunk_hash,
parts_iter,
receipts_iter,
)
}
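
The receipt-proof step in `prepare_partial_encoded_chunk_response_from_chunk` pairs the i-th Merkle proof with the receipts destined for shard i, and falls back to an empty list for shards that received nothing. The sketch below shows only that pairing under simplifying assumptions: `ShardProof` and `ReceiptProof` here are reduced stand-ins for the nearcore types, receipts are plain strings, and `build_receipt_proofs` is a hypothetical helper name.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the real `ShardProof` (illustration only).
#[derive(Debug, Clone, PartialEq)]
pub struct ShardProof {
    pub from_shard_id: u64,
    pub to_shard_id: u64,
    pub proof: Vec<u8>,
}

/// Simplified stand-in for the real `ReceiptProof`; receipts are strings here.
#[derive(Debug, Clone, PartialEq)]
pub struct ReceiptProof(pub Vec<String>, pub ShardProof);

/// Builds one `ReceiptProof` per Merkle proof. The proof index is taken to be
/// the destination shard id, as in the diff above. Shards without receipts
/// still get an entry, with an empty receipt list.
pub fn build_receipt_proofs(
    from_shard_id: u64,
    mut receipts_by_shard: HashMap<u64, Vec<String>>,
    proofs: &[Vec<u8>],
) -> Vec<ReceiptProof> {
    proofs
        .iter()
        .enumerate()
        .map(|(to_shard_id, proof)| {
            let to_shard_id = to_shard_id as u64;
            // `remove` hands over ownership of the receipts without cloning.
            let receipts = receipts_by_shard.remove(&to_shard_id).unwrap_or_default();
            let shard_proof = ShardProof { from_shard_id, to_shard_id, proof: proof.clone() };
            ReceiptProof(receipts, shard_proof)
        })
        .collect()
}
```

Taking the map by value and calling `remove` mirrors the diff's use of `receipts_by_shard.remove(...)`: each receipt list is moved into exactly one proof.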

/// Checks if `parts_iter` and `receipts_iter` contain no `None` elements.
/// It evaluates the iterators only up to the first `None` value (if any);
/// since iterators are lazy this saves some work if there are any `Some`
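
The doc comment above relies on iterators being lazy: collecting an iterator of `Option` values into an `Option<Vec<_>>` stops pulling items at the first `None`. A minimal sketch of that idea follows; `collect_all` is a hypothetical name, and the real helper in lib.rs builds a response message rather than a tuple.

```rust
/// Collects both iterators, giving up at the first `None` in either.
/// If `parts_iter` yields a `None`, `receipts_iter` is never polled.
pub fn collect_all<P, R>(
    parts_iter: impl Iterator<Item = Option<P>>,
    receipts_iter: impl Iterator<Item = Option<R>>,
) -> Option<(Vec<P>, Vec<R>)> {
    // `collect` into `Option<Vec<_>>` short-circuits on the first `None`.
    let parts = parts_iter.collect::<Option<Vec<P>>>()?;
    let receipts = receipts_iter.collect::<Option<Vec<R>>>()?;
    Some((parts, receipts))
}
```

This is why the callers in the diff pass iterators rather than pre-built vectors: work such as cloning a Merkle path for a later part is skipped once an earlier part turns out to be missing.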
1 change: 1 addition & 0 deletions chain/client/src/client_actor.rs
@@ -560,6 +560,7 @@ impl Handler<NetworkClientMessages> for ClientActor {
part_request_msg,
route_back,
self.client.chain.mut_store(),
&mut self.client.rs,
);
NetworkClientResponses::NoResponse
}
3 changes: 2 additions & 1 deletion chain/client/src/test_utils.rs
@@ -1410,7 +1410,7 @@ impl TestEnv {
}
}

- fn get_partial_encoded_chunk_response(
+ pub fn get_partial_encoded_chunk_response(
&mut self,
id: usize,
request: PartialEncodedChunkRequestMsg,
@@ -1420,6 +1420,7 @@ impl TestEnv {
request,
CryptoHash::default(),
client.chain.mut_store(),
&mut client.rs,
);
let response = self.network_adapters[id].pop().unwrap();
if let PeerManagerMessageRequest::NetworkRequests(
6 changes: 6 additions & 0 deletions chain/client/src/tests/chunks_management.rs
@@ -6,6 +6,7 @@ use near_logger_utils::init_integration_logger;
use near_network::types::NetworkRequests;
use near_network_primitives::types::PartialEncodedChunkRequestMsg;
use near_primitives::hash::CryptoHash;
use near_primitives::sharding::ReedSolomonWrapper;

#[test]
fn test_request_chunk_restart() {
@@ -22,10 +23,14 @@ fn test_request_chunk_restart() {
tracking_shards: HashSet::default(),
};
let client = &mut env.clients[0];
let num_total_parts = client.runtime_adapter.num_total_parts();
let num_data_parts = client.runtime_adapter.num_data_parts();
let mut rs = ReedSolomonWrapper::new(num_data_parts, num_total_parts - num_data_parts);
client.shards_mgr.process_partial_encoded_chunk_request(
request.clone(),
CryptoHash::default(),
client.chain.mut_store(),
&mut rs,
);
assert!(env.network_adapters[0].pop().is_some());

@@ -35,6 +40,7 @@ fn test_request_chunk_restart() {
request,
CryptoHash::default(),
client.chain.mut_store(),
&mut rs,
);
let response = env.network_adapters[0].pop().unwrap().as_network_requests();

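
The test above builds its `ReedSolomonWrapper` from a data-part count and a parity-part count (total minus data). To illustrate what "recalculating the parts" means, here is a toy erasure code with a single XOR parity part. It is deliberately not Reed-Solomon and does not use the `reed-solomon-erasure` crate: it tolerates only one missing part, and `encode_with_parity` and `reconstruct` are hypothetical names chosen for this sketch.

```rust
/// Appends one XOR parity part to equally sized data parts.
/// Toy stand-in for Reed-Solomon encoding (one parity part only).
pub fn encode_with_parity(data_parts: &[Vec<u8>]) -> Vec<Option<Vec<u8>>> {
    let len = data_parts.first().map_or(0, Vec::len);
    assert!(data_parts.iter().all(|p| p.len() == len), "parts must have equal length");
    let mut parity = vec![0u8; len];
    for part in data_parts {
        for (p, b) in parity.iter_mut().zip(part) {
            *p ^= *b;
        }
    }
    data_parts.iter().cloned().map(Some).chain(std::iter::once(Some(parity))).collect()
}

/// Fills in at most one missing part in place.
/// Returns `false` if more than one part is missing.
pub fn reconstruct(parts: &mut [Option<Vec<u8>>]) -> bool {
    let missing: Vec<usize> =
        parts.iter().enumerate().filter(|(_, p)| p.is_none()).map(|(i, _)| i).collect();
    match missing.as_slice() {
        [] => true,
        [idx] => {
            let len = parts.iter().flatten().map(Vec::len).next().unwrap_or(0);
            // XOR of all present parts equals the missing one, because the
            // XOR of every part (data plus parity) is zero.
            let mut rebuilt = vec![0u8; len];
            for part in parts.iter().flatten() {
                for (r, b) in rebuilt.iter_mut().zip(part) {
                    *r ^= *b;
                }
            }
            parts[*idx] = Some(rebuilt);
            true
        }
        _ => false,
    }
}
```

Real Reed-Solomon coding generalizes this to several parity parts, which is what lets a node rebuild every requested part from the full chunk.
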
27 changes: 21 additions & 6 deletions core/primitives/src/sharding.rs
@@ -303,6 +303,17 @@ impl ShardChunkHeader {
}
}

// TODO(mina86): Change return type of prev_block_hash to &CryptoHash and
// then this method can be deleted.
#[inline]
pub fn prev_block_hash_ref(&self) -> &CryptoHash {
match self {
Self::V1(header) => &header.inner.prev_block_hash,
Self::V2(header) => &header.inner.prev_block_hash,
Self::V3(header) => header.inner.prev_block_hash(),
}
}
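
The accessor above returns a borrowed hash regardless of which header version is stored, which is what lets `PartialEncodedChunk::prev_block` and `ShardChunk::prev_block` drop their nested matches. A reduced sketch of the pattern, assuming a hypothetical two-variant `Header` with a 32-byte array in place of `CryptoHash`:

```rust
/// Reduced stand-in for a versioned header enum (hypothetical layout).
pub enum Header {
    V1 { prev_block_hash: [u8; 32] },
    V2 { inner: InnerV2 },
}

/// Hypothetical inner struct for the newer version.
pub struct InnerV2 {
    pub prev_block_hash: [u8; 32],
}

impl Header {
    /// Borrows the hash from whichever variant is present; nothing is copied.
    pub fn prev_block_hash_ref(&self) -> &[u8; 32] {
        match self {
            Header::V1 { prev_block_hash } => prev_block_hash,
            Header::V2 { inner } => &inner.prev_block_hash,
        }
    }
}
```

Callers that only need to pass the hash on by reference can then avoid matching on the version themselves.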

#[inline]
pub fn encoded_merkle_root(&self) -> CryptoHash {
match self {
@@ -527,11 +538,7 @@ impl PartialEncodedChunk {
pub fn prev_block(&self) -> &CryptoHash {
match &self {
PartialEncodedChunk::V1(chunk) => &chunk.header.inner.prev_block_hash,
- PartialEncodedChunk::V2(chunk) => match &chunk.header {
- ShardChunkHeader::V1(header) => &header.inner.prev_block_hash,
- ShardChunkHeader::V2(header) => &header.inner.prev_block_hash,
- ShardChunkHeader::V3(header) => header.inner.prev_block_hash(),
- },
+ PartialEncodedChunk::V2(chunk) => chunk.header.prev_block_hash_ref(),
}
}

@@ -700,6 +707,14 @@ impl ShardChunk {
}
}

#[inline]
pub fn prev_block(&self) -> &CryptoHash {
match &self {
ShardChunk::V1(chunk) => &chunk.header.inner.prev_block_hash,
ShardChunk::V2(chunk) => chunk.header.prev_block_hash_ref(),
}
}

#[inline]
pub fn prev_state_root(&self) -> StateRoot {
match self {
@@ -937,7 +952,7 @@ impl EncodedShardChunk {
TransactionReceipt::try_from_slice(&encoded_data)
}

- fn encode_transaction_receipts(
+ pub fn encode_transaction_receipts(
rs: &mut ReedSolomonWrapper,
transactions: Vec<SignedTransaction>,
outgoing_receipts: &[Receipt],
2 changes: 1 addition & 1 deletion integration-tests/Cargo.toml
@@ -66,7 +66,7 @@ performance_stats = [
"near-network/performance_stats",
]
expensive_tests = []
- test_features = ["nearcore/test_features"]
+ test_features = ["nearcore/test_features", "near-chunks/test_features"]
protocol_feature_alt_bn128 = [
"near-primitives/protocol_feature_alt_bn128",
"node-runtime/protocol_feature_alt_bn128",
2 changes: 2 additions & 0 deletions integration-tests/src/tests/client/mod.rs
@@ -5,3 +5,5 @@ mod runtimes;
#[cfg(feature = "sandbox")]
mod sandbox;
mod sharding_upgrade;
#[cfg(feature = "test_features")]
mod shards_manager;