-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow continue on errors #102
Conversation
This PR contains a retry mechanism in case of a failure. |
@@ -137,7 +127,7 @@ public Connector(String name, String xmlDef, int batchSize, int duration, int re | |||
.setPattern(streamName) | |||
.setBatchSize(batchSize) | |||
.setDuration(duration) | |||
.setFailurePolicy(FailurePolicy.RETRY) | |||
.setFailurePolicy(FailurePolicy.CONTINUE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be consider a breaking, we should allow to set it with register the connector and the default should be RETRY.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about it now, you should probably change it to ABORT if the retry mechanism is enable no? If the retry mechanism is enable then you will not raise an exception and will just set the error stream, so if there is an exception raised it must be a bug in the code no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basically, you're right. Those kind of errors are not supposed to happen in prod. However, we probably want to skip over those just to be safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can set it to continue if retryInterval < 0, otherwise retry
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added an option to use DLQ in case of non connection related errors. Changed mode back to RETRY
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just missed where we use it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I moved it to separate exceptions and forgot to put it back in. Just did
lastStreamId = null; | ||
cause = e; | ||
|
||
retry(record.subList(lastCommittedIdx + 1, record.size())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should do the retry only if it was requested when register the connector.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added an option to use DLQ
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not see where you use this new value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I moved it to separate exceptions and forgot to put it back in. Just did
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good.
Some small technical comments.
One design comment is that I believe this retry mechanism should be allow to enable/disable when we register the connector. By default it should be disabled to avoid breaking changes. I would also like to see some tests that cover this new functionality. And I see that our CI is not running, maybe @chayim can help with that.
|
||
} | ||
catch (Exception e) { | ||
msg = String.format("Failed committing transaction error='%s'", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So there is no really need to set the message here right? And then no need to set it to NULL after? Also I do not see where you check the errorsToDLQ
value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I moved it to separate exceptions and forgot to put it back in. Just did
transaction.commit(); | ||
session.clear(); | ||
} catch (Exception e) { | ||
String msg = String.format("Failed retrying transaction error='%s'", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if you fail here because of connection error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few last comments
No description provided.