Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Use Drop to shutdown stepper thread
Browse files Browse the repository at this point in the history
Make period == 0 an error and remove the Option from step_service
  • Loading branch information
dvdplm committed May 23, 2019
1 parent 752031a commit fe55764
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 40 deletions.
35 changes: 11 additions & 24 deletions ethcore/src/engines/clique/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ pub struct Clique {
block_state_by_hash: RwLock<LruCache<H256, CliqueBlockState>>,
proposals: RwLock<HashMap<Address, VoteType>>,
signer: RwLock<Option<Box<EngineSigner>>>,
step_service: Option<StepService>,
step_service: StepService,
}

#[cfg(test)]
Expand All @@ -181,33 +181,29 @@ pub struct Clique {
pub block_state_by_hash: RwLock<LruCache<H256, CliqueBlockState>>,
pub proposals: RwLock<HashMap<Address, VoteType>>,
pub signer: RwLock<Option<Box<EngineSigner>>>,
pub step_service: Option<StepService>,
pub step_service: StepService,
}

impl Clique {
/// Initialize Clique engine from empty state.
pub fn new(params: CliqueParams, machine: EthereumMachine) -> Result<Arc<Self>, Error> {
let mut engine = Clique {
if params.period == 0 {
return Err("bad params: period can not be 0".into());
}
let engine = Clique {
epoch_length: params.epoch,
period: params.period,
client: Default::default(),
block_state_by_hash: RwLock::new(LruCache::new(STATE_CACHE_NUM)),
proposals: Default::default(),
signer: Default::default(),
machine,
step_service: None,
step_service: StepService::new(),
};
if params.period > 0 {
engine.step_service = Some(StepService::new());
let engine = Arc::new(engine);
let weak_eng = Arc::downgrade(&engine);
if let Some(step_service) = &engine.step_service {
step_service.start(weak_eng);
}
Ok(engine)
} else {
Ok(Arc::new(engine))
}
let engine = Arc::new(engine);
let weak_eng = Arc::downgrade(&engine);
engine.step_service.start(weak_eng);
Ok(engine)
}

#[cfg(test)]
Expand Down Expand Up @@ -348,15 +344,6 @@ impl Clique {
}
}

impl Drop for Clique {
fn drop(&mut self) {
if let Some(step_service) = &self.step_service {
trace!(target: "shutdown", "Clique; stopping step service");
step_service.stop();
}
}
}

impl Engine<EthereumMachine> for Clique {
fn name(&self) -> &str { "Clique" }

Expand Down
27 changes: 14 additions & 13 deletions ethcore/src/engines/clique/step_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,41 +40,42 @@ impl StepService {

/// Start the StepService: spawns a thread that loops and triggers a sealing operation every 2sec.
pub fn start<M: Machine + 'static>(&self, engine: Weak<Engine<M>>) {
/// Pause before starting to step Clique
const INITIAL_DELAY: Duration = Duration::from_secs(5);
/// Step Clique at most every 2 seconds
const SEALING_FREQ: Duration = Duration::from_secs(2);
let shutdown = self.shutdown.clone();

let thr = thread::Builder::new()
.name("CliqueStepService".into())
.name("StepService".into())
.spawn(move || {
// startup delay.
thread::sleep(Duration::from_secs(5));
thread::sleep(INITIAL_DELAY);

loop {
// see if we are in shutdown.
if shutdown.load(Ordering::Acquire) {
trace!(target: "shutdown", "CliqueStepService: received shutdown signal!");
trace!(target: "shutdown", "StepService: received shutdown signal!");
break;
}

trace!(target: "miner", "CliqueStepService: triggering sealing");
trace!(target: "miner", "StepService: triggering sealing");

// Try sealing
engine.upgrade().map(|x| x.step());

// Yield
thread::sleep(Duration::from_millis(2000));
thread::sleep(SEALING_FREQ);
}
trace!(target: "shutdown", "CliqueStepService: exited loop, shutdown.");
}).expect("CliqueStepService thread failed");

}).expect("StepService thread failed");
*self.thread.write() = Some(thr);
}
}

/// Stop the `StepService`
pub fn stop(&self) {
trace!(target: "shutdown", "CliqueStepService: signalling shutting to stepping thread.");
impl Drop for StepService {
fn drop(&mut self) {
self.shutdown.store(true, Ordering::Release);
if let Some(t) = self.thread.write().take() {
t.join().expect("CliqueStepService thread panicked!");
t.join().expect("StepService thread panicked!");
}
}
}
3 changes: 0 additions & 3 deletions ethcore/src/engines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,6 @@ pub trait Engine<M: Machine>: Sync + Send {
/// Trigger next step of the consensus engine.
fn step(&self) {}

/// Stops any services that the may hold the Engine and makes it safe to drop.
fn stop(&mut self) {}

/// Create a factory for building snapshot chunks and restoring from them.
/// Returning `None` indicates that this engine doesn't support snapshot creation.
fn snapshot_components(&self) -> Option<Box<SnapshotComponents>> {
Expand Down

0 comments on commit fe55764

Please sign in to comment.