Skip to content

Commit

Permalink
rpk: add 'partitions move' command
Browse files Browse the repository at this point in the history
This commit adds a new 'rpk cluster partitions move' command to
reassign given partition replicas. The command wraps the
'/v1/partitions/{ns}/{topic}/{partition}/replicas' endpoint.
  • Loading branch information
daisukebe committed Nov 15, 2023
1 parent c41a33c commit 67a32e8
Show file tree
Hide file tree
Showing 4 changed files with 428 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/go/rpk/pkg/adminapi/api_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,11 @@ func (a *AdminAPI) Reconfigurations(ctx context.Context) ([]ReconfigurationsResp
var rr []ReconfigurationsResponse
return rr, a.sendAny(ctx, http.MethodGet, "/v1/partitions/reconfigurations", nil, &rr)
}

func (a *AdminAPI) MoveReplicas(ctx context.Context, ns string, topic string, part int, r []Replica) error {
return a.sendToLeader(ctx,
http.MethodPost,
fmt.Sprintf("/v1/partitions/%s/%s/%d/replicas", ns, topic, part),
r,
nil)
}
316 changes: 316 additions & 0 deletions src/go/rpk/pkg/cli/cluster/partitions/move.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,316 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package partitions

import (
"fmt"
"math/rand"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)

func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Command {
var (
ns string
topic string
partitions []string
)
type newAssignment struct {
Namespace string
Topic string
Partition int
NewReplicas []adminapi.Replica
}
cmd := &cobra.Command{
Use: "move",
Short: "Move partition replicas across nodes / cores",
Long: helpAlterAssignments,
Run: func(cmd *cobra.Command, topics []string) {
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "unable to load config: %v", err)
out.CheckExitCloudAdmin(p)

cl, err := adminapi.NewClient(fs, p)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

var (
mu sync.Mutex
newAssignmentList []newAssignment
reqCount int
wg sync.WaitGroup
)
errors := make([]error, 0)
brokerReqs := make(map[int]struct{})
knownNodeCore := make(map[int]int)

// Concurrently parse the requested partitions and
// find current replica assignments
g, egCtx := errgroup.WithContext(cmd.Context())
for _, partition := range partitions {
partition := partition
g.Go(func() error {
if len(topics) > 0 { // foo -p 0:1,2,3
for _, t := range topics {
_, _, part, err := extractNTP(t, partition)
out.MaybeDie(err, "failed to extract topic/partition: %s\n", err)

if nt := strings.Split(t, "/"); len(nt) == 1 {
ns = "kafka"
topic = nt[0]
} else {
ns = nt[0]
topic = nt[1]
}

current, err := cl.GetPartition(egCtx, ns, topic, part)
out.MaybeDie(err, "unable to get partition: %s\n", err)

newReplicas, err := configureReplicas(partition, current.Replicas)
out.MaybeDie(err, "unable to configure new replicas: %v\n", err)

mu.Lock()
newAssignmentList = append(newAssignmentList, newAssignment{
Namespace: ns,
Topic: topic,
Partition: part,
NewReplicas: newReplicas,
})
mu.Unlock()
}
} else { // -p foo/0:1,2,3
ns, topic, part, err := extractNTP("", partition)
out.MaybeDie(err, "failed to extract topic/partition: %s\n", err)

current, err := cl.GetPartition(egCtx, ns, topic, part)
out.MaybeDie(err, "unable to get partition: %s\n", err)

newReplicas, err := configureReplicas(partition, current.Replicas)
out.MaybeDie(err, "unable to configure new replicas: %v\n", err)

mu.Lock()
newAssignmentList = append(newAssignmentList, newAssignment{
Namespace: ns,
Topic: topic,
Partition: part,
NewReplicas: newReplicas,
})
mu.Unlock()
}
return nil
})
}

if err := g.Wait(); err != nil {
out.Die("failed to parse the arguments: %v\n", err)
}

for _, newa := range newAssignmentList {
for _, nr := range newa.NewReplicas {
if nr.Core == -1 {
brokerReqs[nr.NodeID] = struct{}{}
}
}
}
for node := range brokerReqs {
node := node
g.Go(func() error {
broker, err := cl.Broker(cmd.Context(), node)
mu.Lock()
defer mu.Unlock()
knownNodeCore[node] = broker.NumCores
return err
})
}

if err := g.Wait(); err != nil {
out.Die("unable to find core counts", err)
}

rng := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range newAssignmentList {
newa := newAssignmentList[i]
for j, nr := range newa.NewReplicas {
if nr.Core == -1 {
numCore := knownNodeCore[nr.NodeID]
newa.NewReplicas[j].Core = rng.Intn(numCore)
}
}
wg.Add(1)
go func() {
defer wg.Done()
fmt.Printf("Requested to move %s/%s/%d to %s\n", newa.Namespace, newa.Topic, newa.Partition, formatNodeCore(newa.NewReplicas))
err := cl.MoveReplicas(cmd.Context(), newa.Namespace, newa.Topic, newa.Partition, newa.NewReplicas)
mu.Lock()
defer mu.Unlock()
reqCount = reqCount + 1
if err != nil {
reqCount = reqCount - 1
errors = append(errors, fmt.Errorf("failed to move %s/%s/%d to %s; %v", newa.Namespace, newa.Topic, newa.Partition, formatNodeCore(newa.NewReplicas), err))
}
}()
}
wg.Wait()
if len(errors) > 0 {
fmt.Println()
for _, err := range errors {
fmt.Println(err)
}
}

fmt.Println()
fmt.Printf("Successfully began %d partition movement(s).\n\nCheck the movement status with 'rpk cluster partitions move-status' or see new assignments with 'rpk topic describe -p TOPIC'.\n", reqCount)
},
}
cmd.Flags().StringArrayVarP(&partitions, "partition", "p", nil, "Topic-partitions to move and new replica locations (repeatable)")
cmd.MarkFlagRequired("partition")
return cmd
}

