From 38b37e079bf499ade14fff2eccd9dfec4963c9cc Mon Sep 17 00:00:00 2001 From: YangKeao Date: Sun, 3 Jul 2022 22:43:00 +0800 Subject: [PATCH 1/7] upgrade prometheus-cpp to v1.0.1 (#5279) ref pingcap/tiflash#2103, close pingcap/tiflash#5278 --- contrib/prometheus-cpp | 2 +- contrib/prometheus-cpp-cmake/pull/CMakeLists.txt | 9 +++++++++ contrib/prometheus-cpp-cmake/push/CMakeLists.txt | 2 ++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/contrib/prometheus-cpp b/contrib/prometheus-cpp index ca1f3463e74..76470b3ec02 160000 --- a/contrib/prometheus-cpp +++ b/contrib/prometheus-cpp @@ -1 +1 @@ -Subproject commit ca1f3463e74d957d1cccddd4a1a29e3e5d34bd83 +Subproject commit 76470b3ec024c8214e1f4253fb1f4c0b28d3df94 diff --git a/contrib/prometheus-cpp-cmake/pull/CMakeLists.txt b/contrib/prometheus-cpp-cmake/pull/CMakeLists.txt index daebd1b7c5a..993618e16ac 100644 --- a/contrib/prometheus-cpp-cmake/pull/CMakeLists.txt +++ b/contrib/prometheus-cpp-cmake/pull/CMakeLists.txt @@ -12,9 +12,18 @@ if(ENABLE_COMPRESSION) endif() add_library(pull + ${PROMETHEUS_SRC_DIR}/pull/src/basic_auth.cc + ${PROMETHEUS_SRC_DIR}/pull/src/basic_auth.h + ${PROMETHEUS_SRC_DIR}/pull/src/endpoint.cc + ${PROMETHEUS_SRC_DIR}/pull/src/endpoint.h ${PROMETHEUS_SRC_DIR}/pull/src/exposer.cc ${PROMETHEUS_SRC_DIR}/pull/src/handler.cc ${PROMETHEUS_SRC_DIR}/pull/src/handler.h + ${PROMETHEUS_SRC_DIR}/pull/src/metrics_collector.cc + ${PROMETHEUS_SRC_DIR}/pull/src/metrics_collector.h + + ${PROMETHEUS_SRC_DIR}/pull/src/detail/base64.h + $<$:$> ) diff --git a/contrib/prometheus-cpp-cmake/push/CMakeLists.txt b/contrib/prometheus-cpp-cmake/push/CMakeLists.txt index 71dad9fb812..b776d17bdaf 100644 --- a/contrib/prometheus-cpp-cmake/push/CMakeLists.txt +++ b/contrib/prometheus-cpp-cmake/push/CMakeLists.txt @@ -3,6 +3,8 @@ if(NOT CURL_FOUND) endif() add_library(push + ${PROMETHEUS_SRC_DIR}/push/src/curl_wrapper.cc + ${PROMETHEUS_SRC_DIR}/push/src/curl_wrapper.h ${PROMETHEUS_SRC_DIR}/push/src/gateway.cc ) From 4ce641b2706d3ffcb41d6abfb019292e1b8ee550 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Sun, 3 Jul 2022 23:08:59 +0800 Subject: [PATCH 2/7] Fix README type error (#5273) ref pingcap/tiflash#5178 --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index aa64e39d5ba..ab996b6f3d6 100644 --- a/README.md +++ b/README.md @@ -253,7 +253,7 @@ ninja tiflash tiup playground nightly --tiflash.binpath $BUILD/dbms/src/Server/tiflash ``` 3. Check $WORKSPACE/tests/_env.sh to make the port and build dir right. -4. Run your integration tests using commands like "./run-test.sh fullstack-test2/ddl" under $WORKSPACE dir +4. 
Run your integration tests using commands like "./run-test.sh fullstack-test2/ddl" under $WORKSPACE/tests dir ## Run MicroBenchmark Tests @@ -261,7 +261,7 @@ To run micro benchmark tests, you need to build with -DCMAKE_BUILD_TYPE=RELEASE ```shell cd $BUILD -cmake $WORKSPACE/tiflash -GNinja -DCMAKE_BUILD_TYPE=DEBUG -DENABLE_TESTS=ON +cmake $WORKSPACE/tiflash -GNinja -DCMAKE_BUILD_TYPE=RELEASE -DENABLE_TESTS=ON ninja bench_dbms ``` From 19dfdd792215c1c593983d66ee0f4c5865fc394c Mon Sep 17 00:00:00 2001 From: Schrodinger ZHU Yifan Date: Mon, 4 Jul 2022 11:15:00 +0800 Subject: [PATCH 3/7] fix(cmake): make sure libc++ is utilized by tiflash-proxy (#5281) close pingcap/tiflash#5282 --- contrib/tiflash-proxy-cmake/CMakeLists.txt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/contrib/tiflash-proxy-cmake/CMakeLists.txt b/contrib/tiflash-proxy-cmake/CMakeLists.txt index e243ecba37c..e3e2df379a1 100644 --- a/contrib/tiflash-proxy-cmake/CMakeLists.txt +++ b/contrib/tiflash-proxy-cmake/CMakeLists.txt @@ -4,7 +4,11 @@ file(GLOB_RECURSE _TIFLASH_PROXY_SRCS "${_TIFLASH_PROXY_SOURCE_DIR}/*.rs") list(FILTER _TIFLASH_PROXY_SRCS EXCLUDE REGEX ${_TIFLASH_PROXY_SOURCE_DIR}/target/.*) # use `CFLAGS=-w CXXFLAGS=-w` to inhibit warning messages. -set(TIFLASH_RUST_ENV CMAKE=${CMAKE_COMMAND} CFLAGS=-w CXXFLAGS=-w) +if (TIFLASH_LLVM_TOOLCHAIN) + set(TIFLASH_RUST_ENV CMAKE=${CMAKE_COMMAND} "CFLAGS=-w -fuse-ld=lld" "CXXFLAGS=-w -fuse-ld=lld -stdlib=libc++") +else() + set(TIFLASH_RUST_ENV CMAKE=${CMAKE_COMMAND} CFLAGS=-w CXXFLAGS=-w) +endif() if(TIFLASH_LLVM_TOOLCHAIN AND USE_LIBCXX) set(TIFLASH_RUST_LINKER ${CMAKE_CURRENT_BINARY_DIR}/tiflash-linker) From 09402e36aefb84f82bb10a12a3c5e37d83d01733 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 4 Jul 2022 11:47:00 +0800 Subject: [PATCH 4/7] fix the wrong order of execution summary for list based executors (#5242) close pingcap/tiflash#5241 --- dbms/src/Flash/Coprocessor/DAGContext.h | 6 +++- dbms/src/Flash/Coprocessor/DAGQuerySource.cpp | 23 +++++++++++++ .../Flash/Coprocessor/DAGResponseWriter.cpp | 33 ++++++++++++++----- 3 files changed, 53 insertions(+), 9 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 8b94d4637a8..a50a4d4007b 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -349,6 +349,10 @@ class DAGContext std::vector output_field_types; std::vector output_offsets; + /// Hold the order of list based executors. + /// It is used to ensure that the order of Execution summary of list based executors is the same as the order of list based executors. + std::vector list_based_executors_order; + private: void initExecutorIdToJoinIdMap(); void initOutputInfo(); @@ -356,7 +360,7 @@ class DAGContext private: /// Hold io for correcting the destruction order. BlockIO io; - /// profile_streams_map is a map that maps from executor_id to profile BlockInputStreams + /// profile_streams_map is a map that maps from executor_id to profile BlockInputStreams. std::unordered_map profile_streams_map; /// executor_id_to_join_id_map is a map that maps executor id to all the join executor id of itself and all its children. 
std::unordered_map> executor_id_to_join_id_map; diff --git a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp index 882699e1599..d68a7b17aaa 100644 --- a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp @@ -20,6 +20,26 @@ namespace DB { +namespace +{ +void fillOrderForListBasedExecutors(DAGContext & dag_context, const DAGQueryBlock & query_block) +{ + assert(query_block.source); + auto & list_based_executors_order = dag_context.list_based_executors_order; + list_based_executors_order.push_back(query_block.source_name); + if (query_block.selection) + list_based_executors_order.push_back(query_block.selection_name); + if (query_block.aggregation) + list_based_executors_order.push_back(query_block.aggregation_name); + if (query_block.having) + list_based_executors_order.push_back(query_block.having_name); + if (query_block.limit_or_topn) + list_based_executors_order.push_back(query_block.limit_or_topn_name); + if (query_block.exchange_sender) + dag_context.list_based_executors_order.push_back(query_block.exchange_sender_name); +} +} // namespace + DAGQuerySource::DAGQuerySource(Context & context_) : context(context_) { @@ -32,6 +52,9 @@ DAGQuerySource::DAGQuerySource(Context & context_) else { root_query_block = std::make_shared(1, dag_request.executors()); + auto & dag_context = getDAGContext(); + if (!dag_context.return_executor_id) + fillOrderForListBasedExecutors(dag_context, *root_query_block); } } diff --git a/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp index 53bebc91da8..33f6d99f9d8 100644 --- a/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp @@ -89,12 +89,10 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo } } - /// add execution_summary for local executor - for (auto & p : dag_context.getProfileStreamsMap()) - { + auto fill_execution_summary = [&](const String & executor_id, const BlockInputStreams & streams) { ExecutionSummary current; /// part 1: local execution info - for (auto & stream_ptr : p.second) + for (const auto & stream_ptr : streams) { if (auto * p_stream = dynamic_cast(stream_ptr.get())) { @@ -105,16 +103,16 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo current.concurrency++; } /// part 2: remote execution info - if (merged_remote_execution_summaries.find(p.first) != merged_remote_execution_summaries.end()) + if (merged_remote_execution_summaries.find(executor_id) != merged_remote_execution_summaries.end()) { - for (auto & remote : merged_remote_execution_summaries[p.first]) + for (auto & remote : merged_remote_execution_summaries[executor_id]) current.merge(remote, false); } /// part 3: for join need to add the build time /// In TiFlash, a hash join's build side is finished before probe side starts, /// so the join probe side's running time does not include hash table's build time, /// when construct ExecSummaries, we need add the build cost to probe executor - auto all_join_id_it = dag_context.getExecutorIdToJoinIdMap().find(p.first); + auto all_join_id_it = dag_context.getExecutorIdToJoinIdMap().find(executor_id); if (all_join_id_it != dag_context.getExecutorIdToJoinIdMap().end()) { for (const auto & join_executor_id : all_join_id_it->second) @@ -138,8 +136,27 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo } current.time_processed_ns += 
dag_context.compile_time_ns; - fillTiExecutionSummary(response.add_execution_summaries(), current, p.first, delta_mode); + fillTiExecutionSummary(response.add_execution_summaries(), current, executor_id, delta_mode); + }; + + /// add execution_summary for local executor + if (dag_context.return_executor_id) + { + for (auto & p : dag_context.getProfileStreamsMap()) + fill_execution_summary(p.first, p.second); + } + else + { + const auto & profile_streams_map = dag_context.getProfileStreamsMap(); + assert(profile_streams_map.size() == dag_context.list_based_executors_order.size()); + for (const auto & executor_id : dag_context.list_based_executors_order) + { + auto it = profile_streams_map.find(executor_id); + assert(it != profile_streams_map.end()); + fill_execution_summary(executor_id, it->second); + } } + for (auto & p : merged_remote_execution_summaries) { if (local_executors.find(p.first) == local_executors.end()) From a89222abd6ad1934c54838531532a1253bf4f66c Mon Sep 17 00:00:00 2001 From: jiaqizho Date: Mon, 4 Jul 2022 12:27:00 +0800 Subject: [PATCH 5/7] Schema: allow loading empty schema diff when the version grows up. (#5245) close pingcap/tiflash#5244 --- dbms/src/Debug/MockSchemaGetter.h | 13 ++- dbms/src/Debug/MockTiDB.cpp | 7 +- dbms/src/Debug/MockTiDB.h | 4 +- .../Storages/Transaction/ReadIndexWorker.cpp | 2 +- dbms/src/TiDB/Schema/SchemaGetter.cpp | 13 ++- dbms/src/TiDB/Schema/SchemaGetter.h | 6 +- dbms/src/TiDB/Schema/TiDBSchemaSyncer.h | 79 ++++++++++++++----- 7 files changed, 96 insertions(+), 28 deletions(-) diff --git a/dbms/src/Debug/MockSchemaGetter.h b/dbms/src/Debug/MockSchemaGetter.h index f02699866ce..11c5d97f036 100644 --- a/dbms/src/Debug/MockSchemaGetter.h +++ b/dbms/src/Debug/MockSchemaGetter.h @@ -17,16 +17,25 @@ #include #include +#include + namespace DB { - struct MockSchemaGetter { TiDB::DBInfoPtr getDatabase(DatabaseID db_id) { return MockTiDB::instance().getDBInfoByID(db_id); } Int64 getVersion() { return MockTiDB::instance().getVersion(); } - SchemaDiff getSchemaDiff(Int64 version) { return MockTiDB::instance().getSchemaDiff(version); } + std::optional getSchemaDiff(Int64 version) + { + return MockTiDB::instance().getSchemaDiff(version); + } + + bool checkSchemaDiffExists(Int64 version) + { + return MockTiDB::instance().checkSchemaDiffExists(version); + } TiDB::TableInfoPtr getTableInfo(DatabaseID, TableID table_id) { return MockTiDB::instance().getTableInfoByID(table_id); } diff --git a/dbms/src/Debug/MockTiDB.cpp b/dbms/src/Debug/MockTiDB.cpp index 7b3bdb0948f..99d9625461b 100644 --- a/dbms/src/Debug/MockTiDB.cpp +++ b/dbms/src/Debug/MockTiDB.cpp @@ -668,9 +668,14 @@ std::pair MockTiDB::getDBIDByName(const String & database_name return std::make_pair(false, -1); } -SchemaDiff MockTiDB::getSchemaDiff(Int64 version_) +std::optional MockTiDB::getSchemaDiff(Int64 version_) { return version_diff[version_]; } +bool MockTiDB::checkSchemaDiffExists(Int64 version) +{ + return version_diff.find(version) != version_diff.end(); +} + } // namespace DB diff --git a/dbms/src/Debug/MockTiDB.h b/dbms/src/Debug/MockTiDB.h index 36d2af90859..261e547b13a 100644 --- a/dbms/src/Debug/MockTiDB.h +++ b/dbms/src/Debug/MockTiDB.h @@ -127,7 +127,9 @@ class MockTiDB : public ext::Singleton std::pair getDBIDByName(const String & database_name); - SchemaDiff getSchemaDiff(Int64 version); + bool checkSchemaDiffExists(Int64 version); + + std::optional getSchemaDiff(Int64 version); std::unordered_map getDatabases() { return databases; } diff --git 
a/dbms/src/Storages/Transaction/ReadIndexWorker.cpp b/dbms/src/Storages/Transaction/ReadIndexWorker.cpp index 3223c815989..7de79dd5c6d 100644 --- a/dbms/src/Storages/Transaction/ReadIndexWorker.cpp +++ b/dbms/src/Storages/Transaction/ReadIndexWorker.cpp @@ -880,7 +880,7 @@ BatchReadIndexRes ReadIndexWorkerManager::batchReadIndex( } } { // if meet timeout, which means part of regions can not get response from leader, try to poll rest tasks - TEST_LOG_FMT("rest {}, poll rest tasks onece", tasks.size()); + TEST_LOG_FMT("rest {}, poll rest tasks once", tasks.size()); while (!tasks.empty()) { diff --git a/dbms/src/TiDB/Schema/SchemaGetter.cpp b/dbms/src/TiDB/Schema/SchemaGetter.cpp index 7f52f9301b1..6e333d6ba87 100644 --- a/dbms/src/TiDB/Schema/SchemaGetter.cpp +++ b/dbms/src/TiDB/Schema/SchemaGetter.cpp @@ -19,7 +19,6 @@ namespace DB { - namespace ErrorCodes { extern const int SCHEMA_SYNC_ERROR; @@ -188,18 +187,26 @@ Int64 SchemaGetter::getVersion() return std::stoll(ver); } +bool SchemaGetter::checkSchemaDiffExists(Int64 ver) +{ + String key = getSchemaDiffKey(ver); + String data = TxnStructure::get(snap, key); + return !data.empty(); +} + String SchemaGetter::getSchemaDiffKey(Int64 ver) { return std::string(schemaDiffPrefix) + ":" + std::to_string(ver); } -SchemaDiff SchemaGetter::getSchemaDiff(Int64 ver) +std::optional SchemaGetter::getSchemaDiff(Int64 ver) { String key = getSchemaDiffKey(ver); String data = TxnStructure::get(snap, key); if (data.empty()) { - throw TiFlashException("cannot find schema diff for version: " + std::to_string(ver), Errors::Table::SyncError); + LOG_FMT_WARNING(log, "The schema diff for version {}, key {} is empty.", ver, key); + return std::nullopt; } SchemaDiff diff; diff.deserialize(data); diff --git a/dbms/src/TiDB/Schema/SchemaGetter.h b/dbms/src/TiDB/Schema/SchemaGetter.h index 02d2f7a7c88..fe0ecd59af0 100644 --- a/dbms/src/TiDB/Schema/SchemaGetter.h +++ b/dbms/src/TiDB/Schema/SchemaGetter.h @@ -26,6 +26,8 @@ #include +#include + namespace DB { // The enum results are completely the same as the DDL Action listed in the "parser/model/ddl.go" of TiDB codebase, which must be keeping in sync. @@ -138,7 +140,9 @@ struct SchemaGetter Int64 getVersion(); - SchemaDiff getSchemaDiff(Int64 ver); + bool checkSchemaDiffExists(Int64 ver); + + std::optional getSchemaDiff(Int64 ver); static String getSchemaDiffKey(Int64 ver); diff --git a/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h b/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h index 4fdba195acb..a23aeab139f 100644 --- a/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h +++ b/dbms/src/TiDB/Schema/TiDBSchemaSyncer.h @@ -106,21 +106,31 @@ struct TiDBSchemaSyncer : public SchemaSyncer Stopwatch watch; SCOPE_EXIT({ GET_METRIC(tiflash_schema_apply_duration_seconds).Observe(watch.elapsedSeconds()); }); - LOG_FMT_INFO(log, "start to sync schemas. current version is: {} and try to sync schema version to: {}", cur_version, version); + LOG_FMT_INFO(log, "Start to sync schemas. current version is: {} and try to sync schema version to: {}", cur_version, version); // Show whether the schema mutex is held for a long time or not. GET_METRIC(tiflash_schema_applying).Set(1.0); SCOPE_EXIT({ GET_METRIC(tiflash_schema_applying).Set(0.0); }); GET_METRIC(tiflash_schema_apply_count, type_diff).Increment(); - if (!tryLoadSchemaDiffs(getter, version, context)) + // After the feature concurrent DDL, TiDB does `update schema version` before `set schema diff`, and they are done in separate transactions. 
+ // So TiFlash may see a schema version X but no schema diff X, meaning that the transaction of schema diff X has not been committed or has + // been aborted. + // However, TiDB makes sure that if we get a schema version X, then the schema diff X-1 must exist. Otherwise the transaction of schema diff + // X-1 is aborted and we can safely ignore it. + // Since TiDB can not make sure the schema diff of the latest schema version X is not empty, under this situation we should set the `cur_version` + // to X-1 and try to fetch the schema diff X next time. + Int64 version_after_load_diff = 0; + if (version_after_load_diff = tryLoadSchemaDiffs(getter, version, context); version_after_load_diff == -1) { GET_METRIC(tiflash_schema_apply_count, type_full).Increment(); loadAllSchema(getter, version, context); + // After loadAllSchema, we need update `version_after_load_diff` by last diff value exist or not + version_after_load_diff = getter.checkSchemaDiffExists(version) ? version : version - 1; } - cur_version = version; + cur_version = version_after_load_diff; GET_METRIC(tiflash_schema_version).Set(cur_version); - LOG_FMT_INFO(log, "end sync schema, version has been updated to {}", cur_version); + LOG_FMT_INFO(log, "End sync schema, version has been updated to {}{}", cur_version, cur_version == version ? "" : "(latest diff is empty)"); return true; } @@ -144,30 +154,60 @@ struct TiDBSchemaSyncer : public SchemaSyncer return it->second; } - bool tryLoadSchemaDiffs(Getter & getter, Int64 version, Context & context) + // Return Values + // - if latest schema diff is not empty, return the (latest_version) + // - if latest schema diff is empty, return the (latest_version - 1) + // - if error happend, return (-1) + Int64 tryLoadSchemaDiffs(Getter & getter, Int64 latest_version, Context & context) { - if (isTooOldSchema(cur_version, version)) + if (isTooOldSchema(cur_version, latest_version)) { - return false; + return -1; } - LOG_FMT_DEBUG(log, "try load schema diffs."); + LOG_FMT_DEBUG(log, "Try load schema diffs."); - SchemaBuilder builder(getter, context, databases, version); + SchemaBuilder builder(getter, context, databases, latest_version); Int64 used_version = cur_version; - std::vector diffs; - while (used_version < version) + // First get all schema diff from `cur_version` to `latest_version`. Only apply the schema diff(s) if we fetch all + // schema diff without any exception. + std::vector> diffs; + while (used_version < latest_version) { used_version++; diffs.push_back(getter.getSchemaDiff(used_version)); } - LOG_FMT_DEBUG(log, "end load schema diffs with total {} entries.", diffs.size()); + LOG_FMT_DEBUG(log, "End load schema diffs with total {} entries.", diffs.size()); + try { - for (const auto & diff : diffs) + for (size_t diff_index = 0; diff_index < diffs.size(); ++diff_index) { - builder.applyDiff(diff); + const auto & schema_diff = diffs[diff_index]; + + if (!schema_diff) + { + // If `schema diff` from `latest_version` got empty `schema diff` + // Then we won't apply to `latest_version`, but we will apply to `latest_version - 1` + // If `schema diff` from [`cur_version`, `latest_version - 1`] got empty `schema diff` + // Then we should just skip it. + // + // example: + // - `cur_version` is 1, `latest_version` is 10 + // - The schema diff of schema version [2,4,6] is empty, Then we just skip it. 
+ // - The schema diff of schema version 10 is empty, Then we should just apply version into 9 + if (diff_index != diffs.size() - 1) + { + LOG_FMT_WARNING(log, "Skip the schema diff from version {}. ", cur_version + diff_index + 1); + continue; + } + + // if diff_index == diffs.size() - 1, return used_version - 1; + return used_version - 1; + } + + builder.applyDiff(*schema_diff); } } catch (TiFlashException & e) @@ -177,7 +217,7 @@ struct TiDBSchemaSyncer : public SchemaSyncer GET_METRIC(tiflash_schema_apply_count, type_failed).Increment(); } LOG_FMT_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString()); - return false; + return -1; } catch (Exception & e) { @@ -187,21 +227,22 @@ struct TiDBSchemaSyncer : public SchemaSyncer } GET_METRIC(tiflash_schema_apply_count, type_failed).Increment(); LOG_FMT_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString()); - return false; + return -1; } catch (Poco::Exception & e) { GET_METRIC(tiflash_schema_apply_count, type_failed).Increment(); LOG_FMT_WARNING(log, "apply diff meets exception : {}", e.displayText()); - return false; + return -1; } catch (std::exception & e) { GET_METRIC(tiflash_schema_apply_count, type_failed).Increment(); LOG_FMT_WARNING(log, "apply diff meets exception : {}", e.what()); - return false; + return -1; } - return true; + + return used_version; } void loadAllSchema(Getter & getter, Int64 version, Context & context) From 6da631c99c918bfffcf183128306a5e6bd35c7f7 Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Mon, 4 Jul 2022 16:57:01 +0800 Subject: [PATCH 6/7] Optimize apply speed under heavy write pressure (#4883) ref pingcap/tiflash#4728 --- .../DeltaMerge/Delta/DeltaValueSpace.cpp | 13 +++++++ .../DeltaMerge/Delta/DeltaValueSpace.h | 7 ++++ .../Storages/DeltaMerge/DeltaMergeStore.cpp | 35 +++++++++++++++---- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 8 ++--- dbms/src/Storages/DeltaMerge/Segment.h | 2 ++ dbms/src/Storages/IManageableStorage.h | 2 +- dbms/src/Storages/StorageDeltaMerge.cpp | 6 ++-- dbms/src/Storages/StorageDeltaMerge.h | 2 +- dbms/src/Storages/Transaction/KVStore.cpp | 27 ++++++++------ dbms/src/Storages/Transaction/KVStore.h | 2 +- dbms/src/Storages/Transaction/RegionTable.cpp | 2 +- 11 files changed, 79 insertions(+), 27 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp index 132732d6989..8a69b7573e2 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp @@ -141,6 +141,19 @@ bool DeltaValueSpace::ingestColumnFiles(DMContext & /*context*/, const RowKeyRan bool DeltaValueSpace::flush(DMContext & context) { + bool v = false; + if (!is_flushing.compare_exchange_strong(v, true)) + { + // other thread is flushing, just return. 
+ LOG_FMT_DEBUG(log, "{}, Flush stop because other thread is flushing", simpleInfo()); + return false; + } + SCOPE_EXIT({ + bool v = true; + if (!is_flushing.compare_exchange_strong(v, false)) + throw Exception(simpleInfo() + " is expected to be flushing", ErrorCodes::LOGICAL_ERROR); + }); + LOG_FMT_DEBUG(log, "{}, Flush start", info()); /// We have two types of data needed to flush to disk: diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index 8f14682caa8..04fb97b3004 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -77,6 +77,11 @@ class DeltaValueSpace /// Note that those things can not be done at the same time. std::atomic_bool is_updating = false; + /// Note that it's safe to do multiple flush concurrently but only one of them can succeed, + /// and other thread's work is just a waste of resource. + /// So we only allow one flush task running at any time to aviod waste resource. + std::atomic_bool is_flushing = false; + std::atomic last_try_flush_rows = 0; std::atomic last_try_flush_bytes = 0; std::atomic last_try_compact_column_files = 0; @@ -159,6 +164,8 @@ class DeltaValueSpace size_t getTotalCacheBytes() const; size_t getValidCacheRows() const; + bool isFlushing() const { return is_flushing; } + bool isUpdating() const { return is_updating; } bool tryLockUpdating() diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 195ed5c53c2..09f290e311c 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -980,14 +980,14 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings checkSegmentUpdate(dm_context, segment, ThreadType::Write); } -void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range) +bool DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed) { RowKeyRange cur_range = range; while (!cur_range.none()) { RowKeyRange segment_range; - // Keep trying until succeeded. + // Keep trying until succeeded if needed. while (true) { SegmentPtr segment; @@ -1010,10 +1010,15 @@ void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRa { break; } + else if (!try_until_succeed) + { + return false; + } } cur_range.setStart(segment_range.end); } + return true; } void DeltaMergeStore::mergeDeltaAll(const Context & context) @@ -1347,6 +1352,12 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const && (delta_rows - delta_last_try_flush_rows >= delta_cache_limit_rows || delta_bytes - delta_last_try_flush_bytes >= delta_cache_limit_bytes); bool should_foreground_flush = unsaved_rows >= delta_cache_limit_rows * 3 || unsaved_bytes >= delta_cache_limit_bytes * 3; + /// For write thread, we want to avoid foreground flush to block the process of apply raft command. + /// So we increase the threshold of foreground flush for write thread. 
+ if (thread_type == ThreadType::Write) + { + should_foreground_flush = unsaved_rows >= delta_cache_limit_rows * 10 || unsaved_bytes >= delta_cache_limit_bytes * 10; + } bool should_background_merge_delta = ((delta_check_rows >= delta_limit_rows || delta_check_bytes >= delta_limit_bytes) // && (delta_rows - delta_last_try_merge_delta_rows >= delta_cache_limit_rows @@ -1404,9 +1415,16 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const } else if (should_background_flush) { - delta_last_try_flush_rows = delta_rows; - delta_last_try_flush_bytes = delta_bytes; - try_add_background_task(BackgroundTask{TaskType::Flush, dm_context, segment, {}}); + /// It's meaningless to add more flush tasks if the segment is flushing. + /// Because only one flush task can proceed at any time. + /// And after the current flush task finished, + /// it will call `checkSegmentUpdate` again to check whether there is more flush task to do. + if (!segment->isFlushing()) + { + delta_last_try_flush_rows = delta_rows; + delta_last_try_flush_bytes = delta_bytes; + try_add_background_task(BackgroundTask{TaskType::Flush, dm_context, segment, {}}); + } } } @@ -1502,7 +1520,12 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const return false; }; auto try_bg_compact = [&]() { - if (should_compact) + /// Compact task should be a really low priority task. + /// And if the segment is flushing, + /// we should avoid adding background compact task to reduce lock contention on the segment and save disk throughput. + /// And after the current flush task complete, + /// it will call `checkSegmentUpdate` again to check whether there is other kinds of task to do. + if (should_compact && !segment->isFlushing()) { delta_last_try_compact_column_files = column_file_count; try_add_background_task(BackgroundTask{TaskType::Compact, dm_context, segment, {}}); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 705481ca107..57c2a42b807 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -367,14 +367,14 @@ class DeltaMergeStore : private boost::noncopyable const SegmentIdSet & read_segments = {}, size_t extra_table_id_index = InvalidColumnID); - /// Force flush all data to disk. - void flushCache(const Context & context, const RowKeyRange & range) + /// Try flush all data in `range` to disk and return whether the task succeed. + bool flushCache(const Context & context, const RowKeyRange & range, bool try_until_succeed = true) { auto dm_context = newDMContext(context, context.getSettingsRef()); - flushCache(dm_context, range); + return flushCache(dm_context, range, try_until_succeed); } - void flushCache(const DMContextPtr & dm_context, const RowKeyRange & range); + bool flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed = true); /// Merge delta into the stable layer for all segments. 
/// diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index cccfc5091b9..8058329ae91 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -300,6 +300,8 @@ class Segment : private boost::noncopyable void drop(const FileProviderPtr & file_provider, WriteBatches & wbs); + bool isFlushing() const { return delta->isFlushing(); } + RowsAndBytes getRowsAndBytesInRange( DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h index ebf84c592e4..2ff766a9c6d 100644 --- a/dbms/src/Storages/IManageableStorage.h +++ b/dbms/src/Storages/IManageableStorage.h @@ -68,7 +68,7 @@ class IManageableStorage : public IStorage virtual void flushCache(const Context & /*context*/) {} - virtual void flushCache(const Context & /*context*/, const DM::RowKeyRange & /*range_to_flush*/) {} + virtual bool flushCache(const Context & /*context*/, const DM::RowKeyRange & /*range_to_flush*/, [[maybe_unused]] bool try_until_succeed = true) { return true; } virtual BlockInputStreamPtr status() { return {}; } diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 67d32c73a05..a6de4efb3ac 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -775,12 +775,12 @@ void StorageDeltaMerge::checkStatus(const Context & context) void StorageDeltaMerge::flushCache(const Context & context) { - flushCache(context, DM::RowKeyRange::newAll(is_common_handle, rowkey_column_size)); + flushCache(context, DM::RowKeyRange::newAll(is_common_handle, rowkey_column_size), /* try_until_succeed */ true); } -void StorageDeltaMerge::flushCache(const Context & context, const DM::RowKeyRange & range_to_flush) +bool StorageDeltaMerge::flushCache(const Context & context, const DM::RowKeyRange & range_to_flush, bool try_until_succeed) { - getAndMaybeInitStore()->flushCache(context, range_to_flush); + return getAndMaybeInitStore()->flushCache(context, range_to_flush, try_until_succeed); } void StorageDeltaMerge::mergeDelta(const Context & context) diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 79ee225d237..9e4ab12ad4f 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -73,7 +73,7 @@ class StorageDeltaMerge void flushCache(const Context & context) override; - void flushCache(const Context & context, const DM::RowKeyRange & range_to_flush) override; + bool flushCache(const Context & context, const DM::RowKeyRange & range_to_flush, bool try_until_succeed) override; /// Merge delta into the stable layer for all segments. 
/// diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index 318a04c6ed9..f9d6d01955e 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -129,7 +129,7 @@ void KVStore::traverseRegions(std::function & callback(region.first, region.second); } -void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log) +bool KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log, bool try_until_succeed) { auto table_id = region.getMappedTableID(); auto storage = tmt.getStorages().get(table_id); @@ -139,7 +139,7 @@ void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi "tryFlushRegionCacheInStorage can not get table for region {} with table id {}, ignored", region.toString(), table_id); - return; + return true; } try @@ -151,7 +151,7 @@ void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi region.getRange()->getMappedTableID(), storage->isCommonHandle(), storage->getRowKeyColumnSize()); - storage->flushCache(tmt.getContext(), rowkey_range); + return storage->flushCache(tmt.getContext(), rowkey_range, try_until_succeed); } catch (DB::Exception & e) { @@ -159,6 +159,7 @@ void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi if (e.code() != ErrorCodes::TABLE_IS_DROPPED) throw; } + return true; } void KVStore::tryPersist(RegionID region_id) @@ -366,12 +367,12 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd( if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed) || size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed)) { - // if rows or bytes more than threshold, flush cache and perist mem data. + // if rows or bytes more than threshold, try to flush cache and persist mem data. return true; } else { - // if thhere is little data in mem, wait until time interval reached threshold. + // if there is little data in mem, wait until time interval reached threshold. // use random period so that lots of regions will not be persisted at same time. 
auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT return !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now()); @@ -381,11 +382,17 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd( if (check_sync_log()) { - tryFlushRegionCacheInStorage(tmt, curr_region, log); - persistRegion(curr_region, region_task_lock, "compact raft log"); - curr_region.markCompactLog(); - curr_region.cleanApproxMemCacheInfo(); - return EngineStoreApplyRes::Persist; + if (tryFlushRegionCacheInStorage(tmt, curr_region, log, /* try_until_succeed */ false)) + { + persistRegion(curr_region, region_task_lock, "compact raft log"); + curr_region.markCompactLog(); + curr_region.cleanApproxMemCacheInfo(); + return EngineStoreApplyRes::Persist; + } + else + { + return EngineStoreApplyRes::None; + } } return EngineStoreApplyRes::None; } diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index bb45e65d18b..66e2fe32b75 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -91,7 +91,7 @@ class KVStore final : private boost::noncopyable void tryPersist(RegionID region_id); - static void tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log); + static bool tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log, bool try_until_succeed = true); size_t regionSize() const; EngineStoreApplyRes handleAdminRaftCmd(raft_cmdpb::AdminRequest && request, diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index c855d5b3226..5ae36a4bd64 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -230,7 +230,7 @@ void removeObsoleteDataInStorage( auto rowkey_range = DM::RowKeyRange::fromRegionRange(handle_range, table_id, table_id, storage->isCommonHandle(), storage->getRowKeyColumnSize()); dm_storage->deleteRange(rowkey_range, context->getSettingsRef()); - dm_storage->flushCache(*context, rowkey_range); // flush to disk + dm_storage->flushCache(*context, rowkey_range, /*try_until_succeed*/ true); // flush to disk } catch (DB::Exception & e) { From a0ecce02725e10c32e8484e7be79396fda5bfdf6 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Tue, 5 Jul 2022 14:49:00 +0800 Subject: [PATCH 7/7] update proxy to raftstore-proxy-6.2 (#5287) ref pingcap/tiflash#4982 --- contrib/tiflash-proxy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index ca2f51f94e5..6ea4d608b1c 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit ca2f51f94e55bdd23749dcc02ab4afb94eeb5ae5 +Subproject commit 6ea4d608b1c03fab89d17f54a2e399602231e27c
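The fix in PATCH 4/7 works because the per-executor profile data is keyed by executor id in an unordered map, while the execution summaries of list based executors must be emitted in plan order; recording that order once and iterating the recorded order, rather than the map, restores the expected ordering. A minimal standalone sketch of that idea follows — the executor ids and row counts are made up for illustration and are not taken from the actual DAGContext API.

```cpp
#include <cstdio>
#include <string>
#include <unordered_map>
#include <vector>

// Profile data lives in an unordered_map keyed by executor id, so iterating
// the map directly yields an arbitrary order. Walking a vector that recorded
// the plan order keeps the emitted summaries aligned with the executors.
int main()
{
    std::unordered_map<std::string, int> profile_streams_map{
        {"table_scan_0", 100},
        {"selection_1", 80},
        {"limit_2", 10},
    };

    // Filled once when the request is parsed, mirroring
    // DAGContext::list_based_executors_order in spirit (names here are
    // illustrative only).
    std::vector<std::string> list_based_executors_order{"table_scan_0", "selection_1", "limit_2"};

    for (const auto & executor_id : list_based_executors_order)
    {
        auto it = profile_streams_map.find(executor_id);
        if (it != profile_streams_map.end())
            std::printf("executor %s produced %d rows\n", executor_id.c_str(), it->second);
    }
    return 0;
}
```

The map stays the storage and the vector only carries the order, which is the same division of labor `list_based_executors_order` provides in the patch.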
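PATCH 5/7 turns `getSchemaDiff` into a call that may legitimately return nothing, because after TiDB's concurrent-DDL change the schema version can become visible before the corresponding schema diff is committed. The sketch below is a simplification with hypothetical types rather than the real SchemaGetter/TiDBSchemaSyncer interfaces; it only shows the resulting control flow — skip an empty diff in the middle, and stop one version short when the newest diff is still missing.

```cpp
#include <cstdint>
#include <cstdio>
#include <optional>
#include <vector>

// Hypothetical stand-in for a schema diff; an empty optional models a schema
// version whose diff transaction has not been committed (or was aborted).
struct SchemaDiffLike
{
    int64_t version;
};

// Apply diffs fetched for (cur_version, latest_version]; return the version we
// actually reached. An empty diff in the middle is safe to skip; an empty diff
// at the end means we stop at latest_version - 1 and retry it next round.
int64_t applyDiffs(int64_t cur_version, const std::vector<std::optional<SchemaDiffLike>> & diffs)
{
    int64_t latest_version = cur_version + static_cast<int64_t>(diffs.size());
    for (size_t i = 0; i < diffs.size(); ++i)
    {
        const auto & diff = diffs[i];
        if (!diff)
        {
            if (i + 1 == diffs.size())
                return latest_version - 1; // latest diff not visible yet
            continue; // aborted transaction in the middle, safe to skip
        }
        std::printf("apply schema diff for version %lld\n", static_cast<long long>(diff->version));
    }
    return latest_version;
}

int main()
{
    // cur_version = 1, latest_version = 4; version 3's diff is missing in the
    // middle, version 4's diff is present, so we end up at version 4.
    std::vector<std::optional<SchemaDiffLike>> diffs{SchemaDiffLike{2}, std::nullopt, SchemaDiffLike{4}};
    std::printf("synced to version %lld\n", static_cast<long long>(applyDiffs(1, diffs)));
    return 0;
}
```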
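PATCH 6/7 serializes flushes on a delta with an atomic flag: concurrent flush attempts are wasted work since only one of them can succeed, so later callers bail out instead of contending. A minimal sketch of that guard, assuming nothing about the real DeltaValueSpace beyond the `is_flushing` compare-and-swap shown in the diff:

```cpp
#include <atomic>
#include <cstdio>

// Illustrative stand-in for the "only one flush at a time" guard used in
// DeltaValueSpace::flush(): the first caller wins the CAS and does the work,
// concurrent callers return false right away instead of queuing up.
class FlushGuardExample
{
public:
    bool flush()
    {
        bool expected = false;
        // Try to flip is_flushing from false to true; fail fast if another
        // thread already holds it.
        if (!is_flushing.compare_exchange_strong(expected, true))
        {
            std::puts("flush skipped: another thread is flushing");
            return false;
        }
        // ... the actual flush work would happen here ...
        std::puts("flush done");
        // Release the flag so the next flush attempt can proceed.
        is_flushing.store(false);
        return true;
    }

private:
    std::atomic_bool is_flushing{false};
};

int main()
{
    FlushGuardExample guard;
    guard.flush(); // performs the flush
    return 0;
}
```

Note that the patch pairs this with SCOPE_EXIT so the flag is cleared even when the flush throws; the sketch resets it inline only to keep the example short.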