Collation fetching fairness #4880

Open — wants to merge 59 commits into base: master

Changes from 2 commits

Commits (59)
f4738dc
Collation fetching fairness
tdimitrov Jun 26, 2024
c7074da
Comments
tdimitrov Jun 26, 2024
73eee87
Fix tests and add some logs
tdimitrov Jun 26, 2024
fa321ce
Fix per para limit calculation in `is_collations_limit_reached`
tdimitrov Jun 27, 2024
96392a5
Fix default `TestState` initialization: claim queue len should be equ…
tdimitrov Jun 27, 2024
0f28aa8
clippy
tdimitrov Jun 27, 2024
e5ea548
Update `is_collations_limit_reached` - remove seconded limit
tdimitrov Jun 28, 2024
9abc898
Fix pending fetches and more tests
tdimitrov Jul 1, 2024
c07890b
Remove unnecessary clone
tdimitrov Jul 1, 2024
e50440e
Comments
tdimitrov Jul 1, 2024
42b05c7
Better var names
tdimitrov Jul 1, 2024
2f5a466
Fix `pick_a_collation_to_fetch` and add more tests
tdimitrov Jul 2, 2024
ff96ef9
Fix test: `collation_fetching_respects_claim_queue`
tdimitrov Jul 2, 2024
e837689
Add `collation_fetching_fallback_works` test + comments
tdimitrov Jul 2, 2024
91cdd13
More tests
tdimitrov Jul 3, 2024
9f2d59b
Fix collation limit fallback
tdimitrov Jul 3, 2024
a10c86d
Separate `claim_queue_support` from `ProspectiveParachainsMode`
tdimitrov Jul 3, 2024
b39858a
Fix comments and add logs
tdimitrov Jul 3, 2024
b30f340
Update test: `collation_fetching_prefer_entries_earlier_in_claim_queue`
tdimitrov Jul 3, 2024
c0f18b9
Fix `pick_a_collation_to_fetch` and more tests
tdimitrov Jul 3, 2024
703ed6d
Merge branch 'master' into tsv-collator-proto-fairness
tdimitrov Jul 3, 2024
fba7ca6
Fix `pick_a_collation_to_fetch` - iter 1
tdimitrov Jul 4, 2024
d4f4ce2
Fix `pick_a_collation_to_fetch` - iter 2
tdimitrov Jul 4, 2024
5f52712
Remove a redundant runtime version check
tdimitrov Jul 4, 2024
6c73e24
formatting and comments
tdimitrov Jul 4, 2024
752f3cc
pr doc
tdimitrov Jul 4, 2024
f0069f1
add license
tdimitrov Jul 4, 2024
6b9f0b3
clippy
tdimitrov Jul 4, 2024
5f6dcdd
Merge branch 'master' into tsv-collator-proto-fairness
tdimitrov Jul 4, 2024
b8c1b85
Update prdoc/pr_4880.prdoc
tdimitrov Jul 5, 2024
f26362f
Limit collations based on seconded count instead of number of fetches
tdimitrov Jul 7, 2024
d6857fc
Undo rename: is_seconded_limit_reached
tdimitrov Jul 7, 2024
cde28cd
fix collation tests
tdimitrov Jul 8, 2024
4c3db2a
`collations_fetching_respects_seconded_limit` test
tdimitrov Jul 8, 2024
b2bbdfe
nits
tdimitrov Jul 8, 2024
e220cb4
Merge branch 'master' into tsv-collator-proto-fairness
tdimitrov Aug 26, 2024
01d121e
Remove duplicated dependency after merge
tdimitrov Aug 26, 2024
7b3c002
Remove `ProspectiveParachainsMode` from collator-protocol, validator-…
tdimitrov Jul 10, 2024
5dffdde
Fix compilation errors in collation tests
tdimitrov Jul 10, 2024
1c1744b
`is_seconded_limit_reached` uses the whole view
tdimitrov Jul 11, 2024
aaccab1
Fix `is_seconded_limit_reached` check
tdimitrov Aug 30, 2024
b1df2e3
Trace logs useful for debugging tests
tdimitrov Sep 11, 2024
ce3a95e
Handle unconnected candidates
tdimitrov Sep 11, 2024
fe3c09d
Rework pre-prospective parachains tests to work with claim queue
tdimitrov Sep 11, 2024
b9ab579
Fix `collation_fetches_without_claimqueue`
tdimitrov Sep 12, 2024
fe623bc
Test - `collation_fetching_prefer_entries_earlier_in_claim_queue`
tdimitrov Sep 13, 2024
d216689
Remove collations test file - all tests are moved in prospective_para…
tdimitrov Sep 13, 2024
ea99c7a
fixup - collation_fetching_prefer_entries_earlier_in_claim_queue
tdimitrov Sep 16, 2024
ee155f5
New test - `collation_fetching_considers_advertisements_from_the_whol…
tdimitrov Sep 16, 2024
55b7902
Merge branch 'master' into tsv-collator-proto-fairness
tdimitrov Sep 17, 2024
515a784
Update PRdoc and comments
tdimitrov Sep 17, 2024
4ef6919
Combine `seconded_per_para` and `claims_per_para` from collations in …
tdimitrov Sep 17, 2024
bd7174f
No need to handle missing claim queue anymore
tdimitrov Sep 17, 2024
df6165e
Remove dead code and fix some comments
tdimitrov Sep 17, 2024
4c5c271
Remove `is_seconded_limit_reached` and use the code directly due to t…
tdimitrov Sep 17, 2024
b0e4627
Fix comments
tdimitrov Sep 17, 2024
d1cf41d
`pending_for_para` -> `is_pending_for_para`
tdimitrov Sep 17, 2024
df3a215
Fix `0011-async-backing-6-seconds-rate.toml` - set `lookahead` to 3 o…
tdimitrov Sep 19, 2024
b70807b
Set `lookahead` in polkadot/zombienet_tests/elastic_scaling/0002-elas…
tdimitrov Sep 20, 2024
125 changes: 112 additions & 13 deletions polkadot/node/network/collator-protocol/src/validator_side/collation.rs
@@ -27,7 +27,12 @@
//! ┌──────────────────────────────────────────┐
//! └─▶Advertised ─▶ Pending ─▶ Fetched ─▶ Validated

use std::{collections::VecDeque, future::Future, pin::Pin, task::Poll};
use std::{
collections::{BTreeMap, VecDeque},
future::Future,
pin::Pin,
task::Poll,
};

use futures::{future::BoxFuture, FutureExt};
use polkadot_node_network_protocol::{
@@ -48,6 +53,8 @@ use tokio_util::sync::CancellationToken;

use crate::{error::SecondingError, LOG_TARGET};

use super::GroupAssignments;

/// Candidate supplied with a para head it's built on top of.
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)]
pub struct ProspectiveCandidate {
@@ -216,7 +223,6 @@ impl CollationStatus {
}

/// Information about collations per relay parent.
#[derive(Default)]
pub struct Collations {
/// What is the current status in regards to a collation for this relay parent?
pub status: CollationStatus,
@@ -225,18 +231,44 @@ pub struct Collations {
/// This is the currently last started fetch, which did not exceed `MAX_UNSHARED_DOWNLOAD_TIME`
/// yet.
pub fetching_from: Option<(CollatorId, Option<CandidateHash>)>,
/// Collations that were advertised to us but not yet fetched.
pub waiting_queue: VecDeque<(PendingCollation, CollatorId)>,
/// Collations that were advertised to us but not yet fetched. Grouped by `ParaId`.
waiting_queue: BTreeMap<ParaId, VecDeque<(PendingCollation, CollatorId)>>,
/// How many collations have been seconded.
pub seconded_count: usize,
/// What collations were fetched so far for this relay parent.
fetched_per_para: BTreeMap<ParaId, usize>,
// Claims per `ParaId` for the assigned core at the relay parent. This information is obtained
// from the claim queue.
claims_per_para: BTreeMap<ParaId, usize>,
}

impl Collations {
pub(super) fn new(assignments: &Vec<ParaId>) -> Self {
let mut claims_per_para = BTreeMap::new();
for para_id in assignments {
*claims_per_para.entry(*para_id).or_default() += 1;
}

Self {
status: Default::default(),
fetching_from: None,
waiting_queue: Default::default(),
seconded_count: 0,
fetched_per_para: Default::default(),
claims_per_para,
}
}

/// Note a seconded collation at this relay parent.
pub(super) fn note_seconded(&mut self) {
self.seconded_count += 1
}

/// Note a collation which has been successfully fetched.
pub(super) fn note_fetched(&mut self, para_id: ParaId) {
*self.fetched_per_para.entry(para_id).or_default() += 1
}
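The bookkeeping introduced above — one claim per occurrence of a para in the claim queue, and a fetch counter per para — can be modelled in a standalone sketch. Names and types here are hypothetical simplifications (`u32` stands in for `ParaId`); the real struct carries additional state:

```rust
use std::collections::BTreeMap;

// Simplified model of the claim/fetch bookkeeping in `Collations`.
#[derive(Default)]
struct FairnessBook {
    claims_per_para: BTreeMap<u32, usize>,
    fetched_per_para: BTreeMap<u32, usize>,
}

impl FairnessBook {
    // Mirrors `Collations::new`: one claim per occurrence in the claim queue.
    fn new(assignments: &[u32]) -> Self {
        let mut claims_per_para: BTreeMap<u32, usize> = BTreeMap::new();
        for para in assignments {
            *claims_per_para.entry(*para).or_default() += 1;
        }
        FairnessBook { claims_per_para, fetched_per_para: BTreeMap::new() }
    }

    // Mirrors `note_fetched`: count a successful fetch for the para.
    fn note_fetched(&mut self, para: u32) {
        *self.fetched_per_para.entry(para).or_default() += 1;
    }
}

fn main() {
    // Claim queue [A, B, A] → para A gets two claims, para B one.
    let mut book = FairnessBook::new(&[1, 2, 1]);
    book.note_fetched(1);
    assert_eq!(book.claims_per_para[&1], 2);
    assert_eq!(book.claims_per_para[&2], 1);
    assert_eq!(book.fetched_per_para[&1], 1);
}
```

Counting claims up front is what later lets the per-para fetch limit be checked with two cheap map lookups.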

/// Returns the next collation to fetch from the `waiting_queue`.
///
/// This will reset the status back to `Waiting` using [`CollationStatus::back_to_waiting`].
@@ -247,6 +279,7 @@ impl Collations {
&mut self,
finished_one: &(CollatorId, Option<CandidateHash>),
relay_parent_mode: ProspectiveParachainsMode,
assignments: &GroupAssignments,
) -> Option<(PendingCollation, CollatorId)> {
// If finished one does not match waiting_collation, then we already dequeued another fetch
// to replace it.
@@ -269,21 +302,20 @@
match self.status {
// We don't need to fetch any other collation when we already have seconded one.
CollationStatus::Seconded => None,
CollationStatus::Waiting =>
if self.is_seconded_limit_reached(relay_parent_mode) {
None
} else {
self.waiting_queue.pop_front()
},
CollationStatus::Waiting => self.pick_a_collation_to_fetch(&assignments.current),
CollationStatus::WaitingOnValidation | CollationStatus::Fetching =>
unreachable!("We have reset the status above!"),
}
}

/// Checks the limit of seconded candidates.
pub(super) fn is_seconded_limit_reached(
/// Checks if another collation can be accepted. There are two limits:
/// 1. The number of collations that can be seconded.
/// 2. The number of collations that can be fetched per parachain. This is based on the number
/// of entries in the claim queue.
pub(super) fn is_collations_limit_reached(
&self,
relay_parent_mode: ProspectiveParachainsMode,
para_id: ParaId,
) -> bool {
let seconded_limit =
if let ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } =
Expand All @@ -293,7 +325,74 @@ impl Collations {
} else {
1
};
self.seconded_count >= seconded_limit

// The per-para limit is reached once the number of fetched collations has caught up
// with the number of claims for the para in the claim queue.
let per_para_limit_reached =
self.fetched_per_para.get(&para_id).copied().unwrap_or_default() >=
self.claims_per_para.get(&para_id).copied().unwrap_or_default();

self.seconded_count >= seconded_limit || per_para_limit_reached
}
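The two limits combined by `is_collations_limit_reached` can be sketched as a free function. This is a simplified model (hypothetical signature; `Option<usize>` stands in for `ProspectiveParachainsMode`, with `None` meaning disabled), not the subsystem's actual API:

```rust
// Sketch of the combined limit check: a global seconded limit derived from
// `max_candidate_depth`, plus a per-para fetch limit from the claim queue.
fn is_collations_limit_reached(
    seconded_count: usize,
    max_candidate_depth: Option<usize>, // None ≈ prospective parachains disabled
    claims_for_para: usize,
    fetched_for_para: usize,
) -> bool {
    // With prospective parachains enabled, up to `max_candidate_depth + 1`
    // candidates may be seconded; otherwise only one.
    let seconded_limit = max_candidate_depth.map_or(1, |d| d + 1);
    seconded_count >= seconded_limit || fetched_for_para >= claims_for_para
}

fn main() {
    // Two claims, one fetch → still room for this para.
    assert!(!is_collations_limit_reached(0, Some(3), 2, 1));
    // Fetches caught up with claims → per-para limit reached.
    assert!(is_collations_limit_reached(0, Some(3), 2, 2));
    // Seconded limit reached regardless of per-para state.
    assert!(is_collations_limit_reached(4, Some(3), 2, 0));
}
```

Either limit alone is enough to reject a new advertisement, which is why the function returns the disjunction of the two checks.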

/// Adds a new collation to the waiting queue for the relay parent. This function doesn't
/// perform any limit checks. The caller (`enqueue_collation`) should ensure that the collation
/// can be enqueued.
pub(super) fn add_to_waiting_queue(&mut self, collation: (PendingCollation, CollatorId)) {
self.waiting_queue.entry(collation.0.para_id).or_default().push_back(collation);
}

/// Picks a collation to fetch from the waiting queue.
///
/// When fetching collations we need to ensure that each parachain gets a fair share of core
/// time based on its assignments in the claim queue. This means that the number of collations
/// fetched per parachain should ideally be equal to (but not bigger than) the number of claims
/// for that parachain in the claim queue.
///
/// To achieve this, each parachain with an entry in the `waiting_queue` gets a score,
/// calculated by dividing the number of fetched collations by the number of its entries in the
/// claim queue. A lower score means a higher fetching priority. Note that if a parachain hasn't
/// had anything fetched at this relay parent, its score is 0, which means highest priority.
///
/// If two parachains have the same score, the one that appears earlier in the claim queue is
/// picked.
fn pick_a_collation_to_fetch(
&mut self,
claims: &Vec<ParaId>,
) -> Option<(PendingCollation, CollatorId)> {
// Find the parachain(s) with the lowest score.
let mut lowest_score = None;
for (para_id, collations) in &mut self.waiting_queue {
let para_score = self
.fetched_per_para
.get(para_id)
.copied()
.unwrap_or_default()
.saturating_div(self.claims_per_para.get(para_id).copied().unwrap_or_default());

match lowest_score {
Some((score, _)) if para_score < score =>
lowest_score = Some((para_score, vec![(para_id, collations)])),
Some((score, ref mut paras)) if para_score == score => {
paras.push((para_id, collations));
},
Some(_) => {},
None => lowest_score = Some((para_score, vec![(para_id, collations)])),
}
}

if let Some((_, mut lowest_score)) = lowest_score {
for claim in claims {
if let Some((_, collations)) = lowest_score.iter_mut().find(|(id, _)| *id == claim)
{
match collations.pop_front() {
Some(collation) => return Some(collation),
None => {
unreachable!("Collation can't be empty!")
},
}
}
}
unreachable!("All entries in waiting_queue should also be in claim queue")
} else {
None
}
}
}
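The selection rule documented above — score each para as fetched-over-claims, take the lowest score, break ties by claim-queue order — can be exercised in isolation. A simplified sketch with hypothetical names (`u32` for `ParaId`, lookup tables instead of the struct's maps):

```rust
// Look up a per-para counter, defaulting to 0 when the para is absent.
fn lookup(table: &[(u32, usize)], para: u32) -> usize {
    table.iter().find(|(id, _)| *id == para).map(|(_, n)| *n).unwrap_or(0)
}

// Pick the para to fetch from next: lowest fetched/claims score,
// ties broken by claim-queue order.
fn pick_para(
    waiting: &[u32],            // paras with queued advertisements
    fetched: &[(u32, usize)],   // fetches so far per para
    claims: &[(u32, usize)],    // claim-queue entries per para
    claim_queue: &[u32],
) -> Option<u32> {
    // `.max(1)` guards the division; in the subsystem an invariant ensures
    // every waiting para has at least one claim.
    let score = |p: u32| lookup(fetched, p) / lookup(claims, p).max(1);
    let best = waiting.iter().map(|&p| score(p)).min()?;
    // First para in claim-queue order among those with the lowest score.
    claim_queue.iter().copied().find(|&p| waiting.contains(&p) && score(p) == best)
}

fn main() {
    // Para 1 has 1 fetch against 2 claims (score 1/2 → 0), para 2 has none
    // (score 0): a tie, so claim-queue order [1, 2, 1] picks para 1.
    assert_eq!(pick_para(&[1, 2], &[(1, 1)], &[(1, 2), (2, 1)], &[1, 2, 1]), Some(1));
    // Once para 1 has used both claims (score 1), para 2 wins with score 0.
    assert_eq!(pick_para(&[1, 2], &[(1, 2)], &[(1, 2), (2, 1)], &[1, 2, 1]), Some(2));
}
```

Note that integer division makes any para with fewer fetches than claims score 0, so a para cannot be starved while it still has unused claims at this relay parent.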

65 changes: 30 additions & 35 deletions polkadot/node/network/collator-protocol/src/validator_side/mod.rs
@@ -374,12 +374,9 @@ struct PerRelayParent {
}

impl PerRelayParent {
fn new(mode: ProspectiveParachainsMode) -> Self {
Self {
prospective_parachains_mode: mode,
assignment: GroupAssignments { current: vec![] },
collations: Collations::default(),
}
fn new(mode: ProspectiveParachainsMode, assignments: GroupAssignments) -> Self {
let collations = Collations::new(&assignments.current);
Self { prospective_parachains_mode: mode, assignment: assignments, collations }
}
}

@@ -467,12 +464,11 @@ fn is_relay_parent_in_implicit_view(

async fn assign_incoming<Sender>(
sender: &mut Sender,
group_assignment: &mut GroupAssignments,
current_assignments: &mut HashMap<ParaId, usize>,
keystore: &KeystorePtr,
relay_parent: Hash,
relay_parent_mode: ProspectiveParachainsMode,
) -> Result<()>
) -> Result<GroupAssignments>
where
Sender: CollatorProtocolSenderTrait,
{
@@ -499,7 +495,7 @@
rotation_info.core_for_group(group, cores.len())
} else {
gum::trace!(target: LOG_TARGET, ?relay_parent, "Not a validator");
return Ok(())
return Ok(GroupAssignments { current: Vec::new() })
};

let paras_now = match fetch_claim_queue(sender, relay_parent).await.map_err(Error::Runtime)? {
@@ -532,9 +528,7 @@
}
}

*group_assignment = GroupAssignments { current: paras_now.into_iter().collect() };

Ok(())
Ok(GroupAssignments { current: paras_now.into_iter().collect::<Vec<ParaId>>() })
}

fn remove_outgoing(
@@ -1107,7 +1101,10 @@
)
.map_err(AdvertisementError::Invalid)?;

if per_relay_parent.collations.is_seconded_limit_reached(relay_parent_mode) {
if per_relay_parent
.collations
.is_collations_limit_reached(relay_parent_mode, para_id)
{
return Err(AdvertisementError::SecondedLimitReached)
}

@@ -1199,7 +1196,7 @@
});

let collations = &mut per_relay_parent.collations;
if collations.is_seconded_limit_reached(relay_parent_mode) {
if collations.is_collations_limit_reached(relay_parent_mode, para_id) {
gum::trace!(
target: LOG_TARGET,
peer_id = ?peer_id,
Expand All @@ -1222,14 +1219,16 @@ where
?relay_parent,
"Added collation to the pending list"
);
collations.waiting_queue.push_back((pending_collation, collator_id));
collations.add_to_waiting_queue((pending_collation, collator_id));
},
CollationStatus::Waiting => {
// We were waiting for a collation to be advertised to us (we were idle) so we can fetch
// the new collation immediately
fetch_collation(sender, state, pending_collation, collator_id).await?;
},
CollationStatus::Seconded if relay_parent_mode.is_enabled() => {
// Limit is not reached, it's allowed to second another
// collation.
// Limit is not reached (checked with `is_collations_limit_reached` before the match
// expression), it's allowed to second another collation.
fetch_collation(sender, state, pending_collation, collator_id).await?;
},
CollationStatus::Seconded => {
@@ -1270,19 +1269,11 @@
state.span_per_relay_parent.insert(*leaf, per_leaf_span);
}

let mut per_relay_parent = PerRelayParent::new(mode);
assign_incoming(
sender,
&mut per_relay_parent.assignment,
&mut state.current_assignments,
keystore,
*leaf,
mode,
)
.await?;
let assignments =
assign_incoming(sender, &mut state.current_assignments, keystore, *leaf, mode).await?;

state.active_leaves.insert(*leaf, mode);
state.per_relay_parent.insert(*leaf, per_relay_parent);
state.per_relay_parent.insert(*leaf, PerRelayParent::new(mode, assignments));

if mode.is_enabled() {
state
@@ -1298,18 +1289,16 @@
.unwrap_or_default();
for block_hash in allowed_ancestry {
if let Entry::Vacant(entry) = state.per_relay_parent.entry(*block_hash) {
let mut per_relay_parent = PerRelayParent::new(mode);
assign_incoming(
let assignments = assign_incoming(
sender,
&mut per_relay_parent.assignment,
&mut state.current_assignments,
keystore,
*block_hash,
mode,
)
.await?;

entry.insert(per_relay_parent);
entry.insert(PerRelayParent::new(mode, assignments));
}
}
}
@@ -1665,6 +1654,10 @@

let CollationEvent {collator_id, pending_collation, .. } = res.collation_event.clone();

state.per_relay_parent.get_mut(&pending_collation.relay_parent).map(|rp_state| {
rp_state.collations.note_fetched(pending_collation.para_id);
});

match kick_off_seconding(&mut ctx, &mut state, res).await {
Err(err) => {
gum::warn!(
@@ -1737,9 +1730,11 @@
previous_fetch: (CollatorId, Option<CandidateHash>),
) {
while let Some((next, id)) = state.per_relay_parent.get_mut(&relay_parent).and_then(|state| {
state
.collations
.get_next_collation_to_fetch(&previous_fetch, state.prospective_parachains_mode)
state.collations.get_next_collation_to_fetch(
&previous_fetch,
state.prospective_parachains_mode,
&state.assignment,
)
}) {
gum::debug!(
target: LOG_TARGET,