From d449f850e1934680a96780c01ac95bfef52eeac7 Mon Sep 17 00:00:00 2001 From: dengziming Date: Tue, 22 Mar 2022 05:33:51 +0800 Subject: [PATCH] MINOR: show LogRecoveryState in MetadataShell and fix log message Show the LeaderRecoveryState in MetadataShell. Fix a case where we were comparing a Byte type with an enum type. Reviewers: Colin P. McCabe --- core/src/main/scala/kafka/cluster/Partition.scala | 2 +- .../org/apache/kafka/metadata/LeaderRecoveryState.java | 2 +- .../java/org/apache/kafka/shell/MetadataNodeManager.java | 5 +++++ .../org/apache/kafka/shell/MetadataNodeManagerTest.java | 7 +++++++ 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 429e73b4d8348..e692db83ea80f 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -549,7 +549,7 @@ class Partition(val topicPartition: TopicPartition, val addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt) val removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt) - if (partitionState.leaderRecoveryState == LeaderRecoveryState.RECOVERING) { + if (partitionState.leaderRecoveryState == LeaderRecoveryState.RECOVERING.value()) { stateChangeLogger.info( s"The topic partition $topicPartition was marked as RECOVERING. Leader log recovery is not implemented. " + "Marking the topic partition as RECOVERED." diff --git a/metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java b/metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java index 4724e990abbb8..08086751b709d 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java @@ -36,7 +36,7 @@ public enum LeaderRecoveryState { * A special value used to represent that the LeaderRecoveryState field of a * PartitionChangeRecord didn't change. */ - private static final byte NO_CHANGE = (byte) -1; + public static final byte NO_CHANGE = (byte) -1; public static LeaderRecoveryState of(byte value) { return optionalOf(value) diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java index f7b867a6b01c3..9d4941f8020b6 100644 --- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java +++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java @@ -59,6 +59,8 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import static org.apache.kafka.metadata.LeaderRecoveryState.NO_CHANGE; + /** * Maintains the in-memory metadata for the metadata tool. */ @@ -280,6 +282,9 @@ private void handleCommitImpl(MetadataRecordType type, ApiMessage message) partition.setLeader(record.leader()); partition.setLeaderEpoch(partition.leaderEpoch() + 1); } + if (record.leaderRecoveryState() != NO_CHANGE) { + partition.setLeaderRecoveryState(record.leaderRecoveryState()); + } partition.setPartitionEpoch(partition.partitionEpoch() + 1); file.setContents(PartitionRecordJsonConverter.write(partition, PartitionRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString()); diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java index f0cfffb28178b..c580e1d5c27de 100644 --- a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java +++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.metadata.UnfenceBrokerRecord; import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.metadata.LeaderRecoveryState; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -209,6 +210,12 @@ public void testPartitionChangeRecord() { partitionChangeRecord.duplicate().setLeader(1), newPartitionRecord.duplicate().setLeader(1).setLeaderEpoch(1) ); + + // Change leader recovery state + checkPartitionChangeRecord( + oldPartitionRecord, + partitionChangeRecord.duplicate().setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()), + newPartitionRecord.duplicate().setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value())); } private void checkPartitionChangeRecord(PartitionRecord oldPartitionRecord,