Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Fix parallel transactions race-condition #10995

Merged
merged 20 commits into from
Sep 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 16 additions & 4 deletions ethcore/engines/instant-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.

use std::sync::atomic::{AtomicU64, Ordering};

use common_types::{
header::Header,
engines::{
Expand Down Expand Up @@ -51,6 +53,7 @@ impl From<ethjson::spec::InstantSealParams> for InstantSealParams {
pub struct InstantSeal {
params: InstantSealParams,
machine: Machine,
last_sealed_block: AtomicU64,
}

impl InstantSeal {
Expand All @@ -59,6 +62,7 @@ impl InstantSeal {
InstantSeal {
params,
machine,
last_sealed_block: AtomicU64::new(0),
}
}
}
Expand All @@ -71,11 +75,19 @@ impl Engine for InstantSeal {
fn sealing_state(&self) -> SealingState { SealingState::Ready }

fn generate_seal(&self, block: &ExecutedBlock, _parent: &Header) -> Seal {
if block.transactions.is_empty() {
Seal::None
} else {
Seal::Regular(Vec::new())
if !block.transactions.is_empty() {
let block_number = block.header.number();
let last_sealed_block = self.last_sealed_block.load(Ordering::SeqCst);
// Return a regular seal if the given block is _higher_ than
// the last sealed one
if block_number > last_sealed_block {
let prev_last_sealed_block = self.last_sealed_block.compare_and_swap(last_sealed_block, block_number, Ordering::SeqCst);
if prev_last_sealed_block == last_sealed_block {
return Seal::Regular(Vec::new())
}
}
}
Seal::None
}

fn verify_local_seal(&self, _header: &Header) -> Result<(), Error> {
Expand Down
2 changes: 1 addition & 1 deletion ethcore/private-tx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ state-db = { path = "../state-db" }
time-utils = { path = "../../util/time-utils" }
tiny-keccak = "1.4"
trace = { path = "../trace" }
transaction-pool = "2.0"
transaction-pool = "2.0.1"
url = "1"
vm = { path = "../vm" }

Expand Down
13 changes: 10 additions & 3 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use futures::sync::mpsc;
use io::IoChannel;
use miner::filter_options::{FilterOptions, FilterOperator};
use miner::pool_client::{PoolClient, CachedNonceClient, NonceCache};
use miner;
use miner::{self, MinerService};
use parking_lot::{Mutex, RwLock};
use rayon::prelude::*;
use types::{
Expand All @@ -58,7 +58,7 @@ use using_queue::{UsingQueue, GetAction};

use block::{ClosedBlock, SealedBlock};
use client::{BlockProducer, SealedBlockImporter, Client};
use client_traits::{BlockChain, ChainInfo, Nonce, TransactionInfo};
use client_traits::{BlockChain, ChainInfo, EngineClient, Nonce, TransactionInfo};
use engine::{Engine, signer::EngineSigner};
use machine::executive::contract_address;
use spec::Spec;
Expand Down Expand Up @@ -859,9 +859,9 @@ impl Miner {
false
}
}

/// Prepare pending block, check whether sealing is needed, and then update sealing.
fn prepare_and_update_sealing<C: miner::BlockChainClient>(&self, chain: &C) {
use miner::MinerService;
match self.engine.sealing_state() {
SealingState::Ready => {
self.maybe_enable_sealing();
Expand Down Expand Up @@ -1429,15 +1429,22 @@ impl miner::MinerService for Miner {
service_transaction_checker.as_ref(),
);
queue.cull(client);
if is_internal_import {
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
chain.update_sealing();
}
};

if let Err(e) = channel.send(ClientIoMessage::<Client>::execute(cull)) {
warn!(target: "miner", "Error queueing cull: {:?}", e);
}
} else {
self.transaction_queue.cull(client);
if is_internal_import {
self.update_sealing(chain);
}
}
}

if let Some(ref service_transaction_checker) = self.service_transaction_checker {
match service_transaction_checker.refresh_cache(chain) {
Ok(true) => {
Expand Down
2 changes: 1 addition & 1 deletion miner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
trace-time = "0.1"
transaction-pool = "2.0"
transaction-pool = "2.0.1"

[dev-dependencies]
env_logger = "0.5"
Expand Down
2 changes: 1 addition & 1 deletion rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fake-fetch = { path = "../util/fake-fetch" }
macros = { path = "../util/macros" }
spec = { path = "../ethcore/spec" }
pretty_assertions = "0.1"
transaction-pool = "2.0"
transaction-pool = "2.0.1"
verification = { path = "../ethcore/verification" }

[features]
Expand Down