Skip to content

Commit

Permalink
DLPX-85889 zoa panic: UpdateAtime has no corresponding entry in the i…
Browse files Browse the repository at this point in the history
…ndex run (openzfs#845)

The zettacache index cache is updated as part of merging the
PendingChanges into the on-disk index.  The merge task sends the updates
to the checkpoint task, as part of a `MergeProgress` message.  The index
cache updates are then made from a spawned blocking (CPU-bound) task.
The updates are completed (waited for) before the next checkpoint
completes.

During the merge, it's expected that lookups can see IndexEntry's from
the old index, either from reading the old index itself, or from the
index entry cache.  These stale entries are "corrected" by either
`PendingChanges::update()`'s call to `Remap::remap()`, or
`MergeState::entry_disposition()`'s check of
`PendingChanges::freeing()`.

When the `MergeMessage::Complete` is received it calls
`Locked::rotate_index()` which deletes the old on-disk index, and calls
`PendingChanges::set_remap(None)` and `Locked::merge.take()`.  This ends
the stale entry "corrections" mentioned above, which are no longer
necessary because we can no longer see stale entries from the old
on-disk index.

The problem occurs when the `MergeMessage::Complete` is received and
processed before the spawned blocking task completes.  In this case, we
end the stale entry "corrections", but we can still see stale entries
from the index cache.

This PR addresses the problem by waiting for the index cache updates to
complete before processing the `MergeMessage::Complete`.

The problem was introduced by openzfs#808.
  • Loading branch information
ahrens authored May 9, 2023
1 parent bfe30c5 commit 3a1595c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 32 deletions.
4 changes: 3 additions & 1 deletion cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ impl<'a> Progress<'a> {
state.stats.track_bytes(Evictions, size);
}
// The index cache does not need to be updated, because the entry will continue
// to fail validation (`Locked::validate()`) based on its atime.
// to fail validation (`Locked::validate()`) based on its atime, or if it was
// removed due to remapping to None, it has already been added to
// `cache_updates`.
}
EntryDisposition::RemoveFreed => {
if let Some(extent) = entry.value.extent() {
Expand Down
72 changes: 41 additions & 31 deletions cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,7 @@ impl Inner {
let mut free_count = 0;
let mut cache_updates_count = 0;
let mut handles = Vec::new();
let mut handle_duration = Duration::ZERO;
let cache_update_duration = Arc::new(std::sync::Mutex::new(Duration::ZERO));
let locked_held = Arc::new(std::sync::Mutex::new(Duration::ZERO));
// we have a channel to an active merge task, check it for messages
Expand Down Expand Up @@ -985,39 +986,36 @@ impl Inner {
}));

// Spawn a new "blocking" (CPU-bound) task to process the index cache
// updates. Because this is CPU bound, if we didn't use a blocking
// task, it could starve the async executor thread.
// updates. Because this takes lots of CPU, if we didn't use a
// blocking task, it could starve the async executor thread,
// impacting overall agent performance.
let inner = self.clone();
let duration = cache_update_duration.clone();
handles.push(tokio::task::spawn_blocking(move || {
// Here is where we populate the index cache to contain any new
// keys that may have been inserted or updated, as well as ensure
// any existing keys are consistent w.r.t. a remap (i.e. ensuring
// the cache references the key's new/remapped location on disk).
// Populate the index cache to contain any new keys that may have
// been inserted, as well as ensure any existing keys are
// consistent w.r.t. a remap (i.e. ensuring the cache references
// the key's new/remapped location on disk).
//
// Note that keys that are already in the cache will have their
// associated values updated, and keys that are not will be added
// (with their values). Also note that any keys with no
// associated values (ghost keys) will be removed from the index.
// This is important for keys which may have been "valid", but
// were evicted because they could not be remapped.
// Keys that are already in the cache will have their associated
// values updated, and keys that are not will be added (with
// their values). Any keys with no associated values (ghost keys)
// will be removed from the index. This is important for keys
// which may have been "valid", but were evicted because they
// could not be remapped.
let start = Instant::now();
with_alloctag("Locked::index_cache", || {
for entry in progress.cache_updates.into_iter() {
match entry.value.location() {
// It's possible the key wasn't already in the cache, so
// this may add or update the key.
Some(_) => {
inner.index_cache.insert(entry.key, entry.value);
}
// It's possible the key isn't in the cache; .remove()
// doesn't fail in that case.
None => {
inner.index_cache.remove(&entry.key);
}
};
}
});
for entry in progress.cache_updates.into_iter() {
match entry.value.location() {
Some(_) => {
// insert or update the value
inner.index_cache.insert(entry.key, entry.value);
}
None => {
// remove if present
inner.index_cache.remove(&entry.key);
}
};
}
*(duration.lock().unwrap()) += start.elapsed();
}));

Expand All @@ -1034,6 +1032,17 @@ impl Inner {
}
// merge task complete, replace the current index with the new index
Some(MergeMessage::Complete(new_index)) => {
// Index cache updates must take effect before the indices are
// rotated. Rotation clears the MergeState and the PendingChange's
// Remap, so we can't be allowed to see any stale locations in the
// index cache, because they couldn't be corrected by
// `MergeState::entry_disposition()`'s freeing() check and
// `PendingChanges::update()`'s call to `Remap::remap()`.
let handle_start = Instant::now();
for handle in handles.drain(..) {
handle.await.unwrap();
}
handle_duration += handle_start.elapsed();
let mut indices = self.indices.write().await;
let mut locked = lock_non_send_measured!(&self.locked).await;
locked.block_allocator.finish_evacuation(new_index.id());
Expand All @@ -1051,19 +1060,20 @@ impl Inner {
}
}

// Frees and index cache updates must take effect before the checkpoint is
// written, so we don't try to read from old (freed) locations.
// Frees must take effect before the checkpoint is written, so we don't leak
// space if we crash.
let handle_start = Instant::now();
for handle in handles {
handle.await.unwrap();
}
handle_duration += handle_start.elapsed();
debug!(
"processed {msg_count} merge messages with {free_count} frees and \
{cache_updates_count} cache updates in {}ms (state lock held for {}ms, cache updated in {} CPU-ms, waited {}ms)",
begin.elapsed().as_millis(),
locked_held.lock().unwrap().as_millis(),
cache_update_duration.lock().unwrap().as_millis(),
handle_start.elapsed().as_millis(),
handle_duration.as_millis(),
);
} else {
loop {
Expand Down

0 comments on commit 3a1595c

Please sign in to comment.