From f30e3b73e9c68d5b3bb7c64d1fe4cdf1c137b18c Mon Sep 17 00:00:00 2001
From: Y Ethan Guo
Date: Thu, 10 Nov 2022 11:45:01 -0800
Subject: [PATCH] [HUDI-5176] Fix incremental source to consider inflight
 commits before completed commits (#7160)

When multiple writers write to the same Hudi table under optimistic
concurrency control, one writer may start later yet finish its write
transaction earlier than the other concurrent writer. In this case, the Hudi
timeline contains an inflight commit before a completed commit. As a concrete
example, writer 1 starts a commit at t1, and later writer 2 starts another
commit at t2 (t2 > t1). Commit t2 finishes earlier than t1.

                                     t3           t4
--------------------------------------------------------> t
instant t1   |--------------------------|        (writer 1)
instant t2        |-----------|                  (writer 2)

When the Hudi incremental source is used on such a table, the logic that
determines the next fetch (IncrSourceHelper.calculateBeginAndEndInstants())
is susceptible to data loss because it misses the inflight instants. The
method filters for completed instants only and uses them to determine the
instants to fetch and, thus, the checkpoint. In the example above, an
incremental fetch at t3 advances the checkpoint to t2 and ignores t1
indefinitely.

To fix the problem, this PR adds logic to consider only the contiguous run of
completed commits / deltacommits / non-clustering replacecommits for
incremental pulls, e.g., the completed instants before t1 in the example, so
that the checkpoint never advances past an inflight commit, which could cause
data loss. To implement this, we find the earliest incomplete commit,
deltacommit, or non-clustering replacecommit and use it to bound the
incremental pulls.

This PR applies the same fix to HoodieIncrSource, S3EventsHoodieIncrSource,
and GcsEventsHoodieIncrSource.
---
 .../common/table/timeline/TimelineUtils.java  |   4 +-
 .../utilities/sources/HoodieIncrSource.java   |   4 +-
 .../sources/S3EventsHoodieIncrSource.java     |   4 +-
 .../sources/helpers/IncrSourceHelper.java     |  18 +-
 .../sources/TestHoodieIncrSource.java         | 233 ++++++++++++++++--
 5 files changed, 239 insertions(+), 24 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 75493e7b463e1..a070a7e94d1c0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -185,7 +185,7 @@ private static Option<String> getMetadataValue(HoodieTableMetaClient metaClient,
     }
   }
 
-  private static boolean isClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant instant) {
+  public static boolean isClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant instant) {
     try {
       if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
         // replacecommit is used for multiple operations: insert_overwrite/cluster etc.
@@ -194,7 +194,7 @@ private static boolean isClusteringCommit(HoodieTableMetaClient metaClient, Hood
             metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
         return WriteOperationType.CLUSTER.equals(replaceMetadata.getOperationType());
       }
-
+
       return false;
     } catch (IOException e) {
       throw new HoodieIOException("Unable to read instant information: " + instant + " for " + metaClient.getBasePath(), e);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 35f309c4162f4..5018e4f9ea396 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -148,7 +148,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
           .load(srcPath)
           // add filtering so that only interested records are returned.
           .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-              queryTypeAndInstantEndpts.getRight().getLeft()));
+              queryTypeAndInstantEndpts.getRight().getLeft()))
+          .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              queryTypeAndInstantEndpts.getRight().getRight()));
     }
 
     /*
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 181374336584d..7b8232f6194a3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -157,7 +157,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
           .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL()).load(srcPath)
           // add filtering so that only interested records are returned.
.filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, - queryTypeAndInstantEndpts.getRight().getLeft())); + queryTypeAndInstantEndpts.getRight().getLeft())) + .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, + queryTypeAndInstantEndpts.getRight().getRight())); } if (source.isEmpty()) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java index d9415d036c312..34a6311d3fefe 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java @@ -22,10 +22,10 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; - import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Row; @@ -72,8 +72,22 @@ public static Pair> calculateBeginAndEndInstants(Ja "Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value"); HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build(); - final HoodieTimeline activeCommitTimeline = + // Find the earliest incomplete commit, deltacommit, or non-clustering replacecommit, + // so that the incremental pulls should be strictly before this instant. + // This is to guard around multi-writer scenarios where a commit starting later than + // another commit from a concurrent writer can finish earlier, leaving an inflight commit + // before a completed commit. 
+    final Option<HoodieInstant> firstIncompleteCommit = srcMetaClient.getCommitsTimeline()
+        .filterInflightsAndRequested()
+        .filter(instant ->
+            !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
+                || !ClusteringUtils.getClusteringPlan(srcMetaClient, instant).isPresent())
+        .firstInstant();
+    final HoodieTimeline completedCommitTimeline =
         srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+    final HoodieTimeline activeCommitTimeline = firstIncompleteCommit.map(
+        commit -> completedCommitTimeline.findInstantsBefore(commit.getTimestamp())
+    ).orElse(completedCommitTimeline);
 
     String beginInstantTime = beginInstant.orElseGet(() -> {
       if (missingCheckpointStrategy != null) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index b08438c5a7706..5762bae3cbffe 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -25,9 +25,13 @@
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieArchivalConfig;
@@ -46,20 +50,23 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
+import static org.apache.hudi.common.model.WriteOperationType.BULK_INSERT;
+import static org.apache.hudi.common.model.WriteOperationType.INSERT;
+import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
 
@@ -83,12 +90,8 @@ public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, Strin
     return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
   }
 
-  private static Stream<Arguments> tableTypeParams() {
-    return Arrays.stream(new HoodieTableType[][] {{HoodieTableType.COPY_ON_WRITE}, {HoodieTableType.MERGE_ON_READ}}).map(Arguments::of);
-  }
-
   @ParameterizedTest
@MethodSource("tableTypeParams") + @EnumSource(HoodieTableType.class) public void testHoodieIncrSource(HoodieTableType tableType) throws IOException { this.tableType = tableType; metaClient = getHoodieMetaClient(hadoopConf(), basePath()); @@ -101,11 +104,11 @@ public void testHoodieIncrSource(HoodieTableType tableType) throws IOException { .build(); SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig); - Pair> inserts = writeRecords(writeClient, true, null, "100"); - Pair> inserts2 = writeRecords(writeClient, true, null, "200"); - Pair> inserts3 = writeRecords(writeClient, true, null, "300"); - Pair> inserts4 = writeRecords(writeClient, true, null, "400"); - Pair> inserts5 = writeRecords(writeClient, true, null, "500"); + Pair> inserts = writeRecords(writeClient, INSERT, null, "100"); + Pair> inserts2 = writeRecords(writeClient, INSERT, null, "200"); + Pair> inserts3 = writeRecords(writeClient, INSERT, null, "300"); + Pair> inserts4 = writeRecords(writeClient, INSERT, null, "400"); + Pair> inserts5 = writeRecords(writeClient, INSERT, null, "500"); // read everything upto latest readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 500, inserts5.getKey()); @@ -122,13 +125,200 @@ public void testHoodieIncrSource(HoodieTableType tableType) throws IOException { // ensure checkpoint does not move readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 0, inserts5.getKey()); - Pair> inserts6 = writeRecords(writeClient, true, null, "600"); + Pair> inserts6 = writeRecords(writeClient, INSERT, null, "600"); // insert new batch and ensure the checkpoint moves readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 100, inserts6.getKey()); writeClient.close(); } + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testHoodieIncrSourceInflightCommitBeforeCompletedCommit(HoodieTableType tableType) throws IOException { + this.tableType = tableType; + metaClient = getHoodieMetaClient(hadoopConf(), basePath()); + HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient) + .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(3, 4).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(2).build()) + .withCompactionConfig( + HoodieCompactionConfig.newBuilder() + .withInlineCompaction(true) + .withMaxNumDeltaCommitsBeforeCompaction(3) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + .build(); + + SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig); + List>> inserts = new ArrayList<>(); + + for (int i = 0; i < 6; i++) { + inserts.add(writeRecords(writeClient, INSERT, null, HoodieActiveTimeline.createNewInstantTime())); + } + + // Emulates a scenario where an inflight commit is before a completed commit + // The checkpoint should not go past this commit + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieInstant instant4 = activeTimeline + .filter(instant -> instant.getTimestamp().equals(inserts.get(4).getKey())).firstInstant().get(); + Option instant4CommitData = activeTimeline.getInstantDetails(instant4); + activeTimeline.revertToInflight(instant4); + metaClient.reloadActiveTimeline(); + + // Reads everything up to latest + readAndAssert( + IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, + Option.empty(), + 400, + inserts.get(3).getKey()); + + // Even if the beginning timestamp is 
+    // Even if the beginning timestamp is archived, full table scan should kick in, but should filter for records having commit time > first instant time
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+        Option.of(inserts.get(0).getKey()),
+        300,
+        inserts.get(3).getKey());
+
+    // Even if the read upto latest is set, if begin timestamp is in active timeline, only incremental should kick in.
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+        Option.of(inserts.get(2).getKey()),
+        100,
+        inserts.get(3).getKey());
+
+    // Reads just the latest
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+        Option.empty(),
+        100,
+        inserts.get(3).getKey());
+
+    // Ensures checkpoint does not move
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+        Option.of(inserts.get(3).getKey()),
+        0,
+        inserts.get(3).getKey());
+
+    activeTimeline.reload().saveAsComplete(
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, instant4.getAction(), inserts.get(4).getKey()),
+        instant4CommitData);
+
+    // After the inflight commit completes, the checkpoint should move on after incremental pull
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+        Option.of(inserts.get(3).getKey()),
+        200,
+        inserts.get(5).getKey());
+
+    writeClient.close();
+  }
+
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testHoodieIncrSourceWithPendingTableServices(HoodieTableType tableType) throws IOException {
+    this.tableType = tableType;
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder()
+                .withScheduleInlineCompaction(true)
+                .withMaxNumDeltaCommitsBeforeCompaction(1)
+                .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
+    List<Pair<String, List<HoodieRecord>>> dataBatches = new ArrayList<>();
+
+    // For COW:
+    // 0: bulk_insert of 100 records
+    // 1: bulk_insert of 100 records
+    // 2: bulk_insert of 100 records
+    //    schedule clustering
+    // 3: bulk_insert of 100 records
+    // 4: upsert of 100 records (updates only based on round 3)
+    // 5: upsert of 100 records (updates only based on round 3)
+    // 6: bulk_insert of 100 records
+    // For MOR:
+    // 0: bulk_insert of 100 records
+    // 1: bulk_insert of 100 records
+    // 2: bulk_insert of 100 records
+    // 3: bulk_insert of 100 records
+    // 4: upsert of 100 records (updates only based on round 3)
+    //    schedule compaction
+    // 5: upsert of 100 records (updates only based on round 3)
+    //    schedule clustering
+    // 6: bulk_insert of 100 records
+    for (int i = 0; i < 6; i++) {
+      WriteOperationType opType = i < 4 ? BULK_INSERT : UPSERT;
+      List<HoodieRecord> recordsForUpdate = i < 4 ? null : dataBatches.get(3).getRight();
+      dataBatches.add(writeRecords(writeClient, opType, recordsForUpdate, HoodieActiveTimeline.createNewInstantTime()));
+      if (tableType == COPY_ON_WRITE) {
+        if (i == 2) {
+          writeClient.scheduleClustering(Option.empty());
+        }
+      } else if (tableType == MERGE_ON_READ) {
+        if (i == 4) {
+          writeClient.scheduleCompaction(Option.empty());
+        }
+        if (i == 5) {
+          writeClient.scheduleClustering(Option.empty());
+        }
+      }
+    }
+    dataBatches.add(writeRecords(writeClient, BULK_INSERT, null, HoodieActiveTimeline.createNewInstantTime()));
+
+    String latestCommitTimestamp = dataBatches.get(dataBatches.size() - 1).getKey();
+    // Pending clustering exists
+    Option<HoodieInstant> clusteringInstant =
+        metaClient.getActiveTimeline().filterPendingReplaceTimeline()
+            .filter(instant -> ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent())
+            .firstInstant();
+    assertTrue(clusteringInstant.isPresent());
+    assertTrue(clusteringInstant.get().getTimestamp().compareTo(latestCommitTimestamp) < 0);
+
+    if (tableType == MERGE_ON_READ) {
+      // Pending compaction exists
+      Option<HoodieInstant> compactionInstant =
+          metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+      assertTrue(compactionInstant.isPresent());
+      assertTrue(compactionInstant.get().getTimestamp().compareTo(latestCommitTimestamp) < 0);
+    }
+
+    // The pending table services should not block the incremental pulls
+    // Reads everything up to latest
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+        Option.empty(),
+        500,
+        dataBatches.get(6).getKey());
+
+    // Even if the read upto latest is set, if begin timestamp is in active timeline, only incremental should kick in.
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+        Option.of(dataBatches.get(2).getKey()),
+        200,
+        dataBatches.get(6).getKey());
+
+    // Reads just the latest
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+        Option.empty(),
+        100,
+        dataBatches.get(6).getKey());
+
+    // Ensures checkpoint does not move
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+        Option.of(dataBatches.get(6).getKey()),
+        0,
+        dataBatches.get(6).getKey());
+
+    writeClient.close();
+  }
+
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount, String expectedCheckpoint) {
 
     Properties properties = new Properties();
@@ -148,10 +338,17 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe
     Assertions.assertEquals(expectedCheckpoint, batchCheckPoint.getRight());
   }
 
-  private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords, String commit) throws IOException {
+  private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient,
+                                                        WriteOperationType writeOperationType,
+                                                        List<HoodieRecord> insertRecords,
+                                                        String commit) throws IOException {
     writeClient.startCommitWithTime(commit);
-    List<HoodieRecord> records = insert ? dataGen.generateInserts(commit, 100) : dataGen.generateUpdates(commit, insertRecords);
-    JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(records, 1), commit);
+    // Only supports INSERT, UPSERT, and BULK_INSERT
+    List<HoodieRecord> records = writeOperationType == WriteOperationType.UPSERT
+        ? dataGen.generateUpdates(commit, insertRecords) : dataGen.generateInserts(commit, 100);
+    JavaRDD<WriteStatus> result = writeOperationType == WriteOperationType.BULK_INSERT
+        ? writeClient.bulkInsert(jsc().parallelize(records, 1), commit)
+        : writeClient.upsert(jsc().parallelize(records, 1), commit);
     List<WriteStatus> statuses = result.collect();
     assertNoWriteErrors(statuses);
     return Pair.of(commit, records);
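
For reference, a minimal standalone sketch of the checkpoint-bounding logic described in
the commit message, mirroring the IncrSourceHelper change above. It assumes a
HoodieTableMetaClient whose active timeline has been loaded; the class and method names
here are illustrative and not part of the patch.

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;

class IncrementalPullBoundSketch {
  // Returns the timeline that incremental pulls may safely consume: all completed
  // instants strictly before the earliest incomplete commit, deltacommit, or
  // non-clustering replacecommit, so the checkpoint can never skip an inflight write.
  static HoodieTimeline boundedCompletedTimeline(HoodieTableMetaClient metaClient) {
    Option<HoodieInstant> firstIncompleteCommit = metaClient.getCommitsTimeline()
        .filterInflightsAndRequested()
        .filter(instant ->
            !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
                || !ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent())
        .firstInstant();
    HoodieTimeline completedCommitTimeline =
        metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
    // If nothing is pending, the whole completed timeline is safe to pull from.
    return firstIncompleteCommit
        .map(instant -> completedCommitTimeline.findInstantsBefore(instant.getTimestamp()))
        .orElse(completedCommitTimeline);
  }
}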