From fd62a1413e74de686935672aec812aacd5c43a63 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Tue, 20 Dec 2022 19:21:01 +0800 Subject: [PATCH] [HUDI-5414] No need to guard the table initialization by lock for HoodieFlinkWriteClient (#7509) Different from other write clients, HoodieFlinkWriteClient invokes the dataset writing methods (#upsert or #insert) for each batch of new data set in the long running task. In the current implementation, an engine-specific hoodie table would be created before performing these actions, and before the table creation, some table bootstrapping operations are performed (such as table upgrade/downgrade, the metadata table bootstrap). These bootstrapping operations are guarded by a transaction lock. In Flink, these bootstrapping operations can be avoided because they are all performed only once on the coordinator. The changes: - Make BaseHoodieWriteClient#doInitTable non-abstract, it now only performs the bootstrapping operations - Add a default impl BaseHoodieWriteClient#initMetadataTable for metadata table bootstrap specifically - Add a new abstract method for creating an engine-specific hoodie table --- .../hudi/client/BaseHoodieWriteClient.java | 49 ++++++++++++------- .../hudi/client/HoodieFlinkWriteClient.java | 24 +++++---- .../apache/hudi/table/HoodieFlinkTable.java | 6 +-- .../hudi/client/HoodieJavaWriteClient.java | 14 ++---- .../apache/hudi/table/HoodieJavaTable.java | 4 +- .../hudi/client/SparkRDDWriteClient.java | 20 ++++---- .../apache/hudi/table/HoodieSparkTable.java | 5 +- 7 files changed, 68 insertions(+), 54 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 48382e0870a3..5d23918e1a3a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -331,6 +331,8 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf); + protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient); + void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) { if (writeTimer != null) { long durationInMs = metrics.getDurationInMs(writeTimer.stop()); @@ -1530,17 +1532,38 @@ public HoodieMetrics getMetrics() { } /** - * Instantiates engine-specific instance of {@link HoodieTable} as well as performs necessary - * bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped) + * Performs necessary bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped). * - * NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS - * NOT REQUIRING EXTERNAL SYNCHRONIZATION + *

NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS + * NOT REQUIRING EXTERNAL SYNCHRONIZATION * * @param metaClient instance of {@link HoodieTableMetaClient} * @param instantTime current inflight instant time - * @return instantiated {@link HoodieTable} */ - protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option instantTime, boolean initialMetadataTableIfNecessary); + protected void doInitTable(HoodieTableMetaClient metaClient, Option instantTime, boolean initialMetadataTableIfNecessary) { + Option ownerInstant = Option.empty(); + if (instantTime.isPresent()) { + ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get())); + } + this.txnManager.beginTransaction(ownerInstant, Option.empty()); + try { + tryUpgrade(metaClient, instantTime); + if (initialMetadataTableIfNecessary) { + initMetadataTable(instantTime); + } + } finally { + this.txnManager.endTransaction(ownerInstant); + } + } + + /** + * Bootstrap the metadata table. + * + * @param instantTime current inflight instant time + */ + protected void initMetadataTable(Option instantTime) { + // by default do nothing. 
+ } /** * Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping @@ -1562,18 +1585,8 @@ protected final HoodieTable initTable(WriteOperationType operationType, Option ownerInstant = Option.empty(); - if (instantTime.isPresent()) { - ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get())); - } - this.txnManager.beginTransaction(ownerInstant, Option.empty()); - try { - tryUpgrade(metaClient, instantTime); - table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary); - } finally { - this.txnManager.endTransaction(ownerInstant); - } + doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary); + HoodieTable table = createTable(config, hadoopConf, metaClient); // Validate table properties metaClient.validateTableProperties(config.getProps()); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 3e551f7a17fe..5bf948de368b 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -128,6 +128,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); } + @Override + protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) { + return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context, metaClient); + } + @Override public List> filterExists(List> hoodieRecords) { // Create a Hoodie table which encapsulated the commits and files visible @@ -301,10 +306,14 @@ public void initMetadataTable() { HoodieFlinkTable table = getHoodieTable(); if 
(config.isMetadataTableEnabled()) { // initialize the metadata table path - try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) { - // do nothing + // guard the metadata writer with concurrent lock + try { + this.txnManager.getLockManager().lock(); + initMetadataWriter().close(); } catch (Exception e) { throw new HoodieException("Failed to initialize metadata table", e); + } finally { + this.txnManager.getLockManager().unlock(); } // clean the obsolete index stats table.deleteMetadataIndexIfNecessary(); @@ -488,16 +497,13 @@ private void completeClustering( } @Override - protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option instantTime, boolean initialMetadataTableIfNecessary) { - // Create a Hoodie table which encapsulated the commits and files visible - return getHoodieTable(); - } - - @Override - protected void tryUpgrade(HoodieTableMetaClient metaClient, Option instantTime) { + protected void doInitTable(HoodieTableMetaClient metaClient, Option instantTime, boolean initialMetadataTableIfNecessary) { // do nothing. + // flink executes the upgrade/downgrade once when initializing the first instant on start up, // no need to execute the upgrade/downgrade on each write in streaming. + + // flink performs metadata table bootstrap on the coordinator when it starts up. 
} public void completeTableService( diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index 3302b70214e9..3d77844df6f8 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -59,15 +59,15 @@ public static HoodieFlinkTable create(HoodieWriteConfig config, HoodieFli .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig()) .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))) .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build(); - if (config.getSchemaEvolutionEnable()) { - setLatestInternalSchema(config, metaClient); - } return HoodieFlinkTable.create(config, context, metaClient); } public static HoodieFlinkTable create(HoodieWriteConfig config, HoodieFlinkEngineContext context, HoodieTableMetaClient metaClient) { + if (config.getSchemaEvolutionEnable()) { + setLatestInternalSchema(config, metaClient); + } final HoodieFlinkTable hoodieFlinkTable; switch (metaClient.getTableType()) { case COPY_ON_WRITE: diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java index 041d994623c4..ed1b400a7574 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java @@ -94,6 +94,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop return HoodieJavaTable.create(config, context); } + @Override + protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, 
HoodieTableMetaClient metaClient) { + return HoodieJavaTable.create(config, context, metaClient); + } + @Override public List upsert(List> records, String instantTime) { @@ -230,13 +235,4 @@ protected HoodieWriteMetadata> compact(String compactionInstan public HoodieWriteMetadata> cluster(final String clusteringInstant, final boolean shouldComplete) { throw new HoodieNotSupportedException("Cluster is not supported in HoodieJavaClient"); } - - @Override - protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option instantTime, boolean initialMetadataTableIfNecessary) { - // new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime); - - // Create a Hoodie table which encapsulated the commits and files visible - return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient); - } - } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java index 7b25f389cf80..66e4795ba421 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java @@ -50,8 +50,8 @@ public static HoodieJavaTable create(HoodieWriteConfig config, HoodieEngi } public static HoodieJavaTable create(HoodieWriteConfig config, - HoodieJavaEngineContext context, - HoodieTableMetaClient metaClient) { + HoodieEngineContext context, + HoodieTableMetaClient metaClient) { switch (metaClient.getTableType()) { case COPY_ON_WRITE: return new HoodieJavaCopyOnWriteTable<>(config, context, metaClient); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index df2741e80642..4ddcd13e4f42 100644 --- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -120,6 +120,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop return HoodieSparkTable.create(config, context); } + @Override + protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) { + return HoodieSparkTable.create(config, context, metaClient); + } + @Override public JavaRDD> filterExists(JavaRDD> hoodieRecords) { // Create a Hoodie table which encapsulated the commits and files visible @@ -480,16 +485,11 @@ private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitM } @Override - protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option instantTime, boolean initialMetadataTableIfNecessary) { - if (initialMetadataTableIfNecessary) { - // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation, - // if it didn't exist before - // See https://issues.apache.org/jira/browse/HUDI-3343 for more details - initializeMetadataTable(instantTime); - } - - // Create a Hoodie table which encapsulated the commits and files visible - return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient); + protected void initMetadataTable(Option instantTime) { + // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation, + // if it didn't exist before + // See https://issues.apache.org/jira/browse/HUDI-3343 for more details + initializeMetadataTable(instantTime); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java index 789f044cf52f..7c4c909ae71b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java 
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java @@ -19,7 +19,6 @@ package org.apache.hudi.table; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; @@ -59,11 +58,11 @@ public static HoodieSparkTable create(HoodieWriteConfig config, HoodieEng .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))) .setFileSystemRetryConfig(config.getFileSystemRetryConfig()) .setProperties(config.getProps()).build(); - return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient); + return HoodieSparkTable.create(config, context, metaClient); } public static HoodieSparkTable create(HoodieWriteConfig config, - HoodieSparkEngineContext context, + HoodieEngineContext context, HoodieTableMetaClient metaClient) { HoodieSparkTable hoodieSparkTable; switch (metaClient.getTableType()) {