Skip to content

Commit

Permalink
chain: refactor process_partial_encoded_chunk_request method
Browse files Browse the repository at this point in the history
Extract two more methods from `process_partial_encoded_chunk_request`
which correspond to `if` bodies that used to be in it.  This makes each
function shorter and thus easier to read especially as in the future
more branches will be added to the method.

Furthermore, move sending of the message to the method changing
`maybe_send_partial_encoded_chunk_response` into method which prepares
the response only.

Issue: near#6242
  • Loading branch information
mina86 committed Feb 28, 2022
1 parent c81b873 commit ddb6207
Showing 1 changed file with 88 additions and 64 deletions.
152 changes: 88 additions & 64 deletions chain/chunks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -998,78 +998,106 @@ impl ShardsManager {
) {
debug!(target: "chunks", "Received partial encoded chunk request for {:?}, part_ordinals: {:?}, receipts: {:?}, I'm {:?}", request.chunk_hash.0, request.part_ords, request.tracking_shards, self.me);

// Check if we have the chunk in our cache
if let Some(entry) = self.encoded_chunks.get(&request.chunk_hash) {
// Create iterators which _might_ contain the requested parts.
let parts_iter = request.part_ords.iter().map(|ord| entry.parts.get(ord).cloned());
let receipts_iter = request
.tracking_shards
.iter()
.map(|shard_id| entry.receipts.get(shard_id).cloned());

// Pass iterators to function which will evaluate them. Since iterators are lazy
// we will clone as few elements as possible before realizing not all are present.
// In the case all are present, the response is sent.
return self.maybe_send_partial_encoded_chunk_response(
request.chunk_hash,
route_back,
parts_iter,
receipts_iter,
);
// If not in the cache then check the storage
let response = if let Some(entry) = self.encoded_chunks.get(&request.chunk_hash) {
Self::prepare_partial_encoded_chunk_response_from_cache(request, entry)
} else if let Ok(partial_chunk) = chain_store.get_partial_chunk(&request.chunk_hash) {
// Index _references_ to the parts we know about by their `part_ord`. Since only
// references are used in this index, we will only clone the requested parts, not
// all of them.
let present_parts: HashMap<u64, _> =
partial_chunk.parts().iter().map(|part| (part.part_ord, part)).collect();
// Create an iterator which _might_ contain the request parts. Again, we are
// using the laziness of iterators for efficiency.
let parts_iter =
request.part_ords.iter().map(|ord| present_parts.get(ord).map(|x| *x).cloned());

// Same process for receipts as above for parts.
let present_receipts: HashMap<ShardId, _> = partial_chunk
.receipts()
.iter()
.map(|receipt| (receipt.1.to_shard_id, receipt))
.collect();
let receipts_iter = request
.tracking_shards
.iter()
.map(|shard_id| present_receipts.get(shard_id).map(|x| *x).cloned());

// Pass iterators to function, same as cache case.
return self.maybe_send_partial_encoded_chunk_response(
request.chunk_hash,
route_back,
parts_iter,
receipts_iter,
);
Self::prepare_partial_encoded_chunk_response_from_partial(request, partial_chunk)
} else {
None
};

if let Some(response) = response {
self.peer_manager_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkResponse { route_back, response },
))
}
}

