chain: ShardManager: add ability to serve partial chunks from ShardChunk (near#6377)

This is commit 09041ec upstream.

Add the ability to respond to a PartialEncodedChunkRequest from ShardChunk
objects in addition to PartialEncodedChunk.  In practice this is currently
dead code, since there is no scenario in which a ShardChunk is in storage
while the corresponding PartialEncodedChunk isn’t, but the plan is to start
garbage collecting the ColPartialChunks column, at which point we’ll have to
serve requests from data in the ColChunks column.

Issue: near#6242
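
For readers skimming the diff below, the new lookup order in `process_partial_encoded_chunk_request` is a fallback chain: the in-memory cache first, then the stored PartialEncodedChunk, and only as a last resort the full ShardChunk, which has to be re-encoded. The following is a minimal, self-contained sketch of that ordering using placeholder types and function names; it is not the near-chunks API (see chain/chunks/src/lib.rs for the real code).

```rust
// Sketch of the lookup order introduced by this commit. Types and
// functions here are illustrative placeholders, not the real near-chunks API.

#[derive(Debug)]
struct Response(&'static str);

// Cheapest source: the in-memory encoded_chunks cache.
fn from_cache() -> Option<Response> {
    None
}

// Next: the PartialEncodedChunk stored in the ColPartialChunks column.
fn from_partial_chunk() -> Option<Response> {
    None // pretend it was garbage collected (archival-node scenario)
}

// Last resort: re-encode the full ShardChunk from the ColChunks column.
// This is computationally expensive, so `or_else` keeps it lazy and it
// only runs when the cheaper sources above returned None.
fn from_full_chunk() -> Option<Response> {
    Some(Response("re-encoded from ShardChunk"))
}

fn main() {
    let response = from_cache().or_else(from_partial_chunk).or_else(from_full_chunk);
    println!("{:?}", response); // Some(Response("re-encoded from ShardChunk"))
}
```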
mina86 committed Apr 7, 2022
1 parent 6142c58 commit 58bf692
Showing 11 changed files with 259 additions and 67 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions chain/chunks-primitives/Cargo.toml
@@ -12,3 +12,4 @@ description = "This crate hosts NEAR chunks-related error types"

[dependencies]
near-chain-primitives = { path = "../chain-primitives" }
near-primitives = { path = "../../core/primitives" }
1 change: 1 addition & 0 deletions chain/chunks/Cargo.toml
@@ -33,3 +33,4 @@ assert_matches = "1.5.0"
[features]
byzantine_asserts = ["near-chain/byzantine_asserts"]
expensive_tests = []
test_features = []
220 changes: 160 additions & 60 deletions chain/chunks/src/lib.rs
@@ -101,9 +101,9 @@ use near_primitives::hash::{hash, CryptoHash};
use near_primitives::merkle::{merklize, verify_path, MerklePath};
use near_primitives::receipt::Receipt;
use near_primitives::sharding::{
ChunkHash, EncodedShardChunk, PartialEncodedChunk, PartialEncodedChunkPart,
PartialEncodedChunkV1, PartialEncodedChunkV2, ReceiptList, ReceiptProof, ReedSolomonWrapper,
ShardChunkHeader, ShardProof,
ChunkHash, EncodedShardChunk, EncodedShardChunkBody, PartialEncodedChunk,
PartialEncodedChunkPart, PartialEncodedChunkV1, PartialEncodedChunkV2, ReceiptList,
ReceiptProof, ReedSolomonWrapper, ShardChunk, ShardChunkHeader, ShardProof,
};
use near_primitives::time::Clock;
use near_primitives::transaction::SignedTransaction;
@@ -995,13 +995,27 @@ impl ShardsManager {
request: PartialEncodedChunkRequestMsg,
route_back: CryptoHash,
chain_store: &mut ChainStore,
rs: &mut ReedSolomonWrapper,
) {
debug!(target: "chunks", "Received partial encoded chunk request for {:?}, part_ordinals: {:?}, shards: {:?}, I'm {:?}", request.chunk_hash.0, request.part_ords, request.tracking_shards, self.me);

let response = if let Some(entry) = self.encoded_chunks.get(&request.chunk_hash) {
Self::prepare_partial_encoded_chunk_response_from_cache(request, entry)
} else if let Ok(partial_chunk) = chain_store.get_partial_chunk(&request.chunk_hash) {
Self::prepare_partial_encoded_chunk_response_from_partial(request, partial_chunk)
} else if let Ok(chunk) = chain_store.get_chunk(&request.chunk_hash).map(|ch| ch.clone()) {
// Note: we need to clone the chunk because otherwise we would be
// holding multiple references to chain_store. One through the
// chunk and another through chain_store which we need to pass down
// to do further fetches.

// If we are an archival node we might have garbage collected the
// partial chunk while we still keep the chunk itself. We can get
// the chunk, recalculate the parts and respond to the request.
//
// TODO(#6242): This is currently not implemented and effectively
// this is dead code.
self.prepare_partial_encoded_chunk_response_from_chunk(request, rs, chunk)
} else {
None
};
@@ -1013,11 +1027,12 @@ impl ShardsManager {
}
}

/// Prepares response to a partial encoded chunk request from
/// a corresponding encoded_chunks cache entry. If the entry can satisfy
/// the requests (i.e. contains all necessary parts and shards) the method
/// returns a [`PartialEncodedChunkResponseMsg`] object; otherwise returns
/// `None`.
/// Prepares response to a partial encoded chunk request from an entry in
/// the encoded_chunks in-memory cache.
///
/// If the entry can satisfy the requests (i.e. contains all necessary parts
/// and shards) the method returns a [`PartialEncodedChunkResponseMsg`]
/// object; otherwise returns `None`.
fn prepare_partial_encoded_chunk_response_from_cache(
request: PartialEncodedChunkRequestMsg,
entry: &EncodedChunksCacheEntry,
@@ -1037,12 +1052,14 @@ impl ShardsManager {
)
}

/// Prepares response to a partial encoded chunk request from
/// a corresponding partial chunk read from the storage. If the partial
/// chunk can satisfy the requests (i.e. contains all necessary parts and
/// shards) the method returns a [`PartialEncodedChunkResponseMsg`] object;
/// otherwise returns `None`.
fn prepare_partial_encoded_chunk_response_from_partial(
/// Prepares response to a partial encoded chunk request from a partial
/// chunk read from the storage.
///
/// If the partial chunk can satisfy the requests (i.e. contains all
/// necessary parts and shards) the method returns
/// a [`PartialEncodedChunkResponseMsg`] object; otherwise returns `None`.
// pub for testing
pub fn prepare_partial_encoded_chunk_response_from_partial(
request: PartialEncodedChunkRequestMsg,
partial_chunk: &PartialEncodedChunk,
) -> Option<PartialEncodedChunkResponseMsg> {
@@ -1075,6 +1092,128 @@ impl ShardsManager {
)
}

/// Constructs receipt proofs for the specified chunk and returns them as
/// an iterator.
fn make_outgoing_receipts_proofs(
&self,
chunk_header: &ShardChunkHeader,
outgoing_receipts: &[Receipt],
) -> Result<impl Iterator<Item = ReceiptProof>, near_chunks_primitives::Error> {
let shard_id = chunk_header.shard_id();
let shard_layout = self
.runtime_adapter
.get_shard_layout_from_prev_block(chunk_header.prev_block_hash_ref())?;

let hashes = Chain::build_receipts_hashes(&outgoing_receipts, &shard_layout);
let (root, proofs) = merklize(&hashes);
assert_eq!(chunk_header.outgoing_receipts_root(), root);

let mut receipts_by_shard =
Chain::group_receipts_by_shard(outgoing_receipts.to_vec(), &shard_layout);
let it = proofs.into_iter().enumerate().map(move |(proof_shard_id, proof)| {
let proof_shard_id = proof_shard_id as u64;
let receipts = receipts_by_shard.remove(&proof_shard_id).unwrap_or_else(Vec::new);
let shard_proof =
ShardProof { from_shard_id: shard_id, to_shard_id: proof_shard_id, proof: proof };
ReceiptProof(receipts, shard_proof)
});
Ok(it)
}

/// Prepares response to a partial encoded chunk request from a chunk read
/// from the storage.
///
/// This requires encoding the chunk and as such is a computationally
/// expensive operation.  If possible, the request should be served from
/// EncodedChunksCacheEntry or PartialEncodedChunk instead.
// pub for testing
pub fn prepare_partial_encoded_chunk_response_from_chunk(
&mut self,
request: PartialEncodedChunkRequestMsg,
rs: &mut ReedSolomonWrapper,
chunk: ShardChunk,
) -> Option<PartialEncodedChunkResponseMsg> {
let total_parts = self.runtime_adapter.num_total_parts();
// rs is created with self.runtime_adapter.num_total_parts() so this
// assert should always hold true. If it doesn’t then something strange
// is going on.
assert_eq!(total_parts, rs.total_shard_count());
for &ord in request.part_ords.iter() {
let ord: usize = ord.try_into().unwrap();
if ord >= total_parts {
warn!(target:"chunks", "Not sending {}, requested part_ord {} but we only expect {} total",
request.chunk_hash.0, ord, total_parts);
return None;
}
}

let header = chunk.cloned_header();

// Get outgoing receipts for the chunk and construct vector of their
// proofs.
let outgoing_receipts = chunk.receipts();
let present_receipts: HashMap<ShardId, _> = self
.make_outgoing_receipts_proofs(&header, &outgoing_receipts)
.map_err(|err| {
warn!(target: "chunks", "Not sending {}, {}", request.chunk_hash.0, err);
})
.ok()?
.map(|receipt| (receipt.1.to_shard_id, receipt))
.collect();
let receipts_iter = request
.tracking_shards
.into_iter()
.map(move |shard_id| present_receipts.get(&shard_id).cloned());

// Construct the EncodedShardChunk’s data parts by encoding the chunk’s
// transactions and outgoing receipts; the parity parts are reconstructed
// below.
let (parts, encoded_length) = EncodedShardChunk::encode_transaction_receipts(
rs,
chunk.transactions().to_vec(),
&outgoing_receipts).map_err(|err| {
warn!(target: "chunks", "Not sending {}, failed to encode transaction receipts: {}", request.chunk_hash.0, err);
}).ok()?;
if header.encoded_length() != encoded_length {
warn!(target: "chunks",
"Not sending {}, expected encoded length doesn’t match calculated: {} != {}",
request.chunk_hash.0, header.encoded_length(), encoded_length);
return None;
}

let mut content = EncodedShardChunkBody { parts };
if let Err(err) = content.reconstruct(rs) {
warn!(target: "chunks",
"Not sending {}, failed to reconstruct RS parity parts: {}",
request.chunk_hash.0, err);
return None;
}

let (encoded_merkle_root, merkle_paths) = content.get_merkle_hash_and_paths();
if header.encoded_merkle_root() != encoded_merkle_root {
warn!(target: "chunks",
"Not sending {}, expected encoded Merkle root doesn’t match calculated: {} != {}",
request.chunk_hash.0, header.encoded_merkle_root(), encoded_merkle_root);
return None;
}

let parts_iter = request.part_ords.into_iter().map(|part_ord| {
let ord: usize = part_ord.try_into().unwrap();
content.parts[ord].take().map(|part| PartialEncodedChunkPart {
part_ord,
part,
merkle_proof: merkle_paths[ord].clone(),
})
});

// Pass the iterators to the same helper used for the cache case.
Self::prepare_partial_encoded_chunk_response_from_iters(
request.chunk_hash,
parts_iter,
receipts_iter,
)
}

/// Checks if `parts_iter` and `receipts_iter` contain no `None` elements.
/// It evaluates the iterators only up to the first `None` value (if any);
/// since iterators are lazy this saves some work if there are any `Some`
@@ -1096,7 +1235,7 @@ impl ShardsManager {
let maybe_known_parts: Option<Vec<_>> = parts_iter.collect();
let parts = match maybe_known_parts {
None => {
debug!(target:"chunks", "Not sending {:?}, some parts are missing",
debug!(target:"chunks", "Not sending {}, some parts are missing",
chunk_hash.0);
return None;
}
Expand All @@ -1106,7 +1245,7 @@ impl ShardsManager {
let maybe_known_receipts: Option<Vec<_>> = receipts_iter.collect();
let receipts = match maybe_known_receipts {
None => {
debug!(target:"chunks", "Not sending {:?}, some receipts are missing",
debug!(target:"chunks", "Not sending {}, some receipts are missing",
chunk_hash.0);
return None;
}
Expand Down Expand Up @@ -1920,28 +2059,7 @@ impl ShardsManager {
store_update: &mut ChainStoreUpdate<'_>,
) -> Result<(), Error> {
let header = encoded_chunk.cloned_header();
let shard_id = header.shard_id();
let shard_layout =
self.runtime_adapter.get_shard_layout_from_prev_block(&header.prev_block_hash())?;
let outgoing_receipts_hashes =
Chain::build_receipts_hashes(&outgoing_receipts, &shard_layout);
let (outgoing_receipts_root, outgoing_receipts_proofs) =
merklize(&outgoing_receipts_hashes);
assert_eq!(header.outgoing_receipts_root(), outgoing_receipts_root);

// Save this chunk into encoded_chunks & process encoded chunk to add to the store.
let mut receipts_by_shard =
Chain::group_receipts_by_shard(outgoing_receipts, &shard_layout);
let receipts = outgoing_receipts_proofs
.into_iter()
.enumerate()
.map(|(to_shard_id, proof)| {
let to_shard_id = to_shard_id as u64;
let receipts = receipts_by_shard.remove(&to_shard_id).unwrap_or_else(Vec::new);
let shard_proof = ShardProof { from_shard_id: shard_id, to_shard_id, proof };
ReceiptProof(receipts, shard_proof)
})
.collect();
let receipts = self.make_outgoing_receipts_proofs(&header, &outgoing_receipts)?.collect();
let partial_chunk = PartialEncodedChunkV2 {
header,
parts: encoded_chunk
@@ -1982,16 +2100,8 @@ impl ShardsManager {
let chunk_header = encoded_chunk.cloned_header();
let prev_block_hash = chunk_header.prev_block_hash();
let shard_id = chunk_header.shard_id();
let shard_layout =
self.runtime_adapter.get_shard_layout_from_prev_block(&prev_block_hash)?;
let outgoing_receipts_hashes =
Chain::build_receipts_hashes(&outgoing_receipts, &shard_layout);
let (outgoing_receipts_root, outgoing_receipts_proofs) =
merklize(&outgoing_receipts_hashes);
assert_eq!(chunk_header.outgoing_receipts_root(), outgoing_receipts_root);

let mut block_producer_mapping = HashMap::new();

for part_ord in 0..self.runtime_adapter.num_total_parts() {
let part_ord = part_ord as u64;
let to_whom = self.runtime_adapter.get_part_owner(&prev_block_hash, part_ord).unwrap();
Expand All @@ -2000,20 +2110,10 @@ impl ShardsManager {
entry.push(part_ord);
}

let mut receipts_by_shard =
Chain::group_receipts_by_shard(outgoing_receipts, &shard_layout);
let receipt_proofs: Vec<_> = outgoing_receipts_proofs
.into_iter()
.enumerate()
.map(|(proof_shard_id, proof)| {
let proof_shard_id = proof_shard_id as u64;
let receipts = receipts_by_shard.remove(&proof_shard_id).unwrap_or_else(Vec::new);
let shard_proof =
ShardProof { from_shard_id: shard_id, to_shard_id: proof_shard_id, proof };
Arc::new(ReceiptProof(receipts, shard_proof))
})
.collect();

let receipt_proofs = self
.make_outgoing_receipts_proofs(&chunk_header, &outgoing_receipts)?
.map(Arc::new)
.collect::<Vec<_>>();
for (to_whom, part_ords) in block_producer_mapping {
let part_receipt_proofs = receipt_proofs
.iter()
1 change: 1 addition & 0 deletions chain/client/Cargo.toml
@@ -53,6 +53,7 @@ expensive_tests = []
test_features = [
"near-network/test_features",
"near-chain/test_features",
"near-chunks/test_features",
]
delay_detector = [
"near-chain/delay_detector",
1 change: 1 addition & 0 deletions chain/client/src/client_actor.rs
@@ -552,6 +552,7 @@ impl Handler<NetworkClientMessages> for ClientActor {
part_request_msg,
route_back,
self.client.chain.mut_store(),
&mut self.client.rs,
);
NetworkClientResponses::NoResponse
}
3 changes: 2 additions & 1 deletion chain/client/src/test_utils.rs
@@ -1409,7 +1409,7 @@ impl TestEnv {
}
}

fn get_partial_encoded_chunk_response(
pub fn get_partial_encoded_chunk_response(
&mut self,
id: usize,
request: PartialEncodedChunkRequestMsg,
@@ -1419,6 +1419,7 @@
request,
CryptoHash::default(),
client.chain.mut_store(),
&mut client.rs,
);
let response = self.network_adapters[id].pop().unwrap();
if let PeerManagerMessageRequest::NetworkRequests(
6 changes: 6 additions & 0 deletions chain/client/src/tests/chunks_management.rs
@@ -6,6 +6,7 @@ use near_logger_utils::init_integration_logger;
use near_network::types::NetworkRequests;
use near_network_primitives::types::PartialEncodedChunkRequestMsg;
use near_primitives::hash::CryptoHash;
use near_primitives::sharding::ReedSolomonWrapper;

#[test]
fn test_request_chunk_restart() {
@@ -22,10 +23,14 @@
tracking_shards: HashSet::default(),
};
let client = &mut env.clients[0];
let num_total_parts = client.runtime_adapter.num_total_parts();
let num_data_parts = client.runtime_adapter.num_data_parts();
let mut rs = ReedSolomonWrapper::new(num_data_parts, num_total_parts - num_data_parts);
client.shards_mgr.process_partial_encoded_chunk_request(
request.clone(),
CryptoHash::default(),
client.chain.mut_store(),
&mut rs,
);
assert!(env.network_adapters[0].pop().is_some());

@@ -35,6 +40,7 @@
request,
CryptoHash::default(),
client.chain.mut_store(),
&mut rs,
);
let response = env.network_adapters[0].pop().unwrap().as_network_requests();
