Skip to content

Commit

Permalink
remove sub workflow finish notify (apache#15057)
Browse files Browse the repository at this point in the history
Co-authored-by: xiangzihao <460888207@qq.com>
  • Loading branch information
2 people authored and xdu-chenrj committed Oct 30, 2023
1 parent d08c11c commit f8305f8
Showing 1 changed file with 0 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,15 @@

package org.apache.dolphinscheduler.server.master.runner;

import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
import org.apache.dolphinscheduler.service.process.ProcessService;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.PostConstruct;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -49,8 +34,6 @@
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import com.google.common.base.Strings;

/**
* Used to execute {@link WorkflowExecuteRunnable}.
*/
Expand All @@ -61,9 +44,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@Autowired
private MasterConfig masterConfig;

@Autowired
private ProcessService processService;

@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;

Expand Down Expand Up @@ -122,8 +102,6 @@ public void onFailure(Throwable ex) {
LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId);
try {
log.error("Workflow instance events handle failed", ex);
notifyProcessChanged(
workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance());
multiThreadFilterMap.remove(workflowInstanceId);
} finally {
LogUtils.removeWorkflowInstanceIdMDC();
Expand All @@ -140,8 +118,6 @@ public void onSuccess(Object result) {
.removeProcess4TimeoutCheck(workflowExecuteThread.getWorkflowExecuteContext()
.getWorkflowInstance().getId());
processInstanceExecCacheManager.removeByProcessInstanceId(workflowInstanceId);
notifyProcessChanged(
workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance());
log.info("Workflow instance is finished.");
}
} catch (Exception e) {
Expand All @@ -155,83 +131,4 @@ public void onSuccess(Object result) {
});
}

/**
* notify process change
*/
private void notifyProcessChanged(ProcessInstance finishProcessInstance) {
if (Flag.NO == finishProcessInstance.getIsSubProcess()) {
return;
}
Map<ProcessInstance, TaskInstance> fatherMaps = processService.notifyProcessList(finishProcessInstance.getId());
for (Map.Entry<ProcessInstance, TaskInstance> entry : fatherMaps.entrySet()) {
ProcessInstance processInstance = entry.getKey();
TaskInstance taskInstance = entry.getValue();
crossWorkflowParameterPassing(finishProcessInstance, taskInstance);
String address = NetUtils.getAddr(masterConfig.getListenPort());
try {
LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstance.getId());
if (processInstance.getHost().equalsIgnoreCase(address)) {
log.info("Process host is local master, will notify it");
this.notifyMyself(processInstance, taskInstance);
} else {
log.info("Process host is remote master, will notify it");
this.notifyProcess(finishProcessInstance, processInstance, taskInstance);
}
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}

private void crossWorkflowParameterPassing(ProcessInstance finishProcessInstance, TaskInstance taskInstance) {
try {
MasterTaskExecuteRunnable masterTaskExecuteRunnable =
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstance.getId());
masterTaskExecuteRunnable.getILogicTask().getTaskParameters()
.setVarPool(finishProcessInstance.getVarPool());
log.info("Cross workflow parameter passing success, finishProcessInstanceId: {}, taskInstanceId: {}",
finishProcessInstance.getId(), taskInstance.getId());
} catch (Exception ex) {
log.error("Cross workflow parameter passing error, finishProcessInstanceId: {}, taskInstanceId: {}",
finishProcessInstance.getId(), taskInstance.getId(), ex);
}
}

/**
* notify myself
*/
private void notifyMyself(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
if (!processInstanceExecCacheManager.contains(processInstance.getId())) {
log.warn("The execute cache manager doesn't contains this workflow instance");
return;
}
TaskStateEvent stateEvent = TaskStateEvent.builder()
.processInstanceId(processInstance.getId())
.taskInstanceId(taskInstance.getId())
.type(StateEventType.TASK_STATE_CHANGE)
.status(TaskExecutionStatus.RUNNING_EXECUTION)
.build();
this.submitStateEvent(stateEvent);
}

/**
* notify process's master
*/
private void notifyProcess(ProcessInstance finishProcessInstance, ProcessInstance processInstance,
TaskInstance taskInstance) {
String processInstanceHost = processInstance.getHost();
if (Strings.isNullOrEmpty(processInstanceHost)) {
log.error("Process {} host is empty, cannot notify task {} now, taskId: {}", processInstance.getName(),
taskInstance.getName(), taskInstance.getId());
return;
}
ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(processInstanceHost, ITaskInstanceExecutionEventListener.class);

WorkflowInstanceStateChangeEvent workflowInstanceStateChangeEvent = new WorkflowInstanceStateChangeEvent(
finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(),
taskInstance.getId());
iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(workflowInstanceStateChangeEvent);
}
}

0 comments on commit f8305f8

Please sign in to comment.