diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp
index ab6ad976190..8e448d40c5a 100644
--- a/dbms/src/Interpreters/Context.cpp
+++ b/dbms/src/Interpreters/Context.cpp
@@ -271,6 +271,13 @@ struct ContextShared
             return;
         shutdown_called = true;
 
+        // The local index scheduler must be shut down to stop all
+        // running tasks before shutting down `global_storage_pool`.
+        if (global_local_indexer_scheduler)
+        {
+            global_local_indexer_scheduler->shutdown();
+        }
+
         if (global_storage_pool)
         {
             // shutdown the gc task of global storage pool before
diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp
index 51ad372db24..2897b56de64 100644
--- a/dbms/src/Interpreters/executeQuery.cpp
+++ b/dbms/src/Interpreters/executeQuery.cpp
@@ -311,7 +311,7 @@ std::tuple executeQueryImpl(
 
                 if (elem.read_rows != 0)
                 {
-                    LOG_INFO(
+                    LOG_DEBUG(
                         execute_query_logger,
                         "Read {} rows, {} in {:.3f} sec., {} rows/sec., {}/sec.",
                         elem.read_rows,
@@ -421,7 +421,7 @@ void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in)
         in->dumpTree(log_buffer);
         return log_buffer.toString();
     };
-    LOG_INFO(logger, pipeline_log_str());
+    LOG_DEBUG(logger, pipeline_log_str());
 }
 
 BlockIO executeQuery(const String & query, Context & context, bool internal, QueryProcessingStage::Enum stage)
diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
index d513d146868..15ca84b8002 100644
--- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
+++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
@@ -64,9 +64,9 @@ LocalIndexerScheduler::LocalIndexerScheduler(const Options & options)
         start();
 }
 
-LocalIndexerScheduler::~LocalIndexerScheduler()
+void LocalIndexerScheduler::shutdown()
 {
-    LOG_INFO(logger, "LocalIndexerScheduler is destroying. Waiting scheduler and tasks to finish...");
+    LOG_INFO(logger, "LocalIndexerScheduler is shutting down. Waiting scheduler and tasks to finish...");
 
     // First quit the scheduler. Don't schedule more tasks.
     is_shutting_down = true;
@@ -81,7 +81,15 @@ LocalIndexerScheduler::~LocalIndexerScheduler()
 
     // Then wait all running tasks to finish.
     pool.reset();
+    LOG_INFO(logger, "LocalIndexerScheduler is shutdown.");
+}
 
+LocalIndexerScheduler::~LocalIndexerScheduler()
+{
+    if (!is_shutting_down)
+    {
+        shutdown();
+    }
     LOG_INFO(logger, "LocalIndexerScheduler is destroyed");
 }
 
@@ -295,7 +303,10 @@ bool LocalIndexerScheduler::tryAddTaskToPool(std::unique_lock & lock
         }
     };
 
-    RUNTIME_CHECK(pool);
+    if (is_shutting_down || !pool)
+        // shutting down, retry again
+        return false;
+
     if (!pool->trySchedule(real_job))
         // Concurrent task limit reached
         return false;
diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
index 53349740918..11f70fbdd84 100644
--- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
+++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
@@ -93,6 +93,12 @@ class LocalIndexerScheduler
 
     ~LocalIndexerScheduler();
 
+    /**
+     * @brief Stop the scheduler and wait for running tasks to finish.
+     * Note that this method won't clear the tasks that have already been pushed.
+     */
+    void shutdown();
+
     /**
      * @brief Start the scheduler. In some tests we need to start scheduler
      * after some tasks are pushed.
@@ -101,7 +107,7 @@ class LocalIndexerScheduler
 
     /**
      * @brief Blocks until there is no tasks remaining in the queue and there is no running tasks.
-     * Should be only used in tests.
+     * **Should be only used in tests**.
      */
     void waitForFinish();
 
@@ -114,6 +120,7 @@ class LocalIndexerScheduler
 
     /**
      * @brief Drop all tasks matching specified keyspace id and table id.
+     * Note that this method won't drop the running tasks.
      */
     size_t dropTasks(KeyspaceID keyspace_id, TableID table_id);
 
@@ -147,9 +154,6 @@ class LocalIndexerScheduler
     void moveBackReadyTasks(std::unique_lock & lock);
 
 private:
-    bool is_started = false;
-    std::thread scheduler_thread;
-
     /// Try to add a task to the pool. Returns false if the pool is full
     /// (for example, reaches concurrent task limit or memory limit).
     /// When pool is full, we will not try to schedule any more tasks at this moment.
@@ -160,6 +164,9 @@ class LocalIndexerScheduler
     /// heavy pressure.
     bool tryAddTaskToPool(std::unique_lock & lock, const InternalTaskPtr & task);
 
+    std::thread scheduler_thread;
+    bool is_started = false;
+
     KeyspaceID last_schedule_keyspace_id = 0;
     std::map last_schedule_table_id_by_ks;
 
diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp
index 61da22b81bd..5d5c281555c 100644
--- a/dbms/src/Storages/DeltaMerge/Segment.cpp
+++ b/dbms/src/Storages/DeltaMerge/Segment.cpp
@@ -417,24 +417,34 @@ SegmentPtr Segment::restoreSegment( //
     DMContext & context,
     PageIdU64 segment_id)
 {
-    Page page = context.storage_pool->metaReader()->read(segment_id); // not limit restore
-
-    ReadBufferFromMemory buf(page.data.begin(), page.data.size());
     Segment::SegmentMetaInfo segment_info;
-    readSegmentMetaInfo(buf, segment_info);
+    try
+    {
+        Page page = context.storage_pool->metaReader()->read(segment_id); // not limit restore
 
-    auto delta = DeltaValueSpace::restore(context, segment_info.range, segment_info.delta_id);
-    auto stable = StableValueSpace::restore(context, segment_info.stable_id);
-    auto segment = std::make_shared(
-        parent_log,
-        segment_info.epoch,
-        segment_info.range,
-        segment_id,
-        segment_info.next_segment_id,
-        delta,
-        stable);
+        ReadBufferFromMemory buf(page.data.begin(), page.data.size());
+        readSegmentMetaInfo(buf, segment_info);
 
-    return segment;
+        auto delta = DeltaValueSpace::restore(context, segment_info.range, segment_info.delta_id);
+        auto stable = StableValueSpace::restore(context, segment_info.stable_id);
+        auto segment = std::make_shared(
+            parent_log,
+            segment_info.epoch,
+            segment_info.range,
+            segment_id,
+            segment_info.next_segment_id,
+            delta,
+            stable);
+
+        return segment;
+    }
+    catch (DB::Exception & e)
+    {
+        e.addMessage(fmt::format("while restoreSegment, segment_id={}", segment_id));
+        e.rethrow();
+    }
+    RUNTIME_CHECK_MSG(false, "unreachable");
+    return {};
 }
 
 Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //
diff --git a/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp b/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp
index bcd6bb91470..e9f2189290a 100644
--- a/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp
+++ b/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp
@@ -12,15 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include
 #include
 #include
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -29,6 +33,7 @@
 #include
 #include
 
+#include
 #include
 #include
 
@@ -47,12 +52,15 @@ struct ControlOptions
         CHECK_ALL_DATA_CRC = 4,
         DISPLAY_WAL_ENTRIES = 5,
         DISPLAY_REGION_INFO = 6,
+        DISPLAY_BLOB_DATA = 7,
     };
 
     std::vector paths;
     DisplayType mode = DisplayType::DISPLAY_SUMMARY_INFO;
     UInt64 page_id = UINT64_MAX;
     UInt32 blob_id = UINT32_MAX;
+    BlobFileOffset blob_offset = INVALID_BLOBFILE_OFFSET;
+    size_t blob_size = UINT64_MAX;
     UInt64 namespace_id = DB::TEST_NAMESPACE_ID;
     StorageType storage_type = StorageType::Unknown; // only useful for universal page storage
     UInt32 keyspace_id = NullspaceID; // only useful for universal page storage
@@ -85,6 +93,7 @@ ControlOptions ControlOptions::parse(int argc, char ** argv)
  4 is check every data is valid
  5 is dump entries in WAL log files
  6 is display all region info
+ 7 is display blob data (in hex)
 )") //
         ("show_entries",
          value()->default_value(true),
@@ -106,8 +115,14 @@ ControlOptions ControlOptions::parse(int argc, char ** argv)
          value()->default_value(UINT64_MAX),
          "Query a single Page id, and print its version chain.") //
         ("blob_id,B",
-         value()->default_value(UINT32_MAX),
-         "Query a single Blob id, and print its data distribution.") //
+         value()->default_value(INVALID_BLOBFILE_ID),
+         "Specify the blob_id") //
+        ("blob_offset",
+         value()->default_value(INVALID_BLOBFILE_OFFSET),
+         "Specify the offset.") //
+        ("blob_size",
+         value()->default_value(0),
+         "Specify the size.") //
         //
         ("imitative,I",
          value()->default_value(true),
@@ -140,7 +155,9 @@ ControlOptions ControlOptions::parse(int argc, char ** argv)
     opt.paths = options["paths"].as>();
     auto mode_int = options["mode"].as();
     opt.page_id = options["page_id"].as();
-    opt.blob_id = options["blob_id"].as();
+    opt.blob_id = options["blob_id"].as();
+    opt.blob_offset = options["blob_offset"].as();
+    opt.blob_size = options["blob_size"].as();
     opt.show_entries = options["show_entries"].as();
     opt.check_fields = options["check_fields"].as();
     auto storage_type_int = options["storage_type"].as();
@@ -346,6 +363,12 @@ class PageStorageControlV3
             }
             break;
         }
+        case ControlOptions::DisplayType::DISPLAY_BLOB_DATA:
+        {
+            String hex_data = getBlobData(blob_store, opts.blob_id, opts.blob_offset, opts.blob_size);
+            fmt::println("hex:{}", hex_data);
+            break;
+        }
         default:
             std::cout << "Invalid display mode." << std::endl;
             break;
@@ -821,6 +844,34 @@ class PageStorageControlV3
         return error_msg.toString();
     }
 
+    /// Read `size` bytes at `offset` of blob file `blob_id` through `blob_store`,
+    /// print the CRC64 checksum of those bytes to stdout and return them as a hex string.
+    static String getBlobData(
+        typename Trait::BlobStore & blob_store,
+        BlobFileId blob_id,
+        BlobFileOffset offset,
+        size_t size)
+    {
+        auto page_id = []() {
+            if constexpr (std::is_same_v)
+                return PageIdV3Internal(0, 0);
+            else
+                return UniversalPageId("");
+        }();
+        // Own the read buffer with a container so that it is released even if
+        // `blob_store.read` (or any call below) throws.
+        std::vector<char> buffer(size);
+        blob_store.read(page_id, blob_id, offset, buffer.data(), size, nullptr, false);
+
+        using ChecksumClass = Digest::CRC64;
+        ChecksumClass digest;
+        digest.update(buffer.data(), size);
+        auto checksum = digest.checksum();
+        fmt::println("checksum: 0x{:X}", checksum);
+
+        return Redact::keyToHexString(buffer.data(), size);
+    }
+
 private:
     ControlOptions options;
 };