From 72afeb57f0da350bd16169c9caf1d47514507cc7 Mon Sep 17 00:00:00 2001 From: "Chase, Justin M" Date: Fri, 24 Feb 2023 09:51:10 -0600 Subject: [PATCH 1/4] use the partition leader from partition info --- src/main/java/org/akhq/models/Partition.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/akhq/models/Partition.java b/src/main/java/org/akhq/models/Partition.java index 31036ab8a..de97e17f3 100644 --- a/src/main/java/org/akhq/models/Partition.java +++ b/src/main/java/org/akhq/models/Partition.java @@ -13,6 +13,7 @@ @Getter @NoArgsConstructor public class Partition { + private Node.Partition leader; private int id; private String topic; private List nodes; @@ -27,7 +28,6 @@ public Partition(String topic, TopicPartitionInfo partitionInfo, List lo this.firstOffset = offsets.getFirstOffset(); this.lastOffset = offsets.getLastOffset(); this.nodes = new ArrayList<>(); - for (org.apache.kafka.common.Node replica : partitionInfo.replicas()) { nodes.add(new Node.Partition( replica, @@ -35,19 +35,22 @@ public Partition(String topic, TopicPartitionInfo partitionInfo, List lo partitionInfo.isr().stream().anyMatch(node -> node.id() == replica.id()) )); } + + org.apache.kafka.common.Node leader = partitionInfo.leader(); + this.leader = new Node.Partition( + leader, + true, + partitionInfo.isr().stream().anyMatch(node -> node.id() == leader.id()) + ); } public Node.Partition getLeader() { - return nodes - .stream() - .filter(Node.Partition::isLeader) - .findFirst() - .orElseThrow(() -> new NoSuchElementException("Leader not found")); + return this.leader; } public long getLogDirSize() { return this.getLogDir().stream() - .filter(logDir -> logDir.getBrokerId() == this.getLeader().getId()) + .filter(logDir -> logDir.getBrokerId() == this.leader.getId()) .map(LogDir::getSize) .reduce(0L, Long::sum); } From a956d5cc30ed960e76089a156529e50e8718d13d Mon Sep 17 00:00:00 2001 From: "Chase, Justin M" Date: Fri, 24 Feb 2023 09:57:47 -0600 Subject: [PATCH 2/4] use the leader found in the replicas if you can --- src/main/java/org/akhq/models/Partition.java | 24 +++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/akhq/models/Partition.java b/src/main/java/org/akhq/models/Partition.java index de97e17f3..740e15eb9 100644 --- a/src/main/java/org/akhq/models/Partition.java +++ b/src/main/java/org/akhq/models/Partition.java @@ -29,19 +29,27 @@ public Partition(String topic, TopicPartitionInfo partitionInfo, List lo this.lastOffset = offsets.getLastOffset(); this.nodes = new ArrayList<>(); for (org.apache.kafka.common.Node replica : partitionInfo.replicas()) { - nodes.add(new Node.Partition( + Node.Partition partition = new Node.Partition( replica, partitionInfo.leader().id() == replica.id(), partitionInfo.isr().stream().anyMatch(node -> node.id() == replica.id()) - )); + ); + + this.nodes.add(partition); + if (partition.isLeader()) { + this.leader = partition; + } } - org.apache.kafka.common.Node leader = partitionInfo.leader(); - this.leader = new Node.Partition( - leader, - true, - partitionInfo.isr().stream().anyMatch(node -> node.id() == leader.id()) - ); + if (this.leader == null) + { + org.apache.kafka.common.Node leader = partitionInfo.leader(); + this.leader = new Node.Partition( + leader, + true, + partitionInfo.isr().stream().anyMatch(node -> node.id() == leader.id()) + ); + } } public Node.Partition getLeader() { From 893a1aeeb5ad3eb730745066c4386a3e84507012 Mon Sep 17 00:00:00 2001 From: "Chase, Justin M" Date: Fri, 24 Feb 2023 12:55:33 -0600 Subject: [PATCH 3/4] null check the leader --- src/main/java/org/akhq/models/Partition.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/akhq/models/Partition.java b/src/main/java/org/akhq/models/Partition.java index 740e15eb9..4d0ecf95a 100644 --- a/src/main/java/org/akhq/models/Partition.java +++ b/src/main/java/org/akhq/models/Partition.java @@ -58,7 +58,7 @@ public Node.Partition getLeader() { public long getLogDirSize() { return this.getLogDir().stream() - .filter(logDir -> logDir.getBrokerId() == this.leader.getId()) + .filter(logDir -> this.leader != null && logDir.getBrokerId() == this.leader.getId()) .map(LogDir::getSize) .reduce(0L, Long::sum); } From 962e0e238b1ef94f98b5d27ca135c47feb36590d Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Sun, 12 Mar 2023 23:11:25 +0100 Subject: [PATCH 4/4] cleap --- src/main/java/org/akhq/models/Partition.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/akhq/models/Partition.java b/src/main/java/org/akhq/models/Partition.java index 4d0ecf95a..97b237da2 100644 --- a/src/main/java/org/akhq/models/Partition.java +++ b/src/main/java/org/akhq/models/Partition.java @@ -34,15 +34,14 @@ public Partition(String topic, TopicPartitionInfo partitionInfo, List lo partitionInfo.leader().id() == replica.id(), partitionInfo.isr().stream().anyMatch(node -> node.id() == replica.id()) ); - + this.nodes.add(partition); if (partition.isLeader()) { this.leader = partition; } } - - if (this.leader == null) - { + + if (this.leader == null) { org.apache.kafka.common.Node leader = partitionInfo.leader(); this.leader = new Node.Partition( leader,