FlinkDeltaSource_PR_14_IT_Tests - Add Execution IT tests for Delta Source (delta-io#375)

* TestUtilCleanup - Move a few methods from the Flink sink test utils to the common Flink connector test utils.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanup - delta-io#2 Move a few methods from the Flink sink test utils to the common Flink connector test utils.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanup - delta-io#3 Move a few methods from the Flink sink test utils to the common Flink connector test utils.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSource_PR_14_IT_Tests - cherry-pick changes to Test Utils -> pull up Row Type as an argument.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSource_PR_14_IT_Tests - end2end WIP test

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanUp - Changes after Review

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanUp - new changes

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSource_PR_14_IT_Tests - tests.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanUp - more refactoring

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSource_PR_14_IT_Tests - end to end tests

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanUp - additional refactoring

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSource_PR_14_IT_Tests - end2end test for unbounded stream with updates.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanUp - Merge From master + remove source partition table. Add log4j property file for tests.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* TestUtilCleanUp - Merge From master + remove source partition table. Add log4j property file for tests.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSource_PR_14_IT_Tests - end to end test with reading/writing all types.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSource_PR_14_IT_Tests - merge from master

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSorce_SupressFLinkLogs_FixNPE - Set log level for Flink to ERROR; fix NPE in logs for a couple of tests.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSorce_SupressFLinkLogs_FixNPE - repeat failed integration test

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSource_PR_14_IT_Tests - Add Source Execution test to read all data types.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSource_PR_14_IT_Tests - Add Source Execution test for options.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSource_PR_14_IT_Tests - Changes after merge from master.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSource_PR_14_IT_Tests - Changes after merge from master.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSource_PR_14_IT_Tests - Changes after Code Review

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSource_PR_14_IT_Tests - change Delta Log last modification time attribute for tests.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* FlinkDeltaSource_PR_14_IT_Tests - changes after code review.

Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

Co-authored-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
kristoffSC and Krzysztof Chmielewski committed Jun 17, 2022
1 parent 0196bee commit 098a053
Showing 26 changed files with 1,078 additions and 158 deletions.
@@ -30,7 +30,6 @@
  */
 public enum DeltaDataType {
     ARRAY(ArrayType.class),
-    BIGINT(LongType.class),
     BINARY(BinaryType.class),
     BYTE(ByteType.class),
     BOOLEAN(BooleanType.class),
@@ -39,6 +38,7 @@ public enum DeltaDataType {
     DOUBLE(DoubleType.class),
     FLOAT(FloatType.class),
     INTEGER(IntegerType.class),
+    LONG(LongType.class),
     MAP(MapType.class),
     NULL(NullType.class),
     SMALLINT(ShortType.class),
@@ -61,7 +61,7 @@ public static LogicalType toFlinkDataType(DataType deltaType, boolean nullable)
                 arrayContainsNull);
             return
                 new ArrayType(nullable, elementType);
-        case BIGINT:
+        case LONG:
             return new BigIntType(nullable);
         case BINARY:
             return new BinaryType(nullable, BinaryType.DEFAULT_LENGTH);
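
The rename is purely mechanical: Delta Standalone's LongType now maps through the LONG enum constant instead of BIGINT, and the resulting Flink type is still BigIntType. A minimal sketch of the conversion, assuming toFlinkDataType lives on the connector's schema conversion utility (called SchemaConverter here for illustration; the class name is not shown in this diff):

import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;

import io.delta.standalone.types.LongType;

public class LongTypeMappingSketch {

    public static void main(String[] args) {
        // SchemaConverter is an illustrative name for the class hosting the
        // toFlinkDataType method shown in the hunk above.
        // Delta Standalone models 8-byte integers as LongType; the connector's
        // enum constant for it is now LONG (previously BIGINT).
        LogicalType flinkType = SchemaConverter.toFlinkDataType(new LongType(), true);

        // The Flink side is unchanged: LONG still converts to a nullable BigIntType.
        System.out.println(flinkType instanceof BigIntType); // expected: true
    }
}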
@@ -59,7 +59,7 @@ private TransitiveOptional<Snapshot> getSnapshotFromStartingVersionOption(
         }
         return TransitiveOptional.empty();
     }
-    // TODO PR 14 add IT test for this
+
     private TransitiveOptional<Snapshot> getSnapshotFromStartingTimestampOption(
         DeltaSourceConfiguration sourceConfiguration) {
         Long startingTimestamp = sourceConfiguration.getValue(STARTING_TIMESTAMP);
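
The removed TODO is what this change set delivers: IT coverage for the starting-snapshot options. For context, a hedged sketch of a job that would exercise the startingTimestamp path resolved above; the builder method follows the connector's documented continuous-mode options, and the timestamp value is illustrative:

import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

import io.delta.flink.source.DeltaSource;

public class StartingTimestampSketch {

    public static DeltaSource<RowData> sourceFrom(String tablePath) {
        // Continuous source that starts reading from the table snapshot resolved
        // for the given timestamp by getSnapshotFromStartingTimestampOption(...).
        return DeltaSource.forContinuousRowData(
                new Path(tablePath),
                new Configuration())
            .startingTimestamp("2022-06-17 10:00:00") // illustrative value
            .build();
    }
}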
@@ -0,0 +1,324 @@
package io.delta.flink;

import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.ZoneOffset;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.sink.internal.DeltaSinkInternal;
import io.delta.flink.source.DeltaSource;
import io.delta.flink.utils.DeltaTestUtils;
import io.delta.flink.utils.FailoverType;
import io.delta.flink.utils.RecordCounterToFail.FailCheck;
import io.delta.flink.utils.TableUpdateDescriptor;
import io.delta.flink.utils.TestDescriptor;
import io.delta.flink.utils.TestParquetReader;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static io.delta.flink.utils.DeltaTestUtils.buildCluster;
import static io.delta.flink.utils.ExecutionITCaseTestConstants.ALL_DATA_TABLE_COLUMN_NAMES;
import static io.delta.flink.utils.ExecutionITCaseTestConstants.ALL_DATA_TABLE_COLUMN_TYPES;
import static io.delta.flink.utils.ExecutionITCaseTestConstants.ALL_DATA_TABLE_RECORD_COUNT;
import static io.delta.flink.utils.ExecutionITCaseTestConstants.DATA_COLUMN_NAMES;
import static io.delta.flink.utils.ExecutionITCaseTestConstants.DATA_COLUMN_TYPES;
import static io.delta.flink.utils.ExecutionITCaseTestConstants.LARGE_TABLE_ALL_COLUMN_NAMES;
import static io.delta.flink.utils.ExecutionITCaseTestConstants.LARGE_TABLE_ALL_COLUMN_TYPES;
import static io.delta.flink.utils.ExecutionITCaseTestConstants.LARGE_TABLE_RECORD_COUNT;
import static io.delta.flink.utils.ExecutionITCaseTestConstants.SMALL_TABLE_COUNT;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.hamcrest.core.IsNot.not;
import static org.junit.jupiter.api.Assertions.assertAll;

import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.data.CloseableIterator;
import io.delta.standalone.data.RowRecord;

public class DeltaEndToEndExecutionITCaseTest {

    private static final Logger LOG =
        LoggerFactory.getLogger(DeltaEndToEndExecutionITCaseTest.class);

    private static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();

    private static final int PARALLELISM = 4;

    private final MiniClusterWithClientResource miniClusterResource = buildCluster(PARALLELISM);

    private String sourceTablePath;

    private String sinkTablePath;

    @BeforeAll
    public static void beforeAll() throws IOException {
        TMP_FOLDER.create();
    }

    @AfterAll
    public static void afterAll() {
        TMP_FOLDER.delete();
    }

    @BeforeEach
    public void setUp() {
        try {
            miniClusterResource.before();

            sourceTablePath = TMP_FOLDER.newFolder().getAbsolutePath();
            sinkTablePath = TMP_FOLDER.newFolder().getAbsolutePath();

        } catch (Exception e) {
            throw new RuntimeException("Unable to set up the test dependencies.", e);
        }
    }

    @ParameterizedTest(name = "{index}: FailoverType = [{0}]")
    @EnumSource(FailoverType.class)
    public void testEndToEndBoundedStream(FailoverType failoverType) throws Exception {
        DeltaTestUtils.initTestForNonPartitionedLargeTable(sourceTablePath);

        DeltaSource<RowData> deltaSource = DeltaSource.forBoundedRowData(
                new Path(sourceTablePath),
                DeltaTestUtils.getHadoopConf()
            )
            .build();

        RowType rowType = RowType.of(LARGE_TABLE_ALL_COLUMN_TYPES, LARGE_TABLE_ALL_COLUMN_NAMES);
        DeltaSinkInternal<RowData> deltaSink = DeltaSink.forRowData(
                new Path(sinkTablePath),
                DeltaTestUtils.getHadoopConf(),
                rowType)
            .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 1000));

        DataStream<RowData> stream =
            env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
        stream.sinkTo(deltaSink);

        // Trigger the configured failover once half of the source records have been read.
        DeltaTestUtils.testBoundedStream(
            failoverType,
            (FailCheck) readRows -> readRows == LARGE_TABLE_RECORD_COUNT / 2,
            stream,
            miniClusterResource
        );

        verifyDeltaTable(sinkTablePath, rowType, LARGE_TABLE_RECORD_COUNT);
    }

    @ParameterizedTest(name = "{index}: FailoverType = [{0}]")
    @EnumSource(FailoverType.class)
    public void testEndToEndContinuousStream(FailoverType failoverType) throws Exception {
        DeltaTestUtils.initTestForNonPartitionedTable(sourceTablePath);

        DeltaSource<RowData> deltaSource = DeltaSource.forContinuousRowData(
                new Path(sourceTablePath),
                DeltaTestUtils.getHadoopConf()
            )
            .build();

        RowType rowType = RowType.of(DATA_COLUMN_TYPES, DATA_COLUMN_NAMES);
        DeltaSinkInternal<RowData> deltaSink = DeltaSink.forRowData(
                new Path(sinkTablePath),
                DeltaTestUtils.getHadoopConf(),
                rowType)
            .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 1000));
        // The Delta sink commits to the Delta log on Flink checkpoints.
        env.enableCheckpointing(100);

        DataStream<RowData> stream =
            env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
        stream.sinkTo(deltaSink);

        int numberOfTableUpdateBulks = 5;
        int rowsPerTableUpdate = 5;
        int expectedRowCount = SMALL_TABLE_COUNT + numberOfTableUpdateBulks * rowsPerTableUpdate;

        TestDescriptor testDescriptor = DeltaTestUtils.prepareTableUpdates(
            deltaSource.getTablePath().toUri().toString(),
            RowType.of(DATA_COLUMN_TYPES, DATA_COLUMN_NAMES),
            SMALL_TABLE_COUNT,
            new TableUpdateDescriptor(numberOfTableUpdateBulks, rowsPerTableUpdate)
        );

        DeltaTestUtils.testContinuousStream(
            failoverType,
            testDescriptor,
            (FailCheck) readRows -> readRows == expectedRowCount / 2,
            stream,
            miniClusterResource
        );

        verifyDeltaTable(sinkTablePath, rowType, expectedRowCount);
    }

    @Test
    public void testEndToEndReadAllDataTypes() throws Exception {

        // This test uses the test-non-partitioned-delta-table-alltypes table. See the README.md
        // in the table's folder for detailed information about this table.
        DeltaTestUtils.initTestForAllDataTypes(sourceTablePath);

        DeltaSource<RowData> deltaSource = DeltaSource.forBoundedRowData(
                new Path(sourceTablePath),
                DeltaTestUtils.getHadoopConf()
            )
            .build();

        RowType rowType = RowType.of(ALL_DATA_TABLE_COLUMN_TYPES, ALL_DATA_TABLE_COLUMN_NAMES);
        DeltaSinkInternal<RowData> deltaSink = DeltaSink.forRowData(
                new Path(sinkTablePath),
                DeltaTestUtils.getHadoopConf(),
                rowType)
            .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 1000));

        DataStream<RowData> stream =
            env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
        stream.sinkTo(deltaSink);

        DeltaTestUtils.testBoundedStream(stream, miniClusterResource);

        Snapshot snapshot = verifyDeltaTable(sinkTablePath, rowType, ALL_DATA_TABLE_RECORD_COUNT);

        // Read the entire snapshot using Delta Standalone and check every column.
        final AtomicInteger index = new AtomicInteger(0);
        try (CloseableIterator<RowRecord> iterator = snapshot.open()) {
            while (iterator.hasNext()) {
                final int i = index.getAndIncrement();

                BigDecimal expectedBigDecimal = BigDecimal.valueOf((double) i).setScale(18);

                RowRecord row = iterator.next();
                LOG.info("Row content: {}", row);
                assertAll(() -> {
                        assertThat(
                            row.getByte(ALL_DATA_TABLE_COLUMN_NAMES[0]),
                            equalTo((byte) i)
                        );
                        assertThat(
                            row.getShort(ALL_DATA_TABLE_COLUMN_NAMES[1]),
                            equalTo((short) i)
                        );
                        assertThat(row.getInt(ALL_DATA_TABLE_COLUMN_NAMES[2]), equalTo(i));
                        assertThat(
                            row.getDouble(ALL_DATA_TABLE_COLUMN_NAMES[3]),
                            equalTo((double) i)
                        );
                        assertThat(
                            row.getFloat(ALL_DATA_TABLE_COLUMN_NAMES[4]),
                            equalTo((float) i)
                        );

                        // In the source table this column was generated as BigInt(x).
                        assertThat(
                            row.getBigDecimal(ALL_DATA_TABLE_COLUMN_NAMES[5]),
                            equalTo(expectedBigDecimal)
                        );

                        // In the source table this column was generated as BigDecimal(x).
                        // There is a problem with the Parquet library used by Delta Standalone
                        // when reading BigDecimal values. The issue should be resolved
                        // after https://github.com/delta-io/connectors/pull/303
                        if (i > 0) {
                            assertThat(
                                row.getBigDecimal(ALL_DATA_TABLE_COLUMN_NAMES[6]),
                                not(equalTo(expectedBigDecimal))
                            );
                        }

                        // The same value is expected in every row of this column.
                        assertThat(
                            row.getTimestamp(ALL_DATA_TABLE_COLUMN_NAMES[7])
                                .toLocalDateTime().toInstant(ZoneOffset.UTC),
                            equalTo(Timestamp.valueOf("2022-06-14 18:54:24.547557")
                                .toLocalDateTime().toInstant(ZoneOffset.UTC))
                        );
                        assertThat(
                            row.getString(ALL_DATA_TABLE_COLUMN_NAMES[8]),
                            equalTo(String.valueOf(i))
                        );

                        // The same value is expected in every row of this column.
                        assertThat(row.getBoolean(ALL_DATA_TABLE_COLUMN_NAMES[9]), equalTo(true));
                    }
                );
            }
        }
    }

    /**
     * Verifies that the Delta table under {@code sinkPath} contains the expected number of rows
     * in the given {@code rowType} format.
     *
     * @param sinkPath Path to the Delta table.
     * @param rowType {@link RowType} for the test Delta table.
     * @param expectedNumberOfRecords Expected number of rows in the Delta table.
     * @return Head snapshot of the Delta table.
     * @throws IOException If there is any issue while reading the Delta table.
     */
    @SuppressWarnings("unchecked")
    private Snapshot verifyDeltaTable(
            String sinkPath,
            RowType rowType,
            Integer expectedNumberOfRecords) throws IOException {

        DeltaLog deltaLog = DeltaLog.forTable(DeltaTestUtils.getHadoopConf(), sinkPath);
        Snapshot snapshot = deltaLog.snapshot();
        List<AddFile> deltaFiles = snapshot.getAllFiles();
        int finalTableRecordsCount = TestParquetReader
            .readAndValidateAllTableRecords(
                deltaLog,
                rowType,
                DataFormatConverters.getConverterForDataType(
                    TypeConversions.fromLogicalToDataType(rowType)));
        long finalVersion = snapshot.getVersion();

        LOG.info(
            String.format(
                "RESULTS: final record count: [%d], final table version: [%d], number of Delta "
                    + "files: [%d]",
                finalTableRecordsCount,
                finalVersion,
                deltaFiles.size()
            )
        );

        assertThat(finalTableRecordsCount, equalTo(expectedNumberOfRecords));
        return snapshot;
    }
}
@@ -125,7 +125,8 @@ public void runDeltaSinkTest() throws Exception {
         List<AddFile> initialDeltaFiles = deltaLog.snapshot().getAllFiles();

         long initialVersion = deltaLog.snapshot().getVersion();
-        int initialTableRecordsCount = TestParquetReader.readAndValidateAllTableRecords(deltaLog);
+        int initialTableRecordsCount = TestParquetReader
+            .readAndValidateAllTableRecords(deltaLog);
         assertEquals(2, initialTableRecordsCount);

         JobGraph jobGraph = createJobGraph(deltaTablePath);
@@ -230,6 +230,7 @@ private static StreamExecutionEnvironment getTestStreamEnv() {
         return env;
     }

+    @SuppressWarnings("unchecked")
     private static List<RowData> rowToRowData(RowType rowType,
         Row row) {
         DataFormatConverters.DataFormatConverter<RowData, Row> CONVERTER =
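
The added @SuppressWarnings("unchecked") covers the raw DataFormatConverter returned by the registry lookup in the line below it. A self-contained sketch of the same conversion pattern, with a single-column schema and values that are purely illustrative:

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;

public class RowToRowDataSketch {

    @SuppressWarnings("unchecked")
    public static RowData toRowData(RowType rowType, Row row) {
        // getConverterForDataType returns a raw DataFormatConverter, so the cast
        // to the parameterized type is unchecked -- hence the suppression.
        DataFormatConverters.DataFormatConverter<RowData, Row> converter =
            (DataFormatConverters.DataFormatConverter<RowData, Row>)
                DataFormatConverters.getConverterForDataType(
                    TypeConversions.fromLogicalToDataType(rowType));
        return converter.toInternal(row);
    }

    public static void main(String[] args) {
        RowType rowType = RowType.of(new IntType()); // one INT column, illustrative
        RowData rowData = toRowData(rowType, Row.of(42));
        System.out.println(rowData.getInt(0)); // expected: 42
    }
}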