Skip to content

Commit

Permalink
fix: stop background threads between estimations
Browse files Browse the repository at this point in the history
Explicitly stop and wait for prefetching background threads to terminate
when a testbed is dropped. This avoids that estimations are influenced
by background threads left over from previous estimations, which we have
observed since merging near#7661.
  • Loading branch information
jakmeier committed Sep 26, 2022
1 parent 2b5dcaa commit 9447cc2
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 12 deletions.
48 changes: 38 additions & 10 deletions core/store/src/trie/prefetching_trie_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ struct TriePrefetchingStorage {
/// The former puts requests in, the latter serves requests.
/// With this API, the store does not know about receipts etc, and the runtime
/// does not know about the trie structure. The only thing they share is this object.
#[derive(Clone)]
pub struct PrefetchApi {
/// Bounded, shared queue for all IO threads to take work from.
///
Expand All @@ -64,6 +63,9 @@ pub struct PrefetchApi {
/// at the same time.
work_queue_tx: crossbeam::channel::Sender<(StateRoot, TrieKey)>,
work_queue_rx: crossbeam::channel::Receiver<(StateRoot, TrieKey)>,
/// Threads spawned by this instance of `PrefetchApi`, clones created for chunk processing
/// will have an empty vector instead.
io_thread_handles: Vec<std::thread::JoinHandle<()>>,
/// Prefetching IO threads will insert fetched data here. This is also used
/// to mark what is already being fetched, to avoid fetching the same data
/// multiple times.
Expand Down Expand Up @@ -388,9 +390,10 @@ impl PrefetchApi {
let sweat_prefetch_senders = trie_config.sweat_prefetch_senders.clone();
let enable_receipt_prefetching = trie_config.enable_receipt_prefetching;

let this = Self {
let mut this = Self {
work_queue_tx,
work_queue_rx,
io_thread_handles: vec![],
prefetching: PrefetchStagingArea::new(shard_uid.shard_id()),
enable_receipt_prefetching,
sweat_prefetch_receivers,
Expand All @@ -412,20 +415,15 @@ impl PrefetchApi {
self.work_queue_tx.send((root, trie_key)).map_err(|e| e.0)
}

pub fn start_io_thread(
&self,
store: Store,
shard_cache: TrieCache,
shard_uid: ShardUId,
) -> std::thread::JoinHandle<()> {
pub fn start_io_thread(&mut self, store: Store, shard_cache: TrieCache, shard_uid: ShardUId) {
let prefetcher_storage =
TriePrefetchingStorage::new(store, shard_uid, shard_cache, self.prefetching.clone());
let work_queue = self.work_queue_rx.clone();
let metric_prefetch_sent =
metrics::PREFETCH_SENT.with_label_values(&[&shard_uid.shard_id.to_string()]);
let metric_prefetch_fail =
metrics::PREFETCH_FAIL.with_label_values(&[&shard_uid.shard_id.to_string()]);
std::thread::spawn(move || {
let handle = std::thread::spawn(move || {
while let Ok((trie_root, trie_key)) = work_queue.recv() {
// Since the trie root can change,and since the root is not known at the time when the IO threads starts,
// we need to redefine the trie before each request.
Expand All @@ -443,7 +441,8 @@ impl PrefetchApi {
metric_prefetch_fail.inc();
}
}
})
});
self.io_thread_handles.push(handle);
}

/// Remove queued up requests so IO threads will be paused after they finish their current task.
Expand All @@ -458,6 +457,18 @@ impl PrefetchApi {
pub fn clear_data(&self) {
self.prefetching.0.lock().expect(POISONED_LOCK_ERR).slots.clear();
}

/// Interrupt and wait for all prefetching background threads to terminate.
pub fn stop_background_threads(&mut self) -> std::thread::Result<()> {
// close cross-beam channel
(self.work_queue_tx, self.work_queue_rx) =
crossbeam::channel::bounded(MAX_QUEUED_WORK_ITEMS);
// wait for IO threads to terminate
for handle in self.io_thread_handles.drain(..) {
handle.join()?;
}
Ok(())
}
}

fn prefetch_state_matches(expected: PrefetchSlot, actual: &PrefetchSlot) -> bool {
Expand All @@ -469,6 +480,23 @@ fn prefetch_state_matches(expected: PrefetchSlot, actual: &PrefetchSlot) -> bool
}
}

// Manual implementation to avoid cloning thread handles. Those handles should
// be in exclusive ownership of the original object, stored in `ShardTries`.
impl Clone for PrefetchApi {
fn clone(&self) -> Self {
Self {
work_queue_tx: self.work_queue_tx.clone(),
work_queue_rx: self.work_queue_rx.clone(),
io_thread_handles: vec![],
prefetching: self.prefetching.clone(),
enable_receipt_prefetching: self.enable_receipt_prefetching.clone(),
sweat_prefetch_receivers: self.sweat_prefetch_receivers.clone(),
sweat_prefetch_senders: self.sweat_prefetch_senders.clone(),
shard_uid: self.shard_uid.clone(),
}
}
}

/// Implementation to make testing from runtime possible.
///
/// Prefetching by design has no visible side-effects.
Expand Down
9 changes: 9 additions & 0 deletions core/store/src/trie/shard_tries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,15 @@ impl ShardTries {
) -> (StoreUpdate, StateRoot) {
self.apply_all_inner(trie_changes, shard_uid, true)
}

// Stop prefetching background threads and wait until they have terminated.
pub fn stop_prefetching_threads(&self) -> std::thread::Result<()> {
for prefetcher in self.0.prefetchers.write().expect(POISONED_LOCK_ERR).values_mut() {
prefetcher.clear_queue();
prefetcher.stop_background_threads()?;
}
Ok(())
}
}

pub struct WrappedTrieChanges {
Expand Down
6 changes: 6 additions & 0 deletions runtime/runtime-params-estimator/src/estimator_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,9 @@ impl Testbed<'_> {
}
}
}

impl<'a> Drop for Testbed<'a> {
fn drop(&mut self) {
self.inner.stop_prefetching_threads();
}
}
3 changes: 3 additions & 0 deletions runtime/runtime-params-estimator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,9 @@ fn apply_block_cost(ctx: &mut EstimatorContext) -> GasCost {

let gas_cost = average_cost(measurements);

// Drop required if `Drop` is implemented on testbed to satisfy lifetime
// requirements on `ctx`.
std::mem::drop(testbed);
ctx.cached.apply_block = Some(gas_cost.clone());

gas_cost
Expand Down
4 changes: 4 additions & 0 deletions runtime/runtime-params-estimator/src/testbed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,8 @@ impl RuntimeTestbed {
pub fn store(&mut self) -> Store {
self.tries.get_store()
}

pub fn stop_prefetching_threads(&self) {
self.tries.stop_prefetching_threads().unwrap();
}
}
5 changes: 3 additions & 2 deletions runtime/runtime-params-estimator/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ pub(crate) fn fn_cost_with_setup(
let block_latency = 0;
let overhead = overhead_per_measured_block(ctx, block_latency);
let block_size = 2usize;
let n_blocks = ctx.config.warmup_iters_per_block + ctx.config.iter_per_block;
let warmup_iters = ctx.config.warmup_iters_per_block;
let n_blocks = warmup_iters + ctx.config.iter_per_block;

let mut testbed = ctx.testbed();

Expand Down Expand Up @@ -180,7 +181,7 @@ pub(crate) fn fn_cost_with_setup(
// Filter out setup blocks.
let measurements: Vec<_> = measurements
.into_iter()
.skip(ctx.config.warmup_iters_per_block * 2)
.skip(warmup_iters * 2)
.enumerate()
.filter(|(i, _)| i % 2 == 1)
.map(|(_, m)| m)
Expand Down

0 comments on commit 9447cc2

Please sign in to comment.