From 21fc49a0be659c08baf33d19cdf1c2886f0abdd7 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Fri, 3 May 2024 16:28:58 +0800 Subject: [PATCH 1/7] [test] add test for UdpCollectImpl (#1906) --- .../collect/udp/UdpCollectImplTest.java | 142 ++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 collector/src/test/java/org/apache/hertzbeat/collector/collect/udp/UdpCollectImplTest.java diff --git a/collector/src/test/java/org/apache/hertzbeat/collector/collect/udp/UdpCollectImplTest.java b/collector/src/test/java/org/apache/hertzbeat/collector/collect/udp/UdpCollectImplTest.java new file mode 100644 index 00000000000..f8ed3c0ddc8 --- /dev/null +++ b/collector/src/test/java/org/apache/hertzbeat/collector/collect/udp/UdpCollectImplTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.collector.collect.udp; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.PortUnreachableException; +import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hertzbeat.common.entity.job.Metrics; +import org.apache.hertzbeat.common.entity.job.protocol.UdpProtocol; +import org.apache.hertzbeat.common.entity.message.CollectRep; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Test case for {@link UdpCollectImpl} + */ +@ExtendWith(MockitoExtension.class) +class UdpCollectImplTest { + + @InjectMocks + private UdpCollectImpl udpCollect; + + @Test + void testPreCheck() { + CollectRep.MetricsData.Builder builder = CollectRep.MetricsData.newBuilder(); + List aliasField = new ArrayList<>(); + aliasField.add("responseTime"); + Metrics metrics = new Metrics(); + metrics.setAliasFields(aliasField); + udpCollect.collect(builder, 1L, "test", metrics); + assertEquals(builder.getCode(), CollectRep.Code.FAIL); + + } + + @Test + void testCollect(){ + CollectRep.MetricsData.Builder builder = CollectRep.MetricsData.newBuilder(); + UdpProtocol ftpProtocol = UdpProtocol.builder() + .timeout("10") + .port("21") + .host("127.0.0.1") + .build(); + + MockedConstruction socketMockedConstruction = + Mockito.mockConstruction(DatagramSocket.class, (socket, context) -> { + Mockito.doNothing().when(socket).send(Mockito.any(DatagramPacket.class)); + Mockito.doNothing().when(socket).receive(Mockito.any(DatagramPacket.class)); + }); + + + List aliasField = new ArrayList<>(); + aliasField.add("responseTime"); + Metrics metrics = new Metrics(); + metrics.setUdp(ftpProtocol); + metrics.setAliasFields(aliasField); + udpCollect.collect(builder, 1L, "test", metrics); + assertEquals(builder.getValuesCount(), 1); + for (CollectRep.ValueRow valueRow : builder.getValuesList()) { + assertNotNull(valueRow.getColumns(0)); + } + + socketMockedConstruction.close(); + } + + @Test + void testCollectWithSocketException(){ + CollectRep.MetricsData.Builder builder = CollectRep.MetricsData.newBuilder(); + UdpProtocol ftpProtocol = UdpProtocol.builder() + .timeout("10") + .port("21") + .host("127.0.0.1") + .build(); + + MockedConstruction socketMockedConstruction = + Mockito.mockConstruction(DatagramSocket.class, (socket, context) -> { + Mockito.doThrow(new SocketTimeoutException("test exception")) + .when(socket).send(Mockito.any(DatagramPacket.class)); + }); + + + List aliasField = new ArrayList<>(); + aliasField.add("responseTime"); + Metrics metrics = new Metrics(); + metrics.setUdp(ftpProtocol); + metrics.setAliasFields(aliasField); + udpCollect.collect(builder, 1L, "test", metrics); + assertEquals(builder.getCode(), CollectRep.Code.UN_CONNECTABLE); + + socketMockedConstruction.close(); + } + + @Test + void testCollectWithPortUnreachableException(){ + CollectRep.MetricsData.Builder builder = CollectRep.MetricsData.newBuilder(); + UdpProtocol ftpProtocol = UdpProtocol.builder() + .timeout("10") + .port("21") + .host("127.0.0.1") + .build(); + + MockedConstruction socketMockedConstruction = + Mockito.mockConstruction(DatagramSocket.class, (socket, context) -> { + Mockito.doThrow(new PortUnreachableException("test exception")) + .when(socket).send(Mockito.any(DatagramPacket.class)); + }); + + + List aliasField = new ArrayList<>(); + aliasField.add("responseTime"); + Metrics metrics = new Metrics(); + metrics.setUdp(ftpProtocol); + metrics.setAliasFields(aliasField); + udpCollect.collect(builder, 1L, "test", metrics); + assertEquals(builder.getCode(), CollectRep.Code.UN_REACHABLE); + + socketMockedConstruction.close(); + } +} \ No newline at end of file From 87b5af0e702cad8cfe48c23b10b4aa9452daf32d Mon Sep 17 00:00:00 2001 From: yqxxgh <42080876+yqxxgh@users.noreply.github.com> Date: Fri, 3 May 2024 23:15:51 +0800 Subject: [PATCH 2/7] [improve]fix github unknown licenses found (#1907) --- .gitpod.yml | 8 -------- .licenserc.yaml | 1 - LICENSE.tpl | 33 --------------------------------- 3 files changed, 42 deletions(-) delete mode 100644 .gitpod.yml delete mode 100644 LICENSE.tpl diff --git a/.gitpod.yml b/.gitpod.yml deleted file mode 100644 index 9828af3c206..00000000000 --- a/.gitpod.yml +++ /dev/null @@ -1,8 +0,0 @@ -# This configuration file was automatically generated by Gitpod. -# Please adjust to your needs (see https://www.gitpod.io/docs/introduction/learn-gitpod/gitpod-yaml) -# and commit this file to your remote git repository to share the goodness with others. - -# Learn more from ready-to-use templates: https://www.gitpod.io/docs/introduction/getting-started/quickstart - -tasks: - - init: mvn install -DskipTests=false diff --git a/.licenserc.yaml b/.licenserc.yaml index ca8f71b3fc5..0ec0d92aea7 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -41,7 +41,6 @@ header: - '**/*.txt' - '**/target/**' - '.gitattributes' - - '.gitpod.yml' - '**/.gitignore' - '**/.gitkeep' - 'home/**' diff --git a/LICENSE.tpl b/LICENSE.tpl deleted file mode 100644 index 30f0f9edf9e..00000000000 --- a/LICENSE.tpl +++ /dev/null @@ -1,33 +0,0 @@ -{{ .LicenseContent }} - -======================================================================= -Apache HertzBeat Subcomponents: - -The Apache HertzBeat project contains subcomponents with separate copyright -notices and license terms. Your use of the source code for the these -subcomponents is subject to the terms and conditions of the following -licenses. -======================================================================== - -{{ range .Groups }} -======================================================================== -{{ .LicenseID }} licenses -======================================================================== -The following components are provided under the {{ .LicenseID }} License. See project link for details. -{{- if eq .LicenseID "Apache-2.0" }} -The text of each license is the standard Apache 2.0 license. -{{- else }} -The text of each license is also included in licenses/LICENSE-[project].txt. -{{ end }} - - {{- range .Deps }} - {{- $groupArtifact := regexSplit ":" .Name -1 }} - {{- if eq (len $groupArtifact) 2 }} - {{- $group := index $groupArtifact 0 }} - {{- $artifact := index $groupArtifact 1 }} - https://mvnrepository.com/artifact/{{ $group }}/{{ $artifact }}/{{ .Version }} {{ .LicenseID }} - {{- else }} - https://npmjs.com/package/{{ .Name }}/v/{{ .Version }} {{ .Version }} {{ .LicenseID }} - {{- end }} - {{- end }} -{{ end }} From be42721bf2de2128e42abd57a2583dfc27fe98d8 Mon Sep 17 00:00:00 2001 From: Ceilzcx <48920254+Ceilzcx@users.noreply.github.com> Date: Sat, 4 May 2024 10:49:05 +0800 Subject: [PATCH 3/7] [improve] refactor code (#1901) --- .../common/entity/dto/CollectorInfo.java | 3 - .../hertzbeat/common/entity/job/Job.java | 10 +- .../scheduler/CollectorJobScheduler.java | 170 +++++++++--------- .../apache/hertzbeat/manager/ManagerTest.java | 16 +- .../WarehouseAutoConfiguration.java | 2 +- .../config/entrance/KafkaProperties.java | 33 ---- .../controller/MetricsDataController.java | 16 +- .../service/WarehouseServiceImpl.java | 8 +- .../warehouse/store/DataStorageDispatch.java | 50 ++++-- .../AbstractHistoryDataStorage.java | 4 +- .../greptime/GrepTimeDbDataStorage.java} | 13 +- .../history}/greptime/GreptimeProperties.java | 2 +- .../influxdb/InfluxdbDataStorage.java} | 13 +- .../history}/influxdb/InfluxdbProperties.java | 2 +- .../iotdb/IotDbDataStorage.java} | 14 +- .../history}/iotdb/IotDbProperties.java | 2 +- .../history}/iotdb/IotDbVersion.java | 2 +- .../jpa/JpaDatabaseDataStorage.java} | 15 +- .../history}/jpa/JpaProperties.java | 2 +- .../tdengine/TdEngineDataStorage.java} | 8 +- .../history}/tdengine/TdEngineProperties.java | 2 +- .../VictoriaMetricsClusterDataStorage.java} | 12 +- .../vm}/VictoriaMetricsClusterProperties.java | 2 +- .../vm/VictoriaMetricsDataStorage.java} | 11 +- .../vm}/VictoriaMetricsInsertProperties.java | 2 +- .../vm/VictoriaMetricsProperties.java | 2 +- .../vm}/VictoriaMetricsSelectProperties.java | 2 +- .../AbstractRealTimeDataStorage.java | 4 +- .../memory/MemoryDataStorage.java} | 11 +- .../realtime}/memory/MemoryProperties.java | 2 +- .../redis}/MetricsDataRedisCodec.java | 2 +- .../redis/RedisDataStorage.java} | 11 +- .../realtime}/redis/RedisProperties.java | 2 +- .../main/resources/META-INF/spring.factories | 2 +- .../controller/MetricsDataControllerTest.java | 4 +- ...geTest.java => MemoryDataStorageTest.java} | 5 +- .../store/MetricsDataRedisCodecTest.java | 1 + ...ageTest.java => RedisDataStorageTest.java} | 5 +- ...Test.java => TdEngineDataStorageTest.java} | 5 +- 39 files changed, 220 insertions(+), 252 deletions(-) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/{config => }/WarehouseAutoConfiguration.java (95%) delete mode 100644 warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/entrance/KafkaProperties.java rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/{ => history}/AbstractHistoryDataStorage.java (95%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/{HistoryGrepTimeDbDataStorage.java => history/greptime/GrepTimeDbDataStorage.java} (97%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/{config/store => store/history}/greptime/GreptimeProperties.java (95%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/{HistoryInfluxdbDataStorage.java => history/influxdb/InfluxdbDataStorage.java} (97%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/{config/store => store/history}/influxdb/InfluxdbProperties.java (95%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/{HistoryIotDbDataStorage.java => history/iotdb/IotDbDataStorage.java} (97%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/{config/store => store/history}/iotdb/IotDbProperties.java (97%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/{config/store => store/history}/iotdb/IotDbVersion.java (94%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/{HistoryJpaDatabaseDataStorage.java => history/jpa/JpaDatabaseDataStorage.java} (96%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/{config/store => store/history}/jpa/JpaProperties.java (96%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/{HistoryTdEngineDataStorage.java => history/tdengine/TdEngineDataStorage.java} (98%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/{config/store => store/history}/tdengine/TdEngineProperties.java (96%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/{HistoryVictoriaMetricsClusterDataStorage.java => history/vm/VictoriaMetricsClusterDataStorage.java} (98%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/{config/store/vm/cluster => store/history/vm}/VictoriaMetricsClusterProperties.java (94%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/{HistoryVictoriaMetricsDataStorage.java => history/vm/VictoriaMetricsDataStorage.java} (98%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/{config/store/vm/cluster => store/history/vm}/VictoriaMetricsInsertProperties.java (93%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/{config/store => store/history}/vm/VictoriaMetricsProperties.java (95%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/{config/store/vm/cluster => store/history/vm}/VictoriaMetricsSelectProperties.java (93%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/{ => realtime}/AbstractRealTimeDataStorage.java (93%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/{RealTimeMemoryDataStorage.java => realtime/memory/MemoryDataStorage.java} (89%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/{config/store => store/realtime}/memory/MemoryProperties.java (95%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/{ => realtime/redis}/MetricsDataRedisCodec.java (96%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/{RealTimeRedisDataStorage.java => realtime/redis/RedisDataStorage.java} (92%) rename warehouse/src/main/java/org/apache/hertzbeat/warehouse/{config/store => store/realtime}/redis/RedisProperties.java (95%) rename warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/{RealTimeRedisDataStorageTest.java => MemoryDataStorageTest.java} (87%) rename warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/{RealTimeMemoryDataStorageTest.java => RedisDataStorageTest.java} (87%) rename warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/{HistoryTdEngineDataStorageTest.java => TdEngineDataStorageTest.java} (88%) diff --git a/common/src/main/java/org/apache/hertzbeat/common/entity/dto/CollectorInfo.java b/common/src/main/java/org/apache/hertzbeat/common/entity/dto/CollectorInfo.java index b92893c26e5..47abb6ae8f8 100644 --- a/common/src/main/java/org/apache/hertzbeat/common/entity/dto/CollectorInfo.java +++ b/common/src/main/java/org/apache/hertzbeat/common/entity/dto/CollectorInfo.java @@ -44,7 +44,4 @@ public class CollectorInfo { @NotNull private String mode = CommonConstants.MODE_PUBLIC; - - // todo more - } diff --git a/common/src/main/java/org/apache/hertzbeat/common/entity/job/Job.java b/common/src/main/java/org/apache/hertzbeat/common/entity/job/Job.java index 2becfa052c6..10299a5d6e1 100644 --- a/common/src/main/java/org/apache/hertzbeat/common/entity/job/Job.java +++ b/common/src/main/java/org/apache/hertzbeat/common/entity/job/Job.java @@ -65,7 +65,7 @@ public class Job { */ private boolean hide = true; /** - * Large categories of monitoring + * Large categories of monitoring * service-application service monitoring db-database * monitoring custom-custom monitoring os-operating system monitoring... */ @@ -126,7 +126,7 @@ public class Job { */ @JsonIgnore private transient long dispatchTime; - + /** * collector usage - metric group task execution priority view * 0 - availability @@ -183,8 +183,8 @@ public synchronized void constructPriorMetrics() { * collector use - to get the next set of priority metric group tasks * * @param metrics Current Metrics - * @param first Is it the first time to get - * @return Metrics Tasks + * @param first Is it the first time to get + * @return Metrics Tasks * Returning null means: the job has been completed, and the collection of all metrics has ended * Returning the empty set metrics that there are still metrics collection tasks at the current * level that have not been completed,and the next level metrics task collection cannot be performed. @@ -238,7 +238,7 @@ public void addEnvConfigmaps(Map envConfigmaps) { if (this.envConfigmaps == null) { this.envConfigmaps = envConfigmaps; } else { - this.envConfigmaps.putAll(envConfigmaps); + this.envConfigmaps.putAll(envConfigmaps); } } diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java index c2c63dee0ae..dfb4fc4b080 100644 --- a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java +++ b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java @@ -92,6 +92,10 @@ public class CollectorJobScheduler implements CollectorScheduling, CollectJobSch @Override public void collectorGoOnline(String identity, CollectorInfo collectorInfo) { + if (identity == null) { + log.error("identity can not be null if collector not existed"); + return; + } Optional collectorOptional = collectorDao.findCollectorByName(identity); Collector collector; if (collectorOptional.isPresent()) { @@ -109,9 +113,12 @@ public void collectorGoOnline(String identity, CollectorInfo collectorInfo) { log.error("collectorInfo can not null when collector not existed"); return; } - collector = Collector.builder().name(identity).ip(collectorInfo.getIp()) + collector = Collector.builder() + .name(identity) + .ip(collectorInfo.getIp()) .mode(collectorInfo.getMode()) - .status(CommonConstants.COLLECTOR_STATUS_ONLINE).build(); + .status(CommonConstants.COLLECTOR_STATUS_ONLINE) + .build(); } collectorDao.save(collector); ConsistentHash.Node node = new ConsistentHash.Node(identity, collector.getMode(), @@ -122,42 +129,42 @@ public void collectorGoOnline(String identity, CollectorInfo collectorInfo) { List binds = collectorMonitorBindDao.findCollectorMonitorBindsByCollector(identity); for (CollectorMonitorBind bind : binds) { Optional monitorOptional = monitorDao.findById(bind.getMonitorId()); - if (monitorOptional.isPresent()) { - Monitor monitor = monitorOptional.get(); - if (monitor.getStatus() == CommonConstants.UN_MANAGE_CODE) { - continue; + if (monitorOptional.isEmpty()) { + continue; + } + Monitor monitor = monitorOptional.get(); + if (monitor.getStatus() == CommonConstants.UN_MANAGE_CODE) { + continue; + } + try { + // build collect job entity + Job appDefine = appService.getAppDefine(monitor.getApp()); + if (CommonConstants.PROMETHEUS.equals(monitor.getApp())) { + appDefine.setApp(CommonConstants.PROMETHEUS_APP_PREFIX + monitor.getName()); } - try { - // build collect job entity - Job appDefine = appService.getAppDefine(monitor.getApp()); - if (CommonConstants.PROMETHEUS.equals(monitor.getApp())) { - appDefine.setApp(CommonConstants.PROMETHEUS_APP_PREFIX + monitor.getName()); + appDefine.setMonitorId(monitor.getId()); + appDefine.setInterval(monitor.getIntervals()); + appDefine.setCyclic(true); + appDefine.setTimestamp(System.currentTimeMillis()); + List params = paramDao.findParamsByMonitorId(monitor.getId()); + List configmaps = params.stream() + .map(param -> new Configmap(param.getField(), param.getParamValue(), + param.getType())).collect(Collectors.toList()); + List paramDefaultValue = appDefine.getParams().stream() + .filter(item -> StringUtils.hasText(item.getDefaultValue())) + .toList(); + paramDefaultValue.forEach(defaultVar -> { + if (configmaps.stream().noneMatch(item -> item.getKey().equals(defaultVar.getField()))) { + Configmap configmap = new Configmap(defaultVar.getField(), defaultVar.getDefaultValue(), (byte) 1); + configmaps.add(configmap); } - appDefine.setMonitorId(monitor.getId()); - appDefine.setInterval(monitor.getIntervals()); - appDefine.setCyclic(true); - appDefine.setTimestamp(System.currentTimeMillis()); - List params = paramDao.findParamsByMonitorId(monitor.getId()); - List configmaps = params.stream() - .map(param -> new Configmap(param.getField(), param.getParamValue(), - param.getType())).collect(Collectors.toList()); - List paramDefaultValue = appDefine.getParams().stream() - .filter(item -> StringUtils.hasText(item.getDefaultValue())) - .collect(Collectors.toList()); - paramDefaultValue.forEach(defaultVar -> { - if (configmaps.stream().noneMatch(item -> item.getKey().equals(defaultVar.getField()))) { - // todo type - Configmap configmap = new Configmap(defaultVar.getField(), defaultVar.getDefaultValue(), (byte) 1); - configmaps.add(configmap); - } - }); - appDefine.setConfigmap(configmaps); - long jobId = addAsyncCollectJob(appDefine, identity); - monitor.setJobId(jobId); - monitorDao.save(monitor); - } catch (Exception e) { - log.error("insert pinned monitor job: {} in collector: {} error,continue next monitor", monitor, identity, e); - } + }); + appDefine.setConfigmap(configmaps); + long jobId = addAsyncCollectJob(appDefine, identity); + monitor.setJobId(jobId); + monitorDao.save(monitor); + } catch (Exception e) { + log.error("insert pinned monitor job: {} in collector: {} error,continue next monitor", monitor, identity, e); } } } @@ -165,14 +172,16 @@ public void collectorGoOnline(String identity, CollectorInfo collectorInfo) { @Override public void collectorGoOffline(String identity) { Optional collectorOptional = collectorDao.findCollectorByName(identity); - if (collectorOptional.isPresent()) { - log.info("the collector: {} is going offline now.", identity); - Collector collector = collectorOptional.get(); - collector.setStatus(CommonConstants.COLLECTOR_STATUS_OFFLINE); - collectorDao.save(collector); - consistentHash.removeNode(identity); - reBalanceCollectorAssignJobs(); + if (collectorOptional.isEmpty()) { + log.info("the collector : {} not found.", identity); + return; } + Collector collector = collectorOptional.get(); + collector.setStatus(CommonConstants.COLLECTOR_STATUS_OFFLINE); + collectorDao.save(collector); + consistentHash.removeNode(identity); + reBalanceCollectorAssignJobs(); + log.info("the collector: {} go offline success.", identity); } @Override @@ -180,56 +189,44 @@ public void reBalanceCollectorAssignJobs() { consistentHash.getAllNodes().entrySet().parallelStream().forEach(entry -> { String collectorName = entry.getKey(); AssignJobs assignJobs = entry.getValue().getAssignJobs(); - if (assignJobs != null) { - if (CommonConstants.MAIN_COLLECTOR_NODE.equals(collectorName)) { - if (!assignJobs.getAddingJobs().isEmpty()) { - Set addedJobIds = new HashSet<>(8); - for (Long addingJobId : assignJobs.getAddingJobs()) { - Job job = jobContentCache.get(addingJobId); - if (job == null) { - log.error("assigning job {} content is null.", addingJobId); - continue; - } - addedJobIds.add(addingJobId); - collectJobService.addAsyncCollectJob(job); - } - assignJobs.addAssignJobs(addedJobIds); - assignJobs.removeAddingJobs(addedJobIds); - } - if (!assignJobs.getRemovingJobs().isEmpty()) { - assignJobs.getRemovingJobs().forEach(jobId -> collectJobService.cancelAsyncCollectJob(jobId)); - assignJobs.clearRemovingJobs(); - } - } else { - if (!assignJobs.getAddingJobs().isEmpty()) { - Set addedJobIds = new HashSet<>(8); - for (Long addingJobId : assignJobs.getAddingJobs()) { - Job job = jobContentCache.get(addingJobId); - if (job == null) { - log.error("assigning job {} content is null.", addingJobId); - continue; - } - addedJobIds.add(addingJobId); - ClusterMsg.Message message = ClusterMsg.Message.newBuilder() - .setDirection(ClusterMsg.Direction.REQUEST) - .setType(ClusterMsg.MessageType.ISSUE_CYCLIC_TASK) - .setMsg(JsonUtil.toJson(job)) - .build(); - this.manageServer.sendMsg(collectorName, message); - } - assignJobs.addAssignJobs(addedJobIds); - assignJobs.removeAddingJobs(addedJobIds); + if (collectorName == null || assignJobs == null) { + return; + } + if (assignJobs.getAddingJobs() != null && !assignJobs.getAddingJobs().isEmpty()) { + Set addedJobIds = new HashSet<>(8); + for (Long addingJobId : assignJobs.getAddingJobs()) { + Job job = jobContentCache.get(addingJobId); + if (job == null) { + log.error("assigning job {} content is null.", addingJobId); + continue; } - if (!assignJobs.getRemovingJobs().isEmpty()) { + addedJobIds.add(addingJobId); + if (CommonConstants.MAIN_COLLECTOR_NODE.equals(collectorName)) { + collectJobService.addAsyncCollectJob(job); + } else { ClusterMsg.Message message = ClusterMsg.Message.newBuilder() .setDirection(ClusterMsg.Direction.REQUEST) - .setType(ClusterMsg.MessageType.DELETE_CYCLIC_TASK) - .setMsg(JsonUtil.toJson(assignJobs.getRemovingJobs())) + .setType(ClusterMsg.MessageType.ISSUE_CYCLIC_TASK) + .setMsg(JsonUtil.toJson(job)) .build(); this.manageServer.sendMsg(collectorName, message); - assignJobs.clearRemovingJobs(); } } + assignJobs.addAssignJobs(addedJobIds); + assignJobs.removeAddingJobs(addedJobIds); + } + if (assignJobs.getRemovingJobs() != null && !assignJobs.getRemovingJobs().isEmpty()) { + if (CommonConstants.MAIN_COLLECTOR_NODE.equals(collectorName)) { + assignJobs.getRemovingJobs().forEach(jobId -> collectJobService.cancelAsyncCollectJob(jobId)); + } else { + ClusterMsg.Message message = ClusterMsg.Message.newBuilder() + .setDirection(ClusterMsg.Direction.REQUEST) + .setType(ClusterMsg.MessageType.DELETE_CYCLIC_TASK) + .setMsg(JsonUtil.toJson(assignJobs.getRemovingJobs())) + .build(); + this.manageServer.sendMsg(collectorName, message); + } + assignJobs.clearRemovingJobs(); } }); } @@ -461,5 +458,4 @@ public void collectSyncJobResponse(List metricsDataList) public void setManageServer(ManageServer manageServer) { this.manageServer = manageServer; } - } diff --git a/manager/src/test/java/org/apache/hertzbeat/manager/ManagerTest.java b/manager/src/test/java/org/apache/hertzbeat/manager/ManagerTest.java index 79d8f528532..b962b258386 100644 --- a/manager/src/test/java/org/apache/hertzbeat/manager/ManagerTest.java +++ b/manager/src/test/java/org/apache/hertzbeat/manager/ManagerTest.java @@ -46,10 +46,10 @@ import org.apache.hertzbeat.common.support.SpringContextHolder; import org.apache.hertzbeat.warehouse.WarehouseWorkerPool; import org.apache.hertzbeat.warehouse.controller.MetricsDataController; -import org.apache.hertzbeat.warehouse.store.HistoryIotDbDataStorage; -import org.apache.hertzbeat.warehouse.store.HistoryTdEngineDataStorage; -import org.apache.hertzbeat.warehouse.store.RealTimeMemoryDataStorage; -import org.apache.hertzbeat.warehouse.store.RealTimeRedisDataStorage; +import org.apache.hertzbeat.warehouse.store.history.iotdb.IotDbDataStorage; +import org.apache.hertzbeat.warehouse.store.history.tdengine.TdEngineDataStorage; +import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage; +import org.apache.hertzbeat.warehouse.store.realtime.redis.RedisDataStorage; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.context.ApplicationContext; @@ -99,10 +99,10 @@ void testAutoImport() { assertNotNull(ctx.getBean(WarehouseWorkerPool.class)); // default DataStorage is RealTimeMemoryDataStorage - assertNotNull(ctx.getBean(RealTimeMemoryDataStorage.class)); - assertThrows(NoSuchBeanDefinitionException.class, () -> ctx.getBean(RealTimeRedisDataStorage.class)); - assertThrows(NoSuchBeanDefinitionException.class, () -> ctx.getBean(HistoryTdEngineDataStorage.class)); - assertThrows(NoSuchBeanDefinitionException.class, () -> ctx.getBean(HistoryIotDbDataStorage.class)); + assertNotNull(ctx.getBean(MemoryDataStorage.class)); + assertThrows(NoSuchBeanDefinitionException.class, () -> ctx.getBean(RedisDataStorage.class)); + assertThrows(NoSuchBeanDefinitionException.class, () -> ctx.getBean(TdEngineDataStorage.class)); + assertThrows(NoSuchBeanDefinitionException.class, () -> ctx.getBean(IotDbDataStorage.class)); assertNotNull(ctx.getBean(MetricsDataController.class)); } diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/WarehouseAutoConfiguration.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseAutoConfiguration.java similarity index 95% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/WarehouseAutoConfiguration.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseAutoConfiguration.java index 23e3feb222e..498e487e1b2 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/WarehouseAutoConfiguration.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseAutoConfiguration.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.config; +package org.apache.hertzbeat.warehouse; import org.springframework.context.annotation.ComponentScan; diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/entrance/KafkaProperties.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/entrance/KafkaProperties.java deleted file mode 100644 index a1580705486..00000000000 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/entrance/KafkaProperties.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hertzbeat.warehouse.config.entrance; - -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.boot.context.properties.ConfigurationPropertiesScan; -import org.springframework.boot.context.properties.bind.DefaultValue; - -/** - * kafka configuration information - */ -@ConfigurationProperties(prefix = "warehouse.entrance.kafka") -@ConfigurationPropertiesScan("org.apache.hertzbeat.warehouse.config") -public record KafkaProperties(@DefaultValue("true") boolean enabled, - @DefaultValue("127.0.0.1:9092") String servers, - String topic, - String groupId) { -} diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java index 71647b794b3..cdab3eb3df5 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java @@ -35,10 +35,10 @@ import org.apache.hertzbeat.common.entity.dto.Value; import org.apache.hertzbeat.common.entity.dto.ValueRow; import org.apache.hertzbeat.common.entity.message.CollectRep; -import org.apache.hertzbeat.warehouse.store.AbstractHistoryDataStorage; -import org.apache.hertzbeat.warehouse.store.AbstractRealTimeDataStorage; -import org.apache.hertzbeat.warehouse.store.HistoryJpaDatabaseDataStorage; -import org.apache.hertzbeat.warehouse.store.RealTimeMemoryDataStorage; +import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage; +import org.apache.hertzbeat.warehouse.store.history.jpa.JpaDatabaseDataStorage; +import org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage; +import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; @@ -90,9 +90,9 @@ public ResponseEntity> getMetricsData( AbstractRealTimeDataStorage realTimeDataStorage = realTimeDataStorages.stream() .filter(AbstractRealTimeDataStorage::isServerAvailable) .max((o1, o2) -> { - if (o1 instanceof RealTimeMemoryDataStorage) { + if (o1 instanceof MemoryDataStorage) { return -1; - } else if (o2 instanceof RealTimeMemoryDataStorage) { + } else if (o2 instanceof MemoryDataStorage) { return 1; } else { return 0; @@ -156,9 +156,9 @@ public ResponseEntity> getMetricHistoryData( ) { AbstractHistoryDataStorage historyDataStorage = historyDataStorages.stream() .filter(AbstractHistoryDataStorage::isServerAvailable).max((o1, o2) -> { - if (o1 instanceof HistoryJpaDatabaseDataStorage) { + if (o1 instanceof JpaDatabaseDataStorage) { return -1; - } else if (o2 instanceof HistoryJpaDatabaseDataStorage) { + } else if (o2 instanceof JpaDatabaseDataStorage) { return 1; } else { return 0; diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/WarehouseServiceImpl.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/WarehouseServiceImpl.java index 69f3e61876e..63a0d802ccf 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/WarehouseServiceImpl.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/service/WarehouseServiceImpl.java @@ -21,8 +21,8 @@ import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.hertzbeat.common.entity.message.CollectRep; -import org.apache.hertzbeat.warehouse.store.AbstractRealTimeDataStorage; -import org.apache.hertzbeat.warehouse.store.RealTimeMemoryDataStorage; +import org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage; +import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage; import org.springframework.stereotype.Service; /** @@ -43,9 +43,9 @@ public List queryMonitorMetricsData(Long monitorId) { AbstractRealTimeDataStorage realTimeDataStorage = realTimeDataStorages.stream() .filter(AbstractRealTimeDataStorage::isServerAvailable) .max((o1, o2) -> { - if (o1 instanceof RealTimeMemoryDataStorage) { + if (o1 instanceof MemoryDataStorage) { return -1; - } else if (o2 instanceof RealTimeMemoryDataStorage) { + } else if (o2 instanceof MemoryDataStorage) { return 1; } else { return 0; diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java index 2a6d84690e4..24e46f008eb 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java @@ -22,6 +22,10 @@ import org.apache.hertzbeat.common.entity.message.CollectRep; import org.apache.hertzbeat.common.queue.CommonDataQueue; import org.apache.hertzbeat.warehouse.WarehouseWorkerPool; +import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage; +import org.apache.hertzbeat.warehouse.store.history.jpa.JpaDatabaseDataStorage; +import org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage; +import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage; import org.springframework.stereotype.Component; /** @@ -44,23 +48,28 @@ public DataStorageDispatch(CommonDataQueue commonDataQueue, this.workerPool = workerPool; this.historyDataStorages = historyDataStorages; this.realTimeDataStorages = realTimeDataStorages; - startStoragePersistentData(); - startStorageRealTimeData(); + startPersistentDataStorage(); + startRealTimeDataStorage(); } - private void startStorageRealTimeData() { + private void startRealTimeDataStorage() { + if (realTimeDataStorages == null || realTimeDataStorages.isEmpty()) { + log.info("no real time data storage start"); + return; + } + if (realTimeDataStorages.size() > 1) { + realTimeDataStorages.removeIf(MemoryDataStorage.class::isInstance); + } Runnable runnable = () -> { Thread.currentThread().setName("warehouse-realtime-data-storage"); - if (realTimeDataStorages != null && realTimeDataStorages.size() > 1) { - realTimeDataStorages.removeIf(item -> item instanceof RealTimeMemoryDataStorage); - } while (!Thread.currentThread().isInterrupted()) { try { CollectRep.MetricsData metricsData = commonDataQueue.pollMetricsDataToRealTimeStorage(); - if (metricsData != null && realTimeDataStorages != null) { - for (AbstractRealTimeDataStorage realTimeDataStorage : realTimeDataStorages) { - realTimeDataStorage.saveData(metricsData); - } + if (metricsData == null) { + continue; + } + for (AbstractRealTimeDataStorage realTimeDataStorage : realTimeDataStorages) { + realTimeDataStorage.saveData(metricsData); } } catch (Exception e) { log.error(e.getMessage(), e); @@ -70,19 +79,24 @@ private void startStorageRealTimeData() { workerPool.executeJob(runnable); } - protected void startStoragePersistentData() { + protected void startPersistentDataStorage() { + if (historyDataStorages == null || historyDataStorages.isEmpty()) { + log.info("no history data storage start"); + return; + } + if (historyDataStorages.size() > 1) { + historyDataStorages.removeIf(JpaDatabaseDataStorage.class::isInstance); + } Runnable runnable = () -> { Thread.currentThread().setName("warehouse-persistent-data-storage"); - if (historyDataStorages != null && historyDataStorages.size() > 1) { - historyDataStorages.removeIf(item -> item instanceof HistoryJpaDatabaseDataStorage); - } while (!Thread.currentThread().isInterrupted()) { try { CollectRep.MetricsData metricsData = commonDataQueue.pollMetricsDataToPersistentStorage(); - if (metricsData != null && historyDataStorages != null) { - for (AbstractHistoryDataStorage historyDataStorage : historyDataStorages) { - historyDataStorage.saveData(metricsData); - } + if (metricsData == null) { + continue; + } + for (AbstractHistoryDataStorage historyDataStorage : historyDataStorages) { + historyDataStorage.saveData(metricsData); } } catch (Exception e) { log.error(e.getMessage(), e); diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/AbstractHistoryDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/AbstractHistoryDataStorage.java similarity index 95% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/AbstractHistoryDataStorage.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/AbstractHistoryDataStorage.java index f3b047de520..61a8ef5670e 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/AbstractHistoryDataStorage.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/AbstractHistoryDataStorage.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.store; +package org.apache.hertzbeat.warehouse.store.history; import java.util.List; import java.util.Map; @@ -42,7 +42,7 @@ public boolean isServerAvailable() { * save metrics data * @param metricsData metrics data */ - abstract void saveData(CollectRep.MetricsData metricsData); + public abstract void saveData(CollectRep.MetricsData metricsData); /** * query history range metrics data from tsdb diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryGrepTimeDbDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GrepTimeDbDataStorage.java similarity index 97% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryGrepTimeDbDataStorage.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GrepTimeDbDataStorage.java index 06c52fa0f80..c0122b82897 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryGrepTimeDbDataStorage.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GrepTimeDbDataStorage.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.store; +package org.apache.hertzbeat.warehouse.store.history.greptime; import io.greptime.GreptimeDB; import io.greptime.models.ColumnDataType; @@ -53,7 +53,7 @@ import org.apache.hertzbeat.common.entity.message.CollectRep; import org.apache.hertzbeat.common.util.JsonUtil; import org.apache.hertzbeat.common.util.TimePeriodUtil; -import org.apache.hertzbeat.warehouse.config.store.greptime.GreptimeProperties; +import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @@ -61,10 +61,9 @@ * greptimeDB data storage */ @Component -@ConditionalOnProperty(prefix = "warehouse.store.greptime", - name = "enabled", havingValue = "true") +@ConditionalOnProperty(prefix = "warehouse.store.greptime", name = "enabled", havingValue = "true") @Slf4j -public class HistoryGrepTimeDbDataStorage extends AbstractHistoryDataStorage { +public class GrepTimeDbDataStorage extends AbstractHistoryDataStorage { /** * storage database @@ -83,7 +82,7 @@ public class HistoryGrepTimeDbDataStorage extends AbstractHistoryDataStorage { private static final String DATABASE_NOT_EXIST = "not exist"; private GreptimeDB greptimeDb; - public HistoryGrepTimeDbDataStorage(GreptimeProperties greptimeProperties) { + public GrepTimeDbDataStorage(GreptimeProperties greptimeProperties) { this.serverAvailable = this.initDbSession(greptimeProperties); } @@ -157,7 +156,7 @@ private boolean createDatabase() { } @Override - void saveData(CollectRep.MetricsData metricsData) { + public void saveData(CollectRep.MetricsData metricsData) { if (!isServerAvailable() || metricsData.getCode() != CollectRep.Code.SUCCESS) { return; } diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/greptime/GreptimeProperties.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GreptimeProperties.java similarity index 95% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/greptime/GreptimeProperties.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GreptimeProperties.java index b7d439d2166..0561af78d9c 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/greptime/GreptimeProperties.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GreptimeProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.config.store.greptime; +package org.apache.hertzbeat.warehouse.store.history.greptime; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.bind.DefaultValue; diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryInfluxdbDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/influxdb/InfluxdbDataStorage.java similarity index 97% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryInfluxdbDataStorage.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/influxdb/InfluxdbDataStorage.java index 522736c94f4..65aded4adc6 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryInfluxdbDataStorage.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/influxdb/InfluxdbDataStorage.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.store; +package org.apache.hertzbeat.warehouse.store.history.influxdb; import java.math.BigDecimal; import java.math.RoundingMode; @@ -40,7 +40,7 @@ import org.apache.hertzbeat.common.entity.dto.Value; import org.apache.hertzbeat.common.entity.message.CollectRep; import org.apache.hertzbeat.common.util.JsonUtil; -import org.apache.hertzbeat.warehouse.config.store.influxdb.InfluxdbProperties; +import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage; import org.apache.http.ssl.SSLContexts; import org.influxdb.InfluxDB; import org.influxdb.InfluxDBFactory; @@ -55,10 +55,9 @@ * HistoryInfluxdbDataStorage class */ @Component -@ConditionalOnProperty(prefix = "warehouse.store.influxdb", - name = "enabled", havingValue = "true") +@ConditionalOnProperty(prefix = "warehouse.store.influxdb", name = "enabled", havingValue = "true") @Slf4j -public class HistoryInfluxdbDataStorage extends AbstractHistoryDataStorage { +public class InfluxdbDataStorage extends AbstractHistoryDataStorage { private static final String DATABASE = "hertzbeat"; @@ -79,7 +78,7 @@ public class HistoryInfluxdbDataStorage extends AbstractHistoryDataStorage { private InfluxDB influxDb; - public HistoryInfluxdbDataStorage(InfluxdbProperties influxdbProperties) { + public InfluxdbDataStorage(InfluxdbProperties influxdbProperties) { this.initInfluxDb(influxdbProperties); } @@ -139,7 +138,7 @@ private boolean createDatabase(InfluxdbProperties influxdbProperties) { } @Override - void saveData(CollectRep.MetricsData metricsData) { + public void saveData(CollectRep.MetricsData metricsData) { if (!isServerAvailable() || metricsData.getCode() != CollectRep.Code.SUCCESS) { return; } diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/influxdb/InfluxdbProperties.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/influxdb/InfluxdbProperties.java similarity index 95% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/influxdb/InfluxdbProperties.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/influxdb/InfluxdbProperties.java index 3dcbb220784..0e78e356b86 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/influxdb/InfluxdbProperties.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/influxdb/InfluxdbProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.config.store.influxdb; +package org.apache.hertzbeat.warehouse.store.history.influxdb; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.bind.DefaultValue; diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryIotDbDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/iotdb/IotDbDataStorage.java similarity index 97% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryIotDbDataStorage.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/iotdb/IotDbDataStorage.java index 39a9358eb59..44a2512b5d3 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryIotDbDataStorage.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/iotdb/IotDbDataStorage.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.store; +package org.apache.hertzbeat.warehouse.store.history.iotdb; import java.math.BigDecimal; import java.math.RoundingMode; @@ -29,8 +29,7 @@ import org.apache.hertzbeat.common.entity.dto.Value; import org.apache.hertzbeat.common.entity.message.CollectRep; import org.apache.hertzbeat.common.util.JsonUtil; -import org.apache.hertzbeat.warehouse.config.store.iotdb.IotDbProperties; -import org.apache.hertzbeat.warehouse.config.store.iotdb.IotDbVersion; +import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.pool.SessionDataSetWrapper; @@ -46,10 +45,9 @@ * IoTDB data storage */ @Component -@ConditionalOnProperty(prefix = "warehouse.store.iot-db", - name = "enabled", havingValue = "true") +@ConditionalOnProperty(prefix = "warehouse.store.iot-db", name = "enabled", havingValue = "true") @Slf4j -public class HistoryIotDbDataStorage extends AbstractHistoryDataStorage { +public class IotDbDataStorage extends AbstractHistoryDataStorage { private static final String BACK_QUOTE = "`"; /** * set ttl never expire @@ -87,7 +85,7 @@ public class HistoryIotDbDataStorage extends AbstractHistoryDataStorage { private long queryTimeoutInMs; - public HistoryIotDbDataStorage(IotDbProperties iotDbProperties) { + public IotDbDataStorage(IotDbProperties iotDbProperties) { this.serverAvailable = this.initIotDbSession(iotDbProperties); } @@ -179,7 +177,7 @@ private void initTtl(String expireTime) { } @Override - void saveData(CollectRep.MetricsData metricsData) { + public void saveData(CollectRep.MetricsData metricsData) { if (!isServerAvailable() || metricsData.getCode() != CollectRep.Code.SUCCESS) { return; } diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/iotdb/IotDbProperties.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/iotdb/IotDbProperties.java similarity index 97% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/iotdb/IotDbProperties.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/iotdb/IotDbProperties.java index 28b964dbdf5..cc9e0e7a9e5 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/iotdb/IotDbProperties.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/iotdb/IotDbProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.config.store.iotdb; +package org.apache.hertzbeat.warehouse.store.history.iotdb; import java.time.ZoneId; import java.util.List; diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/iotdb/IotDbVersion.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/iotdb/IotDbVersion.java similarity index 94% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/iotdb/IotDbVersion.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/iotdb/IotDbVersion.java index ae51e63b2de..3f756c7c7e3 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/iotdb/IotDbVersion.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/iotdb/IotDbVersion.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.config.store.iotdb; +package org.apache.hertzbeat.warehouse.store.history.iotdb; /** * IoTDB user version diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryJpaDatabaseDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/jpa/JpaDatabaseDataStorage.java similarity index 96% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryJpaDatabaseDataStorage.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/jpa/JpaDatabaseDataStorage.java index 2d4f37a3b75..caea08df40c 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryJpaDatabaseDataStorage.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/jpa/JpaDatabaseDataStorage.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.store; +package org.apache.hertzbeat.warehouse.store.history.jpa; import com.google.common.util.concurrent.ThreadFactoryBuilder; import jakarta.persistence.criteria.Predicate; @@ -42,8 +42,8 @@ import org.apache.hertzbeat.common.entity.warehouse.History; import org.apache.hertzbeat.common.util.JsonUtil; import org.apache.hertzbeat.common.util.TimePeriodUtil; -import org.apache.hertzbeat.warehouse.config.store.jpa.JpaProperties; import org.apache.hertzbeat.warehouse.dao.HistoryDao; +import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.domain.Sort; import org.springframework.data.jpa.domain.Specification; @@ -53,17 +53,16 @@ * data storage by mysql/h2 - jpa */ @Component -@ConditionalOnProperty(prefix = "warehouse.store.jpa", - name = "enabled", havingValue = "true") +@ConditionalOnProperty(prefix = "warehouse.store.jpa", name = "enabled", havingValue = "true") @Slf4j -public class HistoryJpaDatabaseDataStorage extends AbstractHistoryDataStorage { +public class JpaDatabaseDataStorage extends AbstractHistoryDataStorage { private final HistoryDao historyDao; private final JpaProperties jpaProperties; private static final int STRING_MAX_LENGTH = 1024; - public HistoryJpaDatabaseDataStorage(JpaProperties jpaProperties, - HistoryDao historyDao) { + public JpaDatabaseDataStorage(JpaProperties jpaProperties, + HistoryDao historyDao) { this.jpaProperties = jpaProperties; this.serverAvailable = true; this.historyDao = historyDao; @@ -116,7 +115,7 @@ public void expiredDataCleaner() { } @Override - void saveData(CollectRep.MetricsData metricsData) { + public void saveData(CollectRep.MetricsData metricsData) { if (metricsData.getCode() != CollectRep.Code.SUCCESS) { return; } diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/jpa/JpaProperties.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/jpa/JpaProperties.java similarity index 96% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/jpa/JpaProperties.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/jpa/JpaProperties.java index 986b824bf10..3b3d7a3b4eb 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/jpa/JpaProperties.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/jpa/JpaProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.config.store.jpa; +package org.apache.hertzbeat.warehouse.store.history.jpa; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.bind.DefaultValue; diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryTdEngineDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tdengine/TdEngineDataStorage.java similarity index 98% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryTdEngineDataStorage.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tdengine/TdEngineDataStorage.java index 18643b5d9a6..d50cf4c3c2e 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryTdEngineDataStorage.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tdengine/TdEngineDataStorage.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.store; +package org.apache.hertzbeat.warehouse.store.history.tdengine; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; @@ -37,7 +37,7 @@ import org.apache.hertzbeat.common.entity.dto.Value; import org.apache.hertzbeat.common.entity.message.CollectRep; import org.apache.hertzbeat.common.util.JsonUtil; -import org.apache.hertzbeat.warehouse.config.store.tdengine.TdEngineProperties; +import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Component; @@ -50,7 +50,7 @@ @ConditionalOnProperty(prefix = "warehouse.store.td-engine", name = "enabled", havingValue = "true") @Slf4j -public class HistoryTdEngineDataStorage extends AbstractHistoryDataStorage { +public class TdEngineDataStorage extends AbstractHistoryDataStorage { private static final Pattern SQL_SPECIAL_STRING_PATTERN = Pattern.compile("(\\\\)|(')"); private static final String INSTANCE_NULL = "''"; @@ -71,7 +71,7 @@ public class HistoryTdEngineDataStorage extends AbstractHistoryDataStorage { private HikariDataSource hikariDataSource; private final int tableStrColumnDefineMaxLength; - public HistoryTdEngineDataStorage(TdEngineProperties tdEngineProperties) { + public TdEngineDataStorage(TdEngineProperties tdEngineProperties) { if (tdEngineProperties == null) { log.error("init error, please config Warehouse TdEngine props in application.yml"); throw new IllegalArgumentException("please config Warehouse TdEngine props"); diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/tdengine/TdEngineProperties.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tdengine/TdEngineProperties.java similarity index 96% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/tdengine/TdEngineProperties.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tdengine/TdEngineProperties.java index 313a8c239a0..d364d86b4c6 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/tdengine/TdEngineProperties.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tdengine/TdEngineProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.config.store.tdengine; +package org.apache.hertzbeat.warehouse.store.history.tdengine; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.bind.DefaultValue; diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryVictoriaMetricsClusterDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsClusterDataStorage.java similarity index 98% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryVictoriaMetricsClusterDataStorage.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsClusterDataStorage.java index 27d6a3ddb89..4a779c71584 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryVictoriaMetricsClusterDataStorage.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsClusterDataStorage.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.store; +package org.apache.hertzbeat.warehouse.store.history.vm; import com.fasterxml.jackson.databind.JsonNode; @@ -46,9 +46,7 @@ import org.apache.hertzbeat.common.util.CommonUtil; import org.apache.hertzbeat.common.util.JsonUtil; import org.apache.hertzbeat.common.util.TimePeriodUtil; -import org.apache.hertzbeat.warehouse.config.store.vm.cluster.VictoriaMetricsClusterProperties; -import org.apache.hertzbeat.warehouse.config.store.vm.cluster.VictoriaMetricsInsertProperties; -import org.apache.hertzbeat.warehouse.config.store.vm.cluster.VictoriaMetricsSelectProperties; +import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Primary; import org.springframework.http.HttpEntity; @@ -69,7 +67,7 @@ @Component @ConditionalOnProperty(prefix = "warehouse.store.victoria-metrics.cluster", name = "enabled", havingValue = "true") @Slf4j -public class HistoryVictoriaMetricsClusterDataStorage extends AbstractHistoryDataStorage { +public class VictoriaMetricsClusterDataStorage extends AbstractHistoryDataStorage { private static final String IMPORT_PATH = "/api/v1/import"; private static final String EXPORT_PATH = "/api/v1/export"; @@ -90,8 +88,8 @@ public class HistoryVictoriaMetricsClusterDataStorage extends AbstractHistoryDat private final RestTemplate restTemplate; - public HistoryVictoriaMetricsClusterDataStorage(VictoriaMetricsClusterProperties vmClusterProps, - RestTemplate restTemplate) { + public VictoriaMetricsClusterDataStorage(VictoriaMetricsClusterProperties vmClusterProps, + RestTemplate restTemplate) { if (vmClusterProps == null) { log.error("init error, please config Warehouse victoriaMetrics cluster props in application.yml"); throw new IllegalArgumentException("please config Warehouse victoriaMetrics cluster props"); diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/vm/cluster/VictoriaMetricsClusterProperties.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsClusterProperties.java similarity index 94% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/vm/cluster/VictoriaMetricsClusterProperties.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsClusterProperties.java index a8e84770bc3..a89fead34b7 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/vm/cluster/VictoriaMetricsClusterProperties.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsClusterProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.config.store.vm.cluster; +package org.apache.hertzbeat.warehouse.store.history.vm; import org.springframework.boot.context.properties.ConfigurationProperties; diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryVictoriaMetricsDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsDataStorage.java similarity index 98% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryVictoriaMetricsDataStorage.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsDataStorage.java index f5e8dca383b..6a687fa2982 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/HistoryVictoriaMetricsDataStorage.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsDataStorage.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.store; +package org.apache.hertzbeat.warehouse.store.history.vm; import com.fasterxml.jackson.databind.JsonNode; import java.math.BigDecimal; @@ -45,7 +45,7 @@ import org.apache.hertzbeat.common.util.CommonUtil; import org.apache.hertzbeat.common.util.JsonUtil; import org.apache.hertzbeat.common.util.TimePeriodUtil; -import org.apache.hertzbeat.warehouse.config.store.vm.VictoriaMetricsProperties; +import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Primary; import org.springframework.http.HttpEntity; @@ -63,10 +63,9 @@ */ @Primary @Component -@ConditionalOnProperty(prefix = "warehouse.store.victoria-metrics", - name = "enabled", havingValue = "true") +@ConditionalOnProperty(prefix = "warehouse.store.victoria-metrics", name = "enabled", havingValue = "true") @Slf4j -public class HistoryVictoriaMetricsDataStorage extends AbstractHistoryDataStorage { +public class VictoriaMetricsDataStorage extends AbstractHistoryDataStorage { private static final String IMPORT_PATH = "/api/v1/import"; /** @@ -91,7 +90,7 @@ public class HistoryVictoriaMetricsDataStorage extends AbstractHistoryDataStorag private final RestTemplate restTemplate; - public HistoryVictoriaMetricsDataStorage(VictoriaMetricsProperties victoriaMetricsProperties, RestTemplate restTemplate) { + public VictoriaMetricsDataStorage(VictoriaMetricsProperties victoriaMetricsProperties, RestTemplate restTemplate) { if (victoriaMetricsProperties == null) { log.error("init error, please config Warehouse victoriaMetrics props in application.yml"); throw new IllegalArgumentException("please config Warehouse victoriaMetrics props"); diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/vm/cluster/VictoriaMetricsInsertProperties.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsInsertProperties.java similarity index 93% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/vm/cluster/VictoriaMetricsInsertProperties.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsInsertProperties.java index 55ccb00bdd3..be6170ac75d 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/vm/cluster/VictoriaMetricsInsertProperties.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsInsertProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.config.store.vm.cluster; +package org.apache.hertzbeat.warehouse.store.history.vm; /** * vminsert configuration information diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/vm/VictoriaMetricsProperties.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsProperties.java similarity index 95% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/vm/VictoriaMetricsProperties.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsProperties.java index f6cf555e3c1..563458511db 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/vm/VictoriaMetricsProperties.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.config.store.vm; +package org.apache.hertzbeat.warehouse.store.history.vm; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.bind.DefaultValue; diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/vm/cluster/VictoriaMetricsSelectProperties.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsSelectProperties.java similarity index 93% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/vm/cluster/VictoriaMetricsSelectProperties.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsSelectProperties.java index 40326414de4..dad163617b4 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/vm/cluster/VictoriaMetricsSelectProperties.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/vm/VictoriaMetricsSelectProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.config.store.vm.cluster; +package org.apache.hertzbeat.warehouse.store.history.vm; /** * vmselect configuration information diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/AbstractRealTimeDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/AbstractRealTimeDataStorage.java similarity index 93% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/AbstractRealTimeDataStorage.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/AbstractRealTimeDataStorage.java index 6973cb640c0..d5f631c5369 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/AbstractRealTimeDataStorage.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/AbstractRealTimeDataStorage.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.store; +package org.apache.hertzbeat.warehouse.store.realtime; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -42,7 +42,7 @@ public boolean isServerAvailable() { * save collect metrics data * @param metricsData metrics data */ - abstract void saveData(CollectRep.MetricsData metricsData); + public abstract void saveData(CollectRep.MetricsData metricsData); /** * query real-time last metrics data diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/RealTimeMemoryDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/memory/MemoryDataStorage.java similarity index 89% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/RealTimeMemoryDataStorage.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/memory/MemoryDataStorage.java index efcf6fe6175..9f28bcf782d 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/RealTimeMemoryDataStorage.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/memory/MemoryDataStorage.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.store; +package org.apache.hertzbeat.warehouse.store.realtime.memory; import java.util.ArrayList; import java.util.List; @@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap; import lombok.extern.slf4j.Slf4j; import org.apache.hertzbeat.common.entity.message.CollectRep; -import org.apache.hertzbeat.warehouse.config.store.memory.MemoryProperties; +import org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.lang.NonNull; import org.springframework.stereotype.Component; @@ -32,10 +32,9 @@ * Store and collect real-time data - memory */ @Component -@ConditionalOnProperty(prefix = "warehouse.store.memory", - name = "enabled", havingValue = "true", matchIfMissing = true) +@ConditionalOnProperty(prefix = "warehouse.store.memory", name = "enabled", havingValue = "true", matchIfMissing = true) @Slf4j -public class RealTimeMemoryDataStorage extends AbstractRealTimeDataStorage { +public class MemoryDataStorage extends AbstractRealTimeDataStorage { /** * monitorId -> metricsName -> data @@ -44,7 +43,7 @@ public class RealTimeMemoryDataStorage extends AbstractRealTimeDataStorage { private static final Integer DEFAULT_INIT_SIZE = 16; private static final Integer METRICS_SIZE = 8; - public RealTimeMemoryDataStorage(MemoryProperties memoryProperties) { + public MemoryDataStorage(MemoryProperties memoryProperties) { int initSize = DEFAULT_INIT_SIZE; if (memoryProperties != null && memoryProperties.initSize() != null) { diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/memory/MemoryProperties.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/memory/MemoryProperties.java similarity index 95% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/memory/MemoryProperties.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/memory/MemoryProperties.java index 12003e25071..8e5dd7acd7a 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/memory/MemoryProperties.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/memory/MemoryProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.config.store.memory; +package org.apache.hertzbeat.warehouse.store.realtime.memory; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.bind.DefaultValue; diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/MetricsDataRedisCodec.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/redis/MetricsDataRedisCodec.java similarity index 96% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/MetricsDataRedisCodec.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/redis/MetricsDataRedisCodec.java index d3b8318b064..7248b2bfad4 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/MetricsDataRedisCodec.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/redis/MetricsDataRedisCodec.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.store; +package org.apache.hertzbeat.warehouse.store.realtime.redis; import io.lettuce.core.codec.RedisCodec; import java.nio.ByteBuffer; diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/RealTimeRedisDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/redis/RedisDataStorage.java similarity index 92% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/RealTimeRedisDataStorage.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/redis/RedisDataStorage.java index e5f25ad10fc..b9a08953dd7 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/RealTimeRedisDataStorage.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/redis/RedisDataStorage.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.store; +package org.apache.hertzbeat.warehouse.store.realtime.redis; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisURI; @@ -29,7 +29,7 @@ import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.hertzbeat.common.entity.message.CollectRep; -import org.apache.hertzbeat.warehouse.config.store.redis.RedisProperties; +import org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Primary; import org.springframework.lang.NonNull; @@ -40,16 +40,15 @@ */ @Primary @Component -@ConditionalOnProperty(prefix = "warehouse.store.redis", - name = "enabled", havingValue = "true") +@ConditionalOnProperty(prefix = "warehouse.store.redis", name = "enabled", havingValue = "true") @Slf4j -public class RealTimeRedisDataStorage extends AbstractRealTimeDataStorage { +public class RedisDataStorage extends AbstractRealTimeDataStorage { private RedisClient redisClient; private final Integer db; private StatefulRedisConnection connection; - public RealTimeRedisDataStorage(RedisProperties redisProperties) { + public RedisDataStorage(RedisProperties redisProperties) { this.serverAvailable = initRedisClient(redisProperties); this.db = getRedisSelectDb(redisProperties); } diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/redis/RedisProperties.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/redis/RedisProperties.java similarity index 95% rename from warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/redis/RedisProperties.java rename to warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/redis/RedisProperties.java index 93f2d5ed9b9..5263c178408 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/config/store/redis/RedisProperties.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/redis/RedisProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.warehouse.config.store.redis; +package org.apache.hertzbeat.warehouse.store.realtime.redis; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.bind.DefaultValue; diff --git a/warehouse/src/main/resources/META-INF/spring.factories b/warehouse/src/main/resources/META-INF/spring.factories index 64179dd3ecc..f155fb8251a 100644 --- a/warehouse/src/main/resources/META-INF/spring.factories +++ b/warehouse/src/main/resources/META-INF/spring.factories @@ -14,4 +14,4 @@ # limitations under the License. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -org.apache.hertzbeat.warehouse.config.WarehouseAutoConfiguration \ No newline at end of file +org.apache.hertzbeat.warehouse.WarehouseAutoConfiguration \ No newline at end of file diff --git a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java index 6d8c07134e5..b9738472977 100644 --- a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java +++ b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java @@ -33,8 +33,8 @@ import org.apache.hertzbeat.common.constants.CommonConstants; import org.apache.hertzbeat.common.entity.dto.Value; import org.apache.hertzbeat.common.entity.message.CollectRep; -import org.apache.hertzbeat.warehouse.store.AbstractHistoryDataStorage; -import org.apache.hertzbeat.warehouse.store.AbstractRealTimeDataStorage; +import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage; +import org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; diff --git a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/RealTimeRedisDataStorageTest.java b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/MemoryDataStorageTest.java similarity index 87% rename from warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/RealTimeRedisDataStorageTest.java rename to warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/MemoryDataStorageTest.java index f5140f58a7b..ba3a206621b 100644 --- a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/RealTimeRedisDataStorageTest.java +++ b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/MemoryDataStorageTest.java @@ -17,13 +17,14 @@ package org.apache.hertzbeat.warehouse.store; +import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; /** - * Test case for {@link RealTimeRedisDataStorage} + * Test case for {@link MemoryDataStorage} */ -class RealTimeRedisDataStorageTest { +class MemoryDataStorageTest { @BeforeEach void setUp() { diff --git a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/MetricsDataRedisCodecTest.java b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/MetricsDataRedisCodecTest.java index 81bef4e6be0..dd265d8d682 100644 --- a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/MetricsDataRedisCodecTest.java +++ b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/MetricsDataRedisCodecTest.java @@ -17,6 +17,7 @@ package org.apache.hertzbeat.warehouse.store; +import org.apache.hertzbeat.warehouse.store.realtime.redis.MetricsDataRedisCodec; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/RealTimeMemoryDataStorageTest.java b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/RedisDataStorageTest.java similarity index 87% rename from warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/RealTimeMemoryDataStorageTest.java rename to warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/RedisDataStorageTest.java index bd32658a59e..2beae5064ec 100644 --- a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/RealTimeMemoryDataStorageTest.java +++ b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/RedisDataStorageTest.java @@ -17,13 +17,14 @@ package org.apache.hertzbeat.warehouse.store; +import org.apache.hertzbeat.warehouse.store.realtime.redis.RedisDataStorage; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; /** - * Test case for {@link RealTimeMemoryDataStorage} + * Test case for {@link RedisDataStorage} */ -class RealTimeMemoryDataStorageTest { +class RedisDataStorageTest { @BeforeEach void setUp() { diff --git a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/HistoryTdEngineDataStorageTest.java b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/TdEngineDataStorageTest.java similarity index 88% rename from warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/HistoryTdEngineDataStorageTest.java rename to warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/TdEngineDataStorageTest.java index 4869faab0f2..dc1cfdef22a 100644 --- a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/HistoryTdEngineDataStorageTest.java +++ b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/TdEngineDataStorageTest.java @@ -17,13 +17,14 @@ package org.apache.hertzbeat.warehouse.store; +import org.apache.hertzbeat.warehouse.store.history.tdengine.TdEngineDataStorage; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; /** - * Test case for {@link HistoryTdEngineDataStorage} + * Test case for {@link TdEngineDataStorage} */ -class HistoryTdEngineDataStorageTest { +class TdEngineDataStorageTest { @BeforeEach void setUp() { From da17a9d98dd0771d290b012b904317cfdeb04bce Mon Sep 17 00:00:00 2001 From: aias00 Date: Sat, 4 May 2024 10:55:17 +0800 Subject: [PATCH 4/7] [bugfix] fix customized menu invalid bug #1898 (#1908) Co-authored-by: tomsun28 --- .../hertzbeat/manager/service/impl/AppServiceImpl.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/AppServiceImpl.java b/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/AppServiceImpl.java index e44dccd0282..a8973986888 100644 --- a/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/AppServiceImpl.java +++ b/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/AppServiceImpl.java @@ -374,6 +374,13 @@ public void applyMonitorDefineYml(String ymlContent, boolean isModify) { // app params verify verifyDefineAppContent(app, isModify); appDefineStore.save(app.getApp(), ymlContent); + // get and reset hide value + Job originalJob = appDefines.get(app.getApp().toLowerCase()); + if (Objects.nonNull(originalJob)) { + boolean hide = originalJob.isHide(); + app.setHide(hide); + } + appDefines.put(app.getApp().toLowerCase(), app); // resolve: after the template is modified, all monitoring instances of the same type of template need to be reissued in the task status SpringContextHolder.getBean(MonitorService.class).updateAppCollectJob(app); From 472604c883006ed352eab69de675a8caed8168dd Mon Sep 17 00:00:00 2001 From: aias00 Date: Sat, 4 May 2024 11:07:14 +0800 Subject: [PATCH 5/7] [bugfix] fix HTTP API bug #1895 (#1909) --- .../hertzbeat/collector/collect/http/HttpCollectImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/collect/http/HttpCollectImpl.java b/collector/src/main/java/org/apache/hertzbeat/collector/collect/http/HttpCollectImpl.java index 62f39fa221a..6194cfeb67a 100644 --- a/collector/src/main/java/org/apache/hertzbeat/collector/collect/http/HttpCollectImpl.java +++ b/collector/src/main/java/org/apache/hertzbeat/collector/collect/http/HttpCollectImpl.java @@ -199,8 +199,8 @@ private void validateParams(Metrics metrics) throws Exception { HttpProtocol httpProtocol = metrics.getHttp(); String url = httpProtocol.getUrl(); - if (StringUtils.hasText(url) || !url.startsWith(RIGHT_DASH)) { - httpProtocol.setUrl(url == null ? RIGHT_DASH : RIGHT_DASH + url.trim()); + if (!StringUtils.hasText(url) || !url.startsWith(RIGHT_DASH)) { + httpProtocol.setUrl(StringUtils.hasText(url) ? RIGHT_DASH + url.trim() : RIGHT_DASH); } if (CollectionUtils.isEmpty(httpProtocol.getSuccessCodes())) { From 29badecf7b355c32748b9b97397e49aef965e1f3 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Sat, 4 May 2024 16:57:22 +0800 Subject: [PATCH 6/7] [test] add test for WebsocketCollectImpl (#1912) --- .../websocket/WebsocketCollectImpl.java | 8 +- .../websocket/WebsocketCollectImplTest.java | 100 ++++++++++++++++++ 2 files changed, 104 insertions(+), 4 deletions(-) create mode 100644 collector/src/test/java/org/apache/hertzbeat/collector/collect/websocket/WebsocketCollectImplTest.java diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/collect/websocket/WebsocketCollectImpl.java b/collector/src/main/java/org/apache/hertzbeat/collector/collect/websocket/WebsocketCollectImpl.java index 1f59f761a5b..411683b936e 100644 --- a/collector/src/main/java/org/apache/hertzbeat/collector/collect/websocket/WebsocketCollectImpl.java +++ b/collector/src/main/java/org/apache/hertzbeat/collector/collect/websocket/WebsocketCollectImpl.java @@ -123,7 +123,7 @@ public String supportProtocol() { return DispatchConstants.PROTOCOL_WEBSOCKET; } - private static void send(OutputStream out, WebsocketProtocol websocketProtocol) throws IOException { + private void send(OutputStream out, WebsocketProtocol websocketProtocol) throws IOException { byte[] key = generateRandomKey(); String base64Key = base64Encode(key); String requestLine = "GET " + websocketProtocol.getPath() + " HTTP/1.1\r\n"; @@ -141,7 +141,7 @@ private static void send(OutputStream out, WebsocketProtocol websocketProtocol) } // Read response headers - private static Map readHeaders(InputStream in) throws IOException { + private Map readHeaders(InputStream in) throws IOException { Map map = new HashMap<>(8); BufferedReader reader = new BufferedReader(new InputStreamReader(in)); @@ -173,7 +173,7 @@ private static Map readHeaders(InputStream in) throws IOExceptio return map; } - private static byte[] generateRandomKey() { + private byte[] generateRandomKey() { SecureRandom secureRandom = new SecureRandom(); byte[] key = new byte[16]; secureRandom.nextBytes(key); @@ -186,7 +186,7 @@ private void checkParam(WebsocketProtocol protocol) { Assert.hasText(protocol.getPath(), "Websocket Protocol path is required."); } - private static String base64Encode(byte[] data) { + private String base64Encode(byte[] data) { return Base64.getEncoder().encodeToString(data); } } diff --git a/collector/src/test/java/org/apache/hertzbeat/collector/collect/websocket/WebsocketCollectImplTest.java b/collector/src/test/java/org/apache/hertzbeat/collector/collect/websocket/WebsocketCollectImplTest.java new file mode 100644 index 00000000000..b7ce643591a --- /dev/null +++ b/collector/src/test/java/org/apache/hertzbeat/collector/collect/websocket/WebsocketCollectImplTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.collector.collect.websocket; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.apache.hertzbeat.common.entity.job.Metrics; +import org.apache.hertzbeat.common.entity.job.protocol.WebsocketProtocol; +import org.apache.hertzbeat.common.entity.message.CollectRep; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Test case for {@link WebsocketCollectImpl} + */ +@ExtendWith(MockitoExtension.class) +class WebsocketCollectImplTest { + + @InjectMocks + private WebsocketCollectImpl websocketCollectImpl; + + @Test + void testCollect() { + CollectRep.MetricsData.Builder builder = CollectRep.MetricsData.newBuilder(); + WebsocketProtocol websocketProtocol = WebsocketProtocol.builder() + .host("127.0.0.1") + .path("/") + .port("80") + .build(); + + String httpResponse = """ + HTTP/1.1 200 OK\r + Content-Type: text/html; charset=UTF-8\r + Content-Length: 1234\r + Date: Sat, 4 May 2024 12:00:00 GMT\r + Connection: close\r + \r + """; + byte[] responseBytes = httpResponse.getBytes(StandardCharsets.UTF_8); + InputStream inputStream = new ByteArrayInputStream(responseBytes); + + MockedConstruction socketMockedConstruction = + Mockito.mockConstruction(Socket.class, (socket, context) -> { + OutputStream out = Mockito.mock(OutputStream.class); + Mockito.doNothing().when(socket).connect(Mockito.any(SocketAddress.class)); + Mockito.when(socket.isConnected()).thenReturn(true); + Mockito.when(socket.getOutputStream()).thenReturn(out); + Mockito.doNothing().when(out).write(Mockito.any()); + Mockito.doNothing().when(out).flush(); + Mockito.when(socket.getInputStream()).thenReturn(inputStream); + + }); + + + List aliasField = new ArrayList<>(); + aliasField.add("httpVersion"); + aliasField.add("responseTime"); + aliasField.add("responseCode"); + Metrics metrics = new Metrics(); + metrics.setWebsocket(websocketProtocol); + metrics.setAliasFields(aliasField); + websocketCollectImpl.collect(builder, 1L, "test", metrics); + assertEquals(builder.getValuesCount(), 1); + for (CollectRep.ValueRow valueRow : builder.getValuesList()) { + assertEquals(valueRow.getColumns(0), "HTTP/1.1"); + assertNotNull(valueRow.getColumns(1)); + assertEquals(valueRow.getColumns(2), "200"); + } + + socketMockedConstruction.close(); + } + +} \ No newline at end of file From 9a59243690053aff2448036106c98851eb662702 Mon Sep 17 00:00:00 2001 From: zhangshenghang Date: Wed, 8 May 2024 17:41:03 +0800 Subject: [PATCH 7/7] [feature]add apache yarn monitoring --- home/docs/help/yarn.md | 83 ++++ .../current/help/yarn.md | 83 ++++ home/sidebars.json | 1 + .../src/main/resources/define/app-yarn.yml | 469 ++++++++++++++++++ 4 files changed, 636 insertions(+) create mode 100644 home/docs/help/yarn.md create mode 100644 home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/yarn.md create mode 100644 manager/src/main/resources/define/app-yarn.yml diff --git a/home/docs/help/yarn.md b/home/docs/help/yarn.md new file mode 100644 index 00000000000..176a3209fee --- /dev/null +++ b/home/docs/help/yarn.md @@ -0,0 +1,83 @@ +--- +id: yarn +title: Monitoring Apache Yarn Monitoring +sidebar_label: Apache Yarn +keywords: [Big Data Monitoring System, Apache Yarn Monitoring, ResourceManager Monitoring] +--- + +> Hertzbeat monitors Apache Yarn node monitoring metrics. + +**Protocol Used: HTTP** + +## Pre-monitoring Actions + +Retrieve the HTTP monitoring port of Apache Yarn. Value: `yarn.resourcemanager.webapp.address` + +## Configuration Parameters + +| Parameter Name | Parameter Description | +| ---------------- |----------------------------------------------------| +| Target Host | IP address, IPV6, or domain name of the monitored endpoint. Without protocol header. | +| Port | Monitoring port number of Apache Yarn, default is 8088. | +| Query Timeout | Timeout for querying Apache Yarn, in milliseconds, default is 6000 milliseconds. | +| Metrics Interval | Time interval for monitoring data collection, in seconds, minimum interval is 30 seconds. | + +### Collected Metrics + +#### Metric Set: ClusterMetrics + +| Metric Name | Unit | Metric Description | +| ----------------------- | ---- | -----------------------------------------| +| NumActiveNMs | | Number of currently active NodeManagers | +| NumDecommissionedNMs | | Number of currently decommissioned NodeManagers | +| NumDecommissioningNMs | | Number of nodes currently decommissioning | +| NumLostNMs | | Number of lost nodes in the cluster | +| NumUnhealthyNMs | | Number of unhealthy nodes in the cluster | + +#### Metric Set: JvmMetrics + +| Metric Name | Unit | Metric Description | +| ----------------------- | ---- | -------------------------------------------- | +| MemNonHeapCommittedM | MB | Current committed size of non-heap memory in JVM | +| MemNonHeapMaxM | MB | Maximum available non-heap memory in JVM | +| MemNonHeapUsedM | MB | Current used size of non-heap memory in JVM | +| MemHeapCommittedM | MB | Current committed size of heap memory in JVM | +| MemHeapMaxM | MB | Maximum available heap memory in JVM | +| MemHeapUsedM | MB | Current used size of heap memory in JVM | +| GcTimeMillis | | JVM GC time | +| GcCount | | Number of JVM GC occurrences | + +#### Metric Set: QueueMetrics + +| Metric Name | Unit | Metric Description | +| --------------------------- | ---- | -------------------------------------------- | +| queue | | Queue name | +| AllocatedVCores | | Allocated virtual cores (allocated) | +| ReservedVCores | | Reserved cores | +| AvailableVCores | | Available cores (unallocated) | +| PendingVCores | | Blocked scheduling cores | +| AllocatedMB | MB | Allocated (used) memory size | +| AvailableMB | MB | Available memory (unallocated) | +| PendingMB | MB | Blocked scheduling memory | +| ReservedMB | MB | Reserved memory | +| AllocatedContainers | | Number of allocated (used) containers | +| PendingContainers | | Number of blocked scheduling containers | +| ReservedContainers | | Number of reserved containers | +| AggregateContainersAllocated| | Total aggregated containers allocated | +| AggregateContainersReleased| | Total aggregated containers released | +| AppsCompleted | | Number of completed applications | +| AppsKilled | | Number of killed applications | +| AppsFailed | | Number of failed applications | +| AppsPending | | Number of pending applications | +| AppsRunning | | Number of currently running applications | +| AppsSubmitted | | Number of submitted applications | +| running_0 | | Number of jobs running for less than 60 minutes | +| running_60 | | Number of jobs running between 60 and 300 minutes | +| running_300 | | Number of jobs running between 300 and 1440 minutes | +| running_1440 | | Number of jobs running for more than 1440 minutes | + +#### Metric Set: runtime + +| Metric Name | Unit | Metric Description | +| ----------------------- | ---- | --------------------------| +| StartTime | | Startup timestamp | \ No newline at end of file diff --git a/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/yarn.md b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/yarn.md new file mode 100644 index 00000000000..2c88fe1e5a9 --- /dev/null +++ b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/yarn.md @@ -0,0 +1,83 @@ +--- +id: yarn +title: 监控:Apache Yarn监控 +sidebar_label: Apache Yarn +keywords: [大数据监控系统, Apache Yarn监控, 资源管理器监控] +--- + +> Hertzbeat 对 Apache Yarn 节点监控指标进行监控。 + +**使用协议:HTTP** + +## 监控前操作 + +获取 Apache Yarn 的 HTTP 监控端口。 取值:`yarn.resourcemanager.webapp.address` + +## 配置参数 + +| 参数名称 | 参数帮助描述 | +| ---------------- |---------------------------------------| +| 目标Host | 被监控的对端IPV4,IPV6或域名。不带协议头。 | +| 端口 | Apache Yarn 的监控端口号,默认为8088。 | +| 查询超时时间 | 查询 Apache Yarn 的超时时间,单位毫秒,默认6000毫秒。 | +| 指标采集间隔 | 监控数据采集的时间间隔,单位秒,最小间隔为30秒。 | + +### 采集指标 + +#### 指标集合:ClusterMetrics + +| 指标名称 | 指标单位 | 指标帮助描述 | +| -------------------- | -------- | ---------------------------------- | +| NumActiveNMs | | 当前存活的 NodeManager 个数 | +| NumDecommissionedNMs | | 当前 Decommissioned 的 NodeManager 个数 | +| NumDecommissioningNMs| | 集群正在下线的节点数 | +| NumLostNMs | | 集群丢失的节点数 | +| NumUnhealthyNMs | | 集群不健康的节点数 | + +#### 指标集合:JvmMetrics + +| 指标名称 | 指标单位 | 指标帮助描述 | +| -------------------- | -------- | ------------------------------------ | +| MemNonHeapCommittedM | MB | JVM当前非堆内存大小已提交大小 | +| MemNonHeapMaxM | MB | JVM非堆最大可用内存 | +| MemNonHeapUsedM | MB | JVM当前已使用的非堆内存大小 | +| MemHeapCommittedM | MB | JVM当前已使用堆内存大小 | +| MemHeapMaxM | MB | JVM堆内存最大可用内存 | +| MemHeapUsedM | MB | JVM当前已使用堆内存大小 | +| GcTimeMillis | | JVM GC时间 | +| GcCount | | JVM GC次数 | + +#### 指标集合:QueueMetrics + +| 指标名称 | 指标单位 | 指标帮助描述 | +| ------------------------ | -------- | ------------------------------------ | +| queue | | 队列名称 | +| AllocatedVCores | | 分配的虚拟核数(已分配) | +| ReservedVCores | | 预留核数 | +| AvailableVCores | | 可用核数(尚未分配) | +| PendingVCores | | 阻塞调度核数 | +| AllocatedMB | MB | 已分配(已用)的内存大小 | +| AvailableMB | MB | 可用内存(尚未分配) | +| PendingMB | MB | 阻塞调度内存 | +| ReservedMB | MB | 预留内存 | +| AllocatedContainers | | 已分配(已用)的container数 | +| PendingContainers | | 阻塞调度container个数 | +| ReservedContainers | | 预留container数 | +| AggregateContainersAllocated | | 累积的container分配总数 | +| AggregateContainersReleased | | 累积的container释放总数 | +| AppsCompleted | | 完成的任务数 | +| AppsKilled | | 被杀掉的任务数 | +| AppsFailed | | 失败的任务数 | +| AppsPending | | 阻塞的任务数 | +| AppsRunning | | 提正在运行的任务数 | +| AppsSubmitted | | 提交过的任务数 | +| running_0 | | 运行时间小于60分钟的作业个数 | +| running_60 | | 运行时间介于60~300分钟的作业个数 | +| running_300 | | 运行时间介于300~1440分钟的作业个数 | +| running_1440 | | 运行时间大于1440分钟的作业个数 | + +#### 指标集合:runtime + +| 指标名称 | 指标单位 | 指标帮助描述 | +| -------------------- | -------- | ---------------------------- | +| StartTime | | 启动时间戳 | diff --git a/home/sidebars.json b/home/sidebars.json index f84b92cffe3..0a0ecb1f5a4 100755 --- a/home/sidebars.json +++ b/home/sidebars.json @@ -229,6 +229,7 @@ "help/doris_be", "help/doris_fe", "help/hadoop", + "help/yarn", "help/hbase_master", "help/hbase_regionserver", "help/hdfs_namenode", diff --git a/manager/src/main/resources/define/app-yarn.yml b/manager/src/main/resources/define/app-yarn.yml new file mode 100644 index 00000000000..24886b22364 --- /dev/null +++ b/manager/src/main/resources/define/app-yarn.yml @@ -0,0 +1,469 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The monitoring type category:service-application service monitoring db-database monitoring custom-custom monitoring os-operating system monitoring +category: bigdata +# The monitoring type eg: linux windows tomcat mysql aws... +app: yarn +# The monitoring i18n name +name: + zh-CN: Apache Yarn + en-US: Apache Yarn +# The description and help of this monitoring type +help: + zh-CN: Hertzbeat 对 Yarn 节点监控指标进行监控。
您可以点击 “新建 Apache Yarn” 并进行配置,或者选择“更多操作”,导入已有配置。 + en-US: Hertzbeat monitors the Yarn metrics.
You can click "New Apache Yarn" to configure, or select "More Actions" to import an existing configuration. + zh-TW: Hertzbeat 對 Yarn 節點監控指標進行監控。
您可以點擊 “新建 Apache Yarn” 並進行配置,或者選擇“更多操作”,導入已有配置。 + +helpLink: + zh-CN: https://hertzbeat.apache.org/zh-cn/docs/help/yarn/ + en-US: https://hertzbeat.apache.org/docs/help/yarn/ +# Input params define for monitoring(render web ui by the definition) +params: + # field-param field key + - field: host + # name-param field display i18n name + name: + zh-CN: 目标Host + en-US: Target Host + # type-param field type(most mapping the html input type) + type: host + # required-true or false + required: true + # field-param field key + - field: port + # name-param field display i18n name + name: + zh-CN: 端口 + en-US: Port + # type-param field type(most mapping the html input type) + type: number + # when type is number, range is required + range: '[0,65535]' + # required-true or false + required: true + # default value + defaultValue: 8088 + # field-param field key + - field: timeout + # name-param field display i18n name + name: + zh-CN: 查询超时时间 + en-US: Query Timeout + # type-param field type(most mapping the html input type) + type: number + # required-true or false + required: false + # hide param-true or false + hide: true + # default value + defaultValue: 6000 +# collect metrics config list +metrics: + # metrics - Server + - name: ClusterMetrics + # metrics scheduling priority(0->127)->(high->low), metrics with the same priority will be scheduled in parallel + # priority 0's metrics is availability metrics, it will be scheduled first, only availability metrics collect success will the scheduling continue + priority: 0 + # collect metrics content + fields: + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: NumActiveNMs + type: 0 + i18n: + zh-CN: 当前存活的 NodeManager 个数 + en-US: NumActiveNMs + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: NumDecommissionedNMs + type: 0 + i18n: + zh-CN: 当前 Decommissioned 的 NodeManager 个数 + en-US: NumDecommissionedNMs + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: NumDecommissioningNMs + type: 0 + i18n: + zh-CN: 集群正在下线的节点数 + en-US: NumDecommissioningNMs + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: NumLostNMs + type: 0 + i18n: + zh-CN: 集群丢失的节点数 + en-US: NumLostNMs + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: NumUnhealthyNMs + type: 0 + i18n: + zh-CN: 集群不健康的节点数 + en-US: NumUnhealthyNMs + # (optional)metrics field alias name, it is used as an alias field to map and convert the collected data and metrics field + aliasFields: + - $.NumActiveNMs + - $.NumDecommissionedNMs + - $.NumDecommissioningNMs + - $.NumLostNMs + - $.NumUnhealthyNMs + - $.RpcProcessingTimeAvgTime + - $.CallQueueLength + calculates: + - NumActiveNMs=$.NumActiveNMs + - NumDecommissionedNMs=$.NumDecommissionedNMs + - NumDecommissioningNMs=$.NumDecommissioningNMs + - NumLostNMs=$.NumLostNMs + - NumUnhealthyNMs=$.NumUnhealthyNMs + - RpcProcessingTimeAvgTime=$.RpcProcessingTimeAvgTime + - CallQueueLength=$.CallQueueLength + protocol: http + http: + host: ^_^host^_^ + port: ^_^port^_^ + url: /jmx + method: GET + ssl: ^_^ssl^_^ + parseType: jsonPath + parseScript: '$.beans[?(@.name == "Hadoop:service=ResourceManager,name=ClusterMetrics")]' + - name: JvmMetrics + # metrics scheduling priority(0->127)->(high->low), metrics with the same priority will be scheduled in parallel + # priority 0's metrics is availability metrics, it will be scheduled first, only availability metrics collect success will the scheduling continue + priority: 0 + # collect metrics content + # collect metrics content + fields: + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: MemNonHeapCommittedM + type: 0 + unit: MB + i18n: + zh-CN: JVM当前非堆内存大小已提交大小 + en-US: MemNonHeapCommittedM + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: MemNonHeapMaxM + type: 0 + unit: MB + i18n: + zh-CN: JVM非堆最大可用内存 + en-US: MemNonHeapMaxM + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: MemNonHeapUsedM + type: 0 + unit: MB + i18n: + zh-CN: JVM当前已使用的非堆内存大小 + en-US: MemNonHeapUsedM + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: MemHeapCommittedM + type: 0 + unit: MB + i18n: + zh-CN: JVM当前已使用堆内存大小 + en-US: MemHeapCommittedM + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: MemHeapMaxM + type: 0 + unit: MB + i18n: + zh-CN: JVM堆内存最大可用内存 + en-US: MemHeapMaxM + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: MemHeapUsedM + type: 0 + unit: MB + i18n: + zh-CN: JVM当前已使用堆内存大小 + en-US: MemHeapUsedM + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: GcTimeMillis + type: 0 + i18n: + zh-CN: JVM GC时间 + en-US: GcTimeMillis + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: GcCount + type: 0 + i18n: + zh-CN: JVM GC次数 + en-US: GcCount + # (optional)metrics field alias name, it is used as an alias field to map and convert the collected data and metrics field + aliasFields: + - $.MemNonHeapCommittedM + - $.MemNonHeapMaxM + - $.MemNonHeapUsedM + - $.MemHeapCommittedM + - $.MemHeapMaxM + - $.MemHeapUsedM + - $.GcTimeMillis + - $.GcCount + calculates: + - MemNonHeapCommittedM=$.MemNonHeapCommittedM + - MemNonHeapMaxM=$.MemNonHeapMaxM + - MemNonHeapUsedM=$.MemNonHeapUsedM + - MemHeapCommittedM=$.MemHeapCommittedM + - MemHeapMaxM=$.MemHeapMaxM + - MemHeapUsedM=$.MemHeapUsedM + - GcTimeMillis=$.GcTimeMillis + - GcCount=$.GcCount + protocol: http + http: + host: ^_^host^_^ + port: ^_^port^_^ + url: /jmx + method: GET + ssl: ^_^ssl^_^ + parseType: jsonPath + parseScript: '$.beans[?(@.name == "Hadoop:service=ResourceManager,name=JvmMetrics")]' + - name: QueueMetrics + # metrics scheduling priority(0->127)->(high->low), metrics with the same priority will be scheduled in parallel + # priority 0's metrics is availability metrics, it will be scheduled first, only availability metrics collect success will the scheduling continue + priority: 0 + # collect metrics content + # collect metrics content + fields: + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: queue + type: 1 + label: true + i18n: + zh-CN: 队列名称 + en-US: queue + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: AllocatedVCores + type: 0 + i18n: + zh-CN: 分配的虚拟核数(已分配) + en-US: AllocatedVCores + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: ReservedVCores + type: 0 + i18n: + zh-CN: 预留核数 + en-US: ReservedVCores + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: AvailableVCores + type: 0 + i18n: + zh-CN: 可用核数(尚未分配) + en-US: AvailableVCores + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: PendingVCores + type: 0 + i18n: + zh-CN: 阻塞调度核数 + en-US: PendingVCores + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: AllocatedMB + type: 0 + unit: MB + i18n: + zh-CN: 已分配(已用)的内存大小 + en-US: AllocatedMB + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: AvailableMB + type: 0 + unit: MB + i18n: + zh-CN: 可用内存(尚未分配) + en-US: AvailableMB + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: PendingMB + type: 0 + unit: MB + i18n: + zh-CN: 阻塞调度内存 + en-US: PendingMB + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: ReservedMB + type: 0 + unit: MB + i18n: + zh-CN: 预留内存 + en-US: ReservedMB + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: AllocatedContainers + type: 0 + i18n: + zh-CN: 已分配(已用)的container数 + en-US: AllocatedContainers + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: PendingContainers + type: 0 + i18n: + zh-CN: 阻塞调度container个数 + en-US: PendingContainers + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: ReservedContainers + type: 0 + i18n: + zh-CN: 预留container数 + en-US: ReservedContainers + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: AggregateContainersAllocated + type: 0 + i18n: + zh-CN: 累积的container分配总数 + en-US: AggregateContainersAllocated + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: AggregateContainersReleased + type: 0 + i18n: + zh-CN: 累积的container释放总数 + en-US: AggregateContainersReleased + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: AppsCompleted + type: 0 + i18n: + zh-CN: 完成的任务数 + en-US: AppsCompleted + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: AppsKilled + type: 0 + i18n: + zh-CN: 被杀掉的任务数 + en-US: AppsKilled + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: AppsFailed + type: 0 + i18n: + zh-CN: 失败的任务数 + en-US: AppsFailed + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: AppsPending + type: 0 + i18n: + zh-CN: 阻塞的任务数 + en-US: AppsPending + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: AppsRunning + type: 0 + i18n: + zh-CN: 提正在运行的任务数 + en-US: AppsRunning + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: AppsSubmitted + type: 0 + i18n: + zh-CN: 提交过的任务数 + en-US: AppsSubmitted + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: running_0 + type: 0 + i18n: + zh-CN: 运行时间小于60分钟的作业个数 + en-US: running_0 + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: running_60 + type: 0 + i18n: + zh-CN: 运行时间介于60~300分钟的作业个数 + en-US: running_60 + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: running_300 + type: 0 + i18n: + zh-CN: 运行时间介于300~1440分钟的作业个数 + en-US: running_300 + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: running_1440 + type: 0 + i18n: + zh-CN: 运行时间大于1440分钟的作业个数 + en-US: running_1440 + # (optional)metrics field alias name, it is used as an alias field to map and convert the collected data and metrics field + aliasFields: + - $.['tag.Queue'] + - $.AllocatedVCores + - $.ReservedVCores + - $.AvailableVCores + - $.PendingVCores + - $.AllocatedMB + - $.AvailableMB + - $.PendingMB + - $.ReservedMB + - $.AllocatedContainers + - $.PendingContainers + - $.ReservedContainers + - $.AggregateContainersAllocated + - $.AggregateContainersReleased + - $.AppsCompleted + - $.AppsKilled + - $.AppsFailed + - $.AppsPending + - $.AppsRunning + - $.AppsSubmitted + - $.running_0 + - $.running_60 + - $.running_300 + - $.running_1440 + calculates: + - queue=$.['tag.Queue'] + - AllocatedVCores=$.AllocatedVCores + - ReservedVCores=$.ReservedVCores + - AvailableVCores=$.AvailableVCores + - PendingVCores=$.PendingVCores + - AllocatedMB=$.AllocatedMB + - AvailableMB=$.AvailableMB + - PendingMB=$.PendingMB + - ReservedMB=$.ReservedMB + - AllocatedContainers=$.AllocatedContainers + - PendingContainers=$.PendingContainers + - ReservedContainers=$.ReservedContainers + - AggregateContainersAllocated=$.AggregateContainersAllocated + - AggregateContainersReleased=$.AggregateContainersReleased + - AppsCompleted=$.AppsCompleted + - AppsKilled=$.AppsKilled + - AppsFailed=$.AppsFailed + - AppsPending=$.AppsPending + - AppsRunning=$.AppsRunning + - AppsSubmitted=$.AppsSubmitted + - running_0=$.running_0 + - running_60=$.running_60 + - running_300=$.running_300 + - running_1440=$.running_1440 + protocol: http + http: + host: ^_^host^_^ + port: ^_^port^_^ + url: /jmx + method: GET + ssl: ^_^ssl^_^ + parseType: jsonPath + parseScript: '$.beans[?(@.name =~ /Hadoop:service=ResourceManager,name=QueueMetrics.*/)]' + + - name: runtime + # metrics scheduling priority(0->127)->(high->low), metrics with the same priority will be scheduled in parallel + # priority 0's metrics is availability metrics, it will be scheduled first, only availability metrics collect success will the scheduling continue + priority: 0 + # collect metrics content + fields: + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: StartTime + type: 0 + i18n: + zh-CN: 启动时间戳 + en-US: StartTime + # (optional)metrics field alias name, it is used as an alias field to map and convert the collected data and metrics field + aliasFields: + - $.StartTime + calculates: + - StartTime=$.StartTime + protocol: http + http: + host: ^_^host^_^ + port: ^_^port^_^ + url: /jmx + method: GET + ssl: ^_^ssl^_^ + parseType: jsonPath + parseScript: '$.beans[?(@.name == "java.lang:type=Runtime")]'