diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java index d8a2a353e9d..9f81e82d9ca 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java @@ -24,10 +24,12 @@ import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.gobblin.util.HadoopUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.orc.OrcConf; import org.apache.orc.OrcFile; +import org.apache.orc.Reader; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; @@ -61,6 +63,7 @@ public abstract class GobblinBaseOrcWriter extends FsDataWriter { protected int batchSize; protected final S inputSchema; + private final boolean validateORCAfterClose; private final boolean selfTuningWriter; private int selfTuneRowsBetweenCheck; private double rowBatchMemoryUsageFactor; @@ -94,6 +97,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder builder, State properties) this.inputSchema = builder.getSchema(); this.typeDescription = getOrcSchema(); this.selfTuningWriter = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, false); + this.validateORCAfterClose = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE, false); this.maxOrcBatchSize = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE, GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE); this.batchSize = this.selfTuningWriter ? @@ -258,7 +262,18 @@ public void close() public void commit() throws IOException { closeInternal(); + // Validate the ORC file after writer close. 
Default is false as it introduces more load to FS and decreases the performance + if(this.validateORCAfterClose) { + try (Reader reader = OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf))) { + } catch (Exception e) { + log.error("Found error when validating staging ORC file {} during commit phase. " + + "Will delete the malformed file and terminate the commit", this.stagingFile, e); + HadoopUtils.deletePath(this.fs, this.stagingFile, false); + throw e; + } + } super.commit(); + if (this.selfTuningWriter) { properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_RECORD_SIZE, String.valueOf(getEstimatedRecordSizeBytes())); properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY, diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java index b0b859f930e..89621d38c1b 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java @@ -22,6 +22,11 @@ */ public class GobblinOrcWriterConfigs { public static final String ORC_WRITER_PREFIX = "orcWriter."; + /** + * Configuration for enabling validation of ORC file to detect malformation. If enabled, will throw exception and + * delete malformed ORC file during commit + */ + public static final String ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE = ORC_WRITER_PREFIX + "validate.file.after.close"; /** * Default buffer size in the ORC Writer before sending the records to the native ORC Writer */