From bdca52c902dda81f081afbe893770b211eff9916 Mon Sep 17 00:00:00 2001 From: wangrui <7039986@qq.com> Date: Wed, 30 Mar 2022 14:52:26 +0800 Subject: [PATCH] [DS-9263][Improvement][master]optimize failover - add FailoverService.java - move failover method from MasterRegistryClient to FailoverService - move failover code from FailoverExecuteThread to FailoverService This closes #9263 --- .../master/registry/MasterRegistryClient.java | 294 +------------- .../master/runner/FailoverExecuteThread.java | 54 +-- .../master/service/FailoverService.java | 369 ++++++++++++++++++ .../master/service/FailoverServiceTest.java | 157 ++++++++ 4 files changed, 539 insertions(+), 335 deletions(-) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index abf1734af43d..c04cadceb4ec 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; +import org.apache.dolphinscheduler.server.master.service.FailoverService; import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -77,10 +78,10 @@ public class MasterRegistryClient { private static final Logger logger = LoggerFactory.getLogger(MasterRegistryClient.class); /** - * process service + * failover service */ @Autowired - private ProcessService processService; + private FailoverService failoverService; @Autowired private RegistryClient registryClient; @@ -96,16 +97,11 @@ public class MasterRegistryClient { */ private ScheduledExecutorService heartBeatExecutor; - @Autowired - private WorkflowExecuteThreadPool workflowExecuteThreadPool; - /** * master startup time, ms */ private long startupTime; - private String localNodePath; - public void init() { this.startupTime = System.currentTimeMillis(); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); @@ -158,10 +154,7 @@ public void removeMasterNodePath(String path, NodeType nodeType, boolean failove return; } - String failoverPath = getFailoverLockPath(nodeType, serverHost); try { - registryClient.getLock(failoverPath); - if (!registryClient.exists(path)) { logger.info("path: {} not exists", path); // handle dead server @@ -170,12 +163,10 @@ public void removeMasterNodePath(String path, NodeType nodeType, boolean failove //failover server if (failover) { - failoverServerWhenDown(serverHost, nodeType); + failoverService.failoverServerWhenDown(serverHost, nodeType); } } catch (Exception e) { logger.error("{} server failover failed, host:{}", nodeType, serverHost, e); - } finally { - registryClient.releaseLock(failoverPath); } } @@ -204,287 +195,19 @@ public void removeWorkerNodePath(String path, NodeType nodeType, boolean failove } //failover server if (failover) { - failoverServerWhenDown(serverHost, nodeType); + failoverService.failoverServerWhenDown(serverHost, nodeType); } } catch (Exception e) { logger.error("{} server failover failed", nodeType, e); } } - private boolean isNeedToHandleDeadServer(String host, NodeType nodeType, Duration sessionTimeout) { - long sessionTimeoutMillis = Math.max(Constants.REGISTRY_SESSION_TIMEOUT, sessionTimeout.toMillis()); - List serverList = registryClient.getServerList(nodeType); - if (CollectionUtils.isEmpty(serverList)) { - return true; - } - Date startupTime = getServerStartupTime(serverList, host); - if (startupTime == null) { - return true; - } - if (System.currentTimeMillis() - startupTime.getTime() > sessionTimeoutMillis) { - return true; - } - return false; - } - - /** - * failover server when server down - * - * @param serverHost server host - * @param nodeType zookeeper node type - */ - private void failoverServerWhenDown(String serverHost, NodeType nodeType) { - switch (nodeType) { - case MASTER: - failoverMaster(serverHost); - break; - case WORKER: - failoverWorker(serverHost); - break; - default: - break; - } - } - - /** - * get failover lock path - * - * @param nodeType zookeeper node type - * @return fail over lock path - */ - public String getFailoverLockPath(NodeType nodeType, String host) { - switch (nodeType) { - case MASTER: - return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host; - case WORKER: - return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host; - default: - return ""; - } - } - - /** - * task needs failover if task start before worker starts - * - * @param workerServers worker servers - * @param taskInstance task instance - * @return true if task instance need fail over - */ - private boolean checkTaskInstanceNeedFailover(List workerServers, TaskInstance taskInstance) { - - boolean taskNeedFailover = true; - - //now no host will execute this task instance,so no need to failover the task - if (taskInstance.getHost() == null) { - return false; - } - - //if task start after worker starts, there is no need to failover the task. - if (checkTaskAfterWorkerStart(workerServers, taskInstance)) { - taskNeedFailover = false; - } - - return taskNeedFailover; - } - - /** - * check task start after the worker server starts. - * - * @param taskInstance task instance - * @return true if task instance start time after worker server start date - */ - private boolean checkTaskAfterWorkerStart(List workerServers, TaskInstance taskInstance) { - if (StringUtils.isEmpty(taskInstance.getHost())) { - return false; - } - Date workerServerStartDate = getServerStartupTime(workerServers, taskInstance.getHost()); - if (workerServerStartDate != null) { - if (taskInstance.getStartTime() == null) { - return taskInstance.getSubmitTime().after(workerServerStartDate); - } else { - return taskInstance.getStartTime().after(workerServerStartDate); - } - } - return false; - } - - /** - * get server startup time - */ - private Date getServerStartupTime(List servers, String host) { - if (CollectionUtils.isEmpty(servers)) { - return null; - } - Date serverStartupTime = null; - for (Server server : servers) { - if (host.equals(server.getHost() + Constants.COLON + server.getPort())) { - serverStartupTime = server.getCreateTime(); - break; - } - } - return serverStartupTime; - } - - /** - * get server startup time - */ - private Date getServerStartupTime(NodeType nodeType, String host) { - if (StringUtils.isEmpty(host)) { - return null; - } - List servers = registryClient.getServerList(nodeType); - return getServerStartupTime(servers, host); - } - - /** - * failover worker tasks - *

