Skip to content

Commit

Permalink
processlist schema table support show all fe
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo committed Aug 1, 2024
1 parent 48121da commit b9651bb
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 36 deletions.
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner/schema_processlist_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ SchemaProcessListScanner::~SchemaProcessListScanner() = default;
Status SchemaProcessListScanner::start(RuntimeState* state) {
TShowProcessListRequest request;
request.__set_show_full_sql(true);
if (state && state->show_all_fe_connection()) {
request.__set_show_all_fe_connection(true);
}

RETURN_IF_ERROR(SchemaHelper::show_process_list(*(_param->common_param->ip),
_param->common_param->port, request,
Expand Down
7 changes: 7 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,13 @@ class RuntimeState {
return 1;
}

bool show_all_fe_connection() {
if (_query_options.__isset.show_all_fe_connection) {
return _query_options.show_all_fe_connection;
}
return false;
}

void set_max_operator_id(int max_operator_id) { _max_operator_id = max_operator_id; }

int max_operator_id() const { return _max_operator_id; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,8 @@ private void initQueryOptions(ConnectContext context) {
this.queryOptions.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
this.queryOptions.setWaitFullBlockScheduleTimes(context.getSessionVariable().getWaitFullBlockScheduleTimes());
this.queryOptions.setMysqlRowBinaryFormat(
context.getCommand() == MysqlCommand.COM_STMT_EXECUTE);
context.getCommand() == MysqlCommand.COM_STMT_EXECUTE);
this.queryOptions.setShowAllFeConnection(context.getSessionVariable().getShowAllFeConnection());
}

public ConnectContext getConnectContext() {
Expand Down
74 changes: 39 additions & 35 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,44 @@ private void handleShowRollup() {
resultSet = new ShowResultSet(showRollupStmt.getMetaData(), rowSets);
}

public static void getRemoteFeProcesslist(boolean includeSelf, boolean showFullSql, List<List<String>> ret) {
try {
TShowProcessListRequest request = new TShowProcessListRequest();
request.setShowFullSql(showFullSql);
List<Pair<String, Integer>> frontends = FrontendsProcNode.getFrontendWithRpcPort(Env.getCurrentEnv(),
includeSelf);
FrontendService.Client client = null;
for (Pair<String, Integer> fe : frontends) {
TNetworkAddress thriftAddress = new TNetworkAddress(fe.key(), fe.value());
try {
client = ClientPool.frontendPool.borrowObject(thriftAddress, 3000);
} catch (Exception e) {
LOG.warn("Failed to get frontend {} client. exception: {}", fe.key(), e);
continue;
}

boolean isReturnToPool = false;
try {
TShowProcessListResult result = client.showProcessList(request);
if (result.process_list != null && result.process_list.size() > 0) {
ret.addAll(result.process_list);
}
isReturnToPool = true;
} catch (Exception e) {
LOG.warn("Failed to request processlist to fe: {} . exception: {}", fe.key(), e);
} finally {
if (isReturnToPool) {
ClientPool.frontendPool.returnObject(thriftAddress, client);
} else {
ClientPool.frontendPool.invalidateObject(thriftAddress, client);
}
}
}
} catch (Throwable t) {
LOG.warn(" fetch process list from other fe failed, ", t);
}
}

// Handle show processlist
private void handleShowProcesslist() {
ShowProcesslistStmt showStmt = (ShowProcesslistStmt) stmt;
Expand All @@ -526,41 +564,7 @@ private void handleShowProcesslist() {
}

if (isShowAllFe) {
try {
TShowProcessListRequest request = new TShowProcessListRequest();
request.setShowFullSql(isShowFullSql);
List<Pair<String, Integer>> frontends = FrontendsProcNode.getFrontendWithRpcPort(Env.getCurrentEnv(),
false);
FrontendService.Client client = null;
for (Pair<String, Integer> fe : frontends) {
TNetworkAddress thriftAddress = new TNetworkAddress(fe.key(), fe.value());
try {
client = ClientPool.frontendPool.borrowObject(thriftAddress, 3000);
} catch (Exception e) {
LOG.warn("Failed to get frontend {} client. exception: {}", fe.key(), e);
continue;
}

boolean isReturnToPool = false;
try {
TShowProcessListResult result = client.showProcessList(request);
if (result.process_list != null && result.process_list.size() > 0) {
rowSet.addAll(result.process_list);
}
isReturnToPool = true;
} catch (Exception e) {
LOG.warn("Failed to request processlist to fe: {} . exception: {}", fe.key(), e);
} finally {
if (isReturnToPool) {
ClientPool.frontendPool.returnObject(thriftAddress, client);
} else {
ClientPool.frontendPool.invalidateObject(thriftAddress, client);
}
}
}
} catch (Throwable t) {
LOG.warn(" fetch process list from other fe failed, ", t);
}
getRemoteFeProcesslist(false, isShowFullSql, rowSet);
}

resultSet = new ShowResultSet(showStmt.getMetaData(), rowSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.apache.doris.qe.MysqlConnectProcessor;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.ShowExecutor;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.service.arrowflight.FlightSqlConnectProcessor;
Expand Down Expand Up @@ -3945,6 +3946,10 @@ public TShowProcessListResult showProcessList(TShowProcessListRequest request) {
}
List<List<String>> processList = ExecuteEnv.getInstance().getScheduler()
.listConnectionWithoutAuth(isShowFullSql);
// currently only be calls showProcessList could set field show_all_fe_processlist.
if (request.isSetShowAllFeConnection() && request.isShowAllFeConnection()) {
ShowExecutor.getRemoteFeProcesslist(false, isShowFullSql, processList);
}
TShowProcessListResult result = new TShowProcessListResult();
result.setProcessList(processList);
return result;
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1514,6 +1514,7 @@ struct TGetColumnInfoResult {

struct TShowProcessListRequest {
1: optional bool show_full_sql
2: optional bool show_all_fe_connection
}

struct TShowProcessListResult {
Expand Down
3 changes: 3 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ struct TQueryOptions {
121: optional bool keep_carriage_return = false; // \n,\r\n split line in CSV.

122: optional i32 runtime_bloom_filter_min_size = 1048576;

123: optional bool show_all_fe_connection = false;

// For cloud, to control if the content would be written into file cache
1000: optional bool disable_file_cache = false
}
Expand Down

0 comments on commit b9651bb

Please sign in to comment.