From 13cd731543f8cebb0f81d4a5342bb2de382de5e3 Mon Sep 17 00:00:00 2001 From: Gary Li Date: Tue, 26 Apr 2022 18:36:02 +0800 Subject: [PATCH] [MINOR] support different cleaning policy for flink --- .../apache/hudi/configuration/FlinkOptions.java | 14 ++++++++++++++ .../apache/hudi/streamer/FlinkStreamerConfig.java | 12 ++++++++++++ .../java/org/apache/hudi/util/StreamerUtil.java | 3 ++- 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index e2be7d364b77..14bd298f7724 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.config.HoodieIndexConfig; @@ -549,6 +550,13 @@ private FlinkOptions() { .defaultValue(true) .withDescription("Whether to cleanup the old commits immediately on new commits, enabled by default"); + public static final ConfigOption CLEAN_POLICY = ConfigOptions + .key("clean.policy") + .stringType() + .defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()) + .withDescription("Clean policy to manage the Hudi table. Available option: KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS." + + "Default is KEEP_LATEST_COMMITS."); + public static final ConfigOption CLEAN_RETAIN_COMMITS = ConfigOptions .key("clean.retain_commits") .intType() @@ -556,6 +564,12 @@ private FlinkOptions() { .withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n" + "This also directly translates into how much you can incrementally pull on this table, default 30"); + public static final ConfigOption CLEAN_RETAIN_FILE_VERSIONS = ConfigOptions + .key("clean.retain_file_versions") + .intType() + .defaultValue(5)// default 5 version + .withDescription("Number of file versions to retain. default 5"); + public static final ConfigOption ARCHIVE_MAX_COMMITS = ConfigOptions .key("archive.max_commits") .intType() diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 592520bf902f..f82712bca2c2 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -19,6 +19,7 @@ package org.apache.hudi.streamer; import org.apache.hudi.client.utils.OperationConverter; +import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.StringUtils; @@ -260,11 +261,20 @@ public class FlinkStreamerConfig extends Configuration { @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default") public Boolean cleanAsyncEnabled = true; + @Parameter(names = {"--clean-policy"}, + description = "Clean policy to manage the Hudi table. Available option: KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS." + + "Default is KEEP_LATEST_COMMITS.") + public String cleanPolicy = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(); + @Parameter(names = {"--clean-retain-commits"}, description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n" + "This also directly translates into how much you can incrementally pull on this table, default 10") public Integer cleanRetainCommits = 10; + @Parameter(names = {"--clean-retain-file-versions"}, + description = "Number of file versions to retain. Each file group will be retained for this number of version. default 5") + public Integer cleanRetainFileVersions = 5; + @Parameter(names = {"--archive-max-commits"}, description = "Max number of commits to keep before archiving older commits into a sequential log, default 30") public Integer archiveMaxCommits = 30; @@ -392,7 +402,9 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory); conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo); conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnabled); + conf.setString(FlinkOptions.CLEAN_POLICY, config.cleanPolicy); conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits); + conf.setInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS, config.cleanRetainFileVersions); conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits); conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, config.archiveMinCommits); conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, config.hiveSyncEnabled); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 3138d5d98616..dfbe0efd67c7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -176,11 +176,12 @@ public static HoodieWriteConfig getHoodieClientConfig( .withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS)) .withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) .retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS)) + .retainFileVersions(conf.getInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS)) // override and hardcode to 20, // actually Flink cleaning is always with parallelism 1 now .withCleanerParallelism(20) .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS)) - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.getString(FlinkOptions.CLEAN_POLICY))) .build()) .withMemoryConfig( HoodieMemoryConfig.newBuilder()