Skip to content

Commit

Permalink
[HUDI-3579] Add timeline commands in hudi-cli
Browse files Browse the repository at this point in the history
  • Loading branch information
yihua committed Aug 17, 2022
1 parent 9055b2f commit 734d91e
Show file tree
Hide file tree
Showing 15 changed files with 566 additions and 54 deletions.
73 changes: 60 additions & 13 deletions hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,30 +58,76 @@ public static String print(String[] header, String[][] rows) {
* @param rows List of rows
* @return Serialized form for printing
*/
public static String print(TableHeader rowHeader, Map<String, Function<Object, String>> fieldNameToConverterMap,
public static String print(
TableHeader rowHeader, Map<String, Function<Object, String>> fieldNameToConverterMap,
String sortByField, boolean isDescending, Integer limit, boolean headerOnly, List<Comparable[]> rows) {
return print(rowHeader, fieldNameToConverterMap, sortByField, isDescending, limit, headerOnly, rows, "");
return print(rowHeader, fieldNameToConverterMap, false, sortByField, isDescending, limit, headerOnly, rows);
}

/**
* Serialize Table to printable string.
*
* @param rowHeader Row Header
* @param fieldNameToConverterMap Field Specific Converters
* @param withRowNo Whether to add row number
* @param sortByField Sorting field
* @param isDescending Order
* @param limit Limit
* @param headerOnly Headers only
* @param rows List of rows
* @return Serialized form for printing
*/
public static String print(
TableHeader rowHeader, Map<String, Function<Object, String>> fieldNameToConverterMap, boolean withRowNo,
String sortByField, boolean isDescending, Integer limit, boolean headerOnly, List<Comparable[]> rows) {
return print(rowHeader, fieldNameToConverterMap, withRowNo, sortByField, isDescending, limit, headerOnly, rows, "");
}

/**
* Serialize Table to printable string and also export a temporary view to easily write sql queries.
* <p>
* Ideally, exporting view needs to be outside PrintHelper, but all commands use this. So this is easy
* way to add support for all commands
*
* @param rowHeader Row Header
* @param fieldNameToConverterMap Field Specific Converters
* @param sortByField Sorting field
* @param isDescending Order
* @param limit Limit
* @param headerOnly Headers only
* @param rows List of rows
* @param tempTableName table name to export
* @return Serialized form for printing
*/
public static String print(
TableHeader rowHeader, Map<String, Function<Object, String>> fieldNameToConverterMap,
String sortByField, boolean isDescending, Integer limit, boolean headerOnly,
List<Comparable[]> rows, String tempTableName) {
return print(rowHeader, fieldNameToConverterMap, false, sortByField, isDescending, limit,
headerOnly, rows, tempTableName);
}

/**
* Serialize Table to printable string and also export a temporary view to easily write sql queries.
* <p>
* Ideally, exporting view needs to be outside PrintHelper, but all commands use this. So this is easy
* way to add support for all commands
*
* @param rowHeader Row Header
* @param rowHeader Row Header
* @param fieldNameToConverterMap Field Specific Converters
* @param sortByField Sorting field
* @param isDescending Order
* @param limit Limit
* @param headerOnly Headers only
* @param rows List of rows
* @param tempTableName table name to export
* @param withRowNo Whether to add row number
* @param sortByField Sorting field
* @param isDescending Order
* @param limit Limit
* @param headerOnly Headers only
* @param rows List of rows
* @param tempTableName table name to export
* @return Serialized form for printing
*/
public static String print(TableHeader rowHeader, Map<String, Function<Object, String>> fieldNameToConverterMap,
String sortByField, boolean isDescending, Integer limit, boolean headerOnly, List<Comparable[]> rows,
String tempTableName) {
public static String print(
TableHeader rowHeader, Map<String, Function<Object, String>> fieldNameToConverterMap,
boolean withRowNo, String sortByField, boolean isDescending, Integer limit, boolean headerOnly,
List<Comparable[]> rows, String tempTableName) {

if (headerOnly) {
return HoodiePrintHelper.print(rowHeader);
Expand All @@ -97,7 +143,8 @@ public static String print(TableHeader rowHeader, Map<String, Function<Object, S
}

Table table =
new Table(rowHeader, fieldNameToConverterMap, Option.ofNullable(sortByField.isEmpty() ? null : sortByField),
new Table(rowHeader, fieldNameToConverterMap, withRowNo,
Option.ofNullable(sortByField.isEmpty() ? null : sortByField),
Option.ofNullable(isDescending), Option.ofNullable(limit <= 0 ? null : limit)).addAllRows(rows).flip();

return HoodiePrintHelper.print(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* Fields of print table header.
*/
public class HoodieTableHeaderFields {
public static final String HEADER_ROW_NO = "No.";
public static final String HEADER_PARTITION = "Partition";
public static final String HEADER_INSTANT = "Instant";
public static final String HEADER_PARTITION_PATH = HEADER_PARTITION + " Path";
Expand Down Expand Up @@ -166,4 +167,18 @@ public class HoodieTableHeaderFields {
public static final String HEADER_DESTINATION_FILE_PATH = "Destination " + HEADER_FILE_PATH;
public static final String HEADER_RENAME_EXECUTED = "Rename Executed?";
public static final String HEADER_RENAME_SUCCEEDED = "Rename Succeeded?";

/**
* Fields of timeline command output
*/
public static final String HEADER_REQUESTED_TIME = "Requested\nTime";
public static final String HEADER_INFLIGHT_TIME = "Inflight\nTime";
public static final String HEADER_COMPLETED_TIME = "Completed\nTime";
public static final String HEADER_ROLLBACK_INFO = "Rollback Info";
public static final String HEADER_MT_PREFIX = "MT\n";
public static final String HEADER_MT_ACTION = HEADER_MT_PREFIX + HEADER_ACTION;
public static final String HEADER_MT_STATE = HEADER_MT_PREFIX + HEADER_STATE;
public static final String HEADER_MT_REQUESTED_TIME = HEADER_MT_PREFIX + HEADER_REQUESTED_TIME;
public static final String HEADER_MT_INFLIGHT_TIME = HEADER_MT_PREFIX + HEADER_INFLIGHT_TIME;
public static final String HEADER_MT_COMPLETED_TIME = HEADER_MT_PREFIX + HEADER_COMPLETED_TIME;
}
53 changes: 41 additions & 12 deletions hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class Table implements Iterable<List<String>> {

// Header for this table
private final TableHeader rowHeader;
// Whether to print row number
private final boolean addRowNo;
// User-specified conversions before rendering
private final Map<String, Function<Object, String>> fieldNameToConverterMap;
// Option attribute to track sorting field
Expand All @@ -49,12 +51,17 @@ public class Table implements Iterable<List<String>> {
private final List<List<Comparable>> rawRows;
// Flag to determine if all the rows have been added
private boolean finishedAdding = false;
// Rows ready for Rendering
// Headers ready for rendering
private TableHeader renderHeaders;
// Rows ready for rendering
private List<List<String>> renderRows;

public Table(TableHeader rowHeader, Map<String, Function<Object, String>> fieldNameToConverterMap,
Option<String> orderingFieldNameOptional, Option<Boolean> isDescendingOptional, Option<Integer> limitOptional) {
public Table(
TableHeader rowHeader, Map<String, Function<Object, String>> fieldNameToConverterMap,
boolean addRowNo, Option<String> orderingFieldNameOptional,
Option<Boolean> isDescendingOptional, Option<Integer> limitOptional) {
this.rowHeader = rowHeader;
this.addRowNo = addRowNo;
this.fieldNameToConverterMap = fieldNameToConverterMap;
this.orderingFieldNameOptional = orderingFieldNameOptional;
this.isDescendingOptional = isDescendingOptional;
Expand All @@ -64,7 +71,7 @@ public Table(TableHeader rowHeader, Map<String, Function<Object, String>> fieldN

/**
* Main API to add row to the table.
*
*
* @param row Row
*/
public Table add(List<Comparable> row) {
Expand Down Expand Up @@ -134,15 +141,34 @@ private List<List<Comparable>> orderRows() {
private void sortAndLimit() {
this.renderRows = new ArrayList<>();
final int limit = this.limitOptional.orElse(rawRows.size());
final List<List<Comparable>> orderedRows = orderRows();
renderRows = orderedRows.stream().limit(limit).map(row -> IntStream.range(0, rowHeader.getNumFields()).mapToObj(idx -> {
String fieldName = rowHeader.get(idx);
if (fieldNameToConverterMap.containsKey(fieldName)) {
return fieldNameToConverterMap.get(fieldName).apply(row.get(idx));
// Row number is added here if enabled
final List<List<Comparable>> rawOrderedRows = orderRows();
final List<List<Comparable>> orderedRows;
if (addRowNo) {
orderedRows = new ArrayList<>();
int rowNo = 0;
for (List<Comparable> row : rawOrderedRows) {
List<Comparable> newRow = new ArrayList<>();
newRow.add(rowNo++);
newRow.addAll(row);
orderedRows.add(newRow);
}
Object v = row.get(idx);
return v == null ? "null" : v.toString();
}).collect(Collectors.toList())).collect(Collectors.toList());
} else {
orderedRows = rawOrderedRows;
}
renderHeaders = addRowNo
? new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ROW_NO)
.addTableHeaderFields(rowHeader)
: rowHeader;
renderRows = orderedRows.stream().limit(limit)
.map(row -> IntStream.range(0, renderHeaders.getNumFields()).mapToObj(idx -> {
String fieldName = renderHeaders.get(idx);
if (fieldNameToConverterMap.containsKey(fieldName)) {
return fieldNameToConverterMap.get(fieldName).apply(row.get(idx));
}
Object v = row.get(idx);
return v == null ? "null" : v.toString();
}).collect(Collectors.toList())).collect(Collectors.toList());
}

@Override
Expand All @@ -162,6 +188,9 @@ public void forEach(Consumer<? super List<String>> action) {
}

public List<String> getFieldNames() {
if (renderHeaders != null) {
return renderHeaders.getFieldNames();
}
return rowHeader.getFieldNames();
}

Expand Down
10 changes: 10 additions & 0 deletions hudi-cli/src/main/java/org/apache/hudi/cli/TableHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ public TableHeader addTableHeaderField(String fieldName) {
return this;
}

/**
* Add fields from another {@link TableHeader} instance.
*
* @param tableHeader {@link TableHeader} instance.
*/
public TableHeader addTableHeaderFields(TableHeader tableHeader) {
fieldNames.addAll(tableHeader.getFieldNames());
return this;
}

/**
* Get all field names.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ private String printCommits(HoodieDefaultTimeline timeline,
});

final TableHeader header = new TableHeader()
.addTableHeaderField(HoodieTableHeaderFields.HEADER_COMMIT_TIME)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_PARTITIONS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATE_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS);
.addTableHeaderField(HoodieTableHeaderFields.HEADER_COMMIT_TIME)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_PARTITIONS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATE_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS);

return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending,
limit, headerOnly, rows, tempTableName);
Expand Down Expand Up @@ -162,7 +162,7 @@ private String printCommitsWithMetadata(HoodieDefaultTimeline timeline,
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN);

return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending,
limit, headerOnly, rows, tempTableName);
limit, headerOnly, rows, tempTableName);
}

@CliCommand(value = "commits show", help = "Show the commits")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,18 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.avro.AvroRuntimeException;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.StringUtils;

import org.apache.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
import scala.collection.JavaConverters;

import java.io.FileInputStream;
import java.io.IOException;
Expand All @@ -56,6 +54,8 @@
import java.util.TreeSet;
import java.util.stream.Collectors;

import scala.collection.JavaConverters;

import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;

/**
Expand Down Expand Up @@ -170,7 +170,7 @@ public String overwriteHoodieProperties(
String[][] rows = new String[allPropKeys.size()][];
int ind = 0;
for (String propKey : allPropKeys) {
String[] row = new String[]{
String[] row = new String[] {
propKey,
oldProps.getOrDefault(propKey, "null"),
newProps.getOrDefault(propKey, "null").toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,15 @@ public String showRollback(
metadata.getPartitionMetadata().forEach((key, value) -> Stream
.concat(value.getSuccessDeleteFiles().stream().map(f -> Pair.of(f, true)),
value.getFailedDeleteFiles().stream().map(f -> Pair.of(f, false)))
.forEach(fileWithDeleteStatus -> {
Comparable[] row = new Comparable[5];
row[0] = metadata.getStartRollbackTime();
row[1] = metadata.getCommitsRollback().toString();
row[2] = key;
row[3] = fileWithDeleteStatus.getLeft();
row[4] = fileWithDeleteStatus.getRight();
rows.add(row);
}));
.forEach(fileWithDeleteStatus -> {
Comparable[] row = new Comparable[5];
row[0] = metadata.getStartRollbackTime();
row[1] = metadata.getCommitsRollback().toString();
row[2] = key;
row[3] = fileWithDeleteStatus.getLeft();
row[4] = fileWithDeleteStatus.getRight();
rows.add(row);
}));

TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_ROLLBACK_INSTANT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.exception.HoodieException;

import org.apache.spark.launcher.SparkLauncher;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ private static String renderOldNewProps(Map<String, String> newProps, Map<String
String[][] rows = new String[allPropKeys.size()][];
int ind = 0;
for (String propKey : allPropKeys) {
String[] row = new String[]{
String[] row = new String[] {
propKey,
oldProps.getOrDefault(propKey, "null"),
newProps.getOrDefault(propKey, "null")
Expand Down
Loading

0 comments on commit 734d91e

Please sign in to comment.