[HUDI-5176] Fix incremental source to consider inflight commits before completed commits (apache#7160)

When multiple writers are writing to the same Hudi table under optimistic concurrency control, one writer may start later and finish the write transaction earlier than the other concurrent writer. In this case, the Hudi timeline contains an inflight commit before a completed commit. As a concrete example, writer 1 starts a commit at t1 and later writer 2 starts another commit at t2 (t2 > t1). Commit t2 finishes earlier than t1.

                                       t3        t4
---------------------------------------------------------> t
 instant t1 |------------------------------| (writer 1)
 instant t2         |--------------|         (writer 2)

When the Hudi incremental source reads from such a table, the logic that determines the next fetch (IncrSourceHelper.calculateBeginAndEndInstants()) is susceptible to data loss because it ignores inflight instants. The method filters for completed instants only and uses them to determine both the instants to fetch and the resulting checkpoint. In the example above, an incremental fetch at t3 advances the checkpoint to t2 and skips the still-inflight t1 indefinitely.

To fix the problem, this PR adds logic so that incremental pulls only consider the contiguous run of completed commits, deltacommits, and non-clustering replacecommits (i.e., the completed instants before t1 in the example), so that the checkpoint never moves past an inflight commit, which would cause data loss. To implement this, we find the earliest incomplete commit, deltacommit, or non-clustering replacecommit and use it to bound the incremental pull.
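
To make the bounding rule concrete, here is a minimal, self-contained sketch in plain Java. It intentionally uses a toy data structure instead of Hudi's timeline API, and the instants and completion states are made up to mirror the example above:

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class CheckpointBoundSketch {
  public static void main(String[] args) {
    // Toy timeline in instant-time order; the value is true if the instant is completed.
    // Mirrors the example: t1 (writer 1) is still inflight, t2 (writer 2) finished first.
    Map<String, Boolean> timeline = new LinkedHashMap<>();
    timeline.put("t0", true);   // an older completed commit
    timeline.put("t1", false);  // writer 1, still inflight
    timeline.put("t2", true);   // writer 2, already completed

    // The earliest incomplete instant bounds the incremental pull.
    Optional<String> firstIncomplete = timeline.entrySet().stream()
        .filter(e -> !e.getValue())
        .map(Map.Entry::getKey)
        .findFirst();

    // Only completed instants strictly before that bound are eligible, so the
    // checkpoint cannot advance to t2 while t1 is still inflight.
    List<String> eligible = timeline.entrySet().stream()
        .filter(Map.Entry::getValue)
        .filter(e -> firstIncomplete.map(bound -> e.getKey().compareTo(bound) < 0).orElse(true))
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());

    System.out.println(eligible); // prints [t0]
  }
}

Without the firstIncomplete bound (the old behavior), the eligible list would be [t0, t2] and the checkpoint would advance to t2, permanently skipping t1 even after it completes.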

This PR fixes the HoodieIncrSource, S3EventsHoodieIncrSource, and GcsEventsHoodieIncrSource for the same issue.
yihua authored and Alexey Kudinkin committed Dec 14, 2022
1 parent 99c5a27 commit 4d9f20e
Showing 5 changed files with 239 additions and 24 deletions.
@@ -185,7 +185,7 @@ private static Option<String> getMetadataValue(HoodieTableMetaClient metaClient,
}
}

private static boolean isClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant instant) {
public static boolean isClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant instant) {
try {
if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
// replacecommit is used for multiple operations: insert_overwrite/cluster etc.
@@ -194,7 +194,7 @@ private static boolean isClusteringCommit(HoodieTableMetaClient metaClient, Hood
metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
return WriteOperationType.CLUSTER.equals(replaceMetadata.getOperationType());
}

return false;
} catch (IOException e) {
throw new HoodieIOException("Unable to read instant information: " + instant + " for " + metaClient.getBasePath(), e);
@@ -148,7 +148,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
.load(srcPath)
// add filtering so that only interested records are returned.
.filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
queryTypeAndInstantEndpts.getRight().getLeft()));
queryTypeAndInstantEndpts.getRight().getLeft()))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
queryTypeAndInstantEndpts.getRight().getRight()));
}

/*
@@ -157,7 +157,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
.option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL()).load(srcPath)
// add filtering so that only interested records are returned.
.filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
queryTypeAndInstantEndpts.getRight().getLeft()));
queryTypeAndInstantEndpts.getRight().getLeft()))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
queryTypeAndInstantEndpts.getRight().getRight()));
}

if (source.isEmpty()) {
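
Both filter changes above have the same effect: the snapshot read is clamped to the half-open instant range (begin, end], where end is the checkpoint computed by IncrSourceHelper. The following is a rough standalone equivalent in Spark, shown only as a sketch: the table path and instant values are placeholders, and the literal column name _hoodie_commit_time is the value of HoodieRecord.COMMIT_TIME_METADATA_FIELD.

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CommitTimeRangeFilterSketch {
  public static void main(String[] args) {
    // Requires the Hudi Spark bundle on the classpath.
    SparkSession spark = SparkSession.builder()
        .appName("commit-time-range-filter-sketch")
        .master("local[1]")
        .getOrCreate();

    // Placeholder path and bounds; in the sources these come from
    // IncrSourceHelper.calculateBeginAndEndInstants(...).
    String srcPath = "/tmp/hudi_table";
    String beginInstant = "20221201000000000";
    String endInstant = "20221202000000000";

    Dataset<Row> source = spark.read().format("hudi").load(srcPath)
        // keep records written strictly after the begin instant...
        .filter(col("_hoodie_commit_time").gt(beginInstant))
        // ...and no later than the end instant, so rows from commits beyond
        // the computed checkpoint bound are never returned
        .filter(col("_hoodie_commit_time").leq(endInstant));

    source.show(false);
    spark.stop();
  }
}

In the actual sources the same predicates are built as SQL strings via String.format, as shown in the diffs above.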
@@ -22,10 +22,10 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;

@@ -72,8 +72,22 @@ public static Pair<String, Pair<String, String>> calculateBeginAndEndInstants(Ja
"Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();

final HoodieTimeline activeCommitTimeline =
// Find the earliest incomplete commit, deltacommit, or non-clustering replacecommit,
// so that the incremental pulls should be strictly before this instant.
// This is to guard around multi-writer scenarios where a commit starting later than
// another commit from a concurrent writer can finish earlier, leaving an inflight commit
// before a completed commit.
final Option<HoodieInstant> firstIncompleteCommit = srcMetaClient.getCommitsTimeline()
.filterInflightsAndRequested()
.filter(instant ->
!HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
|| !ClusteringUtils.getClusteringPlan(srcMetaClient, instant).isPresent())
.firstInstant();
final HoodieTimeline completedCommitTimeline =
srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
final HoodieTimeline activeCommitTimeline = firstIncompleteCommit.map(
commit -> completedCommitTimeline.findInstantsBefore(commit.getTimestamp())
).orElse(completedCommitTimeline);

String beginInstantTime = beginInstant.orElseGet(() -> {
if (missingCheckpointStrategy != null) {
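
For context, the incremental sources consume this helper's result roughly as follows. This is a hedged sketch: the full parameter list of calculateBeginAndEndInstants is cut off in the hunk header above, so the argument order here is an assumption, and nextFetchRange is a made-up wrapper name.

import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;

import org.apache.spark.api.java.JavaSparkContext;

public class IncrSourceHelperUsageSketch {
  // Returns (beginInstant, endInstant) for the next incremental pull.
  static Pair<String, String> nextFetchRange(JavaSparkContext jssc, String srcPath,
      int numInstantsPerFetch, Option<String> lastCheckpoint,
      IncrSourceHelper.MissingCheckpointStrategy strategy) {
    // Assumed argument order; the left element of the result is the query type,
    // the right element holds the begin/end instant endpoints.
    Pair<String, Pair<String, String>> queryTypeAndInstantEndpts =
        IncrSourceHelper.calculateBeginAndEndInstants(
            jssc, srcPath, numInstantsPerFetch, lastCheckpoint, strategy);
    String beginInstant = queryTypeAndInstantEndpts.getRight().getLeft();  // exclusive lower bound
    String endInstant = queryTypeAndInstantEndpts.getRight().getRight();   // inclusive upper bound, i.e. the next checkpoint
    // With this change, endInstant can never move past the earliest inflight commit,
    // deltacommit, or non-clustering replacecommit on the source table.
    return Pair.of(beginInstant, endInstant);
  }
}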
@@ -25,9 +25,13 @@
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
@@ -46,20 +50,23 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.EnumSource;

import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.stream.Stream;

import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
import static org.apache.hudi.common.model.WriteOperationType.BULK_INSERT;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {

@@ -83,12 +90,8 @@ public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, Strin
return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
}

private static Stream<Arguments> tableTypeParams() {
return Arrays.stream(new HoodieTableType[][] {{HoodieTableType.COPY_ON_WRITE}, {HoodieTableType.MERGE_ON_READ}}).map(Arguments::of);
}

@ParameterizedTest
@MethodSource("tableTypeParams")
@EnumSource(HoodieTableType.class)
public void testHoodieIncrSource(HoodieTableType tableType) throws IOException {
this.tableType = tableType;
metaClient = getHoodieMetaClient(hadoopConf(), basePath());
@@ -101,11 +104,11 @@ public void testHoodieIncrSource(HoodieTableType tableType) throws IOException {
.build();

SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, true, null, "100");
Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, true, null, "200");
Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, true, null, "300");
Pair<String, List<HoodieRecord>> inserts4 = writeRecords(writeClient, true, null, "400");
Pair<String, List<HoodieRecord>> inserts5 = writeRecords(writeClient, true, null, "500");
Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, INSERT, null, "100");
Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, INSERT, null, "200");
Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, INSERT, null, "300");
Pair<String, List<HoodieRecord>> inserts4 = writeRecords(writeClient, INSERT, null, "400");
Pair<String, List<HoodieRecord>> inserts5 = writeRecords(writeClient, INSERT, null, "500");

// read everything upto latest
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 500, inserts5.getKey());
@@ -122,13 +125,200 @@ public void testHoodieIncrSource(HoodieTableType tableType) throws IOException {
// ensure checkpoint does not move
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 0, inserts5.getKey());

Pair<String, List<HoodieRecord>> inserts6 = writeRecords(writeClient, true, null, "600");
Pair<String, List<HoodieRecord>> inserts6 = writeRecords(writeClient, INSERT, null, "600");

// insert new batch and ensure the checkpoint moves
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 100, inserts6.getKey());
writeClient.close();
}

@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testHoodieIncrSourceInflightCommitBeforeCompletedCommit(HoodieTableType tableType) throws IOException {
this.tableType = tableType;
metaClient = getHoodieMetaClient(hadoopConf(), basePath());
HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(3, 4).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(2).build())
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.withInlineCompaction(true)
.withMaxNumDeltaCommitsBeforeCompaction(3)
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.build();

SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
List<Pair<String, List<HoodieRecord>>> inserts = new ArrayList<>();

for (int i = 0; i < 6; i++) {
inserts.add(writeRecords(writeClient, INSERT, null, HoodieActiveTimeline.createNewInstantTime()));
}

// Emulates a scenario where an inflight commit is before a completed commit
// The checkpoint should not go past this commit
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
HoodieInstant instant4 = activeTimeline
.filter(instant -> instant.getTimestamp().equals(inserts.get(4).getKey())).firstInstant().get();
Option<byte[]> instant4CommitData = activeTimeline.getInstantDetails(instant4);
activeTimeline.revertToInflight(instant4);
metaClient.reloadActiveTimeline();

// Reads everything up to latest
readAndAssert(
IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
Option.empty(),
400,
inserts.get(3).getKey());

// Even if the beginning timestamp is archived, full table scan should kick in, but should filter for records having commit time > first instant time
readAndAssert(
IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
Option.of(inserts.get(0).getKey()),
300,
inserts.get(3).getKey());

// Even if the read upto latest is set, if begin timestamp is in active timeline, only incremental should kick in.
readAndAssert(
IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
Option.of(inserts.get(2).getKey()),
100,
inserts.get(3).getKey());

// Reads just the latest
readAndAssert(
IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
Option.empty(),
100,
inserts.get(3).getKey());

// Ensures checkpoint does not move
readAndAssert(
IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
Option.of(inserts.get(3).getKey()),
0,
inserts.get(3).getKey());

activeTimeline.reload().saveAsComplete(
new HoodieInstant(HoodieInstant.State.INFLIGHT, instant4.getAction(), inserts.get(4).getKey()),
instant4CommitData);

// After the inflight commit completes, the checkpoint should move on after incremental pull
readAndAssert(
IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
Option.of(inserts.get(3).getKey()),
200,
inserts.get(5).getKey());

writeClient.close();
}

@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testHoodieIncrSourceWithPendingTableServices(HoodieTableType tableType) throws IOException {
this.tableType = tableType;
metaClient = getHoodieMetaClient(hadoopConf(), basePath());
HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build())
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.withScheduleInlineCompaction(true)
.withMaxNumDeltaCommitsBeforeCompaction(1)
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.build();

SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
List<Pair<String, List<HoodieRecord>>> dataBatches = new ArrayList<>();

// For COW:
// 0: bulk_insert of 100 records
// 1: bulk_insert of 100 records
// 2: bulk_insert of 100 records
// schedule clustering
// 3: bulk_insert of 100 records
// 4: upsert of 100 records (updates only based on round 3)
// 5: upsert of 100 records (updates only based on round 3)
// 6: bulk_insert of 100 records
// For MOR:
// 0: bulk_insert of 100 records
// 1: bulk_insert of 100 records
// 2: bulk_insert of 100 records
// 3: bulk_insert of 100 records
// 4: upsert of 100 records (updates only based on round 3)
// schedule compaction
// 5: upsert of 100 records (updates only based on round 3)
// schedule clustering
// 6: bulk_insert of 100 records
for (int i = 0; i < 6; i++) {
WriteOperationType opType = i < 4 ? BULK_INSERT : UPSERT;
List<HoodieRecord> recordsForUpdate = i < 4 ? null : dataBatches.get(3).getRight();
dataBatches.add(writeRecords(writeClient, opType, recordsForUpdate, HoodieActiveTimeline.createNewInstantTime()));
if (tableType == COPY_ON_WRITE) {
if (i == 2) {
writeClient.scheduleClustering(Option.empty());
}
} else if (tableType == MERGE_ON_READ) {
if (i == 4) {
writeClient.scheduleCompaction(Option.empty());
}
if (i == 5) {
writeClient.scheduleClustering(Option.empty());
}
}
}
dataBatches.add(writeRecords(writeClient, BULK_INSERT, null, HoodieActiveTimeline.createNewInstantTime()));

String latestCommitTimestamp = dataBatches.get(dataBatches.size() - 1).getKey();
// Pending clustering exists
Option<HoodieInstant> clusteringInstant =
metaClient.getActiveTimeline().filterPendingReplaceTimeline()
.filter(instant -> ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent())
.firstInstant();
assertTrue(clusteringInstant.isPresent());
assertTrue(clusteringInstant.get().getTimestamp().compareTo(latestCommitTimestamp) < 0);

if (tableType == MERGE_ON_READ) {
// Pending compaction exists
Option<HoodieInstant> compactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
assertTrue(compactionInstant.isPresent());
assertTrue(compactionInstant.get().getTimestamp().compareTo(latestCommitTimestamp) < 0);
}

// The pending tables services should not block the incremental pulls
// Reads everything up to latest
readAndAssert(
IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
Option.empty(),
500,
dataBatches.get(6).getKey());

// Even if the read upto latest is set, if begin timestamp is in active timeline, only incremental should kick in.
readAndAssert(
IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
Option.of(dataBatches.get(2).getKey()),
200,
dataBatches.get(6).getKey());

// Reads just the latest
readAndAssert(
IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
Option.empty(),
100,
dataBatches.get(6).getKey());

// Ensures checkpoint does not move
readAndAssert(
IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
Option.of(dataBatches.get(6).getKey()),
0,
dataBatches.get(6).getKey());

writeClient.close();
}

private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount, String expectedCheckpoint) {

Properties properties = new Properties();
@@ -148,10 +338,17 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe
Assertions.assertEquals(expectedCheckpoint, batchCheckPoint.getRight());
}

private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords, String commit) throws IOException {
private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient,
WriteOperationType writeOperationType,
List<HoodieRecord> insertRecords,
String commit) throws IOException {
writeClient.startCommitWithTime(commit);
List<HoodieRecord> records = insert ? dataGen.generateInserts(commit, 100) : dataGen.generateUpdates(commit, insertRecords);
JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(records, 1), commit);
// Only supports INSERT, UPSERT, and BULK_INSERT
List<HoodieRecord> records = writeOperationType == WriteOperationType.UPSERT
? dataGen.generateUpdates(commit, insertRecords) : dataGen.generateInserts(commit, 100);
JavaRDD<WriteStatus> result = writeOperationType == WriteOperationType.BULK_INSERT
? writeClient.bulkInsert(jsc().parallelize(records, 1), commit)
: writeClient.upsert(jsc().parallelize(records, 1), commit);
List<WriteStatus> statuses = result.collect();
assertNoWriteErrors(statuses);
return Pair.of(commit, records);