/// Checks `parts_iter` and `receipts_iter`, if all elements are `Some` then sends
/// a `PartialEncodedChunkResponse`. `parts_iter` is only evaluated up to the first `None`
/// (if any); since iterators are lazy this could save some work if there were any `Some`
/// elements later in the iterator. `receipts_iter` is only evaluated if `part_iter` was
/// completely present. Similarly, `receipts_iter` is only evaluated up to the first `None`
/// if it is evaluated at all.
fn maybe_send_partial_encoded_chunk_response<A, B>(
&self,
/// Prepares response to a partial encoded chunk request from
/// a corresponding encoded_chunks cache entry. If the entry can satisfy
/// the requests (i.e. contains all necessary parts and shards) the method
/// returns a [`PartialEncodedChunkResponseMsg`] object; otherwise returns
/// `None`.
fn prepare_partial_encoded_chunk_response_from_cache(
request: PartialEncodedChunkRequestMsg,
entry: &EncodedChunksCacheEntry,
) -> Option<PartialEncodedChunkResponseMsg> {
// Create iterators which _might_ contain the requested parts.
let parts_iter = request.part_ords.iter().map(|ord| entry.parts.get(ord).cloned());
let receipts_iter =
request.tracking_shards.iter().map(|shard_id| entry.receipts.get(shard_id).cloned());

// Pass iterators to function which will evaluate them. Since iterators are lazy
// we will clone as few elements as possible before realizing not all are present.
// In the case all are present, the response is sent.
Self::prepare_partial_encoded_chunk_response_from_iters(
request.chunk_hash,
parts_iter,
receipts_iter,
)
}

/// Prepares response to a partial encoded chunk request from
/// a corresponding partial chunk read from the storage. If the partial
/// chunk can satisfy the requests (i.e. contains all necessary parts and
/// shards) the method returns a [`PartialEncodedChunkResponseMsg`] object;
/// otherwise returns `None`.
fn prepare_partial_encoded_chunk_response_from_partial(
request: PartialEncodedChunkRequestMsg,
partial_chunk: &PartialEncodedChunk,
) -> Option<PartialEncodedChunkResponseMsg> {
// Index _references_ to the parts we know about by their `part_ord`. Since only
// references are used in this index, we will only clone the requested parts, not
// all of them.
let present_parts: HashMap<u64, _> =
partial_chunk.parts().iter().map(|part| (part.part_ord, part)).collect();
// Create an iterator which _might_ contain the request parts. Again, we are
// using the laziness of iterators for efficiency.
let parts_iter =
request.part_ords.iter().map(|ord| present_parts.get(ord).map(|x| *x).cloned());

// Same process for receipts as above for parts.
let present_receipts: HashMap<ShardId, _> = partial_chunk
.receipts()
.iter()
.map(|receipt| (receipt.1.to_shard_id, receipt))
.collect();
let receipts_iter = request
.tracking_shards
.iter()
.map(|shard_id| present_receipts.get(shard_id).map(|x| *x).cloned());

// Pass iterators to function, same as cache case.
Self::prepare_partial_encoded_chunk_response_from_iters(
request.chunk_hash,
parts_iter,
receipts_iter,
)
}

/// Checks if `parts_iter` and `receipts_iter` contain no `None` elements.
/// It evaluates the iterators only up to the first `None` value (if any);
/// since iterators are lazy this saves some work if there are any `Some`
/// elements later in the iterator. `receipts_iter` is not iterated if
/// `part_iter` contained any `None` values.
///
/// If there were no `None` elements in either of the iterators, the method
/// returns a [`PartialEncodedChunkResponseMsg`] object; otherwise returns
/// `None`.
fn prepare_partial_encoded_chunk_response_from_iters<A, B>(
chunk_hash: ChunkHash,
route_back: CryptoHash,
parts_iter: A,
receipts_iter: B,
) where
) -> Option<PartialEncodedChunkResponseMsg>
where
A: Iterator<Item = Option<PartialEncodedChunkPart>>,
B: Iterator<Item = Option<ReceiptProof>>,
{
let maybe_known_parts: Option<Vec<_>> = parts_iter.collect();
let parts = match maybe_known_parts {
None => {
debug!(target:"chunks", "Not responding, some parts are missing");
return;
return None;
}
Some(known_parts) => known_parts,
};
Expand All @@ -1078,16 +1106,12 @@ impl ShardsManager {
let receipts = match maybe_known_receipts {
None => {
debug!(target:"chunks", "Not responding, some receipts are missing");
return;
return None;
}
Some(known_receipts) => known_receipts,
};

let response = PartialEncodedChunkResponseMsg { chunk_hash, parts, receipts };

self.peer_manager_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkResponse { route_back, response },
));
Some(PartialEncodedChunkResponseMsg { chunk_hash, parts, receipts })
}

pub fn check_chunk_complete(
Expand Down

0 comments on commit ddb6207

Please sign in to comment.