Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the problem of killing workflow. #516

Merged
merged 1 commit into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.webank.wedatasphere.dss.flow.execution.entrance.restful;

import com.fasterxml.jackson.databind.JsonNode;
import com.webank.wedatasphere.dss.common.entity.DSSWorkspace;
import com.webank.wedatasphere.dss.common.utils.DSSCommonUtils;
import com.webank.wedatasphere.dss.standard.sso.utils.SSOHelper;
Expand All @@ -29,15 +30,19 @@
import org.apache.linkis.protocol.constants.TaskConstant;
import org.apache.linkis.protocol.utils.ZuulEntranceUtils;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.listener.LogListener;
import org.apache.linkis.scheduler.queue.Job;
import org.apache.linkis.scheduler.queue.SchedulerEventState;
import org.apache.linkis.server.Message;
import org.apache.linkis.server.security.SecurityFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import scala.Function0;
import scala.Option;

import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.Map;


Expand Down Expand Up @@ -126,6 +131,54 @@ public Message status(@PathVariable("id") String id, @RequestParam(required = fa
return message;
}

@Override
@RequestMapping(path = {"/{id}/kill"},method = {RequestMethod.GET})
public Message kill(@PathVariable("id") String id, @RequestParam(value = "taskID",required = false) Long taskID) {
String realId = ZuulEntranceUtils.parseExecID(id)[3];
Option job = Option.apply((Object)null);
try {
job = this.entranceServer.getJob(realId);
} catch (Exception var10) {
logger.warn("can not find a job in entranceServer, will force to kill it", var10);
JobHistoryHelper.forceKill(taskID);
Message message = Message.ok("Forced Kill task (强制杀死任务)");
message.setMethod("/api/entrance/" + id + "/kill");
message.setStatus(0);
return message;
}
Message message = null;
if (job.isEmpty()) {
logger.warn("can not find a job in entranceServer, will force to kill it");
JobHistoryHelper.forceKill(taskID);
message = Message.ok("Forced Kill task (强制杀死任务)");
message.setMethod("/api/entrance/" + id + "/kill");
message.setStatus(0);
return message;
} else {
try {
logger.info("begin to kill job {} ", ((Job)job.get()).getId());
((Job)job.get()).kill();
message = Message.ok("Successfully killed the job(成功kill了job)");
message.setMethod("/api/entrance/" + id + "/kill");
message.setStatus(0);
message.data("execID", id);
if (job.get() instanceof EntranceJob) {
EntranceJob entranceJob = (EntranceJob)job.get();
JobRequest jobReq = entranceJob.getJobRequest();
entranceJob.updateJobRequestStatus(SchedulerEventState.Cancelled().toString());
this.entranceServer.getEntranceContext().getOrCreatePersistenceManager().createPersistenceEngine().updateIfNeeded(jobReq);
}
logger.info("end to kill job {} ", ((Job)job.get()).getId());
} catch (Throwable var9) {
logger.error("kill job {} failed ", ((Job)job.get()).getId(), var9);
message = Message.error("An exception occurred while killing the job, kill failed(kill job的时候出现了异常,kill失败)");
message.setMethod("/api/entrance/" + id + "/kill");
message.setStatus(1);
}
return message;
}
}

private void pushLog(String log, Job job) {
entranceServer.getEntranceContext().getOrCreateLogManager().onLogUpdate(job, log);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ class FlowEntranceJob(persistManager:PersistenceManager) extends EntranceExecuti
if(! SchedulerEventState.isCompleted(this.getState)){
super.kill()
Utils.tryAndWarn(this.killNodes)
transitionCompleted(ErrorExecuteResponse(s"execute job(${getId}) failed!", new FlowExecutionErrorException(90101, s"This Flow killed by user") ))
}
Utils.tryAndWarn(transitionCompleted(ErrorExecuteResponse(s"execute job(${getId}) failed!", new FlowExecutionErrorException(90101, s"This Flow killed by user"))))
}
}

override def cancel(): Unit = if (! SchedulerEventState.isCompleted(this.getState)) this synchronized {
if(! SchedulerEventState.isCompleted(this.getState)){
Utils.tryAndWarn(this.killNodes)
super.cancel()
transitionCompleted(ErrorExecuteResponse(s"cancel job(${getId}) execution!", new FlowExecutionErrorException(90101, s"This Flow killed by user") ))
Utils.tryAndWarn(transitionCompleted(ErrorExecuteResponse(s"cancel job(${getId}) execution!", new FlowExecutionErrorException(90101, s"This Flow killed by user"))))
}
}

Expand Down