From cd7217b67ce47f0576d5e827b574e62242a2fd35 Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Thu, 29 Sep 2022 13:18:03 +0100 Subject: [PATCH] fix: properly stop prefetching background threads (#7712) Explicitly stop and wait for prefetching background threads to terminate when the `ShardTriesInner` is dropped. This prevents estimations from being influenced by background threads left over from previous estimations, which we have observed since merging #7661. --- .../src/trie/prefetching_trie_storage.rs | 100 +++++++++++++----- core/store/src/trie/shard_tries.rs | 4 +- 2 files changed, 78 insertions(+), 26 deletions(-) diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs index e5dc696435c..f993f4d0ce7 100644 --- a/core/store/src/trie/prefetching_trie_storage.rs +++ b/core/store/src/trie/prefetching_trie_storage.rs @@ -3,14 +3,17 @@ use crate::{ metrics, DBCol, StorageError, Store, Trie, TrieCache, TrieCachingStorage, TrieConfig, TrieStorage, }; +use crossbeam::select; use near_o11y::metrics::prometheus; use near_o11y::metrics::prometheus::core::GenericGauge; +use near_o11y::tracing::error; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; use near_primitives::trie_key::TrieKey; use near_primitives::types::{AccountId, ShardId, StateRoot, TrieNodesCount}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; +use std::thread; const MAX_QUEUED_WORK_ITEMS: usize = 16 * 1024; const MAX_PREFETCH_STAGING_MEMORY: usize = 200 * 1024 * 1024; @@ -239,7 +242,7 @@ impl TrieStorage for TriePrefetchingStorage { PrefetcherResult::Prefetched(value) => Ok(value), PrefetcherResult::Pending => { // yield once before calling `block_get` that will check for data to be present again. 
- std::thread::yield_now(); + thread::yield_now(); self.prefetching .blocking_get(hash.clone()) .or_else(|| { @@ -334,7 +337,7 @@ impl PrefetchStagingArea { Some(_) => (), None => return None, } - std::thread::sleep(std::time::Duration::from_micros(1)); + thread::sleep(std::time::Duration::from_micros(1)); } } @@ -377,12 +380,12 @@ impl PrefetchStagingArea { } impl PrefetchApi { - pub fn new( + pub(crate) fn new( store: Store, shard_cache: TrieCache, shard_uid: ShardUId, trie_config: &TrieConfig, - ) -> Self { + ) -> (Self, PrefetchingThreadsHandle) { let (work_queue_tx, work_queue_rx) = crossbeam::channel::bounded(MAX_QUEUED_WORK_ITEMS); let sweat_prefetch_receivers = trie_config.sweat_prefetch_receivers.clone(); let sweat_prefetch_senders = trie_config.sweat_prefetch_senders.clone(); @@ -397,10 +400,19 @@ impl PrefetchApi { sweat_prefetch_senders, shard_uid, }; - for _ in 0..NUM_IO_THREADS { - this.start_io_thread(store.clone(), shard_cache.clone(), shard_uid.clone()); - } - this + let (shutdown_tx, shutdown_rx) = crossbeam::channel::bounded(1); + let handles = (0..NUM_IO_THREADS) + .map(|_| { + this.start_io_thread( + store.clone(), + shard_cache.clone(), + shard_uid.clone(), + shutdown_rx.clone(), + ) + }) + .collect(); + let handle = PrefetchingThreadsHandle { shutdown_channel: Some(shutdown_tx), handles }; + (this, handle) } /// Returns the argument back if queue is full. 
@@ -417,7 +429,8 @@ impl PrefetchApi { store: Store, shard_cache: TrieCache, shard_uid: ShardUId, - ) -> std::thread::JoinHandle<()> { + shutdown_rx: crossbeam::channel::Receiver<()>, + ) -> thread::JoinHandle<()> { let prefetcher_storage = TriePrefetchingStorage::new(store, shard_uid, shard_cache, self.prefetching.clone()); let work_queue = self.work_queue_rx.clone(); @@ -425,22 +438,35 @@ impl PrefetchApi { metrics::PREFETCH_SENT.with_label_values(&[&shard_uid.shard_id.to_string()]); let metric_prefetch_fail = metrics::PREFETCH_FAIL.with_label_values(&[&shard_uid.shard_id.to_string()]); - std::thread::spawn(move || { - while let Ok((trie_root, trie_key)) = work_queue.recv() { - // Since the trie root can change,and since the root is not known at the time when the IO threads starts, - // we need to redefine the trie before each request. - // Note that the constructor of `Trie` is trivial, and the clone only clones a few `Arc`s, so the performance hit is small. - let prefetcher_trie = - Trie::new(Box::new(prefetcher_storage.clone()), trie_root, None); - let storage_key = trie_key.to_vec(); - metric_prefetch_sent.inc(); - if let Ok(_maybe_value) = prefetcher_trie.get(&storage_key) { - near_o11y::io_trace!(count: "prefetch"); - } else { - // This may happen in rare occasions and can be ignored safely. - // See comments in `TriePrefetchingStorage::retrieve_raw_bytes`. - near_o11y::io_trace!(count: "prefetch_failure"); - metric_prefetch_fail.inc(); + thread::spawn(move || { + loop { + let selected = select! { + recv(shutdown_rx) -> _ => None, + recv(work_queue) -> maybe_work_item => maybe_work_item.ok(), + }; + + match selected { + None => return, + Some((trie_root, trie_key)) => { + // Since the trie root can change,and since the root is + // not known at the time when the IO threads starts, + // we need to redefine the trie before each request. 
+ // Note that the constructor of `Trie` is trivial, and + // the clone only clones a few `Arc`s, so the performance + // hit is small. + let prefetcher_trie = + Trie::new(Box::new(prefetcher_storage.clone()), trie_root, None); + let storage_key = trie_key.to_vec(); + metric_prefetch_sent.inc(); + if let Ok(_maybe_value) = prefetcher_trie.get(&storage_key) { + near_o11y::io_trace!(count: "prefetch"); + } else { + // This may happen in rare occasions and can be ignored safely. + // See comments in `TriePrefetchingStorage::retrieve_raw_bytes`. + near_o11y::io_trace!(count: "prefetch_failure"); + metric_prefetch_fail.inc(); + } + } } } }) @@ -469,6 +495,30 @@ fn prefetch_state_matches(expected: PrefetchSlot, actual: &PrefetchSlot) -> bool } } +/// Guard that owns the spawned prefetching IO threads. +#[must_use = "When dropping this handle, the IO threads will be aborted immediately."] +pub(crate) struct PrefetchingThreadsHandle { + /// Shutdown channel to all spawned threads. + shutdown_channel: Option<crossbeam::channel::Sender<()>>, + /// Join handles of spawned threads. + /// + /// Used to actively join all background threads after shutting them down. + handles: Vec<thread::JoinHandle<()>>, +} + +impl Drop for PrefetchingThreadsHandle { + fn drop(&mut self) { + // Dropping the single sender will hang up the channel and stop + // background threads. + self.shutdown_channel.take(); + for handle in self.handles.drain(..) { + if let Err(e) = handle.join() { + error!("IO thread panicked joining failed, {e:?}"); + } + } + } +} + /// Implementation to make testing from runtime possible. /// /// Prefetching by design has no visible side-effects. 
diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index 0acf2f10980..0842bab2fa5 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -13,6 +13,7 @@ use near_primitives::types::{ use crate::flat_state::FlatStateFactory; use crate::trie::config::TrieConfig; +use crate::trie::prefetching_trie_storage::PrefetchingThreadsHandle; use crate::trie::trie_storage::{TrieCache, TrieCachingStorage}; use crate::trie::{TrieRefcountChange, POISONED_LOCK_ERR}; use crate::{metrics, DBCol, DBOp, DBTransaction, PrefetchApi}; @@ -27,7 +28,7 @@ struct ShardTriesInner { view_caches: RwLock<HashMap<ShardUId, TrieCache>>, flat_state_factory: FlatStateFactory, /// Prefetcher state, such as IO threads, per shard. - prefetchers: RwLock<HashMap<ShardUId, PrefetchApi>>, + prefetchers: RwLock<HashMap<ShardUId, (PrefetchApi, PrefetchingThreadsHandle)>>, } #[derive(Clone)] @@ -136,6 +137,7 @@ impl ShardTries { &self.0.trie_config, ) }) + .0 .clone() });