From 2bd751427c759d676959c969083ab3f6311741e8 Mon Sep 17 00:00:00 2001
From: Danny Chan
Date: Fri, 16 Dec 2022 10:27:56 +0800
Subject: [PATCH] [HUDI-5393] Remove the reuse of metadata table writer for
 flink write client (#7466)

---
 .../hudi/client/HoodieFlinkWriteClient.java | 32 ++++++-------------
 1 file changed, 10 insertions(+), 22 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 278e85fbedd70..3e551f7a17fe7 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -96,11 +96,6 @@ public class HoodieFlinkWriteClient<T> extends
    */
   private final Map<String, Path> bucketToHandles;
 
-  /**
-   * Cached metadata writer for coordinator to reuse for each commit.
-   */
-  private HoodieBackedTableMetadataWriter metadataWriter;
-
   public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
     super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance());
     this.bucketToHandles = new HashMap<>();
@@ -283,21 +278,10 @@ public void preWrite(String instantTime, WriteOperationType writeOperationType,
 
   @Override
   protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
-    if (this.metadataWriter == null) {
-      initMetadataWriter();
-    }
-    // refresh the timeline
-
-    // Note: the data meta client is not refreshed currently, some code path
-    // relies on the meta client for resolving the latest data schema,
-    // the schema expects to be immutable for SQL jobs but may be not for non-SQL
-    // jobs.
-    this.metadataWriter.initTableMetadata();
-    this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
-    try {
-      this.metadataWriter.close();
+    try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
+      metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
     } catch (Exception e) {
-      throw new HoodieException("Failed to close metadata writer ", e);
+      throw new HoodieException("Failed to update metadata", e);
     }
   }
 
@@ -305,8 +289,8 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, String
    * Initialize the table metadata writer, for e.g, bootstrap the metadata table
    * from the filesystem if it does not exist.
    */
-  public void initMetadataWriter() {
-    this.metadataWriter = (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
+  private HoodieBackedTableMetadataWriter initMetadataWriter() {
+    return (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
         FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
   }
 
@@ -317,7 +301,11 @@ public void initMetadataTable() {
     HoodieFlinkTable<T> table = getHoodieTable();
     if (config.isMetadataTableEnabled()) {
       // initialize the metadata table path
-      initMetadataWriter();
+      try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
+        // do nothing
+      } catch (Exception e) {
+        throw new HoodieException("Failed to initialize metadata table", e);
+      }
       // clean the obsolete index stats
       table.deleteMetadataIndexIfNecessary();
     } else {
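
For readers of this patch, the sketch below restates the lifecycle it introduces: instead of caching one HoodieBackedTableMetadataWriter on the write client and refreshing it with initTableMetadata() before each commit, a fresh writer is created per commit and closed through try-with-resources. This is a minimal illustrative sketch, not code from the patch: the wrapper class MetadataWriteExample, its constructor, and the import package paths are assumptions based on a typical Hudi module layout, while the individual calls (FlinkHoodieBackedTableMetadataWriter.create, update, FlinkClientUtil.getHadoopConf, HoodieFlinkEngineContext.DEFAULT) are taken from the diff itself.

// Illustrative sketch only. Import paths and the MetadataWriteExample wrapper are
// assumptions; the API calls mirror the ones visible in the patch above.
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.util.FlinkClientUtil;

public class MetadataWriteExample {

  private final HoodieWriteConfig config;

  public MetadataWriteExample(HoodieWriteConfig config) {
    this.config = config;
  }

  // Creates a fresh metadata table writer; the caller owns its lifecycle.
  private HoodieBackedTableMetadataWriter initMetadataWriter() {
    return (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
        FlinkClientUtil.getHadoopConf(), config, HoodieFlinkEngineContext.DEFAULT);
  }

  // Applies one commit's metadata and releases the writer even if the update fails.
  public void writeTableMetadata(HoodieCommitMetadata metadata, String instantTime, boolean isTableService) {
    try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
      metadataWriter.update(metadata, instantTime, isTableService);
    } catch (Exception e) {
      throw new HoodieException("Failed to update metadata", e);
    }
  }
}

The removed code had to call initTableMetadata() explicitly to refresh the cached writer's view of the timeline between commits; creating a writer per commit makes that refresh implicit and guarantees the writer is closed after every commit, at the cost of re-instantiating it each time.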