diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index f1cdf47b33f0eab..d1934a60299de74 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -230,7 +230,6 @@ import org.apache.doris.statistics.StatisticsAutoCollector;
 import org.apache.doris.statistics.StatisticsCache;
 import org.apache.doris.statistics.StatisticsCleaner;
-import org.apache.doris.statistics.StatisticsPeriodCollector;
 import org.apache.doris.statistics.query.QueryStats;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
@@ -495,8 +494,6 @@ public class Env {
 
     private StatisticsAutoCollector statisticsAutoCollector;
 
-    private StatisticsPeriodCollector statisticsPeriodCollector;
-
     private HiveTransactionMgr hiveTransactionMgr;
 
     private TopicPublisherThread topicPublisherThread;
@@ -720,7 +717,6 @@ private Env(boolean isCheckpointCatalog) {
         this.analysisManager = new AnalysisManager();
         this.statisticsCleaner = new StatisticsCleaner();
         this.statisticsAutoCollector = new StatisticsAutoCollector();
-        this.statisticsPeriodCollector = new StatisticsPeriodCollector();
         this.globalFunctionMgr = new GlobalFunctionMgr();
         this.workloadGroupMgr = new WorkloadGroupMgr();
         this.queryStats = new QueryStats();
@@ -971,9 +967,6 @@ public void initialize(String[] args) throws Exception {
         if (statisticsAutoCollector != null) {
             statisticsAutoCollector.start();
         }
-        if (statisticsPeriodCollector != null) {
-            statisticsPeriodCollector.start();
-        }
 
         queryCancelWorker.start();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 16cec127bff2560..32cd2fd94e01f31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -54,7 +54,6 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.statistics.HistogramTask;
-import org.apache.doris.statistics.MVAnalysisTask;
 import org.apache.doris.statistics.OlapAnalysisTask;
 import org.apache.doris.statistics.TableStatsMeta;
 import org.apache.doris.statistics.util.StatisticsUtil;
@@ -1122,11 +1121,9 @@ public TTableDescriptor toThrift() {
     public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
         if (info.analysisType.equals(AnalysisType.HISTOGRAM)) {
             return new HistogramTask(info);
-        }
-        if (info.analysisType.equals(AnalysisType.FUNDAMENTALS)) {
+        } else {
             return new OlapAnalysisTask(info);
         }
-        return new MVAnalysisTask(info);
     }
 
     public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
new file mode 100644
index 000000000000000..aba139ee7894ae7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.qe.AuditLogHelper;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.StringJoiner;
+
+public class AnalysisJob {
+
+    public static final Logger LOG = LogManager.getLogger(AnalysisJob.class);
+
+    protected Set<BaseAnalysisTask> queryingTask;
+
+    protected Set<BaseAnalysisTask> queryFinished;
+
+    protected List<ColStatsData> buf;
+
+    protected int total;
+
+    protected int acc;
+
+    protected StmtExecutor stmtExecutor;
+
+    protected boolean killed;
+
+    protected long start;
+
+    protected AnalysisInfo jobInfo;
+
+    public AnalysisJob(AnalysisInfo jobInfo, Collection<? extends BaseAnalysisTask> queryingTask) {
+        for (BaseAnalysisTask task : queryingTask) {
+            task.job = this;
+        }
+        this.queryingTask = new HashSet<>(queryingTask);
+        this.queryFinished = new HashSet<>();
+        this.buf = new ArrayList<>();
+        total = queryingTask.size();
+        start = System.currentTimeMillis();
+        this.jobInfo = jobInfo;
+    }
+
+    public synchronized void appendBuf(BaseAnalysisTask task, List<ColStatsData> statsData) {
+        queryingTask.remove(task);
+        buf.addAll(statsData);
+        queryFinished.add(task);
+        acc += 1;
+        if (acc == total) {
+            writeBuf();
+            updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
+                    + (System.currentTimeMillis() - start) / 1000);
+            deregisterJob();
+        } else if (buf.size() >= StatisticConstants.ANALYZE_JOB_BUF_SIZE) {
+            writeBuf();
+        }
+    }
+
+    // CHECKSTYLE OFF
+    // fallthrough here is expected
+    public void updateTaskState(AnalysisState state, String msg) {
+        long time = System.currentTimeMillis();
+        switch (state) {
+            case FAILED:
+                for (BaseAnalysisTask task : queryingTask) {
+                    Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(task.info, state, msg, time);
+                }
+            case FINISHED:
+                for (BaseAnalysisTask task : queryFinished) {
+                    Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(task.info, state, msg, time);
+                }
+            default:
+                // DO NOTHING
+        }
+    }
+
+    protected void writeBuf() {
+        String insertStmt = "INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME + " VALUES ";
+        StringJoiner values = new StringJoiner(",");
+        for (ColStatsData data : buf) {
+            values.add(data.toSQL(true));
+        }
+        insertStmt += values.toString();
+        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
+            stmtExecutor = new StmtExecutor(r.connectContext, insertStmt);
+            executeWithExceptionOnFail(stmtExecutor);
+        } catch (Throwable t) {
+            LOG.warn("Failed to write buf: " + insertStmt, t);
+            updateTaskState(AnalysisState.FAILED, t.getMessage());
+            return;
+        }
+        updateTaskState(AnalysisState.FINISHED, "");
+        syncLoadStats();
+        queryFinished.clear();
+        // drop rows that are already persisted so the next flush doesn't rewrite them
+        buf.clear();
+    }
+
+    protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
+        if (killed) {
+            return;
+        }
+        LOG.debug("execute internal sql: {}", stmtExecutor.getOriginStmt());
+        try {
+            stmtExecutor.execute();
+            QueryState queryState = stmtExecutor.getContext().getState();
+            if (queryState.getStateType().equals(MysqlStateType.ERR)) {
+                throw new RuntimeException(
+                        "Failed to insert: " + stmtExecutor.getOriginStmt().originStmt + " Error msg: "
+                                + queryState.getErrorMessage());
+            }
+        } finally {
+            AuditLogHelper.logAuditLog(stmtExecutor.getContext(), stmtExecutor.getOriginStmt().toString(),
+                    stmtExecutor.getParsedStmt(), stmtExecutor.getQueryStatisticsForAuditLog(),
+                    true);
+        }
+    }
+
+    public void taskFailed(BaseAnalysisTask task, String reason) {
+        updateTaskState(AnalysisState.FAILED, reason);
+        cancel();
+        deregisterJob();
+    }
+
+    public void cancel() {
+        for (BaseAnalysisTask task : queryingTask) {
+            task.cancel();
+        }
+    }
+
+    public void deregisterJob() {
+        Env.getCurrentEnv().getAnalysisManager().removeJob(jobInfo.jobId);
+    }
+
+    protected void syncLoadStats() {
+        long tblId = jobInfo.tblId;
+        for (BaseAnalysisTask task : queryFinished) {
+            String colName = task.col.getName();
+            if (!Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tblId, -1, colName)) {
+                Env.getCurrentEnv().getAnalysisManager().removeColStatsStatus(tblId, colName);
+            }
+        }
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index ef6cafa1f3aa2d0..45150dda1a142d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -128,6 +128,8 @@ public class AnalysisManager extends Daemon implements Writable {
 
     private final Map<Long, TableStatsMeta> idToTblStats = new ConcurrentHashMap<>();
 
+    private final Map<Long, AnalysisJob> idToAnalysisJob = new ConcurrentHashMap<>();
+
     protected SimpleQueue<AnalysisInfo> autoJobs = createSimpleQueue(null, this);
 
     private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w -> {
@@ -371,6 +373,7 @@ protected AnalysisInfo buildAndAssignJob(AnalyzeTblStmt stmt) throws DdlExceptio
         boolean isSync = stmt.isSync();
         Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
         createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
+        constructJob(jobInfo, analysisTaskInfos.values());
         if (!jobInfo.partitionOnly && stmt.isAllColumns()
                 && StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) {
             createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, isSync);
@@ -1129,4 +1132,17 @@ public ColStatsMeta findColStatsMeta(long tblId, String colName) {
         }
         return tableStats.findColumnStatsMeta(colName);
     }
+
+    public AnalysisJob findJob(long id) {
+        return idToAnalysisJob.get(id);
+    }
+
+    public void constructJob(AnalysisInfo jobInfo, Collection<? extends BaseAnalysisTask> tasks) {
+        AnalysisJob job = new AnalysisJob(jobInfo, tasks);
+        idToAnalysisJob.put(jobInfo.jobId, job);
+    }
+
+    public void removeJob(long id) {
+        idToAnalysisJob.remove(id);
+    }
 }
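Taken together, the two files above replace per-task INSERTs with a shared, size-bounded buffer: AnalysisManager.constructJob() registers one AnalysisJob per analyze job, every task reports its rows through appendBuf(), and writeBuf() flushes the buffer as a single multi-row INSERT. A minimal standalone sketch of that batching contract follows; BatchBuffer is a hypothetical name invented for the illustration, the real logic lives in AnalysisJob.appendBuf()/writeBuf().

    import java.util.ArrayList;
    import java.util.List;

    // Standalone model of AnalysisJob's buffering contract (illustrative only).
    class BatchBuffer {
        static final int BUF_SIZE = 20000; // mirrors StatisticConstants.ANALYZE_JOB_BUF_SIZE
        final List<String> buf = new ArrayList<>();
        final int total;   // tasks registered for the job
        int finished;      // tasks that have reported their rows

        BatchBuffer(int total) {
            this.total = total;
        }

        // Same flush rules as appendBuf(): flush when every task has reported,
        // or when the buffer crosses the size threshold.
        synchronized void append(List<String> rows) {
            buf.addAll(rows);
            finished++;
            if (finished == total || buf.size() >= BUF_SIZE) {
                flush();
            }
        }

        void flush() {
            // writeBuf() turns this into one multi-row INSERT statement.
            System.out.println("flushing " + buf.size() + " rows in one INSERT");
            buf.clear();
        }
    }

With this shape, a job over hundreds of columns issues a handful of INSERTs instead of one per column, which is the apparent motivation for the new class.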
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
index 9aa3d85992b32c0..ffdd375ee9e36d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.statistics;
 
-import org.apache.doris.catalog.Env;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
@@ -59,9 +58,8 @@ public void run() {
             if (task.info.scheduleType.equals(ScheduleType.AUTOMATIC) && !StatisticsUtil.inAnalyzeTime(
                     LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) {
                 // TODO: Do we need a separate AnalysisState here?
-                Env.getCurrentEnv().getAnalysisManager()
-                        .updateTaskStatus(task.info, AnalysisState.FAILED, "Auto task"
-                                + "doesn't get executed within specified time range", System.currentTimeMillis());
+                task.job.taskFailed(task, "Auto task "
+                        + "doesn't get executed within specified time range");
                 return;
             }
             executor.putJob(this);
@@ -76,15 +74,7 @@ public void run() {
             if (!task.killed) {
                 if (except != null) {
                     LOG.warn("Analyze {} failed.", task.toString(), except);
-                    Env.getCurrentEnv().getAnalysisManager()
-                            .updateTaskStatus(task.info,
-                                    AnalysisState.FAILED, Util.getRootCauseMessage(except), System.currentTimeMillis());
-                } else {
-                    LOG.debug("Analyze {} finished, cost time:{}", task.toString(),
-                            System.currentTimeMillis() - startTime);
-                    Env.getCurrentEnv().getAnalysisManager()
-                            .updateTaskStatus(task.info,
-                                    AnalysisState.FINISHED, "", System.currentTimeMillis());
+                    task.job.taskFailed(task, Util.getRootCauseMessage(except));
                 }
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index ad74266a7c365a3..c9e24dfae9abbe9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -24,9 +24,7 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.Config;
 import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.qe.AuditLogHelper;
-import org.apache.doris.qe.QueryState;
-import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.AutoCloseConnectContext;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
@@ -38,6 +36,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
 public abstract class BaseAnalysisTask {
@@ -52,58 +51,25 @@ public abstract class BaseAnalysisTask {
             + "else NDV(`${colName}`) * ${scaleFactor} end AS ndv, "
             ;
 
-    /**
-     * Stats stored in the column_statistics table basically has two types, `part_id` is null which means it is
-     * aggregate from partition level stats, `part_id` is not null which means it is partition level stats.
-     * For latter, it's id field contains part id, for previous doesn't.
-     */
-    protected static final String INSERT_PART_STATISTICS = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " SELECT "
-            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, "
-            + "${catalogId} AS catalog_id, "
-            + "${dbId} AS db_id, "
-            + "${tblId} AS tbl_id, "
-            + "${idxId} AS idx_id, "
-            + "'${colId}' AS col_id, "
-            + "${partId} AS part_id, "
-            + "COUNT(1) AS row_count, "
-            + "NDV(`${colName}`) AS ndv, "
-            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
-            + "MIN(`${colName}`) AS min, "
-            + "MAX(`${colName}`) AS max, "
-            + "${dataSizeFunction} AS data_size, "
-            + "NOW() ";
-
-    protected static final String INSERT_COL_STATISTICS = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " SELECT id, catalog_id, db_id, tbl_id, idx_id, col_id, part_id, row_count, "
-            + " ndv, null_count, CAST(min AS string), CAST(max AS string), data_size, update_time\n"
-            + " FROM \n"
-            + "     (SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+    protected static final String COLLECT_COL_STATISTICS =
+            "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
             + "         ${catalogId} AS catalog_id, "
             + "         ${dbId} AS db_id, "
             + "         ${tblId} AS tbl_id, "
             + "         ${idxId} AS idx_id, "
             + "         '${colId}' AS col_id, "
             + "         NULL AS part_id, "
-            + "         SUM(count) AS row_count, \n"
-            + "         SUM(null_count) AS null_count, "
-            + "         MIN(CAST(min AS ${type})) AS min, "
-            + "         MAX(CAST(max AS ${type})) AS max, "
-            + "         SUM(data_size_in_bytes) AS data_size, "
-            + "         NOW() AS update_time \n"
-            + " FROM ${internalDB}.${columnStatTbl}"
-            + " WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND "
-            + " ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND "
-            + " ${internalDB}.${columnStatTbl}.col_id='${colId}' AND "
-            + " ${internalDB}.${columnStatTbl}.idx_id='${idxId}' AND "
-            + " ${internalDB}.${columnStatTbl}.part_id IS NOT NULL"
-            + " ) t1, \n";
-
-    protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " SELECT "
+            + "         COUNT(1) AS row_count, "
+            + "         NDV(`${colName}`) AS ndv, "
+            + "         COUNT(1) - COUNT(${colName}) AS null_count, "
+            + "         CAST(MIN(${colName}) AS STRING) AS min, "
+            + "         CAST(MAX(${colName}) AS STRING) AS max, "
+            + "         ${dataSizeFunction} AS data_size, "
+            + "         NOW() AS update_time "
+            + " FROM `${dbName}`.`${tblName}`";
+
+    protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE =
+            " SELECT "
             + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
             + "${catalogId} AS catalog_id, "
             + "${dbId} AS db_id, "
@@ -135,6 +101,8 @@ public abstract class BaseAnalysisTask {
 
     protected TableSample tableSample = null;
 
+    protected AnalysisJob job;
+
     @VisibleForTesting
     public BaseAnalysisTask() {
 
@@ -191,6 +159,7 @@ protected void executeWithRetry() {
             }
             LOG.warn("Failed to execute analysis task, retried times: {}", retriedTimes++, t);
             if (retriedTimes > StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
+                job.taskFailed(this, t.getMessage());
                 throw new RuntimeException(t);
             }
             StatisticsUtil.sleep(TimeUnit.SECONDS.toMillis(2 ^ retriedTimes) * 10);
@@ -282,23 +251,19 @@ public String toString() {
                 col == null ? "TableRowCount" : col.getName());
     }
 
-    protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
-        if (killed) {
-            return;
-        }
-        LOG.debug("execute internal sql: {}", stmtExecutor.getOriginStmt());
-        try {
-            stmtExecutor.execute();
-            QueryState queryState = stmtExecutor.getContext().getState();
-            if (queryState.getStateType().equals(MysqlStateType.ERR)) {
-                throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
-                        catalog.getName(), db.getFullName(), info.colName, stmtExecutor.getOriginStmt().toString(),
-                        queryState.getErrorMessage()));
-            }
+    public void setJob(AnalysisJob job) {
+        this.job = job;
+    }
+
+    protected void runQuery(String sql) {
+        long startTime = System.currentTimeMillis();
+        try (AutoCloseConnectContext a = StatisticsUtil.buildConnectContext()) {
+            stmtExecutor = new StmtExecutor(a.connectContext, sql);
+            // Scalar collection query, run it once and hand the single row to the job.
+            ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0));
+            job.appendBuf(this, Collections.singletonList(colStatsData));
         } finally {
-            AuditLogHelper.logAuditLog(stmtExecutor.getContext(), stmtExecutor.getOriginStmt().toString(),
-                    stmtExecutor.getParsedStmt(), stmtExecutor.getQueryStatisticsForAuditLog(),
-                    true);
+            LOG.debug("End cost time in secs: " + (System.currentTimeMillis() - startTime) / 1000);
         }
     }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java
index 3cbd1b5a61129b9..be165ee306ee9d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java
@@ -19,6 +19,8 @@
 
 import org.apache.doris.statistics.util.StatisticsUtil;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.util.StringJoiner;
 
 /**
@@ -52,6 +54,18 @@ public class ColStatsData {
 
     public final String updateTime;
 
+    @VisibleForTesting
+    public ColStatsData() {
+        statsId = null;
+        count = 0;
+        ndv = 0;
+        nullCount = 0;
+        minLit = null;
+        maxLit = null;
+        dataSizeInBytes = 0;
+        updateTime = null;
+    }
+
     public ColStatsData(ResultRow row) {
        this.statsId = new StatsId(row);
        this.count = (long) Double.parseDouble(row.get(7));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
index 188665645c386f5..1e71013c8404d00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
@@ -23,26 +23,19 @@
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.external.hive.util.HiveUtil;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.QueryState;
-import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.text.StringSubstitutor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.StringJoiner;
 import java.util.stream.Collectors;
 
 public class HMSAnalysisTask extends BaseAnalysisTask {
@@ -51,9 +44,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
     // While doing sample analysis, the sampled ndv result will multiply a factor (total size/sample size)
     // if ndv(col)/count(col) is greater than this threshold.
 
-    private static final String ANALYZE_TABLE_TEMPLATE = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " SELECT "
+    private static final String ANALYZE_TABLE_TEMPLATE = " SELECT "
             + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
             + "${catalogId} AS catalog_id, "
             + "${dbId} AS db_id, "
@@ -70,28 +61,9 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
             + "NOW() "
             + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
 
-    private static final String ANALYZE_PARTITION_TEMPLATE = " SELECT "
-            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, "
-            + "${catalogId} AS catalog_id, "
-            + "${dbId} AS db_id, "
-            + "${tblId} AS tbl_id, "
-            + "${idxId} AS idx_id, "
-            + "'${colId}' AS col_id, "
-            + "${partId} AS part_id, "
-            + "COUNT(1) AS row_count, "
-            + "NDV(`${colName}`) AS ndv, "
-            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
-            + "MIN(`${colName}`) AS min, "
-            + "MAX(`${colName}`) AS max, "
-            + "${dataSizeFunction} AS data_size, "
-            + "NOW() FROM `${catalogName}`.`${dbName}`.`${tblName}` where ";
-
     private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ROUND(COUNT(1) * ${scaleFactor}) as rowCount "
             + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
 
-    // cache stats for each partition, it would be inserted into column_statistics in a batch.
-    private final List<List<ColStatsData>> buf = new ArrayList<>();
-
     private final boolean isTableLevelTask;
     private final boolean isPartitionOnly;
     private Set<String> partitionNames;
@@ -131,25 +103,16 @@ private void getTableStats() throws Exception {
      * Get column statistics and insert the result to __internal_schema.column_statistics
      */
     private void getTableColumnStats() throws Exception {
-        if (isPartitionOnly) {
-            getPartitionNames();
-            List<String> partitionAnalysisSQLs = new ArrayList<>();
-            for (String partId : this.partitionNames) {
-                partitionAnalysisSQLs.add(generateSqlForPartition(partId));
-            }
-            execSQLs(partitionAnalysisSQLs);
-        } else {
-            if (!info.usingSqlForPartitionColumn && isPartitionColumn()) {
-                try {
-                    getPartitionColumnStats();
-                } catch (Exception e) {
-                    LOG.warn("Failed to collect stats for partition col {} using metadata, "
-                            + "fallback to normal collection", col.getName(), e);
-                    getOrdinaryColumnStats();
-                }
-            } else {
+        if (!info.usingSqlForPartitionColumn && isPartitionColumn()) {
+            try {
+                getPartitionColumnStats();
+            } catch (Exception e) {
+                LOG.warn("Failed to collect stats for partition col {} using metadata, "
+                        + "fallback to normal collection", col.getName(), e);
                 getOrdinaryColumnStats();
             }
+        } else {
+            getOrdinaryColumnStats();
         }
     }
@@ -182,7 +145,7 @@ private void getOrdinaryColumnStats() throws Exception {
         params.put("maxFunction", getMaxFunction());
         StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
         String sql = stringSubstitutor.replace(sb.toString());
-        executeInsertSql(sql);
+        runQuery(sql);
     }
 
     private void getPartitionColumnStats() throws Exception {
@@ -227,7 +190,7 @@ private void getPartitionColumnStats() throws Exception {
         params.put("data_size", String.valueOf(dataSize));
         StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
         String sql = stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_TEMPLATE);
-        executeInsertSql(sql);
+        runQuery(sql);
     }
 
     private String updateMinValue(String currentMin, String value) {
@@ -278,7 +241,7 @@ private void getPartitionNames() {
             partitionNames = table.getPartitionNames();
         } else if (info.partitionCount > 0) {
             partitionNames = table.getPartitionNames().stream()
-                .limit(info.partitionCount).collect(Collectors.toSet());
+                    .limit(info.partitionCount).collect(Collectors.toSet());
         }
         if (partitionNames == null || partitionNames.isEmpty()) {
             throw new RuntimeException("Not a partition table or no partition specified.");
@@ -286,80 +249,6 @@ private void getPartitionNames() {
         }
     }
 
-    private String generateSqlForPartition(String partId) {
-        StringBuilder sb = new StringBuilder();
-        sb.append(ANALYZE_PARTITION_TEMPLATE);
-        String[] splits = partId.split("/");
-        for (int i = 0; i < splits.length; i++) {
-            String[] kv = splits[i].split("=");
-            sb.append(kv[0]);
-            sb.append("='");
-            sb.append(kv[1]);
-            sb.append("'");
-            if (i < splits.length - 1) {
-                sb.append(" and ");
-            }
-        }
-        Map<String, String> params = buildStatsParams(partId);
-        params.put("dataSizeFunction", getDataSizeFunction(col));
-        return new StringSubstitutor(params).replace(sb.toString());
-    }
-
-    public void execSQLs(List<String> partitionAnalysisSQLs) throws Exception {
-        long startTime = System.currentTimeMillis();
-        LOG.debug("analyze task {} start at {}", info.toString(), new Date());
-        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
-            List<List<String>> sqlGroups = Lists.partition(partitionAnalysisSQLs, StatisticConstants.UNION_ALL_LIMIT);
-            for (List<String> group : sqlGroups) {
-                if (killed) {
-                    return;
-                }
-                StringJoiner partitionCollectSQL = new StringJoiner("UNION ALL");
-                group.forEach(partitionCollectSQL::add);
-                stmtExecutor = new StmtExecutor(r.connectContext, partitionCollectSQL.toString());
-                buf.add(stmtExecutor.executeInternalQuery()
-                        .stream().map(ColStatsData::new).collect(Collectors.toList()));
-                QueryState queryState = r.connectContext.getState();
-                if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
-                    throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
-                            catalog.getName(), db.getFullName(), info.colName, partitionCollectSQL,
-                            queryState.getErrorMessage()));
-                }
-            }
-            for (List<ColStatsData> colStatsDataList : buf) {
-                StringBuilder batchInsertSQL =
-                        new StringBuilder("INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME
-                                + " VALUES ");
-                StringJoiner sj = new StringJoiner(",");
-                colStatsDataList.forEach(c -> sj.add(c.toSQL(true)));
-                batchInsertSQL.append(sj);
-                stmtExecutor = new StmtExecutor(r.connectContext, batchInsertSQL.toString());
-                executeWithExceptionOnFail(stmtExecutor);
-            }
-        } finally {
-            LOG.debug("analyze task {} end. cost {}ms", info, System.currentTimeMillis() - startTime);
-        }
-
-    }
-
-    private void executeInsertSql(String sql) throws Exception {
-        long startTime = System.currentTimeMillis();
-        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
-            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
-            r.connectContext.setExecutor(stmtExecutor);
-            this.stmtExecutor.execute();
-            QueryState queryState = r.connectContext.getState();
-            if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
-                LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], error: [%s]",
-                        catalog.getName(), db.getFullName(), info.colName, sql, queryState.getErrorMessage()));
-                throw new RuntimeException(queryState.getErrorMessage());
-            }
-            LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. Cost %d ms.",
-                    catalog.getName(), db.getFullName(), info.colName, sql, (System.currentTimeMillis() - startTime)));
-        }
-    }
-
     private Map<String, String> buildStatsParams(String partId) {
         Map<String, String> commonParams = new HashMap<>();
         String id = StatisticsUtil.constructId(tbl.getId(), -1);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java
index 0c148b5ad8d7f37..649b075c673f5db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java
@@ -20,25 +20,17 @@
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.external.JdbcExternalTable;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.QueryState;
-import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
 import org.apache.commons.text.StringSubstitutor;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class JdbcAnalysisTask extends BaseAnalysisTask {
-    private static final Logger LOG = LogManager.getLogger(JdbcAnalysisTask.class);
 
-    private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " SELECT "
+    private static final String ANALYZE_SQL_TABLE_TEMPLATE = " SELECT "
             + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
             + "${catalogId} AS catalog_id, "
             + "${dbId} AS db_id, "
@@ -117,25 +109,7 @@ private void getTableColumnStats() throws Exception {
         params.put("dataSizeFunction", getDataSizeFunction(col));
         StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
         String sql = stringSubstitutor.replace(sb.toString());
-        executeInsertSql(sql);
-    }
-
-    private void executeInsertSql(String sql) throws Exception {
-        long startTime = System.currentTimeMillis();
-        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
-            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
-            r.connectContext.setExecutor(stmtExecutor);
-            this.stmtExecutor.execute();
-            QueryState queryState = r.connectContext.getState();
-            if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
-                LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], error: [%s]",
-                        catalog.getName(), db.getFullName(), info.colName, sql, queryState.getErrorMessage()));
-                throw new RuntimeException(queryState.getErrorMessage());
-            }
-            LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. Cost %d ms.",
-                    catalog.getName(), db.getFullName(), info.colName, sql, (System.currentTimeMillis() - startTime)));
-        }
+        runQuery(sql);
     }
 
     private Map<String, String> buildTableStatsParams(String partId) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java
deleted file mode 100644
index 6a43c5092fa072c..000000000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java
+++ /dev/null
@@ -1,152 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.analysis.CreateMaterializedViewStmt;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.FunctionCallExpr;
-import org.apache.doris.analysis.PartitionNames;
-import org.apache.doris.analysis.SelectListItem;
-import org.apache.doris.analysis.SelectStmt;
-import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.analysis.SqlParser;
-import org.apache.doris.analysis.SqlScanner;
-import org.apache.doris.analysis.TableRef;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.MaterializedIndexMeta;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.util.SqlParserUtils;
-import org.apache.doris.statistics.util.StatisticsUtil;
-
-import com.google.common.base.Preconditions;
-
-import java.io.StringReader;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Analysis for the materialized view, only gets constructed when the AnalyzeStmt is not set which
- * columns to be analyzed.
- * TODO: Supports multi-table mv
- */
-public class MVAnalysisTask extends BaseAnalysisTask {
-
-    private static final String ANALYZE_MV_PART = INSERT_PART_STATISTICS
-            + " FROM (${sql}) mv ${sampleExpr}";
-
-    private static final String ANALYZE_MV_COL = INSERT_COL_STATISTICS
-            + " (SELECT NDV(`${colName}`) AS ndv "
-            + " FROM (${sql}) mv) t2";
-
-    private MaterializedIndexMeta meta;
-
-    private SelectStmt selectStmt;
-
-    private OlapTable olapTable;
-
-    public MVAnalysisTask(AnalysisInfo info) {
-        super(info);
-        init();
-    }
-
-    private void init() {
-        olapTable = (OlapTable) tbl;
-        meta = olapTable.getIndexMetaByIndexId(info.indexId);
-        Preconditions.checkState(meta != null);
-        String mvDef = meta.getDefineStmt().originStmt;
-        SqlScanner input =
-                new SqlScanner(new StringReader(mvDef), 0L);
-        SqlParser parser = new SqlParser(input);
-        CreateMaterializedViewStmt cmv = null;
-        try {
-            cmv = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, 0);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        selectStmt = cmv.getSelectStmt();
-        selectStmt.getTableRefs().get(0).getName().setDb(db.getFullName());
-    }
-
-    @Override
-    public void doExecute() throws Exception {
-        for (Column column : meta.getSchema()) {
-            SelectStmt selectOne = (SelectStmt) selectStmt.clone();
-            TableRef tableRef = selectOne.getTableRefs().get(0);
-            SelectListItem selectItem = selectOne.getSelectList().getItems()
-                    .stream()
-                    .filter(i -> isCorrespondingToColumn(i, column))
-                    .findFirst()
-                    .get();
-            selectItem.setAlias(column.getName());
-            Map<String, String> params = new HashMap<>();
-            for (String partName : tbl.getPartitionNames()) {
-                PartitionNames partitionName = new PartitionNames(false,
-                        Collections.singletonList(partName));
-                tableRef.setPartitionNames(partitionName);
-                String sql = selectOne.toSql();
-                params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
-                params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
-                params.put("catalogId", String.valueOf(catalog.getId()));
-                params.put("dbId", String.valueOf(db.getId()));
-                params.put("tblId", String.valueOf(tbl.getId()));
-                params.put("idxId", String.valueOf(meta.getIndexId()));
-                String colName = column.getName();
-                params.put("colId", colName);
-                String partId = olapTable.getPartition(partName) == null ? "NULL" :
-                        String.valueOf(olapTable.getPartition(partName).getId());
-                params.put("partId", partId);
-                params.put("dataSizeFunction", getDataSizeFunction(column));
-                params.put("dbName", db.getFullName());
-                params.put("colName", colName);
-                params.put("tblName", tbl.getName());
-                params.put("sql", sql);
-                StatisticsUtil.execUpdate(ANALYZE_MV_PART, params);
-            }
-            params.remove("partId");
-            params.remove("sampleExpr");
-            params.put("type", column.getType().toString());
-            StatisticsUtil.execUpdate(ANALYZE_MV_COL, params);
-            Env.getCurrentEnv().getStatisticsCache()
-                    .refreshColStatsSync(meta.getIndexId(), meta.getIndexId(), column.getName());
-        }
-    }
-
-    // Based on the fact that materialized view create statement's select expr only contains basic SlotRef and
-    // AggregateFunction.
-    private boolean isCorrespondingToColumn(SelectListItem item, Column column) {
-        Expr expr = item.getExpr();
-        if (expr instanceof SlotRef) {
-            SlotRef slotRef = (SlotRef) expr;
-            return slotRef.getColumnName().equalsIgnoreCase(column.getName());
-        }
-        if (expr instanceof FunctionCallExpr) {
-            FunctionCallExpr func = (FunctionCallExpr) expr;
-            SlotRef slotRef = (SlotRef) func.getChild(0);
-            return slotRef.getColumnName().equalsIgnoreCase(column.getName());
-        }
-        return false;
-    }
-
-    @Override
-    protected void afterExecution() {
-        // DO NOTHING
-    }
-}
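With the INSERT wrappers stripped from every template, each task now produces a plain SELECT and the remaining placeholders are filled in the same way everywhere, via Apache Commons Text. A minimal demonstration of that mechanism (the template line and parameter values are invented for the example; the real templates are COLLECT_COL_STATISTICS and friends):

    import org.apache.commons.text.StringSubstitutor;

    import java.util.HashMap;
    import java.util.Map;

    // How the ${...} placeholders in the collection templates are resolved.
    public class TemplateSketch {
        public static void main(String[] args) {
            String template = "SELECT NDV(`${colName}`) AS ndv FROM `${dbName}`.`${tblName}`";
            Map<String, String> params = new HashMap<>();
            params.put("colName", "col1");
            params.put("dbName", "analysis_job_test");
            params.put("tblName", "t1");
            // Keys are looked up in the map; unknown placeholders are left
            // verbatim in the output rather than failing fast.
            System.out.println(new StringSubstitutor(params).replace(template));
        }
    }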
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index 2df7b9c358d5689..00babaadc72862a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -22,27 +22,21 @@
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
-import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.QueryState;
-import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.AnalysisInfo.JobType;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
 import org.apache.commons.text.StringSubstitutor;
 
 import java.security.SecureRandom;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.StringJoiner;
 import java.util.stream.Collectors;
 
 /**
@@ -50,29 +44,6 @@
  */
 public class OlapAnalysisTask extends BaseAnalysisTask {
 
-    // TODO Currently, NDV is computed for the full table; in fact,
-    // NDV should only be computed for the relevant partition.
-    private static final String ANALYZE_COLUMN_SQL_TEMPLATE = INSERT_COL_STATISTICS
-            + " (SELECT NDV(`${colName}`) AS ndv "
-            + " FROM `${dbName}`.`${tblName}`) t2";
-
-    private static final String COLLECT_PARTITION_STATS_SQL_TEMPLATE =
-            " SELECT "
-            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, "
-            + "${catalogId} AS catalog_id, "
-            + "${dbId} AS db_id, "
-            + "${tblId} AS tbl_id, "
-            + "${idxId} AS idx_id, "
-            + "'${colId}' AS col_id, "
-            + "${partId} AS part_id, "
-            + "COUNT(1) AS row_count, "
-            + "NDV(`${colName}`) AS ndv, "
-            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
-            + "MIN(`${colName}`) AS min, "
-            + "MAX(`${colName}`) AS max, "
-            + "${dataSizeFunction} AS data_size, "
-            + "NOW() FROM `${dbName}`.`${tblName}` PARTITION ${partitionName}";
-
     private static final String SAMPLE_COLUMN_SQL_TEMPLATE = "SELECT "
             + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
             + "${catalogId} AS catalog_id, "
@@ -91,9 +62,6 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
             + "FROM `${dbName}`.`${tblName}`"
             + "${tablets}";
 
-    // cache stats for each partition, it would be inserted into column_statistics in a batch.
-    private final List<List<ColStatsData>> buf = new ArrayList<>();
-
     @VisibleForTesting
     public OlapAnalysisTask() {
     }
@@ -146,45 +114,7 @@ protected void doSample() throws Exception {
             stmtExecutor = new StmtExecutor(r.connectContext, stringSubstitutor.replace(SAMPLE_COLUMN_SQL_TEMPLATE));
             // Scalar query only return one row
             ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0));
-            OlapTable olapTable = (OlapTable) tbl;
-            Collection<Partition> partitions = olapTable.getPartitions();
-            int partitionCount = partitions.size();
-            List<String> values = partitions.stream().map(p -> String.format(
-                    "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())",
-                    StatisticsUtil.quote(StatisticsUtil.constructId(tbl.getId(), -1, col.getName(), p.getId())),
-                    InternalCatalog.INTERNAL_CATALOG_ID,
-                    db.getId(),
-                    tbl.getId(),
-                    -1,
-                    StatisticsUtil.quote(col.getName()),
-                    p.getId(),
-                    colStatsData.count / partitionCount,
-                    colStatsData.ndv / partitionCount,
-                    colStatsData.nullCount / partitionCount,
-                    StatisticsUtil.quote(colStatsData.minLit),
-                    StatisticsUtil.quote(colStatsData.maxLit),
-                    colStatsData.dataSizeInBytes / partitionCount)).collect(Collectors.toList());
-            values.add(String.format(
-                    "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())",
-                    StatisticsUtil.quote(StatisticsUtil.constructId(tbl.getId(), -1, col.getName())),
-                    InternalCatalog.INTERNAL_CATALOG_ID,
-                    db.getId(),
-                    tbl.getId(),
-                    -1,
-                    StatisticsUtil.quote(col.getName()),
-                    "NULL",
-                    colStatsData.count,
-                    colStatsData.ndv,
-                    colStatsData.nullCount,
-                    StatisticsUtil.quote(colStatsData.minLit),
-                    StatisticsUtil.quote(colStatsData.maxLit),
-                    colStatsData.dataSizeInBytes));
-            String insertSQL = "INSERT INTO "
-                    + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME
-                    + " VALUES "
-                    + String.join(",", values);
-            stmtExecutor = new StmtExecutor(r.connectContext, insertSQL);
-            executeWithExceptionOnFail(stmtExecutor);
+            job.appendBuf(this, Collections.singletonList(colStatsData));
         }
     }
 
@@ -210,68 +140,14 @@ protected void doFull() throws Exception {
         params.put("dbName", db.getFullName());
         params.put("colName", String.valueOf(info.colName));
         params.put("tblName", String.valueOf(tbl.getName()));
-        List<String> partitionAnalysisSQLs = new ArrayList<>();
-        try {
-            tbl.readLock();
-
-            for (String partitionName : partitionNames) {
-                Partition part = tbl.getPartition(partitionName);
-                if (part == null) {
-                    continue;
-                }
-                params.put("partId", String.valueOf(tbl.getPartition(partitionName).getId()));
-                // Avoid error when get the default partition
-                params.put("partitionName", "`" + partitionName + "`");
-                StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
-                partitionAnalysisSQLs.add(stringSubstitutor.replace(COLLECT_PARTITION_STATS_SQL_TEMPLATE));
-            }
-        } finally {
-            tbl.readUnlock();
-        }
-        execSQLs(partitionAnalysisSQLs, params);
+        execSQL(params);
     }
 
     @VisibleForTesting
-    public void execSQLs(List<String> partitionAnalysisSQLs, Map<String, String> params) throws Exception {
-        long startTime = System.currentTimeMillis();
-        LOG.debug("analyze task {} start at {}", info.toString(), new Date());
-        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(info.jobType.equals(JobType.SYSTEM))) {
-            List<List<String>> sqlGroups = Lists.partition(partitionAnalysisSQLs, StatisticConstants.UNION_ALL_LIMIT);
-            for (List<String> group : sqlGroups) {
-                if (killed) {
-                    return;
-                }
-                StringJoiner partitionCollectSQL = new StringJoiner("UNION ALL");
-                group.forEach(partitionCollectSQL::add);
-                stmtExecutor = new StmtExecutor(r.connectContext, partitionCollectSQL.toString());
-                buf.add(stmtExecutor.executeInternalQuery()
-                        .stream().map(ColStatsData::new).collect(Collectors.toList()));
-                QueryState queryState = r.connectContext.getState();
-                if (queryState.getStateType().equals(MysqlStateType.ERR)) {
-                    throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
-                            catalog.getName(), db.getFullName(), info.colName, partitionCollectSQL,
-                            queryState.getErrorMessage()));
-                }
-            }
-            for (List<ColStatsData> colStatsDataList : buf) {
-                StringBuilder batchInsertSQL =
-                        new StringBuilder("INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME
-                                + " VALUES ");
-                StringJoiner sj = new StringJoiner(",");
-                colStatsDataList.forEach(c -> sj.add(c.toSQL(true)));
-                batchInsertSQL.append(sj.toString());
-                stmtExecutor = new StmtExecutor(r.connectContext, batchInsertSQL.toString());
-                executeWithExceptionOnFail(stmtExecutor);
-            }
-            params.put("type", col.getType().toString());
-            StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
-            String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
-            stmtExecutor = new StmtExecutor(r.connectContext, sql);
-            executeWithExceptionOnFail(stmtExecutor);
-        } finally {
-            LOG.debug("analyze task {} end. cost {}ms", info,
-                    System.currentTimeMillis() - startTime);
-        }
+    public void execSQL(Map<String, String> params) throws Exception {
+        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+        String collectColStats = stringSubstitutor.replace(COLLECT_COL_STATISTICS);
+        runQuery(collectColStats);
     }
 
     // Get sample tablets id and scale up scaleFactor
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index e6b8297d0c0b013..f293cdbff963fd9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -84,6 +84,8 @@ public class StatisticConstants {
     public static final String FULL_AUTO_ANALYZE_START_TIME = "00:00:00";
     public static final String FULL_AUTO_ANALYZE_END_TIME = "23:59:59";
 
+    public static final int ANALYZE_JOB_BUF_SIZE = 20000;
+
     static {
         SYSTEM_DBS.add(SystemInfoService.DEFAULT_CLUSTER + ClusterNamespace.CLUSTER_DELIMITER
                 + FeConstants.INTERNAL_DB_NAME);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 8f5bb605b689144..8edf18d3b841c56 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -50,7 +50,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
 
     public StatisticsAutoCollector() {
         super("Automatic Analyzer",
-                TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes),
+                TimeUnit.MINUTES.toMillis(1),
                 new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num));
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
index c2f1db6bc4a64cc..638db5539876111 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
@@ -73,14 +73,15 @@ protected void createSystemAnalysisJob(AnalysisInfo jobInfo)
             return;
         }
 
-        Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
+        Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
         AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
-        analysisManager.createTaskForEachColumns(jobInfo, analysisTaskInfos, false);
+        analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);
+        Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values());
         if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) {
-            analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, false);
+            analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTasks, false);
         }
-        Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTaskInfos);
-        analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask);
+        Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTasks);
+        analysisTasks.values().forEach(analysisTaskExecutor::submitTask);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java
deleted file mode 100644
index f34ad0f1221de7f..000000000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java
+++ /dev/null
@@ -1,50 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.Config;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class StatisticsPeriodCollector extends StatisticsCollector {
-    private static final Logger LOG = LogManager.getLogger(StatisticsPeriodCollector.class);
-
-    public StatisticsPeriodCollector() {
-        super("Automatic Analyzer",
-                TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2,
-                new AnalysisTaskExecutor(Config.period_analyze_simultaneously_running_task_num));
-    }
-
-    @Override
-    protected void collect() {
-        try {
-            AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
-            List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs();
-            for (AnalysisInfo jobInfo : jobInfos) {
-                createSystemAnalysisJob(jobInfo);
-            }
-        } catch (Exception e) {
-            LOG.warn("Failed to periodically analyze the statistics." + e);
-        }
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java
index 3f9b2641b752240..9ef314c8505aebd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java
@@ -52,7 +52,7 @@ public String toSQL() {
         sj.add(String.valueOf(tblId));
         sj.add(String.valueOf(idxId));
         sj.add(StatisticsUtil.quote(colId));
-        sj.add(StatisticsUtil.quote(partId));
+        sj.add(partId);
         return sj.toString();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 169ac3e33838b31..37201d80162be07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -177,7 +177,7 @@ public static AutoCloseConnectContext buildConnectContext(boolean limitScan) {
         sessionVariable.enablePageCache = false;
         sessionVariable.parallelExecInstanceNum = Config.statistics_sql_parallel_exec_instance_num;
         sessionVariable.parallelPipelineTaskNum = Config.statistics_sql_parallel_exec_instance_num;
-        sessionVariable.setEnableNereidsPlanner(false);
+        sessionVariable.setEnableNereidsPlanner(true);
         sessionVariable.enableProfile = false;
         sessionVariable.enableScanRunSerial = limitScan;
         sessionVariable.queryTimeoutS = Config.analyze_task_timeout_in_hours * 60 * 60;
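The StatsId change matters because the consolidated collection SQL now emits the literal NULL for part_id ("NULL AS part_id"), so the value reaches toSQL() as the string "NULL" and must be written unquoted to stay a SQL null. A tiny illustration of the difference, where quote() approximates StatisticsUtil.quote:

    // Why toSQL() stops quoting part_id (illustrative values).
    public class PartIdQuoting {
        static String quote(String s) {
            return "'" + s + "'";
        }

        public static void main(String[] args) {
            String partId = "NULL"; // produced by "NULL AS part_id" in the SELECT
            System.out.println(quote(partId)); // 'NULL' -> a varchar, part_id IS NULL is false
            System.out.println(partId);        // NULL   -> a real SQL null in the VALUES tuple
        }
    }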
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
index f01485f642fac0f..1c90e2ff396a492 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
@@ -17,25 +17,9 @@
 
 package org.apache.doris.statistics;
 
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.InternalSchemaInitializer;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
-import org.apache.doris.statistics.AnalysisInfo.AnalysisMode;
-import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
-import org.apache.doris.statistics.AnalysisInfo.JobType;
-import org.apache.doris.statistics.util.DBObjects;
-import org.apache.doris.statistics.util.StatisticsUtil;
-import org.apache.doris.utframe.TestWithFeService;
-
-import com.google.common.collect.Maps;
+
 import mockit.Expectations;
 import mockit.Mock;
 import mockit.MockUp;
@@ -44,136 +28,199 @@
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class AnalysisJobTest extends TestWithFeService {
-
-    @Override
-    protected void runBeforeAll() throws Exception {
-        try {
-            InternalSchemaInitializer.createDB();
-            createDatabase("analysis_job_test");
-            connectContext.setDatabase("default_cluster:analysis_job_test");
-            createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n"
-                    + "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n"
-                    + "PROPERTIES(\n" + "    \"replication_num\"=\"1\"\n"
-                    + ");");
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        FeConstants.runningUnitTest = true;
-    }
+import java.util.HashSet;
+
+public class AnalysisJobTest {
 
     @Test
-    public void testCreateAnalysisJob() throws Exception {
+    public void initTest(@Mocked AnalysisInfo jobInfo, @Mocked OlapAnalysisTask task) {
+        AnalysisJob analysisJob = new AnalysisJob(jobInfo, Arrays.asList(task));
+        Assertions.assertSame(task.job, analysisJob);
+    }
 
-        new MockUp<StatisticsUtil>() {
+    @Test
+    public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo, @Mocked OlapAnalysisTask olapAnalysisTask) {
+        new MockUp<AnalysisJob>() {
+            @Mock
+            protected void writeBuf() {
+            }
 
             @Mock
-            public AutoCloseConnectContext buildConnectContext() {
-                return new AutoCloseConnectContext(connectContext);
+            public void updateTaskState(AnalysisState state, String msg) {
             }
 
             @Mock
-            public void execUpdate(String sql) throws Exception {
+            public void deregisterJob() {
             }
         };
+        AnalysisJob job = new AnalysisJob(analysisInfo, Arrays.asList(olapAnalysisTask));
+        job.queryingTask = new HashSet<>();
+        job.queryingTask.add(olapAnalysisTask);
+        job.queryFinished = new HashSet<>();
+        job.buf = new ArrayList<>();
+        job.total = 20;
+
+        job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
+        new Expectations() {
+            {
+                job.writeBuf();
+                times = 0;
+            }
+        };
+    }
 
-        new MockUp<StmtExecutor>() {
+    @Test
+    public void testAppendBufTest2(@Mocked AnalysisInfo analysisInfo, @Mocked OlapAnalysisTask olapAnalysisTask) {
+        new MockUp<AnalysisJob>() {
             @Mock
-            public List<ResultRow> executeInternalQuery() {
-                return Collections.emptyList();
+            protected void writeBuf() {
             }
-        };
 
-        new MockUp<ConnectContext>() {
+            @Mock
+            public void updateTaskState(AnalysisState state, String msg) {
+            }
 
             @Mock
-            public ConnectContext get() {
-                return connectContext;
+            public void deregisterJob() {
+            }
+        };
+        AnalysisJob job = new AnalysisJob(analysisInfo, Arrays.asList(olapAnalysisTask));
+        job.queryingTask = new HashSet<>();
+        job.queryingTask.add(olapAnalysisTask);
+        job.queryFinished = new HashSet<>();
+        job.buf = new ArrayList<>();
+        job.total = 1;
+
+        job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
+        new Expectations() {
+            {
+                job.writeBuf();
+                times = 1;
+                job.deregisterJob();
+                times = 1;
             }
         };
-        String sql = "ANALYZE TABLE t1";
-        Assertions.assertNotNull(getSqlStmtExecutor(sql));
     }
 
     @Test
-    public void testJobExecution(@Mocked StmtExecutor stmtExecutor, @Mocked InternalCatalog catalog, @Mocked
-            Database database,
-            @Mocked OlapTable olapTable)
-            throws Exception {
-        new MockUp<OlapTable>() {
-
+    public void testAppendBufTest3(@Mocked AnalysisInfo analysisInfo, @Mocked OlapAnalysisTask olapAnalysisTask) {
+        new MockUp<AnalysisJob>() {
             @Mock
-            public Column getColumn(String name) {
-                return new Column("col1", PrimitiveType.INT);
+            protected void writeBuf() {
             }
-        };
-
-        new MockUp<StatisticsUtil>() {
 
             @Mock
-            public ConnectContext buildConnectContext() {
-                return connectContext;
+            public void updateTaskState(AnalysisState state, String msg) {
             }
 
             @Mock
-            public void execUpdate(String sql) throws Exception {
+            public void deregisterJob() {
             }
+        };
+        AnalysisJob job = new AnalysisJob(analysisInfo, Arrays.asList(olapAnalysisTask));
+        job.queryingTask = new HashSet<>();
+        job.queryingTask.add(olapAnalysisTask);
+        job.queryFinished = new HashSet<>();
+        job.buf = new ArrayList<>();
+        ColStatsData colStatsData = new ColStatsData();
+        for (int i = 0; i < StatisticConstants.ANALYZE_JOB_BUF_SIZE; i++) {
+            job.buf.add(colStatsData);
+        }
+        job.total = 100;
 
-            @Mock
-            public DBObjects convertIdToObjects(long catalogId, long dbId, long tblId) {
-                return new DBObjects(catalog, database, olapTable);
+        job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
+        new Expectations() {
+            {
+                job.writeBuf();
+                times = 1;
             }
         };
-        new MockUp<StatisticsCache>() {
+    }
 
+    @Test
+    public void testUpdateTaskState(
+            @Mocked AnalysisInfo info,
+            @Mocked OlapAnalysisTask task1,
+            @Mocked OlapAnalysisTask task2) {
+        new MockUp<AnalysisManager>() {
             @Mock
-            public void syncLoadColStats(long tableId, long idxId, String colName) {
+            public void updateTaskStatus(AnalysisInfo info, AnalysisState taskState, String message, long time) {
             }
         };
-        new MockUp<StmtExecutor>() {
-
+        AnalysisManager analysisManager = new AnalysisManager();
+        new MockUp<Env>() {
             @Mock
-            public void execute() throws Exception {
-
+            public AnalysisManager getAnalysisManager() {
+                return analysisManager;
             }
+        };
+        AnalysisJob job = new AnalysisJob(info, Collections.singletonList(task1));
+        job.queryFinished = new HashSet<>();
+        job.queryFinished.add(task2);
+        job.updateTaskState(AnalysisState.FAILED, "");
+        new Expectations() {
+            {
+                analysisManager.updateTaskStatus((AnalysisInfo) any, (AnalysisState) any, anyString, anyLong);
+                times = 2;
+            }
+        };
+    }
 
+    @Test
+    public void testWriteBuf1(@Mocked AnalysisInfo info,
+            @Mocked OlapAnalysisTask task1, @Mocked OlapAnalysisTask task2) {
+        AnalysisJob job = new AnalysisJob(info, Collections.singletonList(task1));
+        job.queryFinished = new HashSet<>();
+        job.queryFinished.add(task2);
+        new MockUp<AnalysisJob>() {
             @Mock
-            public List<ResultRow> executeInternalQuery() {
-                return new ArrayList<>();
+            public void updateTaskState(AnalysisState state, String msg) {
             }
-        };
 
-        new MockUp<OlapAnalysisTask>() {
+            @Mock
+            protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
+
+            }
 
             @Mock
-            public void execSQLs(List<String> partitionAnalysisSQLs, Map<String, String> params) throws Exception {}
+            protected void syncLoadStats() {
+            }
         };
 
-        HashMap<String, Set<String>> colToPartitions = Maps.newHashMap();
-        colToPartitions.put("col1", Collections.singleton("t1"));
-        AnalysisInfo analysisJobInfo = new AnalysisInfoBuilder().setJobId(0).setTaskId(0)
-                .setCatalogId(0)
-                .setDBId(0)
-                .setTblId(0)
-                .setColName("col1").setJobType(JobType.MANUAL)
-                .setAnalysisMode(AnalysisMode.FULL)
-                .setAnalysisMethod(AnalysisMethod.FULL)
-                .setAnalysisType(AnalysisType.FUNDAMENTALS)
-                .setColToPartitions(colToPartitions)
-                .setState(AnalysisState.RUNNING)
-                .build();
-        new OlapAnalysisTask(analysisJobInfo).doExecute();
         new Expectations() {
             {
-                stmtExecutor.execute();
+                job.syncLoadStats();
                 times = 1;
             }
         };
+        job.writeBuf();
+
+        Assertions.assertEquals(0, job.queryFinished.size());
+    }
+
+    @Test
+    public void testWriteBuf2(@Mocked AnalysisInfo info,
+            @Mocked OlapAnalysisTask task1, @Mocked OlapAnalysisTask task2) {
+        AnalysisJob job = new AnalysisJob(info, Collections.singletonList(task1));
+        job.queryFinished = new HashSet<>();
+        job.queryFinished.add(task2);
+        new MockUp<AnalysisJob>() {
+            @Mock
+            public void updateTaskState(AnalysisState state, String msg) {
+            }
+
+            @Mock
+            protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
+                throw new RuntimeException();
+            }
+
+            @Mock
+            protected void syncLoadStats() {
+            }
+        };
+        job.writeBuf();
+        Assertions.assertEquals(1, job.queryFinished.size());
     }
 }
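The rewritten tests lean heavily on JMockit's MockUp to stub writeBuf(), updateTaskState(), and friends on real AnalysisJob instances. As a self-contained reminder of how that API behaves (Greeter is a made-up class for the demo; run with the JMockit javaagent enabled, as the FE test suite does):

    import mockit.Mock;
    import mockit.MockUp;

    // A MockUp fake redefines the method for every instance while it is
    // active, which is how the tests above isolate appendBuf()'s flush logic.
    public class MockUpDemo {
        static class Greeter {
            String greet() {
                return "hello";
            }
        }

        public static void main(String[] args) {
            new MockUp<Greeter>() {
                @Mock
                String greet() {
                    return "stubbed";
                }
            };
            System.out.println(new Greeter().greet()); // prints "stubbed"
        }
    }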
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
new file mode 100644
index 000000000000000..bd1135562da45b0
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.InternalSchemaInitializer;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
+import org.apache.doris.statistics.AnalysisInfo.AnalysisMode;
+import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
+import org.apache.doris.statistics.AnalysisInfo.JobType;
+import org.apache.doris.statistics.util.DBObjects;
+import org.apache.doris.statistics.util.StatisticsUtil;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class AnalyzeTest extends TestWithFeService {
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        try {
+            InternalSchemaInitializer.createDB();
+            createDatabase("analysis_job_test");
+            connectContext.setDatabase("default_cluster:analysis_job_test");
+            createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n"
+                    + "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n"
+                    + "PROPERTIES(\n" + "    \"replication_num\"=\"1\"\n"
+                    + ");");
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        FeConstants.runningUnitTest = true;
+    }
+
+    @Test
+    public void testCreateAnalysisJob() throws Exception {
+
+        new MockUp<StatisticsUtil>() {
+
+            @Mock
+            public AutoCloseConnectContext buildConnectContext() {
+                return new AutoCloseConnectContext(connectContext);
+            }
+
+            @Mock
+            public void execUpdate(String sql) throws Exception {
+            }
+        };
+
+        new MockUp<StmtExecutor>() {
+            @Mock
+            public List<ResultRow> executeInternalQuery() {
+                return Collections.emptyList();
+            }
+        };
+
+        new MockUp<ConnectContext>() {
+
+            @Mock
+            public ConnectContext get() {
+                return connectContext;
+            }
+        };
+        String sql = "ANALYZE TABLE t1";
+        Assertions.assertNotNull(getSqlStmtExecutor(sql));
+    }
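// testCreateAnalysisJob stubs every layer beneath the statement, so the assertion only
// proves that "ANALYZE TABLE t1" parses and reaches a StmtExecutor; no statistics are
// actually computed. The general form of such a stub (Dao and Row are hypothetical
// names, not Doris types):
//
//     new MockUp<Dao>() {
//         @Mock
//         List<Row> query(String sql) {
//             return Collections.emptyList();   // skip the backend round-trip entirely
//         }
//     };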
+
+    @Test
+    public void testJobExecution(@Mocked StmtExecutor stmtExecutor, @Mocked InternalCatalog catalog, @Mocked
+            Database database,
+            @Mocked OlapTable olapTable)
+            throws Exception {
+        new MockUp<OlapTable>() {
+
+            @Mock
+            public Column getColumn(String name) {
+                return new Column("col1", PrimitiveType.INT);
+            }
+        };
+
+        new MockUp<StatisticsUtil>() {
+
+            @Mock
+            public ConnectContext buildConnectContext() {
+                return connectContext;
+            }
+
+            @Mock
+            public void execUpdate(String sql) throws Exception {
+            }
+
+            @Mock
+            public DBObjects convertIdToObjects(long catalogId, long dbId, long tblId) {
+                return new DBObjects(catalog, database, olapTable);
+            }
+        };
+        new MockUp<StatisticsCache>() {
+
+            @Mock
+            public void syncLoadColStats(long tableId, long idxId, String colName) {
+            }
+        };
+        new MockUp<StmtExecutor>() {
+
+            @Mock
+            public void execute() throws Exception {
+
+            }
+
+            @Mock
+            public List<ResultRow> executeInternalQuery() {
+                return new ArrayList<>();
+            }
+        };
+
+        new MockUp<OlapAnalysisTask>() {
+
+            @Mock
+            public void execSQLs(List<String> partitionAnalysisSQLs, Map<String, String> params) throws Exception {}
+        };
+        HashMap<String, Set<String>> colToPartitions = Maps.newHashMap();
+        colToPartitions.put("col1", Collections.singleton("t1"));
+        AnalysisInfo analysisJobInfo = new AnalysisInfoBuilder().setJobId(0).setTaskId(0)
+                .setCatalogId(0)
+                .setDBId(0)
+                .setTblId(0)
+                .setColName("col1").setJobType(JobType.MANUAL)
+                .setAnalysisMode(AnalysisMode.FULL)
+                .setAnalysisMethod(AnalysisMethod.FULL)
+                .setAnalysisType(AnalysisType.FUNDAMENTALS)
+                .setColToPartitions(colToPartitions)
+                .setState(AnalysisState.RUNNING)
+                .build();
+        new OlapAnalysisTask(analysisJobInfo).doExecute();
+        new Expectations() {
+            {
+                stmtExecutor.execute();
+                times = 1;
+            }
+        };
+    }
+
+}
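// The Expectations/times idiom used by testJobExecution, shown in isolation with the
// recording phase placed before the call under test (Service and Client are
// hypothetical names, not Doris types):
//
//     @Test
//     public void callsServiceExactlyOnce(@Mocked Service service) {
//         new Expectations() {
//             {
//                 service.run();
//                 times = 1;        // enforced when the test method finishes
//             }
//         };
//         new Client(service).doWork();   // replay phase: must call service.run() once
//     }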