Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RPC] Refactor the remote module. #14776

Merged
merged 2 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion .github/actions/labeler/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ backend:
- 'dolphinscheduler-dist/**/*'
- 'dolphinscheduler-master/**/*'
- 'dolphinscheduler-registry/**/*'
- 'dolphinscheduler-remote/**/*'
- 'dolphinscheduler-extract/**/*'
- 'dolphinscheduler-scheduler-plugin/**/*'
- 'dolphinscheduler-service/**/*'
- 'dolphinscheduler-spi/**/*'
Expand Down
10 changes: 6 additions & 4 deletions dolphinscheduler-alert/dolphinscheduler-alert-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@
<name>${project.artifactId}</name>
<dependencies>
<!-- dolphinscheduler -->
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-remote</artifactId>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-meter</artifactId>
Expand All @@ -51,6 +48,11 @@
<artifactId>dolphinscheduler-registry-all</artifactId>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-alert</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,49 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.alert.rpc;

import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
import org.apache.dolphinscheduler.remote.command.Message;
import org.apache.dolphinscheduler.remote.command.MessageType;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequest;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponse;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.extract.alert.IAlertOperator;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendRequest;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

import io.netty.channel.Channel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Component
@Slf4j
public final class AlertRequestProcessor implements NettyRequestProcessor {
@Service
public class AlertOperatorImpl implements IAlertOperator {

private final AlertBootstrapService alertBootstrapService;

public AlertRequestProcessor(AlertBootstrapService alertBootstrapService) {
this.alertBootstrapService = alertBootstrapService;
}
@Autowired
private AlertBootstrapService alertBootstrapService;

@Override
public void process(Channel channel, Message message) {
AlertSendRequest alertSendRequest = JsonSerializer.deserialize(message.getBody(), AlertSendRequest.class);

log.info("Received command : {}", alertSendRequest);

public AlertSendResponse sendAlert(AlertSendRequest alertSendRequest) {
log.info("Received AlertSendRequest : {}", alertSendRequest);
AlertSendResponse alertSendResponse = alertBootstrapService.syncHandler(
alertSendRequest.getGroupId(),
alertSendRequest.getTitle(),
alertSendRequest.getContent(),
alertSendRequest.getWarnType());
channel.writeAndFlush(alertSendResponse.convert2Command(message.getOpaque()));
}

@Override
public MessageType getCommandType() {
return MessageType.ALERT_SEND_REQUEST;
log.info("Handle AlertSendRequest finish: {}", alertSendResponse);
return alertSendResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,32 @@
package org.apache.dolphinscheduler.alert.rpc;

import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.factory.NettyRemotingServerFactory;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;

import java.util.List;
import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class AlertRpcServer implements AutoCloseable {

@Autowired
private List<NettyRequestProcessor> nettyRequestProcessors;
@Autowired
private AlertConfig alertConfig;
public class AlertRpcServer extends SpringServerMethodInvokerDiscovery implements AutoCloseable {

private NettyRemotingServer nettyRemotingServer;
public AlertRpcServer(AlertConfig alertConfig) {
super(new NettyRemotingServer(new NettyServerConfig(alertConfig.getPort())));
}

public void start() {
log.info("Starting alert rpc server...");
nettyRemotingServer = NettyRemotingServerFactory.buildNettyRemotingServer(alertConfig.getPort());
for (NettyRequestProcessor nettyRequestProcessor : nettyRequestProcessors) {
nettyRemotingServer.registerProcessor(nettyRequestProcessor);
log.info("Success register netty processor: {}", nettyRequestProcessor.getClass().getName());
}
log.info("Starting AlertRpcServer...");
nettyRemotingServer.start();
log.info("Started alert rpc server...");
log.info("Started AlertRpcServer...");
}

@Override
public void close() throws Exception {
log.info("Closing alert rpc server...");
public void close() {
log.info("Closing AlertRpcServer...");
nettyRemotingServer.close();
log.info("Closed alert rpc server...");
log.info("Closed AlertRpcServer...");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.AlertSendStatus;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponse;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponse;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse;

import java.util.ArrayList;
import java.util.List;
Expand Down
11 changes: 11 additions & 0 deletions dolphinscheduler-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- dolphinscheduler -->
<dependency>
Expand Down Expand Up @@ -82,6 +83,16 @@
<artifactId>dolphinscheduler-storage-all</artifactId>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-master</artifactId>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-worker</artifactId>
</dependency>

<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
package org.apache.dolphinscheduler.api.aspect;

import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.remote.command.cache.CacheExpireRequest;
import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.IMasterCacheService;
import org.apache.dolphinscheduler.extract.master.transportor.CacheExpireRequest;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.service.cache.impl.CacheKeyGenerator;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.lang.reflect.Method;
Expand Down Expand Up @@ -65,7 +70,7 @@ public class CacheEvictAspect {
private CacheKeyGenerator cacheKeyGenerator;

@Autowired
private CacheNotifyService cacheNotifyService;
private RegistryClient registryClient;

@Pointcut("@annotation(org.springframework.cache.annotation.CacheEvict)")
public void cacheEvictPointCut() {
Expand Down Expand Up @@ -96,7 +101,7 @@ public Object doAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable
}
}
if (StringUtils.isNotEmpty(cacheKey)) {
cacheNotifyService.notifyMaster(new CacheExpireRequest(cacheType, cacheKey).convert2Command());
notifyMaster(cacheType, cacheKey);
}
}

Expand Down Expand Up @@ -134,4 +139,21 @@ private String parseKey(String key, List<Object> paramList) {
}
return obj.toString();
}

private void notifyMaster(CacheType cacheType, String cacheKey) {
try {
List<Server> serverList = registryClient.getServerList(RegistryNodeType.MASTER);
if (CollectionUtils.isEmpty(serverList)) {
return;
}
for (Server server : serverList) {
IMasterCacheService masterCacheService = SingletonJdkDynamicRpcClientProxyFactory.getInstance()
.getProxyClient(server.getHost() + ":" + server.getPort(), IMasterCacheService.class);
masterCacheService.cacheExpire(new CacheExpireRequest(cacheType, cacheKey));
}
} catch (Exception e) {
log.error("notify master error", e);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto;

import org.apache.commons.lang3.StringUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,20 @@
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.remote.command.workflow.WorkflowStateEventChangeRequest;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;

public class PauseExecuteFunction implements ExecuteFunction<PauseExecuteRequest, PauseExecuteResult> {

private final ProcessInstanceDao processInstanceDao;

private final ApiRpcClient apiRpcClient;

public PauseExecuteFunction(ProcessInstanceDao processInstanceDao, ApiRpcClient apiRpcClient) {
public PauseExecuteFunction(ProcessInstanceDao processInstanceDao) {
this.processInstanceDao = processInstanceDao;
this.apiRpcClient = apiRpcClient;
}

@Override
Expand All @@ -59,15 +55,18 @@ public PauseExecuteResult execute(PauseExecuteRequest request) throws ExecuteRun
"The workflow instance: %s pause failed, due to update the workflow instance status in DB failed",
workflowInstance.getName()));
}
WorkflowStateEventChangeRequest workflowStateEventChangeRequest = new WorkflowStateEventChangeRequest(
workflowInstance.getId(), 0, workflowInstance.getState(), workflowInstance.getId(), 0);
try {
apiRpcClient.send(Host.of(workflowInstance.getHost()), workflowStateEventChangeRequest.convert2Command());
} catch (RemotingException e) {
// todo: direct call the workflow instance pause method
ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
.getProxyClient(workflowInstance.getHost(), ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(
new WorkflowInstanceStateChangeEvent(
workflowInstance.getId(), 0, workflowInstance.getState(), workflowInstance.getId(), 0));
} catch (Exception e) {
throw new ExecuteRuntimeException(
String.format(
"The workflow instance: %s pause failed, due to send rpc request to master: %s failed",
workflowInstance.getName(), workflowInstance.getHost()),
"WorkflowInstance: %s pause failed", workflowInstance.getName()),
e);
}
return new PauseExecuteResult(workflowInstance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.dolphinscheduler.api.executor.ExecuteContext;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;

import java.util.concurrent.CompletableFuture;
Expand All @@ -36,12 +35,10 @@ public class PauseExecuteFunctionBuilder implements ExecuteFunctionBuilder<Pause

@Autowired
private ProcessInstanceDao processInstanceDao;
@Autowired
private ApiRpcClient apiRpcClient;

@Override
public CompletableFuture<ExecuteFunction<PauseExecuteRequest, PauseExecuteResult>> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) {
return CompletableFuture.completedFuture(new PauseExecuteFunction(processInstanceDao, apiRpcClient));
return CompletableFuture.completedFuture(new PauseExecuteFunction(processInstanceDao));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,23 @@
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.remote.command.workflow.WorkflowStateEventChangeRequest;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class StopExecuteFunction implements ExecuteFunction<StopRequest, StopResult> {

private final ProcessInstanceDao processInstanceDao;
// todo: Use ApiRpcClient instead of NettyRemotingClient
private final ApiRpcClient apiRpcClient;

public StopExecuteFunction(ProcessInstanceDao processInstanceDao, ApiRpcClient apiRpcClient) {
public StopExecuteFunction(ProcessInstanceDao processInstanceDao) {
this.processInstanceDao = processInstanceDao;
this.apiRpcClient = apiRpcClient;
}

@Override
Expand All @@ -60,17 +56,17 @@ public StopResult execute(StopRequest request) throws ExecuteRuntimeException {
if (processInstanceDao.updateById(workflowInstance)) {
log.info("Workflow instance {} ready to stop success, will call master to stop the workflow instance",
workflowInstance.getName());
// todo: Use specific stop command instead of WorkflowStateEventChangeCommand
WorkflowStateEventChangeRequest workflowStateEventChangeRequest = new WorkflowStateEventChangeRequest(
workflowInstance.getId(), 0, workflowInstance.getState(), workflowInstance.getId(), 0);
try {
apiRpcClient.send(Host.of(workflowInstance.getHost()),
workflowStateEventChangeRequest.convert2Command());
} catch (RemotingException e) {
// todo: direct call the workflow instance stop method
ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
.getProxyClient(workflowInstance.getHost(), ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(
new WorkflowInstanceStateChangeEvent(
workflowInstance.getId(), 0, workflowInstance.getState(), workflowInstance.getId(), 0));
} catch (Exception e) {
throw new ExecuteRuntimeException(
String.format("Workflow instance: %s stop failed, due to send request to master: %s failed",
workflowInstance.getName(), workflowInstance.getHost()),
e);
String.format("WorkflowInstance: %s stop failed", workflowInstance.getName()), e);
}
// todo: use async and inject the completeFuture in the result.
return new StopResult(workflowInstance);
Expand Down
Loading
Loading