From 09041ec73714f1a0fd48fd5e3b721015ade5fdd2 Mon Sep 17 00:00:00 2001 From: Michal Nazarewicz Date: Wed, 9 Mar 2022 21:28:58 +0100 Subject: [PATCH] chain: ShardManager: add ability to serve partial chunks from ShardChunk (#6377) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add ability to respond to PartialEncodedChunkRequest from ShardChunk objects in addition to PartialEncodedChunk. In practice this is currently dead code since there is no scenario in which the former is in the storage while the latter isn’t but the plan is to start garbage collecting ColPartialChunks column at which point we’ll have to serve requests from data in ColChunks column. Issue: https://github.com/near/nearcore/issues/6242 --- Cargo.lock | 1 + chain/chunks-primitives/Cargo.toml | 1 + chain/chunks/Cargo.toml | 1 + chain/chunks/src/lib.rs | 220 +++++++++++++----- chain/client/Cargo.toml | 1 + chain/client/src/client_actor.rs | 1 + chain/client/src/test_utils.rs | 3 +- chain/client/src/tests/chunks_management.rs | 6 + core/primitives/src/sharding.rs | 27 ++- integration-tests/src/tests/client/mod.rs | 2 + .../src/tests/client/shards_manager.rs | 63 +++++ 11 files changed, 259 insertions(+), 67 deletions(-) create mode 100644 integration-tests/src/tests/client/shards_manager.rs diff --git a/Cargo.lock b/Cargo.lock index 6da8e4eba1d..71424c4d205 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2598,6 +2598,7 @@ name = "near-chunks-primitives" version = "0.0.0" dependencies = [ "near-chain-primitives", + "near-primitives", ] [[package]] diff --git a/chain/chunks-primitives/Cargo.toml b/chain/chunks-primitives/Cargo.toml index e83f032887d..454697eff32 100644 --- a/chain/chunks-primitives/Cargo.toml +++ b/chain/chunks-primitives/Cargo.toml @@ -12,3 +12,4 @@ description = "This crate hosts NEAR chunks-related error types" [dependencies] near-chain-primitives = { path = "../chain-primitives" } +near-primitives = { path = "../../core/primitives" } diff --git a/chain/chunks/Cargo.toml b/chain/chunks/Cargo.toml index 0ab468fbdde..1f81400b9af 100644 --- a/chain/chunks/Cargo.toml +++ b/chain/chunks/Cargo.toml @@ -33,3 +33,4 @@ assert_matches = "1.5.0" [features] byzantine_asserts = ["near-chain/byzantine_asserts"] expensive_tests = [] +test_features = [] diff --git a/chain/chunks/src/lib.rs b/chain/chunks/src/lib.rs index b00318b1552..65d66016683 100644 --- a/chain/chunks/src/lib.rs +++ b/chain/chunks/src/lib.rs @@ -101,9 +101,9 @@ use near_primitives::hash::{hash, CryptoHash}; use near_primitives::merkle::{merklize, verify_path, MerklePath}; use near_primitives::receipt::Receipt; use near_primitives::sharding::{ - ChunkHash, EncodedShardChunk, PartialEncodedChunk, PartialEncodedChunkPart, - PartialEncodedChunkV1, PartialEncodedChunkV2, ReceiptList, ReceiptProof, ReedSolomonWrapper, - ShardChunkHeader, ShardProof, + ChunkHash, EncodedShardChunk, EncodedShardChunkBody, PartialEncodedChunk, + PartialEncodedChunkPart, PartialEncodedChunkV1, PartialEncodedChunkV2, ReceiptList, + ReceiptProof, ReedSolomonWrapper, ShardChunk, ShardChunkHeader, ShardProof, }; use near_primitives::time::Clock; use near_primitives::transaction::SignedTransaction; @@ -995,6 +995,7 @@ impl ShardsManager { request: PartialEncodedChunkRequestMsg, route_back: CryptoHash, chain_store: &mut ChainStore, + rs: &mut ReedSolomonWrapper, ) { debug!(target: "chunks", "Received partial encoded chunk request for {:?}, part_ordinals: {:?}, shards: {:?}, I'm {:?}", request.chunk_hash.0, request.part_ords, 
request.tracking_shards, self.me); @@ -1002,6 +1003,19 @@ impl ShardsManager { Self::prepare_partial_encoded_chunk_response_from_cache(request, entry) } else if let Ok(partial_chunk) = chain_store.get_partial_chunk(&request.chunk_hash) { Self::prepare_partial_encoded_chunk_response_from_partial(request, partial_chunk) + } else if let Ok(chunk) = chain_store.get_chunk(&request.chunk_hash).map(|ch| ch.clone()) { + // Note: we need to clone the chunk because otherwise we would be + // holding multiple references to chain_store. One through the + // chunk and another through chain_store which we need to pass down + // to do further fetches. + + // If we are archival node we might have garbage collected the + // partial chunk while we still keep the chunk itself. We can get + // the chunk, recalculate the parts and respond to the request. + // + // TODO(#6242): This is currently not implemented and effectively + // this is dead code. + self.prepare_partial_encoded_chunk_response_from_chunk(request, rs, chunk) } else { None }; @@ -1013,11 +1027,12 @@ impl ShardsManager { } } - /// Prepares response to a partial encoded chunk request from - /// a corresponding encoded_chunks cache entry. If the entry can satisfy - /// the requests (i.e. contains all necessary parts and shards) the method - /// returns a [`PartialEncodedChunkResponseMsg`] object; otherwise returns - /// `None`. + /// Prepares response to a partial encoded chunk request from an entry in + /// a encoded_chunks in-memory cache. + /// + /// If the entry can satisfy the requests (i.e. contains all necessary parts + /// and shards) the method returns a [`PartialEncodedChunkResponseMsg`] + /// object; otherwise returns `None`. fn prepare_partial_encoded_chunk_response_from_cache( request: PartialEncodedChunkRequestMsg, entry: &EncodedChunksCacheEntry, @@ -1037,12 +1052,14 @@ impl ShardsManager { ) } - /// Prepares response to a partial encoded chunk request from - /// a corresponding partial chunk read from the storage. If the partial - /// chunk can satisfy the requests (i.e. contains all necessary parts and - /// shards) the method returns a [`PartialEncodedChunkResponseMsg`] object; - /// otherwise returns `None`. - fn prepare_partial_encoded_chunk_response_from_partial( + /// Prepares response to a partial encoded chunk request from a partial + /// chunk read from the storage. + /// + /// If the partial chunk can satisfy the requests (i.e. contains all + /// necessary parts and shards) the method returns + /// a [`PartialEncodedChunkResponseMsg`] object; otherwise returns `None`. + // pub for testing + pub fn prepare_partial_encoded_chunk_response_from_partial( request: PartialEncodedChunkRequestMsg, partial_chunk: &PartialEncodedChunk, ) -> Option { @@ -1075,6 +1092,128 @@ impl ShardsManager { ) } + /// Constructs receipt proofs for specified chunk and returns them in an + /// iterator. 
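+ ///
+ /// The receipts are grouped by destination shard according to the shard
+ /// layout at the chunk’s previous block, and the Merkle root computed from
+ /// the receipt hashes is asserted to match the `outgoing_receipts_root`
+ /// recorded in the chunk header.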
+ fn make_outgoing_receipts_proofs( + &self, + chunk_header: &ShardChunkHeader, + outgoing_receipts: &[Receipt], + ) -> Result, near_chunks_primitives::Error> { + let shard_id = chunk_header.shard_id(); + let shard_layout = self + .runtime_adapter + .get_shard_layout_from_prev_block(chunk_header.prev_block_hash_ref())?; + + let hashes = Chain::build_receipts_hashes(&outgoing_receipts, &shard_layout); + let (root, proofs) = merklize(&hashes); + assert_eq!(chunk_header.outgoing_receipts_root(), root); + + let mut receipts_by_shard = + Chain::group_receipts_by_shard(outgoing_receipts.to_vec(), &shard_layout); + let it = proofs.into_iter().enumerate().map(move |(proof_shard_id, proof)| { + let proof_shard_id = proof_shard_id as u64; + let receipts = receipts_by_shard.remove(&proof_shard_id).unwrap_or_else(Vec::new); + let shard_proof = + ShardProof { from_shard_id: shard_id, to_shard_id: proof_shard_id, proof: proof }; + ReceiptProof(receipts, shard_proof) + }); + Ok(it) + } + + /// Prepares response to a partial encoded chunk request from a chunk read + /// from the storage. + /// + /// This requires encoding the chunk and as such is computationally + /// expensive operation. If possible, the request should be served from + /// EncodedChunksCacheEntry or PartialEncodedChunk instead. + // pub for testing + pub fn prepare_partial_encoded_chunk_response_from_chunk( + &mut self, + request: PartialEncodedChunkRequestMsg, + rs: &mut ReedSolomonWrapper, + chunk: ShardChunk, + ) -> Option { + let total_parts = self.runtime_adapter.num_total_parts(); + // rs is created with self.runtime_adapter.num_total_parts() so this + // assert should always hold true. If it doesn’t than something strange + // is going on. + assert_eq!(total_parts, rs.total_shard_count()); + for &ord in request.part_ords.iter() { + let ord: usize = ord.try_into().unwrap(); + if ord >= total_parts { + warn!(target:"chunks", "Not sending {}, requested part_ord {} but we only expect {} total", + request.chunk_hash.0, ord, total_parts); + return None; + } + } + + let header = chunk.cloned_header(); + + // Get outgoing receipts for the chunk and construct vector of their + // proofs. + let outgoing_receipts = chunk.receipts(); + let present_receipts: HashMap = self + .make_outgoing_receipts_proofs(&header, &outgoing_receipts) + .map_err(|err| { + warn!(target: "chunks", "Not sending {}, {}", request.chunk_hash.0, err); + }) + .ok()? + .map(|receipt| (receipt.1.to_shard_id, receipt)) + .collect(); + let receipts_iter = request + .tracking_shards + .into_iter() + .map(move |shard_id| present_receipts.get(&shard_id).cloned()); + + // Construct EncodedShardChunk. If we earlier determined that we will + // need parity parts, instruct the constructor to calculate them as + // well. Otherwise we won’t bother. 
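+ //
+ // Re-deriving the encoded chunk from the stored ShardChunk means: encode
+ // the transactions and outgoing receipts into the data parts, reconstruct
+ // the parity parts with the Reed-Solomon encoder, recompute the Merkle
+ // paths for all parts, and verify that the resulting encoded length and
+ // encoded Merkle root match the values recorded in the chunk header before
+ // handing out any of the requested parts.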
+ let (parts, encoded_length) = EncodedShardChunk::encode_transaction_receipts( + rs, + chunk.transactions().to_vec(), + &outgoing_receipts).map_err(|err| { + warn!(target: "chunks", "Not sending {}, failed to encode transaction receipts: {}", request.chunk_hash.0, err); + }).ok()?; + if header.encoded_length() != encoded_length { + warn!(target: "chunks", + "Not sending {}, expected encoded length doesn’t match calculated: {} != {}", + request.chunk_hash.0, header.encoded_length(), encoded_length); + return None; + } + + let mut content = EncodedShardChunkBody { parts }; + if let Err(err) = content.reconstruct(rs) { + warn!(target: "chunks", + "Not sending {}, failed to reconstruct RS parity parts: {}", + request.chunk_hash.0, err); + return None; + } + + let (encoded_merkle_root, merkle_paths) = content.get_merkle_hash_and_paths(); + if header.encoded_merkle_root() != encoded_merkle_root { + warn!(target: "chunks", + "Not sending {}, expected encoded Merkle root doesn’t match calculated: {} != {}", + request.chunk_hash.0, header.encoded_merkle_root(), encoded_merkle_root); + return None; + } + + let parts_iter = request.part_ords.into_iter().map(|part_ord| { + let ord: usize = part_ord.try_into().unwrap(); + content.parts[ord].take().map(|part| PartialEncodedChunkPart { + part_ord, + part, + merkle_proof: merkle_paths[ord].clone(), + }) + }); + + // Pass iterators to function, same as cache case. + Self::prepare_partial_encoded_chunk_response_from_iters( + request.chunk_hash, + parts_iter, + receipts_iter, + ) + } + /// Checks if `parts_iter` and `receipts_iter` contain no `None` elements. /// It evaluates the iterators only up to the first `None` value (if any); /// since iterators are lazy this saves some work if there are any `Some` @@ -1096,7 +1235,7 @@ impl ShardsManager { let maybe_known_parts: Option> = parts_iter.collect(); let parts = match maybe_known_parts { None => { - debug!(target:"chunks", "Not sending {:?}, some parts are missing", + debug!(target:"chunks", "Not sending {}, some parts are missing", chunk_hash.0); return None; } @@ -1106,7 +1245,7 @@ impl ShardsManager { let maybe_known_receipts: Option> = receipts_iter.collect(); let receipts = match maybe_known_receipts { None => { - debug!(target:"chunks", "Not sending {:?}, some receipts are missing", + debug!(target:"chunks", "Not sending {}, some receipts are missing", chunk_hash.0); return None; } @@ -1911,28 +2050,7 @@ impl ShardsManager { store_update: &mut ChainStoreUpdate<'_>, ) -> Result<(), Error> { let header = encoded_chunk.cloned_header(); - let shard_id = header.shard_id(); - let shard_layout = - self.runtime_adapter.get_shard_layout_from_prev_block(&header.prev_block_hash())?; - let outgoing_receipts_hashes = - Chain::build_receipts_hashes(&outgoing_receipts, &shard_layout); - let (outgoing_receipts_root, outgoing_receipts_proofs) = - merklize(&outgoing_receipts_hashes); - assert_eq!(header.outgoing_receipts_root(), outgoing_receipts_root); - - // Save this chunk into encoded_chunks & process encoded chunk to add to the store. 
- let mut receipts_by_shard = - Chain::group_receipts_by_shard(outgoing_receipts, &shard_layout); - let receipts = outgoing_receipts_proofs - .into_iter() - .enumerate() - .map(|(to_shard_id, proof)| { - let to_shard_id = to_shard_id as u64; - let receipts = receipts_by_shard.remove(&to_shard_id).unwrap_or_else(Vec::new); - let shard_proof = ShardProof { from_shard_id: shard_id, to_shard_id, proof }; - ReceiptProof(receipts, shard_proof) - }) - .collect(); + let receipts = self.make_outgoing_receipts_proofs(&header, &outgoing_receipts)?.collect(); let partial_chunk = PartialEncodedChunkV2 { header, parts: encoded_chunk @@ -1973,16 +2091,8 @@ impl ShardsManager { let chunk_header = encoded_chunk.cloned_header(); let prev_block_hash = chunk_header.prev_block_hash(); let shard_id = chunk_header.shard_id(); - let shard_layout = - self.runtime_adapter.get_shard_layout_from_prev_block(&prev_block_hash)?; - let outgoing_receipts_hashes = - Chain::build_receipts_hashes(&outgoing_receipts, &shard_layout); - let (outgoing_receipts_root, outgoing_receipts_proofs) = - merklize(&outgoing_receipts_hashes); - assert_eq!(chunk_header.outgoing_receipts_root(), outgoing_receipts_root); let mut block_producer_mapping = HashMap::new(); - for part_ord in 0..self.runtime_adapter.num_total_parts() { let part_ord = part_ord as u64; let to_whom = self.runtime_adapter.get_part_owner(&prev_block_hash, part_ord).unwrap(); @@ -1991,20 +2101,10 @@ impl ShardsManager { entry.push(part_ord); } - let mut receipts_by_shard = - Chain::group_receipts_by_shard(outgoing_receipts, &shard_layout); - let receipt_proofs: Vec<_> = outgoing_receipts_proofs - .into_iter() - .enumerate() - .map(|(proof_shard_id, proof)| { - let proof_shard_id = proof_shard_id as u64; - let receipts = receipts_by_shard.remove(&proof_shard_id).unwrap_or_else(Vec::new); - let shard_proof = - ShardProof { from_shard_id: shard_id, to_shard_id: proof_shard_id, proof }; - Arc::new(ReceiptProof(receipts, shard_proof)) - }) - .collect(); - + let receipt_proofs = self + .make_outgoing_receipts_proofs(&chunk_header, &outgoing_receipts)? 
+ .map(Arc::new) + .collect::>(); for (to_whom, part_ords) in block_producer_mapping { let part_receipt_proofs = receipt_proofs .iter() diff --git a/chain/client/Cargo.toml b/chain/client/Cargo.toml index 68bb99de93c..e3a8be9716b 100644 --- a/chain/client/Cargo.toml +++ b/chain/client/Cargo.toml @@ -54,6 +54,7 @@ expensive_tests = [] test_features = [ "near-network/test_features", "near-chain/test_features", + "near-chunks/test_features", ] delay_detector = [ "near-chain/delay_detector", diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index 0d3603fadea..fd89e235024 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -563,6 +563,7 @@ impl Handler for ClientActor { part_request_msg, route_back, self.client.chain.mut_store(), + &mut self.client.rs, ); NetworkClientResponses::NoResponse } diff --git a/chain/client/src/test_utils.rs b/chain/client/src/test_utils.rs index 9053c395f18..875691ba736 100644 --- a/chain/client/src/test_utils.rs +++ b/chain/client/src/test_utils.rs @@ -1410,7 +1410,7 @@ impl TestEnv { } } - fn get_partial_encoded_chunk_response( + pub fn get_partial_encoded_chunk_response( &mut self, id: usize, request: PartialEncodedChunkRequestMsg, @@ -1420,6 +1420,7 @@ impl TestEnv { request, CryptoHash::default(), client.chain.mut_store(), + &mut client.rs, ); let response = self.network_adapters[id].pop().unwrap(); if let PeerManagerMessageRequest::NetworkRequests( diff --git a/chain/client/src/tests/chunks_management.rs b/chain/client/src/tests/chunks_management.rs index 63bc794d9aa..c087eb2815d 100644 --- a/chain/client/src/tests/chunks_management.rs +++ b/chain/client/src/tests/chunks_management.rs @@ -6,6 +6,7 @@ use near_logger_utils::init_integration_logger; use near_network::types::NetworkRequests; use near_network_primitives::types::PartialEncodedChunkRequestMsg; use near_primitives::hash::CryptoHash; +use near_primitives::sharding::ReedSolomonWrapper; #[test] fn test_request_chunk_restart() { @@ -22,10 +23,14 @@ fn test_request_chunk_restart() { tracking_shards: HashSet::default(), }; let client = &mut env.clients[0]; + let num_total_parts = client.runtime_adapter.num_total_parts(); + let num_data_parts = client.runtime_adapter.num_data_parts(); + let mut rs = ReedSolomonWrapper::new(num_data_parts, num_total_parts - num_data_parts); client.shards_mgr.process_partial_encoded_chunk_request( request.clone(), CryptoHash::default(), client.chain.mut_store(), + &mut rs, ); assert!(env.network_adapters[0].pop().is_some()); @@ -35,6 +40,7 @@ fn test_request_chunk_restart() { request, CryptoHash::default(), client.chain.mut_store(), + &mut rs, ); let response = env.network_adapters[0].pop().unwrap().as_network_requests(); diff --git a/core/primitives/src/sharding.rs b/core/primitives/src/sharding.rs index 0e26f4d67b2..42d08754a79 100644 --- a/core/primitives/src/sharding.rs +++ b/core/primitives/src/sharding.rs @@ -303,6 +303,17 @@ impl ShardChunkHeader { } } + // TODO(mina86): Change return type of prev_block_hash to &CryptoHash and + // then this method can be deleted. 
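+ //
+ // Unlike `prev_block_hash`, which returns the hash by value, this accessor
+ // returns a reference tied to the header, which is what lets
+ // `PartialEncodedChunk::prev_block` and `ShardChunk::prev_block` below hand
+ // out a borrowed `&CryptoHash`.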
+ #[inline] + pub fn prev_block_hash_ref(&self) -> &CryptoHash { + match self { + Self::V1(header) => &header.inner.prev_block_hash, + Self::V2(header) => &header.inner.prev_block_hash, + Self::V3(header) => header.inner.prev_block_hash(), + } + } + #[inline] pub fn encoded_merkle_root(&self) -> CryptoHash { match self { @@ -527,11 +538,7 @@ impl PartialEncodedChunk { pub fn prev_block(&self) -> &CryptoHash { match &self { PartialEncodedChunk::V1(chunk) => &chunk.header.inner.prev_block_hash, - PartialEncodedChunk::V2(chunk) => match &chunk.header { - ShardChunkHeader::V1(header) => &header.inner.prev_block_hash, - ShardChunkHeader::V2(header) => &header.inner.prev_block_hash, - ShardChunkHeader::V3(header) => header.inner.prev_block_hash(), - }, + PartialEncodedChunk::V2(chunk) => chunk.header.prev_block_hash_ref(), } } @@ -700,6 +707,14 @@ impl ShardChunk { } } + #[inline] + pub fn prev_block(&self) -> &CryptoHash { + match &self { + ShardChunk::V1(chunk) => &chunk.header.inner.prev_block_hash, + ShardChunk::V2(chunk) => chunk.header.prev_block_hash_ref(), + } + } + #[inline] pub fn prev_state_root(&self) -> StateRoot { match self { @@ -937,7 +952,7 @@ impl EncodedShardChunk { TransactionReceipt::try_from_slice(&encoded_data) } - fn encode_transaction_receipts( + pub fn encode_transaction_receipts( rs: &mut ReedSolomonWrapper, transactions: Vec, outgoing_receipts: &[Receipt], diff --git a/integration-tests/src/tests/client/mod.rs b/integration-tests/src/tests/client/mod.rs index 88a53f5fe7e..bc9acdfa83a 100644 --- a/integration-tests/src/tests/client/mod.rs +++ b/integration-tests/src/tests/client/mod.rs @@ -5,3 +5,5 @@ mod runtimes; #[cfg(feature = "sandbox")] mod sandbox; mod sharding_upgrade; +#[cfg(feature = "test_features")] +mod shards_manager; diff --git a/integration-tests/src/tests/client/shards_manager.rs b/integration-tests/src/tests/client/shards_manager.rs new file mode 100644 index 00000000000..f4a4d75b31f --- /dev/null +++ b/integration-tests/src/tests/client/shards_manager.rs @@ -0,0 +1,63 @@ +use near_chain::{ChainGenesis, ChainStoreAccess}; +use near_chunks::ShardsManager; +use near_client::test_utils::TestEnv; +use near_logger_utils::init_test_logger; + +use near_network_primitives::types::PartialEncodedChunkRequestMsg; + +/// Checks that various ways of preparing partial encode chunk request give the +/// same result. +/// +/// The test generates a block and than uses different methods of constructing +/// a PartialEncodedChunkResponseMsg to at the end compare if all of the methods +/// gave the same result. +#[test] +fn test_prepare_partial_encoded_chunk_response() { + init_test_logger(); + + let mut env = TestEnv::builder(ChainGenesis::test()).build(); + + let height = env.clients[0].chain.head().unwrap().height; + env.produce_block(0, height + 1); + // No idea why, but the test does not work if we produce just one block. + // For some reason the partial chunks aren’t saved in the storage and then + // various reads we expect to succeed fail. Generating two blocks makes + // things work. -- mina86 + env.produce_block(0, height + 2); + + // Figure out chunk hash. + let block = env.clients[0].chain.get_block_by_height(height + 2).unwrap(); + let chunk_hash = block.chunks()[0].chunk_hash(); + tracing::debug!(target: "chunks", "block hash: {}; chunk_hash: {}", block.hash(), chunk_hash.0); + + // Request partial encoded chunk from the validator. 
This goes through the + // regular network path and since we’ve only ever produced one chunk this + // will be served from an in-memory cache. + let request = PartialEncodedChunkRequestMsg { + chunk_hash: chunk_hash.clone(), + part_ords: (0..(env.clients[0].rs.total_shard_count() as u64)).collect(), + tracking_shards: Some(0u64).into_iter().collect(), + }; + let res = Some(env.get_partial_encoded_chunk_response(0, request.clone())); + + // Make the same request but this time call directly to ShardsManager and + // get the request from a PartialEncodedChunk object. + let partial_chunk = env.clients[0].chain.mut_store().get_partial_chunk(&chunk_hash).unwrap(); + let res_from_partial = ShardsManager::prepare_partial_encoded_chunk_response_from_partial( + request.clone(), + &partial_chunk, + ); + + // And finally, once more make the same request but this time construct the + // response from ShardChunk object. + let chunk = env.clients[0].chain.mut_store().get_chunk(&chunk_hash).unwrap().clone(); + let client = &mut env.clients[0]; + let res_from_chunk = client.shards_mgr.prepare_partial_encoded_chunk_response_from_chunk( + request.clone(), + &mut client.rs, + chunk, + ); + + assert_eq!(res, res_from_partial); + assert_eq!(res, res_from_chunk); +}
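For illustration, here is a minimal sketch of how a caller is expected to wire up the Reed-Solomon encoder that process_partial_encoded_chunk_request now takes, mirroring the updated chunks_management test. The free function serve_chunk_request and the use of near_client::Client as the caller are assumptions made for this sketch, not part of the patch; error handling is elided.

// Sketch only: providing the Reed-Solomon encoder which the request handler
// now requires.  Assumes a `near_client::Client` exposing the same public
// fields the tests use (`runtime_adapter`, `shards_mgr`, `chain`); the
// function name `serve_chunk_request` is made up for the example.
use near_client::Client;
use near_network_primitives::types::PartialEncodedChunkRequestMsg;
use near_primitives::hash::CryptoHash;
use near_primitives::sharding::ReedSolomonWrapper;

fn serve_chunk_request(
    client: &mut Client,
    request: PartialEncodedChunkRequestMsg,
    route_back: CryptoHash,
) {
    // The encoder must match the one used when the chunk was produced:
    // `num_data_parts` data shards plus the remaining parity shards.
    let total_parts = client.runtime_adapter.num_total_parts();
    let data_parts = client.runtime_adapter.num_data_parts();
    let mut rs = ReedSolomonWrapper::new(data_parts, total_parts - data_parts);

    // The handler tries, in order: the in-memory encoded_chunks cache, the
    // stored PartialEncodedChunk and, with this patch, re-encoding the full
    // ShardChunk read from storage.
    client.shards_mgr.process_partial_encoded_chunk_request(
        request,
        route_back,
        client.chain.mut_store(),
        &mut rs,
    );
}

In client_actor.rs the patch avoids building a fresh encoder per request by passing the client's long-lived `self.client.rs` wrapper instead, which is already sized to the runtime's part counts (the handler asserts this via `rs.total_shard_count()`).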