From ea955f02529092321b5a4626019698c624d7b1d3 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 9 Nov 2023 15:52:27 +0800 Subject: [PATCH 01/10] init --- contrib/kvproto | 2 +- dbms/src/Flash/Coprocessor/DAGContext.cpp | 3 +- dbms/src/Flash/FlashService.cpp | 2 +- dbms/src/Flash/Mpp/MPPTaskId.cpp | 3 +- dbms/src/Flash/Mpp/MPPTaskId.h | 46 +++++++++++++++---- .../tests/gtest_aborted_mpp_gather_cache.cpp | 20 +++++--- .../Mpp/tests/gtest_mpp_task_manager.cpp | 10 ++-- .../gtest_query_operator_spill_contexts.cpp | 10 ++-- .../DeltaMerge/Remote/DisaggTaskId.cpp | 4 +- .../Storages/StorageDisaggregatedRemote.cpp | 42 ++++++++++++----- 10 files changed, 101 insertions(+), 41 deletions(-) diff --git a/contrib/kvproto b/contrib/kvproto index d603cce32b9..cfc6f5b2c02 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit d603cce32b968e37fa24bce074511f0db2b4d740 +Subproject commit cfc6f5b2c025c65f9d788b73b19813daf3517866 diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index 13e6c739c68..252620ab945 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -164,7 +164,8 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, String log_identifier, s , warnings(max_recorded_error_count) , warning_count(0) { - query_operator_spill_contexts = std::make_shared(MPPQueryId(0, 0, 0, 0, ""), 100); + query_operator_spill_contexts + = std::make_shared(MPPQueryId(0, 0, 0, 0, "", 0, ""), 100); initOutputInfo(); } diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 157af85897f..0d6fdc745c6 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -919,7 +919,7 @@ grpc::Status FlashService::EstablishDisaggTask( "EstablishDisaggTask should only be called on write node"); const auto & meta = request->meta(); - DM::DisaggTaskId task_id(meta); + DM::DisaggTaskId task_id(meta\); auto logger = Logger::get(task_id); auto record_other_error = [&](int flash_err_code, const String & err_msg) { diff --git a/dbms/src/Flash/Mpp/MPPTaskId.cpp b/dbms/src/Flash/Mpp/MPPTaskId.cpp index 3c24419cf52..3940675ed5b 100644 --- a/dbms/src/Flash/Mpp/MPPTaskId.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskId.cpp @@ -107,7 +107,8 @@ String MPPTaskId::toString() const const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{}; constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); -const MPPQueryId MPPTaskId::Max_Query_Id = MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64, ""); +const MPPQueryId MPPTaskId::Max_Query_Id + = MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64, "", MAX_UINT64, ""); bool operator==(const MPPTaskId & lid, const MPPTaskId & rid) { diff --git a/dbms/src/Flash/Mpp/MPPTaskId.h b/dbms/src/Flash/Mpp/MPPTaskId.h index 903fc7552fa..783f5fd7b69 100644 --- a/dbms/src/Flash/Mpp/MPPTaskId.h +++ b/dbms/src/Flash/Mpp/MPPTaskId.h @@ -28,18 +28,24 @@ struct MPPQueryId UInt64 server_id; UInt64 start_ts; String resource_group_name; + UInt64 connection_id; + String connection_alias; MPPQueryId( UInt64 query_ts, UInt64 local_query_id, UInt64 server_id, UInt64 start_ts, - const String & resource_group_name_) + const String & resource_group_name_, + UInt64 connection_id_, + const String & connection_alias_) : query_ts(query_ts) , local_query_id(local_query_id) , server_id(server_id) , start_ts(start_ts) , resource_group_name(resource_group_name_) + , connection_id(connection_id_) + , connection_alias(connection_alias_) {} explicit MPPQueryId(const mpp::TaskMeta & task_meta) : query_ts(task_meta.query_ts()) @@ -47,6 +53,8 @@ struct MPPQueryId , server_id(task_meta.server_id()) , start_ts(task_meta.start_ts()) , resource_group_name(task_meta.resource_group_name()) + , connection_id(task_meta.connection_id()) + , connection_alias(task_meta.connection_alias()) {} bool operator<(const MPPQueryId & mpp_query_id) const; bool operator==(const MPPQueryId & rid) const; @@ -56,12 +64,15 @@ struct MPPQueryId String toString() const { return fmt::format( - "", + "", query_ts, local_query_id, server_id, start_ts, - resource_group_name); + resource_group_name, + connection_id, + connection_alias); } }; @@ -89,9 +100,11 @@ struct MPPGatherId UInt64 local_query_id, UInt64 server_id, UInt64 start_ts, - const String & resource_group_name) + const String & resource_group_name, + UInt64 connection_id, + const String & connection_alias) : gather_id(gather_id_) - , query_id(query_ts, local_query_id, server_id, start_ts, resource_group_name) + , query_id(query_ts, local_query_id, server_id, start_ts, resource_group_name, connection_id, connection_alias) {} explicit MPPGatherId(const mpp::TaskMeta & task_meta) : gather_id(task_meta.gather_id()) @@ -100,13 +113,16 @@ struct MPPGatherId String toString() const { return fmt::format( - "", + "", gather_id, query_id.query_ts, query_id.local_query_id, query_id.server_id, query_id.start_ts, - query_id.resource_group_name); + query_id.resource_group_name, + query_id.connection_id, + query_id.connection_alias); } bool hasMeaningfulGatherId() const { return gather_id > 0; } bool operator==(const MPPGatherId & rid) const; @@ -122,7 +138,7 @@ struct MPPTaskId { MPPTaskId() : task_id(unknown_task_id) - , gather_id(0, 0, 0, 0, 0, ""){}; + , gather_id(0, 0, 0, 0, 0, "", 0, ""){}; MPPTaskId( UInt64 start_ts, @@ -131,9 +147,19 @@ struct MPPTaskId Int64 gather_id, UInt64 query_ts, UInt64 local_query_id, - const String resource_group_name) + const String resource_group_name, + UInt64 connection_id, + const String & connection_alias) : task_id(task_id_) - , gather_id(gather_id, query_ts, local_query_id, server_id, start_ts, resource_group_name) + , gather_id( + gather_id, + query_ts, + local_query_id, + server_id, + start_ts, + resource_group_name, + connection_id, + connection_alias) {} explicit MPPTaskId(const mpp::TaskMeta & task_meta) diff --git a/dbms/src/Flash/Mpp/tests/gtest_aborted_mpp_gather_cache.cpp b/dbms/src/Flash/Mpp/tests/gtest_aborted_mpp_gather_cache.cpp index 6a81918f408..ff97a5a8962 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_aborted_mpp_gather_cache.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_aborted_mpp_gather_cache.cpp @@ -32,25 +32,31 @@ try AbortedMPPGatherCache cache(capacity); for (size_t i = 0; i < capacity; i++) { - cache.add(MPPGatherId(i, MPPQueryId(1, 2, 3, 4, /*resource_group_name=*/"")), ""); + cache.add(MPPGatherId(i, MPPQueryId(1, 2, 3, 4, /*resource_group_name=*/"", 5, "")), ""); } for (size_t i = 0; i < capacity; i++) { - ASSERT_EQ(!cache.check(MPPGatherId(i, MPPQueryId(1, 2, 3, 4, /*resource_group_name=*/""))).empty(), true); + ASSERT_EQ( + !cache.check(MPPGatherId(i, MPPQueryId(1, 2, 3, 4, /*resource_group_name=*/"", 5, ""))).empty(), + true); } for (size_t i = 0; i < capacity; i++) { - cache.add(MPPGatherId(0, MPPQueryId(1, 2, 3, 4, /*resource_group_name=*/"")), ""); + cache.add(MPPGatherId(0, MPPQueryId(1, 2, 3, 4, /*resource_group_name=*/"", 5, "")), ""); } for (size_t i = 0; i < capacity; i++) { - ASSERT_EQ(!cache.check(MPPGatherId(i, MPPQueryId(1, 2, 3, 4, /*resource_group_name=*/""))).empty(), true); + ASSERT_EQ( + !cache.check(MPPGatherId(i, MPPQueryId(1, 2, 3, 4, /*resource_group_name=*/"", 5, ""))).empty(), + true); } - cache.add(MPPGatherId(capacity, MPPQueryId(1, 2, 3, 4, /*resource_group_name=*/"")), ""); - ASSERT_EQ(!cache.check(MPPGatherId(0, MPPQueryId(1, 2, 3, 4, /*resource_group_name=*/""))).empty(), false); + cache.add(MPPGatherId(capacity, MPPQueryId(1, 2, 3, 4, /*resource_group_name=*/"", 5, "")), ""); + ASSERT_EQ(!cache.check(MPPGatherId(0, MPPQueryId(1, 2, 3, 4, /*resource_group_name=*/"", 5, ""))).empty(), false); for (size_t i = 0; i < capacity; i++) { - ASSERT_EQ(!cache.check(MPPGatherId(i + 1, MPPQueryId(1, 2, 3, 4, /*resource_group_name=*/""))).empty(), true); + ASSERT_EQ( + !cache.check(MPPGatherId(i + 1, MPPQueryId(1, 2, 3, 4, /*resource_group_name=*/"", 5, ""))).empty(), + true); } } CATCH diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpp_task_manager.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpp_task_manager.cpp index 0ff23d02774..a093e61d039 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpp_task_manager.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpp_task_manager.cpp @@ -66,7 +66,7 @@ try /// find async tunnel create alarm if task is not visible EstablishCallData establish_call_data; mpp::EstablishMPPConnectionRequest establish_req; - auto gather_id = MPPGatherId(1, MPPQueryId(1, 1, 1, 1, "")); + auto gather_id = MPPGatherId(1, MPPQueryId(1, 1, 1, 1, "", 1, "")); auto * receiver_meta = establish_req.mutable_receiver_meta(); fillTaskMeta(receiver_meta, 2, gather_id); auto * sender_meta = establish_req.mutable_sender_meta(); @@ -77,7 +77,7 @@ try ASSERT_TRUE(find_tunnel_result.first == nullptr && find_tunnel_result.second.empty()); /// `findAsyncTunnel` will create GatherTaskSet - auto gather_task_set = mpp_task_manager->getGatherTaskSet(MPPGatherId(1, MPPQueryId(1, 1, 1, 1, ""))); + auto gather_task_set = mpp_task_manager->getGatherTaskSet(MPPGatherId(1, MPPQueryId(1, 1, 1, 1, "", 1, ""))); ASSERT_TRUE(gather_task_set.first != nullptr); ASSERT_TRUE(!gather_task_set.first->hasMPPTask()); ASSERT_TRUE(gather_task_set.first->hasAlarm()); @@ -98,7 +98,7 @@ try /// unregister task should clean the related alarms mpp_task_manager->unregisterTask(mpp_task_1->getId(), ""); - gather_task_set = mpp_task_manager->getGatherTaskSet(MPPGatherId(1, MPPQueryId(1, 1, 1, 1, ""))); + gather_task_set = mpp_task_manager->getGatherTaskSet(MPPGatherId(1, MPPQueryId(1, 1, 1, 1, "", 1, ""))); ASSERT_TRUE(gather_task_set.first->hasMPPTask()); ASSERT_TRUE(!gather_task_set.first->hasAlarm()); @@ -114,7 +114,7 @@ try /// if all task is unregistered, min tso should be updated mpp_task_manager->unregisterTask(mpp_task_2->getId(), ""); - gather_task_set = mpp_task_manager->getGatherTaskSet(MPPGatherId(1, MPPQueryId(1, 1, 1, 1, ""))); + gather_task_set = mpp_task_manager->getGatherTaskSet(MPPGatherId(1, MPPQueryId(1, 1, 1, 1, "", 1, ""))); ASSERT_TRUE(gather_task_set.first == nullptr); ASSERT_TRUE( mpp_task_manager->getCurrentMinTSOQueryId(gather_id.query_id.resource_group_name) == MPPTaskId::Max_Query_Id); @@ -129,7 +129,7 @@ try auto mpp_task_manager = context->getTMTContext().getMPPTaskManager(); { mpp::EstablishMPPConnectionRequest establish_req; - auto gather_id = MPPGatherId(1, MPPQueryId(1, 1, 1, 1, "")); + auto gather_id = MPPGatherId(1, MPPQueryId(1, 1, 1, 1, "", 1, "")); auto * sender_meta = establish_req.mutable_sender_meta(); fillTaskMeta(sender_meta, 1, gather_id); original_task = MPPTask::newTaskForTest(*sender_meta, context); diff --git a/dbms/src/Interpreters/tests/gtest_query_operator_spill_contexts.cpp b/dbms/src/Interpreters/tests/gtest_query_operator_spill_contexts.cpp index 8e1dc05a42e..d860f551f82 100644 --- a/dbms/src/Interpreters/tests/gtest_query_operator_spill_contexts.cpp +++ b/dbms/src/Interpreters/tests/gtest_query_operator_spill_contexts.cpp @@ -64,7 +64,9 @@ try std::shared_ptr task_operator_spill_contexts = std::make_shared(); task_operator_spill_contexts->registerOperatorSpillContext(sort_spill_context); - QueryOperatorSpillContexts query_operator_spill_contexts(MPPQueryId(0, 0, 0, 0, /*resource_group_name=*/""), 0); + QueryOperatorSpillContexts query_operator_spill_contexts( + MPPQueryId(0, 0, 0, 0, /*resource_group_name=*/"", 0, ""), + 0); ASSERT_TRUE(query_operator_spill_contexts.getTaskOperatorSpillContextsCount() == 0); query_operator_spill_contexts.registerTaskOperatorSpillContexts(task_operator_spill_contexts); ASSERT_TRUE(query_operator_spill_contexts.getTaskOperatorSpillContextsCount() == 1); @@ -87,7 +89,9 @@ try task_operator_spill_contexts_2->registerOperatorSpillContext(sort_spill_context_2); task_operator_spill_contexts_2->registerOperatorSpillContext(sort_spill_context_3); - QueryOperatorSpillContexts query_operator_spill_contexts(MPPQueryId(0, 0, 0, 0, /*resource_group_name=*/""), 0); + QueryOperatorSpillContexts query_operator_spill_contexts( + MPPQueryId(0, 0, 0, 0, /*resource_group_name=*/"", 0, ""), + 0); query_operator_spill_contexts.registerTaskOperatorSpillContexts(task_operator_spill_contexts_1); query_operator_spill_contexts.registerTaskOperatorSpillContexts(task_operator_spill_contexts_2); @@ -146,7 +150,7 @@ try UInt64 min_check_interval = 1000; QueryOperatorSpillContexts query_operator_spill_contexts( - MPPQueryId(0, 0, 0, 0, /*resource_group_name=*/""), + MPPQueryId(0, 0, 0, 0, /*resource_group_name=*/"", 0, ""), min_check_interval); query_operator_spill_contexts.registerTaskOperatorSpillContexts(task_operator_spill_contexts_1); query_operator_spill_contexts.registerTaskOperatorSpillContexts(task_operator_spill_contexts_2); diff --git a/dbms/src/Storages/DeltaMerge/Remote/DisaggTaskId.cpp b/dbms/src/Storages/DeltaMerge/Remote/DisaggTaskId.cpp index 925289a2145..5f3a87f02be 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DisaggTaskId.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/DisaggTaskId.cpp @@ -27,7 +27,9 @@ DisaggTaskId::DisaggTaskId(const disaggregated::DisaggTaskMeta & task_meta) task_meta.gather_id(), task_meta.query_ts(), task_meta.local_query_id(), - /*resource_group_name=*/"") + /*resource_group_name=*/"", + task_meta.connection_id(), + task_meta.connection_alias()) , executor_id(task_meta.executor_id()) {} diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index 1af797602d5..eac9a26f7b2 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -74,6 +74,29 @@ extern const int DISAGG_ESTABLISH_RETRYABLE_ERROR; extern const int TIMEOUT_EXCEEDED; } // namespace ErrorCodes +namespace +{ +void initDisaggTaskMeta( + disaggregated::DisaggTaskMeta * meta, + const MPPTaskId & sender_target_mpp_task_id, + DAGContext * dag_context, + const KeyspaceID keyspace_id, + const TiDBTableScan & table_scan) +{ + meta->set_start_ts(sender_target_mpp_task_id.gather_id.query_id.start_ts); + meta->set_query_ts(sender_target_mpp_task_id.gather_id.query_id.query_ts); + meta->set_server_id(sender_target_mpp_task_id.gather_id.query_id.server_id); + meta->set_local_query_id(sender_target_mpp_task_id.gather_id.query_id.local_query_id); + meta->set_gather_id(sender_target_mpp_task_id.gather_id.gather_id); + meta->set_task_id(dag_context->getMPPTaskId().task_id); + meta->set_executor_id(table_scan.getTableScanExecutorID()); + meta->set_keyspace_id(keyspace_id); + meta->set_api_version(keyspace_id == NullspaceID ? kvrpcpb::APIVersion::V1 : kvrpcpb::APIVersion::V2); + meta->set_connection_id(sender_target_mpp_task_id.gather_id.query_id.connection_id); + meta->set_connection_alias(sender_target_mpp_task_id.gather_id.query_id.connection_alias); +} +} // namespace + BlockInputStreams StorageDisaggregated::readThroughS3(const Context & db_context, unsigned num_streams) { // Build InputStream according to the remote segment read tasks @@ -406,20 +429,17 @@ std::shared_ptr StorageDisaggregated: { const auto & settings = db_context.getSettingsRef(); auto establish_req = std::make_shared(); + { - disaggregated::DisaggTaskMeta * meta = establish_req->mutable_meta(); - meta->set_start_ts(sender_target_mpp_task_id.gather_id.query_id.start_ts); - meta->set_query_ts(sender_target_mpp_task_id.gather_id.query_id.query_ts); - meta->set_server_id(sender_target_mpp_task_id.gather_id.query_id.server_id); - meta->set_local_query_id(sender_target_mpp_task_id.gather_id.query_id.local_query_id); - meta->set_gather_id(sender_target_mpp_task_id.gather_id.gather_id); auto * dag_context = db_context.getDAGContext(); - meta->set_task_id(dag_context->getMPPTaskId().task_id); - meta->set_executor_id(table_scan.getTableScanExecutorID()); - const auto keyspace_id = dag_context->getKeyspaceID(); - meta->set_keyspace_id(keyspace_id); - meta->set_api_version(keyspace_id == NullspaceID ? kvrpcpb::APIVersion::V1 : kvrpcpb::APIVersion::V2); + initDisaggTaskMeta( + establish_req->mutable_meta(), + sender_target_mpp_task_id, + dag_context, + dag_context->getKeyspaceID(), + table_scan); } + // how long the task is valid on the write node establish_req->set_timeout_s(DEFAULT_DISAGG_TASK_TIMEOUT_SEC); establish_req->set_address(batch_cop_task.store_addr); From 85c4bcb2c4f37b019cf8b9c80fb9628b2e91e729 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 9 Nov 2023 16:16:59 +0800 Subject: [PATCH 02/10] remove useless --- dbms/src/Flash/FlashService.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 0d6fdc745c6..157af85897f 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -919,7 +919,7 @@ grpc::Status FlashService::EstablishDisaggTask( "EstablishDisaggTask should only be called on write node"); const auto & meta = request->meta(); - DM::DisaggTaskId task_id(meta\); + DM::DisaggTaskId task_id(meta); auto logger = Logger::get(task_id); auto record_other_error = [&](int flash_err_code, const String & err_msg) { From e26a1da4498a396b93a37acc5a5787f210ee07ec Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 10 Nov 2023 13:27:13 +0800 Subject: [PATCH 03/10] add cop --- contrib/kvproto | 2 +- dbms/src/Flash/FlashService.cpp | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/contrib/kvproto b/contrib/kvproto index cfc6f5b2c02..ddcf2223e2a 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit cfc6f5b2c025c65f9d788b73b19813daf3517866 +Subproject commit ddcf2223e2aa9315df2929a0eb5f1f8214c2b580 diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 157af85897f..9d9d0b4214f 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -279,11 +279,13 @@ grpc::Status FlashService::Coprocessor( }); CoprocessorContext cop_context(*db_context, request->context(), *grpc_context); auto request_identifier = fmt::format( - "Coprocessor, is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}", + "Coprocessor, is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}, conn_id: {}, conn_alias: {}", is_remote_read, request->start_ts(), region_info, - request->context().resource_control_context().resource_group_name()); + request->context().resource_control_context().resource_group_name(), + request->connection_id(), + request->connection_alias()); CoprocessorHandler cop_handler(cop_context, request, response, request_identifier); return cop_handler.execute(); }; @@ -340,9 +342,11 @@ grpc::Status FlashService::BatchCoprocessor( } CoprocessorContext cop_context(*db_context, request->context(), *grpc_context); auto request_identifier = fmt::format( - "BatchCoprocessor, start_ts: {}, resource_group: {}", + "BatchCoprocessor, start_ts: {}, resource_group: {}, conn_id: {}, conn_alias: {}", request->start_ts(), - request->context().resource_control_context().resource_group_name()); + request->context().resource_control_context().resource_group_name(), + request->connection_id(), + request->connection_alias()); BatchCoprocessorHandler cop_handler(cop_context, request, writer, request_identifier); return cop_handler.execute(); }); From 732fbe60821a3a7dacbb3757968f407b43e39932 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 10 Nov 2023 21:25:29 +0800 Subject: [PATCH 04/10] support remote read --- dbms/src/Debug/dbgQueryExecutor.cpp | 6 ++- dbms/src/Flash/BatchCoprocessorHandler.cpp | 2 + dbms/src/Flash/Coprocessor/DAGContext.cpp | 10 +++++ dbms/src/Flash/Coprocessor/DAGContext.h | 7 ++++ .../Coprocessor/DAGStorageInterpreter.cpp | 5 ++- dbms/src/Flash/CoprocessorHandler.cpp | 2 + dbms/src/Flash/FlashService.cpp | 3 +- dbms/src/Flash/tests/gtest_compute_server.cpp | 40 ++++++++++++++----- .../Storages/tests/gtest_filter_parser.cpp | 2 +- .../tests/gtests_parse_push_down_filter.cpp | 2 +- dbms/src/TestUtils/MPPTaskTestUtils.cpp | 2 +- 11 files changed, 63 insertions(+), 18 deletions(-) diff --git a/dbms/src/Debug/dbgQueryExecutor.cpp b/dbms/src/Debug/dbgQueryExecutor.cpp index 6bee1fe2d12..f3906a56a33 100644 --- a/dbms/src/Debug/dbgQueryExecutor.cpp +++ b/dbms/src/Debug/dbgQueryExecutor.cpp @@ -399,7 +399,8 @@ tipb::SelectResponse executeDAGRequest( region_id, RegionInfo(region_id, region_version, region_conf_version, std::move(key_ranges), nullptr)); - DAGContext dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", DAGRequestKind::Cop, "", log); + DAGContext + dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", DAGRequestKind::Cop, "", 0, "", log); context.setDAGContext(&dag_context); DAGDriver driver(context, start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true); @@ -433,7 +434,8 @@ bool runAndCompareDagReq( region_id, RegionInfo(region_id, region->version(), region->confVer(), std::move(key_ranges), nullptr)); - DAGContext dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", DAGRequestKind::Cop, "", log); + DAGContext + dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", DAGRequestKind::Cop, "", 0, "", log); context.setDAGContext(&dag_context); DAGDriver driver(context, properties.start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true); diff --git a/dbms/src/Flash/BatchCoprocessorHandler.cpp b/dbms/src/Flash/BatchCoprocessorHandler.cpp index 626216b6245..210d915e1a8 100644 --- a/dbms/src/Flash/BatchCoprocessorHandler.cpp +++ b/dbms/src/Flash/BatchCoprocessorHandler.cpp @@ -84,6 +84,8 @@ grpc::Status BatchCoprocessorHandler::execute() cop_context.db_context.getClientInfo().current_address.toString(), DAGRequestKind::BatchCop, resource_group_name, + cop_request->connection_id(), + cop_request->connection_alias(), Logger::get(log->identifier())); cop_context.db_context.setDAGContext(&dag_context); diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index 252620ab945..8d25e991c89 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -52,6 +52,8 @@ DAGContext::DAGContext( const String & tidb_host_, DAGRequestKind kind_, const String & resource_group_name_, + UInt64 connection_id_, + const String & connection_alias_, LoggerPtr log_) : dag_request(&dag_request_) , dummy_query_string(dag_request->ShortDebugString()) @@ -71,6 +73,8 @@ DAGContext::DAGContext( , warning_count(0) , keyspace_id(keyspace_id_) , resource_group_name(resource_group_name_) + , connection_id(connection_id_) + , connection_alias(connection_alias_) { RUNTIME_ASSERT(kind != DAGRequestKind::MPP, log, "DAGContext non-mpp constructor get a mpp kind"); initOutputInfo(); @@ -95,6 +99,8 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & me , warning_count(0) , keyspace_id(RequestUtils::deriveKeyspaceID(meta_)) , resource_group_name(meta_.resource_group_name()) + , connection_id(meta_.connection_id()) + , connection_alias(meta_.connection_alias()) { // only mpp task has join executor. initExecutorIdToJoinIdMap(); @@ -127,6 +133,8 @@ DAGContext::DAGContext( , warnings(max_recorded_error_count) , warning_count(0) , keyspace_id(RequestUtils::deriveKeyspaceID(task_meta_)) + , connection_id(task_meta_.connection_id()) + , connection_alias(task_meta_.connection_alias()) { initOutputInfo(); } @@ -144,6 +152,7 @@ DAGContext::DAGContext(UInt64 max_error_count_) , max_recorded_error_count(max_error_count_) , warnings(max_recorded_error_count) , warning_count(0) + , connection_id(0) {} // for tests need to run query tasks. @@ -163,6 +172,7 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, String log_identifier, s , max_recorded_error_count(getMaxErrorCount(*dag_request)) , warnings(max_recorded_error_count) , warning_count(0) + , connection_id(0) { query_operator_spill_contexts = std::make_shared(MPPQueryId(0, 0, 0, 0, "", 0, ""), 100); diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 995b8b7e522..55e85923d42 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -157,6 +157,8 @@ class DAGContext const String & tidb_host_, DAGRequestKind cop_kind_, const String & resource_group_name, + UInt64 connection_id_, + const String & connection_alias_, LoggerPtr log_); // for mpp @@ -447,6 +449,11 @@ class DAGContext // - Stream: execute with block input stream // - Pipeline: execute with pipeline model ExecutionMode execution_mode = ExecutionMode::None; + + // It's the session id between client and tidb + UInt64 connection_id; + // It's the session alias between client and tidb + String connection_alias; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 6595744b2dc..34d69970fb3 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -714,7 +714,7 @@ std::vector DAGStorageInterpreter::buildCopTasks( std::multimap meta_data; meta_data.emplace("is_remote_read", "true"); - auto tasks = pingcap::coprocessor::buildCopTasks( + auto tasks = pingcap::coprocessor::buildCopTasks( // Here bo, cluster, remote_request.key_ranges, @@ -733,7 +733,8 @@ std::vector DAGStorageInterpreter::buildCopTasks( CoprocessorReaderPtr DAGStorageInterpreter::buildCoprocessorReader(const std::vector & remote_requests) { - std::vector all_tasks = buildCopTasks(remote_requests); + std::vector all_tasks + = buildCopTasks(remote_requests); // put the id and alias into CopTasks const DAGSchema & schema = remote_requests[0].schema; pingcap::kv::Cluster * cluster = tmt.getKVCluster(); bool has_enforce_encode_type diff --git a/dbms/src/Flash/CoprocessorHandler.cpp b/dbms/src/Flash/CoprocessorHandler.cpp index cd2a725ac20..73494f033b2 100644 --- a/dbms/src/Flash/CoprocessorHandler.cpp +++ b/dbms/src/Flash/CoprocessorHandler.cpp @@ -156,6 +156,8 @@ grpc::Status CoprocessorHandler::execute() cop_context.db_context.getClientInfo().current_address.toString(), kind, resource_group_name, + cop_request->connection_id(), + cop_request->connection_alias(), Logger::get(log->identifier())); cop_context.db_context.setDAGContext(&dag_context); diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 9d9d0b4214f..2950a3f1432 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -279,7 +279,8 @@ grpc::Status FlashService::Coprocessor( }); CoprocessorContext cop_context(*db_context, request->context(), *grpc_context); auto request_identifier = fmt::format( - "Coprocessor, is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}, conn_id: {}, conn_alias: {}", + "Coprocessor, is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}, conn_id: {}, " + "conn_alias: {}", is_remote_read, request->start_ts(), region_info, diff --git a/dbms/src/Flash/tests/gtest_compute_server.cpp b/dbms/src/Flash/tests/gtest_compute_server.cpp index e79c1d179fa..0879bbef7fc 100644 --- a/dbms/src/Flash/tests/gtest_compute_server.cpp +++ b/dbms/src/Flash/tests/gtest_compute_server.cpp @@ -139,7 +139,9 @@ class ComputeServerRunner : public DB::tests::MPPTaskTestUtils properties.local_query_id, properties.server_id, properties.start_ts, - /*resource_group_name=*/""); + /*resource_group_name=*/"", + 0, + ""); gather_ids.push_back(gather_id); running_queries.emplace_back([&, properties, gather_id]() { BlockInputStreamPtr stream; @@ -578,7 +580,9 @@ try properties.local_query_id, properties.server_id, properties.start_ts, - /*resource_group_name=*/""); + /*resource_group_name=*/"", + 0, + ""); auto res = prepareMPPStreams( context.scan("test_db", "test_table_1") .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) @@ -597,7 +601,9 @@ try properties.local_query_id, properties.server_id, properties.start_ts, - /*resource_group_name=*/""); + /*resource_group_name=*/"", + 0, + ""); auto tasks = prepareMPPTasks( context.scan("test_db", "test_table_1") .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) @@ -631,7 +637,9 @@ try properties.local_query_id, properties.server_id, properties.start_ts, - /*resource_group_name=*/""); + /*resource_group_name=*/"", + 0, + ""); auto res = prepareMPPStreams( context.scan("test_db", "l_table") .join(context.scan("test_db", "r_table"), tipb::JoinType::TypeLeftOuterJoin, {col("join_c")}), @@ -658,7 +666,9 @@ try properties.local_query_id, properties.server_id, properties.start_ts, - /*resource_group_name=*/""); + /*resource_group_name=*/"", + 0, + ""); auto stream = prepareMPPStreams( context.scan("test_db", "l_table") .join(context.scan("test_db", "r_table"), tipb::JoinType::TypeLeftOuterJoin, {col("join_c")}) @@ -687,7 +697,9 @@ try properties1.local_query_id, properties1.server_id, properties1.start_ts, - /*resource_group_name=*/""); + /*resource_group_name=*/"", + 0, + ""); auto res1 = prepareMPPStreams( context.scan("test_db", "l_table") .join(context.scan("test_db", "r_table"), tipb::JoinType::TypeLeftOuterJoin, {col("join_c")}), @@ -699,7 +711,9 @@ try properties2.local_query_id, properties2.server_id, properties2.start_ts, - /*resource_group_name=*/""); + /*resource_group_name=*/"", + 0, + ""); auto res2 = prepareMPPStreams( context.scan("test_db", "l_table") .join(context.scan("test_db", "r_table"), tipb::JoinType::TypeLeftOuterJoin, {col("join_c")}) @@ -728,7 +742,9 @@ try properties.local_query_id, properties.server_id, properties.start_ts, - /*resource_group_name=*/""); + /*resource_group_name=*/"", + 0, + ""); queries.push_back(std::make_tuple( gather_id, prepareMPPStreams( @@ -852,7 +868,9 @@ try properties.local_query_id, properties.server_id, properties.start_ts, - /*resource_group_name=*/""); + /*resource_group_name=*/"", + 0, + ""); try { BlockInputStreamPtr tmp = prepareMPPStreams( @@ -906,7 +924,9 @@ try properties.local_query_id, properties.server_id, properties.start_ts, - /*resource_group_name=*/""); + /*resource_group_name=*/"", + 0, + ""); /// currently all the failpoints are automatically disabled after triggered once, so have to enable it before every run FailPointHelper::enableFailPoint(failpoint); BlockInputStreamPtr stream; diff --git a/dbms/src/Storages/tests/gtest_filter_parser.cpp b/dbms/src/Storages/tests/gtest_filter_parser.cpp index 60795670c03..14ef3d8f777 100644 --- a/dbms/src/Storages/tests/gtest_filter_parser.cpp +++ b/dbms/src/Storages/tests/gtest_filter_parser.cpp @@ -85,7 +85,7 @@ DM::RSOperatorPtr FilterParserTest::generateRsOperator( [&](const String &, const String &) { return table_info; }, getDAGProperties("")); auto & dag_request = *query_tasks[0].dag_request; - DAGContext dag_context(dag_request, {}, NullspaceID, "", DAGRequestKind::Cop, "", log); + DAGContext dag_context(dag_request, {}, NullspaceID, "", DAGRequestKind::Cop, "", 0, "", log); ctx->setDAGContext(&dag_context); // Don't care about regions information in this test DAGQuerySource dag(*ctx); diff --git a/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp b/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp index c107d19e84e..e030a4fda55 100644 --- a/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp +++ b/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp @@ -73,7 +73,7 @@ DM::PushDownFilterPtr ParsePushDownFilterTest::generatePushDownFilter( [&](const String &, const String &) { return table_info; }, getDAGProperties("")); auto & dag_request = *query_tasks[0].dag_request; - DAGContext dag_context(dag_request, {}, NullspaceID, "", DAGRequestKind::Cop, "", log); + DAGContext dag_context(dag_request, {}, NullspaceID, "", DAGRequestKind::Cop, "", 0, "", log); ctx->setDAGContext(&dag_context); // Don't care about regions information in this test DAGQuerySource dag(*ctx); diff --git a/dbms/src/TestUtils/MPPTaskTestUtils.cpp b/dbms/src/TestUtils/MPPTaskTestUtils.cpp index e1744d54448..c2240c1abdf 100644 --- a/dbms/src/TestUtils/MPPTaskTestUtils.cpp +++ b/dbms/src/TestUtils/MPPTaskTestUtils.cpp @@ -154,7 +154,7 @@ ColumnsWithTypeAndName MPPTaskTestUtils::executeCoprocessorTask(std::shared_ptr< auto * data = req->mutable_data(); dag_request->AppendToString(data); - DAGContext dag_context(*dag_request, {}, NullspaceID, "", DAGRequestKind::Cop, "", Logger::get()); + DAGContext dag_context(*dag_request, {}, NullspaceID, "", DAGRequestKind::Cop, "", 0, "", Logger::get()); TiFlashTestEnv::getGlobalContext(test_meta.context_idx).setDAGContext(&dag_context); TiFlashTestEnv::getGlobalContext(test_meta.context_idx).setCopTest(); From 3815537ccc4f4e9cf0620bc2325dc78d647bad1d Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Sun, 12 Nov 2023 12:53:08 +0800 Subject: [PATCH 05/10] ok --- dbms/src/Flash/Coprocessor/DAGContext.h | 3 +++ dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp | 6 ++++++ dbms/src/Flash/Coprocessor/RemoteRequest.cpp | 4 +++- dbms/src/Flash/Coprocessor/RemoteRequest.h | 11 ++++++++++- 4 files changed, 22 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 55e85923d42..0e95968fb68 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -342,6 +342,9 @@ class DAGContext void setAutoSpillMode() { in_auto_spill_mode = true; } bool isInAutoSpillMode() const { return in_auto_spill_mode; } + UInt64 getConnectionID() const { return connection_id; } + const String & getConnectionAlias() const { return connection_alias; } + public: DAGRequest dag_request; /// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast, diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 34d69970fb3..aa9c38092fa 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -721,6 +721,8 @@ std::vector DAGStorageInterpreter::buildCopTasks( req, store_type, dagContext().getKeyspaceID(), + remote_request.connection_id, + remote_request.connection_alias, &Poco::Logger::get("pingcap/coprocessor"), std::move(meta_data), [&] { GET_METRIC(tiflash_coprocessor_request_count, type_remote_read_sent).Increment(); }); @@ -1552,6 +1554,8 @@ std::vector DAGStorageInterpreter::buildRemoteRequests(const DM:: retry_regions_map[region_id_to_table_id_map[r.get().region_id]].emplace_back(r); } + UInt64 connection_id = dagContext().getConnectionID(); + const String & connection_alias = dagContext().getConnectionAlias(); for (const auto physical_table_id : table_scan.getPhysicalTableIDs()) { const auto & retry_regions = retry_regions_map[physical_table_id]; @@ -1569,6 +1573,8 @@ std::vector DAGStorageInterpreter::buildRemoteRequests(const DM:: table_scan, storages_with_structure_lock[physical_table_id].storage->getTableInfo(), filter_conditions, + connection_id, + connection_alias, log)); } LOG_DEBUG(log, "remote request size: {}", remote_requests.size()); diff --git a/dbms/src/Flash/Coprocessor/RemoteRequest.cpp b/dbms/src/Flash/Coprocessor/RemoteRequest.cpp index b885b2f8550..a2cd6f4a696 100644 --- a/dbms/src/Flash/Coprocessor/RemoteRequest.cpp +++ b/dbms/src/Flash/Coprocessor/RemoteRequest.cpp @@ -27,6 +27,8 @@ RemoteRequest RemoteRequest::build( const TiDBTableScan & table_scan, const TiDB::TableInfo & table_info, const FilterConditions & filter_conditions, + UInt64 connection_id, + const String & connection_alias, const LoggerPtr & log) { LOG_INFO(log, "{}", printRetryRegions(retry_regions, table_info.id)); @@ -86,7 +88,7 @@ RemoteRequest RemoteRequest::build( dag_req.set_time_zone_offset(original_dag_req.time_zone_offset()); std::vector key_ranges = buildKeyRanges(retry_regions); - return {std::move(dag_req), std::move(schema), std::move(key_ranges)}; + return {std::move(dag_req), std::move(schema), std::move(key_ranges), connection_id, connection_alias}; } std::vector RemoteRequest::buildKeyRanges(const RegionRetryList & retry_regions) diff --git a/dbms/src/Flash/Coprocessor/RemoteRequest.h b/dbms/src/Flash/Coprocessor/RemoteRequest.h index 96a86a5ed3b..b107d7890a0 100644 --- a/dbms/src/Flash/Coprocessor/RemoteRequest.h +++ b/dbms/src/Flash/Coprocessor/RemoteRequest.h @@ -40,10 +40,14 @@ struct RemoteRequest RemoteRequest( tipb::DAGRequest && dag_request_, DAGSchema && schema_, - std::vector && key_ranges_) + std::vector && key_ranges_, + UInt64 connection_id_, + const String & connection_alias_) : dag_request(std::move(dag_request_)) , schema(std::move(schema_)) , key_ranges(std::move(key_ranges_)) + , connection_id(connection_id_) + , connection_alias(connection_alias_) {} static RemoteRequest build( @@ -52,6 +56,8 @@ struct RemoteRequest const TiDBTableScan & table_scan, const TiDB::TableInfo & table_info, const FilterConditions & filter_conditions, + UInt64 connection_id, + const String & connection_alias, const LoggerPtr & log); static std::vector buildKeyRanges(const RegionRetryList & retry_regions); static std::string printRetryRegions(const RegionRetryList & retry_regions, TableID table_id); @@ -60,5 +66,8 @@ struct RemoteRequest DAGSchema schema; /// the sorted key ranges std::vector key_ranges; + + UInt64 connection_id; + String connection_alias; }; } // namespace DB From 0608b7327154ea5741168e7da60e8af5d25231e9 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 13 Nov 2023 12:30:25 +0800 Subject: [PATCH 06/10] address comments --- dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index aa9c38092fa..84e9de1341d 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -714,7 +714,7 @@ std::vector DAGStorageInterpreter::buildCopTasks( std::multimap meta_data; meta_data.emplace("is_remote_read", "true"); - auto tasks = pingcap::coprocessor::buildCopTasks( // Here + auto tasks = pingcap::coprocessor::buildCopTasks( bo, cluster, remote_request.key_ranges, @@ -736,7 +736,7 @@ std::vector DAGStorageInterpreter::buildCopTasks( CoprocessorReaderPtr DAGStorageInterpreter::buildCoprocessorReader(const std::vector & remote_requests) { std::vector all_tasks - = buildCopTasks(remote_requests); // put the id and alias into CopTasks + = buildCopTasks(remote_requests); const DAGSchema & schema = remote_requests[0].schema; pingcap::kv::Cluster * cluster = tmt.getKVCluster(); bool has_enforce_encode_type From 36fa981714222d2aa21d5e185e370dc88f72efce Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 13 Nov 2023 16:22:27 +0800 Subject: [PATCH 07/10] tweaking --- dbms/src/Flash/Coprocessor/DAGContext.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 0e95968fb68..230bf9b399d 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -453,9 +453,9 @@ class DAGContext // - Pipeline: execute with pipeline model ExecutionMode execution_mode = ExecutionMode::None; - // It's the session id between client and tidb + // It's the session id between mysql client and tidb UInt64 connection_id; - // It's the session alias between client and tidb + // It's the session alias between mysql client and tidb String connection_alias; }; From 18ec90920751292145510d9c7345df7ccf1e9ac9 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 16 Nov 2023 09:35:42 +0800 Subject: [PATCH 08/10] tweaking --- contrib/client-c | 2 +- contrib/kvproto | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/contrib/client-c b/contrib/client-c index 2436ed2e493..6e76c700d58 160000 --- a/contrib/client-c +++ b/contrib/client-c @@ -1 +1 @@ -Subproject commit 2436ed2e493ccc51de2caa935211d4a1e163de38 +Subproject commit 6e76c700d5837ba021086852b85222d8669ddc5e diff --git a/contrib/kvproto b/contrib/kvproto index ddcf2223e2a..4a609e987ff 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit ddcf2223e2aa9315df2929a0eb5f1f8214c2b580 +Subproject commit 4a609e987ff49e39a73592e0b900c3656366b49b From a95f3e6cc4614d7487c4514b899f86aac3947570 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 16 Nov 2023 11:45:52 +0800 Subject: [PATCH 09/10] change client-c and kvproto --- contrib/client-c | 2 +- contrib/kvproto | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/contrib/client-c b/contrib/client-c index 6e76c700d58..efc6ca21ddf 160000 --- a/contrib/client-c +++ b/contrib/client-c @@ -1 +1 @@ -Subproject commit 6e76c700d5837ba021086852b85222d8669ddc5e +Subproject commit efc6ca21ddfa5013ae6290aecd024510cd278da9 diff --git a/contrib/kvproto b/contrib/kvproto index 4a609e987ff..ecf635d1a67 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit 4a609e987ff49e39a73592e0b900c3656366b49b +Subproject commit ecf635d1a67b57ace980deca2d9fa838d05e63d8 From b7366b55ee95d51f011052da29faa3751f8470fb Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 16 Nov 2023 13:01:58 +0800 Subject: [PATCH 10/10] format --- dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 84e9de1341d..c4bc532eaba 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -735,8 +735,7 @@ std::vector DAGStorageInterpreter::buildCopTasks( CoprocessorReaderPtr DAGStorageInterpreter::buildCoprocessorReader(const std::vector & remote_requests) { - std::vector all_tasks - = buildCopTasks(remote_requests); + std::vector all_tasks = buildCopTasks(remote_requests); const DAGSchema & schema = remote_requests[0].schema; pingcap::kv::Cluster * cluster = tmt.getKVCluster(); bool has_enforce_encode_type