Skip to content

Commit

Permalink
Add matching of non-transient exceptions that will avoid failing the …
Browse files Browse the repository at this point in the history
…container in GMIP (#3662)

* Add matching of non-transient exceptions that will avoid failing the container in GMIP

* Add separate variable for currently seen dataset errors
  • Loading branch information
jack-moseley authored Mar 31, 2023
1 parent bf3d212 commit 5338bdc
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
public static final String GMCE_METADATA_WRITER_CLASSES = "gmce.metadata.writer.classes";
public static final String GMCE_METADATA_WRITER_MAX_ERROR_DATASET = "gmce.metadata.writer.max.error.dataset";
public static final String TRANSIENT_EXCEPTION_MESSAGES_KEY = "gmce.metadata.writer.transient.exception.messages";
public static final String NON_TRANSIENT_EXCEPTION_MESSAGES_KEY = "gmce.metadata.writer.nonTransient.exception.messages";
public static final int DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET = 0;
public static final int DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS = 60000;
public static final String TABLE_NAME_DELIMITER = ".";
Expand All @@ -124,10 +125,12 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
private Map<String, List<HiveRegistrationUnit.Column>> partitionKeysMap;
private Closer closer = Closer.create();
protected final AtomicLong recordCount = new AtomicLong(0L);
private final Set<String> currentErrorDatasets = new HashSet<>();
@Setter
private int maxErrorDataset;
protected EventSubmitter eventSubmitter;
private final Set<String> transientExceptionMessages;
private final Set<String> nonTransientExceptionMessages;

@AllArgsConstructor
static class TableStatus {
Expand Down Expand Up @@ -161,6 +164,7 @@ public GobblinMCEWriter(DataWriterBuilder<Schema, GenericRecord> builder, State
MetricContext metricContext = Instrumented.getMetricContext(state, this.getClass(), tags);
eventSubmitter = new EventSubmitter.Builder(metricContext, GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
transientExceptionMessages = new HashSet<>(properties.getPropAsList(TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
nonTransientExceptionMessages = new HashSet<>(properties.getPropAsList(NON_TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
}

@Override
Expand Down Expand Up @@ -339,7 +343,7 @@ void writeWithMetadataWriters(
try {
writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
} catch (Exception e) {
if (isExceptionTransient(e, transientExceptionMessages)) {
if (exceptionMatches(e, transientExceptionMessages)) {
throw new RuntimeException("Failing container due to transient exception for db: " + dbName + " table: " + tableName, e);
}
meetException = true;
Expand Down Expand Up @@ -390,8 +394,13 @@ private void addOrThrowException(Exception e, String tableString, String dbName,
lastException.droppedPartitionValues.addAll(((HiveMetadataWriterWithPartitionInfoException) e).droppedPartitionValues);
}
this.datasetErrorMap.put(tableStatus.datasetPath, tableErrorMap);
log.error(String.format("Meet exception when flush table %s", tableString), e);
if (datasetErrorMap.size() > maxErrorDataset) {
if (!exceptionMatches(e, this.nonTransientExceptionMessages)) {
currentErrorDatasets.add(tableStatus.datasetPath);
log.error(String.format("Meet exception when flush table %s", tableString), e);
} else {
log.error(String.format("Detected known non-transient failure for table %s", tableString), e);
}
if (currentErrorDatasets.size() > maxErrorDataset) {
//Fail the job if the error size exceeds some number
throw new IOException(String.format("Container fails to flush for more than %s dataset, last exception we met is: ", maxErrorDataset), e);
}
Expand All @@ -412,7 +421,7 @@ private void flush(String dbName, String tableName) throws IOException {
try {
writer.flush(dbName, tableName);
} catch (IOException e) {
if (isExceptionTransient(e, transientExceptionMessages)) {
if (exceptionMatches(e, transientExceptionMessages)) {
throw new RuntimeException("Failing container due to transient exception for db: " + dbName + " table: " + tableName, e);
}
meetException = true;
Expand All @@ -435,11 +444,12 @@ private void flush(String dbName, String tableName) throws IOException {
}

/**
* Check if exception is contained within a known list of transient exceptions. These exceptions should not be caught
* to avoid advancing watermarks and skipping GMCEs unnecessarily.
* Check if exception is contained within a known list of known exceptions. Transient exceptions should not be caught
* to avoid advancing watermarks and skipping GMCEs unnecessarily, while non-transient exceptions should not count
* towards the maximum number of failed datasets.
*/
public static boolean isExceptionTransient(Exception e, Set<String> transientExceptionMessages) {
return transientExceptionMessages.stream().anyMatch(message -> Throwables.getRootCause(e).toString().contains(message));
public static boolean exceptionMatches(Exception e, Set<String> exceptionMessages) {
return exceptionMessages.stream().anyMatch(message -> Throwables.getRootCause(e).toString().contains(message));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,12 @@ public void testDetectTransientException() {
Set<String> transientExceptions = Sets.newHashSet("Filesystem closed", "Hive timeout", "RejectedExecutionException");
IOException transientException = new IOException("test1 Filesystem closed test");
IOException wrapperException = new IOException("wrapper exception", transientException);
Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(transientException, transientExceptions));
Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(wrapperException, transientExceptions));
Assert.assertTrue(GobblinMCEWriter.exceptionMatches(transientException, transientExceptions));
Assert.assertTrue(GobblinMCEWriter.exceptionMatches(wrapperException, transientExceptions));
IOException nonTransientException = new IOException("Write failed due to bad schema");
Assert.assertFalse(GobblinMCEWriter.isExceptionTransient(nonTransientException, transientExceptions));
Assert.assertFalse(GobblinMCEWriter.exceptionMatches(nonTransientException, transientExceptions));
RejectedExecutionException rejectedExecutionException = new RejectedExecutionException("");
Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(rejectedExecutionException, transientExceptions));
Assert.assertTrue(GobblinMCEWriter.exceptionMatches(rejectedExecutionException, transientExceptions));
}

@DataProvider(name="AllowMockMetadataWriter")
Expand Down

0 comments on commit 5338bdc

Please sign in to comment.