[HUDI-3135] Fix Show Partitions Command's Result after drop partition
XuQianJin-Stars committed Jan 2, 2022
1 parent: 9412281 · commit: 9bfe088
Showing 4 changed files with 85 additions and 23 deletions.
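What changed, in short: convertMetadataToRecords previously registered every partition with isDeleted=false and wrote nothing to the metadata table's partition list when partitions were dropped, so SHOW PARTITIONS could keep returning dropped partitions. This commit threads an isDeleted flag through HoodieMetadataPayload.createPartitionListRecord and, for DELETE_PARTITION replace commits, emits a partition-list record flagging the replaced partitions as deleted; the new test assertions verify SHOW PARTITIONS after each drop.

A minimal repro sketch (Scala), assuming a local Spark session with the Hudi Spark bundle on the classpath; the table name h0 is illustrative, and the schema mirrors the tests below:

import org.apache.spark.sql.SparkSession

object ShowPartitionsAfterDrop {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("hudi-3135-repro")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .getOrCreate()

    spark.sql(
      """create table h0 (id int, name string, ts string, dt string)
        |using hudi
        |partitioned by (dt)
        |tblproperties (primaryKey = 'id')""".stripMargin)
    spark.sql("insert into h0 values (1, 'l3', 'v1', '2021-10-01'), (2, 'l4', 'v1', '2021-10-02')")
    spark.sql("alter table h0 drop partition (dt = '2021-10-01')")

    // Before this fix the dropped partition could still be listed here;
    // with it, only dt=2021-10-02 comes back.
    spark.sql("show partitions h0").show(false)

    spark.stop()
  }
}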
File 1/4: HoodieMetadataPayload.java (org.apache.hudi.metadata)
@@ -18,6 +18,13 @@

package org.apache.hudi.metadata;

+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.model.HoodieKey;
@@ -26,13 +33,6 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieMetadataException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.Arrays;
@@ -87,7 +87,7 @@ public HoodieMetadataPayload(Option<GenericRecord> record) {
key = record.get().get(SCHEMA_FIELD_ID_KEY).toString();
type = (int) record.get().get(SCHEMA_FIELD_ID_TYPE);
if (record.get().get(SCHEMA_FIELD_ID_METADATA) != null) {
-filesystemMetadata = (Map<String, HoodieMetadataFileInfo>) record.get().get("filesystemMetadata");
+filesystemMetadata = (Map<String, HoodieMetadataFileInfo>) record.get().get(SCHEMA_FIELD_ID_METADATA);
filesystemMetadata.keySet().forEach(k -> {
GenericRecord v = filesystemMetadata.get(k);
filesystemMetadata.put(k.toString(), new HoodieMetadataFileInfo((Long) v.get("size"), (Boolean) v.get("isDeleted")));
@@ -108,8 +108,17 @@ private HoodieMetadataPayload(String key, int type, Map<String, HoodieMetadataFi
* @param partitions The list of partitions
*/
public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions) {
+return createPartitionListRecord(partitions, false);
+}
+
+/**
+* Create and return a {@code HoodieMetadataPayload} to save list of partitions.
+*
+* @param partitions The list of partitions
+*/
+public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions, boolean isDeleted) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false)));
+partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, isDeleted)));

HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath());
HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
@@ -176,7 +185,7 @@ public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
* Returns the list of filenames added as part of this record.
*/
public List<String> getFilenames() {
-return filterFileInfoEntries(false).map(e -> e.getKey()).sorted().collect(Collectors.toList());
+return filterFileInfoEntries(false).map(Map.Entry::getKey).sorted().collect(Collectors.toList());
}

/**
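The two-argument overload above keeps the old behavior for the single-argument form (isDeleted = false) and lets the delete-partition flow mark partitions as deleted; entries carrying the flag are then skipped by readers that filter on isDeleted, such as getFilenames() earlier in this file. A minimal sketch of both call sites (Scala, assuming hudi-common on the classpath; the partition paths are illustrative):

import scala.collection.JavaConverters._
import org.apache.hudi.metadata.HoodieMetadataPayload

// Live partitions: the same record shape as before this change.
val added = HoodieMetadataPayload.createPartitionListRecord(
  List("2021/10/02").asJava)

// Dropped partitions: isDeleted = true, so partition-list readers that
// filter on the flag leave them out.
val dropped = HoodieMetadataPayload.createPartitionListRecord(
  List("2021/10/01").asJava, true)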
File 2/4: HoodieTableMetadataUtil.java (org.apache.hudi.metadata)
@@ -18,6 +18,8 @@

package org.apache.hudi.metadata;

+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -26,6 +28,8 @@
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
@@ -35,9 +39,6 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieMetadataException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -89,6 +90,7 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
public static List<HoodieRecord> convertMetadataToRecords(HoodieCommitMetadata commitMetadata, String instantTime) {
List<HoodieRecord> records = new LinkedList<>();
List<String> allPartitions = new LinkedList<>();
+
commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
allPartitions.add(partition);
@@ -109,15 +111,29 @@ public static List<HoodieRecord> convertMetadataToRecords(HoodieCommitMetadata c
: hoodieWriteStat.getTotalWriteBytes();
newFiles.put(filename, totalWriteBytes);
});
+
// New files added to a partition
-HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
-partition, Option.of(newFiles), Option.empty());
-records.add(record);
+if (!newFiles.isEmpty()) {
+HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
+partition, Option.of(newFiles), Option.empty());
+records.add(record);
+}
});

-// New partitions created
-HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions));
-records.add(record);
+// Add delete partition's record
+if (commitMetadata instanceof HoodieReplaceCommitMetadata
+&& WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) {
+Map<String, List<String>> partitionToReplaceFileIds = ((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds();
+HoodieRecord deleteRecord = HoodieMetadataPayload.createPartitionListRecord(
+new ArrayList<>(partitionToReplaceFileIds.keySet()), true);
+records.add(deleteRecord);
+}
+
+// Add created new partition's record
+if (!allPartitions.isEmpty()) {
+HoodieRecord addRecord = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions));
+records.add(addRecord);
+}

LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType()
+ ". #partitions_updated=" + records.size());
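The new branch above is what records a drop: for a DELETE_PARTITION replace commit, the keys of partitionToReplaceFileIds are written into the partition list with the deleted flag set. A sketch of how that path is reached (Scala, assuming hudi-common on the classpath; the instant time, partition, and file id are illustrative):

import org.apache.hudi.common.model.{HoodieReplaceCommitMetadata, WriteOperationType}
import org.apache.hudi.metadata.HoodieTableMetadataUtil

val replaceCommit = new HoodieReplaceCommitMetadata()
replaceCommit.setOperationType(WriteOperationType.DELETE_PARTITION)
// File group "f1" in dt=2021-10-01 was replaced away by the drop.
replaceCommit.addReplaceFileId("dt=2021-10-01", "f1")

// With no write stats attached, the returned list holds just the
// partition-list record flagging dt=2021-10-01 as deleted.
val records = HoodieTableMetadataUtil.convertMetadataToRecords(replaceCommit, "20220102000000")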
File 3/4: HoodieSparkSqlWriter.scala
@@ -43,15 +43,14 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.hudi.table.BulkInsertPartitioner
import org.apache.log4j.LogManager
+import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
-import org.apache.spark.SparkContext

import java.util.Properties
-
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
@@ -204,6 +203,7 @@ object HoodieSparkSqlWriter {
} else {
genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
}
+
// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path, tblName,
File 4/4: TestAlterTableDropPartition.scala
@@ -47,6 +47,9 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {

checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01')")(
s"$tableName is a non-partitioned table that is not allowed to drop partition")

// show partitions
checkAnswer(s"show partitions $tableName")(Seq.empty: _*)
}

test("Purge drop non-partitioned table") {
@@ -71,6 +74,9 @@

checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01') purge")(
s"$tableName is a non-partitioned table that is not allowed to drop partition")

// show partitions
checkAnswer(s"show partitions $tableName")(Seq.empty: _*)
}

Seq(false, true).foreach { urlencode =>
@@ -118,6 +124,13 @@
}
checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
assertResult(true)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
+
+// show partitions
+if (urlencode) {
+checkAnswer(s"show partitions $tableName")(Seq(PartitionPathEncodeUtils.escapePathName("2021/10/02")))
+} else {
+checkAnswer(s"show partitions $tableName")(Seq("2021/10/02"))
+}
}
}
}
@@ -167,6 +180,13 @@
}
checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
+
+// show partitions
+if (urlencode) {
+checkAnswer(s"show partitions $tableName")(Seq(PartitionPathEncodeUtils.escapePathName("2021/10/02")))
+} else {
+checkAnswer(s"show partitions $tableName")(Seq("2021/10/02"))
+}
}
}
}
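In the urlencode = true case above, the slash in 2021/10/02 is percent-encoded on storage, so SHOW PARTITIONS returns the escaped form and the tests compare against PartitionPathEncodeUtils.escapePathName. An illustrative check (Scala), assuming PartitionPathEncodeUtils follows Hive-style path escaping:

import org.apache.hudi.common.util.PartitionPathEncodeUtils

// escapePathName percent-encodes characters that are unsafe inside a
// single path segment; here the slashes become %2F.
val escaped = PartitionPathEncodeUtils.escapePathName("2021/10/02")
println(escaped) // expected: 2021%2F10%2F02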
@@ -199,7 +219,10 @@
// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')")

checkAnswer(s"select id, name, ts, dt from $tableName") (Seq(2, "l4", "v1", "2021-10-02"))
checkAnswer(s"select id, name, ts, dt from $tableName")(Seq(2, "l4", "v1", "2021-10-02"))

// show partitions
checkAnswer(s"show partitions $tableName")(Seq("dt=2021-10-02"))
}

Seq(false, true).foreach { hiveStyle =>
@@ -247,6 +270,13 @@
checkAnswer(s"select id, name, ts, year, month, day from $tableName")(
Seq(2, "l4", "v1", "2021", "10", "02")
)
+
+// show partitions
+if (hiveStyle) {
+checkAnswer(s"show partitions $tableName")(Seq("year=2021/month=10/day=02"))
+} else {
+checkAnswer(s"show partitions $tableName")(Seq("2021/10/02"))
+}
}
}
}
@@ -294,6 +324,13 @@
)
assertResult(false)(existsPath(
s"${tmp.getCanonicalPath}/$tableName/year=2021/month=10/day=01"))

// show partitions
if (hiveStyle) {
checkAnswer(s"show partitions $tableName")(Seq("year=2021/month=10/day=02"))
} else {
checkAnswer(s"show partitions $tableName")(Seq("2021/10/02"))
}
}
}
}
