Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug][Registry] Optimizing waiting strategy #15223

Merged
merged 13 commits into from
Jan 2, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
import org.apache.dolphinscheduler.server.master.rpc.MasterRpcServer;
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;

import java.time.Duration;
Expand All @@ -51,8 +50,6 @@ public class MasterWaitingStrategy implements MasterConnectStrategy {
@Autowired
private RegistryClient registryClient;
@Autowired
private MasterRpcServer masterRPCServer;
@Autowired
private WorkflowEventQueue workflowEventQueue;
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
Expand Down Expand Up @@ -97,7 +94,6 @@ public void reconnect() {
} else {
try {
ServerLifeCycleManager.recoverFromWaiting();
reStartMasterResource();
log.info("Recover from waiting success, the current server status is {}",
ServerLifeCycleManager.getServerStatus());
} catch (Exception e) {
Expand All @@ -117,9 +113,6 @@ public StrategyType getStrategyType() {
}

private void clearMasterResource() {
        // close the master resource; if closing fails, the master server should be stopped
masterRPCServer.close();
log.warn("Master closed RPC server due to lost registry connection");
workflowEventQueue.clearWorkflowEventQueue();
log.warn("Master clear workflow event queue due to lost registry connection");
processInstanceExecCacheManager.clearCache();
Expand All @@ -129,9 +122,4 @@ private void clearMasterResource() {

}

private void reStartMasterResource() {
        // reopen the resource; if reopening fails, the master server should be stopped
masterRPCServer.start();
log.warn("Master restarted RPC server due to reconnect to registry");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
import org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueue;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;

Expand All @@ -47,9 +46,6 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy {
@Autowired
private RegistryClient registryClient;

@Autowired
private WorkerRpcServer workerRpcServer;

@Autowired
private MessageRetryRunner messageRetryRunner;

Expand Down Expand Up @@ -97,7 +93,6 @@ public void reconnect() {
} else {
try {
ServerLifeCycleManager.recoverFromWaiting();
reStartWorkerResource();
log.info("Recover from waiting success, the current server status is {}",
ServerLifeCycleManager.getServerStatus());
} catch (Exception e) {
Expand All @@ -117,20 +112,11 @@ public StrategyType getStrategyType() {
}

private void clearWorkerResource() {
// close the worker resource, if close failed should stop the worker server
workerRpcServer.close();
log.warn("Worker server close the RPC server due to lost connection from registry");
workerManagerThread.clearTask();
globalTaskInstanceWaitingQueue.clearTask();
log.warn("Worker server clear the tasks due to lost connection from registry");
messageRetryRunner.clearMessage();
log.warn("Worker server clear the retry message due to lost connection from registry");

}

private void reStartWorkerResource() {
// reopen the resource, if reopen failed should stop the worker server
workerRpcServer.start();
        log.warn("Worker server restart RPC server due to reconnect to registry");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.server.worker.runner.operator;

import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
Expand Down Expand Up @@ -55,6 +56,14 @@ public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest taskInst

LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());

        // check server status; if the server is not running, return a failed response to reject this task
if (!ServerLifeCycleManager.isRunning()) {
            log.error("server is not running. reject task: {}", taskExecutionContext.getTaskInstanceId());
return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
"server is not running");
}

TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());

if (!globalTaskInstanceWaitingQueue.addDispatchTask(taskExecutionContext)) {
Expand Down
Loading