Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature-15475][DinkyTask] DinkyTask supports Dinky-1.0.0 and common sql #15479

Merged
merged 5 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;

import org.apache.commons.lang3.StringUtils;
Expand All @@ -44,20 +45,25 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import lombok.extern.slf4j.Slf4j;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.MissingNode;
import com.fasterxml.jackson.databind.node.NullNode;

@Slf4j
public class DinkyTask extends AbstractRemoteTask {

private final TaskExecutionContext taskExecutionContext;

private DinkyParameters dinkyParameters;
private String jobInstanceId;
private boolean status;
private String dinkyVersion;

protected DinkyTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
Expand All @@ -79,73 +85,192 @@ public void init() {
}
}

// todo split handle to submit and track
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// Get dinky version
dinkyVersion = getDinkyVersion(this.dinkyParameters.getAddress());
super.handle(taskCallBack);
}

@Override
public void submitApplication() throws TaskException {
if (dinkyVersion.startsWith("0")) {
submitApplicationV0();
} else {
submitApplicationV1();
}
}

@Override
public void trackApplicationStatus() throws TaskException {
if (dinkyVersion.startsWith("0")) {
trackApplicationStatusV0();
} else {
trackApplicationStatusV1();
}
}

private void submitApplicationV0() {
try {
String address = this.dinkyParameters.getAddress();
String taskId = this.dinkyParameters.getTaskId();
boolean isOnline = this.dinkyParameters.isOnline();
JsonNode result;
String apiResultDatasKey = DinkyTaskConstants.API_RESULT_DATAS;
if (isOnline) {
// Online dinky task, and only one job is allowed to execute
result = onlineTask(address, taskId);
// Online dinky-0.6.5 task, and only one job is allowed to execute
result = onlineTaskV0(address, taskId);
} else {
// Submit dinky task
result = submitTask(address, taskId);
// Submit dinky-0.6.5 task
result = submitTaskV0(address, taskId);
}
if (checkResult(result)) {
boolean status = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("success").asBoolean();
String jobInstanceId = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("jobInstanceId").asText();
boolean finishFlag = false;
while (!finishFlag) {
JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
if (!checkResult(jobInstanceInfoResult)) {
break;
}
String jobInstanceStatus =
jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
switch (jobInstanceStatus) {
case DinkyTaskConstants.STATUS_FINISHED:
final int exitStatusCode = mapStatusToExitCode(status);
// Use address-taskId as app id
setAppIds(String.format("%s-%s", address, taskId));
setExitStatusCode(exitStatusCode);
log.info("dinky task finished with results: {}",
result.get(DinkyTaskConstants.API_RESULT_DATAS));
finishFlag = true;
break;
case DinkyTaskConstants.STATUS_FAILED:
case DinkyTaskConstants.STATUS_CANCELED:
case DinkyTaskConstants.STATUS_UNKNOWN:
errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error")
.asText());
finishFlag = true;
break;
default:
Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
}
if (checkResultV0(result)) {
status = result.get(apiResultDatasKey).get(DinkyTaskConstants.API_RESULT_SUCCESS).asBoolean();
if (result.get(apiResultDatasKey).has(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID)
&& !(result.get(apiResultDatasKey)
.get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID) instanceof NullNode)) {
jobInstanceId =
result.get(apiResultDatasKey).get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID).asText();
}
}
} catch (InterruptedException ex) {
} catch (Exception ex) {
Thread.currentThread().interrupt();
log.error("Execute dinkyTask failed", ex);
log.error(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("Execute dinkyTask failed", ex);
throw new TaskException(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex);
}
}

