Skip to content

Commit

Permalink
Fix wrong assert delta < 0 for cocurrent compaction while flush (#172) (
Browse files Browse the repository at this point in the history
#176)

* fix cocurrent compaction while flush

Signed-off-by: Connor1996 <zbk602423539@gmail.com>
  • Loading branch information
Connor1996 committed Jun 4, 2020
1 parent 0db7976 commit 80657c0
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 19 deletions.
Empty file removed .clion.source.upload.marker
Empty file.
16 changes: 13 additions & 3 deletions src/blob_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,6 @@ class BlobFileMeta {
// Persistent field
uint64_t file_number_{0};
uint64_t file_size_{0};
// Size of data with reference from SST files.
uint64_t live_data_size_{0};
uint64_t file_entries_;
// Target level of compaction/flush which generates this blob file
uint32_t file_level_;
Expand All @@ -280,7 +278,19 @@ class BlobFileMeta {
std::string largest_key_;

// Not persistent field
FileState state_{FileState::kInit};

// Size of data with reference from SST files.
//
// Because the new generated SST is added to superversion before
// `OnFlushCompleted()`/`OnCompactionCompleted()` is called, so if there is a
// later compaction trigger by the new generated SST, the later
// `OnCompactionCompleted()` maybe called before the previous events'
// `OnFlushCompleted()`/`OnCompactionCompleted()` is called.
// So when state_ == kPendingLSM, it uses this to record the delta as a
// positive number if any later compaction is trigger before previous
// `OnCompactionCompleted()` is called.
std::atomic<uint64_t> live_data_size_{0};
std::atomic<FileState> state_{FileState::kInit};
};

// Format of blob file header for version 1 (8 bytes):
Expand Down
9 changes: 8 additions & 1 deletion src/blob_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,14 @@ void BlobStorage::AddBlobFile(std::shared_ptr<BlobFileMeta>& file) {
files_.emplace(std::make_pair(file->file_number(), file));
blob_ranges_.emplace(std::make_pair(Slice(file->smallest_key()), file));
levels_file_count_[file->file_level()]++;
AddStats(stats_, cf_id_, file->GetDiscardableRatioLevel(), 1);
if (file->live_data_size() != 0) {
// When live data size == 0, it means the live size of blob file is unknown
// now.
// So don't count this metrics now, it will delayed to when setting the real
// live data size
// in `InitializeGC()` and `OnFlushCompleted()`/`OnCompactionCompleted()`.
AddStats(stats_, cf_id_, file->GetDiscardableRatioLevel(), 1);
}
AddStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_FILE_SIZE,
file->file_size());
AddStats(stats_, cf_id_, TitanInternalStats::NUM_LIVE_BLOB_FILE, 1);
Expand Down
62 changes: 49 additions & 13 deletions src/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,7 @@ bool TitanDBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
}

void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Begin1");
TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Begin");
if (!initialized()) {
assert(false);
Expand Down Expand Up @@ -1164,10 +1165,18 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
delta = 0;
}

// the metrics is counted in the table builder when flushing,
// so update it when updateing the live data size.
SubStats(stats_.get(), flush_job_info.cf_id,
file->GetDiscardableRatioLevel(), 1);
if (file->live_data_size() != 0) {
// Because the flushed SST is added to superversion before
// `OnFlushCompleted()` is called, so if there is a concurrent
// compaction, `OnCompactionCompleted()` maybe called before
// `OnFlushCompleted()` is called.
// In this case, the state of the blob file generated by the flush is
// still `kPendingLSM`, while the blob file size delta is for the
// compaction event. So it is possible that delta is negative.
// It records the delta as a positive number if any, so here subtract it
// from the total live data size.
delta -= file->live_data_size();
}
file->set_live_data_size(static_cast<uint64_t>(delta));
AddStats(stats_.get(), flush_job_info.cf_id,
file->GetDiscardableRatioLevel(), 1);
Expand Down Expand Up @@ -1250,17 +1259,44 @@ void TitanDBImpl::OnCompactionCompleted(
continue;
}
if (file->file_state() == BlobFileMeta::FileState::kPendingLSM) {
if (delta < 0) {
// Cannot happen..
ROCKS_LOG_WARN(db_options_.info_log,
"OnCompactionCompleted[%d]: New blob file %" PRIu64
" live size being negative",
if (delta <= 0) {
// Because the new generated SST is added to superversion before
// `OnFlushCompleted()`/`OnCompactionCompleted()` is called, so if
// there is a later compaction trigger by the new generated SST, the
// later `OnCompactionCompleted()` maybe called before the previous
// events' `OnFlushCompleted()`/`OnCompactionCompleted()` is called.
// In this case, the state of the blob file generated by the
// flush/compaction is still `kPendingLSM`, while the blob file size
// delta is for the later compaction event. So it is possible that
// delta is negative.
// To make the live data size accurate, here records the delta as a
// positive number. And the delta will be subtracted with total live
// data size in the previous
// `OnFlushCompleted()`/`OnCompactionCompleted()`.
bool ok = file->UpdateLiveDataSize(static_cast<uint64_t>(-delta));
if (!ok) {
// Cannot happen
ROCKS_LOG_WARN(
db_options_.info_log,
"OnCompactionCompleted[%d]: pendingLSM Blob file %" PRIu64
" live size below zero.",
compaction_job_info.job_id, file_number);
assert(false);
}
ROCKS_LOG_INFO(db_options_.info_log,
"OnCompactionCompleted[%d]: Get blob file %" PRIu64
" live size being negative, maybe due to "
"OnFlushCompleted() is called yet",
compaction_job_info.job_id, file_number);
assert(false);
delta = 0;
continue;
}
if (file->live_data_size() != 0) {
// It records the delta as a positive number if any later compaction
// is trigger before previous `OnCompactionCompleted()` is called, so
// here subtract it
// from the total live data size.
delta -= file->live_data_size();
}
SubStats(stats_.get(), compaction_job_info.cf_id,
file->GetDiscardableRatioLevel(), 1);
file->set_live_data_size(static_cast<uint64_t>(delta));
AddStats(stats_.get(), compaction_job_info.cf_id,
file->GetDiscardableRatioLevel(), 1);
Expand Down
2 changes: 0 additions & 2 deletions src/db_impl_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ Status TitanDBImpl::InitializeGC(
blob_storage->FindFile(file_size.first).lock();
if (file != nullptr) {
assert(file->live_data_size() == 0);
SubStats(stats_.get(), cf_handle->GetID(),
file->GetDiscardableRatioLevel(), 1);
file->set_live_data_size(static_cast<uint64_t>(file_size.second));
AddStats(stats_.get(), cf_handle->GetID(),
file->GetDiscardableRatioLevel(), 1);
Expand Down
40 changes: 40 additions & 0 deletions src/titan_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,46 @@ TEST_F(TitanDBTest, GCAfterReopen) {
ASSERT_GT(file2->file_number(), file_number1);
}

TEST_F(TitanDBTest, CompactionDuringFlush) {
options_.max_background_gc = 1;
options_.disable_background_gc = true;
Open();

ASSERT_OK(db_->Put(WriteOptions(), "k1", "value"));
Flush();

SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBImpl::OnFlushCompleted:Begin1",
"TitanDBTest::CompactionDuringFlush::WaitFlushStart"},
{"TitanDBTest::CompactionDuringFlush::ContinueFlush",
"TitanDBImpl::OnFlushCompleted:Begin"}});
SyncPoint::GetInstance()->EnableProcessing();

ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(10 * 1024, 'v')));
auto snap = db_->GetSnapshot();
ASSERT_OK(db_->Delete(WriteOptions(), "k1"));

port::Thread writer([&]() { Flush(); });
TEST_SYNC_POINT("TitanDBTest::CompactionDuringFlush::WaitFlushStart");
db_->ReleaseSnapshot(snap);

auto compact_opts = CompactRangeOptions();
compact_opts.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr));
ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr));

TEST_SYNC_POINT("TitanDBTest::CompactionDuringFlush::ContinueFlush");
writer.join();
CheckBlobFileCount(1);
SyncPoint::GetInstance()->DisableProcessing();

std::string value;
Status s = db_->Get(ReadOptions(), "k1", &value);
ASSERT_TRUE(s.IsNotFound());
// it shouldn't be any background error
ASSERT_OK(db_->Flush(FlushOptions()));
}

TEST_F(TitanDBTest, CompactionDuringGC) {
options_.max_background_gc = 1;
options_.disable_background_gc = false;
Expand Down

0 comments on commit 80657c0

Please sign in to comment.