[HUDI-3743] Support DELETE_PARTITION for metadata table #5169

Merged · 8 commits · Apr 1, 2022
@@ -94,4 +94,12 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
* @param instantTime instant time of the commit.
*/
void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);

/**
* Deletes the given metadata partitions. This path reuses the DELETE_PARTITION operation.
*
* @param instantTime - instant time when replacecommit corresponding to the drop will be recorded in the metadata timeline
* @param partitions - list of {@link MetadataPartitionType} to drop
*/
void deletePartitions(String instantTime, List<MetadataPartitionType> partitions);
}
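For context, a minimal caller-side sketch of the new API (a hedged illustration, not code from this PR; it assumes a concrete HoodieTableMetadataWriter named metadataWriter is in scope and that the caller supplies the instant time, mirroring the test further down):

// Drop the column_stats partition of the metadata table.
// "0000003" stands in for a caller-generated instant time.
List<MetadataPartitionType> partitionsToDrop = Arrays.asList(MetadataPartitionType.COLUMN_STATS);
metadataWriter.deletePartitions("0000003", partitionsToDrop);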
@@ -72,7 +72,7 @@ public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config
this.skipLocking = skipLocking;
}

static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
Path deletePath = new Path(deletePathStr);
LOG.debug("Working on delete path :" + deletePath);
try {
@@ -88,7 +88,7 @@ static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throw
}
}

static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<Pair<String, CleanFileInfo>> cleanFileInfo, HoodieTable table) {
private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<Pair<String, CleanFileInfo>> cleanFileInfo, HoodieTable table) {
Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
FileSystem fs = table.getMetaClient().getFs();

@@ -138,15 +138,14 @@ List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan clean
.flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(),
new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))));

List<String> partitionsToBeDeleted = cleanerPlan.getPartitionsToBeDeleted() != null ? cleanerPlan.getPartitionsToBeDeleted() : new ArrayList<>();

Stream<ImmutablePair<String, PartitionCleanStat>> partitionCleanStats =
context.mapPartitionsToPairAndReduceByKey(filesToBeDeletedPerPartition,
iterator -> deleteFilesFunc(iterator, table), PartitionCleanStat::merge, cleanerParallelism);

Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));

List<String> partitionsToBeDeleted = cleanerPlan.getPartitionsToBeDeleted() != null ? cleanerPlan.getPartitionsToBeDeleted() : new ArrayList<>();
partitionsToBeDeleted.forEach(entry -> {
try {
deleteFileAndGetResult(table.getMetaClient().getFs(), table.getMetaClient().getBasePath() + "/" + entry);
@@ -21,7 +21,6 @@
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
@@ -45,7 +44,9 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.table.HoodieTable;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -202,11 +203,18 @@ private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata
/**
* Scan and list all partitions for cleaning.
* @return all partitions paths for the dataset.
* @throws IOException
*/
private List<String> getPartitionPathsForFullCleaning() {
// Go to brute force mode of scanning all partitions
return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath());
try {
// Because the partition of BaseTableMetadata has been deleted,
// all partition information can only be obtained from FileSystemBackedTableMetadata.
FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(context,
Review comment (Contributor): @codope: do we really need to go with FileSystemBackedMetadata here? This is causing performance issues in some cases when getPartitionPathsForFullCleaning is used. See #6373.

context.getHadoopConf(), config.getBasePath(), config.shouldAssumeDatePartitioning());
return fsBackedTableMetadata.getAllPartitionPaths();
} catch (IOException e) {
return Collections.emptyList();
}
}

/**
@@ -278,6 +286,9 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
* retain 10 commits, and commit batch time is 30 mins, then you have 5 hrs of lookback)
* <p>
* This policy is the default.
*
* @return A {@link Pair} whose left is a boolean indicating whether the partition itself needs to be deleted,
* and right is a list of {@link CleanFileInfo} for the files in the partition that need to be deleted.
*/
private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) {
LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
@@ -466,7 +477,7 @@ public Option<HoodieInstant> getEarliestCommitToRetain() {

/**
* Determine if file slice needed to be preserved for pending compaction.
*
*
* @param fileSlice File Slice
* @return true if file slice needs to be preserved, false otherwise.
*/
@@ -31,6 +31,7 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieNotSupportedException;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
@@ -161,4 +162,9 @@ protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<
// Update total size of the metadata and count of base/log files
metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
}

@Override
public void deletePartitions(String instantTime, List<MetadataPartitionType> partitions) {
throw new HoodieNotSupportedException("Dropping metadata index not supported for Flink metadata table yet.");
}
}
@@ -25,8 +25,11 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -43,6 +46,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {

@@ -177,4 +181,16 @@ protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<
// Update total size of the metadata and count of base/log files
metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
}

@Override
public void deletePartitions(String instantTime, List<MetadataPartitionType> partitions) {
List<String> partitionsToDrop = partitions.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList());
LOG.info("Deleting Metadata Table partitions: " + partitionsToDrop);

try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ);
writeClient.startCommitWithTime(instantTime, actionType);
writeClient.deletePartitions(partitionsToDrop, instantTime);
}
}
}
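The Spark writer above reuses the regular write-client DELETE_PARTITION path against the metadata table, so the drop is recorded as a replacecommit on the metadata timeline. A hedged verification sketch, adapted from the test added below (hadoopConf, engineContext, metadataTableBasePath, and the JUnit assertions are assumed to be in scope):

// After deletePartitions, the dropped partition should no longer be listed and a
// completed replacecommit should be present on the metadata table's timeline.
HoodieTableMetaClient metadataMetaClient =
    HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
List<String> metadataTablePartitions =
    FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient.getBasePath(), false, false);
assertFalse(metadataTablePartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));
assertTrue(metadataMetaClient.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().isPresent());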
@@ -446,6 +446,73 @@ private void testTableOperationsForMetaIndexImpl(final HoodieWriteConfig writeCo
testTableOperationsImpl(engineContext, writeConfig);
}

@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testMetadataTableDeletePartition(HoodieTableType tableType) throws IOException {
initPath();
int maxCommits = 1;
HoodieWriteConfig cfg = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
.build();
init(tableType);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);

try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, cfg)) {
// Write 1 (Bulk insert)
String newCommitTime = "0000001";
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);

// Write 2 (upserts)
newCommitTime = "0000002";
client.startCommitWithTime(newCommitTime);
validateMetadata(client);

records = dataGen.generateInserts(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);

// metadata writer to delete column_stats partition
HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
metadataWriter.deletePartitions("0000003", Arrays.asList(MetadataPartitionType.COLUMN_STATS));

HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient.getBasePath(), false, false);
// partition should be physically deleted
assertEquals(metadataWriter.getEnabledPartitionTypes().size(), metadataTablePartitions.size());
assertFalse(metadataTablePartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));

Option<HoodieInstant> completedReplaceInstant = metadataMetaClient.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant();
assertTrue(completedReplaceInstant.isPresent());
assertEquals("0000003", completedReplaceInstant.get().getTimestamp());

final Map<String, MetadataPartitionType> metadataEnabledPartitionTypes = new HashMap<>();
metadataWriter.getEnabledPartitionTypes().forEach(e -> metadataEnabledPartitionTypes.put(e.getPartitionPath(), e));
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
metadataTablePartitions.forEach(partition -> {
List<FileSlice> latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList());
if (MetadataPartitionType.COLUMN_STATS.getPartitionPath().equals(partition)) {
// there should not be any file slice in column_stats partition
assertTrue(latestSlices.isEmpty());
} else {
assertFalse(latestSlices.isEmpty());
assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count()
<= metadataEnabledPartitionTypes.get(partition).getFileGroupCount(), "Should have a single latest base file per file group");
assertTrue(latestSlices.size()
<= metadataEnabledPartitionTypes.get(partition).getFileGroupCount(), "Should have a single latest file slice per file group");
}
});
}
}

/**
* Tests that virtual key configs are honored in base files after compaction in metadata table.
*
@@ -1760,7 +1827,6 @@ public void testDeletePartitions() throws Exception {
newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000);
client.startCommitWithTime(newCommitTime);
client.deletePartitions(singletonList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH), newCommitTime);
validateMetadata(client);

// add 1 more commit
newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000);
@@ -1775,7 +1841,7 @@
writeStatuses = client.upsert(jsc.parallelize(upsertRecords, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);

// trigger clean which will actually triggger deletion of the partition
// trigger clean which will actually trigger deletion of the partition
newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000);
HoodieCleanMetadata cleanMetadata = client.clean(newCommitTime);
validateMetadata(client);
@@ -18,13 +18,6 @@

package org.apache.hudi.metadata;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieMetadataBloomFilter;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
@@ -42,6 +35,14 @@
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.io.storage.HoodieHFileReader;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -239,6 +240,23 @@ public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List
return new HoodieAvroRecord<>(key, payload);
}

/**
* Create and return a {@code HoodieMetadataPayload} to save list of partitions.
*
* @param partitionsAdded The list of added partitions
* @param partitionsDeleted The list of deleted partitions
*/
public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitionsAdded, List<String> partitionsDeleted) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
partitionsAdded.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false)));
partitionsDeleted.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, true)));

HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath());
HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
fileInfo);
return new HoodieAvroRecord<>(key, payload);
}
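A hedged illustration of this overload (partition names are made up; Collections comes from java.util): partitions in the second argument are written with the delete flag set on their HoodieMetadataFileInfo, so the merged partition-list record no longer reports them.

// One partition added by a commit and one partition dropped via DELETE_PARTITION.
HoodieRecord<HoodieMetadataPayload> partitionListRecord =
    HoodieMetadataPayload.createPartitionListRecord(
        Collections.singletonList("2022/04/01"),   // partitionsAdded
        Collections.singletonList("2022/03/31"));  // partitionsDeleted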

/**
* Create and return a {@code HoodieMetadataPayload} to save list of files within a partition.
*
@@ -18,11 +18,6 @@

package org.apache.hudi.metadata;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -38,7 +33,9 @@
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
@@ -58,10 +55,17 @@
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -195,9 +199,12 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size());

// Add record bearing added partitions list
ArrayList<String> partitionsAdded = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());
List<String> partitionsAdded = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());

// Add record bearing deleted partitions list
List<String> partitionsDeleted = getPartitionsDeleted(commitMetadata);

records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded));
records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded, partitionsDeleted));

// Update files listing records for each individual partition
List<HoodieRecord<HoodieMetadataPayload>> updatedPartitionFilesRecords =
@@ -247,6 +254,18 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
return records;
}

private static ArrayList<String> getPartitionsDeleted(HoodieCommitMetadata commitMetadata) {
if (commitMetadata instanceof HoodieReplaceCommitMetadata
&& WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) {
Map<String, List<String>> partitionToReplaceFileIds =
((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds();
if (!partitionToReplaceFileIds.isEmpty()) {
return new ArrayList<>(partitionToReplaceFileIds.keySet());
}
}
return new ArrayList<>();
}

/**
* Convert commit action metadata to bloom filter records.
*
@@ -371,7 +390,7 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCl
records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, true));
}
LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size()
+ ", #files_deleted=" + fileDeleteCount[0]);
+ ", #files_deleted=" + fileDeleteCount[0] + ", #partitions_deleted=" + deletedPartitions.size());
return records;
}

Expand Down
Loading