diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java index c7a7a54810a..d2d72a969ea 100644 --- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java +++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java @@ -107,6 +107,7 @@ public class GobblinMCEWriter implements DataWriter { public static final String GMCE_METADATA_WRITER_CLASSES = "gmce.metadata.writer.classes"; public static final String GMCE_METADATA_WRITER_MAX_ERROR_DATASET = "gmce.metadata.writer.max.error.dataset"; public static final String TRANSIENT_EXCEPTION_MESSAGES_KEY = "gmce.metadata.writer.transient.exception.messages"; + public static final String NON_TRANSIENT_EXCEPTION_MESSAGES_KEY = "gmce.metadata.writer.nonTransient.exception.messages"; public static final int DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET = 0; public static final int DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS = 60000; public static final String TABLE_NAME_DELIMITER = "."; @@ -124,10 +125,12 @@ public class GobblinMCEWriter implements DataWriter { private Map> partitionKeysMap; private Closer closer = Closer.create(); protected final AtomicLong recordCount = new AtomicLong(0L); + private final Set currentErrorDatasets = new HashSet<>(); @Setter private int maxErrorDataset; protected EventSubmitter eventSubmitter; private final Set transientExceptionMessages; + private final Set nonTransientExceptionMessages; @AllArgsConstructor static class TableStatus { @@ -161,6 +164,7 @@ public GobblinMCEWriter(DataWriterBuilder builder, State MetricContext metricContext = Instrumented.getMetricContext(state, this.getClass(), tags); eventSubmitter = new EventSubmitter.Builder(metricContext, GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build(); transientExceptionMessages = new HashSet<>(properties.getPropAsList(TRANSIENT_EXCEPTION_MESSAGES_KEY, "")); + 
nonTransientExceptionMessages = new HashSet<>(properties.getPropAsList(NON_TRANSIENT_EXCEPTION_MESSAGES_KEY, "")); } @Override @@ -339,7 +343,7 @@ void writeWithMetadataWriters( try { writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec); } catch (Exception e) { - if (isExceptionTransient(e, transientExceptionMessages)) { + if (exceptionMatches(e, transientExceptionMessages)) { throw new RuntimeException("Failing container due to transient exception for db: " + dbName + " table: " + tableName, e); } meetException = true; @@ -390,8 +394,13 @@ private void addOrThrowException(Exception e, String tableString, String dbName, lastException.droppedPartitionValues.addAll(((HiveMetadataWriterWithPartitionInfoException) e).droppedPartitionValues); } this.datasetErrorMap.put(tableStatus.datasetPath, tableErrorMap); - log.error(String.format("Meet exception when flush table %s", tableString), e); - if (datasetErrorMap.size() > maxErrorDataset) { + if (!exceptionMatches(e, this.nonTransientExceptionMessages)) { + currentErrorDatasets.add(tableStatus.datasetPath); + log.error(String.format("Meet exception when flush table %s", tableString), e); + } else { + log.error(String.format("Detected known non-transient failure for table %s", tableString), e); + } + if (currentErrorDatasets.size() > maxErrorDataset) { //Fail the job if the error size exceeds some number throw new IOException(String.format("Container fails to flush for more than %s dataset, last exception we met is: ", maxErrorDataset), e); } @@ -412,7 +421,7 @@ private void flush(String dbName, String tableName) throws IOException { try { writer.flush(dbName, tableName); } catch (IOException e) { - if (isExceptionTransient(e, transientExceptionMessages)) { + if (exceptionMatches(e, transientExceptionMessages)) { throw new RuntimeException("Failing container due to transient exception for db: " + dbName + " table: " + tableName, e); } meetException = true; @@ -435,11 +444,12 @@ private void flush(String 
dbName, String tableName) throws IOException { } /** - * Check if exception is contained within a known list of transient exceptions. These exceptions should not be caught - * to avoid advancing watermarks and skipping GMCEs unnecessarily. + * Check if the root cause of the exception contains any message from a list of known exception messages. Transient exceptions should not be caught + * to avoid advancing watermarks and skipping GMCEs unnecessarily, while non-transient exceptions should not count + * towards the maximum number of failed datasets. */ - public static boolean isExceptionTransient(Exception e, Set transientExceptionMessages) { - return transientExceptionMessages.stream().anyMatch(message -> Throwables.getRootCause(e).toString().contains(message)); + public static boolean exceptionMatches(Exception e, Set exceptionMessages) { + return exceptionMessages.stream().anyMatch(message -> Throwables.getRootCause(e).toString().contains(message)); } /** diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java index 129817b5b93..415a1e5c81f 100644 --- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java +++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java @@ -213,12 +213,12 @@ public void testDetectTransientException() { Set transientExceptions = Sets.newHashSet("Filesystem closed", "Hive timeout", "RejectedExecutionException"); IOException transientException = new IOException("test1 Filesystem closed test"); IOException wrapperException = new IOException("wrapper exception", transientException); - Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(transientException, transientExceptions)); - Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(wrapperException, transientExceptions)); + Assert.assertTrue(GobblinMCEWriter.exceptionMatches(transientException, 
transientExceptions)); + Assert.assertTrue(GobblinMCEWriter.exceptionMatches(wrapperException, transientExceptions)); IOException nonTransientException = new IOException("Write failed due to bad schema"); - Assert.assertFalse(GobblinMCEWriter.isExceptionTransient(nonTransientException, transientExceptions)); + Assert.assertFalse(GobblinMCEWriter.exceptionMatches(nonTransientException, transientExceptions)); RejectedExecutionException rejectedExecutionException = new RejectedExecutionException(""); - Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(rejectedExecutionException, transientExceptions)); + Assert.assertTrue(GobblinMCEWriter.exceptionMatches(rejectedExecutionException, transientExceptions)); } @DataProvider(name="AllowMockMetadataWriter")