
Collation fetching fairness #4880

Open

tdimitrov wants to merge 106 commits into base: master from tsv-collator-proto-fairness.

Changes shown are from 29 of the 106 commits.

Commits
f4738dc
Collation fetching fairness
tdimitrov Jun 26, 2024
c7074da
Comments
tdimitrov Jun 26, 2024
73eee87
Fix tests and add some logs
tdimitrov Jun 26, 2024
fa321ce
Fix per para limit calculation in `is_collations_limit_reached`
tdimitrov Jun 27, 2024
96392a5
Fix default `TestState` initialization: claim queue len should be equ…
tdimitrov Jun 27, 2024
0f28aa8
clippy
tdimitrov Jun 27, 2024
e5ea548
Update `is_collations_limit_reached` - remove seconded limit
tdimitrov Jun 28, 2024
9abc898
Fix pending fetches and more tests
tdimitrov Jul 1, 2024
c07890b
Remove unnecessary clone
tdimitrov Jul 1, 2024
e50440e
Comments
tdimitrov Jul 1, 2024
42b05c7
Better var names
tdimitrov Jul 1, 2024
2f5a466
Fix `pick_a_collation_to_fetch` and add more tests
tdimitrov Jul 2, 2024
ff96ef9
Fix test: `collation_fetching_respects_claim_queue`
tdimitrov Jul 2, 2024
e837689
Add `collation_fetching_fallback_works` test + comments
tdimitrov Jul 2, 2024
91cdd13
More tests
tdimitrov Jul 3, 2024
9f2d59b
Fix collation limit fallback
tdimitrov Jul 3, 2024
a10c86d
Separate `claim_queue_support` from `ProspectiveParachainsMode`
tdimitrov Jul 3, 2024
b39858a
Fix comments and add logs
tdimitrov Jul 3, 2024
b30f340
Update test: `collation_fetching_prefer_entries_earlier_in_claim_queue`
tdimitrov Jul 3, 2024
c0f18b9
Fix `pick_a_collation_to_fetch` and more tests
tdimitrov Jul 3, 2024
703ed6d
Merge branch 'master' into tsv-collator-proto-fairness
tdimitrov Jul 3, 2024
fba7ca6
Fix `pick_a_collation_to_fetch` - iter 1
tdimitrov Jul 4, 2024
d4f4ce2
Fix `pick_a_collation_to_fetch` - iter 2
tdimitrov Jul 4, 2024
5f52712
Remove a redundant runtime version check
tdimitrov Jul 4, 2024
6c73e24
formatting and comments
tdimitrov Jul 4, 2024
752f3cc
pr doc
tdimitrov Jul 4, 2024
f0069f1
add license
tdimitrov Jul 4, 2024
6b9f0b3
clippy
tdimitrov Jul 4, 2024
5f6dcdd
Merge branch 'master' into tsv-collator-proto-fairness
tdimitrov Jul 4, 2024
b8c1b85
Update prdoc/pr_4880.prdoc
tdimitrov Jul 5, 2024
f26362f
Limit collations based on seconded count instead of number of fetches
tdimitrov Jul 7, 2024
d6857fc
Undo rename: is_seconded_limit_reached
tdimitrov Jul 7, 2024
cde28cd
fix collation tests
tdimitrov Jul 8, 2024
4c3db2a
`collations_fetching_respects_seconded_limit` test
tdimitrov Jul 8, 2024
b2bbdfe
nits
tdimitrov Jul 8, 2024
e220cb4
Merge branch 'master' into tsv-collator-proto-fairness
tdimitrov Aug 26, 2024
01d121e
Remove duplicated dependency after merge
tdimitrov Aug 26, 2024
7b3c002
Remove `ProspectiveParachainsMode` from collator-protocol, validator-…
tdimitrov Jul 10, 2024
5dffdde
Fix compilation errors in collation tests
tdimitrov Jul 10, 2024
1c1744b
`is_seconded_limit_reached` uses the whole view
tdimitrov Jul 11, 2024
aaccab1
Fix `is_seconded_limit_reached` check
tdimitrov Aug 30, 2024
b1df2e3
Trace logs useful for debugging tests
tdimitrov Sep 11, 2024
ce3a95e
Handle unconnected candidates
tdimitrov Sep 11, 2024
fe3c09d
Rework pre-prospective parachains tests to work with claim queue
tdimitrov Sep 11, 2024
b9ab579
Fix `collation_fetches_without_claimqueue`
tdimitrov Sep 12, 2024
fe623bc
Test - `collation_fetching_prefer_entries_earlier_in_claim_queue`
tdimitrov Sep 13, 2024
d216689
Remove collations test file - all tests are moved in prospective_para…
tdimitrov Sep 13, 2024
ea99c7a
fixup - collation_fetching_prefer_entries_earlier_in_claim_queue
tdimitrov Sep 16, 2024
ee155f5
New test - `collation_fetching_considers_advertisements_from_the_whol…
tdimitrov Sep 16, 2024
55b7902
Merge branch 'master' into tsv-collator-proto-fairness
tdimitrov Sep 17, 2024
515a784
Update PRdoc and comments
tdimitrov Sep 17, 2024
4ef6919
Combine `seconded_per_para` and `claims_per_para` from collations in …
tdimitrov Sep 17, 2024
bd7174f
No need to handle missing claim queue anymore
tdimitrov Sep 17, 2024
df6165e
Remove dead code and fix some comments
tdimitrov Sep 17, 2024
4c5c271
Remove `is_seconded_limit_reached` and use the code directly due to t…
tdimitrov Sep 17, 2024
b0e4627
Fix comments
tdimitrov Sep 17, 2024
d1cf41d
`pending_for_para` -> `is_pending_for_para`
tdimitrov Sep 17, 2024
df3a215
Fix `0011-async-backing-6-seconds-rate.toml` - set `lookahead` to 3 o…
tdimitrov Sep 19, 2024
b70807b
Set `lookahead` in polkadot/zombienet_tests/elastic_scaling/0002-elas…
tdimitrov Sep 20, 2024
f047036
paras_now -> assigned_paras
tdimitrov Sep 30, 2024
94e4fc3
Remove a duplicated parameter in `update_view`
tdimitrov Sep 30, 2024
386488b
Remove an outdated comment
tdimitrov Sep 30, 2024
ff312c9
Fix `seconded_and_pending_for_para_in_view`
tdimitrov Oct 2, 2024
88d0307
`claim_queue_state` becomes `unfulfilled_claim_queue_entries` - the b…
tdimitrov Oct 2, 2024
af78352
For consistency use `chain_ids` only from `test_state`
tdimitrov Oct 2, 2024
d636091
Limit the number of advertisements accepted by each peer for spam pro…
tdimitrov Oct 3, 2024
2bb82eb
Zombienet test
tdimitrov Oct 7, 2024
c782058
Rearrange imports
tdimitrov Oct 7, 2024
903f7f4
Newline and outdated comment
tdimitrov Oct 7, 2024
cefbce8
Undo `lookahead = 3` in zombienet tests
tdimitrov Oct 14, 2024
cb69361
Consider what's scheduled on the core when determining assignments
tdimitrov Oct 14, 2024
1142a90
Merge branch 'master' into tsv-collator-proto-fairness
tdimitrov Oct 14, 2024
4438349
Fix a clippy warning
tdimitrov Oct 14, 2024
e82c386
Update PRdoc
tdimitrov Oct 15, 2024
4b2d4c5
Apply suggestions from code review
tdimitrov Oct 15, 2024
1c91371
".git/.scripts/commands/fmt/fmt.sh"
Oct 15, 2024
be34132
Code review feedback
tdimitrov Oct 15, 2024
5c7b2ac
Fix a typo in prdoc
tdimitrov Oct 15, 2024
62c6473
`seconded_and_pending_for_para_in_view` looks up to the len of the cl…
tdimitrov Oct 16, 2024
9e3f62d
Merge branch 'master' into tsv-collator-proto-fairness
tdimitrov Oct 16, 2024
a4bc21f
rerun CI
tdimitrov Oct 16, 2024
6c103df
Fix zombienet test
tdimitrov Oct 18, 2024
15e3a74
Merge branch 'master' into tsv-collator-proto-fairness
tdimitrov Oct 18, 2024
d6b35ca
Relax expected block counts for each para
tdimitrov Oct 18, 2024
586b56b
Bump lookahead and decrease timeout
tdimitrov Oct 18, 2024
a04d480
Fix ZN pipeline - try 1
tdimitrov Oct 18, 2024
13d5d15
Fix ZN pipeline - try 2
tdimitrov Oct 18, 2024
86870d0
Fix ZN pipeline - try 3
tdimitrov Oct 18, 2024
7b822af
Fix ZN pipeline - try 4
tdimitrov Oct 18, 2024
558c82e
Merge branch 'master' into tsv-collator-proto-fairness
tdimitrov Oct 21, 2024
06c0fd0
Merge branch 'master' into tsv-collator-proto-fairness
tdimitrov Oct 24, 2024
ade7f9b
Rename ZN test
tdimitrov Oct 24, 2024
8ba2a80
Merge branch 'master' into tsv-collator-proto-fairness
tdimitrov Oct 28, 2024
ab70567
Handle merge conflicts
tdimitrov Oct 28, 2024
d24fdc1
When counting occupied slots from the claim queue consider relay pare…
tdimitrov Nov 6, 2024
f55390e
Add a test
tdimitrov Nov 7, 2024
a2093ee
Small style fixes in tests
tdimitrov Nov 7, 2024
ded6fb5
Fix a todo
tdimitrov Nov 7, 2024
cda9330
Fix `paths_to_relay_parent`
tdimitrov Nov 7, 2024
505eb24
Additional test for `paths_to_relay_parent`
tdimitrov Nov 8, 2024
94f573a
Simplifications
tdimitrov Nov 8, 2024
fa82404
Merge branch 'master' into tsv-collator-proto-fairness
tdimitrov Nov 8, 2024
55e7fb2
Resolve merge conflicts
tdimitrov Nov 8, 2024
e27ddd4
Fix todos
tdimitrov Nov 8, 2024
a10c0c1
Comment
tdimitrov Nov 8, 2024
ee11c6a
Remove unneeded log line
tdimitrov Nov 8, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions polkadot/node/network/collator-protocol/Cargo.toml
@@ -37,6 +37,7 @@ rstest = { workspace = true }
sp-core = { features = ["std"], workspace = true, default-features = true }
sp-keyring = { workspace = true, default-features = true }
sc-keystore = { workspace = true, default-features = true }
sp-tracing = { workspace = true }
sc-network = { workspace = true, default-features = true }
codec = { features = ["std"], workspace = true, default-features = true }

255 changes: 233 additions & 22 deletions polkadot/node/network/collator-protocol/src/validator_side/collation.rs
@@ -27,7 +27,12 @@
//! ┌──────────────────────────────────────────┐
//! └─▶Advertised ─▶ Pending ─▶ Fetched ─▶ Validated

use std::{collections::VecDeque, future::Future, pin::Pin, task::Poll};
use std::{
collections::{BTreeMap, VecDeque},
future::Future,
pin::Pin,
task::Poll,
};

use futures::{future::BoxFuture, FutureExt};
use polkadot_node_network_protocol::{
@@ -216,27 +221,97 @@ impl CollationStatus {
}

/// Information about collations per relay parent.
#[derive(Default)]
pub struct Collations {
/// What is the current status regarding a collation for this relay parent?
pub status: CollationStatus,
/// Collator we're fetching from, optionally which candidate was requested.
///
/// This is the currently last started fetch, which did not exceed `MAX_UNSHARED_DOWNLOAD_TIME`
/// yet.
/// This is the last fetch for the relay parent. The value is used in
/// `get_next_collation_to_fetch` (called from `dequeue_next_collation_and_fetch`) to determine
/// whether the last fetched collation is the same as the one which just finished. If it is,
/// another collation should be fetched. If not, another fetch was already initiated and
/// `get_next_collation_to_fetch` will do nothing.
///
/// For the reasons above, this value is not set to `None` when the fetch is done! Don't use it
/// to check if there is a pending fetch.
Review comment (Member):

You should focus in documentation on what this is, not how it is used. If you focus on what it is and what to expect from it, then you also open up the possibility of it being used elsewhere safely, as everybody can use it as long as they are fine with the stated contract.

Here in particular it seems that the name should also be changed, e.g. to `last_fetch`, with documentation explaining what is to be expected: is this the last successful fetch, or the last fetch that got initiated? When will this be `None`?

Reply (tdimitrov, author):

The name is confusing. I wanted to use this field at some point and was surprised by the behaviour, so I left a comment with my findings. I agree it would have been better to rename the field and leave a better comment. I'll fix it.

pub fetching_from: Option<(CollatorId, Option<CandidateHash>)>,
/// Collation that were advertised to us, but we did not yet fetch.
pub waiting_queue: VecDeque<(PendingCollation, CollatorId)>,
/// Collations that were advertised to us, but which we did not yet fetch. Grouped by `ParaId`.
waiting_queue: BTreeMap<ParaId, VecDeque<(PendingCollation, CollatorId)>>,
/// How many collations have been seconded.
pub seconded_count: usize,
/// What collations were fetched so far for this relay parent.
fetched_per_para: BTreeMap<ParaId, usize>,
// Claims per `ParaId` for the assigned core at the relay parent. This information is obtained
// from `GroupAssignments`, which contains either the claim queue for the core (if the runtime
// supports it) or the `ParaId` of the parachain assigned to the core.
claims_per_para: BTreeMap<ParaId, usize>,
// Represents the claim queue at the relay parent. The `bool` field indicates whether a
// candidate was fetched for the `ParaId` at the position in question, i.e. whether the claim
// is 'satisfied'. If the claim queue is not available, `claim_queue_state` will be `None`.
claim_queue_state: Option<Vec<(bool, ParaId)>>,
Review comment (Contributor):

Overall I think it's a good idea to optimistically track which claims should be 'satisfied' already.

Once we fetch something it does not necessarily mean we second it. We send it to backing for validation and only then second it, so we might effectively backpedal from our decision.

Consider cq: [a, b, b, b, b, b, b, b, b, ...]
We fetch a candidate for a.
cq_state: [1, 0, 0, 0, 0, 0, ...]
So then we effectively stop fetching for a.
It turns out the candidate we fetched from a was rejected in backing for being invalid. Nevertheless, our claim_queue_state stays as is and we heavily de-prioritise fetches for a despite having seconded nothing for that slot. A malicious collator could, just by sending an invalid collation to all backers, block other incoming collations for that parachain.

Is that how the proposed system works, or have I missed something? I'm still thinking about how big of an issue it is, but it seems concerning. We might have to clean up the cq_state in such cases (maybe).

Reply (tdimitrov, author):

> Is that how the proposed system works, or have I missed something?

I'm afraid that's correct. We run some checks before enqueuing the candidate, but we don't validate it, of course. I suppose it shouldn't be hard to craft a garbage candidate which will pass the seconding checks, claim the slot and prevent legitimate collators from pushing candidates.

> Overall I think it's a good idea to optimistically track which claims should be 'satisfied' already.

It will make more sense to have the 'satisfaction' check based on what's seconded and to keep the fetches as 'pending seconding' (similar to the way I handle pending fetches in this PR). We can track this with Seconded and Invalid messages and accept (and fetch) advertisements until we have enough seconded candidates to 'satisfy' the claim queue.

A misbehaving collator will still be able to spam the validators, but on the first invalid collation it is supposed to get reported (and maybe disconnected? I don't know how this part of the code works).

Reply (tdimitrov, author):

I've reworked the PR to track the number of seconded candidates per para. Additionally, I realized that there can't be more than one pending item per relay parent, which simplifies the PR a little bit.

Review comment (Member):

Also keep in mind that cores are rotated. In general we have to consider that a claim queue position can be occupied by a backing of another validator in the backing group, but with rotations at boundaries it can even happen that a claim queue position is already occupied by a backing of the previous group.

E.g. one relay parent earlier a previous backing group was assigned and the claim queue looked like this: [A,B,C].
Now in our view the claim queue looks like: [B,C,D] ... A has already moved out, but B and C are still valid and might have already been provided by the previous backing group.

Let me know how complex it gets to account for that; maybe we want to make this part of the bigger refactor.
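
To make the failure mode discussed in this thread concrete (a claim is marked satisfied at fetch time, before validation), here is a minimal self-contained sketch. This is not the PR's actual code: plain `u32` stands in for `ParaId`, and `release` is a hypothetical clean-up along the lines the reviewers suggest for when backing reports a fetched candidate as invalid.

```rust
type ParaId = u32;

struct ClaimQueueState {
    // One `(satisfied, para)` pair per claim queue position.
    slots: Vec<(bool, ParaId)>,
}

impl ClaimQueueState {
    // Satisfy the first open claim for `para`; this is what happens at fetch time.
    fn mark_satisfied(&mut self, para: ParaId) {
        if let Some(slot) = self.slots.iter_mut().find(|(s, p)| !*s && *p == para) {
            slot.0 = true;
        }
    }

    // Hypothetical clean-up: reopen a claim when the fetched candidate turns
    // out to be invalid in backing, so the slot cannot be blocked for free.
    fn release(&mut self, para: ParaId) {
        if let Some(slot) = self.slots.iter_mut().find(|(s, p)| *s && *p == para) {
            slot.0 = false;
        }
    }
}

fn main() {
    // cq: [a, b, b] with a = 1, b = 2, as in the scenario above.
    let mut cq = ClaimQueueState { slots: vec![(false, 1), (false, 2), (false, 2)] };
    cq.mark_satisfied(1); // we fetched a candidate for para a...
    assert_eq!(cq.slots[0], (true, 1)); // ...and its slot is consumed pre-validation
    cq.release(1); // without this clean-up, an invalid candidate blocks the slot
    assert_eq!(cq.slots[0], (false, 1));
}
```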

}

impl Collations {
/// `Collations` should work with and without claim queue support. If the claim queue runtime
/// API is available, `GroupAssignments` contains the claim queue for the core. If not, group
/// assignments will contain just one item (what's scheduled on the core).
///
/// Some of the logic in `Collations` relies on the claim queue; if it is not available, the
/// code falls back to alternative logic. For this reason `Collations` needs to know whether
/// the claim queue is available or not.
///
/// Once the claim queue runtime API is released everywhere this logic won't be needed anymore
/// and can be cleaned up.
pub(super) fn new(group_assignments: &Vec<ParaId>, has_claim_queue: bool) -> Self {
let mut claims_per_para = BTreeMap::new();
let mut claim_queue_state = Vec::with_capacity(group_assignments.len());

for para_id in group_assignments {
*claims_per_para.entry(*para_id).or_default() += 1;
claim_queue_state.push((false, *para_id));
}

// Not optimal, but if the claim queue is not available `group_assignments` will have just
// one element. This can be fixed once the claim queue API is released everywhere and the
// fallback code is cleaned up.
let claim_queue_state = if has_claim_queue { Some(claim_queue_state) } else { None };

Self {
status: Default::default(),
fetching_from: None,
waiting_queue: Default::default(),
seconded_count: 0,
fetched_per_para: Default::default(),
claims_per_para,
claim_queue_state,
}
}
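
As a worked example of the constructor's bookkeeping, here is a standalone sketch (hypothetical, with plain `u32` values in place of the real `ParaId` type) mirroring the counting above for a core shared by two paras:

```rust
use std::collections::BTreeMap;

fn main() {
    // Hypothetical claim queue for a core shared by paras 1000 and 1001,
    // in the shape `Collations::new` receives via `group_assignments`.
    let group_assignments: Vec<u32> = vec![1000, 1001, 1000];

    let mut claims_per_para: BTreeMap<u32, usize> = BTreeMap::new();
    let mut claim_queue_state = Vec::with_capacity(group_assignments.len());
    for para_id in &group_assignments {
        *claims_per_para.entry(*para_id).or_default() += 1;
        claim_queue_state.push((false, *para_id));
    }

    // Para 1000 holds two claims, para 1001 one; no claim is satisfied yet.
    assert_eq!(claims_per_para.get(&1000), Some(&2));
    assert_eq!(claims_per_para.get(&1001), Some(&1));
    assert_eq!(claim_queue_state, vec![(false, 1000), (false, 1001), (false, 1000)]);
}
```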

/// Note a seconded collation.
pub(super) fn note_seconded(&mut self) {
self.seconded_count += 1
}

/// Note a collation which has been successfully fetched.
pub(super) fn note_fetched(&mut self, para_id: ParaId) {
// update the number of fetched collations for the para_id
*self.fetched_per_para.entry(para_id).or_default() += 1;

// and the claim queue state
if let Some(claim_queue_state) = self.claim_queue_state.as_mut() {
for (satisfied, assignment) in claim_queue_state {
if *satisfied {
continue
}

if assignment == &para_id {
*satisfied = true;
break
}
}
}
}
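
To make the marking behaviour above concrete, here is a standalone sketch (again with `u32` in place of `ParaId`): only the first unsatisfied claim for the para is consumed, so a later claim for the same para stays open.

```rust
// Mirrors the loop in `note_fetched`: satisfy the first open claim for `para_id`.
fn note_fetched(claim_queue_state: &mut Vec<(bool, u32)>, para_id: u32) {
    for (satisfied, assignment) in claim_queue_state.iter_mut() {
        if *satisfied {
            continue
        }
        if *assignment == para_id {
            *satisfied = true;
            break
        }
    }
}

fn main() {
    let mut state = vec![(false, 7), (false, 8), (false, 7)];
    note_fetched(&mut state, 7);
    // Only the first claim for para 7 is consumed; the second one stays open.
    assert_eq!(state, vec![(true, 7), (false, 8), (false, 7)]);
}
```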

/// Returns the next collation to fetch from the `waiting_queue`.
///
/// This will reset the status back to `Waiting` using [`CollationStatus::back_to_waiting`].
@@ -247,6 +322,8 @@ impl Collations {
&mut self,
finished_one: &(CollatorId, Option<CandidateHash>),
relay_parent_mode: ProspectiveParachainsMode,
group_assignments: &Vec<ParaId>,
pending_fetches: &BTreeMap<ParaId, usize>,
) -> Option<(PendingCollation, CollatorId)> {
// If finished one does not match waiting_collation, then we already dequeued another fetch
// to replace it.
@@ -267,33 +344,167 @@ impl Collations {
self.status.back_to_waiting(relay_parent_mode);

match self.status {
// We don't need to fetch any other collation when we already have seconded one.
// If async backing is enabled `back_to_waiting` will change `Seconded` state to
// `Waiting` so that we can fetch more collations. If async backing is disabled we can't
// fetch more than one collation per relay parent so `None` is returned.
CollationStatus::Seconded => None,
CollationStatus::Waiting =>
if self.is_seconded_limit_reached(relay_parent_mode) {
None
} else {
self.waiting_queue.pop_front()
},
self.pick_a_collation_to_fetch(&group_assignments, pending_fetches),
CollationStatus::WaitingOnValidation | CollationStatus::Fetching =>
unreachable!("We have reset the status above!"),
}
}

/// Checks the limit of seconded candidates.
pub(super) fn is_seconded_limit_reached(
/// Checks if another collation can be accepted. The number of collations that can be fetched
/// per parachain is limited by the number of entries in the claim queue for the `ParaId` in
/// question.
///
/// If prospective parachains mode is not enabled then we fall back to synchronous backing. In
/// this case there is a limit of 1 collation per relay parent.
///
/// If prospective parachains mode is enabled but the claim queue is not supported then up to
/// `max_candidate_depth + 1` seconded collations are accepted. In theory, if two parachains
/// share a core in this case, no fairness is guaranteed between them and the faster one can
/// starve the slower one by exhausting the limit with its own advertisements. In practice this
/// should not happen, because core sharing implies coretime support, which implies that the
/// claim queue is available.
pub(super) fn is_collations_limit_reached(
&self,
relay_parent_mode: ProspectiveParachainsMode,
para_id: ParaId,
num_pending_fetches: usize,
) -> bool {
let seconded_limit =
if let ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } =
relay_parent_mode
match relay_parent_mode {
ProspectiveParachainsMode::Disabled => {
gum::trace!(
target: LOG_TARGET,
?para_id,
seconded_count=self.seconded_count,
"is_collations_limit_reached - ProspectiveParachainsMode::Disabled"
);

self.seconded_count >= 1
},
ProspectiveParachainsMode::Enabled { max_candidate_depth, allowed_ancestry_len: _ }
if self.claim_queue_state.is_none() =>
{
max_candidate_depth + 1
} else {
1
};
self.seconded_count >= seconded_limit
gum::trace!(
target: LOG_TARGET,
?para_id,
seconded_count=self.seconded_count,
max_candidate_depth,
"is_collations_limit_reached - ProspectiveParachainsMode::Enabled without claim queue support"
);

self.seconded_count > max_candidate_depth
},
ProspectiveParachainsMode::Enabled {
max_candidate_depth: _,
allowed_ancestry_len: _,
} => {
// Successful fetches + pending fetches < claim queue entries for `para_id`
let respected_per_para_limit =
self.claims_per_para.get(&para_id).copied().unwrap_or_default() >
self.fetched_per_para.get(&para_id).copied().unwrap_or_default() +
num_pending_fetches;

gum::trace!(
target: LOG_TARGET,
?para_id,
claims_per_para=?self.claims_per_para,
fetched_per_para=?self.fetched_per_para,
?num_pending_fetches,
?respected_per_para_limit,
"is_collations_limit_reached - ProspectiveParachainsMode::Enabled with claim queue support"
);

!respected_per_para_limit
},
}
}
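
The claim queue branch above reduces to a small inequality: the limit is reached unless the para's claims outnumber its successful plus pending fetches. A sketch with made-up numbers:

```rust
// A simplified stand-in for the claim queue branch of `is_collations_limit_reached`.
fn is_collations_limit_reached(claims: usize, fetched: usize, pending: usize) -> bool {
    // The per-para limit is respected only while claims > fetched + pending.
    !(claims > fetched + pending)
}

fn main() {
    // Two claim queue entries for the para, one fetched and one pending: full.
    assert!(is_collations_limit_reached(2, 1, 1));
    // One claim still unaccounted for: another collation may be accepted.
    assert!(!is_collations_limit_reached(2, 1, 0));
}
```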

/// Adds a new collation to the waiting queue for the relay parent. This function doesn't
/// perform any limit checks. The caller (`enqueue_collation`) should ensure that the collation
/// limit is respected.
pub(super) fn add_to_waiting_queue(&mut self, collation: (PendingCollation, CollatorId)) {
self.waiting_queue.entry(collation.0.para_id).or_default().push_back(collation);
}

/// Picks a collation to fetch from the waiting queue.
/// When fetching collations we need to ensure that each parachain gets a fair share of core
/// time, depending on its assignments in the claim queue. This means that the number of
/// collations fetched per parachain should ideally be equal to the number of claims for the
/// particular parachain in the claim queue.
///
/// To achieve this, each parachain with an entry in the `waiting_queue` gets a score,
/// calculated by dividing the number of fetched collations by the number of entries in the
/// claim queue. A lower score means a higher fetching priority. Note that if a parachain
/// hasn't got anything fetched at this relay parent, it will have score 0, which means
/// highest priority. If two parachains have the same score, the one which is earlier in the
/// claim queue will be picked.
///
/// If the claim queue is not supported then `group_assignments` should contain just one
/// element and the score won't matter. In this case collations will be fetched in the order
/// they were received.
///
/// Note: `group_assignments` is needed just for the fallback logic. It should be removed once
/// the claim queue runtime API is released everywhere, since it will be redundant - the claim
/// queue will already be available in `self.claim_queue_state`.
fn pick_a_collation_to_fetch(
&mut self,
group_assignments: &Vec<ParaId>,
pending_fetches: &BTreeMap<ParaId, usize>,
) -> Option<(PendingCollation, CollatorId)> {
gum::trace!(
target: LOG_TARGET,
waiting_queue=?self.waiting_queue,
fetched_per_para=?self.fetched_per_para,
claims_per_para=?self.claims_per_para,
?group_assignments,
"Pick a collation to fetch."
);

let claim_queue_state = match self.claim_queue_state.as_mut() {
Some(cqs) => cqs,
// Fallback if claim queue is not available. There is only one assignment in
// `group_assignments` so fetch the first advertisement for it and return.
None =>
if let Some(assigned_para_id) = group_assignments.first() {
return self
.waiting_queue
.get_mut(assigned_para_id)
.and_then(|collations| collations.pop_front())
} else {
unreachable!("Group assignments should contain at least one element.")
},
};

let mut pending_fetches = pending_fetches.clone();

for (fulfilled, assignment) in claim_queue_state {
// if this assignment has already been fulfilled - move on
if *fulfilled {
continue
}

// if there is a pending fetch for this assignment, we should consider it satisfied and
// proceed with the next
if let Some(pending_fetch) = pending_fetches.get_mut(assignment) {
if *pending_fetch > 0 {
*pending_fetch -= 1;
continue
}
}

// we have found an unfulfilled assignment - try to fulfill it
if let Some(collations) = self.waiting_queue.get_mut(assignment) {
if let Some(collation) = collations.pop_front() {
// we don't mark the entry as fulfilled because it is considered pending
return Some(collation)
}
}
}

None
}
}
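
A toy version of the selection loop above (hypothetical types: `u32` para ids and string advertisements in place of `PendingCollation`/`CollatorId`) shows how satisfied slots, pending fetches and the waiting queue interact:

```rust
use std::collections::{BTreeMap, VecDeque};

// Walk the claim queue state, skip satisfied slots, "spend" pending fetches,
// and pop the first queued advertisement for the first unfulfilled assignment.
fn pick(
    claim_queue_state: &[(bool, u32)],
    pending_fetches: &BTreeMap<u32, usize>,
    waiting_queue: &mut BTreeMap<u32, VecDeque<&'static str>>,
) -> Option<&'static str> {
    let mut pending = pending_fetches.clone();
    for (fulfilled, para) in claim_queue_state {
        if *fulfilled {
            continue
        }
        if let Some(n) = pending.get_mut(para) {
            if *n > 0 {
                *n -= 1;
                continue
            }
        }
        if let Some(adverts) = waiting_queue.get_mut(para) {
            if let Some(advert) = adverts.pop_front() {
                return Some(advert)
            }
        }
    }
    None
}

fn main() {
    // Claim queue [7, 8, 7]: para 7's first slot is already satisfied and a
    // fetch for para 8 is still in flight, so the second claim for 7 wins.
    let state = vec![(true, 7), (false, 8), (false, 7)];
    let pending = BTreeMap::from([(8u32, 1usize)]);
    let mut waiting: BTreeMap<u32, VecDeque<&'static str>> = BTreeMap::new();
    waiting.entry(7).or_default().push_back("advert-for-7");
    waiting.entry(8).or_default().push_back("advert-for-8");

    assert_eq!(pick(&state, &pending, &mut waiting), Some("advert-for-7"));
}
```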
