Skip to content

Commit

Permalink
Add connection_id and connection_alias in logs (#8348)
Browse files Browse the repository at this point in the history
close #8345
  • Loading branch information
xzhangxian1008 authored Nov 17, 2023
1 parent 4e2f86f commit 226b613
Show file tree
Hide file tree
Showing 22 changed files with 190 additions and 62 deletions.
2 changes: 1 addition & 1 deletion contrib/client-c
6 changes: 4 additions & 2 deletions dbms/src/Debug/dbgQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ tipb::SelectResponse executeDAGRequest(
region_id,
RegionInfo(region_id, region_version, region_conf_version, std::move(key_ranges), nullptr));

DAGContext dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", DAGRequestKind::Cop, "", log);
DAGContext
dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", DAGRequestKind::Cop, "", 0, "", log);
context.setDAGContext(&dag_context);

DAGDriver<DAGRequestKind::Cop> driver(context, start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true);
Expand Down Expand Up @@ -433,7 +434,8 @@ bool runAndCompareDagReq(
region_id,
RegionInfo(region_id, region->version(), region->confVer(), std::move(key_ranges), nullptr));

DAGContext dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", DAGRequestKind::Cop, "", log);
DAGContext
dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", DAGRequestKind::Cop, "", 0, "", log);
context.setDAGContext(&dag_context);
DAGDriver<DAGRequestKind::Cop>
driver(context, properties.start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/BatchCoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ grpc::Status BatchCoprocessorHandler::execute()
cop_context.db_context.getClientInfo().current_address.toString(),
DAGRequestKind::BatchCop,
resource_group_name,
cop_request->connection_id(),
cop_request->connection_alias(),
Logger::get(log->identifier()));
cop_context.db_context.setDAGContext(&dag_context);

Expand Down
13 changes: 12 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ DAGContext::DAGContext(
const String & tidb_host_,
DAGRequestKind kind_,
const String & resource_group_name_,
UInt64 connection_id_,
const String & connection_alias_,
LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->ShortDebugString())
Expand All @@ -71,6 +73,8 @@ DAGContext::DAGContext(
, warning_count(0)
, keyspace_id(keyspace_id_)
, resource_group_name(resource_group_name_)
, connection_id(connection_id_)
, connection_alias(connection_alias_)
{
RUNTIME_ASSERT(kind != DAGRequestKind::MPP, log, "DAGContext non-mpp constructor get a mpp kind");
initOutputInfo();
Expand All @@ -95,6 +99,8 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & me
, warning_count(0)
, keyspace_id(RequestUtils::deriveKeyspaceID(meta_))
, resource_group_name(meta_.resource_group_name())
, connection_id(meta_.connection_id())
, connection_alias(meta_.connection_alias())
{
// only mpp task has join executor.
initExecutorIdToJoinIdMap();
Expand Down Expand Up @@ -127,6 +133,8 @@ DAGContext::DAGContext(
, warnings(max_recorded_error_count)
, warning_count(0)
, keyspace_id(RequestUtils::deriveKeyspaceID(task_meta_))
, connection_id(task_meta_.connection_id())
, connection_alias(task_meta_.connection_alias())
{
initOutputInfo();
}
Expand All @@ -144,6 +152,7 @@ DAGContext::DAGContext(UInt64 max_error_count_)
, max_recorded_error_count(max_error_count_)
, warnings(max_recorded_error_count)
, warning_count(0)
, connection_id(0)
{}

// for tests need to run query tasks.
Expand All @@ -163,8 +172,10 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, String log_identifier, s
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
, connection_id(0)
{
query_operator_spill_contexts = std::make_shared<QueryOperatorSpillContexts>(MPPQueryId(0, 0, 0, 0, ""), 100);
query_operator_spill_contexts
= std::make_shared<QueryOperatorSpillContexts>(MPPQueryId(0, 0, 0, 0, "", 0, ""), 100);
initOutputInfo();
}

Expand Down
10 changes: 10 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ class DAGContext
const String & tidb_host_,
DAGRequestKind cop_kind_,
const String & resource_group_name,
UInt64 connection_id_,
const String & connection_alias_,
LoggerPtr log_);

// for mpp
Expand Down Expand Up @@ -340,6 +342,9 @@ class DAGContext
void setAutoSpillMode() { in_auto_spill_mode = true; }
bool isInAutoSpillMode() const { return in_auto_spill_mode; }

UInt64 getConnectionID() const { return connection_id; }
const String & getConnectionAlias() const { return connection_alias; }

public:
DAGRequest dag_request;
/// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast,
Expand Down Expand Up @@ -447,6 +452,11 @@ class DAGContext
// - Stream: execute with block input stream
// - Pipeline: execute with pipeline model
ExecutionMode execution_mode = ExecutionMode::None;

// It's the session id between mysql client and tidb
UInt64 connection_id;
// It's the session alias between mysql client and tidb
String connection_alias;
};

} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,8 @@ std::vector<pingcap::coprocessor::CopTask> DAGStorageInterpreter::buildCopTasks(
req,
store_type,
dagContext().getKeyspaceID(),
remote_request.connection_id,
remote_request.connection_alias,
&Poco::Logger::get("pingcap/coprocessor"),
std::move(meta_data),
[&] { GET_METRIC(tiflash_coprocessor_request_count, type_remote_read_sent).Increment(); });
Expand Down Expand Up @@ -1551,6 +1553,8 @@ std::vector<RemoteRequest> DAGStorageInterpreter::buildRemoteRequests(const DM::
retry_regions_map[region_id_to_table_id_map[r.get().region_id]].emplace_back(r);
}

UInt64 connection_id = dagContext().getConnectionID();
const String & connection_alias = dagContext().getConnectionAlias();
for (const auto physical_table_id : table_scan.getPhysicalTableIDs())
{
const auto & retry_regions = retry_regions_map[physical_table_id];
Expand All @@ -1568,6 +1572,8 @@ std::vector<RemoteRequest> DAGStorageInterpreter::buildRemoteRequests(const DM::
table_scan,
storages_with_structure_lock[physical_table_id].storage->getTableInfo(),
filter_conditions,
connection_id,
connection_alias,
log));
}
LOG_DEBUG(log, "remote request size: {}", remote_requests.size());
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Coprocessor/RemoteRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ RemoteRequest RemoteRequest::build(
const TiDBTableScan & table_scan,
const TiDB::TableInfo & table_info,
const FilterConditions & filter_conditions,
UInt64 connection_id,
const String & connection_alias,
const LoggerPtr & log)
{
LOG_INFO(log, "{}", printRetryRegions(retry_regions, table_info.id));
Expand Down Expand Up @@ -86,7 +88,7 @@ RemoteRequest RemoteRequest::build(
dag_req.set_time_zone_offset(original_dag_req.time_zone_offset());

std::vector<pingcap::coprocessor::KeyRange> key_ranges = buildKeyRanges(retry_regions);
return {std::move(dag_req), std::move(schema), std::move(key_ranges)};
return {std::move(dag_req), std::move(schema), std::move(key_ranges), connection_id, connection_alias};
}

std::vector<pingcap::coprocessor::KeyRange> RemoteRequest::buildKeyRanges(const RegionRetryList & retry_regions)
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Flash/Coprocessor/RemoteRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ struct RemoteRequest
RemoteRequest(
tipb::DAGRequest && dag_request_,
DAGSchema && schema_,
std::vector<pingcap::coprocessor::KeyRange> && key_ranges_)
std::vector<pingcap::coprocessor::KeyRange> && key_ranges_,
UInt64 connection_id_,
const String & connection_alias_)
: dag_request(std::move(dag_request_))
, schema(std::move(schema_))
, key_ranges(std::move(key_ranges_))
, connection_id(connection_id_)
, connection_alias(connection_alias_)
{}

static RemoteRequest build(
Expand All @@ -52,6 +56,8 @@ struct RemoteRequest
const TiDBTableScan & table_scan,
const TiDB::TableInfo & table_info,
const FilterConditions & filter_conditions,
UInt64 connection_id,
const String & connection_alias,
const LoggerPtr & log);
static std::vector<pingcap::coprocessor::KeyRange> buildKeyRanges(const RegionRetryList & retry_regions);
static std::string printRetryRegions(const RegionRetryList & retry_regions, TableID table_id);
Expand All @@ -60,5 +66,8 @@ struct RemoteRequest
DAGSchema schema;
/// the sorted key ranges
std::vector<pingcap::coprocessor::KeyRange> key_ranges;

UInt64 connection_id;
String connection_alias;
};
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ grpc::Status CoprocessorHandler<is_stream>::execute()
cop_context.db_context.getClientInfo().current_address.toString(),
kind,
resource_group_name,
cop_request->connection_id(),
cop_request->connection_alias(),
Logger::get(log->identifier()));
cop_context.db_context.setDAGContext(&dag_context);

