Skip to content

Commit

Permalink
Add metadata deletion capability to FileSystem trait (#229)
Browse files Browse the repository at this point in the history
* let file system manage file deletion

Signed-off-by: tabokie <xy.tao@outlook.com>

* update changelog

Signed-off-by: tabokie <xy.tao@outlook.com>

* address comment and fix test

Signed-off-by: tabokie <xy.tao@outlook.com>

* detect and clean up stale metadata on startup

Signed-off-by: tabokie <xy.tao@outlook.com>

* update changelog

Signed-off-by: tabokie <xy.tao@outlook.com>

* fix issues

Signed-off-by: tabokie <xy.tao@outlook.com>
  • Loading branch information
tabokie authored Jun 23, 2022
1 parent b0ad1f7 commit a950ec5
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
### Public API Changes

* Add `is_empty` to `Engine` API.
* Add metadata deletion capability to `FileSystem` trait. Users can implement `exists_metadata` and `delete_metadata` to clean up obsolete metadata from older versions of Raft Engine.

## [0.2.0] - 2022-05-25

Expand Down
138 changes: 138 additions & 0 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ mod tests {
use crate::util::ReadableSize;
use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;
use std::collections::BTreeSet;
use std::fs::OpenOptions;
use std::path::PathBuf;

Expand Down Expand Up @@ -617,6 +618,15 @@ mod tests {
reader(e.index, entry_index.entries.unwrap().id.queue, &e.data);
}
}

/// Returns how many log files the engine currently spans: those of `queue`
/// when one is given, otherwise the Append and Rewrite queues combined.
fn file_count(&self, queue: Option<LogQueue>) -> usize {
    match queue {
        Some(q) => {
            // The span is treated as an inclusive range of sequence numbers.
            let (first, last) = self.file_span(q);
            (last - first + 1) as usize
        }
        None => {
            let appended = self.file_count(Some(LogQueue::Append));
            let rewritten = self.file_count(Some(LogQueue::Rewrite));
            appended + rewritten
        }
    }
}
}

#[test]
Expand Down Expand Up @@ -1633,4 +1643,132 @@ mod tests {
engine.clean(rid);
assert!(engine.is_empty());
}

/// Test file system that wraps an `ObfuscatedFileSystem` and keeps a record of
/// simulated per-file metadata for the Append queue.
pub struct DeleteMonitoredFileSystem {
// File system that performs the actual file operations.
inner: ObfuscatedFileSystem,
// Sequence numbers of Append-queue files that currently have simulated
// metadata: added on `create`/`open`, removed on `delete`/`delete_metadata`.
append_metadata: Mutex<BTreeSet<u64>>,
}

impl DeleteMonitoredFileSystem {
    /// Builds a monitor around a default `ObfuscatedFileSystem` with no
    /// metadata recorded yet.
    fn new() -> Self {
        let inner = ObfuscatedFileSystem::default();
        let append_metadata = Mutex::new(BTreeSet::new());
        Self {
            inner,
            append_metadata,
        }
    }

    /// Records (`delete == false`) or forgets (`delete == true`) the metadata
    /// of the log file named by `path`.
    ///
    /// Only Append-queue files are tracked; for any other queue this is a
    /// no-op returning `false`. Otherwise the return value tells whether the
    /// tracked set actually changed.
    ///
    /// Panics if `path` has no UTF-8 file name or the name cannot be parsed
    /// into a `FileId`.
    fn update_metadata(&self, path: &Path, delete: bool) -> bool {
        let name = path.file_name().unwrap().to_str().unwrap();
        let id = FileId::parse_file_name(name).unwrap();
        if id.queue != LogQueue::Append {
            return false;
        }
        let mut tracked = self.append_metadata.lock().unwrap();
        if delete {
            tracked.remove(&id.seq)
        } else {
            tracked.insert(id.seq)
        }
    }
}

/// Forwards every operation to the wrapped `ObfuscatedFileSystem` while
/// keeping the tracked Append-queue metadata in step with file creation,
/// opening and deletion.
impl FileSystem for DeleteMonitoredFileSystem {
    type Handle = <ObfuscatedFileSystem as FileSystem>::Handle;
    type Reader = <ObfuscatedFileSystem as FileSystem>::Reader;
    type Writer = <ObfuscatedFileSystem as FileSystem>::Writer;

    /// Creates the file through the inner file system and, on success,
    /// records its metadata.
    fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
        let target = path.as_ref();
        let created = self.inner.create(target)?;
        self.update_metadata(target, false);
        Ok(created)
    }

    /// Opens the file through the inner file system and, on success, records
    /// its metadata.
    fn open<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
        let target = path.as_ref();
        let opened = self.inner.open(target)?;
        self.update_metadata(target, false);
        Ok(opened)
    }

    /// Deletes the file through the inner file system and, on success, drops
    /// its metadata.
    fn delete<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
        let target = path.as_ref();
        self.inner.delete(target)?;
        self.update_metadata(target, true);
        Ok(())
    }

    /// Drops metadata for `path` from both the inner file system and the
    /// tracked set. Returns `true` if either of them removed something.
    fn delete_metadata<P: AsRef<Path>>(&self, path: P) -> std::io::Result<bool> {
        let target = path.as_ref();
        // An inner failure is returned before the tracked set is touched.
        let removed_inner = self.inner.delete_metadata(target)?;
        let removed_tracked = self.update_metadata(target, true);
        Ok(removed_inner || removed_tracked)
    }

    /// Reports whether the inner file system or the tracked set holds
    /// metadata for `path`.
    fn exists_metadata<P: AsRef<Path>>(&self, path: P) -> bool {
        let target = path.as_ref();
        if self.inner.exists_metadata(target) {
            return true;
        }
        let name = target.file_name().unwrap().to_str().unwrap();
        let id = FileId::parse_file_name(name).unwrap();
        id.queue == LogQueue::Append && self.append_metadata.lock().unwrap().contains(&id.seq)
    }

    fn new_reader(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Reader> {
        self.inner.new_reader(h)
    }

    fn new_writer(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Writer> {
        self.inner.new_writer(h)
    }
}

/// Checks that purged log files are deleted through the file system, and that
/// metadata left behind for files older than the first live one is cleaned up
/// when the engine is reopened.
#[test]
fn test_managed_file_deletion() {
    let dir = tempfile::Builder::new()
        .prefix("test_managed_file_deletion")
        .tempdir()
        .unwrap();
    let entry_data = vec![b'x'; 128];
    let cfg = Config {
        dir: dir.path().to_str().unwrap().to_owned(),
        target_file_size: ReadableSize(1),
        purge_threshold: ReadableSize(1),
        ..Default::default()
    };
    let fs = Arc::new(DeleteMonitoredFileSystem::new());
    // Smallest Append sequence number that still has metadata recorded.
    let oldest_tracked = || *fs.append_metadata.lock().unwrap().iter().next().unwrap();

    let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
    for rid in 1..=10 {
        engine.append(rid, 1, 11, Some(&entry_data));
    }
    for rid in 1..=5 {
        engine.clean(rid);
    }
    let (first_before_purge, _) = engine.file_span(LogQueue::Append);
    engine.purge_expired_files().unwrap();
    // Purging must have advanced the head of the Append queue.
    assert!(first_before_purge < engine.file_span(LogQueue::Append).0);
    assert_eq!(engine.file_count(None), fs.inner.file_count());
    let (first, _) = engine.file_span(LogQueue::Append);
    assert_eq!(oldest_tracked(), first);

    let engine = engine.reopen();
    assert_eq!(engine.file_count(None), fs.inner.file_count());
    let (first, _) = engine.file_span(LogQueue::Append);
    assert_eq!(oldest_tracked(), first);

    // Simulate stale metadata.
    fs.append_metadata.lock().unwrap().extend(first / 2..first);
    let engine = engine.reopen();
    let (first, _) = engine.file_span(LogQueue::Append);
    assert_eq!(oldest_tracked(), first);
}
}
4 changes: 4 additions & 0 deletions src/env/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ impl FileSystem for DefaultFileSystem {
LogFd::open(path.as_ref())
}

