Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chain: ShardManager: add ability to serve partial chunks from ShardChunk #6377

Merged
merged 16 commits into from
Mar 9, 2022
Merged
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions chain/chunks-primitives/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub enum Error {
KnownPart,
ChainError(near_chain_primitives::Error),
IOError(std::io::Error),
Other(String),
mina86 marked this conversation as resolved.
Show resolved Hide resolved
}

impl fmt::Display for Error {
Expand Down
2 changes: 2 additions & 0 deletions chain/chunks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ tracing = "0.1.13"
borsh = "0.9"
lru = "0.7.2"
reed-solomon-erasure = "4"
visibility = "*"
mina86 marked this conversation as resolved.
Show resolved Hide resolved

near-crypto = { path = "../../core/crypto" }
near-primitives = { path = "../../core/primitives" }
Expand All @@ -33,3 +34,4 @@ assert_matches = "1.5.0"
[features]
byzantine_asserts = ["near-chain/byzantine_asserts"]
expensive_tests = []
test_features = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment. Also maybe we should call it 'reconstruct_partial_chunk_on_the_fly' or something like that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m planning to make GCing a configurable feature.

195 changes: 80 additions & 115 deletions chain/chunks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ impl ShardsManager {
//
// TODO(#6242): This is currently not implemented and effectively
// this is dead code.
self.prepare_partial_encoded_chunk_response_from_chunk(request, chain_store, rs, chunk)
self.prepare_partial_encoded_chunk_response_from_chunk(request, rs, chunk)
} else {
None
};
Expand All @@ -1027,11 +1027,12 @@ impl ShardsManager {
}
}

/// Prepares response to a partial encoded chunk request from
/// a corresponding encoded_chunks cache entry. If the entry can satisfy
/// the requests (i.e. contains all necessary parts and shards) the method
/// returns a [`PartialEncodedChunkResponseMsg`] object; otherwise returns
/// `None`.
/// Prepares response to a partial encoded chunk request from an entry in
/// a encoded_chunks in-memory cache.
///
/// If the entry can satisfy the requests (i.e. contains all necessary parts
/// and shards) the method returns a [`PartialEncodedChunkResponseMsg`]
/// object; otherwise returns `None`.
fn prepare_partial_encoded_chunk_response_from_cache(
request: PartialEncodedChunkRequestMsg,
entry: &EncodedChunksCacheEntry,
Expand All @@ -1051,11 +1052,13 @@ impl ShardsManager {
)
}

/// Prepares response to a partial encoded chunk request from
/// a corresponding partial chunk read from the storage. If the partial
/// chunk can satisfy the requests (i.e. contains all necessary parts and
/// shards) the method returns a [`PartialEncodedChunkResponseMsg`] object;
/// otherwise returns `None`.
/// Prepares response to a partial encoded chunk request from a partial
/// chunk read from the storage.
///
/// If the partial chunk can satisfy the requests (i.e. contains all
/// necessary parts and shards) the method returns
/// a [`PartialEncodedChunkResponseMsg`] object; otherwise returns `None`.
#[cfg_attr(feature = "test_features", visibility::make(pub))]
mina86 marked this conversation as resolved.
Show resolved Hide resolved
fn prepare_partial_encoded_chunk_response_from_partial(
request: PartialEncodedChunkRequestMsg,
partial_chunk: &PartialEncodedChunk,
Expand Down Expand Up @@ -1089,78 +1092,81 @@ impl ShardsManager {
)
}

