Skip to content

Commit

Permalink
rpk: add --brokers option to rpk cluster partitions list
Browse files Browse the repository at this point in the history
  • Loading branch information
daisukebe committed Apr 18, 2024
1 parent 5c7bd94 commit c369b34
Showing 1 changed file with 45 additions and 0 deletions.
45 changes: 45 additions & 0 deletions src/go/rpk/pkg/cli/cluster/partitions/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func newListCommand(fs afero.Fs, p *config.Params) *cobra.Command {
all bool
disabledOnly bool
partitions []int
brokers []int
logFallbackOnce sync.Once
)
cmd := &cobra.Command{
Expand Down Expand Up @@ -72,6 +73,10 @@ List all partitions in the cluster.
List all partitions in the cluster, filtering for topic foo and bar.
rpk cluster partitions list foo bar
List partitions which replicas are assigned on specific brokers.
rpk cluster partitions list foo --brokers 1,2
List only the disabled partitions.
rpk cluster partitions list -a --disabled-only
Expand Down Expand Up @@ -150,12 +155,16 @@ List all in json format.
if partitions != nil {
clusterPartitions = filterPartition(clusterPartitions, partitions)
}
if len(brokers) > 0 {
clusterPartitions = filterBroker(clusterPartitions, brokers)
}
printClusterPartitions(f, clusterPartitions)
},
}
cmd.Flags().BoolVarP(&all, "all", "a", false, "If true, list all partitions in the cluster")
cmd.Flags().BoolVar(&disabledOnly, "disabled-only", false, "If true, list disabled partitions only")
cmd.Flags().IntSliceVarP(&partitions, "partition", "p", nil, "List of comma-separated partitions IDs that you wish to filter the results with")
cmd.Flags().IntSliceVarP(&brokers, "brokers", "b", nil, "List of comma-separated broker IDs that you wish to filter the results with")

p.InstallFormatFlag(cmd)
return cmd
Expand Down Expand Up @@ -248,3 +257,39 @@ func filterPartition(cPartitions []adminapi.ClusterPartition, partitions []int)
}
return
}

// filterBroker filters cPartition and returns a slice with only the
// clusterPartitions with the same brokers present in the Replicas slice.
func filterBroker(cPartitions []adminapi.ClusterPartition, brokers []int) (ret []adminapi.ClusterPartition) {
for _, p := range cPartitions {
rob := replicaOnBroker(p.Replicas, brokers)
if rob {
ret = append(ret, p)
}
}
return
}

// replicaOnBroker returns true when all nodes in the brokers slice
// exist in the Replicas slice or the brokers slice is empty.
// Otherwise, this returns false.
func replicaOnBroker(replicas adminapi.Replicas, brokers []int) bool {
if len(brokers) == 0 {
return true
}

foundCount := 0
for _, r := range replicas {
for _, b := range brokers {
if r.NodeID == b {
foundCount++
}
}
}

if foundCount == len(brokers) {
return true
} else {
return false
}
}

0 comments on commit c369b34

Please sign in to comment.