diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/HistoryIotDbDataStorage.java b/warehouse/src/main/java/com/usthe/warehouse/store/HistoryIotDbDataStorage.java index 2cf31739797..ca365c418e7 100644 --- a/warehouse/src/main/java/com/usthe/warehouse/store/HistoryIotDbDataStorage.java +++ b/warehouse/src/main/java/com/usthe/warehouse/store/HistoryIotDbDataStorage.java @@ -61,6 +61,13 @@ public class HistoryIotDbDataStorage extends AbstractHistoryDataStorage { */ private static final String STORAGE_GROUP = "root.hertzbeat"; + private static final String SHOW_DATABASE = "show databases %s"; + + /** + * create database (version 1.0.*) + */ + private static final String CREATE_DATABASE = "create database %s"; + private static final String SET_TTL = "set ttl to %s %s"; private static final String CANCEL_TTL = "unset ttl to %s"; @@ -109,15 +116,18 @@ private boolean initIotDbSession(WarehouseProperties.StoreProperties.IotDbProper this.sessionPool = builder.build(); boolean available = checkConnection(); if (available) { - this.initTtl(properties.getExpireTime()); - log.info("IotDB session pool init success"); + available = this.createDatabase(); + if (available) { + this.initTtl(properties.getExpireTime()); + log.info("IotDB session pool init success"); + } } return available; } private boolean checkConnection() { try { - this.sessionPool.executeQueryStatement(SHOW_STORAGE_GROUP); + this.sessionPool.executeNonQueryStatement(SHOW_STORAGE_GROUP); return true; } catch (Exception e) { log.error(e.getMessage(), e); @@ -125,6 +135,30 @@ private boolean checkConnection() { } } + private boolean createDatabase() { + SessionDataSetWrapper dataSet = null; + try { + // v1.0.* create database + if (IotDbVersion.V_1_0.equals(this.version)) { + String showDatabaseSql = String.format(SHOW_DATABASE, STORAGE_GROUP); + dataSet = this.sessionPool.executeQueryStatement(showDatabaseSql); + // root.hertzbeat database not exist + if (!dataSet.hasNext()) { + String createDatabaseSql = String.format(CREATE_DATABASE, STORAGE_GROUP); + this.sessionPool.executeNonQueryStatement(createDatabaseSql); + } + } + } catch (IoTDBConnectionException | StatementExecutionException e) { + log.error("create database error, error: {}", e.getMessage()); + return false; + } finally { + if (dataSet != null) { + this.sessionPool.closeResultSet(dataSet); + } + } + return true; + } + private void initTtl(String expireTime) { if (expireTime == null || expireTime.isEmpty()) { return; @@ -153,7 +187,6 @@ void saveData(CollectRep.MetricsData metricsData) { log.info("[warehouse iotdb] flush metrics data {} is null, ignore.", metricsData.getId()); return; } - // tablet的deviceId添加引号会导致数据插入失败 List schemaList = new ArrayList<>(); // todo MeasurementSchema是在客户端生成的数据结构,编码和压缩没有作用 @@ -285,7 +318,7 @@ public Map> getHistoryIntervalMetricData(Long monitorId, Str return instanceValuesMap; } String deviceId = getDeviceId(app, metrics, monitorId, instance, true); - String selectSql = ""; + String selectSql; if (instance != null) { selectSql = String.format(QUERY_HISTORY_INTERVAL_WITH_INSTANCE_SQL, addQuote(metric), addQuote(metric), addQuote(metric), addQuote(metric), deviceId, history);