[fix](cache) fix sql cache throw npe in cloud mode (#47221)
Fix the SQL cache throwing an NPE in cloud mode when some partitions are dropped:
```
2025-01-15 18:18:54,811 WARN (mysql-nio-pool-101426|288) [ConnectProcessor.handleQueryException():537] Process one query failed because unknown reason:
java.lang.NullPointerException: Cannot invoke "org.apache.doris.cloud.catalog.CloudPartition.getDbId()" because "partition" is null
        at org.apache.doris.cloud.catalog.CloudPartition.getSnapshotVisibleVersion(CloudPartition.java:196) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.catalog.OlapTable.getVersionInBatchForCloudMode(OlapTable.java:1190) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.cache.CacheAnalyzer.buildCacheTableForOlapScanNode(CacheAnalyzer.java:700) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.cache.CacheAnalyzer.buildCacheTableList(CacheAnalyzer.java:512) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.cache.CacheAnalyzer.innerCheckCacheModeForNereids(CacheAnalyzer.java:412) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.cache.CacheAnalyzer.getCacheData(CacheAnalyzer.java:522) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.StmtExecutor.handleCacheStmt(StmtExecutor.java:1725) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.StmtExecutor.handleQueryStmt(StmtExecutor.java:1831) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.StmtExecutor.handleQueryWithRetry(StmtExecutor.java:874) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.StmtExecutor.executeByNereids(StmtExecutor.java:811) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.StmtExecutor.execute(StmtExecutor.java:607) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.StmtExecutor.queryRetry(StmtExecutor.java:557) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.StmtExecutor.execute(StmtExecutor.java:547) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.ConnectProcessor.executeQuery(ConnectProcessor.java:397) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.ConnectProcessor.handleQuery(ConnectProcessor.java:238) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.MysqlConnectProcessor.handleQuery(MysqlConnectProcessor.java:194) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.MysqlConnectProcessor.dispatch(MysqlConnectProcessor.java:222) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.qe.MysqlConnectProcessor.processOnce(MysqlConnectProcessor.java:281) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.mysql.ReadListener.lambda$handleEvent$0(ReadListener.java:52) ~[doris-fe.jar:1.2-SNAPSHOT]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
        at java.lang.Thread.run(Thread.java:840) ~[?:?] 
```

This exception depends on an unstable race, so I cannot add a test for it.
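
For orientation, a minimal sketch of the failure shape (the class and map below are illustrative stand-ins, not the actual Doris catalog code): the cache analyzer resolves a partition id captured at plan time, but a concurrent `DROP PARTITION` can remove that partition first, so the lookup returns null and the version probe dereferences it.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-ins only: NOT the Doris catalog code, just a minimal
// reproduction of the failure shape described in the stack trace above.
public class DroppedPartitionNpeSketch {

    static class Partition {
        long getDbId() {
            return 1L;
        }
    }

    // Stand-in for the table's id -> partition map.
    static final Map<Long, Partition> ID_TO_PARTITION = new HashMap<>();

    // Stand-in for a lookup like OlapTable#getPartition: returns null once
    // the partition has been dropped from the catalog.
    static Partition getPartition(long partitionId) {
        return ID_TO_PARTITION.get(partitionId);
    }

    // Analogue of CloudPartition.getSnapshotVisibleVersion before the fix:
    // the id was captured at plan time and is dereferenced without a null check.
    static long snapshotVersionOf(long partitionId) {
        Partition partition = getPartition(partitionId);
        return partition.getDbId(); // NPE when the partition was dropped concurrently
    }

    public static void main(String[] args) {
        ID_TO_PARTITION.put(10L, new Partition());
        ID_TO_PARTITION.remove(10L); // a concurrent DROP PARTITION wins the race
        snapshotVersionOf(10L);      // throws NullPointerException, as in the log
    }
}
```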
924060929 authored and Your Name committed Jan 21, 2025
1 parent 52396fb commit 1f9780f
Showing 2 changed files with 70 additions and 67 deletions.
First changed file, around `tryParseSql`:

```diff
@@ -224,37 +224,36 @@ private String normalizeSql(String sql) {
     private Optional<LogicalSqlCache> tryParseSql(
             ConnectContext connectContext, String key, SqlCacheContext sqlCacheContext,
             UserIdentity currentUserIdentity, boolean checkUserVariable) {
         Env env = connectContext.getEnv();
 
-        // check table and view and their columns authority
-        if (privilegeChanged(connectContext, env, sqlCacheContext)) {
-            return invalidateCache(key);
-        }
-        if (tablesOrDataChanged(env, sqlCacheContext)) {
-            return invalidateCache(key);
-        }
-        if (viewsChanged(env, sqlCacheContext)) {
-            return invalidateCache(key);
-        }
         if (!tryLockTables(connectContext, env, sqlCacheContext)) {
             return invalidateCache(key);
         }
+        try {
+            // check table and view and their columns authority
+            if (privilegeChanged(connectContext, env, sqlCacheContext)) {
+                return invalidateCache(key);
+            }
+            if (tablesOrDataChanged(env, sqlCacheContext)) {
+                return invalidateCache(key);
+            }
+            if (viewsChanged(env, sqlCacheContext)) {
+                return invalidateCache(key);
+            }
 
-        LogicalEmptyRelation whateverPlan = new LogicalEmptyRelation(new RelationId(0), ImmutableList.of());
-        if (nondeterministicFunctionChanged(whateverPlan, connectContext, sqlCacheContext)) {
-            return invalidateCache(key);
-        }
+            LogicalEmptyRelation whateverPlan = new LogicalEmptyRelation(new RelationId(0), ImmutableList.of());
+            if (nondeterministicFunctionChanged(whateverPlan, connectContext, sqlCacheContext)) {
+                return invalidateCache(key);
+            }
 
-        // table structure and data not changed, now check policy
-        if (rowPoliciesChanged(currentUserIdentity, env, sqlCacheContext)) {
-            return invalidateCache(key);
-        }
-        if (dataMaskPoliciesChanged(currentUserIdentity, env, sqlCacheContext)) {
-            return invalidateCache(key);
-        }
-
-        try {
+            // table structure and data not changed, now check policy
+            if (rowPoliciesChanged(currentUserIdentity, env, sqlCacheContext)) {
+                return invalidateCache(key);
+            }
+            if (dataMaskPoliciesChanged(currentUserIdentity, env, sqlCacheContext)) {
+                return invalidateCache(key);
+            }
             Optional<ResultSet> resultSetInFe = sqlCacheContext.getResultSetInFe();
 
             List<Variable> currentVariables = ImmutableList.of();
```
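
The reordering above means every staleness and privilege check now runs while the table locks are held, so the metadata being validated cannot be dropped mid-check. A rough sketch of that lock-then-validate pattern (hypothetical names; the real code uses `tryLockTables`/`invalidateCache`, and the unlock path falls outside the visible hunk, so a finally-style release is assumed here):

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical names throughout; a sketch of the ordering the hunk establishes.
public class LockThenValidateSketch {

    private final ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock();

    Object tryUseCache(String key) {
        Lock readLock = tableLock.readLock();
        // 1. Acquire the lock before any validation, mirroring the moved
        //    tryLockTables call: no partition can be dropped while we check.
        if (!readLock.tryLock()) {
            return null; // treat as a cache miss, like invalidateCache(key)
        }
        try {
            // 2. All staleness/privilege checks run under the lock, so the
            //    metadata they read cannot vanish mid-check.
            if (metadataChanged()) {
                return null;
            }
            return lookup(key);
        } finally {
            readLock.unlock(); // assumed release; not shown in the truncated diff
        }
    }

    private boolean metadataChanged() {
        return false; // placeholder check
    }

    private Object lookup(String key) {
        return "cached result for " + key; // placeholder cache hit
    }
}
```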
Second changed file, `CacheAnalyzer.buildCacheTableList`:

```diff
@@ -471,54 +471,58 @@ private CacheMode innerCheckCacheModeForNereids(long now) {
     }
 
     private List<CacheTable> buildCacheTableList() {
-        //Check the last version time of the table
-        MetricRepo.COUNTER_QUERY_TABLE.increase(1L);
-        long olapScanNodeSize = 0;
-        long hiveScanNodeSize = 0;
-        for (ScanNode scanNode : scanNodes) {
-            if (scanNode instanceof OlapScanNode) {
-                olapScanNodeSize++;
-            } else if (scanNode instanceof HiveScanNode) {
-                hiveScanNodeSize++;
+        try {
+            //Check the last version time of the table
+            MetricRepo.COUNTER_QUERY_TABLE.increase(1L);
+            long olapScanNodeSize = 0;
+            long hiveScanNodeSize = 0;
+            for (ScanNode scanNode : scanNodes) {
+                if (scanNode instanceof OlapScanNode) {
+                    olapScanNodeSize++;
+                } else if (scanNode instanceof HiveScanNode) {
+                    hiveScanNodeSize++;
+                }
             }
-        }
-        if (olapScanNodeSize > 0) {
-            MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L);
-        }
-        if (hiveScanNodeSize > 0) {
-            MetricRepo.COUNTER_QUERY_HIVE_TABLE.increase(1L);
-        }
-
-        if (!(olapScanNodeSize == scanNodes.size() || hiveScanNodeSize == scanNodes.size())) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("only support olap/hive table with non-federated query, other types are not supported now, "
-                        + "queryId {}", DebugUtil.printId(queryId));
+            if (olapScanNodeSize > 0) {
+                MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L);
             }
-            return Collections.emptyList();
-        }
-
-        List<CacheTable> tblTimeList = Lists.newArrayList();
-        for (int i = 0; i < scanNodes.size(); i++) {
-            ScanNode node = scanNodes.get(i);
-            if (enablePartitionCache()
-                    && (node instanceof OlapScanNode)
-                    && ((OlapScanNode) node).getSelectedPartitionNum() > 1
-                    && selectStmt != null
-                    && selectStmt.hasGroupByClause()) {
+            if (hiveScanNodeSize > 0) {
+                MetricRepo.COUNTER_QUERY_HIVE_TABLE.increase(1L);
+            }
+
+            if (!(olapScanNodeSize == scanNodes.size() || hiveScanNodeSize == scanNodes.size())) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("more than one partition scanned when qeury has agg, "
-                            + "partition cache cannot use, queryid {}",
-                            DebugUtil.printId(queryId));
+                    LOG.debug("only support olap/hive table with non-federated query, "
+                            + "other types are not supported now, queryId {}", DebugUtil.printId(queryId));
                 }
                 return Collections.emptyList();
            }
-            CacheTable cTable = node instanceof OlapScanNode
-                    ? buildCacheTableForOlapScanNode((OlapScanNode) node)
-                    : buildCacheTableForHiveScanNode((HiveScanNode) node);
-            tblTimeList.add(cTable);
+
+            List<CacheTable> tblTimeList = Lists.newArrayList();
+            for (int i = 0; i < scanNodes.size(); i++) {
+                ScanNode node = scanNodes.get(i);
+                if (enablePartitionCache()
+                        && (node instanceof OlapScanNode)
+                        && ((OlapScanNode) node).getSelectedPartitionNum() > 1
+                        && selectStmt != null
+                        && selectStmt.hasGroupByClause()) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("more than one partition scanned when qeury has agg, "
+                                + "partition cache cannot use, queryid {}",
+                                DebugUtil.printId(queryId));
+                    }
+                    return Collections.emptyList();
+                }
+                CacheTable cTable = node instanceof OlapScanNode
+                        ? buildCacheTableForOlapScanNode((OlapScanNode) node)
+                        : buildCacheTableForHiveScanNode((HiveScanNode) node);
+                tblTimeList.add(cTable);
+            }
+            Collections.sort(tblTimeList);
+            return tblTimeList;
+        } catch (Throwable t) {
+            return new ArrayList<>();
        }
-        Collections.sort(tblTimeList);
-        return tblTimeList;
     }
 
     public InternalService.PFetchCacheResult getCacheData() throws UserException {
```
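
This second hunk wraps the whole list-building routine so that any unexpected error (such as the dropped-partition NPE above) degrades to "no cacheable tables": the caller then skips the SQL cache and the query proceeds on the normal, uncached path. A condensed sketch of that fail-open guard, with hypothetical helper names:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper names; the real method is CacheAnalyzer#buildCacheTableList.
public class FailOpenCacheSketch {

    interface ListBuilder<T> {
        List<T> build() throws Exception;
    }

    // If building the cache-table list fails for any reason, return an empty
    // list so the caller simply skips the SQL cache instead of failing the query.
    static <T> List<T> buildOrSkipCache(ListBuilder<T> builder) {
        try {
            return builder.build();
        } catch (Throwable t) {
            return new ArrayList<>(); // cache miss; query proceeds uncached
        }
    }

    public static void main(String[] args) {
        List<String> tables = buildOrSkipCache(() -> {
            throw new NullPointerException("partition is null");
        });
        System.out.println(tables.isEmpty()); // true: caching skipped, no crash
    }
}
```

Catching `Throwable` is deliberately broad here: the cache is a pure optimization, so any failure while deciding cacheability should disable caching for the query rather than surface to the client.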
