-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix bad offset management #597
Fix bad offset management #597
Conversation
Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
Codecov Report
@@ Coverage Diff @@
## master #597 +/- ##
============================================
- Coverage 71.85% 63.62% -8.24%
============================================
Files 72 28 -44
Lines 2587 1207 -1380
Branches 121 0 -121
============================================
- Hits 1859 768 -1091
+ Misses 584 364 -220
+ Partials 144 75 -69
Flags with carried forward coverage won't be shown. Click here to find out more.
Continue to review full report at Codecov.
|
/retest |
1 similar comment
/retest |
...knative/eventing/kafka/broker/dispatcher/strategy/UnorderedConsumerRecordOffsetStrategy.java
Outdated
Show resolved
Hide resolved
// If you don't want to lose hours in debugging, please don't remove this FQCNs :) | ||
final Map<io.vertx.kafka.client.common.TopicPartition, io.vertx.kafka.client.consumer.OffsetAndMetadata> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOL 🤣
...knative/eventing/kafka/broker/dispatcher/strategy/UnorderedConsumerRecordOffsetStrategy.java
Outdated
Show resolved
Hide resolved
...knative/eventing/kafka/broker/dispatcher/strategy/UnorderedConsumerRecordOffsetStrategy.java
Outdated
Show resolved
Hide resolved
...knative/eventing/kafka/broker/dispatcher/strategy/UnorderedConsumerRecordOffsetStrategy.java
Outdated
Show resolved
Hide resolved
...knative/eventing/kafka/broker/dispatcher/strategy/UnorderedConsumerRecordOffsetStrategy.java
Outdated
Show resolved
Hide resolved
consumer.commit(Map.of( | ||
key, | ||
new OffsetAndMetadata(shouldAck + 1, "")) | ||
).onSuccess(ignored -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
onSuccess
on a new line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The closed paren is right there 😄
...knative/eventing/kafka/broker/dispatcher/strategy/UnorderedConsumerRecordOffsetStrategy.java
Outdated
Show resolved
Hide resolved
...knative/eventing/kafka/broker/dispatcher/strategy/UnorderedConsumerRecordOffsetStrategy.java
Outdated
Show resolved
Hide resolved
...knative/eventing/kafka/broker/dispatcher/strategy/UnorderedConsumerRecordOffsetStrategy.java
Outdated
Show resolved
Hide resolved
// If the commit failed, there are 2 situations: | ||
// * Nobody tried to commit again, so let's just restore the state | ||
// * Somebody committed with an offset greater than this one, so we just discard the error | ||
if (!(this.lastAckedMap.get(key) > shouldAck)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (!(this.lastAckedMap.get(key) > shouldAck)) { | |
if (this.lastAckedMap.get(key) <= shouldAck) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would love to keep it this way, since it's consistent with the above comment
...knative/eventing/kafka/broker/dispatcher/strategy/UnorderedConsumerRecordOffsetStrategy.java
Outdated
Show resolved
Hide resolved
long lastAckedBeforeThisOne = this.lastAckedMap.put(key, shouldAck); | ||
SortedSet<Long> messagesImGoingToAck = this.pendingAcksMap.remove(key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be updated on success or on failure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this is discarded in case of success, copy pasted in case of failure (if we actually need to restore the state)
if (!(this.lastAckedPerPartition.get(topicPartition) > toAck)) { | ||
this.lastAckedPerPartition.put(topicPartition, lastAckedBeforeThisOne); | ||
this.pendingAcksPerPartition.compute(topicPartition, (k, actual) -> { | ||
if (actual != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this possible since we call mutateStateAndCheckAck
above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is, because somebody could have asked meanwhile to add a new commit and that state could have been populated again
if (!(this.lastAckedPerPartition.get(topicPartition) > toAck)) { | ||
this.lastAckedPerPartition.put(topicPartition, lastAckedBeforeThisOne); | ||
this.pendingAcksPerPartition.compute(topicPartition, (k, actual) -> { | ||
if (actual != null) { | ||
actual.addAll(messagesImGoingToAck); | ||
return actual; | ||
} | ||
return messagesImGoingToAck; | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I comment out these lines, tests are green.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, if we set the state on success, there is no need to rollback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be safer since the state could have changed since lastAckedBeforeThisOne
was calculated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I comment out these lines, tests are green.
How could it be? Lemme check 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes of course it goes green, but if the system crashes after trying to commit, it will restart from the previous ack. Do we care? Otherwise I'll just remove that statement, it just makes the code more complicated
Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
Thank you!
/kind bug
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: pierDipi, slinkydeveloper The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
Java tests failed with |
* Fix knative-extensions#380 Signed-off-by: Francesco Guardiani <francescoguard@gmail.com> * Fix Signed-off-by: Francesco Guardiani <francescoguard@gmail.com> * Comments Signed-off-by: Francesco Guardiani <francescoguard@gmail.com> * Comments Signed-off-by: Francesco Guardiani <francescoguard@gmail.com> * Removed complicated code which was practically useless Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
…ns#597) * Periodically resync consumer groups to downscale statefulset Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> * Autoscaler scale down delay Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> * Scale machineset for rekt tests Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> * Increase is ready timeout Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> * Skip namespace cleanup so that we can more must gather info Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> --------- Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Signed-off-by: Francesco Guardiani francescoguard@gmail.com
Fixes #380
Proposed Changes
Release Note
Docs