@Override
public void submitApplication() throws TaskException {
private void submitApplicationV1() {
try {
String address = this.dinkyParameters.getAddress();
String taskId = this.dinkyParameters.getTaskId();
boolean isOnline = this.dinkyParameters.isOnline();
JsonNode result;
String apiResultDataKey = DinkyTaskConstants.API_RESULT_DATA;
// Submit dinky-1.0.0 task
result = submitTaskV1(address, taskId, isOnline, generateVariables());
if (checkResultV1(result)) {
status = result.get(DinkyTaskConstants.API_RESULT_SUCCESS).asBoolean();
if (result.get(apiResultDataKey).has(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID)
&& !(result.get(apiResultDataKey)
.get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID) instanceof NullNode)) {
jobInstanceId =
result.get(apiResultDataKey).get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID).asText();
}
} else {
log.error(DinkyTaskConstants.SUBMIT_FAILED_MSG + "{}", result.get(DinkyTaskConstants.API_RESULT_MSG));
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException(
DinkyTaskConstants.SUBMIT_FAILED_MSG + result.get(DinkyTaskConstants.API_RESULT_MSG));
}
} catch (Exception ex) {
Thread.currentThread().interrupt();
log.error(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex);
}
}

public void trackApplicationStatusV0() throws TaskException {
try {
String address = this.dinkyParameters.getAddress();
String taskId = this.dinkyParameters.getTaskId();
if (status && jobInstanceId == null) {
// Use address-taskId as app id
setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId));
setExitStatusCode(mapStatusToExitCode(true));
log.info("Dinky common sql task finished.");
return;
}
String apiResultDatasKey = DinkyTaskConstants.API_RESULT_DATAS;
boolean finishFlag = false;
while (!finishFlag) {
JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
if (!checkResultV0(jobInstanceInfoResult)) {
break;
}
String jobInstanceStatus =
jobInstanceInfoResult.get(apiResultDatasKey).get("status").asText();
switch (jobInstanceStatus) {
case DinkyTaskConstants.STATUS_FINISHED:
final int exitStatusCode = mapStatusToExitCode(status);
// Use address-taskId as app id
setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId));
setExitStatusCode(exitStatusCode);
log.info("dinky task finished with results: {}",
jobInstanceInfoResult.get(apiResultDatasKey));
finishFlag = true;
break;
case DinkyTaskConstants.STATUS_FAILED:
case DinkyTaskConstants.STATUS_CANCELED:
case DinkyTaskConstants.STATUS_UNKNOWN:
errorHandle(
jobInstanceInfoResult.get(apiResultDatasKey).get(DinkyTaskConstants.API_RESULT_ERROR)
.asText());
finishFlag = true;
break;
default:
Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
}
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
log.error(DinkyTaskConstants.TRACK_FAILED_MSG, ex);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException(DinkyTaskConstants.TRACK_FAILED_MSG, ex);
}
}

@Override
public void trackApplicationStatus() throws TaskException {
public void trackApplicationStatusV1() throws TaskException {
try {

String address = this.dinkyParameters.getAddress();
String taskId = this.dinkyParameters.getTaskId();
if (status && jobInstanceId == null) {
// Use address-taskId as app id
setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId));
setExitStatusCode(mapStatusToExitCode(true));
log.info("Dinky common sql task finished.");
return;
}
String apiResultDataKey = DinkyTaskConstants.API_RESULT_DATA;
boolean finishFlag = false;
while (!finishFlag) {
JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
if (!checkResultV1(jobInstanceInfoResult)) {
break;
}
String jobInstanceStatus =
jobInstanceInfoResult.get(apiResultDataKey).get("status").asText();
switch (jobInstanceStatus) {
case DinkyTaskConstants.STATUS_FINISHED:
final int exitStatusCode = mapStatusToExitCode(status);
// Use address-taskId as app id
setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId));
setExitStatusCode(exitStatusCode);
log.info("dinky task finished with results: {}",
jobInstanceInfoResult.get(apiResultDataKey));
finishFlag = true;
break;
case DinkyTaskConstants.STATUS_FAILED:
case DinkyTaskConstants.STATUS_CANCELED:
case DinkyTaskConstants.STATUS_UNKNOWN:
errorHandle(jobInstanceInfoResult.get(apiResultDataKey).get(DinkyTaskConstants.API_RESULT_ERROR)
.asText());
finishFlag = true;
break;
default:
Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
}
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
log.error(DinkyTaskConstants.TRACK_FAILED_MSG, ex);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException(DinkyTaskConstants.TRACK_FAILED_MSG, ex);
}
}

