From 906862e488b437b9b13b03bf1c89806e2b07fc1d Mon Sep 17 00:00:00 2001 From: tomsun28 Date: Sun, 5 Mar 2023 17:33:12 +0800 Subject: [PATCH] [warehouse] support jpa expired metrics data auto cleaner (#691) --- .../common/entity/warehouse/History.java | 2 +- .../com/usthe/common/util/TimePeriodUtil.java | 27 +++++++++++++++ .../main/java/com/usthe/manager/Manager.java | 2 ++ manager/src/main/resources/application.yml | 1 + script/application.yml | 1 + .../conf/application.yml | 17 +++++++++- .../conf/application.yml | 15 +++++++++ .../warehouse/config/WarehouseProperties.java | 12 +++++++ .../com/usthe/warehouse/dao/HistoryDao.java | 5 +++ .../store/HistoryJpaDatabaseDataStorage.java | 33 +++++++++++++++++-- 10 files changed, 111 insertions(+), 4 deletions(-) create mode 100644 common/src/main/java/com/usthe/common/util/TimePeriodUtil.java diff --git a/common/src/main/java/com/usthe/common/entity/warehouse/History.java b/common/src/main/java/com/usthe/common/entity/warehouse/History.java index 43d2c21e519..8f8b84689bd 100644 --- a/common/src/main/java/com/usthe/common/entity/warehouse/History.java +++ b/common/src/main/java/com/usthe/common/entity/warehouse/History.java @@ -18,7 +18,7 @@ * */ @Entity -@Table(name = "history") +@Table(name = "hzb_history") @Data @Builder @AllArgsConstructor diff --git a/common/src/main/java/com/usthe/common/util/TimePeriodUtil.java b/common/src/main/java/com/usthe/common/util/TimePeriodUtil.java new file mode 100644 index 00000000000..af09adad877 --- /dev/null +++ b/common/src/main/java/com/usthe/common/util/TimePeriodUtil.java @@ -0,0 +1,27 @@ +package com.usthe.common.util; + +import java.time.Duration; +import java.time.Period; +import java.time.temporal.TemporalAmount; + +/** + * time util + * + * + * + */ +public class TimePeriodUtil { + + /** + * parse tokenTime to TemporalAmount + * @param tokenTime eg: "1m", "5M", "3D", "30m", "2h", "1Y", "3W" + * @return TemporalAmount + */ + public static TemporalAmount parseTokenTime(String tokenTime) { + if (Character.isUpperCase(tokenTime.charAt(tokenTime.length() - 1))) { + return Period.parse("P" + tokenTime); + } else { + return Duration.parse("PT" + tokenTime); + } + } +} diff --git a/manager/src/main/java/com/usthe/manager/Manager.java b/manager/src/main/java/com/usthe/manager/Manager.java index 4ec4491072b..8efcd7646a0 100644 --- a/manager/src/main/java/com/usthe/manager/Manager.java +++ b/manager/src/main/java/com/usthe/manager/Manager.java @@ -22,6 +22,7 @@ import org.springframework.boot.autoconfigure.domain.EntityScan; import org.springframework.data.jpa.repository.config.EnableJpaAuditing; import org.springframework.data.jpa.repository.config.EnableJpaRepositories; +import org.springframework.scheduling.annotation.EnableScheduling; /** * @@ -30,6 +31,7 @@ @SpringBootApplication @EnableJpaAuditing +@EnableScheduling @EnableJpaRepositories(basePackages = {"com.usthe"}) @EntityScan(basePackages = {"com.usthe"}) public class Manager { diff --git a/manager/src/main/resources/application.yml b/manager/src/main/resources/application.yml index cd694ee52fd..4f46e814a93 100644 --- a/manager/src/main/resources/application.yml +++ b/manager/src/main/resources/application.yml @@ -101,6 +101,7 @@ warehouse: # 存储历史数据方式, 下方只能enabled启用一种方式 jpa: enabled: true + expire-time: 7D td-engine: enabled: false driver-class-name: com.taosdata.jdbc.rs.RestfulDriver diff --git a/script/application.yml b/script/application.yml index 1e8e74f0d75..3fa1716e7b4 100644 --- a/script/application.yml +++ b/script/application.yml @@ -91,6 +91,7 @@ warehouse: # 存储历史数据方式, 下方只能enabled启用一种方式 jpa: enabled: true + expire-time: 7D td-engine: enabled: false driver-class-name: com.taosdata.jdbc.rs.RestfulDriver diff --git a/script/docker-compose/hertzbeat-mysql-iotdb/conf/application.yml b/script/docker-compose/hertzbeat-mysql-iotdb/conf/application.yml index fd05a46c41c..cc22e6953f0 100644 --- a/script/docker-compose/hertzbeat-mysql-iotdb/conf/application.yml +++ b/script/docker-compose/hertzbeat-mysql-iotdb/conf/application.yml @@ -85,6 +85,11 @@ spring: warehouse: store: +# store history metrics data, enable only one below +# 存储历史数据方式, 下方只能enabled启用一种方式 + jpa: + enabled: false + expire-time: 7D iot-db: enabled: true host: iotdb @@ -97,4 +102,14 @@ warehouse: query-timeout-in-ms: -1 # 数据存储时间:默认'7776000000'(90天,单位为毫秒,-1代表永不过期) # data expire time, unit:ms, default '7776000000'(90 days, -1:never expire) - expire-time: '7776000000' \ No newline at end of file + expire-time: '7776000000' +# store real-time metrics data, enable only one below +# 存储实时数据方式, 下方只能enabled启用一种方式 + memory: + enabled: true + init-size: 1024 + redis: + enabled: false + host: 127.0.0.1 + port: 6379 + password: 123456 diff --git a/script/docker-compose/hertzbeat-mysql-tdengine/conf/application.yml b/script/docker-compose/hertzbeat-mysql-tdengine/conf/application.yml index bf81b45256d..8534c384720 100644 --- a/script/docker-compose/hertzbeat-mysql-tdengine/conf/application.yml +++ b/script/docker-compose/hertzbeat-mysql-tdengine/conf/application.yml @@ -85,9 +85,24 @@ spring: warehouse: store: +# store history metrics data, enable only one below +# 存储历史数据方式, 下方只能enabled启用一种方式 + jpa: + enabled: false + expire-time: 7D td-engine: enabled: true driver-class-name: com.taosdata.jdbc.rs.RestfulDriver url: jdbc:TAOS-RS://tdengine:6041/hertzbeat username: root password: taosdata +# store real-time metrics data, enable only one below +# 存储实时数据方式, 下方只能enabled启用一种方式 + memory: + enabled: true + init-size: 1024 + redis: + enabled: false + host: 127.0.0.1 + port: 6379 + password: 123456 diff --git a/warehouse/src/main/java/com/usthe/warehouse/config/WarehouseProperties.java b/warehouse/src/main/java/com/usthe/warehouse/config/WarehouseProperties.java index cfb0d901bf5..1f052bde4db 100644 --- a/warehouse/src/main/java/com/usthe/warehouse/config/WarehouseProperties.java +++ b/warehouse/src/main/java/com/usthe/warehouse/config/WarehouseProperties.java @@ -243,6 +243,11 @@ public static class JpaProperties { */ private boolean enabled = true; + /** + * save data expire time(ms) + */ + private String expireTime = "7D"; + public boolean isEnabled() { return enabled; } @@ -251,6 +256,13 @@ public void setEnabled(boolean enabled) { this.enabled = enabled; } + public String getExpireTime() { + return expireTime; + } + + public void setExpireTime(String expireTime) { + this.expireTime = expireTime; + } } public static class InfluxdbProperties { diff --git a/warehouse/src/main/java/com/usthe/warehouse/dao/HistoryDao.java b/warehouse/src/main/java/com/usthe/warehouse/dao/HistoryDao.java index c986e69deb2..95ef8e1e053 100644 --- a/warehouse/src/main/java/com/usthe/warehouse/dao/HistoryDao.java +++ b/warehouse/src/main/java/com/usthe/warehouse/dao/HistoryDao.java @@ -12,4 +12,9 @@ */ public interface HistoryDao extends JpaRepository, JpaSpecificationExecutor { + /** + * delete history before expireTime + * @param expireTime expireTime + */ + void deleteHistoriesByTimeBefore(Long expireTime); } diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/HistoryJpaDatabaseDataStorage.java b/warehouse/src/main/java/com/usthe/warehouse/store/HistoryJpaDatabaseDataStorage.java index 2718cbb0698..60ea062342b 100644 --- a/warehouse/src/main/java/com/usthe/warehouse/store/HistoryJpaDatabaseDataStorage.java +++ b/warehouse/src/main/java/com/usthe/warehouse/store/HistoryJpaDatabaseDataStorage.java @@ -5,13 +5,16 @@ import com.usthe.common.entity.warehouse.History; import com.usthe.common.queue.CommonDataQueue; import com.usthe.common.util.CommonConstants; +import com.usthe.common.util.TimePeriodUtil; import com.usthe.warehouse.WarehouseWorkerPool; import com.usthe.warehouse.config.WarehouseProperties; import com.usthe.warehouse.dao.HistoryDao; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.math.NumberUtils; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.domain.Sort; import org.springframework.data.jpa.domain.Specification; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.persistence.criteria.Predicate; @@ -34,18 +37,44 @@ @Slf4j public class HistoryJpaDatabaseDataStorage extends AbstractHistoryDataStorage { private HistoryDao historyDao; + private WarehouseProperties.StoreProperties.JpaProperties jpaProperties; public HistoryJpaDatabaseDataStorage(WarehouseWorkerPool workerPool, WarehouseProperties properties, HistoryDao historyDao, CommonDataQueue commonDataQueue) { super(workerPool, properties, commonDataQueue); - + this.jpaProperties = properties.getStore().getJpa(); this.serverAvailable = true; this.historyDao = historyDao; this.startStorageData("warehouse-jpa-data-storage", isServerAvailable()); } + @Scheduled(cron = "0 0 23 * * ?") + public void expiredDataCleaner() { + String expireTimeStr = jpaProperties.getExpireTime(); + long expireTime = 0; + try { + if (NumberUtils.isParsable(expireTimeStr)) { + expireTime = NumberUtils.toLong(expireTimeStr); + expireTime = (ZonedDateTime.now().toEpochSecond() + expireTime) * 1000; + } else { + TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(expireTimeStr); + ZonedDateTime dateTime = ZonedDateTime.now().minus(temporalAmount); + expireTime = dateTime.toEpochSecond() * 1000; + } + } catch (Exception e) { + log.error("expiredDataCleaner time error: {}. use default expire time to clean: 7d", e.getMessage()); + ZonedDateTime dateTime = ZonedDateTime.now().minus(Duration.ofDays(7)); + expireTime = dateTime.toEpochSecond() * 1000; + } + try { + historyDao.deleteHistoriesByTimeBefore(expireTime); + } catch (Exception e) { + log.error("expiredDataCleaner database error: {}.", e.getMessage()); + } + } + @Override void saveData(CollectRep.MetricsData metricsData) { if (metricsData.getCode() != CollectRep.Code.SUCCESS) { @@ -130,7 +159,7 @@ public Map> getHistoryMetricData(Long monitorId, String app, } if (history != null) { try { - TemporalAmount temporalAmount = Duration.parse("PT" + history); + TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(history); ZonedDateTime dateTime = ZonedDateTime.now().minus(temporalAmount); long timeBefore = dateTime.toEpochSecond() * 1000; Predicate timePredicate = criteriaBuilder.ge(root.get("time"), timeBefore);