Skip to content

Commit

Permalink
Check duplicated measurements in one row for all insert APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
HTHou committed Jun 26, 2024
1 parent 6ba5428 commit 877c82a
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,34 @@ public void insertTabletWithWrongTimestampPrecisionTest() {
}
}

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void insertTabletWithDuplicatedMeasurementsTest() {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s0", TSDataType.DOUBLE, TSEncoding.RLE));
schemaList.add(new MeasurementSchema("s0", TSDataType.DOUBLE, TSEncoding.RLE));
schemaList.add(new MeasurementSchema("s0", TSDataType.DOUBLE, TSEncoding.RLE));

Tablet tablet = new Tablet("root.sg1.d1", schemaList);
for (long time = 0L; time < 10L; time++) {
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, time);

tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, (double) time);
tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, (double) time);
tablet.addValue(schemaList.get(2).getMeasurementId(), rowIndex, (double) time);
}

if (tablet.rowSize != 0) {
session.insertTablet(tablet);
tablet.reset();
}
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Insertion contains duplicated measurement: s0"));
}
}

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void createTimeSeriesWithDoubleTicksTest() {
Expand Down Expand Up @@ -982,6 +1010,112 @@ public void insertOneDeviceRecordsWithIncorrectOrderTest() {
}
}

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void insertOneDeviceRecordsWithDuplicatedMeasurementsTest() {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
List<Long> times = new ArrayList<>();
List<List<String>> measurements = new ArrayList<>();
List<List<TSDataType>> datatypes = new ArrayList<>();
List<List<Object>> values = new ArrayList<>();

addLine(
times,
measurements,
datatypes,
values,
3L,
"s1",
"s2",
TSDataType.INT32,
TSDataType.INT32,
1,
2);
addLine(
times,
measurements,
datatypes,
values,
2L,
"s2",
"s2",
TSDataType.INT32,
TSDataType.INT32,
3,
4);
addLine(
times,
measurements,
datatypes,
values,
1L,
"s4",
"s5",
TSDataType.FLOAT,
TSDataType.BOOLEAN,
5.0f,
Boolean.TRUE);
session.insertRecordsOfOneDevice("root.sg.d1", times, measurements, datatypes, values);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Insertion contains duplicated measurement: s2"));
}
}

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void insertRecordsWithDuplicatedMeasurementsTest() {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
List<Long> times = new ArrayList<>();
List<List<String>> measurements = new ArrayList<>();
List<List<TSDataType>> datatypes = new ArrayList<>();
List<List<Object>> values = new ArrayList<>();
List<String> devices = new ArrayList<>();

devices.add("root.sg.d1");
addLine(
times,
measurements,
datatypes,
values,
3L,
"s1",
"s2",
TSDataType.INT32,
TSDataType.INT32,
1,
2);
devices.add("root.sg.d2");
addLine(
times,
measurements,
datatypes,
values,
2L,
"s2",
"s2",
TSDataType.INT32,
TSDataType.INT32,
3,
4);
devices.add("root.sg.d3");
addLine(
times,
measurements,
datatypes,
values,
1L,
"s4",
"s5",
TSDataType.FLOAT,
TSDataType.BOOLEAN,
5.0f,
Boolean.TRUE);
session.insertRecords(devices, times, measurements, datatypes, values);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Insertion contains duplicated measurement: s2"));
}
}

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void insertStringRecordsOfOneDeviceWithOrderTest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2207,7 +2207,6 @@ private void checkAliasUniqueness(
@Override
public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
insertStatement.semanticCheck();
long[] timeArray = insertStatement.getTimes();
PartialPath devicePath = insertStatement.getDevice();
String[] measurementList = insertStatement.getMeasurementList();
Expand All @@ -2218,9 +2217,7 @@ public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext con
insertRowStatement.setTime(timeArray[0]);
insertRowStatement.setMeasurements(measurementList);
insertRowStatement.setDataTypes(new TSDataType[measurementList.length]);
Object[] values = new Object[measurementList.length];
System.arraycopy(insertStatement.getValuesList().get(0), 0, values, 0, values.length);
insertRowStatement.setValues(values);
insertRowStatement.setValues(insertStatement.getValuesList().get(0));
insertRowStatement.setNeedInferType(true);
insertRowStatement.setAligned(insertStatement.isAligned());
return insertRowStatement.accept(this, context);
Expand Down Expand Up @@ -2252,9 +2249,7 @@ public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext con
statement.setTime(timeArray[i]);
TSDataType[] dataTypes = new TSDataType[measurementList.length];
statement.setDataTypes(dataTypes);
Object[] values = new Object[measurementList.length];
System.arraycopy(insertStatement.getValuesList().get(i), 0, values, 0, values.length);
statement.setValues(values);
statement.setValues(insertStatement.getValuesList().get(i));
statement.setAligned(insertStatement.isAligned());
statement.setNeedInferType(true);
insertRowStatementList.add(statement);
Expand Down Expand Up @@ -2513,6 +2508,7 @@ public Analysis visitAlterTimeseries(
public Analysis visitInsertTablet(
InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
insertTabletStatement.semanticCheck();
Analysis analysis = new Analysis();
validateSchema(analysis, insertTabletStatement, context);
InsertBaseStatement realStatement = removeLogicalView(analysis, insertTabletStatement);
Expand Down Expand Up @@ -2544,6 +2540,7 @@ public Analysis visitInsertTablet(
@Override
public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
insertRowStatement.semanticCheck();
Analysis analysis = new Analysis();
validateSchema(analysis, insertRowStatement, context);
InsertBaseStatement realInsertStatement = removeLogicalView(analysis, insertRowStatement);
Expand Down Expand Up @@ -2594,6 +2591,7 @@ private Analysis computeAnalysisForInsertRows(
public Analysis visitInsertRows(
InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
insertRowsStatement.semanticCheck();
Analysis analysis = new Analysis();
validateSchema(analysis, insertRowsStatement, context);
InsertRowsStatement realInsertRowsStatement =
Expand Down Expand Up @@ -2633,6 +2631,7 @@ private Analysis computeAnalysisForMultiTablets(
public Analysis visitInsertMultiTablets(
InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
insertMultiTabletsStatement.semanticCheck();
Analysis analysis = new Analysis();
validateSchema(analysis, insertMultiTabletsStatement, context);
InsertMultiTabletsStatement realStatement =
Expand All @@ -2650,6 +2649,7 @@ public Analysis visitInsertMultiTablets(
public Analysis visitInsertRowsOfOneDevice(
InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
insertRowsOfOneDeviceStatement.semanticCheck();
Analysis analysis = new Analysis();
validateSchema(analysis, insertRowsOfOneDeviceStatement, context);
InsertBaseStatement realInsertStatement =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1852,13 +1852,13 @@ private void parseInsertValuesSpec(
if (timeIndex == -1 && rows.size() != 1) {
throw new SemanticException("need timestamps when insert multi rows");
}
List<String[]> valuesList = new ArrayList<>();
List<Object[]> valuesList = new ArrayList<>();
long[] timeArray = new long[rows.size()];
for (int i = 0, size = rows.size(); i < size; i++) {
IoTDBSqlParser.RowContext row = rows.get(i);
// parse timestamp
long timestamp;
List<String> valueList = new ArrayList<>();
List<Object> valueList = new ArrayList<>();
// using now() instead
if (timeIndex == -1) {
timestamp = CommonDateTimeUtils.currentTime();
Expand All @@ -1880,7 +1880,7 @@ private void parseInsertValuesSpec(
}
}

valuesList.add(valueList.toArray(new String[0]));
valuesList.add(valueList.toArray(new Object[0]));
}
insertStatement.setTimes(timeArray);
insertStatement.setValuesList(valuesList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,21 @@ protected void selfCheckDataTypes(int index)

public abstract Object getFirstValueOfIndex(int index);

public void semanticCheck() {
Set<String> deduplicatedMeasurements = new HashSet<>();
for (String measurement : measurements) {
if (measurement == null || measurement.isEmpty()) {
throw new SemanticException(
"Measurement contains null or empty string: " + Arrays.toString(measurements));
}
if (deduplicatedMeasurements.contains(measurement)) {
throw new SemanticException("Insertion contains duplicated measurement: " + measurement);
} else {
deduplicatedMeasurements.add(measurement);
}
}
}

// region partial insert
/**
* Mark failed measurement, measurements[index], dataTypes[index] and values/columns[index] would
Expand Down Expand Up @@ -308,7 +323,7 @@ protected Map<PartialPath, List<Pair<String, Integer>>> getMapFromDeviceToMeasur
}
}
// construct map from device to measurements and record the index of its measurement
// schemaengine
// schema
Map<PartialPath, List<Pair<String, Integer>>> mapFromDeviceToMeasurementAndIndex =
new HashMap<>();
for (int i = 0; i < this.measurements.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
return false;
}

@Override
public void semanticCheck() {
for (InsertTabletStatement insertTabletStatement : insertTabletStatementList) {
insertTabletStatement.semanticCheck();
}
}

@Override
public long getMinTime() {
throw new NotImplementedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,17 @@ public void markFailedMeasurement(int index, Exception cause) {
values[index] = null;
}

@Override
public void semanticCheck() {
super.semanticCheck();
if (measurements.length != values.length) {
throw new SemanticException(
String.format(
"the measurementList's size %d is not consistent with the valueList's size %d",
measurements.length, values.length));
}
}

public boolean isNeedSplit() {
return hasLogicalViewNeedProcess();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
return false;
}

@Override
public void semanticCheck() {
for (InsertRowStatement insertRowStatement : insertRowStatementList) {
insertRowStatement.semanticCheck();
}
}

@Override
public long getMinTime() {
throw new NotImplementedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
return false;
}

@Override
public void semanticCheck() {
for (InsertRowStatement insertRowStatement : insertRowStatementList) {
insertRowStatement.semanticCheck();
}
}

@Override
public long getMinTime() {
throw new NotImplementedException();
Expand Down
Loading

0 comments on commit 877c82a

Please sign in to comment.