func formatNodeCore(replicas []adminapi.Replica) string {
var formatted string
for i, r := range replicas {
if i == 0 {
formatted = "["
}
formatted = formatted + fmt.Sprintf("Node: %d, Core: %d", r.NodeID, r.Core)
if i != len(replicas)-1 {
formatted = formatted + ", "
}
}
return formatted + "]"
}

var (
ntpRe *regexp.Regexp
ntpReOnce sync.Once
)

// extractNTP parses the partition flag with format; foo/0:1,2,3 or 0:1,2,3.
// It extracts letters before the colon and formats it.
func extractNTP(topic string, ntp string) (ns string, t string, p int, err error) {
ntpReOnce.Do(func() {
ntpRe = regexp.MustCompile(`^((?:[^:]+/)?\d+):.*$`)
})
m := ntpRe.FindStringSubmatch(ntp)
if len(m) == 0 {
return "", "", -1, fmt.Errorf("invalid format for %s", ntp)
}
beforeColon := m[1]
if topic != "" {
p, err = strconv.Atoi(beforeColon)
if err != nil {
return "", "", -1, fmt.Errorf("%s", err)
}
} else if n := strings.Split(beforeColon, "/"); len(n) == 3 {
ns = n[0]
t = n[1]
p, err = strconv.Atoi(n[2])
if err != nil {
return "", "", -1, fmt.Errorf("%s", err)
}
} else if len(n) == 2 {
ns = "kafka"
t = n[0]
p, err = strconv.Atoi(n[1])
if err != nil {
return "", "", -1, fmt.Errorf("%s", err)
}
} else {
return "", "", -1, fmt.Errorf("invalid format for %s", ntp)
}
return ns, t, p, nil
}

var (
replicaRe *regexp.Regexp
replicaReOnce sync.Once
)

// configureReplicas parses the partition flag with format; foo/0:1-0,2-1,3-2 or 0:1,2,3
// It extracts letters after the colon and return as a slice of []adminapi.Replica.
func configureReplicas(partition string, currentReplicas []adminapi.Replica) ([]adminapi.Replica, error) {
replicaReOnce.Do(func() {
replicaRe = regexp.MustCompile(`^[^:]+:(\d+(?:-\d+)?(?:,\d+(?:-\d+)?)*)$`)
})
m := replicaRe.FindStringSubmatch(partition)
if len(m) == 0 {
return nil, fmt.Errorf("invalid format for %s", partition)
}
var newReplicas []adminapi.Replica
for _, nodeCore := range strings.Split(m[1], ",") {
if split := strings.Split(nodeCore, "-"); len(split) == 1 {
node, _ := strconv.Atoi(split[0])
core := findCore(node, currentReplicas)
newReplicas = append(newReplicas, adminapi.Replica{
NodeID: node,
Core: core,
})
} else {
node, _ := strconv.Atoi(split[0])
core, _ := strconv.Atoi(split[1])
newReplicas = append(newReplicas, adminapi.Replica{
NodeID: node,
Core: core,
})
}
}
return newReplicas, nil
}

// findCore finds a shard (CPU core) where an existing replica is
// assigned on. Returns '-1' for a new node.
func findCore(nodeID int, currentReplicas []adminapi.Replica) int {
for _, i := range currentReplicas {
if nodeID == i.NodeID {
return i.Core
}
}
return -1
}

const helpAlterAssignments = `Move partition replicas across nodes / cores.
This command changes replica assignments for given partitions. By default, it
assumes the "kafka" namespace, but you can specify an internal namespace using
the "{namespace}/" prefix.
To move replicas, use the following syntax:
rpk cluster partitions move foo --partition 0:1,2,3 -p 1:2,3,4
Here, the command assigns new replicas for partition 0 to brokers [1, 2, 3] and
for partition 1 to brokers [2, 3, 4] for the topic "foo".
You can also specify the core id with "-{core_id}" where the new replicas
should be placed:
rpk cluster partitions move foo -p 0:1-0,2-0,3-0
Here all new replicas [1, 2, 3] will be assigned on core 0 on the nodes.
The command does not change a "core" assignment unless it is explicitly
specified. When a core is not specified for a new node, the command randomly
picks a core and assign a replica on the core.
Topic arguments are optional. For more control, you can specify the topic name
in the "--partition" flag:
rpk cluster partitions move -p foo/0:1,2,3 -p kafka_internal/tx/0:1-0,2-0,3-0
`
Loading

0 comments on commit 67a32e8

Please sign in to comment.