Skip to content

Commit

Permalink
Fix GC may delete a already deleted blob file (#168) (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
Connor1996 committed Jun 4, 2020
1 parent 80657c0 commit 81814ec
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 15 deletions.
6 changes: 3 additions & 3 deletions src/blob_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,17 +234,16 @@ class BlobFileMeta {
uint64_t file_number() const { return file_number_; }
uint64_t file_size() const { return file_size_; }
uint64_t live_data_size() const { return live_data_size_; }
void set_live_data_size(uint64_t size) { live_data_size_ = size; }
uint64_t file_entries() const { return file_entries_; }
uint32_t file_level() const { return file_level_; }
const std::string& smallest_key() const { return smallest_key_; }
const std::string& largest_key() const { return largest_key_; }

void set_live_data_size(uint64_t size) { live_data_size_ = size; }
uint64_t file_entries() const { return file_entries_; }
FileState file_state() const { return state_; }
bool is_obsolete() const { return state_ == FileState::kObsolete; }

void FileStateTransit(const FileEvent& event);

bool UpdateLiveDataSize(int64_t delta) {
int64_t result = static_cast<int64_t>(live_data_size_) + delta;
if (result < 0) {
Expand All @@ -267,6 +266,7 @@ class BlobFileMeta {

private:
// Persistent field

uint64_t file_number_{0};
uint64_t file_size_{0};
uint64_t file_entries_;
Expand Down
4 changes: 2 additions & 2 deletions src/blob_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
namespace rocksdb {
namespace titandb {

BlobGC::BlobGC(std::vector<BlobFileMeta*>&& blob_files,
BlobGC::BlobGC(std::vector<std::shared_ptr<BlobFileMeta>>&& blob_files,
TitanCFOptions&& _titan_cf_options, bool need_trigger_next)
: inputs_(std::move(blob_files)),
: inputs_(blob_files),
titan_cf_options_(std::move(_titan_cf_options)),
trigger_next_(need_trigger_next) {
MarkFilesBeingGC();
Expand Down
6 changes: 3 additions & 3 deletions src/blob_gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace titandb {
// A BlobGC encapsulates information about a blob gc.
class BlobGC {
public:
BlobGC(std::vector<BlobFileMeta*>&& blob_files,
BlobGC(std::vector<std::shared_ptr<BlobFileMeta>>&& blob_files,
TitanCFOptions&& _titan_cf_options, bool need_trigger_next);

// No copying allowed
Expand All @@ -21,7 +21,7 @@ class BlobGC {

~BlobGC();

const std::vector<BlobFileMeta*>& inputs() { return inputs_; }
const std::vector<std::shared_ptr<BlobFileMeta>>& inputs() { return inputs_; }

const TitanCFOptions& titan_cf_options() { return titan_cf_options_; }

Expand All @@ -40,7 +40,7 @@ class BlobGC {
bool trigger_next() { return trigger_next_; }

private:
std::vector<BlobFileMeta*> inputs_;
std::vector<std::shared_ptr<BlobFileMeta>> inputs_;
std::vector<BlobFileMeta*> outputs_;
TitanCFOptions titan_cf_options_;
ColumnFamilyHandle* cfh_{nullptr};
Expand Down
9 changes: 5 additions & 4 deletions src/blob_gc_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,6 @@ Status BlobGCJob::Finish() {
mutex_->Lock();
}

// TODO(@DorianZheng) cal discardable size for new blob file

if (s.ok() && !blob_gc_->GetColumnFamilyData()->IsDropped()) {
s = DeleteInputBlobFiles();
}
Expand Down Expand Up @@ -567,11 +565,14 @@ Status BlobGCJob::DeleteInputBlobFiles() {
metrics_.gc_num_files++;
RecordInHistogram(statistics(stats_), TITAN_GC_INPUT_FILE_SIZE,
file->file_size());
if (file->is_obsolete()) {
// There may be a concurrent DeleteBlobFilesInRanges or GC,
// so the input file is already deleted.
continue;
}
edit.DeleteBlobFile(file->file_number(), obsolete_sequence);
}
s = blob_file_set_->LogAndApply(edit);
// TODO(@DorianZheng) Purge pending outputs
// base_db_->pending_outputs_.erase(handle->GetNumber());
return s;
}

Expand Down
2 changes: 1 addition & 1 deletion src/blob_gc_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class BlobGCJobTest : public testing::TestWithParam<bool /*gc_merge_mode*/> {
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), key, res));
auto rewrite_status = base_db_->Write(WriteOptions(), &wb);

std::vector<BlobFileMeta*> tmp;
std::vector<std::shared_ptr<BlobFileMeta>> tmp;
BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/);
blob_gc.SetColumnFamily(cfh);
BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(),
Expand Down
4 changes: 2 additions & 2 deletions src/blob_gc_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ BasicBlobGCPicker::~BasicBlobGCPicker() {}
std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
BlobStorage* blob_storage) {
Status s;
std::vector<BlobFileMeta*> blob_files;
std::vector<std::shared_ptr<BlobFileMeta>> blob_files;

uint64_t batch_size = 0;
uint64_t estimate_output_size = 0;
Expand All @@ -39,7 +39,7 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
continue;
}
if (!stop_picking) {
blob_files.push_back(blob_file.get());
blob_files.emplace_back(blob_file);
batch_size += blob_file->file_size();
estimate_output_size += blob_file->live_data_size();
if (batch_size >= cf_options_.max_gc_batch_size ||
Expand Down
1 change: 1 addition & 0 deletions src/db_impl_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
s = blob_gc_job.Prepare();
if (s.ok()) {
mutex_.Unlock();
TEST_SYNC_POINT("TitanDBImpl::BackgroundGC::BeforeRunGCJob");
s = blob_gc_job.Run();
TEST_SYNC_POINT("TitanDBImpl::BackgroundGC::AfterRunGCJob");
mutex_.Lock();
Expand Down
48 changes: 48 additions & 0 deletions src/titan_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1501,6 +1501,54 @@ TEST_F(TitanDBTest, CompactionDuringGC) {
CheckBlobFileCount(0);
}

TEST_F(TitanDBTest, DeleteFilesInRangeDuringGC) {
options_.max_background_gc = 1;
options_.disable_background_gc = false;
options_.blob_file_discardable_ratio = 0.01;
Open();

ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(10 * 1024, 'v')));
auto snap = db_->GetSnapshot();
ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(100 * 1024, 'v')));
Flush();

db_->ReleaseSnapshot(snap);

SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBImpl::BackgroundGC::BeforeRunGCJob",
"TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCStart"},
{"TitanDBTest::DeleteFilesInRangeDuringGC::ContinueGC",
"BlobGCJob::Finish::BeforeRewriteValidKeyToLSM"},
{"BlobGCJob::Finish::AfterRewriteValidKeyToLSM",
"TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCFinish"}});
SyncPoint::GetInstance()->EnableProcessing();

CheckBlobFileCount(1);
CompactAll();
std::shared_ptr<BlobStorage> blob_storage = GetBlobStorage().lock();
ASSERT_TRUE(blob_storage != nullptr);
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
blob_storage->ExportBlobFiles(blob_files);
ASSERT_EQ(blob_files.size(), 1);

// trigger GC
CompactAll();

TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCStart");
DeleteFilesInRange(nullptr, nullptr);

TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::ContinueGC");
TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCFinish");

std::string value;
Status s = db_->Get(ReadOptions(), "k1", &value);
ASSERT_TRUE(s.IsNotFound());
// it shouldn't be any background error
ASSERT_OK(db_->Flush(FlushOptions()));

SyncPoint::GetInstance()->DisableProcessing();
}

} // namespace titandb
} // namespace rocksdb

Expand Down

0 comments on commit 81814ec

Please sign in to comment.