Skip to content

Commit

Permalink
[warehouse] support jpa expired metrics data auto cleaner (#691)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomsun28 authored Mar 5, 2023
1 parent b03347c commit 906862e
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*
*/
@Entity
@Table(name = "history")
@Table(name = "hzb_history")
@Data
@Builder
@AllArgsConstructor
Expand Down
27 changes: 27 additions & 0 deletions common/src/main/java/com/usthe/common/util/TimePeriodUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.usthe.common.util;

import java.time.Duration;
import java.time.Period;
import java.time.temporal.TemporalAmount;

/**
* time util
*
*
*
*/
public class TimePeriodUtil {

/**
* parse tokenTime to TemporalAmount
* @param tokenTime eg: "1m", "5M", "3D", "30m", "2h", "1Y", "3W"
* @return TemporalAmount
*/
public static TemporalAmount parseTokenTime(String tokenTime) {
if (Character.isUpperCase(tokenTime.charAt(tokenTime.length() - 1))) {
return Period.parse("P" + tokenTime);
} else {
return Duration.parse("PT" + tokenTime);
}
}
}
2 changes: 2 additions & 0 deletions manager/src/main/java/com/usthe/manager/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
*
Expand All @@ -30,6 +31,7 @@

@SpringBootApplication
@EnableJpaAuditing
@EnableScheduling
@EnableJpaRepositories(basePackages = {"com.usthe"})
@EntityScan(basePackages = {"com.usthe"})
public class Manager {
Expand Down
1 change: 1 addition & 0 deletions manager/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ warehouse:
# 存储历史数据方式, 下方只能enabled启用一种方式
jpa:
enabled: true
expire-time: 7D
td-engine:
enabled: false
driver-class-name: com.taosdata.jdbc.rs.RestfulDriver
Expand Down
1 change: 1 addition & 0 deletions script/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ warehouse:
# 存储历史数据方式, 下方只能enabled启用一种方式
jpa:
enabled: true
expire-time: 7D
td-engine:
enabled: false
driver-class-name: com.taosdata.jdbc.rs.RestfulDriver
Expand Down
17 changes: 16 additions & 1 deletion script/docker-compose/hertzbeat-mysql-iotdb/conf/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ spring:

warehouse:
store:
# store history metrics data, enable only one below
# 存储历史数据方式, 下方只能enabled启用一种方式
jpa:
enabled: false
expire-time: 7D
iot-db:
enabled: true
host: iotdb
Expand All @@ -97,4 +102,14 @@ warehouse:
query-timeout-in-ms: -1
# 数据存储时间:默认'7776000000'(90天,单位为毫秒,-1代表永不过期)
# data expire time, unit:ms, default '7776000000'(90 days, -1:never expire)
expire-time: '7776000000'
expire-time: '7776000000'
# store real-time metrics data, enable only one below
# 存储实时数据方式, 下方只能enabled启用一种方式
memory:
enabled: true
init-size: 1024
redis:
enabled: false
host: 127.0.0.1
port: 6379
password: 123456
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,24 @@ spring:

warehouse:
store:
# store history metrics data, enable only one below
# 存储历史数据方式, 下方只能enabled启用一种方式
jpa:
enabled: false
expire-time: 7D
td-engine:
enabled: true
driver-class-name: com.taosdata.jdbc.rs.RestfulDriver
url: jdbc:TAOS-RS://tdengine:6041/hertzbeat
username: root
password: taosdata
# store real-time metrics data, enable only one below
# 存储实时数据方式, 下方只能enabled启用一种方式
memory:
enabled: true
init-size: 1024
redis:
enabled: false
host: 127.0.0.1
port: 6379
password: 123456
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ public static class JpaProperties {
*/
private boolean enabled = true;

/**
* save data expire time(ms)
*/
private String expireTime = "7D";

public boolean isEnabled() {
return enabled;
}
Expand All @@ -251,6 +256,13 @@ public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public String getExpireTime() {
return expireTime;
}

public void setExpireTime(String expireTime) {
this.expireTime = expireTime;
}
}

public static class InfluxdbProperties {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,9 @@
*/
public interface HistoryDao extends JpaRepository<History, Long>, JpaSpecificationExecutor<History> {

/**
* delete history before expireTime
* @param expireTime expireTime
*/
void deleteHistoriesByTimeBefore(Long expireTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
import com.usthe.common.entity.warehouse.History;
import com.usthe.common.queue.CommonDataQueue;
import com.usthe.common.util.CommonConstants;
import com.usthe.common.util.TimePeriodUtil;
import com.usthe.warehouse.WarehouseWorkerPool;
import com.usthe.warehouse.config.WarehouseProperties;
import com.usthe.warehouse.dao.HistoryDao;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.persistence.criteria.Predicate;
Expand All @@ -34,18 +37,44 @@
@Slf4j
public class HistoryJpaDatabaseDataStorage extends AbstractHistoryDataStorage {
private HistoryDao historyDao;
private WarehouseProperties.StoreProperties.JpaProperties jpaProperties;

public HistoryJpaDatabaseDataStorage(WarehouseWorkerPool workerPool,
WarehouseProperties properties,
HistoryDao historyDao,
CommonDataQueue commonDataQueue) {
super(workerPool, properties, commonDataQueue);

this.jpaProperties = properties.getStore().getJpa();
this.serverAvailable = true;
this.historyDao = historyDao;
this.startStorageData("warehouse-jpa-data-storage", isServerAvailable());
}

@Scheduled(cron = "0 0 23 * * ?")
public void expiredDataCleaner() {
String expireTimeStr = jpaProperties.getExpireTime();
long expireTime = 0;
try {
if (NumberUtils.isParsable(expireTimeStr)) {
expireTime = NumberUtils.toLong(expireTimeStr);
expireTime = (ZonedDateTime.now().toEpochSecond() + expireTime) * 1000;
} else {
TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(expireTimeStr);
ZonedDateTime dateTime = ZonedDateTime.now().minus(temporalAmount);
expireTime = dateTime.toEpochSecond() * 1000;
}
} catch (Exception e) {
log.error("expiredDataCleaner time error: {}. use default expire time to clean: 7d", e.getMessage());
ZonedDateTime dateTime = ZonedDateTime.now().minus(Duration.ofDays(7));
expireTime = dateTime.toEpochSecond() * 1000;
}
try {
historyDao.deleteHistoriesByTimeBefore(expireTime);
} catch (Exception e) {
log.error("expiredDataCleaner database error: {}.", e.getMessage());
}
}

@Override
void saveData(CollectRep.MetricsData metricsData) {
if (metricsData.getCode() != CollectRep.Code.SUCCESS) {
Expand Down Expand Up @@ -130,7 +159,7 @@ public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app,
}
if (history != null) {
try {
TemporalAmount temporalAmount = Duration.parse("PT" + history);
TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(history);
ZonedDateTime dateTime = ZonedDateTime.now().minus(temporalAmount);
long timeBefore = dateTime.toEpochSecond() * 1000;
Predicate timePredicate = criteriaBuilder.ge(root.get("time"), timeBefore);
Expand Down

0 comments on commit 906862e

Please sign in to comment.