Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add jpa to date type storage #1431

Merged
merged 2 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class History {
@Schema(title = "Monitoring Metric usage speed count")
private String metric;

@Column(length = 2048)
@Column(length = 5000)
private String instance;

@Schema(title = "Metric Type 0: Number 1:String")
Expand All @@ -56,6 +56,9 @@ public class History {
@Column(length = 2048)
private String str;

@Schema(title = "Metric Integer Value")
private Integer int32;

@Schema(title = "Metric Number Value")
private Double dou;

Expand Down
3 changes: 2 additions & 1 deletion script/sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,10 @@ CREATE TABLE hzb_history
app varchar(100) not null comment '监控类型 mysql oracle db2',
metrics varchar(100) not null comment '指标集合名称 innodb disk cpu',
metric varchar(100) not null comment '指标名称 usage speed count',
instance varchar(1024) comment '实例',
instance varchar(5000) comment '实例',
metric_type tinyint not null comment '字段类型 0: 数值 1:字符串',
str varchar(1024) comment '字符值',
int32 int comment '整数',
dou float comment '数值',
time bigint comment '采集时间戳',
primary key (id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
package org.dromara.hertzbeat.warehouse.store;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.dromara.hertzbeat.common.constants.CommonConstants;
import org.dromara.hertzbeat.common.entity.dto.Value;
import org.dromara.hertzbeat.common.entity.message.CollectRep;
import org.dromara.hertzbeat.common.entity.warehouse.History;
import org.dromara.hertzbeat.common.constants.CommonConstants;
import org.dromara.hertzbeat.common.util.JsonUtil;
import org.dromara.hertzbeat.common.util.TimePeriodUtil;
import org.dromara.hertzbeat.warehouse.config.WarehouseProperties;
import org.dromara.hertzbeat.warehouse.dao.HistoryDao;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.domain.Specification;
Expand All @@ -49,7 +50,6 @@
* data storage by mysql/h2 - jpa
*
* @author tom
*
*/
@Component
@ConditionalOnProperty(prefix = "warehouse.store.jpa",
Expand All @@ -62,55 +62,56 @@ public class HistoryJpaDatabaseDataStorage extends AbstractHistoryDataStorage {
private static final int STRING_MAX_LENGTH = 1024;

/**
 * Builds the JPA-backed history storage and immediately schedules the
 * periodic expired-data cleaner.
 *
 * @param properties warehouse configuration; only the JPA store section is kept
 * @param historyDao DAO used to persist and purge metric history rows
 */
public HistoryJpaDatabaseDataStorage(WarehouseProperties properties,
                                     HistoryDao historyDao) {
    this.jpaProperties = properties.getStore().getJpa();
    this.serverAvailable = true;
    this.historyDao = historyDao;
    // Kick off the background cleaner once; it reschedules itself at a fixed rate.
    expiredDataCleaner();
}

/**
 * Starts a single-threaded daemon scheduler that, every 30 seconds (after an
 * initial 5-second delay), deletes history rows older than the configured
 * expire time and force-trims the table when it exceeds the configured
 * maximum record count. On a database error it falls back to truncating the
 * whole {@code hzb_history} table.
 */
public void expiredDataCleaner() {
    // Daemon thread so the cleaner never blocks JVM shutdown; uncaught
    // exceptions are logged instead of killing the scheduler thread.
    ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setUncaughtExceptionHandler((thread, throwable) -> {
                log.error("Jpa metrics store has uncaughtException.");
                log.error(throwable.getMessage(), throwable);
            })
            .setDaemon(true)
            .setNameFormat("jpa-metrics-cleaner-%d")
            .build();
    ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
    scheduledExecutor.scheduleAtFixedRate(() -> {
        log.warn("[jpa-metrics-store]-start running expired data cleaner." +
                "Please use time series db instead of jpa for better performance");
        String expireTimeStr = jpaProperties.getExpireTime();
        long expireTime = 0;
        try {
            if (NumberUtils.isParsable(expireTimeStr)) {
                // Plain number: treated as an offset in seconds.
                // NOTE(review): this ADDS the offset to "now", producing a cutoff
                // in the future (deletes rows newer than now as well); presumably
                // it should subtract, like the token-time branch — confirm intent.
                expireTime = NumberUtils.toLong(expireTimeStr);
                expireTime = (ZonedDateTime.now().toEpochSecond() + expireTime) * 1000;
            } else {
                // Token form such as "1h"/"30m": cutoff is now minus the period.
                TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(expireTimeStr);
                ZonedDateTime dateTime = ZonedDateTime.now().minus(temporalAmount);
                expireTime = dateTime.toEpochSecond() * 1000;
            }
        } catch (Exception e) {
            // Unparseable configuration: fall back to a 1-hour retention window.
            log.error("expiredDataCleaner time error: {}. use default expire time to clean: 1h", e.getMessage());
            ZonedDateTime dateTime = ZonedDateTime.now().minus(Duration.ofHours(1));
            expireTime = dateTime.toEpochSecond() * 1000;
        }
        try {
            int rows = historyDao.deleteHistoriesByTimeBefore(expireTime);
            log.info("[jpa-metrics-store]-delete {} rows.", rows);
            long total = historyDao.count();
            if (total > jpaProperties.getMaxHistoryRecordNum()) {
                // Table overgrown: drop the oldest half of the allowed budget.
                rows = historyDao.deleteOlderHistoriesRecord(jpaProperties.getMaxHistoryRecordNum() / 2);
                log.warn("[jpa-metrics-store]-force delete {} rows due too many. Please use time series db instead of jpa for better performance.", rows);
            }
        } catch (Exception e) {
            // Last-resort recovery: wipe the table rather than let it grow unbounded.
            log.error("expiredDataCleaner database error: {}.", e.getMessage());
            log.error("try to truncate table hzb_history. Please use time series db instead of jpa for better performance.");
            historyDao.truncateTable();
        }
    }, 5, 30, TimeUnit.SECONDS);
}

@Override
Expand All @@ -135,43 +136,69 @@ void saveData(CollectRep.MetricsData metricsData) {
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
Map<String, String> labels = new HashMap<>(8);
for (int i = 0; i < fieldsList.size(); i++) {
CollectRep.Field field = fieldsList.get(i);
if (field.getLabel() && !CommonConstants.NULL_VALUE.equals(valueRow.getColumns(i))) {
labels.put(field.getName(), valueRow.getColumns(i));
}
}
for (int i = 0; i < fieldsList.size(); i++) {
CollectRep.Field field = fieldsList.get(i);
// ignore string value store in db
if (field.getType() == CommonConstants.TYPE_STRING) {
continue;
}
historyBuilder.metric(field.getName());
historyBuilder.instance(JsonUtil.toJson(labels));
if (!CommonConstants.NULL_VALUE.equals(valueRow.getColumns(i))) {
if (field.getType() == CommonConstants.TYPE_NUMBER) {
historyBuilder.metricType(CommonConstants.TYPE_NUMBER)
.dou(Double.parseDouble(valueRow.getColumns(i)));
} else if (field.getType() == CommonConstants.TYPE_STRING) {
historyBuilder.metricType(CommonConstants.TYPE_STRING)
.str(formatStrValue(valueRow.getColumns(i)));
final CollectRep.Field field = fieldsList.get(i);
final int fieldType = field.getType();
final String fieldName = field.getName();
final String columnValue = valueRow.getColumns(i);

historyBuilder.metric(fieldName);

if (CommonConstants.NULL_VALUE.equals(columnValue)) {
switch (fieldType) {
case CommonConstants.TYPE_NUMBER: {
historyBuilder.metricType(CommonConstants.TYPE_NUMBER)
.dou(null);
break;
}

case CommonConstants.TYPE_STRING: {
historyBuilder.metricType(CommonConstants.TYPE_STRING)
.str(null);
break;
}

case CommonConstants.TYPE_TIME: {
historyBuilder.metricType(CommonConstants.TYPE_TIME)
.int32(null);
break;
}
}
} else {
if (field.getType() == CommonConstants.TYPE_NUMBER) {
historyBuilder.metricType(CommonConstants.TYPE_NUMBER).dou(null);
} else if (field.getType() == CommonConstants.TYPE_STRING) {
historyBuilder.metricType(CommonConstants.TYPE_STRING).str(null);
switch (fieldType) {
case CommonConstants.TYPE_NUMBER: {
historyBuilder.metricType(CommonConstants.TYPE_NUMBER)
.dou(Double.parseDouble(columnValue));
break;
}

case CommonConstants.TYPE_STRING: {
historyBuilder.metricType(CommonConstants.TYPE_STRING)
.str(formatStrValue(columnValue));
break;
}

case CommonConstants.TYPE_TIME: {
historyBuilder.metricType(CommonConstants.TYPE_TIME)
.int32(Integer.parseInt(columnValue));
break;
}
}

if (field.getLabel()) {
labels.put(fieldName, columnValue);
}
}

historyList.add(historyBuilder.build());
}
historyBuilder.instance(JsonUtil.toJson(labels));
}
historyDao.saveAll(historyList);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

@Override
public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app, String metrics, String metric, String label, String history) {
Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
Expand All @@ -185,10 +212,12 @@ public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app,
andList.add(predicateMonitorType);
andList.add(predicateMonitorMetrics);
andList.add(predicateMonitorMetric);
if (label != null && !"".equals(label)) {

if (StringUtils.isNotBlank(label)) {
Predicate predicateMonitorInstance = criteriaBuilder.equal(root.get("instance"), label);
andList.add(predicateMonitorInstance);
}

if (history != null) {
try {
TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(history);
Expand Down
Loading