[HUDI-3743] Support DELETE_PARTITION for metadata table
Minor checkstyle fix
codope committed Mar 29, 2022
1 parent 1b2fb71 commit 25b886d
Showing 4 changed files with 105 additions and 0 deletions.
@@ -23,7 +23,9 @@
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;

/**
* Interface that supports updating metadata for a given table, as actions complete.
@@ -60,4 +62,12 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
*/
void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);

/**
* Drop the given metadata indexes. This code path reuses the DELETE_PARTITION operation.
*
* @param instantTime - instant time at which the replacecommit corresponding to the drop is recorded in the metadata timeline
* @param indexesToDrop - list of {@link MetadataPartitionType} to drop
* @throws IOException
*/
void dropIndex(String instantTime, List<MetadataPartitionType> indexesToDrop);
}
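
For orientation, a minimal caller-side sketch of the new API, mirroring the Spark test added later in this commit; the metadataWriter(client) helper and the hard-coded instant time are borrowed from that test and are not part of this interface:

// Sketch only: obtain an engine-specific metadata writer and drop the column_stats index.
HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
metadataWriter.dropIndex("0000003", Arrays.asList(MetadataPartitionType.COLUMN_STATS));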
@@ -31,6 +31,7 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieNotSupportedException;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
@@ -155,4 +156,9 @@ protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<
// Update total size of the metadata and count of base/log files
metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
}

@Override
public void dropIndex(String instantTime, List<MetadataPartitionType> indexesToDrop) {
throw new HoodieNotSupportedException("Dropping metadata index not supported for Flink metadata table yet.");
}
}
@@ -25,8 +25,11 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -43,6 +46,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {

@@ -177,4 +181,16 @@ protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<
// Update total size of the metadata and count of base/log files
metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
}

@Override
public void dropIndex(String instantTime, List<MetadataPartitionType> indexesToDrop) {
List<String> partitionsToDrop = indexesToDrop.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList());
LOG.warn("Deleting Metadata Table partitions: " + partitionsToDrop);

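// The metadata table is an internal merge-on-read table, so deleting its partitions is
// recorded as a replacecommit on the metadata timeline (asserted by the test added below).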
try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ);
writeClient.startCommitWithTime(instantTime, actionType);
writeClient.deletePartitions(partitionsToDrop, instantTime);
}
}
}
@@ -445,6 +445,79 @@ private void testTableOperationsForMetaIndexImpl(final HoodieWriteConfig writeCo
testTableOperationsImpl(engineContext, writeConfig);
}

@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testMetadataTableDeletePartition(HoodieTableType tableType) throws IOException {
initPath();
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
.withIndexConfig(HoodieIndexConfig.newBuilder()
.bloomIndexBucketizedChecking(false)
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(true)
.withMetadataIndexBloomFilter(true)
.withMetadataIndexBloomFilterFileGroups(4)
.withMetadataIndexColumnStats(true)
.withMetadataIndexBloomFilterFileGroups(2)
.withMetadataIndexForAllColumns(true)
.build())
.build();
init(tableType, writeConfig);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);

try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
// Write 1 (Bulk insert)
String newCommitTime = "0000001";
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);

// Write 2 (upserts)
newCommitTime = "0000002";
client.startCommitWithTime(newCommitTime);
validateMetadata(client);

records = dataGen.generateInserts(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);

// use the metadata writer to drop the column_stats partition of the metadata table
HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
metadataWriter.dropIndex("0000003", Arrays.asList(MetadataPartitionType.COLUMN_STATS));

HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient.getBasePath(), false, false);
// partition should still be physically present
assertEquals(metadataWriter.getEnabledPartitionTypes().size(), metadataTablePartitions.size());
assertTrue(metadataTablePartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));

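// the drop above should be recorded as a completed replacecommit at the instant passed to dropIndex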
Option<HoodieInstant> completedReplaceInstant = metadataMetaClient.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant();
assertTrue(completedReplaceInstant.isPresent());
assertEquals("0000003", completedReplaceInstant.get().getTimestamp());

final Map<String, MetadataPartitionType> metadataEnabledPartitionTypes = new HashMap<>();
metadataWriter.getEnabledPartitionTypes().forEach(e -> metadataEnabledPartitionTypes.put(e.getPartitionPath(), e));
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
metadataTablePartitions.forEach(partition -> {
List<FileSlice> latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList());
if (MetadataPartitionType.COLUMN_STATS.getPartitionPath().equals(partition)) {
// there should not be any file slice in column_stats partition
assertTrue(latestSlices.isEmpty());
} else {
assertFalse(latestSlices.isEmpty());
assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count()
<= metadataEnabledPartitionTypes.get(partition).getFileGroupCount(), "Should have at most one latest base file per file group");
assertTrue(latestSlices.size()
<= metadataEnabledPartitionTypes.get(partition).getFileGroupCount(), "Should have at most one latest file slice per file group");
}
});
}
}

/**
* Tests that virtual key configs are honored in base files after compaction in metadata table.
*