-
Notifications
You must be signed in to change notification settings - Fork 71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: duplicated schemas on rapid elections while continuous produce of records #938
base: main
Are you sure you want to change the base?
Conversation
7b04f19
to
285c950
Compare
52fb34c
to
054efb7
Compare
1fa1052
to
7f657bd
Compare
Coverage reportClick to see where and how coverage changed
This report was generated by python-coverage-comment-action |
49a9fd2
to
c4e58e8
Compare
Please also add the new config |
8183430
to
784a2fa
Compare
784a2fa
to
3f84dff
Compare
9f3d8bc
to
25e6c16
Compare
@@ -1307,7 +1335,7 @@ async def _forward_request_remote( | |||
if auth_header is not None: | |||
headers["Authorization"] = auth_header | |||
|
|||
with async_timeout.timeout(timeout): | |||
async with async_timeout.timeout(timeout): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this was wrong since a while
LOG.info("Resetting generation status") | ||
# this is called immediately after the election, we shouldn't reset this | ||
# until a new node its elected aka the other path where a new node its elected | ||
# otherwise this its called at each round and we keep not counting the 5 seconds | ||
# required before the election. | ||
# self._are_we_master = False | ||
self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this was called because we were exiting the election loop in the _async_loop
of the master_coordinator.py
. We must keep running the thread/algorithm otherwise we are always electing a new node since that causes a rebalance and the rebalance causes a new election (the rebalance happen because we sent the reset_generation
as a side effect of closing the heartbeat task)
a422f24
to
d7cdd8a
Compare
# why do we need to close? | ||
# we just need to keep running even when the schema registry its ready | ||
# otherwise we cause a rebalance and a new election. This should run until | ||
# karapace is restarted | ||
# if self._sc.ready(): | ||
# break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this question its mainly for @jjaakola-aiven. I inherited the initial implementation from him. I think we shouldn't exit but I wait for him to reply here
2385470
to
0e374a7
Compare
8eb38e1
to
d8067fd
Compare
1. moving the coordinator in a separate thread 2. adding a waiting time between when the master its elected and the master can act. This has been done to avoid rapid elections of master that may produce schemas with different ids. Example of what could happpen without the delay: |--------------------------------------| |Node | Node1 | Node2 | Node3 | |Role | Master | Follower | Follower | |--------------------------------------| Node1 -> Send Message A{id=max(current_ids)} to kafka where the max(current_ids) = 10 --------------------------------------- Node1 its disconnected, the message its still in the producer queue of Node1 --------------------------------------- Node2 its elected master |--------------------------------------| |Node | Node1 | Node2 | Node3 | |Role | Follower | Master | Follower | |--------------------------------------| ---------------------------------------- Node2 produces a message B{id=max(current_ids)} to kafka Because the message A isn't yet delivered to Node2, the max(current_ids) returns still 10. And we have an ID clash. The solution its simple, each master should wait a reasonable high number of milliseconds before acting as a master. So that all the in-flight messages are delivered to kafka + the reasonable delay of the consumer for the master node before noticing that a message has been produced
d8067fd
to
5cb351b
Compare
"/master_available", | ||
callback=self.master_available, | ||
# post because they cannot be cached, need to be sure always gathering the real value of that property | ||
method="POST", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a big fan of this because it violates semantics of POST. Maybe set a Cache-Control: no-store, no-cache, must-revalidate
header instead?
coordinator rewrite
Example of what could happpen without the delay:
The solution its simple, each master should wait a reasonable high number of milliseconds before acting as a master.
So that all the in-flight messages are delivered to kafka + the reasonable delay of the consumer for the master node before noticing that a message has been produced