Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Request based PoV distribution #2640

Merged
merged 36 commits into from
Mar 28, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
df828ac
Indentation fix.
eskimor Mar 18, 2021
06d4d90
Prepare request-response for PoV fetching.
eskimor Mar 18, 2021
6a940eb
Drop old PoV distribution.
eskimor Mar 18, 2021
a7fc368
WIP: Fetch PoV directly from backing.
eskimor Mar 18, 2021
9847b81
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 19, 2021
e03ff75
Backing compiles.
eskimor Mar 19, 2021
a49b4d4
Runtime access and connection management for PoV distribution.
eskimor Mar 23, 2021
545e950
Get rid of seemingly dead code.
eskimor Mar 23, 2021
47d9f5f
Implement PoV fetching.
eskimor Mar 23, 2021
0a283ab
Don't send `ConnectToValidators` for empty list.
eskimor Mar 24, 2021
afd795f
Even better - no need to check over and over again.
eskimor Mar 24, 2021
1c3eec8
PoV fetching implemented.
eskimor Mar 24, 2021
fb24855
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 24, 2021
cceddce
Check PoV hash upon reception.
eskimor Mar 24, 2021
89f0bf9
Implement retry of PoV fetching in backing.
eskimor Mar 25, 2021
ab75fea
Avoid pointless validation spawning.
eskimor Mar 25, 2021
10da891
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 25, 2021
3915a57
Add jaeger span to pov requesting.
eskimor Mar 25, 2021
fa6409e
Add back tracing.
eskimor Mar 25, 2021
8b9c2d4
Review remarks.
eskimor Mar 25, 2021
2d27be5
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 25, 2021
4af7d2e
Whitespace.
eskimor Mar 25, 2021
5c09829
Whitespace again.
eskimor Mar 26, 2021
ea9bde4
Cleanup + fix tests.
eskimor Mar 27, 2021
4207eaf
Log to log target in overseer.
eskimor Mar 27, 2021
3691061
Fix more tests.
eskimor Mar 27, 2021
b1a201a
Don't fail if group cannot be found.
eskimor Mar 27, 2021
298fe9d
Simple test for PoV fetcher.
eskimor Mar 27, 2021
af9f12c
Handle missing group membership better.
eskimor Mar 27, 2021
0c30792
Add test for retry functionality.
eskimor Mar 27, 2021
eb47465
Fix flaky test.
eskimor Mar 27, 2021
071bcca
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 27, 2021
3fa5791
Spaces again.
eskimor Mar 28, 2021
82d4a11
Guide updates.
eskimor Mar 28, 2021
b58a2ab
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 28, 2021
a0609e7
Spaces.
eskimor Mar 28, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 2 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ members = [
"node/core/runtime-api",
"node/network/approval-distribution",
"node/network/bridge",
"node/network/pov-distribution",
"node/network/protocol",
"node/network/statement-distribution",
"node/network/bitfield-distribution",
Expand Down
1 change: 1 addition & 0 deletions node/core/backing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2018"
futures = "0.3.12"
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
Expand Down
121 changes: 84 additions & 37 deletions node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,12 @@ use sp_keystore::SyncCryptoStorePtr;
use polkadot_primitives::v1::{
AvailableData, BackedCandidate, CandidateCommitments, CandidateDescriptor, CandidateHash,
CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId,
PoV, SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
PoV, SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation, AuthorityDiscoveryId,
};
use polkadot_node_primitives::{
Statement, SignedFullStatement, ValidationResult,
};
use polkadot_subsystem::{
PerLeafSpan, Stage,
jaeger,
messages::{
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
CandidateValidationMessage, PoVDistributionMessage, ProvisionableData,
ProvisionerMessage, StatementDistributionMessage, ValidationFailed, RuntimeApiRequest,
},
};
use polkadot_subsystem::{PerLeafSpan, Stage, errors::RuntimeApiError, jaeger, messages::{AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage, CandidateValidationMessage, IfDisconnected, NetworkBridgeMessage, PoVDistributionMessage, ProvisionableData, ProvisionerMessage, RuntimeApiRequest, StatementDistributionMessage, ValidationFailed}};
use polkadot_node_subsystem_util::{
self as util,
request_session_index_for_child,
Expand All @@ -54,6 +46,10 @@ use polkadot_node_subsystem_util::{
FromJobCommand,
metrics::{self, prometheus},
};
use polkadot_node_subsystem_util::Error as UtilError;
use polkadot_node_network_protocol::request_response::{Recipient, Requests, request::{OutgoingRequest, RequestError}, v1::{
PoVFetchingRequest, PoVFetchingResponse
}};
use statement_table::{
generic::AttestedCandidate as TableAttestedCandidate,
Context as TableContextTrait,
Expand All @@ -65,6 +61,7 @@ use statement_table::{
},
};
use thiserror::Error;
use util::request_session_info;

const LOG_TARGET: &str = "parachain::candidate-backing";

Expand All @@ -76,8 +73,22 @@ enum Error {
InvalidSignature,
#[error("Failed to send candidates {0:?}")]
Send(Vec<BackedCandidate>),
#[error("FetchPoV request error")]
FetchPoV(#[source] RequestError),
#[error("FetchPoV channel closed before receipt")]
FetchPoV(#[source] oneshot::Canceled),
FetchPoVCanceled(#[source] oneshot::Canceled),
#[error("FetchPoV runtime request failed in utilities")]
FetchPoVUtil(#[source] UtilError),
#[error("FetchPoV runtime request failed")]
FetchPoVRuntime(#[source] RuntimeApiError),
#[error("FetchPoV could not find session")]
FetchPoVNoSuchSession,
#[error("FetchPoV could not find index of signing validator")]
FetchPoVInvalidValidatorIndex,
#[error("Invalid response")]
FetchPoVInvalidResponse,
#[error("Backer did not have PoV")]
FetchPoVNoSuchPoV,
#[error("ValidateFromChainState channel closed before receipt")]
ValidateFromChainState(#[source] oneshot::Canceled),
#[error("StoreAvailableData channel closed before receipt")]
Expand All @@ -94,6 +105,14 @@ enum Error {
UtilError(#[from] util::Error),
}

/// PoV data to validate.
enum PoVData {
/// Allready available (from candidate selection).
Ready(Arc<PoV>),
/// Needs to be fetched from validator (we are checking a signed statement).
FetchFromValidator(ValidatorIndex),
}

enum ValidatedCandidateCommand {
// We were instructed to second the candidate.
Second(BackgroundValidationResult),
Expand Down Expand Up @@ -336,18 +355,51 @@ async fn make_pov_available(
Ok(Ok(()))
}

async fn request_pov_from_distribution(
async fn request_pov(
tx_from: &mut mpsc::Sender<FromJobCommand>,
parent: Hash,
descriptor: CandidateDescriptor,
from_validator: ValidatorIndex,
) -> Result<Arc<PoV>, Error> {
let (tx, rx) = oneshot::channel();

tx_from.send(AllMessages::PoVDistribution(
PoVDistributionMessage::FetchPoV(parent, descriptor, tx)
).into()).await?;

rx.await.map_err(Error::FetchPoV)
let session_index = request_session_index_for_child(parent, tx_from)
.await.map_err(Error::FetchPoVUtil)?
.await.map_err(Error::FetchPoVCanceled)?
.map_err(Error::FetchPoVRuntime)?;
let session_info = request_session_info(parent, session_index, tx_from)
.await.map_err(Error::FetchPoVUtil)?
.await.map_err(Error::FetchPoVCanceled)?
.map_err(Error::FetchPoVRuntime)?
.ok_or(Error::FetchPoVNoSuchSession)?;
let authority_id = session_info.discovery_keys
.get(from_validator.0 as usize)
.ok_or(Error::FetchPoVInvalidValidatorIndex)?;

let (req, pending_response) = OutgoingRequest::new(
Recipient::Authority(authority_id.clone()),
PoVFetchingRequest {
relay_parent: parent,
descriptor,
},
);
let full_req = Requests::PoVFetching(req);

tx_from.send(FromJobCommand::SendMessage(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
vec![full_req],
IfDisconnected::ImmediateError
)
))).await?;

let response = pending_response.await.map_err(Error::FetchPoV)?;
eskimor marked this conversation as resolved.
Show resolved Hide resolved
let pov = match response {
PoVFetchingResponse::PoV(compressed) => {
compressed.decompress().map_err(|_| Error::FetchPoVInvalidResponse)?
}
PoVFetchingResponse::NoSuchPoV => {
return Err(Error::FetchPoVNoSuchPoV)
eskimor marked this conversation as resolved.
Show resolved Hide resolved
}
};
Ok(Arc::new(pov))
}

async fn request_candidate_validation(
Expand Down Expand Up @@ -380,7 +432,7 @@ struct BackgroundValidationParams<F> {
tx_command: mpsc::Sender<ValidatedCandidateCommand>,
candidate: CandidateReceipt,
relay_parent: Hash,
pov: Option<Arc<PoV>>,
pov: PoVData,
validator_index: Option<ValidatorIndex>,
n_validators: usize,
span: Option<jaeger::Span>,
Expand All @@ -403,13 +455,14 @@ async fn validate_and_make_available(
} = params;

let pov = match pov {
Some(pov) => pov,
None => {
PoVData::Ready(pov) => pov,
PoVData::FetchFromValidator(validator_index) => {
let _span = span.as_ref().map(|s| s.child("request-pov"));
request_pov_from_distribution(
request_pov(
&mut tx_from,
relay_parent,
candidate.descriptor.clone(),
validator_index,
).await?
}
};
Expand Down Expand Up @@ -544,7 +597,6 @@ impl CandidateBackingJob {
).await? {
self.issue_candidate_seconded_message(stmt).await?;
}
self.distribute_pov(candidate.descriptor, pov).await?;
}
}
Err(candidate) => {
Expand Down Expand Up @@ -645,7 +697,7 @@ impl CandidateBackingJob {
tx_command: self.background_validation_tx.clone(),
candidate: candidate.clone(),
relay_parent: self.parent,
pov: Some(pov),
pov: PoVData::Ready(pov),
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
n_validators: self.table_context.validators.len(),
span,
Expand Down Expand Up @@ -824,6 +876,7 @@ impl CandidateBackingJob {
async fn kick_off_validation_work(
&mut self,
summary: TableSummary,
statement: SignedFullStatement,
span: Option<jaeger::Span>,
) -> Result<(), Error> {
let candidate_hash = summary.candidate;
Expand Down Expand Up @@ -862,7 +915,7 @@ impl CandidateBackingJob {
tx_command: self.background_validation_tx.clone(),
candidate,
relay_parent: self.parent,
pov: None,
pov: PoVData::FetchFromValidator(statement.validator_index()),
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
n_validators: self.table_context.validators.len(),
span,
Expand All @@ -887,7 +940,11 @@ impl CandidateBackingJob {
summary.group_id,
);

self.kick_off_validation_work(summary, span).await?;
self.kick_off_validation_work(
summary,
statement,
span,
).await?;
}
}
}
Expand Down Expand Up @@ -988,16 +1045,6 @@ impl CandidateBackingJob {
Ok(())
}

async fn distribute_pov(
&mut self,
descriptor: CandidateDescriptor,
pov: Arc<PoV>,
) -> Result<(), Error> {
self.tx_from.send(AllMessages::from(
PoVDistributionMessage::DistributePoV(self.parent, descriptor, pov),
).into()).await.map_err(Into::into)
}

async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> {
let smsg = StatementDistributionMessage::Share(self.parent, s);

Expand Down
52 changes: 42 additions & 10 deletions node/network/availability-distribution/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,25 @@

//! Error handling related code and Error/Result definitions.

use polkadot_node_network_protocol::request_response::request::RequestError;
use thiserror::Error;

use futures::channel::oneshot;

use polkadot_node_subsystem_util::Error as UtilError;
use polkadot_primitives::v1::SessionIndex;
use polkadot_primitives::v1::{CompressedPoVError, SessionIndex};
use polkadot_subsystem::{errors::RuntimeApiError, SubsystemError};

use crate::LOG_TARGET;

/// Errors of this subsystem.
#[derive(Debug, Error)]
pub enum Error {
#[error("Response channel to obtain QueryChunk failed")]
#[error("Response channel to obtain chunk failed")]
QueryChunkResponseChannel(#[source] oneshot::Canceled),

#[error("Response channel to obtain available data failed")]
QueryAvailableDataResponseChannel(#[source] oneshot::Canceled),

#[error("Receive channel closed")]
IncomingMessageChannel(#[source] SubsystemError),
Expand All @@ -53,24 +59,39 @@ pub enum Error {
/// Sending response failed.
#[error("Sending a request's response failed.")]
SendResponse,
}

/// Error that we should handle gracefully by logging it.
#[derive(Debug)]
pub enum NonFatalError {
/// Some request to utility functions failed.
/// This can be either `RuntimeRequestCanceled` or `RuntimeApiError`.
#[error("Utility request failed")]
UtilRequest(UtilError),

/// Runtime API subsystem is down, which means we're shutting down.
#[error("Runtime request canceled")]
RuntimeRequestCanceled(oneshot::Canceled),

/// Some request to the runtime failed.
/// For example if we prune a block we're requesting info about.
#[error("Runtime API error")]
RuntimeRequest(RuntimeApiError),

/// We tried fetching a session info which was not available.
#[error("There was no session with the given index")]
NoSuchSession(SessionIndex),

/// Decompressing PoV failed.
#[error("PoV could not be decompressed")]
PoVDecompression(CompressedPoVError),

/// Fetching PoV failed with `RequestError`.
#[error("FetchPoV request error")]
FetchPoV(#[source] RequestError),

#[error("Remote responded with `NoSuchPoV`")]
NoSuchPoV,

/// No validator with the index could be found in current session.
#[error("Given validator index could not be found")]
InvalidValidatorIndex,
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -87,9 +108,20 @@ pub(crate) async fn recv_runtime<V>(
oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
UtilError,
>,
) -> std::result::Result<V, NonFatalError> {
r.map_err(NonFatalError::UtilRequest)?
) -> std::result::Result<V, Error> {
r.map_err(Error::UtilRequest)?
.await
.map_err(NonFatalError::RuntimeRequestCanceled)?
.map_err(NonFatalError::RuntimeRequest)
.map_err(Error::RuntimeRequestCanceled)?
.map_err(Error::RuntimeRequest)
}


/// Utility for eating top level errors and log them.
///
/// We basically always want to try and continue on error. This utility function is meant to
/// consume top-level errors by simply logging them
pub fn log_error(result: Result<()>, ctx: &'static str) {
if let Err(error) = result {
tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
}
}
Loading