diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index a2ec9fb03b00dd..a22070a0fd178f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -46,6 +46,7 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand; import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; +import org.apache.doris.qe.AuditLogHelper; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.StmtExecutor; @@ -199,7 +200,7 @@ public void run() throws JobException { // need get names before exec Map execPartitionSnapshots = MTMVPartitionUtil .generatePartitionSnapshots(context, relation.getBaseTablesOneLevel(), execPartitionNames); - exec(ctx, execPartitionNames, tableWithPartKey); + exec(execPartitionNames, tableWithPartKey); completedPartitions.addAll(execPartitionNames); partitionSnapshots.putAll(execPartitionSnapshots); } @@ -214,10 +215,10 @@ public void run() throws JobException { } } - private void exec(ConnectContext ctx, Set refreshPartitionNames, + private void exec(Set refreshPartitionNames, Map tableWithPartKey) throws Exception { - Objects.requireNonNull(ctx, "ctx should not be null"); + ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv); StatementContext statementContext = new StatementContext(); ctx.setStatementContext(statementContext); TUniqueId queryId = generateQueryId(); @@ -226,20 +227,34 @@ private void exec(ConnectContext ctx, Set refreshPartitionNames, UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE ? refreshPartitionNames : Sets.newHashSet(), tableWithPartKey); - executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); - ctx.setExecutor(executor); - ctx.setQueryId(queryId); - ctx.getState().setNereids(true); - command.run(ctx, executor); - if (getStatus() == TaskStatus.CANCELED) { - // Throwing an exception to interrupt subsequent partition update tasks - throw new JobException("task is CANCELED"); - } - if (ctx.getState().getStateType() != MysqlStateType.OK) { - throw new JobException(ctx.getState().getErrorMessage()); + try { + executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); + ctx.setExecutor(executor); + ctx.setQueryId(queryId); + ctx.getState().setNereids(true); + command.run(ctx, executor); + if (getStatus() == TaskStatus.CANCELED) { + // Throwing an exception to interrupt subsequent partition update tasks + throw new JobException("task is CANCELED"); + } + if (ctx.getState().getStateType() != MysqlStateType.OK) { + throw new JobException(ctx.getState().getErrorMessage()); + } + } finally { + if (executor != null) { + AuditLogHelper.logAuditLog(ctx, getDummyStmt(refreshPartitionNames), + executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(), true); + } } } + private String getDummyStmt(Set refreshPartitionNames) { + return String.format( + "Asynchronous materialized view refresh task, mvName: %s," + + "taskId: %s, partitions refreshed by this insert overwrite: %s", + mtmv.getName(), super.getTaskId(), refreshPartitionNames); + } + @Override public synchronized void onFail() throws JobException { LOG.info("mtmv task onFail, taskId: {}", super.getTaskId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java index 8dcca9851c5320..0a93af5676f1c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java @@ -64,6 +64,7 @@ public static ConnectContext createMTMVContext(MTMV mtmv) { if (workloadGroup.isPresent()) { ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get()); } + ctx.setStartTime(); return ctx; }