Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][SQL-Transform] Remove escape identifier from output fields #7297

Merged
merged 1 commit into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ transform {
Sql {
source_table_name = "fake"
result_table_name = "fake1"
query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1, ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1, nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6, cast(name as bytes) as c7 from fake"
query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1, ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1, nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6, cast(name as bytes) as c7, name as `apply` from fake"
}
}

Expand Down Expand Up @@ -164,6 +164,13 @@ sink {
rule_type = NOT_NULL
}
]
},
{
field_name = "apply"
field_type = "string"
field_value = [
{equals_to = "Joy Ding"}
]
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@

public class ZetaSQLEngine implements SQLEngine {
private static final Logger log = LoggerFactory.getLogger(ZetaSQLEngine.class);
public static final String ESCAPE_IDENTIFIER = "`";

private String inputTableName;
@Nullable private String catalogTableName;
private SeaTunnelRowType inputRowType;
Expand Down Expand Up @@ -193,9 +195,13 @@ public SeaTunnelRowType typeMapping(List<String> inputColumnsMapping) {
} else if (selectItem instanceof SelectExpressionItem) {
SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem;
Expression expression = expressionItem.getExpression();

if (expressionItem.getAlias() != null) {
fieldNames[idx] = expressionItem.getAlias().getName();
String aliasName = expressionItem.getAlias().getName();
if (aliasName.startsWith(ESCAPE_IDENTIFIER)
&& aliasName.endsWith(ESCAPE_IDENTIFIER)) {
aliasName = aliasName.substring(1, aliasName.length() - 1);
}
fieldNames[idx] = aliasName;
} else {
if (expression instanceof Column) {
fieldNames[idx] = ((Column) expression).getColumnName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ public Object computeForValue(Expression expression, Object[] inputFields) {
Column columnExp = (Column) expression;
String columnName = columnExp.getColumnName();
int index = inputRowType.indexOf(columnName, false);
if (index == -1
&& columnName.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
&& columnName.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
columnName = columnName.substring(1, columnName.length() - 1);
index = inputRowType.indexOf(columnName, false);
}

if (index != -1) {
return inputFields[index];
} else {
Expand All @@ -237,11 +244,26 @@ public Object computeForValue(Expression expression, Object[] inputFields) {
SeaTunnelRow parRowValues = new SeaTunnelRow(inputFields);
Object res = parRowValues;
for (int i = 0; i < deep; i++) {
String key = columnNames[i];
if (parDataType instanceof MapType) {
return ((Map) res).get(columnNames[i]);
Map<String, Object> mapValue = ((Map) res);
if (mapValue.containsKey(key)) {
return mapValue.get(key);
} else if (key.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
&& key.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
key = key.substring(1, key.length() - 1);
return mapValue.get(key);
}
return null;
}
parRowValues = (SeaTunnelRow) res;
int idx = ((SeaTunnelRowType) parDataType).indexOf(columnNames[i], false);
int idx = ((SeaTunnelRowType) parDataType).indexOf(key, false);
if (idx == -1
&& key.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
&& key.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
key = key.substring(1, key.length() - 1);
idx = ((SeaTunnelRowType) parDataType).indexOf(key, false);
}
if (idx == -1) {
throw new IllegalArgumentException(
String.format("can't find field [%s]", fullyQualifiedName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ public SeaTunnelDataType<?> getExpressionType(Expression expression) {
Column columnExp = (Column) expression;
String columnName = columnExp.getColumnName();
int index = inputRowType.indexOf(columnName, false);
if (index == -1
&& columnName.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
&& columnName.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
columnName = columnName.substring(1, columnName.length() - 1);
index = inputRowType.indexOf(columnName, false);
}

if (index != -1) {
return inputRowType.getFieldType(index);
} else {
Expand All @@ -121,7 +128,14 @@ public SeaTunnelDataType<?> getExpressionType(Expression expression) {
SeaTunnelRowType parRowType = inputRowType;
SeaTunnelDataType<?> filedTypeRes = null;
for (int i = 0; i < deep; i++) {
int idx = parRowType.indexOf(columnNames[i], false);
String key = columnNames[i];
int idx = parRowType.indexOf(key, false);
if (idx == -1
&& key.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
&& key.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
key = key.substring(1, key.length() - 1);
idx = parRowType.indexOf(key, false);
}
if (idx == -1) {
throw new IllegalArgumentException(
String.format("can't find field [%s]", fullyQualifiedName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;

Expand Down Expand Up @@ -144,4 +148,136 @@ private CatalogTable getCatalogTable() {
new ArrayList<>(),
"It has column information.");
}

@Test
public void testEscapeIdentifier() {
String tableName = "test";
String[] fields = new String[] {"id", "apply"};
CatalogTable table =
CatalogTableUtil.getCatalogTable(
tableName,
new SeaTunnelRowType(
fields,
new SeaTunnelDataType[] {
BasicType.INT_TYPE, BasicType.STRING_TYPE
}));
ReadonlyConfig config =
ReadonlyConfig.fromMap(
Collections.singletonMap(
"query",
"select id, trim(`apply`) as `apply` from test where `apply` = 'a'"));
SQLTransform sqlTransform = new SQLTransform(config, table);
TableSchema tableSchema = sqlTransform.transformTableSchema();
SeaTunnelRow result =
sqlTransform.transformRow(
new SeaTunnelRow(new Object[] {Integer.valueOf(1), String.valueOf("a")}));
Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
Assertions.assertEquals("a", result.getField(1));
result =
sqlTransform.transformRow(
new SeaTunnelRow(new Object[] {Integer.valueOf(1), String.valueOf("b")}));
Assertions.assertNull(result);

config =
ReadonlyConfig.fromMap(
Collections.singletonMap(
"query",
"select id, IFNULL(`apply`, '1') as `apply` from test where `apply` = 'a'"));
sqlTransform = new SQLTransform(config, table);
tableSchema = sqlTransform.transformTableSchema();
result =
sqlTransform.transformRow(
new SeaTunnelRow(new Object[] {Integer.valueOf(1), String.valueOf("a")}));
Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
Assertions.assertEquals(
BasicType.STRING_TYPE, tableSchema.getColumns().get(1).getDataType());
Assertions.assertEquals("a", result.getField(1));

table =
CatalogTableUtil.getCatalogTable(
tableName,
new SeaTunnelRowType(
fields,
new SeaTunnelDataType[] {BasicType.INT_TYPE, BasicType.LONG_TYPE}));
config =
ReadonlyConfig.fromMap(
Collections.singletonMap(
"query",
"select id, `apply` + 1 as `apply` from test where `apply` > 0"));
sqlTransform = new SQLTransform(config, table);
tableSchema = sqlTransform.transformTableSchema();
result =
sqlTransform.transformRow(
new SeaTunnelRow(new Object[] {Integer.valueOf(1), Long.valueOf(1)}));
Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
Assertions.assertEquals(BasicType.LONG_TYPE, tableSchema.getColumns().get(1).getDataType());
Assertions.assertEquals(Long.valueOf(2), result.getField(1));
result =
sqlTransform.transformRow(
new SeaTunnelRow(new Object[] {Integer.valueOf(1), Long.valueOf(0)}));
Assertions.assertNull(result);

table =
CatalogTableUtil.getCatalogTable(
tableName,
new SeaTunnelRowType(
fields,
new SeaTunnelDataType[] {
BasicType.INT_TYPE,
new MapType<String, String>(
BasicType.STRING_TYPE, BasicType.STRING_TYPE)
}));
config =
ReadonlyConfig.fromMap(
Collections.singletonMap(
"query",
"select id, `apply`.k1 as `apply` from test where `apply`.k1 = 'a'"));
sqlTransform = new SQLTransform(config, table);
tableSchema = sqlTransform.transformTableSchema();
result =
sqlTransform.transformRow(
new SeaTunnelRow(
new Object[] {
Integer.valueOf(1), Collections.singletonMap("k1", "a")
}));
Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
Assertions.assertEquals(
BasicType.STRING_TYPE, tableSchema.getColumns().get(1).getDataType());
Assertions.assertEquals("a", result.getField(1));
result =
sqlTransform.transformRow(
new SeaTunnelRow(
new Object[] {
Integer.valueOf(1), Collections.singletonMap("k1", "b")
}));
Assertions.assertNull(result);

table =
CatalogTableUtil.getCatalogTable(
tableName,
new SeaTunnelRowType(
new String[] {"id", "map"},
new SeaTunnelDataType[] {
BasicType.INT_TYPE,
new MapType<String, String>(
BasicType.STRING_TYPE, BasicType.STRING_TYPE)
}));
config =
ReadonlyConfig.fromMap(
Collections.singletonMap(
"query",
"select id, map.`apply` as `apply` from test where map.`apply` = 'a'"));
sqlTransform = new SQLTransform(config, table);
tableSchema = sqlTransform.transformTableSchema();
result =
sqlTransform.transformRow(
new SeaTunnelRow(
new Object[] {
Integer.valueOf(1), Collections.singletonMap("apply", "a")
}));
Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
Assertions.assertEquals(
BasicType.STRING_TYPE, tableSchema.getColumns().get(1).getDataType());
Assertions.assertEquals("a", result.getField(1));
}
}
Loading