Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Recover WorkflowInstance will casue workflow Instance state is success but task insatnce is killed/paused #15574

Merged
merged 1 commit into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,6 @@ public void testViewGantt() throws Exception {
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog());
when(processInstanceMapper.queryDetailById(1)).thenReturn(processInstance);
when(taskInstanceMapper.queryByInstanceIdAndName(Mockito.anyInt(), Mockito.any())).thenReturn(taskInstance);
DAG<Long, TaskNode, TaskNodeRelation> graph = new DAG<>();
for (long i = 1; i <= 7; ++i) {
graph.addNode(i, new TaskNode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,10 @@
*/
public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {

List<Integer> queryTaskByProcessIdAndState(@Param("processInstanceId") Integer processInstanceId,
@Param("state") Integer state);

List<TaskInstance> findValidTaskListByProcessId(@Param("processInstanceId") Integer processInstanceId,
@Param("flag") Flag flag,
@Param("testFlag") int testFlag);

List<TaskInstance> queryByHostAndStatus(@Param("host") String host,
@Param("states") int[] stateArray);

int setFailoverByHostAndStateArray(@Param("host") String host,
@Param("states") int[] stateArray,
@Param("destStatus") TaskExecutionStatus destStatus);

TaskInstance queryByInstanceIdAndName(@Param("processInstanceId") int processInstanceId,
@Param("name") String name);

TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId,
@Param("taskCode") Long taskCode);

Expand All @@ -66,9 +53,6 @@ TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processIns
List<TaskInstance> queryByProcessInstanceIdsAndTaskCodes(@Param("processInstanceIds") List<Integer> processInstanceIds,
@Param("taskCodes") List<Long> taskCodes);

Integer countTask(@Param("projectCodes") Long[] projectCodes,
@Param("taskIds") int[] taskIds);

/**
* Statistics task instance group by given project codes list by start time
* <p>
Expand Down Expand Up @@ -97,20 +81,6 @@ List<ExecuteStatusCount> countTaskInstanceStateByProjectIdsV2(@Param("startTime"
@Param("endTime") Date endTime,
@Param("projectIds") Set<Integer> projectIds);

/**
* Statistics task instance group by given project codes list by submit time
* <p>
* We only need project codes to determine whether the task instance belongs to the user or not.
*
* @param startTime Statistics start time
* @param endTime Statistics end time
* @param projectCodes Project codes list to filter
* @return List of ExecuteStatusCount
*/
List<ExecuteStatusCount> countTaskInstanceStateByProjectCodesAndStatesBySubmitTime(@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("projectCodes") Long[] projectCodes,
@Param("states") List<TaskExecutionStatus> states);
/**
* Statistics task instance group by given project codes list by submit time
* <p>
Expand Down Expand Up @@ -159,9 +129,6 @@ IPage<TaskInstance> queryStreamTaskInstanceListPaging(IPage<TaskInstance> page,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime);

List<TaskInstance> loadAllInfosNoRelease(@Param("processInstanceId") int processInstanceId,
@Param("status") int status);

void deleteByWorkflowInstanceId(@Param("workflowInstanceId") int workflowInstanceId);

List<TaskInstance> findByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ List<TaskInstance> queryLastTaskInstanceListIntervalInProcessInstance(Integer pr

/**
* find last task instance corresponding to taskCode in the date interval
*
* @param processInstanceId Task's parent process instance id
* @param depTaskCode taskCode
* @param testFlag test flag
* @param depTaskCode taskCode
* @param testFlag test flag
* @return task instance
*/
TaskInstance queryLastTaskInstanceIntervalInProcessInstance(Integer processInstanceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,6 @@
${alias}.flag, ${alias}.is_cache, ${alias}.cache_key, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id,
${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.test_flag, ${alias}.task_group_id, ${alias}.task_execute_type
</sql>
<update id="setFailoverByHostAndStateArray">
update t_ds_task_instance
set state = #{destStatus}
where host = #{host}
<if test="states != null and states.length != 0">
and state in
<foreach collection="states" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
</update>
<select id="queryTaskByProcessIdAndState" resultType="java.lang.Integer">
select id
from t_ds_task_instance
WHERE process_instance_id = #{processInstanceId}
and state = #{state}
and flag = 1
</select>
<select id="findValidTaskListByProcessId" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
Expand All @@ -63,21 +45,6 @@
from t_ds_task_instance
WHERE process_instance_id = #{workflowInstanceId}
</select>
<select id="queryByHostAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
from t_ds_task_instance
where 1 = 1
<if test="host != null and host != ''">
and host = #{host}
</if>
<if test="states != null and states.length != 0">
and state in
<foreach collection="states" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
</select>

<select id="countTaskInstanceStateByProjectCodes" resultType="org.apache.dolphinscheduler.dao.model.TaskInstanceStatusCountDto">
select state, count(0) as count
Expand Down Expand Up @@ -118,32 +85,7 @@
</if>
group by t.state
</select>
<select id="countTaskInstanceStateByProjectCodesAndStatesBySubmitTime" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
select
state, count(0) as count
from t_ds_task_instance t
left join t_ds_task_definition_log d on d.code=t.task_code and d.version=t.task_definition_version
where 1=1
<if test="states != null and states.size != 0">
and t.state in
<foreach collection="states" index="index" item="state" open="(" separator="," close=")">
#{state.code}
</foreach>
</if>
<if test="projectCodes != null and projectCodes.length != 0">
and d.project_code in
<foreach collection="projectCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
<if test="startTime != null">
and t.submit_time <![CDATA[ > ]]> #{startTime}
</if>
<if test="endTime != null">
and t.submit_time <![CDATA[ <= ]]> #{endTime}
</if>
group by t.state
</select>

<select id="countTaskInstanceStateByProjectCodesAndStatesBySubmitTimeV2" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
select
state, count(0) as count
Expand Down Expand Up @@ -181,15 +123,7 @@
</if>
group by t.state
</select>
<select id="queryByInstanceIdAndName" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
from t_ds_task_instance
where process_instance_id = #{processInstanceId}
and name = #{name}
and flag = 1
limit 1
</select>

<select id="queryByInstanceIdAndCode" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
Expand Down Expand Up @@ -229,24 +163,7 @@
</foreach>
</if>
</select>
<select id="countTask" resultType="java.lang.Integer">
select count(1) as count
from t_ds_task_instance task,t_ds_task_definition_log define
where task.task_code=define.code
and task.task_definition_version=define.version
<if test="projectCodes != null and projectCodes.length != 0">
and define.project_code in
<foreach collection="projectCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
<if test="taskIds != null and taskIds.length != 0">
and task.id in
<foreach collection="taskIds" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
</select>

<select id="queryTaskInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
Expand Down Expand Up @@ -330,16 +247,6 @@
</if>
order by start_time desc
</select>
<select id="loadAllInfosNoRelease" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSqlV2">
<property name="alias" value="instance"/>
</include>
from t_ds_task_instance instance
left join t_ds_task_group_queue que on instance.id = que.task_id
where instance.process_instance_id = #{processInstanceId}
and que.status = #{status}
</select>
<select id="findLastTaskInstances" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSqlV2">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,30 +197,6 @@ public void testQueryProcessInstanceListPaging() {
processInstanceMapper.deleteById(processInstance.getId());
}

/**
* test set failover by host and state
*/
@Test
public void testSetFailoverByHostAndStateArray() {

int[] stateArray = new int[]{
WorkflowExecutionStatus.RUNNING_EXECUTION.ordinal(),
WorkflowExecutionStatus.SUCCESS.ordinal()};

ProcessInstance processInstance = insertOne();

processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
processInstance.setHost("192.168.2.220");
processInstanceMapper.updateById(processInstance);
String host = processInstance.getHost();
int update = processInstanceMapper.setFailoverByHostAndStateArray(host, stateArray);
Assertions.assertNotEquals(0, update);

processInstance = processInstanceMapper.selectById(processInstance.getId());
Assertions.assertNull(processInstance.getHost());
processInstanceMapper.deleteById(processInstance.getId());
}

/**
* test update process instance by state
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,25 +143,6 @@ public void testQuery() {
Assertions.assertNotEquals(0, taskInstances.size());
}

/**
* test query task instance by process instance id and state
*/
@Test
public void testQueryTaskByProcessIdAndState() {
// insert ProcessInstance
ProcessInstance processInstance = insertProcessInstance();

// insert taskInstance
TaskInstance task = insertTaskInstance(processInstance.getId());
task.setProcessInstanceId(processInstance.getId());
taskInstanceMapper.updateById(task);
List<Integer> taskInstances = taskInstanceMapper.queryTaskByProcessIdAndState(
task.getProcessInstanceId(),
TaskExecutionStatus.RUNNING_EXECUTION.getCode());
taskInstanceMapper.deleteById(task.getId());
Assertions.assertNotEquals(0, taskInstances.size());
}

/**
* test find valid task list by process instance id
*/
Expand Down Expand Up @@ -194,66 +175,6 @@ public void testFindValidTaskListByProcessId() {
Assertions.assertNotEquals(0, taskInstances1.size());
}

/**
* test query by host and status
*/
@Test
public void testQueryByHostAndStatus() {
// insert ProcessInstance
ProcessInstance processInstance = insertProcessInstance();

// insert taskInstance
TaskInstance task = insertTaskInstance(processInstance.getId());
task.setHost("111.111.11.11");
taskInstanceMapper.updateById(task);

List<TaskInstance> taskInstances = taskInstanceMapper.queryByHostAndStatus(
task.getHost(), new int[]{TaskExecutionStatus.RUNNING_EXECUTION.getCode()});
taskInstanceMapper.deleteById(task.getId());
Assertions.assertNotEquals(0, taskInstances.size());
}

/**
* test set failover by host and state array
*/
@Test
public void testSetFailoverByHostAndStateArray() {
// insert ProcessInstance
ProcessInstance processInstance = insertProcessInstance();

// insert taskInstance
TaskInstance task = insertTaskInstance(processInstance.getId());
task.setHost("111.111.11.11");
taskInstanceMapper.updateById(task);

int setResult = taskInstanceMapper.setFailoverByHostAndStateArray(
task.getHost(),
new int[]{TaskExecutionStatus.RUNNING_EXECUTION.getCode()},
TaskExecutionStatus.NEED_FAULT_TOLERANCE);
taskInstanceMapper.deleteById(task.getId());
Assertions.assertNotEquals(0, setResult);
}

/**
* test query by task instance id and name
*/
@Test
public void testQueryByInstanceIdAndName() {
// insert ProcessInstance
ProcessInstance processInstance = insertProcessInstance();

// insert taskInstance
TaskInstance task = insertTaskInstance(processInstance.getId());
task.setHost("111.111.11.11");
taskInstanceMapper.updateById(task);

TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(
task.getProcessInstanceId(),
task.getName());
taskInstanceMapper.deleteById(task.getId());
Assertions.assertNotEquals(null, taskInstance);
}

/**
* test query by task instance id and code
*/
Expand Down Expand Up @@ -294,37 +215,6 @@ public void testQueryByProcessInstanceIdsAndTaskCodes() {
Assertions.assertEquals(1, taskInstances.size());
}

/**
* test count task instance
*/
@Test
public void testCountTask() {
// insert ProcessInstance
ProcessInstance processInstance = insertProcessInstance();

// insert taskInstance
TaskInstance task = insertTaskInstance(processInstance.getId());
ProcessDefinition definition = new ProcessDefinition();
definition.setCode(1L);
definition.setProjectCode(1111L);
definition.setCreateTime(new Date());
definition.setUpdateTime(new Date());
processDefinitionMapper.insert(definition);
taskInstanceMapper.updateById(task);

int countTask = taskInstanceMapper.countTask(
new Long[0],
new int[0]);
int countTask2 = taskInstanceMapper.countTask(
new Long[]{definition.getProjectCode()},
new int[]{task.getId()});
taskInstanceMapper.deleteById(task.getId());
processDefinitionMapper.deleteById(definition.getId());
Assertions.assertEquals(0, countTask);
Assertions.assertEquals(0, countTask2);

}

/**
* test count task instance state by user
*/
Expand Down
Loading
Loading