Skip to content

Commit

Permalink
rpk: add 'partitions move' command
Browse files Browse the repository at this point in the history
This commit adds a new 'rpk cluster partitions move' command to
reassign given partition replicas. The command wraps the
'/v1/partitions/{ns}/{topic}/{partition}/replicas' endpoint.
  • Loading branch information
daisukebe committed Sep 26, 2023
1 parent b580d9a commit f5d8698
Show file tree
Hide file tree
Showing 4 changed files with 415 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/go/rpk/pkg/adminapi/api_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,11 @@ func (a *AdminAPI) Reconfigurations(ctx context.Context) ([]ReconfigurationsResp
var rr []ReconfigurationsResponse
return rr, a.sendAny(ctx, http.MethodGet, "/v1/partitions/reconfigurations", nil, &rr)
}

func (a *AdminAPI) MoveReplicas(ctx context.Context, ntp string, r []Replica) error {
return a.sendToLeader(ctx,
http.MethodPost,
fmt.Sprintf("/v1/partitions/%s/replicas", ntp),
r,
nil)
}
179 changes: 179 additions & 0 deletions src/go/rpk/pkg/cli/cluster/partitions/move.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package partitions

import (
"fmt"
"strconv"
"strings"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
)

func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Command {
var partitions []string
cmd := &cobra.Command{
Use: "move",
Short: "Move partition replicas across nodes / cores",
Long: helpAlterAssignments,
Run: func(cmd *cobra.Command, topics []string) {
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "unable to load config: %v", err)
out.CheckExitCloudAdmin(p)

cl, err := adminapi.NewClient(fs, p)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

for i, topic := range topics {
if !strings.Contains(topic, "/") {
topics[i] = "kafka/" + topic
}
}
for _, partition := range partitions {
if len(topics) > 0 { // foo -p 0:1,2,3
p, replicas, err := parsePartitionReplicas(partition)
out.MaybeDie(err, "unable to execute the request: %v\n", err)
for _, nt := range topics {
err = cl.MoveReplicas(cmd.Context(), nt+"/"+p, replicas)
out.MaybeDie(err, "unable to move partition replicas: %v\n", err)
}
} else { // -p foo/0:1,2,3
ntp, replicas, err := parseTopicPartitionReplicas(partition)
out.MaybeDie(err, "unable to execute the request: %v\n", err)
err = cl.MoveReplicas(cmd.Context(), ntp, replicas)
out.MaybeDie(err, "unable to move partition replicas: %v\n", err)
}
}
fmt.Print("Successfully executed the request(s). Check the status with 'rpk cluster partitions move-status' or see the new assignments with 'rpk topic describe -p TOPIC'.\n")
},
}
cmd.Flags().StringArrayVarP(&partitions, "partition", "p", nil, "Partitions to move and new replica locations (repeatable)")
cmd.MarkFlagRequired("partition")
return cmd
}

// Expected input format is '0:1,2,3' or '1:1-0,2-0,3-0'
// If '0:1,2,3' is passed, returns
//
// - '0'
// - [{NodeID:1},{NodeID:2},{NodeID:3}]
//
// If '1:1-0,2-0,3-0' is passed, returns
//
// - '1'
// - [{NodeID:1,Core:0},{NodeID:2,Core:0},{NodeID:3,Core:0}].
func parsePartitionReplicas(partition string) (string, []adminapi.Replica, error) {
var replicas []adminapi.Replica
pr := strings.Split(partition, ":")
if len(pr) != 2 {
return "", nil, fmt.Errorf("invalid format for %s", partition)
}
if strings.Contains(pr[0], "/") {
return "", nil, fmt.Errorf("invalid format for %s; unexpected \"/\"", partition)
}
replicasTmp := strings.Split(pr[1], ",")
for _, replicaCore := range replicasTmp {
nc := strings.Split(replicaCore, "-")
if len(nc) > 1 {
nodeID, err := strconv.Atoi(nc[0])
if err != nil {
return "", nil, fmt.Errorf("invalid format; %s in %s", nc[0], replicaCore)
}
core, err := strconv.Atoi(nc[1])
if err != nil {
return "", nil, fmt.Errorf("invalid format; %s in %s", nc[1], replicaCore)
}
replicas = append(replicas, adminapi.Replica{
NodeID: nodeID,
Core: core,
})
} else {
nodeID, err := strconv.Atoi(replicaCore)
if err != nil {
return "", nil, fmt.Errorf("invalid format; %s in %s", nc[0], replicaCore)
}
replicas = append(replicas, adminapi.Replica{
NodeID: nodeID,
})
}
}
return pr[0], replicas, nil
}

// Expected input format is 'foo/0:1,2,3' or 'redpanda/controller/0:1-0,2-0,3-0'
// If 'foo/0:1,2,3' is passed, returns
//
// - 'kafka/foo/0'
// - [{NodeID:1,},{NodeID:2},{NodeID:3}]
//
// If 'redpanda/controller/0:1-0,2-0,3-0' is passed, returns
//
// - 'redpanda/controller/0'
// - [{NodeID:1,Core:0},{NodeID:2,Core:0},{NodeID:3,Core:0}].
func parseTopicPartitionReplicas(partition string) (string, []adminapi.Replica, error) {
var replicas []adminapi.Replica
tpr := strings.Split(partition, ":")
if len(tpr) != 2 {
return "", nil, fmt.Errorf("invalid format for %s", partition)
}
if !strings.Contains(tpr[0], "/") {
return "", nil, fmt.Errorf("invalid format for %s; specify the topic name", tpr[0])
}
if strings.Count(tpr[0], "/") == 1 {
tpr[0] = "kafka/" + tpr[0]
}
replicasTmp := strings.Split(tpr[1], ",")
for _, replicaCore := range replicasTmp {
nc := strings.Split(replicaCore, "-")
if len(nc) > 1 {
nodeID, err := strconv.Atoi(nc[0])
if err != nil {
return "", nil, fmt.Errorf("invalid format; %s in %s", nc[0], replicaCore)
}
core, err := strconv.Atoi(nc[1])
if err != nil {
return "", nil, fmt.Errorf("invalid format; %s in %s", nc[1], replicaCore)
}
replicas = append(replicas, adminapi.Replica{
NodeID: nodeID,
Core: core,
})
} else {
nodeID, err := strconv.Atoi(replicaCore)
if err != nil {
return "", nil, fmt.Errorf("invalid format; %s in %s", nc[0], replicaCore)
}
replicas = append(replicas, adminapi.Replica{
NodeID: nodeID,
})
}
}
return tpr[0], replicas, nil
}

const helpAlterAssignments = `Move partition replicas across nodes / cores".
This command changes replica assignments for given partitions:
rpk cluster partitions move foo --partition 0:1,2,3 -p 1:2,3,4
where the command assigns new replicas to [1,2,3] for partition 0 and
to [2,3,4] for partition 1 for topic "foo".
By default, this command assumes the "kafka" namespace, but you can use
a "{namespace}/" to specify internal namespaces.
rpk cluster partitions move kafka_internal/tx --partition 0:1,2,3
You can also specify the core id with "-{core_id}" where the new replicas should be on:
rpk cluster partitions move foo -p 0:1-0,2-0,3-0
where all new replicas [1,2,3] will be assigned on core 0 on the nodes.
If topic(s) isn't set, specify the topic name in the "--partition / -p" option:
rpk cluster partitions move -p foo/0:1,2,3 -p kafka_internal/tx/0:1-0,2-0,3-0
`
Loading

0 comments on commit f5d8698

Please sign in to comment.