diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 5338e252e7e..0fc5ffb77e0 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -340,7 +340,7 @@ jobs: - name: run updated modules integration test (part-2) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 1` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 1` if [ ! -z $sub_modules ]; then ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -369,7 +369,7 @@ jobs: - name: run updated modules integration test (part-3) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 2` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 2` if [ ! -z $sub_modules ]; then ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -398,7 +398,7 @@ jobs: - name: run updated modules integration test (part-4) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 3` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 3` if [ ! -z $sub_modules ]; then ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -426,7 +426,7 @@ jobs: - name: run updated modules integration test (part-5) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 4` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 4` if [ ! -z $sub_modules ]; then ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -454,7 +454,7 @@ jobs: - name: run updated modules integration test (part-6) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 5` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 5` if [ ! -z $sub_modules ]; then ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -482,7 +482,7 @@ jobs: - name: run updated modules integration test (part-7) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 6` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 6` if [ ! -z $sub_modules ]; then ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -490,6 +490,36 @@ jobs: fi env: MAVEN_OPTS: -Xmx2048m + + updated-modules-integration-test-part-8: + needs: [ changes, sanity-check ] + if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' + runs-on: ${{ matrix.os }} + strategy: + matrix: + java: [ '8', '11' ] + os: [ 'ubuntu-latest' ] + timeout-minutes: 90 + steps: + - uses: actions/checkout@v2 + - name: Set up JDK ${{ matrix.java }} + uses: actions/setup-java@v3 + with: + java-version: ${{ matrix.java }} + distribution: 'temurin' + cache: 'maven' + - name: run updated modules integration test (part-8) + if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' + run: | + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 7` + if [ ! -z $sub_modules ]; then + ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci + else + echo "sub modules is empty, skipping" + fi + env: + MAVEN_OPTS: -Xmx2048m + engine-v2-it: needs: [ changes, sanity-check ] if: needs.changes.outputs.api == 'true' diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java index 2caf405ee0d..21f25f47286 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java @@ -143,6 +143,9 @@ public Optional> createCommitter() throws IO committer -> committers.put(tableIdentifier, (SinkCommitter) committer)); } + if (committers.isEmpty()) { + return Optional.empty(); + } return Optional.of(new MultiTableSinkCommitter(committers)); } @@ -162,6 +165,9 @@ public Optional> getCommitInfoSerializer() { sinkAggregatedCommitter -> aggCommitters.put(tableIdentifier, sinkAggregatedCommitter)); } + if (aggCommitters.isEmpty()) { + return Optional.empty(); + } return Optional.of(new MultiTableSinkAggregatedCommitter(aggCommitters)); } diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java index 2248ed5d8ee..b44e90b469c 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.utils.IdGenerator; import org.apache.seatunnel.engine.core.dag.actions.Action; +import org.apache.seatunnel.engine.core.dag.actions.SinkAction; import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -33,6 +34,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.net.URL; import java.nio.file.Paths; import java.util.Arrays; @@ -109,7 +111,7 @@ public void testMultipleSinkName() { } @Test - public void testMultipleTableSourceWithMultiTableSinkParse() { + public void testMultipleTableSourceWithMultiTableSinkParse() throws IOException { Common.setDeployMode(DeployMode.CLIENT); String filePath = TestUtils.getResource("/batch_fake_to_console_multi_table.conf"); JobConfig jobConfig = new JobConfig(); @@ -121,5 +123,9 @@ public void testMultipleTableSourceWithMultiTableSinkParse() { List actions = parse.getLeft(); Assertions.assertEquals(1, actions.size()); Assertions.assertEquals("MultiTableSink-Console", actions.get(0).getName()); + Assertions.assertFalse( + ((SinkAction) actions.get(0)).getSink().createCommitter().isPresent()); + Assertions.assertFalse( + ((SinkAction) actions.get(0)).getSink().createAggregatedCommitter().isPresent()); } }