Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
line lengths reduced
Browse files Browse the repository at this point in the history
  • Loading branch information
gilescope committed Jul 10, 2021
1 parent eb6752d commit b59e1cf
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 37 deletions.
10 changes: 8 additions & 2 deletions client/consensus/api/src/import_queue/buffered_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_u
use std::{pin::Pin, task::Context, task::Poll};
use crate::import_queue::{Origin, Link, BlockImportStatus, BlockImportError};

use super::BlockImportResult;

/// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and
/// can be used to buffer commands, and the receiver can be used to poll said commands and transfer
/// them to another link.
Expand Down Expand Up @@ -78,7 +80,11 @@ impl<B: BlockT> Clone for BufferedLinkSender<B> {

/// Internal buffered message.
enum BlockImportWorkerMsg<B: BlockT> {
BlocksProcessed(usize, usize, Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>),
BlocksProcessed(
usize,
usize,
Vec<(BlockImportResult<B>, B::Hash)>
),
JustificationImported(Origin, B::Hash, NumberFor<B>, bool),
RequestJustification(B::Hash, NumberFor<B>),
}
Expand All @@ -88,7 +94,7 @@ impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
&mut self,
imported: usize,
count: usize,
results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>
results: Vec<(BlockImportResult<B>, B::Hash)>
) {
let _ = self.tx.unbounded_send(BlockImportWorkerMsg::BlocksProcessed(imported, count, results));
}
Expand Down
31 changes: 23 additions & 8 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ use sp_consensus::{
};
use sc_consensus_api::{
import_queue::{BoxJustificationImport, BasicQueue, DefaultImportQueue, Verifier},
block_import::{BlockCheckParams, BlockImport, BlockImportParams, BlockOrigin,ForkChoiceStrategy, ImportResult, StateAction},
block_import::{
BlockCheckParams, BlockImport, BlockImportParams, BlockOrigin,
ForkChoiceStrategy, ImportResult, StateAction
},
};
use sp_consensus_babe::inherents::BabeInherentData;
use sp_consensus_slots::Slot;
Expand Down Expand Up @@ -503,7 +506,12 @@ where

let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);

let answer_requests = answer_requests(worker_rx, config.0, client, babe_link.epoch_changes.clone());
let answer_requests = answer_requests(
worker_rx,
config.0,
client,
babe_link.epoch_changes.clone());

Ok(BabeWorker {
inner: Box::pin(future::join(inner, answer_requests).map(|_| ())),
slot_notification_sinks,
Expand Down Expand Up @@ -891,8 +899,10 @@ fn find_next_epoch_digest<B: BlockT>(header: &B::Header)
trace!(target: "babe", "Checking log {:?}, looking for epoch change digest.", log);
let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
match (log, epoch_digest.is_some()) {
(Some(ConsensusLog::NextEpochData(_)), true) => return Err(babe_err(Error::MultipleEpochChangeDigests)),
(Some(ConsensusLog::NextEpochData(epoch)), false) => epoch_digest = Some(epoch),
(Some(ConsensusLog::NextEpochData(_)), true) =>
return Err(babe_err(Error::MultipleEpochChangeDigests)),
(Some(ConsensusLog::NextEpochData(epoch)), false) =>
epoch_digest = Some(epoch),
_ => trace!(target: "babe", "Ignoring digest not meant for us"),
}
}
Expand All @@ -910,8 +920,10 @@ fn find_next_config_digest<B: BlockT>(header: &B::Header)
trace!(target: "babe", "Checking log {:?}, looking for epoch change digest.", log);
let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
match (log, config_digest.is_some()) {
(Some(ConsensusLog::NextConfigData(_)), true) => return Err(babe_err(Error::MultipleConfigChangeDigests)),
(Some(ConsensusLog::NextConfigData(config)), false) => config_digest = Some(config),
(Some(ConsensusLog::NextConfigData(_)), true) =>
return Err(babe_err(Error::MultipleConfigChangeDigests)),
(Some(ConsensusLog::NextConfigData(config)), false) =>
config_digest = Some(config),
_ => trace!(target: "babe", "Ignoring digest not meant for us"),
}
}
Expand Down Expand Up @@ -1631,8 +1643,11 @@ pub fn import_queue<Block: BlockT, Client, SelectChain, Inner, CAW, CIDP>(
can_author_with: CAW,
telemetry: Option<TelemetryHandle>,
) -> ClientResult<DefaultImportQueue<Block, Client>> where
Inner: BlockImport<Block, Error = ConsensusError, Transaction = sp_api::TransactionFor<Client, Block>>
+ Send + Sync + 'static,
Inner: BlockImport<
Block,
Error = ConsensusError,
Transaction = sp_api::TransactionFor<Client, Block>
> + Send + Sync + 'static,
Client: ProvideRuntimeApi<Block> + ProvideCache<Block> + HeaderBackend<Block>
+ HeaderMetadata<Block, Error = sp_blockchain::Error> + AuxStore
+ Send + Sync + 'static,
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use sp_blockchain::{Error as ClientError, HeaderMetadata};
use sp_consensus::{BlockStatus,
block_validation::{BlockAnnounceValidator, Validation},
};
use sc_consensus_api::{block_import::BlockOrigin, import_queue::{IncomingBlock, BlockImportStatus, BlockImportError}};
use sc_consensus_api::{BlockOrigin, IncomingBlock, BlockImportStatus, BlockImportError};
use crate::protocol::message::{
self, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse,
};
Expand Down
81 changes: 55 additions & 26 deletions client/service/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,25 +127,27 @@ pub struct Client<B, E, Block, RA> where Block: BlockT {
_phantom: PhantomData<RA>,
}

