Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chain: refactor process_partial_encoded_chunk_request method #6356

Merged
merged 3 commits into from
Mar 1, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 94 additions & 68 deletions chain/chunks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -996,98 +996,124 @@ impl ShardsManager {
route_back: CryptoHash,
chain_store: &mut ChainStore,
) {
debug!(target: "chunks", "Received partial encoded chunk request for {:?}, part_ordinals: {:?}, receipts: {:?}, I'm {:?}", request.chunk_hash.0, request.part_ords, request.tracking_shards, self.me);

// Check if we have the chunk in our cache
if let Some(entry) = self.encoded_chunks.get(&request.chunk_hash) {
// Create iterators which _might_ contain the requested parts.
let parts_iter = request.part_ords.iter().map(|ord| entry.parts.get(ord).cloned());
let receipts_iter = request
.tracking_shards
.iter()
.map(|shard_id| entry.receipts.get(shard_id).cloned());

// Pass iterators to function which will evaluate them. Since iterators are lazy
// we will clone as few elements as possible before realizing not all are present.
// In the case all are present, the response is sent.
return self.maybe_send_partial_encoded_chunk_response(
request.chunk_hash,
route_back,
parts_iter,
receipts_iter,
);
// If not in the cache then check the storage
debug!(target: "chunks", "Received partial encoded chunk request for {:?}, part_ordinals: {:?}, shards: {:?}, I'm {:?}", request.chunk_hash.0, request.part_ords, request.tracking_shards, self.me);

let response = if let Some(entry) = self.encoded_chunks.get(&request.chunk_hash) {
Self::prepare_partial_encoded_chunk_response_from_cache(request, entry)
} else if let Ok(partial_chunk) = chain_store.get_partial_chunk(&request.chunk_hash) {
// Index _references_ to the parts we know about by their `part_ord`. Since only
// references are used in this index, we will only clone the requested parts, not
// all of them.
let present_parts: HashMap<u64, _> =
partial_chunk.parts().iter().map(|part| (part.part_ord, part)).collect();
// Create an iterator which _might_ contain the request parts. Again, we are
// using the laziness of iterators for efficiency.
let parts_iter =
request.part_ords.iter().map(|ord| present_parts.get(ord).map(|x| *x).cloned());

// Same process for receipts as above for parts.
let present_receipts: HashMap<ShardId, _> = partial_chunk
.receipts()
.iter()
.map(|receipt| (receipt.1.to_shard_id, receipt))
.collect();
let receipts_iter = request
.tracking_shards
.iter()
.map(|shard_id| present_receipts.get(shard_id).map(|x| *x).cloned());

// Pass iterators to function, same as cache case.
return self.maybe_send_partial_encoded_chunk_response(
request.chunk_hash,
route_back,
parts_iter,
receipts_iter,
);
Self::prepare_partial_encoded_chunk_response_from_partial(request, partial_chunk)
mina86 marked this conversation as resolved.
Show resolved Hide resolved
} else {
None
};

if let Some(response) = response {
self.peer_manager_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkResponse { route_back, response },
))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in separate PR:

maybe we should have some stats - on how many times we don't have the response (as normally this should not happen).

}

