Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
runtime: replace validator_discovery api with session_info
Browse files Browse the repository at this point in the history
  • Loading branch information
ordian committed Nov 24, 2020
1 parent 3d22556 commit 5304563
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 139 deletions.
15 changes: 8 additions & 7 deletions node/core/runtime-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ fn make_runtime_api_request<Client>(
Request::CandidatePendingAvailability(para, sender) =>
query!(candidate_pending_availability(para), sender),
Request::CandidateEvents(sender) => query!(candidate_events(), sender),
Request::ValidatorDiscovery(ids, sender) => query!(validator_discovery(ids), sender),
Request::SessionInfo(index, sender) => query!(session_info(index), sender),
Request::DmqContents(id, sender) => query!(dmq_contents(id), sender),
Request::InboundHrmpChannelsContents(id, sender) => query!(inbound_hrmp_channels_contents(id), sender),
}
Expand Down Expand Up @@ -201,8 +201,8 @@ mod tests {
use polkadot_primitives::v1::{
ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData,
Id as ParaId, OccupiedCoreAssumption, ValidationData, SessionIndex, ValidationCode,
CommittedCandidateReceipt, CandidateEvent, AuthorityDiscoveryId, InboundDownwardMessage,
BlockNumber, InboundHrmpMessage,
CommittedCandidateReceipt, CandidateEvent, InboundDownwardMessage,
BlockNumber, InboundHrmpMessage, SessionInfo,
};
use polkadot_node_subsystem_test_helpers as test_helpers;
use sp_core::testing::TaskExecutor;
Expand All @@ -216,6 +216,7 @@ mod tests {
availability_cores: Vec<CoreState>,
validation_data: HashMap<ParaId, ValidationData>,
session_index_for_child: SessionIndex,
session_info: HashMap<SessionIndex, SessionInfo>,
validation_code: HashMap<ParaId, ValidationCode>,
historical_validation_code: HashMap<ParaId, Vec<(BlockNumber, ValidationCode)>>,
validation_outputs_results: HashMap<ParaId, bool>,
Expand Down Expand Up @@ -289,6 +290,10 @@ mod tests {
self.session_index_for_child.clone()
}

fn session_info(&self, index: SessionIndex) -> Option<SessionInfo> {
self.session_info.get(&index).cloned()
}

fn validation_code(
&self,
para: ParaId,
Expand Down Expand Up @@ -321,10 +326,6 @@ mod tests {
self.candidate_events.clone()
}

fn validator_discovery(ids: Vec<ValidatorId>) -> Vec<Option<AuthorityDiscoveryId>> {
vec![None; ids.len()]
}

fn dmq_contents(
&self,
recipient: ParaId,
Expand Down
46 changes: 24 additions & 22 deletions node/network/collator-protocol/src/collator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ mod tests {
use polkadot_primitives::v1::{
BlockData, CandidateDescriptor, CollatorPair, ScheduledCore,
ValidatorIndex, GroupRotationInfo, AuthorityDiscoveryId,
SessionInfo,
};
use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}};
use polkadot_node_subsystem_util::TimeoutExt;
Expand Down Expand Up @@ -870,20 +871,6 @@ mod tests {
.collect()
}

fn next_group_validator_ids(&self) -> Vec<ValidatorId> {
self.next_group_validator_indices()
.iter()
.map(|i| self.validator_public[*i as usize].clone())
.collect()
}

/// Returns the unique count of validators in the current and next group.
fn current_and_next_group_unique_validator_count(&self) -> usize {
let mut indices = self.next_group_validator_indices().iter().collect::<HashSet<_>>();
indices.extend(self.current_group_validator_indices());
indices.len()
}

/// Generate a new relay parent and inform the subsystem about the new view.
///
/// If `merge_views == true` it means the subsystem will be informed that we working on the old `relay_parent`
Expand Down Expand Up @@ -1085,25 +1072,40 @@ mod tests {
}
);

let current_index = 1;

// obtain the validator_id to authority_id mapping
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorDiscovery(validators, tx),
RuntimeApiRequest::SessionIndexForChild(tx),
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
assert_eq!(validators.len(), test_state.current_and_next_group_unique_validator_count());
tx.send(Ok(current_index)).unwrap();
}
);

