Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Make InstantSeal Instant again #11186

Merged
merged 13 commits into from
Nov 10, 2019
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions ethcore/engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,14 @@ pub trait Engine: Sync + Send {
/// Returns the engine's current sealing state.
fn sealing_state(&self) -> SealingState { SealingState::External }

/// Called in `miner.chain_new_blocks` if the engine wishes to `update_sealing`
/// after a block was recently sealed, and there are local pending tx in the pool.
tomusdrw marked this conversation as resolved.
Show resolved Hide resolved
///
/// returns false by default
fn should_reseal_on_update(&self) -> bool {
false
}

/// Attempt to seal the block internally.
///
/// If `Some` is returned, then you get a valid seal.
Expand Down
3 changes: 3 additions & 0 deletions ethcore/engines/instant-seal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ license = "GPL-3.0"
common-types = { path = "../../types" }
engine = { path = "../../engine" }
ethjson = { path = "../../../json" }
client-traits = { path = "../../client-traits" }
seunlanlege marked this conversation as resolved.
Show resolved Hide resolved
ethereum-types = "0.8.0"
keccak-hash = "0.4.0"
parking_lot = "0.9"
crossbeam-channel = "0.3.9"
machine = { path = "../../machine" }
trace = { path = "../../trace" }

Expand Down
7 changes: 7 additions & 0 deletions ethcore/engines/instant-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ impl Engine for InstantSeal {

fn sealing_state(&self) -> SealingState { SealingState::Ready }

fn should_reseal_on_update(&self) -> bool {
// We would like for the miner to `update_sealing` if there are local_pending_transactions
// in the pool to prevent transactions sent in parallel from stalling in the transaction
// pool. (see #9660)
true
}

fn generate_seal(&self, block: &ExecutedBlock, _parent: &Header) -> Seal {
if !block.transactions.is_empty() {
let block_number = block.header.number();
Expand Down
2 changes: 1 addition & 1 deletion ethcore/light/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ impl<T: ChainDataFetcher> client_traits::ChainInfo for Client<T> {
}

impl<T: ChainDataFetcher> client_traits::EngineClient for Client<T> {
fn update_sealing(&self) { }
fn update_sealing(&self) {}
fn submit_seal(&self, _block_hash: H256, _seal: Vec<Vec<u8>>) { }
fn broadcast_consensus_message(&self, _message: Vec<u8>) { }

Expand Down
58 changes: 44 additions & 14 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2367,7 +2367,9 @@ impl ImportSealedBlock for Client {
let raw = block.rlp_bytes();
let header = block.header.clone();
let hash = header.hash();
self.notify(|n| n.block_pre_import(&raw, &hash, header.difficulty()));
self.notify(|n| {
n.block_pre_import(&raw, &hash, header.difficulty())
});

let route = {
// Do a super duper basic verification to detect potential bugs
Expand Down Expand Up @@ -2460,14 +2462,17 @@ impl client_traits::EngineClient for Client {
}

fn submit_seal(&self, block_hash: H256, seal: Vec<Bytes>) {
let import = self.importer.miner.submit_seal(block_hash, seal).and_then(|block| self.import_sealed_block(block));
let import = self.importer.miner.submit_seal(block_hash, seal)
.and_then(|block| self.import_sealed_block(block));
if let Err(err) = import {
warn!(target: "poa", "Wrong internal seal submission! {:?}", err);
}
}

fn broadcast_consensus_message(&self, message: Bytes) {
self.notify(|notify| notify.broadcast(ChainMessageType::Consensus(message.clone())));
self.notify(|notify| {
notify.broadcast(ChainMessageType::Consensus(message.clone()))
});
}

fn epoch_transition_for(&self, parent_hash: H256) -> Option<EpochTransition> {
Expand Down Expand Up @@ -2593,13 +2598,21 @@ impl ImportExportBlocks for Client {
if i % 10000 == 0 {
info!("#{}", i);
}
let b = self.block(BlockId::Number(i)).ok_or("Error exporting incomplete chain")?.into_inner();
let b = self.block(BlockId::Number(i))
.ok_or("Error exporting incomplete chain")?
.into_inner();
match format {
DataFormat::Binary => {
out.write(&b).map_err(|e| format!("Couldn't write to stream. Cause: {}", e))?;
out.write(&b)
.map_err(|e| {
format!("Couldn't write to stream. Cause: {}", e)
})?;
}
DataFormat::Hex => {
out.write_fmt(format_args!("{}\n", b.pretty())).map_err(|e| format!("Couldn't write to stream. Cause: {}", e))?;
out.write_fmt(format_args!("{}\n", b.pretty()))
.map_err(|e| {
format!("Couldn't write to stream. Cause: {}", e)
})?;
}
}
}
Expand All @@ -2619,7 +2632,10 @@ impl ImportExportBlocks for Client {
let format = match format {
Some(format) => format,
None => {
first_read = source.read(&mut first_bytes).map_err(|_| "Error reading from the file/stream.")?;
first_read = source.read(&mut first_bytes)
.map_err(|_| {
"Error reading from the file/stream."
})?;
match first_bytes[0] {
0xf9 => DataFormat::Binary,
_ => DataFormat::Hex,
Expand All @@ -2630,7 +2646,9 @@ impl ImportExportBlocks for Client {
let do_import = |bytes: Vec<u8>| {
let block = Unverified::from_rlp(bytes).map_err(|_| "Invalid block rlp")?;
let number = block.header.number();
while self.queue_info().is_full() { std::thread::sleep(Duration::from_secs(1)); }
while self.queue_info().is_full() {
std::thread::sleep(Duration::from_secs(1));
}
match self.import_block(block) {
Err(EthcoreError::Import(ImportError::AlreadyInChain)) => {
trace!("Skipping block #{}: already in chain.", number);
Expand All @@ -2651,33 +2669,45 @@ impl ImportExportBlocks for Client {
} else {
let mut bytes = vec![0; READAHEAD_BYTES];
let n = source.read(&mut bytes)
.map_err(|err| format!("Error reading from the file/stream: {:?}", err))?;
.map_err(|err| {
format!("Error reading from the file/stream: {:?}", err)
})?;
(bytes, n)
};
if n == 0 { break; }
first_read = 0;
let s = PayloadInfo::from(&bytes)
.map_err(|e| format!("Invalid RLP in the file/stream: {:?}", e))?.total();
.map_err(|e| {
format!("Invalid RLP in the file/stream: {:?}", e)
})?.total();
bytes.resize(s, 0);
source.read_exact(&mut bytes[n..])
.map_err(|err| format!("Error reading from the file/stream: {:?}", err))?;
.map_err(|err| {
format!("Error reading from the file/stream: {:?}", err)
})?;
do_import(bytes)?;
}
}
DataFormat::Hex => {
for line in BufReader::new(source).lines() {
let s = line
.map_err(|err| format!("Error reading from the file/stream: {:?}", err))?;
.map_err(|err| {
format!("Error reading from the file/stream: {:?}", err)
})?;
let s = if first_read > 0 {
from_utf8(&first_bytes)
.map_err(|err| format!("Invalid UTF-8: {:?}", err))?
.map_err(|err| {
format!("Invalid UTF-8: {:?}", err)
})?
.to_owned() + &(s[..])
} else {
s
};
first_read = 0;
let bytes = s.from_hex()
.map_err(|err| format!("Invalid hex in file/stream: {:?}", err))?;
.map_err(|err| {
format!("Invalid hex in file/stream: {:?}", err)
})?;
do_import(bytes)?;
}
}
Expand Down
23 changes: 14 additions & 9 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use using_queue::{UsingQueue, GetAction};

use block::{ClosedBlock, SealedBlock};
use client::{BlockProducer, SealedBlockImporter, Client};
use client_traits::{BlockChain, ChainInfo, EngineClient, Nonce, TransactionInfo};
use client_traits::{BlockChain, ChainInfo, Nonce, TransactionInfo, EngineClient};
use engine::{Engine, signer::EngineSigner};
use machine::executive::contract_address;
use spec::Spec;
Expand Down Expand Up @@ -294,6 +294,7 @@ impl Miner {
let tx_queue_strategy = options.tx_queue_strategy;
let nonce_cache_size = cmp::max(4096, limits.max_count / 4);
let refuse_service_transactions = options.refuse_service_transactions;
let engine = spec.engine.clone();

Miner {
sealing: Mutex::new(SealingWork {
Expand All @@ -312,7 +313,7 @@ impl Miner {
options,
transaction_queue: Arc::new(TransactionQueue::new(limits, verifier_options, tx_queue_strategy)),
accounts: Arc::new(accounts),
engine: spec.engine.clone(),
engine,
io_channel: RwLock::new(None),
service_transaction_checker: if refuse_service_transactions {
None
Expand Down Expand Up @@ -865,12 +866,12 @@ impl Miner {
match self.engine.sealing_state() {
SealingState::Ready => {
self.maybe_enable_sealing();
self.update_sealing(chain)
self.update_sealing(chain);
}
SealingState::External => {
// this calls `maybe_enable_sealing()`
if self.prepare_pending_block(chain) == BlockPreparationStatus::NotPrepared {
self.update_sealing(chain)
self.update_sealing(chain);
}
}
SealingState::NotReady => { self.maybe_enable_sealing(); },
Expand Down Expand Up @@ -1305,13 +1306,14 @@ impl miner::MinerService for Miner {
if self.seal_and_import_block_internally(chain, block) {
trace!(target: "miner", "update_sealing: imported internally sealed block");
}
return
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this return doesn't change anything, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it doesn't

},
SealingState::NotReady => unreachable!("We returned right after sealing_state was computed. qed."),
SealingState::External => {
trace!(target: "miner", "update_sealing: engine does not seal internally, preparing work");
self.prepare_work(block, original_work_hash)
self.prepare_work(block, original_work_hash);
},
}
};
}

fn is_currently_sealing(&self) -> bool {
Expand Down Expand Up @@ -1441,7 +1443,6 @@ impl miner::MinerService for Miner {
let engine = self.engine.clone();
let accounts = self.accounts.clone();
let service_transaction_checker = self.service_transaction_checker.clone();

let cull = move |chain: &Client| {
let client = PoolClient::new(
chain,
Expand All @@ -1451,7 +1452,9 @@ impl miner::MinerService for Miner {
service_transaction_checker.as_ref(),
);
queue.cull(client);
if is_internal_import {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this previously true only for instant seal engine? otherwise, should it be something like

				if engine.should_reseal_on_update() {
						// force update_sealing here to skip `reseal_required` checks
						chain.update_sealing(ForceUpdateSealing::Yes);
				} else if is_internal_import {
						chain.update_sealing(ForceUpdateSealing::No);
				}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it was only for InstantSeal.

if queue.has_local_pending_transactions() &&
seunlanlege marked this conversation as resolved.
Show resolved Hide resolved
engine.should_reseal_on_update()
{
chain.update_sealing();
}
};
Expand All @@ -1461,7 +1464,9 @@ impl miner::MinerService for Miner {
}
} else {
self.transaction_queue.cull(client);
if is_internal_import {
if self.transaction_queue.has_local_pending_transactions() &&
seunlanlege marked this conversation as resolved.
Show resolved Hide resolved
self.engine.should_reseal_on_update()
{
self.update_sealing(chain);
}
}
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/test_helpers/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ impl ProvingBlockChainClient for TestBlockChainClient {
}

impl client_traits::EngineClient for TestBlockChainClient {
fn update_sealing(&self) {
fn update_sealing(&self) {
self.miner.update_sealing(self)
}

Expand Down
12 changes: 9 additions & 3 deletions miner/src/pool/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,17 @@ impl TransactionsPoolNotifier {
.map(|(hash, _)| hash)
.collect()
);
self.pending_listeners.retain(|listener| listener.unbounded_send(to_pending_send.clone()).is_ok());
self.pending_listeners.retain(|listener| {
listener.unbounded_send(to_pending_send.clone()).is_ok()
});

let to_full_send = Arc::new(std::mem::replace(&mut self.tx_statuses, Vec::new()));
let to_full_send = Arc::new(
std::mem::replace(&mut self.tx_statuses, Vec::new())
);
self.full_listeners
.retain(|listener| listener.unbounded_send(to_full_send.clone()).is_ok());
.retain(|listener| {
listener.unbounded_send(to_full_send.clone()).is_ok()
});
}
}

Expand Down