Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
update_sealing if there are transactions in pool after impoerting a b…
Browse files Browse the repository at this point in the history
…lock, some line formatting
  • Loading branch information
seunlanlege committed Oct 24, 2019
1 parent 4a95a09 commit 078de7e
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ethcore/client-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ pub trait BlockChain: ChainInfo + BlockInfo + TransactionInfo {}
/// Client facilities used by internally sealing Engines.
pub trait EngineClient: Sync + Send + ChainInfo {
/// Make a new block and seal it.
fn update_sealing(&self);
fn update_sealing(&self) -> bool;

/// Submit a seal for a block in the mining queue.
fn submit_seal(&self, block_hash: H256, seal: Vec<Bytes>);
Expand Down
1 change: 1 addition & 0 deletions ethcore/engines/instant-seal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ license = "GPL-3.0"
common-types = { path = "../../types" }
engine = { path = "../../engine" }
ethjson = { path = "../../../json" }
client-traits = { path = "../../client-traits" }
ethereum-types = "0.8.0"
keccak-hash = "0.4.0"
machine = { path = "../../machine" }
Expand Down
6 changes: 6 additions & 0 deletions ethcore/engines/instant-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use common_types::{
};
use engine::Engine;
use ethjson;
use std::sync::Weak;
use client_traits::EngineClient;
use machine::{
ExecutedBlock,
Machine
Expand Down Expand Up @@ -74,6 +76,10 @@ impl Engine for InstantSeal {

fn sealing_state(&self) -> SealingState { SealingState::Ready }

fn register_client(&self, _client: Weak<dyn EngineClient>) {

}

fn generate_seal(&self, block: &ExecutedBlock, _parent: &Header) -> Seal {
if !block.transactions.is_empty() {
let block_number = block.header.number();
Expand Down
2 changes: 1 addition & 1 deletion ethcore/light/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ impl<T: ChainDataFetcher> client_traits::ChainInfo for Client<T> {
}

impl<T: ChainDataFetcher> client_traits::EngineClient for Client<T> {
fn update_sealing(&self) { }
fn update_sealing(&self) -> bool { false }
fn submit_seal(&self, _block_hash: H256, _seal: Vec<Vec<u8>>) { }
fn broadcast_consensus_message(&self, _message: Vec<u8>) { }

Expand Down
60 changes: 45 additions & 15 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2367,7 +2367,9 @@ impl ImportSealedBlock for Client {
let raw = block.rlp_bytes();
let header = block.header.clone();
let hash = header.hash();
self.notify(|n| n.block_pre_import(&raw, &hash, header.difficulty()));
self.notify(|n| {
n.block_pre_import(&raw, &hash, header.difficulty())
});

let route = {
// Do a super duper basic verification to detect potential bugs
Expand Down Expand Up @@ -2455,19 +2457,22 @@ impl ::miner::TransactionVerifierClient for Client {}
impl ::miner::BlockChainClient for Client {}

impl client_traits::EngineClient for Client {
fn update_sealing(&self) {
fn update_sealing(&self) -> bool {
self.importer.miner.update_sealing(self)
}

fn submit_seal(&self, block_hash: H256, seal: Vec<Bytes>) {
let import = self.importer.miner.submit_seal(block_hash, seal).and_then(|block| self.import_sealed_block(block));
let import = self.importer.miner.submit_seal(block_hash, seal)
.and_then(|block| self.import_sealed_block(block));
if let Err(err) = import {
warn!(target: "poa", "Wrong internal seal submission! {:?}", err);
}
}

fn broadcast_consensus_message(&self, message: Bytes) {
self.notify(|notify| notify.broadcast(ChainMessageType::Consensus(message.clone())));
self.notify(|notify| {
notify.broadcast(ChainMessageType::Consensus(message.clone()))
});
}

fn epoch_transition_for(&self, parent_hash: H256) -> Option<EpochTransition> {
Expand Down Expand Up @@ -2593,13 +2598,21 @@ impl ImportExportBlocks for Client {
if i % 10000 == 0 {
info!("#{}", i);
}
let b = self.block(BlockId::Number(i)).ok_or("Error exporting incomplete chain")?.into_inner();
let b = self.block(BlockId::Number(i))
.ok_or("Error exporting incomplete chain")?
.into_inner();
match format {
DataFormat::Binary => {
out.write(&b).map_err(|e| format!("Couldn't write to stream. Cause: {}", e))?;
out.write(&b)
.map_err(|e| {
format!("Couldn't write to stream. Cause: {}", e)
})?;
}
DataFormat::Hex => {
out.write_fmt(format_args!("{}\n", b.pretty())).map_err(|e| format!("Couldn't write to stream. Cause: {}", e))?;
out.write_fmt(format_args!("{}\n", b.pretty()))
.map_err(|e| {
format!("Couldn't write to stream. Cause: {}", e)
})?;
}
}
}
Expand All @@ -2619,7 +2632,10 @@ impl ImportExportBlocks for Client {
let format = match format {
Some(format) => format,
None => {
first_read = source.read(&mut first_bytes).map_err(|_| "Error reading from the file/stream.")?;
first_read = source.read(&mut first_bytes)
.map_err(|_| {
"Error reading from the file/stream."
})?;
match first_bytes[0] {
0xf9 => DataFormat::Binary,
_ => DataFormat::Hex,
Expand All @@ -2630,7 +2646,9 @@ impl ImportExportBlocks for Client {
let do_import = |bytes: Vec<u8>| {
let block = Unverified::from_rlp(bytes).map_err(|_| "Invalid block rlp")?;
let number = block.header.number();
while self.queue_info().is_full() { std::thread::sleep(Duration::from_secs(1)); }
while self.queue_info().is_full() {
std::thread::sleep(Duration::from_secs(1));
}
match self.import_block(block) {
Err(EthcoreError::Import(ImportError::AlreadyInChain)) => {
trace!("Skipping block #{}: already in chain.", number);
Expand All @@ -2651,33 +2669,45 @@ impl ImportExportBlocks for Client {
} else {
let mut bytes = vec![0; READAHEAD_BYTES];
let n = source.read(&mut bytes)
.map_err(|err| format!("Error reading from the file/stream: {:?}", err))?;
.map_err(|err| {
format!("Error reading from the file/stream: {:?}", err)
})?;
(bytes, n)
};
if n == 0 { break; }
first_read = 0;
let s = PayloadInfo::from(&bytes)
.map_err(|e| format!("Invalid RLP in the file/stream: {:?}", e))?.total();
.map_err(|e| {
format!("Invalid RLP in the file/stream: {:?}", e)
})?.total();
bytes.resize(s, 0);
source.read_exact(&mut bytes[n..])
.map_err(|err| format!("Error reading from the file/stream: {:?}", err))?;
.map_err(|err| {
format!("Error reading from the file/stream: {:?}", err)
})?;
do_import(bytes)?;
}
}
DataFormat::Hex => {
for line in BufReader::new(source).lines() {
let s = line
.map_err(|err| format!("Error reading from the file/stream: {:?}", err))?;
.map_err(|err| {
format!("Error reading from the file/stream: {:?}", err)
})?;
let s = if first_read > 0 {
from_utf8(&first_bytes)
.map_err(|err| format!("Invalid UTF-8: {:?}", err))?
.map_err(|err| {
format!("Invalid UTF-8: {:?}", err)
})?
.to_owned() + &(s[..])
} else {
s
};
first_read = 0;
let bytes = s.from_hex()
.map_err(|err| format!("Invalid hex in file/stream: {:?}", err))?;
.map_err(|err| {
format!("Invalid hex in file/stream: {:?}", err)
})?;
do_import(bytes)?;
}
}
Expand Down
49 changes: 26 additions & 23 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl Miner {

/// Creates new instance of miner Arc.
pub fn new<A: LocalAccounts + 'static>(
mut options: MinerOptions,
options: MinerOptions,
gas_pricer: GasPricer,
spec: &Spec,
accounts: A,
Expand All @@ -296,13 +296,6 @@ impl Miner {
let refuse_service_transactions = options.refuse_service_transactions;
let engine = spec.engine.clone();

// For the InstantSeal engine, set the reseal_{max & min}_period to zero
// This allows blocks to be sealed Instantly ;)
if engine.name() == "InstantSeal" {
options.reseal_max_period = Duration::from_secs(0);
options.reseal_min_period = Duration::from_secs(0);
}

Miner {
sealing: Mutex::new(SealingWork {
queue: UsingQueue::new(options.work_queue_size),
Expand Down Expand Up @@ -873,12 +866,12 @@ impl Miner {
match self.engine.sealing_state() {
SealingState::Ready => {
self.maybe_enable_sealing();
self.update_sealing(chain)
self.update_sealing(chain);
}
SealingState::External => {
// this calls `maybe_enable_sealing()`
if self.prepare_pending_block(chain) == BlockPreparationStatus::NotPrepared {
self.update_sealing(chain)
self.update_sealing(chain);
}
}
SealingState::NotReady => { self.maybe_enable_sealing(); },
Expand Down Expand Up @@ -1271,20 +1264,20 @@ impl miner::MinerService for Miner {

/// Update sealing if required.
/// Prepare the block and work if the Engine does not seal internally.
fn update_sealing<C>(&self, chain: &C) where
fn update_sealing<C>(&self, chain: &C) -> bool where
C: BlockChain + CallContract + BlockProducer + SealedBlockImporter + Nonce + Sync,
{
trace!(target: "miner", "update_sealing");

// Do nothing if reseal is not required,
// but note that `requires_reseal` updates internal state.
if !self.requires_reseal(chain.chain_info().best_block_number) {
return;
return false;
}

let sealing_state = self.engine.sealing_state();
if sealing_state == SealingState::NotReady {
return;
return false;
}

// --------------------------------------------------------------------------
Expand All @@ -1294,7 +1287,7 @@ impl miner::MinerService for Miner {
trace!(target: "miner", "update_sealing: preparing a block");
let (block, original_work_hash) = match self.prepare_block(chain) {
Some((block, original_work_hash)) => (block, original_work_hash),
None => return,
None => return false,
};

// refuse to seal the first block of the chain if it contains hard forks
Expand All @@ -1303,23 +1296,27 @@ impl miner::MinerService for Miner {
if let Some(name) = self.engine.params().nonzero_bugfix_hard_fork() {
warn!("Your chain specification contains one or more hard forks which are required to be \
on by default. Please remove these forks and start your chain again: {}.", name);
return;
return false;
}
}

match sealing_state {
SealingState::Ready => {
trace!(target: "miner", "update_sealing: engine indicates internal sealing");
if self.seal_and_import_block_internally(chain, block) {
let seal_and_import_block_internally = self.seal_and_import_block_internally(chain, block);
if seal_and_import_block_internally {
trace!(target: "miner", "update_sealing: imported internally sealed block");
}
return seal_and_import_block_internally
},
SealingState::NotReady => unreachable!("We returned right after sealing_state was computed. qed."),
SealingState::External => {
trace!(target: "miner", "update_sealing: engine does not seal internally, preparing work");
self.prepare_work(block, original_work_hash)
self.prepare_work(block, original_work_hash);
},
}
};

false
}

fn is_currently_sealing(&self) -> bool {
Expand Down Expand Up @@ -1449,7 +1446,7 @@ impl miner::MinerService for Miner {
let engine = self.engine.clone();
let accounts = self.accounts.clone();
let service_transaction_checker = self.service_transaction_checker.clone();

let cloned_tx_queue = self.transaction_queue.clone();
let cull = move |chain: &Client| {
let client = PoolClient::new(
chain,
Expand All @@ -1459,8 +1456,11 @@ impl miner::MinerService for Miner {
service_transaction_checker.as_ref(),
);
queue.cull(client);
if is_internal_import {
chain.update_sealing();
if cloned_tx_queue.all_transaction_hashes().len() > 0 {
if !chain.update_sealing() {
// TODO: call some method that waits `reseal_min_period` and re-tries
// `update_sealing` again
}
}
};

Expand All @@ -1469,8 +1469,11 @@ impl miner::MinerService for Miner {
}
} else {
self.transaction_queue.cull(client);
if is_internal_import {
self.update_sealing(chain);
if self.transaction_queue.all_transaction_hashes().len() > 0 {
if !self.update_sealing(chain) {
// TODO: call some method that waits `reseal_min_period` and tries
// `update_sealing` again
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/miner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub trait MinerService : Send + Sync {
where C: BlockChain + CallContract + BlockProducer + SealedBlockImporter + Nonce + Sync;

/// Update current pending block
fn update_sealing<C>(&self, chain: &C)
fn update_sealing<C>(&self, chain: &C) -> bool
where C: BlockChain + CallContract + BlockProducer + SealedBlockImporter + Nonce + Sync;

// Notifications
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/test_helpers/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ impl ProvingBlockChainClient for TestBlockChainClient {
}

impl client_traits::EngineClient for TestBlockChainClient {
fn update_sealing(&self) {
fn update_sealing(&self) -> bool {
self.miner.update_sealing(self)
}

Expand Down
12 changes: 9 additions & 3 deletions miner/src/pool/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,17 @@ impl TransactionsPoolNotifier {
.map(|(hash, _)| hash)
.collect()
);
self.pending_listeners.retain(|listener| listener.unbounded_send(to_pending_send.clone()).is_ok());
self.pending_listeners.retain(|listener| {
listener.unbounded_send(to_pending_send.clone()).is_ok()
});

let to_full_send = Arc::new(std::mem::replace(&mut self.tx_statuses, Vec::new()));
let to_full_send = Arc::new(
std::mem::replace(&mut self.tx_statuses, Vec::new())
);
self.full_listeners
.retain(|listener| listener.unbounded_send(to_full_send.clone()).is_ok());
.retain(|listener| {
listener.unbounded_send(to_full_send.clone()).is_ok()
});
}
}

Expand Down

0 comments on commit 078de7e

Please sign in to comment.