Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix-9221] [alert-server] optimization and gracefully close #9246

Merged
merged 17 commits into from
Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
e81ac3a
[Fix-9221] [alert-server] optimization and gracefully close
Mar 28, 2022
6cbe8a0
Merge branch 'dev' of https://github.com/apache/dolphinscheduler into…
Mar 29, 2022
88852d2
[Fix-9221] [alert-server] remove unused mock data
Mar 29, 2022
61c6197
[Fix-9221] [alert-server] remove unused mock data
Mar 29, 2022
8bf7fa1
Merge branch 'dev' of https://github.com/apache/dolphinscheduler into…
Mar 30, 2022
1a005f3
[Fix-9221] [alert-server] remove unnecessary Mockito stubbings
Mar 30, 2022
19a318f
Merge branch 'dev' of https://github.com/apache/dolphinscheduler into…
Mar 30, 2022
29e2e4b
[Fix-9221] [alert-server] init AlertPluginManager in AlertServer
Mar 30, 2022
e75dba6
[Fix-9221] [alert-server] AlertServerTest add AlertPluginManager inst…
Mar 30, 2022
fcd9e5b
Merge branch 'dev' of https://github.com/apache/dolphinscheduler into…
Mar 31, 2022
5ba7d0a
[Fix-9221] [alert-server] replace @Eventlistener with @PostConstruct
Mar 31, 2022
42fd16b
Merge branch 'dev' of https://github.com/apache/dolphinscheduler into…
Apr 1, 2022
24bcc02
[Fix-9221] [alert-server] sonar check solution
Apr 1, 2022
a616ca9
Merge branch 'dev' of https://github.com/apache/dolphinscheduler into…
Apr 1, 2022
765870a
Merge branch 'dev' of https://github.com/apache/dolphinscheduler into…
Apr 6, 2022
673f139
[Improvement-9221] [alert] update constructor injection and replace I…
Apr 6, 2022
0a9427c
Merge branch 'dev' of https://github.com/apache/dolphinscheduler into…
Apr 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
import java.util.ServiceLoader;
import java.util.Set;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
Expand All @@ -55,23 +53,23 @@ public final class AlertPluginManager {

private final PluginDao pluginDao;

private final Map<Integer, AlertChannel> channelKeyedById = new HashMap<>();

private final PluginParams warningTypeParams = getWarningTypeParams();

public AlertPluginManager(PluginDao pluginDao) {
this.pluginDao = pluginDao;
}

private final Map<Integer, AlertChannel> channelKeyedById = new HashMap<>();

private final PluginParams warningTypeParams = getWarningTypeParams();

public PluginParams getWarningTypeParams() {
return
RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE)
.addParamsOptions(new ParamsOptions(WarningType.SUCCESS.getDescp(), WarningType.SUCCESS.getDescp(), false))
.addParamsOptions(new ParamsOptions(WarningType.FAILURE.getDescp(), WarningType.FAILURE.getDescp(), false))
.addParamsOptions(new ParamsOptions(WarningType.ALL.getDescp(), WarningType.ALL.getDescp(), false))
.setValue(WarningType.ALL.getDescp())
.addValidate(Validate.newBuilder().setRequired(true).build())
.build();
RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE)
.addParamsOptions(new ParamsOptions(WarningType.SUCCESS.getDescp(), WarningType.SUCCESS.getDescp(), false))
.addParamsOptions(new ParamsOptions(WarningType.FAILURE.getDescp(), WarningType.FAILURE.getDescp(), false))
.addParamsOptions(new ParamsOptions(WarningType.ALL.getDescp(), WarningType.ALL.getDescp(), false))
.setValue(WarningType.ALL.getDescp())
.addValidate(Validate.newBuilder().setRequired(true).build())
.build();
}

@EventListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
public final class AlertRequestProcessor implements NettyRequestProcessor {
private static final Logger logger = LoggerFactory.getLogger(AlertRequestProcessor.class);

private final AlertSender alertSender;
private final AlertSenderService alertSenderService;

public AlertRequestProcessor(AlertSender alertSender) {
this.alertSender = alertSender;
public AlertRequestProcessor(AlertSenderService alertSenderService) {
this.alertSenderService = alertSenderService;
}

@Override
Expand All @@ -51,7 +51,7 @@ public void process(Channel channel, Command command) {

logger.info("Received command : {}", alertSendRequestCommand);

AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(
AlertSendResponseCommand alertSendResponseCommand = alertSenderService.syncHandler(
alertSendRequestCommand.getGroupId(),
alertSendRequestCommand.getTitle(),
alertSendRequestCommand.getContent(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,26 @@

package org.apache.dolphinscheduler.alert;

import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertConstants;
import org.apache.dolphinscheduler.alert.api.AlertData;
import org.apache.dolphinscheduler.alert.api.AlertInfo;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;

import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -40,22 +45,39 @@
import java.util.Optional;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public final class AlertSender {
private static final Logger logger = LoggerFactory.getLogger(AlertSender.class);
@Service
public final class AlertSenderService extends Thread {
private static final Logger logger = LoggerFactory.getLogger(AlertSenderService.class);

private final AlertDao alertDao;
private final AlertPluginManager alertPluginManager;

public AlertSender(AlertDao alertDao, AlertPluginManager alertPluginManager) {
public AlertSenderService(AlertDao alertDao, AlertPluginManager alertPluginManager) {
this.alertDao = alertDao;
this.alertPluginManager = alertPluginManager;
}

@Override
public synchronized void start() {
super.setName("AlertSenderService");
super.start();
}

@Override
public void run() {
logger.info("alert sender started");
while (Stopper.isRunning()) {
try {
List<Alert> alerts = alertDao.listPendingAlerts();
this.send(alerts);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L);
} catch (Exception e) {
logger.error("alert sender thread error", e);
}
}
}


public void send(List<Alert> alerts) {
for (Alert alert : alerts) {
//get alert group from alert
Expand All @@ -68,11 +90,11 @@ public void send(List<Alert> alerts) {
}
AlertData alertData = new AlertData();
alertData.setId(alert.getId())
.setContent(alert.getContent())
.setLog(alert.getLog())
.setTitle(alert.getTitle())
.setTitle(alert.getTitle())
.setWarnType(alert.getWarningType().getCode());
.setContent(alert.getContent())
.setLog(alert.getLog())
.setTitle(alert.getTitle())
.setTitle(alert.getTitle())
.setWarnType(alert.getWarningType().getCode());

int sendSuccessCount = 0;
for (AlertPluginInstance instance : alertInstanceList) {
Expand All @@ -93,23 +115,22 @@ public void send(List<Alert> alerts) {
}
alertDao.updateAlert(alertStatus, "", alert.getId());
}

}

/**
* sync send alert handler
*
* @param alertGroupId alertGroupId
* @param title title
* @param content content
* @param title title
* @param content content
* @return AlertSendResponseCommand
*/
public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content , int warnType) {
public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content, int warnType) {
List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
AlertData alertData = new AlertData();
alertData.setContent(content)
.setTitle(title)
.setWarnType(warnType);
.setTitle(title)
.setWarnType(warnType);

boolean sendResponseStatus = true;
List<AlertSendResponseResult> sendResponseResults = new ArrayList<>();
Expand All @@ -128,7 +149,7 @@ public AlertSendResponseCommand syncHandler(int alertGroupId, String title, Stri
AlertResult alertResult = this.alertResultHandler(instance, alertData);
if (alertResult != null) {
AlertSendResponseResult alertSendResponseResult = new AlertSendResponseResult(
Boolean.parseBoolean(String.valueOf(alertResult.getStatus())), alertResult.getMessage());
Boolean.parseBoolean(String.valueOf(alertResult.getStatus())), alertResult.getMessage());
sendResponseStatus = sendResponseStatus && alertSendResponseResult.getStatus();
sendResponseResults.add(alertSendResponseResult);
}
Expand All @@ -140,7 +161,7 @@ public AlertSendResponseCommand syncHandler(int alertGroupId, String title, Stri
/**
* alert result handler
*
* @param instance instance
* @param instance instance
* @param alertData alertData
* @return AlertResult
*/
Expand All @@ -159,7 +180,7 @@ private AlertResult alertResultHandler(AlertPluginInstance instance, AlertData a
Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams());
String instanceWarnType = WarningType.ALL.getDescp();

if(paramsMap != null){
if (paramsMap != null) {
instanceWarnType = paramsMap.getOrDefault(AlertConstants.NAME_WARNING_TYPE, WarningType.ALL.getDescp());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,73 +17,95 @@

package org.apache.dolphinscheduler.alert;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;

import java.io.Closeable;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.event.EventListener;

import javax.annotation.PreDestroy;
import java.io.Closeable;

@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
public class AlertServer implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(AlertServer.class);

private final PluginDao pluginDao;
private final AlertDao alertDao;
private final AlertPluginManager alertPluginManager;
private final AlertSender alertSender;
private final AlertSenderService alertSenderService;
private final AlertRequestProcessor alertRequestProcessor;
private final AlertConfig alertConfig;
private NettyRemotingServer nettyRemotingServer;

private NettyRemotingServer server;

@Autowired
private AlertConfig config;

public AlertServer(PluginDao pluginDao, AlertDao alertDao, AlertPluginManager alertPluginManager, AlertSender alertSender, AlertRequestProcessor alertRequestProcessor) {
public AlertServer(PluginDao pluginDao, AlertSenderService alertSenderService, AlertRequestProcessor alertRequestProcessor, AlertConfig alertConfig) {
this.pluginDao = pluginDao;
this.alertDao = alertDao;
this.alertPluginManager = alertPluginManager;
this.alertSender = alertSender;
this.alertSenderService = alertSenderService;
this.alertRequestProcessor = alertRequestProcessor;
this.alertConfig = alertConfig;
}

/**
* alert server startup, not use web service
*
* @param args arguments
*/
public static void main(String[] args) {
SpringApplication.run(AlertServer.class, args);
Thread.currentThread().setName(Constants.THREAD_NAME_ALERT_SERVER);
new SpringApplicationBuilder(AlertServer.class).web(WebApplicationType.NONE).run(args);
}

@EventListener
public void start(ApplicationReadyEvent readyEvent) {
logger.info("Starting Alert server");
public void run(ApplicationReadyEvent readyEvent) {
logger.info("alert server starting...");

checkTable();
startServer();

Executors.newScheduledThreadPool(1)
.scheduleAtFixedRate(new Sender(), 5, 5, TimeUnit.SECONDS);
alertSenderService.start();
}

@Override
@PreDestroy
public void close() {
server.close();
destroy("alert server destroy");
}

/**
* gracefully stop
*
* @param cause stop cause
*/
public void destroy(String cause) {

try {
// execute only once
if (Stopper.isStopped()) {
return;
}

logger.info("alert server is stopping ..., cause : {}", cause);

// set stop signal is true
Stopper.stop();

// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(3000L);

// close
this.nettyRemotingServer.close();

} catch (Exception e) {
logger.error("alert server stop exception ", e);
}
}

private void checkTable() {
Expand All @@ -95,26 +117,11 @@ private void checkTable() {

private void startServer() {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(config.getPort());
serverConfig.setListenPort(alertConfig.getPort());

server = new NettyRemotingServer(serverConfig);
server.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor);
server.start();
nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor);
nettyRemotingServer.start();
}

final class Sender implements Runnable {
@Override
public void run() {
if (!Stopper.isRunning()) {
return;
}

try {
final List<Alert> alerts = alertDao.listPendingAlerts();
alertSender.send(alerts);
} catch (Exception e) {
logger.error("Failed to send alert", e);
}
}
}
}
Loading