From 078fb7c3b02a1931c4ac70957cc8940030ea20f1 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Wed, 16 Oct 2019 10:39:00 -0700 Subject: [PATCH] Fix OnFlushCompleted fired before flush result write to MANIFEST (#5908) Summary: When there are concurrent flush job on the same CF, `OnFlushCompleted` can be called before the flush result being install to LSM. Fixing the issue by passing `FlushJobInfo` through `MemTable`, and the thread who commit the flush result can fetch the `FlushJobInfo` and fire `OnFlushCompleted` on behave of the thread actually writing the SST. Fix https://github.com/facebook/rocksdb/issues/5892 Pull Request resolved: https://github.com/facebook/rocksdb/pull/5908 Test Plan: Add new test. The test will fail without the fix. Differential Revision: D17916144 Pulled By: riversand963 fbshipit-source-id: e18df67d9533b5baee52ae3605026cdeb05cbe10 --- HISTORY.md | 3 + db/db_flush_test.cc | 96 ++++++++++++++++++++++++++ db/db_impl/db_impl.h | 8 +-- db/db_impl/db_impl_compaction_flush.cc | 77 +++++++++------------ db/flush_job.cc | 26 ++++++- db/flush_job.h | 18 ++++- db/flush_job_test.cc | 12 ++-- db/memtable.h | 16 +++++ db/memtable_list.cc | 11 ++- db/memtable_list.h | 5 +- db/memtable_list_test.cc | 6 +- include/rocksdb/listener.h | 1 + 12 files changed, 217 insertions(+), 62 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index ddc6c5a0796..e1b515aa738 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,9 @@ ### New Features * When user uses options.force_consistency_check in RocksDb, instead of crashing the process, we now pass the error back to the users without killing the process. +### Bug Fixes +* Fix OnFlushCompleted fired before flush result persisted in MANIFEST when there's concurrent flush job. The bug exists since OnFlushCompleted was introduced in rocksdb 3.8. + ## 6.4.4 (9/17/2019) * Fix a bug introduced 6.3 which could cause wrong results in a corner case when prefix bloom filter is used and the iterator is reseeked. diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 034ec63226c..c586cd3222d 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -7,10 +7,16 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include + +#include "db/db_impl/db_impl.h" #include "db/db_test_util.h" +#include "port/port.h" #include "port/stack_trace.h" #include "test_util/fault_injection_test_env.h" #include "test_util/sync_point.h" +#include "util/cast_util.h" +#include "util/mutexlock.h" namespace rocksdb { @@ -323,6 +329,96 @@ TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) { SyncPoint::GetInstance()->DisableProcessing(); } +#ifndef ROCKSDB_LITE +TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { + class TestListener : public EventListener { + public: + void OnFlushCompleted(DB* db, const FlushJobInfo& info) override { + // There's only one key in each flush. + ASSERT_EQ(info.smallest_seqno, info.largest_seqno); + ASSERT_NE(0, info.smallest_seqno); + if (info.smallest_seqno == seq1) { + // First flush completed + ASSERT_FALSE(completed1); + completed1 = true; + CheckFlushResultCommitted(db, seq1); + } else { + // Second flush completed + ASSERT_FALSE(completed2); + completed2 = true; + ASSERT_EQ(info.smallest_seqno, seq2); + CheckFlushResultCommitted(db, seq2); + } + } + + void CheckFlushResultCommitted(DB* db, SequenceNumber seq) { + DBImpl* db_impl = static_cast_with_check(db); + InstrumentedMutex* mutex = db_impl->mutex(); + mutex->Lock(); + auto* cfd = + reinterpret_cast(db->DefaultColumnFamily()) + ->cfd(); + ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber()); + mutex->Unlock(); + } + + std::atomic seq1{0}; + std::atomic seq2{0}; + std::atomic completed1{false}; + std::atomic completed2{false}; + }; + std::shared_ptr listener = std::make_shared(); + + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::FlushMemTable:AfterScheduleFlush", + "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"}, + {"DBImpl::FlushMemTableToOutputFile:Finish", + "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}}); + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::WriteLevel0Table", [&listener](void* arg) { + // Wait for the second flush finished, out of mutex. + auto* mems = reinterpret_cast*>(arg); + if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) { + TEST_SYNC_POINT( + "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:" + "WaitSecond"); + } + }); + + Options options = CurrentOptions(); + options.create_if_missing = true; + options.listeners.push_back(listener); + // Setting max_flush_jobs = max_background_jobs / 4 = 2. + options.max_background_jobs = 8; + // Allow 2 immutable memtables. + options.max_write_buffer_number = 3; + Reopen(options); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put("foo", "v")); + listener->seq1 = db_->GetLatestSequenceNumber(); + // t1 will wait for the second flush complete before committing flush result. + auto t1 = port::Thread([&]() { + // flush_opts.wait = true + ASSERT_OK(db_->Flush(FlushOptions())); + }); + // Wait for first flush scheduled. + TEST_SYNC_POINT( + "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"); + // The second flush will exit early without commit its result. The work + // is delegated to the first flush. + ASSERT_OK(Put("bar", "v")); + listener->seq2 = db_->GetLatestSequenceNumber(); + FlushOptions flush_opts; + flush_opts.wait = false; + ASSERT_OK(db_->Flush(flush_opts)); + t1.join(); + ASSERT_TRUE(listener->completed1); + ASSERT_TRUE(listener->completed2); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} +#endif // !ROCKSDB_LITE + TEST_P(DBAtomicFlushTest, ManualAtomicFlush) { Options options = CurrentOptions(); options.create_if_missing = true; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index a0a2f97c1a6..776833ce790 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -968,11 +968,11 @@ class DBImpl : public DB { void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, - int job_id, TableProperties prop); + int job_id); - void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta, - const MutableCFOptions& mutable_cf_options, - int job_id, TableProperties prop); + void NotifyOnFlushCompleted( + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + std::list>* flush_jobs_info); void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c, const Status& st, diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index a00b60ddf8d..787fd13f58f 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -163,8 +163,7 @@ Status DBImpl::FlushMemTableToOutputFile( #ifndef ROCKSDB_LITE // may temporarily unlock and lock the mutex. - NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id, - flush_job.GetTableProperties()); + NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id); #endif // ROCKSDB_LITE Status s; @@ -212,8 +211,8 @@ Status DBImpl::FlushMemTableToOutputFile( if (s.ok()) { #ifndef ROCKSDB_LITE // may temporarily unlock and lock the mutex. - NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options, - job_context->job_id, flush_job.GetTableProperties()); + NotifyOnFlushCompleted(cfd, mutable_cf_options, + flush_job.GetCommittedFlushJobsInfo()); auto sfm = static_cast( immutable_db_options_.sst_file_manager.get()); if (sfm) { @@ -232,6 +231,7 @@ Status DBImpl::FlushMemTableToOutputFile( } #endif // ROCKSDB_LITE } + TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish"); return s; } @@ -302,7 +302,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( autovector distinct_output_dirs; autovector distinct_output_dir_paths; - std::vector jobs; + std::vector> jobs; std::vector all_mutable_cf_options; int num_cfs = static_cast(cfds.size()); all_mutable_cf_options.reserve(num_cfs); @@ -329,7 +329,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions()); const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back(); const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_); - jobs.emplace_back( + jobs.emplace_back(new FlushJob( dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id, env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot, @@ -337,8 +337,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, false /* sync_output_directory */, false /* write_manifest */, - thread_pri); - jobs.back().PickMemTable(); + thread_pri)); + jobs.back()->PickMemTable(); } std::vector file_meta(num_cfs); @@ -350,7 +350,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i); // may temporarily unlock and lock the mutex. NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options, - job_context->job_id, jobs[i].GetTableProperties()); + job_context->job_id); } #endif /* !ROCKSDB_LITE */ @@ -372,7 +372,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( // TODO (yanqin): parallelize jobs with threads. for (int i = 1; i != num_cfs; ++i) { exec_status[i].second = - jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]); + jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]); exec_status[i].first = true; } if (num_cfs > 1) { @@ -381,8 +381,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( TEST_SYNC_POINT( "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"); } + assert(exec_status.size() > 0); + assert(!file_meta.empty()); exec_status[0].second = - jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]); + jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]); exec_status[0].first = true; Status error_status; @@ -423,7 +425,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( auto wait_to_install_func = [&]() { bool ready = true; for (size_t i = 0; i != cfds.size(); ++i) { - const auto& mems = jobs[i].GetMemTables(); + const auto& mems = jobs[i]->GetMemTables(); if (cfds[i]->IsDropped()) { // If the column family is dropped, then do not wait. continue; @@ -464,7 +466,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( autovector mutable_cf_options_list; autovector tmp_file_meta; for (int i = 0; i != num_cfs; ++i) { - const auto& mems = jobs[i].GetMemTables(); + const auto& mems = jobs[i]->GetMemTables(); if (!cfds[i]->IsDropped() && !mems.empty()) { tmp_cfds.emplace_back(cfds[i]); mems_list.emplace_back(&mems); @@ -500,12 +502,13 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( #ifndef ROCKSDB_LITE auto sfm = static_cast( immutable_db_options_.sst_file_manager.get()); + assert(all_mutable_cf_options.size() == static_cast(num_cfs)); for (int i = 0; i != num_cfs; ++i) { if (cfds[i]->IsDropped()) { continue; } - NotifyOnFlushCompleted(cfds[i], &file_meta[i], all_mutable_cf_options[i], - job_context->job_id, jobs[i].GetTableProperties()); + NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i], + jobs[i]->GetCommittedFlushJobsInfo()); if (sfm) { std::string file_path = MakeTableFileName( cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber()); @@ -529,12 +532,12 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( // unref the versions. for (int i = 0; i != num_cfs; ++i) { if (!exec_status[i].first) { - jobs[i].Cancel(); + jobs[i]->Cancel(); } } for (int i = 0; i != num_cfs; ++i) { if (exec_status[i].first && exec_status[i].second.ok()) { - auto& mems = jobs[i].GetMemTables(); + auto& mems = jobs[i]->GetMemTables(); cfds[i]->imm()->RollbackMemtableFlush(mems, file_meta[i].fd.GetNumber()); } @@ -548,7 +551,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, - int job_id, TableProperties prop) { + int job_id) { #ifndef ROCKSDB_LITE if (immutable_db_options_.listeners.size() == 0U) { return; @@ -579,7 +582,6 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, info.triggered_writes_stop = triggered_writes_stop; info.smallest_seqno = file_meta->fd.smallest_seqno; info.largest_seqno = file_meta->fd.largest_seqno; - info.table_properties = prop; info.flush_reason = cfd->GetFlushReason(); for (auto listener : immutable_db_options_.listeners) { listener->OnFlushBegin(this, info); @@ -593,15 +595,14 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, (void)file_meta; (void)mutable_cf_options; (void)job_id; - (void)prop; #endif // ROCKSDB_LITE } -void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, - FileMetaData* file_meta, - const MutableCFOptions& mutable_cf_options, - int job_id, TableProperties prop) { +void DBImpl::NotifyOnFlushCompleted( + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + std::list>* flush_jobs_info) { #ifndef ROCKSDB_LITE + assert(flush_jobs_info != nullptr); if (immutable_db_options_.listeners.size() == 0U) { return; } @@ -618,34 +619,22 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, // release lock while notifying events mutex_.Unlock(); { - FlushJobInfo info; - info.cf_id = cfd->GetID(); - info.cf_name = cfd->GetName(); - // TODO(yhchiang): make db_paths dynamic in case flush does not - // go to L0 in the future. - info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path, - file_meta->fd.GetNumber()); - info.thread_id = env_->GetThreadID(); - info.job_id = job_id; - info.triggered_writes_slowdown = triggered_writes_slowdown; - info.triggered_writes_stop = triggered_writes_stop; - info.smallest_seqno = file_meta->fd.smallest_seqno; - info.largest_seqno = file_meta->fd.largest_seqno; - info.table_properties = prop; - info.flush_reason = cfd->GetFlushReason(); - for (auto listener : immutable_db_options_.listeners) { - listener->OnFlushCompleted(this, info); + for (auto& info : *flush_jobs_info) { + info->triggered_writes_slowdown = triggered_writes_slowdown; + info->triggered_writes_stop = triggered_writes_stop; + for (auto listener : immutable_db_options_.listeners) { + listener->OnFlushCompleted(this, *info); + } } + flush_jobs_info->clear(); } mutex_.Lock(); // no need to signal bg_cv_ as it will be signaled at the end of the // flush process. #else (void)cfd; - (void)file_meta; (void)mutable_cf_options; - (void)job_id; - (void)prop; + (void)flush_jobs_info; #endif // ROCKSDB_LITE } diff --git a/db/flush_job.cc b/db/flush_job.cc index 589d81f2974..716e21e9749 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -241,7 +241,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, s = cfd_->imm()->TryInstallMemtableFlushResults( cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_, meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, - log_buffer_); + log_buffer_, &committed_flush_jobs_info_); } if (s.ok() && file_meta != nullptr) { @@ -392,7 +392,7 @@ Status FlushJob::WriteLevel0Table() { if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) { s = output_file_directory_->Fsync(); } - TEST_SYNC_POINT("FlushJob::WriteLevel0Table"); + TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_); db_mutex_->Lock(); } base_->Unref(); @@ -410,6 +410,10 @@ Status FlushJob::WriteLevel0Table() { meta_.fd.smallest_seqno, meta_.fd.largest_seqno, meta_.marked_for_compaction); } +#ifndef ROCKSDB_LITE + // Piggyback FlushJobInfo on the first first flushed memtable. + mems_[0]->SetFlushJobInfo(GetFlushJobInfo()); +#endif // !ROCKSDB_LITE // Note that here we treat flush as level 0 compaction in internal stats InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); @@ -424,4 +428,22 @@ Status FlushJob::WriteLevel0Table() { return s; } +#ifndef ROCKSDB_LITE +std::unique_ptr FlushJob::GetFlushJobInfo() const { + db_mutex_->AssertHeld(); + std::unique_ptr info(new FlushJobInfo); + info->cf_id = cfd_->GetID(); + info->cf_name = cfd_->GetName(); + info->file_path = MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, + meta_.fd.GetNumber()); + info->thread_id = db_options_.env->GetThreadID(); + info->job_id = job_context_->job_id; + info->smallest_seqno = meta_.fd.smallest_seqno; + info->largest_seqno = meta_.fd.largest_seqno; + info->table_properties = table_properties_; + info->flush_reason = cfd_->GetFlushReason(); + return info; +} +#endif // !ROCKSDB_LITE + } // namespace rocksdb diff --git a/db/flush_job.h b/db/flush_job.h index fdb0917bdba..b25aca3529c 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -11,10 +11,11 @@ #include #include #include +#include #include +#include #include #include -#include #include "db/column_family.h" #include "db/dbformat.h" @@ -34,6 +35,7 @@ #include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" +#include "rocksdb/listener.h" #include "rocksdb/memtablerep.h" #include "rocksdb/transaction_log.h" #include "table/scoped_arena_iterator.h" @@ -79,14 +81,22 @@ class FlushJob { Status Run(LogsWithPrepTracker* prep_tracker = nullptr, FileMetaData* file_meta = nullptr); void Cancel(); - TableProperties GetTableProperties() const { return table_properties_; } const autovector& GetMemTables() const { return mems_; } +#ifndef ROCKSDB_LITE + std::list>* GetCommittedFlushJobsInfo() { + return &committed_flush_jobs_info_; + } +#endif // !ROCKSDB_LITE + private: void ReportStartedFlush(); void ReportFlushInputSize(const autovector& mems); void RecordFlushIOStats(); Status WriteLevel0Table(); +#ifndef ROCKSDB_LITE + std::unique_ptr GetFlushJobInfo() const; +#endif // !ROCKSDB_LITE const std::string& dbname_; ColumnFamilyData* cfd_; @@ -131,6 +141,10 @@ class FlushJob { // In this case, only after all flush jobs succeed in flush can RocksDB // commit to the MANIFEST. const bool write_manifest_; + // The current flush job can commit flush result of a concurrent flush job. + // We collect FlushJobInfo of all jobs committed by current job and fire + // OnFlushCompleted for them. + std::list> committed_flush_jobs_info_; // Variables below are set by PickMemTable(): FileMetaData meta_; diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 130179ae67b..9718cce7f7e 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -298,18 +298,18 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { EventLogger event_logger(db_options_.info_log.get()); SnapshotChecker* snapshot_checker = nullptr; // not relevant - std::vector flush_jobs; + std::vector> flush_jobs; k = 0; for (auto cfd : all_cfds) { std::vector snapshot_seqs; - flush_jobs.emplace_back( + flush_jobs.emplace_back(new FlushJob( dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), &memtable_ids[k], env_options_, versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, false /* sync_output_directory */, false /* write_manifest */, - Env::Priority::USER); + Env::Priority::USER)); k++; } HistogramData hist; @@ -318,12 +318,12 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { file_metas.reserve(flush_jobs.size()); mutex_.Lock(); for (auto& job : flush_jobs) { - job.PickMemTable(); + job->PickMemTable(); } for (auto& job : flush_jobs) { FileMetaData meta; // Run will release and re-acquire mutex - ASSERT_OK(job.Run(nullptr /**/, &meta)); + ASSERT_OK(job->Run(nullptr /**/, &meta)); file_metas.emplace_back(meta); } autovector file_meta_ptrs; @@ -332,7 +332,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { } autovector*> mems_list; for (size_t i = 0; i != all_cfds.size(); ++i) { - const auto& mems = flush_jobs[i].GetMemTables(); + const auto& mems = flush_jobs[i]->GetMemTables(); mems_list.push_back(&mems); } autovector mutable_cf_options_list; diff --git a/db/memtable.h b/db/memtable.h index 6b8c4141f5a..6db6edd8818 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -31,6 +31,7 @@ namespace rocksdb { +struct FlushJobInfo; class Mutex; class MemTableIterator; class MergeContext; @@ -401,6 +402,16 @@ class MemTable { flush_in_progress_ = in_progress; } +#ifndef ROCKSDB_LITE + void SetFlushJobInfo(std::unique_ptr&& info) { + flush_job_info_ = std::move(info); + } + + std::unique_ptr ReleaseFlushJobInfo() { + return std::move(flush_job_info_); + } +#endif // !ROCKSDB_LITE + private: enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; @@ -479,6 +490,11 @@ class MemTable { // writes with sequence number smaller than seq are flushed. SequenceNumber atomic_flush_seqno_; +#ifndef ROCKSDB_LITE + // Flush job info of the current memtable. + std::unique_ptr flush_job_info_; +#endif // !ROCKSDB_LITE + // Returns a heuristic flush decision bool ShouldFlushNow() const; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 0f796eb9a73..12737dacfb9 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -324,7 +324,8 @@ Status MemTableList::TryInstallMemtableFlushResults( const autovector& mems, LogsWithPrepTracker* prep_tracker, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, autovector* to_delete, Directory* db_directory, - LogBuffer* log_buffer) { + LogBuffer* log_buffer, + std::list>* committed_flush_jobs_info) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); mu->AssertHeld(); @@ -382,6 +383,14 @@ Status MemTableList::TryInstallMemtableFlushResults( cfd->GetName().c_str(), m->file_number_); edit_list.push_back(&m->edit_); memtables_to_flush.push_back(m); +#ifndef ROCKSDB_LITE + std::unique_ptr info = m->ReleaseFlushJobInfo(); + if (info != nullptr) { + committed_flush_jobs_info->push_back(std::move(info)); + } +#else + (void)committed_flush_jobs_info; +#endif // !ROCKSDB_LITE } batch_count++; } diff --git a/db/memtable_list.h b/db/memtable_list.h index a72077ff3d5..d83a084b9fd 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -33,6 +33,8 @@ class InstrumentedMutex; class MergeIteratorBuilder; class MemTableList; +struct FlushJobInfo; + // keeps a list of immutable memtables in a vector. the list is immutable // if refcount is bigger than one. It is used as a state for Get() and // Iterator code paths @@ -227,7 +229,8 @@ class MemTableList { const autovector& m, LogsWithPrepTracker* prep_tracker, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, autovector* to_delete, Directory* db_directory, - LogBuffer* log_buffer); + LogBuffer* log_buffer, + std::list>* committed_flush_jobs_info); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 3a14b6830a6..cdd295ad116 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -117,9 +117,11 @@ class MemTableListTest : public testing::Test { // Create dummy mutex. InstrumentedMutex mutex; InstrumentedMutexLock l(&mutex); - return list->TryInstallMemtableFlushResults( + std::list> flush_jobs_info; + Status s = list->TryInstallMemtableFlushResults( cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex, - file_num, to_delete, nullptr, &log_buffer); + file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info); + return s; } // Calls MemTableList::InstallMemtableFlushResults() and sets up all diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 5be55cbede8..8d11bfaeaf8 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -459,6 +459,7 @@ class EventListener { #else class EventListener {}; +struct FlushJobInfo {}; #endif // ROCKSDB_LITE