[HUDI-5032] Add archive to cli (apache#7076)
Adding archiving capability to cli.

Co-authored-by: Jonathan Vexler <=>
jonvex authored Nov 3, 2022
1 parent 1ddf9d4 commit 14ec963
Showing 3 changed files with 152 additions and 6 deletions.
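
For context, the new command added by this change can be run from an active hudi-cli session roughly as follows (a hypothetical invocation; each option mirrors a @ShellOption declared in the first file below, and all of them fall back to the defaults shown):

trigger archival --minCommits 20 --maxCommits 30 --commitsRetainedByCleaner 10 --enableMetadata true --sparkMemory 1G --sparkMaster local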
ArchivedCommitsCommand.java

@@ -18,16 +18,14 @@

package org.apache.hudi.cli.commands;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieCommitMetadata;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -37,6 +35,16 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import org.springframework.shell.standard.ShellOption;
@@ -52,6 +60,37 @@
*/
@ShellComponent
public class ArchivedCommitsCommand {
private static final Logger LOG = LogManager.getLogger(ArchivedCommitsCommand.class);
@ShellMethod(key = "trigger archival", value = "trigger archival")
public String triggerArchival(
@ShellOption(value = {"--minCommits"},
help = "Minimum number of instants to retain in the active timeline. See hoodie.keep.min.commits",
defaultValue = "20") int minCommits,
@ShellOption(value = {"--maxCommits"},
help = "Maximum number of instants to retain in the active timeline. See hoodie.keep.max.commits",
defaultValue = "30") int maxCommits,
@ShellOption(value = {"--commitsRetainedByCleaner"}, help = "Number of commits to retain, without cleaning",
defaultValue = "10") int retained,
@ShellOption(value = {"--enableMetadata"},
help = "Enable the internal metadata table which serves table metadata like level file listings",
defaultValue = "true") boolean enableMetadata,
@ShellOption(value = "--sparkMemory", defaultValue = "1G",
help = "Spark executor memory") final String sparkMemory,
@ShellOption(value = "--sparkMaster", defaultValue = "local", help = "Spark Master") String master) throws Exception {
String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
String cmd = SparkCommand.ARCHIVE.toString();
sparkLauncher.addAppArgs(cmd, master, sparkMemory, Integer.toString(minCommits), Integer.toString(maxCommits),
Integer.toString(retained), Boolean.toString(enableMetadata), HoodieCLI.basePath);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to trigger archival";
}
return "Archival successfully triggered";
}

@ShellMethod(key = "show archived commit stats", value = "Read commits from archived files and show details")
public String showArchivedCommits(
@@ -206,4 +245,5 @@ private Comparable[] readCommit(GenericRecord record, boolean skipMetadata) {
return new Comparable[] {};
}
}

}
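
A note on the launcher hand-off above: triggerArchival passes its options positionally through addAppArgs, and the new ARCHIVE case in SparkMain (second file below) parses them back by index. The assumed ordering, reconstructed from the two code paths in this diff, is:

// args[0] = "ARCHIVE"       args[1] = master           args[2] = sparkMemory
// args[3] = minCommits      args[4] = maxCommits       args[5] = commitsRetained
// args[6] = enableMetadata  args[7] = basePath
// which is why the ARCHIVE case asserts args.length == 8 before parsing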
SparkMain.java

@@ -22,11 +22,14 @@
import org.apache.hudi.cli.DeDupeType;
import org.apache.hudi.cli.DedupeSparkJob;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.HoodieTimelineArchiver;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
@@ -37,6 +40,7 @@
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieBootstrapConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -98,7 +102,7 @@ enum SparkCommand {
BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_MARKER, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE,
REPAIR_DEPRECATED_PARTITION, RENAME_PARTITION
REPAIR_DEPRECATED_PARTITION, RENAME_PARTITION, ARCHIVE
}

public static void main(String[] args) throws Exception {
@@ -290,6 +294,10 @@ public static void main(String[] args) throws Exception {
assert (args.length == 6);
returnCode = renamePartition(jsc, args[3], args[4], args[5]);
break;
case ARCHIVE:
assert (args.length == 8);
returnCode = archive(jsc, Integer.parseInt(args[3]), Integer.parseInt(args[4]), Integer.parseInt(args[5]), Boolean.parseBoolean(args[6]), args[7]);
break;
default:
break;
}
@@ -646,4 +654,23 @@ private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbac
HoodieFailedWritesCleaningPolicy.EAGER).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
}

private static int archive(JavaSparkContext jsc, int minCommits, int maxCommits, int commitsRetained, boolean enableMetadata, String basePath) {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits, maxCommits).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(commitsRetained).build())
.withEmbeddedTimelineServerEnabled(false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build())
.build();
HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
HoodieSparkTable<HoodieAvroPayload> table = HoodieSparkTable.create(config, context);
try {
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
archiver.archiveIfRequired(context, true);
} catch (IOException ioe) {
LOG.error("Failed to archive with IOException: " + ioe);
return -1;
}
return 0;
}
}
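
To connect the builder calls in archive() to the table configs they set: archiveCommitsWith(minCommits, maxCommits) corresponds to hoodie.keep.min.commits and hoodie.keep.max.commits (the same properties referenced in the shell option help text above), and retainCommits(commitsRetained) corresponds to hoodie.cleaner.commits.retained. Archival then trims the active timeline back down to minCommits once it grows beyond maxCommits; for example, with minCommits=2 and maxCommits=3 as in the test below, six completed commits leave two on the active timeline and four in the archive.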
TestArchiveCommand.java
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.cli.commands;

import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
import org.apache.hudi.common.table.HoodieTableMetaClient;

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.shell.Shell;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Tag("functional")
@SpringBootTest(properties = {"spring.shell.interactive.enabled=false", "spring.shell.command.script.enabled=false"})
public class TestArchiveCommand extends CLIFunctionalTestHarness {

@Autowired
private Shell shell;

@Test
public void testArchiving() throws Exception {
HoodieCLI.conf = hadoopConf();

// Create table and connect
String tableName = tableName();
String tablePath = tablePath(tableName);

new TableCommand().createTable(
tablePath, tableName,
"COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");

HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();

// Create six commits
for (int i = 100; i < 106; i++) {
String timestamp = String.valueOf(i);
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, hadoopConf());
}

Object cmdResult = shell.evaluate(() -> "trigger archival --minCommits 2 --maxCommits 3 --commitsRetainedByCleaner 1 --enableMetadata false");
assertTrue(ShellEvaluationResultUtil.isSuccess(cmdResult));
metaClient = HoodieTableMetaClient.reload(metaClient);

// getInstants on the active timeline only returns the latest state of each commit,
// so we expect 2 instants because minCommits is 2
assertEquals(2, metaClient.getActiveTimeline().getInstants().count());

// getInstants on the archived timeline returns every state of each commit
// (requested, inflight, completed), so 6 commits - 2 commits left in the active
// timeline = 4 archived commits, and 4 commits * 3 instant states per commit
// = 12 instants
assertEquals(12, metaClient.getArchivedTimeline().getInstants().count());
}

}
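
As a follow-up in the same CLI session, the result of archival can be inspected with the pre-existing command shown in the first file of this diff, e.g. show archived commit stats.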
