Skip to content

Commit

Permalink
core function finished
Browse files Browse the repository at this point in the history
  • Loading branch information
Kikyou1997 committed Nov 1, 2023
1 parent 7ba4f91 commit 97b1912
Show file tree
Hide file tree
Showing 19 changed files with 591 additions and 678 deletions.
7 changes: 0 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@
import org.apache.doris.statistics.StatisticsAutoCollector;
import org.apache.doris.statistics.StatisticsCache;
import org.apache.doris.statistics.StatisticsCleaner;
import org.apache.doris.statistics.StatisticsPeriodCollector;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
Expand Down Expand Up @@ -495,8 +494,6 @@ public class Env {

private StatisticsAutoCollector statisticsAutoCollector;

private StatisticsPeriodCollector statisticsPeriodCollector;

private HiveTransactionMgr hiveTransactionMgr;

private TopicPublisherThread topicPublisherThread;
Expand Down Expand Up @@ -720,7 +717,6 @@ private Env(boolean isCheckpointCatalog) {
this.analysisManager = new AnalysisManager();
this.statisticsCleaner = new StatisticsCleaner();
this.statisticsAutoCollector = new StatisticsAutoCollector();
this.statisticsPeriodCollector = new StatisticsPeriodCollector();
this.globalFunctionMgr = new GlobalFunctionMgr();
this.workloadGroupMgr = new WorkloadGroupMgr();
this.queryStats = new QueryStats();
Expand Down Expand Up @@ -971,9 +967,6 @@ public void initialize(String[] args) throws Exception {
if (statisticsAutoCollector != null) {
statisticsAutoCollector.start();
}
if (statisticsPeriodCollector != null) {
statisticsPeriodCollector.start();
}

queryCancelWorker.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.HistogramTask;
import org.apache.doris.statistics.MVAnalysisTask;
import org.apache.doris.statistics.OlapAnalysisTask;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.util.StatisticsUtil;
Expand Down Expand Up @@ -1122,11 +1121,9 @@ public TTableDescriptor toThrift() {
public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
if (info.analysisType.equals(AnalysisType.HISTOGRAM)) {
return new HistogramTask(info);
}
if (info.analysisType.equals(AnalysisType.FUNDAMENTALS)) {
} else {
return new OlapAnalysisTask(info);
}
return new MVAnalysisTask(info);
}

public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
Expand Down
171 changes: 171 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics;

import org.apache.doris.catalog.Env;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringJoiner;

/**
 * Tracks the analysis tasks that belong to one job, buffers the column statistics they report and writes
 * the buffered rows to the statistics table in batches.
 *
 * <p>{@link #appendBuf} and {@link #taskFailed} are synchronized on this instance because both walk or
 * mutate the plain {@code HashSet}/{@code ArrayList} state below. The other public methods take no lock
 * themselves; callers outside this class are responsible for their own coordination.
 */
public class AnalysisJob {

    public static final Logger LOG = LogManager.getLogger(AnalysisJob.class);

    /** Tasks that have not reported a result yet. */
    protected Set<BaseAnalysisTask> queryingTask;

    /** Tasks whose result has been buffered; emptied after every successful {@link #writeBuf()}. */
    protected Set<BaseAnalysisTask> queryFinished;

    /** Column statistics waiting to be written; emptied after every successful {@link #writeBuf()}. */
    protected List<ColStatsData> buf;

    /** Number of tasks handed to the constructor. */
    protected int total;

    /** Number of tasks that have reported through {@link #appendBuf}. */
    protected int acc;

    /** Executor of the most recent INSERT issued by {@link #writeBuf()}. */
    protected StmtExecutor stmtExecutor;

    /** When true, {@link #executeWithExceptionOnFail} returns without executing anything. */
    protected boolean killed;

    /** Creation time of this job in epoch milliseconds. */
    protected long start;

    protected AnalysisInfo jobInfo;

    /** Outcome of the latest {@link #writeBuf()}; lets the final flush avoid overwriting a FAILED status. */
    private boolean lastWriteFailed;

    /**
     * Creates the job and links every task back to it through {@code task.job}.
     *
     * @param jobInfo job level information; its {@code jobId} and {@code tblId} are read later
     * @param queryingTask the tasks that make up this job
     */
    public AnalysisJob(AnalysisInfo jobInfo, Collection<? extends BaseAnalysisTask> queryingTask) {
        for (BaseAnalysisTask task : queryingTask) {
            task.job = this;
        }
        this.queryingTask = new HashSet<>(queryingTask);
        this.queryFinished = new HashSet<>();
        this.buf = new ArrayList<>();
        total = queryingTask.size();
        start = System.currentTimeMillis();
        this.jobInfo = jobInfo;
    }

    /**
     * Records the result of one finished task. The buffer is flushed when the last task reports or when it
     * has reached {@code StatisticConstants.ANALYZE_JOB_BUF_SIZE} rows; after the last task the job is
     * always deregistered, even if the flush or the status update throws.
     *
     * @param task the task that produced {@code statsData}
     * @param statsData the rows to buffer
     */
    public synchronized void appendBuf(BaseAnalysisTask task, List<ColStatsData> statsData) {
        queryingTask.remove(task);
        buf.addAll(statsData);
        queryFinished.add(task);
        acc += 1;
        if (acc == total) {
            try {
                writeBuf();
                // A failed flush has already marked the tasks FAILED; do not overwrite that with FINISHED.
                // NOTE(review): after a successful flush queryFinished is empty, so this call currently
                // reaches no task and the cost-time message is not recorded anywhere.
                if (!lastWriteFailed) {
                    updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
                            + (System.currentTimeMillis() - start) / 1000);
                }
            } finally {
                deregisterJob();
            }
        } else if (buf.size() >= StatisticConstants.ANALYZE_JOB_BUF_SIZE) {
            writeBuf();
        }
    }

    /**
     * Pushes {@code state} to the analysis manager. {@code FAILED} is applied to the tasks still querying
     * and, through the deliberate fallthrough, to the buffered ones as well; {@code FINISHED} is applied to
     * the buffered ones only. Any other state is ignored.
     *
     * @param state the state to report
     * @param msg the message stored with the state
     */
    // CHECKSTYLE OFF
    // fallthrough here is expected
    public void updateTaskState(AnalysisState state, String msg) {
        long time = System.currentTimeMillis();
        switch (state) {
            case FAILED:
                for (BaseAnalysisTask task : queryingTask) {
                    Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(task.info, state, msg, time);
                }
            case FINISHED:
                for (BaseAnalysisTask task : queryFinished) {
                    Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(task.info, state, msg, time);
                }
                break;
            default:
                // DO NOTHING
                break;
        }
    }
    // CHECKSTYLE ON

    /**
     * Writes all buffered rows with a single INSERT, marks the buffered tasks FINISHED, reloads their
     * column statistics and empties both the buffer and the finished set.
     *
     * <p>An empty buffer skips the INSERT, since a VALUES clause without tuples is not a valid statement.
     * If the INSERT fails the affected tasks are marked FAILED and the buffer is left untouched, so the
     * rows are part of the next flush.
     */
    protected void writeBuf() {
        lastWriteFailed = false;
        if (!buf.isEmpty()) {
            StringJoiner values = new StringJoiner(",");
            for (ColStatsData data : buf) {
                values.add(data.toSQL(true));
            }
            String insertStmt = "INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME + " VALUES "
                    + values;
            try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
                stmtExecutor = new StmtExecutor(r.connectContext, insertStmt);
                executeWithExceptionOnFail(stmtExecutor);
            } catch (Throwable t) {
                LOG.warn("Failed to write buf: " + insertStmt, t);
                lastWriteFailed = true;
                updateTaskState(AnalysisState.FAILED, t.getMessage());
                return;
            }
        }
        updateTaskState(AnalysisState.FINISHED, "");
        syncLoadStats();
        queryFinished.clear();
        // Without this the same rows would be inserted again by every later flush.
        buf.clear();
    }

    /**
     * Executes the statement and turns an error query state into an exception. Does nothing when the job
     * is killed. The audit log entry is written whether or not execution succeeded.
     *
     * @param stmtExecutor executor holding the statement to run
     * @throws Exception whatever {@code execute()} throws, or a {@link RuntimeException} when the query
     *         state is {@code ERR}
     */
    protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
        if (killed) {
            return;
        }
        LOG.debug("execute internal sql: {}", stmtExecutor.getOriginStmt());
        try {
            stmtExecutor.execute();
            QueryState queryState = stmtExecutor.getContext().getState();
            if (queryState.getStateType().equals(MysqlStateType.ERR)) {
                throw new RuntimeException(
                        "Failed to insert : " + stmtExecutor.getOriginStmt().originStmt + "Error msg: "
                                + queryState.getErrorMessage());
            }
        } finally {
            AuditLogHelper.logAuditLog(stmtExecutor.getContext(), stmtExecutor.getOriginStmt().toString(),
                    stmtExecutor.getParsedStmt(), stmtExecutor.getQueryStatisticsForAuditLog(),
                    true);
        }
    }

    /**
     * Fails the whole job: every known task is marked FAILED, the tasks still querying are cancelled and
     * the job is deregistered. Synchronized so the task sets are not walked while {@link #appendBuf} is
     * mutating them on another thread.
     *
     * @param task the task that failed (not used to narrow the update; the whole job is failed)
     * @param reason message stored with the FAILED state
     */
    public synchronized void taskFailed(BaseAnalysisTask task, String reason) {
        try {
            updateTaskState(AnalysisState.FAILED, reason);
            cancel();
        } finally {
            deregisterJob();
        }
    }

    /** Cancels every task that has not reported a result yet. */
    public void cancel() {
        for (BaseAnalysisTask task : queryingTask) {
            task.cancel();
        }
    }

    /** Removes this job from the analysis manager, keyed by {@code jobInfo.jobId}. */
    public void deregisterJob() {
        Env.getCurrentEnv().getAnalysisManager().removeJob(jobInfo.jobId);
    }

    /**
     * Reloads the cached statistics of every column whose task is in {@code queryFinished}; when the cache
     * reports failure the column's stats status is removed from the analysis manager.
     */
    protected void syncLoadStats() {
        long tblId = jobInfo.tblId;
        for (BaseAnalysisTask task : queryFinished) {
            String colName = task.col.getName();
            // NOTE(review): -1 is presumably the "no specific index" id - confirm against syncLoadColStats.
            if (!Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tblId, -1, colName)) {
                Env.getCurrentEnv().getAnalysisManager().removeColStatsStatus(tblId, colName);
            }
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public class AnalysisManager extends Daemon implements Writable {

private final Map<Long, TableStatsMeta> idToTblStats = new ConcurrentHashMap<>();

private final Map<Long, AnalysisJob> idToAnalysisJob = new ConcurrentHashMap<>();

protected SimpleQueue<AnalysisInfo> autoJobs = createSimpleQueue(null, this);

private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w -> {
Expand Down Expand Up @@ -371,6 +373,7 @@ protected AnalysisInfo buildAndAssignJob(AnalyzeTblStmt stmt) throws DdlExceptio
boolean isSync = stmt.isSync();
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
constructJob(jobInfo, analysisTaskInfos.values());
if (!jobInfo.partitionOnly && stmt.isAllColumns()
&& StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) {
createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, isSync);
Expand Down Expand Up @@ -1129,4 +1132,17 @@ public ColStatsMeta findColStatsMeta(long tblId, String colName) {
}
return tableStats.findColumnStatsMeta(colName);
}

/**
 * Looks up a registered analysis job.
 *
 * @param id job id used as the key in {@code idToAnalysisJob}
 * @return the job stored under {@code id}, or {@code null} when the map holds no entry for it
 */
public AnalysisJob findJob(long id) {
    AnalysisJob registered = idToAnalysisJob.get(id);
    return registered;
}

/**
 * Builds an {@link AnalysisJob} for the given tasks and registers it under {@code jobInfo.jobId},
 * replacing any job previously stored under the same id.
 *
 * @param jobInfo job level information; supplies the registration key
 * @param tasks the tasks the new job is made of
 */
public void constructJob(AnalysisInfo jobInfo, Collection<? extends BaseAnalysisTask> tasks) {
    idToAnalysisJob.put(jobInfo.jobId, new AnalysisJob(jobInfo, tasks));
}

/**
 * Drops the analysis job registered under {@code id}; does nothing when no such job is registered.
 *
 * @param id job id used as the key in {@code idToAnalysisJob}
 */
public void removeJob(long id) {
idToAnalysisJob.remove(id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.statistics;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
Expand Down Expand Up @@ -59,9 +58,8 @@ public void run() {
if (task.info.scheduleType.equals(ScheduleType.AUTOMATIC) && !StatisticsUtil.inAnalyzeTime(
LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) {
// TODO: Do we need a separate AnalysisState here?
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info, AnalysisState.FAILED, "Auto task"
+ "doesn't get executed within specified time range", System.currentTimeMillis());
task.job.taskFailed(task, "Auto task"
+ "doesn't get executed within specified time range");
return;
}
executor.putJob(this);
Expand All @@ -76,15 +74,7 @@ public void run() {
if (!task.killed) {
if (except != null) {
LOG.warn("Analyze {} failed.", task.toString(), except);
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info,
AnalysisState.FAILED, Util.getRootCauseMessage(except), System.currentTimeMillis());
} else {
LOG.debug("Analyze {} finished, cost time:{}", task.toString(),
System.currentTimeMillis() - startTime);
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info,
AnalysisState.FINISHED, "", System.currentTimeMillis());
task.job.taskFailed(task, Util.getRootCauseMessage(except));
}
}
}
Expand Down
Loading

0 comments on commit 97b1912

Please sign in to comment.