Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore](user) Add user property parallel_fragment_exec_instance_num #28447

Merged
merged 3 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,12 @@ public void updateUserPropertyInternal(String user, List<Pair<String, String>> p
Env.getCurrentEnv().getEditLog().logUpdateUserProperty(propertyInfo);
}
LOG.info("finished to set properties for user: {}", user);
} catch (DdlException e) {
if (isReplay && e.getMessage().contains("Unknown user property")) {
LOG.warn("ReplayUpdateUserProperty failed, maybe FE rolled back version, " + e.getMessage());
} else {
throw e;
}
} finally {
writeUnlock();
}
Expand Down Expand Up @@ -1000,6 +1006,15 @@ public long getMaxQueryInstances(String qualifiedUser) {
}
}

public int getParallelFragmentExecInstanceNum(String qualifiedUser) {
readLock();
try {
return propertyMgr.getParallelFragmentExecInstanceNum(qualifiedUser);
} finally {
readUnlock();
}
}

public String[] getSqlBlockRules(String qualifiedUser) {
readLock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class CommonUserProperties implements Writable {
// The maximum total number of query instances that the user is allowed to send from this FE
@SerializedName("maxQueryInstances")
private long maxQueryInstances = -1;
@SerializedName("parallelFragmentExecInstanceNum")
private int parallelFragmentExecInstanceNum = -1;
@SerializedName("sqlBlockRules")
private String sqlBlockRules = "";
@SerializedName("cpuResourceLimit")
Expand Down Expand Up @@ -75,6 +77,10 @@ long getMaxQueryInstances() {
return maxQueryInstances;
}

int getParallelFragmentExecInstanceNum() {
return parallelFragmentExecInstanceNum;
}

String getSqlBlockRules() {
return sqlBlockRules;
}
Expand All @@ -91,6 +97,10 @@ void setMaxQueryInstances(long maxQueryInstances) {
this.maxQueryInstances = maxQueryInstances;
}

void setParallelFragmentExecInstanceNum(int parallelFragmentExecInstanceNum) {
this.parallelFragmentExecInstanceNum = parallelFragmentExecInstanceNum;
}

void setSqlBlockRules(String sqlBlockRules) {
this.sqlBlockRules = sqlBlockRules;
setSqlBlockRulesSplit(sqlBlockRules);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class UserProperty implements Writable {
// advanced properties
private static final String PROP_MAX_USER_CONNECTIONS = "max_user_connections";
private static final String PROP_MAX_QUERY_INSTANCES = "max_query_instances";
private static final String PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
private static final String PROP_RESOURCE_TAGS = "resource_tags";
private static final String PROP_RESOURCE = "resource";
private static final String PROP_SQL_BLOCK_RULES = "sql_block_rules";
Expand Down Expand Up @@ -113,6 +114,8 @@ public class UserProperty implements Writable {
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_LOAD_CLUSTER + "." + DppConfig.CLUSTER_NAME_REGEX + "."
+ DppConfig.PRIORITY + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_MAX_QUERY_INSTANCES + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM + "$",
Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_SQL_BLOCK_RULES + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_CPU_RESOURCE_LIMIT + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_RESOURCE_TAGS + "$", Pattern.CASE_INSENSITIVE));
Expand Down Expand Up @@ -154,6 +157,10 @@ public long getMaxQueryInstances() {
return commonProperties.getMaxQueryInstances(); // maxQueryInstances;
}

public int getParallelFragmentExecInstanceNum() {
return commonProperties.getParallelFragmentExecInstanceNum();
}

public String[] getSqlBlockRules() {
return commonProperties.getSqlBlockRulesSplit();
}
Expand Down Expand Up @@ -187,6 +194,7 @@ public void update(List<Pair<String, String>> properties, boolean isReplay) thro
// copy
long newMaxConn = this.commonProperties.getMaxConn();
long newMaxQueryInstances = this.commonProperties.getMaxQueryInstances();
int newParallelFragmentExecInstanceNum = this.commonProperties.getParallelFragmentExecInstanceNum();
String sqlBlockRules = this.commonProperties.getSqlBlockRules();
int cpuResourceLimit = this.commonProperties.getCpuResourceLimit();
Set<Tag> resourceTags = this.commonProperties.getResourceTags();
Expand Down Expand Up @@ -242,6 +250,17 @@ public void update(List<Pair<String, String>> properties, boolean isReplay) thro
} catch (NumberFormatException e) {
throw new DdlException(PROP_MAX_QUERY_INSTANCES + " is not number");
}
} else if (keyArr[0].equalsIgnoreCase(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM)) {
// set property "parallel_fragment_exec_instance_num" = "16"
if (keyArr.length != 1) {
throw new DdlException(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM + " format error");
}

try {
newParallelFragmentExecInstanceNum = Integer.parseInt(value);
} catch (NumberFormatException e) {
throw new DdlException(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM + " is not number");
}
} else if (keyArr[0].equalsIgnoreCase(PROP_SQL_BLOCK_RULES)) {
// set property "sql_block_rules" = "test_rule1,test_rule2"
if (keyArr.length != 1) {
Expand Down Expand Up @@ -337,6 +356,7 @@ public void update(List<Pair<String, String>> properties, boolean isReplay) thro
// set
this.commonProperties.setMaxConn(newMaxConn);
this.commonProperties.setMaxQueryInstances(newMaxQueryInstances);
this.commonProperties.setParallelFragmentExecInstanceNum(newParallelFragmentExecInstanceNum);
this.commonProperties.setSqlBlockRules(sqlBlockRules);
this.commonProperties.setCpuResourceLimit(cpuResourceLimit);
this.commonProperties.setResourceTags(resourceTags);
Expand Down Expand Up @@ -456,6 +476,10 @@ public List<List<String>> fetchProperty() {
result.add(Lists.newArrayList(PROP_MAX_QUERY_INSTANCES,
String.valueOf(commonProperties.getMaxQueryInstances())));

// parallel fragment exec instance num
result.add(Lists.newArrayList(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM,
String.valueOf(commonProperties.getParallelFragmentExecInstanceNum())));

// sql block rules
result.add(Lists.newArrayList(PROP_SQL_BLOCK_RULES, commonProperties.getSqlBlockRules()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ public long getMaxQueryInstances(String qualifiedUser) {
return existProperty.getMaxQueryInstances();
}

public int getParallelFragmentExecInstanceNum(String qualifiedUser) {
UserProperty existProperty = propertyMap.get(qualifiedUser);
existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty);
if (existProperty == null) {
return -1;
}
return existProperty.getParallelFragmentExecInstanceNum();
}

public Set<Tag> getResourceTags(String qualifiedUser) {
UserProperty existProperty = propertyMap.get(qualifiedUser);
existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1969,6 +1969,14 @@ public void setEnableFoldConstantByBe(boolean foldConstantByBe) {
}

public int getParallelExecInstanceNum() {
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null && connectContext.getEnv() != null && connectContext.getEnv().getAuth() != null) {
int userParallelExecInstanceNum = connectContext.getEnv().getAuth()
.getParallelFragmentExecInstanceNum(connectContext.getQualifiedUser());
if (userParallelExecInstanceNum > 0) {
return userParallelExecInstanceNum;
}
}
if (getEnablePipelineEngine() && parallelPipelineTaskNum == 0) {
int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
int autoInstance = (size + 1) / 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public void testUpdate() throws UserException {
properties.add(Pair.of("load_cluster.dpp-cluster.hadoop_palo_path", "/user/palo2"));
properties.add(Pair.of("default_load_cluster", "dpp-cluster"));
properties.add(Pair.of("max_qUERY_instances", "3000"));
properties.add(Pair.of("parallel_fragment_exec_instance_num", "2000"));
properties.add(Pair.of("sql_block_rules", "rule1,rule2"));
properties.add(Pair.of("cpu_resource_limit", "2"));
properties.add(Pair.of("query_timeout", "500"));
Expand All @@ -114,6 +115,7 @@ public void testUpdate() throws UserException {
Assert.assertEquals("/user/palo2", userProperty.getLoadClusterInfo("dpp-cluster").second.getPaloPath());
Assert.assertEquals("dpp-cluster", userProperty.getDefaultLoadCluster());
Assert.assertEquals(3000, userProperty.getMaxQueryInstances());
Assert.assertEquals(2000, userProperty.getParallelFragmentExecInstanceNum());
Assert.assertEquals(new String[]{"rule1", "rule2"}, userProperty.getSqlBlockRules());
Assert.assertEquals(2, userProperty.getCpuResourceLimit());
Assert.assertEquals(500, userProperty.getQueryTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public void test() throws Exception {
Assert.assertEquals(1000000, execMemLimit);

List<List<String>> userProps = Env.getCurrentEnv().getAuth().getUserProperties(Auth.ROOT_USER);
Assert.assertEquals(10, userProps.size());
Assert.assertEquals(11, userProps.size());

// now :
// be1 be2 be3 ==>tag1;
Expand Down
Loading