FlinkDeltaSorce_SupressFLinkLogs_FixNPE - Log Level for Flink classes (delta-io#376)

* FlinkDeltaSorce_SupressFLinkLogs_FixNPE - Set log level for Flink to ERROR; fix an NPE in the logs for a couple of tests.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSorce_SupressFLinkLogs_FixNPE - repeat failed integration tests

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
kristoffSC committed Jun 15, 2022
1 parent 5f58b96 commit 0196bee
Showing 8 changed files with 60 additions and 24 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -709,6 +709,7 @@ lazy val flink = (project in file("flink"))
"org.junit.vintage" % "junit-vintage-engine" % "5.8.2" % "test",
"org.mockito" % "mockito-junit-jupiter" % "4.5.0" % "test",
"org.junit.jupiter" % "junit-jupiter-params" % "5.8.2" % "test",
"io.github.artsok" % "rerunner-jupiter" % "2.1.6" % "test",

// Compiler plugins
// -- Bump up the genjavadoc version explicitly to 0.18 to work with Scala 2.12
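
The rerunner-jupiter dependency added above supplies the retry annotations that the test files below swap in for JUnit 5's @Test and @ParameterizedTest. A minimal sketch of how they behave (the class, methods, and enum here are illustrative stand-ins, not code from this repository):

import io.github.artsok.ParameterizedRepeatedIfExceptionsTest;
import io.github.artsok.RepeatedIfExceptionsTest;
import org.junit.jupiter.params.provider.EnumSource;

class FlakyIntegrationTestSketch {

    // Illustrative stand-in for the connector's FailoverType enum.
    enum Failover { NONE, TASK_MANAGER, JOB_MANAGER }

    // If the body throws, the test is retried, up to repeats = 3 runs,
    // pausing suspend = 2000 ms between attempts; it is reported as
    // failed only when every attempt has failed.
    @RepeatedIfExceptionsTest(suspend = 2000L, repeats = 3)
    void flakyTest() throws Exception {
        // ... intermittently failing test body ...
    }

    // Parameterized variant: each enum value is a separate invocation,
    // and each invocation gets its own retry budget.
    @ParameterizedRepeatedIfExceptionsTest(
        suspend = 2000L, repeats = 3, name = "{index}: Failover = [{0}]"
    )
    @EnumSource(Failover.class)
    void flakyParameterized(Failover type) throws Exception {
        // ... failover scenario using the given type ...
    }
}
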
@@ -13,6 +13,8 @@
import io.delta.flink.utils.DeltaTestUtils;
import io.delta.flink.utils.FailoverType;
import io.delta.flink.utils.RecordCounterToFail.FailCheck;
import io.github.artsok.ParameterizedRepeatedIfExceptionsTest;
import io.github.artsok.RepeatedIfExceptionsTest;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
@@ -22,8 +24,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import static io.delta.flink.utils.ExecutionITCaseTestConstants.*;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -51,7 +51,9 @@ public void after() {
super.after();
}

@ParameterizedTest(name = "{index}: FailoverType = [{0}]")
@ParameterizedRepeatedIfExceptionsTest(
suspend = 2000L, repeats = 3, name = "{index}: FailoverType = [{0}]"
)
@EnumSource(FailoverType.class)
public void shouldReadDeltaTableUsingDeltaLogSchema(FailoverType failoverType)
throws Exception {
@@ -61,7 +63,9 @@ public void shouldReadDeltaTableUsingDeltaLogSchema(FailoverType failoverType)
shouldReadDeltaTable(deltaSource, failoverType);
}

@ParameterizedTest(name = "{index}: FailoverType = [{0}]")
@ParameterizedRepeatedIfExceptionsTest(
suspend = 2000L, repeats = 3, name = "{index}: FailoverType = [{0}]"
)
@EnumSource(FailoverType.class)
// NOTE that this test can take some time to finish since we are restarting JM here.
// It can be around 30 seconds or so.
@@ -103,7 +107,7 @@ public void shouldReadDeltaTableUsingUserSchema(FailoverType failoverType) throw
* </ul>
*
*/
@Test
@RepeatedIfExceptionsTest(suspend = 2000L, repeats = 3)
public void shouldReadLoadedSchemaVersion() throws Exception {

// Create a Delta source instance. In this step, builder discovered Delta table schema
@@ -22,6 +22,8 @@
import io.delta.flink.utils.FailoverType;
import io.delta.flink.utils.RecordCounterToFail.FailCheck;
import io.delta.flink.utils.TableUpdateDescriptor;
import io.github.artsok.ParameterizedRepeatedIfExceptionsTest;
import io.github.artsok.RepeatedIfExceptionsTest;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -36,8 +38,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import static io.delta.flink.utils.ExecutionITCaseTestConstants.*;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -81,7 +81,9 @@ public void after() {
super.after();
}

@ParameterizedTest(name = "{index}: FailoverType = [{0}]")
@ParameterizedRepeatedIfExceptionsTest(
suspend = 2000L, repeats = 3, name = "{index}: FailoverType = [{0}]"
)
@EnumSource(FailoverType.class)
public void shouldReadTableWithNoUpdates(FailoverType failoverType) throws Exception {

@@ -115,7 +117,9 @@ public void shouldReadTableWithNoUpdates(FailoverType failoverType) throws Excep
equalTo(SURNAME_COLUMN_VALUES));
}

@ParameterizedTest(name = "{index}: FailoverType = [{0}]")
@ParameterizedRepeatedIfExceptionsTest(
suspend = 2000L, repeats = 3, name = "{index}: FailoverType = [{0}]"
)
@EnumSource(FailoverType.class)
public void shouldReadLargeDeltaTableWithNoUpdates(FailoverType failoverType) throws Exception {

@@ -146,7 +150,9 @@ public void shouldReadLargeDeltaTableWithNoUpdates(FailoverType failoverType) th
equalTo(LARGE_TABLE_RECORD_COUNT));
}

@ParameterizedTest(name = "{index}: FailoverType = [{0}]")
@ParameterizedRepeatedIfExceptionsTest(
suspend = 2000L, repeats = 3, name = "{index}: FailoverType = [{0}]"
)
@EnumSource(FailoverType.class)
// This test updates Delta Table 5 times, so it will take some time to finish.
public void shouldReadDeltaTableFromSnapshotAndUpdatesUsingUserSchema(FailoverType failoverType)
@@ -159,7 +165,9 @@ public void shouldReadDeltaTableFromSnapshotAndUpdatesUsingUserSchema(FailoverTy
shouldReadDeltaTableFromSnapshotAndUpdates(deltaSource, failoverType);
}

@ParameterizedTest(name = "{index}: FailoverType = [{0}]")
@ParameterizedRepeatedIfExceptionsTest(
suspend = 2000L, repeats = 3, name = "{index}: FailoverType = [{0}]"
)
@EnumSource(FailoverType.class)
// This test updates Delta Table 5 times, so it will take some time to finish. About 1 minute.
public void shouldReadDeltaTableFromSnapshotAndUpdatesUsingDeltaLogSchema(
@@ -211,7 +219,7 @@ public void shouldReadDeltaTableFromSnapshotAndUpdatesUsingDeltaLogSchema(
* </ul>
*
*/
@Test
@RepeatedIfExceptionsTest(suspend = 2000L, repeats = 3)
public void shouldReadLoadedSchemaVersion() throws Exception {

// Add version 1 to delta Table.
10 changes: 5 additions & 5 deletions flink/src/test/java/io/delta/flink/source/DeltaSourceITBase.java
@@ -10,6 +10,7 @@
import io.delta.flink.utils.ExecutionITCaseTestConstants;
import io.delta.flink.utils.FailoverType;
import io.delta.flink.utils.RecordCounterToFail.FailCheck;
import io.github.artsok.RepeatedIfExceptionsTest;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -19,7 +20,6 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.jupiter.api.Test;
import org.junit.rules.TemporaryFolder;
import static io.delta.flink.utils.DeltaTestUtils.buildCluster;
import static io.delta.flink.utils.ExecutionITCaseTestConstants.*;
@@ -87,7 +87,7 @@ public void after() {
miniClusterResource.after();
}

@Test
@RepeatedIfExceptionsTest(suspend = 2000L, repeats = 3)
public void testReadPartitionedTableSkippingPartitionColumns() throws Exception {

// GIVEN, the full schema of used table is {name, surname, age} + col1, col2 as a partition
@@ -131,7 +131,7 @@ public void testReadPartitionedTableSkippingPartitionColumns() throws Exception
assertNoMoreColumns(resultData,3);
}

@Test
@RepeatedIfExceptionsTest(suspend = 2000L, repeats = 3)
public void testReadOnlyPartitionColumns() throws Exception {

// GIVEN, the full schema of used table is {name, surname, age} + col1, col2 as a partition
@@ -164,7 +164,7 @@ public void testReadOnlyPartitionColumns() throws Exception {
assertNoMoreColumns(resultData,2);
}

@Test
@RepeatedIfExceptionsTest(suspend = 2000L, repeats = 3)
public void testWithOnePartition() throws Exception {

// GIVEN, the full schema of used table is {name, surname, age} + col1, col2 as a partition
@@ -204,7 +204,7 @@ public void testWithOnePartition() throws Exception {
assertNoMoreColumns(resultData,3);
}

@Test
@RepeatedIfExceptionsTest(suspend = 2000L, repeats = 3)
public void testWithBothPartitions() throws Exception {

// GIVEN, the full schema of used table is {name, surname, age} + col1, col2 as a partition
@@ -2,6 +2,7 @@

import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import static java.util.Collections.singletonMap;

import io.delta.flink.source.internal.DeltaSourceConfiguration;
@@ -14,6 +15,7 @@
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.util.EmptyIterator;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
@@ -25,6 +27,7 @@
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.ArgumentMatchers.any;
@@ -190,10 +193,17 @@ public void shouldCheckpointStateAfterSnapshotReadAndFirstChangeVersion() throws
@Test
public void shouldCheckpointStateAfterChangesProcessForChangesOnlyStream() throws Exception {
when(deltaLog.getSnapshotForVersionAsOf(HEAD_VERSION)).thenReturn(headSnapshot);
when(deltaLog.getChanges(HEAD_VERSION, true)).thenReturn(
Collections.singletonList(
new VersionLog(HEAD_VERSION, Collections.singletonList(ADD_FILE)))
.iterator());
when(deltaLog.getChanges(anyLong(), anyBoolean()))
.thenAnswer((Answer<Iterator<VersionLog>>) invocationOnMock -> {
long version = invocationOnMock.getArgument(0, Long.class);
if (version == HEAD_VERSION) {
return Collections.singletonList(
new VersionLog(HEAD_VERSION, Collections.singletonList(ADD_FILE))
).iterator();
} else {
return new EmptyIterator<>();
}
});

TestingSplitEnumeratorContext<DeltaSourceSplit> enumContext =
new TestingSplitEnumeratorContext<>(1);
@@ -251,6 +261,7 @@ public void shouldCheckpointStateAfterChangesProcessForChangesOnlyStream() throw
public void shouldCheckpointStateAfterSnapshotReadAndBeforeFirstChangeVersion()
throws Exception {
when(deltaLog.getSnapshotForVersionAsOf(HEAD_VERSION)).thenReturn(headSnapshot);
when(deltaLog.getChanges(anyLong(), anyBoolean())).thenReturn(new EmptyIterator<>());

ContinuousDeltaSourceSplitEnumerator enumerator =
splitEnumeratorProvider.createInitialStateEnumerator(
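
Why the broader stub above fixes the NPE: the old test stubbed only the exact call deltaLog.getChanges(HEAD_VERSION, true), so any call with a different version hit an unstubbed mock, and Mockito's default answer returned null, a likely source of the NPE mentioned in the commit message once the enumerator iterated that result. The replacement matches every argument pair and decides what to return per invocation, falling back to Flink's EmptyIterator instead of null. A self-contained sketch of the pattern, where ChangeLog and its values are stand-ins rather than the real DeltaLog API:

import java.util.Collections;
import java.util.Iterator;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class GetChangesStubSketch {

    // Stand-in for the mocked getChanges(long, boolean) method.
    interface ChangeLog {
        Iterator<String> getChanges(long startingVersion, boolean failOnDataLoss);
    }

    public static void main(String[] args) {
        final long headVersion = 10L;
        ChangeLog log = mock(ChangeLog.class);

        // Match all argument pairs so no call falls through to the mock's
        // default null answer; choose the result per invocation instead.
        when(log.getChanges(anyLong(), anyBoolean()))
            .thenAnswer(invocation -> {
                long version = invocation.getArgument(0, Long.class);
                return version == headVersion
                    ? Collections.singletonList("addedFile").iterator()
                    : Collections.<String>emptyIterator();
            });

        System.out.println(log.getChanges(10L, true).hasNext()); // true
        System.out.println(log.getChanges(11L, true).hasNext()); // false
    }
}
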
8 changes: 6 additions & 2 deletions flink/src/test/java/io/delta/flink/utils/DeltaTestUtils.java
@@ -25,9 +25,13 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeltaTestUtils {

private static final Logger LOG = LoggerFactory.getLogger(DeltaTestUtils.class);

///////////////////////////////////////////////////////////////////////////
// hadoop conf test utils
///////////////////////////////////////////////////////////////////////////
@@ -97,7 +101,7 @@ public static void triggerFailover(FailoverType type, JobID jobId, Runnable afte

public static void triggerJobManagerFailover(
JobID jobId, Runnable afterFailAction, MiniCluster miniCluster) throws Exception {
System.out.println("Triggering Job Manager failover.");
LOG.info("Triggering Job Manager failover.");
HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
haLeadershipControl.revokeJobMasterLeadership(jobId).get();
afterFailAction.run();
@@ -106,7 +110,7 @@ public static void triggerJobManagerFailover(

public static void restartTaskManager(Runnable afterFailAction, MiniCluster miniCluster)
throws Exception {
System.out.println("Triggering Task Manager failover.");
LOG.info("Triggering Task Manager failover.");
miniCluster.terminateTaskManager(0).get();
afterFailAction.run();
miniCluster.startTaskManager();
@@ -6,6 +6,8 @@
import java.util.function.Predicate;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A wrapper class for {@link DataStream} that counts number of processed records and for each
@@ -15,8 +17,12 @@
*/
public class RecordCounterToFail implements Serializable {

private static final Logger LOG = LoggerFactory.getLogger(RecordCounterToFail.class);

private static AtomicInteger records;

private static CompletableFuture<Void> fail;

private static CompletableFuture<Void> continueProcessing;

/**
@@ -48,7 +54,7 @@ record -> {
*/
public static void waitToFail() throws Exception {
fail.get();
System.out.println("Wait to fail Finished.");
LOG.info("Fail.get finished.");
}

/**
2 changes: 2 additions & 0 deletions flink/src/test/resources/log4j2-test.properties
@@ -18,6 +18,8 @@

rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
logger.flink.name = org.apache.flink
logger.flink.level = ERROR

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
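
The two logger.flink lines added above are the log-suppression part of this commit: every logger under org.apache.flink is capped at ERROR, while the root logger, and with it the connector's own test logging, stays at INFO. Should a single test ever need the same effect without touching the config file, log4j2's Configurator offers a rough programmatic equivalent; a hedged sketch, not code from this commit:

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;

public class SuppressFlinkLogsSketch {
    public static void main(String[] args) {
        // Same idea as logger.flink.name/level in log4j2-test.properties:
        // cap everything under the org.apache.flink package at ERROR.
        Configurator.setLevel("org.apache.flink", Level.ERROR);
    }
}
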