From 74827a45d6f73d30eee142b036e6fae0e9681416 Mon Sep 17 00:00:00 2001 From: Connor Date: Fri, 29 May 2020 15:15:46 +0800 Subject: [PATCH] Fix GC may delete a already deleted blob file (#168) * add deletefilesinrange during gc case Signed-off-by: Connor1996 * check obsolete before apply Signed-off-by: Connor1996 --- src/blob_format.h | 13 ++++---- src/blob_gc.cc | 4 +-- src/blob_gc.h | 6 ++-- src/blob_gc_job.cc | 11 ++++--- src/blob_gc_job_test.cc | 2 +- src/blob_gc_picker.cc | 4 +-- src/db_impl_gc.cc | 1 + src/titan_db_test.cc | 69 +++++++++++++++++++++++++++++++++++++++-- 8 files changed, 89 insertions(+), 21 deletions(-) diff --git a/src/blob_format.h b/src/blob_format.h index 19c5aa352..7485d1455 100644 --- a/src/blob_format.h +++ b/src/blob_format.h @@ -234,17 +234,16 @@ class BlobFileMeta { uint64_t file_number() const { return file_number_; } uint64_t file_size() const { return file_size_; } uint64_t live_data_size() const { return live_data_size_; } - void set_live_data_size(uint64_t size) { live_data_size_ = size; } - uint64_t file_entries() const { return file_entries_; } uint32_t file_level() const { return file_level_; } const std::string& smallest_key() const { return smallest_key_; } const std::string& largest_key() const { return largest_key_; } + void set_live_data_size(uint64_t size) { live_data_size_ = size; } + uint64_t file_entries() const { return file_entries_; } FileState file_state() const { return state_; } bool is_obsolete() const { return state_ == FileState::kObsolete; } void FileStateTransit(const FileEvent& event); - bool UpdateLiveDataSize(int64_t delta) { int64_t result = static_cast(live_data_size_) + delta; if (result < 0) { @@ -267,10 +266,9 @@ class BlobFileMeta { private: // Persistent field + uint64_t file_number_{0}; uint64_t file_size_{0}; - // Size of data with reference from SST files. - uint64_t live_data_size_{0}; uint64_t file_entries_; // Target level of compaction/flush which generates this blob file uint32_t file_level_; @@ -280,7 +278,10 @@ class BlobFileMeta { std::string largest_key_; // Not persistent field - FileState state_{FileState::kInit}; + + // Size of data with reference from SST files. + std::atomic live_data_size_{0}; + std::atomic state_{FileState::kInit}; }; // Format of blob file header for version 1 (8 bytes): diff --git a/src/blob_gc.cc b/src/blob_gc.cc index 816ca6e5d..9fe6cd2d6 100644 --- a/src/blob_gc.cc +++ b/src/blob_gc.cc @@ -3,9 +3,9 @@ namespace rocksdb { namespace titandb { -BlobGC::BlobGC(std::vector&& blob_files, +BlobGC::BlobGC(std::vector>&& blob_files, TitanCFOptions&& _titan_cf_options, bool need_trigger_next) - : inputs_(std::move(blob_files)), + : inputs_(blob_files), titan_cf_options_(std::move(_titan_cf_options)), trigger_next_(need_trigger_next) { MarkFilesBeingGC(); diff --git a/src/blob_gc.h b/src/blob_gc.h index e58483d69..508c08af9 100644 --- a/src/blob_gc.h +++ b/src/blob_gc.h @@ -12,7 +12,7 @@ namespace titandb { // A BlobGC encapsulates information about a blob gc. class BlobGC { public: - BlobGC(std::vector&& blob_files, + BlobGC(std::vector>&& blob_files, TitanCFOptions&& _titan_cf_options, bool need_trigger_next); // No copying allowed @@ -21,7 +21,7 @@ class BlobGC { ~BlobGC(); - const std::vector& inputs() { return inputs_; } + const std::vector>& inputs() { return inputs_; } const TitanCFOptions& titan_cf_options() { return titan_cf_options_; } @@ -40,7 +40,7 @@ class BlobGC { bool trigger_next() { return trigger_next_; } private: - std::vector inputs_; + std::vector> inputs_; std::vector outputs_; TitanCFOptions titan_cf_options_; ColumnFamilyHandle* cfh_{nullptr}; diff --git a/src/blob_gc_job.cc b/src/blob_gc_job.cc index c104192b7..5259e762b 100644 --- a/src/blob_gc_job.cc +++ b/src/blob_gc_job.cc @@ -347,6 +347,7 @@ Status BlobGCJob::Finish() { mutex_->Unlock(); s = InstallOutputBlobFiles(); if (s.ok()) { + TEST_SYNC_POINT("BlobGCJob::Finish::BeforeRewriteValidKeyToLSM"); s = RewriteValidKeyToLSM(); if (!s.ok()) { ROCKS_LOG_ERROR(db_options_.info_log, @@ -363,11 +364,10 @@ Status BlobGCJob::Finish() { mutex_->Lock(); } - // TODO(@DorianZheng) cal discardable size for new blob file - if (s.ok() && !blob_gc_->GetColumnFamilyData()->IsDropped()) { s = DeleteInputBlobFiles(); } + TEST_SYNC_POINT("BlobGCJob::Finish::AfterRewriteValidKeyToLSM"); if (s.ok()) { UpdateInternalOpStats(); @@ -531,11 +531,14 @@ Status BlobGCJob::DeleteInputBlobFiles() { metrics_.gc_num_files++; RecordInHistogram(statistics(stats_), TITAN_GC_INPUT_FILE_SIZE, file->file_size()); + if (file->is_obsolete()) { + // There may be a concurrent DeleteBlobFilesInRanges or GC, + // so the input file is already deleted. + continue; + } edit.DeleteBlobFile(file->file_number(), obsolete_sequence); } s = blob_file_set_->LogAndApply(edit); - // TODO(@DorianZheng) Purge pending outputs - // base_db_->pending_outputs_.erase(handle->GetNumber()); return s; } diff --git a/src/blob_gc_job_test.cc b/src/blob_gc_job_test.cc index 560b6a7c3..c697a84ca 100644 --- a/src/blob_gc_job_test.cc +++ b/src/blob_gc_job_test.cc @@ -213,7 +213,7 @@ class BlobGCJobTest : public testing::TestWithParam { ASSERT_OK(WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), key, res)); auto rewrite_status = base_db_->Write(WriteOptions(), &wb); - std::vector tmp; + std::vector> tmp; BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/); blob_gc.SetColumnFamily(cfh); BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(), diff --git a/src/blob_gc_picker.cc b/src/blob_gc_picker.cc index ff9ae35dc..0d3d221bf 100644 --- a/src/blob_gc_picker.cc +++ b/src/blob_gc_picker.cc @@ -19,7 +19,7 @@ BasicBlobGCPicker::~BasicBlobGCPicker() {} std::unique_ptr BasicBlobGCPicker::PickBlobGC( BlobStorage* blob_storage) { Status s; - std::vector blob_files; + std::vector> blob_files; uint64_t batch_size = 0; uint64_t estimate_output_size = 0; @@ -39,7 +39,7 @@ std::unique_ptr BasicBlobGCPicker::PickBlobGC( continue; } if (!stop_picking) { - blob_files.push_back(blob_file.get()); + blob_files.emplace_back(blob_file); batch_size += blob_file->file_size(); estimate_output_size += blob_file->live_data_size(); if (batch_size >= cf_options_.max_gc_batch_size || diff --git a/src/db_impl_gc.cc b/src/db_impl_gc.cc index cb587716c..a35bb278a 100644 --- a/src/db_impl_gc.cc +++ b/src/db_impl_gc.cc @@ -211,6 +211,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, s = blob_gc_job.Prepare(); if (s.ok()) { mutex_.Unlock(); + TEST_SYNC_POINT("TitanDBImpl::BackgroundGC::BeforeRunGCJob"); s = blob_gc_job.Run(); mutex_.Lock(); } diff --git a/src/titan_db_test.cc b/src/titan_db_test.cc index ddec98e84..ae8b0ebcb 100644 --- a/src/titan_db_test.cc +++ b/src/titan_db_test.cc @@ -155,6 +155,17 @@ class TitanDBTest : public testing::Test { return db_impl_->blob_file_set_->GetBlobStorage(cf_handle->GetID()); } + void CheckBlobFileCount(int count, ColumnFamilyHandle* cf_handle = nullptr) { + db_impl_->TEST_WaitForBackgroundGC(); + ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles()); + std::shared_ptr blob_storage = + GetBlobStorage(cf_handle).lock(); + ASSERT_TRUE(blob_storage != nullptr); + std::map> blob_files; + blob_storage->ExportBlobFiles(blob_files); + ASSERT_EQ(count, blob_files.size()); + } + ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t cf_id) { return db_impl_->db_impl_->GetColumnFamilyHandleUnlocked(cf_id).release(); } @@ -220,13 +231,17 @@ class TitanDBTest : public testing::Test { } } - void CompactAll() { + void CompactAll(ColumnFamilyHandle* cf_handle = nullptr) { + if (cf_handle == nullptr) { + cf_handle = db_->DefaultColumnFamily(); + } auto opts = db_->GetOptions(); auto compact_opts = CompactRangeOptions(); compact_opts.change_level = true; compact_opts.target_level = opts.num_levels - 1; - compact_opts.bottommost_level_compaction = BottommostLevelCompaction::kSkip; - ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr)); + compact_opts.bottommost_level_compaction = + BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(compact_opts, cf_handle, nullptr, nullptr)); } void DeleteFilesInRange(const Slice* begin, const Slice* end) { @@ -1386,6 +1401,54 @@ TEST_F(TitanDBTest, GCAfterReopen) { ASSERT_GT(file2->file_number(), file_number1); } +TEST_F(TitanDBTest, DeleteFilesInRangeDuringGC) { + options_.max_background_gc = 1; + options_.disable_background_gc = false; + options_.blob_file_discardable_ratio = 0.01; + Open(); + + ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(10 * 1024, 'v'))); + auto snap = db_->GetSnapshot(); + ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(100 * 1024, 'v'))); + Flush(); + + db_->ReleaseSnapshot(snap); + + SyncPoint::GetInstance()->LoadDependency( + {{"TitanDBImpl::BackgroundGC::BeforeRunGCJob", + "TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCStart"}, + {"TitanDBTest::DeleteFilesInRangeDuringGC::ContinueGC", + "BlobGCJob::Finish::BeforeRewriteValidKeyToLSM"}, + {"BlobGCJob::Finish::AfterRewriteValidKeyToLSM", + "TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCFinish"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + CheckBlobFileCount(1); + CompactAll(); + std::shared_ptr blob_storage = GetBlobStorage().lock(); + ASSERT_TRUE(blob_storage != nullptr); + std::map> blob_files; + blob_storage->ExportBlobFiles(blob_files); + ASSERT_EQ(blob_files.size(), 1); + + // trigger GC + CompactAll(); + + TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCStart"); + DeleteFilesInRange(nullptr, nullptr); + + TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::ContinueGC"); + TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCFinish"); + + std::string value; + Status s = db_->Get(ReadOptions(), "k1", &value); + ASSERT_TRUE(s.IsNotFound()); + // it shouldn't be any background error + ASSERT_OK(db_->Flush(FlushOptions())); + + SyncPoint::GetInstance()->DisableProcessing(); +} + } // namespace titandb } // namespace rocksdb