From 098a053a608bfb08fb7ed40fd35456174b1c6f3a Mon Sep 17 00:00:00 2001 From: kristoffSC Date: Fri, 17 Jun 2022 20:21:15 +0200 Subject: [PATCH] FlinkDeltaSource_PR_14_IT_Tests - Add Execution IT tests for Delta Source (#375) * TestUtilCleanup - Move a few methods from Flink sink test utils to common Flink connector test utils. Signed-off-by: Krzysztof Chmielewski * TestUtilCleanup - #2 Move a few methods from Flink sink test utils to common Flink connector test utils. Signed-off-by: Krzysztof Chmielewski * TestUtilCleanup - #3 Move a few methods from Flink sink test utils to common Flink connector test utils. Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSource_PR_14_IT_Tests - cherry-pick changes to Test Utils -> pull up Row Type as an argument. Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSource_PR_14_IT_Tests - end2end WIP test Signed-off-by: Krzysztof Chmielewski * TestUtilCleanUp - Changes after Review Signed-off-by: Krzysztof Chmielewski * TestUtilCleanUp - new changes Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSource_PR_14_IT_Tests - tests. Signed-off-by: Krzysztof Chmielewski * TestUtilCleanUp - more refactoring Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSource_PR_14_IT_Tests - end to end tests Signed-off-by: Krzysztof Chmielewski * TestUtilCleanUp - additional refactoring Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSource_PR_14_IT_Tests - end2end test for unbounded stream with updates. Signed-off-by: Krzysztof Chmielewski * TestUtilCleanUp - Merge From master + remove source partition table. Add log4j property file for tests. Signed-off-by: Krzysztof Chmielewski * TestUtilCleanUp - Merge From master + remove source partition table. Add log4j property file for tests. Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSource_PR_14_IT_Tests - end to end test with reading/writing all types. Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSource_PR_14_IT_Tests - merge from master Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSorce_SupressFLinkLogs_FixNPE - Set Log level for Flink to ERROR, fix NPE in logs for a couple of tests. Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSorce_SupressFLinkLogs_FixNPE - repeat failed integration test Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSource_PR_14_IT_Tests - Add Source Execution test to read all data types. Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSource_PR_14_IT_Tests - Add Source Execution test for options. Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSource_PR_14_IT_Tests - Changes after merge from master. Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSource_PR_14_IT_Tests - Changes after merge from master. Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSource_PR_14_IT_Tests - Changes after Code Review Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSource_PR_14_IT_Tests - change Delta Log last modification time attribute for tests. Signed-off-by: Krzysztof Chmielewski * FlinkDeltaSource_PR_14_IT_Tests - changes after code review.
Signed-off-by: Krzysztof Chmielewski Co-authored-by: Krzysztof Chmielewski --- .../flink/source/internal/DeltaDataType.java | 2 +- .../source/internal/SchemaConverter.java | 2 +- .../ContinuousSourceSnapshotSupplier.java | 2 +- .../DeltaEndToEndExecutionITCaseTest.java | 324 ++++++++++++++++++ .../DeltaSinkStreamingExecutionITCase.java | 3 +- .../flink/sink/DeltaSinkWriteReadITCase.java | 1 + ...DeltaSourceBoundedExecutionITCaseTest.java | 216 +++++++++++- ...taSourceContinuousExecutionITCaseTest.java | 280 +++++++++++++-- .../delta/flink/source/DeltaSourceITBase.java | 233 +++++++------ .../DefaultOptionTypeConverterTest.java | 2 +- .../delta/flink/utils/DeltaTableUpdater.java | 2 +- .../io/delta/flink/utils/DeltaTestUtils.java | 70 ++-- .../utils/ExecutionITCaseTestConstants.java | 19 + ...estDescriptor.java => TestDescriptor.java} | 4 +- .../README.md | 31 ++ .../_delta_log/00000000000000000000.json | 4 + .../_delta_log/00000000000000000001.json | 2 + .../_delta_log/00000000000000000002.json | 2 + .../_delta_log/00000000000000000003.json | 2 + ...420b-82ed-57a76b394927-c000.snappy.parquet | Bin 0 -> 1077 bytes ...4e4f-837a-120ae9cbcfe4-c000.snappy.parquet | Bin 0 -> 977 bytes ...4940-954d-f5dbe254bf6c-c000.snappy.parquet | Bin 0 -> 1031 bytes ...4aab-996a-ca44505a279f-c000.snappy.parquet | Bin 0 -> 1165 bytes .../README.md | 31 ++ .../_delta_log/00000000000000000000.json | 4 + ...448d-ba15-088a95699f01-c000.snappy.parquet | Bin 0 -> 2573 bytes 26 files changed, 1078 insertions(+), 158 deletions(-) create mode 100644 flink/src/test/java/io/delta/flink/DeltaEndToEndExecutionITCaseTest.java rename flink/src/test/java/io/delta/flink/utils/{ContinuousTestDescriptor.java => TestDescriptor.java} (95%) create mode 100644 flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/README.md create mode 100644 flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000000.json create mode 100644 flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000001.json create mode 100644 flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000002.json create mode 100644 flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000003.json create mode 100644 flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/part-00000-96103e04-7578-420b-82ed-57a76b394927-c000.snappy.parquet create mode 100644 flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/part-00000-a3dfa929-86a1-4e4f-837a-120ae9cbcfe4-c000.snappy.parquet create mode 100644 flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/part-00000-bf5cbfdd-6dae-4940-954d-f5dbe254bf6c-c000.snappy.parquet create mode 100644 flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/part-00000-c8a6b958-41e5-4aab-996a-ca44505a279f-c000.snappy.parquet create mode 100644 flink/src/test/resources/test-data/test-non-partitioned-delta-table-alltypes/README.md create mode 100644 flink/src/test/resources/test-data/test-non-partitioned-delta-table-alltypes/_delta_log/00000000000000000000.json create mode 100644 flink/src/test/resources/test-data/test-non-partitioned-delta-table-alltypes/part-00000-64ff527a-c93c-448d-ba15-088a95699f01-c000.snappy.parquet diff --git a/flink/src/main/java/io/delta/flink/source/internal/DeltaDataType.java 
b/flink/src/main/java/io/delta/flink/source/internal/DeltaDataType.java index 05eb4fb19e8..0f4ed41f79a 100644 --- a/flink/src/main/java/io/delta/flink/source/internal/DeltaDataType.java +++ b/flink/src/main/java/io/delta/flink/source/internal/DeltaDataType.java @@ -30,7 +30,6 @@ */ public enum DeltaDataType { ARRAY(ArrayType.class), - BIGINT(LongType.class), BINARY(BinaryType.class), BYTE(ByteType.class), BOOLEAN(BooleanType.class), @@ -39,6 +38,7 @@ public enum DeltaDataType { DOUBLE(DoubleType.class), FLOAT(FloatType.class), INTEGER(IntegerType.class), + LONG(LongType.class), MAP(MapType.class), NULL(NullType.class), SMALLINT(ShortType.class), diff --git a/flink/src/main/java/io/delta/flink/source/internal/SchemaConverter.java b/flink/src/main/java/io/delta/flink/source/internal/SchemaConverter.java index 6ea8dbd3c7c..6a5e0b7a280 100644 --- a/flink/src/main/java/io/delta/flink/source/internal/SchemaConverter.java +++ b/flink/src/main/java/io/delta/flink/source/internal/SchemaConverter.java @@ -61,7 +61,7 @@ public static LogicalType toFlinkDataType(DataType deltaType, boolean nullable) arrayContainsNull); return new ArrayType(nullable, elementType); - case BIGINT: + case LONG: return new BigIntType(nullable); case BINARY: return new BinaryType(nullable, BinaryType.DEFAULT_LENGTH); diff --git a/flink/src/main/java/io/delta/flink/source/internal/enumerator/supplier/ContinuousSourceSnapshotSupplier.java b/flink/src/main/java/io/delta/flink/source/internal/enumerator/supplier/ContinuousSourceSnapshotSupplier.java index 96549cfa666..cea8e4a8997 100644 --- a/flink/src/main/java/io/delta/flink/source/internal/enumerator/supplier/ContinuousSourceSnapshotSupplier.java +++ b/flink/src/main/java/io/delta/flink/source/internal/enumerator/supplier/ContinuousSourceSnapshotSupplier.java @@ -59,7 +59,7 @@ private TransitiveOptional getSnapshotFromStartingVersionOption( } return TransitiveOptional.empty(); } - // TODO PR 14 add IT test for this + private TransitiveOptional getSnapshotFromStartingTimestampOption( DeltaSourceConfiguration sourceConfiguration) { Long startingTimestamp = sourceConfiguration.getValue(STARTING_TIMESTAMP); diff --git a/flink/src/test/java/io/delta/flink/DeltaEndToEndExecutionITCaseTest.java b/flink/src/test/java/io/delta/flink/DeltaEndToEndExecutionITCaseTest.java new file mode 100644 index 00000000000..c2bfcbf8f49 --- /dev/null +++ b/flink/src/test/java/io/delta/flink/DeltaEndToEndExecutionITCaseTest.java @@ -0,0 +1,324 @@ +package io.delta.flink; + +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.time.ZoneOffset; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import io.delta.flink.sink.DeltaSink; +import io.delta.flink.sink.internal.DeltaSinkInternal; +import io.delta.flink.source.DeltaSource; +import io.delta.flink.utils.DeltaTestUtils; +import io.delta.flink.utils.FailoverType; +import io.delta.flink.utils.RecordCounterToFail.FailCheck; +import io.delta.flink.utils.TableUpdateDescriptor; +import io.delta.flink.utils.TestDescriptor; +import io.delta.flink.utils.TestParquetReader; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; 
+import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import static io.delta.flink.utils.DeltaTestUtils.buildCluster; +import static io.delta.flink.utils.ExecutionITCaseTestConstants.ALL_DATA_TABLE_COLUMN_NAMES; +import static io.delta.flink.utils.ExecutionITCaseTestConstants.ALL_DATA_TABLE_COLUMN_TYPES; +import static io.delta.flink.utils.ExecutionITCaseTestConstants.ALL_DATA_TABLE_RECORD_COUNT; +import static io.delta.flink.utils.ExecutionITCaseTestConstants.DATA_COLUMN_NAMES; +import static io.delta.flink.utils.ExecutionITCaseTestConstants.DATA_COLUMN_TYPES; +import static io.delta.flink.utils.ExecutionITCaseTestConstants.LARGE_TABLE_ALL_COLUMN_NAMES; +import static io.delta.flink.utils.ExecutionITCaseTestConstants.LARGE_TABLE_ALL_COLUMN_TYPES; +import static io.delta.flink.utils.ExecutionITCaseTestConstants.LARGE_TABLE_RECORD_COUNT; +import static io.delta.flink.utils.ExecutionITCaseTestConstants.SMALL_TABLE_COUNT; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.hamcrest.core.IsNot.not; +import static org.junit.jupiter.api.Assertions.assertAll; + +import io.delta.standalone.DeltaLog; +import io.delta.standalone.Snapshot; +import io.delta.standalone.actions.AddFile; +import io.delta.standalone.data.CloseableIterator; +import io.delta.standalone.data.RowRecord; + +public class DeltaEndToEndExecutionITCaseTest { + + private static final Logger LOG = + LoggerFactory.getLogger(DeltaEndToEndExecutionITCaseTest.class); + + private static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); + + private static final int PARALLELISM = 4; + + private final MiniClusterWithClientResource miniClusterResource = buildCluster(PARALLELISM); + + private String sourceTablePath; + + private String sinkTablePath; + + @BeforeAll + public static void beforeAll() throws IOException { + TMP_FOLDER.create(); + } + + @AfterAll + public static void afterAll() { + TMP_FOLDER.delete(); + } + + @BeforeEach + public void setUp() { + try { + miniClusterResource.before(); + + sourceTablePath = TMP_FOLDER.newFolder().getAbsolutePath(); + sinkTablePath = TMP_FOLDER.newFolder().getAbsolutePath(); + + } catch (Exception e) { + throw new RuntimeException("Weren't able to setup the test dependencies", e); + } + } + + @ParameterizedTest(name = "{index}: FailoverType = [{0}]") + @EnumSource(FailoverType.class) + public void testEndToEndBoundedStream(FailoverType failoverType) throws Exception { + DeltaTestUtils.initTestForNonPartitionedLargeTable(sourceTablePath); + + DeltaSource deltaSource = DeltaSource.forBoundedRowData( + new Path(sourceTablePath), + DeltaTestUtils.getHadoopConf() + ) + .build(); + + RowType rowType = RowType.of(LARGE_TABLE_ALL_COLUMN_TYPES, LARGE_TABLE_ALL_COLUMN_NAMES); + DeltaSinkInternal deltaSink = DeltaSink.forRowData( + new Path(sinkTablePath), + DeltaTestUtils.getHadoopConf(), + rowType) + .build(); + + StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 1000)); + + DataStream stream = + env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source"); + stream.sinkTo(deltaSink); + + DeltaTestUtils.testBoundedStream( + failoverType, + (FailCheck) readRows -> readRows == LARGE_TABLE_RECORD_COUNT / 2, + stream, + miniClusterResource + ); + + verifyDeltaTable(sinkTablePath, rowType, LARGE_TABLE_RECORD_COUNT); + } + + @ParameterizedTest(name = "{index}: FailoverType = [{0}]") + @EnumSource(FailoverType.class) + public void testEndToEndContinuousStream(FailoverType failoverType) throws Exception { + DeltaTestUtils.initTestForNonPartitionedTable(sourceTablePath); + + DeltaSource deltaSource = DeltaSource.forContinuousRowData( + new Path(sourceTablePath), + DeltaTestUtils.getHadoopConf() + ) + .build(); + + RowType rowType = RowType.of(DATA_COLUMN_TYPES, DATA_COLUMN_NAMES); + DeltaSinkInternal deltaSink = DeltaSink.forRowData( + new Path(sinkTablePath), + DeltaTestUtils.getHadoopConf(), + rowType) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 1000)); + env.enableCheckpointing(100); + + DataStream stream = + env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source"); + stream.sinkTo(deltaSink); + + int numberOfTableUpdateBulks = 5; + int rowsPerTableUpdate = 5; + int expectedRowCount = SMALL_TABLE_COUNT + numberOfTableUpdateBulks * rowsPerTableUpdate; + + TestDescriptor testDescriptor = DeltaTestUtils.prepareTableUpdates( + deltaSource.getTablePath().toUri().toString(), + RowType.of(DATA_COLUMN_TYPES, DATA_COLUMN_NAMES), + SMALL_TABLE_COUNT, + new TableUpdateDescriptor(numberOfTableUpdateBulks, rowsPerTableUpdate) + ); + + DeltaTestUtils.testContinuousStream( + failoverType, + testDescriptor, + (FailCheck) readRows -> readRows == expectedRowCount/ 2, + stream, + miniClusterResource + ); + + verifyDeltaTable(sinkTablePath, rowType, expectedRowCount); + } + + @Test + public void testEndToEndReadAllDataTypes() throws Exception { + + // this test uses test-non-partitioned-delta-table-alltypes table. See README.md from + // table's folder for detail information about this table. + DeltaTestUtils.initTestForAllDataTypes(sourceTablePath); + + DeltaSource deltaSource = DeltaSource.forBoundedRowData( + new Path(sourceTablePath), + DeltaTestUtils.getHadoopConf() + ) + .build(); + + RowType rowType = RowType.of(ALL_DATA_TABLE_COLUMN_TYPES, ALL_DATA_TABLE_COLUMN_NAMES); + DeltaSinkInternal deltaSink = DeltaSink.forRowData( + new Path(sinkTablePath), + DeltaTestUtils.getHadoopConf(), + rowType) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 1000)); + + DataStream stream = + env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source"); + stream.sinkTo(deltaSink); + + DeltaTestUtils.testBoundedStream(stream, miniClusterResource); + + Snapshot snapshot = verifyDeltaTable(sinkTablePath, rowType, ALL_DATA_TABLE_RECORD_COUNT); + + // read entire snapshot using delta standalone and check every column. 
+ final AtomicInteger index = new AtomicInteger(0); + try(CloseableIterator iterator = snapshot.open()) { + while (iterator.hasNext()) { + final int i = index.getAndIncrement(); + + BigDecimal expectedBigDecimal = BigDecimal.valueOf((double) i).setScale(18); + + RowRecord row = iterator.next(); + LOG.info("Row Content: " + row.toString()); + assertAll(() -> { + assertThat( + row.getByte(ALL_DATA_TABLE_COLUMN_NAMES[0]), + equalTo(new Integer(i).byteValue()) + ); + assertThat( + row.getShort(ALL_DATA_TABLE_COLUMN_NAMES[1]), + equalTo((short) i) + ); + assertThat(row.getInt(ALL_DATA_TABLE_COLUMN_NAMES[2]), equalTo(i)); + assertThat( + row.getDouble(ALL_DATA_TABLE_COLUMN_NAMES[3]), + equalTo(new Integer(i).doubleValue()) + ); + assertThat( + row.getFloat(ALL_DATA_TABLE_COLUMN_NAMES[4]), + equalTo(new Integer(i).floatValue()) + ); + + // In Source Table this column was generated as: BigInt(x) + assertThat( + row.getBigDecimal(ALL_DATA_TABLE_COLUMN_NAMES[5]), + equalTo(expectedBigDecimal) + ); + + // In Source Table this column was generated as: BigDecimal(x). + // There is a problem with the parquet library used by delta standalone when + // reading BigDecimal values. The issue should be resolved + // after https://github.com/delta-io/connectors/pull/303 + if (i > 0) { + assertThat( + row.getBigDecimal(ALL_DATA_TABLE_COLUMN_NAMES[6]), + not(equalTo(expectedBigDecimal)) + ); + } + + // same value for all columns + assertThat( + row.getTimestamp(ALL_DATA_TABLE_COLUMN_NAMES[7]) + .toLocalDateTime().toInstant(ZoneOffset.UTC), + equalTo(Timestamp.valueOf("2022-06-14 18:54:24.547557") + .toLocalDateTime().toInstant(ZoneOffset.UTC)) + ); + assertThat( + row.getString(ALL_DATA_TABLE_COLUMN_NAMES[8]), + equalTo(String.valueOf(i)) + ); + + // same value for all columns + assertThat(row.getBoolean(ALL_DATA_TABLE_COLUMN_NAMES[9]), equalTo(true)); + } + ); + } + } + } + + /** + * Verifies that the Delta table under parameter {@code sinkPath} contains the expected number of rows with + * the given rowType format. + * + * @param sinkPath Path to Delta table. + * @param rowType {@link RowType} for test Delta table. + * @param expectedNumberOfRecords expected number of rows in Delta table. + * @return Head snapshot of Delta table. + * @throws IOException If any issue occurs while reading the Delta table.
+ */ + @SuppressWarnings("unchecked") + private Snapshot verifyDeltaTable( + String sinkPath, + RowType rowType, + Integer expectedNumberOfRecords) throws IOException { + + DeltaLog deltaLog = DeltaLog.forTable(DeltaTestUtils.getHadoopConf(), sinkPath); + Snapshot snapshot = deltaLog.snapshot(); + List deltaFiles = snapshot.getAllFiles(); + int finalTableRecordsCount = TestParquetReader + .readAndValidateAllTableRecords( + deltaLog, + rowType, + DataFormatConverters.getConverterForDataType( + TypeConversions.fromLogicalToDataType(rowType))); + long finalVersion = snapshot.getVersion(); + + LOG.info( + String.format( + "RESULTS: final record count: [%d], final table version: [%d], number of Delta " + + "files: [%d]", + finalTableRecordsCount, + finalVersion, + deltaFiles.size() + ) + ); + + assertThat(finalTableRecordsCount, equalTo(expectedNumberOfRecords)); + return snapshot; + } +} diff --git a/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java b/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java index 2548e7ea705..da90db7959b 100644 --- a/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java +++ b/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java @@ -125,7 +125,8 @@ public void runDeltaSinkTest() throws Exception { List initialDeltaFiles = deltaLog.snapshot().getAllFiles(); long initialVersion = deltaLog.snapshot().getVersion(); - int initialTableRecordsCount = TestParquetReader.readAndValidateAllTableRecords(deltaLog); + int initialTableRecordsCount = TestParquetReader + .readAndValidateAllTableRecords(deltaLog); assertEquals(2, initialTableRecordsCount); JobGraph jobGraph = createJobGraph(deltaTablePath); diff --git a/flink/src/test/java/io/delta/flink/sink/DeltaSinkWriteReadITCase.java b/flink/src/test/java/io/delta/flink/sink/DeltaSinkWriteReadITCase.java index bbb0aef7d5f..6e5f5d31b44 100644 --- a/flink/src/test/java/io/delta/flink/sink/DeltaSinkWriteReadITCase.java +++ b/flink/src/test/java/io/delta/flink/sink/DeltaSinkWriteReadITCase.java @@ -230,6 +230,7 @@ private static StreamExecutionEnvironment getTestStreamEnv() { return env; } + @SuppressWarnings("unchecked") private static List rowToRowData(RowType rowType, Row row) { DataFormatConverters.DataFormatConverter CONVERTER = diff --git a/flink/src/test/java/io/delta/flink/source/DeltaSourceBoundedExecutionITCaseTest.java b/flink/src/test/java/io/delta/flink/source/DeltaSourceBoundedExecutionITCaseTest.java index 1819e1d311e..f022cc5562f 100644 --- a/flink/src/test/java/io/delta/flink/source/DeltaSourceBoundedExecutionITCaseTest.java +++ b/flink/src/test/java/io/delta/flink/source/DeltaSourceBoundedExecutionITCaseTest.java @@ -7,30 +7,47 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; -import io.delta.flink.utils.ContinuousTestDescriptor.Descriptor; import io.delta.flink.utils.DeltaTableUpdater; import io.delta.flink.utils.DeltaTestUtils; import io.delta.flink.utils.FailoverType; import io.delta.flink.utils.RecordCounterToFail.FailCheck; +import io.delta.flink.utils.TestDescriptor; +import io.delta.flink.utils.TestDescriptor.Descriptor; import io.github.artsok.ParameterizedRepeatedIfExceptionsTest; import io.github.artsok.RepeatedIfExceptionsTest; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import 
org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static io.delta.flink.utils.ExecutionITCaseTestConstants.*; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.jupiter.api.Assertions.assertAll; public class DeltaSourceBoundedExecutionITCaseTest extends DeltaSourceITBase { + private static final Logger LOG = + LoggerFactory.getLogger(DeltaSourceBoundedExecutionITCaseTest.class); + @BeforeAll public static void beforeAll() throws IOException { DeltaSourceITBase.beforeAll(); @@ -129,11 +146,147 @@ public void shouldReadLoadedSchemaVersion() throws Exception { // We are expecting to read version 0, before table update. assertThat(rowData.size(), equalTo(SMALL_TABLE_COUNT)); + } + /** + * @return Stream of test {@link Arguments} elements. Arguments are in order: + *
<ul> + * <li>Snapshot version used as a value of "versionAsOf" option.</li> + * <li>Expected number of records for the version defined by versionAsOf</li> + * <li>Highest value of the col1 column for the version defined by versionAsOf</li> + * </ul>
+ */ + private static Stream versionAsOfArguments() { + return Stream.of( + Arguments.of(0, 5, 4), + Arguments.of(1, 15, 14), + Arguments.of(2, 35, 34), + Arguments.of(3, 75, 74) + ); + } + + @ParameterizedRepeatedIfExceptionsTest( + suspend = 2000L, + repeats = 3, + name = + "{index}: versionAsOf = [{0}], " + + "Expected Number of rows = [{1}], " + + "End Index = [{2}]" + ) + @MethodSource("versionAsOfArguments") + public void shouldReadVersionAsOf( + long versionAsOf, + int expectedNumberOfRow, + int endIndex) throws Exception { + + // this test uses test-non-partitioned-delta-table-4-versions table. See README.md from + // table's folder for detail information about this table. + String sourceTablePath = TMP_FOLDER.newFolder().getAbsolutePath(); + DeltaTestUtils.initTestForVersionedTable(sourceTablePath); + + DeltaSource deltaSource = DeltaSource + .forBoundedRowData( + new Path(sourceTablePath), + DeltaTestUtils.getHadoopConf()) + .versionAsOf(versionAsOf) + .build(); + + List rowData = testBoundedDeltaSource(deltaSource); + + assertRows("versionAsOf " + versionAsOf, expectedNumberOfRow, endIndex, rowData); + } + + private static final String[] timestampAsOfValues = { + "2022-06-15 13:24:33.613", + "2022-06-15 13:25:33.632", + "2022-06-15 13:26:33.633", + "2022-06-15 13:27:33.634" + }; + + /** + * @return Stream of test {@link Arguments} elements. Arguments are in order: + *
<ul> + * <li>Timestamp used as a value of "timestampAsOf" option.</li> + * <li>Expected number of records in the version defined by timestampAsOf</li> + * <li>Highest value of the col1 column for the version defined by timestampAsOf</li> + * </ul>
+ */ + private static Stream timestampAsOfArguments() { + return Stream.of( + Arguments.of(timestampAsOfValues[0], 5, 4), + Arguments.of(timestampAsOfValues[1], 15, 14), + Arguments.of(timestampAsOfValues[2], 35, 34), + Arguments.of(timestampAsOfValues[3], 75, 74) + ); + } + + @ParameterizedRepeatedIfExceptionsTest( + suspend = 2000L, + repeats = 3, + name = + "{index}: timestampAsOf = [{0}], " + + "Expected Number of rows = [{1}], " + + "End Index = [{2}]" + ) + @MethodSource("timestampAsOfArguments") + public void shouldReadTimestampAsOf( + String timestampAsOf, + int expectedNumberOfRow, + int endIndex) throws Exception { + + // this test uses test-non-partitioned-delta-table-4-versions table. See README.md from + // table's folder for detail information about this table. + String sourceTablePath = TMP_FOLDER.newFolder().getAbsolutePath(); + DeltaTestUtils.initTestForVersionedTable(sourceTablePath); + + // Delta standalone uses "last modification time" file attribute for providing commits + // before/after or at timestamp. It does not use the actual commit creation timestamp + // from the Delta log. + changeDeltaLogLastModifyTimestamp(sourceTablePath, timestampAsOfValues); + + DeltaSource deltaSource = DeltaSource + .forBoundedRowData( + new Path(sourceTablePath), + DeltaTestUtils.getHadoopConf()) + .timestampAsOf(timestampAsOf) + .build(); + + List rowData = testBoundedDeltaSource(deltaSource); + + assertRows("timestampAsOf " + timestampAsOf, expectedNumberOfRow, endIndex, rowData); + } + + private void assertRows( + String sizeMsg, + int expectedNumberOfRow, + int endIndex, + List rowData) { + + String rangeMessage = "Index value for col1 should be in range of <0 - " + endIndex + ">"; + + assertAll(() -> { + assertThat( + "Source read different number of rows than expected for " + sizeMsg, + rowData.size(), equalTo(expectedNumberOfRow) + ); + rowData.forEach(row -> { + LOG.info("Row content " + row); + long col1Val = row.getLong(0); + assertThat(rangeMessage + " but was " + col1Val, col1Val >= 0, equalTo(true)); + assertThat( + rangeMessage + " but was " + col1Val, + col1Val <= endIndex, + equalTo(true) + ); + }); + } + ); } @Override - protected List testWithPartitions(DeltaSource deltaSource) throws Exception { + protected List testSource( + DeltaSource deltaSource, + TestDescriptor testDescriptor) throws Exception { return testBoundedDeltaSource(deltaSource); } @@ -189,4 +342,63 @@ private void shouldReadDeltaTable( assertThat("Source Must Have produced some duplicates.", actualValues.size(), equalTo(LARGE_TABLE_RECORD_COUNT)); } + + /** + * Base method used for testing {@link DeltaSource} in {@link Boundedness#BOUNDED} mode. This + * method creates a {@link StreamExecutionEnvironment} and uses the provided {@code + * DeltaSource} instance without any failover. + * + * @param source The {@link DeltaSource} that should be used in this test. + * @param Type of objects produced by source. + * @return A {@link List} of produced records. + */ + private List testBoundedDeltaSource(DeltaSource source) + throws Exception { + + // Since we don't do any failover here (used FailoverType.NONE) we don't need any + // actual FailCheck. + // We do need to pass the check at least once, to call + // RecordCounterToFail#continueProcessing.get() hence (FailCheck) integer -> true + return testBoundedDeltaSource(FailoverType.NONE, source, (FailCheck) integer -> true); + } + + /** + * Base method used for testing {@link DeltaSource} in {@link Boundedness#BOUNDED} mode.
This + * method creates a {@link StreamExecutionEnvironment} and uses the provided {@code DeltaSource} + * instance. + * <p> + * <p>
+ * The created environment can perform a failover after condition described by {@link FailCheck} + * which is evaluated every record produced by {@code DeltaSource} + * + * @param failoverType The {@link FailoverType} type that should be performed for given test + * setup. + * @param source The {@link DeltaSource} that should be used in this test. + * @param failCheck The {@link FailCheck} condition which is evaluated for every row produced + * by source. + * @param Type of objects produced by source. + * @return A {@link List} of produced records. + * @implNote For Implementation details please refer to + * {@link DeltaTestUtils#testBoundedStream(FailoverType, + * FailCheck, DataStream, MiniClusterWithClientResource)} method. + */ + private List testBoundedDeltaSource(FailoverType failoverType, DeltaSource source, + FailCheck failCheck) throws Exception { + + if (source.getBoundedness() != Boundedness.BOUNDED) { + throw new RuntimeException( + "Not using Bounded source in Bounded test setup. This will not work properly."); + } + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 1000)); + + DataStream stream = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "delta-source"); + + return DeltaTestUtils + .testBoundedStream(failoverType, failCheck, stream, miniClusterResource); + } } diff --git a/flink/src/test/java/io/delta/flink/source/DeltaSourceContinuousExecutionITCaseTest.java b/flink/src/test/java/io/delta/flink/source/DeltaSourceContinuousExecutionITCaseTest.java index 8c218b0e868..a2988f6a0a7 100644 --- a/flink/src/test/java/io/delta/flink/source/DeltaSourceContinuousExecutionITCaseTest.java +++ b/flink/src/test/java/io/delta/flink/source/DeltaSourceContinuousExecutionITCaseTest.java @@ -2,6 +2,7 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -12,19 +13,21 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +import java.util.stream.Stream; import io.delta.flink.source.internal.DeltaSourceConfiguration; import io.delta.flink.source.internal.DeltaSourceOptions; -import io.delta.flink.utils.ContinuousTestDescriptor; -import io.delta.flink.utils.ContinuousTestDescriptor.Descriptor; import io.delta.flink.utils.DeltaTableUpdater; import io.delta.flink.utils.DeltaTestUtils; import io.delta.flink.utils.FailoverType; import io.delta.flink.utils.RecordCounterToFail.FailCheck; import io.delta.flink.utils.TableUpdateDescriptor; +import io.delta.flink.utils.TestDescriptor; +import io.delta.flink.utils.TestDescriptor.Descriptor; import io.github.artsok.ParameterizedRepeatedIfExceptionsTest; import io.github.artsok.RepeatedIfExceptionsTest; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamUtils; @@ -32,29 +35,28 @@ import org.apache.flink.streaming.api.operators.collect.ClientAndIterator; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; import 
org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static io.delta.flink.utils.ExecutionITCaseTestConstants.*; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.fail; public class DeltaSourceContinuousExecutionITCaseTest extends DeltaSourceITBase { - /** - * Number of updates done on Delta table, where each updated is bounded into one transaction - */ - private static final int NUMBER_OF_TABLE_UPDATE_BULKS = 5; - - /** - * Number of rows added per each update of Delta table - */ - private static final int ROWS_PER_TABLE_UPDATE = 5; + private static final Logger LOG = + LoggerFactory.getLogger(DeltaSourceContinuousExecutionITCaseTest.class); /** * Number of rows in Delta table before inserting a new data into it. @@ -94,7 +96,7 @@ public void shouldReadTableWithNoUpdates(FailoverType failoverType) throws Excep // Fail TaskManager or JobManager after half of the records or do not fail anything if // FailoverType.NONE. List> resultData = testContinuousDeltaSource(failoverType, deltaSource, - new ContinuousTestDescriptor( + new TestDescriptor( deltaSource.getTablePath().toUri().toString(), INITIAL_DATA_SIZE), (FailCheck) readRows -> readRows == SMALL_TABLE_COUNT / 2); @@ -128,7 +130,7 @@ public void shouldReadLargeDeltaTableWithNoUpdates(FailoverType failoverType) th // WHEN List> resultData = testContinuousDeltaSource(failoverType, deltaSource, - new ContinuousTestDescriptor( + new TestDescriptor( deltaSource.getTablePath().toUri().toString(), LARGE_TABLE_RECORD_COUNT), (FailCheck) readRows -> readRows == LARGE_TABLE_RECORD_COUNT / 2); @@ -276,7 +278,7 @@ public void shouldReadLoadedSchemaVersion() throws Exception { // Read data Future> dataFuture = DeltaTestUtils.startInitialResultsFetcherThread( - new ContinuousTestDescriptor( + new TestDescriptor( source.getTablePath().toUri().toString(), versionOneUpdate.getNumberOfNewRows() + versionTwoUpdate.getNumberOfNewRows()), client, @@ -322,12 +324,169 @@ public void shouldReadLoadedSchemaVersion() throws Exception { ); } + /** + * @return Stream of test {@link Arguments} elements. Arguments are in order: + *
<p> + * <ul> + * <li>Version used as a value of "startingVersion" option.</li> + * <li>Expected number of records/changes read starting from the version defined by startingVersion</li> + * <li>Lowest expected value of the col1 column for the version defined by startingVersion</li> + * </ul>
+ */ + private static Stream startingVersionArguments() { + return Stream.of( + // Skipping version 0 due to a known issue of not supporting Metadata and Protocol actions + // Waiting for Delta standalone enhancement. + // Arguments.of(0, 75, 0), + Arguments.of(1, 70, 5), + Arguments.of(2, 60, 15), + Arguments.of(3, 40, 35) + ); + } + + @ParameterizedRepeatedIfExceptionsTest( + suspend = 2000L, + repeats = 3, + name = + "{index}: startingVersion = [{0}], " + + "Expected Number of rows = [{1}], " + + "Start Index = [{2}]" + ) + @MethodSource("startingVersionArguments") + public void shouldReadStartingVersion( + long versionAsOf, + int expectedNumberOfRow, + int startIndex) throws Exception { + + // this test uses test-non-partitioned-delta-table-4-versions table. See README.md from + // table's folder for detail information about this table. + String sourceTablePath = TMP_FOLDER.newFolder().getAbsolutePath(); + DeltaTestUtils.initTestForVersionedTable(sourceTablePath); + + DeltaSource deltaSource = DeltaSource + .forContinuousRowData( + new Path(sourceTablePath), + DeltaTestUtils.getHadoopConf()) + .startingVersion(versionAsOf) + .build(); + + List rowData = testContinuousDeltaSource( + deltaSource, + new TestDescriptor(sourceTablePath, expectedNumberOfRow) + ); + + assertRows("startingVersion " + versionAsOf, expectedNumberOfRow, startIndex, rowData); + } + + private static final String[] startingTimestampValues = { + "2022-06-15 13:23:33.613", + "2022-06-15 13:24:33.630", + "2022-06-15 13:25:33.633", + "2022-06-15 13:26:33.634", + }; + + /** + * @return Stream of test {@link Arguments} elements. Arguments are in order: + *
<ul> + * <li>Timestamp used as a value of "startingTimestamp" option.</li> + * <li>Expected number of records/changes read starting from the version defined by startingTimestamp</li> + * <li>Lowest expected value of the col1 column for the version defined by startingTimestamp</li> + * </ul>
+ */ + private static Stream startingTimestampArguments() { + return Stream.of( + // Skipping version 0 due to a known issue of not supporting Metadata and Protocol actions + // Waiting for Delta standalone enhancement. + // Arguments.of(startingTimestampValues[0], 75, 0), + Arguments.of(startingTimestampValues[1], 70, 5), + Arguments.of(startingTimestampValues[2], 60, 15), + Arguments.of(startingTimestampValues[3], 40, 35) + ); + } + + @ParameterizedRepeatedIfExceptionsTest( + suspend = 2000L, + repeats = 3, + name = + "{index}: startingTimestamp = [{0}], " + + "Expected Number of rows = [{1}], " + + "Start Index = [{2}]" + ) + @MethodSource("startingTimestampArguments") + public void shouldReadStartingTimestamp( + String startingTimestamp, + int expectedNumberOfRow, + int startIndex) throws Exception { + + LOG.info("Running shouldReadStartingTimestamp test for startingTimestamp - " + + startingTimestamp); + // this test uses test-non-partitioned-delta-table-4-versions table. See README.md from + // table's folder for detail information about this table. + String sourceTablePath = TMP_FOLDER.newFolder().getAbsolutePath(); + DeltaTestUtils.initTestForVersionedTable(sourceTablePath); + + // Delta standalone uses "last modification time" file attribute for providing commits + // before/after or at timestamp. It does not use the actual commit creation timestamp + // from the Delta log. + changeDeltaLogLastModifyTimestamp(sourceTablePath, startingTimestampValues); + + DeltaSource deltaSource = DeltaSource + .forContinuousRowData( + new Path(sourceTablePath), + DeltaTestUtils.getHadoopConf()) + .startingTimestamp(startingTimestamp) + .build(); + + List rowData = testContinuousDeltaSource( + deltaSource, + new TestDescriptor(sourceTablePath, expectedNumberOfRow) + ); + + assertRows( + "startingTimestamp " + startingTimestamp, + expectedNumberOfRow, + startIndex, + rowData + ); + } + + private void assertRows( + String sizeMsg, + int expectedNumberOfRow, + int startIndex, + List rowData) { + + String rangeMessage = + "Index value for col1 should be in range of <" + startIndex + " - 74>"; + + assertAll(() -> { + assertThat( + "Source read different number of rows than expected for " + sizeMsg, + rowData.size(), equalTo(expectedNumberOfRow) + ); + rowData.forEach(row -> { + LOG.info("Row content " + row); + long col1Val = row.getLong(0); + assertThat( + rangeMessage + " but was " + col1Val, + col1Val >= startIndex, + equalTo(true) + ); + assertThat(rangeMessage + " but was " + col1Val, col1Val <= 74, equalTo(true)); + }); + } + ); + } + + @Override - protected List testWithPartitions(DeltaSource deltaSource) throws Exception { + protected List testSource( + DeltaSource deltaSource, + TestDescriptor testDescriptor) throws Exception { return testContinuousDeltaSource( FailoverType.NONE, deltaSource, - new ContinuousTestDescriptor(deltaSource.getTablePath().toUri().toString(), 2), + testDescriptor, (FailCheck) integer -> true) .get(0); } @@ -370,11 +529,14 @@ private void shouldReadDeltaTableFromSnapshotAndUpdates( FailoverType failoverType) throws Exception { - ContinuousTestDescriptor testDescriptor = DeltaTestUtils.prepareTableUpdates( + int numberOfTableUpdateBulks = 5; + int rowsPerTableUpdate = 5; + + TestDescriptor testDescriptor = DeltaTestUtils.prepareTableUpdates( deltaSource.getTablePath().toUri().toString(), RowType.of(DATA_COLUMN_TYPES, DATA_COLUMN_NAMES), INITIAL_DATA_SIZE, - new TableUpdateDescriptor(NUMBER_OF_TABLE_UPDATE_BULKS, ROWS_PER_TABLE_UPDATE) + new
TableUpdateDescriptor(numberOfTableUpdateBulks, rowsPerTableUpdate) ); // WHEN @@ -382,7 +544,7 @@ private void shouldReadDeltaTableFromSnapshotAndUpdates( testContinuousDeltaSource(failoverType, deltaSource, testDescriptor, (FailCheck) readRows -> readRows == - (INITIAL_DATA_SIZE + NUMBER_OF_TABLE_UPDATE_BULKS * ROWS_PER_TABLE_UPDATE) + (INITIAL_DATA_SIZE + numberOfTableUpdateBulks * rowsPerTableUpdate) / 2); int totalNumberOfRows = resultData.stream().mapToInt(List::size).sum(); @@ -399,9 +561,85 @@ private void shouldReadDeltaTableFromSnapshotAndUpdates( // THEN assertThat("Source read different number of rows that Delta Table have.", totalNumberOfRows, - equalTo(INITIAL_DATA_SIZE + NUMBER_OF_TABLE_UPDATE_BULKS * ROWS_PER_TABLE_UPDATE)); + equalTo(INITIAL_DATA_SIZE + numberOfTableUpdateBulks * rowsPerTableUpdate)); assertThat("Source Produced Different Rows that were in Delta Table", uniqueValues.size(), - equalTo(INITIAL_DATA_SIZE + NUMBER_OF_TABLE_UPDATE_BULKS * ROWS_PER_TABLE_UPDATE)); + equalTo(INITIAL_DATA_SIZE + numberOfTableUpdateBulks * rowsPerTableUpdate)); + } + + /** + * Base method used for testing {@link DeltaSource} in {@link Boundedness#CONTINUOUS_UNBOUNDED} + * mode. This method creates a {@link StreamExecutionEnvironment} and uses provided {@code + * DeltaSource} instance without any failover. + * + * @param source The {@link DeltaSource} that should be used in this test. + * @param testDescriptor The {@link TestDescriptor} used for test run. + * @param Type of objects produced by source. + * @return A {@link List} of produced records. + */ + private List testContinuousDeltaSource( + DeltaSource source, + TestDescriptor testDescriptor) + throws Exception { + + // Since we don't do any failover here (used FailoverType.NONE) we don't need any + // actually FailCheck. + // We do need to pass the check at least once, to call + // RecordCounterToFail#continueProcessing.get() hence (FailCheck) integer -> true + List> tmpResult = testContinuousDeltaSource( + FailoverType.NONE, + source, + testDescriptor, + (FailCheck) integer -> true + ); + + ArrayList result = new ArrayList<>(); + for (List list : tmpResult) { + result.addAll(list); + } + + return result; + } + + /** + * Base method used for testing {@link DeltaSource} in {@link Boundedness#CONTINUOUS_UNBOUNDED} + * mode. This method creates a {@link StreamExecutionEnvironment} and uses provided {@code + * DeltaSource} instance. + *
<p> + * <p>
+ * The created environment can perform a failover after condition described by {@link FailCheck} + * which is evaluated every record produced by {@code DeltaSource} + * + * @param failoverType The {@link FailoverType} type that should be performed for given test + * setup. + * @param source The {@link DeltaSource} that should be used in this test. + * @param testDescriptor The {@link TestDescriptor} used for test run. + * @param failCheck The {@link FailCheck} condition which is evaluated for every row + * produced by source. + * @param Type of objects produced by source. + * @return A {@link List} of produced records. + * @implNote For Implementation details please refer to + * {@link DeltaTestUtils#testContinuousStream(FailoverType, + * TestDescriptor, FailCheck, DataStream, MiniClusterWithClientResource)} + */ + private List> testContinuousDeltaSource( + FailoverType failoverType, + DeltaSource source, + TestDescriptor testDescriptor, + FailCheck failCheck) + throws Exception { + + StreamExecutionEnvironment env = prepareStreamingEnvironment(source); + + DataStream stream = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "delta-source"); + + return DeltaTestUtils.testContinuousStream( + failoverType, + testDescriptor, + failCheck, + stream, + miniClusterResource + ); } } diff --git a/flink/src/test/java/io/delta/flink/source/DeltaSourceITBase.java b/flink/src/test/java/io/delta/flink/source/DeltaSourceITBase.java index 37b6184d80b..4eb10ed689b 100644 --- a/flink/src/test/java/io/delta/flink/source/DeltaSourceITBase.java +++ b/flink/src/test/java/io/delta/flink/source/DeltaSourceITBase.java @@ -1,26 +1,32 @@ package io.delta.flink.source; import java.io.IOException; +import java.math.BigDecimal; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Timestamp; +import java.time.ZoneOffset; +import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import io.delta.flink.utils.ContinuousTestDescriptor; +import io.delta.flink.source.internal.enumerator.supplier.TimestampFormatConverter; import io.delta.flink.utils.DeltaTestUtils; import io.delta.flink.utils.ExecutionITCaseTestConstants; -import io.delta.flink.utils.FailoverType; -import io.delta.flink.utils.RecordCounterToFail.FailCheck; +import io.delta.flink.utils.TestDescriptor; import io.github.artsok.RepeatedIfExceptionsTest; import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.connector.source.Boundedness; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.RowData; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static io.delta.flink.utils.DeltaTestUtils.buildCluster; import static io.delta.flink.utils.ExecutionITCaseTestConstants.*; import static org.hamcrest.MatcherAssert.assertThat; @@ -30,6 +36,8 @@ public abstract class DeltaSourceITBase extends TestLogger { + private static final Logger LOG = LoggerFactory.getLogger(DeltaSourceITBase.class); + protected static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); protected static final int 
PARALLELISM = 4; @@ -98,7 +106,10 @@ public void testReadPartitionedTableSkippingPartitionColumns() throws Exception ); // WHEN - List resultData = testWithPartitions(deltaSource); + List resultData = this.testSource( + deltaSource, + new TestDescriptor(partitionedTablePath, 2) + ); List readNames = resultData.stream() @@ -142,7 +153,10 @@ public void testReadOnlyPartitionColumns() throws Exception { ); // WHEN - List resultData = testWithPartitions(deltaSource); + List resultData = this.testSource( + deltaSource, + new TestDescriptor(partitionedTablePath, 2) + ); // THEN assertThat("Source read different number of rows that Delta Table have.", @@ -175,7 +189,10 @@ public void testWithOnePartition() throws Exception { ); // WHEN - List resultData = testWithPartitions(deltaSource); + List resultData = this.testSource( + deltaSource, + new TestDescriptor(partitionedTablePath, 2) + ); Set readSurnames = resultData.stream().map(row -> row.getString(0).toString()).collect(Collectors.toSet()); @@ -212,7 +229,10 @@ public void testWithBothPartitions() throws Exception { DeltaSource deltaSource = initSourceAllColumns(partitionedTablePath); // WHEN - List resultData = testWithPartitions(deltaSource); + List resultData = this.testSource( + deltaSource, + new TestDescriptor(partitionedTablePath, 2) + ); List readNames = resultData.stream() @@ -254,114 +274,79 @@ public void testWithBothPartitions() throws Exception { assertNoMoreColumns(resultData,5); } - protected abstract DeltaSource initSourceAllColumns(String tablePath); - - protected abstract DeltaSource initSourceForColumns( - String tablePath, - String[] columnNames); + @RepeatedIfExceptionsTest(suspend = 2000L, repeats = 3) + public void shouldReadTableWithAllDataTypes() throws Exception { + String sourceTablePath = TMP_FOLDER.newFolder().getAbsolutePath(); + DeltaTestUtils.initTestForAllDataTypes(sourceTablePath); - protected abstract List testWithPartitions(DeltaSource deltaSource) - throws Exception; + DeltaSource deltaSource = initSourceAllColumns(sourceTablePath); - /** - * Base method used for testing {@link DeltaSource} in {@link Boundedness#BOUNDED} mode. This - * method creates a {@link StreamExecutionEnvironment} and uses provided {@code - * DeltaSource} instance without any failover. - * - * @param source The {@link DeltaSource} that should be used in this test. - * @param Type of objects produced by source. - * @return A {@link List} of produced records. - */ - protected List testBoundedDeltaSource(DeltaSource source) - throws Exception { - - // Since we don't do any failover here (used FailoverType.NONE) we don't need any - // actually FailCheck. - // We do need to pass the check at least once, to call - // RecordCounterToFail#continueProcessing.get() hence (FailCheck) integer -> true - return testBoundedDeltaSource(FailoverType.NONE, source, (FailCheck) integer -> true); - } + List rowData = this.testSource( + deltaSource, + new TestDescriptor(sourceTablePath, ALL_DATA_TABLE_RECORD_COUNT) + ); - /** - * Base method used for testing {@link DeltaSource} in {@link Boundedness#BOUNDED} mode. This - * method creates a {@link StreamExecutionEnvironment} and uses provided {@code DeltaSource} - * instance. - *
<p> - * <p>
- * The created environment can perform a failover after condition described by {@link FailCheck} - * which is evaluated every record produced by {@code DeltaSource} - * - * @param failoverType The {@link FailoverType} type that should be performed for given test - * setup. - * @param source The {@link DeltaSource} that should be used in this test. - * @param failCheck The {@link FailCheck} condition which is evaluated for every row produced - * by source. - * @param Type of objects produced by source. - * @return A {@link List} of produced records. - * @implNote For Implementation details please refer to - * {@link DeltaTestUtils#testBoundedStream(FailoverType, - * FailCheck, DataStream, MiniClusterWithClientResource)} method. - */ - protected List testBoundedDeltaSource(FailoverType failoverType, DeltaSource source, - FailCheck failCheck) throws Exception { + assertThat( + "Source read different number of records than expected.", + rowData.size(), + equalTo(5) + ); - if (source.getBoundedness() != Boundedness.BOUNDED) { - throw new RuntimeException( - "Not using Bounded source in Bounded test setup. This will not work properly."); + Iterator rowDataIterator = rowData.iterator(); + AtomicInteger index = new AtomicInteger(0); + while (rowDataIterator.hasNext()) { + int i = index.getAndIncrement(); + RowData row = rowDataIterator.next(); + LOG.info("Row Content: " + row); + assertRowValues(i, row); } + } - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 1000)); + private void assertRowValues(int i, RowData row) { + assertAll(() -> { + assertThat(row.getByte(0), equalTo(new Integer(i).byteValue())); + assertThat(row.getShort(1), equalTo((short) i)); + assertThat(row.getInt(2), equalTo(i)); + assertThat(row.getDouble(3), equalTo(new Integer(i).doubleValue())); + assertThat(row.getFloat(4), equalTo(new Integer(i).floatValue())); + assertThat( + row.getDecimal(5, 1, 1).toBigDecimal().setScale(18), + equalTo(BigDecimal.valueOf(i).setScale(18)) + ); + assertThat( + row.getDecimal(6, 1, 1).toBigDecimal().setScale(18), + equalTo(BigDecimal.valueOf(i).setScale(18)) + ); + + // same value for all columns + assertThat( + row.getTimestamp(7, 18).toLocalDateTime().toInstant(ZoneOffset.UTC), + equalTo(Timestamp.valueOf("2022-06-14 18:54:24.547557") + .toLocalDateTime().toInstant(ZoneOffset.UTC)) + ); + assertThat(row.getString(8).toString(), equalTo(String.valueOf(i))); + + // same value for all columns + assertThat(row.getBoolean(9), equalTo(true)); + } + ); + } - DataStream stream = - env.fromSource(source, WatermarkStrategy.noWatermarks(), "delta-source"); + protected abstract DeltaSource initSourceAllColumns(String tablePath); - return DeltaTestUtils - .testBoundedStream(failoverType, failCheck, stream, miniClusterResource); - } + protected abstract DeltaSource initSourceForColumns( + String tablePath, + String[] columnNames); /** - * Base method used for testing {@link DeltaSource} in {@link Boundedness#CONTINUOUS_UNBOUNDED} - * mode. This method creates a {@link StreamExecutionEnvironment} and uses provided {@code - * DeltaSource} instance. - *
<p> - * <p>
- * The created environment can perform a failover after condition described by {@link FailCheck} - * which is evaluated every record produced by {@code DeltaSource} - * - * @param failoverType The {@link FailoverType} type that should be performed for given test - * setup. - * @param source The {@link DeltaSource} that should be used in this test. - * @param failCheck The {@link FailCheck} condition which is evaluated for every row produced - * by source. - * @param Type of objects produced by source. + * Test a source without failover setup. + * @param deltaSource delta source to test. + * @param testDescriptor A {@link TestDescriptor} for this test run. * @return A {@link List} of produced records. - * @implNote For Implementation details please refer to - * {@link DeltaTestUtils#testContinuousStream(FailoverType, ContinuousTestDescriptor, - * FailCheck, DataStream, MiniClusterWithClientResource)} */ - protected List> testContinuousDeltaSource( - FailoverType failoverType, - DeltaSource source, - ContinuousTestDescriptor testDescriptor, - FailCheck failCheck) - throws Exception { - - StreamExecutionEnvironment env = prepareStreamingEnvironment(source); - - DataStream stream = - env.fromSource(source, WatermarkStrategy.noWatermarks(), "delta-source"); - - return DeltaTestUtils.testContinuousStream( - failoverType, - testDescriptor, - failCheck, - stream, - miniClusterResource - ); - } + protected abstract List testSource( + DeltaSource deltaSource, + TestDescriptor testDescriptor) throws Exception; protected void assertPartitionValue( RowData rowData, @@ -395,6 +380,44 @@ protected StreamExecutionEnvironment prepareStreamingEnvironment( return env; } + /** + * Changes last modification time for delta log files. + * + * @param sourceTablePath Path to delta log to change last modification time. + * @param lastModifyValues An array of times to which last modification time should be change + * to. 
+ */ + protected void changeDeltaLogLastModifyTimestamp( + String sourceTablePath, + String[] lastModifyValues) throws IOException { + + List sortedLogFiles = + Files.list(Paths.get(sourceTablePath + "/_delta_log")) + .filter(file -> file.getFileName().toUri().toString().endsWith(".json")) + .sorted() + .collect(Collectors.toList()); + + assertThat( + "Delta log for table " + sourceTablePath + " size, does not match" + + " test's last modify argument size " + lastModifyValues.length, + sortedLogFiles.size(), + equalTo(lastModifyValues.length) + ); + + int i = 0; + for (java.nio.file.Path logFile : sortedLogFiles) { + String timestampAsOfValue = lastModifyValues[i++]; + long toTimestamp = TimestampFormatConverter.convertToTimestamp(timestampAsOfValue); + LOG.info( + "Changing Last Modified timestamp on file " + logFile + + " to " + timestampAsOfValue + " -> " + timestampAsOfValue + ); + assertThat( + "Unable to modify " + logFile + " last modified timestamp.", + logFile.toFile().setLastModified(toTimestamp), equalTo(true)); + } + } + private void assertNoMoreColumns(List resultData, int columnIndex) { resultData.forEach(rowData -> assertThrows( diff --git a/flink/src/test/java/io/delta/flink/source/internal/builder/DefaultOptionTypeConverterTest.java b/flink/src/test/java/io/delta/flink/source/internal/builder/DefaultOptionTypeConverterTest.java index 66b93e663f7..f16049bdc37 100644 --- a/flink/src/test/java/io/delta/flink/source/internal/builder/DefaultOptionTypeConverterTest.java +++ b/flink/src/test/java/io/delta/flink/source/internal/builder/DefaultOptionTypeConverterTest.java @@ -10,7 +10,7 @@ class DefaultOptionTypeConverterTest { - private OptionTypeConverter typeConverter; + private OptionTypeConverter typeConverter; @BeforeEach public void setUp() { diff --git a/flink/src/test/java/io/delta/flink/utils/DeltaTableUpdater.java b/flink/src/test/java/io/delta/flink/utils/DeltaTableUpdater.java index 3badccc908b..f14aff3ee17 100644 --- a/flink/src/test/java/io/delta/flink/utils/DeltaTableUpdater.java +++ b/flink/src/test/java/io/delta/flink/utils/DeltaTableUpdater.java @@ -5,7 +5,7 @@ import java.util.List; import java.util.UUID; -import io.delta.flink.utils.ContinuousTestDescriptor.Descriptor; +import io.delta.flink.utils.TestDescriptor.Descriptor; import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; diff --git a/flink/src/test/java/io/delta/flink/utils/DeltaTestUtils.java b/flink/src/test/java/io/delta/flink/utils/DeltaTestUtils.java index dd1db91a8b5..b56dca35148 100644 --- a/flink/src/test/java/io/delta/flink/utils/DeltaTestUtils.java +++ b/flink/src/test/java/io/delta/flink/utils/DeltaTestUtils.java @@ -49,36 +49,49 @@ public static org.apache.hadoop.conf.Configuration getHadoopConf() { public static final String TEST_DELTA_TABLE_INITIAL_STATE_NP_DIR = "/test-data/test-non-partitioned-delta-table-initial-state"; + public static final String TEST_DELTA_TABLE_INITIAL_STATE_P_DIR = "/test-data/test-partitioned-delta-table-initial-state"; + public static final String TEST_DELTA_LARGE_TABLE_INITIAL_STATE_DIR = "/test-data/test-non-partitioned-delta-table_1100_records"; + public static final String TEST_DELTA_TABLE_ALL_DATA_TYPES = + "/test-data/test-non-partitioned-delta-table-alltypes"; + + public static final String TEST_VERSIONED_DELTA_TABLE = + "/test-data/test-non-partitioned-delta-table-4-versions"; + + public static void initTestForAllDataTypes(String targetTablePath) + throws 
diff --git a/flink/src/test/java/io/delta/flink/utils/DeltaTestUtils.java b/flink/src/test/java/io/delta/flink/utils/DeltaTestUtils.java
index dd1db91a8b5..b56dca35148 100644
--- a/flink/src/test/java/io/delta/flink/utils/DeltaTestUtils.java
+++ b/flink/src/test/java/io/delta/flink/utils/DeltaTestUtils.java
@@ -49,36 +49,49 @@ public static org.apache.hadoop.conf.Configuration getHadoopConf() {

     public static final String TEST_DELTA_TABLE_INITIAL_STATE_NP_DIR =
         "/test-data/test-non-partitioned-delta-table-initial-state";
+
     public static final String TEST_DELTA_TABLE_INITIAL_STATE_P_DIR =
         "/test-data/test-partitioned-delta-table-initial-state";
+
     public static final String TEST_DELTA_LARGE_TABLE_INITIAL_STATE_DIR =
         "/test-data/test-non-partitioned-delta-table_1100_records";
+
+    public static final String TEST_DELTA_TABLE_ALL_DATA_TYPES =
+        "/test-data/test-non-partitioned-delta-table-alltypes";
+
+    public static final String TEST_VERSIONED_DELTA_TABLE =
+        "/test-data/test-non-partitioned-delta-table-4-versions";
+
+    public static void initTestForAllDataTypes(String targetTablePath)
+        throws IOException {
+        initTestFor(TEST_DELTA_TABLE_ALL_DATA_TYPES, targetTablePath);
+    }
+
     public static void initTestForNonPartitionedTable(String targetTablePath)
         throws IOException {
-        File resourcesDirectory = new File("src/test/resources");
-        String initialTablePath =
-            resourcesDirectory.getAbsolutePath() + TEST_DELTA_TABLE_INITIAL_STATE_NP_DIR;
-        FileUtils.copyDirectory(
-            new File(initialTablePath),
-            new File(targetTablePath));
+        initTestFor(TEST_DELTA_TABLE_INITIAL_STATE_NP_DIR, targetTablePath);
     }

     public static void initTestForPartitionedTable(String targetTablePath)
         throws IOException {
-        File resourcesDirectory = new File("src/test/resources");
-        String initialTablePath =
-            resourcesDirectory.getAbsolutePath() + TEST_DELTA_TABLE_INITIAL_STATE_P_DIR;
-        FileUtils.copyDirectory(
-            new File(initialTablePath),
-            new File(targetTablePath));
+        initTestFor(TEST_DELTA_TABLE_INITIAL_STATE_P_DIR, targetTablePath);
     }

     public static void initTestForNonPartitionedLargeTable(String targetTablePath)
+        throws IOException {
+        initTestFor(TEST_DELTA_LARGE_TABLE_INITIAL_STATE_DIR, targetTablePath);
+    }
+
+    public static void initTestForVersionedTable(String targetTablePath)
+        throws IOException {
+        initTestFor(TEST_VERSIONED_DELTA_TABLE, targetTablePath);
+    }
+
+    public static void initTestFor(String initialTableDir, String targetTablePath)
         throws IOException {
         File resourcesDirectory = new File("src/test/resources");
         String initialTablePath =
-            resourcesDirectory.getAbsolutePath() + TEST_DELTA_LARGE_TABLE_INITIAL_STATE_DIR;
+            resourcesDirectory.getAbsolutePath() + initialTableDir;
         FileUtils.copyDirectory(
             new File(initialTablePath),
             new File(targetTablePath));
@@ -130,6 +143,19 @@ public static MiniClusterWithClientResource buildCluster(int parallelismLevel) {
             .build());
     }

+    public static <T> List<T> testBoundedStream(
+        DataStream<T> stream,
+        MiniClusterWithClientResource miniClusterResource)
+        throws Exception {
+
+        return testBoundedStream(
+            FailoverType.NONE,
+            (FailCheck) integer -> true,
+            stream,
+            miniClusterResource
+        );
+    }
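Editor's note: the no-failover overload above is meant to keep bounded IT cases short. A sketch of the intended call pattern, assuming a `miniClusterResource` built via `DeltaTestUtils.buildCluster(...)`, a table prepared with the new `initTestForVersionedTable`, and the connector's `DeltaSource.forBoundedRowData` builder; the `BoundedReadExample` class and `readWholeTable` method are hypothetical names for illustration.
```
import java.util.List;

import io.delta.flink.source.DeltaSource;
import io.delta.flink.utils.DeltaTestUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;

class BoundedReadExample {

    static List<RowData> readWholeTable(
            String tablePath,
            MiniClusterWithClientResource miniClusterResource) throws Exception {

        // Copy the versioned test table from src/test/resources into tablePath.
        DeltaTestUtils.initTestForVersionedTable(tablePath);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DeltaSource<RowData> source = DeltaSource
            .forBoundedRowData(new Path(tablePath), DeltaTestUtils.getHadoopConf())
            .build();

        DataStream<RowData> stream =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "delta-source");

        // No failover: the overload delegates with FailoverType.NONE and an always-true FailCheck.
        return DeltaTestUtils.testBoundedStream(stream, miniClusterResource);
    }
}
```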

     /**
      * A utility method to test bounded {@link DataStream} with failover scenarios.
      *
@@ -230,7 +256,7 @@ public static <T> List<T> testBoundedStream(
     *
     * @param failoverType The {@link FailoverType} type that should be performed for given
     *     test setup.
-    * @param testDescriptor The {@link ContinuousTestDescriptor} used for test run.
+    * @param testDescriptor The {@link TestDescriptor} used for test run.
     * @param failCheck The {@link FailCheck} condition which is evaluated for every row
     *     produced by source.
     * @param stream The {@link DataStream} under test.
@@ -280,7 +306,7 @@ public static <T> List<T> testBoundedStream(
     */
    public static <T> List<List<T>> testContinuousStream(
        FailoverType failoverType,
-       ContinuousTestDescriptor testDescriptor,
+       TestDescriptor testDescriptor,
        FailCheck failCheck,
        DataStream<T> stream,
        MiniClusterWithClientResource miniClusterResource) throws Exception {
@@ -323,7 +349,7 @@ public static <T> List<List<T>> testContinuousStream(
     }

    public static <T> Future<List<T>> startInitialResultsFetcherThread(
-       ContinuousTestDescriptor testDescriptor,
+       TestDescriptor testDescriptor,
        ClientAndIterator<T> client,
        ExecutorService threadExecutor) {
@@ -333,7 +359,7 @@ public static <T> Future<List<T>> startInitialResultsFetcherThread(
     }

    public static <T> Future<List<T>> startTableUpdaterThread(
-       ContinuousTestDescriptor testDescriptor,
+       TestDescriptor testDescriptor,
        DeltaTableUpdater tableUpdater,
        ClientAndIterator<T> client,
        ExecutorService threadExecutor) {
@@ -346,7 +372,7 @@ public static <T> Future<List<T>> startTableUpdaterThread(
                tableUpdater.writeToTable(descriptor);
                List<T> records = DataStreamUtils.collectRecordsFromUnboundedStream(client,
                    descriptor.getNumberOfNewRows());
-               System.out.println("Stream update result size: " + records.size());
+               LOG.info("Stream update result size: " + records.size());
                results.addAll(records);
            });
        return results;
@@ -354,21 +380,21 @@ public static <T> Future<List<T>> startTableUpdaterThread(
     }

     /**
-     * Creates a {@link ContinuousTestDescriptor} for tests. The descriptor created by this method
+     * Creates a {@link TestDescriptor} for tests. The descriptor created by this method
      * describes a scenario where the Delta table will be updated
     * {@link TableUpdateDescriptor#getNumberOfNewVersions()}
     * times, where every update/version will contain
     * {@link TableUpdateDescriptor#getNumberOfRecordsPerNewVersion()}
     * new unique rows.
     */
-    public static ContinuousTestDescriptor prepareTableUpdates(
+    public static TestDescriptor prepareTableUpdates(
        String tablePath,
        RowType rowType,
        int initialDataSize,
        TableUpdateDescriptor tableUpdateDescriptor) {

-       ContinuousTestDescriptor testDescriptor =
-           new ContinuousTestDescriptor(tablePath, initialDataSize);
+       TestDescriptor testDescriptor =
+           new TestDescriptor(tablePath, initialDataSize);

        for (int i = 0; i < tableUpdateDescriptor.getNumberOfNewVersions(); i++) {
            List<Row> newRows = new ArrayList<>();
diff --git a/flink/src/test/java/io/delta/flink/utils/ExecutionITCaseTestConstants.java b/flink/src/test/java/io/delta/flink/utils/ExecutionITCaseTestConstants.java
index 2545918389e..2103e7943b0 100644
--- a/flink/src/test/java/io/delta/flink/utils/ExecutionITCaseTestConstants.java
+++ b/flink/src/test/java/io/delta/flink/utils/ExecutionITCaseTestConstants.java
@@ -6,9 +6,16 @@
 import java.util.stream.Stream;

 import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarCharType;

 public final class ExecutionITCaseTestConstants {
@@ -29,6 +36,18 @@ private ExecutionITCaseTestConstants() {

     public static final Set<Integer> AGE_COLUMN_VALUES =
         Stream.of(1, 2).collect(Collectors.toSet());

+    public static final String[] ALL_DATA_TABLE_COLUMN_NAMES = {
+        "col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10"
+    };
+
+    public static final LogicalType[] ALL_DATA_TABLE_COLUMN_TYPES = {
+        new TinyIntType(), new SmallIntType(), new IntType(), new DoubleType(), new FloatType(),
+        new DecimalType(), new DecimalType(), new TimestampType(), new VarCharType(),
+        new BooleanType()
+    };
+
+    public static final int ALL_DATA_TABLE_RECORD_COUNT = 5;
+
     /**
      * Columns that are not used as partition columns.
      */
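Editor's note: the two arrays added above are index-aligned, so a test can zip them into the `RowType` describing `test-non-partitioned-delta-table-alltypes`. A small sketch, assuming the arrays stay aligned; the `AllTypesRowTypeExample` class name is hypothetical.
```
import static io.delta.flink.utils.ExecutionITCaseTestConstants.ALL_DATA_TABLE_COLUMN_NAMES;
import static io.delta.flink.utils.ExecutionITCaseTestConstants.ALL_DATA_TABLE_COLUMN_TYPES;

import org.apache.flink.table.types.logical.RowType;

public final class AllTypesRowTypeExample {

    // Build the RowType for the all-types test table from the shared constants.
    public static RowType allTypesRowType() {
        return RowType.of(ALL_DATA_TABLE_COLUMN_TYPES, ALL_DATA_TABLE_COLUMN_NAMES);
    }

    public static void main(String[] args) {
        // Prints something like: ROW<`col1` TINYINT, `col2` SMALLINT, ...>
        System.out.println(allTypesRowType());
    }
}
```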
diff --git a/flink/src/test/java/io/delta/flink/utils/ContinuousTestDescriptor.java b/flink/src/test/java/io/delta/flink/utils/TestDescriptor.java
similarity index 95%
rename from flink/src/test/java/io/delta/flink/utils/ContinuousTestDescriptor.java
rename to flink/src/test/java/io/delta/flink/utils/TestDescriptor.java
index 5c50e6f61e1..5ff7ee4dfcb 100644
--- a/flink/src/test/java/io/delta/flink/utils/ContinuousTestDescriptor.java
+++ b/flink/src/test/java/io/delta/flink/utils/TestDescriptor.java
@@ -11,7 +11,7 @@
 * This class describes a Delta table update scenario for an IT case test. Information from this
 * class is used by the updater thread that updates the Delta table with new rows during a test
 * run.
 */
-public class ContinuousTestDescriptor {
+public class TestDescriptor {

    /**
     * Path to Delta table
     */
@@ -29,7 +29,7 @@ public class ContinuousTestDescriptor {
     */
    private final List<Descriptor> updateDescriptors = new ArrayList<>();

-   public ContinuousTestDescriptor(String tablePath, int initialDataSize) {
+   public TestDescriptor(String tablePath, int initialDataSize) {
        this.tablePath = tablePath;
        this.initialDataSize = initialDataSize;
    }
diff --git a/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/README.md b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/README.md
new file mode 100644
index 00000000000..b698b1616c7
--- /dev/null
+++ b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/README.md
@@ -0,0 +1,31 @@
+# test-non-partitioned-delta-table-4-versions table info
+This table contains 75 rows with 3 columns for each row. This table has no partition columns.
+This table has four Delta Snapshot versions.
+
+Table schema:
+
+| Column name | Column Type |
+|-------------|:-----------:|
+| col1        |    long     |
+| col2        |    long     |
+| col3        |   string    |
+
+This table was generated using Scala/Spark code:
+```
+spark.range(0, 5)
+  .map(x => (x, x % 5, s"test-${x % 2}"))
+  .toDF("col1", "col2", "col3")
+  .write
+  .mode("append")
+  .format("delta")
+  .save(table)
+```
+This code was executed 4 times, each run adding a new version to the Delta table.
+Each run used a different spark.range(a, b), resulting in a different number of rows per version:
+
+| Version number | Number of rows for version | col1 min value | col1 max value |
+|----------------|:--------------------------:|:--------------:|:--------------:|
+| 0              |             5              |       0        |       4        |
+| 1              |             10             |       5        |       14       |
+| 2              |             20             |       15       |       34       |
+| 3              |             40             |       35       |       74       |
diff --git a/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000000.json b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000000.json
new file mode 100644
index 00000000000..ca4a1ef028e
--- /dev/null
+++ b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000000.json
@@ -0,0 +1,4 @@
+{"commitInfo":{"timestamp":1655298619847,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"977","numOutputRows":"5"}}}
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"metaData":{"id":"e4b9eeda-29e8-4c83-b6d6-177b37b3d28f","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"col1\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col2\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col3\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1655298618059}}
+{"add":{"path":"part-00000-a3dfa929-86a1-4e4f-837a-120ae9cbcfe4-c000.snappy.parquet","partitionValues":{},"size":977,"modificationTime":1655298619775,"dataChange":true}}
diff --git a/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000001.json b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000001.json
new file mode 100644
index 00000000000..922519d30db
--- /dev/null
+++ b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000001.json
@@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1655298655683,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"1031","numOutputRows":"10"}}} +{"add":{"path":"part-00000-bf5cbfdd-6dae-4940-954d-f5dbe254bf6c-c000.snappy.parquet","partitionValues":{},"size":1031,"modificationTime":1655298655655,"dataChange":true}} diff --git a/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000002.json b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000002.json new file mode 100644 index 00000000000..b14bd292e7f --- /dev/null +++ b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000002.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1655298690839,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"1077","numOutputRows":"20"}}} +{"add":{"path":"part-00000-96103e04-7578-420b-82ed-57a76b394927-c000.snappy.parquet","partitionValues":{},"size":1077,"modificationTime":1655298690811,"dataChange":true}} diff --git a/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000003.json b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000003.json new file mode 100644 index 00000000000..284da6c66ab --- /dev/null +++ b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/_delta_log/00000000000000000003.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1655298772372,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":2,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"1165","numOutputRows":"40"}}} +{"add":{"path":"part-00000-c8a6b958-41e5-4aab-996a-ca44505a279f-c000.snappy.parquet","partitionValues":{},"size":1165,"modificationTime":1655298772351,"dataChange":true}} diff --git a/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/part-00000-96103e04-7578-420b-82ed-57a76b394927-c000.snappy.parquet b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/part-00000-96103e04-7578-420b-82ed-57a76b394927-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..de69ad324266d3e45435f530490e111ae256086a GIT binary patch literal 1077 zcmbtU&5F}d7`;hjn;Bci>fD%&QEOyM1~MUOn9QVPp&%mC#o$&%k|rIC>CYsoBBfjL zIcDLaFW_Fph0I1D#f586n$!v%5i#U&Z|*tY`F<|s`2G`<5_-u7NOPR)Y>IPP$gM%o4iap>7Yz_Dc4h0 zP=zlg;S<~B-G8_{Nk-{}W`#A++1!-pa?{qr^erp=V8Fq3ePDkWqoifSf-9`E1?C?) 
znO0Hp(%f2@pE1Im0q0&YuyqsRds$G4q>17vKGD6ghq>vC5$dnPIO&gu;w4Krbc2X{ zYl`GF(lPDRNto*K=(RowQ!nsRPyEJAmGsRxRN6|C#*=QUXiBdiMnR&qpH7vbH^`{$ zQa}{KXf!;**KiU=o*#inV5liePo+Jb&9vWRTmKPjtzuDN|L>Wf&4~E8=yiV(H;Umj zOx-kev!VKveiSskBPZw`^&H1)`F7Z|ZOaM_+pwMR!00;up>t^Xp&yugk}DIEzvKfh I^B6yjpE&Qp-v9sr literal 0 HcmV?d00001 diff --git a/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/part-00000-a3dfa929-86a1-4e4f-837a-120ae9cbcfe4-c000.snappy.parquet b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/part-00000-a3dfa929-86a1-4e4f-837a-120ae9cbcfe4-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..d1a39563117179aaca299775034d18eb927ab9e9 GIT binary patch literal 977 zcmcIjL5tHs6n@jh5JUGOPMAQz5ZKa%?2t5Uc2jckwnz_)R~4Bw)3um3-6U0{^dIbR z@aWm!;IT&!{vKbNth?fZs2D=#W#0R~_q}hJi>J>63iKsC_o#t^Ms=I%Dzp#Ny8uiW z-m{BY&U*M&p1=VCa-b8FkVNTFH#Gmhfl7zen^2Wv%f}N9``5CnkNqD}Gvp&`HbbiW zAtM-S`?^58Zc0qu9Y;UR!& zShkf6bepQyi3YmW*3+J8nBR!`g$(rM4-B?!6b`hJHQ14#Nk?}`cjRwbuN9m2QL5TW z4fm`ciTQ~Pu9f5-pDeF28a1J?X2GF%s%e1bF;r!&V1 zc;NHE8K&!bo;l(y&Sqz`I1Wc?Bxg|+hM6DvQ7ljVX`G(Mr+zBaEI6_f8=!sRGoHf* GUgq!Iso(`!$pbg3cbO2fe z2vRwu)+#Cj#f8Q++Rh#FmBTH2K$!~I=E4$T#zmgM5JU< znw{FWnT7pi%69!+=!hkUm8*qCjR&PJ5k7O2QwL9*=`u*(yiP}#(JY`MHag-MKU&20 zF6{zsR=ds-fFi4!>K{uQ<<%ZYqFVAPr^w1Rr~D+xlK&6Kp=#(HXzuIyh7_Wb$d{ts zLi9r+dP7ike^`GQp=#u_LRFN`7Zm?d(`Zxe-HJPf_$Q)V6U6*G=Cv=Pd=nHIRD-ca za$yF`0Ow{FBQ)PeNjhE3nc$d~X#qRgW29HHiPOGXMVXl_-kGx~3&uefueM) zyZ=$^>}t{A^zWTtZGb(izV&pLw5#E?ZPT_)yWI&_(|Ftt&b{&EeByadH*}+k>pITZ haxK@3de+Db&%84$jKZ;fqWT&@dBayc%pv{)egO*9x^Msh literal 0 HcmV?d00001 diff --git a/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/part-00000-c8a6b958-41e5-4aab-996a-ca44505a279f-c000.snappy.parquet b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-4-versions/part-00000-c8a6b958-41e5-4aab-996a-ca44505a279f-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..77e579519afcfaca195a4e04b3cf67a0e077d268 GIT binary patch literal 1165 zcmbu9KX21O7{M#Dw?^JdT|fDMc0H0#vXBG;kH@ zzyMWXf@M$x7FYqRzy@`24LIOBxB+g0TVM^`26w=iWf9Ow|TD)SYW&ksJA8v}7EObSykiFB~syk#aW45tCj=Q9(}@U}itWjj~c> zGav4H!>3Qe){fuvXqHfJkrlbtATW-HRYLRWthq>t;7YROlz5qPQjPFgDaog-;0mv@ z!uL#xcmCmVg#ujSN{9)J~^u-xd(>0xlm8pzy zFVNuj-pG%%;o!N}^CPeAMV|PL*;k?$yS`Fa!e}^ZMT)9)x_;0OmHMN7rSJ9PP<$x} zq=~_xzm2#4Cr!u6OwB18 znxSh(vD_SWgLctdwcDN5j%}OOrsa1m%QV}%rCYXN(_40P#a_{yezR> ( + x.toByte, x.toShort, x.toInt, x.toDouble, x.toFloat, BigInt(x), BigDecimal(x), Timestamp.valueOf(java.time.LocalDateTime.now), x.toString, true) + ) +.toDF("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10") +.write +.mode("append") +.format("delta") +.save(table) +``` diff --git a/flink/src/test/resources/test-data/test-non-partitioned-delta-table-alltypes/_delta_log/00000000000000000000.json b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-alltypes/_delta_log/00000000000000000000.json new file mode 100644 index 00000000000..b3949b3b84f --- /dev/null +++ b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-alltypes/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"commitInfo":{"timestamp":1655232870674,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"2573","numOutputRows":"5"}}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} 
+{"metaData":{"id":"447f54c2-6f7c-4d7e-8ddf-0b6a51fe03b6","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"col1\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col2\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col4\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col5\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col6\",\"type\":\"decimal(38,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col7\",\"type\":\"decimal(38,18)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col8\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col9\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col10\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1655232868484}} +{"add":{"path":"part-00000-64ff527a-c93c-448d-ba15-088a95699f01-c000.snappy.parquet","partitionValues":{},"size":2573,"modificationTime":1655232870594,"dataChange":true}} diff --git a/flink/src/test/resources/test-data/test-non-partitioned-delta-table-alltypes/part-00000-64ff527a-c93c-448d-ba15-088a95699f01-c000.snappy.parquet b/flink/src/test/resources/test-data/test-non-partitioned-delta-table-alltypes/part-00000-64ff527a-c93c-448d-ba15-088a95699f01-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..0216532ad459668c3e31000973691c6aaa2d4a50 GIT binary patch literal 2573 zcmd5;Pl)4G7=JHKJDF~0Ez%c~F*>D8LkF@UY5I4lZoP>tGKhHa51KaVY%y(S+jL|Z z1_p&?PlI@KK@cxqdQ&_Mvf$0rp2UNA@F47-L_EkI{a%`qmvNY#-VAB-zW05<-}}BV z-w(R?>YFA(q()jA$-*K-1V!R`gpfoZNJJIJf{jgwgJ)`sIx+l03*I#Pe@XNn=`y7W zkS1LDt`x1p5--?=AYF<|tzyLgb#vo6)Lj?0g)Swa6k)kFh8%iu(vABTTM)jvBunGk z$xYbk!VSL@N;Ai7k_}YJLjZg6()k_geS67y2wwv4ZXsOPVU^N1P!8Pug89QZLzcPu z$*+&T{0zSHrOfYORM`0a`)|K-6NxCd2uGj? zIqm+V8=pRY86o?AnGQvs6lf0oV!CpyS4vngZ3#Q+g^~m%`Xni!edS!!a81Gn1Z`Co zL3BI~obH%(0pAcIi#}=qy-uEwyCPu>H=G^5A!04+>lWeUA70B%5A4ZDA58nV-H}ZS zh;U_2;`_d5mS2jEYPlF279!awsD_9TE$j5$B3XbNWh%E@>{b)Dw&_+T0<;jxMay=X z$_tD%J=j2ER#xP>DkzX*J%a>!krCm&x6uvHFyU++wA| z6fhQIS?*;Na-Csmge+fi;&)iRm+7$Dg0|A@BGs~tXKP8u=92Mete&kU7?(o3K&>p} z>AFHPa-Gja1JMc@f5YM*SUp|uB=wZEo}oQi1O4(aF)%jt1%vZjx#RG#I*` zGcE1DdsG_RBkH1h14k6tyBoM*jc1-`_dM_e(A7#y&eHDD@v+8Yt1)(Z+n>-gt`4?h z@NgWs`|e~VcrAuI{*0!(%Hj1GJ@9;cwagna*m3*Ak?mEimS*g%ltJ@C60@}$ODkpx zh9h?x*rS7$(rCv7Fek(D{z_=mh~Yio_gs7YEd2Y&NP0aQ*&CxtF`DtBshfsw7Aw8k z&~u7*tL+S0gLd1h_8RV>(XcGXXc&#QTQ~acUbEdadT!4#cjUQ(&?Pr|oIz+0{s{gH DLD<+; literal 0 HcmV?d00001