diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 7b78fb8d6a28..38dfdd58106a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -67,6 +67,7 @@ private FlinkOptions() {
   // ------------------------------------------------------------------------
   //  Base Options
   // ------------------------------------------------------------------------
+
   public static final ConfigOption<String> PATH = ConfigOptions
       .key("path")
       .stringType()
@@ -79,6 +80,38 @@ private FlinkOptions() {
   // ------------------------------------------------------------------------
   //  Common Options
   // ------------------------------------------------------------------------
+
+  public static final ConfigOption<String> TABLE_NAME = ConfigOptions
+      .key(HoodieWriteConfig.TBL_NAME.key())
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Table name to register to Hive metastore");
+
+  public static final String TABLE_TYPE_COPY_ON_WRITE = HoodieTableType.COPY_ON_WRITE.name();
+  public static final String TABLE_TYPE_MERGE_ON_READ = HoodieTableType.MERGE_ON_READ.name();
+  public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
+      .key("table.type")
+      .stringType()
+      .defaultValue(TABLE_TYPE_COPY_ON_WRITE)
+      .withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
+
+  public static final String NO_PRE_COMBINE = "no_precombine";
+  public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
+      .key("payload.ordering.field")
+      .stringType()
+      .defaultValue("ts")
+      .withFallbackKeys("write.precombine.field")
+      .withDescription("Field used in preCombining before actual write. When two records have the same\n"
+          + "key value, we will pick the one with the largest value for the precombine field,\n"
+          + "determined by Object.compareTo(..)");
+
+  public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions
+      .key("payload.class")
+      .stringType()
+      .defaultValue(EventTimeAvroPayload.class.getName())
+      .withFallbackKeys("write.payload.class")
+      .withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
+          + "This will render any value set for the option in-effective");
+
   public static final ConfigOption<String> PARTITION_DEFAULT_NAME = ConfigOptions
       .key("partition.default_name")
       .stringType()
@@ -116,6 +149,7 @@ private FlinkOptions() {
   // ------------------------------------------------------------------------
   //  Index Options
   // ------------------------------------------------------------------------
+
   public static final ConfigOption<String> INDEX_TYPE = ConfigOptions
       .key("index.type")
       .stringType()
@@ -150,6 +184,7 @@ private FlinkOptions() {
   // ------------------------------------------------------------------------
   //  Read Options
   // ------------------------------------------------------------------------
+
   public static final ConfigOption<Integer> READ_TASKS = ConfigOptions
       .key("read.tasks")
       .intType()
@@ -247,19 +282,6 @@ private FlinkOptions() {
   // ------------------------------------------------------------------------
   //  Write Options
   // ------------------------------------------------------------------------
-  public static final ConfigOption<String> TABLE_NAME = ConfigOptions
-      .key(HoodieWriteConfig.TBL_NAME.key())
-      .stringType()
-      .noDefaultValue()
-      .withDescription("Table name to register to Hive metastore");
-
-  public static final String TABLE_TYPE_COPY_ON_WRITE = HoodieTableType.COPY_ON_WRITE.name();
-  public static final String TABLE_TYPE_MERGE_ON_READ = HoodieTableType.MERGE_ON_READ.name();
-  public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
-      .key("table.type")
-      .stringType()
-      .defaultValue(TABLE_TYPE_COPY_ON_WRITE)
-      .withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
 
   public static final ConfigOption<Boolean> INSERT_CLUSTER = ConfigOptions
       .key("write.insert.cluster")
@@ -275,22 +297,6 @@ private FlinkOptions() {
       .defaultValue("upsert")
       .withDescription("The write operation, that this write should do");
 
-  public static final String NO_PRE_COMBINE = "no_precombine";
-  public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
-      .key("write.precombine.field")
-      .stringType()
-      .defaultValue("ts")
-      .withDescription("Field used in preCombining before actual write. When two records have the same\n"
-          + "key value, we will pick the one with the largest value for the precombine field,\n"
-          + "determined by Object.compareTo(..)");
-
-  public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions
-      .key("write.payload.class")
-      .stringType()
-      .defaultValue(EventTimeAvroPayload.class.getName())
-      .withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
-          + "This will render any value set for the option in-effective");
-
   /**
    * Flag to indicate whether to drop duplicates before insert/upsert.
    * By default false to gain extra performance.
@@ -395,7 +401,7 @@ private FlinkOptions() {
       .key("write.index_bootstrap.tasks")
       .intType()
       .noDefaultValue()
-      .withDescription("Parallelism of tasks that do index bootstrap, default same as the sink parallelism");
+      .withDescription("Parallelism of tasks that do index bootstrap, default same as the write task parallelism");
 
   public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
       .key("write.bucket_assign.tasks")
@@ -580,12 +586,12 @@ private FlinkOptions() {
           + "This also directly translates into how much you can incrementally pull on this table, default 30");
 
   public static final ConfigOption<Integer> CLEAN_RETAIN_HOURS = ConfigOptions
-    .key("clean.retain_hours")
-    .intType()
-    .defaultValue(24)// default 24 hours
-    .withDescription("Number of hours for which commits need to be retained. This config provides a more flexible option as"
-        + "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
-        + " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");
+      .key("clean.retain_hours")
+      .intType()
+      .defaultValue(24)// default 24 hours
+      .withDescription("Number of hours for which commits need to be retained. This config provides a more flexible option as"
+          + "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
+          + " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");
 
   public static final ConfigOption<Integer> CLEAN_RETAIN_FILE_VERSIONS = ConfigOptions
       .key("clean.retain_file_versions")
@@ -691,6 +697,7 @@ private FlinkOptions() {
   // ------------------------------------------------------------------------
   //  Hive Sync Options
   // ------------------------------------------------------------------------
+
   public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED = ConfigOptions
       .key("hive_sync.enable")
       .booleanType()
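For context on the backward-compatibility mechanism this patch relies on: `PRECOMBINE_FIELD` and `PAYLOAD_CLASS_NAME` are renamed to `payload.ordering.field` and `payload.class`, but each declares its old key through `withFallbackKeys(...)`, so existing jobs that still set `write.precombine.field` or `write.payload.class` keep resolving to the same option. A minimal sketch of that resolution (not part of this patch; the class name `FallbackKeyCheck` and the value `event_ts` are made up for illustration):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

// Hypothetical demo class, not part of the patch.
public class FallbackKeyCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // A job still using the pre-rename key:
    conf.setString("write.precombine.field", "event_ts");
    // Flink looks up the primary key "payload.ordering.field" first, then
    // falls back to "write.precombine.field" declared via withFallbackKeys(...).
    System.out.println(conf.getString(FlinkOptions.PRECOMBINE_FIELD)); // prints: event_ts
  }
}
```

New jobs can set `payload.ordering.field` directly; the fallback key is only consulted when the primary key is absent from the configuration.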