Skip to content

Commit

Permalink
[HUDI-3404] Automatically adjust write configs based on metadata tabl…
Browse files Browse the repository at this point in the history
…e and write concurrency mode (apache#4975)
  • Loading branch information
yihua authored and stayrascal committed Apr 2, 2022
1 parent 00b2e45 commit 4512e96
Show file tree
Hide file tree
Showing 2 changed files with 272 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
Expand Down Expand Up @@ -66,6 +67,8 @@
import org.apache.hudi.table.storage.HoodieStorageLayout;

import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.orc.CompressionKind;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

Expand Down Expand Up @@ -93,6 +96,7 @@
+ "higher level frameworks (e.g Spark datasources, Flink sink) and utilities (e.g DeltaStreamer).")
public class HoodieWriteConfig extends HoodieConfig {

private static final Logger LOG = LogManager.getLogger(HoodieWriteConfig.class);
private static final long serialVersionUID = 0L;

// This is a constant as is should never be changed via config (will invalidate previous commits)
Expand Down Expand Up @@ -903,6 +907,11 @@ public String getTableName() {
return getString(TBL_NAME);
}

public HoodieTableType getTableType() {
return HoodieTableType.valueOf(getStringOrDefault(
HoodieTableConfig.TYPE, HoodieTableConfig.TYPE.defaultValue().name()).toUpperCase());
}

public String getPreCombineField() {
return getString(PRECOMBINE_FIELD_NAME);
}
Expand Down Expand Up @@ -1930,7 +1939,9 @@ public Boolean areAnyTableServicesExecutedInline() {
* @return True if any table services are configured to run async, false otherwise.
*/
public Boolean areAnyTableServicesAsync() {
return isAsyncClusteringEnabled() || !inlineCompactionEnabled() || isAsyncClean() || isAsyncArchive();
return isAsyncClusteringEnabled()
|| (getTableType() == HoodieTableType.MERGE_ON_READ && !inlineCompactionEnabled())
|| isAsyncClean() || isAsyncArchive();
}

public Boolean areAnyTableServicesScheduledInline() {
Expand Down Expand Up @@ -2390,19 +2401,56 @@ protected void setDefaults() {
HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(TimelineLayoutVersion.CURR_VERSION));

// Async table services can update the metadata table and a lock provider is
// needed to guard against any concurrent table write operations. If user has
// not configured any lock provider, let's use the InProcess lock provider.
autoAdjustConfigsForConcurrencyMode();
}

private void autoAdjustConfigsForConcurrencyMode() {
boolean isMetadataTableEnabled = writeConfig.getBoolean(HoodieMetadataConfig.ENABLE);
final TypedProperties writeConfigProperties = writeConfig.getProps();
final boolean isLockProviderPropertySet = writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME)
|| writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP);

if (!isLockConfigSet) {
HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps());
if (!isLockProviderPropertySet && writeConfig.areAnyTableServicesAsync()) {
lockConfigBuilder.withLockProvider(InProcessLockProvider.class);
}
writeConfig.setDefault(lockConfigBuilder.build());
}

if (isMetadataTableEnabled) {
// When metadata table is enabled, optimistic concurrency control must be used for
// single writer with async table services.
// Async table services can update the metadata table and a lock provider is
// needed to guard against any concurrent table write operations. If user has
// not configured any lock provider, let's use the InProcess lock provider.
boolean areTableServicesEnabled = writeConfig.areTableServicesEnabled();
boolean areAsyncTableServicesEnabled = writeConfig.areAnyTableServicesAsync();

if (!isLockProviderPropertySet && areTableServicesEnabled && areAsyncTableServicesEnabled) {
// This is targeted at Single writer with async table services
// If user does not set the lock provider, likely that the concurrency mode is not set either
// Override the configs for metadata table
writeConfig.setValue(WRITE_CONCURRENCY_MODE.key(),
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
InProcessLockProvider.class.getName());
LOG.info(String.format("Automatically set %s=%s and %s=%s since user has not set the "
+ "lock provider for single writer with async table services",
WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value(),
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName()));
}
}

// We check if "hoodie.cleaner.policy.failed.writes"
// is properly set to LAZY for optimistic concurrency control
String writeConcurrencyMode = writeConfig.getString(WRITE_CONCURRENCY_MODE);
if (WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value()
.equalsIgnoreCase(writeConcurrencyMode)) {
// In this case, we assume that the user takes care of setting the lock provider used
writeConfig.setValue(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name());
LOG.info(String.format("Automatically set %s=%s since optimistic concurrency control is used",
HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name()));
}
}

private void validate() {
Expand All @@ -2411,9 +2459,9 @@ private void validate() {
new TimelineLayoutVersion(Integer.parseInt(layoutVersion));
Objects.requireNonNull(writeConfig.getString(BASE_PATH));
if (writeConfig.getString(WRITE_CONCURRENCY_MODE)
.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())) {
ValidationUtils.checkArgument(writeConfig.getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY)
!= HoodieFailedWritesCleaningPolicy.EAGER.name(), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value())) {
ValidationUtils.checkArgument(!writeConfig.getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY)
.equals(HoodieFailedWritesCleaningPolicy.EAGER.name()), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
}
}

Expand Down
Loading

0 comments on commit 4512e96

Please sign in to comment.