Skip to content

Commit

Permalink
Revert "Auto resume the DB from Retryable IO Error (facebook#6765)"
Browse files Browse the repository at this point in the history
This reverts commit a10f12e.
  • Loading branch information
jay-zhuang committed Jul 17, 2020
1 parent ec711b2 commit cd8bbbc
Show file tree
Hide file tree
Showing 15 changed files with 47 additions and 1,050 deletions.
1 change: 0 additions & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
* DB identity (`db_id`) and DB session identity (`db_session_id`) are added to table properties and stored in SST files. SST files generated from SstFileWriter and Repairer have DB identity “SST Writer” and “DB Repairer”, respectively. Their DB session IDs are generated in the same way as `DB::GetDbSessionId`. The session ID for SstFileWriter (resp., Repairer) resets every time `SstFileWriter::Open` (resp., `Repairer::Run`) is called.
* Added experimental option BlockBasedTableOptions::optimize_filters_for_memory for reducing allocated memory size of Bloom filters (~10% savings with Jemalloc) while preserving the same general accuracy. To have an effect, the option requires format_version=5 and malloc_usable_size. Enabling this option is forward and backward compatible with existing format_version=5.
* `BackupTableNameOption BackupableDBOptions::share_files_with_checksum_naming` is added, where `BackupTableNameOption` is an `enum` type with two enumerators `kChecksumAndFileSize` and `kChecksumAndFileSize`. By default, `BackupTableNameOption BackupableDBOptions::share_files_with_checksum_naming` is set to `kChecksumAndDbSessionId`. In this default case, backup table filenames are of the form `<file_number>_<crc32c>_<db_session_id>.sst` as opposed to `<file_number>_<crc32c>_<file_size>.sst`. The new default behavior fixes the backup file name collision problem, which might be possible at large scale, but the option `kChecksumAndFileSize` is added to allow use of old naming in case it is needed. This default behavior change is not an upgrade issue, because previous versions of RocksDB can read, restore, and delete backups using new names, and it's OK for a backup directory to use a mixture of table file naming schemes. Note that `share_files_with_checksum_naming` comes into effect only when both `share_files_with_checksum` and `share_table_files` are true.
* Added auto resume function to automatically recover the DB from background Retryable IO Error. When retryable IOError happens during flush and WAL write, the error is mapped to Hard Error and DB will be in read mode. When retryable IO Error happens during compaction, the error will be mapped to Soft Error. DB is still in write/read mode. Autoresume function will create a thread for a DB to call DB->ResumeImpl() to try the recover for Retryable IO Error during flush and WAL write. Compaction will be rescheduled by itself if retryable IO Error happens. Auto resume may also cause other Retryable IO Error during the recovery, so the recovery will fail. Retry the auto resume may solve the issue, so we use max_bgerror_resume_count to decide how many resume cycles will be tried in total. If it is <=0, auto resume retryable IO Error is disabled. Default is INT_MAX, which will lead to a infinit auto resume. bgerror_resume_retry_interval decides the time interval between two auto resumes.