- * 1. kill yarn job if there are yarn jobs in tasks. - * 2. change task state from running to need failover. - * 3. failover all tasks when workerHost is null - * - * @param workerHost worker host - */ - private void failoverWorker(String workerHost) { - - if (StringUtils.isEmpty(workerHost)) { - return; - } - - List workerServers = registryClient.getServerList(NodeType.WORKER); - - long startTime = System.currentTimeMillis(); - List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); - Map processInstanceCacheMap = new HashMap<>(); - logger.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size()); - - for (TaskInstance taskInstance : needFailoverTaskInstanceList) { - ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId()); - if (processInstance == null) { - processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); - if (processInstance == null) { - logger.error("failover task instance error, processInstance {} of taskInstance {} is null", - taskInstance.getProcessInstanceId(), taskInstance.getId()); - continue; - } - processInstanceCacheMap.put(processInstance.getId(), processInstance); - } - - if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) { - continue; - } - - // only failover the task owned myself if worker down. - if (!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { - continue; - } - - logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); - failoverTaskInstance(processInstance, taskInstance); - } - logger.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime); - } - - /** - * failover master - *

- * failover process instance and associated task instance - * - * @param masterHost master host - */ - public void failoverMaster(String masterHost) { - - if (StringUtils.isEmpty(masterHost)) { - return; - } - - Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost); - List workerServers = registryClient.getServerList(NodeType.WORKER); - - long startTime = System.currentTimeMillis(); - List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); - logger.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size()); - - for (ProcessInstance processInstance : needFailoverProcessInstanceList) { - if (Constants.NULL.equals(processInstance.getHost())) { - continue; - } - - List validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); - for (TaskInstance taskInstance : validTaskInstanceList) { - if (Constants.NULL.equals(taskInstance.getHost())) { - continue; - } - if (taskInstance.getState().typeIsFinished()) { - continue; - } - if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) { - continue; - } - logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); - failoverTaskInstance(processInstance, taskInstance); - } - - if (serverStartupTime != null && processInstance.getRestartTime() != null - && processInstance.getRestartTime().after(serverStartupTime)) { - continue; - } - - logger.info("failover process instance id: {}", processInstance.getId()); - //updateProcessInstance host is null and insert into command - processService.processNeedFailoverProcessInstances(processInstance); - } - - logger.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime); - } - - /** - * failover task instance - *

