Make UDTF execution framework batch processing (#12562)
* Add `.putFloats()` to insert multiple rows at the same time.

* Complete all batch insert methods in `BatchData.java`.

* Add overload methods with indexes to avoid copying.

* Add Javadoc for new methods in `BatchData.java`.

* Merge two `BatchDataTest.java` files.

* Add `.putColumns()` method to `SerializableRowRecordList.java`.

* Finish batch insert method prototypes in row record list and TV list.

* UDTF execution framework can support running a really simple UDTF.

* Change TVList to list of columns to avoid copying.

* Compile DataNode without error.

* Format all files by `mvn spotless:apply`.

* Optimize import by using IDEA shortcut.

* Fix error when putting data into an existing list.

* Generate data in terminate method.

* Format again using `mvn spotless:apply`.

* Fix some bugs and implement RowWindow UDTF.

* Basically pass all UDF-related IT.

* Add license header to all newly-added files.

* Add memory control strategy for binary list.

* Add memory control strategy for row list that contains binary field.

* Copy subregion to avoid memory leakage.

* Update iterator by recording the first data index when applying a new memory strategy.

* Unify the `newXXX` method names to `construct`.

* Basically refactor storage layer.

* Add and update copy methods in Column-based classes.

* Rewrite unit test for ElasticSerializableRowList.

* Rewrite unit test for SerializableRowList.

* Rewrite unit test for SerializableTVList.

* Rewrite unit test for ElasticSerializableTVList.

* Sync changes between TVList and RowList.

* Refine UT of ESRowList and ESTVList with random size strings.

* Do not cache internal list in some situations.

* Add forgotten files.

* Revert files relevant to `BatchData.java`.

* Remove redundant `timeHeap.clear()` in MultiInputColumnIntermediateLayer.

* Add constant folding in BinaryTransformer.

* Refactor computational layer.

* Replace the original normal column in the constant reader with RLEColumn.

* Batch sliding time row window in SingleInputColumnSingleReferenceIntermediateLayer.

* Batch other row windows in SingleInputColumnSingleReferenceIntermediateLayer.

* Add forgotten `TransformUtils.java` file.

* Batch all row windows in SingleInputColumnMultiReferenceIntermediateLayer.

* Batch all row windows in all intermediate layers.

* Remove code with no usage.

* Remove code with no usage again.

* Remove nonexistent files.

* Refactor project's structure.

* Optimize when there is only one transformer.

* Fix data loss bug in benchmarking.

* Refactor UDTF executor.

* Skip cache update when getting column size.

* Discard TsBlock when there is only one UDF.

* Fix transformer IT errors.

* Remove unnecessary bitmap in ESRowList.

* Update TsFile version.

* Fix some IT OOM bugs.

* Try to fix UT OOM error.

* Apply spotless plugin.

* Evict data in UDF output storage.

* Revert optimization when there is only one UDTF.

* Fix data loss error.

* Update internal column count when ESList applies a new memory control strategy.

* Add new types in SerializableRowList.

* Apply mvn spotless plugin.

* Fix integration test errors.

* Modify code according to committer's advice. [Part One]

* Modify code according to committer's advice. [Part Two]

* Modify code according to committer's advice. [Part Three]
Sh-Zh-7 authored May 31, 2024
1 parent 6ebaa43 commit fe35c8f
Showing 105 changed files with 7,148 additions and 6,750 deletions.
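At a glance, the commit swaps the point-at-a-time LayerPointReader for a batch-oriented LayerReader throughout the transformation pipeline. Below is a hedged sketch of the two reader shapes, inferred purely from the call sites in the diffs that follow; the real interfaces carry more methods, and these simplified forms are assumptions, not the PR's actual declarations.

import org.apache.iotdb.db.queryengine.transformation.api.YieldableState;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.enums.TSDataType;

// Hypothetical, simplified shapes for illustration only.
interface PointReaderSketch {
  YieldableState yield() throws Exception; // make a single row available
  long currentTime() throws Exception;     // time of the current row
  boolean isCurrentNull() throws Exception;
  void readyForNext();                     // advance one row at a time
}

interface BatchReaderSketch {
  YieldableState yield() throws Exception; // make a whole batch available
  Column[] current() throws Exception;     // value column(s) plus a time column
  void consumedAll();                      // release the entire batch at once
  TSDataType[] getDataTypes();             // one entry per output column
}

The batch shape is what lets the operator process getPositionCount() rows per yield() instead of paying virtual-call and bookkeeping costs on every row.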
@@ -85,6 +85,11 @@ public interface UDTF extends UDF {
   @SuppressWarnings("squid:S112")
   void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception;

+  default void transform(Column[] columns, ColumnBuilder timesBuilder, ColumnBuilder valuesBuilder)
+      throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * When the user specifies {@link RowByRowAccessStrategy} to access the original data in {@link
    * UDTFConfigurations}, this method will be called to process the transformation. In a single UDF
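For illustration, here is a hedged example of a UDTF overriding the new columnar hook. The class name and doubling logic are invented; the UDF API package names in the imports are assumed, and treating the last element of columns as the time column is an assumption based on this commit's conventions (TransformOperator below reads values from columns[0] and time from columns[1]).

import org.apache.iotdb.udf.api.UDTF;                                 // package assumed
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations; // package assumed
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;   // package assumed
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;

public class DoublingUDTF implements UDTF {
  @Override
  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
      throws Exception {
    // Access strategy and output type configuration omitted; configure as
    // for any existing UDTF.
  }

  @Override
  public void transform(Column[] columns, ColumnBuilder timesBuilder, ColumnBuilder valuesBuilder)
      throws Exception {
    Column values = columns[0];                 // single INT64 input series
    Column times = columns[columns.length - 1]; // assumed: time column last
    for (int i = 0, n = values.getPositionCount(); i < n; i++) {
      if (values.isNull(i)) {
        continue; // drop null rows
      }
      timesBuilder.writeLong(times.getLong(i));
      valuesBuilder.writeLong(values.getLong(i) * 2L);
    }
  }
}

Processing a whole Column per call amortizes the per-row overhead that the old point-based transform paid on every value.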
@@ -127,7 +127,7 @@ public interface Row {
    * @param columnIndex index of the specified column
    * @return {@code true} if the value of the specified column is null
    */
-  boolean isNull(int columnIndex);
+  boolean isNull(int columnIndex) throws IOException;

   /**
    * Returns the number of columns.
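Presumably because rows can now be materialized lazily from serialized batches, isNull can surface I/O failures, so call sites must handle or propagate the checked exception. A minimal hedged sketch (the helper class is hypothetical, and the Row import's package is assumed; note that size() counts only value columns after the next hunk):

import java.io.IOException;
import org.apache.iotdb.udf.api.access.Row; // package assumed

// Hypothetical helper, not part of the PR.
final class RowUtils {
  private RowUtils() {}

  static int countNonNullColumns(Row row) throws IOException {
    int nonNull = 0;
    for (int i = 0; i < row.size(); i++) { // value columns only; time excluded
      if (!row.isNull(i)) {
        nonNull++;
      }
    }
    return nonNull;
  }
}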
@@ -88,9 +88,11 @@ public boolean isNull(int columnIndex) {
     return rowRecord[columnIndex] == null;
   }

+  // Value columns count,
+  // thus excluding the time column
   @Override
   public int size() {
-    return size;
+    return size - 1;
   }

   public void setRowRecord(Object[] rowRecord) {
@@ -30,7 +30,7 @@
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
-import org.apache.iotdb.db.queryengine.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.queryengine.transformation.api.LayerReader;
 import org.apache.iotdb.db.queryengine.transformation.api.YieldableState;
 import org.apache.iotdb.db.queryengine.transformation.dag.builder.EvaluationDAGBuilder;
 import org.apache.iotdb.db.queryengine.transformation.dag.input.QueryDataSetInputLayer;
@@ -39,11 +39,13 @@
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;

 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.TimeColumn;
 import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -72,10 +74,12 @@ public class TransformOperator implements ProcessOperator {

   protected QueryDataSetInputLayer inputLayer;
   protected UDTFContext udtfContext;
-  protected LayerPointReader[] transformers;
+  protected LayerReader[] transformers;
   protected List<TSDataType> outputDataTypes;

   protected TimeSelector timeHeap;
+  protected TsBlock[] outputColumns;
+  protected int[] currentIndexes;
   protected boolean[] shouldIterateReadersToNextValid;

   private final String udtfQueryId;
@@ -102,6 +106,10 @@ public TransformOperator(
     initInputLayer(inputDataTypes);
     initUdtfContext(outputExpressions, zoneId);
     initTransformers(inputLocations, outputExpressions, expressionTypes);
+
+    outputColumns = new TsBlock[transformers.length];
+    currentIndexes = new int[transformers.length];
+
     timeHeap = new TimeSelector(transformers.length << 1, isAscending);
     shouldIterateReadersToNextValid = new boolean[outputExpressions.length];
     Arrays.fill(shouldIterateReadersToNextValid, true);
@@ -141,7 +149,7 @@ protected void initTransformers(
             .buildLayerMemoryAssigner()
             .bindInputLayerColumnIndexWithExpression()
             .buildResultColumnPointReaders()
-            .getOutputPointReaders();
+            .getOutputReaders();
     } finally {
       UDFManagementService.getInstance().releaseLock();
     }
@@ -150,7 +158,7 @@ protected YieldableState iterateAllColumnsToNextValid() throws Exception {
protected YieldableState iterateAllColumnsToNextValid() throws Exception {
for (int i = 0, n = shouldIterateReadersToNextValid.length; i < n; ++i) {
if (shouldIterateReadersToNextValid[i]) {
final YieldableState yieldableState = iterateReaderToNextValid(transformers[i]);
final YieldableState yieldableState = iterateReaderToNextValid(i);
if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
}
Expand All @@ -161,20 +169,45 @@ protected YieldableState iterateAllColumnsToNextValid() throws Exception {
   }

   @SuppressWarnings("squid:S135")
-  protected YieldableState iterateReaderToNextValid(LayerPointReader reader) throws Exception {
+  protected YieldableState iterateReaderToNextValid(int index) throws Exception {
     // Since a constant operand is not allowed to be a result column, the reader will not be
     // a ConstantLayerPointReader.
     // If keepNull is false, we must iterate the reader until a non-null row is returned.
-    YieldableState yieldableState;
-    while ((yieldableState = reader.yield()) == YieldableState.YIELDABLE) {
-      if (reader.isCurrentNull() && !keepNull) {
-        reader.readyForNext();
-        continue;
+    while (true) {
+      if (outputColumns[index] == null) {
+        YieldableState state = transformers[index].yield();
+        if (state != YieldableState.YIELDABLE) {
+          return state;
+        }
+
+        Column[] columns = transformers[index].current();
+        int count = columns[0].getPositionCount();
+        Column[] valueColumns = {columns[0]};
+        TsBlock block = TsBlock.wrapBlocksWithoutCopy(count, (TimeColumn) columns[1], valueColumns);
+        outputColumns[index] = block;
+        currentIndexes[index] = 0;
+      }
+
+      Column outputValueColumn = outputColumns[index].getColumn(0);
+      while (outputValueColumn.isNull(currentIndexes[index]) && !keepNull) {
+        currentIndexes[index]++;
+        if (currentIndexes[index] == outputValueColumn.getPositionCount()) {
+          transformers[index].consumedAll();
+          outputColumns[index] = null;
+          currentIndexes[index] = 0;
+          break;
+        }
+      }
+
+      if (outputColumns[index] != null) {
+        TimeColumn outputTimeColumn = outputColumns[index].getTimeColumn();
+        long time = outputTimeColumn.getLong(currentIndexes[index]);
+        timeHeap.add(time);
+        return YieldableState.YIELDABLE;
+      }
-      timeHeap.add(reader.currentTime());
-      break;
     }
-    return yieldableState;
   }
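The rewritten method encodes the new reader contract: yield a batch, wrap it as a TsBlock, index through its positions (skipping nulls when keepNull is false), and release it with consumedAll() before yielding the next batch. A stripped-down sketch of that lifecycle for a single reader; the PointSink callback is hypothetical, and keepNull == false is assumed:

import org.apache.iotdb.db.queryengine.transformation.api.LayerReader;
import org.apache.iotdb.db.queryengine.transformation.api.YieldableState;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.read.common.block.column.TimeColumn;

final class BatchDrainSketch {
  // Hypothetical downstream hook, not part of the PR.
  interface PointSink {
    void accept(long time, Column values, int position);
  }

  static void drain(LayerReader reader, PointSink sink) throws Exception {
    while (reader.yield() == YieldableState.YIELDABLE) {
      Column[] columns = reader.current();
      Column values = columns[0];                 // value column first,
      TimeColumn times = (TimeColumn) columns[1]; // time column second, per this diff
      for (int i = 0, n = values.getPositionCount(); i < n; i++) {
        if (values.isNull(i)) {
          continue; // mirrors keepNull == false
        }
        sink.accept(times.getLong(i), values, i);
      }
      reader.consumedAll(); // release the whole batch at once
    }
  }
}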

   @SuppressWarnings("squid:S112")
@@ -218,7 +251,7 @@ public TsBlock next() throws Exception {

     // values
     for (int i = 0; i < columnCount; ++i) {
-      yieldableState = collectDataPoint(transformers[i], columnBuilders[i], currentTime, i);
+      yieldableState = collectDataPoint(columnBuilders[i], currentTime, i);
       if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
         for (int j = 0; j <= i; ++j) {
           shouldIterateReadersToNextValid[j] = false;
@@ -253,8 +286,8 @@ public TsBlock next() throws Exception {
   protected void prepareTsBlockBuilder(TsBlockBuilder tsBlockBuilder) {
     if (outputDataTypes == null) {
       outputDataTypes = new ArrayList<>();
-      for (LayerPointReader reader : transformers) {
-        outputDataTypes.add(reader.getDataType());
+      for (LayerReader reader : transformers) {
+        outputDataTypes.add(reader.getDataTypes()[0]);
       }
     }
     tsBlockBuilder.buildValueColumnBuilders(outputDataTypes);
@@ -263,81 +296,69 @@ protected void prepareTsBlockBuilder(TsBlockBuilder tsBlockBuilder) {
   private void prepareEachColumn(int columnCount) {
     for (int i = 0; i < columnCount; ++i) {
       if (shouldIterateReadersToNextValid[i]) {
-        transformers[i].readyForNext();
+        currentIndexes[i]++;
+        if (currentIndexes[i] == outputColumns[i].getColumn(0).getPositionCount()) {
+          transformers[i].consumedAll();
+          outputColumns[i] = null;
+          currentIndexes[i] = 0;
+        }
       }
     }
   }

-  protected boolean collectReaderAppendIsNull(LayerPointReader reader, long currentTime)
-      throws Exception {
-    final YieldableState yieldableState = reader.yield();
-
-    if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
-      return true;
-    }
-
-    if (yieldableState != YieldableState.YIELDABLE) {
-      return false;
-    }
-
-    if (reader.currentTime() != currentTime) {
-      return true;
-    }
-
-    return reader.isCurrentNull();
-  }
-
-  protected YieldableState collectDataPoint(
-      LayerPointReader reader, ColumnBuilder writer, long currentTime, int readerIndex)
+  protected YieldableState collectDataPoint(ColumnBuilder writer, long currentTime, int index)
       throws Exception {
-    final YieldableState yieldableState = reader.yield();
-    if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+    YieldableState state = transformers[index].yield();
+    if (state == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
       writer.appendNull();
       return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
     }
-    if (yieldableState != YieldableState.YIELDABLE) {
-      return yieldableState;
+    if (state != YieldableState.YIELDABLE) {
+      return state;
     }

-    if (reader.currentTime() != currentTime) {
+    TimeColumn timeColumn = outputColumns[index].getTimeColumn();
+    Column valueColumn = outputColumns[index].getColumn(0);
+    int currentIndex = currentIndexes[index];
+    if (timeColumn.getLong(currentIndex) != currentTime) {
       writer.appendNull();
       return YieldableState.YIELDABLE;
     }

-    if (reader.isCurrentNull()) {
+    if (valueColumn.isNull(currentIndex)) {
       writer.appendNull();
     } else {
-      TSDataType type = reader.getDataType();
+      TSDataType type = transformers[index].getDataTypes()[0];
       switch (type) {
         case INT32:
         case DATE:
-          writer.writeInt(reader.currentInt());
+          writer.writeInt(valueColumn.getInt(currentIndex));
           break;
         case INT64:
         case TIMESTAMP:
-          writer.writeLong(reader.currentLong());
+          writer.writeLong(valueColumn.getLong(currentIndex));
           break;
         case FLOAT:
-          writer.writeFloat(reader.currentFloat());
+          writer.writeFloat(valueColumn.getFloat(currentIndex));
           break;
         case DOUBLE:
-          writer.writeDouble(reader.currentDouble());
+          writer.writeDouble(valueColumn.getDouble(currentIndex));
           break;
         case BOOLEAN:
-          writer.writeBoolean(reader.currentBoolean());
+          writer.writeBoolean(valueColumn.getBoolean(currentIndex));
           break;
         case TEXT:
         case BLOB:
         case STRING:
-          writer.writeBinary(reader.currentBinary());
+          writer.writeBinary(valueColumn.getBinary(currentIndex));
           break;
         default:
           throw new UnSupportedDataTypeException(
               String.format("Data type %s is not supported.", type));
       }
     }

-    shouldIterateReadersToNextValid[readerIndex] = true;
+    shouldIterateReadersToNextValid[index] = true;

     return YieldableState.YIELDABLE;
   }
@@ -398,7 +419,7 @@ public long calculateRetainedSizeAfterCallingNext() {
   }

   @TestOnly
-  public LayerPointReader[] getTransformers() {
+  public LayerReader[] getTransformers() {
     return transformers;
   }
 }
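Throughout the operator, columns are wrapped into TsBlocks without copying, so batching does not introduce extra array copies. A hedged sketch of that construction as used above (the helper class is illustrative; wrapBlocksWithoutCopy and its argument order are taken from the diff):

import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.column.TimeColumn;

final class WrapSketch {
  // Wraps existing columns into a TsBlock without copying their contents,
  // mirroring iterateReaderToNextValid above.
  static TsBlock wrap(TimeColumn times, Column values) {
    int count = values.getPositionCount();
    return TsBlock.wrapBlocksWithoutCopy(count, times, new Column[] {values});
  }
}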
@@ -21,7 +21,7 @@

 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
-import org.apache.iotdb.db.queryengine.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.queryengine.transformation.api.LayerReader;
 import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
 import org.apache.iotdb.db.queryengine.transformation.dag.transformer.Transformer;

@@ -74,11 +74,11 @@ ColumnTransformer getBuiltInScalarFunctionColumnTransformer(
    * org.apache.iotdb.db.queryengine.plan.expression.visitor.IntermediateLayerVisitor}
    *
    * @param expression The FunctionExpression representing the scalar function
-   * @param layerPointReader input reader
+   * @param layerReader input reader
    * @return Specific Transformer of this scalar function
    */
   Transformer getBuiltInScalarFunctionTransformer(
-      FunctionExpression expression, LayerPointReader layerPointReader);
+      FunctionExpression expression, LayerReader layerReader);

   /**
    * Some built-in scalar functions may have a different header. This method will be called by
@@ -22,7 +22,7 @@
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.queryengine.plan.expression.multi.builtin.BuiltInScalarFunctionHelper;
-import org.apache.iotdb.db.queryengine.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.queryengine.transformation.api.LayerReader;
 import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
 import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.CastFunctionColumnTransformer;
 import org.apache.iotdb.db.queryengine.transformation.dag.transformer.Transformer;
@@ -61,9 +61,9 @@ public ColumnTransformer getBuiltInScalarFunctionColumnTransformer(

   @Override
   public Transformer getBuiltInScalarFunctionTransformer(
-      FunctionExpression expression, LayerPointReader layerPointReader) {
+      FunctionExpression expression, LayerReader layerReader) {
     return new CastFunctionTransformer(
-        layerPointReader, this.getBuiltInScalarFunctionReturnType(expression));
+        layerReader, this.getBuiltInScalarFunctionReturnType(expression));
   }

   @Override
@@ -22,7 +22,7 @@
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.queryengine.plan.expression.multi.builtin.BuiltInScalarFunctionHelper;
-import org.apache.iotdb.db.queryengine.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.queryengine.transformation.api.LayerReader;
 import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
 import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.DiffFunctionColumnTransformer;
 import org.apache.iotdb.db.queryengine.transformation.dag.transformer.Transformer;
@@ -60,9 +60,9 @@ public ColumnTransformer getBuiltInScalarFunctionColumnTransformer(

   @Override
   public Transformer getBuiltInScalarFunctionTransformer(
-      FunctionExpression expression, LayerPointReader layerPointReader) {
+      FunctionExpression expression, LayerReader layerReader) {
     return new DiffFunctionTransformer(
-        layerPointReader,
+        layerReader,
         Boolean.parseBoolean(
             expression.getFunctionAttributes().getOrDefault("ignoreNull", "true")));
   }
@@ -22,7 +22,7 @@
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.queryengine.plan.expression.multi.builtin.BuiltInScalarFunctionHelper;
-import org.apache.iotdb.db.queryengine.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.queryengine.transformation.api.LayerReader;
 import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
 import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.ReplaceFunctionColumnTransformer;
 import org.apache.iotdb.db.queryengine.transformation.dag.transformer.Transformer;
@@ -65,10 +65,10 @@ public ColumnTransformer getBuiltInScalarFunctionColumnTransformer(

   @Override
   public Transformer getBuiltInScalarFunctionTransformer(
-      FunctionExpression expression, LayerPointReader layerPointReader) {
+      FunctionExpression expression, LayerReader layerReader) {
     checkFromAndToAttributes(expression);
     return new ReplaceFunctionTransformer(
-        layerPointReader,
+        layerReader,
         expression.getFunctionAttributes().get(REPLACE_FROM),
         expression.getFunctionAttributes().get(REPLACE_TO));
   }
@@ -22,7 +22,7 @@
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.queryengine.plan.expression.multi.builtin.BuiltInScalarFunctionHelper;
-import org.apache.iotdb.db.queryengine.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.queryengine.transformation.api.LayerReader;
 import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
 import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RoundFunctionColumnTransformer;
 import org.apache.iotdb.db.queryengine.transformation.dag.transformer.Transformer;
@@ -70,9 +70,9 @@ public ColumnTransformer getBuiltInScalarFunctionColumnTransformer(

   @Override
   public Transformer getBuiltInScalarFunctionTransformer(
-      FunctionExpression expression, LayerPointReader layerPointReader) {
+      FunctionExpression expression, LayerReader layerReader) {
     return new RoundFunctionTransformer(
-        layerPointReader,
+        layerReader,
         this.getBuiltInScalarFunctionReturnType(expression),
         Integer.parseInt(expression.getFunctionAttributes().getOrDefault(ROUND_PLACES, "0")));
   }