Skip to content

Commit

Permalink
[enhancement](nereids) speedup sql cache with use variable as partiti…
Browse files Browse the repository at this point in the history
…on predicate (apache#37915)

support reuse sql cache when use variable as partition predicate and variable change:
```sql
set @dt='2024-07-16';
-- create cache 1
select * from tbl where dt = @dt;

set @dt='2024-07-17';
-- create cache 2, will not invalidate cache 1
select * from tbl where dt = @dt;

set @dt='2024-07-16';
-- reuse cache 1
select * from tbl where dt = @dt;
```
  • Loading branch information
924060929 authored and seawinde committed Jul 17, 2024
1 parent 1eb6ad5 commit d3c5d9c
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.DataMaskPolicy;
import org.apache.doris.mysql.privilege.RowFilterPolicy;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.SqlCacheContext;
import org.apache.doris.nereids.SqlCacheContext.CacheKeyType;
import org.apache.doris.nereids.SqlCacheContext.FullColumnName;
import org.apache.doris.nereids.SqlCacheContext.FullTableName;
import org.apache.doris.nereids.SqlCacheContext.ScanTable;
Expand Down Expand Up @@ -125,7 +127,9 @@ public void tryAddFeSqlCache(ConnectContext connectContext, String sql) {

SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = currentUserIdentity.toString() + ":" + sql.trim();
String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL
? currentUserIdentity.toString() + ":" + sql.trim()
: currentUserIdentity.toString() + ":" + DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5());
if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null
&& sqlCacheContext.getResultSetInFe().isPresent()) {
sqlCaches.put(key, sqlCacheContext);
Expand All @@ -143,7 +147,9 @@ public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyz
}
SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = currentUserIdentity.toString() + ":" + sql.trim();
String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL
? currentUserIdentity.toString() + ":" + sql.trim()
: currentUserIdentity.toString() + ":" + DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5());
if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null) {
SqlCache cache = (SqlCache) analyzer.getCache();
sqlCacheContext.setSumOfPartitionNum(cache.getSumOfPartitionNum());
Expand All @@ -163,15 +169,44 @@ public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyz
/** tryParseSql */
public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, String sql) {
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
Env env = connectContext.getEnv();
String key = currentUserIdentity.toString() + ":" + sql.trim();
String key = currentUserIdentity + ":" + sql.trim();
SqlCacheContext sqlCacheContext = sqlCaches.getIfPresent(key);
if (sqlCacheContext == null) {
return Optional.empty();
}

// LOG.info("Total size: " + GraphLayout.parseInstance(sqlCacheContext).totalSize());

List<Variable> currentVariables = resolveUserVariables(sqlCacheContext);
if (usedVariablesChanged(currentVariables, sqlCacheContext)) {
String md5 = DebugUtil.printId(
sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables)));

String md5CacheKey = currentUserIdentity + ":" + md5;
SqlCacheContext sqlCacheContextWithVariable = sqlCaches.getIfPresent(md5CacheKey);

// already exist cache in the fe, but the variable is different to this query,
// we should create another cache context in fe, use another cache key
connectContext.getStatementContext()
.getSqlCacheContext().ifPresent(ctx -> ctx.setCacheKeyType(CacheKeyType.MD5));

if (sqlCacheContextWithVariable != null) {
return tryParseSqlWithoutCheckVariable(
connectContext, md5CacheKey, sqlCacheContextWithVariable, currentUserIdentity
);
} else {
return Optional.empty();
}
} else {
return tryParseSqlWithoutCheckVariable(connectContext, key, sqlCacheContext, currentUserIdentity);
}
}

private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
ConnectContext connectContext, String key,
SqlCacheContext sqlCacheContext, UserIdentity currentUserIdentity) {
Env env = connectContext.getEnv();

// check table and view and their columns authority
if (privilegeChanged(connectContext, env, sqlCacheContext)) {
return invalidateCache(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public class SqlCacheContext {
private volatile PUniqueId cacheKeyMd5;
private volatile ResultSet resultSetInFe;

private volatile CacheKeyType cacheKeyType = CacheKeyType.SQL;

public SqlCacheContext(UserIdentity userIdentity, TUniqueId queryId) {
this.userIdentity = Objects.requireNonNull(userIdentity, "userIdentity cannot be null");
this.queryId = Objects.requireNonNull(queryId, "queryId cannot be null");
Expand Down Expand Up @@ -392,6 +394,14 @@ public void setResultSetInFe(ResultSet resultSetInFe) {
this.resultSetInFe = resultSetInFe;
}

public CacheKeyType getCacheKeyType() {
return cacheKeyType;
}

public void setCacheKeyType(CacheKeyType cacheKeyType) {
this.cacheKeyType = cacheKeyType;
}

/** FullTableName */
@lombok.Data
@lombok.AllArgsConstructor
Expand Down Expand Up @@ -433,4 +443,12 @@ public void addScanPartition(Long partitionId) {
this.scanPartitions.add(partitionId);
}
}

/** CacheKeyType */
public enum CacheKeyType {
// use `userIdentity`:`sql`.trim() as Cache key in FE
SQL,
// use MD5 as Cache key in FE
MD5
}
}
14 changes: 14 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.apache.doris.mysql.MysqlPacket;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.MysqlServerStatusFlag;
import org.apache.doris.nereids.SqlCacheContext;
import org.apache.doris.nereids.SqlCacheContext.CacheKeyType;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.nereids.exceptions.ParseException;
Expand Down Expand Up @@ -262,9 +264,15 @@ public void executeQuery(MysqlCommand mysqlCommand, String originStmt) throws Ex
// TODO: after implemented full prepared, we could remove this flag
boolean nereidsUseServerPrep = sessionVariable.enableServeSidePreparedStatement
|| mysqlCommand == MysqlCommand.COM_QUERY;
CacheKeyType cacheKeyType = null;
if (nereidsUseServerPrep && sessionVariable.isEnableNereidsPlanner()) {
if (wantToParseSqlFromSqlCache) {
cachedStmts = parseFromSqlCache(originStmt);
Optional<SqlCacheContext> sqlCacheContext = ConnectContext.get()
.getStatementContext().getSqlCacheContext();
if (sqlCacheContext.isPresent()) {
cacheKeyType = sqlCacheContext.get().getCacheKeyType();
}
if (cachedStmts != null) {
stmts = cachedStmts;
}
Expand Down Expand Up @@ -367,6 +375,12 @@ public void executeQuery(MysqlCommand mysqlCommand, String originStmt) throws Ex
executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime);
ctx.setExecutor(executor);

if (cacheKeyType != null) {
SqlCacheContext sqlCacheContext =
executor.getContext().getStatementContext().getSqlCacheContext().get();
sqlCacheContext.setCacheKeyType(cacheKeyType);
}

try {
executor.execute();
if (connectType.equals(ConnectType.MYSQL)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,26 @@ suite("parse_sql_from_sql_cache") {
assertHasCache "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1"
def result1 = sql "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1"
assertTrue(result1.size() == 1 && result1[0][0].toString().toInteger() == 10)


sql "set @custom_variable2=1"
assertNoCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
def res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
assertTrue(res[0][0] == 1)
assertHasCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"

sql "set @custom_variable2=2"
assertNoCache "select* from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
// should not invalidate cache with @custom_variable2=1
res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
assertTrue(res[0][0] == 2)
assertHasCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"

sql "set @custom_variable2=1"
// should reuse cache
assertHasCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
assertTrue(res[0][0] == 1)
}
}),
extraThread("test_udf", {
Expand Down

0 comments on commit d3c5d9c

Please sign in to comment.