diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java index 694317122e9c..2c0b0cb68e11 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.commons.lang3.StringUtils; @@ -44,6 +45,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import lombok.extern.slf4j.Slf4j; @@ -51,6 +53,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.MissingNode; +import com.fasterxml.jackson.databind.node.NullNode; @Slf4j public class DinkyTask extends AbstractRemoteTask { @@ -58,6 +61,9 @@ public class DinkyTask extends AbstractRemoteTask { private final TaskExecutionContext taskExecutionContext; private DinkyParameters dinkyParameters; + private String jobInstanceId; + private boolean status; + private String dinkyVersion; protected DinkyTask(TaskExecutionContext taskExecutionContext) { super(taskExecutionContext); @@ -79,73 +85,192 @@ public void init() { } } - // todo split handle to submit and track @Override public void handle(TaskCallBack taskCallBack) throws TaskException { - try { + // Get dinky version + dinkyVersion = getDinkyVersion(this.dinkyParameters.getAddress()); + super.handle(taskCallBack); + } + + @Override + public void submitApplication() throws TaskException { + if (dinkyVersion.startsWith("0")) { + submitApplicationV0(); + } else { + submitApplicationV1(); + } + } + + @Override + public void trackApplicationStatus() throws TaskException { + if (dinkyVersion.startsWith("0")) { + trackApplicationStatusV0(); + } else { + trackApplicationStatusV1(); + } + } + private void submitApplicationV0() { + try { String address = this.dinkyParameters.getAddress(); String taskId = this.dinkyParameters.getTaskId(); boolean isOnline = this.dinkyParameters.isOnline(); JsonNode result; + String apiResultDatasKey = DinkyTaskConstants.API_RESULT_DATAS; if (isOnline) { - // Online dinky task, and only one job is allowed to execute - result = onlineTask(address, taskId); + // Online dinky-0.6.5 task, and only one job is allowed to execute + result = onlineTaskV0(address, taskId); } else { - // Submit dinky task - result = submitTask(address, taskId); + // Submit dinky-0.6.5 task + result = submitTaskV0(address, taskId); } - if (checkResult(result)) { - boolean status = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("success").asBoolean(); - String jobInstanceId = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("jobInstanceId").asText(); - boolean finishFlag = false; - while (!finishFlag) { - JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId); - if (!checkResult(jobInstanceInfoResult)) { - break; - } - String jobInstanceStatus = - jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText(); - switch (jobInstanceStatus) { - case DinkyTaskConstants.STATUS_FINISHED: - final int exitStatusCode = mapStatusToExitCode(status); - // Use address-taskId as app id - setAppIds(String.format("%s-%s", address, taskId)); - setExitStatusCode(exitStatusCode); - log.info("dinky task finished with results: {}", - result.get(DinkyTaskConstants.API_RESULT_DATAS)); - finishFlag = true; - break; - case DinkyTaskConstants.STATUS_FAILED: - case DinkyTaskConstants.STATUS_CANCELED: - case DinkyTaskConstants.STATUS_UNKNOWN: - errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error") - .asText()); - finishFlag = true; - break; - default: - Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS); - } + if (checkResultV0(result)) { + status = result.get(apiResultDatasKey).get(DinkyTaskConstants.API_RESULT_SUCCESS).asBoolean(); + if (result.get(apiResultDatasKey).has(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID) + && !(result.get(apiResultDatasKey) + .get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID) instanceof NullNode)) { + jobInstanceId = + result.get(apiResultDatasKey).get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID).asText(); } } - } catch (InterruptedException ex) { + } catch (Exception ex) { Thread.currentThread().interrupt(); - log.error("Execute dinkyTask failed", ex); + log.error(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex); setExitStatusCode(EXIT_CODE_FAILURE); - throw new TaskException("Execute dinkyTask failed", ex); + throw new TaskException(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex); } } - @Override - public void submitApplication() throws TaskException { + private void submitApplicationV1() { + try { + String address = this.dinkyParameters.getAddress(); + String taskId = this.dinkyParameters.getTaskId(); + boolean isOnline = this.dinkyParameters.isOnline(); + JsonNode result; + String apiResultDataKey = DinkyTaskConstants.API_RESULT_DATA; + // Submit dinky-1.0.0 task + result = submitTaskV1(address, taskId, isOnline, generateVariables()); + if (checkResultV1(result)) { + status = result.get(DinkyTaskConstants.API_RESULT_SUCCESS).asBoolean(); + if (result.get(apiResultDataKey).has(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID) + && !(result.get(apiResultDataKey) + .get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID) instanceof NullNode)) { + jobInstanceId = + result.get(apiResultDataKey).get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID).asText(); + } + } else { + log.error(DinkyTaskConstants.SUBMIT_FAILED_MSG + "{}", result.get(DinkyTaskConstants.API_RESULT_MSG)); + setExitStatusCode(EXIT_CODE_FAILURE); + throw new TaskException( + DinkyTaskConstants.SUBMIT_FAILED_MSG + result.get(DinkyTaskConstants.API_RESULT_MSG)); + } + } catch (Exception ex) { + Thread.currentThread().interrupt(); + log.error(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex); + setExitStatusCode(EXIT_CODE_FAILURE); + throw new TaskException(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex); + } + } + public void trackApplicationStatusV0() throws TaskException { + try { + String address = this.dinkyParameters.getAddress(); + String taskId = this.dinkyParameters.getTaskId(); + if (status && jobInstanceId == null) { + // Use address-taskId as app id + setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId)); + setExitStatusCode(mapStatusToExitCode(true)); + log.info("Dinky common sql task finished."); + return; + } + String apiResultDatasKey = DinkyTaskConstants.API_RESULT_DATAS; + boolean finishFlag = false; + while (!finishFlag) { + JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId); + if (!checkResultV0(jobInstanceInfoResult)) { + break; + } + String jobInstanceStatus = + jobInstanceInfoResult.get(apiResultDatasKey).get("status").asText(); + switch (jobInstanceStatus) { + case DinkyTaskConstants.STATUS_FINISHED: + final int exitStatusCode = mapStatusToExitCode(status); + // Use address-taskId as app id + setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId)); + setExitStatusCode(exitStatusCode); + log.info("dinky task finished with results: {}", + jobInstanceInfoResult.get(apiResultDatasKey)); + finishFlag = true; + break; + case DinkyTaskConstants.STATUS_FAILED: + case DinkyTaskConstants.STATUS_CANCELED: + case DinkyTaskConstants.STATUS_UNKNOWN: + errorHandle( + jobInstanceInfoResult.get(apiResultDatasKey).get(DinkyTaskConstants.API_RESULT_ERROR) + .asText()); + finishFlag = true; + break; + default: + Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS); + } + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + log.error(DinkyTaskConstants.TRACK_FAILED_MSG, ex); + setExitStatusCode(EXIT_CODE_FAILURE); + throw new TaskException(DinkyTaskConstants.TRACK_FAILED_MSG, ex); + } } - @Override - public void trackApplicationStatus() throws TaskException { + public void trackApplicationStatusV1() throws TaskException { + try { + String address = this.dinkyParameters.getAddress(); + String taskId = this.dinkyParameters.getTaskId(); + if (status && jobInstanceId == null) { + // Use address-taskId as app id + setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId)); + setExitStatusCode(mapStatusToExitCode(true)); + log.info("Dinky common sql task finished."); + return; + } + String apiResultDataKey = DinkyTaskConstants.API_RESULT_DATA; + boolean finishFlag = false; + while (!finishFlag) { + JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId); + if (!checkResultV1(jobInstanceInfoResult)) { + break; + } + String jobInstanceStatus = + jobInstanceInfoResult.get(apiResultDataKey).get("status").asText(); + switch (jobInstanceStatus) { + case DinkyTaskConstants.STATUS_FINISHED: + final int exitStatusCode = mapStatusToExitCode(status); + // Use address-taskId as app id + setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId)); + setExitStatusCode(exitStatusCode); + log.info("dinky task finished with results: {}", + jobInstanceInfoResult.get(apiResultDataKey)); + finishFlag = true; + break; + case DinkyTaskConstants.STATUS_FAILED: + case DinkyTaskConstants.STATUS_CANCELED: + case DinkyTaskConstants.STATUS_UNKNOWN: + errorHandle(jobInstanceInfoResult.get(apiResultDataKey).get(DinkyTaskConstants.API_RESULT_ERROR) + .asText()); + finishFlag = true; + break; + default: + Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS); + } + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + log.error(DinkyTaskConstants.TRACK_FAILED_MSG, ex); + setExitStatusCode(EXIT_CODE_FAILURE); + throw new TaskException(DinkyTaskConstants.TRACK_FAILED_MSG, ex); + } } - /** * map dinky task status to exitStatusCode * @@ -160,15 +285,28 @@ private int mapStatusToExitCode(boolean status) { } } - private boolean checkResult(JsonNode result) { - if (result instanceof MissingNode || result == null) { + private boolean checkResultV0(JsonNode result) { + boolean isCorrect = true; + if (result instanceof MissingNode || result instanceof NullNode) { + errorHandle(DinkyTaskConstants.API_VERSION_ERROR_TIPS); + isCorrect = false; + } else if (result.get(DinkyTaskConstants.API_RESULT_CODE).asInt() == DinkyTaskConstants.API_ERROR) { + errorHandle(result.get(DinkyTaskConstants.API_RESULT_MSG)); + isCorrect = false; + } + return isCorrect; + } + + private boolean checkResultV1(JsonNode result) { + boolean isCorrect = true; + if (result instanceof MissingNode || result instanceof NullNode) { errorHandle(DinkyTaskConstants.API_VERSION_ERROR_TIPS); - return false; - } else if (result.get("code").asInt() == DinkyTaskConstants.API_ERROR) { - errorHandle(result.get("msg")); - return false; + isCorrect = false; + } else if (!result.get(DinkyTaskConstants.API_RESULT_SUCCESS).asBoolean()) { + errorHandle(result.get(DinkyTaskConstants.API_RESULT_MSG)); + isCorrect = false; } - return true; + return isCorrect; } private void errorHandle(Object msg) { @@ -196,18 +334,53 @@ public void cancelApplication() throws TaskException { taskId); } - private JsonNode submitTask(String address, String taskId) { + private Map generateVariables() { + Map variables = new ConcurrentHashMap<>(); + List propertyList = JSONUtils.toList(taskExecutionContext.getGlobalParams(), Property.class); + if (propertyList != null && !propertyList.isEmpty()) { + for (Property property : propertyList) { + variables.put(property.getProp(), property.getValue()); + } + } + List localParams = this.dinkyParameters.getLocalParams(); + if (localParams == null || localParams.isEmpty()) { + return variables; + } + for (Property property : localParams) { + variables.put(property.getProp(), property.getValue()); + } + return variables; + } + + private String getDinkyVersion(String address) { + JsonNode versionJsonNode = parse(doGet(address + DinkyTaskConstants.GET_VERSION, new HashMap<>())); + if (versionJsonNode instanceof MissingNode || versionJsonNode == null + || versionJsonNode.get(DinkyTaskConstants.API_RESULT_CODE).asInt() == DinkyTaskConstants.API_ERROR) { + return "0"; + } + return versionJsonNode.get(DinkyTaskConstants.API_RESULT_DATA).asText(); + } + + private JsonNode submitTaskV0(String address, String taskId) { Map params = new HashMap<>(); params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId); return parse(doGet(address + DinkyTaskConstants.SUBMIT_TASK, params)); } - private JsonNode onlineTask(String address, String taskId) { + private JsonNode onlineTaskV0(String address, String taskId) { Map params = new HashMap<>(); params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId); return parse(doGet(address + DinkyTaskConstants.ONLINE_TASK, params)); } + private JsonNode submitTaskV1(String address, String taskId, boolean isOnline, Map variables) { + Map params = new HashMap<>(); + params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId); + params.put(DinkyTaskConstants.PARAM_TASK_IS_ONLINE, isOnline); + params.put(DinkyTaskConstants.PARAM_TASK_VARIABLES, variables); + return parse(sendJsonStr(address + DinkyTaskConstants.SUBMIT_TASK, JSONUtils.toJsonString(params))); + } + private JsonNode cancelTask(String address, String taskId) { Map params = new HashMap<>(); params.put(DinkyTaskConstants.PARAM_JSON_TASK_ID, taskId); @@ -289,4 +462,5 @@ private String sendJsonStr(String url, String params) { } return result; } + } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskConstants.java index 8b12b8c259be..827a4c163544 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskConstants.java @@ -27,6 +27,7 @@ private DinkyTaskConstants() { } private static final String API_ROUTE = "/openapi/"; + public static final String GET_VERSION = API_ROUTE + "version"; public static final String SUBMIT_TASK = API_ROUTE + "submitTask"; public static final String ONLINE_TASK = API_ROUTE + "onLineTask"; public static final String SAVEPOINT_TASK = API_ROUTE + "savepointTask"; @@ -34,11 +35,19 @@ private DinkyTaskConstants() { public static final int API_ERROR = 1; public static final String API_VERSION_ERROR_TIPS = "Please check that the dinky version is greater than or equal to 0.6.5"; + public static final String API_RESULT_CODE = "code"; + public static final String API_RESULT_SUCCESS = "success"; + public static final String API_RESULT_MSG = "msg"; + public static final String API_RESULT_ERROR = "error"; + public static final String API_RESULT_DATA = "data"; public static final String API_RESULT_DATAS = "datas"; + public static final String API_RESULT_JOB_INSTANCE_ID = "jobInstanceId"; public static final String SAVEPOINT_CANCEL = "cancel"; public static final String PARAM_TASK_ID = "id"; + public static final String PARAM_TASK_IS_ONLINE = "isOnline"; + public static final String PARAM_TASK_VARIABLES = "variables"; public static final String PARAM_JSON_TASK_ID = "taskId"; public static final String PARAM_SAVEPOINT_TYPE = "type"; public static final String PARAM_JOB_INSTANCE_ID = "id"; @@ -48,6 +57,10 @@ private DinkyTaskConstants() { public static final String STATUS_FAILED = "FAILED"; public static final String STATUS_UNKNOWN = "UNKNOWN"; + public static final String SUBMIT_FAILED_MSG = "Submit dinkyTask failed:"; + public static final String TRACK_FAILED_MSG = "Track dinkyTask failed:"; + public static final String APPIDS_FORMAT = "%s-%s"; + public static final long SLEEP_MILLIS = 3000; } diff --git a/dolphinscheduler-ui/public/images/task-icons/dinky.png b/dolphinscheduler-ui/public/images/task-icons/dinky.png index 7f4ad3997b65..ad1aba9b2086 100644 Binary files a/dolphinscheduler-ui/public/images/task-icons/dinky.png and b/dolphinscheduler-ui/public/images/task-icons/dinky.png differ diff --git a/dolphinscheduler-ui/public/images/task-icons/dinky_hover.png b/dolphinscheduler-ui/public/images/task-icons/dinky_hover.png index 1db1980ce914..6aca8a2f4932 100644 Binary files a/dolphinscheduler-ui/public/images/task-icons/dinky_hover.png and b/dolphinscheduler-ui/public/images/task-icons/dinky_hover.png differ