diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java index 32b68b8973f..7a9e633ea3e 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java @@ -54,6 +54,8 @@ public class ZetaSQLEngine implements SQLEngine { private ZetaSQLFilter zetaSQLFilter; private ZetaSQLType zetaSQLType; + private Integer allColumnsCount = null; + public ZetaSQLEngine() {} @Override @@ -131,11 +133,11 @@ private void validateSQL(Statement statement) { throw new IllegalArgumentException("Unsupported LIMIT,OFFSET syntax"); } - for (SelectItem selectItem : selectBody.getSelectItems()) { - if (selectItem instanceof AllColumns) { - throw new IllegalArgumentException("Unsupported all columns select syntax"); - } - } + // for (SelectItem selectItem : selectBody.getSelectItems()) { + // if (selectItem instanceof AllColumns) { + // throw new IllegalArgumentException("Unsupported all columns select syntax"); + // } + // } } catch (Exception e) { throw new TransformException( CommonErrorCode.UNSUPPORTED_OPERATION, @@ -147,10 +149,13 @@ private void validateSQL(Statement statement) { public SeaTunnelRowType typeMapping(List inputColumnsMapping) { List selectItems = selectBody.getSelectItems(); - String[] fieldNames = new String[selectItems.size()]; - SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[selectItems.size()]; + // count number of all columns + int columnsSize = countColumnsSize(selectItems); + + String[] fieldNames = new String[columnsSize]; + SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[columnsSize]; if (inputColumnsMapping != null) { - for (int i = 0; i < selectItems.size(); i++) { + for (int i = 0; i < columnsSize; i++) { inputColumnsMapping.add(null); } } @@ -158,29 +163,41 @@ public SeaTunnelRowType typeMapping(List inputColumnsMapping) { List inputColumnNames = Arrays.stream(inputRowType.getFieldNames()).collect(Collectors.toList()); - for (int i = 0; i < selectItems.size(); i++) { - SelectItem selectItem = selectItems.get(i); - if (selectItem instanceof SelectExpressionItem) { + int idx = 0; + for (SelectItem selectItem : selectItems) { + if (selectItem instanceof AllColumns) { + for (int i = 0; i < inputRowType.getFieldNames().length; i++) { + fieldNames[idx] = inputRowType.getFieldName(i); + seaTunnelDataTypes[idx] = inputRowType.getFieldType(i); + if (inputColumnsMapping != null) { + inputColumnsMapping.set(idx, inputRowType.getFieldName(i)); + } + idx++; + } + } else if (selectItem instanceof SelectExpressionItem) { SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem; Expression expression = expressionItem.getExpression(); if (expressionItem.getAlias() != null) { - fieldNames[i] = expressionItem.getAlias().getName(); + fieldNames[idx] = expressionItem.getAlias().getName(); } else { if (expression instanceof Column) { - fieldNames[i] = ((Column) expression).getColumnName(); + fieldNames[idx] = ((Column) expression).getColumnName(); } else { - fieldNames[i] = expression.toString(); + fieldNames[idx] = expression.toString(); } } if (inputColumnsMapping != null && expression instanceof Column && inputColumnNames.contains(((Column) expression).getColumnName())) { - inputColumnsMapping.set(i, ((Column) expression).getColumnName()); + inputColumnsMapping.set(idx, ((Column) expression).getColumnName()); } - seaTunnelDataTypes[i] = zetaSQLType.getExpressionType(expression); + seaTunnelDataTypes[idx] = zetaSQLType.getExpressionType(expression); + idx++; + } else { + idx++; } } return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes); @@ -214,13 +231,47 @@ private Object[] scanTable(SeaTunnelRow inputRow) { private Object[] project(Object[] inputFields) { List selectItems = selectBody.getSelectItems(); - Object[] fields = new Object[selectItems.size()]; - for (int i = 0; i < selectItems.size(); i++) { - SelectItem selectItem = selectItems.get(i); - SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem; - Expression expression = expressionItem.getExpression(); - fields[i] = zetaSQLFunction.computeForValue(expression, inputFields); + + int columnsSize = countColumnsSize(selectItems); + + Object[] fields = new Object[columnsSize]; + for (int i = 0; i < columnsSize; i++) { + fields[i] = null; + } + + int idx = 0; + for (SelectItem selectItem : selectItems) { + if (selectItem instanceof AllColumns) { + for (Object inputField : inputFields) { + fields[idx] = inputField; + idx++; + } + } else if (selectItem instanceof SelectExpressionItem) { + SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem; + Expression expression = expressionItem.getExpression(); + fields[idx] = zetaSQLFunction.computeForValue(expression, inputFields); + idx++; + } else { + idx++; + } } return fields; } + + private int countColumnsSize(List selectItems) { + if (allColumnsCount != null) { + return allColumnsCount; + } + int allColumnsCnt = 0; + for (SelectItem selectItem : selectItems) { + if (selectItem instanceof AllColumns) { + allColumnsCnt++; + } + } + allColumnsCount = + selectItems.size() + + inputRowType.getFieldNames().length * allColumnsCnt + - allColumnsCnt; + return allColumnsCount; + } }