Expand Down
13 changes: 9 additions & 4 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,14 @@ grpc::Status FlashService::Coprocessor(
});
CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
auto request_identifier = fmt::format(
"Coprocessor, is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}",
"Coprocessor, is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}, conn_id: {}, "
"conn_alias: {}",
is_remote_read,
request->start_ts(),
region_info,
request->context().resource_control_context().resource_group_name());
request->context().resource_control_context().resource_group_name(),
request->connection_id(),
request->connection_alias());
CoprocessorHandler<false> cop_handler(cop_context, request, response, request_identifier);
return cop_handler.execute();
};
Expand Down Expand Up @@ -340,9 +343,11 @@ grpc::Status FlashService::BatchCoprocessor(
}
CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
auto request_identifier = fmt::format(
"BatchCoprocessor, start_ts: {}, resource_group: {}",
"BatchCoprocessor, start_ts: {}, resource_group: {}, conn_id: {}, conn_alias: {}",
request->start_ts(),
request->context().resource_control_context().resource_group_name());
request->context().resource_control_context().resource_group_name(),
request->connection_id(),
request->connection_alias());
BatchCoprocessorHandler cop_handler(cop_context, request, writer, request_identifier);
return cop_handler.execute();
});
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Mpp/MPPTaskId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ String MPPTaskId::toString() const
const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{};

