From 7ac1f4c06738c9daf9a0d0f38584cebcc9c2a4ea Mon Sep 17 00:00:00 2001 From: hanghangliu Date: Thu, 2 Nov 2023 00:11:12 -0700 Subject: [PATCH 1/9] add option to detect malformed ORC during commit phase --- .../apache/gobblin/writer/GobblinBaseOrcWriter.java | 11 +++++++++++ .../gobblin/writer/GobblinOrcWriterConfigs.java | 5 +++++ 2 files changed, 16 insertions(+) diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java index d8a2a353e9d..3741f4cca92 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java @@ -61,6 +61,7 @@ public abstract class GobblinBaseOrcWriter extends FsDataWriter { protected int batchSize; protected final S inputSchema; + private final boolean validateORCDuringCommit; private final boolean selfTuningWriter; private int selfTuneRowsBetweenCheck; private double rowBatchMemoryUsageFactor; @@ -94,6 +95,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder builder, State properties) this.inputSchema = builder.getSchema(); this.typeDescription = getOrcSchema(); this.selfTuningWriter = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, false); + this.validateORCDuringCommit = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_DURING_COMMIT, false); this.maxOrcBatchSize = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE, GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE); this.batchSize = this.selfTuningWriter ? @@ -259,6 +261,15 @@ public void commit() throws IOException { closeInternal(); super.commit(); + if(this.validateORCDuringCommit) { + try { + OrcFile.createReader(this.outputFile, new OrcFile.ReaderOptions(conf)); + } catch (IOException ioException) { + log.error("Found error when validating ORC file during commit phase", ioException); + log.error("Delete the malformed ORC file is successful: {}", this.fs.delete(this.outputFile, false)); + throw ioException; + } + } if (this.selfTuningWriter) { properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_RECORD_SIZE, String.valueOf(getEstimatedRecordSizeBytes())); properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY, diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java index b0b859f930e..4b54f261ab0 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java @@ -22,6 +22,11 @@ */ public class GobblinOrcWriterConfigs { public static final String ORC_WRITER_PREFIX = "orcWriter."; + /** + * Configuration for enabling validation of ORC file to detect malformation. If enabled, will throw exception and + * delete malformed ORC file during commit + */ + public static final String ORC_WRITER_VALIDATE_FILE_DURING_COMMIT = ORC_WRITER_PREFIX + "validate.commit.file"; /** * Default buffer size in the ORC Writer before sending the records to the native ORC Writer */ From a0ba3b3d5f17afc50346e2b41adc67d195edb496 Mon Sep 17 00:00:00 2001 From: hanghangliu Date: Thu, 2 Nov 2023 00:14:09 -0700 Subject: [PATCH 2/9] better logging --- .../java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java index 3741f4cca92..412dcbdf1b1 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java @@ -265,7 +265,7 @@ public void commit() try { OrcFile.createReader(this.outputFile, new OrcFile.ReaderOptions(conf)); } catch (IOException ioException) { - log.error("Found error when validating ORC file during commit phase", ioException); + log.error("Found error when validating ORC file {} during commit phase", this.outputFile, ioException); log.error("Delete the malformed ORC file is successful: {}", this.fs.delete(this.outputFile, false)); throw ioException; } From 0ebce6053d04a436b8f2d6997b56f10266a4015f Mon Sep 17 00:00:00 2001 From: hanghangliu Date: Thu, 2 Nov 2023 09:43:44 -0700 Subject: [PATCH 3/9] address comment --- .../apache/gobblin/writer/GobblinBaseOrcWriter.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java index 412dcbdf1b1..8a49655c602 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java @@ -24,6 +24,7 @@ import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.gobblin.util.HadoopUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.orc.OrcConf; @@ -260,16 +261,18 @@ public void close() public void commit() throws IOException { closeInternal(); - super.commit(); if(this.validateORCDuringCommit) { try { - OrcFile.createReader(this.outputFile, new OrcFile.ReaderOptions(conf)); + OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf)); } catch (IOException ioException) { - log.error("Found error when validating ORC file {} during commit phase", this.outputFile, ioException); - log.error("Delete the malformed ORC file is successful: {}", this.fs.delete(this.outputFile, false)); + log.error("Found error when validating ORC file during commit phase", ioException); + HadoopUtils.deletePath(this.fs, this.stagingFile, false); + log.error("Delete the malformed ORC file after close the writer: {}", this.stagingFile); throw ioException; } } + super.commit(); + if (this.selfTuningWriter) { properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_RECORD_SIZE, String.valueOf(getEstimatedRecordSizeBytes())); properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY, From b4dee971a1ca3045bf9cc151b8e6cd8671cc6863 Mon Sep 17 00:00:00 2001 From: hanghangliu Date: Thu, 2 Nov 2023 09:47:39 -0700 Subject: [PATCH 4/9] catch more generic exception --- .../org/apache/gobblin/writer/GobblinBaseOrcWriter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java index 8a49655c602..d195fb8194d 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java @@ -264,11 +264,11 @@ public void commit() if(this.validateORCDuringCommit) { try { OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf)); - } catch (IOException ioException) { - log.error("Found error when validating ORC file during commit phase", ioException); + } catch (Exception e) { + log.error("Found error when validating ORC file during commit phase", e); HadoopUtils.deletePath(this.fs, this.stagingFile, false); log.error("Delete the malformed ORC file after close the writer: {}", this.stagingFile); - throw ioException; + throw e; } } super.commit(); From c0b45186479438b6521bbcf31145842aaab44704 Mon Sep 17 00:00:00 2001 From: hanghangliu Date: Thu, 2 Nov 2023 11:34:01 -0700 Subject: [PATCH 5/9] validate ORC file after close --- .../gobblin/writer/GobblinBaseOrcWriter.java | 25 ++++++++++--------- .../writer/GobblinOrcWriterConfigs.java | 2 +- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java index d195fb8194d..cbc19eda008 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.orc.OrcConf; import org.apache.orc.OrcFile; +import org.apache.orc.Reader; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; @@ -62,7 +63,7 @@ public abstract class GobblinBaseOrcWriter extends FsDataWriter { protected int batchSize; protected final S inputSchema; - private final boolean validateORCDuringCommit; + private final boolean validateORCAfterClose; private final boolean selfTuningWriter; private int selfTuneRowsBetweenCheck; private double rowBatchMemoryUsageFactor; @@ -96,7 +97,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder builder, State properties) this.inputSchema = builder.getSchema(); this.typeDescription = getOrcSchema(); this.selfTuningWriter = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, false); - this.validateORCDuringCommit = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_DURING_COMMIT, false); + this.validateORCAfterClose = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE, false); this.maxOrcBatchSize = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE, GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE); this.batchSize = this.selfTuningWriter ? @@ -244,6 +245,16 @@ protected synchronized void closeInternal() throw new CloseBeforeFlushException(this.inputSchema.toString()); } } + // Validate the ORC file after writer close. Default is false as it introduce more load to FS and decrease the performance + if(this.validateORCAfterClose) { + try(Reader reader =OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf))) { + } catch (Exception e) { + log.error("Found error when validating ORC file during commit phase", e); + HadoopUtils.deletePath(this.fs, this.stagingFile, false); + log.error("Delete the malformed ORC file after close the writer: {}", this.stagingFile); + throw e; + } + } } @Override @@ -261,16 +272,6 @@ public void close() public void commit() throws IOException { closeInternal(); - if(this.validateORCDuringCommit) { - try { - OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf)); - } catch (Exception e) { - log.error("Found error when validating ORC file during commit phase", e); - HadoopUtils.deletePath(this.fs, this.stagingFile, false); - log.error("Delete the malformed ORC file after close the writer: {}", this.stagingFile); - throw e; - } - } super.commit(); if (this.selfTuningWriter) { diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java index 4b54f261ab0..89621d38c1b 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java @@ -26,7 +26,7 @@ public class GobblinOrcWriterConfigs { * Configuration for enabling validation of ORC file to detect malformation. If enabled, will throw exception and * delete malformed ORC file during commit */ - public static final String ORC_WRITER_VALIDATE_FILE_DURING_COMMIT = ORC_WRITER_PREFIX + "validate.commit.file"; + public static final String ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE = ORC_WRITER_PREFIX + "validate.file.after.close"; /** * Default buffer size in the ORC Writer before sending the records to the native ORC Writer */ From 0adc6228446ac9922effbc0e7c774712d6ab742a Mon Sep 17 00:00:00 2001 From: hanghangliu Date: Thu, 2 Nov 2023 11:42:31 -0700 Subject: [PATCH 6/9] move validate in between close and commit --- .../gobblin/writer/GobblinBaseOrcWriter.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java index cbc19eda008..55dc05d4bb9 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java @@ -245,16 +245,6 @@ protected synchronized void closeInternal() throw new CloseBeforeFlushException(this.inputSchema.toString()); } } - // Validate the ORC file after writer close. Default is false as it introduce more load to FS and decrease the performance - if(this.validateORCAfterClose) { - try(Reader reader =OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf))) { - } catch (Exception e) { - log.error("Found error when validating ORC file during commit phase", e); - HadoopUtils.deletePath(this.fs, this.stagingFile, false); - log.error("Delete the malformed ORC file after close the writer: {}", this.stagingFile); - throw e; - } - } } @Override @@ -272,6 +262,16 @@ public void close() public void commit() throws IOException { closeInternal(); + // Validate the ORC file after writer close. Default is false as it introduce more load to FS and decrease the performance + if(this.validateORCAfterClose) { + try(Reader reader =OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf))) { + } catch (Exception e) { + log.error("Found error when validating ORC file during commit phase", e); + HadoopUtils.deletePath(this.fs, this.stagingFile, false); + log.error("Delete the malformed ORC file after close the writer: {}", this.stagingFile); + throw e; + } + } super.commit(); if (this.selfTuningWriter) { From 7be8b8060d83639509cb5207112576f97ad24ab0 Mon Sep 17 00:00:00 2001 From: hanghangliu Date: Thu, 2 Nov 2023 11:45:44 -0700 Subject: [PATCH 7/9] syntax --- .../java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java index 55dc05d4bb9..02f8aa41cfc 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java @@ -264,7 +264,7 @@ public void commit() closeInternal(); // Validate the ORC file after writer close. Default is false as it introduce more load to FS and decrease the performance if(this.validateORCAfterClose) { - try(Reader reader =OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf))) { + try (Reader reader =OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf))) { } catch (Exception e) { log.error("Found error when validating ORC file during commit phase", e); HadoopUtils.deletePath(this.fs, this.stagingFile, false); From 662ffbb33d2ea908c5dfb028caccf16411d926f5 Mon Sep 17 00:00:00 2001 From: hanghangliu Date: Thu, 2 Nov 2023 11:51:28 -0700 Subject: [PATCH 8/9] whitespace --- .../java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java index 02f8aa41cfc..94d44c811ba 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java @@ -264,7 +264,7 @@ public void commit() closeInternal(); // Validate the ORC file after writer close. Default is false as it introduce more load to FS and decrease the performance if(this.validateORCAfterClose) { - try (Reader reader =OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf))) { + try (Reader reader = OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf))) { } catch (Exception e) { log.error("Found error when validating ORC file during commit phase", e); HadoopUtils.deletePath(this.fs, this.stagingFile, false); From 24c58bcb3242098ff38dacc159a96bdf4bff9249 Mon Sep 17 00:00:00 2001 From: hanghangliu Date: Thu, 2 Nov 2023 14:26:47 -0700 Subject: [PATCH 9/9] update log --- .../java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java index 94d44c811ba..9f81e82d9ca 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java @@ -266,9 +266,9 @@ public void commit() if(this.validateORCAfterClose) { try (Reader reader = OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf))) { } catch (Exception e) { - log.error("Found error when validating ORC file during commit phase", e); + log.error("Found error when validating staging ORC file {} during commit phase. " + + "Will delete the malformed file and terminate the commit", this.stagingFile, e); HadoopUtils.deletePath(this.fs, this.stagingFile, false); - log.error("Delete the malformed ORC file after close the writer: {}", this.stagingFile); throw e; } }