Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix GC may delete a already deleted blob file (#168) #169

Merged
merged 2 commits into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions src/blob_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,17 +234,16 @@ class BlobFileMeta {
uint64_t file_number() const { return file_number_; }
uint64_t file_size() const { return file_size_; }
uint64_t live_data_size() const { return live_data_size_; }
void set_live_data_size(uint64_t size) { live_data_size_ = size; }
uint64_t file_entries() const { return file_entries_; }
uint32_t file_level() const { return file_level_; }
const std::string& smallest_key() const { return smallest_key_; }
const std::string& largest_key() const { return largest_key_; }

void set_live_data_size(uint64_t size) { live_data_size_ = size; }
uint64_t file_entries() const { return file_entries_; }
FileState file_state() const { return state_; }
bool is_obsolete() const { return state_ == FileState::kObsolete; }

void FileStateTransit(const FileEvent& event);

bool UpdateLiveDataSize(int64_t delta) {
int64_t result = static_cast<int64_t>(live_data_size_) + delta;
if (result < 0) {
Expand All @@ -267,10 +266,9 @@ class BlobFileMeta {

private:
// Persistent field

uint64_t file_number_{0};
uint64_t file_size_{0};
// Size of data with reference from SST files.
uint64_t live_data_size_{0};
uint64_t file_entries_;
// Target level of compaction/flush which generates this blob file
uint32_t file_level_;
Expand All @@ -280,7 +278,10 @@ class BlobFileMeta {
std::string largest_key_;

// Not persistent field
FileState state_{FileState::kInit};

// Size of data with reference from SST files.
std::atomic<uint64_t> live_data_size_{0};
std::atomic<FileState> state_{FileState::kInit};
};

// Format of blob file header for version 1 (8 bytes):
Expand Down
4 changes: 2 additions & 2 deletions src/blob_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
namespace rocksdb {
namespace titandb {

BlobGC::BlobGC(std::vector<BlobFileMeta*>&& blob_files,
BlobGC::BlobGC(std::vector<std::shared_ptr<BlobFileMeta>>&& blob_files,
TitanCFOptions&& _titan_cf_options, bool need_trigger_next)
: inputs_(std::move(blob_files)),
: inputs_(blob_files),
titan_cf_options_(std::move(_titan_cf_options)),
trigger_next_(need_trigger_next) {
MarkFilesBeingGC();
Expand Down
6 changes: 3 additions & 3 deletions src/blob_gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace titandb {
// A BlobGC encapsulates information about a blob gc.
class BlobGC {
public:
BlobGC(std::vector<BlobFileMeta*>&& blob_files,
BlobGC(std::vector<std::shared_ptr<BlobFileMeta>>&& blob_files,
TitanCFOptions&& _titan_cf_options, bool need_trigger_next);

// No copying allowed
Expand All @@ -21,7 +21,7 @@ class BlobGC {

~BlobGC();

const std::vector<BlobFileMeta*>& inputs() { return inputs_; }
const std::vector<std::shared_ptr<BlobFileMeta>>& inputs() { return inputs_; }

const TitanCFOptions& titan_cf_options() { return titan_cf_options_; }

Expand All @@ -40,7 +40,7 @@ class BlobGC {
bool trigger_next() { return trigger_next_; }

private:
std::vector<BlobFileMeta*> inputs_;
std::vector<std::shared_ptr<BlobFileMeta>> inputs_;
std::vector<BlobFileMeta*> outputs_;
TitanCFOptions titan_cf_options_;
ColumnFamilyHandle* cfh_{nullptr};
Expand Down
11 changes: 7 additions & 4 deletions src/blob_gc_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ Status BlobGCJob::Finish() {
mutex_->Unlock();
s = InstallOutputBlobFiles();
if (s.ok()) {
TEST_SYNC_POINT("BlobGCJob::Finish::BeforeRewriteValidKeyToLSM");
s = RewriteValidKeyToLSM();
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
Expand All @@ -363,11 +364,10 @@ Status BlobGCJob::Finish() {
mutex_->Lock();
}

// TODO(@DorianZheng) cal discardable size for new blob file

if (s.ok() && !blob_gc_->GetColumnFamilyData()->IsDropped()) {
s = DeleteInputBlobFiles();
}
TEST_SYNC_POINT("BlobGCJob::Finish::AfterRewriteValidKeyToLSM");

if (s.ok()) {
UpdateInternalOpStats();
Expand Down Expand Up @@ -531,11 +531,14 @@ Status BlobGCJob::DeleteInputBlobFiles() {
metrics_.gc_num_files++;
RecordInHistogram(statistics(stats_), TITAN_GC_INPUT_FILE_SIZE,
file->file_size());
if (file->is_obsolete()) {
// There may be a concurrent DeleteBlobFilesInRanges or GC,
// so the input file is already deleted.
continue;
}
edit.DeleteBlobFile(file->file_number(), obsolete_sequence);
}
s = blob_file_set_->LogAndApply(edit);
// TODO(@DorianZheng) Purge pending outputs
// base_db_->pending_outputs_.erase(handle->GetNumber());
return s;
}

Expand Down
2 changes: 1 addition & 1 deletion src/blob_gc_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class BlobGCJobTest : public testing::TestWithParam<bool /*gc_merge_mode*/> {
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), key, res));
auto rewrite_status = base_db_->Write(WriteOptions(), &wb);

std::vector<BlobFileMeta*> tmp;
std::vector<std::shared_ptr<BlobFileMeta>> tmp;
BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/);
blob_gc.SetColumnFamily(cfh);
BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(),
Expand Down
4 changes: 2 additions & 2 deletions src/blob_gc_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ BasicBlobGCPicker::~BasicBlobGCPicker() {}
std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
BlobStorage* blob_storage) {
Status s;
std::vector<BlobFileMeta*> blob_files;
std::vector<std::shared_ptr<BlobFileMeta>> blob_files;

uint64_t batch_size = 0;
uint64_t estimate_output_size = 0;
Expand All @@ -39,7 +39,7 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
continue;
}
if (!stop_picking) {
blob_files.push_back(blob_file.get());
blob_files.emplace_back(blob_file);
batch_size += blob_file->file_size();
estimate_output_size += blob_file->live_data_size();
if (batch_size >= cf_options_.max_gc_batch_size ||
Expand Down
1 change: 1 addition & 0 deletions src/db_impl_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
s = blob_gc_job.Prepare();
if (s.ok()) {
mutex_.Unlock();
TEST_SYNC_POINT("TitanDBImpl::BackgroundGC::BeforeRunGCJob");
s = blob_gc_job.Run();
mutex_.Lock();
}
Expand Down
69 changes: 66 additions & 3 deletions src/titan_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,17 @@ class TitanDBTest : public testing::Test {
return db_impl_->blob_file_set_->GetBlobStorage(cf_handle->GetID());
}

void CheckBlobFileCount(int count, ColumnFamilyHandle* cf_handle = nullptr) {
db_impl_->TEST_WaitForBackgroundGC();
ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles());
std::shared_ptr<BlobStorage> blob_storage =
GetBlobStorage(cf_handle).lock();
ASSERT_TRUE(blob_storage != nullptr);
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
blob_storage->ExportBlobFiles(blob_files);
ASSERT_EQ(count, blob_files.size());
}

ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t cf_id) {
return db_impl_->db_impl_->GetColumnFamilyHandleUnlocked(cf_id).release();
}
Expand Down Expand Up @@ -220,13 +231,17 @@ class TitanDBTest : public testing::Test {
}
}

void CompactAll() {
void CompactAll(ColumnFamilyHandle* cf_handle = nullptr) {
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
auto opts = db_->GetOptions();
auto compact_opts = CompactRangeOptions();
compact_opts.change_level = true;
compact_opts.target_level = opts.num_levels - 1;
compact_opts.bottommost_level_compaction = BottommostLevelCompaction::kSkip;
ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr));
compact_opts.bottommost_level_compaction =
BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(compact_opts, cf_handle, nullptr, nullptr));
}

void DeleteFilesInRange(const Slice* begin, const Slice* end) {
Expand Down Expand Up @@ -1386,6 +1401,54 @@ TEST_F(TitanDBTest, GCAfterReopen) {
ASSERT_GT(file2->file_number(), file_number1);
}

TEST_F(TitanDBTest, DeleteFilesInRangeDuringGC) {
options_.max_background_gc = 1;
options_.disable_background_gc = false;
options_.blob_file_discardable_ratio = 0.01;
Open();

ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(10 * 1024, 'v')));
auto snap = db_->GetSnapshot();
ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(100 * 1024, 'v')));
Flush();

db_->ReleaseSnapshot(snap);

SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBImpl::BackgroundGC::BeforeRunGCJob",
"TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCStart"},
{"TitanDBTest::DeleteFilesInRangeDuringGC::ContinueGC",
"BlobGCJob::Finish::BeforeRewriteValidKeyToLSM"},
{"BlobGCJob::Finish::AfterRewriteValidKeyToLSM",
"TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCFinish"}});
SyncPoint::GetInstance()->EnableProcessing();

CheckBlobFileCount(1);
CompactAll();
std::shared_ptr<BlobStorage> blob_storage = GetBlobStorage().lock();
ASSERT_TRUE(blob_storage != nullptr);
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
blob_storage->ExportBlobFiles(blob_files);
ASSERT_EQ(blob_files.size(), 1);

// trigger GC
CompactAll();

TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCStart");
DeleteFilesInRange(nullptr, nullptr);

TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::ContinueGC");
TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCFinish");

std::string value;
Status s = db_->Get(ReadOptions(), "k1", &value);
ASSERT_TRUE(s.IsNotFound());
// it shouldn't be any background error
ASSERT_OK(db_->Flush(FlushOptions()));

SyncPoint::GetInstance()->DisableProcessing();
}

} // namespace titandb
} // namespace rocksdb

Expand Down