Skip to content

Commit

Permalink
Flink connector Test Util Cleanup - Move few methods from Flink sink …
Browse files Browse the repository at this point in the history
…test utils and ExecutionITCaseBase class to common util class (delta-io#372)

* TestUtilCleanup - Move few methods from Flink sink test utils to common Flink connector tests utils.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanup - delta-io#2 Move few methods from Flink sink test utils to common Flink connector tests utils.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanup - delta-io#3 Move few methods from Flink sink test utils to common Flink connector tests utils.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanUp - Changes after Review

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanUp - new changes

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanUp - more refactoring

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanUp - additional refactoring

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanUp - Merge From master + remove source partition table. Add log4j property file for tests.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanUp - Merge From master + remove source partition table. Add log4j property file for tests.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

Co-authored-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
  • Loading branch information
kristoffSC and Krzysztof Chmielewski committed Jun 14, 2022
1 parent 42e0238 commit 5f58b96
Show file tree
Hide file tree
Showing 29 changed files with 721 additions and 513 deletions.
17 changes: 0 additions & 17 deletions flink/src/test/java/io/delta/flink/DeltaTestUtils.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@

import io.delta.flink.sink.internal.DeltaSinkInternal;
import io.delta.flink.sink.utils.DeltaSinkTestUtils;
import io.delta.flink.sink.utils.TestParquetReader;
import io.delta.flink.utils.DeltaTestUtils;
import io.delta.flink.utils.TestParquetReader;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
Expand Down Expand Up @@ -57,7 +58,7 @@ public class DeltaSinkBatchExecutionITCase extends BatchExecutionFileSinkITCase
public void setup() {
try {
deltaTablePath = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
DeltaSinkTestUtils.initTestForNonPartitionedTable(deltaTablePath);
DeltaTestUtils.initTestForNonPartitionedTable(deltaTablePath);
} catch (IOException e) {
throw new RuntimeException("Weren't able to setup the test dependencies", e);
}
Expand All @@ -71,7 +72,7 @@ public void testFileSink() throws Exception {

public void runDeltaSinkTest() throws Exception {
// GIVEN
DeltaLog deltaLog = DeltaLog.forTable(DeltaSinkTestUtils.getHadoopConf(), deltaTablePath);
DeltaLog deltaLog = DeltaLog.forTable(DeltaTestUtils.getHadoopConf(), deltaTablePath);
List<AddFile> initialDeltaFiles = deltaLog.snapshot().getAllFiles();
int initialTableRecordsCount = TestParquetReader.readAndValidateAllTableRecords(deltaLog);
long initialVersion = deltaLog.snapshot().getVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
import java.util.stream.LongStream;

import io.delta.flink.sink.utils.DeltaSinkTestUtils;
import io.delta.flink.sink.utils.TestParquetReader;
import io.delta.flink.utils.DeltaTestUtils;
import io.delta.flink.utils.TestParquetReader;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
Expand Down Expand Up @@ -98,9 +99,9 @@ public void setup() {
try {
deltaTablePath = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
if (isPartitioned) {
DeltaSinkTestUtils.initTestForPartitionedTable(deltaTablePath);
DeltaTestUtils.initTestForPartitionedTable(deltaTablePath);
} else {
DeltaSinkTestUtils.initTestForNonPartitionedTable(deltaTablePath);
DeltaTestUtils.initTestForNonPartitionedTable(deltaTablePath);
}
} catch (IOException e) {
throw new RuntimeException("Weren't able to setup the test dependencies", e);
Expand All @@ -120,7 +121,7 @@ public void testFileSink() throws Exception {

public void runDeltaSinkTest() throws Exception {
// GIVEN
DeltaLog deltaLog = DeltaLog.forTable(DeltaSinkTestUtils.getHadoopConf(), deltaTablePath);
DeltaLog deltaLog = DeltaLog.forTable(DeltaTestUtils.getHadoopConf(), deltaTablePath);
List<AddFile> initialDeltaFiles = deltaLog.snapshot().getAllFiles();

long initialVersion = deltaLog.snapshot().getVersion();
Expand Down Expand Up @@ -289,7 +290,7 @@ public void run(SourceContext<RowData> ctx) throws Exception {
private void sendRecordsUntil(int targetNumber, SourceContext<RowData> ctx) {
while (!isCanceled && nextValue < targetNumber) {
synchronized (ctx.getCheckpointLock()) {
RowData row = DeltaSinkTestUtils.CONVERTER.toInternal(
RowData row = DeltaSinkTestUtils.TEST_ROW_TYPE_CONVERTER.toInternal(
Row.of(
String.valueOf(nextValue),
String.valueOf((nextValue + nextValue)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.util.List;
import java.util.Map;

import io.delta.flink.sink.utils.DeltaSinkTestUtils;
import io.delta.flink.utils.DeltaTestUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -131,7 +131,7 @@ public void testWriteReadToDeltaTable() throws Exception {

// THEN
DeltaLog deltaLog =
DeltaLog.forTable(DeltaSinkTestUtils.getHadoopConf(), deltaTablePath);
DeltaLog.forTable(DeltaTestUtils.getHadoopConf(), deltaTablePath);
waitUntilDeltaLogExists(deltaLog);
validate(deltaLog.snapshot(), testRow);
}
Expand Down Expand Up @@ -214,7 +214,7 @@ private void runFlinkJob(RowType rowType,
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
DeltaSinkTestUtils.getHadoopConf(), rowType).build();
DeltaTestUtils.getHadoopConf(), rowType).build();
env.fromCollection(testData).sinkTo(deltaSink);
try {
env.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.delta.flink.sink.internal.committables.DeltaGlobalCommittable;
import io.delta.flink.sink.internal.committables.DeltaGlobalCommittableSerializer;
import io.delta.flink.sink.utils.DeltaSinkTestUtils;
import io.delta.flink.utils.DeltaTestUtils;
import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -71,9 +72,9 @@ public void setup() throws IOException {
@Test(expected = RuntimeException.class)
public void testWrongPartitionOrderWillFail() throws IOException {
//GIVEN
DeltaSinkTestUtils.initTestForPartitionedTable(tablePath.getPath());
DeltaTestUtils.initTestForPartitionedTable(tablePath.getPath());
DeltaGlobalCommitter globalCommitter = new DeltaGlobalCommitter(
DeltaSinkTestUtils.getHadoopConf(),
DeltaTestUtils.getHadoopConf(),
tablePath,
DeltaSinkTestUtils.TEST_ROW_TYPE,
false // mergeSchema
Expand All @@ -96,9 +97,9 @@ public void testWrongPartitionOrderWillFail() throws IOException {
public void testCommitTwice() throws Exception {
//GIVEN
int numAddedFiles = 3;
DeltaSinkTestUtils.initTestForPartitionedTable(tablePath.getPath());
DeltaTestUtils.initTestForPartitionedTable(tablePath.getPath());
DeltaLog deltaLog = DeltaLog.forTable(
DeltaSinkTestUtils.getHadoopConf(), tablePath.getPath());
DeltaTestUtils.getHadoopConf(), tablePath.getPath());
assertEquals(deltaLog.snapshot().getVersion(), 0);
int initialTableFilesCount = deltaLog.snapshot().getAllFiles().size();

Expand Down Expand Up @@ -126,9 +127,9 @@ public void testCommitTwice() throws Exception {
@Test
public void testMergeSchemaSetToTrue() throws IOException {
//GIVEN
DeltaSinkTestUtils.initTestForPartitionedTable(tablePath.getPath());
DeltaTestUtils.initTestForPartitionedTable(tablePath.getPath());
DeltaLog deltaLog = DeltaLog.forTable(
DeltaSinkTestUtils.getHadoopConf(), tablePath.getPath());
DeltaTestUtils.getHadoopConf(), tablePath.getPath());
List<DeltaGlobalCommittable> globalCommittables =
DeltaSinkTestUtils.getListOfDeltaGlobalCommittables(
3, DeltaSinkTestUtils.getTestPartitionSpec());
Expand All @@ -138,7 +139,7 @@ public void testMergeSchemaSetToTrue() throws IOException {
DeltaSinkTestUtils.addNewColumnToSchema(DeltaSinkTestUtils.TEST_PARTITIONED_ROW_TYPE);

DeltaGlobalCommitter globalCommitter = new DeltaGlobalCommitter(
DeltaSinkTestUtils.getHadoopConf(),
DeltaTestUtils.getHadoopConf(),
tablePath,
updatedSchema,
true // mergeSchema
Expand All @@ -160,7 +161,7 @@ public void testMergeSchemaSetToTrue() throws IOException {
@Test(expected = RuntimeException.class)
public void testMergeSchemaSetToFalse() throws Exception {
//GIVEN
DeltaSinkTestUtils.initTestForPartitionedTable(tablePath.getPath());
DeltaTestUtils.initTestForPartitionedTable(tablePath.getPath());
List<DeltaGlobalCommittable> globalCommittables =
DeltaSinkTestUtils.getListOfDeltaGlobalCommittables(
3, DeltaSinkTestUtils.getTestPartitionSpec());
Expand All @@ -177,7 +178,7 @@ public void testMergeSchemaSetToFalse() throws Exception {
@Test(expected = RuntimeException.class)
public void testMergeIncompatibleSchema() throws Exception {
//GIVEN
DeltaSinkTestUtils.initTestForPartitionedTable(tablePath.getPath());
DeltaTestUtils.initTestForPartitionedTable(tablePath.getPath());
List<DeltaGlobalCommittable> globalCommittables =
DeltaSinkTestUtils.getListOfDeltaGlobalCommittables(
3, DeltaSinkTestUtils.getTestPartitionSpec());
Expand All @@ -195,7 +196,7 @@ public void testMergeIncompatibleSchema() throws Exception {
@Test(expected = RuntimeException.class)
public void testWrongStreamPartitionValues() throws Exception {
//GIVEN
DeltaSinkTestUtils.initTestForPartitionedTable(tablePath.getPath());
DeltaTestUtils.initTestForPartitionedTable(tablePath.getPath());
List<DeltaGlobalCommittable> globalCommittables =
DeltaSinkTestUtils.getListOfDeltaGlobalCommittables(
1, getNonMatchingPartitionSpec());
Expand All @@ -213,7 +214,7 @@ public void testCommittablesFromDifferentCheckpointInterval() {
int numAddedFiles1 = 3;
int numAddedFiles2 = 5;
DeltaLog deltaLog = DeltaLog.forTable(
DeltaSinkTestUtils.getHadoopConf(), tablePath.getPath());
DeltaTestUtils.getHadoopConf(), tablePath.getPath());
int initialTableFilesCount = deltaLog.snapshot().getAllFiles().size();
assertEquals(-1, deltaLog.snapshot().getVersion());

Expand Down Expand Up @@ -253,7 +254,7 @@ public void testCommittablesFromDifferentCheckpointIntervalOneOutdated() {
int numAddedFiles1SecondTrial = 4;
int numAddedFiles2 = 10;
DeltaLog deltaLog = DeltaLog.forTable(
DeltaSinkTestUtils.getHadoopConf(), tablePath.getPath());
DeltaTestUtils.getHadoopConf(), tablePath.getPath());
assertEquals(-1, deltaLog.snapshot().getVersion());

List<DeltaCommittable> deltaCommittables1FirstTrial =
Expand Down Expand Up @@ -307,11 +308,11 @@ public void testCommittablesFromDifferentCheckpointIntervalOneOutdated() {
public void testCommittablesFromDifferentCheckpointIntervalOneWithIncompatiblePartitions()
throws Exception {
//GIVEN
DeltaSinkTestUtils.initTestForPartitionedTable(tablePath.getPath());
DeltaTestUtils.initTestForPartitionedTable(tablePath.getPath());
int numAddedFiles1 = 3;
int numAddedFiles2 = 5;
DeltaLog deltaLog = DeltaLog.forTable(
DeltaSinkTestUtils.getHadoopConf(), tablePath.getPath());
DeltaTestUtils.getHadoopConf(), tablePath.getPath());
assertEquals(0, deltaLog.snapshot().getVersion());
int initialNumberOfFiles = deltaLog.snapshot().getAllFiles().size();

Expand Down Expand Up @@ -399,16 +400,16 @@ public void testUseFullPathForDeltaLog() throws Exception {
int numAddedFiles = 3;

assertEquals(tablePath.toUri().getScheme(), "file");
DeltaSinkTestUtils.initTestForPartitionedTable(tablePath.getPath());
DeltaTestUtils.initTestForPartitionedTable(tablePath.getPath());
DeltaLog deltaLog = DeltaLog.forTable(
DeltaSinkTestUtils.getHadoopConf(), tablePath.getPath());
DeltaTestUtils.getHadoopConf(), tablePath.getPath());
assertEquals(deltaLog.snapshot().getVersion(), 0);
int initialTableFilesCount = deltaLog.snapshot().getAllFiles().size();

List<DeltaGlobalCommittable> globalCommittables =
DeltaSinkTestUtils.getListOfDeltaGlobalCommittables(
numAddedFiles, DeltaSinkTestUtils.getTestPartitionSpec());
Configuration hadoopConfig = DeltaSinkTestUtils.getHadoopConf();
Configuration hadoopConfig = DeltaTestUtils.getHadoopConf();

// set up a simple hdfs mock as default filesystem. This FS should not be
// used by the global committer below, as the path we are passing is from
Expand Down Expand Up @@ -448,7 +449,7 @@ public void testUseFullPathForDeltaLog() throws Exception {

private DeltaGlobalCommitter getTestGlobalCommitter(RowType schema) {
return new DeltaGlobalCommitter(
DeltaSinkTestUtils.getHadoopConf(),
DeltaTestUtils.getHadoopConf(),
tablePath,
schema,
false // mergeSchema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.delta.flink.sink.internal.SchemaConverter;
import io.delta.flink.sink.internal.committables.DeltaGlobalCommittable;
import io.delta.flink.sink.utils.DeltaSinkTestUtils;
import io.delta.flink.utils.DeltaTestUtils;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.types.logical.RowType;
import org.junit.Before;
Expand Down Expand Up @@ -75,7 +76,7 @@ public static Collection<Object[]> params() {
);
}

@Parameterized.Parameter(0)
@Parameterized.Parameter()
public boolean mergeSchema;

@Parameterized.Parameter(1)
Expand All @@ -94,13 +95,13 @@ public void setup() throws IOException {
tablePath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
if (initializeTableBeforeCommit) {
if (partitionSpec.isEmpty()) {
DeltaSinkTestUtils.initTestForNonPartitionedTable(
DeltaTestUtils.initTestForNonPartitionedTable(
tablePath.getPath());
} else {
DeltaSinkTestUtils.initTestForPartitionedTable(tablePath.getPath());
DeltaTestUtils.initTestForPartitionedTable(tablePath.getPath());
}
}
deltaLog = DeltaLog.forTable(DeltaSinkTestUtils.getHadoopConf(), tablePath.getPath());
deltaLog = DeltaLog.forTable(DeltaTestUtils.getHadoopConf(), tablePath.getPath());
RowType rowType = (partitionSpec.isEmpty()) ?
DeltaSinkTestUtils.TEST_ROW_TYPE : DeltaSinkTestUtils.TEST_PARTITIONED_ROW_TYPE;

Expand All @@ -112,7 +113,7 @@ public void setup() throws IOException {
public void testCommitToDeltaTableInAppendMode() {
//GIVEN
DeltaGlobalCommitter globalCommitter = new DeltaGlobalCommitter(
DeltaSinkTestUtils.getHadoopConf(),
DeltaTestUtils.getHadoopConf(),
tablePath,
rowTypeToCommit,
mergeSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import io.delta.flink.sink.internal.committables.DeltaCommittable;
import io.delta.flink.sink.internal.committer.DeltaCommitter;
import io.delta.flink.sink.utils.DeltaSinkTestUtils;
import io.delta.flink.sink.utils.TestParquetReader;
import io.delta.flink.utils.TestParquetReader;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
Expand Down Expand Up @@ -301,7 +301,11 @@ private static int getWrittenRecordsCount(List<DeltaCommittable> committables,
for (DeltaCommittable committable : committables) {
Path filePath = new Path(bucketPath, committable.getDeltaPendingFile().getFileName());
writtenRecordsCount +=
TestParquetReader.parseAndCountRecords(filePath, DeltaSinkTestUtils.TEST_ROW_TYPE);
TestParquetReader.parseAndCountRecords(
filePath,
DeltaSinkTestUtils.TEST_ROW_TYPE,
DeltaSinkTestUtils.TEST_ROW_TYPE_CONVERTER
);
}
return writtenRecordsCount;
}
Expand Down
Loading

0 comments on commit 5f58b96

Please sign in to comment.