KAFKA-12342: Reverse module dependency between Raft and Metadata #10705
Conversation
                replay(message, snapshotReader.epoch(), index++);
            }
        }
        snapshotRegistry.createSnapshot(lastCommittedOffset);
To minimize the number of changes needed to merge trunk into this PR, I removed the code that was loading snapshots. This is okay because the quorum controllers are not generating snapshots, so there aren't any snapshots to load.
I will add snapshot loading and generating on the controller as part of https://issues.apache.org/jira/browse/KAFKA-12787
Hmm... I would rather not remove this code if possible. In another comment I suggested moving it to the snapshot load function that you also added. That would also allow the associated test to keep working.
Okay. I didn't want to do this as part of the merge commit, but I'll add it back as a separate commit.
Done. Added snapshot loading at startup and added back the test for it.
There is some code duplication in the tests that I'll remove as part of https://issues.apache.org/jira/browse/KAFKA-12787
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala (outdated review thread; resolved)
@@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
        this.epoch = epoch;
    }
Hmm...
- What do you think of using `nodeId` rather than `leaderId`?
- There should be an accessor function to get the `nodeId` out.
- If we really need a shortcut method for comparing against specific node IDs like this, we could call it `LeaderAndEpoch#hasNodeId(int nodeId)`.
Also, I just realized that all the data fields in here are public. Can we fix that? In Java we use public accessors, not public data fields, unless there's some really exceptional reason not to.
Fixed.
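A minimal sketch of the shape being discussed, assuming the suggested `nodeId` naming, private fields with accessors, and a `hasNodeId` shortcut; this is illustrative only and may not match the class that was actually committed:

```java
import java.util.Objects;
import java.util.OptionalInt;

// Hypothetical sketch: names follow the review suggestions above, not
// necessarily the merged code.
public final class LeaderAndEpoch {
    private final OptionalInt nodeId;  // empty when no leader is elected for the epoch
    private final int epoch;

    public LeaderAndEpoch(OptionalInt nodeId, int epoch) {
        this.nodeId = Objects.requireNonNull(nodeId);
        this.epoch = epoch;
    }

    // Accessors instead of public data fields.
    public OptionalInt nodeId() {
        return nodeId;
    }

    public int epoch() {
        return epoch;
    }

    // Shortcut for comparing against a specific node ID.
    public boolean hasNodeId(int nodeId) {
        return this.nodeId.isPresent() && this.nodeId.getAsInt() == nodeId;
    }
}
```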
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java (outdated review thread; resolved)
@@ -209,136 +196,6 @@ public void testUnregisterBroker() throws Throwable {
        }
    }

    static class MockSnapshotWriterBuilder implements Function<Long, SnapshotWriter> {
Well, as I said in the other comments... I'd really like to find a way to keep this test working before we merge this PR.
public void handleNewLeader(MetaLogLeader newLeader) {
    if (newLeader.nodeId() == nodeId) {
        final long newEpoch = newLeader.epoch();
public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
I think there's some ambiguity in how this API is named. "Handling" a snapshot could mean either writing one or reading it. Judging from the parameter names here, I'm guessing that we want to read the snapshot here.
My understanding is that this API will be called if the standby controller falls behind too far, OR right after it starts up and before any other records are read. Does that match up with your understanding?
If that's true, then we should be able to put the snapshot loading stuff that currently exists (in the constructor) here, with a TODO for implementing snapshot loading AFTER the controller has already been initialized (which will involve zeroing out all the existing data structures and in-memory snapshots...) I think we should do that since I don't want this patch to move us backwards in snapshot support.
> If that's true, then we should be able to put the snapshot loading stuff that currently exists (in the constructor) here, with a TODO for implementing snapshot loading AFTER the controller has already been initialized (which will involve zeroing out all the existing data structures and in-memory snapshots...) I think we should do that since I don't want this patch to move us backwards in snapshot support.

We discussed this in another comment thread, but this PR has a partial implementation of snapshot loading in the controller. I am going to implement snapshot re-loading as part of https://issues.apache.org/jira/browse/KAFKA-12787.
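To make the direction of this thread concrete, here is a rough sketch of snapshot loading, mirroring the removed constructor code quoted at the top of this conversation. Apart from `replay(...)`, `snapshotReader.epoch()`, `snapshotRegistry.createSnapshot(...)`, and `lastCommittedOffset`, which appear in the diffs above, the method name and reader iteration API are assumptions, not the PR's actual code:

```java
// Hypothetical sketch only: replay every record from the snapshot into the
// in-memory state, then create a registry snapshot at the last committed
// offset. The loadSnapshot name and hasNext()/next()/records() calls are
// assumed for illustration.
private void loadSnapshot(SnapshotReader<ApiMessageAndVersion> snapshotReader) {
    int index = 0;
    while (snapshotReader.hasNext()) {
        for (ApiMessageAndVersion message : snapshotReader.next().records()) {
            replay(message, snapshotReader.epoch(), index++);
        }
    }
    snapshotRegistry.createSnapshot(lastCommittedOffset);
}
```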
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java (outdated review thread; resolved)
    }

    @Override
    public CompletableFuture<Void> shutdown(int timeoutMs) {
It seems like the intention behind this `shutdown` API is to be non-blocking, but this implementation is not non-blocking. Maybe it would be good to add a comment about this?
In general it is not possible to do a non-blocking thread join unless you have a third thread (not the calling thread, not the thread being shut down) which can wait for the blocking thread join operation to complete and then complete a future (or whatever).
That's why there are two shutdown APIs in LocalLogManager: a non-blocking beginShutdown and a blocking close which does all that, plus the thread join. This is a pattern that I use in other places as well. I think it's more useful than returning a future from close, due to the problem I mentioned above. It could be worth considering for RaftClient in the future.
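For readers unfamiliar with the pattern, a minimal standalone sketch of the beginShutdown/close split described above; the class, thread, and field names here are invented for illustration and are not taken from LocalLogManager itself:

```java
// Sketch of the pattern: a non-blocking beginShutdown that only signals the
// event thread, plus a blocking close that also joins it.
public class EventThreadService implements AutoCloseable {
    private final Thread eventThread;
    private volatile boolean shuttingDown = false;

    public EventThreadService() {
        this.eventThread = new Thread(() -> {
            while (!shuttingDown) {
                // Poll and process queued events; sleep briefly when idle.
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            // Drain remaining work and release resources before exiting.
        }, "event-thread");
        this.eventThread.start();
    }

    // Non-blocking: only signals the event thread to exit.
    public void beginShutdown() {
        shuttingDown = true;
    }

    // Blocking: signals shutdown and then joins the event thread. Must not be
    // called from the event thread itself, since joining yourself never returns.
    @Override
    public void close() throws InterruptedException {
        beginShutdown();
        eventThread.join();
    }
}
```

As the comment above notes, turning this into a truly non-blocking future-returning shutdown would require a third thread to perform the join and complete the future.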
@@ -2261,6 +2260,11 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
        return shutdownComplete;
    }

    @Override
    public void resign(int epoch) {
        throw new UnsupportedOperationException();
This needs to be supported, because the controller will resign if it detects certain bugs.
Yes. We have an issue for this: https://issues.apache.org/jira/browse/KAFKA-12631
For some reason, I thought this was already implemented. But you're correct, it is not currently implemented. Maybe we should have this for 3.0, since controller bugs do happen sometimes...
@@ -56,24 +57,17 @@
    void handleSnapshot(SnapshotReader<T> reader);

    /**
It's fine to combine these APIs, but we need to document what happens if the current leader resigns and we don't know who the new leader is yet. Do we get passed a LeaderAndEpoch with the current epoch + 1 and a node ID of -1? If so, do we then expect to see another LeaderAndEpoch with the current epoch + 1 and a valid node ID?
In other words, let's say node 0 is the leader and then resigns, and then node 1 becomes the leader. Does it look like this:
handleLeaderChange(LeaderAndEpoch(nodeId=0, epoch=0))
handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=1))
handleLeaderChange(LeaderAndEpoch(nodeId=1, epoch=1))
Or would you rather have something like this?
handleLeaderChange(LeaderAndEpoch(nodeId=0, epoch=0))
handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=0))
handleLeaderChange(LeaderAndEpoch(nodeId=1, epoch=1))
It seems like the second one will break a lot of invariants, so probably should be avoided. The first one might break some invariants too, though. We'd have to look.
Or you could choose to burn an epoch like this:
handleLeaderChange(LeaderAndEpoch(nodeId=0, epoch=0))
handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=1))
handleLeaderChange(LeaderAndEpoch(nodeId=1, epoch=2))
Given that we only have a 31-bit epoch in the first place, that seems unwise, though.
I'll update the documentation, but in Raft, epochs are not guaranteed to have a leader. If there is a leader for an epoch, then there is one and only one leader. So that means the client could see:
handleLeaderChange(LeaderAndEpoch(nodeId=0, epoch=0))
handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=1))
handleLeaderChange(LeaderAndEpoch(nodeId=1, epoch=1))
Or this, for that matter:
handleLeaderChange(LeaderAndEpoch(nodeId=0, epoch=0))
handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=1))
handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=2))
handleLeaderChange(LeaderAndEpoch(nodeId=1, epoch=2))
> Or you could choose to burn an epoch like this...
This is not how leader election works in Raft. When a leader fails or steps down, a new epoch starts without a leader, only candidate(s). If leader election succeeds for a given epoch, exactly one leader is elected for that epoch, and it will remain leader for the duration of that epoch.
It sounds like the invariant here is that while `handleLeaderChange` may be called multiple times for the same epoch, only the last call will have a valid (non-negative) node id associated with it. Can you add a JavaDoc comment like this to the API?
Yes. I added a few more sentences to make this clear.
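To illustrate, documentation capturing that invariant might read roughly like the following sketch; the listener name, the JavaDoc wording, and the method signature here are assumptions for illustration, not the exact text added in the PR:

```java
// Illustrative sketch of the kind of JavaDoc discussed above.
interface MetadataListener {
    /**
     * Called when the known leader or epoch changes.
     *
     * Epochs are monotonically increasing, but an epoch is not guaranteed to
     * have a leader. This method may therefore be invoked multiple times for
     * the same epoch: first with no leader (an empty node id) while an
     * election is in progress, and at most once more with the elected leader.
     * Only the final call for an epoch carries a valid node id.
     */
    void handleLeaderChange(LeaderAndEpoch newLeaderAndEpoch);
}
```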
Thanks for the revisions, @jsancio. I left some comments. Looks like there are some conflicts as well.
LGTM
This patch removes the temporary shim layer we added to bridge the interface differences between `MetaLogManager` and `RaftClient`. It also:
- Merges the `handleResign` and `handleNewLeader` APIs into a single `handleLeaderChange` API
- Moves `MetadataRecordSerde` into `:metadata`
- Uses `BatchReader`, which takes disk reads out of the Raft IO thread
- Removes `MetaLogRaftShim`, `MetaLogManager`, and `MetaLogListener`