-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make UDTF execution framework batch processing #12562
Conversation
} | ||
|
||
Column[] columns = transformers[0].current(); | ||
TsBlock block = new TsBlock((TimeColumn) columns[1], columns[0]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use API wrapBlocksWithoutCopy🤔?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With this API:
int count = columns[0].getPositionCount();
Column[] valueColumns = { columns[0] };
TsBlock block = TsBlock.wrapBlocksWithoutCopy(count, (TimeColumn) columns[1], valueColumns);
Without this API:
TsBlock block = new TsBlock((TimeColumn) columns[1], columns[0]);
The code hasn't gotten any more elegant.
} | ||
|
||
Column[] columns = transformers[index].current(); | ||
TsBlock block = new TsBlock((TimeColumn) columns[1], columns[0]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
|
||
public ConstantInputReader(ConstantOperand expression) throws QueryProcessException { | ||
this.expression = Validate.notNull(expression); | ||
Validate.notNull(expression); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a Deprecated method. You could add some error msg.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
public boolean currentBoolean() throws IOException { | ||
return cachedBoolean; | ||
public Column[] current() throws IOException { | ||
return new Column[] {cachedColumn}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps making cachedColumn Column[] instead of Column could avoid new array each time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea.
|
||
public class QueryDataSetInputLayer { | ||
|
||
private IUDFInputDataSet queryDataSet; | ||
private TsBlockInputDataSet queryDataSet; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps using interface IUDFInputDataSet is better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think TsBlockInputDataSet
would provide more information than IUDFInputDataSet
.
And QueryDataSetInputLayer
must be TsBlockInputDataSet
.
cachedRowRecord = null; | ||
currentRowIndex = -1; | ||
} | ||
private RowListForwardIterator iterator = rowList.constructIterator(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private final RowListForwardIterator iterator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
void readyForNext(); | ||
void consumed(int count); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
|
||
executor.execute(columns, valueColumnBuilder); | ||
|
||
Column timeColumn = columns[1]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sure that columns returned by layerReader only contains two columns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
} | ||
|
||
public long getTimeByIndex(int index) { | ||
assert !isConstant; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw UnsupportedException instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually TVColumns
class is only for internal use. So it will never trigger the assertion failed.
I don't think there is the need to throw an exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PTAL~
// region single data methods | ||
public long getTime(int index) { | ||
// Never access null row's time | ||
assert index >= prefixNullCount; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replace assert with checkState()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are the benefits of the checkState()
over the asset
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
PTAL~ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work~
* Add `.putFloats()` to insert multiple rows at the same time. * Complete all batch insert method in `BatchData.java`. * Add overload methods with indexes to avoid copying. * Add Java doc for new methods in `BatchData.java`. * Merge two `BatchDataTest.java` files. * Add `.putColumns()` method to `SerializableRowRecordList.java`. * Finish batch insert method prototype in row record list and tv list. * UDTF execution framework can support running really simple UDTF. * Change TVList to list of columns to avoid copying. * Compile DataNode without error. * Format all files by `mvn splotless:apply`. * Optimize import by using IDEA shortcut. * Fix put data into existing list error. * Generate data in terminate method. * Format again using `mvn spotless:apply`. * Fix some bugs and implement RowWindow UDTF. * Basically pass all UDF-related IT. * Add license header to all newly-added files. * Add memory control strategy for binary list. * Add memory control strategy for row list that contains binary field. * Copy subregion to avoid memory leakage. * Update iterator by recording first data index when apply new memory strategy. * Unify `newXXX` series methods name to `construct`. * Basically refactor storage layer. * Add and update copy method to Column based classes. * Rewrite unit test for ElasticSerializableRowList. * Rewrite unit test for SerializableRowList. * Rewrite unit test for SerializableTVList. * Rewrite unit test for ElasticSerializableTVList. * Sync changes between TVList and RowList. * Refine UT of ESRowList and ESTVList with random size strings. * Do not cache internal list in some situation. * Add forgotten files. * Revert `BatchData.java` relevant files. * Remove redundant `timeHeap.clear()` in MultiInputColumnIntermediateLayer. * Add constant folding in BinaryTransformer. * Refactor computational layer. * Replace origin normal column in constant reader with RLEColumn. * Batch sliding time row window in SingleInputColumnSingleReferenceIntermediateLayer. * Batch other row windows in SingleInputColumnSingleReferenceIntermediateLayer. * Add forgotten `TransformUtils.java` file. * Batch all row windows in SingleInputColumnMultiReferenceIntermediateLayer. * Batch all row windows in all intermediate layer. * Remove code with no usage. * Remove code with no usage agaion. * Remove nonexistent files. * Refactor project's structure. * Optimize when there is only one transformer. * Fix losing data bug in benchmarking. * Refactor UDTF executor. * Skip cache update when getting column size. * Discard TsBlock when there is only one UDF. * Fix transformer IT errors. * Remove unnecessary bitmap in ESRowList. * Update TsFile version. * Fix some IT OOM bugs. * Try to fix UT OOM error. * Apply spotless plugin. * Evict data in UDF output storage. * Revert optimization when there is only one UDTF. * Fix losing data error. * Update internal column count when ESList apply new memory control strategy. * Add new types in SerializableRowList. * Apply mvn spotless plugin. * Fix integration-tests errors. * Modify code according to committer's advices.[Part One] * Modify code according to committer's advices.[Part Two] * Modify code according to committer's advices.[Part Three]
Description
The origin UDTF execution framework process one data point at time, which is inefficient. This PR introduce a new UDTF execution framework which process one batch at time. While lowering the function call times, it also reduce the copy significantly.
Additionally, this PR also introduce some optimizations like constant folding and skip time join when there is only one UDTF expression. The code is refactor to improve readability as well.
This PR has:
for an unfamiliar reader.