/// Constructs a vector of [`ReceiptsProof`]s for specified chunk.
fn make_outgoing_receipts_proofs<T>(
&self,
chunk_header: &ShardChunkHeader,
outgoing_receipts: &Vec<Receipt>,
mina86 marked this conversation as resolved.
Show resolved Hide resolved
make_elem: impl Fn(ReceiptProof) -> T,
mina86 marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<Vec<T>, near_chunks_primitives::Error> {
mina86 marked this conversation as resolved.
Show resolved Hide resolved
let shard_id = chunk_header.shard_id();
let shard_layout = self
.runtime_adapter
.get_shard_layout_from_prev_block(chunk_header.prev_block_hash_ref())?;

let hashes = Chain::build_receipts_hashes(&outgoing_receipts, &shard_layout);
let (root, proofs) = merklize(&hashes);
if chunk_header.outgoing_receipts_root() != root {
return Err(near_chunks_primitives::Error::Other(format!(
"unexpected outgoing receipts root; want: {}, but got: {}",
chunk_header.outgoing_receipts_root(),
root
)));
mina86 marked this conversation as resolved.
Show resolved Hide resolved
}

let mut receipts_by_shard =
Chain::group_receipts_by_shard(outgoing_receipts.clone(), &shard_layout);
let proofs = proofs
.into_iter()
.enumerate()
.map(|(proof_shard_id, proof)| {
let proof_shard_id = proof_shard_id as u64;
let receipts = receipts_by_shard.remove(&proof_shard_id).unwrap_or_else(Vec::new);
let shard_proof = ShardProof {
from_shard_id: shard_id,
to_shard_id: proof_shard_id,
proof: proof,
};
make_elem(ReceiptProof(receipts, shard_proof))
})
.collect();
Ok(proofs)
}

/// Prepares response to a partial encoded chunk request from a chunk read
/// from the storage.
///
/// This requires encoding the chunk and as such is computationally
/// expensive operation. If possible, the request should be served from
/// EncodedChunksCacheEntry or PartialEncodedChunk instead.
#[cfg_attr(feature = "test_features", visibility::make(pub))]
fn prepare_partial_encoded_chunk_response_from_chunk(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should move it to a separate file (to keep this lib.rs a little bit smaller?)

Copy link
Contributor Author

@mina86 mina86 Mar 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s not immediately obvious to be me what should be moved to separate file though to be honest. Code that handles partial encoded chunk requests is less than 250 lines so moving just it wouldn’t really help reduce the size of lib.rs which is 2700 lines. This would need some more care and thought to figure out which of the methods in the struct belong together in a separate file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't think this makes sense to move to a separate file, as it seems to be pretty entangled with the rest.

The easiest wins in terms of keeping files small and logic clean are usually around various small helpers. Things like RequestPool or SealsManager seem like better candidates for extraction.

&mut self,
request: PartialEncodedChunkRequestMsg,
chain_store: &mut ChainStore,
rs: &mut ReedSolomonWrapper,
chunk: ShardChunk,
) -> Option<PartialEncodedChunkResponseMsg> {
mina86 marked this conversation as resolved.
Show resolved Hide resolved
// TODO(mina86): Is it correct to use rs.data_shard_count() and
// rs.total_shard_count() here? Can those values change over time?
let total_parts = rs.total_shard_count();
for ord in request.part_ords.iter() {
mina86 marked this conversation as resolved.
Show resolved Hide resolved
let ord: usize = (*ord).try_into().unwrap();
mina86 marked this conversation as resolved.
Show resolved Hide resolved
if ord >= total_parts {
debug!(target:"chunks", "Not sending {:?}, requested part_ord {} but we only expect {} total",
warn!(target:"chunks", "Not sending {}, requested part_ord {} but we only expect {} total",
mina86 marked this conversation as resolved.
Show resolved Hide resolved
request.chunk_hash.0, ord, total_parts);
return None;
}
}

// Figure out basic parameters such as shard id, epoch id, protocol
// version and shard layout. Those will be needed later when fetching
// other data and constructing encoded shard chunk.
let header = chunk.cloned_header();
let prev_block_hash = header.prev_block_hash_ref();
let shard_id = header.shard_id();
let shard_layout = self
.runtime_adapter
.get_shard_layout_from_prev_block(prev_block_hash)
.map_err(|err| {
debug!(target: "chunks", "Not sending {:?}, failed to get shard layout: {}", request.chunk_hash.0, err);
})
.ok()?;

// Get outgoing receipts for the chunk and construct vector of their
// proofs.
let outgoing_receipts = chain_store
.get_outgoing_receipts_for_shard(
&*self.runtime_adapter,
*chunk.prev_block(),
shard_id,
chunk.height_included(),
)
let outgoing_receipts = chunk.receipts();
let receipt_proofs = self
.make_outgoing_receipts_proofs(&header, &outgoing_receipts, |p| p)
.map_err(|err| {
debug!(target: "chunks", "Not sending {:?}, failed to get outgoing receipts: {}", request.chunk_hash.0, err);
}).ok()?;
let outgoing_receipts_hashes =
Chain::build_receipts_hashes(&outgoing_receipts, &shard_layout);
let (outgoing_receipts_root, outgoing_receipts_proofs) =
merklize(&outgoing_receipts_hashes);
if header.outgoing_receipts_root() != outgoing_receipts_root {
error!(target: "chunks",
"Not sending {:?}, expected outgoing receipts root doesn’t match calculated: {} != {}",
request.chunk_hash.0, header.outgoing_receipts_root(), outgoing_receipts_root);
return None;
}

let mut receipts_by_shard =
Chain::group_receipts_by_shard(outgoing_receipts.clone(), &shard_layout);
let receipt_proofs: Vec<_> = outgoing_receipts_proofs
.iter()
.enumerate()
.map(|(proof_shard_id, proof)| {
let proof_shard_id = proof_shard_id as u64;
let receipts = receipts_by_shard.remove(&proof_shard_id).unwrap_or_else(Vec::new);
let shard_proof = ShardProof {
from_shard_id: shard_id,
to_shard_id: proof_shard_id,
proof: proof.clone(),
};
ReceiptProof(receipts, shard_proof)
warn!(target: "chunks", "Not sending {}, {}", request.chunk_hash.0, err);
})
.collect();
.ok()?;

// Construct EncodedShardChunk. If we earlier determined that we will
// need parity parts, instruct the constructor to calculate them as
Expand All @@ -1169,27 +1175,27 @@ impl ShardsManager {
rs,
chunk.transactions().to_vec(),
&outgoing_receipts).map_err(|err| {
debug!(target: "chunks", "Not sending {:?}, failed to encode transaction receipts: {}", request.chunk_hash.0, err);
warn!(target: "chunks", "Not sending {}, failed to encode transaction receipts: {}", request.chunk_hash.0, err);
}).ok()?;
if header.encoded_length() != encoded_length {
error!(target: "chunks",
"Not sending {:?}, expected encoded length doesn’t match calculated: {} != {}",
warn!(target: "chunks",
"Not sending {}, expected encoded length doesn’t match calculated: {} != {}",
request.chunk_hash.0, header.encoded_length(), encoded_length);
return None;
}

let mut content = EncodedShardChunkBody { parts };
if let Err(err) = content.reconstruct(rs) {
error!(target: "chunks",
"Not sending {:?}, failed to reconstruct RS parity parts: {}",
warn!(target: "chunks",
"Not sending {}, failed to reconstruct RS parity parts: {}",
request.chunk_hash.0, err);
return None;
}

let (encoded_merkle_root, merkle_paths) = content.get_merkle_hash_and_paths();
if header.encoded_merkle_root() != encoded_merkle_root {
error!(target: "chunks",
"Not sending {:?}, expected encoded Merkle root doesn’t match calculated: {} != {}",
warn!(target: "chunks",
"Not sending {}, expected encoded Merkle root doesn’t match calculated: {} != {}",
request.chunk_hash.0, header.encoded_merkle_root(), encoded_merkle_root);
return None;
}
Expand Down Expand Up @@ -1240,7 +1246,7 @@ impl ShardsManager {
let maybe_known_parts: Option<Vec<_>> = parts_iter.collect();
let parts = match maybe_known_parts {
None => {
debug!(target:"chunks", "Not sending {:?}, some parts are missing",
debug!(target:"chunks", "Not sending {}, some parts are missing",
chunk_hash.0);
return None;
}
Expand All @@ -1250,7 +1256,7 @@ impl ShardsManager {
let maybe_known_receipts: Option<Vec<_>> = receipts_iter.collect();
let receipts = match maybe_known_receipts {
None => {
debug!(target:"chunks", "Not sending {:?}, some receipts are missing",
debug!(target:"chunks", "Not sending {}, some receipts are missing",
chunk_hash.0);
return None;
}
Expand Down Expand Up @@ -2055,28 +2061,7 @@ impl ShardsManager {
store_update: &mut ChainStoreUpdate<'_>,
) -> Result<(), Error> {
let header = encoded_chunk.cloned_header();
let shard_id = header.shard_id();
let shard_layout =
self.runtime_adapter.get_shard_layout_from_prev_block(&header.prev_block_hash())?;
let outgoing_receipts_hashes =
Chain::build_receipts_hashes(&outgoing_receipts, &shard_layout);
let (outgoing_receipts_root, outgoing_receipts_proofs) =
merklize(&outgoing_receipts_hashes);
assert_eq!(header.outgoing_receipts_root(), outgoing_receipts_root);

// Save this chunk into encoded_chunks & process encoded chunk to add to the store.
let mut receipts_by_shard =
Chain::group_receipts_by_shard(outgoing_receipts, &shard_layout);
let receipts = outgoing_receipts_proofs
.into_iter()
.enumerate()
.map(|(to_shard_id, proof)| {
let to_shard_id = to_shard_id as u64;
let receipts = receipts_by_shard.remove(&to_shard_id).unwrap_or_else(Vec::new);
let shard_proof = ShardProof { from_shard_id: shard_id, to_shard_id, proof };
ReceiptProof(receipts, shard_proof)
})
.collect();
let receipts = self.make_outgoing_receipts_proofs(&header, &outgoing_receipts, |p| p)?;
let partial_chunk = PartialEncodedChunkV2 {
header,
parts: encoded_chunk
Expand Down Expand Up @@ -2117,16 +2102,8 @@ impl ShardsManager {
let chunk_header = encoded_chunk.cloned_header();
let prev_block_hash = chunk_header.prev_block_hash();
let shard_id = chunk_header.shard_id();
let shard_layout =
self.runtime_adapter.get_shard_layout_from_prev_block(&prev_block_hash)?;
let outgoing_receipts_hashes =
Chain::build_receipts_hashes(&outgoing_receipts, &shard_layout);
let (outgoing_receipts_root, outgoing_receipts_proofs) =
merklize(&outgoing_receipts_hashes);
assert_eq!(chunk_header.outgoing_receipts_root(), outgoing_receipts_root);

let mut block_producer_mapping = HashMap::new();

for part_ord in 0..self.runtime_adapter.num_total_parts() {
let part_ord = part_ord as u64;
let to_whom = self.runtime_adapter.get_part_owner(&prev_block_hash, part_ord).unwrap();
Expand All @@ -2135,20 +2112,8 @@ impl ShardsManager {
entry.push(part_ord);
}

let mut receipts_by_shard =
Chain::group_receipts_by_shard(outgoing_receipts, &shard_layout);
let receipt_proofs: Vec<_> = outgoing_receipts_proofs
.into_iter()
.enumerate()
.map(|(proof_shard_id, proof)| {
let proof_shard_id = proof_shard_id as u64;
let receipts = receipts_by_shard.remove(&proof_shard_id).unwrap_or_else(Vec::new);
let shard_proof =
ShardProof { from_shard_id: shard_id, to_shard_id: proof_shard_id, proof };
Arc::new(ReceiptProof(receipts, shard_proof))
})
.collect();

let receipt_proofs =
self.make_outgoing_receipts_proofs(&chunk_header, &outgoing_receipts, Arc::new)?;
for (to_whom, part_ords) in block_producer_mapping {
let part_receipt_proofs = receipt_proofs
.iter()
Expand Down
8 changes: 2 additions & 6 deletions chain/client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1410,21 +1410,17 @@ impl TestEnv {
}
}

fn get_partial_encoded_chunk_response(
pub fn get_partial_encoded_chunk_response(
&mut self,
id: usize,
request: PartialEncodedChunkRequestMsg,
) -> PartialEncodedChunkResponseMsg {
let client = &mut self.clients[id];
let total_parts = client.chain.runtime_adapter.num_total_parts();
let data_parts = client.chain.runtime_adapter.num_data_parts();
let parity_parts = total_parts - data_parts;
let mut rs = ReedSolomonWrapper::new(data_parts, parity_parts);
client.shards_mgr.process_partial_encoded_chunk_request(
request,
CryptoHash::default(),
client.chain.mut_store(),
&mut rs,
&mut client.rs,
);
let response = self.network_adapters[id].pop().unwrap();
if let PeerManagerMessageRequest::NetworkRequests(
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ performance_stats = [
"near-network/performance_stats",
]
expensive_tests = []
test_features = ["nearcore/test_features"]
test_features = ["nearcore/test_features", "near-chunks/test_features"]
mina86 marked this conversation as resolved.
Show resolved Hide resolved
protocol_feature_alt_bn128 = [
"near-primitives/protocol_feature_alt_bn128",
"node-runtime/protocol_feature_alt_bn128",
Expand Down
2 changes: 2 additions & 0 deletions integration-tests/src/tests/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ mod runtimes;
#[cfg(feature = "sandbox")]
mod sandbox;
mod sharding_upgrade;
#[cfg(feature = "test_features")]
mod shards_manager;
Loading