- * 1. kill yarn job if there are yarn jobs in tasks. - * 2. change task state from running to need failover. - * 3. try to notify local master - */ - private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance) { - if (taskInstance == null) { - logger.error("failover task instance error, taskInstance is null"); - return; - } - - if (processInstance == null) { - logger.error("failover task instance error, processInstance {} of taskInstance {} is null", - taskInstance.getProcessInstanceId(), taskInstance.getId()); - return; - } - - taskInstance.setProcessInstance(processInstance); - TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() - .buildTaskInstanceRelatedInfo(taskInstance) - .buildProcessInstanceRelatedInfo(processInstance) - .create(); - - if (masterConfig.isKillYarnJobWhenTaskFailover()) { - // only kill yarn job if exists , the local thread has exited - ProcessUtils.killYarnJob(taskExecutionContext); - } - - taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); - processService.saveTaskInstance(taskInstance); - - StateEvent stateEvent = new StateEvent(); - stateEvent.setTaskInstanceId(taskInstance.getId()); - stateEvent.setType(StateEventType.TASK_STATE_CHANGE); - stateEvent.setProcessInstanceId(processInstance.getId()); - stateEvent.setExecutionStatus(taskInstance.getState()); - workflowExecuteThreadPool.submitStateEvent(stateEvent); - } - /** * registry */ public void registry() { String address = NetUtils.getAddr(masterConfig.getListenPort()); - localNodePath = getMasterPath(); + String localNodePath = getMasterPath(); int masterHeartbeatInterval = masterConfig.getHeartbeatInterval(); HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, masterConfig.getMaxCpuLoadAvg(), @@ -514,6 +237,7 @@ public void registry() { } public void handleConnectionState(ConnectionState state) { + String localNodePath = getMasterPath(); switch (state) { case CONNECTED: logger.debug("registry connection state is {}", state); @@ -550,7 +274,7 @@ public void deregister() { /** * get master path */ - public String getMasterPath() { + private String getMasterPath() { String address = getLocalAddress(); return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + address; } @@ -558,7 +282,7 @@ public String getMasterPath() { /** * get local address */ - public String getLocalAddress() { + private String getLocalAddress() { return NetUtils.getAddr(masterConfig.getListenPort()); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java index 144baf2dab0d..31977c4cd4fa 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java @@ -18,18 +18,10 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.registry.RegistryClient; - -import org.apache.commons.collections4.CollectionUtils; - -import java.util.Iterator; -import java.util.List; +import org.apache.dolphinscheduler.server.master.service.FailoverService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,20 +33,14 @@ public class FailoverExecuteThread extends Thread { private static final Logger logger = LoggerFactory.getLogger(FailoverExecuteThread.class); - @Autowired - private MasterRegistryClient masterRegistryClient; - - @Autowired - private RegistryClient registryClient; - @Autowired private MasterConfig masterConfig; /** - * process service + * failover service */ @Autowired - private ProcessService processService; + private FailoverService failoverService; @Override public synchronized void start() { @@ -67,23 +53,7 @@ public void run() { while (Stopper.isRunning()) { logger.info("failover execute started"); try { - List hosts = getNeedFailoverMasterServers(); - if (CollectionUtils.isEmpty(hosts)) { - continue; - } - logger.info("need failover hosts:{}", hosts); - - for (String host : hosts) { - String failoverPath = masterRegistryClient.getFailoverLockPath(NodeType.MASTER, host); - try { - registryClient.getLock(failoverPath); - masterRegistryClient.failoverMaster(host); - } catch (Exception e) { - logger.error("{} server failover failed, host:{}", NodeType.MASTER, host, e); - } finally { - registryClient.releaseLock(failoverPath); - } - } + failoverService.checkMasterFailover(); } catch (Exception e) { logger.error("failover execute error", e); } finally { @@ -91,20 +61,4 @@ public void run() { } } } - - private List getNeedFailoverMasterServers() { - // failover myself && failover dead masters - List hosts = processService.queryNeedFailoverProcessInstanceHost(); - - Iterator iterator = hosts.iterator(); - while (iterator.hasNext()) { - String host = iterator.next(); - if (registryClient.checkNodeExists(host, NodeType.MASTER)) { - if (!host.equals(masterRegistryClient.getLocalAddress())) { - iterator.remove(); - } - } - } - return hosts; - } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java new file mode 100644 index 000000000000..4f25c78dadcb --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.service; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; +import org.apache.dolphinscheduler.server.utils.ProcessUtils; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.registry.RegistryClient; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang.StringUtils; + +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * failover service + */ +@Component +public class FailoverService { + private static final Logger LOGGER = LoggerFactory.getLogger(FailoverService.class); + private final RegistryClient registryClient; + private final MasterConfig masterConfig; + private final ProcessService processService; + private final WorkflowExecuteThreadPool workflowExecuteThreadPool; + + public FailoverService(RegistryClient registryClient, MasterConfig masterConfig, ProcessService processService, + WorkflowExecuteThreadPool workflowExecuteThreadPool) { + this.registryClient = registryClient; + this.masterConfig = masterConfig; + this.processService = processService; + this.workflowExecuteThreadPool = workflowExecuteThreadPool; + } + + /** + * check master failover + */ + public void checkMasterFailover() { + List hosts = getNeedFailoverMasterServers(); + if (CollectionUtils.isEmpty(hosts)) { + return; + } + LOGGER.info("need failover hosts:{}", hosts); + + for (String host : hosts) { + failoverMasterWithLock(host); + } + } + + /** + * failover server when server down + * + * @param serverHost server host + * @param nodeType node type + */ + public void failoverServerWhenDown(String serverHost, NodeType nodeType) { + switch (nodeType) { + case MASTER: + failoverMasterWithLock(serverHost); + break; + case WORKER: + failoverWorker(serverHost); + break; + default: + break; + } + } + + private void failoverMasterWithLock(String masterHost) { + String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost); + try { + registryClient.getLock(failoverPath); + this.failoverMaster(masterHost); + } catch (Exception e) { + LOGGER.error("{} server failover failed, host:{}", NodeType.MASTER, masterHost, e); + } finally { + registryClient.releaseLock(failoverPath); + } + } + + /** + * failover master + *

+ * failover process instance and associated task instance + * + * @param masterHost master host + */ + private void failoverMaster(String masterHost) { + if (StringUtils.isEmpty(masterHost)) { + return; + } + Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost); + long startTime = System.currentTimeMillis(); + List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); + LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size()); + List workerServers = registryClient.getServerList(NodeType.WORKER); + for (ProcessInstance processInstance : needFailoverProcessInstanceList) { + if (Constants.NULL.equals(processInstance.getHost())) { + continue; + } + + List validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); + for (TaskInstance taskInstance : validTaskInstanceList) { + LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); + failoverTaskInstance(processInstance, taskInstance, workerServers); + } + + if (serverStartupTime != null && processInstance.getRestartTime() != null + && processInstance.getRestartTime().after(serverStartupTime)) { + continue; + } + + LOGGER.info("failover process instance id: {}", processInstance.getId()); + //updateProcessInstance host is null and insert into command + processInstance.setHost(Constants.NULL); + processService.processNeedFailoverProcessInstances(processInstance); + } + + LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime); + } + + /** + * failover worker tasks + *

