diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 75493e7b463e..a070a7e94d1c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -185,7 +185,7 @@ private static Option<String> getMetadataValue(HoodieTableMetaClient metaClient,
     }
   }
 
-  private static boolean isClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant instant) {
+  public static boolean isClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant instant) {
     try {
       if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
         // replacecommit is used for multiple operations: insert_overwrite/cluster etc.
@@ -194,7 +194,7 @@ private static boolean isClusteringCommit(HoodieTableMetaClient metaClient, Hood
           metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
         return WriteOperationType.CLUSTER.equals(replaceMetadata.getOperationType());
       }
-
+      return false;
     } catch (IOException e) {
       throw new HoodieIOException("Unable to read instant information: " + instant + " for " + metaClient.getBasePath(), e);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 35f309c4162f..5018e4f9ea39 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -148,7 +148,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
           .load(srcPath)
           // add filtering so that only interested records are returned.
           .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-              queryTypeAndInstantEndpts.getRight().getLeft()));
+              queryTypeAndInstantEndpts.getRight().getLeft()))
+          .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              queryTypeAndInstantEndpts.getRight().getRight()));
     }
 
     /*
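
Note on the added filter: the incremental pull window is exclusive of the begin instant and inclusive of the end instant, and the patch now applies the same bounds on the snapshot-query fallback path. The sketch below is illustrative only and not part of the patch; the class name and the `snapshot`, `beginInstant`, and `endInstant` parameters are placeholders for the loaded frame and the checkpoint endpoints (`queryTypeAndInstantEndpts.getRight().getLeft()/getRight()` in the sources above).

// Illustrative sketch (not part of the patch): bound a snapshot read by the
// checkpoint window, mirroring the two filters added in HoodieIncrSource above.
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class CheckpointWindowFilter {
  public static Dataset<Row> boundByCheckpoints(Dataset<Row> snapshot, String beginInstant, String endInstant) {
    return snapshot
        // _hoodie_commit_time strictly greater than the last checkpoint ...
        .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, beginInstant))
        // ... and no greater than the end instant computed for this batch
        .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, endInstant));
  }
}
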
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 181374336584..7b8232f6194a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -157,7 +157,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
           .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL()).load(srcPath)
           // add filtering so that only interested records are returned.
           .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-              queryTypeAndInstantEndpts.getRight().getLeft()));
+              queryTypeAndInstantEndpts.getRight().getLeft()))
+          .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              queryTypeAndInstantEndpts.getRight().getRight()));
     }
 
     if (source.isEmpty()) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index d9415d036c31..34a6311d3fef 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -22,10 +22,10 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
-
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Row;
 
@@ -72,8 +72,22 @@ public static Pair<String, Pair<String, String>> calculateBeginAndEndInstants(Ja
         "Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
     HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
 
-    final HoodieTimeline activeCommitTimeline =
+    // Find the earliest incomplete commit, deltacommit, or non-clustering replacecommit,
+    // so that the incremental pulls should be strictly before this instant.
+    // This is to guard around multi-writer scenarios where a commit starting later than
+    // another commit from a concurrent writer can finish earlier, leaving an inflight commit
+    // before a completed commit.
+    final Option<HoodieInstant> firstIncompleteCommit = srcMetaClient.getCommitsTimeline()
+        .filterInflightsAndRequested()
+        .filter(instant ->
+            !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
+                || !ClusteringUtils.getClusteringPlan(srcMetaClient, instant).isPresent())
+        .firstInstant();
+    final HoodieTimeline completedCommitTimeline =
         srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+    final HoodieTimeline activeCommitTimeline = firstIncompleteCommit.map(
+        commit -> completedCommitTimeline.findInstantsBefore(commit.getTimestamp())
+    ).orElse(completedCommitTimeline);
 
     String beginInstantTime = beginInstant.orElseGet(() -> {
       if (missingCheckpointStrategy != null) {
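
For readability, here is the new guard from IncrSourceHelper.calculateBeginAndEndInstants extracted as a standalone sketch (illustrative only; the class and method names are made up). It caps the completed timeline at the earliest incomplete commit, deltacommit, or non-clustering replacecommit, so the computed end instant, and therefore the checkpoint, can never move past an inflight commit left behind by a slower concurrent writer; pending clustering is excluded since clustering only rewrites existing data.

// Illustrative sketch (not part of the patch), mirroring the logic added above.
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;

public class IncrementalPullBound {
  // Returns the completed commit timeline, truncated before the earliest incomplete
  // ingestion instant; pending clustering replacecommits are not treated as blocking.
  public static HoodieTimeline boundedCompletedTimeline(HoodieTableMetaClient metaClient) {
    Option<HoodieInstant> firstIncompleteCommit = metaClient.getCommitsTimeline()
        .filterInflightsAndRequested()
        .filter(instant -> !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
            || !ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent())
        .firstInstant();
    HoodieTimeline completed =
        metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
    return firstIncompleteCommit
        .map(instant -> completed.findInstantsBefore(instant.getTimestamp()))
        .orElse(completed);
  }
}
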
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index b08438c5a770..5762bae3cbff 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -25,9 +25,13 @@
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieArchivalConfig;
@@ -46,20 +50,23 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
+import static org.apache.hudi.common.model.WriteOperationType.BULK_INSERT;
+import static org.apache.hudi.common.model.WriteOperationType.INSERT;
+import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
 
@@ -83,12 +90,8 @@ public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, Strin
     return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
   }
 
-  private static Stream<Arguments> tableTypeParams() {
-    return Arrays.stream(new HoodieTableType[][] {{HoodieTableType.COPY_ON_WRITE}, {HoodieTableType.MERGE_ON_READ}}).map(Arguments::of);
-  }
-
   @ParameterizedTest
-  @MethodSource("tableTypeParams")
+  @EnumSource(HoodieTableType.class)
   public void testHoodieIncrSource(HoodieTableType tableType) throws IOException {
     this.tableType = tableType;
     metaClient = getHoodieMetaClient(hadoopConf(), basePath());
@@ -101,11 +104,11 @@ public void testHoodieIncrSource(HoodieTableType tableType) throws IOException {
         .build();
 
     SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
 
-    Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, true, null, "100");
-    Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, true, null, "200");
-    Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, true, null, "300");
-    Pair<String, List<HoodieRecord>> inserts4 = writeRecords(writeClient, true, null, "400");
-    Pair<String, List<HoodieRecord>> inserts5 = writeRecords(writeClient, true, null, "500");
+    Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, INSERT, null, "100");
+    Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, INSERT, null, "200");
+    Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, INSERT, null, "300");
+    Pair<String, List<HoodieRecord>> inserts4 = writeRecords(writeClient, INSERT, null, "400");
+    Pair<String, List<HoodieRecord>> inserts5 = writeRecords(writeClient, INSERT, null, "500");
 
     // read everything upto latest
     readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 500, inserts5.getKey());
@@ -122,13 +125,200 @@ public void testHoodieIncrSource(HoodieTableType tableType) throws IOException {
     // ensure checkpoint does not move
     readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 0, inserts5.getKey());
 
-    Pair<String, List<HoodieRecord>> inserts6 = writeRecords(writeClient, true, null, "600");
+    Pair<String, List<HoodieRecord>> inserts6 = writeRecords(writeClient, INSERT, null, "600");
 
     // insert new batch and ensure the checkpoint moves
     readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 100, inserts6.getKey());
     writeClient.close();
   }
 
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testHoodieIncrSourceInflightCommitBeforeCompletedCommit(HoodieTableType tableType) throws IOException {
+    this.tableType = tableType;
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(3, 4).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(2).build())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder()
+                .withInlineCompaction(true)
+                .withMaxNumDeltaCommitsBeforeCompaction(3)
+                .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
+    List<Pair<String, List<HoodieRecord>>> inserts = new ArrayList<>();
+
+    for (int i = 0; i < 6; i++) {
+      inserts.add(writeRecords(writeClient, INSERT, null, HoodieActiveTimeline.createNewInstantTime()));
+    }
+
+    // Emulates a scenario where an inflight commit is before a completed commit
+    // The checkpoint should not go past this commit
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieInstant instant4 = activeTimeline
+        .filter(instant -> instant.getTimestamp().equals(inserts.get(4).getKey())).firstInstant().get();
+    Option<byte[]> instant4CommitData = activeTimeline.getInstantDetails(instant4);
+    activeTimeline.revertToInflight(instant4);
+    metaClient.reloadActiveTimeline();
+
+    // Reads everything up to latest
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+        Option.empty(),
+        400,
+        inserts.get(3).getKey());
+
+    // Even if the beginning timestamp is archived, full table scan should kick in, but should filter for records having commit time > first instant time
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+        Option.of(inserts.get(0).getKey()),
+        300,
+        inserts.get(3).getKey());
+
+    // Even if the read upto latest is set, if begin timestamp is in active timeline, only incremental should kick in.
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+        Option.of(inserts.get(2).getKey()),
+        100,
+        inserts.get(3).getKey());
+
+    // Reads just the latest
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+        Option.empty(),
+        100,
+        inserts.get(3).getKey());
+
+    // Ensures checkpoint does not move
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+        Option.of(inserts.get(3).getKey()),
+        0,
+        inserts.get(3).getKey());
+
+    activeTimeline.reload().saveAsComplete(
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, instant4.getAction(), inserts.get(4).getKey()),
+        instant4CommitData);
+
+    // After the inflight commit completes, the checkpoint should move on after incremental pull
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+        Option.of(inserts.get(3).getKey()),
+        200,
+        inserts.get(5).getKey());
+
+    writeClient.close();
+  }
+
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testHoodieIncrSourceWithPendingTableServices(HoodieTableType tableType) throws IOException {
+    this.tableType = tableType;
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder()
+                .withScheduleInlineCompaction(true)
+                .withMaxNumDeltaCommitsBeforeCompaction(1)
+                .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
+    List<Pair<String, List<HoodieRecord>>> dataBatches = new ArrayList<>();
+
+    // For COW:
+    // 0: bulk_insert of 100 records
+    // 1: bulk_insert of 100 records
+    // 2: bulk_insert of 100 records
+    //    schedule clustering
+    // 3: bulk_insert of 100 records
+    // 4: upsert of 100 records (updates only based on round 3)
+    // 5: upsert of 100 records (updates only based on round 3)
+    // 6: bulk_insert of 100 records
+    // For MOR:
+    // 0: bulk_insert of 100 records
+    // 1: bulk_insert of 100 records
+    // 2: bulk_insert of 100 records
+    // 3: bulk_insert of 100 records
+    // 4: upsert of 100 records (updates only based on round 3)
+    //    schedule compaction
+    // 5: upsert of 100 records (updates only based on round 3)
+    //    schedule clustering
+    // 6: bulk_insert of 100 records
+    for (int i = 0; i < 6; i++) {
+      WriteOperationType opType = i < 4 ? BULK_INSERT : UPSERT;
+      List<HoodieRecord> recordsForUpdate = i < 4 ? null : dataBatches.get(3).getRight();
+      dataBatches.add(writeRecords(writeClient, opType, recordsForUpdate, HoodieActiveTimeline.createNewInstantTime()));
+      if (tableType == COPY_ON_WRITE) {
+        if (i == 2) {
+          writeClient.scheduleClustering(Option.empty());
+        }
+      } else if (tableType == MERGE_ON_READ) {
+        if (i == 4) {
+          writeClient.scheduleCompaction(Option.empty());
+        }
+        if (i == 5) {
+          writeClient.scheduleClustering(Option.empty());
+        }
+      }
+    }
+    dataBatches.add(writeRecords(writeClient, BULK_INSERT, null, HoodieActiveTimeline.createNewInstantTime()));
+
+    String latestCommitTimestamp = dataBatches.get(dataBatches.size() - 1).getKey();
+    // Pending clustering exists
+    Option<HoodieInstant> clusteringInstant =
+        metaClient.getActiveTimeline().filterPendingReplaceTimeline()
+            .filter(instant -> ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent())
+            .firstInstant();
+    assertTrue(clusteringInstant.isPresent());
+    assertTrue(clusteringInstant.get().getTimestamp().compareTo(latestCommitTimestamp) < 0);
+
+    if (tableType == MERGE_ON_READ) {
+      // Pending compaction exists
+      Option<HoodieInstant> compactionInstant =
+          metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+      assertTrue(compactionInstant.isPresent());
+      assertTrue(compactionInstant.get().getTimestamp().compareTo(latestCommitTimestamp) < 0);
+    }
+
+    // The pending tables services should not block the incremental pulls
+    // Reads everything up to latest
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+        Option.empty(),
+        500,
+        dataBatches.get(6).getKey());
+
+    // Even if the read upto latest is set, if begin timestamp is in active timeline, only incremental should kick in.
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+        Option.of(dataBatches.get(2).getKey()),
+        200,
+        dataBatches.get(6).getKey());
+
+    // Reads just the latest
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+        Option.empty(),
+        100,
+        dataBatches.get(6).getKey());
+
+    // Ensures checkpoint does not move
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+        Option.of(dataBatches.get(6).getKey()),
+        0,
+        dataBatches.get(6).getKey());
+
+    writeClient.close();
+  }
+
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount, String expectedCheckpoint) {
     Properties properties = new Properties();
@@ -148,10 +338,17 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe
     Assertions.assertEquals(expectedCheckpoint, batchCheckPoint.getRight());
   }
 
-  private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords, String commit) throws IOException {
+  private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient,
+                                                        WriteOperationType writeOperationType,
+                                                        List<HoodieRecord> insertRecords,
+                                                        String commit) throws IOException {
     writeClient.startCommitWithTime(commit);
-    List<HoodieRecord> records = insert ? dataGen.generateInserts(commit, 100) : dataGen.generateUpdates(commit, insertRecords);
-    JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(records, 1), commit);
+    // Only supports INSERT, UPSERT, and BULK_INSERT
+    List<HoodieRecord> records = writeOperationType == WriteOperationType.UPSERT
+        ? dataGen.generateUpdates(commit, insertRecords) : dataGen.generateInserts(commit, 100);
+    JavaRDD<WriteStatus> result = writeOperationType == WriteOperationType.BULK_INSERT
+        ? writeClient.bulkInsert(jsc().parallelize(records, 1), commit)
+        : writeClient.upsert(jsc().parallelize(records, 1), commit);
     List<WriteStatus> statuses = result.collect();
     assertNoWriteErrors(statuses);
     return Pair.of(commit, records);
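
Finally, TimelineUtils.isClusteringCommit is now public and explicitly returns false for anything that is not a clustering replacecommit, so callers outside TimelineUtils can use it to classify instants. A minimal usage sketch follows (illustrative only; the helper class and method name are hypothetical and not part of the patch).

// Illustrative sketch (not part of the patch): classify a completed instant using
// the now-public TimelineUtils.isClusteringCommit.
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;

public class ReplaceCommitClassifier {
  // True only for completed replacecommits produced by clustering;
  // e.g. insert_overwrite replacecommits return false.
  public static boolean isCompletedClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant instant) {
    return instant.isCompleted()
        && HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
        && TimelineUtils.isClusteringCommit(metaClient, instant);
  }
}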