Skip to content

Commit

Permalink
Add dolphinscheduler-extract-base
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Aug 21, 2023
1 parent 70731a1 commit a3bcb3d
Show file tree
Hide file tree
Showing 374 changed files with 5,937 additions and 10,361 deletions.
2 changes: 1 addition & 1 deletion .github/actions/labeler/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ backend:
- 'dolphinscheduler-dist/**/*'
- 'dolphinscheduler-master/**/*'
- 'dolphinscheduler-registry/**/*'
- 'dolphinscheduler-remote/**/*'
- 'dolphinscheduler-extract/**/*'
- 'dolphinscheduler-scheduler-plugin/**/*'
- 'dolphinscheduler-service/**/*'
- 'dolphinscheduler-spi/**/*'
Expand Down
10 changes: 6 additions & 4 deletions dolphinscheduler-alert/dolphinscheduler-alert-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@
<name>${project.artifactId}</name>
<dependencies>
<!-- dolphinscheduler -->
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-remote</artifactId>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-meter</artifactId>
Expand All @@ -51,6 +48,11 @@
<artifactId>dolphinscheduler-registry-all</artifactId>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-alert</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,49 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.alert.rpc;

import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
import org.apache.dolphinscheduler.remote.command.Message;
import org.apache.dolphinscheduler.remote.command.MessageType;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequest;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponse;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.extract.alert.IAlertOperator;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendRequest;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

