From 4d9f20ee684e84369af6217eee5fd50b5153eeb2 Mon Sep 17 00:00:00 2001
From: Y Ethan Guo
Date: Thu, 10 Nov 2022 11:45:01 -0800
Subject: [PATCH] [HUDI-5176] Fix incremental source to consider inflight
 commits before completed commits (#7160)

When multiple writers write to the same Hudi table under optimistic concurrency
control, one writer may start later and finish its write transaction earlier than
another concurrent writer. In that case, the Hudi timeline contains an inflight
commit before a completed commit. As a concrete example, writer 1 starts a commit
at t1 and writer 2 later starts another commit at t2 (t2 > t1), but commit t2
finishes earlier than t1.

                                        t3           t4
 ----------------------------------------------------------> t
 instant t1   |------------------------------|    (writer 1)
      instant t2    |--------------|               (writer 2)

When the Hudi incremental source is used on such a table, the logic that
determines the next batch to fetch (IncrSourceHelper.calculateBeginAndEndInstants())
is susceptible to data loss because it misses the inflight instants. The method
filters for completed instants only and uses them to determine the instants to
fetch and, thus, the checkpoint. In the example above, when incrementally fetching
the data at t3, the checkpoint advances to t2 and ignores t1 indefinitely.

To fix the problem, this PR adds logic to only consider contiguous completed
commits / deltacommits / non-clustering replacecommits for incremental pulls,
i.e., the completed instants before t1 in the example, so that the checkpoint
never moves past an inflight commit, which could cause data loss. To implement
this, we find the earliest incomplete commit, deltacommit, or non-clustering
replacecommit and use it to bound the incremental pulls (a simplified sketch of
this bounding rule follows the diffstat below).

This PR applies the same fix to HoodieIncrSource, S3EventsHoodieIncrSource, and
GcsEventsHoodieIncrSource.
---
 .../common/table/timeline/TimelineUtils.java  |   4 +-
 .../utilities/sources/HoodieIncrSource.java   |   4 +-
 .../sources/S3EventsHoodieIncrSource.java     |   4 +-
 .../sources/helpers/IncrSourceHelper.java     |  18 +-
 .../sources/TestHoodieIncrSource.java         | 233 ++++++++++++++++--
 5 files changed, 239 insertions(+), 24 deletions(-)
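A minimal, self-contained sketch of the bounding rule described above, on a toy
timeline: only completed instants strictly before the earliest incomplete,
non-clustering instant stay eligible. The Instant type, its fields, and
boundedCompletedInstants() are hypothetical stand-ins rather than Hudi's timeline
classes; only the selection rule mirrors the IncrSourceHelper change further down.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Toy model of the bounding rule; the types below are hypothetical, not Hudi classes.
public class CheckpointBoundSketch {

  enum State { REQUESTED, INFLIGHT, COMPLETED }

  // Hypothetical stand-in for a timeline instant: a sortable timestamp, a state,
  // and whether it is a clustering replacecommit.
  record Instant(String timestamp, State state, boolean clusteringReplaceCommit) {}

  // The checkpoint may only advance through completed instants that are strictly
  // before the earliest incomplete (requested/inflight) non-clustering instant.
  static List<Instant> boundedCompletedInstants(List<Instant> timeline) {
    Optional<String> firstIncomplete = timeline.stream()
        .filter(i -> i.state() != State.COMPLETED)
        .filter(i -> !i.clusteringReplaceCommit())
        .map(Instant::timestamp)
        .min(Comparator.naturalOrder());

    List<Instant> result = new ArrayList<>();
    for (Instant i : timeline) {
      boolean beforeBound = firstIncomplete.map(ts -> i.timestamp().compareTo(ts) < 0).orElse(true);
      if (i.state() == State.COMPLETED && beforeBound) {
        result.add(i);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // The scenario from the commit message: t1 is still inflight while t2 has completed.
    List<Instant> timeline = List.of(
        new Instant("t1", State.INFLIGHT, false),
        new Instant("t2", State.COMPLETED, false));
    // Prints []: nothing is eligible yet, so the checkpoint cannot advance to t2 and skip t1.
    System.out.println(boundedCompletedInstants(timeline));
  }
}

On the scenario in the diagram (t1 inflight, t2 completed), this returns an empty
list: the checkpoint cannot advance to t2 while t1 is still inflight.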
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 75493e7b463e..a070a7e94d1c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -185,7 +185,7 @@ private static Option<String> getMetadataValue(HoodieTableMetaClient metaClient,
     }
   }
 
-  private static boolean isClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant instant) {
+  public static boolean isClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant instant) {
     try {
       if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
         // replacecommit is used for multiple operations: insert_overwrite/cluster etc.
@@ -194,7 +194,7 @@ private static boolean isClusteringCommit(HoodieTableMetaClient metaClient, Hood
             metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
         return WriteOperationType.CLUSTER.equals(replaceMetadata.getOperationType());
       }
-
+
       return false;
     } catch (IOException e) {
       throw new HoodieIOException("Unable to read instant information: " + instant + " for " + metaClient.getBasePath(), e);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 35f309c4162f..5018e4f9ea39 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -148,7 +148,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
           .load(srcPath)
           // add filtering so that only interested records are returned.
           .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-              queryTypeAndInstantEndpts.getRight().getLeft()));
+              queryTypeAndInstantEndpts.getRight().getLeft()))
+          .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              queryTypeAndInstantEndpts.getRight().getRight()));
     }
 
   /*
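The two chained filters added here bound the read to the half-open interval
(beginInstant, endInstant] on the commit-time metadata column; S3EventsHoodieIncrSource
below gets the same treatment. A minimal sketch of that predicate in isolation,
assuming illustrative names for the class, method, and parameters
(HoodieRecord.COMMIT_TIME_METADATA_FIELD is the real constant used in the hunk):

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class CommitTimeBoundSketch {
  // Keep only records whose commit time lies in (beginInstant, endInstant]: the
  // exclusive lower bound skips already-consumed commits, and the new inclusive
  // upper bound stops at the end instant computed by IncrSourceHelper.
  static Dataset<Row> boundByCheckpoint(Dataset<Row> df, String beginInstant, String endInstant) {
    return df
        .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, beginInstant))
        .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, endInstant));
  }
}

Without the upper bound, a snapshot read could return rows committed after the
computed end instant, letting the fetched data run ahead of the reported checkpoint.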
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 181374336584..7b8232f6194a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -157,7 +157,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
           .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL()).load(srcPath)
           // add filtering so that only interested records are returned.
           .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-              queryTypeAndInstantEndpts.getRight().getLeft()));
+              queryTypeAndInstantEndpts.getRight().getLeft()))
+          .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              queryTypeAndInstantEndpts.getRight().getRight()));
     }
 
     if (source.isEmpty()) {
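Both sources consume the result of IncrSourceHelper.calculateBeginAndEndInstants()
through getRight().getLeft() and getRight().getRight(), which implies a nested pair
of (queryType, (beginInstant, endInstant)). A small sketch that unpacks that shape
into named locals, assuming the shape inferred from the usage above and illustrative
variable names:

import org.apache.hudi.common.util.collection.Pair;

public class CheckpointEndpointsSketch {
  // Unpack the (queryType, (beginInstant, endInstant)) result into readable locals
  // instead of chaining getRight().getLeft() at each use site.
  static void printEndpoints(Pair<String, Pair<String, String>> queryTypeAndInstantEndpts) {
    String queryType = queryTypeAndInstantEndpts.getLeft();
    String beginInstant = queryTypeAndInstantEndpts.getRight().getLeft();
    String endInstant = queryTypeAndInstantEndpts.getRight().getRight();
    System.out.printf("queryType=%s, window=(%s, %s]%n", queryType, beginInstant, endInstant);
  }
}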
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index d9415d036c31..34a6311d3fef 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -22,10 +22,10 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
-
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Row;
 
@@ -72,8 +72,22 @@ public static Pair<String, Pair<String, String>> calculateBeginAndEndInstants(Ja
         "Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
     HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
 
-    final HoodieTimeline activeCommitTimeline =
+    // Find the earliest incomplete commit, deltacommit, or non-clustering replacecommit,
+    // so that the incremental pulls are strictly before this instant.
+    // This is to guard against multi-writer scenarios where a commit starting later than
+    // another commit from a concurrent writer can finish earlier, leaving an inflight commit
+    // before a completed commit.
+    final Option<HoodieInstant> firstIncompleteCommit = srcMetaClient.getCommitsTimeline()
+        .filterInflightsAndRequested()
+        .filter(instant ->
+            !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
+                || !ClusteringUtils.getClusteringPlan(srcMetaClient, instant).isPresent())
+        .firstInstant();
+    final HoodieTimeline completedCommitTimeline =
         srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+    final HoodieTimeline activeCommitTimeline = firstIncompleteCommit.map(
+        commit -> completedCommitTimeline.findInstantsBefore(commit.getTimestamp())
+    ).orElse(completedCommitTimeline);
 
     String beginInstantTime = beginInstant.orElseGet(() -> {
       if (missingCheckpointStrategy != null) {
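The helper above deliberately skips pending clustering when choosing the bound: a
clustering replacecommit reorganizes data already covered by completed instants,
so it should not stall the checkpoint (the new testHoodieIncrSourceWithPendingTableServices
below exercises this). A sketch that repackages the same timeline calls from the
hunk into a named helper, with firstBlockingIncompleteInstant as an illustrative name:

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;

public class InflightBoundSketch {
  // Earliest requested/inflight commit, deltacommit, or non-clustering replacecommit.
  // Pending clustering is excluded because it only reorganizes data that is already
  // covered by completed instants, so it should not cap the incremental checkpoint.
  static Option<HoodieInstant> firstBlockingIncompleteInstant(HoodieTableMetaClient metaClient) {
    return metaClient.getCommitsTimeline()
        .filterInflightsAndRequested()
        .filter(instant ->
            !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
                || !ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent())
        .firstInstant();
  }
}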
@MethodSource("tableTypeParams") + @EnumSource(HoodieTableType.class) public void testHoodieIncrSource(HoodieTableType tableType) throws IOException { this.tableType = tableType; metaClient = getHoodieMetaClient(hadoopConf(), basePath()); @@ -101,11 +104,11 @@ public void testHoodieIncrSource(HoodieTableType tableType) throws IOException { .build(); SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig); - Pair> inserts = writeRecords(writeClient, true, null, "100"); - Pair> inserts2 = writeRecords(writeClient, true, null, "200"); - Pair> inserts3 = writeRecords(writeClient, true, null, "300"); - Pair> inserts4 = writeRecords(writeClient, true, null, "400"); - Pair> inserts5 = writeRecords(writeClient, true, null, "500"); + Pair> inserts = writeRecords(writeClient, INSERT, null, "100"); + Pair> inserts2 = writeRecords(writeClient, INSERT, null, "200"); + Pair> inserts3 = writeRecords(writeClient, INSERT, null, "300"); + Pair> inserts4 = writeRecords(writeClient, INSERT, null, "400"); + Pair> inserts5 = writeRecords(writeClient, INSERT, null, "500"); // read everything upto latest readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 500, inserts5.getKey()); @@ -122,13 +125,200 @@ public void testHoodieIncrSource(HoodieTableType tableType) throws IOException { // ensure checkpoint does not move readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 0, inserts5.getKey()); - Pair> inserts6 = writeRecords(writeClient, true, null, "600"); + Pair> inserts6 = writeRecords(writeClient, INSERT, null, "600"); // insert new batch and ensure the checkpoint moves readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 100, inserts6.getKey()); writeClient.close(); } + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testHoodieIncrSourceInflightCommitBeforeCompletedCommit(HoodieTableType tableType) throws IOException { + this.tableType = tableType; + metaClient = getHoodieMetaClient(hadoopConf(), basePath()); + HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient) + .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(3, 4).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(2).build()) + .withCompactionConfig( + HoodieCompactionConfig.newBuilder() + .withInlineCompaction(true) + .withMaxNumDeltaCommitsBeforeCompaction(3) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + .build(); + + SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig); + List>> inserts = new ArrayList<>(); + + for (int i = 0; i < 6; i++) { + inserts.add(writeRecords(writeClient, INSERT, null, HoodieActiveTimeline.createNewInstantTime())); + } + + // Emulates a scenario where an inflight commit is before a completed commit + // The checkpoint should not go past this commit + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieInstant instant4 = activeTimeline + .filter(instant -> instant.getTimestamp().equals(inserts.get(4).getKey())).firstInstant().get(); + Option instant4CommitData = activeTimeline.getInstantDetails(instant4); + activeTimeline.revertToInflight(instant4); + metaClient.reloadActiveTimeline(); + + // Reads everything up to latest + readAndAssert( + IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, + Option.empty(), + 400, + inserts.get(3).getKey()); + + // Even if the beginning timestamp is 
archived, full table scan should kick in, but should filter for records having commit time > first instant time + readAndAssert( + IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, + Option.of(inserts.get(0).getKey()), + 300, + inserts.get(3).getKey()); + + // Even if the read upto latest is set, if begin timestamp is in active timeline, only incremental should kick in. + readAndAssert( + IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, + Option.of(inserts.get(2).getKey()), + 100, + inserts.get(3).getKey()); + + // Reads just the latest + readAndAssert( + IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, + Option.empty(), + 100, + inserts.get(3).getKey()); + + // Ensures checkpoint does not move + readAndAssert( + IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, + Option.of(inserts.get(3).getKey()), + 0, + inserts.get(3).getKey()); + + activeTimeline.reload().saveAsComplete( + new HoodieInstant(HoodieInstant.State.INFLIGHT, instant4.getAction(), inserts.get(4).getKey()), + instant4CommitData); + + // After the inflight commit completes, the checkpoint should move on after incremental pull + readAndAssert( + IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, + Option.of(inserts.get(3).getKey()), + 200, + inserts.get(5).getKey()); + + writeClient.close(); + } + + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testHoodieIncrSourceWithPendingTableServices(HoodieTableType tableType) throws IOException { + this.tableType = tableType; + metaClient = getHoodieMetaClient(hadoopConf(), basePath()); + HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient) + .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build()) + .withCompactionConfig( + HoodieCompactionConfig.newBuilder() + .withScheduleInlineCompaction(true) + .withMaxNumDeltaCommitsBeforeCompaction(1) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + .build(); + + SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig); + List>> dataBatches = new ArrayList<>(); + + // For COW: + // 0: bulk_insert of 100 records + // 1: bulk_insert of 100 records + // 2: bulk_insert of 100 records + // schedule clustering + // 3: bulk_insert of 100 records + // 4: upsert of 100 records (updates only based on round 3) + // 5: upsert of 100 records (updates only based on round 3) + // 6: bulk_insert of 100 records + // For MOR: + // 0: bulk_insert of 100 records + // 1: bulk_insert of 100 records + // 2: bulk_insert of 100 records + // 3: bulk_insert of 100 records + // 4: upsert of 100 records (updates only based on round 3) + // schedule compaction + // 5: upsert of 100 records (updates only based on round 3) + // schedule clustering + // 6: bulk_insert of 100 records + for (int i = 0; i < 6; i++) { + WriteOperationType opType = i < 4 ? BULK_INSERT : UPSERT; + List recordsForUpdate = i < 4 ? 
null : dataBatches.get(3).getRight(); + dataBatches.add(writeRecords(writeClient, opType, recordsForUpdate, HoodieActiveTimeline.createNewInstantTime())); + if (tableType == COPY_ON_WRITE) { + if (i == 2) { + writeClient.scheduleClustering(Option.empty()); + } + } else if (tableType == MERGE_ON_READ) { + if (i == 4) { + writeClient.scheduleCompaction(Option.empty()); + } + if (i == 5) { + writeClient.scheduleClustering(Option.empty()); + } + } + } + dataBatches.add(writeRecords(writeClient, BULK_INSERT, null, HoodieActiveTimeline.createNewInstantTime())); + + String latestCommitTimestamp = dataBatches.get(dataBatches.size() - 1).getKey(); + // Pending clustering exists + Option clusteringInstant = + metaClient.getActiveTimeline().filterPendingReplaceTimeline() + .filter(instant -> ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent()) + .firstInstant(); + assertTrue(clusteringInstant.isPresent()); + assertTrue(clusteringInstant.get().getTimestamp().compareTo(latestCommitTimestamp) < 0); + + if (tableType == MERGE_ON_READ) { + // Pending compaction exists + Option compactionInstant = + metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant(); + assertTrue(compactionInstant.isPresent()); + assertTrue(compactionInstant.get().getTimestamp().compareTo(latestCommitTimestamp) < 0); + } + + // The pending tables services should not block the incremental pulls + // Reads everything up to latest + readAndAssert( + IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, + Option.empty(), + 500, + dataBatches.get(6).getKey()); + + // Even if the read upto latest is set, if begin timestamp is in active timeline, only incremental should kick in. + readAndAssert( + IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, + Option.of(dataBatches.get(2).getKey()), + 200, + dataBatches.get(6).getKey()); + + // Reads just the latest + readAndAssert( + IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, + Option.empty(), + 100, + dataBatches.get(6).getKey()); + + // Ensures checkpoint does not move + readAndAssert( + IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, + Option.of(dataBatches.get(6).getKey()), + 0, + dataBatches.get(6).getKey()); + + writeClient.close(); + } + private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option checkpointToPull, int expectedCount, String expectedCheckpoint) { Properties properties = new Properties(); @@ -148,10 +338,17 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe Assertions.assertEquals(expectedCheckpoint, batchCheckPoint.getRight()); } - private Pair> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List insertRecords, String commit) throws IOException { + private Pair> writeRecords(SparkRDDWriteClient writeClient, + WriteOperationType writeOperationType, + List insertRecords, + String commit) throws IOException { writeClient.startCommitWithTime(commit); - List records = insert ? dataGen.generateInserts(commit, 100) : dataGen.generateUpdates(commit, insertRecords); - JavaRDD result = writeClient.upsert(jsc().parallelize(records, 1), commit); + // Only supports INSERT, UPSERT, and BULK_INSERT + List records = writeOperationType == WriteOperationType.UPSERT + ? dataGen.generateUpdates(commit, insertRecords) : dataGen.generateInserts(commit, 100); + JavaRDD result = writeOperationType == WriteOperationType.BULK_INSERT + ? 
writeClient.bulkInsert(jsc().parallelize(records, 1), commit) + : writeClient.upsert(jsc().parallelize(records, 1), commit); List statuses = result.collect(); assertNoWriteErrors(statuses); return Pair.of(commit, records);
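Taken together, the change means the checkpoint reported by these sources can never
pass the earliest incomplete non-clustering instant, no matter how many later commits
have already completed. A rough sketch of how an end instant and checkpoint could be
chosen from the bounded, completed-only timeline under
hoodie.deltastreamer.source.hoodieincr.num_instants; the helper name, the list-based
timeline, and the exact capping behavior are assumptions for illustration, since the
real selection lives in IncrSourceHelper.calculateBeginAndEndInstants(), which this
diff shows only partially.

import java.util.List;

public class EndInstantSketch {
  // Rough model: starting after beginInstant, advance through at most
  // numInstantsPerFetch instants of the bounded (completed-only, ascending)
  // timeline and use the last one reached as the end of the read window and
  // the new checkpoint.
  static String nextCheckpoint(List<String> boundedCompletedInstants, String beginInstant, int numInstantsPerFetch) {
    String checkpoint = beginInstant;
    int taken = 0;
    for (String instant : boundedCompletedInstants) {
      if (instant.compareTo(beginInstant) > 0 && taken < numInstantsPerFetch) {
        checkpoint = instant;
        taken++;
      }
    }
    return checkpoint;
  }

  public static void main(String[] args) {
    // Timeline shaped like the new inflight test: the fifth instant was reverted to
    // inflight, so only c1..c4 are eligible; the checkpoint stops at c4 even though
    // a sixth commit completed later.
    List<String> bounded = List.of("c1", "c2", "c3", "c4");
    System.out.println(nextCheckpoint(bounded, "c2", 10)); // prints c4
  }
}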