Skip to content

Commit

Permalink
chain: ShardManager: add ability to serve partial chunks from ShardChunk
Browse files Browse the repository at this point in the history
Add ability to respond to PartialEncodedChunkRequest from ShardChunk
objects in addition to PartialEncodedChunk.  In practice this is currently
dead code since there is no scenario in which the former is in the storage
while the latter isn’t but the plan is to start garbage collecting
ColPartialChunks column at which point we’ll have to serve requests from
data in ColChunks column.

Issue: #6242
  • Loading branch information
mina86 committed Mar 2, 2022
1 parent 434a94f commit 22c0844
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 9 deletions.
150 changes: 147 additions & 3 deletions chain/chunks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ use near_primitives::hash::{hash, CryptoHash};
use near_primitives::merkle::{merklize, verify_path, MerklePath};
use near_primitives::receipt::Receipt;
use near_primitives::sharding::{
ChunkHash, EncodedShardChunk, PartialEncodedChunk, PartialEncodedChunkPart,
PartialEncodedChunkV1, PartialEncodedChunkV2, ReceiptList, ReceiptProof, ReedSolomonWrapper,
ShardChunkHeader, ShardProof,
ChunkHash, EncodedShardChunk, EncodedShardChunkBody, PartialEncodedChunk,
PartialEncodedChunkPart, PartialEncodedChunkV1, PartialEncodedChunkV2, ReceiptList,
ReceiptProof, ReedSolomonWrapper, ShardChunk, ShardChunkHeader, ShardProof,
};
use near_primitives::time::Clock;
use near_primitives::transaction::SignedTransaction;
Expand Down Expand Up @@ -995,13 +995,27 @@ impl ShardsManager {
request: PartialEncodedChunkRequestMsg,
route_back: CryptoHash,
chain_store: &mut ChainStore,
rs: &mut ReedSolomonWrapper,
) {
debug!(target: "chunks", "Received partial encoded chunk request for {:?}, part_ordinals: {:?}, shards: {:?}, I'm {:?}", request.chunk_hash.0, request.part_ords, request.tracking_shards, self.me);

let response = if let Some(entry) = self.encoded_chunks.get(&request.chunk_hash) {
Self::prepare_partial_encoded_chunk_response_from_cache(request, entry)
} else if let Ok(partial_chunk) = chain_store.get_partial_chunk(&request.chunk_hash) {
Self::prepare_partial_encoded_chunk_response_from_partial(request, partial_chunk)
} else if let Ok(chunk) = chain_store.get_chunk(&request.chunk_hash).map(|ch| ch.clone()) {
// Note: we need to clone the chunk because otherwise we would be
// holding multiple references to chain_store. One through the
// chunk and another through chain_store which we need to pass down
// to do further fetches.

// If we are archival node we might have garbage collected the
// partial chunk while we still keep the chunk itself. We can get
// the chunk, recalculate the parts and respond to the request.
//
// TODO(#6242): This is currently not implemented and effectively
// this is dead code.
self.prepare_partial_encoded_chunk_response_from_chunk(request, chain_store, rs, chunk)
} else {
None
};
Expand Down Expand Up @@ -1075,6 +1089,136 @@ impl ShardsManager {
)
}

