Skip to content

Commit

Permalink
[Fix] Fix MultiTableSink return committer but sink do not support (#5710
Browse files Browse the repository at this point in the history
)

* [Fix] Fix MultiTableSink return committer but sink do not support

* update

* update

* update

* update
  • Loading branch information
Hisoka-X authored Oct 27, 2023
1 parent fce9dda commit c413040
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 7 deletions.
42 changes: 36 additions & 6 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ jobs:
- name: run updated modules integration test (part-2)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 1`
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 1`
if [ ! -z $sub_modules ]; then
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
Expand Down Expand Up @@ -369,7 +369,7 @@ jobs:
- name: run updated modules integration test (part-3)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 2`
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 2`
if [ ! -z $sub_modules ]; then
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
Expand Down Expand Up @@ -398,7 +398,7 @@ jobs:
- name: run updated modules integration test (part-4)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 3`
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 3`
if [ ! -z $sub_modules ]; then
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
Expand Down Expand Up @@ -426,7 +426,7 @@ jobs:
- name: run updated modules integration test (part-5)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 4`
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 4`
if [ ! -z $sub_modules ]; then
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
Expand Down Expand Up @@ -454,7 +454,7 @@ jobs:
- name: run updated modules integration test (part-6)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 5`
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 5`
if [ ! -z $sub_modules ]; then
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
Expand Down Expand Up @@ -482,14 +482,44 @@ jobs:
- name: run updated modules integration test (part-7)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 6`
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 6`
if [ ! -z $sub_modules ]; then
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
echo "sub modules is empty, skipping"
fi
env:
MAVEN_OPTS: -Xmx2048m

updated-modules-integration-test-part-8:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
runs-on: ${{ matrix.os }}
strategy:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v3
with:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: run updated modules integration test (part-8)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 7`
if [ ! -z $sub_modules ]; then
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
echo "sub modules is empty, skipping"
fi
env:
MAVEN_OPTS: -Xmx2048m

engine-v2-it:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ public Optional<SinkCommitter<MultiTableCommitInfo>> createCommitter() throws IO
committer ->
committers.put(tableIdentifier, (SinkCommitter<?>) committer));
}
if (committers.isEmpty()) {
return Optional.empty();
}
return Optional.of(new MultiTableSinkCommitter(committers));
}

Expand All @@ -162,6 +165,9 @@ public Optional<Serializer<MultiTableCommitInfo>> getCommitInfoSerializer() {
sinkAggregatedCommitter ->
aggCommitters.put(tableIdentifier, sinkAggregatedCommitter));
}
if (aggCommitters.isEmpty()) {
return Optional.empty();
}
return Optional.of(new MultiTableSinkAggregatedCommitter(aggCommitters));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;

import org.apache.commons.lang3.tuple.ImmutablePair;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.Arrays;
Expand Down Expand Up @@ -109,7 +111,7 @@ public void testMultipleSinkName() {
}

@Test
public void testMultipleTableSourceWithMultiTableSinkParse() {
public void testMultipleTableSourceWithMultiTableSinkParse() throws IOException {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/batch_fake_to_console_multi_table.conf");
JobConfig jobConfig = new JobConfig();
Expand All @@ -121,5 +123,9 @@ public void testMultipleTableSourceWithMultiTableSinkParse() {
List<Action> actions = parse.getLeft();
Assertions.assertEquals(1, actions.size());
Assertions.assertEquals("MultiTableSink-Console", actions.get(0).getName());
Assertions.assertFalse(
((SinkAction) actions.get(0)).getSink().createCommitter().isPresent());
Assertions.assertFalse(
((SinkAction) actions.get(0)).getSink().createAggregatedCommitter().isPresent());
}
}

0 comments on commit c413040

Please sign in to comment.