Skip to content

Commit

Permalink
[DS-9263][Improvement][master]optimize failover
Browse files Browse the repository at this point in the history
- add FailoverService.java
- move failover method  from MasterRegistryClient to FailoverService
- move failover code from FailoverExecuteThread to FailoverService

This closes #9263
  • Loading branch information
huagetai committed Mar 30, 2022
1 parent 11d9859 commit bdca52c
Show file tree
Hide file tree
Showing 4 changed files with 539 additions and 335 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.service.FailoverService;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
Expand Down Expand Up @@ -77,10 +78,10 @@ public class MasterRegistryClient {
private static final Logger logger = LoggerFactory.getLogger(MasterRegistryClient.class);

/**
* process service
* failover service
*/
@Autowired
private ProcessService processService;
private FailoverService failoverService;

@Autowired
private RegistryClient registryClient;
Expand All @@ -96,16 +97,11 @@ public class MasterRegistryClient {
*/
private ScheduledExecutorService heartBeatExecutor;

@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;

/**
* master startup time, ms
*/
private long startupTime;

private String localNodePath;

public void init() {
this.startupTime = System.currentTimeMillis();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
Expand Down Expand Up @@ -158,10 +154,7 @@ public void removeMasterNodePath(String path, NodeType nodeType, boolean failove
return;
}

String failoverPath = getFailoverLockPath(nodeType, serverHost);
try {
registryClient.getLock(failoverPath);

if (!registryClient.exists(path)) {
logger.info("path: {} not exists", path);
// handle dead server
Expand All @@ -170,12 +163,10 @@ public void removeMasterNodePath(String path, NodeType nodeType, boolean failove

//failover server
if (failover) {
failoverServerWhenDown(serverHost, nodeType);
failoverService.failoverServerWhenDown(serverHost, nodeType);
}
} catch (Exception e) {
logger.error("{} server failover failed, host:{}", nodeType, serverHost, e);
} finally {
registryClient.releaseLock(failoverPath);
}
}

Expand Down Expand Up @@ -204,287 +195,19 @@ public void removeWorkerNodePath(String path, NodeType nodeType, boolean failove
}
//failover server
if (failover) {
failoverServerWhenDown(serverHost, nodeType);
failoverService.failoverServerWhenDown(serverHost, nodeType);
}
} catch (Exception e) {
logger.error("{} server failover failed", nodeType, e);
}
}

private boolean isNeedToHandleDeadServer(String host, NodeType nodeType, Duration sessionTimeout) {
long sessionTimeoutMillis = Math.max(Constants.REGISTRY_SESSION_TIMEOUT, sessionTimeout.toMillis());
List<Server> serverList = registryClient.getServerList(nodeType);
if (CollectionUtils.isEmpty(serverList)) {
return true;
}
Date startupTime = getServerStartupTime(serverList, host);
if (startupTime == null) {
return true;
}
if (System.currentTimeMillis() - startupTime.getTime() > sessionTimeoutMillis) {
return true;
}
return false;
}

/**
* failover server when server down
*
* @param serverHost server host
* @param nodeType zookeeper node type
*/
private void failoverServerWhenDown(String serverHost, NodeType nodeType) {
switch (nodeType) {
case MASTER:
failoverMaster(serverHost);
break;
case WORKER:
failoverWorker(serverHost);
break;
default:
break;
}
}

/**
* get failover lock path
*
* @param nodeType zookeeper node type
* @return fail over lock path
*/
public String getFailoverLockPath(NodeType nodeType, String host) {
switch (nodeType) {
case MASTER:
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
case WORKER:
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
default:
return "";
}
}

/**
* task needs failover if task start before worker starts
*
* @param workerServers worker servers
* @param taskInstance task instance
* @return true if task instance need fail over
*/
private boolean checkTaskInstanceNeedFailover(List<Server> workerServers, TaskInstance taskInstance) {

boolean taskNeedFailover = true;

//now no host will execute this task instance,so no need to failover the task
if (taskInstance.getHost() == null) {
return false;
}

//if task start after worker starts, there is no need to failover the task.
if (checkTaskAfterWorkerStart(workerServers, taskInstance)) {
taskNeedFailover = false;
}

return taskNeedFailover;
}

/**
* check task start after the worker server starts.
*
* @param taskInstance task instance
* @return true if task instance start time after worker server start date
*/
private boolean checkTaskAfterWorkerStart(List<Server> workerServers, TaskInstance taskInstance) {
if (StringUtils.isEmpty(taskInstance.getHost())) {
return false;
}
Date workerServerStartDate = getServerStartupTime(workerServers, taskInstance.getHost());
if (workerServerStartDate != null) {
if (taskInstance.getStartTime() == null) {
return taskInstance.getSubmitTime().after(workerServerStartDate);
} else {
return taskInstance.getStartTime().after(workerServerStartDate);
}
}
return false;
}

/**
* get server startup time
*/
private Date getServerStartupTime(List<Server> servers, String host) {
if (CollectionUtils.isEmpty(servers)) {
return null;
}
Date serverStartupTime = null;
for (Server server : servers) {
if (host.equals(server.getHost() + Constants.COLON + server.getPort())) {
serverStartupTime = server.getCreateTime();
break;
}
}
return serverStartupTime;
}

/**
* get server startup time
*/
private Date getServerStartupTime(NodeType nodeType, String host) {
if (StringUtils.isEmpty(host)) {
return null;
}
List<Server> servers = registryClient.getServerList(nodeType);
return getServerStartupTime(servers, host);
}

/**
* failover worker tasks
* <p>
* 1. kill yarn job if there are yarn jobs in tasks.
* 2. change task state from running to need failover.
* 3. failover all tasks when workerHost is null
*
* @param workerHost worker host
*/
private void failoverWorker(String workerHost) {

if (StringUtils.isEmpty(workerHost)) {
return;
}

List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);

long startTime = System.currentTimeMillis();
List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
logger.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size());

for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
if (processInstance == null) {
processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if (processInstance == null) {
logger.error("failover task instance error, processInstance {} of taskInstance {} is null",
taskInstance.getProcessInstanceId(), taskInstance.getId());
continue;
}
processInstanceCacheMap.put(processInstance.getId(), processInstance);
}

if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
continue;
}

