From 27124664c10c076848727e949964fb0214012d7e Mon Sep 17 00:00:00 2001 From: "yuzhao.cyz" Date: Mon, 23 May 2022 15:02:30 +0800 Subject: [PATCH] [HUDI-4138] Fix the concurrent modification of hoodie table config for flink * Remove the metadata cleaning strategy for flink, which means the multi-modal index may be affected * Improve the HoodieTable#clearMetadataTablePartitionsConfig to only update table config when necessary * Remove the modification of read code path in HoodieTableConfig --- .../org/apache/hudi/table/HoodieTable.java | 14 +++++------ .../hudi/client/HoodieFlinkWriteClient.java | 4 +--- .../apache/hudi/table/HoodieFlinkTable.java | 4 ---- .../hudi/common/table/HoodieTableConfig.java | 23 +++++-------------- 4 files changed, 14 insertions(+), 31 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 807865dae2416..cf124593c009f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -885,24 +885,24 @@ private boolean shouldExecuteMetadataTableDeletion() { // partitions are ready to use return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath()) && !config.isMetadataTableEnabled() - && (!metaClient.getTableConfig().contains(TABLE_METADATA_PARTITIONS) - || !metaClient.getTableConfig().getMetadataPartitions().isEmpty()); + && !metaClient.getTableConfig().getMetadataPartitions().isEmpty(); } /** * Clears hoodie.table.metadata.partitions in hoodie.properties */ private void clearMetadataTablePartitionsConfig(Option partitionType, boolean clearAll) { - if (clearAll) { + Set partitions = getCompletedMetadataPartitions(metaClient.getTableConfig()); + if (clearAll && partitions.size() > 0) { LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties"); 
metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), EMPTY_STRING); HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps()); return; } - Set completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig()); - completedPartitions.remove(partitionType.get().getPartitionPath()); - metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions)); - HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps()); + if (partitions.remove(partitionType.get().getPartitionPath())) { + metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", partitions)); + HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps()); + } } public HoodieTableMetadata getMetadataTable() { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 2d23c3afb7f14..49fa2ec246cf9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -53,7 +53,6 @@ import org.apache.hudi.io.MiniBatchHandle; import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; -import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; @@ -365,8 +364,7 @@ public void completeCompaction( // commit to data table after committing to metadata table. 
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - table.getMetadataWriter(compactionInstant.getTimestamp()).ifPresent( - w -> ((HoodieTableMetadataWriter) w).update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction()))); + writeTableMetadata(table, compactionCommitTime, compactionInstant.getAction(), metadata); LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); } finally { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index f1e43b9d30d42..6eae15e7e1aff 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -105,13 +105,9 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con public Option getMetadataWriter(String triggeringInstantTimestamp, Option actionMetadata) { if (config.isMetadataTableEnabled()) { - // even with metadata enabled, some index could have been disabled - // delete metadata partitions corresponding to such indexes - deleteMetadataIndexIfNecessary(); return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context, actionMetadata, Option.of(triggeringInstantTimestamp))); } else { - maybeDeleteMetadataTable(); return Option.empty(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index edc6caa5bcbdf..e6a21aca4a450 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -245,10 +245,6 @@ public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName if (contains(PAYLOAD_CLASS_NAME) && payloadClassName != null && !getString(PAYLOAD_CLASS_NAME).equals(payloadClassName)) { setValue(PAYLOAD_CLASS_NAME, payloadClassName); - // FIXME(vc): wonder if this can be removed. Need to look into history. - try (FSDataOutputStream outputStream = fs.create(propertyPath)) { - storeProperties(props, outputStream); - } } } catch (IOException e) { throw new HoodieIOException("Could not load Hoodie properties from " + propertyPath, e); @@ -272,8 +268,8 @@ private static Properties getOrderedPropertiesWithTableChecksum(Properties props * @throws IOException */ private static String storeProperties(Properties props, FSDataOutputStream outputStream) throws IOException { - String checksum; - if (props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props)) { + final String checksum; + if (isValidChecksum(props)) { checksum = props.getProperty(TABLE_CHECKSUM.key()); props.store(outputStream, "Updated at " + Instant.now()); } else { @@ -285,8 +281,8 @@ private static String storeProperties(Properties props, FSDataOutputStream outpu return checksum; } - private boolean isValidChecksum() { - return contains(TABLE_CHECKSUM) && validateChecksum(props); + private static boolean isValidChecksum(Properties props) { + return props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props); } /** @@ -298,20 +294,13 @@ public HoodieTableConfig() { private void fetchConfigs(FileSystem fs, String metaPath) throws IOException { Path cfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE); - Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP); 
try (FSDataInputStream is = fs.open(cfgPath)) { props.load(is); - // validate checksum for latest table version - if (getTableVersion().versionCode() >= HoodieTableVersion.FOUR.versionCode() && !isValidChecksum()) { - LOG.warn("Checksum validation failed. Falling back to backed up configs."); - try (FSDataInputStream fsDataInputStream = fs.open(backupCfgPath)) { - props.load(fsDataInputStream); - } - } } catch (IOException ioe) { if (!fs.exists(cfgPath)) { LOG.warn("Run `table recover-configs` if config update/delete failed midway. Falling back to backed up configs."); // try the backup. this way no query ever fails if update fails midway. + Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP); try (FSDataInputStream is = fs.open(backupCfgPath)) { props.load(is); } @@ -631,7 +620,7 @@ public List getMetadataPartitions() { CONFIG_VALUES_DELIMITER ); } - + /** * Returns the format to use for partition meta files. */