import io.netty.channel.Channel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Component
@Slf4j
public final class AlertRequestProcessor implements NettyRequestProcessor {
@Service
public class AlertOperatorImpl implements IAlertOperator {

private final AlertBootstrapService alertBootstrapService;

public AlertRequestProcessor(AlertBootstrapService alertBootstrapService) {
this.alertBootstrapService = alertBootstrapService;
}
@Autowired
private AlertBootstrapService alertBootstrapService;

@Override
public void process(Channel channel, Message message) {
AlertSendRequest alertSendRequest = JsonSerializer.deserialize(message.getBody(), AlertSendRequest.class);

log.info("Received command : {}", alertSendRequest);

public AlertSendResponse sendAlert(AlertSendRequest alertSendRequest) {
log.info("Received AlertSendRequest : {}", alertSendRequest);
AlertSendResponse alertSendResponse = alertBootstrapService.syncHandler(
alertSendRequest.getGroupId(),
alertSendRequest.getTitle(),
alertSendRequest.getContent(),
alertSendRequest.getWarnType());
channel.writeAndFlush(alertSendResponse.convert2Command(message.getOpaque()));
}

@Override
public MessageType getCommandType() {
return MessageType.ALERT_SEND_REQUEST;
log.info("Handle AlertSendRequest finish: {}", alertSendResponse);
return alertSendResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,32 @@
package org.apache.dolphinscheduler.alert.rpc;

import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.factory.NettyRemotingServerFactory;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;

import java.util.List;
import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class AlertRpcServer implements AutoCloseable {

@Autowired
private List<NettyRequestProcessor> nettyRequestProcessors;
@Autowired
private AlertConfig alertConfig;
public class AlertRpcServer extends SpringServerMethodInvokerDiscovery implements AutoCloseable {

private NettyRemotingServer nettyRemotingServer;
public AlertRpcServer(AlertConfig alertConfig) {
super(new NettyRemotingServer(new NettyServerConfig(alertConfig.getPort())));
}

public void start() {
log.info("Starting alert rpc server...");
nettyRemotingServer = NettyRemotingServerFactory.buildNettyRemotingServer(alertConfig.getPort());
for (NettyRequestProcessor nettyRequestProcessor : nettyRequestProcessors) {
nettyRemotingServer.registerProcessor(nettyRequestProcessor);
log.info("Success register netty processor: {}", nettyRequestProcessor.getClass().getName());
}
log.info("Starting AlertRpcServer...");
nettyRemotingServer.start();
log.info("Started alert rpc server...");
log.info("Started AlertRpcServer...");
}

@Override
public void close() throws Exception {
log.info("Closing alert rpc server...");
public void close() {
log.info("Closing AlertRpcServer...");
nettyRemotingServer.close();
log.info("Closed alert rpc server...");
log.info("Closed AlertRpcServer...");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.AlertSendStatus;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponse;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponse;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse;

import java.util.ArrayList;
import java.util.List;
Expand Down
11 changes: 11 additions & 0 deletions dolphinscheduler-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- dolphinscheduler -->
<dependency>
Expand Down Expand Up @@ -82,6 +83,16 @@
<artifactId>dolphinscheduler-storage-all</artifactId>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-master</artifactId>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-worker</artifactId>
</dependency>

<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
package org.apache.dolphinscheduler.api.aspect;

import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.remote.command.cache.CacheExpireRequest;
import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.IMasterCacheService;
import org.apache.dolphinscheduler.extract.master.transportor.CacheExpireRequest;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.service.cache.impl.CacheKeyGenerator;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.lang.reflect.Method;
Expand Down Expand Up @@ -65,7 +70,7 @@ public class CacheEvictAspect {
private CacheKeyGenerator cacheKeyGenerator;

@Autowired
private CacheNotifyService cacheNotifyService;
private RegistryClient registryClient;

@Pointcut("@annotation(org.springframework.cache.annotation.CacheEvict)")
public void cacheEvictPointCut() {
Expand Down Expand Up @@ -96,7 +101,7 @@ public Object doAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable
}
}
if (StringUtils.isNotEmpty(cacheKey)) {
cacheNotifyService.notifyMaster(new CacheExpireRequest(cacheType, cacheKey).convert2Command());
notifyMaster(cacheType, cacheKey);
}
}

Expand Down Expand Up @@ -134,4 +139,21 @@ private String parseKey(String key, List<Object> paramList) {
}
return obj.toString();
}

private void notifyMaster(CacheType cacheType, String cacheKey) {
try {
List<Server> serverList = registryClient.getServerList(RegistryNodeType.MASTER);
if (CollectionUtils.isEmpty(serverList)) {
return;
}
for (Server server : serverList) {
IMasterCacheService masterCacheService = SingletonJdkDynamicRpcClientProxyFactory.getInstance()
.getProxyClient(server.getHost() + ":" + server.getPort(), IMasterCacheService.class);
masterCacheService.cacheExpire(new CacheExpireRequest(cacheType, cacheKey));
}
} catch (Exception e) {
log.error("notify master error", e);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto;

import org.apache.commons.lang3.StringUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,20 @@
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.remote.command.workflow.WorkflowStateEventChangeRequest;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;

public class PauseExecuteFunction implements ExecuteFunction<PauseExecuteRequest, PauseExecuteResult> {

private final ProcessInstanceDao processInstanceDao;

private final ApiRpcClient apiRpcClient;

public PauseExecuteFunction(ProcessInstanceDao processInstanceDao, ApiRpcClient apiRpcClient) {
public PauseExecuteFunction(ProcessInstanceDao processInstanceDao) {
this.processInstanceDao = processInstanceDao;
this.apiRpcClient = apiRpcClient;
}

@Override
Expand All @@ -59,15 +55,18 @@ public PauseExecuteResult execute(PauseExecuteRequest request) throws ExecuteRun
"The workflow instance: %s pause failed, due to update the workflow instance status in DB failed",
workflowInstance.getName()));
}
WorkflowStateEventChangeRequest workflowStateEventChangeRequest = new WorkflowStateEventChangeRequest(
workflowInstance.getId(), 0, workflowInstance.getState(), workflowInstance.getId(), 0);
try {
apiRpcClient.send(Host.of(workflowInstance.getHost()), workflowStateEventChangeRequest.convert2Command());
} catch (RemotingException e) {
// todo: direct call the workflow instance pause method
ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
.getProxyClient(workflowInstance.getHost(), ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(
new WorkflowInstanceStateChangeEvent(
workflowInstance.getId(), 0, workflowInstance.getState(), workflowInstance.getId(), 0));
} catch (Exception e) {
throw new ExecuteRuntimeException(
String.format(
"The workflow instance: %s pause failed, due to send rpc request to master: %s failed",
workflowInstance.getName(), workflowInstance.getHost()),
"WorkflowInstance: %s pause failed", workflowInstance.getName()),
e);
}
return new PauseExecuteResult(workflowInstance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.dolphinscheduler.api.executor.ExecuteContext;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;

import java.util.concurrent.CompletableFuture;
Expand All @@ -36,12 +35,10 @@ public class PauseExecuteFunctionBuilder implements ExecuteFunctionBuilder<Pause

@Autowired
private ProcessInstanceDao processInstanceDao;
@Autowired
private ApiRpcClient apiRpcClient;

@Override
public CompletableFuture<ExecuteFunction<PauseExecuteRequest, PauseExecuteResult>> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) {
return CompletableFuture.completedFuture(new PauseExecuteFunction(processInstanceDao, apiRpcClient));
return CompletableFuture.completedFuture(new PauseExecuteFunction(processInstanceDao));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,23 @@
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.remote.command.workflow.WorkflowStateEventChangeRequest;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class StopExecuteFunction implements ExecuteFunction<StopRequest, StopResult> {

private final ProcessInstanceDao processInstanceDao;
// todo: Use ApiRpcClient instead of NettyRemotingClient
private final ApiRpcClient apiRpcClient;

public StopExecuteFunction(ProcessInstanceDao processInstanceDao, ApiRpcClient apiRpcClient) {
public StopExecuteFunction(ProcessInstanceDao processInstanceDao) {
this.processInstanceDao = processInstanceDao;
this.apiRpcClient = apiRpcClient;
}

@Override
Expand All @@ -60,17 +56,17 @@ public StopResult execute(StopRequest request) throws ExecuteRuntimeException {
if (processInstanceDao.updateById(workflowInstance)) {
log.info("Workflow instance {} ready to stop success, will call master to stop the workflow instance",
workflowInstance.getName());
// todo: Use specific stop command instead of WorkflowStateEventChangeCommand
WorkflowStateEventChangeRequest workflowStateEventChangeRequest = new WorkflowStateEventChangeRequest(
workflowInstance.getId(), 0, workflowInstance.getState(), workflowInstance.getId(), 0);
try {
apiRpcClient.send(Host.of(workflowInstance.getHost()),
workflowStateEventChangeRequest.convert2Command());
} catch (RemotingException e) {
// todo: direct call the workflow instance stop method
ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
.getProxyClient(workflowInstance.getHost(), ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(
new WorkflowInstanceStateChangeEvent(
workflowInstance.getId(), 0, workflowInstance.getState(), workflowInstance.getId(), 0));
} catch (Exception e) {
throw new ExecuteRuntimeException(
String.format("Workflow instance: %s stop failed, due to send request to master: %s failed",
workflowInstance.getName(), workflowInstance.getHost()),
e);
String.format("WorkflowInstance: %s stop failed", workflowInstance.getName()), e);
}
// todo: use async and inject the completeFuture in the result.
return new StopResult(workflowInstance);
Expand Down
Loading

0 comments on commit a3bcb3d

Please sign in to comment.