Skip to content

Commit

Permalink
[Improve][API] Check catalog table fields name legal before send to d…
Browse files Browse the repository at this point in the history
…ownstream (apache#7358)

* [Improve][API] Check catalog table fields name legal before send to downstream

* update
  • Loading branch information
Hisoka-X authored Aug 12, 2024
1 parent 068c5e3 commit fa34ac9
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 1 deletion.
38 changes: 38 additions & 0 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run seatunnel zeta integration test
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -609,6 +611,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run transform-v2 integration test (part-1)
if: needs.changes.outputs.api == 'true'
run: |
Expand All @@ -633,6 +637,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run transform-v2 integration test (part-2)
if: needs.changes.outputs.api == 'true'
run: |
Expand All @@ -657,6 +663,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-1)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -684,6 +692,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-2)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -711,6 +721,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-3)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -738,6 +750,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-4)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -765,6 +779,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-5)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -792,6 +808,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-6)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -819,6 +837,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-7)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -898,6 +918,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run jdbc connectors integration test (part-3)
if: needs.changes.outputs.api == 'true'
run: |
Expand All @@ -922,6 +944,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run jdbc connectors integration test (part-4)
if: needs.changes.outputs.api == 'true'
run: |
Expand All @@ -946,6 +970,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run jdbc connectors integration test (part-5)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -996,6 +1022,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run jdbc connectors integration test (part-7)
if: needs.changes.outputs.api == 'true'
run: |
Expand All @@ -1020,6 +1048,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run kudu connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kudu-e2e -am -Pci
Expand All @@ -1043,6 +1073,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run amazonsqs connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-amazonsqs-e2e -am -Pci
Expand All @@ -1066,6 +1098,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run kafka connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kafka-e2e -am -Pci
Expand All @@ -1089,6 +1123,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run rocket connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-rocketmq-e2e -am -Pci
Expand Down Expand Up @@ -1139,6 +1175,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run oracle cdc connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-cdc-oracle-e2e -am -Pci
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
package org.apache.seatunnel.api.table.factory;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.common.utils.SeaTunnelException;

import org.apache.commons.lang3.StringUtils;

import lombok.Getter;

import java.util.ArrayList;
import java.util.List;

@Getter
public abstract class TableFactoryContext {

Expand All @@ -31,4 +38,25 @@ public TableFactoryContext(ReadonlyConfig options, ClassLoader classLoader) {
this.options = options;
this.classLoader = classLoader;
}

protected static void checkCatalogTableIllegal(List<CatalogTable> catalogTables) {
for (CatalogTable catalogTable : catalogTables) {
List<String> alreadyChecked = new ArrayList<>();
for (String fieldName : catalogTable.getTableSchema().getFieldNames()) {
if (StringUtils.isBlank(fieldName)) {
throw new SeaTunnelException(
String.format(
"Table %s field name cannot be empty",
catalogTable.getTablePath().getFullName()));
}
if (alreadyChecked.contains(fieldName)) {
throw new SeaTunnelException(
String.format(
"Table %s field %s duplicate",
catalogTable.getTablePath().getFullName(), fieldName));
}
alreadyChecked.add(fieldName);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,24 @@
import org.apache.seatunnel.api.sink.TablePlaceholder;
import org.apache.seatunnel.api.table.catalog.CatalogTable;

import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;

import java.util.Collection;
import java.util.Collections;

@Getter
public class TableSinkFactoryContext extends TableFactoryContext {

private final CatalogTable catalogTable;

protected TableSinkFactoryContext(
@VisibleForTesting
public TableSinkFactoryContext(
CatalogTable catalogTable, ReadonlyConfig options, ClassLoader classLoader) {
super(options, classLoader);
if (catalogTable != null) {
checkCatalogTableIllegal(Collections.singletonList(catalogTable));
}
this.catalogTable = catalogTable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class TableTransformFactoryContext extends TableFactoryContext {
public TableTransformFactoryContext(
List<CatalogTable> catalogTables, ReadonlyConfig options, ClassLoader classLoader) {
super(options, classLoader);
checkCatalogTableIllegal(catalogTables);
this.catalogTables = catalogTables;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
package org.apache.seatunnel.api.table.catalog;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.SeaTunnelException;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -89,4 +93,62 @@ public void testReadCatalogTableWithUnsupportedType() {
});
Assertions.assertEquals(result, exception.getParamsValueAs("tableUnsupportedTypes"));
}

@Test
public void testCatalogTableWithIllegalFieldNames() {
CatalogTable catalogTable =
CatalogTable.of(
TableIdentifier.of("catalog", "database", "table"),
TableSchema.builder()
.column(
PhysicalColumn.of(
" ", BasicType.STRING_TYPE, 1L, true, null, ""))
.build(),
Collections.emptyMap(),
Collections.emptyList(),
"comment");
SeaTunnelException exception =
Assertions.assertThrows(
SeaTunnelException.class,
() ->
new TableTransformFactoryContext(
Collections.singletonList(catalogTable), null, null));
SeaTunnelException exception2 =
Assertions.assertThrows(
SeaTunnelException.class,
() -> new TableSinkFactoryContext(catalogTable, null, null));
Assertions.assertEquals(
"Table database.table field name cannot be empty", exception.getMessage());
Assertions.assertEquals(
"Table database.table field name cannot be empty", exception2.getMessage());

CatalogTable catalogTable2 =
CatalogTable.of(
TableIdentifier.of("catalog", "database", "table"),
TableSchema.builder()
.column(
PhysicalColumn.of(
"name1", BasicType.STRING_TYPE, 1L, true, null, ""))
.column(
PhysicalColumn.of(
"name1", BasicType.STRING_TYPE, 1L, true, null, ""))
.build(),
Collections.emptyMap(),
Collections.emptyList(),
"comment");
SeaTunnelException exception3 =
Assertions.assertThrows(
SeaTunnelException.class,
() ->
new TableTransformFactoryContext(
Collections.singletonList(catalogTable2), null, null));
SeaTunnelException exception4 =
Assertions.assertThrows(
SeaTunnelException.class,
() -> new TableSinkFactoryContext(catalogTable2, null, null));
Assertions.assertEquals(
"Table database.table field name1 duplicate", exception3.getMessage());
Assertions.assertEquals(
"Table database.table field name1 duplicate", exception4.getMessage());
}
}

0 comments on commit fa34ac9

Please sign in to comment.