fn prepare_partial_encoded_chunk_response_from_chunk(
&mut self,
request: PartialEncodedChunkRequestMsg,
chain_store: &mut ChainStore,
rs: &mut ReedSolomonWrapper,
chunk: ShardChunk,
) -> Option<PartialEncodedChunkResponseMsg> {
// TODO(mina86): Is it correct to use rs.data_shard_count() and
// rs.total_shard_count() here? Can those values change over time?
let total_parts = rs.total_shard_count();
for ord in request.part_ords.iter() {
let ord: usize = (*ord).try_into().unwrap();
if ord >= total_parts {
debug!(target:"chunks", "Not sending {:?}, requested part_ord {} but we only expect {} total",
request.chunk_hash.0, ord, total_parts);
return None;
}
}

// Figure out basic parameters such as shard id, epoch id, protocol
// version and shard layout. Those will be needed later when fetching
// other data and constructing encoded shard chunk.
let header = chunk.cloned_header();
let prev_block_hash = header.prev_block_hash_ref();
let shard_id = header.shard_id();
let shard_layout = self
.runtime_adapter
.get_shard_layout_from_prev_block(prev_block_hash)
.map_err(|err| {
debug!(target: "chunks", "Not sending {:?}, failed to get shard layout: {}", request.chunk_hash.0, err);
})
.ok()?;

// Get outgoing receipts for the chunk and construct vector of their
// proofs.
let outgoing_receipts = chain_store
.get_outgoing_receipts_for_shard(
&*self.runtime_adapter,
*chunk.prev_block(),
shard_id,
chunk.height_included(),
)
.map_err(|err| {
debug!(target: "chunks", "Not sending {:?}, failed to get outgoing receipts: {}", request.chunk_hash.0, err);
}).ok()?;
let outgoing_receipts_hashes =
Chain::build_receipts_hashes(&outgoing_receipts, &shard_layout);
let (outgoing_receipts_root, outgoing_receipts_proofs) =
merklize(&outgoing_receipts_hashes);
if header.outgoing_receipts_root() != outgoing_receipts_root {
error!(target: "chunks",
"Not sending {:?}, expected outgoing receipts root doesn’t match calculated: {} != {}",
request.chunk_hash.0, header.outgoing_receipts_root(), outgoing_receipts_root);
return None;
}

let mut receipts_by_shard =
Chain::group_receipts_by_shard(outgoing_receipts.clone(), &shard_layout);
let receipt_proofs: Vec<_> = outgoing_receipts_proofs
.iter()
.enumerate()
.map(|(proof_shard_id, proof)| {
let proof_shard_id = proof_shard_id as u64;
let receipts = receipts_by_shard.remove(&proof_shard_id).unwrap_or_else(Vec::new);
let shard_proof = ShardProof {
from_shard_id: shard_id,
to_shard_id: proof_shard_id,
proof: proof.clone(),
};
ReceiptProof(receipts, shard_proof)
})
.collect();

// Construct EncodedShardChunk. If we earlier determined that we will
// need parity parts, instruct the constructor to calculate them as
// well. Otherwise we won’t bother.
let (parts, encoded_length) = EncodedShardChunk::encode_transaction_receipts(
rs,
chunk.transactions().to_vec(),
&outgoing_receipts).map_err(|err| {
debug!(target: "chunks", "Not sending {:?}, failed to encode transaction receipts: {}", request.chunk_hash.0, err);
}).ok()?;
if header.encoded_length() != encoded_length {
error!(target: "chunks",
"Not sending {:?}, expected encoded length doesn’t match calculated: {} != {}",
request.chunk_hash.0, header.encoded_length(), encoded_length);
return None;
}

let mut content = EncodedShardChunkBody { parts };
if let Err(err) = content.reconstruct(rs) {
error!(target: "chunks",
"Not sending {:?}, failed to reconstruct RS parity parts: {}",
request.chunk_hash.0, err);
return None;
}

let (encoded_merkle_root, merkle_paths) = content.get_merkle_hash_and_paths();
if header.encoded_merkle_root() != encoded_merkle_root {
error!(target: "chunks",
"Not sending {:?}, expected encoded Merkle root doesn’t match calculated: {} != {}",
request.chunk_hash.0, header.encoded_merkle_root(), encoded_merkle_root);
return None;
}

let parts_iter = request.part_ords.into_iter().map(|part_ord| {
let ord: usize = part_ord.try_into().unwrap();
content.parts[ord].take().map(|part| PartialEncodedChunkPart {
part_ord,
part,
merkle_proof: merkle_paths[ord].clone(),
})
});

// Same process for receipts as above for parts.
let present_receipts: HashMap<ShardId, _> =
receipt_proofs.iter().map(|receipt| (receipt.1.to_shard_id, receipt)).collect();
let receipts_iter = request
.tracking_shards
.iter()
.map(|shard_id| present_receipts.get(shard_id).map(|x| *x).cloned());

// Pass iterators to function, same as cache case.
Self::prepare_partial_encoded_chunk_response_from_iters(
request.chunk_hash,
parts_iter,
receipts_iter,
)
}