### Bug Fixes
* Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further.
Expand Down
18 changes: 5 additions & 13 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ Status BuildTable(
// Reports the IOStats for flush for every following bytes.
const size_t kReportFlushIOStatsEvery = 1048576;
Status s;
IOStatus io_s;
meta->fd.file_size = 0;
iter->SeekToFirst();
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
Expand Down Expand Up @@ -125,11 +124,7 @@ Status BuildTable(
bool use_direct_writes = file_options.use_direct_writes;
TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes);
#endif // !NDEBUG
io_s = NewWritableFile(fs, fname, &file, file_options);
s = io_s;
if (io_status->ok()) {
*io_status = io_s;
}
s = NewWritableFile(fs, fname, &file, file_options);
if (!s.ok()) {
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger, ioptions.listeners, dbname, column_family_name, fname,
Expand Down Expand Up @@ -198,10 +193,7 @@ Status BuildTable(
} else {
s = builder->Finish();
}
io_s = builder->io_status();
if (io_status->ok()) {
*io_status = io_s;
}
*io_status = builder->io_status();

if (s.ok() && !empty) {
uint64_t file_size = builder->FileSize();
Expand All @@ -220,16 +212,16 @@ Status BuildTable(
StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
*io_status = file_writer->Sync(ioptions.use_fsync);
}
if (s.ok() && io_status->ok() && !empty) {
if (io_status->ok() && !empty) {
*io_status = file_writer->Close();
}
if (s.ok() && io_status->ok() && !empty) {
if (io_status->ok() && !empty) {
// Add the checksum information to file metadata.
meta->file_checksum = file_writer->GetFileChecksum();
meta->file_checksum_func_name = file_writer->GetFileChecksumFuncName();
}

if (s.ok()) {
if (!io_status->ok()) {
s = *io_status;
}

Expand Down
41 changes: 13 additions & 28 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ struct CompactionJob::SubcompactionState {
// The return status of this subcompaction
Status status;

// The return IO Status of this subcompaction
IOStatus io_status;

// Files produced by this subcompaction
struct Output {
FileMetaData meta;
Expand Down Expand Up @@ -182,7 +179,6 @@ struct CompactionJob::SubcompactionState {
start = std::move(o.start);
end = std::move(o.end);
status = std::move(o.status);
io_status = std::move(o.io_status);
outputs = std::move(o.outputs);
outfile = std::move(o.outfile);
builder = std::move(o.builder);
Expand Down Expand Up @@ -613,26 +609,22 @@ Status CompactionJob::Run() {

// Check if any thread encountered an error during execution
Status status;
IOStatus io_s;
for (const auto& state : compact_->sub_compact_states) {
if (!state.status.ok()) {
status = state.status;
io_s = state.io_status;
break;
}
}
if (io_status_.ok()) {
io_status_ = io_s;
}

IOStatus io_s;
if (status.ok() && output_directory_) {
io_s = output_directory_->Fsync(IOOptions(), nullptr);
}
if (io_status_.ok()) {
if (!io_s.ok()) {
io_status_ = io_s;
}
if (status.ok()) {
status = io_s;
}

if (status.ok()) {
thread_pool.clear();
std::vector<const FileMetaData*> files_meta;
Expand Down Expand Up @@ -1218,7 +1210,6 @@ Status CompactionJob::FinishCompactionOutputFile(
} else {
it->SeekToFirst();
}
TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
for (; it->Valid(); it->Next()) {
auto tombstone = it->Tombstone();
if (upper_bound != nullptr) {
Expand Down Expand Up @@ -1319,9 +1310,9 @@ Status CompactionJob::FinishCompactionOutputFile(
} else {
sub_compact->builder->Abandon();
}
IOStatus io_s = sub_compact->builder->io_status();
if (s.ok()) {
s = io_s;
if (!sub_compact->builder->io_status().ok()) {
io_status_ = sub_compact->builder->io_status();
s = io_status_;
}
const uint64_t current_bytes = sub_compact->builder->FileSize();
if (s.ok()) {
Expand All @@ -1331,25 +1322,24 @@ Status CompactionJob::FinishCompactionOutputFile(
sub_compact->total_bytes += current_bytes;

// Finish and check for file errors
IOStatus io_s;
if (s.ok()) {
StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
io_s = sub_compact->outfile->Sync(db_options_.use_fsync);
}
if (s.ok() && io_s.ok()) {
if (io_s.ok()) {
io_s = sub_compact->outfile->Close();
}
if (s.ok() && io_s.ok()) {
if (io_s.ok()) {
// Add the checksum information to file metadata.
meta->file_checksum = sub_compact->outfile->GetFileChecksum();
meta->file_checksum_func_name =
sub_compact->outfile->GetFileChecksumFuncName();
}
if (s.ok()) {
if (!io_s.ok()) {
io_status_ = io_s;
s = io_s;
}
if (sub_compact->io_status.ok()) {
sub_compact->io_status = io_s;
}
sub_compact->outfile.reset();

TableProperties tp;
Expand Down Expand Up @@ -1498,12 +1488,7 @@ Status CompactionJob::OpenCompactionOutputFile(
TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
&syncpoint_arg);
#endif
Status s;
IOStatus io_s = NewWritableFile(fs_, fname, &writable_file, file_options_);
s = io_s;
if (sub_compact->io_status.ok()) {
sub_compact->io_status = io_s;
}
Status s = NewWritableFile(fs_, fname, &writable_file, file_options_);
if (!s.ok()) {
ROCKS_LOG_ERROR(
db_options_.info_log,
Expand Down
4 changes: 2 additions & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1710,8 +1710,8 @@ class DBImpl : public DB {
size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; }

IOStatus CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
size_t preallocate_block_size, log::Writer** new_log);
Status CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
size_t preallocate_block_size, log::Writer** new_log);

// Validate self-consistency of DB options
static Status ValidateOptions(const DBOptions& db_options);
Expand Down
11 changes: 2 additions & 9 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,7 @@ Status DBImpl::FlushMemTableToOutputFile(
} else {
flush_job.Cancel();
}
if (io_s.ok()) {
io_s = flush_job.io_status();
}
io_s = flush_job.io_status();

if (s.ok()) {
InstallSuperVersionAndScheduleWork(cfd, superversion_context,
Expand Down Expand Up @@ -1109,12 +1107,7 @@ Status DBImpl::CompactFilesImpl(
"[%s] [JOB %d] Compaction error: %s",
c->column_family_data()->GetName().c_str(),
job_context->job_id, status.ToString().c_str());
IOStatus io_s = compaction_job.io_status();
if (!io_s.ok()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
} else {
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
}
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
}

if (output_file_names != nullptr) {
Expand Down
17 changes: 8 additions & 9 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1388,10 +1388,9 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
!kSeqPerBatch, kBatchPerTxn);
}

IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
size_t preallocate_block_size,
log::Writer** new_log) {
IOStatus io_s;
Status DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
size_t preallocate_block_size, log::Writer** new_log) {
Status s;
std::unique_ptr<FSWritableFile> lfile;

DBOptions db_options =
Expand All @@ -1409,13 +1408,13 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
&lfile, /*dbg=*/nullptr);
s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
&lfile, /*dbg=*/nullptr);
} else {
io_s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
}

if (io_s.ok()) {
if (s.ok()) {
lfile->SetWriteLifeTimeHint(CalculateWALWriteHint());
lfile->SetPreallocationBlockSize(preallocate_block_size);

Expand All @@ -1427,7 +1426,7 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
immutable_db_options_.recycle_log_file_num > 0,
immutable_db_options_.manual_wal_flush);
}
return io_s;
return s;
}

Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
Expand Down
19 changes: 4 additions & 15 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1638,7 +1638,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
std::unique_ptr<WritableFile> lfile;
log::Writer* new_log = nullptr;
MemTable* new_mem = nullptr;
IOStatus io_s;

// Recoverable state is persisted in WAL. After memtable switch, WAL might
// be deleted, so we write the state to memtable to be persisted as well.
Expand Down Expand Up @@ -1684,11 +1683,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
if (creating_new_log) {
// TODO: Write buffer size passed in should be max of all CF's instead
// of mutable_cf_options.write_buffer_size.
io_s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size,
&new_log);
if (s.ok()) {
s = io_s;
}
s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size,
&new_log);
}
if (s.ok()) {
SequenceNumber seq = versions_->LastSequence();
Expand All @@ -1714,10 +1710,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
if (!logs_.empty()) {
// Alway flush the buffer of the last log before switching to a new one
log::Writer* cur_log_writer = logs_.back().writer;
io_s = cur_log_writer->WriteBuffer();
if (s.ok()) {
s = io_s;
}
s = cur_log_writer->WriteBuffer();
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64
Expand Down Expand Up @@ -1752,11 +1745,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
}
// We may have lost data from the WritableFileBuffer in-memory buffer for
// the current log, so treat it as a fatal error and set bg_error
if (!io_s.ok()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kMemTable);
} else {
error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
}
error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
// Read back bg_error in order to get the right severity
s = error_handler_.GetBGError();
return s;
Expand Down
Loading

0 comments on commit cd8bbbc

Please sign in to comment.