Skip to content

Commit

Permalink
processlist schema table query all fe
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo committed Aug 5, 2024
1 parent 48121da commit 47f875d
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 4 deletions.
1 change: 1 addition & 0 deletions be/src/exec/schema_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ struct SchemaScannerCommonParam {
int32_t port; // frontend thrift port
int64_t thread_id;
const std::string* catalog = nullptr;
std::set<TNetworkAddress> fe_addr_list;
};

// scanner parameter from frontend
Expand Down
11 changes: 8 additions & 3 deletions be/src/exec/schema_scanner/schema_processlist_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,14 @@ Status SchemaProcessListScanner::start(RuntimeState* state) {
TShowProcessListRequest request;
request.__set_show_full_sql(true);

RETURN_IF_ERROR(SchemaHelper::show_process_list(*(_param->common_param->ip),
_param->common_param->port, request,
&_process_list_result));
for (const auto& fe_addr : _param->common_param->fe_addr_list) {
TShowProcessListResult tmp_ret;
RETURN_IF_ERROR(
SchemaHelper::show_process_list(fe_addr.hostname, fe_addr.port, request, &tmp_ret));
_process_list_result.process_list.insert(_process_list_result.process_list.end(),
tmp_ret.process_list.begin(),
tmp_ret.process_list.end());
}

return Status::OK();
}
Expand Down
11 changes: 11 additions & 0 deletions be/src/pipeline/exec/schema_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ Status SchemaScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
_common_scanner_param->catalog =
state->obj_pool()->add(new std::string(tnode.schema_scan_node.catalog));
}

if (tnode.schema_scan_node.__isset.fe_addr_list) {
for (const auto& fe_addr : tnode.schema_scan_node.fe_addr_list) {
_common_scanner_param->fe_addr_list.insert(fe_addr);
}
} else if (tnode.schema_scan_node.__isset.ip && tnode.schema_scan_node.__isset.port) {
TNetworkAddress fe_addr;
fe_addr.hostname = tnode.schema_scan_node.ip;
fe_addr.port = tnode.schema_scan_node.port;
_common_scanner_param->fe_addr_list.insert(fe_addr);
}
return Status::OK();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ public class SchemaTable extends Table {
.column("INFO", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
.column("FE",
ScalarType.createVarchar(64))
.column("CLOUD_CLUSTER", ScalarType.createVarchar(64)).build()))
.column("CLOUD_CLUSTER", ScalarType.createVarchar(64)).build(), true))
.put("workload_policy",
new SchemaTable(SystemIdGenerator.getNextId(), "workload_policy", TableType.SCHEMA,
builder().column("ID", ScalarType.createType(PrimitiveType.BIGINT))
Expand Down Expand Up @@ -542,10 +542,17 @@ public class SchemaTable extends Table {
)
.build();

private boolean fetchAllFe = false;

protected SchemaTable(long id, String name, TableType type, List<Column> baseSchema) {
super(id, name, type, baseSchema);
}

protected SchemaTable(long id, String name, TableType type, List<Column> baseSchema, boolean fetchAllFe) {
this(id, name, type, baseSchema);
this.fetchAllFe = fetchAllFe;
}

@Override
public void write(DataOutput out) throws IOException {
throw new UnsupportedOperationException("Do not allow to write SchemaTable to image.");
Expand All @@ -559,6 +566,14 @@ public static Builder builder() {
return new Builder();
}

public static boolean isShouldFetchAllFe(String schemaTableName) {
Table table = TABLE_MAP.get(schemaTableName);
if (table != null && table instanceof SchemaTable) {
return ((SchemaTable) table).fetchAllFe;
}
return false;
}

/**
* For TABLE_MAP.
**/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.SchemaTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
Expand All @@ -27,6 +28,8 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Frontend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TScanRangeLocations;
Expand All @@ -38,6 +41,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;

/**
Expand Down Expand Up @@ -96,6 +100,21 @@ public void finalizeForNereids() throws UserException {
frontendPort = Config.rpc_port;
}

private void setFeAddrList(TPlanNode msg) {
if (SchemaTable.isShouldFetchAllFe(tableName)) {
List<TNetworkAddress> feAddrList = new ArrayList();
if (ConnectContext.get().getSessionVariable().showAllFeConnection) {
List<Frontend> feList = Env.getCurrentEnv().getFrontends(null);
for (Frontend fe : feList) {
feAddrList.add(new TNetworkAddress(fe.getHost(), fe.getRpcPort()));
}
} else {
feAddrList.add(new TNetworkAddress(frontendIP, frontendPort));
}
msg.schema_scan_node.setFeAddrList(feAddrList);
}
}

@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.SCHEMA_SCAN_NODE;
Expand Down Expand Up @@ -128,6 +147,7 @@ protected void toThrift(TPlanNode msg) {

TUserIdentity tCurrentUser = ConnectContext.get().getCurrentUserIdentity().toThrift();
msg.schema_scan_node.setCurrentUserIdent(tCurrentUser);
setFeAddrList(msg);
}

@Override
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ struct TSchemaScanNode {
12: optional bool show_hidden_cloumns = false
// 13: optional list<TSchemaTableStructure> table_structure // deprecated
14: optional string catalog
15: optional list<Types.TNetworkAddress> fe_addr_list
}

struct TMetaScanNode {
Expand Down

0 comments on commit 47f875d

Please sign in to comment.