diff --git a/src/blob_format.h b/src/blob_format.h index 4ae1a5c74..8032f8566 100644 --- a/src/blob_format.h +++ b/src/blob_format.h @@ -234,17 +234,16 @@ class BlobFileMeta { uint64_t file_number() const { return file_number_; } uint64_t file_size() const { return file_size_; } uint64_t live_data_size() const { return live_data_size_; } - void set_live_data_size(uint64_t size) { live_data_size_ = size; } - uint64_t file_entries() const { return file_entries_; } uint32_t file_level() const { return file_level_; } const std::string& smallest_key() const { return smallest_key_; } const std::string& largest_key() const { return largest_key_; } + void set_live_data_size(uint64_t size) { live_data_size_ = size; } + uint64_t file_entries() const { return file_entries_; } FileState file_state() const { return state_; } bool is_obsolete() const { return state_ == FileState::kObsolete; } void FileStateTransit(const FileEvent& event); - bool UpdateLiveDataSize(int64_t delta) { int64_t result = static_cast(live_data_size_) + delta; if (result < 0) { @@ -267,6 +266,7 @@ class BlobFileMeta { private: // Persistent field + uint64_t file_number_{0}; uint64_t file_size_{0}; uint64_t file_entries_; diff --git a/src/blob_gc.cc b/src/blob_gc.cc index 816ca6e5d..9fe6cd2d6 100644 --- a/src/blob_gc.cc +++ b/src/blob_gc.cc @@ -3,9 +3,9 @@ namespace rocksdb { namespace titandb { -BlobGC::BlobGC(std::vector&& blob_files, +BlobGC::BlobGC(std::vector>&& blob_files, TitanCFOptions&& _titan_cf_options, bool need_trigger_next) - : inputs_(std::move(blob_files)), + : inputs_(blob_files), titan_cf_options_(std::move(_titan_cf_options)), trigger_next_(need_trigger_next) { MarkFilesBeingGC(); diff --git a/src/blob_gc.h b/src/blob_gc.h index e58483d69..508c08af9 100644 --- a/src/blob_gc.h +++ b/src/blob_gc.h @@ -12,7 +12,7 @@ namespace titandb { // A BlobGC encapsulates information about a blob gc. class BlobGC { public: - BlobGC(std::vector&& blob_files, + BlobGC(std::vector>&& blob_files, TitanCFOptions&& _titan_cf_options, bool need_trigger_next); // No copying allowed @@ -21,7 +21,7 @@ class BlobGC { ~BlobGC(); - const std::vector& inputs() { return inputs_; } + const std::vector>& inputs() { return inputs_; } const TitanCFOptions& titan_cf_options() { return titan_cf_options_; } @@ -40,7 +40,7 @@ class BlobGC { bool trigger_next() { return trigger_next_; } private: - std::vector inputs_; + std::vector> inputs_; std::vector outputs_; TitanCFOptions titan_cf_options_; ColumnFamilyHandle* cfh_{nullptr}; diff --git a/src/blob_gc_job.cc b/src/blob_gc_job.cc index f98de963a..aec9e14fe 100644 --- a/src/blob_gc_job.cc +++ b/src/blob_gc_job.cc @@ -364,8 +364,6 @@ Status BlobGCJob::Finish() { mutex_->Lock(); } - // TODO(@DorianZheng) cal discardable size for new blob file - if (s.ok() && !blob_gc_->GetColumnFamilyData()->IsDropped()) { s = DeleteInputBlobFiles(); } @@ -567,11 +565,14 @@ Status BlobGCJob::DeleteInputBlobFiles() { metrics_.gc_num_files++; RecordInHistogram(statistics(stats_), TITAN_GC_INPUT_FILE_SIZE, file->file_size()); + if (file->is_obsolete()) { + // There may be a concurrent DeleteBlobFilesInRanges or GC, + // so the input file is already deleted. + continue; + } edit.DeleteBlobFile(file->file_number(), obsolete_sequence); } s = blob_file_set_->LogAndApply(edit); - // TODO(@DorianZheng) Purge pending outputs - // base_db_->pending_outputs_.erase(handle->GetNumber()); return s; } diff --git a/src/blob_gc_job_test.cc b/src/blob_gc_job_test.cc index 560b6a7c3..c697a84ca 100644 --- a/src/blob_gc_job_test.cc +++ b/src/blob_gc_job_test.cc @@ -213,7 +213,7 @@ class BlobGCJobTest : public testing::TestWithParam { ASSERT_OK(WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), key, res)); auto rewrite_status = base_db_->Write(WriteOptions(), &wb); - std::vector tmp; + std::vector> tmp; BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/); blob_gc.SetColumnFamily(cfh); BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(), diff --git a/src/blob_gc_picker.cc b/src/blob_gc_picker.cc index ff9ae35dc..0d3d221bf 100644 --- a/src/blob_gc_picker.cc +++ b/src/blob_gc_picker.cc @@ -19,7 +19,7 @@ BasicBlobGCPicker::~BasicBlobGCPicker() {} std::unique_ptr BasicBlobGCPicker::PickBlobGC( BlobStorage* blob_storage) { Status s; - std::vector blob_files; + std::vector> blob_files; uint64_t batch_size = 0; uint64_t estimate_output_size = 0; @@ -39,7 +39,7 @@ std::unique_ptr BasicBlobGCPicker::PickBlobGC( continue; } if (!stop_picking) { - blob_files.push_back(blob_file.get()); + blob_files.emplace_back(blob_file); batch_size += blob_file->file_size(); estimate_output_size += blob_file->live_data_size(); if (batch_size >= cf_options_.max_gc_batch_size || diff --git a/src/db_impl_gc.cc b/src/db_impl_gc.cc index 7900fab0a..36c566c30 100644 --- a/src/db_impl_gc.cc +++ b/src/db_impl_gc.cc @@ -209,6 +209,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, s = blob_gc_job.Prepare(); if (s.ok()) { mutex_.Unlock(); + TEST_SYNC_POINT("TitanDBImpl::BackgroundGC::BeforeRunGCJob"); s = blob_gc_job.Run(); TEST_SYNC_POINT("TitanDBImpl::BackgroundGC::AfterRunGCJob"); mutex_.Lock(); diff --git a/src/titan_db_test.cc b/src/titan_db_test.cc index 18fcfe554..6f072e15c 100644 --- a/src/titan_db_test.cc +++ b/src/titan_db_test.cc @@ -1501,6 +1501,54 @@ TEST_F(TitanDBTest, CompactionDuringGC) { CheckBlobFileCount(0); } +TEST_F(TitanDBTest, DeleteFilesInRangeDuringGC) { + options_.max_background_gc = 1; + options_.disable_background_gc = false; + options_.blob_file_discardable_ratio = 0.01; + Open(); + + ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(10 * 1024, 'v'))); + auto snap = db_->GetSnapshot(); + ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(100 * 1024, 'v'))); + Flush(); + + db_->ReleaseSnapshot(snap); + + SyncPoint::GetInstance()->LoadDependency( + {{"TitanDBImpl::BackgroundGC::BeforeRunGCJob", + "TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCStart"}, + {"TitanDBTest::DeleteFilesInRangeDuringGC::ContinueGC", + "BlobGCJob::Finish::BeforeRewriteValidKeyToLSM"}, + {"BlobGCJob::Finish::AfterRewriteValidKeyToLSM", + "TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCFinish"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + CheckBlobFileCount(1); + CompactAll(); + std::shared_ptr blob_storage = GetBlobStorage().lock(); + ASSERT_TRUE(blob_storage != nullptr); + std::map> blob_files; + blob_storage->ExportBlobFiles(blob_files); + ASSERT_EQ(blob_files.size(), 1); + + // trigger GC + CompactAll(); + + TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCStart"); + DeleteFilesInRange(nullptr, nullptr); + + TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::ContinueGC"); + TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCFinish"); + + std::string value; + Status s = db_->Get(ReadOptions(), "k1", &value); + ASSERT_TRUE(s.IsNotFound()); + // it shouldn't be any background error + ASSERT_OK(db_->Flush(FlushOptions())); + + SyncPoint::GetInstance()->DisableProcessing(); +} + } // namespace titandb } // namespace rocksdb