From 60b69ae35bef06347332df4313a6077d613cea51 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Thu, 18 Aug 2022 17:13:32 -0700 Subject: [PATCH] [HUDI-3579] Add timeline commands in hudi-cli (#5139) --- .../apache/hudi/cli/HoodiePrintHelper.java | 73 +++- .../hudi/cli/HoodieTableHeaderFields.java | 15 + .../main/java/org/apache/hudi/cli/Table.java | 53 ++- .../java/org/apache/hudi/cli/TableHeader.java | 10 + .../hudi/cli/commands/TimelineCommand.java | 410 ++++++++++++++++++ 5 files changed, 536 insertions(+), 25 deletions(-) create mode 100644 hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java index be640376eeef2..0ffec2cac08e5 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java @@ -58,30 +58,76 @@ public static String print(String[] header, String[][] rows) { * @param rows List of rows * @return Serialized form for printing */ - public static String print(TableHeader rowHeader, Map> fieldNameToConverterMap, + public static String print( + TableHeader rowHeader, Map> fieldNameToConverterMap, String sortByField, boolean isDescending, Integer limit, boolean headerOnly, List rows) { - return print(rowHeader, fieldNameToConverterMap, sortByField, isDescending, limit, headerOnly, rows, ""); + return print(rowHeader, fieldNameToConverterMap, false, sortByField, isDescending, limit, headerOnly, rows); + } + + /** + * Serialize Table to printable string. 
+ * + * @param rowHeader Row Header + * @param fieldNameToConverterMap Field Specific Converters + * @param withRowNo Whether to add row number + * @param sortByField Sorting field + * @param isDescending Order + * @param limit Limit + * @param headerOnly Headers only + * @param rows List of rows + * @return Serialized form for printing + */ + public static String print( + TableHeader rowHeader, Map> fieldNameToConverterMap, boolean withRowNo, + String sortByField, boolean isDescending, Integer limit, boolean headerOnly, List rows) { + return print(rowHeader, fieldNameToConverterMap, withRowNo, sortByField, isDescending, limit, headerOnly, rows, ""); } /** * Serialize Table to printable string and also export a temporary view to easily write sql queries. + *

+ * Ideally, exporting view needs to be outside PrintHelper, but all commands use this. So this is easy + * way to add support for all commands * + * @param rowHeader Row Header + * @param fieldNameToConverterMap Field Specific Converters + * @param sortByField Sorting field + * @param isDescending Order + * @param limit Limit + * @param headerOnly Headers only + * @param rows List of rows + * @param tempTableName table name to export + * @return Serialized form for printing + */ + public static String print( + TableHeader rowHeader, Map> fieldNameToConverterMap, + String sortByField, boolean isDescending, Integer limit, boolean headerOnly, + List rows, String tempTableName) { + return print(rowHeader, fieldNameToConverterMap, false, sortByField, isDescending, limit, + headerOnly, rows, tempTableName); + } + + /** + * Serialize Table to printable string and also export a temporary view to easily write sql queries. + *

* Ideally, exporting view needs to be outside PrintHelper, but all commands use this. So this is easy * way to add support for all commands * - * @param rowHeader Row Header + * @param rowHeader Row Header * @param fieldNameToConverterMap Field Specific Converters - * @param sortByField Sorting field - * @param isDescending Order - * @param limit Limit - * @param headerOnly Headers only - * @param rows List of rows - * @param tempTableName table name to export + * @param withRowNo Whether to add row number + * @param sortByField Sorting field + * @param isDescending Order + * @param limit Limit + * @param headerOnly Headers only + * @param rows List of rows + * @param tempTableName table name to export * @return Serialized form for printing */ - public static String print(TableHeader rowHeader, Map> fieldNameToConverterMap, - String sortByField, boolean isDescending, Integer limit, boolean headerOnly, List rows, - String tempTableName) { + public static String print( + TableHeader rowHeader, Map> fieldNameToConverterMap, + boolean withRowNo, String sortByField, boolean isDescending, Integer limit, boolean headerOnly, + List rows, String tempTableName) { if (headerOnly) { return HoodiePrintHelper.print(rowHeader); @@ -97,7 +143,8 @@ public static String print(TableHeader rowHeader, Map> { // Header for this table private final TableHeader rowHeader; + // Whether to print row number + private final boolean addRowNo; // User-specified conversions before rendering private final Map> fieldNameToConverterMap; // Option attribute to track sorting field @@ -49,12 +51,17 @@ public class Table implements Iterable> { private final List> rawRows; // Flag to determine if all the rows have been added private boolean finishedAdding = false; - // Rows ready for Rendering + // Headers ready for rendering + private TableHeader renderHeaders; + // Rows ready for rendering private List> renderRows; - public Table(TableHeader rowHeader, Map> fieldNameToConverterMap, - Option 
orderingFieldNameOptional, Option isDescendingOptional, Option limitOptional) { + public Table( + TableHeader rowHeader, Map> fieldNameToConverterMap, + boolean addRowNo, Option orderingFieldNameOptional, + Option isDescendingOptional, Option limitOptional) { this.rowHeader = rowHeader; + this.addRowNo = addRowNo; this.fieldNameToConverterMap = fieldNameToConverterMap; this.orderingFieldNameOptional = orderingFieldNameOptional; this.isDescendingOptional = isDescendingOptional; @@ -64,7 +71,7 @@ public Table(TableHeader rowHeader, Map> fieldN /** * Main API to add row to the table. - * + * * @param row Row */ public Table add(List row) { @@ -134,15 +141,34 @@ private List> orderRows() { private void sortAndLimit() { this.renderRows = new ArrayList<>(); final int limit = this.limitOptional.orElse(rawRows.size()); - final List> orderedRows = orderRows(); - renderRows = orderedRows.stream().limit(limit).map(row -> IntStream.range(0, rowHeader.getNumFields()).mapToObj(idx -> { - String fieldName = rowHeader.get(idx); - if (fieldNameToConverterMap.containsKey(fieldName)) { - return fieldNameToConverterMap.get(fieldName).apply(row.get(idx)); + // Row number is added here if enabled + final List> rawOrderedRows = orderRows(); + final List> orderedRows; + if (addRowNo) { + orderedRows = new ArrayList<>(); + int rowNo = 0; + for (List row : rawOrderedRows) { + List newRow = new ArrayList<>(); + newRow.add(rowNo++); + newRow.addAll(row); + orderedRows.add(newRow); } - Object v = row.get(idx); - return v == null ? "null" : v.toString(); - }).collect(Collectors.toList())).collect(Collectors.toList()); + } else { + orderedRows = rawOrderedRows; + } + renderHeaders = addRowNo + ? 
new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ROW_NO) + .addTableHeaderFields(rowHeader) + : rowHeader; + renderRows = orderedRows.stream().limit(limit) + .map(row -> IntStream.range(0, renderHeaders.getNumFields()).mapToObj(idx -> { + String fieldName = renderHeaders.get(idx); + if (fieldNameToConverterMap.containsKey(fieldName)) { + return fieldNameToConverterMap.get(fieldName).apply(row.get(idx)); + } + Object v = row.get(idx); + return v == null ? "null" : v.toString(); + }).collect(Collectors.toList())).collect(Collectors.toList()); } @Override @@ -162,6 +188,9 @@ public void forEach(Consumer> action) { } public List getFieldNames() { + if (renderHeaders != null) { + return renderHeaders.getFieldNames(); + } return rowHeader.getFieldNames(); } diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/TableHeader.java b/hudi-cli/src/main/java/org/apache/hudi/cli/TableHeader.java index 8ec392d1abfe3..ee17480a30da2 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/TableHeader.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/TableHeader.java @@ -39,6 +39,16 @@ public TableHeader addTableHeaderField(String fieldName) { return this; } + /** + * Add fields from another {@link TableHeader} instance. + * + * @param tableHeader {@link TableHeader} instance. + */ + public TableHeader addTableHeaderFields(TableHeader tableHeader) { + fieldNames.addAll(tableHeader.getFieldNames()); + return this; + } + /** * Get all field names. */ diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java new file mode 100644 index 0000000000000..9af04d155bcba --- /dev/null +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.cli.commands; + +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.avro.model.HoodieRollbackPlan; +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.HoodiePrintHelper; +import org.apache.hudi.cli.HoodieTableHeaderFields; +import org.apache.hudi.cli.TableHeader; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.metadata.HoodieTableMetadata; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.springframework.shell.core.CommandMarker; +import org.springframework.shell.core.annotation.CliCommand; +import org.springframework.shell.core.annotation.CliOption; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import 
java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * CLI commands that display the active timeline of a table and of its metadata table.
+ *
+ * <p>Each row shows an instant with its action, its state and the modification time of the
+ * requested, inflight and completed meta files found under the table's meta path.
+ */
+@Component
+public class TimelineCommand implements CommandMarker {
+
+  private static final Logger LOG = LogManager.getLogger(TimelineCommand.class);
+  // SimpleDateFormat is not synchronized, so only the patterns are shared and a formatter
+  // is created per call.
+  private static final String DATE_PATTERN_DEFAULT = "MM-dd HH:mm";
+  private static final String DATE_PATTERN_SECONDS = "MM-dd HH:mm:ss";
+  // Placeholder printed in a cell that has no value.
+  private static final String EMPTY_CELL = "-";
+
+  /**
+   * Lists the instants in the active timeline of the data table, optionally side by side
+   * with the instants of the metadata table.
+   *
+   * @return the rendered table, or the exception message if the meta path cannot be scanned
+   */
+  @CliCommand(value = "timeline show active", help = "List all instants in active timeline")
+  public String showActive(
+      @CliOption(key = {"limit"}, help = "Limit #rows to be displayed", unspecifiedDefaultValue = "10") Integer limit,
+      @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
+      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
+      @CliOption(key = {"headeronly"}, help = "Print Header Only",
+          unspecifiedDefaultValue = "false") final boolean headerOnly,
+      @CliOption(key = {"with-metadata-table"}, help = "Show metadata table timeline together with data table",
+          unspecifiedDefaultValue = "false") final boolean withMetadataTable,
+      @CliOption(key = {"show-rollback-info"}, help = "Show instant to rollback for rollbacks",
+          unspecifiedDefaultValue = "false") final boolean showRollbackInfo,
+      @CliOption(key = {"show-time-seconds"}, help = "Show seconds in instant file modification time",
+          unspecifiedDefaultValue = "false") final boolean showTimeSeconds) {
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+    try {
+      if (withMetadataTable) {
+        HoodieTableMetaClient mtMetaClient = getMetadataTableMetaClient(metaClient);
+        return printTimelineInfoWithMetadataTable(
+            metaClient.getActiveTimeline(), mtMetaClient.getActiveTimeline(),
+            getInstantInfoFromTimeline(metaClient.getFs(), metaClient.getMetaPath()),
+            getInstantInfoFromTimeline(mtMetaClient.getFs(), mtMetaClient.getMetaPath()),
+            limit, sortByField, descending, headerOnly, true, showTimeSeconds, showRollbackInfo);
+      }
+      return printTimelineInfo(
+          metaClient.getActiveTimeline(),
+          getInstantInfoFromTimeline(metaClient.getFs(), metaClient.getMetaPath()),
+          limit, sortByField, descending, headerOnly, true, showTimeSeconds, showRollbackInfo);
+    } catch (IOException e) {
+      LOG.error("Failed to show the active timeline", e);
+      return e.getMessage();
+    }
+  }
+
+  /**
+   * Lists only the inflight and requested instants in the active timeline of the data table.
+   *
+   * @return the rendered table, or the exception message if the meta path cannot be scanned
+   */
+  @CliCommand(value = "timeline show incomplete", help = "List all incomplete instants in active timeline")
+  public String showIncomplete(
+      @CliOption(key = {"limit"}, help = "Limit #rows to be displayed", unspecifiedDefaultValue = "10") Integer limit,
+      @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
+      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
+      @CliOption(key = {"headeronly"}, help = "Print Header Only",
+          unspecifiedDefaultValue = "false") final boolean headerOnly,
+      @CliOption(key = {"show-rollback-info"}, help = "Show instant to rollback for rollbacks",
+          unspecifiedDefaultValue = "false") final boolean showRollbackInfo,
+      @CliOption(key = {"show-time-seconds"}, help = "Show seconds in instant file modification time",
+          unspecifiedDefaultValue = "false") final boolean showTimeSeconds) {
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+    try {
+      return printTimelineInfo(
+          metaClient.getActiveTimeline().filterInflightsAndRequested(),
+          getInstantInfoFromTimeline(metaClient.getFs(), metaClient.getMetaPath()),
+          limit, sortByField, descending, headerOnly, true, showTimeSeconds, showRollbackInfo);
+    } catch (IOException e) {
+      LOG.error("Failed to show the incomplete instants of the active timeline", e);
+      return e.getMessage();
+    }
+  }
+
+  /**
+   * Lists the instants in the active timeline of the metadata table. Rollback info is never shown.
+   *
+   * @return the rendered table, or the exception message if the meta path cannot be scanned
+   */
+  @CliCommand(value = "metadata timeline show active",
+      help = "List all instants in active timeline of metadata table")
+  public String metadataShowActive(
+      @CliOption(key = {"limit"}, help = "Limit #rows to be displayed", unspecifiedDefaultValue = "10") Integer limit,
+      @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
+      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
+      @CliOption(key = {"headeronly"}, help = "Print Header Only",
+          unspecifiedDefaultValue = "false") final boolean headerOnly,
+      @CliOption(key = {"show-time-seconds"}, help = "Show seconds in instant file modification time",
+          unspecifiedDefaultValue = "false") final boolean showTimeSeconds) {
+    HoodieTableMetaClient metaClient = getMetadataTableMetaClient(HoodieCLI.getTableMetaClient());
+    try {
+      return printTimelineInfo(
+          metaClient.getActiveTimeline(),
+          getInstantInfoFromTimeline(metaClient.getFs(), metaClient.getMetaPath()),
+          limit, sortByField, descending, headerOnly, true, showTimeSeconds, false);
+    } catch (IOException e) {
+      LOG.error("Failed to show the active timeline of the metadata table", e);
+      return e.getMessage();
+    }
+  }
+
+  /**
+   * Lists only the inflight and requested instants in the active timeline of the metadata table.
+   * Rollback info is never shown.
+   *
+   * @return the rendered table, or the exception message if the meta path cannot be scanned
+   */
+  @CliCommand(value = "metadata timeline show incomplete",
+      help = "List all incomplete instants in active timeline of metadata table")
+  public String metadataShowIncomplete(
+      @CliOption(key = {"limit"}, help = "Limit #rows to be displayed", unspecifiedDefaultValue = "10") Integer limit,
+      @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
+      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
+      @CliOption(key = {"headeronly"}, help = "Print Header Only",
+          unspecifiedDefaultValue = "false") final boolean headerOnly,
+      @CliOption(key = {"show-time-seconds"}, help = "Show seconds in instant file modification time",
+          unspecifiedDefaultValue = "false") final boolean showTimeSeconds) {
+    HoodieTableMetaClient metaClient = getMetadataTableMetaClient(HoodieCLI.getTableMetaClient());
+    try {
+      return printTimelineInfo(
+          metaClient.getActiveTimeline().filterInflightsAndRequested(),
+          getInstantInfoFromTimeline(metaClient.getFs(), metaClient.getMetaPath()),
+          limit, sortByField, descending, headerOnly, true, showTimeSeconds, false);
+    } catch (IOException e) {
+      LOG.error("Failed to show the incomplete instants of the metadata table timeline", e);
+      return e.getMessage();
+    }
+  }
+
+  /**
+   * Builds a meta client for the metadata table that belongs to the given data table.
+   *
+   * @param metaClient meta client of the data table
+   * @return meta client rooted at the metadata table base path
+   */
+  private HoodieTableMetaClient getMetadataTableMetaClient(HoodieTableMetaClient metaClient) {
+    return HoodieTableMetaClient.builder().setConf(HoodieCLI.conf)
+        .setBasePath(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))
+        .setLoadActiveTimelineOnLoad(false)
+        .setConsistencyGuardConfig(HoodieCLI.consistencyGuardConfig)
+        .build();
+  }
+
+  /**
+   * Scans the meta path and indexes every timeline meta file by instant time and state.
+   *
+   * @param fs       file system holding the meta path
+   * @param metaPath meta path of the table
+   * @return instant time -> (state -> instant carrying the meta file's modification time)
+   * @throws IOException if the meta path cannot be scanned
+   */
+  private Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> getInstantInfoFromTimeline(
+      FileSystem fs, String metaPath) throws IOException {
+    Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> instantMap = new HashMap<>();
+    Stream<HoodieInstantWithModTime> instantStream = Arrays.stream(
+        HoodieTableMetaClient.scanFiles(fs, new Path(metaPath), path -> {
+          // Keep only the meta files whose extension is valid in the active timeline
+          String extension = HoodieInstant.getTimelineFileExtension(path.getName());
+          return HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE.contains(extension);
+        })).map(HoodieInstantWithModTime::new);
+    instantStream.forEach(instant ->
+        instantMap.computeIfAbsent(instant.getTimestamp(), t -> new HashMap<>())
+            .put(instant.getState(), instant));
+    return instantMap;
+  }
+
+  /**
+   * Formats the modification time of the meta file of an instant in a given state.
+   *
+   * @param instantTimestamp instant time to look up
+   * @param state            state whose meta file is of interest
+   * @param instantInfoMap   index built by {@link #getInstantInfoFromTimeline}
+   * @param showTimeSeconds  whether to include seconds in the formatted time
+   * @return the formatted time, or "-" when no meta file exists for that instant and state
+   */
+  private String getFormattedDate(
+      String instantTimestamp, HoodieInstant.State state,
+      Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> instantInfoMap,
+      boolean showTimeSeconds) {
+    Map<HoodieInstant.State, HoodieInstantWithModTime> mapping = instantInfoMap.get(instantTimestamp);
+    if (mapping == null || !mapping.containsKey(state)) {
+      return EMPTY_CELL;
+    }
+    SimpleDateFormat sdf = new SimpleDateFormat(showTimeSeconds ? DATE_PATTERN_SECONDS : DATE_PATTERN_DEFAULT);
+    return sdf.format(new Date(mapping.get(state).getModificationTime()));
+  }
+
+  /**
+   * Renders the rollback info cell of a row.
+   *
+   * @param timeline         timeline used to read rollback plans and metadata
+   * @param instant          instant of the row, empty if the timeline has none at that time
+   * @param instantTimestamp instant time of the row
+   * @param rollbackInfo     rolled back instant time -> rollback instant times
+   * @return "Rolls back ..." for a rollback instant, "Rolled back by ..." for an instant that
+   *         rollback instants target, "-" otherwise
+   */
+  private String getRollbackInfoString(
+      HoodieTimeline timeline, Option<HoodieInstant> instant, String instantTimestamp,
+      Map<String, List<String>> rollbackInfo) {
+    if (instant.isPresent() && HoodieTimeline.ROLLBACK_ACTION.equalsIgnoreCase(instant.get().getAction())) {
+      return "Rolls back\n" + getInstantToRollback(timeline, instant.get());
+    }
+    if (rollbackInfo.containsKey(instantTimestamp)) {
+      return "Rolled back by\n" + String.join(",\n", rollbackInfo.get(instantTimestamp));
+    }
+    return EMPTY_CELL;
+  }
+
+  /**
+   * Prints one row per instant of the given timeline.
+   *
+   * <p>Columns: instant, action, state, [rollback info], requested time, inflight time,
+   * completed time.
+   */
+  private String printTimelineInfo(
+      HoodieTimeline timeline,
+      Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> instantInfoMap,
+      Integer limit, String sortByField, boolean descending, boolean headerOnly, boolean withRowNo,
+      boolean showTimeSeconds, boolean showRollbackInfo) {
+    final int numColumns = showRollbackInfo ? 7 : 6;
+    // Building the map reads the rollback instants' details, so skip it when the column is hidden
+    final Map<String, List<String>> rollbackInfo =
+        showRollbackInfo ? getRolledBackInstantInfo(timeline) : new HashMap<>();
+    final List<Comparable[]> rows = timeline.getInstants().map(instant -> {
+      Comparable[] row = new Comparable[numColumns];
+      String instantTimestamp = instant.getTimestamp();
+      row[0] = instantTimestamp;
+      row[1] = instant.getAction();
+      row[2] = instant.getState();
+      if (showRollbackInfo) {
+        row[3] = getRollbackInfoString(timeline, Option.of(instant), instantTimestamp, rollbackInfo);
+      }
+      // The three time columns are always the last three, whether or not rollback info is shown
+      row[numColumns - 3] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.REQUESTED, instantInfoMap, showTimeSeconds);
+      row[numColumns - 2] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.INFLIGHT, instantInfoMap, showTimeSeconds);
+      row[numColumns - 1] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.COMPLETED, instantInfoMap, showTimeSeconds);
+      return row;
+    }).collect(Collectors.toList());
+    TableHeader header = new TableHeader()
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_STATE);
+    if (showRollbackInfo) {
+      header.addTableHeaderField(HoodieTableHeaderFields.HEADER_ROLLBACK_INFO);
+    }
+    header.addTableHeaderField(HoodieTableHeaderFields.HEADER_REQUESTED_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_INFLIGHT_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_COMPLETED_TIME);
+    return HoodiePrintHelper.print(
+        header, new HashMap<>(), withRowNo, sortByField, descending, limit, headerOnly, rows);
+  }
+
+  /**
+   * Prints one row per instant time found in the meta files of either the data table or the
+   * metadata table, with the data table columns followed by the metadata table columns.
+   *
+   * <p>Rows are built in the order defined by {@link HoodieInstantTimeComparator}. Rollback
+   * info is shown for the data table only.
+   */
+  private String printTimelineInfoWithMetadataTable(
+      HoodieTimeline dtTimeline, HoodieTimeline mtTimeline,
+      Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> dtInstantInfoMap,
+      Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> mtInstantInfoMap,
+      Integer limit, String sortByField, boolean descending, boolean headerOnly, boolean withRowNo,
+      boolean showTimeSeconds, boolean showRollbackInfo) {
+    Set<String> instantTimeSet = new HashSet<>(dtInstantInfoMap.keySet());
+    instantTimeSet.addAll(mtInstantInfoMap.keySet());
+    List<String> instantTimeList = instantTimeSet.stream()
+        .sorted(new HoodieInstantTimeComparator()).collect(Collectors.toList());
+    final int numColumns = showRollbackInfo ? 12 : 11;
+    // Building the map reads the rollback instants' details, so skip it when the column is hidden
+    final Map<String, List<String>> dtRollbackInfo =
+        showRollbackInfo ? getRolledBackInstantInfo(dtTimeline) : new HashMap<>();
+
+    final List<Comparable[]> rows = instantTimeList.stream().map(instantTimestamp -> {
+      Option<HoodieInstant> dtInstant = getInstant(dtTimeline, instantTimestamp);
+      Option<HoodieInstant> mtInstant = getInstant(mtTimeline, instantTimestamp);
+      Comparable[] row = new Comparable[numColumns];
+      row[0] = instantTimestamp;
+      row[1] = dtInstant.isPresent() ? dtInstant.get().getAction() : EMPTY_CELL;
+      row[2] = dtInstant.isPresent() ? dtInstant.get().getState() : EMPTY_CELL;
+      if (showRollbackInfo) {
+        row[3] = getRollbackInfoString(dtTimeline, dtInstant, instantTimestamp, dtRollbackInfo);
+      }
+      // The last eight columns are fixed: three data table times, then five metadata table columns
+      row[numColumns - 8] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.REQUESTED, dtInstantInfoMap, showTimeSeconds);
+      row[numColumns - 7] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.INFLIGHT, dtInstantInfoMap, showTimeSeconds);
+      row[numColumns - 6] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.COMPLETED, dtInstantInfoMap, showTimeSeconds);
+      row[numColumns - 5] = mtInstant.isPresent() ? mtInstant.get().getAction() : EMPTY_CELL;
+      row[numColumns - 4] = mtInstant.isPresent() ? mtInstant.get().getState() : EMPTY_CELL;
+      row[numColumns - 3] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.REQUESTED, mtInstantInfoMap, showTimeSeconds);
+      row[numColumns - 2] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.INFLIGHT, mtInstantInfoMap, showTimeSeconds);
+      row[numColumns - 1] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.COMPLETED, mtInstantInfoMap, showTimeSeconds);
+      return row;
+    }).collect(Collectors.toList());
+    TableHeader header = new TableHeader()
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_STATE);
+    if (showRollbackInfo) {
+      header.addTableHeaderField(HoodieTableHeaderFields.HEADER_ROLLBACK_INFO);
+    }
+    header.addTableHeaderField(HoodieTableHeaderFields.HEADER_REQUESTED_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_INFLIGHT_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_COMPLETED_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_MT_ACTION)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_MT_STATE)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_MT_REQUESTED_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_MT_INFLIGHT_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_MT_COMPLETED_TIME);
+    return HoodiePrintHelper.print(
+        header, new HashMap<>(), withRowNo, sortByField, descending, limit, headerOnly, rows);
+  }
+
+  /**
+   * Returns the first instant of the timeline whose timestamp equals the given instant time.
+   */
+  private Option<HoodieInstant> getInstant(HoodieTimeline timeline, String instantTimestamp) {
+    return timeline.filter(instant -> instant.getTimestamp().equals(instantTimestamp)).firstInstant();
+  }
+
+  /**
+   * Reads the instant times targeted by a rollback instant.
+   *
+   * <p>For an inflight rollback the target is read from the rollback plan stored with the
+   * requested instant; otherwise the targets are read from the rollback metadata of the
+   * instant itself.
+   *
+   * @throws IOException if the plan or the metadata cannot be deserialized
+   */
+  private List<String> readRolledBackInstants(
+      HoodieTimeline timeline, HoodieInstant rollbackInstant) throws IOException {
+    if (rollbackInstant.isInflight()) {
+      HoodieInstant instantToUse = new HoodieInstant(
+          HoodieInstant.State.REQUESTED, rollbackInstant.getAction(), rollbackInstant.getTimestamp());
+      HoodieRollbackPlan plan = TimelineMetadataUtils
+          .deserializeAvroMetadata(timeline.getInstantDetails(instantToUse).get(), HoodieRollbackPlan.class);
+      return Arrays.asList(plan.getInstantToRollback().getCommitTime());
+    }
+    HoodieRollbackMetadata metadata = TimelineMetadataUtils
+        .deserializeAvroMetadata(timeline.getInstantDetails(rollbackInstant).get(), HoodieRollbackMetadata.class);
+    return metadata.getCommitsRollback();
+  }
+
+  /**
+   * Returns the comma-separated instant times targeted by the given rollback instant,
+   * or "-" if the rollback info cannot be read.
+   */
+  private String getInstantToRollback(HoodieTimeline timeline, HoodieInstant instant) {
+    try {
+      return String.join(",", readRolledBackInstants(timeline, instant));
+    } catch (IOException e) {
+      LOG.error(String.format("Error reading rollback info of %s", instant), e);
+      return EMPTY_CELL;
+    }
+  }
+
+  /**
+   * Builds the reverse rollback index of a timeline.
+   *
+   * @return instant rolled back or to roll back -> rollback instant times targeting it;
+   *         rollback instants whose info cannot be read are logged and skipped
+   */
+  private Map<String, List<String>> getRolledBackInstantInfo(HoodieTimeline timeline) {
+    Map<String, List<String>> rollbackInfoMap = new HashMap<>();
+    List<HoodieInstant> rollbackInstants = timeline.filter(instant ->
+            HoodieTimeline.ROLLBACK_ACTION.equalsIgnoreCase(instant.getAction()))
+        .getInstants().collect(Collectors.toList());
+    for (HoodieInstant rollbackInstant : rollbackInstants) {
+      try {
+        for (String rolledBack : readRolledBackInstants(timeline, rollbackInstant)) {
+          rollbackInfoMap.computeIfAbsent(rolledBack, k -> new ArrayList<>())
+              .add(rollbackInstant.getTimestamp());
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Error reading rollback info of %s", rollbackInstant), e);
+      }
+    }
+    return rollbackInfoMap;
+  }
+
+  /**
+   * A {@link HoodieInstant} that also records the modification time of its meta file.
+   */
+  static class HoodieInstantWithModTime extends HoodieInstant {
+
+    private final long modificationTimeMs;
+
+    public HoodieInstantWithModTime(FileStatus fileStatus) {
+      super(fileStatus);
+      this.modificationTimeMs = fileStatus.getModificationTime();
+    }
+
+    public long getModificationTime() {
+      return modificationTimeMs;
+    }
+  }
+
+  /**
+   * Orders instant times lexicographically, except that an instant time equal to another one
+   * plus a trailing "001" sorts right before that other instant time.
+   */
+  static class HoodieInstantTimeComparator implements Comparator<String> {
+    @Override
+    public int compare(String o1, String o2) {
+      // For metadata table, the compaction instant time is "012345001" while the delta commit
+      // later is "012345", i.e., the compaction instant time has trailing "001". In the
+      // actual event sequence, metadata table compaction happens before the corresponding
+      // delta commit. For better visualization, we put "012345001" before "012345"
+      // when sorting in ascending order.
+      if (o1.length() != o2.length()) {
+        // o1 is longer than o2
+        if (o1.length() - o2.length() == 3 && o1.endsWith("001") && o1.startsWith(o2)) {
+          return -1;
+        }
+        // o1 is shorter than o2
+        if (o2.length() - o1.length() == 3 && o2.endsWith("001") && o2.startsWith(o1)) {
+          return 1;
+        }
+      }
+      return o1.compareTo(o2);
+    }
+  }
+}