+ * 1. kill yarn job if there are yarn jobs in tasks. + * 2. change task state from running to need failover. + * 3. failover all tasks when workerHost is null + * + * @param workerHost worker host + */ + private void failoverWorker(String workerHost) { + if (StringUtils.isEmpty(workerHost)) { + return; + } + + long startTime = System.currentTimeMillis(); + List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); + Map processInstanceCacheMap = new HashMap<>(); + LOGGER.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size()); + List workerServers = registryClient.getServerList(NodeType.WORKER); + for (TaskInstance taskInstance : needFailoverTaskInstanceList) { + ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId()); + if (processInstance == null) { + processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); + if (processInstance == null) { + LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null", + taskInstance.getProcessInstanceId(), taskInstance.getId()); + continue; + } + processInstanceCacheMap.put(processInstance.getId(), processInstance); + } + + // only failover the task owned myself if worker down. + if (!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { + continue; + } + + LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); + failoverTaskInstance(processInstance, taskInstance, workerServers); + } + LOGGER.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime); + } + + /** + * failover task instance + *

+ * 1. kill yarn job if there are yarn jobs in tasks. + * 2. change task state from running to need failover. + * 3. try to notify local master + */ + private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance, List workerServers) { + if (processInstance == null) { + LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null", + taskInstance.getProcessInstanceId(), taskInstance.getId()); + return; + } + if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) { + return; + } + + taskInstance.setProcessInstance(processInstance); + TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() + .buildTaskInstanceRelatedInfo(taskInstance) + .buildProcessInstanceRelatedInfo(processInstance) + .create(); + + if (masterConfig.isKillYarnJobWhenTaskFailover()) { + // only kill yarn job if exists , the local thread has exited + ProcessUtils.killYarnJob(taskExecutionContext); + } + + taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); + processService.saveTaskInstance(taskInstance); + + StateEvent stateEvent = new StateEvent(); + stateEvent.setTaskInstanceId(taskInstance.getId()); + stateEvent.setType(StateEventType.TASK_STATE_CHANGE); + stateEvent.setProcessInstanceId(processInstance.getId()); + stateEvent.setExecutionStatus(taskInstance.getState()); + workflowExecuteThreadPool.submitStateEvent(stateEvent); + } + + /** + * get need failover master servers + * + * @return need failover master servers + */ + private List getNeedFailoverMasterServers() { + // failover myself && failover dead masters + List hosts = processService.queryNeedFailoverProcessInstanceHost(); + + Iterator iterator = hosts.iterator(); + while (iterator.hasNext()) { + String host = iterator.next(); + if (registryClient.checkNodeExists(host, NodeType.MASTER)) { + if (!host.equals(getLocalAddress())) { + iterator.remove(); + } + } + } + return hosts; + } + + /** + * task needs failover if task start before worker starts + * + * @param workerServers worker servers + * @param taskInstance task instance + * @return true if task instance need fail over + */ + private boolean checkTaskInstanceNeedFailover(List workerServers, TaskInstance taskInstance) { + + boolean taskNeedFailover = true; + + if (taskInstance == null) { + LOGGER.error("failover task instance error, taskInstance is null"); + return false; + } + + if (Constants.NULL.equals(taskInstance.getHost())) { + return false; + } + + if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) { + return false; + } + + + //now no host will execute this task instance,so no need to failover the task + if (taskInstance.getHost() == null) { + return false; + } + + //if task start after worker starts, there is no need to failover the task. + if (checkTaskAfterWorkerStart(workerServers, taskInstance)) { + taskNeedFailover = false; + } + + return taskNeedFailover; + } + + /** + * check task start after the worker server starts. + * + * @param taskInstance task instance + * @return true if task instance start time after worker server start date + */ + private boolean checkTaskAfterWorkerStart(List workerServers, TaskInstance taskInstance) { + if (StringUtils.isEmpty(taskInstance.getHost())) { + return false; + } + Date workerServerStartDate = getServerStartupTime(workerServers, taskInstance.getHost()); + if (workerServerStartDate != null) { + if (taskInstance.getStartTime() == null) { + return taskInstance.getSubmitTime().after(workerServerStartDate); + } else { + return taskInstance.getStartTime().after(workerServerStartDate); + } + } + return false; + } + + /** + * get failover lock path + * + * @param nodeType zookeeper node type + * @return fail over lock path + */ + private String getFailoverLockPath(NodeType nodeType, String host) { + switch (nodeType) { + case MASTER: + return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host; + case WORKER: + return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host; + default: + return ""; + } + } + + /** + * get server startup time + */ + private Date getServerStartupTime(NodeType nodeType, String host) { + if (StringUtils.isEmpty(host)) { + return null; + } + List servers = registryClient.getServerList(nodeType); + return getServerStartupTime(servers, host); + } + + /** + * get server startup time + */ + private Date getServerStartupTime(List servers, String host) { + if (CollectionUtils.isEmpty(servers)) { + return null; + } + Date serverStartupTime = null; + for (Server server : servers) { + if (host.equals(server.getHost() + Constants.COLON + server.getPort())) { + serverStartupTime = server.getCreateTime(); + break; + } + } + return serverStartupTime; + } + + /** + * get local address + */ + String getLocalAddress() { + return NetUtils.getAddr(masterConfig.getListenPort()); + } + +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java new file mode 100644 index 000000000000..17a379809070 --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.service; + +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doNothing; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.registry.RegistryClient; + +import java.util.Arrays; +import java.util.Date; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.springframework.test.util.ReflectionTestUtils; + +import com.google.common.collect.Lists; + +/** + * MasterRegistryClientTest + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({RegistryClient.class}) +@PowerMockIgnore({"javax.management.*"}) +public class FailoverServiceTest { + @InjectMocks + private FailoverService failoverService; + + @Mock + private MasterConfig masterConfig; + + @Mock + private RegistryClient registryClient; + + @Mock + private ProcessService processService; + + @Mock + private WorkflowExecuteThreadPool workflowExecuteThreadPool; + + private String testHost; + private ProcessInstance processInstance; + private TaskInstance taskInstance; + + @Before + public void before() throws Exception { + given(masterConfig.getListenPort()).willReturn(8080); + + testHost = failoverService.getLocalAddress(); + String ip = testHost.split(":")[0]; + int port = Integer.valueOf(testHost.split(":")[1]); + Assert.assertEquals(8080, port); + + given(registryClient.getLock(Mockito.anyString())).willReturn(true); + given(registryClient.releaseLock(Mockito.anyString())).willReturn(true); + given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn(testHost); + given(registryClient.getStoppable()).willReturn(cause -> { + }); + given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any())).willReturn(true); + doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString()); + + processInstance = new ProcessInstance(); + processInstance.setId(1); + processInstance.setHost(testHost); + processInstance.setRestartTime(new Date()); + processInstance.setHistoryCmd("xxx"); + processInstance.setCommandType(CommandType.STOP); + + taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setStartTime(new Date()); + taskInstance.setHost(testHost); + + given(processService.queryNeedFailoverTaskInstances(Mockito.anyString())).willReturn(Arrays.asList(taskInstance)); + given(processService.queryNeedFailoverProcessInstanceHost()).willReturn(Lists.newArrayList(testHost)); + given(processService.queryNeedFailoverProcessInstances(Mockito.anyString())).willReturn(Arrays.asList(processInstance)); + doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class)); + given(processService.findValidTaskListByProcessId(Mockito.anyInt())).willReturn(Lists.newArrayList(taskInstance)); + given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance); + + Thread.sleep(1000); + Server server = new Server(); + server.setHost(ip); + server.setPort(port); + server.setCreateTime(new Date()); + given(registryClient.getServerList(NodeType.WORKER)).willReturn(Arrays.asList(server)); + given(registryClient.getServerList(NodeType.MASTER)).willReturn(Arrays.asList(server)); + ReflectionTestUtils.setField(failoverService, "registryClient", registryClient); + + doNothing().when(workflowExecuteThreadPool).submitStateEvent(Mockito.any(StateEvent.class)); + } + + @Test + public void checkMasterFailoverTest() { + failoverService.checkMasterFailover(); + } + + @Test + public void failoverMasterTest() { + processInstance.setHost(Constants.NULL); + taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + failoverService.failoverServerWhenDown(testHost, NodeType.MASTER); + Assert.assertNotEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); + + processInstance.setHost(testHost); + taskInstance.setState(ExecutionStatus.SUCCESS); + failoverService.failoverServerWhenDown(testHost, NodeType.MASTER); + Assert.assertNotEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); + Assert.assertEquals(Constants.NULL, processInstance.getHost()); + + processInstance.setHost(testHost); + taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + failoverService.failoverServerWhenDown(testHost, NodeType.MASTER); + Assert.assertEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); + Assert.assertEquals(Constants.NULL, processInstance.getHost()); + } + + @Test + public void failoverWorkTest() { + failoverService.failoverServerWhenDown(testHost, NodeType.WORKER); + Assert.assertEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); + } +}