let current_validators = test_state.current_group_validator_ids();
let next_validators = test_state.next_group_validator_ids();
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionInfo(index, tx),
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
assert_eq!(index, current_index);

assert!(validators.iter().all(|v| current_validators.contains(&v) || next_validators.contains(&v)));
let validators = test_state.current_group_validator_ids();
let current_discovery_keys = test_state.current_group_validator_authority_ids();
let next_discovery_keys = test_state.next_group_validator_authority_ids();

let current_validators = test_state.current_group_validator_authority_ids();
let next_validators = test_state.next_group_validator_authority_ids();
let discovery_keys = [&current_discovery_keys[..], &next_discovery_keys[..]].concat();

tx.send(Ok(current_validators.into_iter().chain(next_validators).map(Some).collect())).unwrap();
tx.send(Ok(Some(SessionInfo {
validators,
discovery_keys,
..Default::default()
}))).unwrap();
}
);

Expand Down
10 changes: 0 additions & 10 deletions node/network/collator-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,6 @@ enum Error {
Prometheus(#[from] prometheus::PrometheusError),
}

impl From<util::validator_discovery::Error> for Error {
fn from(me: util::validator_discovery::Error) -> Self {
match me {
util::validator_discovery::Error::Subsystem(s) => Error::Subsystem(s),
util::validator_discovery::Error::RuntimeApi(ra) => Error::RuntimeApi(ra),
util::validator_discovery::Error::Oneshot(c) => Error::Oneshot(c),
}
}
}

type Result<T> = std::result::Result<T, Error>;

/// What side of the collator protocol is being engaged
Expand Down
2 changes: 2 additions & 0 deletions node/subsystem-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use polkadot_primitives::v1::{
CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData,
GroupRotationInfo, Hash, Id as ParaId, ValidationData, OccupiedCoreAssumption,
SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex,
SessionInfo,
};
use sp_core::{
traits::SpawnNamed,
Expand Down Expand Up @@ -274,6 +275,7 @@ specialize_requests_ctx! {
fn request_validation_code_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
fn request_candidate_pending_availability_ctx(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
fn request_candidate_events_ctx() -> Vec<CandidateEvent>; CandidateEvents;
fn request_session_info_ctx(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
}

/// From the given set of validators, find the first key we can sign with, if any.
Expand Down
57 changes: 29 additions & 28 deletions node/subsystem-util/src/validator_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,52 +20,53 @@ use std::collections::HashMap;
use std::pin::Pin;

use futures::{
channel::{mpsc, oneshot},
channel::mpsc,
task::{Poll, self},
stream,
};
use streamunordered::{StreamUnordered, StreamYield};
use thiserror::Error;

use polkadot_node_subsystem::{
errors::RuntimeApiError, SubsystemError,
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeMessage},
errors::RuntimeApiError,
messages::{AllMessages, NetworkBridgeMessage},
SubsystemContext,
};
use polkadot_primitives::v1::{Hash, ValidatorId, AuthorityDiscoveryId};
use sc_network::PeerId;

/// Error when making a request to connect to validators.
#[derive(Debug, Error)]
pub enum Error {
/// Attempted to send or receive on a oneshot channel which had been canceled
#[error(transparent)]
Oneshot(#[from] oneshot::Canceled),
/// A subsystem error.
#[error(transparent)]
Subsystem(#[from] SubsystemError),
/// An error in the Runtime API.
#[error(transparent)]
RuntimeApi(#[from] RuntimeApiError),
}
use crate::Error;

/// Utility function to make it easier to connect to validators.
pub async fn connect_to_validators<Context: SubsystemContext>(
ctx: &mut Context,
relay_parent: Hash,
validators: Vec<ValidatorId>,
) -> Result<ConnectionRequest, Error> {
// ValidatorId -> AuthorityDiscoveryId
let (tx, rx) = oneshot::channel();

ctx.send_message(AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorDiscovery(validators.clone(), tx),
)
)).await;
let current_index = crate::request_session_index_for_child_ctx(relay_parent, ctx).await?.await??;
let session_info = crate::request_session_info_ctx(
relay_parent,
current_index,
ctx,
).await?.await??;

let (session_validators, discovery_keys) = match session_info {
Some(info) => (info.validators, info.discovery_keys),
None => return Err(RuntimeApiError::from(
"No session_info found for the current index".to_owned()
).into()),
};

let id_to_index = session_validators.iter()
.zip(0usize..)
.collect::<HashMap<_, _>>();

// We assume the same ordering in authorities as in validators so we can do an index search
let maybe_authorities: Vec<_> = validators.iter()
.map(|id| {
let validator_index = id_to_index.get(&id);
validator_index.and_then(|i| discovery_keys.get(*i).cloned())
})
.collect();

let maybe_authorities = rx.await??;
let authorities: Vec<_> = maybe_authorities.iter()
.cloned()
.filter_map(|id| id)
Expand Down
12 changes: 3 additions & 9 deletions node/subsystem/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use polkadot_node_primitives::{
CollationGenerationConfig, MisbehaviorReport, SignedFullStatement, ValidationResult,
};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, AvailableData, BackedCandidate, BlockNumber,
AuthorityDiscoveryId, AvailableData, BackedCandidate, BlockNumber, SessionInfo,
Header as BlockHeader, CandidateDescriptor, CandidateEvent, CandidateReceipt,
CollatorId, CommittedCandidateReceipt, CoreState, ErasureChunk,
GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption,
Expand Down Expand Up @@ -434,14 +434,8 @@ pub enum RuntimeApiRequest {
/// Get all events concerning candidates (backing, inclusion, time-out) in the parent of
/// the block in whose state this request is executed.
CandidateEvents(RuntimeApiSender<Vec<CandidateEvent>>),
/// Get the `AuthorityDiscoveryId`s corresponding to the given `ValidatorId`s.
/// Currently this request is limited to validators in the current session.
///
/// Returns `None` for validators not found in the current session.
ValidatorDiscovery(
Vec<ValidatorId>,
RuntimeApiSender<Vec<Option<AuthorityDiscoveryId>>>,
),
/// Get the session info for the given session, if stored.
SessionInfo(SessionIndex, RuntimeApiSender<Option<SessionInfo>>),
/// Get all the pending inbound messages in the downward message queue for a para.
DmqContents(
ParaId,
Expand Down
13 changes: 5 additions & 8 deletions primitives/src/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,8 @@ pub enum CandidateEvent<H = Hash> {
}

/// Information about validator sets of a session.
#[derive(Clone, Encode, Decode)]
#[derive(Clone, Encode, Decode, RuntimeDebug)]
#[cfg_attr(feature = "std", derive(PartialEq, Default))]
pub struct SessionInfo {
/// Validators in canonical ordering.
pub validators: Vec<ValidatorId>,
Expand Down Expand Up @@ -740,6 +741,9 @@ sp_api::decl_runtime_apis! {
/// This can be used to instantiate a `SigningContext`.
fn session_index_for_child() -> SessionIndex;

/// Get the session info for the given session, if stored.
fn session_info(index: SessionIndex) -> Option<SessionInfo>;

/// Fetch the validation code used by a para, making the given `OccupiedCoreAssumption`.
///
/// Returns `None` if either the para is not registered or the assumption is `Freed`
Expand All @@ -765,13 +769,6 @@ sp_api::decl_runtime_apis! {
#[skip_initialize_block]
fn candidate_events() -> Vec<CandidateEvent<H>>;

/// Get the `AuthorityDiscoveryId`s corresponding to the given `ValidatorId`s.
/// Currently this request is limited to validators in the current session.
///
/// We assume that every validator runs authority discovery,
/// which would allow us to establish point-to-point connection to given validators.
fn validator_discovery(validators: Vec<ValidatorId>) -> Vec<Option<AuthorityDiscoveryId>>;

/// Get all the pending inbound messages in the downward message queue for a para.
fn dmq_contents(
recipient: Id,
Expand Down
27 changes: 17 additions & 10 deletions roadmap/implementers-guide/src/types/overseer-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -398,14 +398,8 @@ enum RuntimeApiRequest {
Validators(ResponseChannel<Vec<ValidatorId>>),
/// Get the validator groups and rotation info.
ValidatorGroups(ResponseChannel<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo)>),
/// Get the session index for children of the block. This can be used to construct a signing
/// context.
SessionIndex(ResponseChannel<SessionIndex>),
/// Get the validation code for a specific para, using the given occupied core assumption.
ValidationCode(ParaId, OccupiedCoreAssumption, ResponseChannel<Option<ValidationCode>>),
/// Fetch the historical validation code used by a para for candidates executed in
/// the context of a given block height in the current chain.
HistoricalValidationCode(ParaId, BlockNumber, ResponseChannel<Option<ValidationCode>>),
/// Get information about all availability cores.
AvailabilityCores(ResponseChannel<Vec<CoreState>>),
/// with the given occupied core assumption.
PersistedValidationData(
ParaId,
Expand All @@ -424,12 +418,25 @@ enum RuntimeApiRequest {
CandidateCommitments,
RuntimeApiSender<bool>,
),
/// Get information about all availability cores.
AvailabilityCores(ResponseChannel<Vec<CoreState>>),
/// Get the session index for children of the block. This can be used to construct a signing
/// context.
SessionIndexForChild(ResponseChannel<SessionIndex>),
/// Get the validation code for a specific para, using the given occupied core assumption.
ValidationCode(ParaId, OccupiedCoreAssumption, ResponseChannel<Option<ValidationCode>>),
/// Fetch the historical validation code used by a para for candidates executed in
/// the context of a given block height in the current chain.
HistoricalValidationCode(ParaId, BlockNumber, ResponseChannel<Option<ValidationCode>>),
/// Get a committed candidate receipt for all candidates pending availability.
CandidatePendingAvailability(ParaId, ResponseChannel<Option<CommittedCandidateReceipt>>),
/// Get all events concerning candidates in the last block.
CandidateEvents(ResponseChannel<Vec<CandidateEvent>>),
/// Get the session info for the given session, if stored.
SessionInfo(SessionIndex, ResponseChannel<Option<SessionInfo>>),
/// Get all the pending inbound messages in the downward message queue for a para.
DmqContents(ParaId, ResponseChannel<Vec<InboundDownwardMessage<BlockNumber>>>),
/// Get the contents of all channels addressed to the given recipient. Channels that have no
/// messages in them are also included.
InboundHrmpChannelsContents(ParaId, ResponseChannel<BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumber>>>>),
}

enum RuntimeApiMessage {
Expand Down
10 changes: 5 additions & 5 deletions runtime/kusama/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use primitives::v1::{
AccountId, AccountIndex, Balance, BlockNumber, CandidateEvent, CommittedCandidateReceipt,
CoreState, GroupRotationInfo, Hash, Id, Moment, Nonce, OccupiedCoreAssumption,
PersistedValidationData, Signature, ValidationCode, ValidationData, ValidatorId, ValidatorIndex,
InboundDownwardMessage, InboundHrmpMessage,
InboundDownwardMessage, InboundHrmpMessage, SessionInfo,
};
use runtime_common::{
claims, SlowAdjustingFeeUpdate, CurrencyToVote,
Expand Down Expand Up @@ -1092,6 +1092,10 @@ sp_api::impl_runtime_apis! {
0
}

fn session_info(_: SessionIndex) -> Option<SessionInfo> {
None
}

fn validation_code(_: Id, _: OccupiedCoreAssumption) -> Option<ValidationCode> {
None
}
Expand All @@ -1108,10 +1112,6 @@ sp_api::impl_runtime_apis! {
Vec::new()
}

fn validator_discovery(_: Vec<ValidatorId>) -> Vec<Option<AuthorityDiscoveryId>> {
Vec::new()
}

fn dmq_contents(
_recipient: Id,
) -> Vec<InboundDownwardMessage<BlockNumber>> {
Expand Down
Loading

0 comments on commit 5304563

Please sign in to comment.