Skip to content

Commit

Permalink
[Feature-14678][Master][UI]Dependent task parameter passing (apache#1…
Browse files Browse the repository at this point in the history
…4702)

* feat: dependent task parameter passing

* style: code format

* refactor: remove inappropriate log

* refactor: Modify unreasonable code structure

* refactor: Modify unreasonable code structure

* refactor: Solve conflicts caused by merge

---------

Co-authored-by: Rick Cheng <rickchengx@gmail.com>
Co-authored-by: BaoLiang <29528966+lenboo@users.noreply.github.com>
  • Loading branch information
3 people authored and xdu-chenrj committed Oct 30, 2023
1 parent f212b8b commit 5e76468
Show file tree
Hide file tree
Showing 13 changed files with 109 additions and 37 deletions.
2 changes: 2 additions & 0 deletions docs/docs/en/guide/task/dependent.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ The Dependent node provides a logical judgment function, which can detect the ex
Two dependency modes are supported, including workflow-dependent and task-dependent. The task-dependent mode is divided into two cases: depend on all tasks in the workflow and depend on a single task.
The workflow-dependent mode checks the status of the dependent workflow; the all-task-dependent mode checks the status of all tasks in the workflow; and the single-task-dependent mode checks the status of the dependent task.

When the dependent result is success and the parameter passing option is true, the Dependent node will output the output parameters of the dependency to the downstream task. When the parameter names of multiple dependencies are the same, it involves the priority of the parameters. See also [Parameter Priority](../parameter/priority.md)

For example, process A is a weekly task, processes B and C are daily tasks, and task A requires tasks B and C to be successfully executed last week.

![dependent_task01](../../../../img/tasks/demo/dependent_task01.png)
Expand Down
2 changes: 2 additions & 0 deletions docs/docs/zh/guide/task/dependent.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ Dependent 节点提供了逻辑判断功能,可以按照逻辑来检测所依
依赖工作流的模式会检查所依赖的工作流的状态;依赖所有任务的模式会检查工作流中所有任务的状态;
依赖单个任务的模式会检查所依赖的任务的状态。

当 Dependent 节点结果为 success 且参数传递选项为 true 时,Dependent 节点会将该依赖项的输出参数输出给下游任务。当多个依赖项的参数名称相同时涉及到参数的优先级问题,详见[参数优先级](../parameter/priority.md)

例如,A 流程为周报任务,B、C 流程为天任务,A 任务需要 B、C 任务在上周执行成功,如图示:

![dependent_task01](../../../../img/tasks/demo/dependent_task01.png)
Expand Down
Binary file modified docs/img/tasks/demo/dependent_task01.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified docs/img/tasks/demo/dependent_task02.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified docs/img/tasks/demo/dependent_task03.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.dolphinscheduler.common.constants.Constants.DEPENDENT_SPLIT;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
Expand All @@ -34,6 +35,7 @@
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction;
Expand Down Expand Up @@ -69,6 +71,7 @@ public class DependentAsyncTaskExecuteFunction implements AsyncTaskExecuteFuncti
private final Date dependentDate;
private final List<DependentExecute> dependentTaskList;
private final Map<String, DependResult> dependResultMap;
private final Map<String, Property> dependVarPoolPropertyMap;