/// Checks if `parts_iter` and `receipts_iter` contain no `None` elements.
/// It evaluates the iterators only up to the first `None` value (if any);
/// since iterators are lazy this saves some work if there are any `Some`
Expand Down
1 change: 1 addition & 0 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ impl Handler<NetworkClientMessages> for ClientActor {
part_request_msg,
route_back,
self.client.chain.mut_store(),
&mut self.client.rs,
);
NetworkClientResponses::NoResponse
}
Expand Down
5 changes: 5 additions & 0 deletions chain/client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1415,10 +1415,15 @@ impl TestEnv {
request: PartialEncodedChunkRequestMsg,
) -> PartialEncodedChunkResponseMsg {
let client = &mut self.clients[id];
let total_parts = client.chain.runtime_adapter.num_total_parts();
let data_parts = client.chain.runtime_adapter.num_data_parts();
let parity_parts = total_parts - data_parts;
let mut rs = ReedSolomonWrapper::new(data_parts, parity_parts);
client.shards_mgr.process_partial_encoded_chunk_request(
request,
CryptoHash::default(),
client.chain.mut_store(),
&mut rs,
);
let response = self.network_adapters[id].pop().unwrap();
if let PeerManagerMessageRequest::NetworkRequests(
Expand Down
6 changes: 6 additions & 0 deletions chain/client/src/tests/chunks_management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use near_logger_utils::init_integration_logger;
use near_network::types::NetworkRequests;
use near_network_primitives::types::PartialEncodedChunkRequestMsg;
use near_primitives::hash::CryptoHash;
use near_primitives::sharding::ReedSolomonWrapper;

#[test]
fn test_request_chunk_restart() {
Expand All @@ -22,10 +23,14 @@ fn test_request_chunk_restart() {
tracking_shards: HashSet::default(),
};
let client = &mut env.clients[0];
let num_total_parts = client.runtime_adapter.num_total_parts();
let num_data_parts = client.runtime_adapter.num_data_parts();
let mut rs = ReedSolomonWrapper::new(num_data_parts, num_total_parts - num_data_parts);
client.shards_mgr.process_partial_encoded_chunk_request(
request.clone(),
CryptoHash::default(),
client.chain.mut_store(),
&mut rs,
);
assert!(env.network_adapters[0].pop().is_some());

Expand All @@ -35,6 +40,7 @@ fn test_request_chunk_restart() {
request,
CryptoHash::default(),
client.chain.mut_store(),
&mut rs,
);
let response = env.network_adapters[0].pop().unwrap().as_network_requests();

Expand Down
27 changes: 21 additions & 6 deletions core/primitives/src/sharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,17 @@ impl ShardChunkHeader {
}
}

// TODO(mina86): Change return type of prev_block_hash to &CryptoHash and
// then this method can be deleted.
#[inline]
pub fn prev_block_hash_ref(&self) -> &CryptoHash {
match self {
Self::V1(header) => &header.inner.prev_block_hash,
Self::V2(header) => &header.inner.prev_block_hash,
Self::V3(header) => header.inner.prev_block_hash(),
}
}

#[inline]
pub fn encoded_merkle_root(&self) -> CryptoHash {
match self {
Expand Down Expand Up @@ -527,11 +538,7 @@ impl PartialEncodedChunk {
pub fn prev_block(&self) -> &CryptoHash {
match &self {
PartialEncodedChunk::V1(chunk) => &chunk.header.inner.prev_block_hash,
PartialEncodedChunk::V2(chunk) => match &chunk.header {
ShardChunkHeader::V1(header) => &header.inner.prev_block_hash,
ShardChunkHeader::V2(header) => &header.inner.prev_block_hash,
ShardChunkHeader::V3(header) => header.inner.prev_block_hash(),
},
PartialEncodedChunk::V2(chunk) => chunk.header.prev_block_hash_ref(),
}
}

Expand Down Expand Up @@ -700,6 +707,14 @@ impl ShardChunk {
}
}

#[inline]
pub fn prev_block(&self) -> &CryptoHash {
match &self {
ShardChunk::V1(chunk) => &chunk.header.inner.prev_block_hash,
ShardChunk::V2(chunk) => chunk.header.prev_block_hash_ref(),
}
}

#[inline]
pub fn prev_state_root(&self) -> StateRoot {
match self {
Expand Down Expand Up @@ -937,7 +952,7 @@ impl EncodedShardChunk {
TransactionReceipt::try_from_slice(&encoded_data)
}

fn encode_transaction_receipts(
pub fn encode_transaction_receipts(
rs: &mut ReedSolomonWrapper,
transactions: Vec<SignedTransaction>,
outgoing_receipts: &[Receipt],
Expand Down

0 comments on commit 22c0844

Please sign in to comment.