[HUDI-4529] Tweak some default config options for flink (apache#6287)
danny0405 authored Aug 17, 2022
1 parent 042241f commit 1ad0e95
Showing 4 changed files with 38 additions and 31 deletions.
@@ -22,13 +22,13 @@
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -287,7 +287,7 @@ private FlinkOptions() {
public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions
.key("write.payload.class")
.stringType()
.defaultValue(OverwriteWithLatestAvroPayload.class.getName())
.defaultValue(EventTimeAvroPayload.class.getName())
.withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
+ "This will render any value set for the option in-effective");

@@ -718,7 +718,7 @@ private FlinkOptions() {
public static final ConfigOption<String> HIVE_SYNC_MODE = ConfigOptions
.key("hive_sync.mode")
.stringType()
.defaultValue("jdbc")
.defaultValue("hms")
.withDescription("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'jdbc'");

public static final ConfigOption<String> HIVE_SYNC_USERNAME = ConfigOptions
@@ -754,7 +754,7 @@ private FlinkOptions() {
public static final ConfigOption<String> HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME = ConfigOptions
.key("hive_sync.partition_extractor_class")
.stringType()
.defaultValue(SlashEncodedDayPartitionValueExtractor.class.getCanonicalName())
.defaultValue(MultiPartKeysValueExtractor.class.getName())
.withDescription("Tool to extract the partition value from HDFS path, "
+ "default 'SlashEncodedDayPartitionValueExtractor'");

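For reference, a minimal sketch (not part of this commit; class name hypothetical) of how the new defaults above surface when the options are left unset on a fresh Flink Configuration; explicit settings still take precedence:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class FlinkDefaultsCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Unset options fall back to the ConfigOption defaults changed in this commit.
    System.out.println(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME));
    // -> org.apache.hudi.common.model.EventTimeAvroPayload
    System.out.println(conf.getString(FlinkOptions.HIVE_SYNC_MODE));
    // -> hms
    System.out.println(conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME));
    // -> org.apache.hudi.hive.MultiPartKeysValueExtractor

    // Users who relied on the previous behavior can still set the old values explicitly.
    conf.setString(FlinkOptions.HIVE_SYNC_MODE, "jdbc");
  }
}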
@@ -24,7 +24,7 @@
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.util.FlinkStateBackendConverter;
import org.apache.hudi.util.StreamerUtil;
@@ -321,8 +321,8 @@ public class FlinkStreamerConfig extends Configuration {
public String hiveSyncPartitionFields = "";

@Parameter(names = {"--hive-sync-partition-extractor-class"}, description = "Tool to extract the partition value from HDFS path, "
+ "default 'SlashEncodedDayPartitionValueExtractor'")
public String hiveSyncPartitionExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getCanonicalName();
+ "default 'MultiPartKeysValueExtractor'")
public String hiveSyncPartitionExtractorClass = MultiPartKeysValueExtractor.class.getCanonicalName();

@Parameter(names = {"--hive-sync-assume-date-partitioning"}, description = "Assume partitioning is yyyy/mm/dd, default false")
public Boolean hiveSyncAssumeDatePartition = false;
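A small sketch (assuming FlinkStreamerConfig lives in org.apache.hudi.streamer, as in the current codebase) of what the changed CLI default means when --hive-sync-partition-extractor-class is not passed:

import org.apache.hudi.streamer.FlinkStreamerConfig;

public class StreamerDefaultCheck {
  public static void main(String[] args) {
    // No command-line parsing here: just read the field's compiled-in default.
    FlinkStreamerConfig config = new FlinkStreamerConfig();
    System.out.println(config.hiveSyncPartitionExtractorClass);
    // -> org.apache.hudi.hive.MultiPartKeysValueExtractor
  }
}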
@@ -19,12 +19,10 @@
package org.apache.hudi.table;

import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -38,6 +36,7 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -71,7 +70,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);

Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
new ValidationException("Option [path] should not be empty.")));
@@ -90,7 +89,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
"Option [path] should not be empty.");
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
return new HoodieTableSink(conf, schema);
}

@@ -154,35 +153,30 @@ private void sanityCheck(Configuration conf, ResolvedSchema schema) {
throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema."
+ "Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
}
} else if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.PAYLOAD_CLASS_NAME)) {
// if precombine field is specified but payload clazz is default,
// use DefaultHoodieRecordPayload to make sure the precombine field is always taken for
// comparing.
conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, EventTimeAvroPayload.class.getName());
}
}

/**
* Sets up the config options based on the table definition, for e.g the table name, primary key.
* Sets up the config options based on the table definition, for e.g, the table name, primary key.
*
* @param conf The configuration to setup
* @param tableName The table name
* @param conf The configuration to set up
* @param tablePath The table path
* @param table The catalog table
* @param schema The physical schema
*/
private static void setupConfOptions(
Configuration conf,
String tableName,
ObjectIdentifier tablePath,
CatalogTable table,
ResolvedSchema schema) {
// table name
conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
conf.setString(FlinkOptions.TABLE_NAME.key(), tablePath.getObjectName());
// hoodie key about options
setupHoodieKeyOptions(conf, table);
// compaction options
setupCompactionOptions(conf);
// hive options
setupHiveOptions(conf);
setupHiveOptions(conf, tablePath);
// read options
setupReadOptions(conf);
// write options
@@ -309,10 +303,12 @@ private static void setupCompactionOptions(Configuration conf) {
/**
* Sets up the hive options from the table definition.
*/
private static void setupHiveOptions(Configuration conf) {
if (!conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)
&& FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME)) {
conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME, MultiPartKeysValueExtractor.class.getName());
private static void setupHiveOptions(Configuration conf, ObjectIdentifier tablePath) {
if (!conf.contains(FlinkOptions.HIVE_SYNC_DB)) {
conf.setString(FlinkOptions.HIVE_SYNC_DB, tablePath.getDatabaseName());
}
if (!conf.contains(FlinkOptions.HIVE_SYNC_TABLE)) {
conf.setString(FlinkOptions.HIVE_SYNC_TABLE, tablePath.getObjectName());
}
}

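A stand-alone illustration (not the factory code itself; the catalog name is made up) of the new fallback in setupHiveOptions: hive_sync.db and hive_sync.table are derived from the catalog ObjectIdentifier only when the user has not set them:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.hudi.configuration.FlinkOptions;

public class HiveSyncDefaultsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    ObjectIdentifier tablePath = ObjectIdentifier.of("hudi_catalog", "db1", "t1");

    // Only fill in hive_sync.db / hive_sync.table when they were not set explicitly.
    if (!conf.contains(FlinkOptions.HIVE_SYNC_DB)) {
      conf.setString(FlinkOptions.HIVE_SYNC_DB, tablePath.getDatabaseName());
    }
    if (!conf.contains(FlinkOptions.HIVE_SYNC_TABLE)) {
      conf.setString(FlinkOptions.HIVE_SYNC_TABLE, tablePath.getObjectName());
    }

    System.out.println(conf.getString(FlinkOptions.HIVE_SYNC_DB));    // db1
    System.out.println(conf.getString(FlinkOptions.HIVE_SYNC_TABLE)); // t1
  }
}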
@@ -23,7 +23,6 @@
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -240,15 +239,21 @@ void testSetupHiveOptionsForSource() {
final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1);
final Configuration conf1 = tableSource1.getConf();
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_DB), is("db1"));
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t1"));
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));

// set up hive style partitioning is true.
this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2");
this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2");
this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);

final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2);
final Configuration conf2 = tableSource2.getConf();
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(SlashEncodedDayPartitionValueExtractor.class.getName()));
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_DB), is("db2"));
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t2"));
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));
}

@Test
@@ -430,15 +435,21 @@ void testSetupHiveOptionsForSink() {
final MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext1);
final Configuration conf1 = tableSink1.getConf();
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_DB), is("db1"));
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t1"));
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));

// set up hive style partitioning is true.
this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2");
this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2");
this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);

final MockContext sinkContext2 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext2);
final Configuration conf2 = tableSink2.getConf();
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(SlashEncodedDayPartitionValueExtractor.class.getName()));
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_DB), is("db2"));
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t2"));
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));
}

@Test
@@ -542,7 +553,7 @@ static MockContext getInstance(Configuration conf, ResolvedSchema schema, List<S

@Override
public ObjectIdentifier getObjectIdentifier() {
return ObjectIdentifier.of("hudi", "default", "t1");
return ObjectIdentifier.of("hudi", "db1", "t1");
}

@Override