diff --git a/HISTORY.md b/HISTORY.md index 4f308f19944..abdf8ff1bf9 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,6 +1,12 @@ # Rocksdb Change Log > NOTE: Entries for next release do not go here. Follow instructions in `unreleased_history/README.txt` +## 8.6.6 (09/25/2023) +### Bug Fixes +* Fix a bug with atomic_flush=true that can cause DB to stuck after a flush fails (#11872). +* Fix a bug where RocksDB (with atomic_flush=false) can delete output SST files of pending flushes when a previous concurrent flush fails (#11865). This can result in DB entering read-only state with error message like `IO error: No such file or directory: While open a file for random read: /tmp/rocksdbtest-501/db_flush_test_87732_4230653031040984171/000013.sst`. +* When the compressed secondary cache capacity is reduced to 0, it should be completely disabled. Before this fix, inserts and lookups would still go to the backing `LRUCache` before returning, thus incurring locking overhead. With this fix, inserts and lookups are no-ops and do not add any overhead. + ## 8.6.5 (09/15/2023) ### Bug Fixes * Fixed a bug where `rocksdb.file.read.verify.file.checksums.micros` is not populated. diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index acf9723e9ad..8537af84d8d 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -3193,6 +3193,279 @@ INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest, DBAtomicFlushTest, testing::Bool()); +TEST_F(DBFlushTest, NonAtomicFlushRollbackPendingFlushes) { + // Fix a bug in when atomic_flush=false. + // The bug can happen as follows: + // Start Flush0 for memtable M0 to SST0 + // Start Flush1 for memtable M1 to SST1 + // Flush1 returns OK, but don't install to MANIFEST and let whoever flushes + // M0 to take care of it + // Flush0 finishes with a retryable IOError + // - It rollbacks M0, (incorrectly) not M1 + // - Deletes SST1 and SST2 + // + // Auto-recovery will start Flush2 for M0, it does not pick up M1 since it + // thinks that M1 is flushed + // Flush2 writes SST3 and finishes OK, tries to install SST3 and SST2 + // Error opening SST2 since it's already deleted + // + // The fix is to let Flush0 also rollback M1. + Options opts = CurrentOptions(); + opts.atomic_flush = false; + opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1)); + opts.max_write_buffer_number = 64; + opts.max_background_flushes = 4; + env_->SetBackgroundThreads(4, Env::HIGH); + DestroyAndReopen(opts); + std::atomic_int flush_count = 0; + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::WriteLevel0Table:s", [&](void* s_ptr) { + int c = flush_count.fetch_add(1); + if (c == 0) { + Status* s = (Status*)(s_ptr); + IOStatus io_error = IOStatus::IOError("injected foobar"); + io_error.SetRetryable(true); + *s = io_error; + TEST_SYNC_POINT("Let mem1 flush start"); + TEST_SYNC_POINT("Wait for mem1 flush to finish"); + } + }); + SyncPoint::GetInstance()->LoadDependency( + {{"Let mem1 flush start", "Mem1 flush starts"}, + {"DBImpl::BGWorkFlush:done", "Wait for mem1 flush to finish"}, + {"RecoverFromRetryableBGIOError:RecoverSuccess", + "Wait for error recover"}}); + // Need first flush to wait for the second flush to finish + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put(Key(1), "val1")); + // trigger bg flush mem0 + ASSERT_OK(Put(Key(2), "val2")); + TEST_SYNC_POINT("Mem1 flush starts"); + // trigger bg flush mem1 + ASSERT_OK(Put(Key(3), "val3")); + + TEST_SYNC_POINT("Wait for error recover"); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBFlushTest, AbortNonAtomicFlushWhenBGError) { + // Fix a bug in when atomic_flush=false. + // The bug can happen as follows: + // Start Flush0 for memtable M0 to SST0 + // Start Flush1 for memtable M1 to SST1 + // Flush1 returns OK, but doesn't install output MANIFEST and let whoever + // flushes M0 to take care of it + // Start Flush2 for memtable M2 to SST2 + // Flush0 finishes with a retryable IOError + // - It rollbacks M0 AND M1 + // - Deletes SST1 and SST2 + // Flush2 finishes, does not rollback M2, + // - releases the pending file number that keeps SST2 alive + // - deletes SST2 + // + // Then auto-recovery starts, error opening SST2 when try to install + // flush result + // + // The fix is to let Flush2 rollback M2 if it finds that + // there is a background error. + Options opts = CurrentOptions(); + opts.atomic_flush = false; + opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1)); + opts.max_write_buffer_number = 64; + opts.max_background_flushes = 4; + env_->SetBackgroundThreads(4, Env::HIGH); + DestroyAndReopen(opts); + std::atomic_int flush_count = 0; + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::WriteLevel0Table:s", [&](void* s_ptr) { + int c = flush_count.fetch_add(1); + if (c == 0) { + Status* s = (Status*)(s_ptr); + IOStatus io_error = IOStatus::IOError("injected foobar"); + io_error.SetRetryable(true); + *s = io_error; + TEST_SYNC_POINT("Let mem1 flush start"); + TEST_SYNC_POINT("Wait for mem1 flush to finish"); + + TEST_SYNC_POINT("Let mem2 flush start"); + TEST_SYNC_POINT("Wait for mem2 to start writing table"); + } + }); + + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::WriteLevel0Table", [&](void* mems) { + autovector* mems_ptr = (autovector*)mems; + if ((*mems_ptr)[0]->GetID() == 3) { + TEST_SYNC_POINT("Mem2 flush starts writing table"); + TEST_SYNC_POINT("Mem2 flush waits until rollback"); + } + }); + SyncPoint::GetInstance()->LoadDependency( + {{"Let mem1 flush start", "Mem1 flush starts"}, + {"DBImpl::BGWorkFlush:done", "Wait for mem1 flush to finish"}, + {"Let mem2 flush start", "Mem2 flush starts"}, + {"Mem2 flush starts writing table", + "Wait for mem2 to start writing table"}, + {"RollbackMemtableFlush", "Mem2 flush waits until rollback"}, + {"RecoverFromRetryableBGIOError:RecoverSuccess", + "Wait for error recover"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put(Key(1), "val1")); + // trigger bg flush mem0 + ASSERT_OK(Put(Key(2), "val2")); + TEST_SYNC_POINT("Mem1 flush starts"); + // trigger bg flush mem1 + ASSERT_OK(Put(Key(3), "val3")); + + TEST_SYNC_POINT("Mem2 flush starts"); + ASSERT_OK(Put(Key(4), "val4")); + + TEST_SYNC_POINT("Wait for error recover"); + // Recovery flush writes 3 memtables together into 1 file. + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBFlushTest, NonAtomicNormalFlushAbortWhenBGError) { + Options opts = CurrentOptions(); + opts.atomic_flush = false; + opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1)); + opts.max_write_buffer_number = 64; + opts.max_background_flushes = 1; + env_->SetBackgroundThreads(2, Env::HIGH); + DestroyAndReopen(opts); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + std::atomic_int flush_write_table_count = 0; + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::WriteLevel0Table:s", [&](void* s_ptr) { + int c = flush_write_table_count.fetch_add(1); + if (c == 0) { + Status* s = (Status*)(s_ptr); + IOStatus io_error = IOStatus::IOError("injected foobar"); + io_error.SetRetryable(true); + *s = io_error; + } + }); + + SyncPoint::GetInstance()->EnableProcessing(); + SyncPoint::GetInstance()->LoadDependency( + {{"Let error recovery start", + "RecoverFromRetryableBGIOError:BeforeStart"}, + {"RecoverFromRetryableBGIOError:RecoverSuccess", + "Wait for error recover"}}); + + ASSERT_OK(Put(Key(1), "val1")); + // trigger bg flush0 for mem0 + ASSERT_OK(Put(Key(2), "val2")); + // Not checking status since this wait can finish before flush starts. + dbfull()->TEST_WaitForFlushMemTable().PermitUncheckedError(); + + // trigger bg flush1 for mem1, should see bg error and abort + // before picking a memtable to flush + ASSERT_OK(Put(Key(3), "val3")); + ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable()); + ASSERT_EQ(0, NumTableFilesAtLevel(0)); + + TEST_SYNC_POINT("Let error recovery start"); + TEST_SYNC_POINT("Wait for error recover"); + // Recovery flush writes 2 memtables together into 1 file. + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + // 1 for flush 0 and 1 for recovery flush + ASSERT_EQ(2, flush_write_table_count); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBFlushTest, DBStuckAfterAtomicFlushError) { + // Test for a bug with atomic flush where DB can become stuck + // after a flush error. A repro timeline: + // + // Start Flush0 for mem0 + // Start Flush1 for mem1 + // Now Flush1 will wait for Flush0 to install mem0 + // Flush0 finishes with retryable IOError, rollbacks mem0 + // Resume starts and waits for background job to finish, i.e., Flush1 + // Fill memtable again, trigger Flush2 for mem0 + // Flush2 will get error status, and not rollback mem0, see code in + // https://github.com/facebook/rocksdb/blob/b927ba5936216861c2c35ab68f50ba4a78e65747/db/db_impl/db_impl_compaction_flush.cc#L725 + // + // DB is stuck since mem0 can never be picked now + // + // The fix is to rollback mem0 in Flush2, and let Flush1 also abort upon + // background error besides waiting for older memtables to be installed. + // The recovery flush in this case should pick up all memtables + // and write them to a single L0 file. + Options opts = CurrentOptions(); + opts.atomic_flush = true; + opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1)); + opts.max_write_buffer_number = 64; + opts.max_background_flushes = 4; + env_->SetBackgroundThreads(4, Env::HIGH); + DestroyAndReopen(opts); + + std::atomic_int flush_count = 0; + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::WriteLevel0Table:s", [&](void* s_ptr) { + int c = flush_count.fetch_add(1); + if (c == 0) { + Status* s = (Status*)(s_ptr); + IOStatus io_error = IOStatus::IOError("injected foobar"); + io_error.SetRetryable(true); + *s = io_error; + TEST_SYNC_POINT("Let flush for mem1 start"); + // Wait for Flush1 to start waiting to install flush result + TEST_SYNC_POINT("Wait for flush for mem1"); + } + }); + SyncPoint::GetInstance()->LoadDependency( + {{"Let flush for mem1 start", "Flush for mem1"}, + {"DBImpl::AtomicFlushMemTablesToOutputFiles:WaitCV", + "Wait for flush for mem1"}, + {"RecoverFromRetryableBGIOError:BeforeStart", + "Wait for resume to start"}, + {"Recovery should continue here", + "RecoverFromRetryableBGIOError:BeforeStart2"}, + {"RecoverFromRetryableBGIOError:RecoverSuccess", + "Wait for error recover"}}); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put(Key(1), "val1")); + // trigger Flush0 for mem0 + ASSERT_OK(Put(Key(2), "val2")); + + // trigger Flush1 for mem1 + TEST_SYNC_POINT("Flush for mem1"); + ASSERT_OK(Put(Key(3), "val3")); + + // Wait until resume started to schedule another flush + TEST_SYNC_POINT("Wait for resume to start"); + // This flush should not be scheduled due to bg error + ASSERT_OK(Put(Key(4), "val4")); + + // TEST_WaitForBackgroundWork() returns background error + // after all background work is done. + ASSERT_NOK(dbfull()->TEST_WaitForBackgroundWork()); + // Flush should abort and not writing any table + ASSERT_EQ(0, NumTableFilesAtLevel(0)); + + // Wait until this flush is done. + TEST_SYNC_POINT("Recovery should continue here"); + TEST_SYNC_POINT("Wait for error recover"); + // error recovery can schedule new flushes, but should not + // encounter error + ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork()); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 9822b401a16..51fcdfa7b77 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -471,6 +471,28 @@ Status DBImpl::ResumeImpl(DBRecoverContext context) { if (shutdown_initiated_) { s = Status::ShutdownInProgress(); } + if (s.ok() && context.flush_after_recovery) { + // Since we drop all non-recovery flush requests during recovery, + // and new memtable may fill up during recovery, + // schedule one more round of flush. + FlushOptions flush_opts; + flush_opts.allow_write_stall = false; + flush_opts.wait = false; + Status status = FlushAllColumnFamilies( + flush_opts, FlushReason::kCatchUpAfterErrorRecovery); + if (!status.ok()) { + // FlushAllColumnFamilies internally should take care of setting + // background error if needed. + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "The catch up flush after successful recovery failed [%s]", + s.ToString().c_str()); + } + // FlushAllColumnFamilies releases and re-acquires mutex. + if (shutdown_initiated_) { + s = Status::ShutdownInProgress(); + } + } + if (s.ok()) { for (auto cfd : *versions_->GetColumnFamilySet()) { SchedulePendingCompaction(cfd); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 732b0667dea..6bf5d0e21ab 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -283,6 +283,24 @@ Status DBImpl::FlushMemTableToOutputFile( // If the log sync failed, we do not need to pick memtable. Otherwise, // num_flush_not_started_ needs to be rollback. TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"); + // Exit a flush due to bg error should not set bg error again. + bool skip_set_bg_error = false; + if (s.ok() && !error_handler_.GetBGError().ok() && + error_handler_.IsBGWorkStopped() && + flush_reason != FlushReason::kErrorRecovery && + flush_reason != FlushReason::kErrorRecoveryRetryFlush) { + // Error recovery in progress, should not pick memtable which excludes + // them from being picked up by recovery flush. + // This ensures that when bg error is set, no new flush can pick + // memtables. + skip_set_bg_error = true; + s = error_handler_.GetBGError(); + assert(!s.ok()); + ROCKS_LOG_BUFFER(log_buffer, + "[JOB %d] Skip flush due to background error %s", + job_context->job_id, s.ToString().c_str()); + } + if (s.ok()) { flush_job.PickMemTable(); need_cancel = true; @@ -303,7 +321,8 @@ Status DBImpl::FlushMemTableToOutputFile( // is unlocked by the current thread. if (s.ok()) { s = flush_job.Run(&logs_with_prep_tracker_, &file_meta, - &switched_to_mempurge); + &switched_to_mempurge, &skip_set_bg_error, + &error_handler_); need_cancel = false; } @@ -344,7 +363,8 @@ Status DBImpl::FlushMemTableToOutputFile( } } - if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) { + if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() && + !skip_set_bg_error) { if (log_io_s.ok()) { // Error while writing to MANIFEST. // In fact, versions_->io_status() can also be the result of renaming @@ -556,6 +576,21 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( pick_status.push_back(false); } + bool flush_for_recovery = + bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecovery || + bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecoveryRetryFlush; + bool skip_set_bg_error = false; + + if (s.ok() && !error_handler_.GetBGError().ok() && + error_handler_.IsBGWorkStopped() && !flush_for_recovery) { + s = error_handler_.GetBGError(); + skip_set_bg_error = true; + assert(!s.ok()); + ROCKS_LOG_BUFFER(log_buffer, + "[JOB %d] Skip flush due to background error %s", + job_context->job_id, s.ToString().c_str()); + } + if (s.ok()) { for (int i = 0; i != num_cfs; ++i) { jobs[i]->PickMemTable(); @@ -620,7 +655,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( } } } - } else { + } else if (!skip_set_bg_error) { + // When `skip_set_bg_error` is true, no memtable is picked so + // there is no need to call Cancel() or RollbackMemtableFlush(). + // // Need to undo atomic flush if something went wrong, i.e. s is not OK and // it is not because of CF drop. // Have to cancel the flush jobs that have NOT executed because we need to @@ -633,8 +671,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( for (int i = 0; i != num_cfs; ++i) { if (exec_status[i].second.ok() && exec_status[i].first) { auto& mems = jobs[i]->GetMemTables(); - cfds[i]->imm()->RollbackMemtableFlush(mems, - file_meta[i].fd.GetNumber()); + cfds[i]->imm()->RollbackMemtableFlush( + mems, /*rollback_succeeding_memtables=*/false); } } } @@ -676,10 +714,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( }; bool resuming_from_bg_err = - error_handler_.IsDBStopped() || - (bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecovery || - bg_flush_args[0].flush_reason_ == - FlushReason::kErrorRecoveryRetryFlush); + error_handler_.IsDBStopped() || flush_for_recovery; while ((!resuming_from_bg_err || error_handler_.GetRecoveryError().ok())) { std::pair res = wait_to_install_func(); @@ -690,15 +725,27 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( s = res.first; break; } else if (!res.second) { + // we are the oldest immutable memtable + break; + } + // We are not the oldest immutable memtable + TEST_SYNC_POINT_CALLBACK( + "DBImpl::AtomicFlushMemTablesToOutputFiles:WaitCV", &res); + // + // If bg work is stopped, recovery thread first calls + // WaitForBackgroundWork() before proceeding to flush for recovery. This + // flush can block WaitForBackgroundWork() while waiting for recovery + // flush to install result. To avoid this deadlock, we should abort here + // if there is background error. + if (!flush_for_recovery && error_handler_.IsBGWorkStopped() && + !error_handler_.GetBGError().ok()) { + s = error_handler_.GetBGError(); + assert(!s.ok()); break; } atomic_flush_install_cv_.Wait(); - resuming_from_bg_err = - error_handler_.IsDBStopped() || - (bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecovery || - bg_flush_args[0].flush_reason_ == - FlushReason::kErrorRecoveryRetryFlush); + resuming_from_bg_err = error_handler_.IsDBStopped() || flush_for_recovery; } if (!resuming_from_bg_err) { @@ -714,6 +761,17 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( // installation. s = error_handler_.GetRecoveryError(); } + // Since we are not installing these memtables, need to rollback + // to allow future flush job to pick up these memtables. + if (!s.ok()) { + for (int i = 0; i != num_cfs; ++i) { + assert(exec_status[i].first); + assert(exec_status[i].second.ok()); + auto& mems = jobs[i]->GetMemTables(); + cfds[i]->imm()->RollbackMemtableFlush( + mems, /*rollback_succeeding_memtables=*/false); + } + } } if (s.ok()) { @@ -817,7 +875,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( // Need to undo atomic flush if something went wrong, i.e. s is not OK and // it is not because of CF drop. - if (!s.ok() && !s.IsColumnFamilyDropped()) { + if (!s.ok() && !s.IsColumnFamilyDropped() && !skip_set_bg_error) { if (log_io_s.ok()) { // Error while writing to MANIFEST. // In fact, versions_->io_status() can also be the result of renaming @@ -2223,9 +2281,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, WaitForPendingWrites(); if (flush_reason != FlushReason::kErrorRecoveryRetryFlush && + flush_reason != FlushReason::kCatchUpAfterErrorRecovery && (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load())) { // Note that, when flush reason is kErrorRecoveryRetryFlush, during the // auto retry resume, we want to avoid creating new small memtables. + // If flush reason is kCatchUpAfterErrorRecovery, we try to flush any new + // memtable that filled up during recovery, and we also want to avoid + // switching memtable to create small memtables. // Therefore, SwitchMemtable will not be called. Also, since ResumeImpl // will iterate through all the CFs and call FlushMemtable during auto // retry resume, it is possible that in some CFs, @@ -2416,7 +2478,8 @@ Status DBImpl::AtomicFlushMemTables( for (auto cfd : cfds) { if ((cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) || - flush_reason == FlushReason::kErrorRecoveryRetryFlush) { + flush_reason == FlushReason::kErrorRecoveryRetryFlush || + flush_reason == FlushReason::kCatchUpAfterErrorRecovery) { continue; } cfd->Ref(); @@ -2684,6 +2747,11 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { // There has been a hard error and this call is not part of the recovery // sequence. Bail out here so we don't get into an endless loop of // scheduling BG work which will again call this function + // + // Note that a non-recovery flush can still be scheduled if + // error_handler_.IsRecoveryInProgress() returns true. We rely on + // BackgroundCallFlush() to check flush reason and drop non-recovery + // flushes. return; } else if (shutting_down_.load(std::memory_order_acquire)) { // DB is being deleted; no more background compactions @@ -3013,6 +3081,24 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, // This cfd is already referenced FlushRequest flush_req = PopFirstFromFlushQueue(); FlushReason flush_reason = flush_req.flush_reason; + if (!error_handler_.GetBGError().ok() && error_handler_.IsBGWorkStopped() && + flush_reason != FlushReason::kErrorRecovery && + flush_reason != FlushReason::kErrorRecoveryRetryFlush) { + // Stop non-recovery flush when bg work is stopped + // Note that we drop the flush request here. + // Recovery thread should schedule further flushes after bg error + // is cleared. + status = error_handler_.GetBGError(); + assert(!status.ok()); + ROCKS_LOG_BUFFER(log_buffer, + "[JOB %d] Abort flush due to background error %s", + job_context->job_id, status.ToString().c_str()); + *reason = flush_reason; + for (auto item : flush_req.cfd_to_max_mem_id_to_persist) { + item.first->UnrefAndTryDelete(); + } + return status; + } if (!immutable_db_options_.atomic_flush && ShouldRescheduleFlushRequestToRetainUDT(flush_req)) { assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1); @@ -3155,9 +3241,9 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) { bg_cv_.SignalAll(); // In case a waiter can proceed despite the error mutex_.Unlock(); ROCKS_LOG_ERROR(immutable_db_options_.info_log, - "Waiting after background flush error: %s" + "[JOB %d] Waiting after background flush error: %s" "Accumulated background error counts: %" PRIu64, - s.ToString().c_str(), error_cnt); + job_context.job_id, s.ToString().c_str(), error_cnt); log_buffer.FlushBufferToLog(); LogFlush(immutable_db_options_.info_log); immutable_db_options_.clock->SleepForMicroseconds(1000000); diff --git a/db/error_handler.cc b/db/error_handler.cc index 55821952de8..7d3fcc82aeb 100644 --- a/db/error_handler.cc +++ b/db/error_handler.cc @@ -653,6 +653,7 @@ const Status& ErrorHandler::StartRecoverFromRetryableBGIOError( } recovery_in_prog_ = true; + TEST_SYNC_POINT("StartRecoverFromRetryableBGIOError::in_progress"); recovery_thread_.reset( new port::Thread(&ErrorHandler::RecoverFromRetryableBGIOError, this)); @@ -667,6 +668,7 @@ const Status& ErrorHandler::StartRecoverFromRetryableBGIOError( // mutex is released. void ErrorHandler::RecoverFromRetryableBGIOError() { TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeStart"); + TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeStart2"); InstrumentedMutexLock l(db_mutex_); if (end_recovery_) { EventHelpers::NotifyOnErrorRecoveryEnd(db_options_.listeners, bg_error_, @@ -675,6 +677,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() { return; } DBRecoverContext context = recover_context_; + context.flush_after_recovery = true; int resume_count = db_options_.max_bgerror_resume_count; uint64_t wait_interval = db_options_.bgerror_resume_retry_interval; uint64_t retry_count = 0; diff --git a/db/error_handler.h b/db/error_handler.h index 34e08a525d7..6b1e8028636 100644 --- a/db/error_handler.h +++ b/db/error_handler.h @@ -19,10 +19,13 @@ class DBImpl; // FlushReason, which tells the flush job why this flush is called. struct DBRecoverContext { FlushReason flush_reason; + bool flush_after_recovery; - DBRecoverContext() : flush_reason(FlushReason::kErrorRecovery) {} - - DBRecoverContext(FlushReason reason) : flush_reason(reason) {} + DBRecoverContext() + : flush_reason(FlushReason::kErrorRecovery), + flush_after_recovery(false) {} + DBRecoverContext(FlushReason reason) + : flush_reason(reason), flush_after_recovery(false) {} }; class ErrorHandler { diff --git a/db/event_helpers.cc b/db/event_helpers.cc index d442a1ed7b7..700c5f22c79 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -240,6 +240,8 @@ void EventHelpers::NotifyOnErrorRecoveryEnd( info.new_bg_error.PermitUncheckedError(); } db_mutex->Lock(); + } else { + old_bg_error.PermitUncheckedError(); } } diff --git a/db/flush_job.cc b/db/flush_job.cc index d3a777b4493..57fffc8a464 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -79,6 +79,8 @@ const char* GetFlushReasonString(FlushReason flush_reason) { return "Error Recovery Retry Flush"; case FlushReason::kWalFull: return "WAL Full"; + case FlushReason::kCatchUpAfterErrorRecovery: + return "Catch Up After Error Recovery"; default: return "Invalid"; } @@ -215,7 +217,8 @@ void FlushJob::PickMemTable() { } Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta, - bool* switched_to_mempurge) { + bool* switched_to_mempurge, bool* skipped_since_bg_error, + ErrorHandler* error_handler) { TEST_SYNC_POINT("FlushJob::Start"); db_mutex_->AssertHeld(); assert(pick_memtable_called); @@ -303,17 +306,32 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta, } if (!s.ok()) { - cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber()); + cfd_->imm()->RollbackMemtableFlush( + mems_, /*rollback_succeeding_memtables=*/!db_options_.atomic_flush); } else if (write_manifest_) { - TEST_SYNC_POINT("FlushJob::InstallResults"); - // Replace immutable memtable with the generated Table - s = cfd_->imm()->TryInstallMemtableFlushResults( - cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_, - meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, - log_buffer_, &committed_flush_jobs_info_, - !(mempurge_s.ok()) /* write_edit : true if no mempurge happened (or if aborted), + assert(!db_options_.atomic_flush); + if (!db_options_.atomic_flush && + flush_reason_ != FlushReason::kErrorRecovery && + flush_reason_ != FlushReason::kErrorRecoveryRetryFlush && + error_handler && !error_handler->GetBGError().ok() && + error_handler->IsBGWorkStopped()) { + cfd_->imm()->RollbackMemtableFlush( + mems_, /*rollback_succeeding_memtables=*/!db_options_.atomic_flush); + s = error_handler->GetBGError(); + if (skipped_since_bg_error) { + *skipped_since_bg_error = true; + } + } else { + TEST_SYNC_POINT("FlushJob::InstallResults"); + // Replace immutable memtable with the generated Table + s = cfd_->imm()->TryInstallMemtableFlushResults( + cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_, + meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, + log_buffer_, &committed_flush_jobs_info_, + !(mempurge_s.ok()) /* write_edit : true if no mempurge happened (or if aborted), but 'false' if mempurge successful: no new min log number or new level 0 file path to write to manifest. */); + } } if (s.ok() && file_meta != nullptr) { @@ -965,6 +983,7 @@ Status FlushJob::WriteLevel0Table() { &table_properties_, write_hint, full_history_ts_low, blob_callback_, base_, &num_input_entries, &memtable_payload_bytes, &memtable_garbage_bytes); + TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:s", &s); // TODO: Cleanup io_status in BuildTable and table builders assert(!s.ok() || io_s.ok()); io_s.PermitUncheckedError(); diff --git a/db/flush_job.h b/db/flush_job.h index 43d10ffe939..db5dbd719f2 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -83,9 +83,14 @@ class FlushJob { // Require db_mutex held. // Once PickMemTable() is called, either Run() or Cancel() has to be called. void PickMemTable(); + // @param skip_since_bg_error If not nullptr and if atomic_flush=false, + // then it is set to true if flush installation is skipped and memtable + // is rolled back due to existing background error. Status Run(LogsWithPrepTracker* prep_tracker = nullptr, FileMetaData* file_meta = nullptr, - bool* switched_to_mempurge = nullptr); + bool* switched_to_mempurge = nullptr, + bool* skipped_since_bg_error = nullptr, + ErrorHandler* error_handler = nullptr); void Cancel(); const autovector& GetMemTables() const { return mems_; } diff --git a/db/memtable_list.cc b/db/memtable_list.cc index ee1563f0161..9eb0f53cde2 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -434,23 +434,57 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id, } void MemTableList::RollbackMemtableFlush(const autovector& mems, - uint64_t /*file_number*/) { + bool rollback_succeeding_memtables) { + TEST_SYNC_POINT("RollbackMemtableFlush"); AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_MEMTABLE_ROLLBACK); - assert(!mems.empty()); - - // If the flush was not successful, then just reset state. - // Maybe a succeeding attempt to flush will be successful. +#ifndef NDEBUG for (MemTable* m : mems) { assert(m->flush_in_progress_); assert(m->file_number_ == 0); + } +#endif + + if (rollback_succeeding_memtables && !mems.empty()) { + std::list& memlist = current_->memlist_; + auto it = memlist.rbegin(); + for (; *it != mems[0] && it != memlist.rend(); ++it) { + } + // mems should be in memlist + assert(*it == mems[0]); + if (*it == mems[0]) { + ++it; + } + while (it != memlist.rend()) { + MemTable* m = *it; + // Only rollback complete, not in-progress, + // in_progress can be flushes that are still writing SSTs + if (m->flush_completed_) { + m->flush_in_progress_ = false; + m->flush_completed_ = false; + m->edit_.Clear(); + m->file_number_ = 0; + num_flush_not_started_++; + ++it; + } else { + break; + } + } + } - m->flush_in_progress_ = false; - m->flush_completed_ = false; - m->edit_.Clear(); - num_flush_not_started_++; + for (MemTable* m : mems) { + if (m->flush_in_progress_) { + assert(m->file_number_ == 0); + m->file_number_ = 0; + m->flush_in_progress_ = false; + m->flush_completed_ = false; + m->edit_.Clear(); + num_flush_not_started_++; + } + } + if (!mems.empty()) { + imm_flush_needed.store(true, std::memory_order_release); } - imm_flush_needed.store(true, std::memory_order_release); } // Try record a successful flush in the manifest file. It might just return diff --git a/db/memtable_list.h b/db/memtable_list.h index e95493b6f29..51d14dff7b8 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -271,8 +271,20 @@ class MemTableList { // Reset status of the given memtable list back to pending state so that // they can get picked up again on the next round of flush. + // + // @param rollback_succeeding_memtables If true, will rollback adjacent + // younger memtables whose flush is completed. Specifically, suppose the + // current immutable memtables are M_0,M_1...M_N ordered from youngest to + // oldest. Suppose that the youngest memtable in `mems` is M_K. We will try to + // rollback M_K-1, M_K-2... until the first memtable whose flush is + // not completed. These are the memtables that would have been installed + // by this flush job if it were to succeed. This flag is currently used + // by non atomic_flush rollback. + // Note that we also do rollback in `write_manifest_cb` by calling + // `RemoveMemTablesOrRestoreFlags()`. There we rollback the entire batch so + // it is similar to what we do here with rollback_succeeding_memtables=true. void RollbackMemtableFlush(const autovector& mems, - uint64_t file_number); + bool rollback_succeeding_memtables); // Try commit a successful flush in the manifest file. It might just return // Status::OK letting a concurrent flush to do the actual the recording. diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index dfa1dbfc79b..12f7495b8e6 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -682,7 +682,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); // Revert flush - list.RollbackMemtableFlush(to_flush, 0); + list.RollbackMemtableFlush(to_flush, false); ASSERT_FALSE(list.IsFlushPending()); ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); to_flush.clear(); @@ -732,7 +732,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); // Rollback first pick of tables - list.RollbackMemtableFlush(to_flush, 0); + list.RollbackMemtableFlush(to_flush, false); ASSERT_TRUE(list.IsFlushPending()); ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); to_flush.clear(); diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 87bc678693b..b7b58ae0059 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -161,6 +161,7 @@ enum class CompactionReason : int { kNumOfReasons, }; +// When adding flush reason, make sure to also update `GetFlushReasonString()`. enum class FlushReason : int { kOthers = 0x00, kGetLiveFiles = 0x01, @@ -178,6 +179,8 @@ enum class FlushReason : int { // will not be called to avoid many small immutable memtables. kErrorRecoveryRetryFlush = 0xc, kWalFull = 0xd, + // SwitchMemtable will not be called for this flush reason. + kCatchUpAfterErrorRecovery = 0xe, }; // TODO: In the future, BackgroundErrorReason will only be used to indicate diff --git a/include/rocksdb/version.h b/include/rocksdb/version.h index e043ea03eb2..fd1447cf825 100644 --- a/include/rocksdb/version.h +++ b/include/rocksdb/version.h @@ -13,7 +13,7 @@ // minor or major version number planned for release. #define ROCKSDB_MAJOR 8 #define ROCKSDB_MINOR 6 -#define ROCKSDB_PATCH 5 +#define ROCKSDB_PATCH 6 // Do not use these. We made the mistake of declaring macros starting with // double underscore. Now we have to live with our choice. We'll deprecate these diff --git a/unreleased_history/bug_fixes/compressed_sec_cache_disable.md b/unreleased_history/bug_fixes/compressed_sec_cache_disable.md deleted file mode 100644 index 9c80f447427..00000000000 --- a/unreleased_history/bug_fixes/compressed_sec_cache_disable.md +++ /dev/null @@ -1 +0,0 @@ -When the compressed secondary cache capacity is reduced to 0, it should be completely disabled. Before this fix, inserts and lookups would still go to the backing `LRUCache` before returning, thus incurring locking overhead. With this fix, inserts and lookups are no-ops and do not add any overhead.