From 2426d4c7c376ff6d1ce107a0630b02ca995e74fd Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Fri, 11 Oct 2019 05:26:45 +0000 Subject: [PATCH 01/11] fire OnFlushCompleted after flush result committed Signed-off-by: Yi Wu --- db/db_impl/db_impl.h | 6 +-- db/db_impl/db_impl_compaction_flush.cc | 53 ++++++++++---------------- db/flush_job.cc | 20 +++++++++- db/flush_job.h | 12 +++++- db/memtable.h | 10 +++++ db/memtable_list.cc | 7 +++- db/memtable_list.h | 6 ++- 7 files changed, 72 insertions(+), 42 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index c1f4e66b9b0..4d9cb6201b1 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1005,11 +1005,11 @@ class DBImpl : public DB { void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, - int job_id, TableProperties prop); + int job_id); - void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta, + void NotifyOnFlushCompleted(ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - int job_id, TableProperties prop); + autovector* flush_jobs_info); void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c, const Status& st, diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 8e4dc411f53..abf3a2396f7 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -164,8 +164,7 @@ Status DBImpl::FlushMemTableToOutputFile( #ifndef ROCKSDB_LITE // may temporarily unlock and lock the mutex. - NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id, - flush_job.GetTableProperties()); + NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id); #endif // ROCKSDB_LITE Status s; @@ -213,8 +212,8 @@ Status DBImpl::FlushMemTableToOutputFile( if (s.ok()) { #ifndef ROCKSDB_LITE // may temporarily unlock and lock the mutex. - NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options, - job_context->job_id, flush_job.GetTableProperties()); + NotifyOnFlushCompleted(cfd, mutable_cf_options, + flush_job.GetCommittedFlushJobsInfo()); auto sfm = static_cast( immutable_db_options_.sst_file_manager.get()); if (sfm) { @@ -351,7 +350,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i); // may temporarily unlock and lock the mutex. NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options, - job_context->job_id, jobs[i].GetTableProperties()); + job_context->job_id); } #endif /* !ROCKSDB_LITE */ @@ -505,8 +504,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( if (cfds[i]->IsDropped()) { continue; } - NotifyOnFlushCompleted(cfds[i], &file_meta[i], all_mutable_cf_options[i], - job_context->job_id, jobs[i].GetTableProperties()); + NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i], + jobs[i].GetCommittedFlushJobsInfo()); if (sfm) { std::string file_path = MakeTableFileName( cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber()); @@ -549,7 +548,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, - int job_id, TableProperties prop) { + int job_id) { #ifndef ROCKSDB_LITE if (immutable_db_options_.listeners.size() == 0U) { return; @@ -580,7 +579,6 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, info.triggered_writes_stop = triggered_writes_stop; info.smallest_seqno = file_meta->fd.smallest_seqno; info.largest_seqno = file_meta->fd.largest_seqno; - info.table_properties = prop; info.flush_reason = cfd->GetFlushReason(); for (auto listener : immutable_db_options_.listeners) { listener->OnFlushBegin(this, info); @@ -598,11 +596,11 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, #endif // ROCKSDB_LITE } -void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, - FileMetaData* file_meta, - const MutableCFOptions& mutable_cf_options, - int job_id, TableProperties prop) { +void DBImpl::NotifyOnFlushCompleted( + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + autovector* flush_jobs_info) { #ifndef ROCKSDB_LITE + assert(flush_jobs_info != nullptr); if (immutable_db_options_.listeners.size() == 0U) { return; } @@ -619,34 +617,23 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, // release lock while notifying events mutex_.Unlock(); { - FlushJobInfo info; - info.cf_id = cfd->GetID(); - info.cf_name = cfd->GetName(); - // TODO(yhchiang): make db_paths dynamic in case flush does not - // go to L0 in the future. - info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path, - file_meta->fd.GetNumber()); - info.thread_id = env_->GetThreadID(); - info.job_id = job_id; - info.triggered_writes_slowdown = triggered_writes_slowdown; - info.triggered_writes_stop = triggered_writes_stop; - info.smallest_seqno = file_meta->fd.smallest_seqno; - info.largest_seqno = file_meta->fd.largest_seqno; - info.table_properties = prop; - info.flush_reason = cfd->GetFlushReason(); - for (auto listener : immutable_db_options_.listeners) { - listener->OnFlushCompleted(this, info); + for (auto* info : *flush_jobs_info) { + info->triggered_writes_slowdown = triggered_writes_slowdown; + info->triggered_writes_stop = triggered_writes_stop; + for (auto listener : immutable_db_options_.listeners) { + listener->OnFlushCompleted(this, *info); + } + delete info; } + flush_jobs_info->clear(); } mutex_.Lock(); // no need to signal bg_cv_ as it will be signaled at the end of the // flush process. #else (void)cfd; - (void)file_meta; (void)mutable_cf_options; - (void)job_id; - (void)prop; + (void)flush_jobs_info; #endif // ROCKSDB_LITE } diff --git a/db/flush_job.cc b/db/flush_job.cc index 589d81f2974..436bff8284f 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -241,7 +241,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, s = cfd_->imm()->TryInstallMemtableFlushResults( cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_, meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, - log_buffer_); + log_buffer_, &committed_flush_jobs_info_); } if (s.ok() && file_meta != nullptr) { @@ -410,6 +410,8 @@ Status FlushJob::WriteLevel0Table() { meta_.fd.smallest_seqno, meta_.fd.largest_seqno, meta_.marked_for_compaction); } + // Piggyback FlushJobInfo on the first first flushed memtable. + mems_[0]->SetFlushJobInfo(GetFlushJobInfo()); // Note that here we treat flush as level 0 compaction in internal stats InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); @@ -424,4 +426,20 @@ Status FlushJob::WriteLevel0Table() { return s; } +std::unique_ptr FlushJob::GetFlushJobInfo() const { + db_mutex_->AssertHeld(); + std::unique_ptr info(new FlushJobInfo); + info->cf_id = cfd_->GetID(); + info->cf_name = cfd_->GetName(); + info->file_path = MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, + meta_.fd.GetNumber()); + info->thread_id = db_options_.env->GetThreadID(); + info->job_id = job_context_->job_id; + info->smallest_seqno = meta_.fd.smallest_seqno; + info->largest_seqno = meta_.fd.largest_seqno; + info->table_properties = table_properties_; + info->flush_reason = cfd_->GetFlushReason(); + return info; +} + } // namespace rocksdb diff --git a/db/flush_job.h b/db/flush_job.h index fdb0917bdba..46cf6791910 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -12,9 +12,9 @@ #include #include #include +#include #include #include -#include #include "db/column_family.h" #include "db/dbformat.h" @@ -34,6 +34,7 @@ #include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" +#include "rocksdb/listener.h" #include "rocksdb/memtablerep.h" #include "rocksdb/transaction_log.h" #include "table/scoped_arena_iterator.h" @@ -79,14 +80,17 @@ class FlushJob { Status Run(LogsWithPrepTracker* prep_tracker = nullptr, FileMetaData* file_meta = nullptr); void Cancel(); - TableProperties GetTableProperties() const { return table_properties_; } const autovector& GetMemTables() const { return mems_; } + autovector* GetCommittedFlushJobsInfo() { + return &committed_flush_jobs_info_; + } private: void ReportStartedFlush(); void ReportFlushInputSize(const autovector& mems); void RecordFlushIOStats(); Status WriteLevel0Table(); + std::unique_ptr GetFlushJobInfo() const; const std::string& dbname_; ColumnFamilyData* cfd_; @@ -131,6 +135,10 @@ class FlushJob { // In this case, only after all flush jobs succeed in flush can RocksDB // commit to the MANIFEST. const bool write_manifest_; + // The current flush job can commit flush result of a concurrent flush job. + // We collect FlushJobInfo of all jobs committed by current job and fire + // OnFlushCompleted for them. + autovector committed_flush_jobs_info_; // Variables below are set by PickMemTable(): FileMetaData meta_; diff --git a/db/memtable.h b/db/memtable.h index f316ab8e29a..b7c82eb9af3 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -31,6 +31,7 @@ namespace rocksdb { +struct FlushJobInfo; class Mutex; class MemTableIterator; class MergeContext; @@ -418,6 +419,12 @@ class MemTable { flush_in_progress_ = in_progress; } + void SetFlushJobInfo(std::unique_ptr&& info) { + flush_job_info_ = std::move(info); + } + + FlushJobInfo* ReleaseFlushJobInfo() { return flush_job_info_.release(); } + private: enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; @@ -500,6 +507,9 @@ class MemTable { // Gets refrshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow` std::atomic approximate_memory_usage_; + // Flush job info of the current memtable. + std::unique_ptr flush_job_info_; + // Returns a heuristic flush decision bool ShouldFlushNow(); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index e3f0732de15..f480b030639 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -374,7 +374,8 @@ Status MemTableList::TryInstallMemtableFlushResults( const autovector& mems, LogsWithPrepTracker* prep_tracker, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, autovector* to_delete, Directory* db_directory, - LogBuffer* log_buffer) { + LogBuffer* log_buffer, + autovector* committed_flush_jobs_info) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); mu->AssertHeld(); @@ -431,6 +432,10 @@ Status MemTableList::TryInstallMemtableFlushResults( "[%s] Level-0 commit table #%" PRIu64 " started", cfd->GetName().c_str(), m->file_number_); edit_list.push_back(&m->edit_); + FlushJobInfo* info = m->ReleaseFlushJobInfo(); + if (info != nullptr) { + committed_flush_jobs_info->push_back(info); + } memtables_to_flush.push_back(m); } batch_count++; diff --git a/db/memtable_list.h b/db/memtable_list.h index 75cc1a524b2..70f8ee349aa 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -7,7 +7,6 @@ #include #include -#include #include #include #include @@ -33,6 +32,8 @@ class InstrumentedMutex; class MergeIteratorBuilder; class MemTableList; +struct FlushJobInfo; + // keeps a list of immutable memtables in a vector. the list is immutable // if refcount is bigger than one. It is used as a state for Get() and // Iterator code paths @@ -251,7 +252,8 @@ class MemTableList { const autovector& m, LogsWithPrepTracker* prep_tracker, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, autovector* to_delete, Directory* db_directory, - LogBuffer* log_buffer); + LogBuffer* log_buffer, + autovector* committed_flush_jobs_info); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). From 022fc22cf0ad57f5bcc541c4608caeb805b527e6 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Fri, 11 Oct 2019 18:08:12 +0000 Subject: [PATCH 02/11] fix memtable_list_test Signed-off-by: Yi Wu --- db/memtable_list_test.cc | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index b8dc802166e..bb22d691640 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -117,9 +117,14 @@ class MemTableListTest : public testing::Test { // Create dummy mutex. InstrumentedMutex mutex; InstrumentedMutexLock l(&mutex); - return list->TryInstallMemtableFlushResults( + autovector flush_jobs_info; + Status s = list->TryInstallMemtableFlushResults( cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex, - file_num, to_delete, nullptr, &log_buffer); + file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info); + for (auto* info : flush_jobs_info) { + delete info; + } + return s; } // Calls MemTableList::InstallMemtableFlushResults() and sets up all From b9401ec254e2201bff8967ef0a31200a09edebb9 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Mon, 7 Oct 2019 23:52:05 +0000 Subject: [PATCH 03/11] add test Signed-off-by: Yi Wu --- db/db_flush_test.cc | 94 ++++++++++++++++++++++++++ db/db_impl/db_impl_compaction_flush.cc | 1 + db/flush_job.cc | 2 +- db/memtable_list.cc | 2 +- 4 files changed, 97 insertions(+), 2 deletions(-) diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 034ec63226c..9272c20ba66 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -7,10 +7,15 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include + +#include "db/db_impl/db_impl.h" #include "db/db_test_util.h" +#include "port/port.h" #include "port/stack_trace.h" #include "test_util/fault_injection_test_env.h" #include "test_util/sync_point.h" +#include "util/mutexlock.h" namespace rocksdb { @@ -323,6 +328,95 @@ TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) { SyncPoint::GetInstance()->DisableProcessing(); } +#ifndef ROCKSDB_LITE +TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { + class TestListener : public EventListener { + public: + void OnFlushCompleted(DB* db, const FlushJobInfo& info) override { + // There's only one key in each flush. + ASSERT_EQ(info.smallest_seqno, info.largest_seqno); + ASSERT_NE(0, info.smallest_seqno); + if (info.smallest_seqno == seq1) { + // First flush completed + ASSERT_FALSE(completed1); + completed1 = true; + CheckFlushResultCommitted(db, seq1); + } else { + // Second flush completed + ASSERT_FALSE(completed2); + completed2 = true; + ASSERT_EQ(info.smallest_seqno, seq2); + CheckFlushResultCommitted(db, seq2); + } + } + + void CheckFlushResultCommitted(DB* db, SequenceNumber seq) { + DBImpl* db_impl = reinterpret_cast(db); + auto* mutex = db_impl->mutex(); + mutex->Lock(); + auto* cfd = + reinterpret_cast(db->DefaultColumnFamily()) + ->cfd(); + ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber()); + mutex->Unlock(); + } + + std::atomic seq1; + std::atomic seq2; + std::atomic completed1{false}; + std::atomic completed2{false}; + }; + std::shared_ptr listener = std::make_shared(); + std::atomic first_flush_pending{false}; + + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::FlushMemTable:AfterScheduleFlush", + "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:1"}, + {"DBImpl::FlushMemTableToOutputFile:Finish", + "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:2"}}); + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::WriteLevel0Table", [&](void* arg) { + // Wait for the second flush finished, out of mutex. + auto* mems = reinterpret_cast*>(arg); + if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) { + TEST_SYNC_POINT( + "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:2"); + } + }); + + Options options = CurrentOptions(); + options.create_if_missing = true; + options.listeners.push_back(listener); + // Setting max_flush_jobs = max_background_jobs / 4 = 2. + options.max_background_jobs = 8; + // Allow 2 immutable memtables. + options.max_write_buffer_number = 3; + Reopen(options); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put("foo", "v")); + listener->seq1 = db_->GetLatestSequenceNumber(); + // t1 will wait for the second flush complete before committing flush result. + auto t1 = port::Thread([&]() { + // flush_opts.wait = true + ASSERT_OK(db_->Flush(FlushOptions())); + }); + // Wait for first flush scheduled. + TEST_SYNC_POINT("DBFlushTest::FireOnFlushCompletedAfterCommittedResult:1"); + // The second flush will exit early without commit its result. The work + // is delegated to the first flush. + ASSERT_OK(Put("bar", "v")); + listener->seq2 = db_->GetLatestSequenceNumber(); + FlushOptions flush_opts; + flush_opts.wait = false; + ASSERT_OK(db_->Flush(flush_opts)); + t1.join(); + ASSERT_TRUE(listener->completed1); + ASSERT_TRUE(listener->completed2); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} +#endif // !ROCKSDB_LITE + TEST_P(DBAtomicFlushTest, ManualAtomicFlush) { Options options = CurrentOptions(); options.create_if_missing = true; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index abf3a2396f7..0fb48554554 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -232,6 +232,7 @@ Status DBImpl::FlushMemTableToOutputFile( } #endif // ROCKSDB_LITE } + TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish"); return s; } diff --git a/db/flush_job.cc b/db/flush_job.cc index 436bff8284f..20a4a8df49d 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -392,7 +392,7 @@ Status FlushJob::WriteLevel0Table() { if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) { s = output_file_directory_->Fsync(); } - TEST_SYNC_POINT("FlushJob::WriteLevel0Table"); + TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_); db_mutex_->Lock(); } base_->Unref(); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index f480b030639..07acb48d251 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -432,11 +432,11 @@ Status MemTableList::TryInstallMemtableFlushResults( "[%s] Level-0 commit table #%" PRIu64 " started", cfd->GetName().c_str(), m->file_number_); edit_list.push_back(&m->edit_); + memtables_to_flush.push_back(m); FlushJobInfo* info = m->ReleaseFlushJobInfo(); if (info != nullptr) { committed_flush_jobs_info->push_back(info); } - memtables_to_flush.push_back(m); } batch_count++; } From cac4936bbeaf948cec81bd2361e96689e889eca2 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Fri, 11 Oct 2019 22:01:57 +0000 Subject: [PATCH 04/11] update HISTORY.md Signed-off-by: Yi Wu --- HISTORY.md | 1 + 1 file changed, 1 insertion(+) diff --git a/HISTORY.md b/HISTORY.md index 41d1078c7a8..f5f87fc4fcb 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * Fix a bug when format_version=3, partitioned fitlers, and prefix search are used in conjunction. The bug could result into Seek::(prefix) returning NotFound for an existing prefix. * Revert the feature "Merging iterator to avoid child iterator reseek for some cases (#5286)" since it might cause strong results when reseek happens with a different iterator upper bound. * Fix a bug causing a crash during ingest external file when background compaction cause severe error (file not found). +* Fix OnFlushCompleted fired before flush result persisted in MANIFEST when there's concurrent flush job. The bug exists since OnFlushCompleted was introduced in rocksdb 3.8. ### New Features * Introduced DBOptions::max_write_batch_group_size_bytes to configure maximum limit on number of bytes that are written in a single batch of WAL or memtable write. It is followed when the leader write size is larger than 1/8 of this limit. * VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readhead size. For checksum verifying before external SST file ingestion, a new option IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting. From 73d954bed0662f06793833cc16031517af22f2e5 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Mon, 14 Oct 2019 18:32:12 +0000 Subject: [PATCH 05/11] Fix lite build Signed-off-by: Yi Wu --- db/db_impl/db_impl_compaction_flush.cc | 1 - db/flush_job.cc | 4 ++++ db/flush_job.h | 5 +++++ db/memtable.h | 4 ++++ db/memtable_list.cc | 4 ++++ include/rocksdb/listener.h | 1 + 6 files changed, 18 insertions(+), 1 deletion(-) diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 0fb48554554..f0dd69662e6 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -593,7 +593,6 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, (void)file_meta; (void)mutable_cf_options; (void)job_id; - (void)prop; #endif // ROCKSDB_LITE } diff --git a/db/flush_job.cc b/db/flush_job.cc index 20a4a8df49d..716e21e9749 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -410,8 +410,10 @@ Status FlushJob::WriteLevel0Table() { meta_.fd.smallest_seqno, meta_.fd.largest_seqno, meta_.marked_for_compaction); } +#ifndef ROCKSDB_LITE // Piggyback FlushJobInfo on the first first flushed memtable. mems_[0]->SetFlushJobInfo(GetFlushJobInfo()); +#endif // !ROCKSDB_LITE // Note that here we treat flush as level 0 compaction in internal stats InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); @@ -426,6 +428,7 @@ Status FlushJob::WriteLevel0Table() { return s; } +#ifndef ROCKSDB_LITE std::unique_ptr FlushJob::GetFlushJobInfo() const { db_mutex_->AssertHeld(); std::unique_ptr info(new FlushJobInfo); @@ -441,5 +444,6 @@ std::unique_ptr FlushJob::GetFlushJobInfo() const { info->flush_reason = cfd_->GetFlushReason(); return info; } +#endif // !ROCKSDB_LITE } // namespace rocksdb diff --git a/db/flush_job.h b/db/flush_job.h index 46cf6791910..cecc6dd83d9 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -81,16 +81,21 @@ class FlushJob { FileMetaData* file_meta = nullptr); void Cancel(); const autovector& GetMemTables() const { return mems_; } + +#ifndef ROCKSDB_LITE autovector* GetCommittedFlushJobsInfo() { return &committed_flush_jobs_info_; } +#endif // !ROCKSDB_LITE private: void ReportStartedFlush(); void ReportFlushInputSize(const autovector& mems); void RecordFlushIOStats(); Status WriteLevel0Table(); +#ifndef ROCKSDB_LITE std::unique_ptr GetFlushJobInfo() const; +#endif // !ROCKSDB_LITE const std::string& dbname_; ColumnFamilyData* cfd_; diff --git a/db/memtable.h b/db/memtable.h index b7c82eb9af3..cde30dd0734 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -419,11 +419,13 @@ class MemTable { flush_in_progress_ = in_progress; } +#ifndef ROCKSDB_LITE void SetFlushJobInfo(std::unique_ptr&& info) { flush_job_info_ = std::move(info); } FlushJobInfo* ReleaseFlushJobInfo() { return flush_job_info_.release(); } +#endif // !ROCKSDB_LITE private: enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; @@ -507,8 +509,10 @@ class MemTable { // Gets refrshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow` std::atomic approximate_memory_usage_; +#ifndef ROCKSDB_LITE // Flush job info of the current memtable. std::unique_ptr flush_job_info_; +#endif // !ROCKSDB_LITE // Returns a heuristic flush decision bool ShouldFlushNow(); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 07acb48d251..e47b0fe4464 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -433,10 +433,14 @@ Status MemTableList::TryInstallMemtableFlushResults( cfd->GetName().c_str(), m->file_number_); edit_list.push_back(&m->edit_); memtables_to_flush.push_back(m); +#ifndef ROCKSDB_LITE FlushJobInfo* info = m->ReleaseFlushJobInfo(); if (info != nullptr) { committed_flush_jobs_info->push_back(info); } +#else + (void)committed_flush_jobs_info; +#endif // !ROCKSDB_LITE } batch_count++; } diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 5be55cbede8..8d11bfaeaf8 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -459,6 +459,7 @@ class EventListener { #else class EventListener {}; +struct FlushJobInfo {}; #endif // ROCKSDB_LITE From 591d4dff41468684582acafcded29e1ce53556ef Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Mon, 14 Oct 2019 19:30:51 +0000 Subject: [PATCH 06/11] use unique_ptr install of raw pointer Signed-off-by: Yi Wu --- db/db_impl/db_impl.h | 6 +-- db/db_impl/db_impl_compaction_flush.cc | 27 ++++++------- db/flush_job.h | 5 ++- db/memtable.h | 4 +- db/memtable_list.cc | 6 +-- db/memtable_list.h | 3 +- db/memtable_list_test.cc | 5 +-- tools/db_stress.cc | 56 +++++++++++++------------- 8 files changed, 56 insertions(+), 56 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 4d9cb6201b1..4a9b243d922 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1007,9 +1007,9 @@ class DBImpl : public DB { const MutableCFOptions& mutable_cf_options, int job_id); - void NotifyOnFlushCompleted(ColumnFamilyData* cfd, - const MutableCFOptions& mutable_cf_options, - autovector* flush_jobs_info); + void NotifyOnFlushCompleted( + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + std::list>* flush_jobs_info); void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c, const Status& st, diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index f0dd69662e6..4f81fb0638b 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -303,7 +303,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( autovector distinct_output_dirs; autovector distinct_output_dir_paths; - std::vector jobs; + std::vector> jobs; std::vector all_mutable_cf_options; int num_cfs = static_cast(cfds.size()); all_mutable_cf_options.reserve(num_cfs); @@ -330,7 +330,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions()); const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back(); const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_); - jobs.emplace_back( + jobs.emplace_back(new FlushJob( dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id, env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot, @@ -338,8 +338,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, false /* sync_output_directory */, false /* write_manifest */, - thread_pri); - jobs.back().PickMemTable(); + thread_pri)); + jobs.back()->PickMemTable(); } std::vector file_meta(num_cfs); @@ -373,7 +373,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( // TODO (yanqin): parallelize jobs with threads. for (int i = 1; i != num_cfs; ++i) { exec_status[i].second = - jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]); + jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]); exec_status[i].first = true; } if (num_cfs > 1) { @@ -383,7 +383,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"); } exec_status[0].second = - jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]); + jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]); exec_status[0].first = true; Status error_status; @@ -424,7 +424,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( auto wait_to_install_func = [&]() { bool ready = true; for (size_t i = 0; i != cfds.size(); ++i) { - const auto& mems = jobs[i].GetMemTables(); + const auto& mems = jobs[i]->GetMemTables(); if (cfds[i]->IsDropped()) { // If the column family is dropped, then do not wait. continue; @@ -465,7 +465,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( autovector mutable_cf_options_list; autovector tmp_file_meta; for (int i = 0; i != num_cfs; ++i) { - const auto& mems = jobs[i].GetMemTables(); + const auto& mems = jobs[i]->GetMemTables(); if (!cfds[i]->IsDropped() && !mems.empty()) { tmp_cfds.emplace_back(cfds[i]); mems_list.emplace_back(&mems); @@ -506,7 +506,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( continue; } NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i], - jobs[i].GetCommittedFlushJobsInfo()); + jobs[i]->GetCommittedFlushJobsInfo()); if (sfm) { std::string file_path = MakeTableFileName( cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber()); @@ -530,12 +530,12 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( // unref the versions. for (int i = 0; i != num_cfs; ++i) { if (!exec_status[i].first) { - jobs[i].Cancel(); + jobs[i]->Cancel(); } } for (int i = 0; i != num_cfs; ++i) { if (exec_status[i].first && exec_status[i].second.ok()) { - auto& mems = jobs[i].GetMemTables(); + auto& mems = jobs[i]->GetMemTables(); cfds[i]->imm()->RollbackMemtableFlush(mems, file_meta[i].fd.GetNumber()); } @@ -598,7 +598,7 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, void DBImpl::NotifyOnFlushCompleted( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - autovector* flush_jobs_info) { + std::list>* flush_jobs_info) { #ifndef ROCKSDB_LITE assert(flush_jobs_info != nullptr); if (immutable_db_options_.listeners.size() == 0U) { @@ -617,13 +617,12 @@ void DBImpl::NotifyOnFlushCompleted( // release lock while notifying events mutex_.Unlock(); { - for (auto* info : *flush_jobs_info) { + for (auto& info : *flush_jobs_info) { info->triggered_writes_slowdown = triggered_writes_slowdown; info->triggered_writes_stop = triggered_writes_stop; for (auto listener : immutable_db_options_.listeners) { listener->OnFlushCompleted(this, *info); } - delete info; } flush_jobs_info->clear(); } diff --git a/db/flush_job.h b/db/flush_job.h index cecc6dd83d9..b25aca3529c 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -83,7 +84,7 @@ class FlushJob { const autovector& GetMemTables() const { return mems_; } #ifndef ROCKSDB_LITE - autovector* GetCommittedFlushJobsInfo() { + std::list>* GetCommittedFlushJobsInfo() { return &committed_flush_jobs_info_; } #endif // !ROCKSDB_LITE @@ -143,7 +144,7 @@ class FlushJob { // The current flush job can commit flush result of a concurrent flush job. // We collect FlushJobInfo of all jobs committed by current job and fire // OnFlushCompleted for them. - autovector committed_flush_jobs_info_; + std::list> committed_flush_jobs_info_; // Variables below are set by PickMemTable(): FileMetaData meta_; diff --git a/db/memtable.h b/db/memtable.h index 15aec87b8e4..0aeadce80c8 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -429,7 +429,9 @@ class MemTable { flush_job_info_ = std::move(info); } - FlushJobInfo* ReleaseFlushJobInfo() { return flush_job_info_.release(); } + std::unique_ptr ReleaseFlushJobInfo() { + return std::move(flush_job_info_); + } #endif // !ROCKSDB_LITE private: diff --git a/db/memtable_list.cc b/db/memtable_list.cc index a7670d6c581..de212b6a504 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -386,7 +386,7 @@ Status MemTableList::TryInstallMemtableFlushResults( VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, autovector* to_delete, Directory* db_directory, LogBuffer* log_buffer, - autovector* committed_flush_jobs_info) { + std::list>* committed_flush_jobs_info) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); mu->AssertHeld(); @@ -445,9 +445,9 @@ Status MemTableList::TryInstallMemtableFlushResults( edit_list.push_back(&m->edit_); memtables_to_flush.push_back(m); #ifndef ROCKSDB_LITE - FlushJobInfo* info = m->ReleaseFlushJobInfo(); + std::unique_ptr info = m->ReleaseFlushJobInfo(); if (info != nullptr) { - committed_flush_jobs_info->push_back(info); + committed_flush_jobs_info->push_back(std::move(info)); } #else (void)committed_flush_jobs_info; diff --git a/db/memtable_list.h b/db/memtable_list.h index 742582345e3..d78a8b5ea9e 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -7,6 +7,7 @@ #include #include +#include #include #include #include @@ -256,7 +257,7 @@ class MemTableList { VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, autovector* to_delete, Directory* db_directory, LogBuffer* log_buffer, - autovector* committed_flush_jobs_info); + std::list>* committed_flush_jobs_info); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index bb22d691640..32a227f4b55 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -117,13 +117,10 @@ class MemTableListTest : public testing::Test { // Create dummy mutex. InstrumentedMutex mutex; InstrumentedMutexLock l(&mutex); - autovector flush_jobs_info; + std::list> flush_jobs_info; Status s = list->TryInstallMemtableFlushResults( cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex, file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info); - for (auto* info : flush_jobs_info) { - delete info; - } return s; } diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 84e6de97cc5..78935a051b3 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -2093,23 +2093,22 @@ class StressTest { break; } if (open_cnt != 0) { - thread->stats.FinishedSingleOp(); - MutexLock l(thread->shared->GetMutex()); - while (!thread->snapshot_queue.empty()) { - db_->ReleaseSnapshot( - thread->snapshot_queue.front().second.snapshot); - delete thread->snapshot_queue.front().second.key_vec; - thread->snapshot_queue.pop(); - } - thread->shared->IncVotedReopen(); - if (thread->shared->AllVotedReopen()) { - thread->shared->GetStressTest()->Reopen(); - thread->shared->GetCondVar()->SignalAll(); - } else { - thread->shared->GetCondVar()->Wait(); - } - // Commenting this out as we don't want to reset stats on each open. - // thread->stats.Start(); + thread->stats.FinishedSingleOp(); + MutexLock l(thread->shared->GetMutex()); + while (!thread->snapshot_queue.empty()) { + db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot); + delete thread->snapshot_queue.front().second.key_vec; + thread->snapshot_queue.pop(); + } + thread->shared->IncVotedReopen(); + if (thread->shared->AllVotedReopen()) { + thread->shared->GetStressTest()->Reopen(); + thread->shared->GetCondVar()->SignalAll(); + } else { + thread->shared->GetCondVar()->Wait(); + } + // Commenting this out as we don't want to reset stats on each open. + // thread->stats.Start(); } for (uint64_t i = 0; i < ops_per_open; i++) { @@ -2262,13 +2261,12 @@ class StressTest { ropt.snapshot = snapshot; std::string value_at; // When taking a snapshot, we also read a key from that snapshot. We - // will later read the same key before releasing the snapshot and verify - // that the results are the same. + // will later read the same key before releasing the snapshot and + // verify that the results are the same. auto status_at = db_->Get(ropt, column_family, key, &value_at); - std::vector *key_vec = nullptr; + std::vector* key_vec = nullptr; - if (FLAGS_compare_full_db_state_snapshot && - (thread->tid == 0)) { + if (FLAGS_compare_full_db_state_snapshot && (thread->tid == 0)) { key_vec = new std::vector(FLAGS_max_key); // When `prefix_extractor` is set, seeking to beginning and scanning // across prefixes are only supported with `total_order_seek` set. @@ -2284,7 +2282,8 @@ class StressTest { ThreadState::SnapshotState snap_state = { snapshot, rand_column_family, column_family->GetName(), - keystr, status_at, value_at, key_vec}; + keystr, status_at, value_at, + key_vec}; thread->snapshot_queue.emplace( std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops), snap_state); @@ -2293,8 +2292,8 @@ class StressTest { i >= thread->snapshot_queue.front().first) { auto snap_state = thread->snapshot_queue.front().second; assert(snap_state.snapshot); - // Note: this is unsafe as the cf might be dropped concurrently. But it - // is ok since unclean cf drop is cunnrently not supported by write + // Note: this is unsafe as the cf might be dropped concurrently. But + // it is ok since unclean cf drop is cunnrently not supported by write // prepared transactions. Status s = AssertSame(db_, column_families_[snap_state.cf_at], snap_state); @@ -2336,8 +2335,8 @@ class StressTest { TestPrefixScan(thread, read_opts, rand_column_families, rand_keys); } else if (prefixBound <= prob_op && prob_op < writeBound) { // OPERATION write - TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys, - value, lock); + TestPut(thread, write_opts, read_opts, rand_column_families, + rand_keys, value, lock); } else if (writeBound <= prob_op && prob_op < delBound) { // OPERATION delete TestDelete(thread, write_opts, rand_column_families, rand_keys, lock); @@ -2363,7 +2362,8 @@ class StressTest { thread->rand.Uniform(FLAGS_secondary_catch_up_one_in) == 0) { Status s = secondaries_[tid]->TryCatchUpWithPrimary(); if (!s.ok()) { - VerificationAbort(shared, "Secondary instance failed to catch up", s); + VerificationAbort(shared, "Secondary instance failed to catch up", + s); break; } } From b52a2a965c28253282cb4ce378222e990346f588 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Mon, 14 Oct 2019 19:33:05 +0000 Subject: [PATCH 07/11] revert unwanted changes Signed-off-by: Yi Wu --- tools/db_stress.cc | 56 +++++++++++++++++++++++----------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 78935a051b3..84e6de97cc5 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -2093,22 +2093,23 @@ class StressTest { break; } if (open_cnt != 0) { - thread->stats.FinishedSingleOp(); - MutexLock l(thread->shared->GetMutex()); - while (!thread->snapshot_queue.empty()) { - db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot); - delete thread->snapshot_queue.front().second.key_vec; - thread->snapshot_queue.pop(); - } - thread->shared->IncVotedReopen(); - if (thread->shared->AllVotedReopen()) { - thread->shared->GetStressTest()->Reopen(); - thread->shared->GetCondVar()->SignalAll(); - } else { - thread->shared->GetCondVar()->Wait(); - } - // Commenting this out as we don't want to reset stats on each open. - // thread->stats.Start(); + thread->stats.FinishedSingleOp(); + MutexLock l(thread->shared->GetMutex()); + while (!thread->snapshot_queue.empty()) { + db_->ReleaseSnapshot( + thread->snapshot_queue.front().second.snapshot); + delete thread->snapshot_queue.front().second.key_vec; + thread->snapshot_queue.pop(); + } + thread->shared->IncVotedReopen(); + if (thread->shared->AllVotedReopen()) { + thread->shared->GetStressTest()->Reopen(); + thread->shared->GetCondVar()->SignalAll(); + } else { + thread->shared->GetCondVar()->Wait(); + } + // Commenting this out as we don't want to reset stats on each open. + // thread->stats.Start(); } for (uint64_t i = 0; i < ops_per_open; i++) { @@ -2261,12 +2262,13 @@ class StressTest { ropt.snapshot = snapshot; std::string value_at; // When taking a snapshot, we also read a key from that snapshot. We - // will later read the same key before releasing the snapshot and - // verify that the results are the same. + // will later read the same key before releasing the snapshot and verify + // that the results are the same. auto status_at = db_->Get(ropt, column_family, key, &value_at); - std::vector* key_vec = nullptr; + std::vector *key_vec = nullptr; - if (FLAGS_compare_full_db_state_snapshot && (thread->tid == 0)) { + if (FLAGS_compare_full_db_state_snapshot && + (thread->tid == 0)) { key_vec = new std::vector(FLAGS_max_key); // When `prefix_extractor` is set, seeking to beginning and scanning // across prefixes are only supported with `total_order_seek` set. @@ -2282,8 +2284,7 @@ class StressTest { ThreadState::SnapshotState snap_state = { snapshot, rand_column_family, column_family->GetName(), - keystr, status_at, value_at, - key_vec}; + keystr, status_at, value_at, key_vec}; thread->snapshot_queue.emplace( std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops), snap_state); @@ -2292,8 +2293,8 @@ class StressTest { i >= thread->snapshot_queue.front().first) { auto snap_state = thread->snapshot_queue.front().second; assert(snap_state.snapshot); - // Note: this is unsafe as the cf might be dropped concurrently. But - // it is ok since unclean cf drop is cunnrently not supported by write + // Note: this is unsafe as the cf might be dropped concurrently. But it + // is ok since unclean cf drop is cunnrently not supported by write // prepared transactions. Status s = AssertSame(db_, column_families_[snap_state.cf_at], snap_state); @@ -2335,8 +2336,8 @@ class StressTest { TestPrefixScan(thread, read_opts, rand_column_families, rand_keys); } else if (prefixBound <= prob_op && prob_op < writeBound) { // OPERATION write - TestPut(thread, write_opts, read_opts, rand_column_families, - rand_keys, value, lock); + TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys, + value, lock); } else if (writeBound <= prob_op && prob_op < delBound) { // OPERATION delete TestDelete(thread, write_opts, rand_column_families, rand_keys, lock); @@ -2362,8 +2363,7 @@ class StressTest { thread->rand.Uniform(FLAGS_secondary_catch_up_one_in) == 0) { Status s = secondaries_[tid]->TryCatchUpWithPrimary(); if (!s.ok()) { - VerificationAbort(shared, "Secondary instance failed to catch up", - s); + VerificationAbort(shared, "Secondary instance failed to catch up", s); break; } } From 9d1a9b4f1100929366fd78610bed92ebeb047804 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Mon, 14 Oct 2019 22:01:26 +0000 Subject: [PATCH 08/11] Fix flush_job_test. Address comments Signed-off-by: Yi Wu --- db/db_flush_test.cc | 16 ++++++++-------- db/flush_job_test.cc | 12 ++++++------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 9272c20ba66..429612d0661 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -15,6 +15,7 @@ #include "port/stack_trace.h" #include "test_util/fault_injection_test_env.h" #include "test_util/sync_point.h" +#include "util/cast_util.h" #include "util/mutexlock.h" namespace rocksdb { @@ -351,8 +352,8 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { } void CheckFlushResultCommitted(DB* db, SequenceNumber seq) { - DBImpl* db_impl = reinterpret_cast(db); - auto* mutex = db_impl->mutex(); + DBImpl* db_impl = static_cast_with_check(db); + InstrumentedMutex* mutex = db_impl->mutex(); mutex->Lock(); auto* cfd = reinterpret_cast(db->DefaultColumnFamily()) @@ -371,16 +372,15 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { SyncPoint::GetInstance()->LoadDependency( {{"DBImpl::FlushMemTable:AfterScheduleFlush", - "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:1"}, + "wait_for_schedule_first_flush"}, {"DBImpl::FlushMemTableToOutputFile:Finish", - "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:2"}}); + "wait_for_second_flush_job_finish"}}); SyncPoint::GetInstance()->SetCallBack( - "FlushJob::WriteLevel0Table", [&](void* arg) { + "FlushJob::WriteLevel0Table", [&listener](void* arg) { // Wait for the second flush finished, out of mutex. auto* mems = reinterpret_cast*>(arg); if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) { - TEST_SYNC_POINT( - "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:2"); + TEST_SYNC_POINT("wait_for_second_flush_job_finish"); } }); @@ -401,7 +401,7 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { ASSERT_OK(db_->Flush(FlushOptions())); }); // Wait for first flush scheduled. - TEST_SYNC_POINT("DBFlushTest::FireOnFlushCompletedAfterCommittedResult:1"); + TEST_SYNC_POINT("wait_for_schedule_first_flush"); // The second flush will exit early without commit its result. The work // is delegated to the first flush. ASSERT_OK(Put("bar", "v")); diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index e4400e84355..7a36da5f118 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -306,18 +306,18 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { EventLogger event_logger(db_options_.info_log.get()); SnapshotChecker* snapshot_checker = nullptr; // not relevant - std::vector flush_jobs; + std::vector> flush_jobs; k = 0; for (auto cfd : all_cfds) { std::vector snapshot_seqs; - flush_jobs.emplace_back( + flush_jobs.emplace_back(new FlushJob( dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), &memtable_ids[k], env_options_, versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, false /* sync_output_directory */, false /* write_manifest */, - Env::Priority::USER); + Env::Priority::USER)); k++; } HistogramData hist; @@ -326,12 +326,12 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { file_metas.reserve(flush_jobs.size()); mutex_.Lock(); for (auto& job : flush_jobs) { - job.PickMemTable(); + job->PickMemTable(); } for (auto& job : flush_jobs) { FileMetaData meta; // Run will release and re-acquire mutex - ASSERT_OK(job.Run(nullptr /**/, &meta)); + ASSERT_OK(job->Run(nullptr /**/, &meta)); file_metas.emplace_back(meta); } autovector file_meta_ptrs; @@ -340,7 +340,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { } autovector*> mems_list; for (size_t i = 0; i != all_cfds.size(); ++i) { - const auto& mems = flush_jobs[i].GetMemTables(); + const auto& mems = flush_jobs[i]->GetMemTables(); mems_list.push_back(&mems); } autovector mutable_cf_options_list; From 0467f886c1358d85aa20bb2c6c52a46314b337c0 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Mon, 14 Oct 2019 22:41:53 +0000 Subject: [PATCH 09/11] update sync point name Signed-off-by: Yi Wu --- db/db_flush_test.cc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 429612d0661..5f363bdc7e6 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -372,15 +372,17 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { SyncPoint::GetInstance()->LoadDependency( {{"DBImpl::FlushMemTable:AfterScheduleFlush", - "wait_for_schedule_first_flush"}, + "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"}, {"DBImpl::FlushMemTableToOutputFile:Finish", - "wait_for_second_flush_job_finish"}}); + "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}}); SyncPoint::GetInstance()->SetCallBack( "FlushJob::WriteLevel0Table", [&listener](void* arg) { // Wait for the second flush finished, out of mutex. auto* mems = reinterpret_cast*>(arg); if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) { - TEST_SYNC_POINT("wait_for_second_flush_job_finish"); + TEST_SYNC_POINT( + "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:" + "WaitSecond"); } }); @@ -401,7 +403,8 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { ASSERT_OK(db_->Flush(FlushOptions())); }); // Wait for first flush scheduled. - TEST_SYNC_POINT("wait_for_schedule_first_flush"); + TEST_SYNC_POINT( + "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"); // The second flush will exit early without commit its result. The work // is delegated to the first flush. ASSERT_OK(Put("bar", "v")); From e7d77096a301626b0c125ad4f6ff38ad651bacf1 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Tue, 15 Oct 2019 06:19:16 +0000 Subject: [PATCH 10/11] attempt to fix lint Signed-off-by: Yi Wu --- db/db_flush_test.cc | 5 ++--- db/db_impl/db_impl_compaction_flush.cc | 2 ++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 5f363bdc7e6..c586cd3222d 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -362,13 +362,12 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { mutex->Unlock(); } - std::atomic seq1; - std::atomic seq2; + std::atomic seq1{0}; + std::atomic seq2{0}; std::atomic completed1{false}; std::atomic completed2{false}; }; std::shared_ptr listener = std::make_shared(); - std::atomic first_flush_pending{false}; SyncPoint::GetInstance()->LoadDependency( {{"DBImpl::FlushMemTable:AfterScheduleFlush", diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 4f81fb0638b..70ac42385f8 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -382,6 +382,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( TEST_SYNC_POINT( "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"); } + assert(exec_status.size() > 0); exec_status[0].second = jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]); exec_status[0].first = true; @@ -501,6 +502,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( #ifndef ROCKSDB_LITE auto sfm = static_cast( immutable_db_options_.sst_file_manager.get()); + assert(all_mutable_cf_options.size() == static_cast(num_cfs)); for (int i = 0; i != num_cfs; ++i) { if (cfds[i]->IsDropped()) { continue; From 05fd54f723247ec9b335259ac0ed970187da5460 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 15 Oct 2019 12:52:52 -0700 Subject: [PATCH 11/11] Address lint error Assert `file_meta` is non-empty before accessing it via subscription. --- db/db_impl/db_impl_compaction_flush.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 70ac42385f8..90c49ba8588 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -383,6 +383,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"); } assert(exec_status.size() > 0); + assert(!file_meta.empty()); exec_status[0].second = jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]); exec_status[0].first = true;