This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

validator_discovery: less flexible, but simpler design #3052

Merged · 5 commits · May 19, 2021
Changes from all commits
18 changes: 2 additions & 16 deletions node/network/availability-distribution/src/lib.rs
@@ -36,7 +36,6 @@ use requester::Requester;
 
 /// Handling requests for PoVs during backing.
 mod pov_requester;
-use pov_requester::PoVRequester;
 
 /// Responding to erasure chunk requests:
 mod responder;
@@ -90,7 +89,6 @@ impl AvailabilityDistributionSubsystem {
 	Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send,
 {
 	let mut requester = Requester::new(self.metrics.clone()).fuse();
-	let mut pov_requester = PoVRequester::new();
 	loop {
 		let action = {
 			let mut subsystem_next = ctx.recv().fuse();
@@ -113,18 +111,6 @@
 			};
 			match message {
 				FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
-					let result = pov_requester.update_connected_validators(
-						&mut ctx,
-						&mut self.runtime,
-						&update,
-					).await;
-					if let Err(error) = result {
-						tracing::debug!(
-							target: LOG_TARGET,
-							?error,
-							"PoVRequester::update_connected_validators",
-						);
-					}
 					log_error(
 						requester.get_mut().update_fetching_heads(&mut ctx, &mut self.runtime, update).await,
 						"Error in Requester::update_fetching_heads"
@@ -154,7 +140,7 @@
 				},
 			} => {
 				log_error(
-					pov_requester.fetch_pov(
+					pov_requester::fetch_pov(
 						&mut ctx,
 						&mut self.runtime,
 						relay_parent,
@@ -163,7 +149,7 @@
 						pov_hash,
 						tx,
 					).await,
-					"PoVRequester::fetch_pov"
+					"pov_requester::fetch_pov"
 				)?;
 			}
 		}
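The call-site diff above is the whole design change in miniature: `fetch_pov` is now a free function in the `pov_requester` module instead of a method on a stateful `PoVRequester`, so the subsystem loop no longer constructs a requester or threads it through its handlers. A minimal compilable sketch of that shape change, using stand-in types (`Ctx`, `u64` hashes) rather than the real subsystem ones:

	// Stand-in for the real SubsystemContext.
	struct Ctx;

	// Before: connection bookkeeping lived in the struct, so every caller had
	// to build one and keep a `pov_requester` binding alive in the main loop.
	struct PoVRequester {
		connected_sessions: Vec<u32>, // stand-in for the LRU of connection handles
	}

	impl PoVRequester {
		fn fetch_pov(&self, _ctx: &mut Ctx, pov_hash: u64) -> u64 {
			// ...build and dispatch the network request...
			pov_hash
		}
	}

	// After: no state left to carry, so a module-level free function suffices.
	fn fetch_pov(_ctx: &mut Ctx, pov_hash: u64) -> u64 {
		pov_hash
	}

	fn main() {
		let mut ctx = Ctx;
		let old = PoVRequester { connected_sessions: Vec::new() };
		// Both shapes do the same work; only the free function needs no setup.
		assert_eq!(old.fetch_pov(&mut ctx, 7), fetch_pov(&mut ctx, 7));
	}

The same simplification shows up in the test diff further down, where `test_run` drops its `PoVRequester::new()` line.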
213 changes: 47 additions & 166 deletions node/network/availability-distribution/src/pov_requester/mod.rs
@@ -17,118 +17,68 @@
 //! PoV requester takes care of requesting PoVs from validators of a backing group.
 
 use futures::{FutureExt, channel::oneshot, future::BoxFuture};
-use lru::LruCache;
 
 use polkadot_subsystem::jaeger;
 use polkadot_node_network_protocol::{
-	peer_set::PeerSet,
 	request_response::{OutgoingRequest, Recipient, request::{RequestError, Requests},
 		v1::{PoVFetchingRequest, PoVFetchingResponse}}
 };
 use polkadot_primitives::v1::{
-	AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex, ValidatorIndex
+	CandidateHash, Hash, ValidatorIndex,
 };
 use polkadot_node_primitives::PoV;
 use polkadot_subsystem::{
-	ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf,
+	SubsystemContext,
 	messages::{AllMessages, NetworkBridgeMessage, IfDisconnected}
 };
-use polkadot_node_subsystem_util::runtime::{RuntimeInfo, ValidatorInfo};
+use polkadot_node_subsystem_util::runtime::RuntimeInfo;
 
 use crate::error::{Fatal, NonFatal};
 use crate::LOG_TARGET;
 
-/// Number of sessions we want to keep in the LRU.
-const NUM_SESSIONS: usize = 2;
-
-pub struct PoVRequester {
-	/// We only ever care about being connected to validators of at most two sessions.
-	///
-	/// So we keep an LRU for managing connection requests of size 2.
-	/// Cache will contain `None` if we are not a validator in that session.
-	connected_validators: LruCache<SessionIndex, Option<oneshot::Sender<()>>>,
-}
-
-impl PoVRequester {
-	/// Create a new requester for PoVs.
-	pub fn new() -> Self {
-		Self {
-			connected_validators: LruCache::new(NUM_SESSIONS),
-		}
-	}
-
-	/// Make sure we are connected to the right set of validators.
-	///
-	/// On every `ActiveLeavesUpdate`, we check whether we are connected properly to our current
-	/// validator group.
-	pub async fn update_connected_validators<Context>(
-		&mut self,
-		ctx: &mut Context,
-		runtime: &mut RuntimeInfo,
-		update: &ActiveLeavesUpdate,
-	) -> super::Result<()>
-	where
-		Context: SubsystemContext,
-	{
-		let activated = update.activated.iter().map(|ActivatedLeaf { hash: h, .. }| h);
-		let activated_sessions =
-			get_activated_sessions(ctx, runtime, activated).await?;
-
-		for (parent, session_index) in activated_sessions {
-			if self.connected_validators.contains(&session_index) {
-				continue
-			}
-			let tx = connect_to_relevant_validators(ctx, runtime, parent, session_index).await?;
-			self.connected_validators.put(session_index, tx);
-		}
-		Ok(())
-	}
-
-	/// Start background worker for taking care of fetching the requested `PoV` from the network.
-	pub async fn fetch_pov<Context>(
-		&self,
-		ctx: &mut Context,
-		runtime: &mut RuntimeInfo,
-		parent: Hash,
-		from_validator: ValidatorIndex,
-		candidate_hash: CandidateHash,
-		pov_hash: Hash,
-		tx: oneshot::Sender<PoV>
-	) -> super::Result<()>
-	where
-		Context: SubsystemContext,
-	{
-		let info = &runtime.get_session_info(ctx, parent).await?.session_info;
-		let authority_id = info.discovery_keys.get(from_validator.0 as usize)
-			.ok_or(NonFatal::InvalidValidatorIndex)?
-			.clone();
-		let (req, pending_response) = OutgoingRequest::new(
-			Recipient::Authority(authority_id),
-			PoVFetchingRequest {
-				candidate_hash,
-			},
-		);
-		let full_req = Requests::PoVFetching(req);
-
-		ctx.send_message(
-			AllMessages::NetworkBridge(
-				NetworkBridgeMessage::SendRequests(
-					vec![full_req],
-					// We are supposed to be connected to validators of our group via `PeerSet`,
-					// but at session boundaries that is kind of racy, in case a connection takes
-					// longer to get established, so we try to connect in any case.
-					IfDisconnected::TryConnect
-				)
-			)).await;
-
-		let span = jaeger::Span::new(candidate_hash, "fetch-pov")
-			.with_validator_index(from_validator)
-			.with_relay_parent(parent);
-		ctx.spawn("pov-fetcher", fetch_pov_job(pov_hash, pending_response.boxed(), span, tx).boxed())
-			.await
-			.map_err(|e| Fatal::SpawnTask(e))?;
-		Ok(())
-	}
-}
+/// Start background worker for taking care of fetching the requested `PoV` from the network.
+pub async fn fetch_pov<Context>(
+	ctx: &mut Context,
+	runtime: &mut RuntimeInfo,
+	parent: Hash,
+	from_validator: ValidatorIndex,
+	candidate_hash: CandidateHash,
+	pov_hash: Hash,
+	tx: oneshot::Sender<PoV>
+) -> super::Result<()>
+where
+	Context: SubsystemContext,
+{
+	let info = &runtime.get_session_info(ctx, parent).await?.session_info;
+	let authority_id = info.discovery_keys.get(from_validator.0 as usize)
+		.ok_or(NonFatal::InvalidValidatorIndex)?
+		.clone();
+	let (req, pending_response) = OutgoingRequest::new(
+		Recipient::Authority(authority_id),
+		PoVFetchingRequest {
+			candidate_hash,
+		},
+	);
+	let full_req = Requests::PoVFetching(req);
+
+	ctx.send_message(
+		AllMessages::NetworkBridge(
+			NetworkBridgeMessage::SendRequests(
+				vec![full_req],
+				// We are supposed to be connected to validators of our group via `PeerSet`,
+				// but at session boundaries that is kind of racy, in case a connection takes
+				// longer to get established, so we try to connect in any case.
+				IfDisconnected::TryConnect
+			)
+		)).await;
+
+	let span = jaeger::Span::new(candidate_hash, "fetch-pov")
+		.with_validator_index(from_validator)
+		.with_relay_parent(parent);
+	ctx.spawn("pov-fetcher", fetch_pov_job(pov_hash, pending_response.boxed(), span, tx).boxed())
+		.await
+		.map_err(|e| Fatal::SpawnTask(e))?;
+	Ok(())
+}
 
 /// Future to be spawned for taking care of handling reception and sending of PoV.
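The `ctx.spawn("pov-fetcher", ...)` call above hands the actual network work to a background task, which answers the caller through the `oneshot::Sender<PoV>` it was given. A minimal thread-based model of that pattern, with std channels standing in for the async oneshot and a `u64` standing in for the PoV:

	use std::sync::mpsc;
	use std::thread;

	// Stand-in worker: fetch the PoV, check it against `pov_hash`, answer the caller.
	fn fetch_pov_job(pov_hash: u64, tx: mpsc::Sender<u64>) {
		let fetched = pov_hash; // stand-in for the network fetch plus hash check
		let _ = tx.send(fetched); // the receiver may have hung up; ignore the error
	}

	fn main() {
		let (tx, rx) = mpsc::channel();
		// Stand-in for `ctx.spawn("pov-fetcher", ...)`: fire the worker and move on.
		let worker = thread::spawn(move || fetch_pov_job(42, tx));
		// Whoever requested the PoV eventually receives it on `rx`.
		assert_eq!(rx.recv().unwrap(), 42);
		worker.join().unwrap();
	}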
@@ -170,74 +120,6 @@ async fn do_fetch_pov(
 	}
 }
 
-/// Get the session indices for the given relay chain parents.
-async fn get_activated_sessions<Context>(ctx: &mut Context, runtime: &mut RuntimeInfo, new_heads: impl Iterator<Item = &Hash>)
-	-> super::Result<impl Iterator<Item = (Hash, SessionIndex)>>
-where
-	Context: SubsystemContext,
-{
-	let mut sessions = Vec::new();
-	for parent in new_heads {
-		sessions.push((*parent, runtime.get_session_index(ctx, *parent).await?));
-	}
-	Ok(sessions.into_iter())
-}
-
-/// Connect to validators of our validator group.
-async fn connect_to_relevant_validators<Context>(
-	ctx: &mut Context,
-	runtime: &mut RuntimeInfo,
-	parent: Hash,
-	session: SessionIndex
-)
-	-> super::Result<Option<oneshot::Sender<()>>>
-where
-	Context: SubsystemContext,
-{
-	if let Some(validator_ids) = determine_relevant_validators(ctx, runtime, parent, session).await? {
-		let (tx, keep_alive) = oneshot::channel();
-		ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators {
-			validator_ids, peer_set: PeerSet::Validation, keep_alive
-		})).await;
-		Ok(Some(tx))
-	} else {
-		Ok(None)
-	}
-}
-
-/// Get the validators in our validator group.
-///
-/// Return: `None` if not a validator.
-async fn determine_relevant_validators<Context>(
-	ctx: &mut Context,
-	runtime: &mut RuntimeInfo,
-	parent: Hash,
-	session: SessionIndex,
-)
-	-> super::Result<Option<Vec<AuthorityDiscoveryId>>>
-where
-	Context: SubsystemContext,
-{
-	let info = runtime.get_session_info_by_index(ctx, parent, session).await?;
-	if let ValidatorInfo {
-		our_index: Some(our_index),
-		our_group: Some(our_group)
-	} = &info.validator_info {
-
-		let indeces = info.session_info.validator_groups.get(our_group.0 as usize)
-			.expect("Our group got retrieved from that session info, it must exist. qed.")
-			.clone();
-		Ok(Some(
-			indeces.into_iter()
-				.filter(|i| *i != *our_index)
-				.map(|i| info.session_info.discovery_keys[i.0 as usize].clone())
-				.collect()
-		))
-	} else {
-		Ok(None)
-	}
-}
-
 #[cfg(test)]
 mod tests {
 	use assert_matches::assert_matches;
@@ -274,7 +156,6 @@ mod tests {
 	}
 
 	fn test_run(pov_hash: Hash, pov: PoV) {
-		let requester = PoVRequester::new();
 		let pool = TaskExecutor::new();
 		let (mut context, mut virtual_overseer) =
 			test_helpers::make_subsystem_context::<AvailabilityDistributionMessage, TaskExecutor>(pool.clone());
@@ -283,7 +164,7 @@
 
 		let (tx, rx) = oneshot::channel();
 		let testee = async {
-			requester.fetch_pov(
+			fetch_pov(
 				&mut context,
 				&mut runtime,
 				Hash::default(),
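The bulk of the deletions in this file are per-session connection management: the LRU of `keep_alive` senders, `get_activated_sessions`, `connect_to_relevant_validators`, and `determine_relevant_validators` all go away because the new `fetch_pov` relies solely on `IfDisconnected::TryConnect`, letting each request establish a connection on demand when one is missing. A self-contained sketch of that policy, with a hypothetical `Network` type standing in for the bridge and request-response layer:

	use std::collections::HashMap;

	// The two request policies, mirroring the real `IfDisconnected` type.
	#[derive(Clone, Copy)]
	enum IfDisconnected {
		ImmediateError, // fail fast when no connection exists
		TryConnect,     // dial the peer as part of sending the request
	}

	// Hypothetical stand-in for the network bridge plus request-response layer.
	struct Network {
		connected: HashMap<&'static str, bool>,
	}

	impl Network {
		fn send_request(&mut self, peer: &'static str, policy: IfDisconnected) -> Result<(), &'static str> {
			let up = self.connected.get(peer).copied().unwrap_or(false);
			match (up, policy) {
				(true, _) => Ok(()),
				(false, IfDisconnected::TryConnect) => {
					// At session boundaries a pre-established connection may not
					// exist yet; dialing on demand covers that race with no cache.
					self.connected.insert(peer, true);
					Ok(())
				}
				(false, IfDisconnected::ImmediateError) => Err("not connected"),
			}
		}
	}

	fn main() {
		let mut net = Network { connected: HashMap::new() };
		// No eager `ConnectToValidators` bookkeeping: the request itself connects.
		assert!(net.send_request("validator-1", IfDisconnected::TryConnect).is_ok());
		assert!(net.send_request("validator-2", IfDisconnected::ImmediateError).is_err());
	}

The cost is flexibility: nothing keeps group connections warm across requests anymore, which is exactly the trade-off the PR title names.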
2 changes: 0 additions & 2 deletions node/network/bridge/src/lib.rs
@@ -509,7 +509,6 @@ where
 			NetworkBridgeMessage::ConnectToValidators {
 				validator_ids,
 				peer_set,
-				keep_alive,
 			} => {
 				tracing::trace!(
 					target: LOG_TARGET,
@@ -522,7 +521,6 @@
 				let (ns, ads) = validator_discovery.on_request(
 					validator_ids,
 					peer_set,
-					keep_alive,
 					network_service,
 					authority_discovery_service,
 				).await;
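With no caller supplying a `keep_alive` handle anymore, the `ConnectToValidators` request loses that field and the bridge alone decides how long discovered connections live. A hypothetical before/after model of the variant, with stand-in field types (in the real code `keep_alive` carried one half of a `oneshot` channel, whose other half the old requester cached in its LRU):

	use std::sync::mpsc;

	// Before (sketch): the caller kept the sender half; dropping it signalled
	// the bridge that the connections were no longer needed.
	#[allow(dead_code)]
	enum NetworkBridgeMessageOld {
		ConnectToValidators {
			validator_ids: Vec<u8>,         // stand-in for Vec<AuthorityDiscoveryId>
			peer_set: u8,                   // stand-in for PeerSet
			keep_alive: mpsc::Receiver<()>, // stand-in for the oneshot half the bridge held
		},
	}

	// After (sketch): no lifetime handle; the bridge manages connections itself.
	enum NetworkBridgeMessageNew {
		ConnectToValidators {
			validator_ids: Vec<u8>,
			peer_set: u8,
		},
	}

	fn main() {
		let _request = NetworkBridgeMessageNew::ConnectToValidators {
			validator_ids: vec![1, 2, 3],
			peer_set: 0,
		};
	}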