[HUDI-4138] Fix the concurrent modification of hoodie table config for flink

* Remove the metadata cleaning strategy for flink, which means the multi-modal index may be affected
* Improve HoodieTable#clearMetadataTablePartitionsConfig to only update the table config when necessary
* Remove the modification of the read code path in HoodieTableConfig
danny0405 committed May 23, 2022
1 parent 3ef137d commit 2712466
Showing 4 changed files with 14 additions and 31 deletions.
HoodieTable.java

@@ -885,24 +885,24 @@ private boolean shouldExecuteMetadataTableDeletion() {
     // partitions are ready to use
     return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
         && !config.isMetadataTableEnabled()
-        && (!metaClient.getTableConfig().contains(TABLE_METADATA_PARTITIONS)
-        || !metaClient.getTableConfig().getMetadataPartitions().isEmpty());
+        && !metaClient.getTableConfig().getMetadataPartitions().isEmpty();
   }
 
   /**
    * Clears hoodie.table.metadata.partitions in hoodie.properties
    */
   private void clearMetadataTablePartitionsConfig(Option<MetadataPartitionType> partitionType, boolean clearAll) {
-    if (clearAll) {
+    Set<String> partitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
+    if (clearAll && partitions.size() > 0) {
       LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
       metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), EMPTY_STRING);
       HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
       return;
     }
-    Set<String> completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
-    completedPartitions.remove(partitionType.get().getPartitionPath());
-    metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
-    HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
+    if (partitions.remove(partitionType.get().getPartitionPath())) {
+      metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", partitions));
+      HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
+    }
   }
 
   public HoodieTableMetadata getMetadataTable() {
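The new version reads the completed partitions into a set once and persists hoodie.properties only when that set actually changes, avoiding redundant concurrent rewrites of the config file. A minimal standalone sketch of this persist-only-on-change pattern (class and method names are illustrative, not Hudi code):

import java.util.HashSet;
import java.util.Set;

// Sketch of the guard above: Set#remove returns true only when the element
// was present, so the expensive persist step runs only when the in-memory
// state really changed.
public class PersistOnChange {

  private final Set<String> partitions;

  public PersistOnChange(Set<String> initial) {
    this.partitions = new HashSet<>(initial);
  }

  public void drop(String partitionPath) {
    if (partitions.remove(partitionPath)) {
      // Only rewrite the config when the set actually shrank; a no-op removal
      // no longer triggers a redundant concurrent update of hoodie.properties.
      persist(String.join(",", partitions));
    }
  }

  private void persist(String serialized) {
    System.out.println("persisting: " + serialized);
  }

  public static void main(String[] args) {
    PersistOnChange config = new PersistOnChange(Set.of("files", "column_stats"));
    config.drop("column_stats"); // persists once
    config.drop("column_stats"); // second call is a no-op, nothing written
  }
}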
HoodieFlinkWriteClient.java

@@ -53,7 +53,6 @@
 import org.apache.hudi.io.MiniBatchHandle;
 import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
-import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
@@ -365,8 +364,7 @@ public void completeCompaction(
       // commit to data table after committing to metadata table.
       // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
       // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
-      table.getMetadataWriter(compactionInstant.getTimestamp()).ifPresent(
-          w -> ((HoodieTableMetadataWriter) w).update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
+      writeTableMetadata(table, compactionCommitTime, compactionInstant.getAction(), metadata);
       LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
       CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
     } finally {
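Here the inline metadata-table update is replaced by a single writeTableMetadata call, so every write to the metadata table funnels through one code path under the lock described in the comment. A hedged sketch of that single-writer idea (illustrative only, not the actual Hudi helper):

import java.util.concurrent.locks.ReentrantLock;

// Routing every metadata-table write through one method guarded by one lock
// gives the "single writer" property the comment in the diff describes:
// concurrent callers serialize instead of conflicting on the same partition.
public class MetadataWriteFunnel {

  private final ReentrantLock metadataLock = new ReentrantLock();

  public void writeTableMetadata(String instantTime, String action, String commitMetadata) {
    metadataLock.lock();
    try {
      // Every caller passes through here, so metadata-table updates never interleave.
      System.out.printf("update metadata table at %s (%s): %s%n", instantTime, action, commitMetadata);
    } finally {
      metadataLock.unlock();
    }
  }
}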
HoodieFlinkTable.java

@@ -105,13 +105,9 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con
   public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
                                                                                             Option<T> actionMetadata) {
     if (config.isMetadataTableEnabled()) {
-      // even with metadata enabled, some index could have been disabled
-      // delete metadata partitions corresponding to such indexes
-      deleteMetadataIndexIfNecessary();
       return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
           context, actionMetadata, Option.of(triggeringInstantTimestamp)));
     } else {
-      maybeDeleteMetadataTable();
       return Option.empty();
     }
   }
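With this change, getMetadataWriter on the Flink table no longer deletes metadata indexes or the metadata table as a side effect; it only reports whether a writer is available. A small sketch of that side-effect-free pattern, using java.util.Optional in place of Hudi's Option (all names illustrative):

import java.util.Optional;

// The getter simply answers "is a metadata writer available?" and leaves
// any cleanup decisions to the caller.
public class MetadataWriterProvider {

  private final boolean metadataEnabled;

  public MetadataWriterProvider(boolean metadataEnabled) {
    this.metadataEnabled = metadataEnabled;
  }

  public Optional<Runnable> getMetadataWriter(String triggeringInstant) {
    if (metadataEnabled) {
      // Create a writer for this instant; cleanup is handled elsewhere now.
      return Optional.of(() -> System.out.println("write metadata for " + triggeringInstant));
    }
    // The deletion side effect that used to live here is gone.
    return Optional.empty();
  }

  public static void main(String[] args) {
    new MetadataWriterProvider(true).getMetadataWriter("20220523000000").ifPresent(Runnable::run);
  }
}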
HoodieTableConfig.java

@@ -245,10 +245,6 @@ public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName
       if (contains(PAYLOAD_CLASS_NAME) && payloadClassName != null
           && !getString(PAYLOAD_CLASS_NAME).equals(payloadClassName)) {
         setValue(PAYLOAD_CLASS_NAME, payloadClassName);
-        // FIXME(vc): wonder if this can be removed. Need to look into history.
-        try (FSDataOutputStream outputStream = fs.create(propertyPath)) {
-          storeProperties(props, outputStream);
-        }
       }
     } catch (IOException e) {
       throw new HoodieIOException("Could not load Hoodie properties from " + propertyPath, e);
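After this hunk, a payload class passed to the constructor only overrides the in-memory properties; the constructor no longer rewrites hoodie.properties from the read path. Roughly (a sketch, not Hudi code; it assumes PAYLOAD_CLASS_NAME resolves to the hoodie.compaction.payload.class key):

import java.util.Properties;

// A reader-side override updates only the in-memory Properties object;
// nothing is written back to storage from the read path anymore.
public class ReadOnlyOverride {

  private static final String PAYLOAD_CLASS_KEY = "hoodie.compaction.payload.class";

  private final Properties props = new Properties();

  public void overridePayloadClass(String payloadClassName) {
    String current = props.getProperty(PAYLOAD_CLASS_KEY);
    if (payloadClassName != null && !payloadClassName.equals(current)) {
      // In-memory only: the concurrent rewrite of hoodie.properties is gone.
      props.setProperty(PAYLOAD_CLASS_KEY, payloadClassName);
    }
  }
}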
@@ -272,8 +268,8 @@ private static Properties getOrderedPropertiesWithTableChecksum(Properties props
    * @throws IOException
    */
   private static String storeProperties(Properties props, FSDataOutputStream outputStream) throws IOException {
-    String checksum;
-    if (props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props)) {
+    final String checksum;
+    if (isValidChecksum(props)) {
       checksum = props.getProperty(TABLE_CHECKSUM.key());
       props.store(outputStream, "Updated at " + Instant.now());
     } else {
@@ -285,8 +281,8 @@ private static String storeProperties(Properties props, FSDataOutputStream outpu
     return checksum;
   }
 
-  private boolean isValidChecksum() {
-    return contains(TABLE_CHECKSUM) && validateChecksum(props);
+  private static boolean isValidChecksum(Properties props) {
+    return props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props);
   }
 
   /**
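isValidChecksum becomes a static helper over an explicit Properties argument, so storeProperties can reuse a still-valid stored checksum and recompute it otherwise. A self-contained sketch of that round-trip (the key name and the CRC32 scheme are assumptions for illustration, not necessarily Hudi's exact implementation):

import java.util.Properties;
import java.util.zip.CRC32;

// Reuse the stored checksum while it still matches the contents; otherwise
// recompute it before the properties file is written out.
public class ChecksumedProps {

  static final String TABLE_CHECKSUM_KEY = "hoodie.table.checksum";

  static String checksumOf(Properties props) {
    CRC32 crc = new CRC32();
    props.stringPropertyNames().stream()
        .filter(k -> !k.equals(TABLE_CHECKSUM_KEY))
        .sorted() // stable ordering keeps the checksum deterministic
        .forEach(k -> crc.update((k + "=" + props.getProperty(k)).getBytes()));
    return Long.toString(crc.getValue());
  }

  static boolean isValidChecksum(Properties props) {
    return props.containsKey(TABLE_CHECKSUM_KEY)
        && checksumOf(props).equals(props.getProperty(TABLE_CHECKSUM_KEY));
  }

  static String refreshChecksum(Properties props) {
    final String checksum;
    if (isValidChecksum(props)) {
      checksum = props.getProperty(TABLE_CHECKSUM_KEY);
    } else {
      checksum = checksumOf(props);
      props.setProperty(TABLE_CHECKSUM_KEY, checksum);
    }
    return checksum;
  }
}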
@@ -298,20 +294,13 @@ public HoodieTableConfig() {
 
   private void fetchConfigs(FileSystem fs, String metaPath) throws IOException {
     Path cfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
-    Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
     try (FSDataInputStream is = fs.open(cfgPath)) {
       props.load(is);
-      // validate checksum for latest table version
-      if (getTableVersion().versionCode() >= HoodieTableVersion.FOUR.versionCode() && !isValidChecksum()) {
-        LOG.warn("Checksum validation failed. Falling back to backed up configs.");
-        try (FSDataInputStream fsDataInputStream = fs.open(backupCfgPath)) {
-          props.load(fsDataInputStream);
-        }
-      }
     } catch (IOException ioe) {
       if (!fs.exists(cfgPath)) {
         LOG.warn("Run `table recover-configs` if config update/delete failed midway. Falling back to backed up configs.");
         // try the backup. this way no query ever fails if update fails midway.
+        Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
         try (FSDataInputStream is = fs.open(backupCfgPath)) {
           props.load(is);
@@ -631,7 +620,7 @@ public List<String> getMetadataPartitions() {
         CONFIG_VALUES_DELIMITER
     );
   }
-  
+
   /**
    * Returns the format to use for partition meta files.
    */
