Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhance](mtmv)When drop MTMV, no longer wait for task cancel to complete #45995

Merged
merged 1 commit into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
private Lock createTaskLock = new ReentrantLock();

@Override
public void cancelAllTasks() throws JobException {
public void cancelAllTasks(boolean needWaitCancelComplete) throws JobException {
if (CollectionUtils.isEmpty(runningTasks)) {
return;
}
for (T task : runningTasks) {
task.cancel();
task.cancel(needWaitCancelComplete);
canceledTaskCount.incrementAndGet();
}
runningTasks = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -184,7 +184,7 @@ public void cancelTaskById(long taskId) throws JobException {
throw new JobException("no running task");
}
runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst()
.orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel();
.orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel(true);
runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
canceledTaskCount.incrementAndGet();
if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
Expand Down Expand Up @@ -292,7 +292,7 @@ public void updateJobStatus(JobStatus newJobStatus) throws JobException {
this.finishTimeMs = System.currentTimeMillis();
}
if (JobStatus.PAUSED.equals(newJobStatus) || JobStatus.STOPPED.equals(newJobStatus)) {
cancelAllTasks();
cancelAllTasks(JobStatus.STOPPED.equals(newJobStatus) ? false : true);
}
jobStatus = newJobStatus;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public interface Job<T extends AbstractTask, C> {
* Cancels all running tasks of this job.
* @throws JobException If cancelling a running task fails.
*/
void cancelAllTasks() throws JobException;
void cancelAllTasks(boolean needWaitCancelComplete) throws JobException;

/**
* register job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void onEvent(TimerJobEvent<T> event) {
JobType jobType = event.getJob().getJobType();
for (AbstractTask task : tasks) {
if (!disruptorMap.get(jobType).addTask(task)) {
task.cancel();
task.cancel(true);
continue;
}
log.info("dispatch timer job success, job id is {}, task id is {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,12 @@ public void cancelTaskById(long taskId) throws JobException {
}

@Override
public void cancelAllTasks() throws JobException {
public void cancelAllTasks(boolean needWaitCancelComplete) throws JobException {
try {
if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
checkAuth("CANCEL LOAD");
}
super.cancelAllTasks();
super.cancelAllTasks(needWaitCancelComplete);
this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel");
} catch (DdlException e) {
throw new JobException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public void onSuccess() throws JobException {
}

@Override
protected void executeCancelLogic() {
protected void executeCancelLogic(boolean needWaitCancelComplete) {
if (isFinished.get() || isCanceled.get()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,10 @@ public synchronized void onSuccess() throws JobException {
}

@Override
protected synchronized void executeCancelLogic() {
protected synchronized void executeCancelLogic(boolean needWaitCancelComplete) {
LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
if (executor != null) {
executor.cancel(new Status(TStatusCode.CANCELLED, "mtmv task cancelled"));
executor.cancel(new Status(TStatusCode.CANCELLED, "mtmv task cancelled"), needWaitCancelComplete);
}
after();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,16 @@ public void onSuccess() throws JobException {
/**
* Cancels the ongoing task, updating its status to {@link TaskStatus#CANCELED} and releasing associated resources.
* This method encapsulates the core cancellation logic, calling the abstract method
* {@link #executeCancelLogic()} for task-specific actions.
* {@link #executeCancelLogic(boolean)} for task-specific actions.
*
* @throws JobException If an error occurs during the cancellation process, a new JobException is thrown wrapping
* the original exception.
*/
@Override
public void cancel() throws JobException {
public void cancel(boolean needWaitCancelComplete) throws JobException {
try {
status = TaskStatus.CANCELED;
executeCancelLogic();
executeCancelLogic(needWaitCancelComplete);
} catch (Exception e) {
log.warn("cancel task failed, job id is {}, task id is {}", jobId, taskId, e);
throw new JobException(e);
Expand All @@ -153,7 +153,7 @@ public void cancel() throws JobException {
*
* @throws Exception Any exception that might occur during the cancellation process in the subclass.
*/
protected abstract void executeCancelLogic() throws Exception;
protected abstract void executeCancelLogic(boolean needWaitCancelComplete) throws Exception;

@Override
public void before() throws JobException {
Expand Down
4 changes: 3 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ public interface Task {
/**
* This method is called to cancel the execution of the task.
* Implementations should define the necessary steps to cancel the task.
*
* @param needWaitCancelComplete Do we need to wait for the cancellation to be completed.
*/
void cancel() throws JobException;
void cancel(boolean needWaitCancelComplete) throws JobException;

/**
* get info for tvf `tasks`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1586,7 +1586,7 @@ private void resetAnalyzerAndStmt() {
}

// Because this is called by other thread
public void cancel(Status cancelReason) {
public void cancel(Status cancelReason, boolean needWaitCancelComplete) {
if (masterOpExecutor != null) {
try {
masterOpExecutor.cancel();
Expand All @@ -1610,12 +1610,16 @@ public void cancel(Status cancelReason) {
if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) {
Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context);
}
if (insertOverwriteTableCommand.isPresent()) {
if (insertOverwriteTableCommand.isPresent() && needWaitCancelComplete) {
// Wait for the command to run or cancel completion
insertOverwriteTableCommand.get().waitNotRunning();
}
}

public void cancel(Status cancelReason) {
cancel(cancelReason, true);
}

private Optional<InsertOverwriteTableCommand> getInsertOverwriteTableCommand() {
if (parsedStmt instanceof LogicalPlanAdapter) {
LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) parsedStmt;
Expand Down
Loading