diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java index 76d775557f7c..fae9162f3920 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -196,6 +196,13 @@ public static boolean isSpecificStartCommit(Configuration conf) { && !conf.get(FlinkOptions.READ_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST); } + /** + * Returns true if there are no explicit start and end commits. + */ + public static boolean hasNoSpecificReadCommits(Configuration conf) { + return !conf.contains(FlinkOptions.READ_START_COMMIT) && !conf.contains(FlinkOptions.READ_END_COMMIT); + } + /** * Returns the supplemental logging mode. */ diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java index bb826ce79b6c..9855feb1da87 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java @@ -18,6 +18,7 @@ package org.apache.hudi.source; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.FileSlice; @@ -474,7 +475,8 @@ private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient metaClient, * @param issuedInstant The last issued instant that has already been delivered to downstream * @return the filtered hoodie instants */ - private List filterInstantsWithRange( + @VisibleForTesting + public List filterInstantsWithRange( HoodieTimeline commitTimeline, final String issuedInstant) { HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants(); @@ -488,6 +490,15 @@ private List filterInstantsWithRange( Stream instantStream = completedTimeline.getInstants(); + if (OptionsResolver.hasNoSpecificReadCommits(this.conf)) { + // by default read from the latest commit + List instants = completedTimeline.getInstants().collect(Collectors.toList()); + if (instants.size() > 1) { + return Collections.singletonList(instants.get(instants.size() - 1)); + } + return instants; + } + if (OptionsResolver.isSpecificStartCommit(this.conf)) { final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT); instantStream = instantStream diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java new file mode 100644 index 000000000000..b42fd2c04a3c --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source; + +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.utils.TestConfigurations; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertIterableEquals; + +/** + * Test cases for {@link IncrementalInputSplits}. + */ +public class TestIncrementalInputSplits extends HoodieCommonTestHarness { + + @BeforeEach + private void init() throws IOException { + initPath(); + initMetaClient(); + } + + @Test + void testFilterInstantsWithRange() { + HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient, true); + Configuration conf = TestConfigurations.getDefaultConf(basePath); + IncrementalInputSplits iis = IncrementalInputSplits.builder() + .conf(conf) + .path(new Path(basePath)) + .rowType(TestConfigurations.ROW_TYPE) + .build(); + + HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "1"); + HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "2"); + HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "3"); + timeline.createNewInstant(commit1); + timeline.createNewInstant(commit2); + timeline.createNewInstant(commit3); + timeline = timeline.reload(); + + // previous read iteration read till instant time "1", next read iteration should return ["2", "3"] + List instantRange2 = iis.filterInstantsWithRange(timeline, "1"); + assertEquals(2, instantRange2.size()); + assertIterableEquals(Arrays.asList(commit2, commit3), instantRange2); + + // simulate first iteration cycle with read from LATEST commit + List instantRange1 = iis.filterInstantsWithRange(timeline, null); + assertEquals(1, instantRange1.size()); + assertIterableEquals(Collections.singletonList(commit3), instantRange1); + + // specifying a start and end commit + conf.set(FlinkOptions.READ_START_COMMIT, "1"); + conf.set(FlinkOptions.READ_END_COMMIT, "3"); + List instantRange3 = iis.filterInstantsWithRange(timeline, null); + assertEquals(3, instantRange3.size()); + assertIterableEquals(Arrays.asList(commit1, commit2, commit3), instantRange3); + } + +}