Skip to content

Commit

Permalink
MINOR: show LogRecoveryState in MetadataShell and fix log message
Browse files Browse the repository at this point in the history
Show the LeaderRecoveryState in MetadataShell.

Fix a case where we were comparing a Byte type with an enum type.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
  • Loading branch information
dengziming authored Mar 21, 2022
1 parent 4c8685e commit d449f85
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 2 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ class Partition(val topicPartition: TopicPartition,
val addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt)
val removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)

if (partitionState.leaderRecoveryState == LeaderRecoveryState.RECOVERING) {
if (partitionState.leaderRecoveryState == LeaderRecoveryState.RECOVERING.value()) {
stateChangeLogger.info(
s"The topic partition $topicPartition was marked as RECOVERING. Leader log recovery is not implemented. " +
"Marking the topic partition as RECOVERED."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public enum LeaderRecoveryState {
* A special value used to represent that the LeaderRecoveryState field of a
* PartitionChangeRecord didn't change.
*/
private static final byte NO_CHANGE = (byte) -1;
public static final byte NO_CHANGE = (byte) -1;

public static LeaderRecoveryState of(byte value) {
return optionalOf(value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import static org.apache.kafka.metadata.LeaderRecoveryState.NO_CHANGE;

/**
* Maintains the in-memory metadata for the metadata tool.
*/
Expand Down Expand Up @@ -280,6 +282,9 @@ private void handleCommitImpl(MetadataRecordType type, ApiMessage message)
partition.setLeader(record.leader());
partition.setLeaderEpoch(partition.leaderEpoch() + 1);
}
if (record.leaderRecoveryState() != NO_CHANGE) {
partition.setLeaderRecoveryState(record.leaderRecoveryState());
}
partition.setPartitionEpoch(partition.partitionEpoch() + 1);
file.setContents(PartitionRecordJsonConverter.write(partition,
PartitionRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -209,6 +210,12 @@ public void testPartitionChangeRecord() {
partitionChangeRecord.duplicate().setLeader(1),
newPartitionRecord.duplicate().setLeader(1).setLeaderEpoch(1)
);

// Change leader recovery state
checkPartitionChangeRecord(
oldPartitionRecord,
partitionChangeRecord.duplicate().setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()),
newPartitionRecord.duplicate().setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()));
}

private void checkPartitionChangeRecord(PartitionRecord oldPartitionRecord,
Expand Down

0 comments on commit d449f85

Please sign in to comment.