diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
index 7da090989f3..67947e74ca4 100644
--- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -301,7 +301,7 @@ void DAGQueryBlockInterpreter::handleJoin(
     const Settings & settings = context.getSettingsRef();
     SpillConfig build_spill_config(
         context.getTemporaryPath(),
-        fmt::format("{}_hash_join_0_build", log->identifier()),
+        fmt::format("{}_0_build", log->identifier()),
         settings.max_cached_data_bytes_in_spiller,
         settings.max_spilled_rows_per_file,
         settings.max_spilled_bytes_per_file,
@@ -310,7 +310,7 @@ void DAGQueryBlockInterpreter::handleJoin(
         settings.max_block_size);
     SpillConfig probe_spill_config(
         context.getTemporaryPath(),
-        fmt::format("{}_hash_join_0_probe", log->identifier()),
+        fmt::format("{}_0_probe", log->identifier()),
         settings.max_cached_data_bytes_in_spiller,
         settings.max_spilled_rows_per_file,
         settings.max_spilled_bytes_per_file,
@@ -497,7 +497,7 @@ void DAGQueryBlockInterpreter::executeAggregation(
     AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header);
     SpillConfig spill_config(
         context.getTemporaryPath(),
-        fmt::format("{}_aggregation", log->identifier()),
+        log->identifier(),
         settings.max_cached_data_bytes_in_spiller,
         settings.max_spilled_rows_per_file,
         settings.max_spilled_bytes_per_file,
diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
index 4d1e273b9fa..646e388a4e8 100644
--- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
+++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
@@ -207,7 +207,7 @@ void orderStreams(
             getAverageThreshold(settings.max_bytes_before_external_sort, pipeline.streams.size()),
             SpillConfig(
                 context.getTemporaryPath(),
-                fmt::format("{}_sort", log->identifier()),
+                log->identifier(),
                 settings.max_cached_data_bytes_in_spiller,
                 settings.max_spilled_rows_per_file,
                 settings.max_spilled_bytes_per_file,
@@ -239,7 +239,7 @@ void orderStreams(
             // todo use identifier_executor_id as the spill id
             SpillConfig(
                 context.getTemporaryPath(),
-                fmt::format("{}_sort", log->identifier()),
+                log->identifier(),
                 settings.max_cached_data_bytes_in_spiller,
                 settings.max_spilled_rows_per_file,
                 settings.max_spilled_bytes_per_file,
@@ -295,7 +295,7 @@ void executeLocalSort(
         fine_grained_spill_context = std::make_shared("sort", log);
     SpillConfig spill_config{
         context.getTemporaryPath(),
-        fmt::format("{}_sort", log->identifier()),
+        log->identifier(),
         settings.max_cached_data_bytes_in_spiller,
         settings.max_spilled_rows_per_file,
         settings.max_spilled_bytes_per_file,
@@ -353,7 +353,7 @@ void executeFinalSort(

     SpillConfig spill_config{
         context.getTemporaryPath(),
-        fmt::format("{}_sort", log->identifier()),
+        log->identifier(),
         settings.max_cached_data_bytes_in_spiller,
         settings.max_spilled_rows_per_file,
         settings.max_spilled_bytes_per_file,
diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp
index 2950a3f1432..23ddd334c33 100644
--- a/dbms/src/Flash/FlashService.cpp
+++ b/dbms/src/Flash/FlashService.cpp
@@ -206,14 +206,16 @@ grpc::Status FlashService::Coprocessor(
         request->context().region_id(),
         request->context().region_epoch().conf_ver(),
         request->context().region_epoch().version());
-    auto log_level = is_remote_read ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
-    LOG_IMPL(
+    LOG_INFO(
         log,
-        log_level,
-        "Handling coprocessor request, is_remote_read: {}, start ts: {}, region info: {}",
+        "Handling coprocessor request, is_remote_read: {}, start ts: {}, region info: {}, resource_group: {}, conn_id: "
+        "{}, conn_alias: {}",
         is_remote_read,
         request->start_ts(),
-        region_info);
+        region_info,
+        request->context().resource_control_context().resource_group_name(),
+        request->connection_id(),
+        request->connection_alias());

     auto check_result = checkGrpcContext(grpc_context);
     if (!check_result.ok())
@@ -255,6 +257,7 @@ grpc::Status FlashService::Coprocessor(
         }
     }

+    auto log_level = is_remote_read ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
     auto exec_func = [&]() -> grpc::Status {
         auto wait_ms = watch.elapsedMilliseconds();
         if (wait_ms > 1000)
@@ -279,14 +282,10 @@ grpc::Status FlashService::Coprocessor(
         });
         CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
         auto request_identifier = fmt::format(
-            "Coprocessor, is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}, conn_id: {}, "
-            "conn_alias: {}",
+            "Coprocessor, is_remote_read: {}, start_ts: {}, region_info: {}",
             is_remote_read,
             request->start_ts(),
-            region_info,
-            request->context().resource_control_context().resource_group_name(),
-            request->connection_id(),
-            request->connection_alias());
+            region_info);
         CoprocessorHandler cop_handler(cop_context, request, response, request_identifier);
         return cop_handler.execute();
     };
@@ -318,7 +317,13 @@ grpc::Status FlashService::BatchCoprocessor(
     grpc::ServerWriter * writer)
 {
     CPUAffinityManager::getInstance().bindSelfGrpcThread();
-    LOG_INFO(log, "Handling batch coprocessor request, start ts: {}", request->start_ts());
+    LOG_INFO(
+        log,
+        "Handling batch coprocessor request, start ts: {}, resource_group: {}, conn_id: {}, conn_alias: {}",
+        request->start_ts(),
+        request->context().resource_control_context().resource_group_name(),
+        request->connection_id(),
+        request->connection_alias());

     auto check_result = checkGrpcContext(grpc_context);
     if (!check_result.ok())
@@ -342,12 +347,7 @@ grpc::Status FlashService::BatchCoprocessor(
             return status;
         }
         CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
-        auto request_identifier = fmt::format(
-            "BatchCoprocessor, start_ts: {}, resource_group: {}, conn_id: {}, conn_alias: {}",
-            request->start_ts(),
-            request->context().resource_control_context().resource_group_name(),
-            request->connection_id(),
-            request->connection_alias());
+        auto request_identifier = fmt::format("BatchCoprocessor, start_ts: {}", request->start_ts());
         BatchCoprocessorHandler cop_handler(cop_context, request, writer, request_identifier);
         return cop_handler.execute();
     });
@@ -372,14 +372,16 @@ grpc::Status FlashService::CoprocessorStream(
         request->context().region_id(),
         request->context().region_epoch().conf_ver(),
         request->context().region_epoch().version());
-    auto log_level = is_remote_read ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
-    LOG_IMPL(
+    LOG_INFO(
         log,
-        log_level,
-        "Handling coprocessor stream request, is_remote_read: {}, start ts: {}, region info: {}",
+        "Handling coprocessor stream request, is_remote_read: {}, start ts: {}, region info: {}, resource_group: {}, "
+        "conn_id: {}, conn_alias: {}",
         is_remote_read,
         request->start_ts(),
-        region_info);
+        region_info,
+        request->context().resource_control_context().resource_group_name(),
+        request->connection_id(),
+        request->connection_alias());

     auto check_result = checkGrpcContext(grpc_context);
     if (!check_result.ok())
@@ -425,6 +427,7 @@ grpc::Status FlashService::CoprocessorStream(
         }
     }

+    auto log_level = is_remote_read ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
     auto exec_func = [&]() -> grpc::Status {
         auto wait_ms = watch.elapsedMilliseconds();
         if (wait_ms > 1000)
@@ -449,11 +452,10 @@ grpc::Status FlashService::CoprocessorStream(
         });
         CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
         auto request_identifier = fmt::format(
-            "Coprocessor(stream), is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}",
+            "Coprocessor(stream), is_remote_read: {}, start_ts: {}, region_info: {}",
             is_remote_read,
             request->start_ts(),
-            region_info,
-            request->context().resource_control_context().resource_group_name());
+            region_info);
         CoprocessorHandler cop_handler(cop_context, request, writer, request_identifier);
         return cop_handler.execute();
     };
@@ -488,7 +490,14 @@ grpc::Status FlashService::DispatchMPPTask(
     mpp::DispatchTaskResponse * response)
 {
     CPUAffinityManager::getInstance().bindSelfGrpcThread();
-    LOG_INFO(log, "Handling mpp dispatch request, task meta: {}", request->meta().DebugString());
+    const auto & task_meta = request->meta();
+    LOG_INFO(
+        log,
+        "Handling mpp dispatch request, task: {}, resource_group: {}, conn_id: {}, conn_alias: {}",
+        MPPTaskId(task_meta).toString(),
+        task_meta.resource_group_name(),
+        task_meta.connection_id(),
+        task_meta.connection_alias());
     auto check_result = checkGrpcContext(grpc_context);
     if (!check_result.ok())
         return check_result;
@@ -615,7 +624,20 @@ grpc::Status FlashService::EstablishMPPConnection(
     CPUAffinityManager::getInstance().bindSelfGrpcThread();
     // Establish a pipe for data transferring. The pipes have registered by the task in advance.
     // We need to find it out and bind the grpc stream with it.
- LOG_INFO(log, "Handling establish mpp connection request: {}", request->DebugString()); + const auto & receiver_meta = request->receiver_meta(); + const auto & sender_meta = request->sender_meta(); + assert(receiver_meta.resource_group_name() == sender_meta.resource_group_name()); + assert(receiver_meta.connection_id() == sender_meta.connection_id()); + assert(receiver_meta.connection_alias() == receiver_meta.connection_alias()); + LOG_INFO( + log, + "Handling establish mpp connection request, receiver: {}, sender: {}, resource_group: {}, conn_id: {}, " + "conn_alias: {}", + MPPTaskId(receiver_meta).toString(), + MPPTaskId(sender_meta).toString(), + receiver_meta.resource_group_name(), + receiver_meta.connection_id(), + receiver_meta.connection_alias()); auto check_result = checkGrpcContext(grpc_context); if (!check_result.ok()) diff --git a/dbms/src/Flash/Mpp/MPPTaskId.cpp b/dbms/src/Flash/Mpp/MPPTaskId.cpp index 3940675ed5b..168ab7ac871 100644 --- a/dbms/src/Flash/Mpp/MPPTaskId.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskId.cpp @@ -101,14 +101,13 @@ size_t MPPGatherIdHash::operator()(MPPGatherId const & mpp_gather_id) const noex String MPPTaskId::toString() const { return isUnknown() ? "MPP" - : fmt::format("MPP", gather_id.toString(), task_id); + : fmt::format("MPP<{},task_id:{}>", gather_id.toString(), task_id); } const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{}; constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); -const MPPQueryId MPPTaskId::Max_Query_Id - = MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64, "", MAX_UINT64, ""); +const MPPQueryId MPPTaskId::Max_Query_Id = MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64, "", 0, ""); bool operator==(const MPPTaskId & lid, const MPPTaskId & rid) { diff --git a/dbms/src/Flash/Mpp/MPPTaskId.h b/dbms/src/Flash/Mpp/MPPTaskId.h index 783f5fd7b69..d9e0946424f 100644 --- a/dbms/src/Flash/Mpp/MPPTaskId.h +++ b/dbms/src/Flash/Mpp/MPPTaskId.h @@ -64,15 +64,11 @@ struct MPPQueryId String toString() const { return fmt::format( - "", + "", query_ts, local_query_id, server_id, - start_ts, - resource_group_name, - connection_id, - connection_alias); + start_ts); } }; @@ -113,16 +109,12 @@ struct MPPGatherId String toString() const { return fmt::format( - "", + "gather_id:{}, query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}", gather_id, query_id.query_ts, query_id.local_query_id, query_id.server_id, - query_id.start_ts, - query_id.resource_group_name, - query_id.connection_id, - query_id.connection_alias); + query_id.start_ts); } bool hasMeaningfulGatherId() const { return gather_id > 0; } bool operator==(const MPPGatherId & rid) const; diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index bb42cd3871d..cf5a3ba0ee8 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -147,7 +147,7 @@ std::pair MPPTaskManager::findAsyncTunnel( if (!error_msg.empty()) { /// if the gather is aborted, return the error message - LOG_WARNING(log, "{}: Gather {} is aborted, all its tasks are invalid.", req_info, id.gather_id.toString()); + LOG_WARNING(log, "{}: Gather <{}> is aborted, all its tasks are invalid.", req_info, id.gather_id.toString()); /// meet error return {nullptr, error_msg}; } @@ -229,7 +229,11 @@ std::pair MPPTaskManager::findTunnelWithTimeout( if (!error_msg.empty()) { /// if the gather is aborted, return true to stop waiting timeout. 
- LOG_WARNING(log, "{}: Gather {} is aborted, all its tasks are invalid.", req_info, id.gather_id.toString()); + LOG_WARNING( + log, + "{}: Gather <{}> is aborted, all its tasks are invalid.", + req_info, + id.gather_id.toString()); cancelled = true; error_message = error_msg; return true; diff --git a/dbms/src/Flash/Planner/PhysicalPlanNode.cpp b/dbms/src/Flash/Planner/PhysicalPlanNode.cpp index 4ba718337a3..8b8849530bc 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanNode.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlanNode.cpp @@ -30,8 +30,9 @@ PhysicalPlanNode::PhysicalPlanNode( const PlanType & type_, const NamesAndTypes & schema_, const FineGrainedShuffle & fine_grained_shuffle_, - const String & req_id) + const String & req_id_) : executor_id(executor_id_) + , req_id(req_id_) , type(type_) , schema(schema_) , fine_grained_shuffle(fine_grained_shuffle_) diff --git a/dbms/src/Flash/Planner/PhysicalPlanNode.h b/dbms/src/Flash/Planner/PhysicalPlanNode.h index 7bc5134923c..cfc96724e85 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanNode.h +++ b/dbms/src/Flash/Planner/PhysicalPlanNode.h @@ -116,6 +116,7 @@ class PhysicalPlanNode : public std::enable_shared_from_this void recordProfileStreams(DAGPipeline & pipeline, const Context & context); String executor_id; + String req_id; PlanType type; NamesAndTypes schema; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp index 25f3e3892c3..3cf5b5fc3d7 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp @@ -100,7 +100,7 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header); SpillConfig spill_config( context.getTemporaryPath(), - fmt::format("{}_aggregation", log->identifier()), + log->identifier(), context.getSettingsRef().max_cached_data_bytes_in_spiller, context.getSettingsRef().max_spilled_rows_per_file, context.getSettingsRef().max_spilled_bytes_per_file, @@ -216,7 +216,7 @@ void PhysicalAggregation::buildPipelineExecGroupImpl( AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header); SpillConfig spill_config( context.getTemporaryPath(), - fmt::format("{}_aggregation", log->identifier()), + log->identifier(), context.getSettingsRef().max_cached_data_bytes_in_spiller, context.getSettingsRef().max_spilled_rows_per_file, context.getSettingsRef().max_spilled_bytes_per_file, @@ -264,7 +264,7 @@ void PhysicalAggregation::buildPipeline( auto agg_build = std::make_shared( executor_id, schema, - log->identifier(), + req_id, child, before_agg_actions, aggregation_keys, @@ -281,7 +281,7 @@ void PhysicalAggregation::buildPipeline( auto agg_convergent = std::make_shared( executor_id, schema, - log->identifier(), + req_id, aggregate_context, expr_after_agg); builder.addPlanNode(agg_convergent); diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp index 43b6890858e..2c2d521250e 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp @@ -40,7 +40,7 @@ void PhysicalAggregationBuild::buildPipelineExecGroupImpl( AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header); SpillConfig spill_config( context.getTemporaryPath(), - fmt::format("{}_aggregation", log->identifier()), + 
log->identifier(), context.getSettingsRef().max_cached_data_bytes_in_spiller, context.getSettingsRef().max_spilled_rows_per_file, context.getSettingsRef().max_spilled_bytes_per_file, diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp index f07ddc499ff..be2c1bc3e2c 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp @@ -303,7 +303,7 @@ void PhysicalJoin::buildPipeline(PipelineBuilder & builder, Context & context, P executor_id, build()->getSchema(), fine_grained_shuffle, - log->identifier(), + req_id, build(), join_ptr, build_side_prepare_actions); @@ -317,7 +317,7 @@ void PhysicalJoin::buildPipeline(PipelineBuilder & builder, Context & context, P auto join_probe = std::make_shared( executor_id, schema, - log->identifier(), + req_id, probe(), join_ptr, probe_side_prepare_actions);