Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Remove request multiplexer #3624

Merged
merged 23 commits into from
Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/network/availability-distribution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "master",
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
thiserror = "1.0.26"
rand = "0.8.3"
derive_more = "0.99.11"
lru = "0.6.6"

[dev-dependencies]
Expand Down
42 changes: 23 additions & 19 deletions node/network/availability-distribution/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,31 @@

//! Error handling related code and Error/Result definitions.

use polkadot_node_network_protocol::request_response::request::RequestError;
use polkadot_node_network_protocol::request_response::outgoing::RequestError;
use thiserror::Error;

use futures::channel::oneshot;

use polkadot_node_subsystem_util::{runtime, unwrap_non_fatal, Fault};
use polkadot_node_subsystem_util::runtime;
use polkadot_subsystem::SubsystemError;

use crate::LOG_TARGET;

#[derive(Debug, Error)]
#[derive(Debug, Error, derive_more::From)]
#[error(transparent)]
pub struct Error(pub Fault<NonFatal, Fatal>);

impl From<NonFatal> for Error {
fn from(e: NonFatal) -> Self {
Self(Fault::from_non_fatal(e))
}
}

impl From<Fatal> for Error {
fn from(f: Fatal) -> Self {
Self(Fault::from_fatal(f))
}
pub enum Error {
/// All fatal errors.
Fatal(Fatal),
/// All nonfatal/potentially recoverable errors.
NonFatal(NonFatal),
}

impl From<runtime::Error> for Error {
fn from(o: runtime::Error) -> Self {
Self(Fault::from_other(o))
match o {
runtime::Error::Fatal(f) => Self::Fatal(Fatal::Runtime(f)),
runtime::Error::NonFatal(f) => Self::NonFatal(NonFatal::Runtime(f)),
}
}
}

Expand Down Expand Up @@ -107,15 +103,23 @@ pub enum NonFatal {
Runtime(#[from] runtime::NonFatal),
}

/// General result type for fatal/nonfatal errors.
pub type Result<T> = std::result::Result<T, Error>;

/// Results which are never fatal.
pub type NonFatalResult<T> = std::result::Result<T, NonFatal>;

/// Utility for eating top level errors and log them.
///
/// We basically always want to try and continue on error. This utility function is meant to
/// consume top-level errors by simply logging them
pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), Fatal> {
if let Some(error) = unwrap_non_fatal(result.map_err(|e| e.0))? {
tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
match result {
Err(Error::Fatal(f)) => Err(f),
Err(Error::NonFatal(error)) => {
tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
Ok(())
},
Ok(()) => Ok(()),
}
Ok(())
}
54 changes: 41 additions & 13 deletions node/network/availability-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use futures::{future::Either, FutureExt, StreamExt, TryFutureExt};

use sp_keystore::SyncCryptoStorePtr;

use polkadot_node_network_protocol::request_response::{v1, IncomingRequestReceiver};
use polkadot_subsystem::{
messages::AvailabilityDistributionMessage, overseer, FromOverseer, OverseerSignal,
SpawnedSubsystem, SubsystemContext, SubsystemError,
Expand All @@ -38,7 +39,7 @@ mod pov_requester;

/// Responding to erasure chunk requests:
mod responder;
use responder::{answer_chunk_request_log, answer_pov_request_log};
use responder::{run_chunk_receiver, run_pov_receiver};

mod metrics;
/// Prometheus `Metrics` for availability distribution.
Expand All @@ -53,10 +54,20 @@ const LOG_TARGET: &'static str = "parachain::availability-distribution";
pub struct AvailabilityDistributionSubsystem {
/// Easy and efficient runtime access for this subsystem.
runtime: RuntimeInfo,
/// Receivers to receive messages from.
recvs: IncomingRequestReceivers,
/// Prometheus metrics.
metrics: Metrics,
}

/// Receivers to be passed into availability distribution.
pub struct IncomingRequestReceivers {
/// Receiver for incoming PoV requests.
pub pov_req_receiver: IncomingRequestReceiver<v1::PoVFetchingRequest>,
/// Receiver for incoming availability chunk requests.
pub chunk_req_receiver: IncomingRequestReceiver<v1::ChunkFetchingRequest>,
}

impl<Context> overseer::Subsystem<Context, SubsystemError> for AvailabilityDistributionSubsystem
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
Expand All @@ -74,18 +85,41 @@ where

impl AvailabilityDistributionSubsystem {
/// Create a new instance of the availability distribution.
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
pub fn new(
keystore: SyncCryptoStorePtr,
recvs: IncomingRequestReceivers,
metrics: Metrics,
) -> Self {
let runtime = RuntimeInfo::new(Some(keystore));
Self { runtime, metrics }
Self { runtime, recvs, metrics }
}

/// Start processing work as passed on from the Overseer.
async fn run<Context>(mut self, mut ctx: Context) -> std::result::Result<(), Fatal>
async fn run<Context>(self, mut ctx: Context) -> std::result::Result<(), Fatal>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
Context: overseer::SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let mut requester = Requester::new(self.metrics.clone()).fuse();
let Self { mut runtime, recvs, metrics } = self;

let IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver } = recvs;
let mut requester = Requester::new(metrics.clone()).fuse();

{
let sender = ctx.sender().clone();
ctx.spawn(
"pov-receiver",
run_pov_receiver(sender.clone(), pov_req_receiver, metrics.clone()).boxed(),
)
.map_err(Fatal::SpawnTask)?;

ctx.spawn(
"chunk-receiver",
run_chunk_receiver(sender, chunk_req_receiver, metrics.clone()).boxed(),
)
.map_err(Fatal::SpawnTask)?;
}

loop {
let action = {
let mut subsystem_next = ctx.recv().fuse();
Expand All @@ -110,19 +144,13 @@ impl AvailabilityDistributionSubsystem {
log_error(
requester
.get_mut()
.update_fetching_heads(&mut ctx, &mut self.runtime, update)
.update_fetching_heads(&mut ctx, &mut runtime, update)
.await,
"Error in Requester::update_fetching_heads",
)?;
},
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::ChunkFetchingRequest(req),
} => answer_chunk_request_log(&mut ctx, req, &self.metrics).await,
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::PoVFetchingRequest(req),
} => answer_pov_request_log(&mut ctx, req, &self.metrics).await,
FromOverseer::Communication {
msg:
AvailabilityDistributionMessage::FetchPoV {
Expand All @@ -136,7 +164,7 @@ impl AvailabilityDistributionSubsystem {
log_error(
pov_requester::fetch_pov(
&mut ctx,
&mut self.runtime,
&mut runtime,
relay_parent,
from_validator,
candidate_hash,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use futures::{channel::oneshot, future::BoxFuture, FutureExt};

use polkadot_node_network_protocol::request_response::{
request::{RequestError, Requests},
outgoing::{RequestError, Requests},
v1::{PoVFetchingRequest, PoVFetchingResponse},
OutgoingRequest, Recipient,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use futures::{

use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::request_response::{
request::{OutgoingRequest, Recipient, RequestError, Requests},
outgoing::{OutgoingRequest, Recipient, RequestError, Requests},
v1::{ChunkFetchingRequest, ChunkFetchingResponse},
};
use polkadot_node_primitives::ErasureChunk;
Expand Down
Loading