// only failover the task owned myself if worker down.
if (!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
continue;
}

logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance);
}
logger.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime);
}

/**
* failover master
* <p>
* failover process instance and associated task instance
*
* @param masterHost master host
*/
public void failoverMaster(String masterHost) {

if (StringUtils.isEmpty(masterHost)) {
return;
}

Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);

long startTime = System.currentTimeMillis();
List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
logger.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());

for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
if (Constants.NULL.equals(processInstance.getHost())) {
continue;
}

List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance taskInstance : validTaskInstanceList) {
if (Constants.NULL.equals(taskInstance.getHost())) {
continue;
}
if (taskInstance.getState().typeIsFinished()) {
continue;
}
if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
continue;
}
logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance);
}

if (serverStartupTime != null && processInstance.getRestartTime() != null
&& processInstance.getRestartTime().after(serverStartupTime)) {
continue;
}

logger.info("failover process instance id: {}", processInstance.getId());
//updateProcessInstance host is null and insert into command
processService.processNeedFailoverProcessInstances(processInstance);
}

logger.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
}

/**
* failover task instance
* <p>
* 1. kill yarn job if there are yarn jobs in tasks.
* 2. change task state from running to need failover.
* 3. try to notify local master
*/
private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance) {
if (taskInstance == null) {
logger.error("failover task instance error, taskInstance is null");
return;
}

if (processInstance == null) {
logger.error("failover task instance error, processInstance {} of taskInstance {} is null",
taskInstance.getProcessInstanceId(), taskInstance.getId());
return;
}

taskInstance.setProcessInstance(processInstance);
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.create();

if (masterConfig.isKillYarnJobWhenTaskFailover()) {
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
}

taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processService.saveTaskInstance(taskInstance);

StateEvent stateEvent = new StateEvent();
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(processInstance.getId());
stateEvent.setExecutionStatus(taskInstance.getState());
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}

/**
* registry
*/
public void registry() {
String address = NetUtils.getAddr(masterConfig.getListenPort());
localNodePath = getMasterPath();
String localNodePath = getMasterPath();
int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
masterConfig.getMaxCpuLoadAvg(),
Expand Down Expand Up @@ -514,6 +237,7 @@ public void registry() {
}

public void handleConnectionState(ConnectionState state) {
String localNodePath = getMasterPath();
switch (state) {
case CONNECTED:
logger.debug("registry connection state is {}", state);
Expand Down Expand Up @@ -550,15 +274,15 @@ public void deregister() {
/**
* get master path
*/
public String getMasterPath() {
private String getMasterPath() {
String address = getLocalAddress();
return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + address;
}

/**
* get local address
*/
public String getLocalAddress() {
private String getLocalAddress() {
return NetUtils.getAddr(masterConfig.getListenPort());
}

Expand Down
Loading

0 comments on commit bdca52c

Please sign in to comment.