Skip to content

Commit

Permalink
support deploy collector on web, fix dispatch bugs (#1251)
Browse files Browse the repository at this point in the history
Co-authored-by: Ceilzcx <1758619238@qq.com>
  • Loading branch information
tomsun28 and Ceilzcx authored Sep 21, 2023
1 parent a507e38 commit 0160810
Show file tree
Hide file tree
Showing 76 changed files with 1,913 additions and 1,385 deletions.
1 change: 1 addition & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

## Checklist

- [x] I hereby agree to the terms of the [HertzBeat CLA](https://gist.github.com/tomsun28/511c04e7643901cb550bb6ecc75a661b)
- [ ] I have read the [Contributing Guide](https://hertzbeat.com/docs/others/contributing/)
- [ ] I have written the necessary doc or comment.
- [ ] I have added the necessary unit tests and all cases have passed.
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,11 @@
3. Deploy collector clusters

```
docker run -d -e IDENTITY=custom-collector-name -e MANAGER_IP=127.0.0.1 -e MANAGER_PORT=1158 --name hertzbeat-collector tancloud/hertzbeat-collector
docker run -d -e IDENTITY=custom-collector-name -e MANAGER_HOST=127.0.0.1 -e MANAGER_PORT=1158 --name hertzbeat-collector tancloud/hertzbeat-collector
```
- `-e IDENTITY=custom-collector-name` : set the collector unique identity name.
- `-e MANAGER_IP=127.0.0.1` : set the main hertzbeat server ip.
- `-e MODE=public` : set the running mode(public or private), public cluster or private cloud-edge.
- `-e MANAGER_HOST=127.0.0.1` : set the main hertzbeat server ip.
- `-e MANAGER_PORT=1158` : set the main hertzbeat server port, default 1158.

Detailed config refer to [Install HertzBeat via Docker](https://hertzbeat.com/docs/start/docker-deploy)
Expand All @@ -139,15 +140,16 @@ Detailed config refer to [Install HertzBeat via Docker](https://hertzbeat.com/do
6. Deploy collector clusters
- Download the release package `hertzbeat-collector-xx.zip` to new machine [GITEE Release](https://gitee.com/dromara/hertzbeat/releases) [GITHUB Release](https://github.com/dromara/hertzbeat/releases)
- Need `java jdk11` Environment
- Configure the collector configuration yml file `hertzbeat-collector/config/application.yml`: unique `identity` name, hertzbeat `manager-ip`, hertzbeat `manager-port`
- Configure the collector configuration yml file `hertzbeat-collector/config/application.yml`: unique `identity` name, running `mode` (public or private), hertzbeat `manager-host`, hertzbeat `manager-port`
```yaml
collector:
dispatch:
entrance:
netty:
enabled: true
identity: ${IDENTITY:}
manager-ip: ${MANAGER_IP:127.0.0.1}
mode: ${MODE:public}
manager-host: ${MANAGER_HOST:127.0.0.1}
manager-port: ${MANAGER_PORT:1158}
```
- Run command `$ ./bin/startup.sh ` or `bin/startup.bat`
Expand Down
10 changes: 6 additions & 4 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@
3. 部署采集器集群

```
docker run -d -e IDENTITY=custom-collector-name -e MANAGER_IP=127.0.0.1 -e MANAGER_PORT=1158 --name hertzbeat-collector tancloud/hertzbeat-collector
docker run -d -e IDENTITY=custom-collector-name -e MANAGER_HOST=127.0.0.1 -e MANAGER_PORT=1158 --name hertzbeat-collector tancloud/hertzbeat-collector
```
- `-e IDENTITY=custom-collector-name` : 配置此采集器的唯一性标识符名称,多个采集器名称不能相同,建议自定义英文名称。
- `-e MANAGER_IP=127.0.0.1` : 配置连接主HertaBeat服务的对外IP。
- `-e MODE=public` : 配置运行模式(public or private), 公共集群模式或私有云边模式。
- `-e MANAGER_HOST=127.0.0.1` : 配置连接主HertaBeat服务的对外IP。
- `-e MANAGER_PORT=1158` : 配置连接主HertzBeat服务的对外端口,默认1158。

更多配置详细步骤参考 [通过Docker方式安装HertzBeat](https://hertzbeat.com/docs/start/docker-deploy)
Expand All @@ -136,15 +137,16 @@ docker run -d -e IDENTITY=custom-collector-name -e MANAGER_IP=127.0.0.1 -e MANAG
6. 部署采集器集群
- 下载采集器安装包`hertzbeat-collector-xx.zip`到规划的另一台部署主机上 [GITEE Release](https://gitee.com/dromara/hertzbeat/releases) [GITHUB Release](https://github.com/dromara/hertzbeat/releases)
- 需要提前已安装`java jdk11`环境
- 配置采集器的配置文件 `hertzbeat-collector/config/application.yml` 里面的连接主HertzBeat服务的对外IP,端口,当前采集器名称(需保证唯一性)等参数 `identity` `manager-ip` `manager-port`
- 配置采集器的配置文件 `hertzbeat-collector/config/application.yml` 里面的连接主HertzBeat服务的对外IP,端口,当前采集器名称(需保证唯一性)等参数 `identity` `mode` (public or private) `manager-host` `manager-port`
```yaml
collector:
dispatch:
entrance:
netty:
enabled: true
identity: ${IDENTITY:}
manager-ip: ${MANAGER_IP:127.0.0.1}
mode: ${MODE:public}
manager-host: ${MANAGER_HOST:127.0.0.1}
manager-port: ${MANAGER_PORT:1158}
```
- 启动 `$ ./bin/startup.sh ` 或 `bin/startup.bat`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ private boolean execAlertExpression(Map<String, Object> fieldValueMap, String ex
}

private void handlerAvailableMetrics(long monitorId, String app, CollectRep.MetricsData metricsData) {
// TODO CACHE getMonitorBindAlertAvaDefine
AlertDefine avaAlertDefine = alertDefineService.getMonitorBindAlertAvaDefine(monitorId, app, CommonConstants.AVAILABILITY);
if (avaAlertDefine == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.http.ResponseEntity;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class AlertDefinesController {
description = "You can obtain the list of alarm definitions by querying filter items | 根据查询过滤项获取告警定义信息列表")
public ResponseEntity<Message<Page<AlertDefine>>> getAlertDefines(
@Parameter(description = "Alarm Definition ID | 告警定义ID", example = "6565463543") @RequestParam(required = false) List<Long> ids,
@Parameter(description = "Alarm Definition app | 告警定义名称", example = "6565463543") @RequestParam(required = false) String search,
@Parameter(description = "Alarm Definition Severity | 告警定义级别", example = "6565463543") @RequestParam(required = false) Byte priority,
@Parameter(description = "Sort field, default id | 排序字段,默认id", example = "id") @RequestParam(defaultValue = "id") String sort,
@Parameter(description = "Sort mode: asc: ascending, desc: descending | 排序方式,asc:升序,desc:降序", example = "desc") @RequestParam(defaultValue = "desc") String order,
Expand All @@ -77,6 +79,23 @@ public ResponseEntity<Message<Page<AlertDefine>>> getAlertDefines(
}
andList.add(inPredicate);
}
if (StringUtils.hasText(search)) {
Predicate predicate = criteriaBuilder.or(
criteriaBuilder.like(
criteriaBuilder.lower(root.get("app")),
"%" + search.toLowerCase() + "%"
),
criteriaBuilder.like(
criteriaBuilder.lower(root.get("metric")),
"%" + search.toUpperCase() + "%"
),
criteriaBuilder.like(
criteriaBuilder.lower(root.get("field")),
"%" + search.toUpperCase() + "%"
)
);
andList.add(predicate);
}
if (priority != null) {
Predicate predicate = criteriaBuilder.equal(root.get("priority"), priority);
andList.add(predicate);
Expand All @@ -102,40 +121,5 @@ public ResponseEntity<Message<Void>> deleteAlertDefines(
}
return ResponseEntity.ok(Message.success());
}

@GetMapping("/app")
@Operation(summary = "Example Query the alarm definition list by app| 根据名称查询告警定义列表",
description = "You can obtain the list of alarm definitions by querying filter items | 根据查询过滤项获取告警定义信息列表")
public ResponseEntity<Message<Page<AlertDefine>>> getAlertDefinesByName(
@Parameter(description = "Alarm Definition app | 告警定义名称", example = "6565463543") @RequestParam(required = false) String app,
@Parameter(description = "Sort field, default id | 排序字段,默认id", example = "id") @RequestParam(defaultValue = "id") String sort,
@Parameter(description = "Sort mode: asc: ascending, desc: descending | 排序方式,asc:升序,desc:降序", example = "desc") @RequestParam(defaultValue = "desc") String order,
@Parameter(description = "List current page | 列表当前分页", example = "0") @RequestParam(defaultValue = "0") int pageIndex,
@Parameter(description = "Number of list pages | 列表分页数量", example = "8") @RequestParam(defaultValue = "8") int pageSize) {

Specification<AlertDefine> specification = (root, query, criteriaBuilder) -> {
List<Predicate> andList = new ArrayList<>();
if (app != null) {
Predicate predicate = criteriaBuilder.or(
criteriaBuilder.like(
criteriaBuilder.lower(root.get("app")),
"%" + app.toLowerCase() + "%"
),
criteriaBuilder.like(
criteriaBuilder.lower(root.get("app")),
"%" + app.toUpperCase() + "%"
)
);
andList.add(predicate);
}
Predicate[] predicates = new Predicate[andList.size()];
return criteriaBuilder.and(andList.toArray(predicates));
};
// 分页是必须的
Sort sortExp = Sort.by(new Sort.Order(Sort.Direction.fromString(order), sort));
PageRequest pageRequest = PageRequest.of(pageIndex, pageSize, sortExp);
Page<AlertDefine> alertDefinePage = alertDefineService.getAlertDefines(specification, pageRequest);
return ResponseEntity.ok(Message.success(alertDefinePage));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;

Expand All @@ -29,7 +30,7 @@
*/
@Service
@Slf4j
@Order(value = 0)
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class JdbcSpiLoader implements CommandLineRunner {


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/
@Configuration
@Order(value = Ordered.HIGHEST_PRECEDENCE)
@Order(value = Ordered.HIGHEST_PRECEDENCE + 1)
public class CollectStrategyFactory implements CommandLineRunner {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.dromara.hertzbeat.collector.dispatch.entrance.internal.CollectJobService;
import org.dromara.hertzbeat.collector.dispatch.timer.Timeout;
import org.dromara.hertzbeat.collector.dispatch.timer.TimerDispatch;
import org.dromara.hertzbeat.collector.dispatch.timer.WheelTimerTask;
Expand Down Expand Up @@ -92,17 +93,21 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
private final ThreadPoolExecutor poolExecutor;

private final WorkerPool workerPool;

private final String collectorIdentity;

public CommonDispatcher(MetricsCollectorQueue jobRequestQueue,
TimerDispatch timerDispatch,
CommonDataQueue commonDataQueue,
WorkerPool workerPool,
CollectJobService collectJobService,
List<UnitConvert> unitConvertList) {
this.commonDataQueue = commonDataQueue;
this.jobRequestQueue = jobRequestQueue;
this.timerDispatch = timerDispatch;
this.unitConvertList = unitConvertList;
this.workerPool = workerPool;
this.collectorIdentity = collectJobService.getCollectorIdentity();
this.metricsTimeoutMonitorMap = new ConcurrentHashMap<>(16);
poolExecutor = new ThreadPoolExecutor(2, 2, 1,
TimeUnit.SECONDS,
Expand Down Expand Up @@ -165,6 +170,7 @@ public void start() {
WheelTimerTask timerJob = (WheelTimerTask) metricsTime.getTimeout().task();
CollectRep.MetricsData metricsData = CollectRep.MetricsData.newBuilder()
.setId(timerJob.getJob().getMonitorId())
.setTenantId(timerJob.getJob().getTenantId())
.setApp(timerJob.getJob().getApp())
.setMetrics(metricsTime.getMetrics().getName())
.setPriority(metricsTime.getMetrics().getPriority())
Expand Down Expand Up @@ -202,7 +208,8 @@ public void dispatchMetricsTask(Timeout timeout) {
job.constructPriorMetrics();
Set<Metrics> metricsSet = job.getNextCollectMetrics(null, true);
metricsSet.forEach(metrics -> {
MetricsCollect metricsCollect = new MetricsCollect(metrics, timeout, this, unitConvertList);
MetricsCollect metricsCollect = new MetricsCollect(metrics, timeout, this,
collectorIdentity, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metrics.getName(),
new MetricsTime(System.currentTimeMillis(), metrics, timeout));
Expand Down Expand Up @@ -266,7 +273,8 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met
List<Map<String, Configmap>> configmapList = getConfigmapFromPreCollectData(metricsData);
for (Metrics metricItem : metricsSet) {
if (CollectionUtils.isEmpty(configmapList) || CollectUtil.notContainCryPlaceholder(GSON.toJsonTree(metricItem))) {
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this, unitConvertList);
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this,
collectorIdentity, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metricItem.getName(),
new MetricsTime(System.currentTimeMillis(), metricItem, timeout));
Expand All @@ -284,7 +292,8 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met
metric.setSubTaskNum(subTaskNumAtomic);
metric.setSubTaskId(index);
metric.setSubTaskDataRef(metricsDataReference);
MetricsCollect metricsCollect = new MetricsCollect(metric, timeout, this, unitConvertList);
MetricsCollect metricsCollect = new MetricsCollect(metric, timeout, this,
collectorIdentity, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metric.getName() + "-sub-" + index,
new MetricsTime(System.currentTimeMillis(), metric, timeout));
Expand Down Expand Up @@ -321,7 +330,8 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met
// The execution of the current level indicator group is completed, and the execution of the next level indicator group starts
// 当前级别指标组执行完成,开始执行下一级别的指标组
metricsSet.forEach(metricItem -> {
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this, unitConvertList);
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this,
collectorIdentity, unitConvertList);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metricItem.getName(),
new MetricsTime(System.currentTimeMillis(), metricItem, timeout));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,18 @@ public static class NettyProperties {
* default is the host name
*/
private String identity;

/**
* this collector mode
* public: for public network, support cluster
* private: for private network, support cloud-edge
*/
private String mode;

/**
* connect cluster master ip
* connect cluster master host
*/
private String managerIp;
private String managerHost;

/**
* connect cluster master port
Expand All @@ -243,13 +250,21 @@ public String getIdentity() {
public void setIdentity(String identity) {
this.identity = identity;
}

public String getManagerIp() {
return managerIp;

public String getMode() {
return mode;
}

public void setMode(String mode) {
this.mode = mode;
}

public String getManagerHost() {
return managerHost;
}

public void setManagerIp(String managerIp) {
this.managerIp = managerIp;
public void setManagerHost(String managerHost) {
this.managerHost = managerHost;
}

public int getManagerPort() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
* 调度告警阈值时间 100ms
*/
private static final long WARN_DISPATCH_TIME = 100;
/**
* collector identity
*/
protected String collectorIdentity;
/**
* Tenant ID
*/
protected long tenantId;
/**
* Monitor ID
* 监控ID
Expand Down Expand Up @@ -99,13 +107,16 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {

public MetricsCollect(Metrics metrics, Timeout timeout,
CollectDataDispatch collectDataDispatch,
String collectorIdentity,
List<UnitConvert> unitConvertList) {
this.newTime = System.currentTimeMillis();
this.timeout = timeout;
this.metrics = metrics;
this.collectorIdentity = collectorIdentity;
WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
Job job = timerJob.getJob();
this.monitorId = job.getMonitorId();
this.tenantId = job.getTenantId();
this.app = job.getApp();
this.collectDataDispatch = collectDataDispatch;
this.isCyclic = job.isCyclic();
Expand All @@ -126,6 +137,7 @@ public void run() {
CollectRep.MetricsData.Builder response = CollectRep.MetricsData.newBuilder();
response.setApp(app);
response.setId(monitorId);
response.setTenantId(tenantId);
response.setMetrics(metrics.getName());
// According to the indicator group collection protocol, application type, etc., dispatch to the real application indicator group collection implementation class
// 根据指标组采集协议,应用类型等来调度到真正的应用指标组采集实现类
Expand Down
Loading

0 comments on commit 0160810

Please sign in to comment.