// used in importing a block, where additional changes are made after the runtime
// executed.
/// Used in importing a block, where additional changes are made after the runtime
/// executed.
enum PrePostHeader<H> {
// they are the same: no post-runtime digest items.
/// they are the same: no post-runtime digest items.
Same(H),
// different headers (pre, post).
/// different headers (pre, post).
Different(H, H),
}

impl<H> PrePostHeader<H> {
// get a reference to the "post-header" -- the header as it should be after all changes are applied.
/// get a reference to the "post-header" -- the header as it should be
/// after all changes are applied.
fn post(&self) -> &H {
match *self {
PrePostHeader::Same(ref h) => h,
PrePostHeader::Different(_, ref h) => h,
}
}

// convert to the "post-header" -- the header as it should be after all changes are applied.
/// convert to the "post-header" -- the header as it should be after
/// all changes are applied.
fn into_post(self) -> H {
match self {
PrePostHeader::Same(h) => h,
Expand Down Expand Up @@ -238,7 +240,9 @@ pub fn new_with_backend<B, E, Block, S, RA>(
Block: BlockT,
B: backend::LocalBackend<Block> + 'static,
{
let call_executor = LocalCallExecutor::new(backend.clone(), executor, spawn_handle, config.clone())?;
let call_executor = LocalCallExecutor::new(
backend.clone(), executor, spawn_handle, config.clone())?;

let extensions = ExecutionExtensions::new(
Default::default(),
keystore,
Expand Down Expand Up @@ -413,7 +417,9 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
id: &BlockId<Block>,
cht_size: NumberFor<Block>,
) -> sp_blockchain::Result<(Block::Header, StorageProof)> {
let proof_error = || sp_blockchain::Error::Backend(format!("Failed to generate header proof for {:?}", id));
let proof_error = || sp_blockchain::Error::Backend(
format!("Failed to generate header proof for {:?}", id));

let header = self.backend.blockchain().expect_header(*id)?;
let block_num = *header.number();
let cht_num = cht::block_to_cht_number(cht_size, block_num).ok_or_else(proof_error)?;
Expand Down Expand Up @@ -574,7 +580,8 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
Ok(StorageProof::merge(proofs))
}

/// Generates CHT-based proof for roots of changes tries at given blocks (that are part of single CHT).
/// Generates CHT-based proof for roots of changes tries at given blocks
/// (that are part of single CHT).
fn changes_trie_roots_proof_at_cht(
&self,
cht_size: NumberFor<Block>,
Expand Down Expand Up @@ -602,11 +609,12 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
Ok(proof)
}

/// Returns changes trie storage and all configurations that have been active in the range [first; last].
/// Returns changes trie storage and all configurations that have been active
/// in the range [first; last].
///
/// Configurations are returned in descending order (and obviously never overlap).
/// If fail_if_disabled is false, returns maximal consequent configurations ranges, starting from last and
/// stopping on either first, or when CT have been disabled.
/// If fail_if_disabled is false, returns maximal consequent configurations ranges,
/// starting from last and stopping on either first, or when CT have been disabled.
/// If fail_if_disabled is true, fails when there's a subrange where CT have been disabled
/// inside first..last blocks range.
fn require_changes_trie(
Expand Down Expand Up @@ -635,7 +643,8 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
break;
}

current = *self.backend.blockchain().expect_header(BlockId::Hash(config_range.zero.1))?.parent_hash();
current = *self.backend.blockchain()
.expect_header(BlockId::Hash(config_range.zero.1))?.parent_hash();
}

Ok((storage, configs))
Expand All @@ -648,7 +657,9 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
operation: &mut ClientImportOperation<Block, B>,
import_block: BlockImportParams<Block, backend::TransactionFor<B, Block>>,
new_cache: HashMap<CacheKeyId, Vec<u8>>,
storage_changes: Option<sc_consensus_api::StorageChanges<Block, backend::TransactionFor<B, Block>>>,
storage_changes: Option<
sc_consensus_api::StorageChanges<Block, backend::TransactionFor<B, Block>>
>,
) -> sp_blockchain::Result<ImportResult> where
Self: ProvideRuntimeApi<Block>,
<Self as ProvideRuntimeApi<Block>>::Api: CoreApi<Block> +
Expand Down Expand Up @@ -737,7 +748,9 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
import_headers: PrePostHeader<Block::Header>,
justifications: Option<Justifications>,
body: Option<Vec<Block::Extrinsic>>,
storage_changes: Option<sc_consensus_api::StorageChanges<Block, backend::TransactionFor<B, Block>>>,
storage_changes: Option<
sc_consensus_api::StorageChanges<Block, backend::TransactionFor<B, Block>>
>,
new_cache: HashMap<CacheKeyId, Vec<u8>>,
finalized: bool,
aux: Vec<(Vec<u8>, Option<Vec<u8>>)>,
Expand Down Expand Up @@ -880,7 +893,8 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where

operation.op.insert_aux(aux)?;

// we only notify when we are already synced to the tip of the chain or if this import triggers a re-org
// we only notify when we are already synced to the tip of the chain
// or if this import triggers a re-org
if make_notifications || tree_route.is_some() {
if finalized {
operation.notify_finalized.push(hash);
Expand Down Expand Up @@ -917,8 +931,10 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
let at = BlockId::Hash(*parent_hash);
let state_action = std::mem::replace(&mut import_block.state_action, StateAction::Skip);
let (enact_state, storage_changes) = match (self.block_status(&at)?, state_action) {
(BlockStatus::Unknown, _) => return Ok(PrepareStorageChangesResult::Discard(ImportResult::UnknownParent)),
(BlockStatus::KnownBad, _) => return Ok(PrepareStorageChangesResult::Discard(ImportResult::KnownBad)),
(BlockStatus::Unknown, _) =>
return Ok(PrepareStorageChangesResult::Discard(ImportResult::UnknownParent)),
(BlockStatus::KnownBad, _) =>
return Ok(PrepareStorageChangesResult::Discard(ImportResult::KnownBad)),
(_, StateAction::Skip) => (false, None),
(BlockStatus::InChainPruned, StateAction::ApplyChanges(sc_consensus_api::StorageChanges::Changes(_))) =>
return Ok(PrepareStorageChangesResult::Discard(ImportResult::MissingState)),
Expand Down Expand Up @@ -990,11 +1006,13 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
let last_finalized = self.backend.blockchain().last_finalized()?;

if block == last_finalized {
warn!("Possible safety violation: attempted to re-finalize last finalized block {:?} ", last_finalized);
warn!("Possible safety violation: attempted to re-finalize last finalized block {:?} ",
last_finalized);
return Ok(());
}

let route_from_finalized = sp_blockchain::tree_route(self.backend.blockchain(), last_finalized, block)?;
let route_from_finalized = sp_blockchain::tree_route(
self.backend.blockchain(), last_finalized, block)?;

if let Some(retracted) = route_from_finalized.retracted().get(0) {
warn!("Safety violation: attempted to revert finalized block {:?} which is not in the \
Expand Down Expand Up @@ -1197,12 +1215,16 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
}

/// Get block header by id.
pub fn header(&self, id: &BlockId<Block>) -> sp_blockchain::Result<Option<<Block as BlockT>::Header>> {
pub fn header(&self, id: &BlockId<Block>)
-> sp_blockchain::Result<Option<<Block as BlockT>::Header>>
{
self.backend.blockchain().header(*id)
}

/// Get block body by id.
pub fn body(&self, id: &BlockId<Block>) -> sp_blockchain::Result<Option<Vec<<Block as BlockT>::Extrinsic>>> {
pub fn body(&self, id: &BlockId<Block>)
-> sp_blockchain::Result<Option<Vec<<Block as BlockT>::Extrinsic>>>
{
self.backend.blockchain().body(*id)
}

Expand Down Expand Up @@ -1314,7 +1336,9 @@ impl<B, E, Block, RA> ProofProvider<Block> for Client<B, E, Block, RA> where
})
}

fn header_proof(&self, id: &BlockId<Block>) -> sp_blockchain::Result<(Block::Header, StorageProof)> {
fn header_proof(&self, id: &BlockId<Block>)
-> sp_blockchain::Result<(Block::Header, StorageProof)>
{
self.header_proof_with_cht_size(id, cht::size())
}

Expand Down Expand Up @@ -1408,7 +1432,8 @@ impl<B, E, Block, RA> BlockBuilderProvider<B, Block, Self> for Client<B, E, Bloc
E: CallExecutor<Block> + Send + Sync + 'static,
Block: BlockT,
Self: ChainHeaderBackend<Block> + ProvideRuntimeApi<Block>,
<Self as ProvideRuntimeApi<Block>>::Api: ApiExt<Block, StateBackend = backend::StateBackendFor<B, Block>>
<Self as ProvideRuntimeApi<Block>>::Api:
ApiExt<Block, StateBackend = backend::StateBackendFor<B, Block>>
+ BlockBuilderApi<Block>,
{
fn new_block_at<R: Into<RecordProof>>(
Expand Down Expand Up @@ -1464,7 +1489,9 @@ impl<B, E, Block, RA> StorageProvider<Block, B> for Client<B, E, Block, RA> wher
E: CallExecutor<Block>,
Block: BlockT,
{
fn storage_keys(&self, id: &BlockId<Block>, key_prefix: &StorageKey) -> sp_blockchain::Result<Vec<StorageKey>> {
fn storage_keys(&self, id: &BlockId<Block>, key_prefix: &StorageKey)
-> sp_blockchain::Result<Vec<StorageKey>>
{
let keys = self.state_at(id)?.keys(&key_prefix.0).into_iter().map(StorageKey).collect();
Ok(keys)
}
Expand Down Expand Up @@ -1658,7 +1685,9 @@ impl<B, E, Block, RA> ProvideUncles<Block> for Client<B, E, Block, RA> where
E: CallExecutor<Block>,
Block: BlockT,
{
fn uncles(&self, target_hash: Block::Hash, max_generation: NumberFor<Block>) -> sp_blockchain::Result<Vec<Block::Header>> {
fn uncles(&self, target_hash: Block::Hash, max_generation: NumberFor<Block>)
-> sp_blockchain::Result<Vec<Block::Header>>
{
Ok(Client::uncles(self, target_hash, max_generation)?
.into_iter()
.filter_map(|hash| Client::header(self, &BlockId::Hash(hash)).unwrap_or(None))
Expand Down

0 comments on commit b59e1cf

Please sign in to comment.