Skip to content

Commit

Permalink
Merge pull request #15119 from r-vasquez/manual-backport-14862-v23.2.x
Browse files Browse the repository at this point in the history
[ v23.2.x] Manual backport: add rpk cluster partitions list
  • Loading branch information
twmb authored Nov 27, 2023
2 parents 3cbc8bc + e8c2bea commit 06d914d
Show file tree
Hide file tree
Showing 9 changed files with 470 additions and 164 deletions.
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/adminapi/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (a *AdminAPI) sendAny(ctx context.Context, method, path string, body, into

// If err is set, we are retrying after a failure on the previous node
if err != nil {
fmt.Printf("Request error, trying another node: %s\n", err.Error())
zap.L().Warn(fmt.Sprintf("Request error, trying another node: %s", err.Error()))
var httpErr *HTTPResponseError
if errors.As(err, &httpErr) {
status := httpErr.Response.StatusCode
Expand Down
51 changes: 43 additions & 8 deletions src/go/rpk/pkg/adminapi/api_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
"net/http"
)

const partitionsBaseURL = "/v1/cluster/partitions"

// Replica contains the information of a partition replica.
type Replica struct {
NodeID int `json:"node_id"`
Core int `json:"core"`
NodeID int `json:"node_id" yaml:"node_id"`
Core int `json:"core" yaml:"core"`
}

// Partition is the information returned from the Redpanda admin partitions endpoints.
Expand Down Expand Up @@ -58,21 +60,54 @@ type ReconfigurationsResponse struct {
ReconciliationStatuses []Status `json:"reconciliation_statuses"`
}

type ClusterPartition struct {
Ns string `json:"ns" yaml:"ns"`
Topic string `json:"topic" yaml:"topic"`
PartitionID int `json:"partition_id" yaml:"partition_id"`
LeaderID *int `json:"leader_id,omitempty" yaml:"leader_id,omitempty"` // LeaderID may be missing in the response.
Replicas []Replica `json:"replicas" yaml:"replicas"`
Disabled *bool `json:"disabled,omitempty" yaml:"disabled,omitempty"` // Disabled may be discarded if not present.
}

// GetPartition returns detailed partition information.
func (a *AdminAPI) GetPartition(
ctx context.Context, namespace, topic string, partition int,
) (Partition, error) {
var pa Partition
return pa, a.sendAny(
ctx,
http.MethodGet,
fmt.Sprintf("/v1/partitions/%s/%s/%d", namespace, topic, partition),
nil,
&pa)
return pa, a.sendAny(ctx, http.MethodGet, fmt.Sprintf("/v1/partitions/%s/%s/%d", namespace, topic, partition), nil, &pa)
}

// GetTopic returns detailed information of all partitions for a given topic.
func (a *AdminAPI) GetTopic(ctx context.Context, namespace, topic string) ([]Partition, error) {
var pa []Partition
return pa, a.sendAny(ctx, http.MethodGet, fmt.Sprintf("/v1/partitions/%s/%s", namespace, topic), nil, &pa)
}

// Reconfigurations returns the list of ongoing partition reconfigurations.
func (a *AdminAPI) Reconfigurations(ctx context.Context) ([]ReconfigurationsResponse, error) {
var rr []ReconfigurationsResponse
return rr, a.sendAny(ctx, http.MethodGet, "/v1/partitions/reconfigurations", nil, &rr)
}

// AllClusterPartitions returns cluster level metadata of all partitions in a
// cluster. If withInternal is true, internal topics will be returned. If
// disabled is true, only disabled partitions are returned.
func (a *AdminAPI) AllClusterPartitions(ctx context.Context, withInternal, disabled bool) ([]ClusterPartition, error) {
var clusterPartitions []ClusterPartition
partitionsURL := fmt.Sprintf("%v?with_internal=%v", partitionsBaseURL, withInternal)
if disabled {
partitionsURL += "&disabled=true"
}
return clusterPartitions, a.sendAny(ctx, http.MethodGet, partitionsURL, nil, &clusterPartitions)
}

// TopicClusterPartitions returns cluster level metadata of all partitions in
// a given topic. If disabled is true, only disabled partitions are returned.
func (a *AdminAPI) TopicClusterPartitions(ctx context.Context, namespace, topic string, disabled bool) ([]ClusterPartition, error) {
var clusterPartition []ClusterPartition
partitionURL := fmt.Sprintf("%v/%v/%v", partitionsBaseURL, namespace, topic)
if disabled {
partitionURL += "?disabled=true"
}
return clusterPartition, a.sendAny(ctx, http.MethodGet, partitionURL, nil, &clusterPartition)
}
9 changes: 9 additions & 0 deletions src/go/rpk/pkg/cli/cluster/partitions/cancel.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package partitions

import (
Expand Down
9 changes: 9 additions & 0 deletions src/go/rpk/pkg/cli/cluster/partitions/cancel_hidden.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package partitions

import (
Expand Down
Loading

0 comments on commit 06d914d

Please sign in to comment.