constexpr UInt64 MAX_UINT64 = std::numeric_limits<UInt64>::max();
const MPPQueryId MPPTaskId::Max_Query_Id = MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64, "");
const MPPQueryId MPPTaskId::Max_Query_Id
= MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64, "", MAX_UINT64, "");

bool operator==(const MPPTaskId & lid, const MPPTaskId & rid)
{
Expand Down
46 changes: 36 additions & 10 deletions dbms/src/Flash/Mpp/MPPTaskId.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,33 @@ struct MPPQueryId
UInt64 server_id;
UInt64 start_ts;
String resource_group_name;
UInt64 connection_id;
String connection_alias;

MPPQueryId(
UInt64 query_ts,
UInt64 local_query_id,
UInt64 server_id,
UInt64 start_ts,
const String & resource_group_name_)
const String & resource_group_name_,
UInt64 connection_id_,
const String & connection_alias_)
: query_ts(query_ts)
, local_query_id(local_query_id)
, server_id(server_id)
, start_ts(start_ts)
, resource_group_name(resource_group_name_)
, connection_id(connection_id_)
, connection_alias(connection_alias_)
{}
explicit MPPQueryId(const mpp::TaskMeta & task_meta)
: query_ts(task_meta.query_ts())
, local_query_id(task_meta.local_query_id())
, server_id(task_meta.server_id())
, start_ts(task_meta.start_ts())
, resource_group_name(task_meta.resource_group_name())
, connection_id(task_meta.connection_id())
, connection_alias(task_meta.connection_alias())
{}
bool operator<(const MPPQueryId & mpp_query_id) const;
bool operator==(const MPPQueryId & rid) const;
Expand All @@ -56,12 +64,15 @@ struct MPPQueryId
String toString() const
{
return fmt::format(
"<query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}, resource_group: {}>",
"<query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}, resource_group: {}, conn_id: {}, conn_alias: "
"{}>",
query_ts,
local_query_id,
server_id,
start_ts,
resource_group_name);
resource_group_name,
connection_id,
connection_alias);
}
};

Expand Down Expand Up @@ -89,9 +100,11 @@ struct MPPGatherId
UInt64 local_query_id,
UInt64 server_id,
UInt64 start_ts,
const String & resource_group_name)
const String & resource_group_name,
UInt64 connection_id,
const String & connection_alias)
: gather_id(gather_id_)
, query_id(query_ts, local_query_id, server_id, start_ts, resource_group_name)
, query_id(query_ts, local_query_id, server_id, start_ts, resource_group_name, connection_id, connection_alias)
{}
explicit MPPGatherId(const mpp::TaskMeta & task_meta)
: gather_id(task_meta.gather_id())
Expand All @@ -100,13 +113,16 @@ struct MPPGatherId
String toString() const
{
return fmt::format(
"<gather_id:{}, query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}, resource_group: {}>",
"<gather_id:{}, query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}, resource_group: {}, conn_id: "
"{}, conn_alias: {}>",
gather_id,
query_id.query_ts,
query_id.local_query_id,
query_id.server_id,
query_id.start_ts,
query_id.resource_group_name);
query_id.resource_group_name,
query_id.connection_id,
query_id.connection_alias);
}
bool hasMeaningfulGatherId() const { return gather_id > 0; }
bool operator==(const MPPGatherId & rid) const;
Expand All @@ -122,7 +138,7 @@ struct MPPTaskId
{
MPPTaskId()
: task_id(unknown_task_id)
, gather_id(0, 0, 0, 0, 0, ""){};
, gather_id(0, 0, 0, 0, 0, "", 0, ""){};

MPPTaskId(
UInt64 start_ts,
Expand All @@ -131,9 +147,19 @@ struct MPPTaskId
Int64 gather_id,
UInt64 query_ts,
UInt64 local_query_id,
const String resource_group_name)
const String resource_group_name,
UInt64 connection_id,
const String & connection_alias)
: task_id(task_id_)
, gather_id(gather_id, query_ts, local_query_id, server_id, start_ts, resource_group_name)
, gather_id(
gather_id,
query_ts,
local_query_id,
server_id,
start_ts,
resource_group_name,
connection_id,
connection_alias)
{}

explicit MPPTaskId(const mpp::TaskMeta & task_meta)
Expand Down
Loading

0 comments on commit 226b613

Please sign in to comment.