Skip to content

Commit

Permalink
[HUDI-4340] fix not parsable text DateTimeParseException by addng a m…
Browse files Browse the repository at this point in the history
…ethod parseDateFromInstantTimeSafely for parsing timestamp when output metrics (apache#6000)

(cherry picked from commit 71b8174)
  • Loading branch information
TengHuo authored and neverdizzy committed Dec 1, 2022
1 parent 01285c1 commit ce2e982
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -307,17 +306,14 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf);

void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
try {

if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime(), durationInMs,
metadata, actionType);
writeTimer = null;
}
} catch (ParseException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime
+ "Instant time is not of valid format", e);
if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
// instantTime could be a non-standard value, so use `parseDateFromInstantTimeSafely`
// e.g. INIT_INSTANT_TS, METADATA_BOOTSTRAP_INSTANT_TS and FULL_BOOTSTRAP_INSTANT_TS in HoodieTimeline
HoodieActiveTimeline.parseDateFromInstantTimeSafely(instantTime).ifPresent(parsedInstant ->
metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, actionType)
);
writeTimer = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
Expand All @@ -64,7 +63,6 @@
import org.apache.spark.api.java.JavaSparkContext;

import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -318,13 +316,9 @@ protected void completeCompaction(HoodieCommitMetadata metadata,
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+ config.getBasePath() + " at time " + compactionCommitTime, e);
}
HoodieActiveTimeline.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant ->
metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION)
);
}
LOG.info("Compacted successfully on commit " + compactionCommitTime);
}
Expand Down Expand Up @@ -402,13 +396,9 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
if (clusteringTimer != null) {
long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+ config.getBasePath() + " at time " + clusteringCommitTime, e);
}
HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant ->
metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
);
}
LOG.info("Clustering successfully on commit " + clusteringCommitTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,57 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION,
REQUESTED_INDEX_COMMIT_EXTENSION, INFLIGHT_INDEX_COMMIT_EXTENSION, INDEX_COMMIT_EXTENSION,
REQUESTED_SAVE_SCHEMA_ACTION_EXTENSION, INFLIGHT_SAVE_SCHEMA_ACTION_EXTENSION, SAVE_SCHEMA_ACTION_EXTENSION));

private static final Set<String> NOT_PARSABLE_TIMESTAMPS = new HashSet<String>(3) {{
add(HoodieTimeline.INIT_INSTANT_TS);
add(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
add(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
}};

private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
protected HoodieTableMetaClient metaClient;

/**
* Parse the timestamp of an Instant and return a {@code Date}.
* Throw ParseException if timestamp is not valid format as
* {@link org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator#SECS_INSTANT_TIMESTAMP_FORMAT}.
*
* @param timestamp a timestamp String which follow pattern as
* {@link org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator#SECS_INSTANT_TIMESTAMP_FORMAT}.
* @return Date of instant timestamp
*/
public static Date parseDateFromInstantTime(String timestamp) throws ParseException {
return HoodieInstantTimeGenerator.parseDateFromInstantTime(timestamp);
}

/**
* The same parsing method as above, but this method will mute ParseException.
* If the given timestamp is invalid, returns {@code Option.empty}.
* Or a corresponding Date value if these timestamp strings are provided
* {@link org.apache.hudi.common.table.timeline.HoodieTimeline#INIT_INSTANT_TS},
* {@link org.apache.hudi.common.table.timeline.HoodieTimeline#METADATA_BOOTSTRAP_INSTANT_TS},
* {@link org.apache.hudi.common.table.timeline.HoodieTimeline#FULL_BOOTSTRAP_INSTANT_TS}.
* This method is useful when parsing timestamp for metrics
*
* @param timestamp a timestamp String which follow pattern as
* {@link org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator#SECS_INSTANT_TIMESTAMP_FORMAT}.
* @return {@code Option<Date>} of instant timestamp, {@code Option.empty} if invalid timestamp
*/
public static Option<Date> parseDateFromInstantTimeSafely(String timestamp) {
Option<Date> parsedDate;
try {
parsedDate = Option.of(HoodieInstantTimeGenerator.parseDateFromInstantTime(timestamp));
} catch (ParseException e) {
if (NOT_PARSABLE_TIMESTAMPS.contains(timestamp)) {
parsedDate = Option.of(new Date(Integer.parseInt(timestamp)));
} else {
LOG.warn("Failed to parse timestamp " + timestamp + ": " + e.getMessage());
parsedDate = Option.empty();
}
}
return parsedDate;
}

/**
* Format the Date to a String representing the timestamp of a Hoodie Instant.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class HoodieInstantTimeGenerator {

// The last Instant timestamp generated
private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));
private static final String ALL_ZERO_TIMESTAMP = "00000000000000";

// The default number of milliseconds that we add if they are not present
// We prefer the max timestamp as it mimics the current behavior with second granularity
Expand Down Expand Up @@ -96,11 +95,7 @@ public static Date parseDateFromInstantTime(String timestamp) throws ParseExcept
LocalDateTime dt = LocalDateTime.parse(timestampInMillis, MILLIS_INSTANT_TIME_FORMATTER);
return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
} catch (DateTimeParseException e) {
// Special handling for all zero timestamp which is not parsable by DateTimeFormatter
if (timestamp.equals(ALL_ZERO_TIMESTAMP)) {
return new Date(0);
}
throw e;
throw new ParseException(e.getMessage(), e.getErrorIndex());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,9 +567,6 @@ public void testCreateNewInstantTime() throws Exception {
lastInstantTime = newInstantTime;
}

// All zero timestamp can be parsed
HoodieActiveTimeline.parseDateFromInstantTime("00000000000000");

// Multiple thread test
final int numChecks = 100000;
final int numThreads = 100;
Expand Down Expand Up @@ -631,6 +628,26 @@ public void testMillisGranularityInstantDateParsing() throws ParseException {
);
}

@Test
public void testInvalidInstantDateParsing() throws ParseException {
// Test all invalid timestamp in HoodieTimeline, shouldn't throw any error and should return a correct value
assertEquals(Long.parseLong(HoodieTimeline.INIT_INSTANT_TS),
HoodieActiveTimeline.parseDateFromInstantTimeSafely(HoodieTimeline.INIT_INSTANT_TS).get().getTime());
assertEquals(Long.parseLong(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS),
HoodieActiveTimeline.parseDateFromInstantTimeSafely(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS).get().getTime());
assertEquals(Long.parseLong(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS),
HoodieActiveTimeline.parseDateFromInstantTimeSafely(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS).get().getTime());

// Test metadata table compaction instant date parsing with INIT_INSTANT_TS, should return Option.empty
assertEquals(Option.empty(),
HoodieActiveTimeline.parseDateFromInstantTimeSafely(HoodieTimeline.INIT_INSTANT_TS + "001"));

// Test a valid instant timestamp, should equal the same result as HoodieActiveTimeline.parseDateFromInstantTime
String testInstant = "20210101120101";
assertEquals(HoodieActiveTimeline.parseDateFromInstantTime(testInstant).getTime(),
HoodieActiveTimeline.parseDateFromInstantTimeSafely(testInstant).get().getTime());
}

/**
* Returns an exhaustive list of all possible HoodieInstant.
* @return list of HoodieInstant
Expand Down

0 comments on commit ce2e982

Please sign in to comment.