/**
* map dinky task status to exitStatusCode
*
Expand All @@ -160,15 +285,28 @@ private int mapStatusToExitCode(boolean status) {
}
}

private boolean checkResult(JsonNode result) {
if (result instanceof MissingNode || result == null) {
private boolean checkResultV0(JsonNode result) {
boolean isCorrect = true;
if (result instanceof MissingNode || result instanceof NullNode) {
errorHandle(DinkyTaskConstants.API_VERSION_ERROR_TIPS);
isCorrect = false;
} else if (result.get(DinkyTaskConstants.API_RESULT_CODE).asInt() == DinkyTaskConstants.API_ERROR) {
errorHandle(result.get(DinkyTaskConstants.API_RESULT_MSG));
isCorrect = false;
}
return isCorrect;
}

private boolean checkResultV1(JsonNode result) {
boolean isCorrect = true;
if (result instanceof MissingNode || result instanceof NullNode) {
errorHandle(DinkyTaskConstants.API_VERSION_ERROR_TIPS);
return false;
} else if (result.get("code").asInt() == DinkyTaskConstants.API_ERROR) {
errorHandle(result.get("msg"));
return false;
isCorrect = false;
} else if (!result.get(DinkyTaskConstants.API_RESULT_SUCCESS).asBoolean()) {
errorHandle(result.get(DinkyTaskConstants.API_RESULT_MSG));
isCorrect = false;
}
return true;
return isCorrect;
}

private void errorHandle(Object msg) {
Expand Down Expand Up @@ -196,18 +334,53 @@ public void cancelApplication() throws TaskException {
taskId);
}

private JsonNode submitTask(String address, String taskId) {
private Map<String, String> generateVariables() {
Map<String, String> variables = new ConcurrentHashMap<>();
List<Property> propertyList = JSONUtils.toList(taskExecutionContext.getGlobalParams(), Property.class);
if (propertyList != null && !propertyList.isEmpty()) {
for (Property property : propertyList) {
variables.put(property.getProp(), property.getValue());
}
}
List<Property> localParams = this.dinkyParameters.getLocalParams();
if (localParams == null || localParams.isEmpty()) {
return variables;
}
for (Property property : localParams) {
variables.put(property.getProp(), property.getValue());
}
return variables;
}

private String getDinkyVersion(String address) {
JsonNode versionJsonNode = parse(doGet(address + DinkyTaskConstants.GET_VERSION, new HashMap<>()));
if (versionJsonNode instanceof MissingNode || versionJsonNode == null
|| versionJsonNode.get(DinkyTaskConstants.API_RESULT_CODE).asInt() == DinkyTaskConstants.API_ERROR) {
return "0";
}
return versionJsonNode.get(DinkyTaskConstants.API_RESULT_DATA).asText();
}

private JsonNode submitTaskV0(String address, String taskId) {
Map<String, String> params = new HashMap<>();
params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId);
return parse(doGet(address + DinkyTaskConstants.SUBMIT_TASK, params));
}

private JsonNode onlineTask(String address, String taskId) {
private JsonNode onlineTaskV0(String address, String taskId) {
Map<String, String> params = new HashMap<>();
params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId);
return parse(doGet(address + DinkyTaskConstants.ONLINE_TASK, params));
}

private JsonNode submitTaskV1(String address, String taskId, boolean isOnline, Map<String, String> variables) {
Map<String, Object> params = new HashMap<>();
params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId);
params.put(DinkyTaskConstants.PARAM_TASK_IS_ONLINE, isOnline);
params.put(DinkyTaskConstants.PARAM_TASK_VARIABLES, variables);
return parse(sendJsonStr(address + DinkyTaskConstants.SUBMIT_TASK, JSONUtils.toJsonString(params)));
}

private JsonNode cancelTask(String address, String taskId) {
Map<String, String> params = new HashMap<>();
params.put(DinkyTaskConstants.PARAM_JSON_TASK_ID, taskId);
Expand Down Expand Up @@ -289,4 +462,5 @@ private String sendJsonStr(String url, String params) {
}
return result;
}

}
Loading
Loading