From a7d267cd96b94a6d6f0a453065e4c8dadb1c3584 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 26 Dec 2024 11:45:53 +0800 Subject: [PATCH] 1 --- .../main/java/org/apache/doris/job/base/AbstractJob.java | 8 ++++---- .../src/main/java/org/apache/doris/job/base/Job.java | 2 +- .../apache/doris/job/executor/DispatchTaskHandler.java | 2 +- .../org/apache/doris/job/extensions/insert/InsertJob.java | 4 ++-- .../apache/doris/job/extensions/insert/InsertTask.java | 2 +- .../org/apache/doris/job/extensions/mtmv/MTMVTask.java | 4 ++-- .../main/java/org/apache/doris/job/task/AbstractTask.java | 8 ++++---- .../src/main/java/org/apache/doris/job/task/Task.java | 4 +++- .../src/main/java/org/apache/doris/qe/StmtExecutor.java | 8 ++++++-- 9 files changed, 24 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 906b86494fb748..b6f62f5121b5ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -149,12 +149,12 @@ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus, private Lock createTaskLock = new ReentrantLock(); @Override - public void cancelAllTasks() throws JobException { + public void cancelAllTasks(boolean needWaitCancelComplete) throws JobException { if (CollectionUtils.isEmpty(runningTasks)) { return; } for (T task : runningTasks) { - task.cancel(); + task.cancel(needWaitCancelComplete); canceledTaskCount.incrementAndGet(); } runningTasks = new CopyOnWriteArrayList<>(); @@ -184,7 +184,7 @@ public void cancelTaskById(long taskId) throws JobException { throw new JobException("no running task"); } runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst() - .orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel(); + .orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel(true); runningTasks.removeIf(task -> task.getTaskId().equals(taskId)); canceledTaskCount.incrementAndGet(); if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) { @@ -292,7 +292,7 @@ public void updateJobStatus(JobStatus newJobStatus) throws JobException { this.finishTimeMs = System.currentTimeMillis(); } if (JobStatus.PAUSED.equals(newJobStatus) || JobStatus.STOPPED.equals(newJobStatus)) { - cancelAllTasks(); + cancelAllTasks(JobStatus.STOPPED.equals(newJobStatus) ? false : true); } jobStatus = newJobStatus; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java index a7e75554c71980..69d1e5e55fb01d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java @@ -101,7 +101,7 @@ public interface Job { * Cancels all running tasks of this job. * @throws JobException If cancelling a running task fails. */ - void cancelAllTasks() throws JobException; + void cancelAllTasks(boolean needWaitCancelComplete) throws JobException; /** * register job diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java index b8f726c4a0c76f..56222fd3e1fe60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java @@ -66,7 +66,7 @@ public void onEvent(TimerJobEvent event) { JobType jobType = event.getJob().getJobType(); for (AbstractTask task : tasks) { if (!disruptorMap.get(jobType).addTask(task)) { - task.cancel(); + task.cancel(true); continue; } log.info("dispatch timer job success, job id is {}, task id is {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index f4a91498fea1e5..f87eb29d9d8c32 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -297,12 +297,12 @@ public void cancelTaskById(long taskId) throws JobException { } @Override - public void cancelAllTasks() throws JobException { + public void cancelAllTasks(boolean needWaitCancelComplete) throws JobException { try { if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { checkAuth("CANCEL LOAD"); } - super.cancelAllTasks(); + super.cancelAllTasks(needWaitCancelComplete); this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"); } catch (DdlException e) { throw new JobException(e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index a577250dc865ed..883c42653167a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -223,7 +223,7 @@ public void onSuccess() throws JobException { } @Override - protected void executeCancelLogic() { + protected void executeCancelLogic(boolean needWaitCancelComplete) { if (isFinished.get() || isCanceled.get()) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 31e6c8353e24b3..aa1bbe629fd597 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -300,10 +300,10 @@ public synchronized void onSuccess() throws JobException { } @Override - protected synchronized void executeCancelLogic() { + protected synchronized void executeCancelLogic(boolean needWaitCancelComplete) { LOG.info("mtmv task cancel, taskId: {}", super.getTaskId()); if (executor != null) { - executor.cancel(new Status(TStatusCode.CANCELLED, "mtmv task cancelled")); + executor.cancel(new Status(TStatusCode.CANCELLED, "mtmv task cancelled"), needWaitCancelComplete); } after(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index 8a230c0bd385f7..b356bc58d32a05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -129,16 +129,16 @@ public void onSuccess() throws JobException { /** * Cancels the ongoing task, updating its status to {@link TaskStatus#CANCELED} and releasing associated resources. * This method encapsulates the core cancellation logic, calling the abstract method - * {@link #executeCancelLogic()} for task-specific actions. + * {@link #executeCancelLogic(boolean)} for task-specific actions. * * @throws JobException If an error occurs during the cancellation process, a new JobException is thrown wrapping * the original exception. */ @Override - public void cancel() throws JobException { + public void cancel(boolean needWaitCancelComplete) throws JobException { try { status = TaskStatus.CANCELED; - executeCancelLogic(); + executeCancelLogic(needWaitCancelComplete); } catch (Exception e) { log.warn("cancel task failed, job id is {}, task id is {}", jobId, taskId, e); throw new JobException(e); @@ -153,7 +153,7 @@ public void cancel() throws JobException { * * @throws Exception Any exception that might occur during the cancellation process in the subclass. */ - protected abstract void executeCancelLogic() throws Exception; + protected abstract void executeCancelLogic(boolean needWaitCancelComplete) throws Exception; @Override public void before() throws JobException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java index ee205c55c315ab..d184f6470754f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java @@ -63,8 +63,10 @@ public interface Task { /** * This method is called to cancel the execution of the task. * Implementations should define the necessary steps to cancel the task. + * + * @param needWaitCancelComplete Do we need to wait for the cancellation to be completed. */ - void cancel() throws JobException; + void cancel(boolean needWaitCancelComplete) throws JobException; /** * get info for tvf `tasks` diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 055a4c31e9075e..b155b468a7ec18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1586,7 +1586,7 @@ private void resetAnalyzerAndStmt() { } // Because this is called by other thread - public void cancel(Status cancelReason) { + public void cancel(Status cancelReason, boolean needWaitCancelComplete) { if (masterOpExecutor != null) { try { masterOpExecutor.cancel(); @@ -1610,12 +1610,16 @@ public void cancel(Status cancelReason) { if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) { Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context); } - if (insertOverwriteTableCommand.isPresent()) { + if (insertOverwriteTableCommand.isPresent() && needWaitCancelComplete) { // Wait for the command to run or cancel completion insertOverwriteTableCommand.get().waitNotRunning(); } } + public void cancel(Status cancelReason) { + cancel(cancelReason, true); + } + private Optional getInsertOverwriteTableCommand() { if (parsedStmt instanceof LogicalPlanAdapter) { LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) parsedStmt;