[HUDI-5007] Prevent Hudi from reading the entire timeline when performing a LATEST streaming read (apache#6920)
voonhous authored and fengjian committed Apr 5, 2023
1 parent 41c9411 commit d6140b4
Showing 3 changed files with 107 additions and 1 deletion.
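For context (not part of the diff): the change targets streaming reads that specify neither read.start-commit nor read.end-commit, which previously scanned the whole timeline. Below is a minimal, hedged sketch of that configuration; READ_AS_STREAMING is assumed to be the standard Hudi Flink option for enabling streaming reads, and the helper it exercises is the one added in the first file.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;

// Illustrative only: the configuration shape that hits the new default-LATEST path.
public class LatestStreamingReadExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(FlinkOptions.READ_AS_STREAMING, true); // streaming read (assumed option)
    // READ_START_COMMIT / READ_END_COMMIT are deliberately left unset.

    // With this commit, the resolver reports that no explicit commit range was
    // requested, so the source consumes only the latest completed instant.
    System.out.println(OptionsResolver.hasNoSpecificReadCommits(conf)); // true
  }
}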
@@ -196,6 +196,13 @@ public static boolean isSpecificStartCommit(Configuration conf) {
&& !conf.get(FlinkOptions.READ_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST);
}

/**
* Returns true if neither an explicit start commit nor an explicit end commit is configured.
*/
public static boolean hasNoSpecificReadCommits(Configuration conf) {
return !conf.contains(FlinkOptions.READ_START_COMMIT) && !conf.contains(FlinkOptions.READ_END_COMMIT);
}

/**
* Returns the supplemental logging mode.
*/
@@ -18,6 +18,7 @@

package org.apache.hudi.source;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
@@ -474,7 +475,8 @@ private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient metaClient,
* @param issuedInstant The last issued instant that has already been delivered to downstream
* @return the filtered hoodie instants
*/
private List<HoodieInstant> filterInstantsWithRange(
@VisibleForTesting
public List<HoodieInstant> filterInstantsWithRange(
HoodieTimeline commitTimeline,
final String issuedInstant) {
HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
@@ -488,6 +490,15 @@ private List<HoodieInstant> filterInstantsWithRange(

Stream<HoodieInstant> instantStream = completedTimeline.getInstantsAsStream();

if (OptionsResolver.hasNoSpecificReadCommits(this.conf)) {
// by default read from the latest commit
List<HoodieInstant> instants = completedTimeline.getInstants().collect(Collectors.toList());
if (instants.size() > 1) {
return Collections.singletonList(instants.get(instants.size() - 1));
}
return instants;
}

if (OptionsResolver.isSpecificStartCommit(this.conf)) {
final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
instantStream = instantStream
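Before the new test file, a self-contained sketch may help summarize the selection rules filterInstantsWithRange applies after this change, in the order exercised by the test below. It is not the real implementation: instants are simplified to sortable strings, and option handling is reduced to nullable parameters, whereas the real method reads FlinkOptions from the Flink Configuration and operates on HoodieInstant objects.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Simplified illustration of the instant-filtering rules (not the real implementation).
public class FilterInstantsSketch {

  static List<String> filter(List<String> completed, String issuedInstant,
                             String startCommit, String endCommit) {
    if (issuedInstant != null) {
      // streaming continuation: keep everything strictly after the last issued instant
      return completed.stream()
          .filter(i -> i.compareTo(issuedInstant) > 0)
          .collect(Collectors.toList());
    }
    if (startCommit == null && endCommit == null) {
      // the behavior added by this commit: default to the latest completed instant only
      return completed.size() > 1
          ? Collections.singletonList(completed.get(completed.size() - 1))
          : completed;
    }
    // explicit range: keep instants within [startCommit, endCommit]
    return completed.stream()
        .filter(i -> (startCommit == null || i.compareTo(startCommit) >= 0)
            && (endCommit == null || i.compareTo(endCommit) <= 0))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> timeline = Arrays.asList("1", "2", "3");
    System.out.println(filter(timeline, "1", null, null));  // [2, 3]
    System.out.println(filter(timeline, null, null, null)); // [3]
    System.out.println(filter(timeline, null, "1", "3"));   // [1, 2, 3]
  }
}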
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.source;

import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.utils.TestConfigurations;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;

/**
* Test cases for {@link IncrementalInputSplits}.
*/
public class TestIncrementalInputSplits extends HoodieCommonTestHarness {

@BeforeEach
private void init() throws IOException {
initPath();
initMetaClient();
}

@Test
void testFilterInstantsWithRange() {
HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient, true);
Configuration conf = TestConfigurations.getDefaultConf(basePath);
IncrementalInputSplits iis = IncrementalInputSplits.builder()
.conf(conf)
.path(new Path(basePath))
.rowType(TestConfigurations.ROW_TYPE)
.build();

HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "1");
HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "2");
HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "3");
timeline.createNewInstant(commit1);
timeline.createNewInstant(commit2);
timeline.createNewInstant(commit3);
timeline = timeline.reload();

// previous read iteration read till instant time "1", next read iteration should return ["2", "3"]
List<HoodieInstant> instantRange2 = iis.filterInstantsWithRange(timeline, "1");
assertEquals(2, instantRange2.size());
assertIterableEquals(Arrays.asList(commit2, commit3), instantRange2);

// simulate first iteration cycle with read from LATEST commit
List<HoodieInstant> instantRange1 = iis.filterInstantsWithRange(timeline, null);
assertEquals(1, instantRange1.size());
assertIterableEquals(Collections.singletonList(commit3), instantRange1);

// specifying a start and end commit
conf.set(FlinkOptions.READ_START_COMMIT, "1");
conf.set(FlinkOptions.READ_END_COMMIT, "3");
List<HoodieInstant> instantRange3 = iis.filterInstantsWithRange(timeline, null);
assertEquals(3, instantRange3.size());
assertIterableEquals(Arrays.asList(commit1, commit2, commit3), instantRange3);
}

}