/// Removes the file at `path` from disk.
fn delete<P: AsRef<Path>>(&self, path: P) -> IoResult<()> {
    let target = path.as_ref();
    std::fs::remove_file(target)
}

/// Wraps `handle` in a `LogFile` to be used as the reader.
fn new_reader(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Reader> {
    let reader = LogFile::new(handle);
    Ok(reader)
}
Expand Down
21 changes: 21 additions & 0 deletions src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,29 @@ pub trait FileSystem: Send + Sync {
type Writer: Seek + Write + Send + WriteExt;

fn create<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;

fn open<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;

/// Deletes the file at `path`.
fn delete<P: AsRef<Path>>(&self, path: P) -> Result<()>;

/// Deletes user-implemented metadata associated with `path`. Returns
/// `true` if any metadata was deleted.
///
/// In older versions of Raft Engine, physical files were deleted without
/// going through the user-implemented cleanup procedure. This method is
/// used to detect and clean up user metadata that is no longer mapped to a
/// physical file.
///
/// The default implementation deletes nothing and returns `Ok(false)`.
fn delete_metadata<P: AsRef<Path>>(&self, _path: P) -> Result<bool> {
Ok(false)
}

/// Returns whether there is any user metadata associated with the given
/// `path`.
///
/// The default implementation always returns `false`.
fn exists_metadata<P: AsRef<Path>>(&self, _path: P) -> bool {
false
}

fn new_reader(&self, handle: Arc<Self::Handle>) -> Result<Self::Reader>;

fn new_writer(&self, handle: Arc<Self::Handle>) -> Result<Self::Writer>;
}

Expand Down
41 changes: 33 additions & 8 deletions src/env/obfuscated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crate::env::{DefaultFileSystem, FileSystem, WriteExt};
Expand Down Expand Up @@ -68,11 +69,23 @@ impl WriteExt for ObfuscatedWriter {
/// `[ObfuscatedFileSystem]` is a special implementation of `[FileSystem]`,
/// which is used for constructing and simulating an abnormal file system for
/// `[Read]` and `[Write]`.
pub struct ObfuscatedFileSystem(DefaultFileSystem);
pub struct ObfuscatedFileSystem {
// Underlying file system that operations are forwarded to.
inner: DefaultFileSystem,
// Counter bumped on every successful `create` and decremented on every
// successful `delete` made through this file system.
files: AtomicUsize,
}

impl Default for ObfuscatedFileSystem {
fn default() -> Self {
ObfuscatedFileSystem(DefaultFileSystem)
ObfuscatedFileSystem {
inner: DefaultFileSystem,
files: AtomicUsize::new(0),
}
}
}

impl ObfuscatedFileSystem {
pub fn file_count(&self) -> usize {
self.files.load(Ordering::Relaxed)
}
}

Expand All @@ -82,18 +95,30 @@ impl FileSystem for ObfuscatedFileSystem {
type Writer = ObfuscatedWriter;

fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
self.0.create(path)
let r = self.inner.create(path);
if r.is_ok() {
self.files.fetch_add(1, Ordering::Relaxed);
}
r
}

fn open<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
self.0.open(path)
self.inner.open(path)
}

/// Deletes `path` through the inner file system and, only when that
/// succeeds, decrements the file counter.
fn delete<P: AsRef<Path>>(&self, path: P) -> IoResult<()> {
    self.inner.delete(path)?;
    self.files.fetch_sub(1, Ordering::Relaxed);
    Ok(())
}

fn new_reader(&self, inner: Arc<Self::Handle>) -> IoResult<Self::Reader> {
Ok(ObfuscatedReader(self.0.new_reader(inner)?))
fn new_reader(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Reader> {
Ok(ObfuscatedReader(self.inner.new_reader(handle)?))
}

fn new_writer(&self, inner: Arc<Self::Handle>) -> IoResult<Self::Writer> {
Ok(ObfuscatedWriter(self.0.new_writer(inner)?))
fn new_writer(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Writer> {
Ok(ObfuscatedWriter(self.inner.new_writer(handle)?))
}
}
14 changes: 6 additions & 8 deletions src/file_pipe_log/pipe.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.

use std::collections::VecDeque;
use std::fs::{self, File};
use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;

use crossbeam::utils::CachePadded;
use fail::fail_point;
use log::{error, warn};
use log::error;
use num_traits::FromPrimitive;
use parking_lot::{Mutex, MutexGuard, RwLock};

Expand Down Expand Up @@ -300,17 +300,15 @@ impl<F: FileSystem> SinglePipe<F> {
let path = file_id.build_file_path(&self.dir);
#[cfg(feature = "failpoints")]
{
let remove_failure = || {
fail::fail_point!("file_pipe_log::remove_file_failure", |_| true);
let remove_skipped = || {
fail::fail_point!("file_pipe_log::remove_file_skipped", |_| true);
false
};
if remove_failure() {
if remove_skipped() {
continue;
}
}
if let Err(e) = fs::remove_file(&path) {
warn!("Remove purged log file {:?} failed: {}", path, e);
}
self.file_system.delete(&path)?;
}
Ok(purged)
}
Expand Down
33 changes: 33 additions & 0 deletions src/file_pipe_log/pipe_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,39 @@ impl<F: FileSystem> DualPipesBuilder<F> {
),
] {
if max_id > 0 {
// Try to clean up stale metadata left by the previous version.
let max_sample = 100;
// Find the first obsolete metadata.
let mut delete_start = None;
for i in 0..max_sample {
let seq = i * min_id / max_sample;
let file_id = FileId { queue, seq };
let path = file_id.build_file_path(dir);
if self.file_system.exists_metadata(&path) {
delete_start = Some(i.saturating_sub(1) * min_id / max_sample + 1);
break;
}
}
// Delete metadata starting from the oldest. Abort on error.
if let Some(start) = delete_start {
let mut success = 0;
for seq in start..min_id {
let file_id = FileId { queue, seq };
let path = file_id.build_file_path(dir);
match self.file_system.delete_metadata(&path) {
Err(e) => {
error!("failed to delete metadata of {}: {}.", path.display(), e);
break;
}
Ok(true) => success += 1,
_ => {}
}
}
warn!(
"deleted {} stale files of {:?} in range [{}, {}).",
success, queue, start, min_id,
);
}
for seq in min_id..=max_id {
let file_id = FileId { queue, seq };
let path = file_id.build_file_path(dir);
Expand Down
5 changes: 2 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
#![cfg_attr(feature = "swap", feature(nonnull_slice_from_raw_parts))]
#![cfg_attr(feature = "swap", feature(slice_ptr_len))]
#![cfg_attr(feature = "swap", feature(alloc_layout_extra))]
// For testing only.
#![cfg_attr(feature = "swap", feature(alloc_error_hook))]
#![cfg_attr(feature = "swap", feature(cfg_sanitize))]
#![cfg_attr(all(test, feature = "swap"), feature(alloc_error_hook))]
#![cfg_attr(all(test, feature = "swap"), feature(cfg_sanitize))]

#[macro_use]
extern crate lazy_static;
Expand Down
2 changes: 1 addition & 1 deletion tests/failpoints/test_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ fn test_incomplete_purge() {
let engine = Engine::open(cfg.clone()).unwrap();

{
let _f = FailGuard::new("file_pipe_log::remove_file_failure", "return");
let _f = FailGuard::new("file_pipe_log::remove_file_skipped", "return");
append(&engine, rid, 0, 20, Some(&data));
let append_first = engine.file_span(LogQueue::Append).0;
engine.compact_to(rid, 18);
Expand Down

0 comments on commit a950ec5

Please sign in to comment.