Skip to content

Commit

Permalink
Fix TaskGroupQueue will never be wakeup due to wakeup failed at one time
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Jan 31, 2024
1 parent 25810a8 commit b9e452e
Show file tree
Hide file tree
Showing 49 changed files with 1,453 additions and 918 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ jobs:
fail-fast: false
matrix:
db: ["mysql", "postgresql"]
version: ["2.0.9", "3.0.6", "3.1.8", "3.2.0"]
version: ["2.0.9", "3.0.6", "3.1.9", "3.2.0"]
steps:
- name: Set up JDK 8
uses: actions/setup-java@v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public Result modifyPriority(@Parameter(hidden = true) @RequestAttribute(value =
* @param pageSize page size
* @return queue list
*/
@Operation(summary = "queryTasksByGroupId", description = "QUERY_ALL_TASKS_GROUP_NOTES")
@Operation(summary = "queryTaskGroupQueuesByGroupId", description = "QUERY_TASKS_GROUP_GROUP_QUEUES")
@Parameters({
@Parameter(name = "groupId", description = "GROUP_ID", required = false, schema = @Schema(implementation = int.class, example = "1", defaultValue = "-1")),
@Parameter(name = "taskInstanceName", description = "TASK_INSTANCE_NAME", required = false, schema = @Schema(implementation = String.class, example = "taskName")),
Expand All @@ -310,15 +310,21 @@ public Result modifyPriority(@Parameter(hidden = true) @RequestAttribute(value =
@GetMapping(value = "/query-list-by-group-id")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_GROUP_QUEUE_LIST_ERROR)
public Result queryTasksByGroupId(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "groupId", required = false, defaultValue = "-1") Integer groupId,
@RequestParam(value = "taskInstanceName", required = false) String taskName,
@RequestParam(value = "processInstanceName", required = false) String processName,
@RequestParam(value = "status", required = false) Integer status,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupQueueService.queryTasksByGroupId(loginUser, taskName, processName, status,
groupId, pageNo, pageSize);
public Result queryTaskGroupQueues(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "groupId", required = false, defaultValue = "-1") Integer groupId,
@RequestParam(value = "taskInstanceName", required = false) String taskName,
@RequestParam(value = "processInstanceName", required = false) String processName,
@RequestParam(value = "status", required = false) Integer status,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupQueueService.queryTasksByGroupId(
loginUser,
taskName,
processName,
status,
groupId,
pageNo,
pageSize);
return returnDataList(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,6 @@ Map<String, Object> createTaskGroup(User loginUser, Long projectCode, String nam
Map<String, Object> updateTaskGroup(User loginUser, int id, String name,
String description, int groupSize);

/**
* get task group status
*
* @param id task group id
* @return the result code and msg
*/
boolean isTheTaskGroupAvailable(int id);

/**
* query all task group by user id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.Server;
Expand Down Expand Up @@ -83,14 +82,12 @@
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.master.IStreamingTaskOperator;
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
import org.apache.dolphinscheduler.extract.master.IWorkflowInstanceService;
import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerRequest;
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerResponse;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest;
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.service.command.CommandService;
Expand Down Expand Up @@ -552,16 +549,20 @@ public Map<String, Object> forceStartTaskInstance(User loginUser, int queueId) {
Map<String, Object> result = new HashMap<>();
TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.selectById(queueId);
// check process instance exist
ProcessInstance processInstance = processInstanceMapper.selectById(taskGroupQueue.getProcessId());
if (processInstance == null) {
log.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.",
taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId());
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId());
return result;
ProcessInstance processInstance = processInstanceDao.queryOptionalById(taskGroupQueue.getProcessId())
.orElseThrow(
() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId()));

Check notice

Code scanning / CodeQL

Unread local variable Note

Variable 'ProcessInstance processInstance' is never read.
checkMasterExists();

if (taskGroupQueue.getInQueue() == Flag.NO.getCode()) {
throw new ServiceException(Status.TASK_GROUP_QUEUE_ALREADY_START);
}
taskGroupQueue.setForceStart(Flag.YES.getCode());
taskGroupQueue.setUpdateTime(new Date());
taskGroupQueueMapper.updateById(taskGroupQueue);

checkMasterExists();
return forceStart(processInstance, taskGroupQueue);
result.put(Constants.STATUS, Status.SUCCESS);
return result;
}

public void checkStartNodeList(String startNodeList, Long processDefinitionCode, int version) {
Expand Down Expand Up @@ -664,32 +665,6 @@ private Map<String, Object> updateProcessInstancePrepare(ProcessInstance process
return result;
}

/**
* prepare to update process instance command type and status
*
* @param processInstance process instance
* @return update result
*/
private Map<String, Object> forceStart(ProcessInstance processInstance, TaskGroupQueue taskGroupQueue) {
Map<String, Object> result = new HashMap<>();
if (taskGroupQueue.getStatus() != TaskGroupQueueStatus.WAIT_QUEUE) {
log.warn("Task group queue already starts, taskGroupQueueId:{}.", taskGroupQueue.getId());
putMsg(result, Status.TASK_GROUP_QUEUE_ALREADY_START);
return result;
}

taskGroupQueue.setForceStart(Flag.YES.getCode());
taskGroupQueue.setUpdateTime(new Date());
processService.updateTaskGroupQueue(taskGroupQueue);
log.info("Sending force start command to master: {}.", processInstance.getHost());
ILogicTaskInstanceOperator iLogicTaskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(processInstance.getHost(), ILogicTaskInstanceOperator.class);
iLogicTaskInstanceOperator.forceStartTaskInstance(
new TaskInstanceForceStartRequest(processInstance.getId(), taskGroupQueue.getTaskId()));
putMsg(result, Status.SUCCESS);
return result;
}

/**
* check whether sub processes are offline before starting process definition
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr
* @return tasks list
*/
@Override
public Map<String, Object> queryTasksByGroupId(User loginUser, String taskName, String processName, Integer status,
int groupId, int pageNo, int pageSize) {
public Map<String, Object> queryTasksByGroupId(User loginUser,
String taskName,
String processName,
Integer status,
int groupId,
int pageNo,
int pageSize) {
Map<String, Object> result = new HashMap<>();
Page<TaskGroupQueue> page = new Page<>(pageNo, pageSize);
PageInfo<TaskGroupQueue> pageInfo = new PageInfo<>(pageNo, pageSize);
Expand All @@ -79,8 +84,13 @@ public Map<String, Object> queryTasksByGroupId(User loginUser, String taskName,
return result;
}
List<Project> projects = projectMapper.selectBatchIds(projectIds);
IPage<TaskGroupQueue> taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging(page,
taskName, processName, status, groupId, projects);
IPage<TaskGroupQueue> taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging(
page,
taskName,
processName,
status,
groupId,
projects);

pageInfo.setTotal((int) taskGroupQueue.getTotal());
pageInfo.setTotalList(taskGroupQueue.getRecords());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public Map<String, Object> createTaskGroup(User loginUser, Long projectCode, Str
.description(description)
.groupSize(groupSize)
.userId(loginUser.getId())
.status(Flag.YES.getCode())
.status(Flag.YES)
.createTime(now)
.updateTime(now)
.build();
Expand Down Expand Up @@ -180,7 +180,7 @@ public Map<String, Object> updateTaskGroup(User loginUser, int id, String name,
putMsg(result, Status.TASK_GROUP_NAME_EXSIT);
return result;
}
if (taskGroup.getStatus() != Flag.YES.getCode()) {
if (taskGroup.getStatus() != Flag.YES) {
log.warn("Task group has been closed, taskGroupId:{}.", id);
putMsg(result, Status.TASK_GROUP_STATUS_ERROR);
return result;
Expand All @@ -202,17 +202,6 @@ public Map<String, Object> updateTaskGroup(User loginUser, int id, String name,
return result;
}

/**
* get task group status
*
* @param id task group id
* @return is the task group available
*/
@Override
public boolean isTheTaskGroupAvailable(int id) {
return taskGroupMapper.selectCountByIdStatus(id, Flag.YES.getCode()) == 1;
}

/**
* query all task group by user id
*
Expand Down Expand Up @@ -331,12 +320,12 @@ public Map<String, Object> closeTaskGroup(User loginUser, int id) {
return result;
}
TaskGroup taskGroup = taskGroupMapper.selectById(id);
if (taskGroup.getStatus() == Flag.NO.getCode()) {
if (taskGroup.getStatus() == Flag.NO) {
log.info("Task group has been closed, taskGroupId:{}.", id);
putMsg(result, Status.TASK_GROUP_STATUS_CLOSED);
return result;
}
taskGroup.setStatus(Flag.NO.getCode());
taskGroup.setStatus(Flag.NO);
int update = taskGroupMapper.updateById(taskGroup);
if (update > 0)
log.info("Task group close complete, taskGroupId:{}.", id);
Expand Down Expand Up @@ -364,12 +353,12 @@ public Map<String, Object> startTaskGroup(User loginUser, int id) {
return result;
}
TaskGroup taskGroup = taskGroupMapper.selectById(id);
if (taskGroup.getStatus() == Flag.YES.getCode()) {
if (taskGroup.getStatus() == Flag.YES) {
log.info("Task group has been started, taskGroupId:{}.", id);
putMsg(result, Status.TASK_GROUP_STATUS_OPENED);
return result;
}
taskGroup.setStatus(Flag.YES.getCode());
taskGroup.setStatus(Flag.YES);
taskGroup.setUpdateTime(new Date(System.currentTimeMillis()));
int update = taskGroupMapper.updateById(taskGroup);
if (update > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private TaskGroup getTaskGroup() {
.description(taskGroupDesc)
.groupSize(100)
.userId(1)
.status(Flag.YES.getCode())
.status(Flag.YES)
.build();

return taskGroup;
Expand Down Expand Up @@ -204,7 +204,7 @@ public void testUpdate() {

User loginUser = getLoginUser();
TaskGroup taskGroup = getTaskGroup();
taskGroup.setStatus(Flag.YES.getCode());
taskGroup.setStatus(Flag.YES);
// Task group status error

Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.TASK_GROUP,
Expand All @@ -218,7 +218,6 @@ public void testUpdate() {
logger.info(result.toString());
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));

taskGroup.setStatus(0);
}

@Test
Expand All @@ -236,12 +235,12 @@ public void testCloseAndStart() {
Map<String, Object> result = taskGroupService.closeTaskGroup(loginUser, 1);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));

taskGroup.setStatus(0);
taskGroup.setStatus(Flag.NO);
Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup);
result = taskGroupService.closeTaskGroup(loginUser, 1);
Assertions.assertEquals(Status.TASK_GROUP_STATUS_CLOSED, result.get(Constants.STATUS));

taskGroup.setStatus(1);
taskGroup.setStatus(Flag.YES);
Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup);
result = taskGroupService.startTaskGroup(loginUser, 1);
Assertions.assertEquals(Status.TASK_GROUP_STATUS_OPENED, result.get(Constants.STATUS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.dolphinscheduler.dao.entity;

import org.apache.dolphinscheduler.common.enums.Flag;

import java.io.Serializable;
import java.util.Date;

Expand All @@ -36,44 +38,17 @@
@TableName("t_ds_task_group")
public class TaskGroup implements Serializable {

/**
* key
*/
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* task_group name
*/
private String name;

private long projectCode;
private String description;
/**
* 作业组大小
*/
private int groupSize;
/**
* 已使用作业组大小
*/
private int useSize;
/**
* creator id
*/
private int userId;
/**
* 0 not available, 1 available
*/
private Integer status;
/**
* create time
*/
private Flag status;
private Date createTime;
/**
* update time
*/
private Date updateTime;
/**
* project code
*/
private long projectCode;

}
Loading

0 comments on commit b9e452e

Please sign in to comment.