KAFKA-12342; Merge RaftClient and MetaLogManager interfaces and remove shim #10497
Conversation
cc @jsancio
Thanks for the PR. Partial review. Wanted to provide feedback as soon as possible.
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala (resolved comment on an outdated diff)
```diff
  } else {
-   offset = logManager.scheduleWrite(controllerEpoch, result.records());
+   offset = raftClient.scheduleAppend(controllerEpoch, result.records());
```
Should we file a Jira, if one doesn't already exist, to handle the case when this is `null`?
We have https://issues.apache.org/jira/browse/KAFKA-12158 for this issue.
@hachikuji I think for now we should at least check the return value with `offset = Objects.requireNonNull(...)`.
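As a minimal sketch of that guard, reusing the names from the hunk above (and assuming, per KAFKA-12158, that `scheduleAppend` can return `null` when the append cannot be scheduled):

```java
import java.util.Objects;

// Sketch only: fail fast on a null offset instead of letting it propagate.
offset = Objects.requireNonNull(
    raftClient.scheduleAppend(controllerEpoch, result.records()),
    "scheduleAppend returned null: the append could not be scheduled");
```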
```diff
@@ -126,10 +130,10 @@ class KafkaRaftManager[T](
   private val dataDir = createDataDir()
   private val metadataLog = buildMetadataLog()
   private val netChannel = buildNetworkChannel()
-  private val raftClient = buildRaftClient()
-  private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix)
+  val client: KafkaRaftClient[T] = buildRaftClient()
```
Did you mean to override the return type from `RaftClient[T]` to `KafkaRaftClient[T]`?
```diff
-  def kafkaRaftClient: KafkaRaftClient[T] = raftClient
+  def kafkaRaftClient: KafkaRaftClient[T] = client
```
I think that since you added a new method `client: RaftClient[T]` to `RaftManager[T]`, and `KafkaRaftManager` overrides it as `client: KafkaRaftClient[T]`, we should be able to remove this `KafkaRaftManager[T]`-only public method.
```scala
override def run(): Unit = {
  try {
    apply(reader.next())
```
I think this is technically correct based on the current implementation of `KafkaRaftClient`, but is there a reason why the listener only reads one batch? Also, should it assume that the reader contains at least one batch?
When `handleCommit` is fired because of an `appendAsLeader`, the reader is guaranteed to have one batch. When reading from the `ReplicatedLog`, the batch reader may have more than one batch.
Should this method instead do the following?
```scala
override def run(): Unit = {
  try {
    while (reader.hasNext()) {
      apply(reader.next())
    }
  } finally {
    reader.close()
  }
}
```
Created https://issues.apache.org/jira/browse/KAFKA-12837 for this suggestion.
```scala
private def applyBatch(
  records: List[ApiMessageAndVersion]
): Unit = {
  val baseOffset = lastMetadataOffset + 1
```
Hmm. This is minor, but does this mean that a `baseOffset` of 0 is not possible, since `lastMetadataOffset` is initialized to `0`? Is this also true for a "regular" Kafka topic partition? Or is this just a side effect of how this test gets constructed?
```diff
@@ -200,7 +202,7 @@ public Builder setMetrics(ControllerMetrics controllerMetrics) {

    @SuppressWarnings("unchecked")
    public QuorumController build() throws Exception {
-       if (logManager == null) {
+       if (raftClient == null) {
            throw new RuntimeException("You must set a metadata log manager.");
```
Let's change the message for this exception. Maybe "Raft client was not set for the quorum controller"
Resolved in #10705
```java
if (leaderAndEpoch.equals(lastFiredLeaderChange)) {
    return false;
} else if (leaderAndEpoch.epoch > lastFiredLeaderChange.epoch) {
    return true;
```
I see. We want to fire this event even if the `leader` is `Optional.empty()`, because we use this event to propagate loss of leadership.
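For illustration, a small self-contained sketch of what a leadership-loss notification carries; the node id and the use of `OptionalInt` directly (rather than the real `LeaderAndEpoch` class) are stand-ins:

```java
import java.util.OptionalInt;

public class LeadershipLossSketch {
    public static void main(String[] args) {
        // The epoch advanced but no leader is known yet: an empty leaderId
        // is how the old leader learns that it has lost leadership.
        OptionalInt leaderId = OptionalInt.empty();
        int localNodeId = 1;

        boolean stillLeader = leaderId.equals(OptionalInt.of(localNodeId));
        System.out.println(stillLeader); // false: take the resignation path
    }
}
```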
```java
} else if (leaderAndEpoch.epoch > lastFiredLeaderChange.epoch) {
    return true;
} else {
    return leaderAndEpoch.leaderId.isPresent() && !lastFiredLeaderChange.leaderId.isPresent();
```
This works because there is an invariant that `leaderAndEpoch.epoch >= lastFiredLeaderChange.epoch` is always true, right? Should we document this above this line?

Thinking about it some more, wouldn't this always be true, since at this point we know that:

- `leaderAndEpoch.epoch == lastFiredLeaderChange.epoch`
- `!leaderAndEpoch.equals(lastFiredLeaderChange)`

If you agree, I think that we can change this method to just:

`return !leaderAndEpoch.equals(lastFiredLeaderChange);`
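A sketch of what the collapsed method might look like; the `lastFiredLeaderChange` field comes from the snippets above, while the method name and signature are hypothetical:

```java
// Hypothetical enclosing method; the body is the reviewer's one-liner.
private boolean shouldFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
    // The earlier epoch-comparison branches collapse into this: any
    // difference from the last event we fired means we fire again.
    return !leaderAndEpoch.equals(lastFiredLeaderChange);
}
```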
```diff
@@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
        this.epoch = epoch;
    }

+   public boolean isLeader(int nodeId) {
+       return leaderId.isPresent() && leaderId.getAsInt() == nodeId;
```
Minor, but how about `return leaderId.equals(OptionalInt.of(nodeId));`?
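A quick runnable check that the two forms agree (node ids here are arbitrary):

```java
import java.util.OptionalInt;

public class IsLeaderEquivalence {
    public static void main(String[] args) {
        OptionalInt present = OptionalInt.of(3);
        OptionalInt empty = OptionalInt.empty();

        // Original form: explicit presence check plus value comparison.
        System.out.println(present.isPresent() && present.getAsInt() == 3); // true
        // Suggested form: a single equals comparison.
        System.out.println(present.equals(OptionalInt.of(3)));              // true
        // An empty OptionalInt never equals OptionalInt.of(nodeId).
        System.out.println(empty.equals(OptionalInt.of(3)));                // false
    }
}
```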
```diff
  * @param epoch the epoch that the leader is resigning from
  */
- default void handleResign(int epoch) {}
+ default void beginShutdown() {}
```
Let's document this method.
```diff
    @Override
-   public void handleNewLeader(MetaLogLeader leader) {
+   public void handleLeaderChange(LeaderAndEpoch leader) {
        appendEvent("handleNewLeader", () -> {
```
This is probably outside the scope of this PR, but it looks like `queue` is never read.
This patch removes the temporary shim layer we added to bridge the interface differences between `MetaLogManager` and `RaftClient`. It also:

- Merges the `handleResign` and `handleNewLeader` APIs into a single `handleLeaderChange` API (sketched below)
- Moves `MetadataRecordSerde` into `:metadata`
- Adds a `BatchReader`, which takes disk reads out of the Raft IO thread
- Removes `MetaLogRaftShim`, `MetaLogManager`, and `MetaLogListener`
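To make the first bullet concrete, here is a rough sketch of the listener change, with simplified stand-ins for the real Kafka types (the real signatures appear in the diffs above):

```java
import java.util.OptionalInt;

// Simplified stand-in for org.apache.kafka.raft.LeaderAndEpoch.
class LeaderAndEpoch {
    final OptionalInt leaderId;
    final int epoch;

    LeaderAndEpoch(OptionalInt leaderId, int epoch) {
        this.leaderId = leaderId;
        this.epoch = epoch;
    }
}

// Before: separate callbacks for election and resignation.
interface OldStyleListener {
    void handleNewLeader(int leaderId, int epoch); // stand-in for MetaLogLeader
    default void handleResign(int epoch) {}
}

// After: a single callback; an empty leaderId conveys loss of leadership
// (see the review discussion above).
interface NewStyleListener {
    void handleLeaderChange(LeaderAndEpoch leader);
}
```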