Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Make InstantSeal Instant again #11186

Merged
merged 13 commits into from
Nov 10, 2019
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ethcore/client-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ pub trait BlockChain: ChainInfo + BlockInfo + TransactionInfo {}
/// Client facilities used by internally sealing Engines.
pub trait EngineClient: Sync + Send + ChainInfo {
/// Make a new block and seal it.
fn update_sealing(&self);
fn update_sealing(&self) -> bool;
seunlanlege marked this conversation as resolved.
Show resolved Hide resolved

/// Submit a seal for a block in the mining queue.
fn submit_seal(&self, block_hash: H256, seal: Vec<Bytes>);
Expand Down
1 change: 1 addition & 0 deletions ethcore/engines/instant-seal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ license = "GPL-3.0"
common-types = { path = "../../types" }
engine = { path = "../../engine" }
ethjson = { path = "../../../json" }
client-traits = { path = "../../client-traits" }
seunlanlege marked this conversation as resolved.
Show resolved Hide resolved
ethereum-types = "0.8.0"
keccak-hash = "0.4.0"
machine = { path = "../../machine" }
Expand Down
6 changes: 6 additions & 0 deletions ethcore/engines/instant-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use common_types::{
};
use engine::Engine;
use ethjson;
use std::sync::Weak;
use client_traits::EngineClient;
use machine::{
ExecutedBlock,
Machine
Expand Down Expand Up @@ -74,6 +76,10 @@ impl Engine for InstantSeal {

fn sealing_state(&self) -> SealingState { SealingState::Ready }

fn register_client(&self, _client: Weak<dyn EngineClient>) {

}
seunlanlege marked this conversation as resolved.
Show resolved Hide resolved

fn generate_seal(&self, block: &ExecutedBlock, _parent: &Header) -> Seal {
if !block.transactions.is_empty() {
let block_number = block.header.number();
Expand Down
2 changes: 1 addition & 1 deletion ethcore/light/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ impl<T: ChainDataFetcher> client_traits::ChainInfo for Client<T> {
}

impl<T: ChainDataFetcher> client_traits::EngineClient for Client<T> {
fn update_sealing(&self) { }
fn update_sealing(&self) -> bool { false }
fn submit_seal(&self, _block_hash: H256, _seal: Vec<Vec<u8>>) { }
fn broadcast_consensus_message(&self, _message: Vec<u8>) { }

Expand Down
60 changes: 45 additions & 15 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2367,7 +2367,9 @@ impl ImportSealedBlock for Client {
let raw = block.rlp_bytes();
let header = block.header.clone();
let hash = header.hash();
self.notify(|n| n.block_pre_import(&raw, &hash, header.difficulty()));
self.notify(|n| {
n.block_pre_import(&raw, &hash, header.difficulty())
});

let route = {
// Do a super duper basic verification to detect potential bugs
Expand Down Expand Up @@ -2455,19 +2457,22 @@ impl ::miner::TransactionVerifierClient for Client {}
impl ::miner::BlockChainClient for Client {}

impl client_traits::EngineClient for Client {
fn update_sealing(&self) {
fn update_sealing(&self) -> bool {
self.importer.miner.update_sealing(self)
}

fn submit_seal(&self, block_hash: H256, seal: Vec<Bytes>) {
let import = self.importer.miner.submit_seal(block_hash, seal).and_then(|block| self.import_sealed_block(block));
let import = self.importer.miner.submit_seal(block_hash, seal)
.and_then(|block| self.import_sealed_block(block));
if let Err(err) = import {
warn!(target: "poa", "Wrong internal seal submission! {:?}", err);
}
}

fn broadcast_consensus_message(&self, message: Bytes) {
self.notify(|notify| notify.broadcast(ChainMessageType::Consensus(message.clone())));
self.notify(|notify| {
notify.broadcast(ChainMessageType::Consensus(message.clone()))
});
}

fn epoch_transition_for(&self, parent_hash: H256) -> Option<EpochTransition> {
Expand Down Expand Up @@ -2593,13 +2598,21 @@ impl ImportExportBlocks for Client {
if i % 10000 == 0 {
info!("#{}", i);
}
let b = self.block(BlockId::Number(i)).ok_or("Error exporting incomplete chain")?.into_inner();
let b = self.block(BlockId::Number(i))
.ok_or("Error exporting incomplete chain")?
.into_inner();
match format {
DataFormat::Binary => {
out.write(&b).map_err(|e| format!("Couldn't write to stream. Cause: {}", e))?;
out.write(&b)
.map_err(|e| {
format!("Couldn't write to stream. Cause: {}", e)
})?;
}
DataFormat::Hex => {
out.write_fmt(format_args!("{}\n", b.pretty())).map_err(|e| format!("Couldn't write to stream. Cause: {}", e))?;
out.write_fmt(format_args!("{}\n", b.pretty()))
.map_err(|e| {
format!("Couldn't write to stream. Cause: {}", e)
})?;
}
}
}
Expand All @@ -2619,7 +2632,10 @@ impl ImportExportBlocks for Client {
let format = match format {
Some(format) => format,
None => {
first_read = source.read(&mut first_bytes).map_err(|_| "Error reading from the file/stream.")?;
first_read = source.read(&mut first_bytes)
.map_err(|_| {
"Error reading from the file/stream."
})?;
match first_bytes[0] {
0xf9 => DataFormat::Binary,
_ => DataFormat::Hex,
Expand All @@ -2630,7 +2646,9 @@ impl ImportExportBlocks for Client {
let do_import = |bytes: Vec<u8>| {
let block = Unverified::from_rlp(bytes).map_err(|_| "Invalid block rlp")?;
let number = block.header.number();
while self.queue_info().is_full() { std::thread::sleep(Duration::from_secs(1)); }
while self.queue_info().is_full() {
std::thread::sleep(Duration::from_secs(1));
}
match self.import_block(block) {
Err(EthcoreError::Import(ImportError::AlreadyInChain)) => {
trace!("Skipping block #{}: already in chain.", number);
Expand All @@ -2651,33 +2669,45 @@ impl ImportExportBlocks for Client {
} else {
let mut bytes = vec![0; READAHEAD_BYTES];
let n = source.read(&mut bytes)
.map_err(|err| format!("Error reading from the file/stream: {:?}", err))?;
.map_err(|err| {
format!("Error reading from the file/stream: {:?}", err)
})?;
(bytes, n)
};
if n == 0 { break; }
first_read = 0;
let s = PayloadInfo::from(&bytes)
.map_err(|e| format!("Invalid RLP in the file/stream: {:?}", e))?.total();
.map_err(|e| {
format!("Invalid RLP in the file/stream: {:?}", e)
})?.total();
bytes.resize(s, 0);
source.read_exact(&mut bytes[n..])
.map_err(|err| format!("Error reading from the file/stream: {:?}", err))?;
.map_err(|err| {
format!("Error reading from the file/stream: {:?}", err)
})?;
do_import(bytes)?;
}
}
DataFormat::Hex => {
for line in BufReader::new(source).lines() {
let s = line
.map_err(|err| format!("Error reading from the file/stream: {:?}", err))?;
.map_err(|err| {
format!("Error reading from the file/stream: {:?}", err)
})?;
let s = if first_read > 0 {
from_utf8(&first_bytes)
.map_err(|err| format!("Invalid UTF-8: {:?}", err))?
.map_err(|err| {
format!("Invalid UTF-8: {:?}", err)
})?
.to_owned() + &(s[..])
} else {
s
};
first_read = 0;
let bytes = s.from_hex()
.map_err(|err| format!("Invalid hex in file/stream: {:?}", err))?;
.map_err(|err| {
format!("Invalid hex in file/stream: {:?}", err)
})?;
do_import(bytes)?;
}
}
Expand Down
43 changes: 27 additions & 16 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ impl Miner {
let tx_queue_strategy = options.tx_queue_strategy;
let nonce_cache_size = cmp::max(4096, limits.max_count / 4);
let refuse_service_transactions = options.refuse_service_transactions;
let engine = spec.engine.clone();

Miner {
sealing: Mutex::new(SealingWork {
Expand All @@ -312,7 +313,7 @@ impl Miner {
options,
transaction_queue: Arc::new(TransactionQueue::new(limits, verifier_options, tx_queue_strategy)),
accounts: Arc::new(accounts),
engine: spec.engine.clone(),
engine,
io_channel: RwLock::new(None),
service_transaction_checker: if refuse_service_transactions {
None
Expand Down Expand Up @@ -865,12 +866,12 @@ impl Miner {
match self.engine.sealing_state() {
SealingState::Ready => {
self.maybe_enable_sealing();
self.update_sealing(chain)
self.update_sealing(chain);
}
SealingState::External => {
// this calls `maybe_enable_sealing()`
if self.prepare_pending_block(chain) == BlockPreparationStatus::NotPrepared {
self.update_sealing(chain)
self.update_sealing(chain);
}
}
SealingState::NotReady => { self.maybe_enable_sealing(); },
Expand Down Expand Up @@ -1263,20 +1264,20 @@ impl miner::MinerService for Miner {

/// Update sealing if required.
/// Prepare the block and work if the Engine does not seal internally.
seunlanlege marked this conversation as resolved.
Show resolved Hide resolved
fn update_sealing<C>(&self, chain: &C) where
fn update_sealing<C>(&self, chain: &C) -> bool where
C: BlockChain + CallContract + BlockProducer + SealedBlockImporter + Nonce + Sync,
{
trace!(target: "miner", "update_sealing");

// Do nothing if reseal is not required,
// but note that `requires_reseal` updates internal state.
if !self.requires_reseal(chain.chain_info().best_block_number) {
return;
return false;
}

let sealing_state = self.engine.sealing_state();
if sealing_state == SealingState::NotReady {
return;
return false;
}

// --------------------------------------------------------------------------
Expand All @@ -1286,7 +1287,7 @@ impl miner::MinerService for Miner {
trace!(target: "miner", "update_sealing: preparing a block");
let (block, original_work_hash) = match self.prepare_block(chain) {
Some((block, original_work_hash)) => (block, original_work_hash),
None => return,
None => return false,
};

// refuse to seal the first block of the chain if it contains hard forks
Expand All @@ -1295,23 +1296,27 @@ impl miner::MinerService for Miner {
if let Some(name) = self.engine.params().nonzero_bugfix_hard_fork() {
warn!("Your chain specification contains one or more hard forks which are required to be \
on by default. Please remove these forks and start your chain again: {}.", name);
return;
return false;
}
}

match sealing_state {
SealingState::Ready => {
trace!(target: "miner", "update_sealing: engine indicates internal sealing");
if self.seal_and_import_block_internally(chain, block) {
let seal_and_import_block_internally = self.seal_and_import_block_internally(chain, block);
if seal_and_import_block_internally {
trace!(target: "miner", "update_sealing: imported internally sealed block");
}
return seal_and_import_block_internally
},
SealingState::NotReady => unreachable!("We returned right after sealing_state was computed. qed."),
SealingState::External => {
trace!(target: "miner", "update_sealing: engine does not seal internally, preparing work");
self.prepare_work(block, original_work_hash)
self.prepare_work(block, original_work_hash);
},
}
};

false
}

fn is_currently_sealing(&self) -> bool {
Expand Down Expand Up @@ -1441,7 +1446,7 @@ impl miner::MinerService for Miner {
let engine = self.engine.clone();
let accounts = self.accounts.clone();
let service_transaction_checker = self.service_transaction_checker.clone();

let cloned_tx_queue = self.transaction_queue.clone();
seunlanlege marked this conversation as resolved.
Show resolved Hide resolved
let cull = move |chain: &Client| {
let client = PoolClient::new(
chain,
Expand All @@ -1451,8 +1456,11 @@ impl miner::MinerService for Miner {
service_transaction_checker.as_ref(),
);
queue.cull(client);
if is_internal_import {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this previously true only for instant seal engine? otherwise, should it be something like

				if engine.should_reseal_on_update() {
						// force update_sealing here to skip `reseal_required` checks
						chain.update_sealing(ForceUpdateSealing::Yes);
				} else if is_internal_import {
						chain.update_sealing(ForceUpdateSealing::No);
				}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it was only for InstantSeal.

chain.update_sealing();
if cloned_tx_queue.all_transaction_hashes().len() > 0 {
seunlanlege marked this conversation as resolved.
Show resolved Hide resolved
if !chain.update_sealing() {
seunlanlege marked this conversation as resolved.
Show resolved Hide resolved
// TODO: call some method that waits `reseal_min_period` and re-tries
// `update_sealing` again
}
}
};

Expand All @@ -1461,8 +1469,11 @@ impl miner::MinerService for Miner {
}
} else {
self.transaction_queue.cull(client);
if is_internal_import {
self.update_sealing(chain);
if self.transaction_queue.all_transaction_hashes().len() > 0 {
seunlanlege marked this conversation as resolved.
Show resolved Hide resolved
if !self.update_sealing(chain) {
// TODO: call some method that waits `reseal_min_period` and tries
// `update_sealing` again
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/miner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub trait MinerService : Send + Sync {
where C: BlockChain + CallContract + BlockProducer + SealedBlockImporter + Nonce + Sync;

/// Update current pending block
fn update_sealing<C>(&self, chain: &C)
fn update_sealing<C>(&self, chain: &C) -> bool
where C: BlockChain + CallContract + BlockProducer + SealedBlockImporter + Nonce + Sync;

// Notifications
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/test_helpers/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ impl ProvingBlockChainClient for TestBlockChainClient {
}

impl client_traits::EngineClient for TestBlockChainClient {
fn update_sealing(&self) {
fn update_sealing(&self) -> bool {
self.miner.update_sealing(self)
}

Expand Down
12 changes: 9 additions & 3 deletions miner/src/pool/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,17 @@ impl TransactionsPoolNotifier {
.map(|(hash, _)| hash)
.collect()
);
self.pending_listeners.retain(|listener| listener.unbounded_send(to_pending_send.clone()).is_ok());
self.pending_listeners.retain(|listener| {
listener.unbounded_send(to_pending_send.clone()).is_ok()
});

let to_full_send = Arc::new(std::mem::replace(&mut self.tx_statuses, Vec::new()));
let to_full_send = Arc::new(
std::mem::replace(&mut self.tx_statuses, Vec::new())
);
self.full_listeners
.retain(|listener| listener.unbounded_send(to_full_send.clone()).is_ok());
.retain(|listener| {
listener.unbounded_send(to_full_send.clone()).is_ok()
});
}
}

Expand Down