[HUDI-4621] Add validation that bucket index fields should be subset of primary keys (apache#6396)

* check bucket index fields

Co-authored-by: 吴文池 <wuwenchi@deepexi.com>
2 people authored and fengjian committed Apr 5, 2023
1 parent 320b15b commit 805e769
Showing 2 changed files with 56 additions and 3 deletions.
HoodieTableFactory.java
@@ -54,6 +54,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;

@@ -207,10 +208,21 @@ private static void setupHoodieKeyOptions(Configuration conf, CatalogTable table
       conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitionKeys));
     }
     // set index key for bucket index if not defined
-    if (conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())
-        && conf.getString(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
-      conf.setString(FlinkOptions.INDEX_KEY_FIELD, conf.getString(FlinkOptions.RECORD_KEY_FIELD));
+    if (conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())) {
+      if (conf.getString(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
+        conf.setString(FlinkOptions.INDEX_KEY_FIELD, conf.getString(FlinkOptions.RECORD_KEY_FIELD));
+      } else {
+        Set<String> recordKeySet =
+            Arrays.stream(conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")).collect(Collectors.toSet());
+        Set<String> indexKeySet =
+            Arrays.stream(conf.getString(FlinkOptions.INDEX_KEY_FIELD).split(",")).collect(Collectors.toSet());
+        if (!recordKeySet.containsAll(indexKeySet)) {
+          throw new HoodieValidationException(
+              FlinkOptions.INDEX_KEY_FIELD + " should be a subset of or equal to the recordKey fields");
+        }
+      }
     }
 
     // tweak the key gen class if possible
     final String[] partitions = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
     final String[] pks = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
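To make the new resolution logic concrete, the following is a minimal standalone sketch of the check added above. It is illustrative only: IllegalArgumentException stands in for HoodieValidationException, and the real code reads both comma-separated field lists from the Flink Configuration rather than taking them as parameters.

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class BucketIndexKeySketch {

  // Resolve the bucket index key: default to the record key when unset,
  // otherwise require it to be a subset of the record key fields.
  static String resolveIndexKeyField(String recordKeyFields, String indexKeyFields) {
    if (indexKeyFields.isEmpty()) {
      // not configured: fall back to the record key fields
      return recordKeyFields;
    }
    Set<String> recordKeySet =
        Arrays.stream(recordKeyFields.split(",")).collect(Collectors.toSet());
    Set<String> indexKeySet =
        Arrays.stream(indexKeyFields.split(",")).collect(Collectors.toSet());
    if (!recordKeySet.containsAll(indexKeySet)) {
      // stands in for HoodieValidationException
      throw new IllegalArgumentException(
          "index key fields should be a subset of or equal to the recordKey fields");
    }
    return indexKeyFields;
  }

  public static void main(String[] args) {
    System.out.println(resolveIndexKeyField("f0,f1", ""));      // f0,f1 (default)
    System.out.println(resolveIndexKeyField("f0,f1", "f1"));    // f1 (valid subset)
    System.out.println(resolveIndexKeyField("f0,f1", "f0,f1")); // f0,f1 (equal set)
    resolveIndexKeyField("f0,f1", "f2");                        // throws
  }
}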
HoodieTableFactoryTest.java
@@ -307,6 +307,47 @@ void testSetupReadOptionsForSource() {
     assertThat(conf2.getString(FlinkOptions.QUERY_TYPE), is(FlinkOptions.QUERY_TYPE_INCREMENTAL));
   }
 
+  @Test
+  void testBucketIndexOptionForSink() {
+    ResolvedSchema schema1 = SchemaBuilder.instance()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20).notNull())
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0", "f1")
+        .build();
+
+    this.conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+
+    // by default, the recordKey fields are used
+    final MockContext context = MockContext.getInstance(this.conf, schema1, "f2");
+    HoodieTableSink tableSink = (HoodieTableSink) (new HoodieTableFactory().createDynamicTableSink(context));
+    final Configuration conf = tableSink.getConf();
+    assertThat(conf.getString(FlinkOptions.INDEX_KEY_FIELD), is("f0,f1"));
+
+    this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f0");
+    final MockContext context2 = MockContext.getInstance(this.conf, schema1, "f2");
+    HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(context2);
+    final Configuration conf2 = tableSink2.getConf();
+    assertThat(conf2.getString(FlinkOptions.INDEX_KEY_FIELD), is("f0"));
+
+    this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f1");
+    final MockContext context3 = MockContext.getInstance(this.conf, schema1, "f2");
+    HoodieTableSink tableSink3 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(context3);
+    final Configuration conf3 = tableSink3.getConf();
+    assertThat(conf3.getString(FlinkOptions.INDEX_KEY_FIELD), is("f1"));
+
+    this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f0,f1");
+    final MockContext context4 = MockContext.getInstance(this.conf, schema1, "f2");
+    HoodieTableSink tableSink4 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(context4);
+    final Configuration conf4 = tableSink4.getConf();
+    assertThat(conf4.getString(FlinkOptions.INDEX_KEY_FIELD), is("f0,f1"));
+
+    // an index key field that is not a subset of (or equal to) the recordKey fields throws an exception
+    this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f2");
+    final MockContext context5 = MockContext.getInstance(this.conf, schema1, "f2");
+    assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(context5));
+  }
+
   @Test
   void testInferAvroSchemaForSink() {
     // infer the schema if not specified
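One design detail worth noting: because the change compares java.util.Set instances, the validation is insensitive to field order and to duplicates in the comma-separated lists. A short plain-Java sketch (nothing Hudi-specific) demonstrating those set semantics:

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class SetSemanticsDemo {
  public static void main(String[] args) {
    Set<String> recordKeySet =
        Arrays.stream("f0,f1".split(",")).collect(Collectors.toSet());
    // order-insensitive: a reordered index key such as "f1,f0" still passes
    Set<String> reordered =
        Arrays.stream("f1,f0".split(",")).collect(Collectors.toSet());
    System.out.println(recordKeySet.containsAll(reordered)); // true
    // duplicates collapse in the set, so "f0,f0" also passes
    Set<String> duplicated =
        Arrays.stream("f0,f0".split(",")).collect(Collectors.toSet());
    System.out.println(recordKeySet.containsAll(duplicated)); // true
  }
}

That matches the intent of the check: the bucket hash fields only need to be drawn from the primary key columns, and their order relative to the record key does not matter.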
