Skip to content

Commit

Permalink
[warehouse] fix iotdb bug, database not created (#765)
Browse files Browse the repository at this point in the history
  fix iotdb v1.0.  database not found bug

  fix create database bug

---------

Co-authored-by: hujiaofen <hujiaofen@2dfire.com>
  • Loading branch information
Ceilzcx and hujiaofen authored Mar 24, 2023
1 parent 5d7d015 commit 8996374
Showing 1 changed file with 38 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public class HistoryIotDbDataStorage extends AbstractHistoryDataStorage {
*/
private static final String STORAGE_GROUP = "root.hertzbeat";

private static final String SHOW_DATABASE = "show databases %s";

/**
* create database (version 1.0.*)
*/
private static final String CREATE_DATABASE = "create database %s";

private static final String SET_TTL = "set ttl to %s %s";

private static final String CANCEL_TTL = "unset ttl to %s";
Expand Down Expand Up @@ -109,22 +116,49 @@ private boolean initIotDbSession(WarehouseProperties.StoreProperties.IotDbProper
this.sessionPool = builder.build();
boolean available = checkConnection();
if (available) {
this.initTtl(properties.getExpireTime());
log.info("IotDB session pool init success");
available = this.createDatabase();
if (available) {
this.initTtl(properties.getExpireTime());
log.info("IotDB session pool init success");
}
}
return available;
}

private boolean checkConnection() {
try {
this.sessionPool.executeQueryStatement(SHOW_STORAGE_GROUP);
this.sessionPool.executeNonQueryStatement(SHOW_STORAGE_GROUP);
return true;
} catch (Exception e) {
log.error(e.getMessage(), e);
return false;
}
}

private boolean createDatabase() {
SessionDataSetWrapper dataSet = null;
try {
// v1.0.* create database
if (IotDbVersion.V_1_0.equals(this.version)) {
String showDatabaseSql = String.format(SHOW_DATABASE, STORAGE_GROUP);
dataSet = this.sessionPool.executeQueryStatement(showDatabaseSql);
// root.hertzbeat database not exist
if (!dataSet.hasNext()) {
String createDatabaseSql = String.format(CREATE_DATABASE, STORAGE_GROUP);
this.sessionPool.executeNonQueryStatement(createDatabaseSql);
}
}
} catch (IoTDBConnectionException | StatementExecutionException e) {
log.error("create database error, error: {}", e.getMessage());
return false;
} finally {
if (dataSet != null) {
this.sessionPool.closeResultSet(dataSet);
}
}
return true;
}

private void initTtl(String expireTime) {
if (expireTime == null || expireTime.isEmpty()) {
return;
Expand Down Expand Up @@ -153,7 +187,6 @@ void saveData(CollectRep.MetricsData metricsData) {
log.info("[warehouse iotdb] flush metrics data {} is null, ignore.", metricsData.getId());
return;
}
// tablet的deviceId添加引号会导致数据插入失败
List<MeasurementSchema> schemaList = new ArrayList<>();

// todo MeasurementSchema是在客户端生成的数据结构,编码和压缩没有作用
Expand Down Expand Up @@ -285,7 +318,7 @@ public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, Str
return instanceValuesMap;
}
String deviceId = getDeviceId(app, metrics, monitorId, instance, true);
String selectSql = "";
String selectSql;
if (instance != null) {
selectSql = String.format(QUERY_HISTORY_INTERVAL_WITH_INSTANCE_SQL,
addQuote(metric), addQuote(metric), addQuote(metric), addQuote(metric), deviceId, history);
Expand Down

0 comments on commit 8996374

Please sign in to comment.