public DependentAsyncTaskExecuteFunction(TaskExecutionContext taskExecutionContext,
DependentParameters dependentParameters,
Expand All @@ -89,6 +92,7 @@ public DependentAsyncTaskExecuteFunction(TaskExecutionContext taskExecutionConte
this.dependentTaskList = initializeDependentTaskList();
log.info("Initialized dependent task list successfully");
this.dependResultMap = new HashMap<>();
this.dependVarPoolPropertyMap = new HashMap<>();
}

@Override
Expand All @@ -97,8 +101,13 @@ public DependentAsyncTaskExecuteFunction(TaskExecutionContext taskExecutionConte
log.info("All dependent task finished, will calculate the dependent result");
DependResult dependResult = calculateDependResult();
log.info("The Dependent result is: {}", dependResult);
return dependResult == DependResult.SUCCESS ? AsyncTaskExecutionStatus.SUCCESS
: AsyncTaskExecutionStatus.FAILED;
if (dependResult == DependResult.SUCCESS) {
dependentParameters.setVarPool(JSONUtils.toJsonString(dependVarPoolPropertyMap.values()));
log.info("Set dependentParameters varPool: {}", dependentParameters.getVarPool());
return AsyncTaskExecutionStatus.SUCCESS;
} else {
return AsyncTaskExecutionStatus.FAILED;
}
}
return AsyncTaskExecutionStatus.RUNNING;
}
Expand Down Expand Up @@ -189,9 +198,16 @@ private List<DependentExecute> initializeDependentTaskList() {

private DependResult calculateDependResult() {
List<DependResult> dependResultList = new ArrayList<>();
Map<String, Long> dependVarPoolEndTimeMap = new HashMap<>();
for (DependentExecute dependentExecute : dependentTaskList) {
DependResult dependResult =
dependentExecute.getModelDependResult(dependentDate, processInstance.getTestFlag());
if (dependResult == DependResult.SUCCESS) {
Map<String, Property> varPoolPropertyMap = dependentExecute.getDependTaskVarPoolPropertyMap();
Map<String, Long> varPoolEndTimeMap = dependentExecute.getDependTaskVarPoolEndTimeMap();
DependentUtils.addTaskVarPool(varPoolPropertyMap, varPoolEndTimeMap, dependVarPoolPropertyMap,
dependVarPoolEndTimeMap);
}
dependResultList.add(dependResult);
}
return DependentUtils.getDependResultForRelation(this.dependentParameters.getRelation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
Expand All @@ -33,9 +34,11 @@
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
Expand All @@ -48,6 +51,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -97,6 +101,14 @@ public class DependentExecute {
*/
private final TaskDefinitionDao taskDefinitionDao = SpringApplicationContext.getBean(TaskDefinitionDao.class);

private Map<String, Property> dependTaskVarPoolPropertyMap = new HashMap<>();

private Map<String, Long> dependTaskVarPoolEndTimeMap = new HashMap<>();

private Map<String, Property> dependItemVarPoolPropertyMap = new HashMap<>();

private Map<String, Long> dependItemVarPoolEndTimeMap = new HashMap<>();

/**
* constructor
*
Expand Down Expand Up @@ -168,6 +180,7 @@ private DependResult dependResultByProcessInstance(ProcessInstance processInstan
return DependResult.WAITING;
}
if (processInstance.getState().isSuccess()) {
addItemVarPool(processInstance.getVarPool(), processInstance.getEndTime().getTime());
return DependResult.SUCCESS;
}
log.warn(
Expand Down Expand Up @@ -221,46 +234,12 @@ private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance proc
}
}
}
addItemVarPool(processInstance.getVarPool(), processInstance.getEndTime().getTime());
return DependResult.SUCCESS;
}
return DependResult.FAILED;
}

/**
* get depend task result
*
* @param taskCode
* @param processInstance
* @return
*/
private DependResult getDependTaskResult(long taskCode, ProcessInstance processInstance, int testFlag) {
DependResult result;
TaskInstance taskInstance = null;
List<TaskInstance> taskInstanceList =
taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(), testFlag);

for (TaskInstance task : taskInstanceList) {
if (task.getTaskCode() == taskCode) {
taskInstance = task;
break;
}
}

if (taskInstance == null) {
// cannot find task in the process instance
// maybe because process instance is running or failed.
if (processInstance.getState().isFinished()) {
result = DependResult.FAILED;
} else {
return DependResult.WAITING;
}
} else {
result = getDependResultByState(taskInstance.getState());
}

return result;
}

/**
* depend type = depend_task
*
Expand Down Expand Up @@ -303,12 +282,31 @@ private DependResult dependResultBySingleTaskInstance(ProcessInstance processIns
log.info(
"The dependent task is a streaming task, so return depend success. Task code: {}, task name: {}.",
taskInstance.getTaskCode(), taskInstance.getName());
addItemVarPool(taskInstance.getVarPool(), taskInstance.getEndTime().getTime());
return DependResult.SUCCESS;
}
return getDependResultByState(taskInstance.getState());
}
}

/**
* add varPool to dependItemVarPoolMap
*
* @param varPoolStr
* @param endTime
*/
private void addItemVarPool(String varPoolStr, Long endTime) {
List<Property> varPool = new ArrayList<>(JSONUtils.toList(varPoolStr, Property.class));
if (!varPool.isEmpty()) {
Map<String, Property> varPoolPropertyMap = varPool.stream().filter(p -> p.getDirect().equals(Direct.OUT))
.collect(Collectors.toMap(Property::getProp, Function.identity()));
Map<String, Long> varPoolEndTimeMap = varPool.stream().filter(p -> p.getDirect().equals(Direct.OUT))
.collect(Collectors.toMap(Property::getProp, d -> endTime));
dependItemVarPoolPropertyMap.putAll(varPoolPropertyMap);
dependItemVarPoolEndTimeMap.putAll(varPoolEndTimeMap);
}
}

/**
* find the last one process instance that :
* 1. manual run and finish between the interval
Expand Down Expand Up @@ -399,7 +397,13 @@ public DependResult getModelDependResult(Date currentTime, int testFlag) {
DependResult dependResult = getDependResultForItem(dependentItem, currentTime, testFlag);
if (dependResult != DependResult.WAITING && dependResult != DependResult.FAILED) {
dependResultMap.put(dependentItem.getKey(), dependResult);
if (dependentItem.getParameterPassing() && !dependItemVarPoolPropertyMap.isEmpty()) {
DependentUtils.addTaskVarPool(dependItemVarPoolPropertyMap, dependItemVarPoolEndTimeMap,
dependTaskVarPoolPropertyMap, dependTaskVarPoolEndTimeMap);
}
}
dependItemVarPoolPropertyMap.clear();
dependItemVarPoolEndTimeMap.clear();
dependResultList.add(dependResult);
}
return DependentUtils.getDependResultForRelation(this.relation, dependResultList);
Expand All @@ -424,6 +428,14 @@ public Map<String, DependResult> getDependResultMap() {
return dependResultMap;
}

public Map<String, Property> getDependTaskVarPoolPropertyMap() {
return dependTaskVarPoolPropertyMap;
}

public Map<String, Long> getDependTaskVarPoolEndTimeMap() {
return dependTaskVarPoolEndTimeMap;
}

/**
* check for self-dependent
* @param dependentItem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class DependentItem {
private String dateValue;
private DependResult dependResult;
private TaskExecutionStatus status;
private Boolean parameterPassing;

public String getKey() {
return String.format("%d-%d-%s-%s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

public class DependentUtils {

Expand Down Expand Up @@ -151,4 +153,30 @@ public static List<DateInterval> getDateIntervalList(Date businessDate, String d
return result;
}

/**
* add varPool from dependItemVarPoolMap to dependTaskVarPoolMap
*
* @param dependItemVarPoolPropertyMap dependItemVarPoolPropertyMap
* @param dependItemVarPoolEndTimeMap dependItemVarPoolEndTimeMap
* @param dependTaskVarPoolPropertyMap dependTaskVarPoolPropertyMap
* @param dependTaskVarPoolEndTimeMap dependTaskVarPoolEndTimeMap
*/
public static void addTaskVarPool(Map<String, Property> dependItemVarPoolPropertyMap,
Map<String, Long> dependItemVarPoolEndTimeMap,
Map<String, Property> dependTaskVarPoolPropertyMap,
Map<String, Long> dependTaskVarPoolEndTimeMap) {
dependItemVarPoolPropertyMap.forEach((prop, property) -> {
Long itemEndTime = dependItemVarPoolEndTimeMap.get(prop);
if (dependTaskVarPoolPropertyMap.containsKey(prop)) {
if (itemEndTime < dependTaskVarPoolEndTimeMap.get(prop)) {
dependTaskVarPoolPropertyMap.put(prop, property);
dependTaskVarPoolEndTimeMap.put(prop, itemEndTime);
}
} else {
dependTaskVarPoolPropertyMap.put(prop, property);
dependTaskVarPoolEndTimeMap.put(prop, itemEndTime);
}
});
}

}
1 change: 1 addition & 0 deletions dolphinscheduler-ui/src/locales/en_US/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ export default {
hour: 'hour',
add_dependency: 'Add dependency',
waiting_dependent_start: 'Waiting Dependent start',
dependent_task_parameter_passing: 'Parameter Passing',
check_interval: 'Check interval',
check_interval_tips: 'Check interval must be a positive integer',
waiting_dependent_complete: 'Waiting Dependent complete',
Expand Down
1 change: 1 addition & 0 deletions dolphinscheduler-ui/src/locales/zh_CN/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ export default {
hour: '时',
add_dependency: '添加依赖',
waiting_dependent_start: '等待依赖启动',
dependent_task_parameter_passing: '参数传递',
check_interval: '检查间隔',
check_interval_tips: '检查间隔必须为正整数',
waiting_dependent_complete: '等待依赖完成',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] {
selectOptions.value[i] = options
item.depTaskCode = null
item.definitionCode = null
item.parameterPassing = false
}
},
options: projectList,
Expand Down Expand Up @@ -475,6 +476,13 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] {
}
}
}),
(j = 0) => ({
type: 'switch',
field: 'parameterPassing',
span: 20,
name: t('project.node.dependent_task_parameter_passing'),
path: `dependTaskList.${i}.dependItemList.${j}.parameterPassing`
}),
(j = 0) => ({
type: 'custom',
field: 'state',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ interface IDependentItem {
cycle?: 'month' | 'week' | 'day' | 'hour'
dateValue?: string
dependentType?: 'DEPENDENT_ON_WORKFLOW' | 'DEPENDENT_ON_TASK'
parameterPassing?: boolean
}

interface IDependTask {
Expand Down

0 comments on commit 5e76468

Please sign in to comment.