Skip to content

Commit

Permalink
[HUDI-5393] Remove the reuse of metadata table writer for flink write…
Browse files Browse the repository at this point in the history
… client (apache#7466)
  • Loading branch information
danny0405 authored and fengjian committed Apr 5, 2023
1 parent 9aba2ff commit 2bd7514
Showing 1 changed file with 10 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,6 @@ public class HoodieFlinkWriteClient<T> extends
*/
private final Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;

/**
* Cached metadata writer for coordinator to reuse for each commit.
*/
private HoodieBackedTableMetadataWriter metadataWriter;

public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance());
this.bucketToHandles = new HashMap<>();
Expand Down Expand Up @@ -283,30 +278,19 @@ public void preWrite(String instantTime, WriteOperationType writeOperationType,

@Override
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
if (this.metadataWriter == null) {
initMetadataWriter();
}
// refresh the timeline

// Note: the data meta client is not refreshed currently, some code path
// relies on the meta client for resolving the latest data schema,
// the schema expects to be immutable for SQL jobs but may be not for non-SQL
// jobs.
this.metadataWriter.initTableMetadata();
this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
try {
this.metadataWriter.close();
try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
} catch (Exception e) {
throw new HoodieException("Failed to close metadata writer ", e);
throw new HoodieException("Failed to update metadata", e);
}
}

/**
* Initialize the table metadata writer, for e.g, bootstrap the metadata table
* from the filesystem if it does not exist.
*/
public void initMetadataWriter() {
this.metadataWriter = (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
private HoodieBackedTableMetadataWriter initMetadataWriter() {
return (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
}

Expand All @@ -317,7 +301,11 @@ public void initMetadataTable() {
HoodieFlinkTable<?> table = getHoodieTable();
if (config.isMetadataTableEnabled()) {
// initialize the metadata table path
initMetadataWriter();
try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
// do nothing
} catch (Exception e) {
throw new HoodieException("Failed to initialize metadata table", e);
}
// clean the obsolete index stats
table.deleteMetadataIndexIfNecessary();
} else {
Expand Down

0 comments on commit 2bd7514

Please sign in to comment.