/// Checks `parts_iter` and `receipts_iter`, if all elements are `Some` then sends
/// a `PartialEncodedChunkResponse`. `parts_iter` is only evaluated up to the first `None`
/// (if any); since iterators are lazy this could save some work if there were any `Some`
/// elements later in the iterator. `receipts_iter` is only evaluated if `part_iter` was
/// completely present. Similarly, `receipts_iter` is only evaluated up to the first `None`
/// if it is evaluated at all.
fn maybe_send_partial_encoded_chunk_response<A, B>(
&self,
/// Prepares response to a partial encoded chunk request from
/// a corresponding encoded_chunks cache entry. If the entry can satisfy
/// the requests (i.e. contains all necessary parts and shards) the method
/// returns a [`PartialEncodedChunkResponseMsg`] object; otherwise returns
/// `None`.
fn prepare_partial_encoded_chunk_response_from_cache(
request: PartialEncodedChunkRequestMsg,
entry: &EncodedChunksCacheEntry,
) -> Option<PartialEncodedChunkResponseMsg> {
// Create iterators which _might_ contain the requested parts.
let parts_iter = request.part_ords.iter().map(|ord| entry.parts.get(ord).cloned());
let receipts_iter =
request.tracking_shards.iter().map(|shard_id| entry.receipts.get(shard_id).cloned());

// Pass iterators to function which will evaluate them. Since iterators are lazy
// we will clone as few elements as possible before realizing not all are present.
// In the case all are present, the response is sent.
Self::prepare_partial_encoded_chunk_response_from_iters(
request.chunk_hash,
parts_iter,
receipts_iter,
)
}

/// Prepares response to a partial encoded chunk request from
/// a corresponding partial chunk read from the storage. If the partial
/// chunk can satisfy the requests (i.e. contains all necessary parts and
/// shards) the method returns a [`PartialEncodedChunkResponseMsg`] object;
/// otherwise returns `None`.
fn prepare_partial_encoded_chunk_response_from_partial(
request: PartialEncodedChunkRequestMsg,
partial_chunk: &PartialEncodedChunk,
) -> Option<PartialEncodedChunkResponseMsg> {
// Index _references_ to the parts we know about by their `part_ord`. Since only
// references are used in this index, we will only clone the requested parts, not
// all of them.
let present_parts: HashMap<u64, _> =
partial_chunk.parts().iter().map(|part| (part.part_ord, part)).collect();
// Create an iterator which _might_ contain the request parts. Again, we are
// using the laziness of iterators for efficiency.
let parts_iter =
request.part_ords.iter().map(|ord| present_parts.get(ord).map(|x| *x).cloned());

// Same process for receipts as above for parts.
let present_receipts: HashMap<ShardId, _> = partial_chunk
.receipts()
.iter()
.map(|receipt| (receipt.1.to_shard_id, receipt))
.collect();
let receipts_iter = request
.tracking_shards
.iter()
.map(|shard_id| present_receipts.get(shard_id).map(|x| *x).cloned());

// Pass iterators to function, same as cache case.
Self::prepare_partial_encoded_chunk_response_from_iters(
request.chunk_hash,
parts_iter,
receipts_iter,
)
}

/// Checks if `parts_iter` and `receipts_iter` contain no `None` elements.
/// It evaluates the iterators only up to the first `None` value (if any);
/// since iterators are lazy this saves some work if there are any `Some`
/// elements later in the iterator. `receipts_iter` is not iterated if
/// `part_iter` contained any `None` values.
///
/// If there were no `None` elements in either of the iterators, the method
/// returns a [`PartialEncodedChunkResponseMsg`] object; otherwise returns
/// `None`.
fn prepare_partial_encoded_chunk_response_from_iters<A, B>(
chunk_hash: ChunkHash,
route_back: CryptoHash,
parts_iter: A,
receipts_iter: B,
) where
) -> Option<PartialEncodedChunkResponseMsg>
where
A: Iterator<Item = Option<PartialEncodedChunkPart>>,
B: Iterator<Item = Option<ReceiptProof>>,
{
let maybe_known_parts: Option<Vec<_>> = parts_iter.collect();
let parts = match maybe_known_parts {
None => {
debug!(target:"chunks", "Not responding, some parts are missing");
return;
debug!(target:"chunks", "Not sending {:?}, some parts are missing",
chunk_hash.0);
return None;
}
Some(known_parts) => known_parts,
};

let maybe_known_receipts: Option<Vec<_>> = receipts_iter.collect();
let receipts = match maybe_known_receipts {
None => {
debug!(target:"chunks", "Not responding, some receipts are missing");
return;
debug!(target:"chunks", "Not sending {:?}, some receipts are missing",
chunk_hash.0);
return None;
}
Some(known_receipts) => known_receipts,
};

let response = PartialEncodedChunkResponseMsg { chunk_hash, parts, receipts };

self.peer_manager_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkResponse { route_back, response },
));
Some(PartialEncodedChunkResponseMsg { chunk_hash, parts, receipts })
}

pub fn check_chunk_complete(
Expand Down