From 12a9cc088df60c589c7e8c5fb69701d6bc19c33f Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Wed, 14 Sep 2022 16:32:21 +0800
Subject: [PATCH] fix pagestorage v2 restore (#5384) (#5391)

* This is an automated cherry-pick of #5384

Signed-off-by: ti-chi-bot

* Update PageStorage.h

* fix static analysis

* update format code

Signed-off-by: ti-chi-bot
Co-authored-by: lidezhu <47731263+lidezhu@users.noreply.github.com>
Co-authored-by: lidezhu
---
 dbms/src/Storages/Page/V2/PageStorage.cpp | 40 ++++++++++---------
 dbms/src/Storages/Page/V2/PageStorage.h   | 19 ++++-----
 .../Page/V2/tests/gtest_page_storage.cpp  | 32 +++++++++++++++
 format-diff.py                            | 14 +++++--
 4 files changed, 74 insertions(+), 31 deletions(-)

diff --git a/dbms/src/Storages/Page/V2/PageStorage.cpp b/dbms/src/Storages/Page/V2/PageStorage.cpp
index 4b49e9c64a6..40c9bdb1899 100644
--- a/dbms/src/Storages/Page/V2/PageStorage.cpp
+++ b/dbms/src/Storages/Page/V2/PageStorage.cpp
@@ -198,7 +198,7 @@ void PageStorage::restore()
     /// Restore current version from both formal and legacy page files
 
     MetaMergingQueue merging_queue;
-    for (auto & page_file : page_files)
+    for (const auto & page_file : page_files)
     {
         if (!(page_file.getType() == PageFile::Type::Formal || page_file.getType() == PageFile::Type::Legacy
               || page_file.getType() == PageFile::Type::Checkpoint))
@@ -286,7 +286,7 @@ void PageStorage::restore()
         // Remove old checkpoints and archive obsolete PageFiles that have not been archived yet during gc for some reason.
 #ifdef PAGE_STORAGE_UTIL_DEBUGGGING
         LOG_FMT_TRACE(log, "{} These file would be archive:", storage_name);
-        for (auto & pf : page_files_to_remove)
+        for (const auto & pf : page_files_to_remove)
             LOG_FMT_TRACE(log, "{} {}", storage_name, pf.toString());
 #else
         // when restore `PageStorage`, the `PageFile` in `page_files_to_remove` is not counted in the total size,
@@ -297,6 +297,7 @@ void PageStorage::restore()
     }
 
     // Fill write_files
+    PageFileIdAndLevel max_page_file_id_lvl{0, 0};
     {
         const size_t num_delta_paths = delegator->numPaths();
         std::vector<size_t> next_write_fill_idx(num_delta_paths);
         // Only insert location of PageFile when it storing delta data
         for (const auto & page_file : page_files)
         {
+            max_page_file_id_lvl = std::max(max_page_file_id_lvl, page_file.fileIdLevel());
             // Checkpoint file is always stored on `delegator`'s default path, so no need to insert it's location here
             size_t idx_in_delta_paths = delegator->addPageFileUsedSize(
                 page_file.fileIdLevel(),
@@ -327,7 +329,7 @@ void PageStorage::restore()
         std::vector<String> store_paths = delegator->listPaths();
         for (size_t i = 0; i < write_files.size(); ++i)
         {
-            auto writer = checkAndRenewWriter(write_files[i], /*parent_path_hint=*/store_paths[i % store_paths.size()]);
+            auto writer = checkAndRenewWriter(write_files[i], max_page_file_id_lvl, /*parent_path_hint=*/store_paths[i % store_paths.size()]);
             idle_writers.emplace_back(std::move(writer));
         }
 #endif
@@ -346,7 +348,7 @@ PageId PageStorage::getMaxId()
     return versioned_page_entries.getSnapshot()->version()->maxId();
 }
 
-PageId PageStorage::getNormalPageId(PageId page_id, SnapshotPtr snapshot)
+PageId PageStorage::getNormalPageId(PageId page_id, SnapshotPtr snapshot) // NOLINT(google-default-arguments)
 {
     if (!snapshot)
     {
@@ -357,7 +359,7 @@ PageId PageStorage::getNormalPageId(PageId page_id, SnapshotPtr snapshot)
     return is_ref_id ? normal_page_id : page_id;
 }
 
-DB::PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot)
+DB::PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot) // NOLINT(google-default-arguments)
 {
     if (!snapshot)
     {
@@ -385,6 +387,7 @@ DB::PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot)
 // The <id, level> of the new `page_file` is <max_id + 1, 0> of all `write_files`
 PageStorage::WriterPtr PageStorage::checkAndRenewWriter( //
     WritingPageFile & writing_file,
+    PageFileIdAndLevel max_page_file_id_lvl_hint,
     const String & parent_path_hint,
     PageStorage::WriterPtr && old_writer,
     const String & logging_msg)
@@ -425,8 +428,7 @@ PageStorage::WriterPtr PageStorage::checkAndRenewWriter( //
             // Check whether caller has defined a hint path
            pf_parent_path = parent_path_hint;
         }
-
-        PageFileIdAndLevel max_writing_id_lvl{0, 0};
+        PageFileIdAndLevel max_writing_id_lvl{max_page_file_id_lvl_hint};
         for (const auto & wf : write_files)
             max_writing_id_lvl = std::max(max_writing_id_lvl, wf.file.fileIdLevel());
         delegator->addPageFileUsedSize( //
@@ -439,7 +441,6 @@ PageStorage::WriterPtr PageStorage::checkAndRenewWriter( //
         writing_file.file = PageFile::newPageFile(max_writing_id_lvl.first + 1, 0, pf_parent_path, file_provider, PageFile::Type::Formal, page_file_log);
         writing_file.persisted.meta_offset = 0;
-
         write_file_writer = writing_file.file.createWriter(config.sync_on_write, true);
     }
     return write_file_writer;
 }
@@ -469,7 +470,7 @@ PageStorage::ReaderPtr PageStorage::getReader(const PageFileIdAndLevel & file_id
     return pages_reader;
 }
 
-void PageStorage::write(DB::WriteBatch && wb, const WriteLimiterPtr & write_limiter)
+void PageStorage::write(DB::WriteBatch && wb, const WriteLimiterPtr & write_limiter) // NOLINT(google-default-arguments)
 {
     if (unlikely(wb.empty()))
         return;
@@ -542,7 +543,7 @@ void PageStorage::write(DB::WriteBatch && wb, const WriteLimiterPtr & write_limi
             // Check whether we need to roll to new PageFile and its writer
             const auto logging_msg = " PageFile_" + DB::toString(writing_file.file.getFileId()) + "_0 is full,";
-            file_to_write = checkAndRenewWriter(writing_file, "", std::move(file_to_write), logging_msg);
+            file_to_write = checkAndRenewWriter(writing_file, {0, 0}, "", std::move(file_to_write), logging_msg);
 
             idle_writers.emplace_back(std::move(file_to_write));
 
@@ -577,7 +578,7 @@ std::tuple<size_t, double, unsigned> PageStorage::getSnapshotsStat() const
     return versioned_page_entries.getSnapshotsStat();
 }
 
-DB::Page PageStorage::read(PageId page_id, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot)
+DB::Page PageStorage::read(PageId page_id, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) // NOLINT(google-default-arguments)
 {
     if (!snapshot)
     {
@@ -593,7 +594,7 @@ DB::Page PageStorage::read(PageId page_id, const ReadLimiterPtr & read_limiter,
     return file_reader->read(to_read, read_limiter)[page_id];
 }
 
-PageMap PageStorage::read(const std::vector<PageId> & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot)
+PageMap PageStorage::read(const std::vector<PageId> & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) // NOLINT(google-default-arguments)
 {
     if (!snapshot)
     {
@@ -636,7 +637,7 @@ PageMap PageStorage::read(const std::vector<PageId> & page_ids, const ReadLimite
     return page_map;
 }
 
-void PageStorage::read(const std::vector<PageId> & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot)
+void PageStorage::read(const std::vector<PageId> & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) // NOLINT(google-default-arguments)
 {
     if (!snapshot)
     {
@@ -676,7 +677,7 @@ void PageStorage::read(const std::vector<PageId> & page_ids, const PageHandler &
     }
 }
 
-PageMap PageStorage::read(const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot)
+PageMap PageStorage::read(const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) // NOLINT(google-default-arguments)
 {
     if (!snapshot)
         snapshot = this->getSnapshot();
@@ -718,7 +719,7 @@ PageMap PageStorage::read(const std::vector<PageReadFields> & page_fields, const
     return page_map;
 }
 
-void PageStorage::traverse(const std::function<void(const DB::Page & page)> & acceptor, SnapshotPtr snapshot)
+void PageStorage::traverse(const std::function<void(const DB::Page & page)> & acceptor, SnapshotPtr snapshot) // NOLINT(google-default-arguments)
 {
     if (!snapshot)
     {
@@ -902,7 +903,7 @@ WriteBatch::SequenceID PageStorage::WritingFilesSnapshot::minPersistedSequence()
     return seq;
 }
 
-bool PageStorage::gc(bool not_skip, const WriteLimiterPtr & write_limiter, const ReadLimiterPtr & read_limiter)
+bool PageStorage::gc(bool not_skip, const WriteLimiterPtr & write_limiter, const ReadLimiterPtr & read_limiter) // NOLINT(google-default-arguments)
 {
     // If another thread is running gc, just return;
     bool v = false;
@@ -1079,8 +1080,9 @@ bool PageStorage::gc(bool not_skip, const WriteLimiterPtr & write_limiter, const
         // Legacy and checkpoint files will be removed from `page_files` after `tryCompact`.
         LegacyCompactor compactor(*this, write_limiter, read_limiter);
         PageFileSet page_files_to_archive;
+        auto files_to_compact = std::move(page_files);
         std::tie(page_files, page_files_to_archive, gc_context.num_bytes_written_in_compact_legacy)
-            = compactor.tryCompact(std::move(page_files), writing_files_snapshot);
+            = compactor.tryCompact(std::move(files_to_compact), writing_files_snapshot);
         archivePageFiles(page_files_to_archive, true);
         gc_context.num_files_archive_in_compact_legacy = page_files_to_archive.size();
     }
@@ -1149,7 +1151,7 @@ void PageStorage::archivePageFiles(const PageFileSet & page_files, bool remove_s
     if (!archive_dir.exists())
         archive_dir.createDirectory();
 
-    for (auto & page_file : page_files)
+    for (const auto & page_file : page_files)
     {
         Poco::Path path(page_file.folderPath());
         auto dest = archive_path.toString() + "/" + path.getFileName();
@@ -1218,7 +1220,7 @@ PageStorage::gcRemoveObsoleteData(PageFileSet & page_files,
 {
     size_t num_data_removed = 0;
     size_t num_bytes_removed = 0;
-    for (auto & page_file : page_files)
+    for (const auto & page_file : page_files)
     {
         const auto page_id_and_lvl = page_file.fileIdLevel();
         if (page_id_and_lvl >= writing_file_id_level)
diff --git a/dbms/src/Storages/Page/V2/PageStorage.h b/dbms/src/Storages/Page/V2/PageStorage.h
index 1a09cd269b3..ee1e0b367a9 100644
--- a/dbms/src/Storages/Page/V2/PageStorage.h
+++ b/dbms/src/Storages/Page/V2/PageStorage.h
@@ -93,23 +93,23 @@ class PageStorage : public DB::PageStorage
     std::tuple<size_t, double, unsigned> getSnapshotsStat() const override;
 
-    void write(DB::WriteBatch && write_batch, const WriteLimiterPtr & write_limiter = nullptr) override;
+    void write(DB::WriteBatch && wb, const WriteLimiterPtr & write_limiter = nullptr) override; // NOLINT(google-default-arguments)
 
-    DB::PageEntry getEntry(PageId page_id, SnapshotPtr snapshot = {}) override;
+    DB::PageEntry getEntry(PageId page_id, SnapshotPtr snapshot = {}) override; // NOLINT(google-default-arguments)
 
-    DB::Page read(PageId page_id, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override;
+    DB::Page read(PageId page_id, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override; // NOLINT(google-default-arguments)
 
-    PageMap read(const std::vector<PageId> & page_ids, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override;
+    PageMap read(const std::vector<PageId> & page_ids, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override; // NOLINT(google-default-arguments)
 
-    void read(const std::vector<PageId> & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override;
+    void read(const std::vector<PageId> & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override; // NOLINT(google-default-arguments)
 
-    PageMap read(const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override;
+    PageMap read(const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}) override; // NOLINT(google-default-arguments)
 
-    void traverse(const std::function<void(const DB::Page & page)> & acceptor, SnapshotPtr snapshot = {}) override;
+    void traverse(const std::function<void(const DB::Page & page)> & acceptor, SnapshotPtr snapshot = {}) override; // NOLINT(google-default-arguments)
 
     void traversePageEntries(const std::function<void(PageId page_id, const PageEntry & entry)> & acceptor, SnapshotPtr snapshot) override;
 
-    bool gc(bool not_skip = false, const WriteLimiterPtr & write_limiter = nullptr, const ReadLimiterPtr & read_limiter = nullptr) override;
+    bool gc(bool not_skip = false, const WriteLimiterPtr & write_limiter = nullptr, const ReadLimiterPtr & read_limiter = nullptr) override; // NOLINT(google-default-arguments)
 
     void registerExternalPagesCallbacks(ExternalPagesScanner scanner, ExternalPagesRemover remover) override;
 
@@ -241,7 +241,8 @@ class PageStorage : public DB::PageStorage
 private:
     WriterPtr checkAndRenewWriter(
-        WritingPageFile & page_file,
+        WritingPageFile & writing_file,
+        PageFileIdAndLevel max_page_file_id_lvl_hint,
         const String & parent_path_hint,
         WriterPtr && old_writer = nullptr,
         const String & logging_msg = "");
diff --git a/dbms/src/Storages/Page/V2/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V2/tests/gtest_page_storage.cpp
index 5c710c566cf..20926347089 100644
--- a/dbms/src/Storages/Page/V2/tests/gtest_page_storage.cpp
+++ b/dbms/src/Storages/Page/V2/tests/gtest_page_storage.cpp
@@ -427,6 +427,38 @@ try
 }
 CATCH
 
+TEST_F(PageStorage_test, RenewWriter)
+try
+{
+    constexpr size_t buf_sz = 100;
+    char c_buff[buf_sz];
+
+    {
+        WriteBatch wb;
+        memset(c_buff, 0xf, buf_sz);
+        auto buf = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
+        wb.putPage(1, 0, buf, buf_sz);
+        storage->write(std::move(wb));
+    }
+
+    {
+        PageStorage::ListPageFilesOption opt;
+        auto page_files = storage->listAllPageFiles(file_provider, storage->delegator, storage->log, opt);
+        ASSERT_EQ(page_files.size(), 1UL);
+    }
+
+    PageStorage::Config config;
+    config.file_roll_size = 10; // make it easy to renew a new page file for write
+    storage = reopenWithConfig(config);
+
+    {
+        PageStorage::ListPageFilesOption opt;
+        auto page_files = storage->listAllPageFiles(file_provider, storage->delegator, storage->log, opt);
+        ASSERT_EQ(page_files.size(), 2UL);
+    }
+}
+CATCH
+
 /// Check if we can correctly do read / write after restore from disk.
 TEST_F(PageStorage_test, WriteReadRestore)
 try
diff --git a/format-diff.py b/format-diff.py
index 1571d24744d..0eb788ecad7 100755
--- a/format-diff.py
+++ b/format-diff.py
@@ -66,11 +66,19 @@ def main():
         if subprocess.Popen(cmd, shell=True, cwd=tics_repo_path).wait():
             exit(-1)
         diff_res = run_cmd('git diff --name-only')
-        if diff_res:
+        files_not_in_contrib = [f for f in diff_res if not f.startswith('contrib')]
+        files_contrib = [f for f in diff_res if f.startswith('contrib')]
+        if files_not_in_contrib:
+            print('')
             print('Error: found files NOT formatted')
-            print(''.join(diff_res))
-            print(''.join(run_cmd('git diff')))
             exit(-1)
+        elif files_contrib:
+            print('')
+            print('Warn: found contrib changed')
+            print(''.join(files_contrib))
+            print('')
+            print(''.join(run_cmd('git status')))
         else:
             print("Format check passed")
     else:
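
The essence of the PageStorage.cpp change above: restore() now remembers the largest <file_id, level> among all PageFiles found on disk (including obsolete ones that are about to be archived or removed) and passes it to checkAndRenewWriter() as max_page_file_id_lvl_hint, so a freshly rolled writing PageFile can never reuse an id that already existed before the restart. Below is a minimal, self-contained sketch of that idea, assuming simplified stand-in types (PageFileIdAndLevel as a std::pair, a stub WritingFile, a free function nextWritingFileId); it is not the real TiFlash code.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

// Simplified stand-in for a <file_id, level> pair.
using PageFileIdAndLevel = std::pair<uint64_t, uint32_t>;

struct WritingFile
{
    PageFileIdAndLevel id_lvl;
};

// Analogue of checkAndRenewWriter(): pick the id of the next writing PageFile.
// The hint seeds the maximum before scanning the current write files, which is
// what keeps the ids of removed or archived files from being handed out again.
PageFileIdAndLevel nextWritingFileId(const std::vector<WritingFile> & write_files,
                                     PageFileIdAndLevel max_page_file_id_lvl_hint)
{
    PageFileIdAndLevel max_id_lvl = max_page_file_id_lvl_hint;
    for (const auto & wf : write_files)
        max_id_lvl = std::max(max_id_lvl, wf.id_lvl);
    return {max_id_lvl.first + 1, 0}; // a new formal file: <max_id + 1, 0>
}

int main()
{
    // PageFiles found on disk during restore. Suppose PageFile_3_0 is obsolete
    // and will be removed, but its id must still never be reused.
    const std::vector<PageFileIdAndLevel> on_disk = {{1, 0}, {2, 1}, {3, 0}};
    PageFileIdAndLevel max_on_disk{0, 0};
    for (const auto & id_lvl : on_disk)
        max_on_disk = std::max(max_on_disk, id_lvl);

    // Only PageFile_1_0 and PageFile_2_1 become write files after restore.
    const std::vector<WritingFile> write_files = {{{1, 0}}, {{2, 1}}};

    const auto next = nextWritingFileId(write_files, max_on_disk);
    std::cout << "next writing file: PageFile_" << next.first << "_" << next.second << "\n";
    // Prints PageFile_4_0. Without the hint (seeding with {0, 0}) the result
    // would be PageFile_3_0, colliding with the obsolete file still on disk.
    return 0;
}

The new RenewWriter test above exercises the same path from the outside: it reopens the storage with a tiny file_roll_size and expects a second PageFile to appear, i.e. the renewed writer rolls to a fresh file instead of writing into the existing one.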