Skip to content

Commit

Permalink
[HUDI-5414] No need to guard the table initialization by lock for Hoo…
Browse files Browse the repository at this point in the history
…dieFlinkWriteClient (#7509)

Different from other write clients, HoodieFlinkWriteClient invokes the dataset writing methods (#upsert or #insert)
for each batch of the new data set in the long-running task. In the current implementation, an engine-specific hoodie table would be created before performing
these actions, and before the table creation, some table bootstrapping operations are performed (such as table upgrade/downgrade and the metadata table
bootstrap). These bootstrapping operations are guarded by a transaction lock.

In Flink, these bootstrapping operations can be avoided because they are all performed only once on the coordinator.

The changes:

- Make BaseHoodieWriteClient#doInitTable non-abstract; it now only performs the bootstrapping operations
- Add a default impl BaseHoodieWriteClient#initMetadataTable for metadata table bootstrap specifically
- Add a new abstract method for creating engine-specific hoodie table
  • Loading branch information
danny0405 authored Dec 20, 2022
1 parent 4fd3ff3 commit fd62a14
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom

protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf);

protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient);

void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
Expand Down Expand Up @@ -1530,17 +1532,38 @@ public HoodieMetrics getMetrics() {
}

/**
* Instantiates engine-specific instance of {@link HoodieTable} as well as performs necessary
* bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped)
* Performs necessary bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped).
*
* NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS
* NOT REQUIRING EXTERNAL SYNCHRONIZATION
* <p>NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS
* NOT REQUIRING EXTERNAL SYNCHRONIZATION
*
* @param metaClient instance of {@link HoodieTableMetaClient}
* @param instantTime current inflight instant time
* @return instantiated {@link HoodieTable}
*/
protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary);
protected void doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// Tie the transaction to the inflight instant (when one is supplied) so the lock
// owner can be identified; otherwise begin an anonymous transaction.
Option<HoodieInstant> ownerInstant = Option.empty();
if (instantTime.isPresent()) {
ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get()));
}
this.txnManager.beginTransaction(ownerInstant, Option.empty());
try {
// Run the table upgrade/downgrade flow if the table version needs it.
tryUpgrade(metaClient, instantTime);
if (initialMetadataTableIfNecessary) {
// Bootstrap the metadata table; the base implementation is a no-op and
// engine-specific clients override it (e.g. Spark).
initMetadataTable(instantTime);
}
} finally {
// Always release the lock, even if upgrade or metadata bootstrap throws.
this.txnManager.endTransaction(ownerInstant);
}
}

/**
 * Bootstrap the metadata table.
 *
 * <p>No-op by default; engine-specific write clients that manage a metadata
 * table override this (invoked under the transaction lock from {@code doInitTable}).
 *
 * @param instantTime current inflight instant time
 */
protected void initMetadataTable(Option<String> instantTime) {
// by default do nothing.
}

