Rebase and change test to check partition deletion
codope committed Mar 31, 2022
1 parent dfbf5f2 commit dfe7b9c
Showing 6 changed files with 20 additions and 28 deletions.
@@ -23,7 +23,6 @@
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;

@@ -63,11 +62,10 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);

/**
* Drop the given metadata partitions. This path reuses DELETE_PARTITION operation.
* Deletes the given metadata partitions. This path reuses the DELETE_PARTITION operation.
*
* @param instantTime - instant time at which the replacecommit corresponding to the drop will be recorded in the metadata timeline
* @param partitions - list of {@link MetadataPartitionType} to drop
* @throws IOException
*/
void deletePartitions(String instantTime, List<MetadataPartitionType> partitions);
}
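For reference, a minimal usage sketch of the deletePartitions API documented above, dropping the column_stats partition of the metadata table. Only the deletePartitions(String, List<MetadataPartitionType>) signature comes from this change; the SparkHoodieBackedTableMetadataWriter.create factory call, the package imports, and the surrounding wiring are assumptions for illustration.

// Sketch only: drop the column_stats partition of the metadata table.
// The factory call and package names below are assumptions; the deletePartitions
// signature matches the interface change above.
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;

public class DropColumnStatsPartitionSketch {

  // The replacecommit for the drop is recorded at the given instant time
  // on the metadata table timeline.
  static void dropColumnStats(Configuration hadoopConf, HoodieWriteConfig writeConfig,
                              HoodieSparkEngineContext context, String instantTime) throws Exception {
    try (HoodieTableMetadataWriter metadataWriter =
             SparkHoodieBackedTableMetadataWriter.create(hadoopConf, writeConfig, context)) {
      metadataWriter.deletePartitions(instantTime,
          Collections.singletonList(MetadataPartitionType.COLUMN_STATS));
    }
  }
}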
@@ -72,7 +72,7 @@ public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config
this.skipLocking = skipLocking;
}

static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
Path deletePath = new Path(deletePathStr);
LOG.debug("Working on delete path :" + deletePath);
try {
@@ -88,7 +88,7 @@ static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throw
}
}

static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<Pair<String, CleanFileInfo>> cleanFileInfo, HoodieTable table) {
private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<Pair<String, CleanFileInfo>> cleanFileInfo, HoodieTable table) {
Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
FileSystem fs = table.getMetaClient().getFs();

@@ -138,15 +138,14 @@ List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan clean
.flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(),
new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))));

List<String> partitionsToBeDeleted = cleanerPlan.getPartitionsToBeDeleted() != null ? cleanerPlan.getPartitionsToBeDeleted() : new ArrayList<>();

Stream<ImmutablePair<String, PartitionCleanStat>> partitionCleanStats =
context.mapPartitionsToPairAndReduceByKey(filesToBeDeletedPerPartition,
iterator -> deleteFilesFunc(iterator, table), PartitionCleanStat::merge, cleanerParallelism);

Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));

List<String> partitionsToBeDeleted = cleanerPlan.getPartitionsToBeDeleted() != null ? cleanerPlan.getPartitionsToBeDeleted() : new ArrayList<>();
partitionsToBeDeleted.forEach(entry -> {
try {
deleteFileAndGetResult(table.getMetaClient().getFs(), table.getMetaClient().getBasePath() + "/" + entry);
@@ -278,6 +278,9 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
* retain 10 commits, and commit batch time is 30 mins, then you have 5 hrs of lookback)
* <p>
* This policy is the default.
*
* @return A {@link Pair} whose left element is a boolean indicating whether the partition itself needs to be deleted,
*              and whose right element is the list of {@link CleanFileInfo} for the files in the partition that need to be deleted.
*/
private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) {
LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
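A minimal sketch (not the planner's actual code) of how a caller might consume the Pair described in the new Javadoc above: the left element says whether the whole partition should be dropped, the right element lists the files to clean within it. The CleanDecision holder and the package paths assumed for Pair and CleanFileInfo are illustrative assumptions.

import java.util.ArrayList;
import java.util.List;

import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.util.collection.Pair;

class CleanDecision {
  final List<String> partitionsToBeDeleted = new ArrayList<>();
  final List<CleanFileInfo> filesToClean = new ArrayList<>();

  // cleanResult would come from getFilesToCleanKeepingLatestCommits(partitionPath, ...).
  void accept(String partitionPath, Pair<Boolean, List<CleanFileInfo>> cleanResult) {
    if (Boolean.TRUE.equals(cleanResult.getLeft())) {
      // The whole partition is eligible for deletion; record it in the cleaner plan.
      partitionsToBeDeleted.add(partitionPath);
    }
    // Files to remove from the partition, whether or not the partition itself is dropped.
    filesToClean.addAll(cleanResult.getRight());
  }
}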
@@ -450,23 +450,18 @@ private void testTableOperationsForMetaIndexImpl(final HoodieWriteConfig writeCo
@EnumSource(HoodieTableType.class)
public void testMetadataTableDeletePartition(HoodieTableType tableType) throws IOException {
initPath();
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
.withIndexConfig(HoodieIndexConfig.newBuilder()
.bloomIndexBucketizedChecking(false)
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(true)
.withMetadataIndexBloomFilter(true)
.withMetadataIndexBloomFilterFileGroups(4)
.withMetadataIndexColumnStats(true)
.withMetadataIndexBloomFilterFileGroups(2)
.withMetadataIndexForAllColumns(true)
.build())
int maxCommits = 1;
HoodieWriteConfig cfg = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
.build();
init(tableType, writeConfig);
init(tableType);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);

try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, cfg)) {
// Write 1 (Bulk insert)
String newCommitTime = "0000001";
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
@@ -491,9 +486,9 @@ public void testMetadataTableDeletePartition(HoodieTableType tableType) throws I

HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient.getBasePath(), false, false);
// partition should still be physically present
// partition should be physically deleted
assertEquals(metadataWriter.getEnabledPartitionTypes().size(), metadataTablePartitions.size());
assertTrue(metadataTablePartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));
assertFalse(metadataTablePartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));

Option<HoodieInstant> completedReplaceInstant = metadataMetaClient.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant();
assertTrue(completedReplaceInstant.isPresent());
@@ -161,7 +161,7 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size());

// Add record bearing added partitions list
ArrayList<String> partitionsAdded = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());
List<String> partitionsAdded = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());

records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded));

@@ -337,7 +337,7 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCl
records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, true));
}
LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size()
+ ", #files_deleted=" + fileDeleteCount[0]);
+ ", #files_deleted=" + fileDeleteCount[0] + ", #partitions_deleted=" + deletedPartitions.size());
return records;
}

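A minimal sketch of the two partition-list records used above when converting commit/clean metadata into files-partition records of the metadata table. The createPartitionListRecord calls (single-argument for added partitions, two-argument with a deleted flag) are taken from the change; how partitionsAdded and deletedPartitions are derived, and the package imports, are assumptions for illustration.

import java.util.ArrayList;
import java.util.List;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.metadata.HoodieMetadataPayload;

class PartitionListRecordsSketch {

  // partitionsAdded: partitions touched by the commit; deletedPartitions: partitions
  // removed entirely (e.g. by clean or a delete-partition operation).
  static List<HoodieRecord> toFilesPartitionRecords(List<String> partitionsAdded,
                                                    List<String> deletedPartitions) {
    List<HoodieRecord> records = new ArrayList<>();
    // Record bearing the list of partitions added by this commit.
    records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded));
    if (!deletedPartitions.isEmpty()) {
      // Record marking these partitions as deleted (second argument is the deleted flag).
      records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, true));
    }
    return records;
  }
}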
@@ -45,11 +45,8 @@ import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
import org.apache.hudi.table.BulkInsertPartitioner
import org.apache.log4j.LogManager
import org.apache.spark.SPARK_VERSION
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql._
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types.StructType
