fix: properly stop prefetching background threads #7712

Merged
merged 3 commits into from
Sep 29, 2022
Changes from 2 commits
100 changes: 75 additions & 25 deletions core/store/src/trie/prefetching_trie_storage.rs
@@ -3,14 +3,17 @@ use crate::{
metrics, DBCol, StorageError, Store, Trie, TrieCache, TrieCachingStorage, TrieConfig,
TrieStorage,
};
use crossbeam::select;
use near_o11y::metrics::prometheus;
use near_o11y::metrics::prometheus::core::GenericGauge;
use near_o11y::tracing::error;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;
use near_primitives::trie_key::TrieKey;
use near_primitives::types::{AccountId, ShardId, StateRoot, TrieNodesCount};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

const MAX_QUEUED_WORK_ITEMS: usize = 16 * 1024;
const MAX_PREFETCH_STAGING_MEMORY: usize = 200 * 1024 * 1024;
@@ -239,7 +242,7 @@ impl TrieStorage for TriePrefetchingStorage {
PrefetcherResult::Prefetched(value) => Ok(value),
PrefetcherResult::Pending => {
// yield once before calling `block_get` that will check for data to be present again.
std::thread::yield_now();
thread::yield_now();
self.prefetching
.blocking_get(hash.clone())
.or_else(|| {
@@ -334,7 +337,7 @@ impl PrefetchStagingArea {
Some(_) => (),
None => return None,
}
std::thread::sleep(std::time::Duration::from_micros(1));
Contributor

Unrelated to the PR, but this one looks quite suspect. Should this use a Condvar instead of a busy-wait?

Contributor Author

Yeah, logically it almost certainly should. But I suspect it would also make the code quite a bit more complex.

To elaborate a bit, we are waiting for one of two events to happen here:

  1. An IO background thread puts the value into the staging area.
  2. A main thread puts the value in the shard cache.

Both of these events are possible regardless of whether blocking_get was called from a main thread or from a background thread.

The case where we are a main thread and another main thread puts the value in the shard cache is also not handled properly yet. Let me create an issue for cleaning that up...
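
For illustration, here is a minimal sketch of what a Condvar-based wait could look like. It is not the code from this PR: `StagingSketch`, its fields, and the `u64` keys are hypothetical stand-ins for the real staging area, and the sketch only covers the first event (a producer inserting into the staging area). The second event, a main thread filling the shard cache, would also have to notify the condition variable, which is where the extra complexity mentioned above comes from.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical, simplified stand-in for the staging area: a map from key to
/// value plus a condition variable that is notified on every insert.
#[derive(Default)]
struct StagingSketch {
    slots: Mutex<HashMap<u64, Vec<u8>>>,
    inserted: Condvar,
}

impl StagingSketch {
    /// Called by a producer thread once the value is available.
    fn insert(&self, key: u64, value: Vec<u8>) {
        self.slots.lock().unwrap().insert(key, value);
        // Wake every waiter; each one re-checks whether its own key arrived.
        self.inserted.notify_all();
    }

    /// Blocks until `key` is present or `timeout` elapses, without spinning.
    fn blocking_get(&self, key: u64, timeout: Duration) -> Option<Vec<u8>> {
        let guard = self.slots.lock().unwrap();
        let (guard, _timeout_result) = self
            .inserted
            .wait_timeout_while(guard, timeout, |slots| !slots.contains_key(&key))
            .unwrap();
        // Returns `None` if the wait timed out before the key was inserted.
        guard.get(&key).cloned()
    }
}

fn main() {
    let staging = Arc::new(StagingSketch::default());
    let producer = {
        let staging = Arc::clone(&staging);
        thread::spawn(move || {
            // Simulate a slow IO read before the value is staged.
            thread::sleep(Duration::from_millis(20));
            staging.insert(7, b"node".to_vec());
        })
    };
    let value = staging.blocking_get(7, Duration::from_secs(5));
    producer.join().unwrap();
    println!("{value:?}");
}
```

The timeout keeps the waiter from blocking forever if no producer ever delivers the key, which plays the role of the `None => return None` exit in the busy-wait loop.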

thread::sleep(std::time::Duration::from_micros(1));
}
}

@@ -377,12 +380,12 @@ impl PrefetchStagingArea {
}

impl PrefetchApi {
pub fn new(
pub(crate) fn new(
store: Store,
shard_cache: TrieCache,
shard_uid: ShardUId,
trie_config: &TrieConfig,
) -> Self {
) -> (Self, PrefetchingThreadsHandle) {
let (work_queue_tx, work_queue_rx) = crossbeam::channel::bounded(MAX_QUEUED_WORK_ITEMS);
let sweat_prefetch_receivers = trie_config.sweat_prefetch_receivers.clone();
let sweat_prefetch_senders = trie_config.sweat_prefetch_senders.clone();
@@ -397,10 +400,19 @@ impl PrefetchApi {
sweat_prefetch_senders,
shard_uid,
};
for _ in 0..NUM_IO_THREADS {
this.start_io_thread(store.clone(), shard_cache.clone(), shard_uid.clone());
}
this
let (shutdown_tx, shutdown_rx) = crossbeam::channel::bounded(1);
let handles = (0..NUM_IO_THREADS)
.map(|_| {
this.start_io_thread(
store.clone(),
shard_cache.clone(),
shard_uid.clone(),
shutdown_rx.clone(),
)
})
.collect();
let handle = PrefetchingThreadsHandle { shutdown_channel: Some(shutdown_tx), handles };
(this, handle)
}

/// Returns the argument back if queue is full.
@@ -417,30 +429,44 @@ impl PrefetchApi {
store: Store,
shard_cache: TrieCache,
shard_uid: ShardUId,
) -> std::thread::JoinHandle<()> {
shutdown_rx: crossbeam::channel::Receiver<()>,
) -> thread::JoinHandle<()> {
let prefetcher_storage =
TriePrefetchingStorage::new(store, shard_uid, shard_cache, self.prefetching.clone());
let work_queue = self.work_queue_rx.clone();
let metric_prefetch_sent =
metrics::PREFETCH_SENT.with_label_values(&[&shard_uid.shard_id.to_string()]);
let metric_prefetch_fail =
metrics::PREFETCH_FAIL.with_label_values(&[&shard_uid.shard_id.to_string()]);
std::thread::spawn(move || {
while let Ok((trie_root, trie_key)) = work_queue.recv() {
// Since the trie root can change,and since the root is not known at the time when the IO threads starts,
// we need to redefine the trie before each request.
// Note that the constructor of `Trie` is trivial, and the clone only clones a few `Arc`s, so the performance hit is small.
let prefetcher_trie =
Trie::new(Box::new(prefetcher_storage.clone()), trie_root, None);
let storage_key = trie_key.to_vec();
metric_prefetch_sent.inc();
if let Ok(_maybe_value) = prefetcher_trie.get(&storage_key) {
near_o11y::io_trace!(count: "prefetch");
} else {
// This may happen in rare occasions and can be ignored safely.
// See comments in `TriePrefetchingStorage::retrieve_raw_bytes`.
near_o11y::io_trace!(count: "prefetch_failure");
metric_prefetch_fail.inc();
thread::spawn(move || {
loop {
let selected = select! {
recv(shutdown_rx) -> _ => None,
recv(work_queue) -> maybe_work_item => maybe_work_item.ok(),
};

match selected {
None => return,
Some((trie_root, trie_key)) => {
// Since the trie root can change,and since the root is
// not known at the time when the IO threads starts,
// we need to redefine the trie before each request.
// Note that the constructor of `Trie` is trivial, and
// the clone only clones a few `Arc`s, so the performance
// hit is small.
let prefetcher_trie =
Trie::new(Box::new(prefetcher_storage.clone()), trie_root, None);
let storage_key = trie_key.to_vec();
metric_prefetch_sent.inc();
if let Ok(_maybe_value) = prefetcher_trie.get(&storage_key) {
near_o11y::io_trace!(count: "prefetch");
} else {
// This may happen in rare occasions and can be ignored safely.
// See comments in `TriePrefetchingStorage::retrieve_raw_bytes`.
near_o11y::io_trace!(count: "prefetch_failure");
metric_prefetch_fail.inc();
}
}
}
}
})
@@ -469,6 +495,30 @@ fn prefetch_state_matches(expected: PrefetchSlot, actual: &PrefetchSlot) -> bool
}
}

/// Guard that owns the spawned prefetching IO threads.
#[must_use = "When dropping this handle, the IO threads will be aborted immediately."]
pub(crate) struct PrefetchingThreadsHandle {
/// Shutdown channel to all spawned threads.
shutdown_channel: Option<crossbeam::channel::Sender<()>>,
/// Join handles of spawned threads.
///
/// Used to actively join all background threads after shutting them down.
handles: Vec<thread::JoinHandle<()>>,
}

impl Drop for PrefetchingThreadsHandle {
fn drop(&mut self) {
// Dropping the single sender will hang up the channel and stop
// background threads.
self.shutdown_channel.take();
for handle in self.handles.drain(..) {
if let Err(e) = handle.join() {
error!("IO thread panicked joining failed, {e:?}");
}
}
}
}
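
The guard pattern above can be tried in isolation with a standard-library-only sketch. This is an approximation under stated assumptions, not the PR's implementation: the PR uses a crossbeam shutdown channel together with `select!`, whereas this sketch swaps in an `AtomicBool` flag and `recv_timeout` polling, because `std::sync::mpsc` has no `select`. The names `WorkersGuard` and `spawn_workers` and the `u64` work items are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical guard, modelled on `PrefetchingThreadsHandle`.
#[must_use = "dropping the guard stops the worker threads"]
struct WorkersGuard {
    /// Stand-in for the shutdown channel: a flag polled by every worker.
    shutdown: Arc<AtomicBool>,
    handles: Vec<JoinHandle<()>>,
}

impl Drop for WorkersGuard {
    fn drop(&mut self) {
        // Signal shutdown first, then actively join every worker.
        self.shutdown.store(true, Ordering::SeqCst);
        for handle in self.handles.drain(..) {
            if let Err(e) = handle.join() {
                eprintln!("worker thread panicked: {e:?}");
            }
        }
    }
}

/// Spawns `n` workers that count processed items in `processed`.
fn spawn_workers(n: usize, processed: Arc<AtomicUsize>) -> (Sender<u64>, WorkersGuard) {
    let (work_tx, work_rx) = mpsc::channel::<u64>();
    // `mpsc::Receiver` is not `Clone`, so the workers share it behind a mutex.
    let work_rx = Arc::new(Mutex::new(work_rx));
    let shutdown = Arc::new(AtomicBool::new(false));
    let handles = (0..n)
        .map(|_| {
            let work_rx = Arc::clone(&work_rx);
            let shutdown = Arc::clone(&shutdown);
            let processed = Arc::clone(&processed);
            thread::spawn(move || {
                while !shutdown.load(Ordering::SeqCst) {
                    // Short timeout so the shutdown flag is re-checked regularly.
                    let item = work_rx.lock().unwrap().recv_timeout(Duration::from_millis(5));
                    match item {
                        Ok(_work) => {
                            processed.fetch_add(1, Ordering::SeqCst);
                        }
                        Err(RecvTimeoutError::Timeout) => continue,
                        Err(RecvTimeoutError::Disconnected) => return,
                    }
                }
            })
        })
        .collect();
    (work_tx, WorkersGuard { shutdown, handles })
}

fn main() {
    let processed = Arc::new(AtomicUsize::new(0));
    let (work_tx, guard) = spawn_workers(4, Arc::clone(&processed));
    for item in 0..8 {
        work_tx.send(item).unwrap();
    }
    // Give the workers a moment to drain the queue.
    while processed.load(Ordering::SeqCst) < 8 {
        thread::sleep(Duration::from_millis(1));
    }
    // `work_tx` is still alive here, yet dropping the guard stops and joins all workers.
    drop(guard);
    println!("processed {} items", processed.load(Ordering::SeqCst));
}
```

The point the sketch shares with the PR is that shutdown does not depend on the work-queue sender being dropped: the workers stop as soon as the guard goes away, even while a sender for work items is still held elsewhere.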

/// Implementation to make testing from runtime possible.
///
/// Prefetching by design has no visible side-effects.
4 changes: 3 additions & 1 deletion core/store/src/trie/shard_tries.rs
@@ -13,6 +13,7 @@ use near_primitives::types::{

use crate::flat_state::FlatStateFactory;
use crate::trie::config::TrieConfig;
use crate::trie::prefetching_trie_storage::PrefetchingThreadsHandle;
use crate::trie::trie_storage::{TrieCache, TrieCachingStorage};
use crate::trie::{TrieRefcountChange, POISONED_LOCK_ERR};
use crate::{metrics, DBCol, DBOp, DBTransaction, PrefetchApi};
@@ -27,7 +28,7 @@ struct ShardTriesInner {
view_caches: RwLock<HashMap<ShardUId, TrieCache>>,
flat_state_factory: FlatStateFactory,
/// Prefetcher state, such as IO threads, per shard.
prefetchers: RwLock<HashMap<ShardUId, PrefetchApi>>,
prefetchers: RwLock<HashMap<ShardUId, (PrefetchApi, PrefetchingThreadsHandle)>>,
}

#[derive(Clone)]
@@ -136,6 +137,7 @@ impl ShardTries {
&self.0.trie_config,
)
})
.0
.clone()
});
