Skip to content

Commit

Permalink
fix pagestorage v2 restore (#5384) (#5391)
Browse files Browse the repository at this point in the history
* This is an automated cherry-pick of #5384

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>

* Update PageStorage.h

* fix static analysis

* update format code

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
Co-authored-by: lidezhu <47731263+lidezhu@users.noreply.github.com>
Co-authored-by: lidezhu <lidezhu@pingcap.com>
  • Loading branch information
3 people authored Sep 14, 2022
1 parent 76d5e3f commit 12a9cc0
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 31 deletions.
40 changes: 21 additions & 19 deletions dbms/src/Storages/Page/V2/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ void PageStorage::restore()
/// Restore current version from both formal and legacy page files

MetaMergingQueue merging_queue;
for (auto & page_file : page_files)
for (const auto & page_file : page_files)
{
if (!(page_file.getType() == PageFile::Type::Formal || page_file.getType() == PageFile::Type::Legacy
|| page_file.getType() == PageFile::Type::Checkpoint))
Expand Down Expand Up @@ -286,7 +286,7 @@ void PageStorage::restore()
// Remove old checkpoints and archive obsolete PageFiles that have not been archived yet during gc for some reason.
#ifdef PAGE_STORAGE_UTIL_DEBUGGGING
LOG_FMT_TRACE(log, "{} These file would be archive:", storage_name);
for (auto & pf : page_files_to_remove)
for (const auto & pf : page_files_to_remove)
LOG_FMT_TRACE(log, "{} {}", storage_name, pf.toString());
#else
// when restore `PageStorage`, the `PageFile` in `page_files_to_remove` is not counted in the total size,
Expand All @@ -297,13 +297,15 @@ void PageStorage::restore()
}

// Fill write_files
PageFileIdAndLevel max_page_file_id_lvl{0, 0};
{
const size_t num_delta_paths = delegator->numPaths();
std::vector<size_t> next_write_fill_idx(num_delta_paths);
std::iota(next_write_fill_idx.begin(), next_write_fill_idx.end(), 0);
// Only insert location of PageFile when it storing delta data
for (const auto & page_file : page_files)
{
max_page_file_id_lvl = std::max(max_page_file_id_lvl, page_file.fileIdLevel());
// Checkpoint file is always stored on `delegator`'s default path, so no need to insert it's location here
size_t idx_in_delta_paths = delegator->addPageFileUsedSize(
page_file.fileIdLevel(),
Expand All @@ -327,7 +329,7 @@ void PageStorage::restore()
std::vector<String> store_paths = delegator->listPaths();
for (size_t i = 0; i < write_files.size(); ++i)
{
auto writer = checkAndRenewWriter(write_files[i], /*parent_path_hint=*/store_paths[i % store_paths.size()]);
auto writer = checkAndRenewWriter(write_files[i], max_page_file_id_lvl, /*parent_path_hint=*/store_paths[i % store_paths.size()]);
idle_writers.emplace_back(std::move(writer));
}
#endif
Expand All @@ -346,7 +348,7 @@ PageId PageStorage::getMaxId()
return versioned_page_entries.getSnapshot()->version()->maxId();
}

PageId PageStorage::getNormalPageId(PageId page_id, SnapshotPtr snapshot)
PageId PageStorage::getNormalPageId(PageId page_id, SnapshotPtr snapshot) // NOLINT(google-default-arguments)
{
if (!snapshot)
{
Expand All @@ -357,7 +359,7 @@ PageId PageStorage::getNormalPageId(PageId page_id, SnapshotPtr snapshot)
return is_ref_id ? normal_page_id : page_id;
}

DB::PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot)
DB::PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot) // NOLINT(google-default-arguments)
{
if (!snapshot)
{
Expand Down Expand Up @@ -385,6 +387,7 @@ DB::PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot)
// The <id,level> of the new `page_file` is <max_id + 1, 0> of all `write_files`
PageStorage::WriterPtr PageStorage::checkAndRenewWriter( //
WritingPageFile & writing_file,
PageFileIdAndLevel max_page_file_id_lvl_hint,
const String & parent_path_hint,
PageStorage::WriterPtr && old_writer,
const String & logging_msg)
Expand Down Expand Up @@ -425,8 +428,7 @@ PageStorage::WriterPtr PageStorage::checkAndRenewWriter( //
// Check whether caller has defined a hint path
pf_parent_path = parent_path_hint;
}

PageFileIdAndLevel max_writing_id_lvl{0, 0};
PageFileIdAndLevel max_writing_id_lvl{max_page_file_id_lvl_hint};
for (const auto & wf : write_files)
max_writing_id_lvl = std::max(max_writing_id_lvl, wf.file.fileIdLevel());
delegator->addPageFileUsedSize( //
Expand All @@ -439,7 +441,6 @@ PageStorage::WriterPtr PageStorage::checkAndRenewWriter( //
writing_file.file
= PageFile::newPageFile(max_writing_id_lvl.first + 1, 0, pf_parent_path, file_provider, PageFile::Type::Formal, page_file_log);
writing_file.persisted.meta_offset = 0;

write_file_writer = writing_file.file.createWriter(config.sync_on_write, true);
}
return write_file_writer;
Expand Down Expand Up @@ -469,7 +470,7 @@ PageStorage::ReaderPtr PageStorage::getReader(const PageFileIdAndLevel & file_id
return pages_reader;
}

void PageStorage::write(DB::WriteBatch && wb, const WriteLimiterPtr & write_limiter)
void PageStorage::write(DB::WriteBatch && wb, const WriteLimiterPtr & write_limiter) // NOLINT(google-default-arguments)
{
if (unlikely(wb.empty()))
return;
Expand Down Expand Up @@ -542,7 +543,7 @@ void PageStorage::write(DB::WriteBatch && wb, const WriteLimiterPtr & write_limi

// Check whether we need to roll to new PageFile and its writer
const auto logging_msg = " PageFile_" + DB::toString(writing_file.file.getFileId()) + "_0 is full,";
file_to_write = checkAndRenewWriter(writing_file, "", std::move(file_to_write), logging_msg);
file_to_write = checkAndRenewWriter(writing_file, {0, 0}, "", std::move(file_to_write), logging_msg);

idle_writers.emplace_back(std::move(file_to_write));

Expand Down Expand Up @@ -577,7 +578,7 @@ std::tuple<size_t, double, unsigned> PageStorage::getSnapshotsStat() const
return versioned_page_entries.getSnapshotsStat();
}

DB::Page PageStorage::read(PageId page_id, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot)
DB::Page PageStorage::read(PageId page_id, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) // NOLINT(google-default-arguments)
{
if (!snapshot)
{
Expand All @@ -593,7 +594,7 @@ DB::Page PageStorage::read(PageId page_id, const ReadLimiterPtr & read_limiter,
return file_reader->read(to_read, read_limiter)[page_id];
}

PageMap PageStorage::read(const std::vector<PageId> & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot)
PageMap PageStorage::read(const std::vector<PageId> & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) // NOLINT(google-default-arguments)
{
if (!snapshot)
{
Expand Down Expand Up @@ -636,7 +637,7 @@ PageMap PageStorage::read(const std::vector<PageId> & page_ids, const ReadLimite
return page_map;
}

void PageStorage::read(const std::vector<PageId> & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot)
void PageStorage::read(const std::vector<PageId> & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) // NOLINT(google-default-arguments)
{
if (!snapshot)
{
Expand Down Expand Up @@ -676,7 +677,7 @@ void PageStorage::read(const std::vector<PageId> & page_ids, const PageHandler &
}
}

PageMap PageStorage::read(const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot)
PageMap PageStorage::read(const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) // NOLINT(google-default-arguments)
{
if (!snapshot)
snapshot = this->getSnapshot();
Expand Down Expand Up @@ -718,7 +719,7 @@ PageMap PageStorage::read(const std::vector<PageReadFields> & page_fields, const
return page_map;
}

void PageStorage::traverse(const std::function<void(const DB::Page & page)> & acceptor, SnapshotPtr snapshot)
void PageStorage::traverse(const std::function<void(const DB::Page & page)> & acceptor, SnapshotPtr snapshot) // NOLINT(google-default-arguments)
{
if (!snapshot)
{
Expand Down Expand Up @@ -902,7 +903,7 @@ WriteBatch::SequenceID PageStorage::WritingFilesSnapshot::minPersistedSequence()
return seq;
}

bool PageStorage::gc(bool not_skip, const WriteLimiterPtr & write_limiter, const ReadLimiterPtr & read_limiter)
bool PageStorage::gc(bool not_skip, const WriteLimiterPtr & write_limiter, const ReadLimiterPtr & read_limiter) // NOLINT(google-default-arguments)
{
// If another thread is running gc, just return;
bool v = false;
Expand Down Expand Up @@ -1079,8 +1080,9 @@ bool PageStorage::gc(bool not_skip, const WriteLimiterPtr & write_limiter, const
// Legacy and checkpoint files will be removed from `page_files` after `tryCompact`.
LegacyCompactor compactor(*this, write_limiter, read_limiter);
PageFileSet page_files_to_archive;
auto files_to_compact = std::move(page_files);
std::tie(page_files, page_files_to_archive, gc_context.num_bytes_written_in_compact_legacy)
= compactor.tryCompact(std::move(page_files), writing_files_snapshot);
= compactor.tryCompact(std::move(files_to_compact), writing_files_snapshot);
archivePageFiles(page_files_to_archive, true);
gc_context.num_files_archive_in_compact_legacy = page_files_to_archive.size();
}
Expand Down Expand Up @@ -1149,7 +1151,7 @@ void PageStorage::archivePageFiles(const PageFileSet & page_files, bool remove_s
if (!archive_dir.exists())
archive_dir.createDirectory();

for (auto & page_file : page_files)
for (const auto & page_file : page_files)
{
Poco::Path path(page_file.folderPath());
auto dest = archive_path.toString() + "/" + path.getFileName();
Expand Down Expand Up @@ -1218,7 +1220,7 @@ PageStorage::gcRemoveObsoleteData(PageFileSet & page_files,
{
size_t num_data_removed = 0;
size_t num_bytes_removed = 0;
for (auto & page_file : page_files)
for (const auto & page_file : page_files)
{
const auto page_id_and_lvl = page_file.fileIdLevel();
if (page_id_and_lvl >= writing_file_id_level)
Expand Down
19 changes: 10 additions & 9 deletions dbms/src/Storages/Page/V2/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,23 @@ class PageStorage : public DB::PageStorage

std::tuple<size_t, double, unsigned> getSnapshotsStat() const override;

void write(DB::WriteBatch && write_batch, const WriteLimiterPtr & write_limiter = nullptr) override;
void write(DB::WriteBatch && wb, const WriteLimiterPtr & write_limiter = nullptr) override; // NOLINT(google-default-arguments)

DB::PageEntry getEntry(PageId page_id, SnapshotPtr snapshot = {}) override;
DB::PageEntry getEntry(PageId page_id, SnapshotPtr snapshot = {}) override; // NOLINT(google-default-arguments)

DB::Page read(PageId page_id, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override;
DB::Page read(PageId page_id, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override; // NOLINT(google-default-arguments)

PageMap read(const std::vector<PageId> & page_ids, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override;
PageMap read(const std::vector<PageId> & page_ids, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override; // NOLINT(google-default-arguments)

void read(const std::vector<PageId> & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override;
void read(const std::vector<PageId> & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override; // NOLINT(google-default-arguments)

PageMap read(const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override;
PageMap read(const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override; // NOLINT(google-default-arguments)

void traverse(const std::function<void(const DB::Page & page)> & acceptor, SnapshotPtr snapshot = {}) override;
void traverse(const std::function<void(const DB::Page & page)> & acceptor, SnapshotPtr snapshot = {}) override; // NOLINT(google-default-arguments)

void traversePageEntries(const std::function<void(PageId page_id, const DB::PageEntry & page)> & acceptor, SnapshotPtr snapshot) override;

bool gc(bool not_skip = false, const WriteLimiterPtr & write_limiter = nullptr, const ReadLimiterPtr & read_limiter = nullptr) override;
bool gc(bool not_skip = false, const WriteLimiterPtr & write_limiter = nullptr, const ReadLimiterPtr & read_limiter = nullptr) override; // NOLINT(google-default-arguments)

void registerExternalPagesCallbacks(ExternalPagesScanner scanner, ExternalPagesRemover remover) override;

Expand Down Expand Up @@ -241,7 +241,8 @@ class PageStorage : public DB::PageStorage

private:
WriterPtr checkAndRenewWriter(
WritingPageFile & page_file,
WritingPageFile & writing_file,
PageFileIdAndLevel max_page_file_id_lvl_hint,
const String & parent_path_hint,
WriterPtr && old_writer = nullptr,
const String & logging_msg = "");
Expand Down
32 changes: 32 additions & 0 deletions dbms/src/Storages/Page/V2/tests/gtest_page_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,38 @@ try
}
CATCH

TEST_F(PageStorage_test, RenewWriter)
try
{
    // Write a single page so exactly one PageFile exists on disk before restore.
    constexpr size_t data_size = 100;
    char data[data_size];
    memset(data, 0xf, data_size);

    {
        WriteBatch batch;
        auto read_buf = std::make_shared<ReadBufferFromMemory>(data, sizeof(data));
        batch.putPage(1, 0, read_buf, data_size);
        storage->write(std::move(batch));
    }

    {
        PageStorage::ListPageFilesOption opt;
        auto files_before = storage->listAllPageFiles(file_provider, storage->delegator, storage->log, opt);
        ASSERT_EQ(files_before.size(), 1UL);
    }

    // Reopen with a tiny roll size so that restore has to roll to a brand-new
    // PageFile for writing instead of reusing the existing (now "full") one.
    PageStorage::Config config;
    config.file_roll_size = 10;
    storage = reopenWithConfig(config);

    {
        PageStorage::ListPageFilesOption opt;
        auto files_after = storage->listAllPageFiles(file_provider, storage->delegator, storage->log, opt);
        ASSERT_EQ(files_after.size(), 2UL);
    }
}
CATCH

/// Check if we can correctly do read / write after restore from disk.
TEST_F(PageStorage_test, WriteReadRestore)
try
Expand Down
14 changes: 11 additions & 3 deletions format-diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,19 @@ def main():
if subprocess.Popen(cmd, shell=True, cwd=tics_repo_path).wait():
exit(-1)
diff_res = run_cmd('git diff --name-only')
if diff_res:
files_not_in_contrib = [f for f in diff_res if not f.startswith('contrib')]
files_contrib = [f for f in diff_res if f.startswith('contrib')]
if files_not_in_contrib:
print('')
print('Error: found files NOT formatted')
print(''.join(diff_res))
print(''.join(run_cmd('git diff')))
print(''.join(files_not_in_contrib))
exit(-1)
elif files_contrib:
print('')
print('Warn: found contrib changed')
print(''.join(files_contrib))
print('')
print(''.join(run_cmd('git status')))
else:
print("Format check passed")
else:
Expand Down

0 comments on commit 12a9cc0

Please sign in to comment.