Skip to content

Commit

Permalink
[Feature][Transform-V2][SQL] Support 'select *' and 'like' clause for…
Browse files Browse the repository at this point in the history
… SQL Transform plugin (apache#4991)

Co-authored-by: mcy <rewrma@163.com>
  • Loading branch information
2 people authored and EricJoy2048 committed Jul 11, 2023
1 parent 33e1dfb commit e072ff4
Showing 1 changed file with 73 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class ZetaSQLEngine implements SQLEngine {
private ZetaSQLFilter zetaSQLFilter;
private ZetaSQLType zetaSQLType;

private Integer allColumnsCount = null;

public ZetaSQLEngine() {}

@Override
Expand Down Expand Up @@ -131,11 +133,11 @@ private void validateSQL(Statement statement) {
throw new IllegalArgumentException("Unsupported LIMIT,OFFSET syntax");
}

for (SelectItem selectItem : selectBody.getSelectItems()) {
if (selectItem instanceof AllColumns) {
throw new IllegalArgumentException("Unsupported all columns select syntax");
}
}
// for (SelectItem selectItem : selectBody.getSelectItems()) {
// if (selectItem instanceof AllColumns) {
// throw new IllegalArgumentException("Unsupported all columns select syntax");
// }
// }
} catch (Exception e) {
throw new TransformException(
CommonErrorCode.UNSUPPORTED_OPERATION,
Expand All @@ -147,40 +149,55 @@ private void validateSQL(Statement statement) {
public SeaTunnelRowType typeMapping(List<String> inputColumnsMapping) {
List<SelectItem> selectItems = selectBody.getSelectItems();

String[] fieldNames = new String[selectItems.size()];
SeaTunnelDataType<?>[] seaTunnelDataTypes = new SeaTunnelDataType<?>[selectItems.size()];
// count number of all columns
int columnsSize = countColumnsSize(selectItems);

String[] fieldNames = new String[columnsSize];
SeaTunnelDataType<?>[] seaTunnelDataTypes = new SeaTunnelDataType<?>[columnsSize];
if (inputColumnsMapping != null) {
for (int i = 0; i < selectItems.size(); i++) {
for (int i = 0; i < columnsSize; i++) {
inputColumnsMapping.add(null);
}
}

List<String> inputColumnNames =
Arrays.stream(inputRowType.getFieldNames()).collect(Collectors.toList());

for (int i = 0; i < selectItems.size(); i++) {
SelectItem selectItem = selectItems.get(i);
if (selectItem instanceof SelectExpressionItem) {
int idx = 0;
for (SelectItem selectItem : selectItems) {
if (selectItem instanceof AllColumns) {
for (int i = 0; i < inputRowType.getFieldNames().length; i++) {
fieldNames[idx] = inputRowType.getFieldName(i);
seaTunnelDataTypes[idx] = inputRowType.getFieldType(i);
if (inputColumnsMapping != null) {
inputColumnsMapping.set(idx, inputRowType.getFieldName(i));
}
idx++;
}
} else if (selectItem instanceof SelectExpressionItem) {
SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem;
Expression expression = expressionItem.getExpression();

if (expressionItem.getAlias() != null) {
fieldNames[i] = expressionItem.getAlias().getName();
fieldNames[idx] = expressionItem.getAlias().getName();
} else {
if (expression instanceof Column) {
fieldNames[i] = ((Column) expression).getColumnName();
fieldNames[idx] = ((Column) expression).getColumnName();
} else {
fieldNames[i] = expression.toString();
fieldNames[idx] = expression.toString();
}
}

if (inputColumnsMapping != null
&& expression instanceof Column
&& inputColumnNames.contains(((Column) expression).getColumnName())) {
inputColumnsMapping.set(i, ((Column) expression).getColumnName());
inputColumnsMapping.set(idx, ((Column) expression).getColumnName());
}

seaTunnelDataTypes[i] = zetaSQLType.getExpressionType(expression);
seaTunnelDataTypes[idx] = zetaSQLType.getExpressionType(expression);
idx++;
} else {
idx++;
}
}
return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
Expand Down Expand Up @@ -214,13 +231,47 @@ private Object[] scanTable(SeaTunnelRow inputRow) {

private Object[] project(Object[] inputFields) {
List<SelectItem> selectItems = selectBody.getSelectItems();
Object[] fields = new Object[selectItems.size()];
for (int i = 0; i < selectItems.size(); i++) {
SelectItem selectItem = selectItems.get(i);
SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem;
Expression expression = expressionItem.getExpression();
fields[i] = zetaSQLFunction.computeForValue(expression, inputFields);

int columnsSize = countColumnsSize(selectItems);

Object[] fields = new Object[columnsSize];
for (int i = 0; i < columnsSize; i++) {
fields[i] = null;
}

int idx = 0;
for (SelectItem selectItem : selectItems) {
if (selectItem instanceof AllColumns) {
for (Object inputField : inputFields) {
fields[idx] = inputField;
idx++;
}
} else if (selectItem instanceof SelectExpressionItem) {
SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem;
Expression expression = expressionItem.getExpression();
fields[idx] = zetaSQLFunction.computeForValue(expression, inputFields);
idx++;
} else {
idx++;
}
}
return fields;
}

private int countColumnsSize(List<SelectItem> selectItems) {
if (allColumnsCount != null) {
return allColumnsCount;
}
int allColumnsCnt = 0;
for (SelectItem selectItem : selectItems) {
if (selectItem instanceof AllColumns) {
allColumnsCnt++;
}
}
allColumnsCount =
selectItems.size()
+ inputRowType.getFieldNames().length * allColumnsCnt
- allColumnsCnt;
return allColumnsCount;
}
}

0 comments on commit e072ff4

Please sign in to comment.