Skip to content

Commit

Permalink
[MINOR] support different cleaning policy for flink (#5459)
Browse files Browse the repository at this point in the history
  • Loading branch information
garyli1019 authored Apr 29, 2022
1 parent 4e928a6 commit b27e8b5
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.config.HoodieIndexConfig;
Expand Down Expand Up @@ -550,13 +551,26 @@ private FlinkOptions() {
.defaultValue(true)
.withDescription("Whether to cleanup the old commits immediately on new commits, enabled by default");

/**
 * Name of the cleaning policy applied to the Hudi table.
 *
 * <p>The value is a {@link HoodieCleaningPolicy} constant name; the default is
 * {@code KEEP_LATEST_COMMITS}.
 */
public static final ConfigOption<String> CLEAN_POLICY = ConfigOptions
    .key("clean.policy")
    .stringType()
    .defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
    // The first literal ends with a space so the two concatenated sentences do not run together.
    .withDescription("Clean policy to manage the Hudi table. Available option: KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS. "
        + "Default is KEEP_LATEST_COMMITS.");

/**
 * Number of commits the cleaner retains, exposed under the key {@code clean.retain_commits}.
 * Integer-typed; defaults to 30.
 */
public static final ConfigOption<Integer> CLEAN_RETAIN_COMMITS = ConfigOptions
.key("clean.retain_commits")
.intType()
.defaultValue(30)// default 30 commits
.withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally pull on this table, default 30");

/**
 * Number of file versions the cleaner retains, exposed under the key
 * {@code clean.retain_file_versions}. Integer-typed; defaults to 5.
 */
public static final ConfigOption<Integer> CLEAN_RETAIN_FILE_VERSIONS = ConfigOptions
.key("clean.retain_file_versions")
.intType()
.defaultValue(5)// default 5 version
.withDescription("Number of file versions to retain. default 5");

public static final ConfigOption<Integer> ARCHIVE_MAX_COMMITS = ConfigOptions
.key("archive.max_commits")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.streamer;

import org.apache.hudi.client.utils.OperationConverter;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.StringUtils;
Expand Down Expand Up @@ -260,11 +261,20 @@ public class FlinkStreamerConfig extends Configuration {
// Command-line flag --clean-async-enabled; defaults to true.
// Its value is copied into FlinkOptions.CLEAN_ASYNC_ENABLED when the Flink configuration is built.
@Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default")
public Boolean cleanAsyncEnabled = true;

// Command-line flag --clean-policy: name of the HoodieCleaningPolicy constant to use,
// defaulting to KEEP_LATEST_COMMITS. The value is copied into FlinkOptions.CLEAN_POLICY
// when the Flink configuration is built.
// The first description literal ends with a space so the two sentences do not run together.
@Parameter(names = {"--clean-policy"},
    description = "Clean policy to manage the Hudi table. Available option: KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS. "
        + "Default is KEEP_LATEST_COMMITS.")
public String cleanPolicy = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();

// Command-line flag --clean-retain-commits: number of commits to retain; defaults to 10.
// The value is copied into FlinkOptions.CLEAN_RETAIN_COMMITS when the Flink configuration is built.
// NOTE(review): the default here (10) differs from the FlinkOptions.CLEAN_RETAIN_COMMITS default (30) - confirm this is intended.
@Parameter(names = {"--clean-retain-commits"},
description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally pull on this table, default 10")
public Integer cleanRetainCommits = 10;

// Command-line flag --clean-retain-file-versions: number of file versions to retain; defaults to 5.
// The value is copied into FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS when the Flink configuration is built.
@Parameter(names = {"--clean-retain-file-versions"},
description = "Number of file versions to retain. Each file group will be retained for this number of version. default 5")
public Integer cleanRetainFileVersions = 5;

// Command-line flag --archive-max-commits: upper bound on commits kept before archiving; defaults to 30.
// The value is copied into FlinkOptions.ARCHIVE_MAX_COMMITS when the Flink configuration is built.
@Parameter(names = {"--archive-max-commits"},
description = "Max number of commits to keep before archiving older commits into a sequential log, default 30")
public Integer archiveMaxCommits = 30;
Expand Down Expand Up @@ -392,7 +402,9 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt
conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo);
conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnabled);
conf.setString(FlinkOptions.CLEAN_POLICY, config.cleanPolicy);
conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
conf.setInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS, config.cleanRetainFileVersions);
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);
conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, config.archiveMinCommits);
conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, config.hiveSyncEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,12 @@ public static HoodieWriteConfig getHoodieClientConfig(
.withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS))
.withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED))
.retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS))
.retainFileVersions(conf.getInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS))
// override and hardcode to 20,
// actually Flink cleaning is always with parallelism 1 now
.withCleanerParallelism(20)
.archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS))
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.getString(FlinkOptions.CLEAN_POLICY)))
.build())
.withMemoryConfig(
HoodieMemoryConfig.newBuilder()
Expand Down

0 comments on commit b27e8b5

Please sign in to comment.