Fixing delete partition flow
nsivabalan committed Apr 6, 2023
1 parent 98414ab commit 5f94019
Showing 2 changed files with 62 additions and 9 deletions.
HoodieTableMetadataUtil.java

@@ -35,9 +35,7 @@
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -319,10 +317,7 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
     // Add record bearing added partitions list
     List<String> partitionsAdded = getPartitionsAdded(commitMetadata);

-    // Add record bearing deleted partitions list
-    List<String> partitionsDeleted = getPartitionsDeleted(commitMetadata);
-
-    records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded, partitionsDeleted));
+    records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded));

     // Update files listing records for each individual partition
     List<HoodieRecord<HoodieMetadataPayload>> updatedPartitionFilesRecords =
@@ -381,7 +376,7 @@ private static List<String> getPartitionsAdded(HoodieCommitMetada
   }

   private static List<String> getPartitionsDeleted(HoodieCommitMetadata commitMetadata) {
-    if (commitMetadata instanceof HoodieReplaceCommitMetadata
+    /*if (commitMetadata instanceof HoodieReplaceCommitMetadata
         && WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) {
       Map<String, List<String>> partitionToReplaceFileIds =
           ((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds();
@@ -390,7 +385,7 @@ private static List<String> getPartitionsDeleted(HoodieCommitMeta
           // We need to make sure we properly handle case of non-partitioned tables
           .map(HoodieTableMetadataUtil::getPartitionIdentifier)
           .collect(Collectors.toList());
-    }
+    }*/

     return Collections.emptyList();
   }
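For orientation: the Java change above stops tracking deleted partitions in the metadata table's partition-list record at commit time. getPartitionsDeleted now always returns an empty list, and createPartitionListRecord receives only the added-partitions list. The sketch below is a minimal Scala model of that before/after contract; the case class and helper functions are illustrative, not Hudi's actual API.

// Illustrative model only; not Hudi's actual API.
final case class PartitionListRecord(added: Seq[String], deleted: Seq[String])

// Before this commit: DELETE_PARTITION replace commits contributed their
// replaced partitions to the record's deleted list at commit time.
def recordBefore(partitionsAdded: Seq[String], partitionsDeleted: Seq[String]): PartitionListRecord =
  PartitionListRecord(partitionsAdded, partitionsDeleted)

// After this commit: the deleted list is always empty at commit time; a
// partition only goes away once the cleaner deletes its files (see the tests below).
def recordAfter(partitionsAdded: Seq[String]): PartitionListRecord =
  PartitionListRecord(partitionsAdded, Nil)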

TestAlterTableDropPartition.scala

@@ -117,16 +117,33 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
          |location '$tablePath'
          |""".stripMargin)

+    df.write.format("hudi")
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+      .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+      .option(RECORDKEY_FIELD.key, "id")
+      .option(PRECOMBINE_FIELD.key, "ts")
+      .option(PARTITIONPATH_FIELD.key, "dt")
+      .option(URL_ENCODE_PARTITIONING.key(), urlencode)
+      .option(KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
+      .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+      .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+      .mode(SaveMode.Append)
+      .save(tablePath)
+
     // drop 2021-10-01 partition
     spark.sql(s"alter table $tableName drop partition (dt='2021/10/01')")

+    // trigger clean so that partition deletion kicks in.
+    spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
+      .collect()
+
     val partitionPath = if (urlencode) {
       PartitionPathEncodeUtils.escapePathName("2021/10/01")
     } else {
       "2021/10/01"
     }
     checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
-    assertResult(true)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
+    assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))

     // show partitions
     if (urlencode) {
@@ -225,10 +242,16 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
"Found duplicate keys 'dt'")
}

// insert data
spark.sql(s"""insert into $tableName values (3, "z5", "v1", "2021-10-01"), (4, "l5", "v1", "2021-10-02")""")

// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')")

// trigger clean so that partition deletion kicks in.
spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
.collect()

checkAnswer(s"select id, name, ts, dt from $tableName")(Seq(2, "l4", "v1", "2021-10-02"))

// show partitions
@@ -269,9 +292,27 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
     checkExceptionContain(s"alter table $tableName drop partition (year='2021', month='10')")(
       "All partition columns need to be specified for Hoodie's partition"
     )

+    df.write.format("hudi")
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+      .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+      .option(RECORDKEY_FIELD.key, "id")
+      .option(PRECOMBINE_FIELD.key, "ts")
+      .option(PARTITIONPATH_FIELD.key, "year,month,day")
+      .option(HIVE_STYLE_PARTITIONING.key, hiveStyle)
+      .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
+      .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+      .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+      .mode(SaveMode.Append)
+      .save(tablePath)
+
     // drop 2021-10-01 partition
     spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')")

+    // trigger clean so that partition deletion kicks in.
+    spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
+      .collect()
+
     checkAnswer(s"select id, name, ts, year, month, day from $tableName")(
       Seq(2, "l4", "v1", "2021", "10", "02")
     )
@@ -320,9 +361,26 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
       | )
       |""".stripMargin)

+    df.write.format("hudi")
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+      .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+      .option(RECORDKEY_FIELD.key, "id")
+      .option(PRECOMBINE_FIELD.key, "ts")
+      .option(PARTITIONPATH_FIELD.key, "year,month,day")
+      .option(HIVE_STYLE_PARTITIONING.key, hiveStyle)
+      .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
+      .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+      .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+      .mode(SaveMode.Append)
+      .save(tablePath)
+
     // drop 2021-10-01 partition
     spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')")

+    // trigger clean so that partition deletion kicks in.
+    spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
+      .collect()
+
     // insert data
     spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021", "10", "02")""")
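Taken together, the test updates encode the new expectation: "alter table ... drop partition" alone no longer removes the partition's files; physical deletion happens when the cleaner runs. Below is a minimal Spark SQL sketch of that flow, assuming a Spark session with the Hudi SQL extensions enabled; the table name h0, its schema, and the retain_commits value are illustrative.

// Minimal sketch; assumes spark has the Hudi SQL extensions configured.
// Table name, schema, and values are illustrative.
spark.sql(
  """create table h0 (id int, name string, ts string, dt string)
    |using hudi
    |partitioned by (dt)
    |tblproperties (primaryKey = 'id', preCombineField = 'ts')
    |""".stripMargin)
spark.sql("insert into h0 values (1, 'a1', 'v1', '2021-10-01'), (2, 'a2', 'v1', '2021-10-02')")

// Writes a replace commit; the dropped partition's files stay on storage for now.
spark.sql("alter table h0 drop partition (dt='2021-10-01')")

// As in the updated tests, physical deletion only kicks in on clean.
spark.sql("call run_clean(table => 'h0', retain_commits => 1)").collect()

// Only the surviving partition remains visible.
spark.sql("select dt from h0").show()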
