Skip to content

Commit

Permalink
[Fix] Fix log error when multi-table sink close (#5683)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Oct 25, 2023
1 parent 740c144 commit fea4b6f
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand All @@ -50,7 +51,8 @@
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;

@AutoService(SeaTunnelSink.class)
public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {
private SeaTunnelRowType seaTunnelRowType;
private CatalogTable catalogTable;
private List<AssertFieldRule> assertFieldRules;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.assertion.sink;

import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.assertion.excecutor.AssertExecutor;
Expand All @@ -33,7 +34,8 @@
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.LongAccumulator;

public class AssertSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
public class AssertSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {

private final SeaTunnelRowType seaTunnelRowType;
private final List<AssertFieldRule> assertFieldRules;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public void write(SeaTunnelRow element) throws IOException {

@Override
public List<MultiTableState> snapshotState(long checkpointId) throws IOException {
checkQueueRemain();
subSinkErrorCheck();
List<MultiTableState> multiTableStates = new ArrayList<>();
MultiTableState multiTableState = new MultiTableState(new HashMap<>());
Expand All @@ -174,6 +175,7 @@ public List<MultiTableState> snapshotState(long checkpointId) throws IOException

@Override
public Optional<MultiTableCommitInfo> prepareCommit() throws IOException {
checkQueueRemain();
subSinkErrorCheck();
MultiTableCommitInfo multiTableCommitInfo = new MultiTableCommitInfo(new HashMap<>());
for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
Expand All @@ -194,6 +196,7 @@ public Optional<MultiTableCommitInfo> prepareCommit() throws IOException {

@Override
public void abortPrepare() {
checkQueueRemain();
Throwable firstE = null;
for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
synchronized (runnable.get(i)) {
Expand All @@ -217,6 +220,7 @@ public void abortPrepare() {

@Override
public void close() throws IOException {
checkQueueRemain();
executorService.shutdownNow();
Throwable firstE = null;
for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
Expand All @@ -243,4 +247,17 @@ public void close() throws IOException {
throw new RuntimeException(firstE);
}
}

private void checkQueueRemain() {
try {
for (BlockingQueue<SeaTunnelRow> blockingQueue : blockingQueues) {
while (!blockingQueue.isEmpty()) {
Thread.sleep(100);
subSinkErrorCheck();
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public void run() {
synchronized (this) {
writer.write(row);
}
} catch (InterruptedException e) {
// When the job finished, the thread will be interrupted, so we ignore this
// exception.
throwable = e;
break;
} catch (Exception e) {
log.error("MultiTableWriterRunnable error", e);
throwable = e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ public void testFakeConnector(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult fakeWithTableNames =
container.executeJob("/fake_to_console_with_multitable_mode.conf");
Assertions.assertFalse(
container.getServerLogs().contains("MultiTableWriterRunnable error"));
Assertions.assertEquals(0, fakeWithTableNames.getExitCode());

Container.ExecResult fakeWithException =
container.executeJob("/fake_to_assert_with_multitable_exception.conf");
Assertions.assertTrue(container.getServerLogs().contains("MultiTableWriterRunnable error"));
Assertions.assertTrue(
container
.getServerLogs()
.contains(
"at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.checkQueueRemain(MultiTableSinkWriter.java"));
Assertions.assertEquals(1, fakeWithException.getExitCode());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
execution.parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [3, "C", 200]
},
{
kind = INSERT
fields = [3, "C", 100]
}
]
}
}

sink {
Assert {
rules {
field_rules = [
{
field_name = score
field_type = int
field_value = [
{
rule_type = MAX
rule_value = 100
}
]
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -255,11 +253,7 @@ public void testAutoGenerateSQL(TestContainer container)
for (String CONFIG_FILE : PG_CONFIG_FILE_LIST) {
Container.ExecResult execResult = container.executeJob(CONFIG_FILE);
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertTimeout(
Duration.of(10, ChronoUnit.SECONDS),
() ->
Assertions.assertIterableEquals(
querySql(SOURCE_SQL), querySql(SINK_SQL)));
Assertions.assertIterableEquals(querySql(SOURCE_SQL), querySql(SINK_SQL));
executeSQL("truncate table pg_e2e_sink_table");
log.info(CONFIG_FILE + " e2e test completed");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ void executeExtraCommands(ContainerExtendedFactory extendedFactory)
throws IOException, InterruptedException;

Container.ExecResult executeJob(String confFile) throws IOException, InterruptedException;

String getServerLogs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,9 @@ public Container.ExecResult executeJob(String confFile)
log.info("test in container: {}", identifier());
return executeJob(jobManager, confFile);
}

@Override
public String getServerLogs() {
return jobManager.getLogs();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,9 @@ public Container.ExecResult executeJob(String confFile)
log.info("test in container: {}", identifier());
return executeJob(server, confFile);
}

@Override
public String getServerLogs() {
return server.getLogs();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,9 @@ public Container.ExecResult executeJob(String confFile)
log.info("test in container: {}", identifier());
return executeJob(master, confFile);
}

@Override
public String getServerLogs() {
return master.getLogs();
}
}

0 comments on commit fea4b6f

Please sign in to comment.