Skip to content

Commit

Permalink
Don't log too many useless information in log (#8400)
Browse files Browse the repository at this point in the history
close #8399
  • Loading branch information
windtalker authored Nov 23, 2023
1 parent f246f35 commit d1949c6
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 60 deletions.
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ void DAGQueryBlockInterpreter::handleJoin(
const Settings & settings = context.getSettingsRef();
SpillConfig build_spill_config(
context.getTemporaryPath(),
fmt::format("{}_hash_join_0_build", log->identifier()),
fmt::format("{}_0_build", log->identifier()),
settings.max_cached_data_bytes_in_spiller,
settings.max_spilled_rows_per_file,
settings.max_spilled_bytes_per_file,
Expand All @@ -310,7 +310,7 @@ void DAGQueryBlockInterpreter::handleJoin(
settings.max_block_size);
SpillConfig probe_spill_config(
context.getTemporaryPath(),
fmt::format("{}_hash_join_0_probe", log->identifier()),
fmt::format("{}_0_probe", log->identifier()),
settings.max_cached_data_bytes_in_spiller,
settings.max_spilled_rows_per_file,
settings.max_spilled_bytes_per_file,
Expand Down Expand Up @@ -497,7 +497,7 @@ void DAGQueryBlockInterpreter::executeAggregation(
AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header);
SpillConfig spill_config(
context.getTemporaryPath(),
fmt::format("{}_aggregation", log->identifier()),
log->identifier(),
settings.max_cached_data_bytes_in_spiller,
settings.max_spilled_rows_per_file,
settings.max_spilled_bytes_per_file,
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ void orderStreams(
getAverageThreshold(settings.max_bytes_before_external_sort, pipeline.streams.size()),
SpillConfig(
context.getTemporaryPath(),
fmt::format("{}_sort", log->identifier()),
log->identifier(),
settings.max_cached_data_bytes_in_spiller,
settings.max_spilled_rows_per_file,
settings.max_spilled_bytes_per_file,
Expand Down Expand Up @@ -239,7 +239,7 @@ void orderStreams(
// todo use identifier_executor_id as the spill id
SpillConfig(
context.getTemporaryPath(),
fmt::format("{}_sort", log->identifier()),
log->identifier(),
settings.max_cached_data_bytes_in_spiller,
settings.max_spilled_rows_per_file,
settings.max_spilled_bytes_per_file,
Expand Down Expand Up @@ -295,7 +295,7 @@ void executeLocalSort(
fine_grained_spill_context = std::make_shared<FineGrainedOperatorSpillContext>("sort", log);
SpillConfig spill_config{
context.getTemporaryPath(),
fmt::format("{}_sort", log->identifier()),
log->identifier(),
settings.max_cached_data_bytes_in_spiller,
settings.max_spilled_rows_per_file,
settings.max_spilled_bytes_per_file,
Expand Down Expand Up @@ -353,7 +353,7 @@ void executeFinalSort(

SpillConfig spill_config{
context.getTemporaryPath(),
fmt::format("{}_sort", log->identifier()),
log->identifier(),
settings.max_cached_data_bytes_in_spiller,
settings.max_spilled_rows_per_file,
settings.max_spilled_bytes_per_file,
Expand Down
78 changes: 50 additions & 28 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,16 @@ grpc::Status FlashService::Coprocessor(
request->context().region_id(),
request->context().region_epoch().conf_ver(),
request->context().region_epoch().version());
auto log_level = is_remote_read ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
LOG_IMPL(
LOG_INFO(
log,
log_level,
"Handling coprocessor request, is_remote_read: {}, start ts: {}, region info: {}",
"Handling coprocessor request, is_remote_read: {}, start ts: {}, region info: {}, resource_group: {}, conn_id: "
"{}, conn_alias: {}",
is_remote_read,
request->start_ts(),
region_info);
region_info,
request->context().resource_control_context().resource_group_name(),
request->connection_id(),
request->connection_alias());

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
Expand Down Expand Up @@ -255,6 +257,7 @@ grpc::Status FlashService::Coprocessor(
}
}

auto log_level = is_remote_read ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
auto exec_func = [&]() -> grpc::Status {
auto wait_ms = watch.elapsedMilliseconds();
if (wait_ms > 1000)
Expand All @@ -279,14 +282,10 @@ grpc::Status FlashService::Coprocessor(
});
CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
auto request_identifier = fmt::format(
"Coprocessor, is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}, conn_id: {}, "
"conn_alias: {}",
"Coprocessor, is_remote_read: {}, start_ts: {}, region_info: {}",
is_remote_read,
request->start_ts(),
region_info,
request->context().resource_control_context().resource_group_name(),
request->connection_id(),
request->connection_alias());
region_info);
CoprocessorHandler<false> cop_handler(cop_context, request, response, request_identifier);
return cop_handler.execute();
};
Expand Down Expand Up @@ -318,7 +317,13 @@ grpc::Status FlashService::BatchCoprocessor(
grpc::ServerWriter<coprocessor::BatchResponse> * writer)
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
LOG_INFO(log, "Handling batch coprocessor request, start ts: {}", request->start_ts());
LOG_INFO(
log,
"Handling batch coprocessor request, start ts: {}, resource_group: {}, conn_id: {}, conn_alias: {}",
request->start_ts(),
request->context().resource_control_context().resource_group_name(),
request->connection_id(),
request->connection_alias());

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
Expand All @@ -342,12 +347,7 @@ grpc::Status FlashService::BatchCoprocessor(
return status;
}
CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
auto request_identifier = fmt::format(
"BatchCoprocessor, start_ts: {}, resource_group: {}, conn_id: {}, conn_alias: {}",
request->start_ts(),
request->context().resource_control_context().resource_group_name(),
request->connection_id(),
request->connection_alias());
auto request_identifier = fmt::format("BatchCoprocessor, start_ts: {}", request->start_ts());
BatchCoprocessorHandler cop_handler(cop_context, request, writer, request_identifier);
return cop_handler.execute();
});
Expand All @@ -372,14 +372,16 @@ grpc::Status FlashService::CoprocessorStream(
request->context().region_id(),
request->context().region_epoch().conf_ver(),
request->context().region_epoch().version());
auto log_level = is_remote_read ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
LOG_IMPL(
LOG_INFO(
log,
log_level,
"Handling coprocessor stream request, is_remote_read: {}, start ts: {}, region info: {}",
"Handling coprocessor stream request, is_remote_read: {}, start ts: {}, region info: {}, resource_group: {}, "
"conn_id: {}, conn_alias: {}",
is_remote_read,
request->start_ts(),
region_info);
region_info,
request->context().resource_control_context().resource_group_name(),
request->connection_id(),
request->connection_alias());

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
Expand Down Expand Up @@ -425,6 +427,7 @@ grpc::Status FlashService::CoprocessorStream(
}
}

auto log_level = is_remote_read ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
auto exec_func = [&]() -> grpc::Status {
auto wait_ms = watch.elapsedMilliseconds();
if (wait_ms > 1000)
Expand All @@ -449,11 +452,10 @@ grpc::Status FlashService::CoprocessorStream(
});
CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
auto request_identifier = fmt::format(
"Coprocessor(stream), is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}",
"Coprocessor(stream), is_remote_read: {}, start_ts: {}, region_info: {}",
is_remote_read,
request->start_ts(),
region_info,
request->context().resource_control_context().resource_group_name());
region_info);
CoprocessorHandler<true> cop_handler(cop_context, request, writer, request_identifier);
return cop_handler.execute();
};
Expand Down Expand Up @@ -488,7 +490,14 @@ grpc::Status FlashService::DispatchMPPTask(
mpp::DispatchTaskResponse * response)
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
LOG_INFO(log, "Handling mpp dispatch request, task meta: {}", request->meta().DebugString());
const auto & task_meta = request->meta();
LOG_INFO(
log,
"Handling mpp dispatch request, task: {}, resource_group: {}, conn_id: {}, conn_alias: {}",
MPPTaskId(task_meta).toString(),
task_meta.resource_group_name(),
task_meta.connection_id(),
task_meta.connection_alias());
auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
return check_result;
Expand Down Expand Up @@ -615,7 +624,20 @@ grpc::Status FlashService::EstablishMPPConnection(
CPUAffinityManager::getInstance().bindSelfGrpcThread();
// Establish a pipe for data transferring. The pipes have registered by the task in advance.
// We need to find it out and bind the grpc stream with it.
LOG_INFO(log, "Handling establish mpp connection request: {}", request->DebugString());
const auto & receiver_meta = request->receiver_meta();
const auto & sender_meta = request->sender_meta();
assert(receiver_meta.resource_group_name() == sender_meta.resource_group_name());
assert(receiver_meta.connection_id() == sender_meta.connection_id());
assert(receiver_meta.connection_alias() == receiver_meta.connection_alias());
LOG_INFO(
log,
"Handling establish mpp connection request, receiver: {}, sender: {}, resource_group: {}, conn_id: {}, "
"conn_alias: {}",
MPPTaskId(receiver_meta).toString(),
MPPTaskId(sender_meta).toString(),
receiver_meta.resource_group_name(),
receiver_meta.connection_id(),
receiver_meta.connection_alias());

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Flash/Mpp/MPPTaskId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,13 @@ size_t MPPGatherIdHash::operator()(MPPGatherId const & mpp_gather_id) const noex
String MPPTaskId::toString() const
{
return isUnknown() ? "MPP<gather_id:N/A,task_id:N/A>"
: fmt::format("MPP<gather_id:{},task_id:{}>", gather_id.toString(), task_id);
: fmt::format("MPP<{},task_id:{}>", gather_id.toString(), task_id);
}

const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{};

constexpr UInt64 MAX_UINT64 = std::numeric_limits<UInt64>::max();
const MPPQueryId MPPTaskId::Max_Query_Id
= MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64, "", MAX_UINT64, "");
const MPPQueryId MPPTaskId::Max_Query_Id = MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64, "", 0, "");

bool operator==(const MPPTaskId & lid, const MPPTaskId & rid)
{
Expand Down
16 changes: 4 additions & 12 deletions dbms/src/Flash/Mpp/MPPTaskId.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,11 @@ struct MPPQueryId
String toString() const
{
return fmt::format(
"<query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}, resource_group: {}, conn_id: {}, conn_alias: "
"{}>",
"<query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}>",
query_ts,
local_query_id,
server_id,
start_ts,
resource_group_name,
connection_id,
connection_alias);
start_ts);
}
};

Expand Down Expand Up @@ -113,16 +109,12 @@ struct MPPGatherId
String toString() const
{
return fmt::format(
"<gather_id:{}, query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}, resource_group: {}, conn_id: "
"{}, conn_alias: {}>",
"gather_id:{}, query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}",
gather_id,
query_id.query_ts,
query_id.local_query_id,
query_id.server_id,
query_id.start_ts,
query_id.resource_group_name,
query_id.connection_id,
query_id.connection_alias);
query_id.start_ts);
}
bool hasMeaningfulGatherId() const { return gather_id > 0; }
bool operator==(const MPPGatherId & rid) const;
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ std::pair<MPPTunnelPtr, String> MPPTaskManager::findAsyncTunnel(
if (!error_msg.empty())
{
/// if the gather is aborted, return the error message
LOG_WARNING(log, "{}: Gather {} is aborted, all its tasks are invalid.", req_info, id.gather_id.toString());
LOG_WARNING(log, "{}: Gather <{}> is aborted, all its tasks are invalid.", req_info, id.gather_id.toString());
/// meet error
return {nullptr, error_msg};
}
Expand Down Expand Up @@ -229,7 +229,11 @@ std::pair<MPPTunnelPtr, String> MPPTaskManager::findTunnelWithTimeout(
if (!error_msg.empty())
{
/// if the gather is aborted, return true to stop waiting timeout.
LOG_WARNING(log, "{}: Gather {} is aborted, all its tasks are invalid.", req_info, id.gather_id.toString());
LOG_WARNING(
log,
"{}: Gather <{}> is aborted, all its tasks are invalid.",
req_info,
id.gather_id.toString());
cancelled = true;
error_message = error_msg;
return true;
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Planner/PhysicalPlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ PhysicalPlanNode::PhysicalPlanNode(
const PlanType & type_,
const NamesAndTypes & schema_,
const FineGrainedShuffle & fine_grained_shuffle_,
const String & req_id)
const String & req_id_)
: executor_id(executor_id_)
, req_id(req_id_)
, type(type_)
, schema(schema_)
, fine_grained_shuffle(fine_grained_shuffle_)
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Planner/PhysicalPlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class PhysicalPlanNode : public std::enable_shared_from_this<PhysicalPlanNode>
void recordProfileStreams(DAGPipeline & pipeline, const Context & context);

String executor_id;
String req_id;
PlanType type;
NamesAndTypes schema;

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont
AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header);
SpillConfig spill_config(
context.getTemporaryPath(),
fmt::format("{}_aggregation", log->identifier()),
log->identifier(),
context.getSettingsRef().max_cached_data_bytes_in_spiller,
context.getSettingsRef().max_spilled_rows_per_file,
context.getSettingsRef().max_spilled_bytes_per_file,
Expand Down Expand Up @@ -216,7 +216,7 @@ void PhysicalAggregation::buildPipelineExecGroupImpl(
AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header);
SpillConfig spill_config(
context.getTemporaryPath(),
fmt::format("{}_aggregation", log->identifier()),
log->identifier(),
context.getSettingsRef().max_cached_data_bytes_in_spiller,
context.getSettingsRef().max_spilled_rows_per_file,
context.getSettingsRef().max_spilled_bytes_per_file,
Expand Down Expand Up @@ -264,7 +264,7 @@ void PhysicalAggregation::buildPipeline(
auto agg_build = std::make_shared<PhysicalAggregationBuild>(
executor_id,
schema,
log->identifier(),
req_id,
child,
before_agg_actions,
aggregation_keys,
Expand All @@ -281,7 +281,7 @@ void PhysicalAggregation::buildPipeline(
auto agg_convergent = std::make_shared<PhysicalAggregationConvergent>(
executor_id,
schema,
log->identifier(),
req_id,
aggregate_context,
expr_after_agg);
builder.addPlanNode(agg_convergent);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void PhysicalAggregationBuild::buildPipelineExecGroupImpl(
AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header);
SpillConfig spill_config(
context.getTemporaryPath(),
fmt::format("{}_aggregation", log->identifier()),
log->identifier(),
context.getSettingsRef().max_cached_data_bytes_in_spiller,
context.getSettingsRef().max_spilled_rows_per_file,
context.getSettingsRef().max_spilled_bytes_per_file,
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ void PhysicalJoin::buildPipeline(PipelineBuilder & builder, Context & context, P
executor_id,
build()->getSchema(),
fine_grained_shuffle,
log->identifier(),
req_id,
build(),
join_ptr,
build_side_prepare_actions);
Expand All @@ -317,7 +317,7 @@ void PhysicalJoin::buildPipeline(PipelineBuilder & builder, Context & context, P
auto join_probe = std::make_shared<PhysicalJoinProbe>(
executor_id,
schema,
log->identifier(),
req_id,
probe(),
join_ptr,
probe_side_prepare_actions);
Expand Down

0 comments on commit d1949c6

Please sign in to comment.