Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Request based availability distribution (#2423)
Browse files Browse the repository at this point in the history
* WIP

* availability distribution, still very wip.

Work on the requesting side of things.

* Some docs on what I intend to do.

* Checkpoint of session cache implementation

as I will likely replace it with something smarter.

* More work, mostly on cache

and getting things to type check.

* Only derive MallocSizeOf and Debug for std.

* availability-distribution: Cache feature complete.

* Sketch out logic in `FetchTask` for actual fetching.

- Compile fixes.
- Cleanup.

* Format cleanup.

* More format fixes.

* Almost feature complete `fetch_task`.

Missing:

- Check for cancel
- Actual querying of peer ids.

* Finish FetchTask so far.

* Directly use AuthorityDiscoveryId in protocol and cache.

* Resolve `AuthorityDiscoveryId` on sending requests.

* Rework fetch_task

- also make it impossible to check the wrong chunk index.
- Export needed function in validator_discovery.

* From<u32> implementation for `ValidatorIndex`.

* Fixes and more integration work.

* Make session cache proper lru cache.

* Use proper lru cache.

* Requester finished.

* ProtocolState -> Requester

Also make sure to not fetch our own chunk.

* Cleanup + fixes.

* Remove unused functions

- FetchTask::is_finished
- SessionCache::fetch_session_info

* availability-distribution responding side.

* Cleanup + Fixes.

* More fixes.

* More fixes.

adder-collator is running!

* Some docs.

* Docs.

* Fix reporting of bad guys.

* Fix tests

* Make all tests compile.

* Fix test.

* Cleanup + get rid of some warnings.

* state -> requester

* Mostly doc fixes.

* Fix test suite.

* Get rid of now redundant message types.

* WIP

* Rob's review remarks.

* Fix test suite.

* core.relay_parent -> leaf for session request.

* Style fix.

* Decrease request timeout.

* Cleanup obsolete errors.

* Metrics + don't fail on non fatal errors.

* requester.rs -> requester/mod.rs

* Panic on invalid BadValidator report.

* Fix indentation.

* Use typed default timeout constant.

* Make channel size 0, as each sender gets one slot anyways.

* Fix incorrect metrics initialization.

* Fix build after merge.

* More fixes.

* Hopefully valid metrics names.

* Better metrics names.

* Some tests that already work.

* Slightly better docs.

* Some more tests.

* Fix network bridge test.
  • Loading branch information
eskimor authored Feb 26, 2021
1 parent ee86a21 commit 950447e
Show file tree
Hide file tree
Showing 45 changed files with 2,032 additions and 1,518 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 35 additions & 35 deletions node/core/approval-voting/src/approval_checking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ pub fn tranches_to_approve(
assignments.iter()
.map(|(v_index, tick)| (v_index, tick.saturating_sub(clock_drift) + no_show_duration))
.filter(|&(v_index, no_show_at)| {
let has_approved = approvals.get(*v_index as usize).map(|b| *b).unwrap_or(false);
let has_approved = approvals.get(v_index.0 as usize).map(|b| *b).unwrap_or(false);

let is_no_show = !has_approved && no_show_at <= drifted_tick_now;

Expand Down Expand Up @@ -348,7 +348,7 @@ pub fn tranches_to_approve(
mod tests {
use super::*;

use polkadot_primitives::v1::GroupIndex;
use polkadot_primitives::v1::{GroupIndex, ValidatorIndex};
use bitvec::bitvec;
use bitvec::order::Lsb0 as BitOrderLsb0;

Expand Down Expand Up @@ -393,7 +393,7 @@ mod tests {
}.into();

for i in 0..6 {
candidate.mark_approval(i);
candidate.mark_approval(ValidatorIndex(i));
}

let approval_entry = approval_db::v1::ApprovalEntry {
Expand All @@ -406,7 +406,7 @@ mod tests {

assert!(!check_approval(&candidate, &approval_entry, RequiredTranches::All));

candidate.mark_approval(6);
candidate.mark_approval(ValidatorIndex(6));
assert!(check_approval(&candidate, &approval_entry, RequiredTranches::All));
}

Expand All @@ -420,22 +420,22 @@ mod tests {
}.into();

for i in 0..6 {
candidate.mark_approval(i);
candidate.mark_approval(ValidatorIndex(i));
}

let approval_entry = approval_db::v1::ApprovalEntry {
tranches: vec![
approval_db::v1::TrancheEntry {
tranche: 0,
assignments: (0..4).map(|i| (i, 0.into())).collect(),
assignments: (0..4).map(|i| (ValidatorIndex(i), 0.into())).collect(),
},
approval_db::v1::TrancheEntry {
tranche: 1,
assignments: (4..6).map(|i| (i, 1.into())).collect(),
assignments: (4..6).map(|i| (ValidatorIndex(i), 1.into())).collect(),
},
approval_db::v1::TrancheEntry {
tranche: 2,
assignments: (6..10).map(|i| (i, 0.into())).collect(),
assignments: (6..10).map(|i| (ValidatorIndex(i), 0.into())).collect(),
},
],
assignments: bitvec![BitOrderLsb0, u8; 1; 10],
Expand Down Expand Up @@ -487,13 +487,13 @@ mod tests {
approved: false,
}.into();

approval_entry.import_assignment(0, 0, block_tick);
approval_entry.import_assignment(0, 1, block_tick);
approval_entry.import_assignment(0,ValidatorIndex(0), block_tick);
approval_entry.import_assignment(0,ValidatorIndex(1), block_tick);

approval_entry.import_assignment(1, 2, block_tick + 1);
approval_entry.import_assignment(1, 3, block_tick + 1);
approval_entry.import_assignment(1,ValidatorIndex(2), block_tick + 1);
approval_entry.import_assignment(1,ValidatorIndex(3), block_tick + 1);

approval_entry.import_assignment(2, 4, block_tick + 2);
approval_entry.import_assignment(2,ValidatorIndex(4), block_tick + 2);

let approvals = bitvec![BitOrderLsb0, u8; 1; 5];

Expand Down Expand Up @@ -524,8 +524,8 @@ mod tests {
approved: false,
}.into();

approval_entry.import_assignment(0, 0, block_tick);
approval_entry.import_assignment(1, 2, block_tick);
approval_entry.import_assignment(0, ValidatorIndex(0), block_tick);
approval_entry.import_assignment(1, ValidatorIndex(2), block_tick);

let approvals = bitvec![BitOrderLsb0, u8; 0; 10];

Expand Down Expand Up @@ -562,10 +562,10 @@ mod tests {
approved: false,
}.into();

approval_entry.import_assignment(0, 0, block_tick);
approval_entry.import_assignment(0, 1, block_tick);
approval_entry.import_assignment(0, ValidatorIndex(0), block_tick);
approval_entry.import_assignment(0, ValidatorIndex(1), block_tick);

approval_entry.import_assignment(1, 2, block_tick);
approval_entry.import_assignment(1, ValidatorIndex(2), block_tick);

let mut approvals = bitvec![BitOrderLsb0, u8; 0; 10];
approvals.set(0, true);
Expand Down Expand Up @@ -605,11 +605,11 @@ mod tests {
approved: false,
}.into();

approval_entry.import_assignment(0, 0, block_tick);
approval_entry.import_assignment(0, 1, block_tick);
approval_entry.import_assignment(0, ValidatorIndex(0), block_tick);
approval_entry.import_assignment(0, ValidatorIndex(1), block_tick);

approval_entry.import_assignment(1, 2, block_tick);
approval_entry.import_assignment(1, 3, block_tick);
approval_entry.import_assignment(1, ValidatorIndex(2), block_tick);
approval_entry.import_assignment(1, ValidatorIndex(3), block_tick);

let mut approvals = bitvec![BitOrderLsb0, u8; 0; n_validators];
approvals.set(0, true);
Expand Down Expand Up @@ -670,14 +670,14 @@ mod tests {
approved: false,
}.into();

approval_entry.import_assignment(0, 0, block_tick);
approval_entry.import_assignment(0, 1, block_tick);
approval_entry.import_assignment(0, ValidatorIndex(0), block_tick);
approval_entry.import_assignment(0, ValidatorIndex(1), block_tick);

approval_entry.import_assignment(1, 2, block_tick + 1);
approval_entry.import_assignment(1, 3, block_tick + 1);
approval_entry.import_assignment(1, ValidatorIndex(2), block_tick + 1);
approval_entry.import_assignment(1, ValidatorIndex(3), block_tick + 1);

approval_entry.import_assignment(2, 4, block_tick + no_show_duration + 2);
approval_entry.import_assignment(2, 5, block_tick + no_show_duration + 2);
approval_entry.import_assignment(2, ValidatorIndex(4), block_tick + no_show_duration + 2);
approval_entry.import_assignment(2, ValidatorIndex(5), block_tick + no_show_duration + 2);

let mut approvals = bitvec![BitOrderLsb0, u8; 0; n_validators];
approvals.set(0, true);
Expand Down Expand Up @@ -757,14 +757,14 @@ mod tests {
approved: false,
}.into();

approval_entry.import_assignment(0, 0, block_tick);
approval_entry.import_assignment(0, 1, block_tick);
approval_entry.import_assignment(0, ValidatorIndex(0), block_tick);
approval_entry.import_assignment(0, ValidatorIndex(1), block_tick);

approval_entry.import_assignment(1, 2, block_tick + 1);
approval_entry.import_assignment(1, 3, block_tick + 1);
approval_entry.import_assignment(1, ValidatorIndex(2), block_tick + 1);
approval_entry.import_assignment(1, ValidatorIndex(3), block_tick + 1);

approval_entry.import_assignment(2, 4, block_tick + no_show_duration + 2);
approval_entry.import_assignment(2, 5, block_tick + no_show_duration + 2);
approval_entry.import_assignment(2, ValidatorIndex(4), block_tick + no_show_duration + 2);
approval_entry.import_assignment(2, ValidatorIndex(5), block_tick + no_show_duration + 2);

let mut approvals = bitvec![BitOrderLsb0, u8; 0; n_validators];
approvals.set(0, true);
Expand Down Expand Up @@ -813,7 +813,7 @@ mod tests {
},
);

approval_entry.import_assignment(3, 6, block_tick);
approval_entry.import_assignment(3, ValidatorIndex(6), block_tick);
approvals.set(6, true);

let tranche_now = no_show_duration as DelayTranche + 3;
Expand Down
14 changes: 7 additions & 7 deletions node/core/approval-voting/src/criteria.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub(crate) fn compute_assignments(
let (index, assignments_key): (ValidatorIndex, AssignmentPair) = {
let key = config.assignment_keys.iter().enumerate()
.find_map(|(i, p)| match keystore.key_pair(p) {
Ok(Some(pair)) => Some((i as ValidatorIndex, pair)),
Ok(Some(pair)) => Some((ValidatorIndex(i as _), pair)),
Ok(None) => None,
Err(sc_keystore::Error::Unavailable) => None,
Err(sc_keystore::Error::Io(e)) if e.kind() == std::io::ErrorKind::NotFound => None,
Expand Down Expand Up @@ -422,7 +422,7 @@ pub(crate) fn check_assignment_cert(
backing_group: GroupIndex,
) -> Result<DelayTranche, InvalidAssignment> {
let validator_public = config.assignment_keys
.get(validator_index as usize)
.get(validator_index.0 as usize)
.ok_or(InvalidAssignment)?;

let public = schnorrkel::PublicKey::from_bytes(validator_public.as_slice())
Expand Down Expand Up @@ -540,7 +540,7 @@ mod tests {
(0..n_groups).map(|i| {
(i * size .. (i + 1) *size)
.chain(if i < big_groups { Some(scraps + i) } else { None })
.map(|j| j as ValidatorIndex)
.map(|j| ValidatorIndex(j as _))
.collect::<Vec<_>>()
}).collect()
}
Expand Down Expand Up @@ -570,7 +570,7 @@ mod tests {
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
]),
validator_groups: vec![vec![0], vec![1, 2]],
validator_groups: vec![vec![ValidatorIndex(0)], vec![ValidatorIndex(1), ValidatorIndex(2)]],
n_cores: 2,
zeroth_delay_tranche_width: 10,
relay_vrf_modulo_samples: 3,
Expand Down Expand Up @@ -601,7 +601,7 @@ mod tests {
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
]),
validator_groups: vec![vec![0], vec![1, 2]],
validator_groups: vec![vec![ValidatorIndex(0)], vec![ValidatorIndex(1), ValidatorIndex(2)]],
n_cores: 2,
zeroth_delay_tranche_width: 10,
relay_vrf_modulo_samples: 3,
Expand Down Expand Up @@ -693,7 +693,7 @@ mod tests {
group: group_for_core(core.0 as _),
cert: assignment.cert,
own_group: GroupIndex(0),
val_index: 0,
val_index: ValidatorIndex(0),
config: config.clone(),
};

Expand Down Expand Up @@ -743,7 +743,7 @@ mod tests {
#[test]
fn check_rejects_nonexistent_key() {
check_mutated_assignments(200, 100, 25, |m| {
m.val_index += 200;
m.val_index.0 += 200;
Some(false)
});
}
Expand Down
3 changes: 2 additions & 1 deletion node/core/approval-voting/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ mod tests {
use super::*;
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
use polkadot_node_primitives::approval::{VRFOutput, VRFProof};
use polkadot_primitives::v1::ValidatorIndex;
use polkadot_subsystem::messages::AllMessages;
use sp_core::testing::TaskExecutor;
use sp_runtime::{Digest, DigestItem};
Expand Down Expand Up @@ -1561,7 +1562,7 @@ mod tests {
validators: vec![Sr25519Keyring::Alice.public().into(); 6],
discovery_keys: Vec::new(),
assignment_keys: Vec::new(),
validator_groups: vec![vec![0; 5], vec![0; 2]],
validator_groups: vec![vec![ValidatorIndex(0); 5], vec![ValidatorIndex(0); 2]],
n_cores: 6,
needed_approvals: 2,
zeroth_delay_tranche_width: irrelevant,
Expand Down
10 changes: 5 additions & 5 deletions node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ fn check_and_import_assignment(
tracing::trace!(
target: LOG_TARGET,
"Imported assignment from validator {} on candidate {:?}",
assignment.validator,
assignment.validator.0,
(assigned_candidate_hash, candidate_entry.candidate_receipt().descriptor.para_id),
);

Expand Down Expand Up @@ -928,7 +928,7 @@ fn check_and_import_approval<T>(
block_entry.session(),
);

let pubkey = match session_info.validators.get(approval.validator as usize) {
let pubkey = match session_info.validators.get(approval.validator.0 as usize) {
Some(k) => k,
None => respond_early!(ApprovalCheckResult::Bad)
};
Expand Down Expand Up @@ -1492,13 +1492,13 @@ async fn issue_approval(
}
};

let validator_pubkey = match session_info.validators.get(validator_index as usize) {
let validator_pubkey = match session_info.validators.get(validator_index.0 as usize) {
Some(p) => p,
None => {
tracing::warn!(
target: LOG_TARGET,
"Validator index {} out of bounds in session {}",
validator_index,
validator_index.0,
block_entry.session(),
);

Expand All @@ -1517,7 +1517,7 @@ async fn issue_approval(
tracing::warn!(
target: LOG_TARGET,
"Could not issue approval signature with validator index {} in session {}. Assignment key present but not validator key?",
validator_index,
validator_index.0,
block_entry.session(),
);

Expand Down
10 changes: 5 additions & 5 deletions node/core/approval-voting/src/persisted_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl ApprovalEntry {

/// Whether a validator is already assigned.
pub fn is_assigned(&self, validator_index: ValidatorIndex) -> bool {
self.assignments.get(validator_index as usize).map(|b| *b).unwrap_or(false)
self.assignments.get(validator_index.0 as usize).map(|b| *b).unwrap_or(false)
}

/// Import an assignment. No-op if already assigned on the same tranche.
Expand Down Expand Up @@ -143,7 +143,7 @@ impl ApprovalEntry {
};

self.tranches[idx].assignments.push((validator_index, tick_now));
self.assignments.set(validator_index as _, true);
self.assignments.set(validator_index.0 as _, true);
}

// Produce a bitvec indicating the assignments of all validators up to and
Expand All @@ -153,7 +153,7 @@ impl ApprovalEntry {
.take_while(|e| e.tranche <= tranche)
.fold(bitvec::bitvec![BitOrderLsb0, u8; 0; self.assignments.len()], |mut a, e| {
for &(v, _) in &e.assignments {
a.set(v as _, true);
a.set(v.0 as _, true);
}

a
Expand Down Expand Up @@ -235,8 +235,8 @@ impl CandidateEntry {

/// Note that a given validator has approved. Return the previous approval state.
pub fn mark_approval(&mut self, validator: ValidatorIndex) -> bool {
let prev = self.approvals.get(validator as usize).map(|b| *b).unwrap_or(false);
self.approvals.set(validator as usize, true);
let prev = self.approvals.get(validator.0 as usize).map(|b| *b).unwrap_or(false);
self.approvals.set(validator.0 as usize, true);
prev
}

Expand Down
Loading

0 comments on commit 950447e

Please sign in to comment.