CACHE-HITS ratio is wrong when scrub is running (openzfs#322)
The CACHE-HITS ratio percentage is calculated as `CacheHit` /
`LookupForRead`.  However, the agent also increments `CacheHit` when
incrementing `LookupForWrite` (from `ZettaCache::heal()`).  We call heal()
when there's zettacache corruption, which should be very rare, but we
also call heal() for every scrub read.  Because of this, while `zpool scrub`
is running, the hits ratio is incorrectly high, often well over 100%.
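
To see the arithmetic, here is a minimal self-contained sketch; the
counter values are hypothetical, chosen only to show how heal() skews
the old ratio:

fn main() {
    // Hypothetical counters sampled while `zpool scrub` is running.
    // heal() bumps CacheHit on every scrub-read hit, but charges the
    // lookup itself to LookupForWrite rather than LookupForRead.
    let lookup_for_read = 1_000_u64; // ordinary read lookups
    let heal_lookups = 8_000_u64; // scrub reads routed through heal()
    let cache_hit = 2_500_u64; // includes hits recorded by heal()

    // Old ratio: heal() hits inflate the numerator while their lookups
    // never reach the denominator, so the ratio can exceed 100%.
    let old = 100.0 * cache_hit as f64 / lookup_for_read as f64;
    println!("old CACHE-HITS: {old:.0}%"); // 250%

    // Fixed ratio: a single Lookup counter covers every counted lookup
    // (reads and heal), so hits can never outnumber lookups.
    let lookups = lookup_for_read + heal_lookups;
    let new = 100.0 * cache_hit as f64 / lookups as f64;
    println!("new CACHE-HITS: {new:.0}%"); // ~28%
}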

Each write_block request results in a `ZettaCache::lookup_impl()` call,
to ensure that the block isn't already present due to a kernel crash.
However, this is an implementation detail: we expect these lookups to
rarely hit, and to always find the index entry in the index chunk cache.
So these "lookups" don't need to show up in any kind of accounting.

This commit revamps the cache-hits accounting by counting only (and all)
non-write lookups.
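
The counting split can be sketched standalone (the `LookupCounting` enum
matches the diff below; the call-site table here is illustrative, not the
agent's code):

// `LookupCounting` mirrors the enum introduced in zettacache.rs.
#[derive(Clone, Copy, Debug)]
enum LookupCounting {
    Off,
    On,
}

impl LookupCounting {
    fn is_on(self) -> bool {
        match self {
            LookupCounting::On => true,
            LookupCounting::Off => false,
        }
    }
}

fn main() {
    // Reads and heal() go through lookup(), which always counts; the
    // write_block presence check calls lookup_impl() with counting off.
    for (caller, counting) in [
        ("read lookup", LookupCounting::On),
        ("heal() lookup", LookupCounting::On),
        ("write_block presence check", LookupCounting::Off),
    ] {
        if counting.is_on() {
            println!("{caller}: counted in Lookup/IndexHit* stats");
        } else {
            println!("{caller}: excluded from cache-hit accounting");
        }
    }
}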
ahrens authored Mar 29, 2022
1 parent 4f0675c commit 3aadcbd
Showing 5 changed files with 63 additions and 78 deletions.
5 changes: 2 additions & 3 deletions cmd/zfs_object_agent/util/src/zettacache_stats.rs
@@ -365,8 +365,7 @@ pub enum CacheStatCounter {
     // These stats are collected as part of the ZettaCache.stats struct.
     // They are consumed in the StatsDisplay.display_stat_values() function.
     LookupBytes,
-    LookupForRead,
-    LookupForWrite,
+    Lookup,
     CacheMissLockBusy, // pending, DOSE-905
     IndexHitPendingChanges,
     IndexHitIndexCache,
@@ -377,7 +376,7 @@ pub enum CacheStatCounter {
     InsertForRead,
     InsertForWrite,
     InsertForSpeculativeRead,
-    InsertForHealing,
+    InsertForHeal,
     InsertDropQueueFull,
     InsertDropLockBusy, // pending, DOSE-905
     HealedBlocks,
18 changes: 8 additions & 10 deletions cmd/zfs_object_agent/zcache/src/stats.rs
@@ -211,33 +211,31 @@ impl StatsDisplay {
         }
 
         // LOOKUPS
-        let write_lookups = values.value(LookupForWrite) as f64 * scale;
-        let read_lookups = values.value(LookupForRead) as f64 * scale;
+        let lookups = values.value(Lookup) as f64 * scale;
         let pending_changes_hits = values.value(IndexHitPendingChanges) as f64 * scale;
         let index_cache_hits = values.value(IndexHitIndexCache) as f64 * scale;
         let chunk_cache_hits = values.value(IndexHitChunkCache) as f64 * scale;
         let disk_hits = values.value(IndexHitDisk) as f64 * scale;
         let hits = values.value(CacheHit) as f64 * scale;
-        let total_lookups = read_lookups + write_lookups;
 
-        self.display_count(total_lookups);
+        self.display_count(lookups);
         self.display_bytes(values.value(LookupBytes) as f64 * scale);
         if self.show_lookup_detail {
             // LOOKUP DETAILS (optional)
-            self.display_percent(pending_changes_hits, read_lookups);
-            self.display_percent(index_cache_hits, read_lookups);
-            self.display_percent(chunk_cache_hits, read_lookups);
-            self.display_percent(disk_hits, read_lookups);
+            self.display_percent(pending_changes_hits, lookups);
+            self.display_percent(index_cache_hits, lookups);
+            self.display_percent(chunk_cache_hits, lookups);
+            self.display_percent(disk_hits, lookups);
         }
         // HITS
         self.display_count(hits);
-        self.display_percent(hits, read_lookups);
+        self.display_percent(hits, lookups);
 
         // INSERTS
         let inserts = (values.value(InsertForRead)
             + values.value(InsertForWrite)
             + values.value(InsertForSpeculativeRead)
-            + values.value(InsertForHealing)) as f64
+            + values.value(InsertForHeal)) as f64
             * scale;
         self.display_count(inserts);
         self.display_bytes(values.value(InsertBytes) as f64 * scale);
1 change: 0 additions & 1 deletion cmd/zfs_object_agent/zettacache/src/lib.rs
@@ -27,5 +27,4 @@ pub use zcachedb::ZettaCacheDBCommand;
 pub use crate::zettacache::InsertSource;
 pub use crate::zettacache::LookupOperation;
 pub use crate::zettacache::LookupResponse;
-pub use crate::zettacache::LookupSource;
 pub use crate::zettacache::ZettaCache;
114 changes: 52 additions & 62 deletions cmd/zfs_object_agent/zettacache/src/zettacache.rs
@@ -840,10 +840,18 @@ pub enum InsertSource {
 }
 
 #[derive(Clone, Copy, Debug)]
-pub enum LookupSource {
-    Write,
-    Read,
-    Evict,
+enum LookupCounting {
+    Off,
+    On,
+}
+
+impl LookupCounting {
+    fn is_on(self) -> bool {
+        match self {
+            LookupCounting::On => true,
+            LookupCounting::Off => false,
+        }
+    }
 }
 
 pub enum LookupOperation {
@@ -1523,12 +1531,7 @@ impl ZettaCache {
             .await;
     }
 
-    pub async fn lookup(
-        &self,
-        guid: PoolGuid,
-        block: BlockId,
-        source: LookupSource,
-    ) -> LookupResponse {
+    pub async fn lookup(&self, guid: PoolGuid, block: BlockId) -> LookupResponse {
         let key = IndexKey::new(self.pool_guids.map_pool_guid(guid), block);
         let locked_key = LockedKey(measure!().fut(self.outstanding_lookups.lock(key)).await);
 
@@ -1538,37 +1541,33 @@ impl ZettaCache {
         }
 
         let bytes = self
-            .lookup_impl(&locked_key, source, |state, value| {
-                if matches!(source, LookupSource::Read) {
-                    state.size_histogram.lookup();
-                }
+            .lookup_impl(&locked_key, LookupCounting::On, |state, value| {
+                state.size_histogram.lookup();
                 match value {
-                    Some(value) => future::Either::Left(state.lookup(&locked_key, value, source)),
+                    Some(value) => future::Either::Left(state.lookup(&locked_key, value)),
                     None => future::Either::Right(future::ready(None)),
                 }
             })
             .await;
 
-        let response = match bytes {
+        self.stats.track_count(Lookup);
+        match bytes {
             Some(bytes) => {
-                self.stats.track_bytes(LookupBytes, bytes.len() as u64);
                 super_trace!("cache hit for {:?}", key);
+                self.stats.track_bytes(LookupBytes, bytes.len() as u64);
                 self.stats.track_count(CacheHit);
                 LookupResponse::Present((bytes, locked_key))
             }
             None => LookupResponse::Absent(locked_key),
-        };
-
-        match source {
-            LookupSource::Write => self.stats.track_count(LookupForWrite),
-            LookupSource::Read => self.stats.track_count(LookupForRead),
-            LookupSource::Evict => {} // not possible for this code path
-        }
-
-        response
+        }
     }
 
-    async fn lookup_impl<F, R, Fut>(&self, locked_key: &LockedKey, source: LookupSource, f: F) -> R
+    async fn lookup_impl<F, R, Fut>(
+        &self,
+        locked_key: &LockedKey,
+        counting: LookupCounting,
+        f: F,
+    ) -> R
     where
         F: FnOnce(&mut ZettaCacheState, Option<ValidIndexValue>) -> Fut,
         Fut: Future<Output = R> + Send,
@@ -1592,7 +1591,7 @@ impl ZettaCache {
                 let validated = state.validate(value);
                 // All entries in the pending changes should be valid
                 assert!(validated.is_some());
-                if matches!(source, LookupSource::Read) {
+                if counting.is_on() {
                     self.stats.track_count(IndexHitPendingChanges);
                 }
                 Either::Left(f(&mut state, validated))
@@ -1605,9 +1604,9 @@
                 match pc {
                     PendingChange::Insert(value)
                     | PendingChange::UpdateAtime(UpdateAtime(value, _)) => {
-                        state.ghost_hit_check(value, source);
+                        state.ghost_hit_check(value, counting);
                         let validated = state.validate(value);
-                        if matches!(source, LookupSource::Read) {
+                        if counting.is_on() {
                             self.stats.track_count(IndexHitPendingChanges);
                         }
                         Either::Left(f(&mut state, validated))
@@ -1616,9 +1615,9 @@
                 } else {
                     match state.index_cache.get(&key) {
                         Some(&value) => {
-                            state.ghost_hit_check(value, source);
+                            state.ghost_hit_check(value, counting);
                             let validated = state.validate(value);
-                            if matches!(source, LookupSource::Read) {
+                            if counting.is_on() {
                                 self.stats.track_count(IndexHitIndexCache);
                             }
                             Either::Left(f(&mut state, validated))
@@ -1629,9 +1628,9 @@
                 } else {
                     match state.index_cache.get(&key) {
                         Some(&value) => {
-                            state.ghost_hit_check(value, source);
+                            state.ghost_hit_check(value, counting);
                             let validated = state.validate(value);
-                            if matches!(source, LookupSource::Read) {
+                            if counting.is_on() {
                                 self.stats.track_count(IndexHitIndexCache);
                             }
                             Either::Left(f(&mut state, validated))
@@ -1671,7 +1670,7 @@ impl ZettaCache {
             Either::Left(index) => measure!().fut(index.lookup(key)).await,
             Either::Right(index) => measure!().fut(index.lookup(key)).await,
         };
-        if matches!(source, LookupSource::Read) {
+        if counting.is_on() {
             if chunk_cache_hit {
                 self.stats.track_count(IndexHitChunkCache);
             } else {
@@ -1683,7 +1682,7 @@ impl ZettaCache {
         // Again, we don't want to hold the state lock while reading from disk so
         // we use lock_non_send() to ensure that we can't hold it across .await.
         let mut state = measure!().fut(lock_non_send(&self.state)).await;
-        let value = state.lookup_with_value_from_index(&key, entry.value, source);
+        let value = state.lookup_with_value_from_index(&key, entry.value, counting);
         if value.is_none() {
             super_trace!(
                 "cache miss after reading index for {:?}, invalid entry",
@@ -1780,7 +1779,7 @@ impl ZettaCache {
 
         self.stats.track_bytes(InsertBytes, bytes.len() as u64);
         self.stats.track_count(match source {
-            InsertSource::Heal => InsertForHealing,
+            InsertSource::Heal => InsertForHeal,
             InsertSource::Read => InsertForRead,
             InsertSource::SpeculativeRead => InsertForSpeculativeRead,
             InsertSource::Write => InsertForWrite,
@@ -1845,7 +1844,7 @@
 
         let present = measure!()
             .fut(
-                cache.lookup_impl(&locked_key, LookupSource::Write, |_state, value| {
+                cache.lookup_impl(&locked_key, LookupCounting::Off, |_state, value| {
                     future::ready(value.is_some())
                 }),
             )
@@ -1863,7 +1862,7 @@
 
         cache.stats.track_bytes(InsertBytes, len as u64);
         cache.stats.track_count(match source {
-            InsertSource::Heal => InsertForHealing,
+            InsertSource::Heal => InsertForHeal,
             InsertSource::Read => InsertForRead,
             InsertSource::SpeculativeRead => InsertForSpeculativeRead,
             InsertSource::Write => InsertForWrite,
@@ -1884,20 +1883,17 @@ impl ZettaCache {
     }
 
     pub async fn heal(&self, guid: PoolGuid, block: BlockId, object_bytes: AlignedBytes) {
-        if let LookupResponse::Present((cache_bytes, locked_key)) =
-            self.lookup(guid, block, LookupSource::Write).await
-        {
-            // For (hopefully) obvious reasons, we only need to do the heal when the bytes contained
-            // in the cache differ from the bytes contained in the object store. The
-            // bytes contained in the object store are always preferred over the bytes
-            // contained in the cache; we assume the bytes passed were retrieved from the object
-            // store.
+        if let LookupResponse::Present((cache_bytes, locked_key)) = self.lookup(guid, block).await {
+            // For (hopefully) obvious reasons, we only need to do the heal when the bytes
+            // contained in the cache differ from the bytes contained in the object store. The
+            // bytes contained in the object store are always preferred over the bytes contained
+            // in the cache; we assume the bytes passed were retrieved from the object store.
             if *cache_bytes != *object_bytes {
                 self.stats.track_count(HealedBlocks);
                 debug!("healing cache: {:?}", locked_key.key());
                 // Note: this will result in a second insert for the same key in the index. This
-                // will be resolved either in the insert code (if the first insert
-                // is in pending_changes) or later during the next merge.
+                // will be resolved either in the insert code (if the first insert is in
+                // pending_changes) or later during the next merge.
                 self.insert(
                     locked_key,
                     object_bytes.len(),
@@ -1962,7 +1958,7 @@ impl ZettaCacheState {
         }
     }
 
-    fn ghost_hit_check(&mut self, value: IndexValue, source: LookupSource) {
+    fn ghost_hit_check(&mut self, value: IndexValue, counting: LookupCounting) {
         let (live_cutoff, ghost_cutoff) = match &self.merge {
             Some(ms) => (ms.eviction_cutoff, ms.ghost_cutoff),
             None => (
@@ -1971,10 +1967,7 @@
             ),
         };
 
-        if value.atime() >= ghost_cutoff
-            && value.atime() < live_cutoff
-            && matches!(source, LookupSource::Read)
-        {
+        if value.atime() >= ghost_cutoff && value.atime() < live_cutoff && counting.is_on() {
             // This is a hit in the ghost hit-by-size histogram
             let size = self.atime_histogram.size_at(value.atime());
             self.size_histogram.ghost_hit(size);
@@ -1985,7 +1978,7 @@
         &mut self,
         key: &IndexKey,
         value_from_index: IndexValue,
-        source: LookupSource,
+        counting: LookupCounting,
     ) -> Option<ValidIndexValue> {
         // Note: we're here because there was no PendingChange for this key, but
         // since we dropped the lock, a PendingChange may have been inserted
@@ -1998,27 +1991,24 @@
             | Some(PendingChange::UpdateAtime(UpdateAtime(value_ref, _))) => *value_ref,
             None => value_from_index,
         };
-        self.ghost_hit_check(value, source);
+        self.ghost_hit_check(value, counting);
         self.validate(value)
     }
 
     fn lookup(
         &mut self,
         locked_key: &LockedKey,
         valid_value: ValidIndexValue,
-        source: LookupSource,
     ) -> impl Future<Output = Option<AlignedBytes>> {
         let key = locked_key.key();
         let old_value = valid_value.0;
         let old_atime = old_value.atime();
         super_trace!("cache hit: reading {:?} from {:?}", key, old_value);
 
-        if matches!(source, LookupSource::Read) {
-            // Add an entry to the hit-by-size histogram
-            let size = self.atime_histogram.size_at(old_atime);
-            super_trace!("cache size {} at {:?}", size, old_atime);
-            self.size_histogram.live_hit(size);
-        }
+        // Add an entry to the hit-by-size histogram
+        let size = self.atime_histogram.size_at(old_atime);
+        super_trace!("cache size {} at {:?}", size, old_atime);
+        self.size_histogram.live_hit(size);
 
         assert_le!(old_atime, self.atime);
         let new_value = IndexValue::new(old_value.location(), old_value.size(), self.atime);
3 changes: 1 addition & 2 deletions cmd/zfs_object_agent/zettaobject/src/pool.rs
@@ -56,7 +56,6 @@ use uuid::Uuid;
 use zettacache::base_types::*;
 use zettacache::InsertSource;
 use zettacache::LookupResponse;
-use zettacache::LookupSource;
 use zettacache::ZettaCache;
 
 use crate::base_types::*;
@@ -1743,7 +1742,7 @@ impl Pool {
                 .await
             }
             false => match measure!()
-                .fut(cache.lookup(self.state.shared_state.guid, block, LookupSource::Read))
+                .fut(cache.lookup(self.state.shared_state.guid, block))
                 .await
             {
                 LookupResponse::Present((cached_bytes, _key)) => cached_bytes.into(),