/**
* Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
Expand All @@ -1562,18 +1585,8 @@ protected final HoodieTable initTable(WriteOperationType operationType, Option<S
setWriteSchemaForDeletes(metaClient);
}

HoodieTable table;
Option<HoodieInstant> ownerInstant = Option.empty();
if (instantTime.isPresent()) {
ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get()));
}
this.txnManager.beginTransaction(ownerInstant, Option.empty());
try {
tryUpgrade(metaClient, instantTime);
table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
} finally {
this.txnManager.endTransaction(ownerInstant);
}
doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
HoodieTable table = createTable(config, hadoopConf, metaClient);

// Validate table properties
metaClient.validateTableProperties(config.getProps());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
}

@Override
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
// Build the Flink table from the caller-supplied meta client instead of
// re-instantiating one from the base path.
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context, metaClient);
}

@Override
public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
Expand Down Expand Up @@ -301,10 +306,14 @@ public void initMetadataTable() {
HoodieFlinkTable<?> table = getHoodieTable();
if (config.isMetadataTableEnabled()) {
// initialize the metadata table path
try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
// do nothing
// guard the metadata writer with concurrent lock
try {
this.txnManager.getLockManager().lock();
initMetadataWriter().close();
} catch (Exception e) {
throw new HoodieException("Failed to initialize metadata table", e);
} finally {
this.txnManager.getLockManager().unlock();
}
// clean the obsolete index stats
table.deleteMetadataIndexIfNecessary();
Expand Down Expand Up @@ -488,16 +497,13 @@ private void completeClustering(
}

@Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// Create a Hoodie table which encapsulated the commits and files visible
return getHoodieTable();
}

@Override
protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
protected void doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// Intentionally a no-op: Flink executes the upgrade/downgrade once when initializing
// the first instant on start up, so there is no need to run it on each write in
// streaming; metadata table bootstrap is likewise performed on the coordinator
// when it starts up.
}

public void completeTableService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ public static <T> HoodieFlinkTable<T> create(HoodieWriteConfig config, HoodieFli
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
.setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
if (config.getSchemaEvolutionEnable()) {
setLatestInternalSchema(config, metaClient);
}
return HoodieFlinkTable.create(config, context, metaClient);
}

public static <T> HoodieFlinkTable<T> create(HoodieWriteConfig config,
HoodieFlinkEngineContext context,
HoodieTableMetaClient metaClient) {
if (config.getSchemaEvolutionEnable()) {
setLatestInternalSchema(config, metaClient);
}
final HoodieFlinkTable<T> hoodieFlinkTable;
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop
return HoodieJavaTable.create(config, context);
}

@Override
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
// Build the Java engine table from the caller-supplied meta client.
return HoodieJavaTable.create(config, context, metaClient);
}

@Override
public List<WriteStatus> upsert(List<HoodieRecord<T>> records,
String instantTime) {
Expand Down Expand Up @@ -230,13 +235,4 @@ protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstan
public HoodieWriteMetadata<List<WriteStatus>> cluster(final String clusteringInstant, final boolean shouldComplete) {
throw new HoodieNotSupportedException("Cluster is not supported in HoodieJavaClient");
}

@Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);

// Create a Hoodie table which encapsulated the commits and files visible
return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public static <T> HoodieJavaTable<T> create(HoodieWriteConfig config, HoodieEngi
}

public static <T> HoodieJavaTable<T> create(HoodieWriteConfig config,
HoodieJavaEngineContext context,
HoodieTableMetaClient metaClient) {
HoodieEngineContext context,
HoodieTableMetaClient metaClient) {
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
return new HoodieJavaCopyOnWriteTable<>(config, context, metaClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop
return HoodieSparkTable.create(config, context);
}

@Override
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
// Build the Spark table from the caller-supplied meta client.
return HoodieSparkTable.create(config, context, metaClient);
}

@Override
public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
Expand Down Expand Up @@ -480,16 +485,11 @@ private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitM
}

@Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
if (initialMetadataTableIfNecessary) {
// Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
// if it didn't exist before
// See https://issues.apache.org/jira/browse/HUDI-3343 for more details
initializeMetadataTable(instantTime);
}

// Create a Hoodie table which encapsulated the commits and files visible
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
protected void initMetadataTable(Option<String> instantTime) {
// Initialize the Metadata Table to make sure it is bootstrapped _before_ the
// operation, if it did not exist before.
// See https://issues.apache.org/jira/browse/HUDI-3343 for more details.
initializeMetadataTable(instantTime);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.hudi.table;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
Expand Down Expand Up @@ -59,11 +58,11 @@ public static <T> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEng
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
.setFileSystemRetryConfig(config.getFileSystemRetryConfig())
.setProperties(config.getProps()).build();
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
return HoodieSparkTable.create(config, context, metaClient);
}

public static <T> HoodieSparkTable<T> create(HoodieWriteConfig config,
HoodieSparkEngineContext context,
HoodieEngineContext context,
HoodieTableMetaClient metaClient) {
HoodieSparkTable<T> hoodieSparkTable;
switch (metaClient.getTableType()) {
Expand Down

0 comments on commit fd62a14

Please sign in to comment.