diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 395ed8ad3f06a..136d583235d9d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.CleanFileInfo;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.FileSlice;
@@ -42,7 +43,6 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieSavepointException;
-import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.log4j.LogManager;
@@ -210,15 +210,7 @@ private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata
    */
   private List<String> getPartitionPathsForFullCleaning() {
     // Go to brute force mode of scanning all partitions
-    try {
-      // Because the partition of BaseTableMetadata has been deleted,
-      // all partition information can only be obtained from FileSystemBackedTableMetadata.
-      FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(context,
-          context.getHadoopConf(), config.getBasePath(), config.shouldAssumeDatePartitioning());
-      return fsBackedTableMetadata.getAllPartitionPaths();
-    } catch (IOException e) {
-      return Collections.emptyList();
-    }
+    return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath());
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 81ba4f2a66bae..5c4761f27da92 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -35,9 +35,7 @@
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -319,10 +317,7 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
 
     // Add record bearing added partitions list
     List<String> partitionsAdded = getPartitionsAdded(commitMetadata);
-    // Add record bearing deleted partitions list
-    List<String> partitionsDeleted = getPartitionsDeleted(commitMetadata);
-
-    records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded, partitionsDeleted));
+    records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded));
 
     // Update files listing records for each individual partition
     List<HoodieRecord<HoodieMetadataPayload>> updatedPartitionFilesRecords =
@@ -380,21 +375,6 @@ private static List<String> getPartitionsAdded(HoodieCommitMetadata commitMetada
         .collect(Collectors.toList());
   }
 
