Skip to content

Commit

Permalink
support v1.0.+ iotdb (#702)
Browse files Browse the repository at this point in the history
* support v1.0.+ iotdb

* add comment
  • Loading branch information
Ceilzcx authored Mar 9, 2023
1 parent 6a67037 commit d07ab7a
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 33 deletions.
3 changes: 1 addition & 2 deletions manager/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,8 @@ warehouse:
rpc-port: 6667
username: root
password: root
# org.apache.iotdb.session.util.Version: V_O_12 || V_0_13
# org.apache.iotdb.session.util.Version: V_0_13 || V_1_00
version: V_0_13
# if iotdb version >= 0.13 use default queryTimeoutInMs = -1; else use default queryTimeoutInMs = 0
query-timeout-in-ms: -1
# 数据存储时间:默认'7776000000'(90天,单位为毫秒,-1代表永不过期)
# data expire time, unit:ms, default '7776000000'(90 days, -1:never expire)
Expand Down
2 changes: 1 addition & 1 deletion warehouse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
</parent>
<properties>
<common.version>1.0</common.version>
<iotdb-session.version>0.13.2</iotdb-session.version>
<iotdb-session.version>0.13.3</iotdb-session.version>
<kafka-clients.version>3.4.0</kafka-clients.version>
<spring-cloud-starter-openfeign.version>3.0.5</spring-cloud-starter-openfeign.version>
<taos-jdbcdriver.version>2.0.42</taos-jdbcdriver.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.usthe.warehouse.config;

/**
* IoTDB user version
*
* @author ceilzcx
* @since 9/3/2023
*/
public enum IotDbVersion {
/**
* version 0.13.*
*/
V_0_13,
/**
* version 1.0.*
*/
V_1_0
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package com.usthe.warehouse.config;

import org.apache.iotdb.session.util.Version;
import org.springframework.boot.context.properties.ConfigurationProperties;

import java.time.ZoneId;
Expand Down Expand Up @@ -490,7 +489,7 @@ public static class IotDbProperties {
/**
* the version of IotDb
*/
private Version version;
private IotDbVersion version;

/**
* query timeout(ms)
Expand Down Expand Up @@ -553,11 +552,11 @@ public void setNodeUrls(List<String> nodeUrls) {
this.nodeUrls = nodeUrls;
}

public Version getVersion() {
public IotDbVersion getVersion() {
return version;
}

public void setVersion(Version version) {
public void setVersion(IotDbVersion version) {
this.version = version;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
import com.usthe.common.entity.dto.Value;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.common.util.CommonConstants;
import com.usthe.warehouse.config.IotDbVersion;
import com.usthe.warehouse.config.WarehouseProperties;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionDataSetWrapper;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.session.util.Version;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
Expand All @@ -34,8 +34,6 @@
public class HistoryIotDbDataStorage extends AbstractHistoryDataStorage {
private static final String BACK_QUOTE = "`";
private static final String DOUBLE_QUOTATION_MARKS = "\"";
private static final String SPACE = " ";
private static final String DOT = ".";
/**
* set ttl never expire
*/
Expand All @@ -61,12 +59,7 @@ public class HistoryIotDbDataStorage extends AbstractHistoryDataStorage {

private SessionPool sessionPool;

/**
* Session有这两个字段的set方法,sessionPool暂未发现,目前存储在此类中
* version: ioTDb version
* <p>用来区分不同版本的ioTDb</p>
*/
private Version version;
private IotDbVersion version;

private long queryTimeoutInMs;

Expand Down Expand Up @@ -204,7 +197,8 @@ void saveData(CollectRep.MetricsData metricsData) {
}

@Override
public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app, String metrics, String metric, String instance, String history) {
public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app, String metrics, String metric,
String instance, String history) {
Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
if (!isServerAvailable()) {
log.error("\n\t---------------IotDb Init Failed---------------\n" +
Expand Down Expand Up @@ -264,7 +258,8 @@ private void handleHistorySelect(String selectSql, String instanceId, Map<String
}

@Override
public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, String app, String metrics, String metric, String instance, String history) {
public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, String app, String metrics,
String metric, String instance, String history) {
Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
if (!isServerAvailable()) {
log.error("\n\t---------------IotDb Init Failed---------------\n" +
Expand Down Expand Up @@ -297,7 +292,8 @@ public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, Str
return instanceValuesMap;
}

private void handleHistoryIntervalSelect(String selectSql, String instanceId, Map<String, List<Value>> instanceValuesMap) {
private void handleHistoryIntervalSelect(String selectSql, String instanceId,
Map<String, List<Value>> instanceValuesMap) {
SessionDataSetWrapper dataSet = null;
try {
dataSet = this.sessionPool.executeQueryStatement(selectSql, this.queryTimeoutInMs);
Expand Down Expand Up @@ -369,9 +365,9 @@ private String getDeviceId(String app, String metrics, Long monitorId, String in
String deviceId = STORAGE_GROUP + "." +
(useQuote ? addQuote(app) : app) + "." +
(useQuote ? addQuote(metrics) : metrics) + "." +
monitorId;
((IotDbVersion.V_1_0.equals(version) || useQuote) ? addQuote(monitorId.toString()) : monitorId.toString());
if (instanceId != null && !instanceId.isEmpty() && !instanceId.equals(CommonConstants.NULL_VALUE)) {
deviceId += "." + (useQuote ? addQuote(instanceId) : instanceId);
deviceId += "." + addQuote(instanceId);
}
return deviceId;
}
Expand All @@ -380,22 +376,19 @@ private String getDeviceId(String app, String metrics, Long monitorId, String in
* add quote,防止查询时关键字报错(eg: nodes)
*/
private String addQuote(String text) {
if (text == null || text.isEmpty()
|| (text.startsWith(DOUBLE_QUOTATION_MARKS) && text.endsWith(DOUBLE_QUOTATION_MARKS))
|| (text.startsWith(BACK_QUOTE) && text.endsWith(BACK_QUOTE))) {
if (text == null || text.isEmpty() || (text.startsWith(BACK_QUOTE) && text.endsWith(BACK_QUOTE))) {
return text;
}
if (this.version != null && this.version.equals(Version.V_0_13)) {
text = text.replace("'", "\\'");
text = text.replace("\"", "\\\"");
text = text.replace("*", "-");
text = String.format("`%s`", text);
} else {
if (text.contains(SPACE) || text.contains(DOT)) {
text = String.format("\"%s\"", text);
return text;
if ((text.startsWith(DOUBLE_QUOTATION_MARKS) && text.endsWith(DOUBLE_QUOTATION_MARKS))) {
if (IotDbVersion.V_1_0.equals(version)) {
text = text.replace("\"", "`");
}
return text;
}
text = text.replace("'", "\\'");
text = text.replace("\"", "\\\"");
text = text.replace("*", "-");
text = String.format("`%s`", text);
return text;
}

Expand Down

0 comments on commit d07ab7a

Please sign in to comment.