Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Hotfix][GOBBLIN-1949] add option to detect malformed orc during commit #3818

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.gobblin.util.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
Expand Down Expand Up @@ -61,6 +63,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
protected int batchSize;
protected final S inputSchema;

private final boolean validateORCAfterClose;
private final boolean selfTuningWriter;
private int selfTuneRowsBetweenCheck;
private double rowBatchMemoryUsageFactor;
Expand Down Expand Up @@ -94,6 +97,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
this.inputSchema = builder.getSchema();
this.typeDescription = getOrcSchema();
this.selfTuningWriter = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, false);
this.validateORCAfterClose = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE, false);
this.maxOrcBatchSize = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE,
GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE);
this.batchSize = this.selfTuningWriter ?
Expand Down Expand Up @@ -241,6 +245,16 @@ protected synchronized void closeInternal()
throw new CloseBeforeFlushException(this.inputSchema.toString());
}
}
// Validate the ORC file after writer close. Default is false as it introduce more load to FS and decrease the performance
if(this.validateORCAfterClose) {
try(Reader reader =OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf))) {
hanghangliu marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
log.error("Found error when validating ORC file during commit phase", e);
HadoopUtils.deletePath(this.fs, this.stagingFile, false);
log.error("Delete the malformed ORC file after close the writer: {}", this.stagingFile);
throw e;
}
}
}

@Override
Expand All @@ -259,6 +273,7 @@ public void commit()
throws IOException {
closeInternal();
super.commit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line calls FsDataWriter and triggers the moving from staging to output directory. Is there a reason for us to do all that work and then do the validation?

I also wonder why this is part of the commit step and not part of the close step. close does not call this method, but it does do the flush.

If we close and the flushed file turns out not to be valid, we will miss the validation here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking maybe there's something wrong during moving. But given the issue is malformed files, so the issue should already be there after writer closed. So move the logic to after closeInternal() is called.


if (this.selfTuningWriter) {
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_RECORD_SIZE, String.valueOf(getEstimatedRecordSizeBytes()));
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
*/
public class GobblinOrcWriterConfigs {
public static final String ORC_WRITER_PREFIX = "orcWriter.";
/**
* Configuration for enabling validation of ORC file to detect malformation. If enabled, will throw exception and
* delete malformed ORC file during commit
*/
public static final String ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE = ORC_WRITER_PREFIX + "validate.file.after.close";
/**
* Default buffer size in the ORC Writer before sending the records to the native ORC Writer
*/
Expand Down