From fe55764596e3699d70185c754582511ee8451395 Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 23 May 2019 20:50:47 +0200 Subject: [PATCH 1/7] Use Drop to shutdown stepper thread Make period == 0 an error and remove the Option from step_service --- ethcore/src/engines/clique/mod.rs | 35 +++++++--------------- ethcore/src/engines/clique/step_service.rs | 27 +++++++++-------- ethcore/src/engines/mod.rs | 3 -- 3 files changed, 25 insertions(+), 40 deletions(-) diff --git a/ethcore/src/engines/clique/mod.rs b/ethcore/src/engines/clique/mod.rs index fff6cb3ab67..b4cc6a0f826 100644 --- a/ethcore/src/engines/clique/mod.rs +++ b/ethcore/src/engines/clique/mod.rs @@ -168,7 +168,7 @@ pub struct Clique { block_state_by_hash: RwLock>, proposals: RwLock>, signer: RwLock>>, - step_service: Option, + step_service: StepService, } #[cfg(test)] @@ -181,13 +181,16 @@ pub struct Clique { pub block_state_by_hash: RwLock>, pub proposals: RwLock>, pub signer: RwLock>>, - pub step_service: Option, + pub step_service: StepService, } impl Clique { /// Initialize Clique engine from empty state. pub fn new(params: CliqueParams, machine: EthereumMachine) -> Result, Error> { - let mut engine = Clique { + if params.period == 0 { + return Err("bad params: period can not be 0".into()); + } + let engine = Clique { epoch_length: params.epoch, period: params.period, client: Default::default(), @@ -195,19 +198,12 @@ impl Clique { proposals: Default::default(), signer: Default::default(), machine, - step_service: None, + step_service: StepService::new(), }; - if params.period > 0 { - engine.step_service = Some(StepService::new()); - let engine = Arc::new(engine); - let weak_eng = Arc::downgrade(&engine); - if let Some(step_service) = &engine.step_service { - step_service.start(weak_eng); - } - Ok(engine) - } else { - Ok(Arc::new(engine)) - } + let engine = Arc::new(engine); + let weak_eng = Arc::downgrade(&engine); + engine.step_service.start(weak_eng); + Ok(engine) } #[cfg(test)] @@ -348,15 +344,6 @@ impl Clique { } } -impl Drop for Clique { - fn drop(&mut self) { - if let Some(step_service) = &self.step_service { - trace!(target: "shutdown", "Clique; stopping step service"); - step_service.stop(); - } - } -} - impl Engine for Clique { fn name(&self) -> &str { "Clique" } diff --git a/ethcore/src/engines/clique/step_service.rs b/ethcore/src/engines/clique/step_service.rs index a7c977953ff..1d11a1bbbe1 100644 --- a/ethcore/src/engines/clique/step_service.rs +++ b/ethcore/src/engines/clique/step_service.rs @@ -40,41 +40,42 @@ impl StepService { /// Start the StepService: spawns a thread that loops and triggers a sealing operation every 2sec. pub fn start(&self, engine: Weak>) { + /// Pause before starting to step Clique + const INITIAL_DELAY: Duration = Duration::from_secs(5); + /// Step Clique at most every 2 seconds + const SEALING_FREQ: Duration = Duration::from_secs(2); let shutdown = self.shutdown.clone(); let thr = thread::Builder::new() - .name("CliqueStepService".into()) + .name("StepService".into()) .spawn(move || { - // startup delay. - thread::sleep(Duration::from_secs(5)); + thread::sleep(INITIAL_DELAY); loop { // see if we are in shutdown. if shutdown.load(Ordering::Acquire) { - trace!(target: "shutdown", "CliqueStepService: received shutdown signal!"); + trace!(target: "shutdown", "StepService: received shutdown signal!"); break; } - trace!(target: "miner", "CliqueStepService: triggering sealing"); + trace!(target: "miner", "StepService: triggering sealing"); // Try sealing engine.upgrade().map(|x| x.step()); // Yield - thread::sleep(Duration::from_millis(2000)); + thread::sleep(SEALING_FREQ); } - trace!(target: "shutdown", "CliqueStepService: exited loop, shutdown."); - }).expect("CliqueStepService thread failed"); - + }).expect("StepService thread failed"); *self.thread.write() = Some(thr); } +} - /// Stop the `StepService` - pub fn stop(&self) { - trace!(target: "shutdown", "CliqueStepService: signalling shutting to stepping thread."); +impl Drop for StepService { + fn drop(&mut self) { self.shutdown.store(true, Ordering::Release); if let Some(t) = self.thread.write().take() { - t.join().expect("CliqueStepService thread panicked!"); + t.join().expect("StepService thread panicked!"); } } } diff --git a/ethcore/src/engines/mod.rs b/ethcore/src/engines/mod.rs index 5124f079db2..8b67c5d48a9 100644 --- a/ethcore/src/engines/mod.rs +++ b/ethcore/src/engines/mod.rs @@ -425,9 +425,6 @@ pub trait Engine: Sync + Send { /// Trigger next step of the consensus engine. fn step(&self) {} - /// Stops any services that the may hold the Engine and makes it safe to drop. - fn stop(&mut self) {} - /// Create a factory for building snapshot chunks and restoring from them. /// Returning `None` indicates that this engine doesn't support snapshot creation. fn snapshot_components(&self) -> Option> { From 709e8fe47bde1512fe3c380423ef4c07daba6073 Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 23 May 2019 21:40:07 +0200 Subject: [PATCH 2/7] Remove StepService Remove StepService and spawn the stepping thread in `Clique::new()`. Don't store the thread handle and instead trust the `AtomicBool` to signal shutdown time. Don't check for `period > 0`: we assume a valid chainspec file. --- ethcore/src/engines/clique/mod.rs | 42 ++++++++--- ethcore/src/engines/clique/step_service.rs | 81 ---------------------- 2 files changed, 32 insertions(+), 91 deletions(-) delete mode 100644 ethcore/src/engines/clique/step_service.rs diff --git a/ethcore/src/engines/clique/mod.rs b/ethcore/src/engines/clique/mod.rs index b4cc6a0f826..e556838fa60 100644 --- a/ethcore/src/engines/clique/mod.rs +++ b/ethcore/src/engines/clique/mod.rs @@ -61,7 +61,7 @@ use std::cmp; use std::collections::HashMap; use std::collections::VecDeque; -use std::sync::{Arc, Weak}; +use std::sync::{Arc, Weak, atomic::{AtomicBool, Ordering}}; use std::thread; use std::time; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -89,11 +89,9 @@ use time_utils::CheckedSystemTime; use self::block_state::CliqueBlockState; use self::params::CliqueParams; -use self::step_service::StepService; mod params; mod block_state; -mod step_service; mod util; // TODO(niklasad1): extract tester types into a separate mod to be shared in the code base @@ -168,7 +166,7 @@ pub struct Clique { block_state_by_hash: RwLock>, proposals: RwLock>, signer: RwLock>>, - step_service: StepService, + shutdown_stepping: Arc, } #[cfg(test)] @@ -181,15 +179,23 @@ pub struct Clique { pub block_state_by_hash: RwLock>, pub proposals: RwLock>, pub signer: RwLock>>, - pub step_service: StepService, + pub shutdown_stepping: Arc, +} + +impl Drop for Clique { + fn drop(&mut self) { + self.shutdown_stepping.store(true, Ordering::Release); + } } impl Clique { /// Initialize Clique engine from empty state. pub fn new(params: CliqueParams, machine: EthereumMachine) -> Result, Error> { - if params.period == 0 { - return Err("bad params: period can not be 0".into()); - } + /// Pause before starting to step Clique + const INITIAL_DELAY: Duration = Duration::from_secs(5); + /// Step Clique at most every 2 seconds + const SEALING_FREQ: Duration = Duration::from_secs(2); + let engine = Clique { epoch_length: params.epoch, period: params.period, @@ -198,11 +204,27 @@ impl Clique { proposals: Default::default(), signer: Default::default(), machine, - step_service: StepService::new(), + shutdown_stepping: Arc::new(AtomicBool::new(false)), }; let engine = Arc::new(engine); let weak_eng = Arc::downgrade(&engine); - engine.step_service.start(weak_eng); + let shutdown = engine.shutdown_stepping.clone(); + thread::Builder::new().name("StepService".into()) + .spawn(move || { + thread::sleep(INITIAL_DELAY); + loop { + // Check if we are in shutdown. + if shutdown.load(Ordering::Acquire) { + trace!(target: "shutdown", "StepService: received shutdown signal!"); + break; + } + trace!(target: "miner", "StepService: triggering sealing"); + // Try sealing + weak_eng.upgrade().map(|x| x.step()); + // Yield + thread::sleep(SEALING_FREQ); + } + })?; Ok(engine) } diff --git a/ethcore/src/engines/clique/step_service.rs b/ethcore/src/engines/clique/step_service.rs deleted file mode 100644 index 1d11a1bbbe1..00000000000 --- a/ethcore/src/engines/clique/step_service.rs +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2015-2019 Parity Technologies (UK) Ltd. -// This file is part of Parity Ethereum. - -// Parity Ethereum is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Ethereum is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Ethereum. If not, see . - - -use std::sync::Weak; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::Duration; -use std::thread; -use std::sync::Arc; -use parking_lot::RwLock; - -use engines::Engine; -use machine::Machine; - -/// Service that is managing the engine -pub struct StepService { - shutdown: Arc, - thread: RwLock>>, -} - -impl StepService { - /// Create a new StepService without spawning a sealing thread. - pub fn new() -> Self { - let shutdown = Arc::new(AtomicBool::new(false)); - StepService { shutdown, thread: RwLock::new(None) } - } - - /// Start the StepService: spawns a thread that loops and triggers a sealing operation every 2sec. - pub fn start(&self, engine: Weak>) { - /// Pause before starting to step Clique - const INITIAL_DELAY: Duration = Duration::from_secs(5); - /// Step Clique at most every 2 seconds - const SEALING_FREQ: Duration = Duration::from_secs(2); - let shutdown = self.shutdown.clone(); - - let thr = thread::Builder::new() - .name("StepService".into()) - .spawn(move || { - thread::sleep(INITIAL_DELAY); - - loop { - // see if we are in shutdown. - if shutdown.load(Ordering::Acquire) { - trace!(target: "shutdown", "StepService: received shutdown signal!"); - break; - } - - trace!(target: "miner", "StepService: triggering sealing"); - - // Try sealing - engine.upgrade().map(|x| x.step()); - - // Yield - thread::sleep(SEALING_FREQ); - } - }).expect("StepService thread failed"); - *self.thread.write() = Some(thr); - } -} - -impl Drop for StepService { - fn drop(&mut self) { - self.shutdown.store(true, Ordering::Release); - if let Some(t) = self.thread.write().take() { - t.join().expect("StepService thread panicked!"); - } - } -} From 56eb2ac77014af2185319dcf1eb94e4c08f68cd5 Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 23 May 2019 21:54:57 +0200 Subject: [PATCH 3/7] Don't shutdown the stepper thread at all, just let it run until exit Also: fix a few warnings and tests --- ethcore/src/engines/clique/mod.rs | 21 ++------------------- ethcore/src/lib.rs | 3 ++- ethcore/src/snapshot/tests/helpers.rs | 1 - 3 files changed, 4 insertions(+), 21 deletions(-) diff --git a/ethcore/src/engines/clique/mod.rs b/ethcore/src/engines/clique/mod.rs index e556838fa60..83d904577b2 100644 --- a/ethcore/src/engines/clique/mod.rs +++ b/ethcore/src/engines/clique/mod.rs @@ -61,7 +61,7 @@ use std::cmp; use std::collections::HashMap; use std::collections::VecDeque; -use std::sync::{Arc, Weak, atomic::{AtomicBool, Ordering}}; +use std::sync::{Arc, Weak}; use std::thread; use std::time; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -166,7 +166,6 @@ pub struct Clique { block_state_by_hash: RwLock>, proposals: RwLock>, signer: RwLock>>, - shutdown_stepping: Arc, } #[cfg(test)] @@ -179,13 +178,6 @@ pub struct Clique { pub block_state_by_hash: RwLock>, pub proposals: RwLock>, pub signer: RwLock>>, - pub shutdown_stepping: Arc, -} - -impl Drop for Clique { - fn drop(&mut self) { - self.shutdown_stepping.store(true, Ordering::Release); - } } impl Clique { @@ -204,24 +196,16 @@ impl Clique { proposals: Default::default(), signer: Default::default(), machine, - shutdown_stepping: Arc::new(AtomicBool::new(false)), }; let engine = Arc::new(engine); let weak_eng = Arc::downgrade(&engine); - let shutdown = engine.shutdown_stepping.clone(); + thread::Builder::new().name("StepService".into()) .spawn(move || { thread::sleep(INITIAL_DELAY); loop { - // Check if we are in shutdown. - if shutdown.load(Ordering::Acquire) { - trace!(target: "shutdown", "StepService: received shutdown signal!"); - break; - } trace!(target: "miner", "StepService: triggering sealing"); - // Try sealing weak_eng.upgrade().map(|x| x.step()); - // Yield thread::sleep(SEALING_FREQ); } })?; @@ -243,7 +227,6 @@ impl Clique { proposals: Default::default(), signer: Default::default(), machine: Spec::new_test_machine(), - step_service: None, } } diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index 463d22fd427..b4e3d3000fe 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -82,7 +82,6 @@ extern crate journaldb; extern crate keccak_hash as hash; extern crate keccak_hasher; extern crate kvdb; -extern crate kvdb_memorydb; extern crate len_caching_lock; extern crate lru_cache; extern crate memory_cache; @@ -113,6 +112,8 @@ extern crate ethcore_accounts as accounts; extern crate ethcore_stratum; #[cfg(any(test, feature = "tempdir"))] extern crate tempdir; +#[cfg(test)] +extern crate kvdb_memorydb; #[cfg(any(test, feature = "kvdb-rocksdb"))] extern crate kvdb_rocksdb; #[cfg(any(test, feature = "blooms-db"))] diff --git a/ethcore/src/snapshot/tests/helpers.rs b/ethcore/src/snapshot/tests/helpers.rs index 817e0249986..5d2e8616b00 100644 --- a/ethcore/src/snapshot/tests/helpers.rs +++ b/ethcore/src/snapshot/tests/helpers.rs @@ -157,7 +157,6 @@ pub fn restore( genesis: &[u8], ) -> Result<(), ::error::Error> { use std::sync::atomic::AtomicBool; - use snappy; let flag = AtomicBool::new(true); let components = engine.snapshot_components().unwrap(); From f12d34831ccce3207cae3f66e31c4e9830365bc9 Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 23 May 2019 22:06:43 +0200 Subject: [PATCH 4/7] Put kvdb_memorydb back --- ethcore/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index b4e3d3000fe..463d22fd427 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -82,6 +82,7 @@ extern crate journaldb; extern crate keccak_hash as hash; extern crate keccak_hasher; extern crate kvdb; +extern crate kvdb_memorydb; extern crate len_caching_lock; extern crate lru_cache; extern crate memory_cache; @@ -112,8 +113,6 @@ extern crate ethcore_accounts as accounts; extern crate ethcore_stratum; #[cfg(any(test, feature = "tempdir"))] extern crate tempdir; -#[cfg(test)] -extern crate kvdb_memorydb; #[cfg(any(test, feature = "kvdb-rocksdb"))] extern crate kvdb_rocksdb; #[cfg(any(test, feature = "blooms-db"))] From a403948b51079736ed001bb651266a016efb4514 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 24 May 2019 11:12:21 +0200 Subject: [PATCH 5/7] Warn&exit when engine is dropped Don't sleep too long! --- ethcore/src/engines/clique/mod.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/ethcore/src/engines/clique/mod.rs b/ethcore/src/engines/clique/mod.rs index 83d904577b2..29376acf277 100644 --- a/ethcore/src/engines/clique/mod.rs +++ b/ethcore/src/engines/clique/mod.rs @@ -64,7 +64,7 @@ use std::collections::VecDeque; use std::sync::{Arc, Weak}; use std::thread; use std::time; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH}; use block::ExecutedBlock; use client::{BlockId, EngineClient}; @@ -204,9 +204,18 @@ impl Clique { .spawn(move || { thread::sleep(INITIAL_DELAY); loop { + let next_step_at = Instant::now() + SEALING_FREQ; trace!(target: "miner", "StepService: triggering sealing"); - weak_eng.upgrade().map(|x| x.step()); - thread::sleep(SEALING_FREQ); + if let Some(eng) = weak_eng.upgrade() { eng.step() } + else { + warn!(target: "shutdown", "StepService: engine is dropped; exiting."); + break; + } + + let now = Instant::now(); + if now < next_step_at { + thread::sleep(next_step_at - now); + } } })?; Ok(engine) From 685b3343c8301420f4e2d2667315c05f86ad16af Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 24 May 2019 13:36:38 +0200 Subject: [PATCH 6/7] Don't delay stepping thread --- ethcore/src/engines/clique/mod.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/ethcore/src/engines/clique/mod.rs b/ethcore/src/engines/clique/mod.rs index 29376acf277..579976a20a1 100644 --- a/ethcore/src/engines/clique/mod.rs +++ b/ethcore/src/engines/clique/mod.rs @@ -183,8 +183,6 @@ pub struct Clique { impl Clique { /// Initialize Clique engine from empty state. pub fn new(params: CliqueParams, machine: EthereumMachine) -> Result, Error> { - /// Pause before starting to step Clique - const INITIAL_DELAY: Duration = Duration::from_secs(5); /// Step Clique at most every 2 seconds const SEALING_FREQ: Duration = Duration::from_secs(2); @@ -202,7 +200,6 @@ impl Clique { thread::Builder::new().name("StepService".into()) .spawn(move || { - thread::sleep(INITIAL_DELAY); loop { let next_step_at = Instant::now() + SEALING_FREQ; trace!(target: "miner", "StepService: triggering sealing"); From 3bc64bcf30ded282ef9bd462797b4aa9729f4f43 Mon Sep 17 00:00:00 2001 From: David Palm Date: Mon, 10 Jun 2019 11:51:52 +0200 Subject: [PATCH 7/7] Better formatting --- ethcore/src/engines/clique/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ethcore/src/engines/clique/mod.rs b/ethcore/src/engines/clique/mod.rs index 579976a20a1..7ced59c07c1 100644 --- a/ethcore/src/engines/clique/mod.rs +++ b/ethcore/src/engines/clique/mod.rs @@ -203,8 +203,9 @@ impl Clique { loop { let next_step_at = Instant::now() + SEALING_FREQ; trace!(target: "miner", "StepService: triggering sealing"); - if let Some(eng) = weak_eng.upgrade() { eng.step() } - else { + if let Some(eng) = weak_eng.upgrade() { + eng.step() + } else { warn!(target: "shutdown", "StepService: engine is dropped; exiting."); break; }