[HUDI-4071] Remove default value for mandatory record key field
codope committed Nov 2, 2022
1 parent 6fea6fc commit 99699df
Showing 17 changed files with 59 additions and 51 deletions.
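In practice this means hoodie.datasource.write.recordkey.field no longer falls back to "uuid" and has to be set explicitly wherever Hudi writes data. A minimal sketch of a Spark datasource write with the option spelled out; this is not part of the commit, and the uuid/ts/partitionpath field names simply follow the quickstart schema used elsewhere in the diff:

import org.apache.hudi.QuickstartUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class ExplicitRecordKeyWrite {
  public static void write(Dataset<Row> df, String tablePath, String tableName) {
    df.write().format("org.apache.hudi")
        .options(QuickstartUtils.getQuickstartWriteConfigs())
        .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
        // Required now that the record key has no "uuid" default.
        .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
        .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath")
        .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
        .mode(SaveMode.Append)
        .save(tablePath);
  }
}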
@@ -677,10 +677,10 @@ private void validateBucketIndexConfig() {
// check the bucket index hash field
if (StringUtils.isNullOrEmpty(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD))) {
hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD,
hoodieIndexConfig.getStringOrDefault(KeyGeneratorOptions.RECORDKEY_FIELD_NAME));
hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME));
} else {
boolean valid = Arrays
.stream(hoodieIndexConfig.getStringOrDefault(KeyGeneratorOptions.RECORDKEY_FIELD_NAME).split(","))
.stream(hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME).split(","))
.collect(Collectors.toSet())
.containsAll(Arrays.asList(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD).split(",")));
if (!valid) {
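With the default gone, bucket index configs must receive the record key explicitly, which is why the tests below pass it in through Properties before calling fromProperties. A minimal sketch of that pattern, with the "uuid" field name purely illustrative:

import java.util.Properties;

import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

public class BucketIndexConfigSketch {
  public static HoodieIndexConfig simpleBucketIndexConfig() {
    Properties props = new Properties();
    // The record key no longer defaults to "uuid"; validateBucketIndexConfig falls
    // back to it when no explicit bucket index hash field is configured.
    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
    return HoodieIndexConfig.newBuilder()
        .fromProperties(props)
        .withIndexType(HoodieIndex.IndexType.BUCKET)
        .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
        .withBucketNum("8")
        .build();
  }
}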
@@ -31,6 +31,7 @@
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.config.HoodieWriteConfig.Builder;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -374,8 +375,10 @@ public void testAutoConcurrencyConfigAdjustmentWithMetadataTableDisabled(HoodieT

@Test
public void testConsistentBucketIndexDefaultClusteringConfig() {
Properties props = new Properties();
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET)
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET)
.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING).build())
.build();
assertEquals(HoodieClusteringConfig.SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY, writeConfig.getClusteringPlanStrategyClass());
@@ -384,7 +387,9 @@ public void testConsistentBucketIndexDefaultClusteringConfig() {

@Test
public void testConsistentBucketIndexInvalidClusteringConfig() {
TypedProperties consistentBucketIndexProps = HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET)
Properties props = new Properties();
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
TypedProperties consistentBucketIndexProps = HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET)
.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING).build().getProps();
HoodieWriteConfig.Builder writeConfigBuilder = HoodieWriteConfig.newBuilder().withPath("/tmp");

@@ -400,14 +405,16 @@

@Test
public void testSimpleBucketIndexPartitionerConfig() {
Properties props = new Properties();
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET)
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET)
.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE).build())
.build();
assertEquals(HoodieLayoutConfig.SIMPLE_BUCKET_LAYOUT_PARTITIONER_CLASS_NAME, writeConfig.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME));

HoodieWriteConfig overwritePartitioner = HoodieWriteConfig.newBuilder().withPath("/tmp")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET)
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET)
.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
.build())
.withLayoutConfig(HoodieLayoutConfig.newBuilder().withLayoutPartitioner("org.apache.hudi.table.action.commit.UpsertPartitioner").build())
@@ -33,6 +33,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
@@ -43,6 +44,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -70,8 +72,10 @@ public void tearDown() throws IOException {
public void testBuildSplitClusteringGroup() throws IOException {
setup();
int maxFileSize = 5120;
Properties props = new Properties();
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET)
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET)
.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)
.withBucketMaxNum(6)
.withBucketNum("4").build())
@@ -110,8 +114,10 @@ public void testBuildSplitClusteringGroup() throws IOException {
public void testBuildMergeClusteringGroup() throws Exception {
setup();
int maxFileSize = 5120;
Properties props = new Properties();
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET)
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET)
.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)
.withBucketMinNum(4)
.withBucketNum("4").build())
@@ -31,6 +31,7 @@
import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex;
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex;
import org.apache.hudi.index.simple.HoodieSimpleIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -39,6 +40,7 @@
import org.junit.jupiter.params.provider.EnumSource;

import java.nio.file.Path;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -88,13 +90,15 @@ public void testCreateIndex(IndexType indexType) {
assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof SparkHoodieHBaseIndex);
break;
case BUCKET:
Properties props = new Properties();
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
config = clientConfigBuilder.withPath(basePath)
.withIndexConfig(indexConfigBuilder.withIndexType(IndexType.BUCKET)
.withIndexConfig(indexConfigBuilder.fromProperties(props).withIndexType(IndexType.BUCKET)
.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE).build()).build();
assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof HoodieSimpleBucketIndex);

config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withIndexConfig(indexConfigBuilder.withIndexType(IndexType.BUCKET)
.withIndexConfig(indexConfigBuilder.fromProperties(props).withIndexType(IndexType.BUCKET)
.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING).build())
.build();
assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof HoodieSparkConsistentBucketIndex);
@@ -84,6 +84,7 @@ public void testBucketIndexValidityCheck() {
.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
.withBucketNum("8").build();
});
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
props.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key(), "uuid");
HoodieIndexConfig.newBuilder().fromProperties(props)
.withIndexType(HoodieIndex.IndexType.BUCKET)
@@ -145,18 +145,17 @@ private Properties makeIndexConfig(HoodieIndex.IndexType indexType) {
Properties props = new Properties();
HoodieIndexConfig.Builder indexConfig = HoodieIndexConfig.newBuilder()
.withIndexType(indexType);
props.putAll(indexConfig.build().getProps());
if (indexType.equals(HoodieIndex.IndexType.BUCKET)) {
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
indexConfig.fromProperties(props)
.withIndexKeyField("_row_key")
.withBucketNum("1")
.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE);
props.putAll(indexConfig.build().getProps());
props.putAll(HoodieLayoutConfig.newBuilder().fromProperties(props)
.withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build().getProps());
}
props.putAll(indexConfig.build().getProps());
return props;
}

@@ -93,6 +93,7 @@ public void setup() {
@ParameterizedTest
@MethodSource("writePayloadTest")
public void testWriteDuringCompaction(String payloadClass) throws IOException {
Properties props = getPropertiesForKeyGen(true);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
.withPath(basePath())
@@ -107,10 +108,8 @@ public void testWriteDuringCompaction(String payloadClass) throws IOException {
.withLayoutConfig(HoodieLayoutConfig.newBuilder()
.withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
.build();

Properties props = getPropertiesForKeyGen(true);
props.putAll(config.getProps());

metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
@@ -139,6 +138,7 @@ public void testWriteDuringCompaction(String payloadClass) throws IOException {
@ParameterizedTest
@MethodSource("writeLogTest")
public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
Properties props = getPropertiesForKeyGen(true);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
.withPath(basePath())
@@ -152,10 +152,8 @@ public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean en
.withLayoutConfig(HoodieLayoutConfig.newBuilder()
.withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
.build();

Properties props = getPropertiesForKeyGen(true);
props.putAll(config.getProps());

metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
@@ -27,9 +27,9 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.Objects;

/**
* ConfigProperty describes a configuration property. It contains the configuration
@@ -76,7 +76,7 @@ public String key() {

public T defaultValue() {
if (defaultValue == null) {
throw new HoodieException("There's no default value for this config");
throw new HoodieException(String.format("There's no default value for this config: %s", key));
}
return defaultValue;
}
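A side effect worth noting: defaultValue() now names the offending key, so callers that still rely on the removed "uuid" fallback get a clearer error. A hedged sketch of the behavior, with import paths assumed from the surrounding modules:

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

public class NoDefaultValueSketch {
  public static void main(String[] args) {
    ConfigProperty<String> recordKey = KeyGeneratorOptions.RECORDKEY_FIELD_NAME;
    try {
      recordKey.defaultValue(); // throws, since the record key no longer has a default
    } catch (HoodieException e) {
      // Expected to read roughly:
      // "There's no default value for this config: hoodie.datasource.write.recordkey.field"
      System.out.println(e.getMessage());
    }
  }
}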
@@ -45,7 +45,7 @@ public class KeyGeneratorOptions extends HoodieConfig {

public static final ConfigProperty<String> RECORDKEY_FIELD_NAME = ConfigProperty
.key("hoodie.datasource.write.recordkey.field")
.defaultValue("uuid")
.noDefaultValue()
.withDocumentation("Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\n"
+ "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using\n"
+ "the dot notation eg: `a.b.c`");
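Since the option is now mandatory, every writer has to name a record key field; per the documentation above it may reference a nested field with dot notation. A small illustrative sketch, with the payload.* field names hypothetical:

import java.util.Properties;

import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

public class RecordKeyOptionSketch {
  public static Properties nestedRecordKeyProps() {
    Properties props = new Properties();
    // No implicit "uuid" fallback anymore; nested fields use dot notation.
    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "payload.id");
    props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "payload.region");
    return props;
  }
}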
@@ -210,7 +210,7 @@ public static Dataset<Row> delete(SparkSession spark, String tablePath, String t
df.write().format("org.apache.hudi")
.options(QuickstartUtils.getQuickstartWriteConfigs())
.option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
.option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "uuid")
.option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
.option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath")
.option(TBL_NAME.key(), tableName)
.option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value())
@@ -21,8 +21,6 @@
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
import org.apache.hudi.testutils.providers.SparkProvider;

import org.apache.spark.SparkConf;
@@ -37,15 +35,7 @@
import java.io.File;
import java.nio.file.Paths;

import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.delete;
import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.deleteByPartition;
import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.incrementalQuery;
import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.insertData;
import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.insertOverwriteData;
import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.pointInTimeQuery;
import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.queryData;
import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.runQuickstart;
import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.updateData;

public class TestHoodieSparkQuickstart implements SparkProvider {
protected static HoodieSparkEngineContext context;
@@ -612,9 +612,6 @@ object DataSourceWriteOptions {
val RECORDKEY_FIELD_OPT_KEY = KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()
/** @deprecated Use {@link RECORDKEY_FIELD} and its methods instead */
@Deprecated
val DEFAULT_RECORDKEY_FIELD_OPT_VAL = RECORDKEY_FIELD.defaultValue()
/** @deprecated Use {@link PARTITIONPATH_FIELD} and its methods instead */
@Deprecated
val PARTITIONPATH_FIELD_OPT_KEY = KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()
/** @deprecated Use {@link PARTITIONPATH_FIELD} and its methods instead */
@Deprecated
@@ -794,7 +791,7 @@ object DataSourceOptionsHelper {

def inferKeyGenClazz(props: TypedProperties): String = {
val partitionFields = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), null)
val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key())
inferKeyGenClazz(recordsKeyFields, partitionFields)
}

@@ -58,7 +58,7 @@ object HoodieWriterUtils {
hoodieConfig.setDefaultValue(TABLE_TYPE)
hoodieConfig.setDefaultValue(PRECOMBINE_FIELD)
hoodieConfig.setDefaultValue(PAYLOAD_CLASS_NAME)
hoodieConfig.setDefaultValue(RECORDKEY_FIELD)
hoodieConfig.setDefaultValue(KEYGENERATOR_CLASS_NAME)
hoodieConfig.setDefaultValue(ENABLE)
hoodieConfig.setDefaultValue(COMMIT_METADATA_KEYPREFIX)
@@ -46,7 +46,6 @@ object HoodieOptionConfig {
.withSqlKey("primaryKey")
.withHoodieKey(DataSourceWriteOptions.RECORDKEY_FIELD.key)
.withTableConfigKey(HoodieTableConfig.RECORDKEY_FIELDS.key)
.defaultValue(DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
.build()

val SQL_KEY_TABLE_TYPE: HoodieSQLOption[String] = buildConf()
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test
class TestDataSourceOptions {
@Test def inferDataSourceOptions(): Unit = {
val inputOptions1 = Map(
RECORDKEY_FIELD.key -> "uuid",
TABLE_NAME.key -> "hudi_table",
PARTITIONPATH_FIELD.key -> "year,month"
)
@@ -38,6 +39,7 @@
modifiedOptions1(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key))

val inputOptions2 = Map(
RECORDKEY_FIELD.key -> "uuid",
TABLE_NAME.key -> "hudi_table",
PARTITIONPATH_FIELD.key -> "year",
HIVE_STYLE_PARTITIONING.key -> "true"
@@ -261,14 +261,22 @@ class TestHoodieSparkSqlWriter {
@Test
def testThrowExceptionAlreadyExistsWithAppendSaveMode(): Unit = {
//create a new table
val fooTableModifier = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
"hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4")
val fooTableModifier = Map(
"path" -> tempBasePath,
HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
"hoodie.datasource.write.recordkey.field" -> "uuid",
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4")
val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, dataFrame)

//on same path try append with different("hoodie_bar_tbl") table name which should throw an exception
val barTableModifier = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> "hoodie_bar_tbl",
"hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4")
val barTableModifier = Map(
"path" -> tempBasePath,
HoodieWriteConfig.TBL_NAME.key -> "hoodie_bar_tbl",
"hoodie.datasource.write.recordkey.field" -> "uuid",
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4")
val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableModifier, dataFrame2))
assert(tableAlreadyExistException.getMessage.contains("Config conflict"))