diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 41e0abd6baf10..51bbb2dc87019 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -54,6 +54,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 
@@ -207,10 +208,22 @@ private static void setupHoodieKeyOptions(Configuration conf, CatalogTable table
       conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitionKeys));
     }
     // set index key for bucket index if not defined
-    if (conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())
-        && conf.getString(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
-      conf.setString(FlinkOptions.INDEX_KEY_FIELD, conf.getString(FlinkOptions.RECORD_KEY_FIELD));
+    if (conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())) {
+      if (conf.getString(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
+        conf.setString(FlinkOptions.INDEX_KEY_FIELD, conf.getString(FlinkOptions.RECORD_KEY_FIELD));
+      } else {
+        // an explicitly configured index key may only use fields that are part of the record key
+        Set<String> recordKeySet =
+            Arrays.stream(conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")).collect(Collectors.toSet());
+        Set<String> indexKeySet =
+            Arrays.stream(conf.getString(FlinkOptions.INDEX_KEY_FIELD).split(",")).collect(Collectors.toSet());
+        if (!recordKeySet.containsAll(indexKeySet)) {
+          throw new HoodieValidationException(
+              FlinkOptions.INDEX_KEY_FIELD.key() + " should be a subset of or equal to the recordKey fields");
+        }
+      }
     }
+
     // tweak the key gen class if possible
     final String[] partitions = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
     final String[] pks = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 1fca92567a553..f27ab4ca53890 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -307,6 +307,48 @@ void testSetupReadOptionsForSource() {
     assertThat(conf2.getString(FlinkOptions.QUERY_TYPE), is(FlinkOptions.QUERY_TYPE_INCREMENTAL));
   }
 
+  @Test
+  void testBucketIndexOptionForSink() {
+    ResolvedSchema schema1 = SchemaBuilder.instance()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20).notNull())
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0", "f1")
+        .build();
+
+    this.conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+
+    // default use recordKey fields
+    final MockContext context = MockContext.getInstance(this.conf, schema1, "f2");
+    HoodieTableSink tableSink = (HoodieTableSink) (new HoodieTableFactory().createDynamicTableSink(context));
+    final Configuration conf = tableSink.getConf();
+    assertThat(conf.getString(FlinkOptions.INDEX_KEY_FIELD), is("f0,f1"));
+
+    // an explicit index key that is a subset of or equal to the recordKey fields is kept as is
+    this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f0");
+    final MockContext context2 = MockContext.getInstance(this.conf, schema1, "f2");
+    HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(context2);
+    final Configuration conf2 = tableSink2.getConf();
+    assertThat(conf2.getString(FlinkOptions.INDEX_KEY_FIELD), is("f0"));
+
+    this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f1");
+    final MockContext context3 = MockContext.getInstance(this.conf, schema1, "f2");
+    HoodieTableSink tableSink3 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(context3);
+    final Configuration conf3 = tableSink3.getConf();
+    assertThat(conf3.getString(FlinkOptions.INDEX_KEY_FIELD), is("f1"));
+
+    this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f0,f1");
+    final MockContext context4 = MockContext.getInstance(this.conf, schema1, "f2");
+    HoodieTableSink tableSink4 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(context4);
+    final Configuration conf4 = tableSink4.getConf();
+    assertThat(conf4.getString(FlinkOptions.INDEX_KEY_FIELD), is("f0,f1"));
+
+    // index key field is not a subset of or equal to the recordKey fields, will throw exception
+    this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f2");
+    final MockContext context5 = MockContext.getInstance(this.conf, schema1, "f2");
+    assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(context5));
+  }
+
   @Test
   void testInferAvroSchemaForSink() {
     // infer the schema if not specified