Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Hotfix][GOBBLIN-1949] add option to detect malformed orc during commit #3818

Conversation

hanghangliu
Copy link
Contributor

@hanghangliu hanghangliu commented Nov 2, 2023

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    Added an option to validate of ORC file to detect malformation. If enabled, will throw exception and delete malformed ORC file during commit

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

OrcFile.createReader(this.outputFile, new OrcFile.ReaderOptions(conf));
} catch (IOException ioException) {
log.error("Found error when validating ORC file {} during commit phase", this.outputFile, ioException);
log.error("Delete the malformed ORC file is successful: {}", this.fs.delete(this.outputFile, false));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the severity of failing to delete this ORC file, do you think we should retry this operation?

Check for references to retryer in the code base for an easy out of the box impl

Copy link
Contributor

@homatthew homatthew Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And is there a world where this operation fails because of Filesystem is closed error? Do we need to account for that edge case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also notice that the parent class FsDataWriter uses HadoopUtils.deletePath. It's subtle, but perhaps we should use that method instead when deleting because they consider some edge case where it throws an io exception if the file exists but the file fails to delete. I'd imagine if the FileSystem delete covered that edge case, the util method would not have been created, but it's hard to validate when there will be an io exception because it is not documented anywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using HadoopUtils.deletePath to delete the file now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still not sure about the retries. If fs delete file fails, we won't delete the file but also won't retry. This works when we call commit because we throw the IO exception to prevent the file from being moved. But we do not do this when we close the file in the close function, which calls closeInternal().

If we flush the buffer, we should check after that the file is valid

@@ -259,6 +261,15 @@ public void commit()
throws IOException {
closeInternal();
super.commit();
if(this.validateORCDuringCommit) {
try {
OrcFile.createReader(this.outputFile, new OrcFile.ReaderOptions(conf));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For future readers, I think the below observation is nuanced and worth spelling out. It may even be worth a comment.

This will work for files of size [1,3] bytes. It will not catch empty files, which I think is a very very subtle thing. and I think that's okay as long as users are using native ORC readers.

https://github.com/apache/orc/blob/24beffb6fed6d408e25654e53c255f564c8bd8a9/java/core/src/java/org/apache/orc/impl/ReaderImpl.java#L797C1-L802

Since it seems like part of the standard, computing engines like trino support it https://trino.io/blog/2019/05/29/improved-hive-bucketing.html#whats-the-problem. For some time, presto did not support these empty files but now also does to follow the convention of hive

if(this.validateORCDuringCommit) {
try {
OrcFile.createReader(this.outputFile, new OrcFile.ReaderOptions(conf));
} catch (IOException ioException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From reading the readerimpl, it can throw 2 checked exceptions,

Both of which extend IOException.

My question to you is what if there's any other runtime exception? Should we still delete? I lean toward yes. But maybe I am missing some edge case here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we should delete in case of other runtime or unchecked exception, for the sake of robustness. Changed to generic Exception

@@ -259,6 +261,15 @@ public void commit()
throws IOException {
closeInternal();
super.commit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line calls FsDataWriter and triggers the moving from staging to output directory. Is there a reason for us to do all that work and then do the validation?

I also wonder why this is part of the commit step and not part of the close step. close does not call this method, but it does do the flush.

If we close and the flushed file turns out not to be valid, we will miss the validation here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking maybe there's something wrong during moving. But given the issue is malformed files, so the issue should already be there after writer closed. So move the logic to after closeInternal() is called.

@hanghangliu hanghangliu requested a review from homatthew November 2, 2023 16:50
@@ -258,7 +261,18 @@ public void close()
public void commit()
throws IOException {
closeInternal();
if(this.validateORCDuringCommit) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#3818 (comment)

This issue is still present. We want to move this to close function and not just commit because we flush the buffer there too and if it's malformed then we want to catch it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I think the current issue is caused during the close sequence, so we need to destroy the file

Caused by: java.nio.channels.ClosedChannelException
	at org.apache.hadoop.hdfs.DataStreamer$LastExceptionInStreamer.throwException4Close(DataStreamer.java:324)
	at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:152)
	at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:105)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.hadoop.fs.RetryingOutputStream.lambda$write$1(RetryingOutputStream.java:237)
	at org.apache.hadoop.fs.RetryPolicy.lambda$run$0(RetryPolicy.java:137)
	at org.apache.hadoop.fs.NoOpRetryPolicy.run(NoOpRetryPolicy.java:36)
	at org.apache.hadoop.fs.RetryPolicy.run(RetryPolicy.java:136)
	at org.apache.hadoop.fs.RetryingOutputStream.runWithRetries(RetryingOutputStream.java:301)
	at org.apache.hadoop.fs.RetryingOutputStream.write(RetryingOutputStream.java:234)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.orc.impl.PhysicalFsWriter$DirectStream.output(PhysicalFsWriter.java:314)
	at org.apache.orc.impl.OutStream.outputBuffer(OutStream.java:163)
	at org.apache.orc.impl.OutStream.flush(OutStream.java:359)
	at org.apache.orc.impl.PhysicalFsWriter.writeFileFooter(PhysicalFsWriter.java:457)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im curious whats the validation and how it works. Does it validate on the header or something else.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does all sorts of validations. I attached below 1 example. You can dig around the class for a bunch of others

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear, the current issues we see are from writing a bad orc file and then moving it to the taskoutput directory where the file is effectively committed.

We do NOT want to modify the behavior of the base data publisher because its such a widely used class with very wide implications. But the current behavior of the base data publisher is to read all the files in the output dir and use runners to move them all in parallel. It has nothing to do with who originally wrote the file, it will blindly move all of them at that point.

The base data publisher is not a good place to do validation either because it does not care about the data being moved, it's agnostic to data formats.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@ZihanLi58 ZihanLi58 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we decide to add check in this step instead of GMCE emission? Will we introduce more load to HDFS and decrease the overall performance here?

@hanghangliu
Copy link
Contributor Author

Why do we decide to add check in this step instead of GMCE emission? Will we introduce more load to HDFS and decrease the overall performance here?

The main concern is if we only fail during GMCE emission, then the detection is fully rely on the iceberg registration. If it's not present or we do hive registration, the issue may still exist

@@ -258,7 +261,18 @@ public void close()
public void commit()
throws IOException {
closeInternal();
if(this.validateORCDuringCommit) {
try {
OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment that this is increasing HDFS load because we open again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add reader to the try with statement so that it is always closed

@@ -258,7 +261,18 @@ public void close()
public void commit()
throws IOException {
closeInternal();
if(this.validateORCDuringCommit) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I think the current issue is caused during the close sequence, so we need to destroy the file

Caused by: java.nio.channels.ClosedChannelException
	at org.apache.hadoop.hdfs.DataStreamer$LastExceptionInStreamer.throwException4Close(DataStreamer.java:324)
	at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:152)
	at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:105)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.hadoop.fs.RetryingOutputStream.lambda$write$1(RetryingOutputStream.java:237)
	at org.apache.hadoop.fs.RetryPolicy.lambda$run$0(RetryPolicy.java:137)
	at org.apache.hadoop.fs.NoOpRetryPolicy.run(NoOpRetryPolicy.java:36)
	at org.apache.hadoop.fs.RetryPolicy.run(RetryPolicy.java:136)
	at org.apache.hadoop.fs.RetryingOutputStream.runWithRetries(RetryingOutputStream.java:301)
	at org.apache.hadoop.fs.RetryingOutputStream.write(RetryingOutputStream.java:234)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.orc.impl.PhysicalFsWriter$DirectStream.output(PhysicalFsWriter.java:314)
	at org.apache.orc.impl.OutStream.outputBuffer(OutStream.java:163)
	at org.apache.orc.impl.OutStream.flush(OutStream.java:359)
	at org.apache.orc.impl.PhysicalFsWriter.writeFileFooter(PhysicalFsWriter.java:457)

OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf));
} catch (Exception e) {
log.error("Found error when validating ORC file during commit phase", e);
HadoopUtils.deletePath(this.fs, this.stagingFile, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to do this validation here or if its being published at a folder level at a different step, is when we have the reader validate all the files going into publish? My concern is that even if we are validating and deleting files at the writer step, this still assumes that this writer will shut down using a happy path and not suddenly. If it shuts down due to some container error code e.g. 143 and not go through the cleanup process then it can still be moved to another folder later down the line?

@@ -94,6 +96,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
this.inputSchema = builder.getSchema();
this.typeDescription = getOrcSchema();
this.selfTuningWriter = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, false);
this.validateORCDuringCommit = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_DURING_COMMIT, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any concern not defaulting to True ?
I feel the validation should be "default". unless i miss something obvious

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are concerns about an extra HDFS call, and what that would do to HDFS load. Internally we will enable it everywhere but we wouldn't want anyone to accidentally start having increased load, so usually we keep things disabled by default for backward compatibility

} catch (Exception e) {
log.error("Found error when validating ORC file during commit phase", e);
HadoopUtils.deletePath(this.fs, this.stagingFile, false);
log.error("Delete the malformed ORC file after close the writer: {}", this.stagingFile);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this an 'info' log?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also make the statement past tense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more an error situation because ideally it shouldn't happen, so we need to delete the file and terminate the commit/ingestion

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 269 is error message, agreed.
This is info because you are giving information that you have deleted the malformed file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's an info as we delete the file. But we can log the message at the ERROR level in case user disabled for info level, as this is critical information we'd like it to speak loud.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if thats the case you can modify the error message to be like at line 269.

"Found error when validating ORC file during commit phase. Deleting the malformed ORC file and closing the writer."

The tense of the statement is important in logs to understand whats being done and what is already done.

Copy link
Contributor

@homatthew homatthew left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good stuff

@hanghangliu hanghangliu requested a review from Will-Lo November 2, 2023 19:17
@Will-Lo
Copy link
Contributor

Will-Lo commented Nov 2, 2023

Can we add a JIRA ticket with some context just for tracking purposes? Even though it is a hotfix

@hanghangliu hanghangliu changed the title [Hotfix] add option to detect malformed orc during commit [Hotfix][GOBBLIN-1949] add option to detect malformed orc during commit Nov 2, 2023
@hanghangliu
Copy link
Contributor Author

For future work, we can consider clean up the task output dir every time when container start up to avoid reading files from failed container dir. It would be better on performance wise rather than read-after-write like this patch.

@codecov-commenter
Copy link

codecov-commenter commented Nov 2, 2023

Codecov Report

Merging #3818 (24c58bc) into master (da6b1df) will increase coverage by 1.27%.
Report is 3 commits behind head on master.
The diff coverage is n/a.

@@             Coverage Diff              @@
##             master    #3818      +/-   ##
============================================
+ Coverage     47.63%   48.90%   +1.27%     
+ Complexity    11047     8047    -3000     
============================================
  Files          2155     1487     -668     
  Lines         85322    59091   -26231     
  Branches       9488     6808    -2680     
============================================
- Hits          40645    28901   -11744     
+ Misses        40984    27509   -13475     
+ Partials       3693     2681    -1012     

see 698 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@Will-Lo Will-Lo merged commit c1bb67c into apache:master Nov 2, 2023
6 checks passed
Will-Lo added a commit to Will-Lo/incubator-gobblin that referenced this pull request Dec 20, 2023
* [GOBBLIN-1921] Properly handle reminder events (apache#3790)

* Add millisecond level precision to timestamp cols & proper timezone conversion

	- existing tests pass with minor modifications

* Handle reminder events properly

* Fix compilation errors & add isReminder flag

* Add unit tests

* Address review comments

* Add newline to address comment

* Include reminder/original tag in logging

* Clarify timezone issues in comment

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1924] Reminder event flag true (apache#3795)

* Set reminder event flag to true for reminders

* Update unit tests

* remove unused variable

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1923] Add retention for lease arbiter table (apache#3792)

* Add retention for lease arbiter table

* Replace blocking thread with scheduled thread pool executor

* Make Calendar instance thread-safe

* Rename variables, make values more clear

* Update timestamp related cols

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* Change debug statements to info temporarily to debug (apache#3796)

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1926] Fix Reminder Event Epsilon Comparison (apache#3797)

* Fix Reminder Event Epsilon Comparison

* Add TODO comment

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1930] Improve Multi-active related logs and metrics (apache#3800)

* Improve Multi-active related logs and metrics

* Add more metrics and logs around forwarding dag action to DagManager

* Improve logs in response to review comments

* Replace flow execution id with trigger timestamp from multi-active

* Update flow action execution id within lease arbiter

* Fix test & make Lease Statuses more lean

* Update javadoc

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* add dataset root in some common type of datasets

* [GOBBLIN-1927] Add topic validation support in KafkaSource, and add TopicNameValidator (apache#3793)

* * Add generic topic validation support
* Add the first validator TopicNameValidator into the validator chain, as a refactor of existing codes

* Refine to address comments

* Refine

---------

Co-authored-by: Tao Qin <tqin@linkedin.com>

* [GOBBLIN-1931] Refactor dag action updating method & add clarifying comment (apache#3801)

* Refactor dag action updating method & add clarifying comment

* Log filtered out duplicate messages

* logs and metrics for missing messages from change monitor

* Only add gobblin.service prefix for dagActionStoreChangeMonitor

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1934] Monitor High Level Consumer queue size (apache#3805)

* Emit metrics to monitor high level consumer queue size

* Empty commit to trigger tests

* Use BlockingQueue.size() func instead of atomic integer array

* Remove unused import & add DagActionChangeMonitor prefix to metric

* Refactor to avoid repeating code

* Make protected variables private where possible

* Fix white space

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1935] Skip null dag action types unable to be processed (apache#3807)

* Skip over null dag actions from malformed messages

* Add new metric for skipped messages

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1922]Add function in Kafka Source to recompute workUnits for filtered partitions (apache#3798)

* add function in Kafka Source to recompute workUnits for filtered partitions

* address comments

* set default min container value to 1

* add condition when create empty wu

* update the condition

* Expose functions to fetch record partitionColumn value (apache#3810)

* [GOBBLIN-1938] preserve x bit in manifest file based copy (apache#3804)

* preserve x bit in manifest file based copy
* fix project structure preventing running unit tests from intellij
* fix unit test

* [GOBBLIN-1919] Simplify a few elements of MR-related job exec before reusing code in Temporal-based execution (apache#3784)

* Simplify a few elements of MR-related job exec before reusing code in Temporal-based execution

* Add JSON-ification to several foundational config-state representations, plus encapsulated convience method `JobState.getJobIdFromProps`

* Update javadoc comments

* Encapsulate check for whether a path has the extension of a multi-work-unit

* [GOBBLIN-1939] Bump AWS version to use a compatible version of Jackson with Gobblin (apache#3809)

* Bump AWS version to use a compatible version of jackson with Gobblin

* use shared aws version

* [GOBBLIN-1937] Quantify Missed Work Completed by Reminders (apache#3808)

* Quantify Missed Work Completed by Reminders
   Also fix bug to filter out heartbeat events before extracting field

* Refactor changeMonitorUtils & add delimiter to metrics prefix

* Re-order params to group similar ones

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* GOBBLIN-1933]Change the logic in completeness verifier to support multi reference tier (apache#3806)

* address comments

* use connectionmanager when httpclient is not cloesable

* [GOBBLIN-1933] Change the logic in completeness verifier to support multi reference tier

* add uite test

* fix typo

* change the javadoc

* change the javadoc

---------

Co-authored-by: Zihan Li <zihli@zihli-mn2.linkedin.biz>

* [GOBBLIN-1943] Use AWS version 1.12.261 to fix a security vulnerability in the previous version (apache#3813)

* [GOBBLIN-1941] Develop Temporal abstractions, including `Workload` for workflows of unbounded size through sub-workflow nesting (apache#3811)

* Define `Workload` abstraction for Temporal workflows of unbounded size through sub-workflow nesting

* Adjust Gobblin-Temporal configurability for consistency and abstraction

* Define `WorkerConfig`, to pass the `TemporalWorker`'s configuration to the workflows and activities it hosts

* Improve javadoc

* Javadoc fixup

* Minor changes

* Update per review suggestions

* Insert pause, to spread the load on the temporal server, before launch of each child workflow that may have direct leaves of its own

* Appease findbugs by having `SeqSliceBackedWorkSpan::next` throw `NoSuchElementException`

* Add comment

* [GOBBLIN-1944] Add gobblin-temporal load generator for a single subsuming super-workflow with a configurable number of activities nested beneath (apache#3815)

* Add gobblin-temporal load generator for a single subsuming super-workflow with a configurable number of activities nested beneath

* Update per findbugs advice

* Improve processing of int props

* [GOBBLIN-1945] Implement Distributed Data Movement (DDM) Gobblin-on-Temporal `WorkUnit` evaluation (apache#3816)

* Implement Distributed Data Movement (DDM) Gobblin-on-Temporal `WorkUnit` evaluation

* Adjust work unit processing tuning for start-to-close timeout and nested execution branching

* Rework `ProcessWorkUnitImpl` and fix `FileSystem` misuse; plus convenience abstractions to load `FileSystem`, `JobState`, and `StateStore<TaskState>`

* Fix `FileSystem` resource lifecycle, uniquely name each workflow, and drastically reduce worker concurrent task execution

* Heed findbugs advice

* prep before commit

* Improve processing of required props

* Update comment in response to PR feedback

* [GOBBLIN-1942] Create MySQL util class for re-usable methods and setup MysqlDagActio… (apache#3812)

* Create MySQL util class for re-usable methods and setup MysqlDagActionStore retention

* Add a java doc

* Address review comments

* Close scheduled executors on shutdown & clarify naming and comments

* Remove extra period making config key invalid

* implement Closeable

* Use try with resources

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [Hotfix][GOBBLIN-1949] add option to detect malformed orc during commit (apache#3818)

* add option to detect malformed ORC during commit phase

* better logging

* address comment

* catch more generic exception

* validate ORC file after close

* move validate in between close and commit

* syntax

* whitespace

* update log

* [GOBBLIN-1948] Use same flowExecutionId across participants (apache#3819)

* Use same flowExecutionId across participants
* Set config field as well in new FlowSpec
* Use gobblin util to create config
* Rename function and move to util
---------
Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* Allow extension of functions in GobblinMCEPublisher and customization of fileList file metrics are calculated for (apache#3820)

* [GOBBLIN-1951] Emit GTE when deleting corrupted ORC files (apache#3821)

* [GOBBLIN-1951] Emit GTE when deleting corrupted ORC files

This commit adds ORC file validation during the commit phase and deletes
corrupted files. It also includes a test for ORC file validation.

* Linter fixes

* Add framework and unit tests for DagActionStoreChangeMonitor (apache#3817)

* Add framework and unit tests for DagActionStoreChangeMonitor

* Add more test cases and validation

* Add header for new file

* Move FlowSpec static function to Utils class

* Remove unused import

* Fix compile error

* Fix unit tests

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1952] Make jobname shortening in GaaS more aggressive (apache#3822)

* Make jobname shortening in GaaS more aggressive

* Change long name prefix to flowgroup

* Make KafkaTopicGroupingWorkUnitPacker pack with desired num of container (apache#3814)

* Make KafkaTopicGroupingWorkUnitPacker pack with desired num of container

* update comment

* [GOBBLIN-1953] Add an exception message to orc writer validation GTE (apache#3826)

* Fix FlowSpec Updating Function (apache#3823)

* Fix FlowSpec Updating Function
   * makes Config object with FlowSpec mutable
   * adds unit test to ensure flow compiles after updating FlowSpec
   * ensure DagManager resilient to exceptions on leadership change

* Only update Properties obj not Config to avoid GC overhead

* Address findbugs error

* Avoid updating or creating new FlowSpec objects by passing flowExecutionId directly to metadata

* Remove changes that are not needed anymore

* Add TODO to handle failed DagManager leadership change

* Overload function and add more documentation

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* Emit metric to tune LeaseArbiter Linger metric  (apache#3824)

* Monitor number of failed persisting leases to tune linger

* Increase default linger and epsilon values

* Add metric for lease persisting success

* Rename metrics

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1956]Make Kafka streaming pipeline be able to config the max poll records during runtime (apache#3827)

* address comments

* use connectionmanager when httpclient is not cloesable

* add uite test

* fix typo

* [GOBBLIN-1956] Make Kafka streaming pipeline be able to config the max poll records during runtime

* small refractor

---------

Co-authored-by: Zihan Li <zihli@zihli-mn2.linkedin.biz>

* Add semantics for failure on partial success (apache#3831)

* Consistly handle Rest.li /flowexecutions KILL and RESUME actions (apache#3830)

* [GOBBLIN-1957] GobblinOrcwriter improvements for large records (apache#3828)

* WIP

* Optimization to limit batchsize based on large record sizes

* Address review

* Use DB-qualified table ID as `IcebergTable` dataset descriptor (apache#3834)

* [GOBBLIN-1961] Allow `IcebergDatasetFinder` to use separate names for source vs. destination-side DB and table (apache#3835)

* Allow `IcebergDatasetFinder` to use separate names for source vs. destination-side DB and table

* Adjust Mockito.verify to pass test

* Prevent NPE in `FlowCompilationValidationHelper.validateAndHandleConcurrentExecution` (apache#3836)

* Prevent NPE in `FlowCompilationValidationHelper.validateAndHandleConcurrentExecution`

* improved `MultiHopFlowCompiler` javadoc

* Delete Launch Action Events After Processing (apache#3837)

* Delete launch action event after persisting

* Fix default value for flowExecutionId retrieval from metadata map

* Address review comments and add unit test

* Code clean up

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1960] Emit audit count after commit in IcebergMetadataWriter (apache#3833)

* Emit audit count after commit in IcebergMetadataWriter

* Unit tests by extracting to a post commit

* Emit audit count first

* find bugs complaint

* [GOBBLIN-1967] Add external data node for generic ingress/egress on GaaS (apache#3838)

* Add external data node for generic ingress/egress on GaaS

* Address reviews and cleanup

* Use URI representation for external dataset descriptor node

* Fix error message in containing check

* Address review

* [GOBBLIN-1971] Allow `IcebergCatalog` to specify the `DatasetDescriptor` name for the `IcebergTable`s it creates (apache#3842)

* Allow `IcebergCatalog` to specify the `DatasetDescriptor` name for the `IcebergTable`s it creates

* small method javadoc

* [GOBBLIN-1970] Consolidate processing dag actions to one code path (apache#3841)

* Consolidate processing dag actions to one code path

* Delete dag action in failure cases too

* Distinguish metrics for startup

* Refactor to avoid duplicated code and create static metrics proxy class

* Remove DagManager checks that don't apply on startup

* Add test to check kill/resume dag action removal after processing

* Remove unused import

* Initialize metrics proxy with Null Pattern

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1972] Fix `CopyDataPublisher` to avoid committing post-publish WUs before they've actually run (apache#3844)

* Fix `CopyDataPublisher` to avoid committing post-publish WUs before they've actually run

* fixup findbugsMain

* [GOBBLIN-1975] Keep job.name configuration immutable if specified on GaaS (apache#3847)

* Revert "[GOBBLIN-1952] Make jobname shortening in GaaS more aggressive (apache#3822)"

This reverts commit 5619a0a.

* use configuration to keep specified jobname if enabled

* Cleanup

* [GOBBLIN-1974] Ensure Adhoc Flows can be Executed in Multi-active Scheduler state (apache#3846)

* Ensure Adhoc Flows can be Executed in Multi-active Scheduler state

* Only delete spec for adhoc flows & always after orchestration

* Delete adhoc flows when dagManager is not present as well

* Fix flaky test for scheduler

* Add clarifying comment about failure recovery

* Re-ordered private method

* Move private methods again

* Enforce sequential ordering of unit tests to make more reliable

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1973] Change Manifest distcp logic to compare permissions of source and dest files even when source is older (apache#3845)

* change should copy logic

* Add tests, address review

* Fix checkstyle

* Remove unused imports

* [GOBBLIN-1976] Allow an `IcebergCatalog` to override the `DatasetDescriptor` platform name for the `IcebergTable`s it creates (apache#3848)

* Allow an `IcebergCatalog` to override the `DatasetDescriptor` platform for the `IcebergTable`s it creates

* fixup javadoc

* Log when `PasswordManager` fails to load any master password (apache#3849)

* [GOBBLIN-1968] Temporal commit step integration (apache#3829)

Add commit step to Gobblin temporal workflow for job publish

* Add codeql analysis

* Make gradle specific

* Add codeql as part of build script

* Initialize codeql

* Use separate workflow for codeql instead with custom build function as autobuild seems to not work

* Add jdk jar for global dependencies script

---------

Co-authored-by: umustafi <umust77@gmail.com>
Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
Co-authored-by: Arjun <abora@linkedin.com>
Co-authored-by: Tao Qin <35046097+wsarecv@users.noreply.github.com>
Co-authored-by: Tao Qin <tqin@linkedin.com>
Co-authored-by: Hanghang Nate Liu <nate.hanghang.liu@gmail.com>
Co-authored-by: Andy Jiang <andy.jiang99@outlook.com>
Co-authored-by: Kip Kohn <ckohn@linkedin.com>
Co-authored-by: Zihan Li <zihli@linkedin.com>
Co-authored-by: Zihan Li <zihli@zihli-mn2.linkedin.biz>
Co-authored-by: Matthew Ho <mho@linkedin.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants