Skip to content

Commit

Permalink
[enhance](mtmv)When drop MTMV, no longer wait for task cancel to comp…
Browse files Browse the repository at this point in the history
…lete (#45995) (#46025)

pick: #45995
  • Loading branch information
zddr authored Dec 26, 2024
1 parent 3a8df5f commit 7d5e8a1
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
private Lock createTaskLock = new ReentrantLock();

@Override
public void cancelAllTasks() throws JobException {
public void cancelAllTasks(boolean needWaitCancelComplete) throws JobException {
if (CollectionUtils.isEmpty(runningTasks)) {
return;
}
for (T task : runningTasks) {
task.cancel();
task.cancel(needWaitCancelComplete);
canceledTaskCount.incrementAndGet();
}
runningTasks = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -184,7 +184,7 @@ public void cancelTaskById(long taskId) throws JobException {
throw new JobException("no running task");
}
runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst()
.orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel();
.orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel(true);
runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
canceledTaskCount.incrementAndGet();
if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
Expand Down Expand Up @@ -292,7 +292,7 @@ public void updateJobStatus(JobStatus newJobStatus) throws JobException {
this.finishTimeMs = System.currentTimeMillis();
}
if (JobStatus.PAUSED.equals(newJobStatus) || JobStatus.STOPPED.equals(newJobStatus)) {
cancelAllTasks();
cancelAllTasks(JobStatus.STOPPED.equals(newJobStatus) ? false : true);
}
jobStatus = newJobStatus;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public interface Job<T extends AbstractTask, C> {
* Cancels all running tasks of this job.
* @throws JobException If cancelling a running task fails.
*/
void cancelAllTasks() throws JobException;
void cancelAllTasks(boolean needWaitCancelComplete) throws JobException;

/**
* register job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void onEvent(TimerJobEvent<T> event) {
JobType jobType = event.getJob().getJobType();
for (AbstractTask task : tasks) {
if (!disruptorMap.get(jobType).addTask(task)) {
task.cancel();
task.cancel(true);
continue;
}
log.info("dispatch timer job success, job id is {}, task id is {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,12 @@ public void cancelTaskById(long taskId) throws JobException {
}

@Override
public void cancelAllTasks() throws JobException {
public void cancelAllTasks(boolean needWaitCancelComplete) throws JobException {
try {
if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
checkAuth("CANCEL LOAD");
}
super.cancelAllTasks();
super.cancelAllTasks(needWaitCancelComplete);
this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel");
} catch (DdlException e) {
throw new JobException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public void onSuccess() throws JobException {
}

@Override
protected void executeCancelLogic() {
protected void executeCancelLogic(boolean needWaitCancelComplete) {
if (isFinished.get() || isCanceled.get()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,10 @@ public synchronized void onSuccess() throws JobException {
}

@Override
protected synchronized void executeCancelLogic() {
protected synchronized void executeCancelLogic(boolean needWaitCancelComplete) {
LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
if (executor != null) {
executor.cancel("mtmv task cancelled");
executor.cancel("mtmv task cancelled", needWaitCancelComplete);
}
after();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,16 @@ public void onSuccess() throws JobException {
/**
* Cancels the ongoing task, updating its status to {@link TaskStatus#CANCELED} and releasing associated resources.
* This method encapsulates the core cancellation logic, calling the abstract method
* {@link #executeCancelLogic()} for task-specific actions.
* {@link #executeCancelLogic(boolean)} for task-specific actions.
*
* @throws JobException If an error occurs during the cancellation process, a new JobException is thrown wrapping
* the original exception.
*/
@Override
public void cancel() throws JobException {
public void cancel(boolean needWaitCancelComplete) throws JobException {
try {
status = TaskStatus.CANCELED;
executeCancelLogic();
executeCancelLogic(needWaitCancelComplete);
} catch (Exception e) {
log.warn("cancel task failed, job id is {}, task id is {}", jobId, taskId, e);
throw new JobException(e);
Expand All @@ -153,7 +153,7 @@ public void cancel() throws JobException {
*
* @throws Exception Any exception that might occur during the cancellation process in the subclass.
*/
protected abstract void executeCancelLogic() throws Exception;
protected abstract void executeCancelLogic(boolean needWaitCancelComplete) throws Exception;

@Override
public void before() throws JobException {
Expand Down
4 changes: 3 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ public interface Task {
/**
* This method is called to cancel the execution of the task.
* Implementations should define the necessary steps to cancel the task.
*
* @param needWaitCancelComplete Do we need to wait for the cancellation to be completed.
*/
void cancel() throws JobException;
void cancel(boolean needWaitCancelComplete) throws JobException;

/**
* get info for tvf `tasks`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1488,7 +1488,7 @@ private void resetAnalyzerAndStmt() {
}

// Because this is called by other thread
public void cancel(String message) {
public void cancel(String message, boolean needWaitCancelComplete) {
Optional<InsertOverwriteTableCommand> insertOverwriteTableCommand = getInsertOverwriteTableCommand();
if (insertOverwriteTableCommand.isPresent()) {
// If the be scheduling has not been triggered yet, cancel the scheduling first
Expand All @@ -1504,12 +1504,16 @@ public void cancel(String message) {
if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) {
Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context);
}
if (insertOverwriteTableCommand.isPresent()) {
if (insertOverwriteTableCommand.isPresent() && needWaitCancelComplete) {
// Wait for the command to run or cancel completion
insertOverwriteTableCommand.get().waitNotRunning();
}
}

public void cancel(String message) {
cancel(message, true);
}

private Optional<InsertOverwriteTableCommand> getInsertOverwriteTableCommand() {
if (parsedStmt instanceof LogicalPlanAdapter) {
LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) parsedStmt;
Expand Down

0 comments on commit 7d5e8a1

Please sign in to comment.