diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 4202cbda72765..b2e8b999be513 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.table.HoodieTableConfig; @@ -66,6 +67,8 @@ import org.apache.hudi.table.storage.HoodieStorageLayout; import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.orc.CompressionKind; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -93,6 +96,7 @@ + "higher level frameworks (e.g Spark datasources, Flink sink) and utilities (e.g DeltaStreamer).") public class HoodieWriteConfig extends HoodieConfig { + private static final Logger LOG = LogManager.getLogger(HoodieWriteConfig.class); private static final long serialVersionUID = 0L; // This is a constant as is should never be changed via config (will invalidate previous commits) @@ -903,6 +907,11 @@ public String getTableName() { return getString(TBL_NAME); } + public HoodieTableType getTableType() { + return HoodieTableType.valueOf(getStringOrDefault( + HoodieTableConfig.TYPE, HoodieTableConfig.TYPE.defaultValue().name()).toUpperCase()); + } + public String getPreCombineField() { return getString(PRECOMBINE_FIELD_NAME); } @@ -1930,7 +1939,9 @@ public Boolean areAnyTableServicesExecutedInline() { * @return True if any table services are configured to run async, false otherwise. */ public Boolean areAnyTableServicesAsync() { - return isAsyncClusteringEnabled() || !inlineCompactionEnabled() || isAsyncClean() || isAsyncArchive(); + return isAsyncClusteringEnabled() + || (getTableType() == HoodieTableType.MERGE_ON_READ && !inlineCompactionEnabled()) + || isAsyncClean() || isAsyncArchive(); } public Boolean areAnyTableServicesScheduledInline() { @@ -2390,19 +2401,56 @@ protected void setDefaults() { HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(TimelineLayoutVersion.CURR_VERSION)); - // Async table services can update the metadata table and a lock provider is - // needed to guard against any concurrent table write operations. If user has - // not configured any lock provider, let's use the InProcess lock provider. + autoAdjustConfigsForConcurrencyMode(); + } + + private void autoAdjustConfigsForConcurrencyMode() { + boolean isMetadataTableEnabled = writeConfig.getBoolean(HoodieMetadataConfig.ENABLE); final TypedProperties writeConfigProperties = writeConfig.getProps(); final boolean isLockProviderPropertySet = writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME) || writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP); + if (!isLockConfigSet) { HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()); - if (!isLockProviderPropertySet && writeConfig.areAnyTableServicesAsync()) { - lockConfigBuilder.withLockProvider(InProcessLockProvider.class); - } writeConfig.setDefault(lockConfigBuilder.build()); } + + if (isMetadataTableEnabled) { + // When metadata table is enabled, optimistic concurrency control must be used for + // single writer with async table services. + // Async table services can update the metadata table and a lock provider is + // needed to guard against any concurrent table write operations. If user has + // not configured any lock provider, let's use the InProcess lock provider. + boolean areTableServicesEnabled = writeConfig.areTableServicesEnabled(); + boolean areAsyncTableServicesEnabled = writeConfig.areAnyTableServicesAsync(); + + if (!isLockProviderPropertySet && areTableServicesEnabled && areAsyncTableServicesEnabled) { + // This is targeted at Single writer with async table services + // If user does not set the lock provider, likely that the concurrency mode is not set either + // Override the configs for metadata table + writeConfig.setValue(WRITE_CONCURRENCY_MODE.key(), + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value()); + writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), + InProcessLockProvider.class.getName()); + LOG.info(String.format("Automatically set %s=%s and %s=%s since user has not set the " + + "lock provider for single writer with async table services", + WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value(), + HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName())); + } + } + + // We check if "hoodie.cleaner.policy.failed.writes" + // is properly set to LAZY for optimistic concurrency control + String writeConcurrencyMode = writeConfig.getString(WRITE_CONCURRENCY_MODE); + if (WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value() + .equalsIgnoreCase(writeConcurrencyMode)) { + // In this case, we assume that the user takes care of setting the lock provider used + writeConfig.setValue(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(), + HoodieFailedWritesCleaningPolicy.LAZY.name()); + LOG.info(String.format("Automatically set %s=%s since optimistic concurrency control is used", + HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(), + HoodieFailedWritesCleaningPolicy.LAZY.name())); + } } private void validate() { @@ -2411,9 +2459,9 @@ private void validate() { new TimelineLayoutVersion(Integer.parseInt(layoutVersion)); Objects.requireNonNull(writeConfig.getString(BASE_PATH)); if (writeConfig.getString(WRITE_CONCURRENCY_MODE) - .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())) { - ValidationUtils.checkArgument(writeConfig.getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY) - != HoodieFailedWritesCleaningPolicy.EAGER.name(), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY"); + .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value())) { + ValidationUtils.checkArgument(!writeConfig.getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY) + .equals(HoodieFailedWritesCleaningPolicy.EAGER.name()), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY"); } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java index 2c3ae98c6e6be..778bef7324bde 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java @@ -21,20 +21,26 @@ import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteConcurrencyMode; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.marker.MarkerType; +import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig.Builder; import org.apache.hudi.index.HoodieIndex; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -44,9 +50,11 @@ import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE; import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_CLEAN; import static org.apache.hudi.config.HoodieCompactionConfig.AUTO_CLEAN; +import static org.apache.hudi.config.HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY; import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT; +import static org.apache.hudi.config.HoodieWriteConfig.TABLE_SERVICES_ENABLED; +import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieWriteConfig { @@ -114,89 +122,195 @@ public void testDefaultMarkersTypeAccordingToEngineType() { EngineType.JAVA, MarkerType.DIRECT)); } - @Test - public void testDefaultLockProviderWhenAsyncServicesEnabled() { + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testAutoConcurrencyConfigAdjustmentWithTableServices(HoodieTableType tableType) { final String inProcessLockProviderClassName = InProcessLockProvider.class.getCanonicalName(); - - // Any async clustering enabled should use InProcess lock provider - // as default when no other lock provider is set. - + // With metadata table enabled by default, any async table service enabled should + // use InProcess lock provider as default when no other lock provider is set. // 1. Async clustering - HoodieWriteConfig writeConfig = createWriteConfig(new HashMap() { - { - put(ASYNC_CLUSTERING_ENABLE.key(), "true"); - put(INLINE_COMPACT.key(), "true"); - put(AUTO_CLEAN.key(), "true"); - put(ASYNC_CLEAN.key(), "false"); - } - }); - assertTrue(writeConfig.areAnyTableServicesAsync()); - assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass()); + verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap() { + { + put(HoodieTableConfig.TYPE.key(), tableType.name()); + put(ASYNC_CLUSTERING_ENABLE.key(), "true"); + put(INLINE_COMPACT.key(), "true"); + put(AUTO_CLEAN.key(), "true"); + put(ASYNC_CLEAN.key(), "false"); + } + }), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, + HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName); // 2. Async clean - writeConfig = createWriteConfig(new HashMap() { - { - put(ASYNC_CLUSTERING_ENABLE.key(), "false"); - put(INLINE_COMPACT.key(), "true"); - put(AUTO_CLEAN.key(), "true"); - put(ASYNC_CLEAN.key(), "true"); - } - }); - assertTrue(writeConfig.areAnyTableServicesAsync()); - assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass()); + verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap() { + { + put(HoodieTableConfig.TYPE.key(), tableType.name()); + put(ASYNC_CLUSTERING_ENABLE.key(), "false"); + put(INLINE_COMPACT.key(), "true"); + put(AUTO_CLEAN.key(), "true"); + put(ASYNC_CLEAN.key(), "true"); + } + }), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, + HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName); - // 3. Async compaction - writeConfig = createWriteConfig(new HashMap() { - { - put(ASYNC_CLUSTERING_ENABLE.key(), "false"); - put(INLINE_COMPACT.key(), "false"); - put(AUTO_CLEAN.key(), "true"); - put(ASYNC_CLEAN.key(), "false"); - } - }); - assertTrue(writeConfig.areAnyTableServicesAsync()); - assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass()); + // 3. Async compaction configured + verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap() { + { + put(HoodieTableConfig.TYPE.key(), tableType.name()); + put(ASYNC_CLUSTERING_ENABLE.key(), "false"); + put(INLINE_COMPACT.key(), "false"); + put(AUTO_CLEAN.key(), "true"); + put(ASYNC_CLEAN.key(), "false"); + } + }), true, + tableType == HoodieTableType.MERGE_ON_READ, + tableType == HoodieTableType.MERGE_ON_READ + ? WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL + : WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()), + tableType == HoodieTableType.MERGE_ON_READ + ? HoodieFailedWritesCleaningPolicy.LAZY + : HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()), + tableType == HoodieTableType.MERGE_ON_READ + ? inProcessLockProviderClassName + : HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue()); // 4. All inline services - writeConfig = createWriteConfig(new HashMap() { - { - put(ASYNC_CLUSTERING_ENABLE.key(), "false"); - put(INLINE_COMPACT.key(), "true"); - put(AUTO_CLEAN.key(), "true"); - put(ASYNC_CLEAN.key(), "false"); - } - }); - assertFalse(writeConfig.areAnyTableServicesAsync()); - assertTrue(writeConfig.areAnyTableServicesExecutedInline()); - assertEquals(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue(), writeConfig.getLockProviderClass()); + verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap() { + { + put(HoodieTableConfig.TYPE.key(), tableType.name()); + put(ASYNC_CLUSTERING_ENABLE.key(), "false"); + put(INLINE_COMPACT.key(), "true"); + put(AUTO_CLEAN.key(), "true"); + put(ASYNC_CLEAN.key(), "false"); + } + }), Option.of(true), Option.of(false), Option.of(true), + WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()), + HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()), + HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue()); + } - // 5. User override for the lock provider should always take the precedence - writeConfig = HoodieWriteConfig.newBuilder() + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testAutoConcurrencyConfigAdjustmentWithUserConfigs(HoodieTableType tableType) { + // 1. User override for the lock provider should always take the precedence + TypedProperties properties = new TypedProperties(); + properties.setProperty(HoodieTableConfig.TYPE.key(), tableType.name()); + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() .withPath("/tmp") .withLockConfig(HoodieLockConfig.newBuilder() .withLockProvider(FileSystemBasedLockProviderTestClass.class) .build()) - .build(); - assertEquals(FileSystemBasedLockProviderTestClass.class.getName(), writeConfig.getLockProviderClass()); - - // 6. User can set the lock provider via properties - TypedProperties properties = new TypedProperties(); - properties.setProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), ZookeeperBasedLockProvider.class.getName()); - writeConfig = HoodieWriteConfig.newBuilder() - .withPath("/tmp") .withProperties(properties) .build(); - assertEquals(ZookeeperBasedLockProvider.class.getName(), writeConfig.getLockProviderClass()); + verifyConcurrencyControlRelatedConfigs(writeConfig, + true, tableType == HoodieTableType.MERGE_ON_READ, + WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()), + HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()), + FileSystemBasedLockProviderTestClass.class.getName()); + + // 2. User can set the lock provider via properties + verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap() { + { + put(HoodieTableConfig.TYPE.key(), tableType.name()); + put(ASYNC_CLUSTERING_ENABLE.key(), "false"); + put(INLINE_COMPACT.key(), "true"); + put(AUTO_CLEAN.key(), "true"); + put(ASYNC_CLEAN.key(), "true"); + put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), + ZookeeperBasedLockProvider.class.getName()); + } + }), true, true, + WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()), + HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()), + ZookeeperBasedLockProvider.class.getName()); - // Default config should have default lock provider - writeConfig = createWriteConfig(Collections.emptyMap()); - if (!writeConfig.areAnyTableServicesAsync()) { - assertEquals(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue(), writeConfig.getLockProviderClass()); + // 3. Default config should have default lock provider + writeConfig = createWriteConfig(new HashMap() { + { + put(HoodieTableConfig.TYPE.key(), tableType.name()); + } + }); + if (writeConfig.areAnyTableServicesAsync()) { + verifyConcurrencyControlRelatedConfigs(writeConfig, + true, true, + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, + HoodieFailedWritesCleaningPolicy.LAZY, + InProcessLockProvider.class.getName()); } else { - assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass()); + verifyConcurrencyControlRelatedConfigs(writeConfig, + true, false, + WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()), + HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()), + HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue()); } } + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testAutoConcurrencyConfigAdjustmentWithNoTableService(HoodieTableType tableType) { + // 1. No table service, concurrency control configs should not be overwritten + verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap() { + { + put(HoodieTableConfig.TYPE.key(), tableType.name()); + put(TABLE_SERVICES_ENABLED.key(), "false"); + } + }), false, tableType == HoodieTableType.MERGE_ON_READ, + WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()), + HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()), + HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue()); + + // 2. No table service, with optimistic concurrency control, + // failed write clean policy should be updated accordingly + verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap() { + { + put(HoodieTableConfig.TYPE.key(), tableType.name()); + put(TABLE_SERVICES_ENABLED.key(), "false"); + put(WRITE_CONCURRENCY_MODE.key(), + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value()); + put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), + FileSystemBasedLockProviderTestClass.class.getName()); + } + }), false, tableType == HoodieTableType.MERGE_ON_READ, + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, + HoodieFailedWritesCleaningPolicy.LAZY, + FileSystemBasedLockProviderTestClass.class.getName()); + } + + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testAutoConcurrencyConfigAdjustmentWithMetadataTableDisabled(HoodieTableType tableType) { + // 1. Metadata table disabled, with async table services, concurrency control configs + // should not be changed + verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap() { + { + put(HoodieTableConfig.TYPE.key(), tableType.name()); + put(HoodieMetadataConfig.ENABLE.key(), "false"); + put(ASYNC_CLUSTERING_ENABLE.key(), "true"); + put(INLINE_COMPACT.key(), "true"); + put(AUTO_CLEAN.key(), "true"); + put(ASYNC_CLEAN.key(), "false"); + } + }), true, true, + WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()), + HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()), + HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue()); + + // 2. Metadata table disabled, with optimistic concurrency control, + // failed write clean policy should be updated accordingly + verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap() { + { + put(ASYNC_CLUSTERING_ENABLE.key(), "true"); + put(INLINE_COMPACT.key(), "true"); + put(AUTO_CLEAN.key(), "true"); + put(ASYNC_CLEAN.key(), "false"); + put(WRITE_CONCURRENCY_MODE.key(), + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value()); + put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), + FileSystemBasedLockProviderTestClass.class.getName()); + } + }), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, + HoodieFailedWritesCleaningPolicy.LAZY, FileSystemBasedLockProviderTestClass.class.getName()); + } + private HoodieWriteConfig createWriteConfig(Map configs) { final Properties properties = new Properties(); configs.forEach(properties::setProperty); @@ -253,4 +367,38 @@ private Map constructConfigMap( mapping.put(k3, v3); return mapping; } + + private void verifyConcurrencyControlRelatedConfigs( + HoodieWriteConfig writeConfig, boolean expectedTableServicesEnabled, + boolean expectedAnyTableServicesAsync, + WriteConcurrencyMode expectedConcurrencyMode, + HoodieFailedWritesCleaningPolicy expectedCleanPolicy, + String expectedLockProviderName) { + verifyConcurrencyControlRelatedConfigs(writeConfig, Option.of(expectedTableServicesEnabled), + Option.of(expectedAnyTableServicesAsync), Option.empty(), expectedConcurrencyMode, + expectedCleanPolicy, expectedLockProviderName); + } + + private void verifyConcurrencyControlRelatedConfigs( + HoodieWriteConfig writeConfig, Option expectedTableServicesEnabled, + Option expectedAnyTableServicesAsync, + Option expectedAnyTableServicesExecutedInline, + WriteConcurrencyMode expectedConcurrencyMode, + HoodieFailedWritesCleaningPolicy expectedCleanPolicy, + String expectedLockProviderName) { + if (expectedTableServicesEnabled.isPresent()) { + assertEquals(expectedTableServicesEnabled.get(), writeConfig.areTableServicesEnabled()); + } + if (expectedAnyTableServicesAsync.isPresent()) { + assertEquals(expectedAnyTableServicesAsync.get(), writeConfig.areAnyTableServicesAsync()); + } + if (expectedAnyTableServicesExecutedInline.isPresent()) { + assertEquals(expectedAnyTableServicesExecutedInline.get(), + writeConfig.areAnyTableServicesExecutedInline()); + } + + assertEquals(expectedConcurrencyMode, writeConfig.getWriteConcurrencyMode()); + assertEquals(expectedCleanPolicy, writeConfig.getFailedWritesCleanPolicy()); + assertEquals(expectedLockProviderName, writeConfig.getLockProviderClass()); + } }