-  private static List<String> getPartitionsDeleted(HoodieCommitMetadata commitMetadata) {
-    if (commitMetadata instanceof HoodieReplaceCommitMetadata
-        && WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) {
-      Map<String, List<String>> partitionToReplaceFileIds =
-          ((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds();
-
-      return partitionToReplaceFileIds.keySet().stream()
-          // We need to make sure we properly handle case of non-partitioned tables
-          .map(HoodieTableMetadataUtil::getPartitionIdentifier)
-          .collect(Collectors.toList());
-    }
-
-    return Collections.emptyList();
-  }
-
   /**
    * Convert commit action metadata to bloom filter records.
    *
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index 7077b2d37a33e..3085643dc9fb4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.avro.model.HoodieCleanMetadata
 import org.apache.hudi.{HoodieSparkRecordMerger, HoodieSparkUtils}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.HoodieAvroRecordMerger
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
@@ -229,6 +231,17 @@ object HoodieSparkSqlTestBase {
     metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getRight
   }
 
+  def getLastCleanMetadata(spark: SparkSession, tablePath: String) = {
+    val metaClient = HoodieTableMetaClient.builder()
+      .setConf(spark.sparkContext.hadoopConfiguration)
+      .setBasePath(tablePath)
+      .build()
+
+    val cleanInstant = metaClient.reloadActiveTimeline().getCleanerTimeline.filterCompletedInstants().lastInstant().get()
+    TimelineMetadataUtils.deserializeHoodieCleanMetadata(metaClient
+      .getActiveTimeline.getInstantDetails(cleanInstant).get)
+  }
+
   private def checkMessageContains(e: Throwable, text: String): Boolean =
     e.getMessage.trim.contains(text.trim)
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index 02c558fc2f3cc..843b37045510d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -18,15 +18,16 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
+import org.apache.hudi.avro.model.{HoodieCleanMetadata, HoodieCleanPartitionMetadata}
 import org.apache.hudi.common.model.HoodieCommitMetadata
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
-import org.apache.hudi.common.util.{Option => HOption}
-import org.apache.hudi.common.util.{PartitionPathEncodeUtils, StringUtils}
+import org.apache.hudi.common.util.{PartitionPathEncodeUtils, StringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
+import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
 import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCleanMetadata
 
 import org.junit.jupiter.api.Assertions
 import org.junit.jupiter.api.Assertions.assertTrue
@@ -117,16 +118,41 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
            |location '$tablePath'
            |""".stripMargin)
 
+      df.write.format("hudi")
+        .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+        .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+        .option(RECORDKEY_FIELD.key, "id")
+        .option(PRECOMBINE_FIELD.key, "ts")
+        .option(PARTITIONPATH_FIELD.key, "dt")
+        .option(URL_ENCODE_PARTITIONING.key(), urlencode)
+        .option(KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
+        .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+        .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+        .mode(SaveMode.Append)
+        .save(tablePath)
+
       // drop 2021-10-01 partition
       spark.sql(s"alter table $tableName drop partition (dt='2021/10/01')")
 
+      // trigger clean so that partition deletion kicks in.
+      spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
+        .collect()
+
+      val cleanMetadata: HoodieCleanMetadata = getLastCleanMetadata(spark, tablePath)
+      val cleanPartitionMeta = new java.util.ArrayList(cleanMetadata.getPartitionMetadata.values()).toArray()
+      var totalDeletedFiles = 0
+      cleanPartitionMeta.foreach(entry => {
+        totalDeletedFiles += entry.asInstanceOf[HoodieCleanPartitionMetadata].getSuccessDeleteFiles.size()
+      })
+      assertTrue(totalDeletedFiles > 0)
+
       val partitionPath = if (urlencode) {
         PartitionPathEncodeUtils.escapePathName("2021/10/01")
       } else {
         "2021/10/01"
       }
       checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
-      assertResult(true)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
+      assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
 
       // show partitions
       if (urlencode) {
@@ -225,11 +251,20 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
         "Found duplicate keys 'dt'")
     }
 
+    // insert data
+    spark.sql(s"""insert into $tableName values (3, "z5", "v1", "2021-10-01"), (4, "l5", "v1", "2021-10-02")""")
     // drop 2021-10-01 partition
     spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')")
 
-    checkAnswer(s"select id, name, ts, dt from $tableName")(Seq(2, "l4", "v1", "2021-10-02"))
+    // trigger clean so that partition deletion kicks in.
+ spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)") + .collect() + + checkAnswer(s"select id, name, ts, dt from $tableName")( + Seq(2, "l4", "v1", "2021-10-02"), + Seq(4, "l5", "v1", "2021-10-02") + ) // show partitions checkAnswer(s"show partitions $tableName")(Seq("dt=2021-10-02")) @@ -269,9 +304,35 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase { checkExceptionContain(s"alter table $tableName drop partition (year='2021', month='10')")( "All partition columns need to be specified for Hoodie's partition" ) + + df.write.format("hudi") + .option(HoodieWriteConfig.TBL_NAME.key, tableName) + .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "ts") + .option(PARTITIONPATH_FIELD.key, "year,month,day") + .option(HIVE_STYLE_PARTITIONING.key, hiveStyle) + .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName) + .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1") + .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1") + .mode(SaveMode.Append) + .save(tablePath) + // drop 2021-10-01 partition spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')") + // trigger clean so that partition deletion kicks in. + spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)") + .collect() + + val cleanMetadata: HoodieCleanMetadata = getLastCleanMetadata(spark, tablePath) + val cleanPartitionMeta = new java.util.ArrayList(cleanMetadata.getPartitionMetadata.values()).toArray() + var totalDeletedFiles = 0 + cleanPartitionMeta.foreach(entry => { + totalDeletedFiles += entry.asInstanceOf[HoodieCleanPartitionMetadata].getSuccessDeleteFiles.size() + }) + assertTrue(totalDeletedFiles > 0) + checkAnswer(s"select id, name, ts, year, month, day from $tableName")( Seq(2, "l4", "v1", "2021", "10", "02") ) @@ -320,9 +381,36 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase { | ) |""".stripMargin) + df.write.format("hudi") + .option(HoodieWriteConfig.TBL_NAME.key, tableName) + .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "ts") + .option(PARTITIONPATH_FIELD.key, "year,month,day") + .option(HIVE_STYLE_PARTITIONING.key, hiveStyle) + .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName) + .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1") + .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1") + .mode(SaveMode.Append) + .save(tablePath) + // drop 2021-10-01 partition spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')") + spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021", "10", "02")""") + + // trigger clean so that partition deletion kicks in. + spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)") + .collect() + + val cleanMetadata: HoodieCleanMetadata = getLastCleanMetadata(spark, tablePath) + val cleanPartitionMeta = new java.util.ArrayList(cleanMetadata.getPartitionMetadata.values()).toArray() + var totalDeletedFiles = 0 + cleanPartitionMeta.foreach(entry => { + totalDeletedFiles += entry.asInstanceOf[HoodieCleanPartitionMetadata].getSuccessDeleteFiles.size() + }) + assertTrue(totalDeletedFiles > 0) + // insert data spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021", "10", "02")""")