Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2181] ]Handling for Non transient exceptions #4084

Merged
merged 6 commits into from
Dec 13, 2024

Conversation

vsinghal85
Copy link
Contributor

@vsinghal85 vsinghal85 commented Dec 11, 2024

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
  • Many times we experience failures in flow initialization or processing due to which flow could not be concluded properly
  • Azkaban client exceptions and SQLIntegrityViolation exceptions are examples which have caused failures in recent history
  • Currently most of these failures are by default considered transient exceptions and are retried infinitely
  • As a side effect, it causes flows not to conclude and causes failures in future flow submissions which have caused incidents recently
  • As a first step we want to consider all exceptions as non transient and not retry and remove conclude the flow by removing flowspec and dag action
  • This PR contains the changes to conclude the flow for non transient exceptions and also mark them as failure to reflect the correct status of the flow

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@vsinghal85 vsinghal85 changed the title Handling for Non transient exceptions [GOBBLIN-2181] ]Handling for Non transient exceptions Dec 11, 2024
@vsinghal85 vsinghal85 marked this pull request as ready for review December 11, 2024 15:08
if (jobFailedTimer != null) {
jobFailedTimer.stop(jobMetadata);
// Only mark the job as failed in case of non transient exceptions
if(!DagProcessingEngine.isTransientException(e)){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing spaces
if (!DagProcessingEngine.isTransientException(e)) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if(!DagProcessingEngine.isTransientException(e)){
TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
log.error(message, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we atleast log the message? Even if it is a transient exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now all exceptions would be considered non transient, still updated as per suggestion

}
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a Mockito.verify for all the tests on dagManagementStateStore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure makes sense, added in all tests

Comment on lines 162 to 163
log.error("Ignoring non transient exception. DagTask {} will conclude and will not be retried. Exception - {} ",
dagTask, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the exception is included as part of the message string instead of passed as a separate parameter, this would log only the exception's toString(), without the full stack trace.

Please update the log statement to pass the exception as a separate parameter to log the stack trace

log.error("Ignoring non-transient exception. DagTask {} will conclude and will not be retried.", dagTask, e);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

@@ -67,6 +70,8 @@ public class DagProcessingEngine extends AbstractIdleService {
public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS = "defaultJobStartDeadlineTimeMillis";
@Getter static long defaultJobStartDeadlineTimeMillis;
public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name();
// Todo Update to fetch list from config once transient exception handling is implemented and retryable exceptions defined
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the convention is to use TODO: , please update from Todo to TODO: for consistency

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

@@ -149,6 +158,13 @@ public void run() {
dagTask.conclude();
log.info(dagProc.contextualizeStatus("concluded dagTask"));
} catch (Exception e) {
if(!DagProcessingEngine.isTransientException(e)) {
log.error("Ignoring non transient exception. DagTask {} will conclude and will not be retried. Exception - {} ",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't see toString() for dagTask, what does dagTask log? It would be useful to have dagId & dagAction in the log.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing out, I had moved the existing log as it is, updated it now.

Copy link
Contributor

@phet phet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good start!

Copy link
Contributor

@phet phet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nearly there!

Comment on lines 203 to 206
@Test
public void isNonTransientExceptionTest(){
Assert.assertTrue(!DagProcessingEngine.isTransientException(new RuntimeException("Simulating a non retryable exception!")));
Assert.assertTrue(!DagProcessingEngine.isTransientException(new AzkabanClientException("Simulating a retryable exception!")));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could use clarifying comment that there's nothing intrinsically transient or not about these, and in fact it just comes down to config, which is not presently provided.

even better however, would be to actually pass some test config and then demonstrate that the impl accordingly judges some as transient and others not. BTW, in authoring such a test you may find that it's challenging to have isTransientException as a static method, and yet be configuration-driven

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added clarifying statement, probably test config based tests can be added when we actually start maintaining config and we also add tests for transient exceptions handling.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...and make isTransientException non-static to accept config

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure makes sense

jobExecutionPlan.getFlowExecutionId(), jobExecutionPlan.getJobName(),
DagActionStore.DagActionType.REEVALUATE);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

definitely an improvement on verification! you forgot Mockito.verifyNoMoreInteractions(DMSS)

(please add to other two tests too)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Copy link
Contributor

@phet phet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very nice work here!

@phet phet merged commit faecb35 into apache:master Dec 13, 2024
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants